Skip to content

Commit

Permalink
ARTEMIS-2262: Correlate management response messages with the request
Browse files Browse the repository at this point in the history
  • Loading branch information
k-wall authored and clebertsuconic committed Mar 8, 2019
1 parent 5af589d commit 95bcfae
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 13 deletions.
Expand Up @@ -266,6 +266,15 @@ default Message setGroupSequence(int sequence) {
return this;
}

default Object getCorrelationID() {
return null;
}

default Message setCorrelationID(Object correlationID) {

return this;
}

SimpleString getReplyTo();

Message setReplyTo(SimpleString address);
Expand Down
Expand Up @@ -297,6 +297,17 @@ public CoreMessage setGroupSequence(int sequence) {
return this.putIntProperty(Message.HDR_GROUP_SEQUENCE, sequence);
}

@Override
public Object getCorrelationID() {
return getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME);
}

@Override
public Message setCorrelationID(final Object correlationID) {
putObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME, correlationID);
return this;
}

/**
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
Expand Down
Expand Up @@ -1147,6 +1147,21 @@ public int getGroupSequence() {
}
}

@Override
public Object getCorrelationID() {
return properties != null ? properties.getCorrelationId() : null;
}

@Override
public org.apache.activemq.artemis.api.core.Message setCorrelationID(final Object correlationID) {
if (properties == null) {
properties = new Properties();
}

properties.setCorrelationId(correlationID);
return this;
}

@Override
public Long getScheduledDeliveryTime() {
if (scheduledTime < 0) {
Expand Down
Expand Up @@ -325,8 +325,13 @@ private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properti
if (properties.getReplyTo() != null) {
jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
}
if (properties.getCorrelationId() != null) {
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()));
Object correlationID = properties.getCorrelationId();
if (correlationID != null) {
try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
} catch (IllegalArgumentException e) {
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
}
}
if (properties.getContentType() != null) {
jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
Expand Down
Expand Up @@ -72,6 +72,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
Expand Down Expand Up @@ -177,14 +178,19 @@ public static AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception {
properties.setReplyTo(toAddress(replyTo));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
}
String correlationId = message.getJMSCorrelationID();
if (correlationId != null) {

Object correlationID = message.getInnerMessage().getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
try {
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(c));
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setCorrelationId(correlationId);
properties.setCorrelationId(correlationID);
}
} else {
properties.setCorrelationId(correlationID);
}

long expiration = message.getJMSExpiration();
if (expiration != 0) {
long ttl = expiration - System.currentTimeMillis();
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.Enumeration;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
Expand Down Expand Up @@ -118,21 +117,29 @@ public final byte[] getJMSCorrelationIDAsBytes() throws JMSException {

@Override
public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
try {
MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
} catch (ActiveMQException e) {
throw new JMSException(e.getMessage());
if (correlationID == null || correlationID.length == 0) {
throw new JMSException("Please specify a non-zero length byte[]");
}
message.setCorrelationID(correlationID);
}

@Override
public final String getJMSCorrelationID() throws JMSException {
return MessageUtil.getJMSCorrelationID(message);

Object correlationID = message.getCorrelationID();
if (correlationID instanceof String) {

return ((String) correlationID);
} else if (correlationID != null) {
return String.valueOf(correlationID);
} else {
return null;
}
}

@Override
public final void setJMSCorrelationID(String correlationID) throws JMSException {
MessageUtil.setJMSCorrelationID(message, correlationID);
message.setCorrelationID(correlationID);
}

@Override
Expand Down
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.management.impl;

import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;

import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
Expand Down Expand Up @@ -390,6 +392,11 @@ public ICoreMessage handleMessage(Message message) throws Exception {
reply.setType(Message.TEXT_TYPE);
reply.setReplyTo(message.getReplyTo());

Object correlationID = getCorrelationIdentity(message);
if (correlationID != null) {
reply.setCorrelationID(correlationID);
}

String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
if (logger.isDebugEnabled()) {
logger.debug("handling management message for " + resourceName);
Expand Down Expand Up @@ -781,5 +788,24 @@ private Object invokeOperation(final String resourceName,
return result;
}

/**
* Correlate management responses using the Correlation ID Pattern, if the request supplied a correlation id,
* or fallback to the Message ID Pattern providing the request had a message id.
* @param request
* @return correlation identify
*/
private Object getCorrelationIdentity(final Message request) {
Object correlationId = request.getCorrelationID();
if (correlationId == null) {
// CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the
// change is a nice to have for my point of view. I suggested it for completeness. The application could
// always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part.
Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID);
correlationId = underlying == null ? null : String.valueOf(underlying);
}
return correlationId;
}

