From 982186b16e061be3d7c7dbffabf5fae0ef72ccc5 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Mon, 24 Oct 2016 14:41:19 +0100 Subject: [PATCH] ARTEMIS-817 and ARTEMIS-818 https://issues.apache.org/jira/browse/ARTEMIS-817 https://issues.apache.org/jira/browse/ARTEMIS-818 issues around Openwire protocol, sending a null stream maessage via openwire causes a null pointer and if a topic is auto created with openwire then it cant be destroyed as it checks for the management queue. --- .../protocol/openwire/OpenWireConnection.java | 9 ++++++- .../openwire/OpenWireMessageConverter.java | 25 +++++++++++-------- .../openwire/SimpleOpenWireTest.java | 23 +++++++++++++++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index c6582bd4e00..33418e6c715 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -815,7 +815,14 @@ public AMQSession getSession(SessionId sessionId) { public void removeDestination(ActiveMQDestination dest) throws Exception { if (dest.isQueue()) { - server.destroyQueue(OpenWireUtil.toCoreAddress(dest)); + try { + server.destroyQueue(OpenWireUtil.toCoreAddress(dest)); + } catch (ActiveMQNonExistentQueueException neq) { + //this is ok, ActiveMQ 5 allows this and will actually do it quite often + ActiveMQServerLogger.LOGGER.debug("queue never existed"); + } + + } else { Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest)); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 131cfd13cb5..f49c972ca90 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -499,18 +499,21 @@ private static ActiveMQMessage toAMQMessage(MessageReference reference, } } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { TypedProperties mapData = new TypedProperties(); - mapData.decode(buffer); - - Map map = mapData.getMap(); - ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); - OutputStream os = out; - if (isCompressed) { - os = new DeflaterOutputStream(os); - } - try (DataOutputStream dataOut = new DataOutputStream(os)) { - MarshallingSupport.marshalPrimitiveMap(map, dataOut); + //it could be a null map + if (buffer.readableBytes() > 0) { + mapData.decode(buffer); + Map map = mapData.getMap(); + ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); + OutputStream os = out; + if (isCompressed) { + os = new DeflaterOutputStream(os); + } + try (DataOutputStream dataOut = new DataOutputStream(os)) { + MarshallingSupport.marshalPrimitiveMap(map, dataOut); + } + bytes = out.toByteArray(); } - bytes = out.toByteArray(); + } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { int len = buffer.readInt(); bytes = new byte[len]; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 9a2b8be8fd0..81f0f1bdd15 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -19,6 +19,7 @@ import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -127,6 +128,28 @@ public void testSendEmpty() throws Exception { } } + @Test + public void testSendNullMapMessage() throws Exception { + try (Connection connection = factory.createConnection()) { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + System.out.println("Queue:" + queue); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + producer.send(session.createMapMessage()); + + Assert.assertNull(consumer.receive(100)); + connection.start(); + + MapMessage message = (MapMessage) consumer.receive(5000); + + Assert.assertNotNull(message); + + message.acknowledge(); + } + } + @Test public void testXASimple() throws Exception { XAConnection connection = xaFactory.createXAConnection();