Skip to content

Commit

Permalink
ARTEMIS-4570 filter not applied to all brokers in cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
howardgao authored and clebertsuconic committed Jan 23, 2024
1 parent 4b78cab commit 07b0215
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum CoreNotificationType implements NotificationType {
SESSION_CREATED(26),
SESSION_CLOSED(27),
MESSAGE_DELIVERED(28),
MESSAGE_EXPIRED(29);
MESSAGE_EXPIRED(29),
BINDING_UPDATED(30);

private final int value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ public QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean f
if ((forceUpdate || newFilter != oldFilter) && !Objects.equals(oldFilter, newFilter)) {
changed = true;
queue.setFilter(newFilter);
notifyBindingUpdatedForQueue(queueBinding);
}
if ((forceUpdate || queueConfiguration.isConfigurationManaged() != null) && !Objects.equals(queueConfiguration.isConfigurationManaged(), queue.isConfigurationManaged())) {
changed = true;
Expand Down Expand Up @@ -836,6 +837,22 @@ public QueueBinding updateQueue(QueueConfiguration queueConfiguration, boolean f
}
}

public void notifyBindingUpdatedForQueue(QueueBinding binding) throws Exception {
//only the filter could be updated
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
Filter filter = binding.getFilter();
if (filter != null) {
props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
}
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());

String uid = UUIDGenerator.getInstance().generateStringUUID();
logger.debug("ClusterCommunication::Sending notification for updateBinding {} from server {}", binding, server);
managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_UPDATED, props));
}

@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
EnumSet<RoutingType> routingTypes) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.cluster;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;

Expand All @@ -34,5 +35,7 @@ public interface RemoteQueueBinding extends QueueBinding {

long getRemoteQueueID();

void setFilter(Filter filter);

MessageLoadBalancingType getMessageLoadBalancingType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ private void setupNotificationConsumer() throws Exception {
"', '" +
CoreNotificationType.BINDING_REMOVED +
"', '" +
CoreNotificationType.BINDING_UPDATED +
"', '" +
CoreNotificationType.CONSUMER_CREATED +
"', '" +
CoreNotificationType.CONSUMER_CLOSED +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.activemq.artemis.core.client.impl.TopologyManager;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
Expand Down Expand Up @@ -1134,6 +1135,10 @@ private void handleNotificationMessage(ClientMessage message) throws Exception {

break;
}
case BINDING_UPDATED: {
doBindingUpdated(message);
break;
}
case CONSUMER_CREATED: {
doConsumerCreated(message);

Expand Down Expand Up @@ -1269,6 +1274,22 @@ public synchronized void disconnectBindings() throws Exception {
}
}

private synchronized void doBindingUpdated(final ClientMessage message) throws Exception {
logger.trace("{} Update binding {}", ClusterConnectionImpl.this, message);
if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
throw new IllegalStateException("clusterName is null");
}

SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);

RemoteQueueBinding existingBinding = (RemoteQueueBinding) postOffice.getBinding(clusterName);

if (existingBinding != null) {
existingBinding.setFilter(FilterImpl.createFilter(filterString));
}
}

private synchronized void doBindingAdded(final ClientMessage message) throws Exception {
logger.trace("{} Adding binding {}", ClusterConnectionImpl.this, message);
if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {

private final long remoteQueueID;

private final Filter queueFilter;

private final Set<Filter> filters = new HashSet<>();

private final Map<SimpleString, Integer> filterCounts = new HashMap<>();

private Filter queueFilter;

private int consumerCount;

private final SimpleString idsHeaderName;
Expand Down Expand Up @@ -351,6 +351,11 @@ public long getRemoteQueueID() {
return remoteQueueID;
}

@Override
public void setFilter(Filter filter) {
this.queueFilter = filter;
}

@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4322,7 +4322,9 @@ public QueueConfiguration getQueueConfiguration() {
.setTemporary(temporary)
.setInternal(internalQueue)
.setTransient(refCountForConsumers instanceof TransientQueueManagerImpl)
.setAutoCreated(autoCreated);
.setAutoCreated(autoCreated)
.setEnabled(enabled)
.setGroupRebalancePauseDispatch(groupRebalancePauseDispatch);
}

protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -86,6 +94,53 @@ public void testMessageLoadBalancingOff() throws Exception {
Assert.assertNull(clientMessage);
}

@Test
public void testMessageLoadBalancingWithFiltersUpdate() throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);

startServers(0, 1);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);

waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, false);

Binding[] bindings = new Binding[2];
PostOffice[] po = new PostOffice[2];
for (int i = 0; i < 2; i++) {
po[i] = servers[i].getPostOffice();
bindings[i] = po[i].getBinding(new SimpleString("queue0"));
Assert.assertNotNull(bindings[i]);

Queue queue0 = (Queue)bindings[i].getBindable();
Assert.assertNotNull(queue0);

QueueConfiguration qconfig = queue0.getQueueConfiguration();
Assert.assertNotNull(qconfig);

qconfig.setFilterString("color = 'red'");
po[i].updateQueue(qconfig, true);
}

SimpleString clusterName0 = bindings[1].getClusterName();
RemoteQueueBinding remoteBinding = (RemoteQueueBinding) po[0].getBinding(clusterName0);
Assert.assertNotNull(remoteBinding);

Wait.assertEquals("color = 'red'", () -> {
Filter filter = remoteBinding.getFilter();
if (filter == null) {
return filter;
}
return filter.getFilterString().toString();
});
}

protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.junit.Test;

public class ConfigurationTest extends ActiveMQTestBase {

@Test
public void testStartWithDuplicateQueues() throws Exception {
ActiveMQServer server = getActiveMQServer("duplicate-queues.xml");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ public long getRemoteQueueID() {
return 0;
}

@Override
public void setFilter(Filter filter) {
}

@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;
Expand Down

0 comments on commit 07b0215

Please sign in to comment.