// Inner classes -------------------------------------------------
}
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.management.ResourceNames;
Expand All @@ -27,6 +29,8 @@
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
Expand All @@ -39,6 +43,8 @@

public class AmqpManagementTest extends AmqpClientTestSupport {

private static final Binary BINARY_CORRELATION_ID = new Binary("mystring".getBytes(StandardCharsets.UTF_8));

@Test(timeout = 60000)
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = createAmqpClient();
Expand Down Expand Up @@ -101,4 +107,67 @@ public void testUnsignedValues() throws Exception {
msg = createMapMessage(1, map, null);
assertEquals(msg.getByte("sequence"), sequence);
}

@Test(timeout = 60000)
public void testCorrelationByMessageIDUUID() throws Throwable {
doTestReplyCorrelation(UUID.randomUUID(), false);
}

@Test(timeout = 60000)
public void testCorrelationByMessageIDString() throws Throwable {
doTestReplyCorrelation("mystring", false);
}

@Test(timeout = 60000)
public void testCorrelationByMessageIDBinary() throws Throwable {
doTestReplyCorrelation(BINARY_CORRELATION_ID, false);
}

@Test(timeout = 60000)
public void testCorrelationByCorrelationIDUUID() throws Throwable {
doTestReplyCorrelation(UUID.randomUUID(), true);
}

@Test(timeout = 60000)
public void testCorrelationByCorrelationIDString() throws Throwable {
doTestReplyCorrelation("mystring", true);
}

@Test(timeout = 60000)
public void testCorrelationByCorrelationIDBinary() throws Throwable {
doTestReplyCorrelation(BINARY_CORRELATION_ID, true);
}

private void doTestReplyCorrelation(final Object correlationId, final boolean sendCorrelAsCorrelation) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());

try {
String destinationAddress = getQueueName(1);
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);

// Create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setReplyToAddress(destinationAddress);
if (sendCorrelAsCorrelation) {
request.setRawCorrelationId(correlationId);
} else {
request.setRawMessageId(correlationId);
}
request.setText("[]");

sender.send(request);
AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(correlationId, response.getRawCorrelationId());
response.accept();
} finally {
connection.close();
}
}
}
Expand Up @@ -31,10 +31,13 @@
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -153,6 +156,49 @@ public void testGetResources() throws Exception {
Assert.assertEquals(queue.getName().toString(), queueControl.getName());
}

@Test
public void testCorrelateResponseByCorrelationID() throws Exception {
String queue = RandomUtil.randomString();
String address = RandomUtil.randomString();
String correlationID = UUIDGenerator.getInstance().generateStringUUID();

Configuration config = createBasicConfig().setJMXManagementEnabled(false);

ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();

// invoke attribute and operation on the server
CoreMessage message = new CoreMessage(1, 100);
MessageUtil.setJMSCorrelationID(message, correlationID);
ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "createQueue", queue, address);

Message reply = server.getManagementService().handleMessage(message);
Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
Assert.assertEquals(correlationID, MessageUtil.getJMSCorrelationID(reply));
}

@Test
public void testCorrelateResponseByMessageID() throws Exception {
String queue = RandomUtil.randomString();
String address = RandomUtil.randomString();
UUID messageId = UUIDGenerator.getInstance().generateUUID();

Configuration config = createBasicConfig().setJMXManagementEnabled(false);

ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();

// invoke attribute and operation on the server
CoreMessage message = new CoreMessage(1, 100);
message.setUserID(messageId);
ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "createQueue", queue, address);

Message reply = server.getManagementService().handleMessage(message);
Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
Assert.assertEquals(messageId.toString(), MessageUtil.getJMSCorrelationID(reply));
}


// Package protected ---------------------------------------------

// Protected -----------------------------------------------------
Expand Down

0 comments on commit 95bcfae

Please sign in to comment.