Skip to content

Commit

Permalink
ARTEMIS-4378 ignore address federation config if connection is config…
Browse files Browse the repository at this point in the history
…ured as pull, consumerWindowSize=0
  • Loading branch information
gtully committed Aug 25, 2023
1 parent 29fafb5 commit 84c16f1
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class FederationConnection {
private volatile ClientSessionFactory clientSessionFactory;
private volatile boolean started;
private volatile boolean sharedConnection;
private boolean isPull;

public FederationConnection(Configuration configuration, String name, FederationConnectionConfiguration config) {
this.config = config;
Expand Down Expand Up @@ -95,8 +96,8 @@ public FederationConnection(Configuration configuration, String name, Federation
BeanSupport.setData(serverLocator, possibleLocatorParameters);
} catch (Exception ignoredAsErrorsVisibleViaBeanUtilsLogging) {
}
isPull = ("0".equals(possibleLocatorParameters.get("consumerWindowSize")));
}

}

public synchronized void start() {
Expand All @@ -117,6 +118,10 @@ public boolean isStarted() {
return started;
}

public boolean isPull() {
return isPull;
}

public boolean isSharedConnection() {
return sharedConnection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.federation.address;

import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -52,6 +53,8 @@
import org.apache.activemq.artemis.core.settings.impl.Match;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Federated Address, replicate messages from the remote brokers address to itself.
Expand All @@ -65,6 +68,8 @@
*/
public class FederatedAddress extends FederatedAbstract implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String FEDERATED_QUEUE_PREFIX = "federated";

public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
Expand All @@ -74,6 +79,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
private final Set<Matcher> excludes;
private final FederationAddressPolicyConfiguration config;
private final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();
private final boolean hasPullConnectionConfig;

public FederatedAddress(Federation federation, FederationAddressPolicyConfiguration config, ActiveMQServer server, FederationUpstream upstream) {
super(federation, server, upstream);
Expand Down Expand Up @@ -102,6 +108,7 @@ public FederatedAddress(Federation federation, FederationAddressPolicyConfigurat
excludes.add(new Matcher(exclude, wildcardConfiguration));
}
}
hasPullConnectionConfig = upstream.getConnection().isPull();
}

@Override
Expand Down Expand Up @@ -310,8 +317,14 @@ private boolean match(AddressInfo addressInfo) {
}

private boolean match(SimpleString address, RoutingType routingType) {
//Currently only supporting Multicast currently.
if (RoutingType.ANYCAST.equals(routingType)) {
logger.debug("ignoring unsupported ANYCAST address {}", address);
return false;
}
if (hasPullConnectionConfig) {
// multicast address federation has no local queue to trigger batch pull requests, a regular fast consumer with credit window is necessary
// otherwise the upstream would fill up and block.
logger.debug("ignoring MULTICAST address {} on unsupported pull connection, consumerWindowSize=0 ", address);
return false;
}
for (Matcher exclude : excludes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.Collections;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
Expand Down Expand Up @@ -65,6 +69,44 @@ protected ConnectionFactory getCF(int i) throws Exception {
return factory;
}

@Test
public void testAddressFederatedConfiguredWithPullQueueConsumerEnabledNotAnOption() throws Exception {
String connector = "server-pull-1";

getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);

getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString("source"), RoutingType.MULTICAST));
getServer(1).addAddressInfo(new AddressInfo(SimpleString.toSimpleString("source"), RoutingType.MULTICAST));

getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration().setName("default").addFederationPolicy(new FederationAddressPolicyConfiguration().setName("myAddressPolicy").addInclude(new FederationAddressPolicyConfiguration.Matcher().setAddressMatch("#"))).addUpstreamConfiguration(new FederationUpstreamConfiguration().setName("server1-upstream").addPolicyRef("myAddressPolicy").setStaticConnectors(Collections.singletonList(connector))));

getServer(0).getFederationManager().deploy();

final ConnectionFactory cf1 = getCF(0);
final ConnectionFactory cf2 = getCF(1);

try (Connection consumer1Connection = cf1.createConnection(); Connection producerConnection = cf2.createConnection()) {
consumer1Connection.start();
final Session session1 = consumer1Connection.createSession();
final Topic topic1 = session1.createTopic("source");
final MessageConsumer consumer1 = session1.createConsumer(topic1);

// Remote
final Session session2 = producerConnection.createSession();
final Topic topic2 = session2.createTopic("source");
final MessageProducer producer = session2.createProducer(topic2);

producer.send(session2.createTextMessage("hello"));

// no federation of this address
// consumer visible on local
assertTrue(waitForBindings(getServer(0), "source", true, 1, 1, 1000));
// federation consumer not visible on remote
assertFalse(waitForBindings(getServer(1), "source", true, 1, 1, 100));
}
}

@Test
public void testFederatedQueuePullFromUpstream() throws Exception {
String queueName = getName();
Expand Down

0 comments on commit 84c16f1

Please sign in to comment.