From 7e0cbb75d469a94b93db41f3cf6c9cabbf4dd2db Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Thu, 14 Dec 2017 13:44:51 +0800 Subject: [PATCH] ARTEMIS-1558 Message Grouping Openwire Interoperability Issue Openwire message grouping doesn't work because the groupID of a message is not passed correctly. --- .../openwire/OpenWireMessageConverter.java | 4 +- .../cluster/TemporaryQueueClusterTest.java | 1 - .../openwire/OpenWireGroupingTest.java | 150 ++++++++++++++++++ 3 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireGroupingTest.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 2f9fee4ef59..a5bb0f98c21 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -80,7 +80,7 @@ public class OpenWireMessageConverter implements MessageConverter params() { + return Arrays.asList(new Object[][]{{true, true}, + {true, false}, + {false, true}, + {false, false}}); + } + + public OpenWireGroupingTest(boolean coreSend, boolean coreReceive) { + this.coreSend = coreSend; + this.coreReceive = coreReceive; + } + + @Test + public void testGrouping() throws Exception { + + String jmsxgroupID = null; + + ConnectionFactory sendFact = coreSend ? coreCf : factory; + ConnectionFactory receiveFact = coreReceive ? coreCf : factory; + + final int num = 10; + try (Connection coreConn = sendFact.createConnection()) { + + Session session = coreConn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + + for (int j = 0; j < num; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + setProperty(message); + + producer.send(message); + + String prop = message.getStringProperty("JMSXGroupID"); + + assertNotNull(prop); + + if (jmsxgroupID != null) { + assertEquals(jmsxgroupID, prop); + } else { + jmsxgroupID = prop; + } + } + } + try (Connection connection = receiveFact.createConnection()) { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + MessageConsumer consumer3 = session.createConsumer(queue); + + connection.start(); + + List otherConsumers = new ArrayList<>(); + otherConsumers.add(consumer1); + otherConsumers.add(consumer2); + otherConsumers.add(consumer3); + + //find out which one broker picks up + MessageConsumer groupConsumer = null; + for (MessageConsumer consumer : otherConsumers) { + TextMessage tm = (TextMessage) consumer.receive(2000); + if (tm != null) { + assertEquals("Message" + 0, tm.getText()); + otherConsumers.remove(consumer); + groupConsumer = consumer; + break; + } + } + assertNotNull(groupConsumer); + + //All msgs should go to the group consumer + for (int j = 1; j < num; j++) { + + TextMessage tm = (TextMessage) groupConsumer.receive(2000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID); + } + + for (MessageConsumer consumer : otherConsumers) { + assertNull(consumer.receive(100)); + } + } + + } + + protected void setProperty(Message message) { + if (coreSend) { + ((ActiveMQMessage) message).getCoreMessage().putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, new SimpleString("foo")); + } else { + org.apache.activemq.command.ActiveMQMessage m = (org.apache.activemq.command.ActiveMQMessage) message; + m.setGroupID("foo"); + } + } +}