From 71f75e2ded2fe5f61e7099848b571e993f9ebbea Mon Sep 17 00:00:00 2001 From: Roddie Kieley Date: Tue, 31 Jul 2018 10:04:56 -0230 Subject: [PATCH 1/2] ARTEMIS-2000: For ScaleDown set the RoutingType header property on the message so if the address does not exist on the other end it will be created correctly. --- .../core/server/impl/ScaleDownHandler.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index a1c2229449b..02fe1bf2697 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -162,14 +163,28 @@ public long scaleDownRegularMessages(final SimpleString address, } // compile a list of all the relevant queues and queue iterators for this address + RoutingType routingType; + Integer routingTypeOrdinal; + String routingTypeString = ""; for (Queue loopQueue : queues) { logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue); try (LinkedListIterator messagesIterator = loopQueue.browserIterator()) { + routingType = loopQueue.getRoutingType(); + if (null != routingType) { + routingTypeOrdinal = routingType.ordinal(); + routingTypeString = routingTypeOrdinal.toString(); + } + while (messagesIterator.hasNext()) { MessageReference messageReference = messagesIterator.next(); - Message message = messageReference.getMessage().copy(); + Message originalMessage = messageReference.getMessage(); + + if (null != routingType) { + originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString); + } + Message message = originalMessage.copy(); logger.debug("Reading message " + message + " from queue " + loopQueue); Set queuesFound = new HashSet<>(); @@ -179,7 +194,7 @@ public long scaleDownRegularMessages(final SimpleString address, // no need to lookup on itself, we just add it queuesFound.add(controlEntry.getValue()); } else if (controlEntry.getValue().lookup(messageReference)) { - logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID()); + logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID(message)); queuesFound.add(controlEntry.getValue()); } } @@ -188,7 +203,7 @@ public long scaleDownRegularMessages(final SimpleString address, ByteBuffer buffer = ByteBuffer.allocate(queuesFound.size() * 8); for (QueuesXRefInnerManager control : queuesFound) { - long queueID = control.getQueueID(); + long queueID = control.getQueueID(message); buffer.putLong(queueID); } @@ -339,7 +354,7 @@ public void scaleDownTransactions(ClientSessionFactory sessionFactory, if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); } else { - queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString()); + queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } Pair, List> queueIds = queuesToSendTo.get(message); @@ -361,7 +376,7 @@ public void scaleDownTransactions(ClientSessionFactory sessionFactory, if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); } else { - queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString()); + queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } Pair, List> queueIds = queuesToSendTo.get(message); @@ -426,13 +441,14 @@ public void scaleDownDuplicateIDs(Map>> du * send directly to a queue, we have to send to an address instead but not all the queues related to the * address may need the message */ - private long createQueueIfNecessaryAndGetID(ClientSession session, - Queue queue, - SimpleString addressName) throws Exception { + private long createQueueWithRoutingTypeIfNecessaryAndGetID(ClientSession session, + Queue queue, + SimpleString addressName, + RoutingType routingType) throws Exception { long queueID = getQueueID(session, queue.getName()); if (queueID == -1) { - session.createQueue(addressName, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable()); - logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]"); + session.createQueue(addressName, routingType, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable()); + logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", routingType=" + queue.getRoutingType() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]"); queueID = getQueueID(session, queue.getName()); } @@ -523,10 +539,10 @@ public Queue getQueue() { return queue; } - public long getQueueID() throws Exception { + public long getQueueID(Message message) throws Exception { if (targetQueueID < 0) { - targetQueueID = createQueueIfNecessaryAndGetID(clientSession, queue, queue.getAddress()); + targetQueueID = createQueueWithRoutingTypeIfNecessaryAndGetID(clientSession, queue, queue.getAddress(), message.getRoutingType()); } return targetQueueID; } From 052b67d815a8fefdb74860e866b8ba921f5f7196 Mon Sep 17 00:00:00 2001 From: andytaylor Date: Fri, 14 Sep 2018 16:22:43 +0100 Subject: [PATCH 2/2] ARTEMIS-2000: test added --- .../integration/server/ScaleDownTest.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index ec49eceef2c..1a1d007792d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -290,6 +292,31 @@ public void testScaleDownWithMissingQueue() throws Exception { removeConsumer(0); } + @Test + public void testScaleDownWithMissingAnycastQueue() throws Exception { + final int TEST_SIZE = 2; + final String addressName = "testAddress"; + final String queueName1 = "testQueue1"; + final String queueName2 = "testQueue2"; + + // create 2 queues on each node mapped to the same address + createQueue(0, addressName, queueName2, null, false, null, null, RoutingType.ANYCAST); + + // send messages to node 0 + send(0, addressName, TEST_SIZE, false, null); + + // trigger scaleDown from node 0 to node 1 + servers[0].stop(); + + Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getRoutingType(), RoutingType.ANYCAST); + // get the 1 message from queue 2 + addConsumer(0, 1, queueName2, null); + ClientMessage clientMessage = consumers[0].getConsumer().receive(250); + Assert.assertNotNull(clientMessage); + clientMessage.acknowledge(); + + } + @Test public void testMessageProperties() throws Exception { final int TEST_SIZE = 5;