Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow interception of will messages #1613

Merged
merged 2 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
* [Server] Fix not properly reset statistics (#1587, thanks to @damikun).
* [Server] Now using an empty string as the sender client ID for injected application messages (#1583, thanks to @xljiulang).
* [Server] Improved memory usage for ASP.NET connections (#1582, thanks to @xljiulang).
* [Server] Improved memory usage and performance for ASP.NET integration (#1596, thanks to @xljiulang).
* [Server] Improved memory usage and performance for ASP.NET integration (#1596, thanks to @xljiulang).
* [Server] Add support for interception of will messages (#1613).
24 changes: 24 additions & 0 deletions Source/MQTTnet.Tests/Server/Injection_Tests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Internal;
using MQTTnet.Server;

namespace MQTTnet.Tests.Server
Expand Down Expand Up @@ -60,5 +61,28 @@ public async Task Inject_ApplicationMessage_At_Server_Level()
Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic);
}
}

[TestMethod]
public async Task Intercept_Injected_Application_Message()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();

MqttApplicationMessage interceptedMessage = null;
server.InterceptingPublishAsync += eventArgs =>
{
interceptedMessage = eventArgs.ApplicationMessage;
return CompletedTask.Instance;
};

var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();
await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage));

await LongTestDelay();

Assert.IsNotNull(interceptedMessage);
}
}
}
}
70 changes: 40 additions & 30 deletions Source/MQTTnet.Tests/Server/Will_Tests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
Expand All @@ -11,60 +10,71 @@ namespace MQTTnet.Tests.Server
public sealed class Will_Tests : BaseTestClass
{
[TestMethod]
public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect()
public async Task Intercept_Will_Message()
{
using (var testEnvironment = CreateTestEnvironment())
{
var receivedMessagesCount = 0;

await testEnvironment.StartServer();
var server = await testEnvironment.StartServer().ConfigureAwait(false);

var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will");

var c1 = await testEnvironment.ConnectClient();

c1.ApplicationMessageReceivedAsync += e =>
MqttApplicationMessage willMessage = null;
server.InterceptingPublishAsync += eventArgs =>
{
Interlocked.Increment(ref receivedMessagesCount);
willMessage = eventArgs.ApplicationMessage;
return CompletedTask.Instance;
};

await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

var c2 = await testEnvironment.ConnectClient(clientOptions);
await c2.DisconnectAsync().ConfigureAwait(false);
await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()).ConfigureAwait(false);
var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will").WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce);
var takeOverClient = await testEnvironment.ConnectClient(clientOptions).ConfigureAwait(false);
takeOverClient.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.

await Task.Delay(1000);
await LongTestDelay().ConfigureAwait(false);

Assert.AreEqual(0, receivedMessagesCount);
Assert.IsNotNull(willMessage);
}
}

[TestMethod]
public async Task Will_Message_Send()
public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect()
{
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder());
var receiver = await testEnvironment.ConnectClient().ConfigureAwait(false);

var receivedMessagesCount = 0;
c1.ApplicationMessageReceivedAsync += e =>
{
Interlocked.Increment(ref receivedMessagesCount);
return CompletedTask.Instance;
};
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver);

await receiver.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will");
var sender = await testEnvironment.ConnectClient(clientOptions).ConfigureAwait(false);
await sender.DisconnectAsync().ConfigureAwait(false);

await LongTestDelay().ConfigureAwait(false);

Assert.AreEqual(0, receivedMessages.ReceivedEventArgs.Count);
}
}

[TestMethod]
public async Task Will_Message_Send()
{
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());
var receiver = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()).ConfigureAwait(false);
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver);
await receiver.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will").WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce);
var c2 = await testEnvironment.ConnectClient(clientOptions);
c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.
var takeOverClient = await testEnvironment.ConnectClient(clientOptions).ConfigureAwait(false);
takeOverClient.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.

await Task.Delay(1000);
await LongTestDelay().ConfigureAwait(false);

