Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1289,29 +1289,14 @@ public MessageReference reload(final Message message, final Queue queue, final T
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
Comment thread
gtully marked this conversation as resolved.
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());

if (bindings != null && bindings.allowRedistribute()) {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
copyRedistribute.setAddress(originatingQueue.getAddress());

if (tx != null) {
Comment thread
gtully marked this conversation as resolved.
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterRollback(Transaction tx) {
try {
//this will cause large message file to be
//cleaned up
// copyRedistribute.refDown();
} catch (Exception e) {
logger.warn("Failed to clean up message: " + copyRedistribute);
}
}
});
}
copyRedistribute.setAddress(message.getAddress());

RoutingContext context = new RoutingContextImpl(tx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1755,6 +1755,10 @@ void slowConsumerDetected(String sessionID,
@Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT)
void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222303, value = "Redistribution by {0} of messageID = {1} failed", format = Message.Format.MESSAGE_FORMAT)
void errorRedistributing(@Cause Throwable t, String queueName, long m);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public void run() {
queue.deliverAsync();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID());
try {
tx.rollback();
} catch (Exception e2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2166,7 +2166,12 @@ public synchronized RoutingStatus doSend(final Transaction tx,
}
} */

AddressInfo art = getAddressAndRoutingType(new AddressInfo(msg.getAddressSimpleString(), routingType));
final AddressInfo targetFromMessage = new AddressInfo(msg.getAddressSimpleString(), routingType);
AddressInfo art = getAddressAndRoutingType(targetFromMessage);
if (art != targetFromMessage) {
// remove the prefix from the message, with the address model change, only non prefixed addresses exist on the broker
msg.setAddress(art.getName());
}

// check the user has write access to this address.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ public void setUp() throws Exception {
bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
notificationsQueue = SimpleString.toSimpleString("Notifications");

setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
setupClusterConnection("cluster-1->2", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
setupClusterConnection("cluster-2->1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);

server0.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ public void internalTestCoreClientPrefixes(boolean security) throws Exception {
for (int i = 0; i < numMessages / anycastPrefixes.size(); i++) {
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
// this seems to be the only assert of this non requirement
assertFalse(message.getAddress().contains(queuePrefix));
message.acknowledge();
}
assertNull(consumer.receiveImmediate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
Expand Down Expand Up @@ -100,6 +101,101 @@ public void loadBalanceRequests() throws Exception {
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));

assertNonWildcardTopic(message1);
assertNonWildcardTopic(message2);
assertNonWildcardTopic(message3);
assertNonWildcardTopic(message4);
assertNonWildcardTopic(message5);
assertNonWildcardTopic(message6);


} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
if (connection2 != null) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
}
}

@Test
public void verifyRedistribution() throws Exception {
final String TOPIC = "test/+/some/#";
final String clientId = "SubOne";

WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setAnyWords('#');
wildcardConfiguration.setDelimiter('/');
wildcardConfiguration.setRoutingEnabled(true);
wildcardConfiguration.setSingleWord('+');

setupServer(0, false, isNetty());
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);

setupServer(1, false, isNetty());
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);

// allow redistribution
AddressSettings addressSettings = new AddressSettings();
addressSettings.setRedistributionDelay(0);
servers[0].getConfiguration().addAddressesSetting("#", addressSettings);
servers[1].getConfiguration().addAddressesSetting("#", addressSettings);

setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

startServers(0, 1);

BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
try {
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId);

// Subscribe to topics
Topic[] topics = {new Topic(TOPIC, QoS.EXACTLY_ONCE)};
connection2.subscribe(topics);

waitForBindings(0, TOPIC, 0, 0, true);
waitForBindings(1, TOPIC, 1, 1, true);

waitForBindings(0, TOPIC, 1, 1, false);
waitForBindings(1, TOPIC, 0, 0, false);

// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";
String payload3 = "This is message 3";

connection1.publish("test/1/some/la", payload1.getBytes(), QoS.EXACTLY_ONCE, false);
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.EXACTLY_ONCE, false);
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.EXACTLY_ONCE, false);


waitForMessages(1, TOPIC, 3);

connection2.disconnect();

// force redistribution
connection2 = retrieveMQTTConnection("tcp://localhost:61616", clientId);
connection2.subscribe(topics);

Message message4 = connection2.receive(15, TimeUnit.SECONDS);
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
Message message6 = connection2.receive(5, TimeUnit.SECONDS);

assertEquals(payload1, new String(message4.getPayload()));
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));

assertNonWildcardTopic(message4);
assertNonWildcardTopic(message5);
assertNonWildcardTopic(message6);

} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
Expand Down Expand Up @@ -189,6 +285,14 @@ public void wildcardsWithBroker1Disconnected() throws Exception {
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));

assertNonWildcardTopic(message1);
assertNonWildcardTopic(message2);
assertNonWildcardTopic(message3);
assertNonWildcardTopic(message4);
assertNonWildcardTopic(message5);
assertNonWildcardTopic(message6);


} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
Expand All @@ -202,9 +306,31 @@ public void wildcardsWithBroker1Disconnected() throws Exception {
}
}

private void assertNonWildcardTopic(Message message1) {
assertNotNull(message1);
String payload = new String(message1.getPayload());
System.err.println("got payload: " + payload);

assertTrue(payload.contains("message"));
String topic = message1.getTopic();
System.err.println("got topic: " + topic);
assertTrue(!topic.contains("+"));
assertTrue(!topic.contains("*"));
assertTrue(!topic.contains("#"));
}


private static BlockingConnection retrieveMQTTConnection(String host) throws Exception {
return retrieveMQTTConnection(host, null);
}

private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
if (clientId != null) {
mqtt.setClientId(clientId);
mqtt.setCleanSession(false);
}
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;
Expand Down