diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 2891f1f07d9..48f1ef4343a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -170,6 +170,10 @@ public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Q assert snfQueue != null; this.replicaConfig = replicaConfig; this.snfQueue = snfQueue; + if (!snfQueue.isInternalQueue()) { + logger.debug("marking queue {} as internal to avoid redistribution kicking in", snfQueue.getName()); + snfQueue.setInternalQueue(true); // to avoid redistribution kicking in + } this.server = server; this.idSupplier = protonProtocolManager.getReferenceIDSupplier(); this.addQueues = replicaConfig.isQueueCreation(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 8dd75829170..b7efc1c7213 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1594,8 +1594,17 @@ private boolean checkConsumerDirectDeliver() { return supports; } + public synchronized Redistributor getRedistributor() { + return redistributor == null ? null : redistributor.consumer; + } + @Override public synchronized void addRedistributor(final long delay) { + if (isInternalQueue()) { + logger.debug("Queue {} is internal, can't be redistributed!", this.name); + return; + } + clearRedistributorFuture(); if (redistributor != null) { diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java index 0964050f0c4..a2f765036c9 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -68,10 +69,10 @@ public class ClusteredMirrorSoakTest extends SoakTestBase { largeBody = writer.toString(); } - public static final String DC1_NODE_A = "mirror/DC1/A"; - public static final String DC2_NODE_A = "mirror/DC2/A"; - public static final String DC1_NODE_B = "mirror/DC1/B"; - public static final String DC2_NODE_B = "mirror/DC2/B"; + public static final String DC1_NODE_A = "ClusteredMirrorSoakTest/DC1/A"; + public static final String DC2_NODE_A = "ClusteredMirrorSoakTest/DC2/A"; + public static final String DC1_NODE_B = "ClusteredMirrorSoakTest/DC1/B"; + public static final String DC2_NODE_B = "ClusteredMirrorSoakTest/DC2/B"; Process processDC1_node_A; Process processDC1_node_B; @@ -115,6 +116,16 @@ private static void createServer(String serverName, String connectionName, Strin Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); } + + @Before + public void cleanupServers() { + cleanupData(DC1_NODE_A); + cleanupData(DC2_NODE_A); + cleanupData(DC2_NODE_B); + cleanupData(DC2_NODE_B); + } + + @BeforeClass public static void createServers() throws Exception { createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0); @@ -286,7 +297,7 @@ public void testAutoCreateQueue() throws Exception { startServers(); - String queueName = "queue" + RandomUtil.randomString(); + String queueName = "testqueue" + RandomUtil.randomString(); final int numberOfMessages = 50; @@ -307,7 +318,7 @@ public void testAutoCreateQueue() throws Exception { sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10); - Wait.assertEquals(numberOfMessages, receiverCount::get, 5000); + Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000); Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName)); Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName)); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 4821e618c5f..f9c2bc74ac6 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1084,6 +1084,15 @@ public void testNoMatchOn3AllowsRedistribution() throws Exception { } } + @Test + public void testNoRedistributorInternalQueue() throws Exception { + QueueImpl queue = getTemporaryQueue(); + queue.setInternalQueue(true); + + queue.addRedistributor(0); + Assert.assertNull(queue.getRedistributor()); + } + private void testConsumerWithFilters(final boolean direct) throws Exception { QueueImpl queue = getTemporaryQueue();