Skip to content

Commit

Permalink
Fix MQTT 3.1.1 session persistence when CleanSession=1 (#1294)
Browse files Browse the repository at this point in the history
  • Loading branch information
logicaloud committed Nov 13, 2021
1 parent 47cd5aa commit f0ea13c
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 17 deletions.
14 changes: 12 additions & 2 deletions Source/MQTTnet/Server/Internal/MqttClientSession.cs
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using MQTTnet.Server.Status;

Expand All @@ -8,24 +8,34 @@ public sealed class MqttClientSession : IDisposable
{
readonly DateTime _createdTimestamp = DateTime.UtcNow;

/// <summary>
/// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5)
/// </summary>
readonly bool _isPersistent;

public MqttClientSession(
string clientId,
IDictionary<object, object> items,
MqttServerEventDispatcher eventDispatcher,
IMqttServerOptions serverOptions,
IMqttRetainedMessagesManager retainedMessagesManager)
IMqttRetainedMessagesManager retainedMessagesManager,
bool isPersistent
)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Items = items ?? throw new ArgumentNullException(nameof(items));

SubscriptionsManager = new MqttClientSubscriptionsManager(this, serverOptions, eventDispatcher, retainedMessagesManager);
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions);
_isPersistent = isPersistent;
}

public string ClientId { get; }

public bool IsCleanSession { get; set; } = true;

public bool IsPersistent => _isPersistent;

public MqttApplicationMessage WillMessage { get; set; }

public MqttClientSubscriptionsManager SubscriptionsManager { get; }
Expand Down
41 changes: 34 additions & 7 deletions Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
Expand Up @@ -126,7 +126,7 @@ public void Start(CancellationToken cancellationToken)

// Pass connAckPacket so that IsSessionPresent flag can be set if the client session already exists
clientConnection = await CreateClientConnection(connectPacket, connAckPacket, channelAdapter,
connectionValidatorContext.SessionItems).ConfigureAwait(false);
connectionValidatorContext).ConfigureAwait(false);

await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -156,7 +156,7 @@ await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket, channelAdap
_clientConnections.Remove(clientConnection.ClientId);
}

if (!_options.EnablePersistentSessions)
if ((!_options.EnablePersistentSessions) || (!clientConnection.Session.IsPersistent))
{
await DeleteSessionAsync(clientConnection.ClientId).ConfigureAwait(false);
}
Expand Down Expand Up @@ -491,10 +491,35 @@ await _retainedMessagesManager.HandleMessageAsync(senderClientId, applicationMes
MqttConnectPacket connectPacket,
MqttConnAckPacket connAckPacket,
IMqttChannelAdapter channelAdapter,
IDictionary<object, object> sessionItems)
MqttConnectionValidatorContext context)
{
MqttClientConnection connection;

bool sessionShouldPersist;

if (context.ProtocolVersion == MqttProtocolVersion.V500)
{
// MQTT 5.0 section 3.1.2.11.2
// The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23].
//
// A Client that only wants to process messages while connected will set the Clean Start to 1 and set the Session Expiry Interval to 0.
// It will not receive Application Messages published before it connected and has to subscribe afresh to any topics that it is interested
// in each time it connects.

// Persist if SessionExpiryInterval != 0, but may start with a clean session
sessionShouldPersist = context.SessionExpiryInterval != 0;
}
else
{
// MQTT 3.1.1 section 3.1.2.4: persist only if 'not CleanSession'
//
// If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one.
// This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be
// reused in any subsequent Session [MQTT-3.1.2-6].

sessionShouldPersist = !connectPacket.CleanSession;
}

using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
MqttClientSession session;
Expand All @@ -503,14 +528,14 @@ await _retainedMessagesManager.HandleMessageAsync(senderClientId, applicationMes
if (!_clientSessions.TryGetValue(connectPacket.ClientId, out session))
{
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
session = CreateSession(connectPacket.ClientId, sessionItems);
session = CreateSession(connectPacket.ClientId, context.SessionItems, sessionShouldPersist);
}
else
{
if (connectPacket.CleanSession)
{
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
session = CreateSession(connectPacket.ClientId, sessionItems);
session = CreateSession(connectPacket.ClientId, context.SessionItems, sessionShouldPersist);
}
else
{
Expand Down Expand Up @@ -604,14 +629,16 @@ MqttClientSession GetClientSession(string clientId)
_rootLogger);
}

MqttClientSession CreateSession(string clientId, IDictionary<object, object> sessionItems)
MqttClientSession CreateSession(string clientId, IDictionary<object, object> sessionItems, bool isPersistent)
{
return new MqttClientSession(
clientId,
sessionItems,
_eventDispatcher,
_options,
_retainedMessagesManager);
_retainedMessagesManager,
isPersistent
);
}
}
}
4 changes: 2 additions & 2 deletions Tests/MQTTnet.Core.Tests/Server/General.cs
Expand Up @@ -1167,15 +1167,15 @@ public async Task Collect_Messages_In_Disconnected_Session()
var server = await testEnvironment.StartServer(new MqttServerOptionsBuilder().WithPersistentSessions());

