Skip to content

Commit

Permalink
ARTEMIS-4653 Create federation consumers for specific queue consumer …
Browse files Browse the repository at this point in the history
…filters

When Queue consumers attach with filters use those instead of the Queue
filter to filter the messages that are federated to avoid stranding of
messages on the local broker. This will result in multiple federation
consumers if the various attached local consumers all use different
filters but does keep unwanted messages on the remote so that consumers
there can consume those.
  • Loading branch information
tabish121 authored and gemmellr committed Feb 21, 2024
1 parent 7d3ed3e commit 9155672
Show file tree
Hide file tree
Showing 16 changed files with 607 additions and 89 deletions.
Expand Up @@ -151,6 +151,16 @@ public synchronized boolean isStarted() {
*/
public abstract int getLargeMessageThreshold();

/**
* @return the true if the federation should ignore filters on queue consumers.
*/
public abstract boolean isIgnoreQueueConsumerFilters();

/**
* @return the true if the federation should ignore priorities on queue consumers.
*/
public abstract boolean isIgnoreQueueConsumerPriorities();

/**
* @return the true if the federation should support core message tunneling.
*/
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
Expand All @@ -31,9 +32,11 @@
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,12 +62,16 @@ public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationR

@Override
protected FederationGenericConsumerInfo createConsumerInfo(AddressInfo address) {
return FederationGenericConsumerInfo.build(address.getName().toString(),
generateQueueName(address),
address.getRoutingType(),
remoteQueueFilter,
federation,
policy);
final String addressName = address.getName().toString();
final String generatedQueueName = generateQueueName(address);

return new FederationGenericConsumerInfo(Role.ADDRESS_CONSUMER,
addressName,
generatedQueueName,
address.getRoutingType(),
remoteQueueFilter,
CompositeAddress.toFullyQualified(addressName, generatedQueueName),
ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
}

protected String generateQueueName(AddressInfo address) {
Expand Down
Expand Up @@ -21,6 +21,8 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -52,6 +54,20 @@ public final class AMQPFederationConfiguration {
*/
public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;

/**
* Default value for the filtering applied to federation queue consumers that controls if
* the filter specified by a consumer subscription is used or if the higher level queue
* filter only is applied when creating a federation queue consumer.
*/
private static final boolean DEFAULT_IGNNORE_QUEUE_CONSUMER_FILTERS = false;

/**
* Default value for the priority applied to federation queue consumers that controls if
* the priority specified by a consumer subscription is used or if the policy priority
* offset value is simply applied to the default consumer priority value.
*/
private static final boolean DEFAULT_IGNNORE_QUEUE_CONSUMER_PRIORITIES = false;

private final Map<String, Object> properties;
private final AMQPConnectionContext connection;

Expand Down Expand Up @@ -138,6 +154,34 @@ public boolean isCoreMessageTunnelingEnabled() {
}
}

/**
* @return <code>true</code> if federation is configured to ignore filters on individual queue consumers
*/
public boolean isIgnoreSubscriptionFilters() {
final Object property = properties.get(IGNORE_QUEUE_CONSUMER_FILTERS);
if (property instanceof Boolean) {
return (Boolean) property;
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return DEFAULT_IGNNORE_QUEUE_CONSUMER_FILTERS;
}
}

/**
* @return <code>true</code> if federation is configured to ignore priorities on individual queue consumers
*/
public boolean isIgnoreSubscriptionPriorities() {
final Object property = properties.get(IGNORE_QUEUE_CONSUMER_PRIORITIES);
if (property instanceof Boolean) {
return (Boolean) property;
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return DEFAULT_IGNNORE_QUEUE_CONSUMER_PRIORITIES;
}
}

/**
* Enumerate the configuration options in this configuration object and return a {@link Map} that
* contains the values which can be sent to a remote peer
Expand All @@ -151,6 +195,8 @@ public Map<String, Object> toConfigurationMap() {
configMap.put(RECEIVER_CREDITS_LOW, getReceiverCreditsLow());
configMap.put(LARGE_MESSAGE_THRESHOLD, getLargeMessageThreshold());
configMap.put(LINK_ATTACH_TIMEOUT, getLinkAttachTimeout());
configMap.put(IGNORE_QUEUE_CONSUMER_FILTERS, isIgnoreSubscriptionFilters());
configMap.put(IGNORE_QUEUE_CONSUMER_PRIORITIES, isIgnoreSubscriptionPriorities());
configMap.put(AmqpSupport.TUNNEL_CORE_MESSAGES, isCoreMessageTunnelingEnabled());

return configMap;
Expand Down
Expand Up @@ -84,6 +84,27 @@ public final class AMQPFederationConstants {
*/
public static final String LARGE_MESSAGE_THRESHOLD = "minLargeMessageSize";

/**
* Configuration property used to convey the local side value to use when considering if federation queue
* consumers should filter using the filters defined on individual queue subscriptions, this can be sent
* to the peer so that dual federation configurations share the same configuration on both sides of the
* connection. This can be used to prevent multiple subscriptions on the same queue based on local demand
* with differing subscription filters but does imply that message that don't match those filters would
* be federated to the local broker.
*/
public static final String IGNORE_QUEUE_CONSUMER_FILTERS = "ignoreQueueConsumerFilters";

/**
* Configuration property used to convey the local side value to use when considering if federation queue
* consumers should apply a consumer priority offset based on the subscription priority or should use a
* singular priority offset based on policy configuration. This can be sent to the peer so that dual
* federation configurations share the same configuration on both sides of the connection. This can be
* used to prevent multiple subscriptions on the same queue based on local demand with differing consumer
* priorities but does imply that care needs to be taken to ensure remote consumers would normally have
* a higher priority value than the configured default priority offset.
*/
public static final String IGNORE_QUEUE_CONSUMER_PRIORITIES = "ignoreQueueConsumerPriorities";

/**
* A desired capability added to the federation queue receiver link that must be offered
* in return for a federation queue receiver to be successfully opened. On the remote the
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
Expand Down Expand Up @@ -104,4 +106,26 @@ public boolean isCoreMessageTunnelingEnabled() {
return federation.isCoreMessageTunnelingEnabled();
}
}

public boolean isIgnoreSubscriptionFilters() {
final Object property = properties.get(IGNORE_QUEUE_CONSUMER_FILTERS);
if (property instanceof Boolean) {
return (Boolean) property;
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isIgnoreQueueConsumerFilters();
}
}

public boolean isIgnoreSubscriptionPriorities() {
final Object property = properties.get(IGNORE_QUEUE_CONSUMER_PRIORITIES);
if (property instanceof Boolean) {
return (Boolean) property;
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isIgnoreQueueConsumerPriorities();
}
}
}
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -98,6 +99,11 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
private static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};


// Sequence ID value used to keep links that would otherwise have the same name from overlapping
// this generally occurs when consumers on the same queue have differing filters.
private static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();

private final AMQPFederation federation;
private final AMQPFederationConsumerConfiguration configuration;
private final FederationConsumerInfo consumerInfo;
Expand Down Expand Up @@ -250,7 +256,8 @@ private void signalAfterFederationConsumerMessageHandled(Message message) throws
private String generateLinkName() {
return "federation-" + federation.getName() +
"-queue-receiver-" + consumerInfo.getFqqn() +
"-" + federation.getServer().getNodeID();
"-" + federation.getServer().getNodeID() + ":" +
LINK_SEQUENCE_ID.getAndIncrement();
}

private void asyncCreateReceiver() {
Expand Down
Expand Up @@ -21,14 +21,20 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +55,28 @@ public AMQPFederationQueuePolicyManager(AMQPFederation federation, FederationRec
this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties());
}

@Override
protected FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) {
final Queue queue = consumer.getQueue();
final String queueName = queue.getName().toString();
final String address = queue.getAddress().toString();

final int priority = configuration.isIgnoreSubscriptionPriorities() ?
ActiveMQDefaultConfiguration.getDefaultConsumerPriority() + policy.getPriorityAjustment() :
consumer.getPriority() + policy.getPriorityAjustment();

final String filterString =
selectFilter(queue.getFilter(), configuration.isIgnoreSubscriptionFilters() ? null : consumer.getFilter());

return new FederationGenericConsumerInfo(Role.QUEUE_CONSUMER,
address,
queueName,
queue.getRoutingType(),
filterString,
CompositeAddress.toFullyQualified(address, queueName),
priority);
}

@Override
protected FederationConsumerInternal createFederationConsumer(FederationConsumerInfo consumerInfo) {
Objects.requireNonNull(consumerInfo, "Federation Queue consumer information object was null");
Expand Down Expand Up @@ -132,4 +160,12 @@ protected final boolean isPluginBlockingFederationConsumerCreate(Queue queue) {

return !canCreate.get();
}

private static String selectFilter(Filter queueFilter, Filter consumerFilter) {
if (consumerFilter != null) {
return consumerFilter.getFilterString().toString();
} else {
return queueFilter != null ? queueFilter.getFilterString().toString() : null;
}
}
}
Expand Up @@ -92,22 +92,6 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
// indicated it was desired, however unless offered by the remote we cannot use it.
sender.setDesiredCapabilities(new Symbol[] {AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});

// An queue receiver may supply a filter if the queue being federated had a filter attached
// to it at creation, this ensures that we only bring back message that match the original
// queue filter and not others that would simply increase traffic for no reason.
final Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);

if (filter != null) {
selector = filter.getValue().getDescribed().toString();
try {
SelectorParser.parse(selector);
} catch (FilterException e) {
throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
}
} else {
selector = null;
}

final RoutingType routingType = getRoutingType(source);
final SimpleString targetAddress;
final SimpleString targetQueue;
Expand All @@ -131,6 +115,29 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
throw new ActiveMQAMQPNotFoundException("Queue: '" + targetQueue + "' is not mapped to specified address: " + targetAddress);
}

// An queue receiver may supply a filter if the queue being federated had a filter attached
// to it at creation, this ensures that we only bring back message that match the original
// queue filter and not others that would simply increase traffic for no reason.
final Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);

if (filter != null) {
final String filterString = filter.getValue().getDescribed().toString();
try {
SelectorParser.parse(filterString);
} catch (FilterException e) {
throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
}

// No need to apply another filter if the current one on the Queue already matches that.
if (result.getFilterString() == null || !filterString.equals(result.getFilterString().toString())) {
selector = filterString;
} else {
selector = null;
}
} else {
selector = null;
}

// We need to check that the remote offers its ability to read tunneled core messages and
// if not we must not send them but instead convert all messages to AMQP messages first.
tunnelCoreMessages = verifyOfferedCapabilities(sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
Expand Down
Expand Up @@ -183,6 +183,25 @@ public boolean isCoreMessageTunnelingEnabled() {
return configuration.isCoreMessageTunnelingEnabled();
}


@Override
public boolean isIgnoreQueueConsumerFilters() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.isIgnoreSubscriptionFilters();
}

@Override
public boolean isIgnoreQueueConsumerPriorities() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.isIgnoreSubscriptionPriorities();
}

/**
* Adds a new {@link FederationReceiveFromQueuePolicy} entry to the set of policies that the
* remote end of this federation will use to create demand on the this server when local
Expand Down
Expand Up @@ -91,6 +91,16 @@ public boolean isCoreMessageTunnelingEnabled() {
return configuration.isCoreMessageTunnelingEnabled();
}

@Override
public boolean isIgnoreQueueConsumerFilters() {
return configuration.isIgnoreSubscriptionFilters();
}

@Override
public boolean isIgnoreQueueConsumerPriorities() {
return configuration.isIgnoreSubscriptionPriorities();
}

@Override
protected void handleFederationStarted() throws ActiveMQException {
// Tag the session with Federation metadata which will allow local federation policies sent by
Expand Down

0 comments on commit 9155672

Please sign in to comment.