From 713c48fd26ce6577b870845ea883b2d90e5915b5 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Wed, 24 Jan 2018 11:03:12 -0500 Subject: [PATCH] ARTEMIS-1633 - fire message routing callbacks for all results Make sure ActiveMQServer plugin implementations are always notified of message route events --- .../artemis/core/postoffice/impl/PostOfficeImpl.java | 9 ++++++--- .../integration/plugin/ConfigurationVerifier.java | 10 ++++++++++ .../tests/integration/plugin/CorePluginTest.java | 6 ++++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b0d37d1ea1e..456f20080e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -772,6 +772,8 @@ public RoutingStatus route(final Message message, message.cleanupInternalProperties(); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null); + Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message)); // TODO auto-create queues here? @@ -841,10 +843,7 @@ public RoutingStatus route(final Message message, } } else { try { - server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null); processRoute(message, context, direct); - final RoutingStatus finalResult = result; - server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null); } catch (ActiveMQAddressFullException e) { if (startedTX.get()) { context.getTransaction().rollback(); @@ -858,6 +857,10 @@ public RoutingStatus route(final Message message, if (startedTX.get()) { context.getTransaction().commit(); } + + final RoutingStatus finalResult = result; + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null); + return result; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ConfigurationVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ConfigurationVerifier.java index 88f0ec392be..e63401fcb57 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ConfigurationVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ConfigurationVerifier.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -40,6 +41,7 @@ public class ConfigurationVerifier implements ActiveMQServerPlugin, Serializable public String value2; public String value3; public AtomicInteger afterSendCounter = new AtomicInteger(); + public AtomicInteger successRoutedCounter = new AtomicInteger(); @Override public void init(Map properties) { @@ -61,4 +63,12 @@ public void afterSend(ServerSession session, afterSendCounter.incrementAndGet(); } + @Override + public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + RoutingStatus result) throws ActiveMQException { + if (result == RoutingStatus.OK) { + successRoutedCounter.incrementAndGet(); + } + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java index 518dd3ffb7b..a89b43a85b9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -129,12 +129,14 @@ public void testSendReceive() throws Exception { BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, - BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, - AFTER_MESSAGE_ROUTE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION); + verifier.validatePluginMethodsAtLeast(1, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE); assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get()); + assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get()); assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1); }