From b94662c65eda4694429f81f0cc3bbef5634daff2 Mon Sep 17 00:00:00 2001 From: Roddie Kieley Date: Tue, 31 Jul 2018 10:04:56 -0230 Subject: [PATCH 1/2] ARTEMIS-2000: For ScaleDown set the RoutingType header property on the message so if the address does not exist on the other end it will be created correctly. (cherry picked from commit 71f75e2ded2fe5f61e7099848b571e993f9ebbea) --- .../core/server/impl/ScaleDownHandler.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index a1c2229449b..02fe1bf2697 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -162,14 +163,28 @@ public long scaleDownRegularMessages(final SimpleString address, } // compile a list of all the relevant queues and queue iterators for this address + RoutingType routingType; + Integer routingTypeOrdinal; + String routingTypeString = ""; for (Queue loopQueue : queues) { logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue); try (LinkedListIterator messagesIterator = loopQueue.browserIterator()) { + routingType = loopQueue.getRoutingType(); + if (null != routingType) { + routingTypeOrdinal = routingType.ordinal(); + routingTypeString = routingTypeOrdinal.toString(); + } + while (messagesIterator.hasNext()) { MessageReference messageReference = messagesIterator.next(); - Message message = messageReference.getMessage().copy(); + Message originalMessage = messageReference.getMessage(); + + if (null != routingType) { + originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString); + } + Message message = originalMessage.copy(); logger.debug("Reading message " + message + " from queue " + loopQueue); Set queuesFound = new HashSet<>(); @@ -179,7 +194,7 @@ public long scaleDownRegularMessages(final SimpleString address, // no need to lookup on itself, we just add it queuesFound.add(controlEntry.getValue()); } else if (controlEntry.getValue().lookup(messageReference)) { - logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID()); + logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID(message)); queuesFound.add(controlEntry.getValue()); } } @@ -188,7 +203,7 @@ public long scaleDownRegularMessages(final SimpleString address, ByteBuffer buffer = ByteBuffer.allocate(queuesFound.size() * 8); for (QueuesXRefInnerManager control : queuesFound) { - long queueID = control.getQueueID(); + long queueID = control.getQueueID(message); buffer.putLong(queueID); } @@ -339,7 +354,7 @@ public void scaleDownTransactions(ClientSessionFactory sessionFactory, if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); } else { - queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString()); + queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } Pair, List> queueIds = queuesToSendTo.get(message); @@ -361,7 +376,7 @@ public void scaleDownTransactions(ClientSessionFactory sessionFactory, if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); } else { - queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString()); + queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } Pair, List> queueIds = queuesToSendTo.get(message); @@ -426,13 +441,14 @@ public void scaleDownDuplicateIDs(Map>> du * send directly to a queue, we have to send to an address instead but not all the queues related to the * address may need the message */ - private long createQueueIfNecessaryAndGetID(ClientSession session, - Queue queue, - SimpleString addressName) throws Exception { + private long createQueueWithRoutingTypeIfNecessaryAndGetID(ClientSession session, + Queue queue, + SimpleString addressName, + RoutingType routingType) throws Exception { long queueID = getQueueID(session, queue.getName()); if (queueID == -1) { - session.createQueue(addressName, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable()); - logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]"); + session.createQueue(addressName, routingType, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable()); + logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", routingType=" + queue.getRoutingType() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]"); queueID = getQueueID(session, queue.getName()); } @@ -523,10 +539,10 @@ public Queue getQueue() { return queue; } - public long getQueueID() throws Exception { + public long getQueueID(Message message) throws Exception { if (targetQueueID < 0) { - targetQueueID = createQueueIfNecessaryAndGetID(clientSession, queue, queue.getAddress()); + targetQueueID = createQueueWithRoutingTypeIfNecessaryAndGetID(clientSession, queue, queue.getAddress(), message.getRoutingType()); } return targetQueueID; } From 5059e883c510f70714dbc938a94f4dbd2702b307 Mon Sep 17 00:00:00 2001 From: andytaylor Date: Fri, 14 Sep 2018 16:22:43 +0100 Subject: [PATCH 2/2] ARTEMIS-2000: test added (cherry picked from commit 052b67d815a8fefdb74860e866b8ba921f5f7196) --- .../integration/server/ScaleDownTest.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index ec49eceef2c..1a1d007792d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -290,6 +292,31 @@ public void testScaleDownWithMissingQueue() throws Exception { removeConsumer(0); } + @Test + public void testScaleDownWithMissingAnycastQueue() throws Exception { + final int TEST_SIZE = 2; + final String addressName = "testAddress"; + final String queueName1 = "testQueue1"; + final String queueName2 = "testQueue2"; + + // create 2 queues on each node mapped to the same address + createQueue(0, addressName, queueName2, null, false, null, null, RoutingType.ANYCAST); + + // send messages to node 0 + send(0, addressName, TEST_SIZE, false, null); + + // trigger scaleDown from node 0 to node 1 + servers[0].stop(); + + Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getRoutingType(), RoutingType.ANYCAST); + // get the 1 message from queue 2 + addConsumer(0, 1, queueName2, null); + ClientMessage clientMessage = consumers[0].getConsumer().receive(250); + Assert.assertNotNull(clientMessage); + clientMessage.acknowledge(); + + } + @Test public void testMessageProperties() throws Exception { final int TEST_SIZE = 5;