Assert.AreEqual(1, receivedMessagesCount);
Assert.AreEqual(1, receivedMessages.ReceivedEventArgs.Count);
}
}
}
Expand Down
28 changes: 9 additions & 19 deletions Source/MQTTnet/Formatter/MqttPubAckPacketFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,26 @@ public sealed class MqttPubAckPacketFactory
{
public MqttPubAckPacket Create(
MqttPublishPacket publishPacket,
InterceptingPublishEventArgs interceptingPublishEventArgs,
DispatchApplicationMessageResult dispatchApplicationMessageResult)
{
if (publishPacket == null)
{
throw new ArgumentNullException(nameof(publishPacket));
}

var pubAckPacket = new MqttPubAckPacket
{
PacketIdentifier = publishPacket.PacketIdentifier
};

if (dispatchApplicationMessageResult.MatchingSubscribersCount == 0)
{
// NoMatchingSubscribers is ONLY sent by the server!
pubAckPacket.ReasonCode = MqttPubAckReasonCode.NoMatchingSubscribers;
}
else
if (dispatchApplicationMessageResult == null)
{
pubAckPacket.ReasonCode = MqttPubAckReasonCode.Success;
throw new ArgumentNullException(nameof(dispatchApplicationMessageResult));
}

if (interceptingPublishEventArgs != null)
var pubAckPacket = new MqttPubAckPacket
{
pubAckPacket.ReasonCode = (MqttPubAckReasonCode)(int)interceptingPublishEventArgs.Response.ReasonCode;
pubAckPacket.ReasonString = interceptingPublishEventArgs.Response.ReasonString;
pubAckPacket.UserProperties = interceptingPublishEventArgs.Response.UserProperties;
}

PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = (MqttPubAckReasonCode)dispatchApplicationMessageResult.ReasonCode,
ReasonString = dispatchApplicationMessageResult.ReasonString,
UserProperties = dispatchApplicationMessageResult.UserProperties
};

return pubAckPacket;
}

Expand Down
21 changes: 4 additions & 17 deletions Source/MQTTnet/Formatter/MqttPubRecPacketFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ public MqttPubRecPacket Create(MqttApplicationMessageReceivedEventArgs applicati
return pubRecPacket;
}

public MqttPacket Create(
MqttPublishPacket publishPacket,
InterceptingPublishEventArgs interceptingPublishEventArgs,
DispatchApplicationMessageResult dispatchApplicationMessageResult)
public MqttPacket Create(MqttPublishPacket publishPacket, DispatchApplicationMessageResult dispatchApplicationMessageResult)
{
if (publishPacket == null)
{
Expand All @@ -38,21 +35,11 @@ public MqttPubRecPacket Create(MqttApplicationMessageReceivedEventArgs applicati
var pubRecPacket = new MqttPubRecPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
ReasonCode = (MqttPubRecReasonCode)dispatchApplicationMessageResult.ReasonCode,
ReasonString = dispatchApplicationMessageResult.ReasonString,
UserProperties = dispatchApplicationMessageResult.UserProperties
};

if (dispatchApplicationMessageResult.MatchingSubscribersCount == 0)
{
pubRecPacket.ReasonCode = MqttPubRecReasonCode.NoMatchingSubscribers;
}

if (interceptingPublishEventArgs != null)
{
pubRecPacket.ReasonCode = (MqttPubRecReasonCode)(int)interceptingPublishEventArgs.Response.ReasonCode;
pubRecPacket.ReasonString = interceptingPublishEventArgs.Response.ReasonString;
pubRecPacket.UserProperties = interceptingPublishEventArgs.Response.UserProperties;
}

return pubRecPacket;
}

Expand Down
18 changes: 15 additions & 3 deletions Source/MQTTnet/Server/Internal/DispatchApplicationMessageResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,27 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using MQTTnet.Packets;

namespace MQTTnet.Server
{
public sealed class DispatchApplicationMessageResult
{
public DispatchApplicationMessageResult(int matchingSubscribersCount)
public DispatchApplicationMessageResult(int reasonCode, bool closeConnection, string reasonString, List<MqttUserProperty> userProperties)
{
MatchingSubscribersCount = matchingSubscribersCount;
ReasonCode = reasonCode;
CloseConnection = closeConnection;
ReasonString = reasonString;
UserProperties = userProperties;
}

public int MatchingSubscribersCount { get; }
public bool CloseConnection { get; }

public int ReasonCode { get; }

public string ReasonString { get; }

public List<MqttUserProperty> UserProperties { get; }
}
}
42 changes: 9 additions & 33 deletions Source/MQTTnet/Server/Internal/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public async Task RunAsync()
var willPublishPacket = MqttPacketFactories.Publish.Create(Session.LatestConnectPacket);
var willApplicationMessage = MqttApplicationMessageFactory.Create(willPublishPacket);

_ = _sessionsManager.DispatchApplicationMessage(Id, willApplicationMessage);
_ = _sessionsManager.DispatchApplicationMessage(Id, Session.Items, willApplicationMessage, CancellationToken.None);
Session.WillMessageSent = true;

_logger.Info("Client '{0}': Published will message", Id);
Expand Down Expand Up @@ -210,41 +210,17 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella
{
HandleTopicAlias(publishPacket);

InterceptingPublishEventArgs interceptingPublishEventArgs = null;
var applicationMessage = MqttApplicationMessageFactory.Create(publishPacket);
var closeConnection = false;
var processPublish = true;

if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{
interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, Id, Session.Items);
if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic))
{
// This can happen if a topic alias us used but the topic is
// unknown to the server.
interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid;
interceptingPublishEventArgs.ProcessPublish = false;
}

