Skip to content
Permalink
Browse files
ARTEMIS-3833 Preserve JMSCorrelationID of distributed AMQP large mess…
…ages
  • Loading branch information
brusdev authored and clebertsuconic committed May 17, 2022
1 parent f58db5a commit f632e8104bbdae1fbf3658fec47e180784e957da
Showing 2 changed files with 38 additions and 3 deletions.
@@ -245,13 +245,17 @@ protected AMQPMessage(AMQPMessage copy) {

this.headerPosition = copy.headerPosition;
this.encodedHeaderSize = copy.encodedHeaderSize;
this.header = copy.header == null ? null : new Header(copy.header);
this.deliveryAnnotationsPosition = copy.deliveryAnnotationsPosition;
this.encodedDeliveryAnnotationsSize = copy.encodedDeliveryAnnotationsSize;
this.deliveryAnnotations = copy.deliveryAnnotations == null ? null : new DeliveryAnnotations(copy.deliveryAnnotations.getValue());
this.messageAnnotationsPosition = copy.messageAnnotationsPosition;
this.messageAnnotations = copy.messageAnnotations == null ? null : new MessageAnnotations(copy.messageAnnotations.getValue());
this.propertiesPosition = copy.propertiesPosition;
this.properties = copy.properties == null ? null : new Properties(copy.properties);
this.applicationPropertiesPosition = copy.applicationPropertiesPosition;
this.applicationProperties = copy.applicationProperties == null ? null : new ApplicationProperties(copy.applicationProperties.getValue());
this.remainingBodyPosition = copy.remainingBodyPosition;
this.applicationProperties = copy.applicationProperties;
this.messageDataScanned = copy.messageDataScanned;
}

@@ -21,12 +21,14 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -80,6 +82,29 @@ protected boolean isNetty() {

@Test(timeout = RECEIVE_TIMEOUT_MILLIS * (MESSAGES + 1))
public void testSendReceiveLargeMessage() throws Exception {
testSendReceiveLargeMessage(message -> { }, message -> { });
}

@Test(timeout = RECEIVE_TIMEOUT_MILLIS * (MESSAGES + 1))
public void testSendReceiveLargeMessageWithJMSCorrelationID() throws Exception {
final String jmsCorrelationID = "123456";
testSendReceiveLargeMessage(message -> {
try {
message.setJMSCorrelationID(jmsCorrelationID);
} catch (JMSException e) {
fail("Exception not expected: " + e);
}
}, message -> {
try {
Assert.assertEquals(jmsCorrelationID, message.getJMSCorrelationID());
} catch (JMSException e) {
fail("Exception not expected: " + e);
}
});
}


private void testSendReceiveLargeMessage(Consumer<Message> beforeSending, Consumer<Message> afterReceiving) throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);

startServers(0, 1);
@@ -99,12 +124,16 @@ public void testSendReceiveLargeMessage() throws Exception {

String producerUri = "amqp://localhost:61616";
final JmsConnectionFactory producerFactory = new JmsConnectionFactory(producerUri);
try (Connection producerConnection = producerFactory.createConnection(); Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (Connection producerConnection = producerFactory.createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
producerConnection.start();
final Destination queue = producerSession.createQueue(queueName);
String consumerUri = "amqp://localhost:61617";
final JmsConnectionFactory consumerConnectionFactory = new JmsConnectionFactory(consumerUri);
try (Connection consumerConnection = consumerConnectionFactory.createConnection(); Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = producerSession.createProducer(queue)) {
try (Connection consumerConnection = consumerConnectionFactory.createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = producerSession.createProducer(queue)) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
consumerConnection.start();
final byte[] largeMessageContent = new byte[MESSAGE_SIZE];
@@ -113,6 +142,7 @@ public void testSendReceiveLargeMessage() throws Exception {
for (int i = 0; i < MESSAGES; i++) {
final BytesMessage sentMessage = producerSession.createBytesMessage();
sentMessage.writeBytes(largeMessageContent);
beforeSending.accept(sentMessage);
producer.send(sentMessage);
final Message receivedMessage = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
Assert.assertNotNull("A message should be received in " + RECEIVE_TIMEOUT_MILLIS + " ms", receivedMessage);
@@ -124,6 +154,7 @@ public void testSendReceiveLargeMessage() throws Exception {
e.printStackTrace();
System.exit(-1);
}
afterReceiving.accept(receivedMessage);
}
}
}

0 comments on commit f632e81

Please sign in to comment.