diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index cfc4aa64aed..5f6570ddc1a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; @@ -943,7 +944,7 @@ private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg, final AMQConsumer consumer) throws IOException { for (SimpleString s : props) { final String keyStr = s.toString(); - if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { + if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { continue; } final Object prop = coreMessage.getObjectProperty(s); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java index 75b4cecb254..f851872719a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java @@ -23,9 +23,13 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; import java.util.Collection; import java.util.concurrent.TimeUnit; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -51,6 +55,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.command.ActiveMQTopic; import org.junit.Assert; import org.junit.Test; @@ -58,6 +63,41 @@ public class DivertTest extends ActiveMQTestBase { private static final int TIMEOUT = 3000; + @Test + public void testDivertedNotificationMessagePropertiesOpenWire() throws Exception { + final String testAddress = ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress().toString(); + + final String forwardAddress = "forwardAddress"; + + DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setFilterString("_AMQ_NotifType = 'CONSUMER_CREATED' OR _AMQ_NotifType = 'CONSUMER_CLOSED'"); + + Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf); + + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false)); + + server.start(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + connectionFactory.setClientID("myClientID"); + + Topic forwardTopic = new ActiveMQTopic(forwardAddress); + Connection connection = connectionFactory.createConnection(); + + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber subscriber = session.createDurableSubscriber(forwardTopic, "mySubscriptionName"); + + javax.jms.Message message = subscriber.receive(DivertTest.TIMEOUT); + + connection.close(); + + Assert.assertNotNull(message); + + Assert.assertEquals("CONSUMER_CREATED", message.getStringProperty("_AMQ_NotifType")); + } + @Test public void testSingleNonExclusiveDivert() throws Exception { final String testAddress = "testAddress";