diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java index 9338bd18a1b..dda6e95cd39 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java @@ -165,7 +165,7 @@ public synchronized void close() { if (started) { started = false; connection.runLater(() -> { - federation.removeLinkClosedInterceptor(consumerInfo.getFqqn()); + federation.removeLinkClosedInterceptor(consumerInfo.getId()); if (receiver != null) { try { @@ -350,7 +350,7 @@ private void asyncCreateReceiver() { // Intercept remote close and check for valid reasons for remote closure such as // the remote peer not having a matching queue for this subscription or from an // operator manually closing the link. - federation.addLinkClosedInterceptor(consumerInfo.getFqqn(), remoteCloseInterceptor); + federation.addLinkClosedInterceptor(consumerInfo.getId(), remoteCloseInterceptor); receiver = new AMQPFederatedAddressDeliveryReceiver(session, consumerInfo, protonReceiver); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java index ea2455c30cb..8c386ab41f9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java @@ -162,7 +162,7 @@ public synchronized void close() { if (started) { started = false; connection.runLater(() -> { - federation.removeLinkClosedInterceptor(consumerInfo.getFqqn()); + federation.removeLinkClosedInterceptor(consumerInfo.getId()); if (receiver != null) { try { @@ -341,7 +341,7 @@ private void asyncCreateReceiver() { // Intercept remote close and check for valid reasons for remote closure such as // the remote peer not having a matching queue for this subscription or from an // operator manually closing the link. - federation.addLinkClosedInterceptor(consumerInfo.getFqqn(), remoteCloseIntercepter); + federation.addLinkClosedInterceptor(consumerInfo.getId(), remoteCloseIntercepter); receiver = new AMQPFederatedQueueDeliveryReceiver(localQueue, protonReceiver); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java index f370f7e4e6e..15c21eed0e5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java @@ -38,6 +38,11 @@ enum Role { QUEUE_CONSUMER } + /** + * @return a unique Id for the consumer being represented. + */ + String getId(); + /** * @return the type of federation consumer being represented. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java index 10e4ddd81e6..dce40f8fb4a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java @@ -18,42 +18,59 @@ package org.apache.activemq.artemis.protocol.amqp.federation.internal; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; /** * An entry type class used to hold a {@link FederationConsumerInternal} and * any other state data needed by the manager that is creating them based on the - * policy configuration for the federation instance. The entry can be extended + * policy configuration for the federation instance. The entry can be extended * by federation implementation to hold additional state data for the federation - * consumer and the managing of its lifetime. + * consumer and the managing active demand. * - * This entry type provides a reference counter that can be used to register demand + * This entry type provides reference tracking state for current demand (bindings) * on a federation resource such that it is not torn down until all demand has been * removed from the local resource. */ public class FederationAddressEntry { - private final FederationConsumerInternal consumer; - + private final AddressInfo addressInfo; private final Set demandBindings = new HashSet<>(); + private FederationConsumerInternal consumer; + /** - * Creates a new address entry with a single reference + * Creates a new address entry for tracking demand on a federated address * - * @param consumer - * The federation consumer that will be carried in this entry. + * @param addressInfo + * The address information object that this entry hold demand state for. */ - public FederationAddressEntry(FederationConsumerInternal consumer) { - this.consumer = consumer; + public FederationAddressEntry(AddressInfo addressInfo) { + this.addressInfo = addressInfo; + } + + /** + * @return the address information that this entry is acting to federate. + */ + public AddressInfo getAddressInfo() { + return addressInfo; } /** * @return the address that this entry is acting to federate. */ public String getAddress() { - return consumer.getConsumerInfo().getAddress(); + return addressInfo.getName().toString(); + } + + /** + * @return true if a consumer is currently set on this entry. + */ + public boolean hasConsumer() { + return consumer != null; } /** @@ -63,6 +80,30 @@ public FederationConsumerInternal getConsumer() { return consumer; } + /** + * Sets the consumer assigned to this entry to the given instance. + * + * @param consumer + * The federation consumer that is currently active for this entry. + * + * @return this federation address consumer entry. + */ + public FederationAddressEntry setConsumer(FederationConsumerInternal consumer) { + Objects.requireNonNull(consumer, "Cannot assign a null consumer to this entry, call clear to unset"); + this.consumer = consumer; + return this; + } + + /** + * Clears the currently assigned consumer from this entry. + * + * @return this federation address consumer entry. + */ + public FederationAddressEntry clearConsumer() { + this.consumer = null; + return this; + } + /** * @return true if there are bindings that are mapped to this federation entry. */ @@ -85,7 +126,7 @@ public FederationAddressEntry addDemand(Binding binding) { * * @return this federation address consumer entry. */ - public FederationAddressEntry removeDenamd(Binding binding) { + public FederationAddressEntry removeDemand(Binding binding) { demandBindings.remove(binding); return this; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java index 5157a2e7a56..8c3685c01bd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java @@ -64,8 +64,8 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi protected final ActiveMQServer server; protected final FederationInternal federation; protected final FederationReceiveFromAddressPolicy policy; - protected final Map remoteConsumers = new HashMap<>(); - protected final Map> matchingDiverts = new HashMap<>(); + protected final Map demandTracking = new HashMap<>(); + protected final Map> divertsTracking = new HashMap<>(); private volatile boolean started; @@ -101,9 +101,24 @@ public synchronized void stop() { if (started) { started = false; server.unRegisterBrokerPlugin(this); - remoteConsumers.forEach((k, v) -> v.getConsumer().close()); // Cleanup and recreate if ever reconnected. - remoteConsumers.clear(); - matchingDiverts.clear(); + demandTracking.forEach((k, v) -> { + if (v.hasConsumer()) { + v.getConsumer().close(); + } + }); + demandTracking.clear(); + divertsTracking.clear(); + } + } + + @Override + public synchronized void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException { + if (started) { + final FederationAddressEntry entry = demandTracking.remove(address.toString()); + + if (entry != null && entry.hasConsumer()) { + entry.getConsumer().close(); + } } } @@ -111,7 +126,7 @@ public synchronized void stop() { public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException { if (started) { if (binding instanceof QueueBinding) { - final FederationAddressEntry entry = remoteConsumers.get(binding.getAddress().toString()); + final FederationAddressEntry entry = demandTracking.get(binding.getAddress().toString()); if (entry != null) { // This is QueueBinding that was mapped to a federated address so we can directly remove @@ -122,7 +137,7 @@ public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boo // is bound and remove the mapping for any matches, diverts can have a composite set of address // forwards so each divert must be checked in turn to see if it contains the address the removed // binding was bound to. - matchingDiverts.entrySet().forEach(divertEntry -> { + divertsTracking.entrySet().forEach(divertEntry -> { final String sourceAddress = divertEntry.getKey().getAddress().toString(); final SimpleString forwardAddress = divertEntry.getKey().getDivert().getForwardAddress(); @@ -134,7 +149,7 @@ public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boo divertEntry.getValue().remove(binding); if (divertEntry.getValue().isEmpty()) { - tryRemoveDemandOnAddress(remoteConsumers.get(sourceAddress), divertEntry.getKey()); + tryRemoveDemandOnAddress(demandTracking.get(sourceAddress), divertEntry.getKey()); } } }); @@ -142,14 +157,14 @@ public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boo } else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) { final DivertBinding divert = (DivertBinding) binding; - if (matchingDiverts.remove(divert) != null) { + if (divertsTracking.remove(divert) != null) { // The divert binding is treated as one unit of demand on a federated address and // when the divert is removed that unit of demand is removed regardless of existing // bindings still remaining on the divert forwards. If the divert demand was the // only thing keeping the federated address consumer open this will result in it // being closed. try { - tryRemoveDemandOnAddress(remoteConsumers.get(divert.getAddress().toString()), divert); + tryRemoveDemandOnAddress(demandTracking.get(divert.getAddress().toString()), divert); } catch (Exception e) { ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divert.getDivert().getForwardAddress(), e); } @@ -160,11 +175,11 @@ public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boo protected final void tryRemoveDemandOnAddress(FederationAddressEntry entry, Binding binding) { if (entry != null) { - entry.removeDenamd(binding); + entry.removeDemand(binding); logger.trace("Reducing demand on federated address {}, remaining demand? {}", entry.getAddress(), entry.hasDemand()); - if (!entry.hasDemand()) { + if (!entry.hasDemand() && entry.hasConsumer()) { final FederationConsumerInternal federationConsuner = entry.getConsumer(); try { @@ -172,7 +187,7 @@ protected final void tryRemoveDemandOnAddress(FederationAddressEntry entry, Bind federationConsuner.close(); signalAfterCloseFederationConsumer(federationConsuner); } finally { - remoteConsumers.remove(entry.getAddress()); + demandTracking.remove(entry.getAddress()); } } } @@ -200,7 +215,8 @@ public synchronized void afterAddAddress(AddressInfo addressInfo, boolean reload // match the divert. server.getPostOffice() .getDirectBindings(addressInfo.getName()) - .stream().filter(binding -> binding instanceof DivertBinding) + .stream() + .filter(binding -> binding instanceof DivertBinding) .forEach(this::checkBindingForMatch); } catch (Exception e) { ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e); @@ -260,9 +276,9 @@ protected final void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding div // We only need to check if we've never seen the divert before, afterwards we will // be checking it any time a new QueueBinding is added instead. - if (matchingDiverts.get(divertBinding) == null) { + if (divertsTracking.get(divertBinding) == null) { final Set matchingQueues = new HashSet<>(); - matchingDiverts.put(divertBinding, matchingQueues); + divertsTracking.put(divertBinding, matchingQueues); // We must account for the composite divert case by splitting the address and // getting the bindings on each one. @@ -306,7 +322,7 @@ protected final void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queu final SimpleString queueAddress = queueBinding.getAddress(); - matchingDiverts.entrySet().forEach((e) -> { + divertsTracking.entrySet().forEach((e) -> { final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress(); final DivertBinding divertBinding = e.getKey(); @@ -350,44 +366,62 @@ private static boolean isAddressInDivertForwards(final SimpleString targetAddres return false; } - protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo address, Binding binding) { - logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", address, binding); + protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo addressInfo, Binding binding) { + logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", addressInfo, binding); + + final String addressName = addressInfo.getName().toString(); + final FederationAddressEntry entry; // Check for existing consumer add demand from a additional local consumer to ensure - // the remote consumer remains active until all local demand is withdrawn. The federation - // plugin can block creation of the federation consumer at this stage. - if (remoteConsumers.containsKey(address.getName().toString())) { - logger.trace("Federation Address Policy manager found existing demand for address: {}, adding demand", address); - remoteConsumers.get(address.getName().toString()).addDemand(binding); - } else if (!isPluginBlockingFederationConsumerCreate(address)) { - logger.trace("Federation Address Policy manager creating remote consumer for address: {}", address); - - final FederationConsumerInfo consumerInfo = createConsumerInfo(address); - final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo); - final FederationAddressEntry entry = createConsumerEntry(queueConsumer); + // the remote consumer remains active until all local demand is withdrawn. + if (demandTracking.containsKey(addressName)) { + entry = demandTracking.get(addressName); + } else { + entry = new FederationAddressEntry(addressInfo); + demandTracking.put(addressName, entry); + } + + // Demand passed all binding plugin blocking checks so we track it, plugin can still + // stop federation of the address based on some external criteria but once it does + // (if ever) allow it we will have tracked all allowed demand. + entry.addDemand(binding); + + tryCreateFederationConsumerForAddress(entry); + } + + private void tryCreateFederationConsumerForAddress(FederationAddressEntry addressEntry) { + final AddressInfo addressInfo = addressEntry.getAddressInfo(); + + if (addressEntry.hasDemand() && !addressEntry.hasConsumer() && !isPluginBlockingFederationConsumerCreate(addressInfo)) { + logger.trace("Federation Address Policy manager creating remote consumer for address: {}", addressInfo); + + final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo); + final FederationConsumerInternal addressConsumer = createFederationConsumer(consumerInfo); signalBeforeCreateFederationConsumer(consumerInfo); // Handle remote close with remove of consumer which means that future demand will // attempt to create a new consumer for that demand. Ensure that thread safety is // accounted for here as the notification can be asynchronous. - queueConsumer.setRemoteClosedHandler((closedConsumer) -> { + addressConsumer.setRemoteClosedHandler((closedConsumer) -> { synchronized (this) { try { - remoteConsumers.remove(closedConsumer.getConsumerInfo().getAddress()); + final FederationAddressEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo().getAddress()); + + if (tracked != null) { + tracked.clearConsumer(); + } } finally { closedConsumer.close(); } } }); - // Called under lock so state should stay in sync - remoteConsumers.put(entry.getAddress(), entry.addDemand(binding)); + addressEntry.setConsumer(addressConsumer); - // Now that we are tracking it we can start it - queueConsumer.start(); + addressConsumer.start(); - signalAfterCreateFederationConsumer(queueConsumer); + signalAfterCreateFederationConsumer(addressConsumer); } } @@ -404,21 +438,12 @@ protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInf public synchronized void afterRemoteAddressAdded(String addressName) throws Exception { // Assume that the remote address that matched a previous federation attempt is MULTICAST // so that we retry if current local state matches the policy and if it isn't we will once - // again record the federation attempt with the remote and but updated if the remote removes - // and adds the address again (hopefully with the correct routing type). - - if (started && testIfAddressMatchesPolicy(addressName, RoutingType.MULTICAST) && !remoteConsumers.containsKey(addressName)) { - final SimpleString address = SimpleString.toSimpleString(addressName); - - // Need to trigger check for all bindings that match to accumulate demand on the address - // if any and ensure an outgoing consumer is attempted. - - server.getPostOffice() - .getDirectBindings(address) - .stream() - .filter(binding -> binding instanceof QueueBinding || - (policy.isEnableDivertBindings() && binding instanceof DivertBinding)) - .forEach(this::checkBindingForMatch); + // again record the federation attempt with the remote and be updated if the remote removes + // and adds the address again (hopefully with the correct routing type). We retrain all the + // current demand and don't need to re-check the server state before trying to create the + // remote address consumer. + if (started && testIfAddressMatchesPolicy(addressName, RoutingType.MULTICAST) && demandTracking.containsKey(addressName)) { + tryCreateFederationConsumerForAddress(demandTracking.get(addressName)); } } @@ -470,13 +495,13 @@ protected boolean testIfAddressMatchesPolicy(String address, RoutingType type) { * instance lifetime. A subclass can override this method to return a more customized entry type with * additional state data. * - * @param consumer - * The {@link FederationConsumerInternal} instance that will be housed in this entry. + * @param addressInfo + * The address information that the created entry is meant to track demand for. * - * @return a new {@link FederationAddressEntry} that holds the given federation consumer. + * @return a new {@link FederationAddressEntry} that tracks demand on an address. */ - protected FederationAddressEntry createConsumerEntry(FederationConsumerInternal consumer) { - return new FederationAddressEntry(consumer); + protected FederationAddressEntry createConsumerEntry(AddressInfo addressInfo) { + return new FederationAddressEntry(addressInfo); } /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java index 5d533cb6c35..10439dcdb3a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationGenericConsumerInfo.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.federation.internal; import java.util.Objects; +import java.util.UUID; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -48,6 +49,7 @@ public class FederationGenericConsumerInfo implements FederationConsumerInfo { private final String filterString; private final String fqqn; private final int priority; + private final String id; protected FederationGenericConsumerInfo(Role role, String address, String queueName, RoutingType routingType, String filterString, String fqqn, int priority) { @@ -58,6 +60,7 @@ protected FederationGenericConsumerInfo(Role role, String address, String queueN this.filterString = filterString; this.fqqn = fqqn; this.priority = priority; + this.id = UUID.randomUUID().toString(); } /** @@ -116,6 +119,11 @@ public static FederationGenericConsumerInfo build(String address, String queueNa ActiveMQDefaultConfiguration.getDefaultConsumerPriority()); } + @Override + public String getId() { + return id; + } + @Override public Role getRole() { return role; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java index 1edbc4c451d..0f4655f5bad 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java @@ -18,9 +18,11 @@ package org.apache.activemq.artemis.protocol.amqp.federation.internal; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; /** * An entry type class used to hold a {@link FederationConsumerInternal} and @@ -29,24 +31,46 @@ * by federation implementation to hold additional state data for the federation * consumer and the managing of its lifetime. * - * This entry type provides a reference counter that can be used to register demand + * This entry type provides reference tracking state for current demand (bindings) * on a federation resource such that it is not torn down until all demand has been * removed from the local resource. */ public class FederationQueueEntry { - private final FederationConsumerInternal consumer; - + private final FederationConsumerInfo consumerInfo; private final Set consumerDemand = new HashSet<>(); + private FederationConsumerInternal consumer; + /** * Creates a new queue entry with a single reference * - * @param consumer - * The federation consumer that will be carried in this entry. + * @param consumerInfo + * Consumer information object used to define the federation queue consumer */ - public FederationQueueEntry(FederationConsumerInternal consumer) { - this.consumer = consumer; + public FederationQueueEntry(FederationConsumerInfo consumerInfo) { + this.consumerInfo = consumerInfo; + } + + /** + * @return the name of the queue that this entry tracks demand for. + */ + public String getQueueName() { + return consumerInfo.getQueueName(); + } + + /** + * @return the consumer information that defines the properties of federation queue consumers + */ + public FederationConsumerInfo getConsumerInfo() { + return consumerInfo; + } + + /** + * @return true if a consumer is currently set on this entry. + */ + public boolean hasConsumer() { + return consumer != null; } /** @@ -56,6 +80,30 @@ public FederationConsumerInternal getConsumer() { return consumer; } + /** + * Sets the consumer assigned to this entry to the given instance. + * + * @param consumer + * The federation consumer that is currently active for this entry. + * + * @return this federation queue consumer entry. + */ + public FederationQueueEntry setConsumer(FederationConsumerInternal consumer) { + Objects.requireNonNull(consumer, "Cannot assign a null consumer to this entry, call clear to unset"); + this.consumer = consumer; + return this; + } + + /** + * Clears the currently assigned consumer from this entry. + * + * @return this federation queue consumer entry. + */ + public FederationQueueEntry clearConsumer() { + this.consumer = null; + return this; + } + /** * @return true if there are bindings that are mapped to this federation entry. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java index 1b78f3dee1a..6950d6278fd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java @@ -28,13 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; @@ -46,14 +49,14 @@ * monitoring broker queues for demand and creating a consumer for on the remote side * to federate messages back to this peer. */ -public abstract class FederationQueuePolicyManager implements ActiveMQServerConsumerPlugin { +public abstract class FederationQueuePolicyManager implements ActiveMQServerConsumerPlugin, ActiveMQServerBindingPlugin { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected final ActiveMQServer server; protected final Predicate federationConsumerMatcher; protected final FederationReceiveFromQueuePolicy policy; - protected final Map remoteConsumers = new HashMap<>(); + protected final Map demandTracking = new HashMap<>(); protected final FederationInternal federation; private volatile boolean started; @@ -93,8 +96,12 @@ public synchronized void stop() { // broker plugin instances. server.unRegisterBrokerPlugin(this); started = false; - remoteConsumers.forEach((k, v) -> v.getConsumer().close()); // Cleanup and recreate if ever reconnected. - remoteConsumers.clear(); + demandTracking.forEach((k, v) -> { + if (v.hasConsumer()) { + v.getConsumer().close(); + } + }); + demandTracking.clear(); } } @@ -109,7 +116,7 @@ public synchronized void afterCreateConsumer(ServerConsumer consumer) { public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) { if (started) { final String queueName = consumer.getQueue().getName().toString(); - final FederationQueueEntry entry = remoteConsumers.get(queueName); + final FederationQueueEntry entry = demandTracking.get(queueName); if (entry == null) { return; @@ -119,7 +126,7 @@ public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean fai logger.trace("Reducing demand on federated queue {}, remaining demand? {}", queueName, entry.hasDemand()); - if (!entry.hasDemand()) { + if (!entry.hasDemand() && entry.hasConsumer()) { final FederationConsumerInternal federationConsuner = entry.getConsumer(); try { @@ -127,12 +134,24 @@ public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean fai federationConsuner.close(); signalAfterCloseFederationConsumer(federationConsuner); } finally { - remoteConsumers.remove(queueName); + demandTracking.remove(queueName); } } } } + @Override + public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException { + if (binding instanceof QueueBinding) { + final QueueBinding queueBinding = (QueueBinding) binding; + final FederationQueueEntry entry = demandTracking.remove(queueBinding.getQueue().getName().toString()); + + if (entry != null && entry.hasConsumer()) { + entry.getConsumer().close(); + } + } + } + protected final void scanAllQueueBindings() { server.getPostOffice() .getAllBindings() @@ -150,7 +169,9 @@ protected final void checkQueueForMatch(Queue queue) { } protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { - if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), consumer.getQueueName().toString())) { + final String queueName = consumer.getQueue().getName().toString(); + + if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), queueName)) { // We should ignore federation consumers from remote peers but configuration does allow // these to be federated again for some very specific use cases so we check before then // moving onto any server plugin checks kick in. @@ -158,49 +179,59 @@ protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { return; } - if (isPluginBlockingFederationConsumerCreate(consumer.getQueue())) { - return; - } - logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding()); - final FederationConsumerInfo consumerInfo = createConsumerInfo(consumer); + final FederationQueueEntry entry; - // Check for existing consumer add demand from a additional local consumer - // to ensure the remote consumer remains active until all local demand is - // withdrawn. - if (remoteConsumers.containsKey(consumerInfo.getQueueName())) { - logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", consumerInfo.getQueueName()); - remoteConsumers.get(consumerInfo.getQueueName()).addDemand(consumer); + // Check for existing consumer add demand from a additional local consumer to ensure + // the remote consumer remains active until all local demand is withdrawn. + if (demandTracking.containsKey(queueName)) { + logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", queueName); + entry = demandTracking.get(queueName); } else { - logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", consumerInfo.getQueueName()); - - signalBeforeCreateFederationConsumer(consumerInfo); - - final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo); - final FederationQueueEntry entry = createServerConsumerEntry(queueConsumer).addDemand(consumer); - - // Handle remote close with remove of consumer which means that future demand will - // attempt to create a new consumer for that demand. Ensure that thread safety is - // accounted for here as the notification can be asynchronous. - queueConsumer.setRemoteClosedHandler((closedConsumer) -> { - synchronized (this) { - try { - remoteConsumers.remove(closedConsumer.getConsumerInfo().getQueueName()); - } finally { - closedConsumer.close(); + demandTracking.put(queueName, entry = createConsumerEntry(createConsumerInfo(consumer))); + } + + // Demand passed all binding plugin blocking checks so we track it, plugin can still + // stop federation of the queue based on some external criteria but once it does + // (if ever) allow it we will have tracked all allowed demand. + entry.addDemand(consumer); + + tryCreateFederationConsumerForQueue(entry, consumer.getQueue()); + } + } + + private void tryCreateFederationConsumerForQueue(FederationQueueEntry queueEntry, Queue queue) { + if (queueEntry.hasDemand() && !queueEntry.hasConsumer() && !isPluginBlockingFederationConsumerCreate(queue)) { + logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", queueEntry.getQueueName()); + + signalBeforeCreateFederationConsumer(queueEntry.getConsumerInfo()); + + final FederationConsumerInternal queueConsumer = createFederationConsumer(queueEntry.getConsumerInfo()); + + // Handle remote close with remove of consumer which means that future demand will + // attempt to create a new consumer for that demand. Ensure that thread safety is + // accounted for here as the notification can be asynchronous. + queueConsumer.setRemoteClosedHandler((closedConsumer) -> { + synchronized (this) { + try { + final FederationQueueEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo().getQueueName()); + + if (tracked != null) { + tracked.clearConsumer(); } + } finally { + closedConsumer.close(); } - }); + } + }); - // Called under lock so state should stay in sync - remoteConsumers.put(consumerInfo.getQueueName(), entry); + queueEntry.setConsumer(queueConsumer); - // Now that we are tracking it we can start it - queueConsumer.start(); + // Now that we are tracking it we can start it + queueConsumer.start(); - signalAfterCreateFederationConsumer(queueConsumer); - } + signalAfterCreateFederationConsumer(queueConsumer); } } @@ -220,17 +251,15 @@ public synchronized void afterRemoteQueueAdded(String addressName, String queueN // We ignore the remote address as locally the policy can be a wild card match and we can // try to federate based on the Queue only, if the remote rejects the federation consumer // binding again the request will once more be recorded and we will get another event if - // the queue were recreated such that a match could be made. - if (started && testIfQueueMatchesPolicy(queueName)) { - - // Find a matching Queue with the given name and then check for demand based - // on the attached consumers and the current policy constraints. - - server.getPostOffice() - .getAllBindings() - .filter(b -> b instanceof QueueBinding && ((QueueBinding) b).getQueue().getName().toString().equals(queueName)) - .map(b -> (QueueBinding) b) - .forEach(b -> checkQueueForMatch(b.getQueue())); + // the queue were recreated such that a match could be made. We retain all the current + // demand and don't need to re-check the server state before trying to create the + // remote queue consumer. + if (started && testIfQueueMatchesPolicy(queueName) && demandTracking.containsKey(queueName)) { + final Queue queue = server.locateQueue(queueName); + + if (queue != null) { + tryCreateFederationConsumerForQueue(demandTracking.get(queueName), queue); + } } } @@ -286,13 +315,13 @@ protected FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) { * instance. A subclass can override this method to return a more customized entry type with additional * state data. * - * @param consumer - * The {@link FederationConsumerInternal} instance that will be housed in this entry. + * @param consumerInfo + * The consumer information that defines characteristics of the federation queue consumer * - * @return a new {@link FederationQueueEntry} that holds the given federation consumer. + * @return a new {@link FederationQueueEntry} that holds the given queue name. */ - protected FederationQueueEntry createServerConsumerEntry(FederationConsumerInternal consumer) { - return new FederationQueueEntry(consumer); + protected FederationQueueEntry createConsumerEntry(FederationConsumerInfo consumerInfo) { + return new FederationQueueEntry(consumerInfo); } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index cec0b8798bf..e267dc77574 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -60,6 +60,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -3281,6 +3282,196 @@ public void testRemoteBrokerClosesFederationReceiverAfterAddressRemoved() throws } } + @Test(timeout = 20000) + public void testFederationAddressDemandTrackedWhenRemoteRejectsInitialAttempts() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes("test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST)); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic("test"); + + connection.start(); + + // First consumer we reject the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .respondInKind() + .withNullSource(); + peer.expectFlow().withLinkCredit(1000); + peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10); + peer.expectDetach(); + + final MessageConsumer consumer1 = session.createConsumer(topic); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Second consumer we reject the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .respondInKind() + .withNullSource(); + peer.expectFlow().withLinkCredit(1000); + peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10); + peer.expectDetach(); + + final MessageConsumer consumer2 = session.createConsumer(topic); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Third consumer we accept the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(1000); + + final MessageConsumer consumer3 = session.createConsumer(topic); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer3.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer2.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // Demand should be gone now + consumer1.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + + @Test(timeout = 20000) + public void testFederationAddressDemandTrackedWhenPluginBlocksInitialAttempts() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes("test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AtomicInteger blockUntilZero = new AtomicInteger(2); + final AMQPTestFederationBrokerPlugin federationPlugin = new AMQPTestFederationBrokerPlugin(); + federationPlugin.shouldCreateConsumerForDivert = (d, q) -> true; + federationPlugin.shouldCreateConsumerForQueue = (q) -> true; + federationPlugin.shouldCreateConsumerForAddress = (a) -> { + return blockUntilZero.getAndDecrement() == 0; + }; + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.registerBrokerPlugin(federationPlugin); + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST)); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic("test"); + + connection.start(); + + final MessageConsumer consumer1 = session.createConsumer(topic); + final MessageConsumer consumer2 = session.createConsumer(topic); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Third consumer we expect the plugin to allow the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(1000); + + final MessageConsumer consumer3 = session.createConsumer(topic); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer3.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer2.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // Demand should be gone now + consumer1.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index 307a2a7c381..538ffcaa5a4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -59,6 +59,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -71,6 +78,7 @@ import javax.jms.Queue; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -80,10 +88,15 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin; +import org.apache.activemq.artemis.protocol.amqp.federation.Federation; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; @@ -3249,6 +3262,200 @@ public void testRemoteBrokerClosesFederationReceiverAfterQueueRemoved() throws E } } + @Test(timeout = 20000) + public void testFederationQueueDemandTrackedWhenRemoteRejectsInitialAttempts() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes("test", "test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + connection.start(); + + // First consumer we reject the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .respondInKind() + .withNullSource(); + peer.expectFlow().withLinkCredit(1000); + peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10); + peer.expectDetach(); + + final MessageConsumer consumer1 = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Second consumer we reject the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .respondInKind() + .withNullSource(); + peer.expectFlow().withLinkCredit(1000); + peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10); + peer.expectDetach(); + + final MessageConsumer consumer2 = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Third consumer we accept the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + final MessageConsumer consumer3 = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer3.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer2.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // Demand should be gone now + consumer1.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + + @Test(timeout = 20000) + public void testFederationQueueDemandTrackedWhenPluginBlocksInitialAttempts() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes("test", "test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AtomicInteger blockUntilZero = new AtomicInteger(2); + final AMQPTestFederationBrokerPlugin federationPlugin = new AMQPTestFederationBrokerPlugin(); + federationPlugin.shouldCreateConsumerForDivert = (d, q) -> true; + federationPlugin.shouldCreateConsumerForQueue = (q) -> { + return blockUntilZero.getAndDecrement() == 0; + }; + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.registerBrokerPlugin(federationPlugin); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + connection.start(); + + final MessageConsumer consumer1 = session.createConsumer(queue); + final MessageConsumer consumer2 = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Third consumer we expect the plugin to allow the federation attempt + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(1000); + + final MessageConsumer consumer3 = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer3.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand should remain + consumer2.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // Demand should be gone now + consumer1.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address); @@ -3442,4 +3649,82 @@ private void scriptFederationConnectToRemote(ProtonTestClient peer, String feder .withOfferedCapability(FEDERATION_EVENT_LINK.toString()); } } + + private class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin { + + public final AtomicBoolean started = new AtomicBoolean(); + public final AtomicBoolean stopped = new AtomicBoolean(); + + public final AtomicReference beforeCreateConsumerCapture = new AtomicReference<>(); + public final AtomicReference afterCreateConsumerCapture = new AtomicReference<>(); + public final AtomicReference beforeCloseConsumerCapture = new AtomicReference<>(); + public final AtomicReference afterCloseConsumerCapture = new AtomicReference<>(); + + public Consumer beforeCreateConsumer = (c) -> beforeCreateConsumerCapture.set(c);; + public Consumer afterCreateConsumer = (c) -> afterCreateConsumerCapture.set(c); + public Consumer beforeCloseConsumer = (c) -> beforeCloseConsumerCapture.set(c); + public Consumer afterCloseConsumer = (c) -> afterCloseConsumerCapture.set(c); + + public BiConsumer beforeMessageHandled = (c, m) -> { }; + public BiConsumer afterMessageHandled = (c, m) -> { }; + + public Function shouldCreateConsumerForAddress = (a) -> true; + public Function shouldCreateConsumerForQueue = (q) -> true; + public BiFunction shouldCreateConsumerForDivert = (d, q) -> true; + + @Override + public void federationStarted(final Federation federation) throws ActiveMQException { + started.set(true); + } + + @Override + public void federationStopped(final Federation federation) throws ActiveMQException { + stopped.set(true); + } + + @Override + public void beforeCreateFederationConsumer(final FederationConsumerInfo consumerInfo) throws ActiveMQException { + beforeCreateConsumer.accept(consumerInfo); + } + + @Override + public void afterCreateFederationConsumer(final FederationConsumer consumer) throws ActiveMQException { + afterCreateConsumer.accept(consumer); + } + + @Override + public void beforeCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException { + beforeCloseConsumer.accept(consumer); + } + + @Override + public void afterCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException { + afterCloseConsumer.accept(consumer); + } + + @Override + public void beforeFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException { + beforeMessageHandled.accept(consumer, message); + } + + @Override + public void afterFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException { + afterMessageHandled.accept(consumer, message); + } + + @Override + public boolean shouldCreateFederationConsumerForAddress(final AddressInfo address) throws ActiveMQException { + return shouldCreateConsumerForAddress.apply(address); + } + + @Override + public boolean shouldCreateFederationConsumerForQueue(final org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException { + return shouldCreateConsumerForQueue.apply(queue); + } + + @Override + public boolean shouldCreateFederationConsumerForDivert(Divert divert, org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException { + return shouldCreateConsumerForDivert.apply(divert, queue); + } + } }