Skip to content
Permalink
Browse files
ARTEMIS-3851 MQTT sub q exists after restart despite CleanSession=1
MQTT 3.1 and 3.1.1 clients using a clean session should have a
*non-durable* subscription queue. If the broker restarts the queue
should be removed. This is due to [MQTT-3.1.2-6] which states that the
session (and any state) must last only as long as the network
connection.
  • Loading branch information
jbertram authored and clebertsuconic committed Jun 14, 2022
1 parent 3002dce commit c9208aafda81e15c31bbcda3a99557f3fa29dd09
Showing 5 changed files with 60 additions and 7 deletions.
@@ -119,7 +119,7 @@ private void addSubscription(MqttTopicSubscription subscription, Integer subscri
int qos = subscription.qualityOfService().value();
String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName, session.getWildcardConfiguration());

Queue q = createQueueForSubscription(coreAddress, qos, sharedSubscriptionName);
Queue q = createQueueForSubscription(coreAddress, sharedSubscriptionName);

if (initialStart) {
createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null);
@@ -153,7 +153,7 @@ synchronized void stop() throws Exception {
}
}

private Queue createQueueForSubscription(String address, int qos, String sharedSubscriptionName) throws Exception {
private Queue createQueueForSubscription(String address, String sharedSubscriptionName) throws Exception {
// determine the proper queue name
SimpleString queue;
if (sharedSubscriptionName != null) {
@@ -184,14 +184,20 @@ private Queue createQueueForSubscription(String address, int qos, String sharedS
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
return findOrCreateQueue(bindingQueryResult, addressInfo, queue);
}
return q;
}

private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception {
private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue) throws Exception {
/*
* MQTT 3.1 and 3.1.1 clients using a clean session should have a *non-durable* subscription queue. If the broker
* restarts the queue should be removed. This is due to [MQTT-3.1.2-6] which states that the session (and any
* state) must last only as long as the network connection.
*/
boolean durable = session.getVersion() == MQTTVersion.MQTT_5 || (session.getVersion() != MQTTVersion.MQTT_5 && !session.isClean());
if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
return session.getServerSession().createQueue(new QueueConfiguration(queue).setAddress(addressInfo.getName()).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(MQTTUtil.DURABLE_MESSAGES && qos >= 0));
return session.getServerSession().createQueue(new QueueConfiguration(queue).setAddress(addressInfo.getName()).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(durable));
}

if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
@@ -207,7 +213,7 @@ private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressIn
return session.getServer().locateQueue(name);
} else {
try {
return session.getServerSession().createQueue(new QueueConfiguration(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(MQTTUtil.DURABLE_MESSAGES && qos >= 0));
return session.getServerSession().createQueue(new QueueConfiguration(addressInfo.getName()).setRoutingType(RoutingType.ANYCAST).setFilterString(getMessageFilter(addressInfo.getName())).setDurable(durable));
} catch (ActiveMQQueueExistsException e) {
return session.getServer().locateQueue(addressInfo.getName());
}
@@ -8,6 +8,21 @@ This chapter provides the following information for each release:
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md)
chapter in addition to any version-specific upgrade instructions outlined here.

## 2.24.0
[Full release notes](TBD).

Highlights:
- TBD

#### Upgrading from older versions

Due to [ARTEMIS-3851](https://issues.apache.org/jira/browse/ARTEMIS-3851) the queue
created for an MQTT 3.x subscriber using `CleanSession=1` is now **non-durable**
rather than durable. This may impact `security-settings` for MQTT clients which
previously only had `createDurableQueue` for their role. They will now need
`createNonDurableQueue` as well. Again, this only has potential impact for MQTT 3.x
clients using `CleanSession=1`.

## 2.23.0
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12351677).

@@ -73,6 +73,21 @@ private void testQueueClean(boolean managed) throws Exception {
}
}

@Test
public void testQueueCleanOnRestart() throws Exception {
String topic = "clean/test";
String clientId = "mqtt-client";
String queueName = "mqtt-client.clean.test";

MQTTClientProvider clientProvider = getMQTTClientProvider();
clientProvider.setClientId(clientId);
initializeConnection(clientProvider);
clientProvider.subscribe(topic, AT_LEAST_ONCE);
server.stop();
server.start();
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10);
}

@Test
public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception {
Random random = new Random();
@@ -69,7 +69,7 @@ public void testSendAndReceiveMQTT() throws Exception {
MqttClient consumer = createPahoClient("consumerId");
MqttClient producer = createPahoClient("producerId");
MqttConnectOptions conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
conOpt.setCleanSession(false);
conOpt.setUserName(user1);
conOpt.setPassword(password1.toCharArray());
consumer.connect(conOpt);
@@ -208,4 +208,21 @@ public void testWillFlagFalseWithSessionExpiryDelay() throws Exception {
scanSessions();
assertEquals(0, server.locateQueue("DLA").getMessageCount());
}

@Test(timeout = DEFAULT_TIMEOUT)
public void testQueueCleanOnRestart() throws Exception {
String topic = RandomUtil.randomString();
String clientId = RandomUtil.randomString();

MqttClient client = createPahoClient(clientId);
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.sessionExpiryInterval(999L)
.cleanStart(true)
.build();
client.connect(options);
client.subscribe(topic, AT_LEAST_ONCE);
server.stop();
server.start();
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 3000, 10);
}
}

0 comments on commit c9208aa

Please sign in to comment.