await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false);

applicationMessage = interceptingPublishEventArgs.ApplicationMessage;
closeConnection = interceptingPublishEventArgs.CloseConnection;
processPublish = interceptingPublishEventArgs.ProcessPublish;
}
var dispatchApplicationMessageResult =
await _sessionsManager.DispatchApplicationMessage(Id, Session.Items, applicationMessage, cancellationToken).ConfigureAwait(false);

if (closeConnection)
if (dispatchApplicationMessageResult.CloseConnection)
{
await StopAsync(MqttDisconnectReasonCode.UnspecifiedError);
return;
}

DispatchApplicationMessageResult dispatchResult = null;
if (processPublish && applicationMessage != null)
{
dispatchResult = await _sessionsManager.DispatchApplicationMessage(Id, applicationMessage).ConfigureAwait(false);
}

switch (publishPacket.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
Expand All @@ -254,13 +230,13 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
var pubAckPacket = MqttPacketFactories.PubAck.Create(publishPacket, interceptingPublishEventArgs, dispatchResult);
var pubAckPacket = MqttPacketFactories.PubAck.Create(publishPacket, dispatchApplicationMessageResult);
Session.EnqueueControlPacket(new MqttPacketBusItem(pubAckPacket));
break;
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
var pubRecPacket = MqttPacketFactories.PubRec.Create(publishPacket, interceptingPublishEventArgs, dispatchResult);
var pubRecPacket = MqttPacketFactories.PubRec.Create(publishPacket, dispatchApplicationMessageResult);
Session.EnqueueControlPacket(new MqttPacketBusItem(pubRecPacket));
break;
}
Expand Down Expand Up @@ -478,16 +454,16 @@ async Task ReceivePackagesLoop(CancellationToken cancellationToken)
}

var logLevel = MqttNetLogLevel.Error;

if (!IsRunning)
{
// There was an exception but the connection is already closed. So there is no chance to send a response to the client etc.
logLevel = MqttNetLogLevel.Warning;
}

if (currentPacket == null)
{
_logger.Publish(logLevel, exception, "Client '{0}': Error while receiving packets", Id);
_logger.Publish(logLevel, exception, "Client '{0}': Error while receiving packets", Id);
}
else
{
Expand Down