Skip to content

Commit

Permalink
This closes #3269
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Sep 21, 2020
2 parents b896908 + d9d98df commit ddef389
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ public void serverSend(ProtonServerReceiverContext context,

context.incrementSettle();

RoutingType routingType = null;
if (address != null) {
message.setAddress(address);
} else {
Expand All @@ -474,10 +475,15 @@ public void serverSend(ProtonServerReceiverContext context,
rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
}

routingType = message.getRoutingType();
}

//here check queue-autocreation
RoutingType routingType = context.getRoutingType(receiver, address);
if (routingType == null) {
routingType = context.getRoutingType(receiver, address);
}

if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;

Expand Down Expand Up @@ -611,6 +612,25 @@ public void setText(String value) throws IllegalStateException {
getWrappedMessage().setBody(body);
}

/**
* Attempts to retrieve the message body as a String from an AmqpValue body.
*
* @return the string
* @throws NoSuchElementException if the body does not contain a AmqpValue with String.
*/
public String getText() throws NoSuchElementException {
Section body = getWrappedMessage().getBody();
if (body instanceof AmqpValue) {
AmqpValue value = (AmqpValue) body;

if (value.getValue() instanceof String) {
return (String) value.getValue();
}
}

throw new NoSuchElementException("Message does not contain a String body");
}

/**
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
Expand All @@ -28,6 +34,10 @@

public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {

private static final String AUTO_CREATION_QUEUE_PREFIX = "AmqpAnonymousRelayTest-AutoCreateQueues.";
private static final String AUTO_CREATION_TOPIC_PREFIX = "AmqpAnonymousRelayTest-AutoCreateTopics.";

// Disable auto-creation in the general config created by the superclass, we add specific prefixed areas with it enabled
@Override
protected boolean isAutoCreateQueues() {
return false;
Expand All @@ -38,6 +48,232 @@ protected boolean isAutoCreateAddresses() {
return false;
}

// Additional address configuration for auto creation of queues and topics
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
super.configureAddressPolicy(server);

AddressSettings autoCreateQueueAddressSettings = new AddressSettings();
autoCreateQueueAddressSettings.setAutoCreateQueues(true);
autoCreateQueueAddressSettings.setAutoCreateAddresses(true);
autoCreateQueueAddressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
autoCreateQueueAddressSettings.setDefaultQueueRoutingType(RoutingType.ANYCAST);

server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_QUEUE_PREFIX + "#", autoCreateQueueAddressSettings);

AddressSettings autoCreateTopicAddressSettings = new AddressSettings();
autoCreateTopicAddressSettings.setAutoCreateQueues(true);
autoCreateTopicAddressSettings.setAutoCreateAddresses(true);
autoCreateTopicAddressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
autoCreateTopicAddressSettings.setDefaultQueueRoutingType(RoutingType.MULTICAST);

server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_TOPIC_PREFIX + "#", autoCreateTopicAddressSettings);
}

@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerCausesQueueAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());

// We use an address in the QUEUE prefixed auto-creation area to ensure the broker picks this up
// and creates a queue, in the absense of any other message annotation / terminus capability config.
String queueName = AUTO_CREATION_QUEUE_PREFIX + getQueueName();
try {
AmqpSession session = connection.createSession();

AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(queueName);
message.setText(getTestName());

AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertFalse(addressQueryResult.isExists());

sender.send(message);
sender.close();

addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressQueryResult.isAutoCreated());

// Create a receiver and verify it can consume the message from the auto-created queue
AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals(getTestName(), received.getText());
received.accept();

receiver.close();
} finally {
connection.close();
}
}

@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerCausesTopicAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());

// We use an address in the TOPIC prefixed auto-creation area to ensure the broker picks this up
// and creates a topic, in the absense of any other message annotation / terminus capability config.
String topicName = AUTO_CREATION_TOPIC_PREFIX + getTopicName();

try {
AmqpSession session = connection.createSession();

AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();

message.setAddress(topicName);
message.setText("creating-topic-address");

AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertFalse(addressQueryResult.isExists());

sender.send(message);

addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertTrue(addressQueryResult.isAutoCreated());

// Create 2 receivers and verify they can both consume a new message sent to the auto-created topic
AmqpReceiver receiver1 = session.createReceiver(topicName);
AmqpReceiver receiver2 = session.createReceiver(topicName);
receiver1.flow(1);
receiver2.flow(1);

AmqpMessage message2 = new AmqpMessage();
message2.setAddress(topicName);
message2.setText(getTestName());

sender.send(message2);

AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received1);
assertEquals(getTestName(), received1.getText());
received1.accept();

AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received2);
assertEquals(getTestName(), received2.getText());
received1.accept();

receiver1.close();
receiver2.close();
sender.close();
} finally {
connection.close();
}
}

@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesQueueAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());

try {
AmqpSession session = connection.createSession();

AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.QUEUE_TYPE);

// We deliberately use the TOPIC prefixed auto-creation area, not the QUEUE prefix, to ensure
// we get a queue because the broker inspects the value we send on the message, and not just
// because it was taken as a default from the address settings.
String queueName = AUTO_CREATION_TOPIC_PREFIX + getQueueName();

message.setAddress(queueName);
message.setText(getTestName());

AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertFalse(addressQueryResult.isExists());

sender.send(message);
sender.close();

addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressQueryResult.isAutoCreated());

// Create a receiver and verify it can consume the message from the auto-created queue
AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals(getTestName(), received.getText());
received.accept();

receiver.close();
} finally {
connection.close();
}
}

@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesTopicAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());

try {
AmqpSession session = connection.createSession();

AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE);

// We deliberately use the QUEUE prefixed auto-creation area, not the TOPIC prefix, to ensure
// we get a topic because the broker inspects the value we send on the message, and not just
// because it was taken as a default from the address settings.
String topicName = AUTO_CREATION_QUEUE_PREFIX + getTopicName();
message.setAddress(topicName);
message.setText("creating-topic-address");

AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertFalse(addressQueryResult.isExists());

sender.send(message);

addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertTrue(addressQueryResult.isAutoCreated());

// Create 2 receivers and verify they can both consume a new message sent to the auto-created topic
AmqpReceiver receiver1 = session.createReceiver(topicName);
AmqpReceiver receiver2 = session.createReceiver(topicName);
receiver1.flow(1);
receiver2.flow(1);

AmqpMessage message2 = new AmqpMessage();
message2.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE);
message2.setAddress(topicName);
message2.setText(getTestName());

sender.send(message2);

AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received1);
assertEquals(getTestName(), received1.getText());
received1.accept();

AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received2);
assertEquals(getTestName(), received2.getText());
received1.accept();

receiver1.close();
receiver2.close();
sender.close();
} finally {
connection.close();
}
}

@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = createAmqpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,35 @@

public class JMSMessageProducerTest extends JMSClientTestSupport {

@Test(timeout = 30000)
public void testAnonymousProducerWithQueueAutoCreation() throws Exception {
Connection connection = createConnection();

try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = UUID.randomUUID().toString() + ":" + getQueueName();
Queue queue = session.createQueue(queueName);
MessageProducer p = session.createProducer(null);

TextMessage message = session.createTextMessage();
message.setText(getTestName());
// This will auto-create the address, and be retained for subsequent consumption
p.send(queue, message);

{
MessageConsumer consumer = session.createConsumer(queue);
p.send(queue, message);
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(getTestName(), ((TextMessage)msg).getText());
consumer.close();
}
} finally {
connection.close();
}
}

@Test(timeout = 30000)
public void testAnonymousProducer() throws Exception {
Connection connection = createConnection();
Expand Down Expand Up @@ -71,25 +100,32 @@ public void testAnonymousProducer() throws Exception {
}

@Test(timeout = 30000)
public void testAnonymousProducerWithAutoCreation() throws Exception {
public void testAnonymousProducerWithTopicAutoCreation() throws Exception {
Connection connection = createConnection();

try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(UUID.randomUUID().toString());
String topicName = UUID.randomUUID().toString() + ":" + getQueueName();
Topic topic = session.createTopic(topicName);
MessageProducer p = session.createProducer(null);

TextMessage message = session.createTextMessage();
message.setText("hello");
// this will auto-create the address
message.setText("creating-topic-address");
// This will auto-create the address, but msg will be discarded as there are no consumers
p.send(topic, message);

{
// This will create a new consumer, on the topic address, verifying it can attach
// and then receives a further sent message
MessageConsumer consumer = session.createConsumer(topic);
p.send(topic, message);
Message message2 = message = session.createTextMessage(getTestName());

p.send(topic, message2);

Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(getTestName(), ((TextMessage)msg).getText());
consumer.close();
}
} finally {
Expand Down

0 comments on commit ddef389

Please sign in to comment.