Skip to content

Commit

Permalink
ARTEMIS-4235 fix map msg conversion from OpenWire->core
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and gemmellr committed Apr 10, 2023
1 parent e368dac commit 8abdee2
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,13 @@ public static org.apache.activemq.artemis.api.core.Message inbound(final Message
final ActiveMQBuffer body = coreMessage.getBodyBuffer();

final ByteSequence contents = messageSend.getContent();
if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null);
} else if (contents != null) {
if (contents == null) {
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null);
} else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
body.writeByte(DataConstants.NULL);
}
} else {
final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_COMPRESSED, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire;

import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
Expand All @@ -28,7 +29,9 @@
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageDispatch;
Expand Down Expand Up @@ -135,6 +138,16 @@ public void testProperties() throws Exception {
}
}

@Test
public void testEmptyMapMessage() throws Exception {
CoreMessage artemisMessage = (CoreMessage) OpenWireMessageConverter.inbound(new ActiveMQMapMessage().getMessage(), openWireFormat, null);
assertEquals(Message.MAP_TYPE, artemisMessage.getType());
ActiveMQBuffer buffer = artemisMessage.getDataBuffer();
TypedProperties map = new TypedProperties();
buffer.resetReaderIndex();
map.decode(buffer.byteBuf());
}

@Test
public void testProducerId() throws Exception {
final String PRODUCER_ID = "123:456:789";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
Expand All @@ -31,6 +32,7 @@
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -193,4 +195,146 @@ private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupp
assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
}
}

@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenOpenWireAndAMQP() throws Exception {
testEmptyMapMessageConversion(createOpenWireConnection(), createConnection());
}

@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenAMQPAndOpenWire() throws Exception {
testEmptyMapMessageConversion(createConnection(), createOpenWireConnection());
}

@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenCoreAndAMQP() throws Exception {
testEmptyMapMessageConversion(createCoreConnection(), createConnection());
}

@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenAMQPAndCore() throws Exception {
testEmptyMapMessageConversion(createConnection(), createCoreConnection());
}

@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenCoreAndOpenWire() throws Exception {
testEmptyMapMessageConversion(createCoreConnection(), createOpenWireConnection());
}

@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenOpenWireAndCore() throws Exception {
testEmptyMapMessageConversion(createOpenWireConnection(), createCoreConnection());
}

private void testEmptyMapMessageConversion(Connection senderConnection, Connection consumerConnection) throws Exception {
try {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));

Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
MapMessage message = senderSession.createMapMessage();
producer.send(message);

Message received = consumer.receive(1000);

assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of MapMessage", received instanceof MapMessage);
} finally {
senderConnection.close();
consumerConnection.close();
}
}

@Test(timeout = 30000)
public void testMapMessageConversionBetweenAMQPAndOpenWire() throws Exception {
testMapMessageConversion(createConnection(), createOpenWireConnection());
}

@Test(timeout = 30000)
public void testMapMessageConversionBetweenCoreAndAMQP() throws Exception {
testMapMessageConversion(createCoreConnection(), createConnection());
}

@Test(timeout = 30000)
public void testMapMessageConversionBetweenAMQPAndCore() throws Exception {
testMapMessageConversion(createConnection(), createCoreConnection());
}

@Test(timeout = 30000)
public void testMapMessageConversionBetweenCoreAndOpenWire() throws Exception {
testMapMessageConversion(createCoreConnection(), createOpenWireConnection());
}

@Test(timeout = 30000)
public void testMapMessageConversionBetweenOpenWireAndCore() throws Exception {
testMapMessageConversion(createOpenWireConnection(), createCoreConnection());
}

private void testMapMessageConversion(Connection senderConnection, Connection consumerConnection) throws Exception {
final boolean BOOLEAN_VALUE = RandomUtil.randomBoolean();
final String BOOLEAN_KEY = "myBoolean";
final byte BYTE_VALUE = RandomUtil.randomByte();
final String BYTE_KEY = "myByte";
final byte[] BYTES_VALUE = RandomUtil.randomBytes();
final String BYTES_KEY = "myBytes";
final char CHAR_VALUE = RandomUtil.randomChar();
final String CHAR_KEY = "myChar";
final double DOUBLE_VALUE = RandomUtil.randomDouble();
final String DOUBLE_KEY = "myDouble";
final float FLOAT_VALUE = RandomUtil.randomFloat();
final String FLOAT_KEY = "myFloat";
final int INT_VALUE = RandomUtil.randomInt();
final String INT_KEY = "myInt";
final long LONG_VALUE = RandomUtil.randomLong();
final String LONG_KEY = "myLong";
final Boolean OBJECT_VALUE = RandomUtil.randomBoolean();
final String OBJECT_KEY = "myObject";
final short SHORT_VALUE = RandomUtil.randomShort();
final String SHORT_KEY = "myShort";
final String STRING_VALUE = RandomUtil.randomString();
final String STRING_KEY = "myString";

try {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));

Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
MapMessage message = senderSession.createMapMessage();message.setBoolean(BOOLEAN_KEY, BOOLEAN_VALUE);
message.setByte(BYTE_KEY, BYTE_VALUE);
message.setBytes(BYTES_KEY, BYTES_VALUE);
message.setChar(CHAR_KEY, CHAR_VALUE);
message.setDouble(DOUBLE_KEY, DOUBLE_VALUE);
message.setFloat(FLOAT_KEY, FLOAT_VALUE);
message.setInt(INT_KEY, INT_VALUE);
message.setLong(LONG_KEY, LONG_VALUE);
message.setObject(OBJECT_KEY, OBJECT_VALUE);
message.setShort(SHORT_KEY, SHORT_VALUE);
message.setString(STRING_KEY, STRING_VALUE);
producer.send(message);

Message received = consumer.receive(1000);

assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of MapMessage", received instanceof MapMessage);
MapMessage receivedMapMessage = (MapMessage) received;

assertEquals(BOOLEAN_VALUE, receivedMapMessage.getBoolean(BOOLEAN_KEY));
assertEquals(BYTE_VALUE, receivedMapMessage.getByte(BYTE_KEY));
assertEqualsByteArrays(BYTES_VALUE, receivedMapMessage.getBytes(BYTES_KEY));
assertEquals(CHAR_VALUE, receivedMapMessage.getChar(CHAR_KEY));
assertEquals(DOUBLE_VALUE, receivedMapMessage.getDouble(DOUBLE_KEY), 0);
assertEquals(FLOAT_VALUE, receivedMapMessage.getFloat(FLOAT_KEY), 0);
assertEquals(INT_VALUE, receivedMapMessage.getInt(INT_KEY));
assertEquals(LONG_VALUE, receivedMapMessage.getLong(LONG_KEY));
assertTrue(receivedMapMessage.getObject(OBJECT_KEY) instanceof Boolean);
assertEquals(OBJECT_VALUE, receivedMapMessage.getObject(OBJECT_KEY));
assertEquals(SHORT_VALUE, receivedMapMessage.getShort(SHORT_KEY));
assertEquals(STRING_VALUE, receivedMapMessage.getString(STRING_KEY));
} finally {
senderConnection.close();
consumerConnection.close();
}
}
}

0 comments on commit 8abdee2

Please sign in to comment.