Skip to content

Commit

Permalink
ARTEMIS-3670 support diverting to multiple addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 4, 2022
1 parent 8063110 commit e184038
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
Expand All @@ -41,6 +41,8 @@ public class DivertImpl implements Divert {

private volatile SimpleString forwardAddress;

private final SimpleString[] forwardAddresses;

private final SimpleString uniqueName;

private final SimpleString routingName;
Expand Down Expand Up @@ -69,6 +71,12 @@ public DivertImpl(final SimpleString uniqueName,

this.forwardAddress = forwardAddress;

String[] split = forwardAddress.toString().split(",");
forwardAddresses = new SimpleString[split.length];
for (int i = 0; i < split.length; i++) {
forwardAddresses[i] = new SimpleString(split[i].trim());
}

this.uniqueName = uniqueName;

this.routingName = routingName;
Expand All @@ -91,51 +99,53 @@ public void route(final Message message, final RoutingContext context) throws Ex
// We must make a copy of the message, otherwise things like returning credits to the page won't work
// properly on ack, since the original address will be overwritten

if (logger.isTraceEnabled()) {
logger.trace("Diverting message " + message + " into " + this);
}
for (SimpleString forwardAddress : forwardAddresses) {
if (logger.isTraceEnabled()) {
logger.trace("Diverting message " + message + " into " + this);
}

context.setReusable(false);
context.setReusable(false);

Message copy = null;
Message copy = null;

// Shouldn't copy if it's not routed anywhere else
if (!forwardAddress.equals(context.getAddress(message))) {
long id = storageManager.generateID();
copy = message.copy(id);
// Shouldn't copy if it's not routed anywhere else
if (!forwardAddress.equals(context.getAddress(message))) {
long id = storageManager.generateID();
copy = message.copy(id);

// This will set the original MessageId, and the original address
copy.referenceOriginalMessage(message, this.getUniqueName());
// This will set the original MessageId, and the original address
copy.referenceOriginalMessage(message, this.getUniqueName());

copy.setAddress(forwardAddress);
copy.setAddress(forwardAddress);

copy.setExpiration(message.getExpiration());
copy.setExpiration(message.getExpiration());

switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
break;
case MULTICAST:
copy.setRoutingType(RoutingType.MULTICAST);
break;
case STRIP:
copy.setRoutingType(null);
break;
case PASS:
break;
}
switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
break;
case MULTICAST:
copy.setRoutingType(RoutingType.MULTICAST);
break;
case STRIP:
copy.setRoutingType(null);
break;
case PASS:
break;
}

if (transformer != null) {
copy = transformer.transform(copy);
if (transformer != null) {
copy = transformer.transform(copy);
}

// We call reencode at the end only, in a single call.
copy.reencode();
} else {
copy = message;
}

// We call reencode at the end only, in a single call.
copy.reencode();
} else {
copy = message;
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
}

postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
}

@Override
Expand Down
24 changes: 19 additions & 5 deletions docs/user-manual/en/diverts.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
Apache ActiveMQ Artemis allows you to configure objects called *diverts* with some
simple server configuration.

Diverts allow you to transparently divert messages routed to one address
to some other address, without making any changes to any client
application logic.
Diverts allow you to transparently divert messages routed to one address to one
or more other addresses, without making any changes to any client application
logic.

Diverts can be *exclusive*, meaning that the message is diverted to the
new address, and does not go to the old address at all, or they can be
new address(es), and does not go to the old address at all, or they can be
*non-exclusive* which means the message continues to go the old address,
and a *copy* of it is also sent to the new address. Non-exclusive
and a *copy* of it is also sent to the new address(es). Non-exclusive
diverts can therefore be used for *splitting* message flows, e.g. there
may be a requirement to monitor every order sent to an order queue.

Expand Down Expand Up @@ -133,3 +133,17 @@ non-exclusive divert, again from the divert example:
The above divert example takes a copy of every message sent to the
address '`orders`' and sends it to a local address called
'`spyTopic`'.

## Composite Divert

A _composite_ divert is one which forwards messages to multiple addresses. This
pattern is sometimes referred to as _fan-out_. Configuration is simple. Just
use a comma separated list in `forwarding-address`, e.g.:

```xml
<divert name="shipping-divert">
<address>shipping</address>
<forwarding-address>dallas, chicago, denver</forwarding-address>
<exclusive>false</exclusive>
</divert>
```
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
Expand All @@ -49,11 +49,9 @@
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.api.core.RoutingType;

import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
Expand Down Expand Up @@ -765,6 +763,74 @@ public void testSingleExclusiveDivert() throws Exception {
Assert.assertNull(consumer4.receiveImmediate());
}

@Test
public void testCompositeDivert() throws Exception {
final String testAddress = "testAddress";
final String forwardAddress1 = "forwardAddress1";
final String forwardAddress2 = "forwardAddress2";
final String forwardAddress3 = "forwardAddress3";
final String forwardAddresses = forwardAddress1 + ", " + forwardAddress2 + ", " + forwardAddress3;

Configuration config = createDefaultInVMConfig().addDivertConfiguration(new DivertConfiguration()
.setName("divert1")
.setRoutingName("divert1")
.setAddress(testAddress)
.setForwardingAddress(forwardAddresses)
.setExclusive(true));

ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));

server.start();

ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = sf.createSession(false, true, true);

final SimpleString queueName1 = new SimpleString("queue1");
final SimpleString queueName2 = new SimpleString("queue2");
final SimpleString queueName3 = new SimpleString("queue3");

session.createQueue(new QueueConfiguration(queueName1).setAddress(forwardAddress1).setDurable(false));
session.createQueue(new QueueConfiguration(queueName2).setAddress(forwardAddress2).setDurable(false));
session.createQueue(new QueueConfiguration(queueName3).setAddress(forwardAddress3).setDurable(false));

session.start();

ClientProducer producer = session.createProducer(new SimpleString(testAddress));

final int numMessages = 10;

final SimpleString propKey = new SimpleString("testkey");

for (int i = 0; i < numMessages; i++) {
ClientMessage message = session.createMessage(false);
message.putIntProperty(propKey, i);
producer.send(message);
}

ClientConsumer consumer1 = session.createConsumer(queueName1);
ClientConsumer consumer2 = session.createConsumer(queueName2);
ClientConsumer consumer3 = session.createConsumer(queueName3);

ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2, consumer3};

for (int i = 0; i < numMessages; i++) {
for (int j = 0; j < consumers.length; j++) {
ClientMessage message = consumers[j].receive(DivertTest.TIMEOUT);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey));
Assert.assertEquals("forwardAddress" + (j + 1), message.getAddress());
Assert.assertEquals("testAddress", message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
message.acknowledge();
}
}

Assert.assertNull(consumer1.receiveImmediate());
Assert.assertNull(consumer2.receiveImmediate());
Assert.assertNull(consumer3.receiveImmediate());
}

@Test
public void testSinglePersistedDivert() throws Exception {
final String testAddress = "testAddress";
Expand Down

0 comments on commit e184038

Please sign in to comment.