From bbf206214a7564edb7d3aca5fac63fa7467bc06e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 4 Mar 2016 13:21:33 -0500 Subject: [PATCH 1/3] idea --- .../core/impl/ActiveMQSessionContext.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index f72380288aa..029edd1db96 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -24,7 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -99,6 +101,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; +import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; import org.apache.activemq.artemis.utils.VersionLoader; @@ -647,9 +650,28 @@ public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) th sendPacketWithoutLock(sessionChannel, createQueueRequest); } - SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false); + ChannelHandler originalHandler = sessionChannel.getHandler(); + FailoverHandler failoverHandler = new FailoverHandler(); - sendPacketWithoutLock(sessionChannel, createConsumerRequest); + failoverHandler.latch.countUp(); + + sessionChannel.setHandler(failoverHandler); + + try { + + SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), true); + + sendPacketWithoutLock(sessionChannel, createConsumerRequest); + + failoverHandler.latch.await(1, TimeUnit.SECONDS); + } + catch (Exception e) { + // Remove me.. just to compile the idea + e.printStackTrace(); + } + finally { + sessionChannel.setHandler(originalHandler); + } int clientWindowSize = consumerInternal.getClientWindowSize(); @@ -734,6 +756,14 @@ protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessag handleReceiveProducerFailCredits(message.getAddress(), message.getCredits()); } + class FailoverHandler implements ChannelHandler { + + public ReusableLatch latch = new ReusableLatch(0); + @Override + public void handlePacket(Packet packet) { + new Exception("Handler here " + packet).printStackTrace(); + } + } class ClientSessionPacketHandler implements ChannelHandler { @Override From 1dd45e2bb69722d309b66fdc6c52d44b9a859dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ville=20Skytt=C3=A4?= Date: Sat, 5 Mar 2016 12:57:09 +0200 Subject: [PATCH 2/3] Document behavior with both exclusive/non-exclusive diverts present --- docs/user-manual/en/diverts.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/user-manual/en/diverts.md b/docs/user-manual/en/diverts.md index 22282c519c2..9df9e451a76 100644 --- a/docs/user-manual/en/diverts.md +++ b/docs/user-manual/en/diverts.md @@ -17,6 +17,10 @@ may be a requirement to monitor every order sent to an order queue. Diverts can also be configured to have an optional message filter. If specified then only messages that match the filter will be diverted. +When an address has both exclusive and non-exclusive diverts configured, +the exclusive ones are processed first. If any of the exclusive diverts +diverted the message, the non-exclusive ones are not processed. + Diverts can also be configured to apply a `Transformer`. If specified, all diverted messages will have the opportunity of being transformed by the `Transformer`. From f26ac50731b8de7152291adcd69e6e8811baeca3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ville=20Skytt=C3=A4?= Date: Sat, 5 Mar 2016 12:58:07 +0200 Subject: [PATCH 3/3] Document transformer result non-visibility to other diverts --- docs/user-manual/en/diverts.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/user-manual/en/diverts.md b/docs/user-manual/en/diverts.md index 9df9e451a76..75c95bdd5d2 100644 --- a/docs/user-manual/en/diverts.md +++ b/docs/user-manual/en/diverts.md @@ -23,7 +23,10 @@ diverted the message, the non-exclusive ones are not processed. Diverts can also be configured to apply a `Transformer`. If specified, all diverted messages will have the opportunity of being transformed by -the `Transformer`. +the `Transformer`. When an address has multiple diverts configured, all +of them receive the same, original message. This means that the results +of a transformer on a message are not directly available for other +diverts or their filters on the same address. A divert will only divert a message to an address on the *same server*, however, if you want to divert to an address on a different server, a