Skip to content

Commit

Permalink
ARTEMIS-2103 - use the full openwire consumer queue for the mapped vi…
Browse files Browse the repository at this point in the history
…rtual topic queue binding, fix and test
  • Loading branch information
gtully authored and clebertsuconic committed Oct 10, 2018
1 parent 714a3f8 commit b812bfd
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 8 deletions.
Expand Up @@ -657,8 +657,8 @@ public ActiveMQDestination virtualTopicConsumerToFQQN(final ActiveMQDestination
fqqn.append(paths[i]);
}
fqqn.append(CompositeAddress.SEPARATOR);
// consumer queue
for (int i = 0; i < filterPathTerminus; i++) {
// consumer queue - the full vt queue
for (int i = 0; i < paths.length; i++) {
if (i > 0) {
fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
}
Expand Down
4 changes: 2 additions & 2 deletions docs/migration-guide/en/VirtualTopics.md
Expand Up @@ -33,7 +33,7 @@ For example, a default 5.x consumer destination for topic `VirtualTopic.Orders`
would be replaced with an Artemis FQQN comprised of the address and queue.
```
...
Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A");
Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders");
session.createConsumer(subscriptionQueue);
```

Expand All @@ -49,7 +49,7 @@ E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the
However, there is a caveat because this value needs to be encoded in a uri for the xml configuration. Any unsafe url characters
, in this case: ```> ;``` need to be escaped with their hex code point representation; leading to a value of ```Consumer.*.%3E%3B2```.
In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of
```VirtualTopic.Orders::Consumer.A```.
```VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders```.


Durable topic subscribers in a network of brokers
Expand Down
4 changes: 2 additions & 2 deletions docs/user-manual/en/openwire.md
Expand Up @@ -85,7 +85,7 @@ The two parameters are configured on an OpenWire `acceptor`, e.g.:

For existing OpenWire consumers of virtual topic destinations it is possible to
configure a mapping function that will translate the virtual topic consumer
destination into a FQQN address. This address then represents the consumer as a
destination into a FQQN address. This address will then represents the consumer as a
multicast binding to an address representing the virtual topic.

The configuration string property `virtualTopicConsumerWildcards` has two parts
Expand All @@ -103,7 +103,7 @@ this transforms to `Consumer.*.%3E%3B2` when the url significant characters
```

This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of
`VirtualTopic.Orders::Consumer.A` using the int component `2` of the
`VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` using the int component `2` of the
configuration to identify the consumer queue as the first two paths of the
destination. `virtualTopicConsumerWildcards` is multi valued using a `,`
separator.
Expand Down
Expand Up @@ -18,7 +18,7 @@ Address.
The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue
using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor url parameter `virtualTopicConsumerWildcards`,
(see below), Artemis maps the consumer consuming from `Consumer.A.VirtualTopic.Orders` to actually consume from
FQQN of `VirtualTopic.Orders::Consumer.A`
FQQN of `VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders`


```xml
Expand Down
Expand Up @@ -31,7 +31,7 @@
* The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue
* using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor parameter virtualTopicConsumerWildcards
* Artemis maps the consumer consuming from "Consumer.A.VirtualTopic.Orders" to actually consume from
* FQQN "VirtualTopic.Orders::Consumer.A"
* FQQN "VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders"
*/
public class VirtualTopicMappingExample {

Expand Down
Expand Up @@ -84,4 +84,142 @@ public void testAutoVirtualTopicFQQN() throws Exception {
}
}
}

@Test
public void testTwoTopicSubsSameNameAutoVirtualTopicFQQN() throws Exception {
Connection connection = null;

SimpleString topic1 = new SimpleString("VirtualTopic.Orders1");
SimpleString topic2 = new SimpleString("VirtualTopic.Orders2");

this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination1 = session.createTopic(topic1.toString());
Destination destination2 = session.createTopic(topic2.toString());

MessageConsumer messageConsumer1 = session.createConsumer(session.createQueue("Consumer.A." + topic1.toString()));
MessageConsumer messageConsumer2 = session.createConsumer(session.createQueue("Consumer.A." + topic2.toString()));

MessageProducer producer = session.createProducer(null);
TextMessage message = session.createTextMessage("This is a text message to 1");
producer.send(destination1, message);
message = session.createTextMessage("This is a text message to 2");
producer.send(destination2, message);


TextMessage messageReceived1 = (TextMessage) messageConsumer1.receive(2000);
TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(2000);

assertNotNull(messageReceived1);
assertNotNull(messageReceived2);

String text = messageReceived1.getText();
assertEquals("This is a text message to 1", text);

text = messageReceived2.getText();
assertEquals("This is a text message to 2", text);

messageConsumer1.close();
messageConsumer2.close();

} finally {
if (connection != null) {
connection.close();
}
}
}


@Test
public void testAutoVirtualTopicWildcardFQQN() throws Exception {
Connection connection = null;

SimpleString topicA = new SimpleString("VirtualTopic.Orders.A");
SimpleString topicB = new SimpleString("VirtualTopic.Orders.B");
SimpleString topic = new SimpleString("VirtualTopic.Orders.>");

this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString());

MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));
// MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B." + topic.toString()));

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);

TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000);

assertTrue((messageReceivedA != null && messageReceivedB != null));
String text = messageReceivedA.getText();
assertEquals("This is a text message", text);

messageConsumerA.close();

} finally {
if (connection != null) {
connection.close();
}
}
}

@Test
public void testAutoVirtualTopicWildcardStarFQQN() throws Exception {
Connection connection = null;

SimpleString topicA = new SimpleString("VirtualTopic.Orders.A");
SimpleString topicB = new SimpleString("VirtualTopic.Orders.B");
SimpleString topic = new SimpleString("VirtualTopic.Orders.*");

this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString());

MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);

TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000);

assertTrue((messageReceivedA != null && messageReceivedB != null));
String text = messageReceivedA.getText();
assertEquals("This is a text message", text);

messageConsumerA.close();

} finally {
if (connection != null) {
connection.close();
}
}
}
}

0 comments on commit b812bfd

Please sign in to comment.