diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java index 362ea8ef8ea..d2a8cf35422 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java @@ -151,6 +151,16 @@ public synchronized boolean isStarted() { */ public abstract int getLargeMessageThreshold(); + /** + * @return the true if the federation should ignore filters on queue consumers. + */ + public abstract boolean isIgnoreQueueConsumerFilters(); + + /** + * @return the true if the federation should ignore priorities on queue consumers. + */ + public abstract boolean isIgnoreQueueConsumerPriorities(); + /** * @return the true if the federation should support core message tunneling. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java index c551a4ae7fb..f6bc4f9e2dd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Divert; @@ -31,9 +32,11 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,12 +62,16 @@ public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationR @Override protected FederationGenericConsumerInfo createConsumerInfo(AddressInfo address) { - return FederationGenericConsumerInfo.build(address.getName().toString(), - generateQueueName(address), - address.getRoutingType(), - remoteQueueFilter, - federation, - policy); + final String addressName = address.getName().toString(); + final String generatedQueueName = generateQueueName(address); + + return new FederationGenericConsumerInfo(Role.ADDRESS_CONSUMER, + addressName, + generatedQueueName, + address.getRoutingType(), + remoteQueueFilter, + CompositeAddress.toFullyQualified(addressName, generatedQueueName), + ActiveMQDefaultConfiguration.getDefaultConsumerPriority()); } protected String generateQueueName(AddressInfo address) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java index 77efed6192f..f549878323b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java @@ -21,6 +21,8 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import java.util.Collections; import java.util.HashMap; @@ -52,6 +54,20 @@ public final class AMQPFederationConfiguration { */ public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true; + /** + * Default value for the filtering applied to federation queue consumers that controls if + * the filter specified by a consumer subscription is used or if the higher level queue + * filter only is applied when creating a federation queue consumer. + */ + private static final boolean DEFAULT_IGNNORE_QUEUE_CONSUMER_FILTERS = false; + + /** + * Default value for the priority applied to federation queue consumers that controls if + * the priority specified by a consumer subscription is used or if the policy priority + * offset value is simply applied to the default consumer priority value. + */ + private static final boolean DEFAULT_IGNNORE_QUEUE_CONSUMER_PRIORITIES = false; + private final Map properties; private final AMQPConnectionContext connection; @@ -138,6 +154,34 @@ public boolean isCoreMessageTunnelingEnabled() { } } + /** + * @return true if federation is configured to ignore filters on individual queue consumers + */ + public boolean isIgnoreSubscriptionFilters() { + final Object property = properties.get(IGNORE_QUEUE_CONSUMER_FILTERS); + if (property instanceof Boolean) { + return (Boolean) property; + } else if (property instanceof String) { + return Boolean.parseBoolean((String) property); + } else { + return DEFAULT_IGNNORE_QUEUE_CONSUMER_FILTERS; + } + } + + /** + * @return true if federation is configured to ignore priorities on individual queue consumers + */ + public boolean isIgnoreSubscriptionPriorities() { + final Object property = properties.get(IGNORE_QUEUE_CONSUMER_PRIORITIES); + if (property instanceof Boolean) { + return (Boolean) property; + } else if (property instanceof String) { + return Boolean.parseBoolean((String) property); + } else { + return DEFAULT_IGNNORE_QUEUE_CONSUMER_PRIORITIES; + } + } + /** * Enumerate the configuration options in this configuration object and return a {@link Map} that * contains the values which can be sent to a remote peer @@ -151,6 +195,8 @@ public Map toConfigurationMap() { configMap.put(RECEIVER_CREDITS_LOW, getReceiverCreditsLow()); configMap.put(LARGE_MESSAGE_THRESHOLD, getLargeMessageThreshold()); configMap.put(LINK_ATTACH_TIMEOUT, getLinkAttachTimeout()); + configMap.put(IGNORE_QUEUE_CONSUMER_FILTERS, isIgnoreSubscriptionFilters()); + configMap.put(IGNORE_QUEUE_CONSUMER_PRIORITIES, isIgnoreSubscriptionPriorities()); configMap.put(AmqpSupport.TUNNEL_CORE_MESSAGES, isCoreMessageTunnelingEnabled()); return configMap; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java index cefce21e6f7..30e94a2be91 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java @@ -84,6 +84,27 @@ public final class AMQPFederationConstants { */ public static final String LARGE_MESSAGE_THRESHOLD = "minLargeMessageSize"; + /** + * Configuration property used to convey the local side value to use when considering if federation queue + * consumers should filter using the filters defined on individual queue subscriptions, this can be sent + * to the peer so that dual federation configurations share the same configuration on both sides of the + * connection. This can be used to prevent multiple subscriptions on the same queue based on local demand + * with differing subscription filters but does imply that message that don't match those filters would + * be federated to the local broker. + */ + public static final String IGNORE_QUEUE_CONSUMER_FILTERS = "ignoreQueueConsumerFilters"; + + /** + * Configuration property used to convey the local side value to use when considering if federation queue + * consumers should apply a consumer priority offset based on the subscription priority or should use a + * singular priority offset based on policy configuration. This can be sent to the peer so that dual + * federation configurations share the same configuration on both sides of the connection. This can be + * used to prevent multiple subscriptions on the same queue based on local demand with differing consumer + * priorities but does imply that care needs to be taken to ensure remote consumers would normally have + * a higher priority value than the configured default priority offset. + */ + public static final String IGNORE_QUEUE_CONSUMER_PRIORITIES = "ignoreQueueConsumerPriorities"; + /** * A desired capability added to the federation queue receiver link that must be offered * in return for a federation queue receiver to be successfully opened. On the remote the diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java index 96c4bfc2267..0f5dfd0c1c4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.protocol.amqp.connect.federation; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS; @@ -104,4 +106,26 @@ public boolean isCoreMessageTunnelingEnabled() { return federation.isCoreMessageTunnelingEnabled(); } } + + public boolean isIgnoreSubscriptionFilters() { + final Object property = properties.get(IGNORE_QUEUE_CONSUMER_FILTERS); + if (property instanceof Boolean) { + return (Boolean) property; + } else if (property instanceof String) { + return Boolean.parseBoolean((String) property); + } else { + return federation.isIgnoreQueueConsumerFilters(); + } + } + + public boolean isIgnoreSubscriptionPriorities() { + final Object property = properties.get(IGNORE_QUEUE_CONSUMER_PRIORITIES); + if (property instanceof Boolean) { + return (Boolean) property; + } else if (property instanceof String) { + return Boolean.parseBoolean((String) property); + } else { + return federation.isIgnoreQueueConsumerPriorities(); + } + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java index 8c386ab41f9..086ce23c014 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Predicate; @@ -98,6 +99,11 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal { private static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}; + + // Sequence ID value used to keep links that would otherwise have the same name from overlapping + // this generally occurs when consumers on the same queue have differing filters. + private static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong(); + private final AMQPFederation federation; private final AMQPFederationConsumerConfiguration configuration; private final FederationConsumerInfo consumerInfo; @@ -250,7 +256,8 @@ private void signalAfterFederationConsumerMessageHandled(Message message) throws private String generateLinkName() { return "federation-" + federation.getName() + "-queue-receiver-" + consumerInfo.getFqqn() + - "-" + federation.getServer().getNodeID(); + "-" + federation.getServer().getNodeID() + ":" + + LINK_SEQUENCE_ID.getAndIncrement(); } private void asyncCreateReceiver() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java index 3d5b9adae50..343a305b7af 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java @@ -21,14 +21,20 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal; +import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +55,28 @@ public AMQPFederationQueuePolicyManager(AMQPFederation federation, FederationRec this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties()); } + @Override + protected FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) { + final Queue queue = consumer.getQueue(); + final String queueName = queue.getName().toString(); + final String address = queue.getAddress().toString(); + + final int priority = configuration.isIgnoreSubscriptionPriorities() ? + ActiveMQDefaultConfiguration.getDefaultConsumerPriority() + policy.getPriorityAjustment() : + consumer.getPriority() + policy.getPriorityAjustment(); + + final String filterString = + selectFilter(queue.getFilter(), configuration.isIgnoreSubscriptionFilters() ? null : consumer.getFilter()); + + return new FederationGenericConsumerInfo(Role.QUEUE_CONSUMER, + address, + queueName, + queue.getRoutingType(), + filterString, + CompositeAddress.toFullyQualified(address, queueName), + priority); + } + @Override protected FederationConsumerInternal createFederationConsumer(FederationConsumerInfo consumerInfo) { Objects.requireNonNull(consumerInfo, "Federation Queue consumer information object was null"); @@ -132,4 +160,12 @@ protected final boolean isPluginBlockingFederationConsumerCreate(Queue queue) { return !canCreate.get(); } + + private static String selectFilter(Filter queueFilter, Filter consumerFilter) { + if (consumerFilter != null) { + return consumerFilter.getFilterString().toString(); + } else { + return queueFilter != null ? queueFilter.getFilterString().toString() : null; + } + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueSenderController.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueSenderController.java index 5333a250885..416a1a56786 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueSenderController.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueSenderController.java @@ -92,22 +92,6 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception { // indicated it was desired, however unless offered by the remote we cannot use it. sender.setDesiredCapabilities(new Symbol[] {AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}); - // An queue receiver may supply a filter if the queue being federated had a filter attached - // to it at creation, this ensures that we only bring back message that match the original - // queue filter and not others that would simply increase traffic for no reason. - final Map.Entry filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS); - - if (filter != null) { - selector = filter.getValue().getDescribed().toString(); - try { - SelectorParser.parse(selector); - } catch (FilterException e) { - throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION); - } - } else { - selector = null; - } - final RoutingType routingType = getRoutingType(source); final SimpleString targetAddress; final SimpleString targetQueue; @@ -131,6 +115,29 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception { throw new ActiveMQAMQPNotFoundException("Queue: '" + targetQueue + "' is not mapped to specified address: " + targetAddress); } + // An queue receiver may supply a filter if the queue being federated had a filter attached + // to it at creation, this ensures that we only bring back message that match the original + // queue filter and not others that would simply increase traffic for no reason. + final Map.Entry filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS); + + if (filter != null) { + final String filterString = filter.getValue().getDescribed().toString(); + try { + SelectorParser.parse(filterString); + } catch (FilterException e) { + throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION); + } + + // No need to apply another filter if the current one on the Queue already matches that. + if (result.getFilterString() == null || !filterString.equals(result.getFilterString().toString())) { + selector = filterString; + } else { + selector = null; + } + } else { + selector = null; + } + // We need to check that the remote offers its ability to read tunneled core messages and // if not we must not send them but instead convert all messages to AMQP messages first. tunnelCoreMessages = verifyOfferedCapabilities(sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java index b9cd2044f17..d68fc6aa8c7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java @@ -183,6 +183,25 @@ public boolean isCoreMessageTunnelingEnabled() { return configuration.isCoreMessageTunnelingEnabled(); } + + @Override + public boolean isIgnoreQueueConsumerFilters() { + if (!connected) { + throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); + } + + return configuration.isIgnoreSubscriptionFilters(); + } + + @Override + public boolean isIgnoreQueueConsumerPriorities() { + if (!connected) { + throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); + } + + return configuration.isIgnoreSubscriptionPriorities(); + } + /** * Adds a new {@link FederationReceiveFromQueuePolicy} entry to the set of policies that the * remote end of this federation will use to create demand on the this server when local diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java index 9ebcfe64857..581dd477450 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java @@ -91,6 +91,16 @@ public boolean isCoreMessageTunnelingEnabled() { return configuration.isCoreMessageTunnelingEnabled(); } + @Override + public boolean isIgnoreQueueConsumerFilters() { + return configuration.isIgnoreSubscriptionFilters(); + } + + @Override + public boolean isIgnoreQueueConsumerPriorities() { + return configuration.isIgnoreSubscriptionPriorities(); + } + @Override protected void handleFederationStarted() throws ActiveMQException { // Tag the session with Federation metadata which will allow local federation policies sent by diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java index 10439dcdb3a..a473b2ee00c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java @@ -22,19 +22,14 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.protocol.amqp.federation.Federation; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy; -import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; import org.apache.activemq.artemis.utils.CompositeAddress; /** - * Information and identification class for Federation consumers created to - * federate queues. Instances of this class should be usable in Collections + * Information and identification class for Federation consumers created to federate + * queues and addresses. Instances of this class should be usable in Collections * classes where equality and hashing support is needed. */ public class FederationGenericConsumerInfo implements FederationConsumerInfo { @@ -51,8 +46,8 @@ public class FederationGenericConsumerInfo implements FederationConsumerInfo { private final int priority; private final String id; - protected FederationGenericConsumerInfo(Role role, String address, String queueName, RoutingType routingType, - String filterString, String fqqn, int priority) { + public FederationGenericConsumerInfo(Role role, String address, String queueName, RoutingType routingType, + String filterString, String fqqn, int priority) { this.role = role; this.address = address; this.queueName = queueName; @@ -63,34 +58,6 @@ protected FederationGenericConsumerInfo(Role role, String address, String queueN this.id = UUID.randomUUID().toString(); } - /** - * Factory for creating federation queue consumer information objects from server resources. - * - * @param consumer - * The {@link ServerConsumer} that this federation consumer is created for - * @param federation - * The parent {@link Federation} that this federation consumer is created for - * @param policy - * The {@link FederationReceiveFromQueuePolicy} that triggered this information object to be created. - * - * @return a newly created and configured {@link FederationConsumerInfo} instance. - */ - public static FederationGenericConsumerInfo build(ServerConsumer consumer, Federation federation, FederationReceiveFromQueuePolicy policy) { - final Queue queue = consumer.getQueue(); - final String queueName = queue.getName().toString(); - final String address = queue.getAddress().toString(); - final int priority = consumer.getPriority() + policy.getPriorityAjustment(); - final SimpleString filterString = Filter.toFilterString(queue.getFilter()); - - return new FederationGenericConsumerInfo(Role.QUEUE_CONSUMER, - address, - queueName, - queue.getRoutingType(), - filterString != null ? filterString.toString() : null, - CompositeAddress.toFullyQualified(address, queueName), - priority); - } - /** * Factory for creating federation address consumer information objects from server resources. * diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java index 6950d6278fd..23330c44790 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java @@ -56,7 +56,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons protected final ActiveMQServer server; protected final Predicate federationConsumerMatcher; protected final FederationReceiveFromQueuePolicy policy; - protected final Map demandTracking = new HashMap<>(); + protected final Map demandTracking = new HashMap<>(); protected final FederationInternal federation; private volatile boolean started; @@ -116,7 +116,8 @@ public synchronized void afterCreateConsumer(ServerConsumer consumer) { public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) { if (started) { final String queueName = consumer.getQueue().getName().toString(); - final FederationQueueEntry entry = demandTracking.get(queueName); + final FederationConsumerInfo consumerInfo = createConsumerInfo(consumer); + final FederationQueueEntry entry = demandTracking.get(consumerInfo); if (entry == null) { return; @@ -134,7 +135,7 @@ public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean fai federationConsuner.close(); signalAfterCloseFederationConsumer(federationConsuner); } finally { - demandTracking.remove(queueName); + demandTracking.remove(consumerInfo); } } } @@ -144,11 +145,13 @@ public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean fai public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException { if (binding instanceof QueueBinding) { final QueueBinding queueBinding = (QueueBinding) binding; - final FederationQueueEntry entry = demandTracking.remove(queueBinding.getQueue().getName().toString()); + final String queueName = queueBinding.getQueue().getName().toString(); - if (entry != null && entry.hasConsumer()) { - entry.getConsumer().close(); - } + demandTracking.values().forEach((entry) -> { + if (entry.getConsumerInfo().getQueueName().equals(queueName) && entry.hasConsumer()) { + entry.getConsumer().close(); + } + }); } } @@ -182,14 +185,15 @@ protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding()); final FederationQueueEntry entry; + final FederationConsumerInfo consumerInfo = createConsumerInfo(consumer); // Check for existing consumer add demand from a additional local consumer to ensure // the remote consumer remains active until all local demand is withdrawn. - if (demandTracking.containsKey(queueName)) { + if (demandTracking.containsKey(consumerInfo)) { logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", queueName); - entry = demandTracking.get(queueName); + entry = demandTracking.get(consumerInfo); } else { - demandTracking.put(queueName, entry = createConsumerEntry(createConsumerInfo(consumer))); + demandTracking.put(consumerInfo, entry = createConsumerEntry(consumerInfo)); } // Demand passed all binding plugin blocking checks so we track it, plugin can still @@ -215,7 +219,7 @@ private void tryCreateFederationConsumerForQueue(FederationQueueEntry queueEntry queueConsumer.setRemoteClosedHandler((closedConsumer) -> { synchronized (this) { try { - final FederationQueueEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo().getQueueName()); + final FederationQueueEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo()); if (tracked != null) { tracked.clearConsumer(); @@ -254,11 +258,15 @@ public synchronized void afterRemoteQueueAdded(String addressName, String queueN // the queue were recreated such that a match could be made. We retain all the current // demand and don't need to re-check the server state before trying to create the // remote queue consumer. - if (started && testIfQueueMatchesPolicy(queueName) && demandTracking.containsKey(queueName)) { + if (started && testIfQueueMatchesPolicy(queueName)) { final Queue queue = server.locateQueue(queueName); if (queue != null) { - tryCreateFederationConsumerForQueue(demandTracking.get(queueName), queue); + demandTracking.forEach((k, v) -> { + if (k.getQueueName().equals(queueName)) { + tryCreateFederationConsumerForQueue(v, queue); + } + }); } } } @@ -297,7 +305,7 @@ protected boolean testIfQueueMatchesPolicy(String queueName) { /** * Create a new {@link FederationConsumerInfo} based on the given {@link ServerConsumer} - * and the configured {@link FederationReceiveFromQueuePolicy}. A subclass can override this + * and the configured {@link FederationReceiveFromQueuePolicy}. A subclass must override this * method to return a consumer information object with additional data used be that implementation. * * @param consumer @@ -305,9 +313,7 @@ protected boolean testIfQueueMatchesPolicy(String queueName) { * * @return a new {@link FederationConsumerInfo} instance based on the server consumer */ - protected FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) { - return FederationGenericConsumerInfo.build(consumer, federation, policy); - } + protected abstract FederationConsumerInfo createConsumerInfo(ServerConsumer consumer); /** * Creates a {@link FederationQueueEntry} instance that will be used to store an instance of diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java index 7870b400b00..201833245de 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java @@ -264,6 +264,16 @@ public void testFederationConnectsToSecondPeerWhenConfigurationUpdatedWithNewCon .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) .respondInKind(); peer2.expectFlow().withLinkCredit(10); + peer2.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString(getTestName() + ":2"), + containsString("test"), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), expectedSourceProperties) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer2.expectFlow().withLinkCredit(1000); peer2.start(); final URI remoteURI2 = peer2.getServerURI(); @@ -293,18 +303,6 @@ public void testFederationConnectsToSecondPeerWhenConfigurationUpdatedWithNewCon protocolFactory.updateProtocolServices(server, Collections.emptyList()); - peer2.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer2.expectAttach().ofReceiver() - .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) - .withName(allOf(containsString(getTestName() + ":2"), - containsString("test"), - containsString("address-receiver"), - containsString(server.getNodeID().toString()))) - .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), expectedSourceProperties) - .respond() - .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); - peer2.expectFlow().withLinkCredit(1000); - peer2.waitForScriptToComplete(5, TimeUnit.SECONDS); peer2.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java index bf43b52e1bf..11bcf8c6420 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java @@ -40,6 +40,8 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_PRIORITY_ADJUSTMENT; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.containsString; @@ -141,12 +143,16 @@ public void testFederationConfiguredCreatesControlLink() throws Exception { final int AMQP_CREDITS_LOW = 50; final int AMQP_LINK_ATTACH_TIMEOUT = 60; final boolean AMQP_TUNNEL_CORE_MESSAGES = false; + final boolean AMQP_INGNORE_CONSUMER_FILTERS = false; + final boolean AMQP_INGNORE_CONSUMER_PRIORITIES = false; final Map federationConfiguration = new HashMap<>(); federationConfiguration.put(RECEIVER_CREDITS, AMQP_CREDITS); federationConfiguration.put(RECEIVER_CREDITS_LOW, AMQP_CREDITS_LOW); federationConfiguration.put(LARGE_MESSAGE_THRESHOLD, AMQP_MIN_LARGE_MESSAGE_SIZE); federationConfiguration.put(LINK_ATTACH_TIMEOUT, AMQP_LINK_ATTACH_TIMEOUT); + federationConfiguration.put(IGNORE_QUEUE_CONSUMER_FILTERS, AMQP_INGNORE_CONSUMER_FILTERS); + federationConfiguration.put(IGNORE_QUEUE_CONSUMER_PRIORITIES, AMQP_INGNORE_CONSUMER_PRIORITIES); federationConfiguration.put(AmqpSupport.TUNNEL_CORE_MESSAGES, AMQP_TUNNEL_CORE_MESSAGES); try (ProtonTestServer peer = new ProtonTestServer()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index c8e07d26c3d..fc341bc0bae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -24,6 +24,8 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_RECEIVER_PRIORITY; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME; @@ -285,6 +287,180 @@ public void testFederationQueueReceiverCarriesConfiguredQueueFilter() throws Exc } } + @Test(timeout = 20000) + public void testFederationQueueReceiverCarriesConsumerQueueFilter() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes("test", "test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setFilterString("color='red' OR color = 'blue'") + .setAutoCreated(false)); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("test"), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withSource().withJMSSelector("color='red'").and() + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + session.createConsumer(queue, "color='red'"); // new receiver for this selector + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("test"), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withSource().withJMSSelector("color='blue'").and() + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + session.createConsumer(queue, "color='blue'"); // new receiver for this selector + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("test"), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withSource().withJMSSelector("color='red' OR color = 'blue'").and() + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + session.createConsumer(queue); // No selector so the queue filter should be used + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Should not see more attaches as these match previous variations + session.createConsumer(queue, "color='blue'"); + session.createConsumer(queue, "color='red'"); + session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + + @Test(timeout = 20000) + public void testFederationQueueReceiverCanIgnoreConsumerQueueFilter() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes("test", "test"); + receiveFromQueue.addProperty(IGNORE_QUEUE_CONSUMER_FILTERS, "true"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setFilterString("color='red' OR color = 'blue'") + .setAutoCreated(false)); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("test"), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withSource().withJMSSelector("color='red' OR color = 'blue'").and() + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + session.createConsumer(queue, "color='red'"); // Consumer filter should be ignored + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + session.createConsumer(queue, "color='blue'"); // Consumer filter should be ignored + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + @Test(timeout = 20000) public void testFederationCreatesQueueReceiverLinkForQueueMatchUsingPolicyCredit() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { @@ -1208,6 +1384,90 @@ private void doTestFederationCreatesQueueReceiverLinkWithAdjustedPriority(int ad } } + @Test(timeout = 20000) + public void testFederationCreatesQueueReceiverLinkConsumerPriorityOffset() throws Exception { + doTestFederationCreatesQueueReceiverWithCorrectPriorityOffset(false); + } + + @Test(timeout = 20000) + public void testFederationCreatesQueueReceiverLinkIngoringConsumerPriorityOffset() throws Exception { + doTestFederationCreatesQueueReceiverWithCorrectPriorityOffset(true); + } + + private void doTestFederationCreatesQueueReceiverWithCorrectPriorityOffset(boolean ignoreConsumerPriority) throws Exception { + final int TEST_BASE_PRIORITY_ADJUSTMENT = -2; + final int TEST_CONSUMER_PRIORITY = 2; + + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.setPriorityAdjustment(TEST_BASE_PRIORITY_ADJUSTMENT); + receiveFromQueue.addToIncludes("test", "test"); + if (ignoreConsumerPriority) { + receiveFromQueue.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, Boolean.valueOf(ignoreConsumerPriority).toString()); + } + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + + final int expectedPriority = + ignoreConsumerPriority ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() + TEST_BASE_PRIORITY_ADJUSTMENT : + TEST_CONSUMER_PRIORITY + TEST_BASE_PRIORITY_ADJUSTMENT; + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("test::test"), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), expectedPriority) + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue("test?consumer-priority=" + TEST_CONSUMER_PRIORITY)); + + connection.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + @Test(timeout = 20000) public void testLinkCreatedForEachDistinctQueueMatchInSameConfiguredPolicyWithSameAddressMatch() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java index 3396ac2c58b..3165c149d53 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java @@ -1007,4 +1007,98 @@ private void testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(S assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode()); } } + + @Test(timeout = 20000) + public void testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemoteAMQP() throws Exception { + testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemote("AMQP"); + } + + @Test(timeout = 20000) + public void testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemoteCORE() throws Exception { + testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemote("CORE"); + } + + private void testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemote(String clientProtocol) throws Exception { + logger.info("Test started: {}", getTestName()); + + final AMQPFederationQueuePolicyElement localQueuePolicy = new AMQPFederationQueuePolicyElement(); + localQueuePolicy.setName("test-policy"); + localQueuePolicy.addToIncludes("#", "test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalQueuePolicy(localQueuePolicy); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE); + amqpConnection.setReconnectAttempts(10);// Limit reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + remoteServer.start(); + remoteServer.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setFilterString("color='red' OR color='green' OR color='blue'") + .setAutoCreated(false)); + server.start(); + + final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT); + final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:" + SERVER_PORT_REMOTE); + + try (Connection connectionL = factoryLocal.createConnection(); + Connection connectionR = factoryRemote.createConnection()) { + + final Session sessionL = connectionL.createSession(Session.AUTO_ACKNOWLEDGE); + final Session sessionR = connectionR.createSession(Session.AUTO_ACKNOWLEDGE); + + final Queue queue = sessionL.createQueue("test"); + + final MessageConsumer consumerL1 = sessionL.createConsumer(queue, "color='red'"); + final MessageConsumer consumerL2 = sessionL.createConsumer(queue, "color='blue'"); + + connectionL.start(); + connectionR.start(); + + // Demand on local queue should trigger receiver on remote. + Wait.assertTrue(() -> server.queueQuery(SimpleString.toSimpleString("test")).isExists()); + + final MessageProducer producerR = sessionR.createProducer(queue); + + final TextMessage message1 = sessionR.createTextMessage("Hello World 1"); + message1.setStringProperty("color", "green"); + final TextMessage message2 = sessionR.createTextMessage("Hello World 2"); + message2.setStringProperty("color", "red"); + final TextMessage message3 = sessionR.createTextMessage("Hello World 3"); + message3.setStringProperty("color", "blue"); + + producerR.send(message1); + producerR.send(message2); + producerR.send(message3); + + final Message receivedL1 = consumerL1.receive(5_000); + assertNotNull(receivedL1); + assertTrue(receivedL1 instanceof TextMessage); + assertEquals("Hello World 2", ((TextMessage) receivedL1).getText()); + assertTrue(receivedL1.propertyExists("color")); + assertEquals("red", receivedL1.getStringProperty("color")); + + final Message receivedL2 = consumerL2.receive(5_000); + assertNotNull(receivedL2); + assertTrue(receivedL2 instanceof TextMessage); + assertEquals("Hello World 3", ((TextMessage) receivedL2).getText()); + assertTrue(receivedL2.propertyExists("color")); + assertEquals("blue", receivedL2.getStringProperty("color")); + + // See if the green message is still on the remote where it should be as the + // filter should prevent it from moving across the federation link(s) + final MessageConsumer consumerR = sessionR.createConsumer(queue, "color='green'"); + + final Message receivedR = consumerR.receive(5_000); + assertNotNull(receivedR); + assertTrue(receivedR instanceof TextMessage); + assertEquals("Hello World 1", ((TextMessage) receivedR).getText()); + assertTrue(receivedR.propertyExists("color")); + assertEquals("green", receivedR.getStringProperty("color")); + } + } }