Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
Expand Down Expand Up @@ -162,14 +163,28 @@ public long scaleDownRegularMessages(final SimpleString address,
}

// compile a list of all the relevant queues and queue iterators for this address
RoutingType routingType;
Integer routingTypeOrdinal;
String routingTypeString = "";
for (Queue loopQueue : queues) {
logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);

try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {

routingType = loopQueue.getRoutingType();
if (null != routingType) {
routingTypeOrdinal = routingType.ordinal();
routingTypeString = routingTypeOrdinal.toString();
}

while (messagesIterator.hasNext()) {
MessageReference messageReference = messagesIterator.next();
Message message = messageReference.getMessage().copy();
Message originalMessage = messageReference.getMessage();

if (null != routingType) {
originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString);
}
Message message = originalMessage.copy();

logger.debug("Reading message " + message + " from queue " + loopQueue);
Set<QueuesXRefInnerManager> queuesFound = new HashSet<>();
Expand All @@ -179,7 +194,7 @@ public long scaleDownRegularMessages(final SimpleString address,
// no need to lookup on itself, we just add it
queuesFound.add(controlEntry.getValue());
} else if (controlEntry.getValue().lookup(messageReference)) {
logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID());
logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID(message));
queuesFound.add(controlEntry.getValue());
}
}
Expand All @@ -188,7 +203,7 @@ public long scaleDownRegularMessages(final SimpleString address,
ByteBuffer buffer = ByteBuffer.allocate(queuesFound.size() * 8);

for (QueuesXRefInnerManager control : queuesFound) {
long queueID = control.getQueueID();
long queueID = control.getQueueID(message);
buffer.putLong(queueID);
}

Expand Down Expand Up @@ -339,7 +354,7 @@ public void scaleDownTransactions(ClientSessionFactory sessionFactory,
if (queueIDs.containsKey(queueName)) {
queueID = queueIDs.get(queueName);
} else {
queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType());
queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time
}
Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
Expand All @@ -361,7 +376,7 @@ public void scaleDownTransactions(ClientSessionFactory sessionFactory,
if (queueIDs.containsKey(queueName)) {
queueID = queueIDs.get(queueName);
} else {
queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType());
queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time
}
Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
Expand Down Expand Up @@ -426,13 +441,14 @@ public void scaleDownDuplicateIDs(Map<SimpleString, List<Pair<byte[], Long>>> du
* send directly to a queue, we have to send to an address instead but not all the queues related to the
* address may need the message
*/
private long createQueueIfNecessaryAndGetID(ClientSession session,
Queue queue,
SimpleString addressName) throws Exception {
private long createQueueWithRoutingTypeIfNecessaryAndGetID(ClientSession session,
Queue queue,
SimpleString addressName,
RoutingType routingType) throws Exception {
long queueID = getQueueID(session, queue.getName());
if (queueID == -1) {
session.createQueue(addressName, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable());
logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]");
session.createQueue(addressName, routingType, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable());
logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", routingType=" + queue.getRoutingType() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]");
queueID = getQueueID(session, queue.getName());
}

Expand Down Expand Up @@ -523,10 +539,10 @@ public Queue getQueue() {
return queue;
}

public long getQueueID() throws Exception {
public long getQueueID(Message message) throws Exception {

if (targetQueueID < 0) {
targetQueueID = createQueueIfNecessaryAndGetID(clientSession, queue, queue.getAddress());
targetQueueID = createQueueWithRoutingTypeIfNecessaryAndGetID(clientSession, queue, queue.getAddress(), message.getRoutingType());
}
return targetQueueID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
Expand All @@ -33,6 +34,7 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
Expand Down Expand Up @@ -290,6 +292,31 @@ public void testScaleDownWithMissingQueue() throws Exception {
removeConsumer(0);
}

@Test
public void testScaleDownWithMissingAnycastQueue() throws Exception {
final int TEST_SIZE = 2;
final String addressName = "testAddress";
final String queueName1 = "testQueue1";
final String queueName2 = "testQueue2";

// create 2 queues on each node mapped to the same address
createQueue(0, addressName, queueName2, null, false, null, null, RoutingType.ANYCAST);

// send messages to node 0
send(0, addressName, TEST_SIZE, false, null);

// trigger scaleDown from node 0 to node 1
servers[0].stop();

Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
// get the 1 message from queue 2
addConsumer(0, 1, queueName2, null);
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();

}

@Test
public void testMessageProperties() throws Exception {
final int TEST_SIZE = 5;
Expand Down