Skip to content

Commit

Permalink
ARTEMIS-4501 clean up MQTT subscription queues when session expires
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Nov 27, 2023
1 parent 60200b4 commit 8e68bb1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ public void scanSessions() {
for (String key : toRemove) {
try {
MQTTSessionState state = removeSessionState(key);
if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) {
state.getSession().sendWillMessage();
if (state != null) {
if (state.isWill() && !state.isAttached() && state.isFailed()) {
state.getSession().sendWillMessage();
}
state.getSession().clean(false);
}
} catch (Exception e) {
MQTTLogger.LOGGER.failedToRemoveSessionState(key, e);
Expand Down
26 changes: 7 additions & 19 deletions docs/user-manual/mqtt.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -165,27 +165,15 @@ In the case of MQTT 5 clients they will receive a disconnect reason code of http

== Automatic Subscription Clean-up

Sometimes MQTT clients using `CleanSession=false` don't clean up their subscriptions.
In such situations the following address-setting can be used to clean up the abandoned subscription queues:
Sometimes MQTT 3.x clients using `CleanSession=false` don't properly unsubscribe. The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the MQTT `acceptor` so that abandoned sessions and subscription queues will be cleaned up automatically after the expiry interval elapses.

[,xml]
----
<address-setting match="myMqttAddress">
<auto-delete-created-queues>true</auto-delete-created-queues>
<auto-delete-queues-delay>3600000</auto-delete-queues-delay> <!-- 1 hour delay -->
<auto-delete-queues-message-count>-1</auto-delete-queues-message-count> <!-- doesn't matter how many messages there are -->
</address-setting>
----

However, the MQTT session meta-data is still present in memory and needs to be cleaned up as well.
The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the MQTT `acceptor` to deal with this situation.

MQTT 5 added a new https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session expiry interval] property with the same basic semantics.
The broker will use the client's value for this property if it is set.
If it is not set then it will apply the `defaultMqttSessionExpiryInterval`.
MQTT 5 has the same basic semantics with slightly different configuration.
The `CleanSession` flag was replaced with `CleanStart` and a https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session expiry interval] property.
The broker will use the client's session expiry interval if it is set.
If it is not set then the broker will apply the `defaultMqttSessionExpiryInterval`.

The default `defaultMqttSessionExpiryInterval` is `-1` which means no MQTT 3.x session states will be expired and no MQTT 5 session states which do not pass their own session expiry interval will be expired.
Otherwise it represents the number of *seconds* which must elapse after the client has disconnected before the broker will remove the session state.
The default `defaultMqttSessionExpiryInterval` is `-1` which means no clean up will happen for MQTT 3.x clients or for MQTT 5 clients which do not pass their own session expiry interval.
Otherwise it represents the number of *seconds* which must elapse after the client has disconnected before the broker will remove the session state and subscription queues.

MQTT session state is scanned every 5,000 milliseconds by default.
This can be changed using the `mqtt-session-scan-interval` element set in the `core` section of `broker.xml`.
Expand Down
19 changes: 19 additions & 0 deletions docs/user-manual/versions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@ NOTE: If the upgrade spans multiple versions then the steps from *each* version

NOTE: Follow the general upgrade procedure outlined in the xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker] chapter in addition to any version-specific upgrade instructions outlined here.

== 2.32.0

https://issues.apache.org/jira/secure/ReleaseNote.jspa...

=== Highlights

* highlight 1
* highlight 2

=== Upgrading from 2.31.x

* Due to https://issues.apache.org/jira/browse/ARTEMIS-4501[ARTEMIS-4501] MQTT subscription queues will be automatically removed when the corresponding session expires, either based on the session expiry interval passed by an MQTT 5 client or based on the configured `defaultMqttSessionExpiryInterval` for MQTT 3.x clients or MQTT 5 clients which don't explicitly pass a session expiry interval.
+
Prior to this change removing subscription queues relied on the generic `auto-delete-*` `address-settings`.
+
These settings are now no longer required.
+
Configure `defaultMqttSessionExpiryInterval` instead.

== 2.31.2

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353776[Full release notes]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,13 +968,20 @@ public void testCleanStartFalseWithNon0SessionExpiryInterval() throws Exception
.build();
consumer.connect(options);
consumer.subscribe(TOPIC, 2);
long start = System.currentTimeMillis();
consumer.disconnect();

// session should *not* still exist since session expiry interval has passed
long start = System.currentTimeMillis();
// ensure the subscription queue still exists since the session hasn't expired
assertNotNull(getSubscriptionQueue(TOPIC, CONSUMER_ID));

Wait.assertEquals(0, () -> getSessionStates().size(), EXPIRY_INTERVAL * 1000 * 2, 100);
assertTrue(System.currentTimeMillis() - start > (EXPIRY_INTERVAL * 1000));
assertTrue(System.currentTimeMillis() - start >= (EXPIRY_INTERVAL * 1000));

// session should *not* still exist since session expiry interval has passed
assertNull(getSessionStates().get(CONSUMER_ID));

// ensure the subscription queue is cleaned up when the session expires
Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) == null, 2000, 100);
}

/*
Expand Down

0 comments on commit 8e68bb1

Please sign in to comment.