From 5ad68ba0fbd4651bab68fd3fe334cace91de5659 Mon Sep 17 00:00:00 2001 From: gtully Date: Mon, 26 Jul 2021 22:54:36 +0100 Subject: [PATCH 1/2] ARTEMIS-3223 - ensure distribution uses the address from the message, rather than the address from the queue which may be a wildcard sub and not valid for publishng on, fix and test --- .../core/postoffice/impl/PostOfficeImpl.java | 19 +-- .../core/server/ActiveMQServerLogger.java | 4 + .../server/cluster/impl/Redistributor.java | 1 + .../AmqpBridgeClusterRedistributionTest.java | 4 +- .../imported/MqttClusterWildcardTest.java | 126 ++++++++++++++++++ 5 files changed, 135 insertions(+), 19 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index af7dda61617..5d5bcd9c31a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1289,29 +1289,14 @@ public MessageReference reload(final Message message, final Queue queue, final T public Pair redistribute(final Message message, final Queue originatingQueue, final Transaction tx) throws Exception { - Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString()); if (bindings != null && bindings.allowRedistribute()) { // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message // arrived the target node // as described on https://issues.jboss.org/browse/JBPAPP-6130 Message copyRedistribute = message.copy(storageManager.generateID()); - copyRedistribute.setAddress(originatingQueue.getAddress()); - - if (tx != null) { - tx.addOperation(new TransactionOperationAbstract() { - @Override - public void afterRollback(Transaction tx) { - try { - //this will cause large message file to be - //cleaned up - // copyRedistribute.refDown(); - } catch (Exception e) { - logger.warn("Failed to clean up message: " + copyRedistribute); - } - } - }); - } + copyRedistribute.setAddress(message.getAddress()); RoutingContext context = new RoutingContextImpl(tx); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6bae2956b69..6e62c72b81c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1755,6 +1755,10 @@ void slowConsumerDetected(String sessionID, @Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT) void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222303, value = "Redistribution by {0} of messageID = {1} failed", format = Message.Format.MESSAGE_FORMAT) + void errorRedistributing(@Cause Throwable t, String queueName, long m); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 79820186bb9..15a1b54a17a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -178,6 +178,7 @@ public void run() { queue.deliverAsync(); } } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID()); try { tx.rollback(); } catch (Exception e2) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java index 91f6a2485b9..52fe95f5cda 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java @@ -136,8 +136,8 @@ public void setUp() throws Exception { bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications"); notificationsQueue = SimpleString.toSimpleString("Notifications"); - setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2); - setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1); + setupClusterConnection("cluster-1->2", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2); + setupClusterConnection("cluster-2->1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1); server0.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java index ab8d30c271b..5a8d314f40f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -100,6 +101,101 @@ public void loadBalanceRequests() throws Exception { assertEquals(payload2, new String(message5.getPayload())); assertEquals(payload3, new String(message6.getPayload())); + assertNonWildcardTopic(message1); + assertNonWildcardTopic(message2); + assertNonWildcardTopic(message3); + assertNonWildcardTopic(message4); + assertNonWildcardTopic(message5); + assertNonWildcardTopic(message6); + + + } finally { + String[] topics = new String[]{TOPIC}; + if (connection1 != null) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + } + + @Test + public void verifyRedistribution() throws Exception { + final String TOPIC = "test/+/some/#"; + final String clientId = "SubOne"; + + WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); + wildcardConfiguration.setAnyWords('#'); + wildcardConfiguration.setDelimiter('/'); + wildcardConfiguration.setRoutingEnabled(true); + wildcardConfiguration.setSingleWord('+'); + + setupServer(0, false, isNetty()); + servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + setupServer(1, false, isNetty()); + servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + // allow redistribution + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setRedistributionDelay(0); + servers[0].getConfiguration().addAddressesSetting("#", addressSettings); + servers[1].getConfiguration().addAddressesSetting("#", addressSettings); + + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + connection1 = retrieveMQTTConnection("tcp://localhost:61616"); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId); + + // Subscribe to topics + Topic[] topics = {new Topic(TOPIC, QoS.EXACTLY_ONCE)}; + connection2.subscribe(topics); + + waitForBindings(0, TOPIC, 0, 0, true); + waitForBindings(1, TOPIC, 1, 1, true); + + waitForBindings(0, TOPIC, 1, 1, false); + waitForBindings(1, TOPIC, 0, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.EXACTLY_ONCE, false); + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.EXACTLY_ONCE, false); + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.EXACTLY_ONCE, false); + + + waitForMessages(1, TOPIC, 3); + + connection2.disconnect(); + + // force redistribution + connection2 = retrieveMQTTConnection("tcp://localhost:61616", clientId); + connection2.subscribe(topics); + + Message message4 = connection2.receive(15, TimeUnit.SECONDS); + Message message5 = connection2.receive(5, TimeUnit.SECONDS); + Message message6 = connection2.receive(5, TimeUnit.SECONDS); + + assertEquals(payload1, new String(message4.getPayload())); + assertEquals(payload2, new String(message5.getPayload())); + assertEquals(payload3, new String(message6.getPayload())); + + assertNonWildcardTopic(message4); + assertNonWildcardTopic(message5); + assertNonWildcardTopic(message6); + } finally { String[] topics = new String[]{TOPIC}; if (connection1 != null) { @@ -189,6 +285,14 @@ public void wildcardsWithBroker1Disconnected() throws Exception { assertEquals(payload2, new String(message5.getPayload())); assertEquals(payload3, new String(message6.getPayload())); + assertNonWildcardTopic(message1); + assertNonWildcardTopic(message2); + assertNonWildcardTopic(message3); + assertNonWildcardTopic(message4); + assertNonWildcardTopic(message5); + assertNonWildcardTopic(message6); + + } finally { String[] topics = new String[]{TOPIC}; if (connection1 != null) { @@ -202,9 +306,31 @@ public void wildcardsWithBroker1Disconnected() throws Exception { } } + private void assertNonWildcardTopic(Message message1) { + assertNotNull(message1); + String payload = new String(message1.getPayload()); + System.err.println("got payload: " + payload); + + assertTrue(payload.contains("message")); + String topic = message1.getTopic(); + System.err.println("got topic: " + topic); + assertTrue(!topic.contains("+")); + assertTrue(!topic.contains("*")); + assertTrue(!topic.contains("#")); + } + + private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + return retrieveMQTTConnection(host, null); + } + + private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost(host); + if (clientId != null) { + mqtt.setClientId(clientId); + mqtt.setCleanSession(false); + } BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); return connection; From 303897e105380a61dc7ce45fd37c572949561700 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 27 Jul 2021 12:44:05 +0100 Subject: [PATCH 2/2] ARTEMIS-1612 - strip any acceptor matching prefix from the message address such that further routing will match the lack of prefixes in broker routing, different fix for redistribution with prefixes --- .../artemis/core/server/impl/ServerSessionImpl.java | 7 ++++++- .../artemis/tests/integration/client/CoreClientTest.java | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index dfc7771da1c..6e7805ab812 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -2166,7 +2166,12 @@ public synchronized RoutingStatus doSend(final Transaction tx, } } */ - AddressInfo art = getAddressAndRoutingType(new AddressInfo(msg.getAddressSimpleString(), routingType)); + final AddressInfo targetFromMessage = new AddressInfo(msg.getAddressSimpleString(), routingType); + AddressInfo art = getAddressAndRoutingType(targetFromMessage); + if (art != targetFromMessage) { + // remove the prefix from the message, with the address model change, only non prefixed addresses exist on the broker + msg.setAddress(art.getName()); + } // check the user has write access to this address. try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java index cc9894f2815..9ef50b75acf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java @@ -258,6 +258,8 @@ public void internalTestCoreClientPrefixes(boolean security) throws Exception { for (int i = 0; i < numMessages / anycastPrefixes.size(); i++) { ClientMessage message = consumer.receive(1000); assertNotNull(message); + // this seems to be the only assert of this non requirement + assertFalse(message.getAddress().contains(queuePrefix)); message.acknowledge(); } assertNull(consumer.receiveImmediate());