Skip to content

Commit

Permalink
ARTEMIS-4684 Internal Queues should not redistribute
Browse files Browse the repository at this point in the history
This is particularly true for the Mirrored SNF queue. Redistribution is not meant for internal queues. If an internal queue happens to have the same name on another server, it should not trigger redistribution when consumers are removed.

It would be possible to work around this by adding an address-setting specific to the address with redistribution disabled.

ClusteredMirrorSoakTest was intermittently failing because of this. For a few seconds while the mirror connection is still being made connections could move messages from one node towards another node if both have the same name.
  • Loading branch information
clebertsuconic committed Mar 13, 2024
1 parent c5b81d9 commit 11b7671
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 6 deletions.
Expand Up @@ -170,6 +170,10 @@ public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Q
assert snfQueue != null;
this.replicaConfig = replicaConfig;
this.snfQueue = snfQueue;
if (!snfQueue.isInternalQueue()) {
logger.debug("marking queue {} as internal to avoid redistribution kicking in", snfQueue.getName());
snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
}
this.server = server;
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.addQueues = replicaConfig.isQueueCreation();
Expand Down
Expand Up @@ -1594,8 +1594,17 @@ private boolean checkConsumerDirectDeliver() {
return supports;
}

public synchronized Redistributor getRedistributor() {
return redistributor == null ? null : redistributor.consumer;
}

@Override
public synchronized void addRedistributor(final long delay) {
if (isInternalQueue()) {
logger.debug("Queue {} is internal, can't be redistributed!", this.name);
return;
}

clearRedistributorFuture();

if (redistributor != null) {
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -68,10 +69,10 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
largeBody = writer.toString();
}

public static final String DC1_NODE_A = "mirror/DC1/A";
public static final String DC2_NODE_A = "mirror/DC2/A";
public static final String DC1_NODE_B = "mirror/DC1/B";
public static final String DC2_NODE_B = "mirror/DC2/B";
public static final String DC1_NODE_A = "ClusteredMirrorSoakTest/DC1/A";
public static final String DC2_NODE_A = "ClusteredMirrorSoakTest/DC2/A";
public static final String DC1_NODE_B = "ClusteredMirrorSoakTest/DC1/B";
public static final String DC2_NODE_B = "ClusteredMirrorSoakTest/DC2/B";

Process processDC1_node_A;
Process processDC1_node_B;
Expand Down Expand Up @@ -115,6 +116,16 @@ private static void createServer(String serverName, String connectionName, Strin
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n"));
}


@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
cleanupData(DC2_NODE_B);
cleanupData(DC2_NODE_B);
}


@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
Expand Down Expand Up @@ -286,7 +297,7 @@ public void testAutoCreateQueue() throws Exception {

startServers();

String queueName = "queue" + RandomUtil.randomString();
String queueName = "testqueue" + RandomUtil.randomString();

final int numberOfMessages = 50;

Expand All @@ -307,7 +318,7 @@ public void testAutoCreateQueue() throws Exception {

sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);

Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);

Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
Expand Down
Expand Up @@ -1084,6 +1084,15 @@ public void testNoMatchOn3AllowsRedistribution() throws Exception {
}
}

@Test
public void testNoRedistributorInternalQueue() throws Exception {
QueueImpl queue = getTemporaryQueue();
queue.setInternalQueue(true);

queue.addRedistributor(0);
Assert.assertNull(queue.getRedistributor());
}

private void testConsumerWithFilters(final boolean direct) throws Exception {
QueueImpl queue = getTemporaryQueue();

Expand Down

0 comments on commit 11b7671

Please sign in to comment.