From cf7431cca28d0cc2d3e89fe455bc79c542b8dfed Mon Sep 17 00:00:00 2001 From: Otavio Rodolfo Piske Date: Wed, 8 Mar 2017 15:28:36 +0100 Subject: [PATCH] Add a test for MQTT will message with non-retain flag This patch adds a new test that verifies if the broker is able to send a will message if the retain flag is set to false. --- .../integration/mqtt/imported/MQTTTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 91db1d2a013..49f37fb8638 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; +import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ConcurrentHashSet; @@ -1066,6 +1067,49 @@ public boolean isSatisfied() throws Exception { } + @Test(timeout = 60 * 1000) + public void testWillMessageIsReceivedWithNonRetain() throws Exception { + getServer().createQueue(SimpleString.toSimpleString("will"), RoutingType.MULTICAST, SimpleString.toSimpleString("will"), null, true, false); + + MQTT mqtt = createMQTTConnection("1", false); + mqtt.setKeepAlive((short) 1); + mqtt.setWillMessage("test message with non-retain"); + mqtt.setWillTopic("will"); + mqtt.setWillQos(QoS.AT_LEAST_ONCE); + mqtt.setWillRetain(false); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return connection.isConnected(); + } + }); + + // kill transport + final CountDownLatch latch = new CountDownLatch(1); + server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() { + @Override + public void connectionClosed() { + latch.countDown(); + } + }); + connection.kill(); + latch.await(10, TimeUnit.SECONDS); + + MQTT mqtt2 = createMQTTConnection("2", false); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(new Topic[]{new Topic("will", QoS.AT_LEAST_ONCE)}); + + Message m = connection2.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(m); + m.ack(); + assertEquals("test message with non-retain", new String(m.getPayload())); + } + + @Test(timeout = 60 * 1000) public void testCleanSession() throws Exception { final String CLIENTID = "cleansession";