// Create the session including the subscription.
var client1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("a"));
var client1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("a").WithCleanSession(false));
await client1.SubscribeAsync("x");
await client1.DisconnectAsync();
await Task.Delay(500);

var clientStatus = await server.GetClientStatusAsync();
Assert.AreEqual(0, clientStatus.Count);

var client2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("b"));
var client2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("b").WithCleanSession(false));
await client2.PublishAsync("x", "1");
await client2.PublishAsync("x", "2");
await client2.PublishAsync("x", "3");
Expand Down
@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Packets;
Expand Down Expand Up @@ -107,7 +107,8 @@ MqttClientSession CreateSession()
new ConcurrentDictionary<object, object>(),
new MqttServerEventDispatcher(new TestLogger()),
new MqttServerOptions(),
new MqttRetainedMessagesManager());
new MqttRetainedMessagesManager(),
false);
}
}
}
54 changes: 53 additions & 1 deletion Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs
@@ -1,4 +1,4 @@
using System.Linq;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -107,6 +107,58 @@ public async Task Manage_Session_MaxParallel()
}
}

[TestMethod]
public async Task Clean_Session_Persistence()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
// Create server with persistent sessions enabled

await testEnvironment.StartServer(o => o.WithPersistentSessions());

const string ClientId = "Client1";

// Create client with clean session and long session expiry interval

var client1 = await testEnvironment.ConnectClient(o => o
.WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact
.WithCleanSession(true) // start and end with clean session
.WithClientId(ClientId)
.Build()
);

// Disconnect; empty session should be removed from server

await client1.DisconnectAsync();

// Simulate some time delay between connections

await Task.Delay(1000);

// Reconnect the same client ID without clean session

var client2 = testEnvironment.CreateClient();
var options = testEnvironment.Factory.CreateClientOptionsBuilder()
.WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact
.WithCleanSession(false) // see if there is a session
.WithClientId(ClientId)
.Build();


var result = await client2.ConnectAsync(options).ConfigureAwait(false);

await client2.DisconnectAsync();

// Session should NOT be present for MQTT v311 and initial CleanSession == true

Assert.IsTrue(!result.IsSessionPresent, "Session present");
}
}

async Task<IMqttClient> TryConnect(TestEnvironment testEnvironment, MqttClientOptionsBuilder options)
{
try
Expand Down
6 changes: 3 additions & 3 deletions Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs
@@ -1,4 +1,4 @@
using System.Linq;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
Expand Down Expand Up @@ -81,8 +81,8 @@ public async Task Keep_Persistent_Session()
{
var server = await testEnvironment.StartServer(new MqttServerOptionsBuilder().WithPersistentSessions());

var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client1"));
var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client2"));
var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client1").WithCleanSession(false));
var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client2").WithCleanSession(false));

await c1.DisconnectAsync();

Expand Down

0 comments on commit f0ea13c

Please sign in to comment.