From f0ea13c07f783ed6de401b0c7ba3de411b505b9e Mon Sep 17 00:00:00 2001 From: logicaloud <47704763+logicaloud@users.noreply.github.com> Date: Sat, 13 Nov 2021 22:05:54 +1300 Subject: [PATCH] Fix MQTT 3.1.1 session persistence when CleanSession=1 (#1294) --- .../Server/Internal/MqttClientSession.cs | 14 ++++- .../Internal/MqttClientSessionsManager.cs | 41 +++++++++++--- Tests/MQTTnet.Core.Tests/Server/General.cs | 4 +- .../Server/MqttSubscriptionsManager_Tests.cs | 5 +- .../Server/Session_Tests.cs | 54 ++++++++++++++++++- .../MQTTnet.Core.Tests/Server/Status_Tests.cs | 6 +-- 6 files changed, 107 insertions(+), 17 deletions(-) diff --git a/Source/MQTTnet/Server/Internal/MqttClientSession.cs b/Source/MQTTnet/Server/Internal/MqttClientSession.cs index 3f68fdfdf..291b3e5aa 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSession.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSession.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using MQTTnet.Server.Status; @@ -8,24 +8,34 @@ public sealed class MqttClientSession : IDisposable { readonly DateTime _createdTimestamp = DateTime.UtcNow; + /// + /// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5) + /// + readonly bool _isPersistent; + public MqttClientSession( string clientId, IDictionary items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, - IMqttRetainedMessagesManager retainedMessagesManager) + IMqttRetainedMessagesManager retainedMessagesManager, + bool isPersistent + ) { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); Items = items ?? throw new ArgumentNullException(nameof(items)); SubscriptionsManager = new MqttClientSubscriptionsManager(this, serverOptions, eventDispatcher, retainedMessagesManager); ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); + _isPersistent = isPersistent; } public string ClientId { get; } public bool IsCleanSession { get; set; } = true; + public bool IsPersistent => _isPersistent; + public MqttApplicationMessage WillMessage { get; set; } public MqttClientSubscriptionsManager SubscriptionsManager { get; } diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index 8dccde9a5..8831a05b5 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -126,7 +126,7 @@ public void Start(CancellationToken cancellationToken) // Pass connAckPacket so that IsSessionPresent flag can be set if the client session already exists clientConnection = await CreateClientConnection(connectPacket, connAckPacket, channelAdapter, - connectionValidatorContext.SessionItems).ConfigureAwait(false); + connectionValidatorContext).ConfigureAwait(false); await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false); @@ -156,7 +156,7 @@ await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket, channelAdap _clientConnections.Remove(clientConnection.ClientId); } - if (!_options.EnablePersistentSessions) + if ((!_options.EnablePersistentSessions) || (!clientConnection.Session.IsPersistent)) { await DeleteSessionAsync(clientConnection.ClientId).ConfigureAwait(false); } @@ -491,10 +491,35 @@ await _retainedMessagesManager.HandleMessageAsync(senderClientId, applicationMes MqttConnectPacket connectPacket, MqttConnAckPacket connAckPacket, IMqttChannelAdapter channelAdapter, - IDictionary sessionItems) + MqttConnectionValidatorContext context) { MqttClientConnection connection; + bool sessionShouldPersist; + + if (context.ProtocolVersion == MqttProtocolVersion.V500) + { + // MQTT 5.0 section 3.1.2.11.2 + // The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23]. + // + // A Client that only wants to process messages while connected will set the Clean Start to 1 and set the Session Expiry Interval to 0. + // It will not receive Application Messages published before it connected and has to subscribe afresh to any topics that it is interested + // in each time it connects. + + // Persist if SessionExpiryInterval != 0, but may start with a clean session + sessionShouldPersist = context.SessionExpiryInterval != 0; + } + else + { + // MQTT 3.1.1 section 3.1.2.4: persist only if 'not CleanSession' + // + // If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. + // This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be + // reused in any subsequent Session [MQTT-3.1.2-6]. + + sessionShouldPersist = !connectPacket.CleanSession; + } + using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false)) { MqttClientSession session; @@ -503,14 +528,14 @@ await _retainedMessagesManager.HandleMessageAsync(senderClientId, applicationMes if (!_clientSessions.TryGetValue(connectPacket.ClientId, out session)) { _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); - session = CreateSession(connectPacket.ClientId, sessionItems); + session = CreateSession(connectPacket.ClientId, context.SessionItems, sessionShouldPersist); } else { if (connectPacket.CleanSession) { _logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); - session = CreateSession(connectPacket.ClientId, sessionItems); + session = CreateSession(connectPacket.ClientId, context.SessionItems, sessionShouldPersist); } else { @@ -604,14 +629,16 @@ MqttClientSession GetClientSession(string clientId) _rootLogger); } - MqttClientSession CreateSession(string clientId, IDictionary sessionItems) + MqttClientSession CreateSession(string clientId, IDictionary sessionItems, bool isPersistent) { return new MqttClientSession( clientId, sessionItems, _eventDispatcher, _options, - _retainedMessagesManager); + _retainedMessagesManager, + isPersistent + ); } } } \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/Server/General.cs b/Tests/MQTTnet.Core.Tests/Server/General.cs index f39b52150..7d0ba9342 100644 --- a/Tests/MQTTnet.Core.Tests/Server/General.cs +++ b/Tests/MQTTnet.Core.Tests/Server/General.cs @@ -1167,7 +1167,7 @@ public async Task Collect_Messages_In_Disconnected_Session() var server = await testEnvironment.StartServer(new MqttServerOptionsBuilder().WithPersistentSessions()); // Create the session including the subscription. - var client1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("a")); + var client1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("a").WithCleanSession(false)); await client1.SubscribeAsync("x"); await client1.DisconnectAsync(); await Task.Delay(500); @@ -1175,7 +1175,7 @@ public async Task Collect_Messages_In_Disconnected_Session() var clientStatus = await server.GetClientStatusAsync(); Assert.AreEqual(0, clientStatus.Count); - var client2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("b")); + var client2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("b").WithCleanSession(false)); await client2.PublishAsync("x", "1"); await client2.PublishAsync("x", "2"); await client2.PublishAsync("x", "3"); diff --git a/Tests/MQTTnet.Core.Tests/Server/MqttSubscriptionsManager_Tests.cs b/Tests/MQTTnet.Core.Tests/Server/MqttSubscriptionsManager_Tests.cs index 4ef18b4de..2a6f58f9a 100644 --- a/Tests/MQTTnet.Core.Tests/Server/MqttSubscriptionsManager_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server/MqttSubscriptionsManager_Tests.cs @@ -1,4 +1,4 @@ -using System.Collections.Concurrent; +using System.Collections.Concurrent; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Packets; @@ -107,7 +107,8 @@ MqttClientSession CreateSession() new ConcurrentDictionary(), new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), - new MqttRetainedMessagesManager()); + new MqttRetainedMessagesManager(), + false); } } } diff --git a/Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs b/Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs index e9000cdca..c502edffb 100644 --- a/Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs @@ -1,4 +1,4 @@ -using System.Linq; +using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -107,6 +107,58 @@ public async Task Manage_Session_MaxParallel() } } + [TestMethod] + public async Task Clean_Session_Persistence() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + // Create server with persistent sessions enabled + + await testEnvironment.StartServer(o => o.WithPersistentSessions()); + + const string ClientId = "Client1"; + + // Create client with clean session and long session expiry interval + + var client1 = await testEnvironment.ConnectClient(o => o + .WithProtocolVersion(Formatter.MqttProtocolVersion.V311) + .WithTcpServer("127.0.0.1", testEnvironment.ServerPort) + .WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact + .WithCleanSession(true) // start and end with clean session + .WithClientId(ClientId) + .Build() + ); + + // Disconnect; empty session should be removed from server + + await client1.DisconnectAsync(); + + // Simulate some time delay between connections + + await Task.Delay(1000); + + // Reconnect the same client ID without clean session + + var client2 = testEnvironment.CreateClient(); + var options = testEnvironment.Factory.CreateClientOptionsBuilder() + .WithProtocolVersion(Formatter.MqttProtocolVersion.V311) + .WithTcpServer("127.0.0.1", testEnvironment.ServerPort) + .WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact + .WithCleanSession(false) // see if there is a session + .WithClientId(ClientId) + .Build(); + + + var result = await client2.ConnectAsync(options).ConfigureAwait(false); + + await client2.DisconnectAsync(); + + // Session should NOT be present for MQTT v311 and initial CleanSession == true + + Assert.IsTrue(!result.IsSessionPresent, "Session present"); + } + } + async Task TryConnect(TestEnvironment testEnvironment, MqttClientOptionsBuilder options) { try diff --git a/Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs b/Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs index c3f9f3891..2995859ed 100644 --- a/Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs @@ -1,4 +1,4 @@ -using System.Linq; +using System.Linq; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; @@ -81,8 +81,8 @@ public async Task Keep_Persistent_Session() { var server = await testEnvironment.StartServer(new MqttServerOptionsBuilder().WithPersistentSessions()); - var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client1")); - var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client2")); + var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client1").WithCleanSession(false)); + var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client2").WithCleanSession(false)); await c1.DisconnectAsync();