From df485cdcd2cd7ffc4c93d6b1b759c5450ce90fb8 Mon Sep 17 00:00:00 2001 From: Christian <6939810+chkr1011@users.noreply.github.com> Date: Tue, 6 Dec 2022 19:36:45 +0100 Subject: [PATCH 1/2] Allow interception of will messages --- .../MQTTnet.Tests/Server/Injection_Tests.cs | 24 +++ Source/MQTTnet.Tests/Server/Will_Tests.cs | 70 ++++---- .../Formatter/MqttPubAckPacketFactory.cs | 28 +-- .../Formatter/MqttPubRecPacketFactory.cs | 21 +-- .../DispatchApplicationMessageResult.cs | 18 +- Source/MQTTnet/Server/Internal/MqttClient.cs | 42 +---- .../Internal/MqttClientSessionsManager.cs | 159 +++++++++++------- Source/MQTTnet/Server/MqttServer.cs | 40 ++--- Source/MQTTnet/Server/PublishResponse.cs | 2 +- 9 files changed, 209 insertions(+), 195 deletions(-) diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 7ce3687db..14d30026e 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -1,6 +1,7 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; +using MQTTnet.Internal; using MQTTnet.Server; namespace MQTTnet.Tests.Server @@ -60,5 +61,28 @@ public async Task Inject_ApplicationMessage_At_Server_Level() Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); } } + + [TestMethod] + public async Task Intercept_Injected_Application_Message() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServer(); + + MqttApplicationMessage interceptedMessage = null; + server.InterceptingPublishAsync += eventArgs => + { + interceptedMessage = eventArgs.ApplicationMessage; + return CompletedTask.Instance; + }; + + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + + await LongTestDelay(); + + Assert.IsNotNull(interceptedMessage); + } + } } } \ No newline at end of file diff --git a/Source/MQTTnet.Tests/Server/Will_Tests.cs b/Source/MQTTnet.Tests/Server/Will_Tests.cs index 12b24f636..e9c364ca1 100644 --- a/Source/MQTTnet.Tests/Server/Will_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Will_Tests.cs @@ -1,4 +1,3 @@ -using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; @@ -11,60 +10,71 @@ namespace MQTTnet.Tests.Server public sealed class Will_Tests : BaseTestClass { [TestMethod] - public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect() + public async Task Intercept_Will_Message() { using (var testEnvironment = CreateTestEnvironment()) { - var receivedMessagesCount = 0; - - await testEnvironment.StartServer(); + var server = await testEnvironment.StartServer().ConfigureAwait(false); - var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will"); - - var c1 = await testEnvironment.ConnectClient(); - - c1.ApplicationMessageReceivedAsync += e => + MqttApplicationMessage willMessage = null; + server.InterceptingPublishAsync += eventArgs => { - Interlocked.Increment(ref receivedMessagesCount); + willMessage = eventArgs.ApplicationMessage; return CompletedTask.Instance; }; - await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); - - var c2 = await testEnvironment.ConnectClient(clientOptions); - await c2.DisconnectAsync().ConfigureAwait(false); + await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()).ConfigureAwait(false); + var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will").WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce); + var takeOverClient = await testEnvironment.ConnectClient(clientOptions).ConfigureAwait(false); + takeOverClient.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent. - await Task.Delay(1000); + await LongTestDelay().ConfigureAwait(false); - Assert.AreEqual(0, receivedMessagesCount); + Assert.IsNotNull(willMessage); } } [TestMethod] - public async Task Will_Message_Send() + public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect() { using (var testEnvironment = CreateTestEnvironment()) { await testEnvironment.StartServer(); - var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()); + var receiver = await testEnvironment.ConnectClient().ConfigureAwait(false); - var receivedMessagesCount = 0; - c1.ApplicationMessageReceivedAsync += e => - { - Interlocked.Increment(ref receivedMessagesCount); - return CompletedTask.Instance; - }; + var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + + await receiver.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); + + var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will"); + var sender = await testEnvironment.ConnectClient(clientOptions).ConfigureAwait(false); + await sender.DisconnectAsync().ConfigureAwait(false); + + await LongTestDelay().ConfigureAwait(false); + + Assert.AreEqual(0, receivedMessages.ReceivedEventArgs.Count); + } + } + + [TestMethod] + public async Task Will_Message_Send() + { + using (var testEnvironment = CreateTestEnvironment()) + { + await testEnvironment.StartServer(); - await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); + var receiver = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()).ConfigureAwait(false); + var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + await receiver.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will").WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce); - var c2 = await testEnvironment.ConnectClient(clientOptions); - c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent. + var takeOverClient = await testEnvironment.ConnectClient(clientOptions).ConfigureAwait(false); + takeOverClient.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent. - await Task.Delay(1000); + await LongTestDelay().ConfigureAwait(false); - Assert.AreEqual(1, receivedMessagesCount); + Assert.AreEqual(1, receivedMessages.ReceivedEventArgs.Count); } } } diff --git a/Source/MQTTnet/Formatter/MqttPubAckPacketFactory.cs b/Source/MQTTnet/Formatter/MqttPubAckPacketFactory.cs index 536ec0110..c79bed165 100644 --- a/Source/MQTTnet/Formatter/MqttPubAckPacketFactory.cs +++ b/Source/MQTTnet/Formatter/MqttPubAckPacketFactory.cs @@ -14,7 +14,6 @@ public sealed class MqttPubAckPacketFactory { public MqttPubAckPacket Create( MqttPublishPacket publishPacket, - InterceptingPublishEventArgs interceptingPublishEventArgs, DispatchApplicationMessageResult dispatchApplicationMessageResult) { if (publishPacket == null) @@ -22,28 +21,19 @@ public sealed class MqttPubAckPacketFactory throw new ArgumentNullException(nameof(publishPacket)); } - var pubAckPacket = new MqttPubAckPacket - { - PacketIdentifier = publishPacket.PacketIdentifier - }; - - if (dispatchApplicationMessageResult.MatchingSubscribersCount == 0) - { - // NoMatchingSubscribers is ONLY sent by the server! - pubAckPacket.ReasonCode = MqttPubAckReasonCode.NoMatchingSubscribers; - } - else + if (dispatchApplicationMessageResult == null) { - pubAckPacket.ReasonCode = MqttPubAckReasonCode.Success; + throw new ArgumentNullException(nameof(dispatchApplicationMessageResult)); } - if (interceptingPublishEventArgs != null) + var pubAckPacket = new MqttPubAckPacket { - pubAckPacket.ReasonCode = (MqttPubAckReasonCode)(int)interceptingPublishEventArgs.Response.ReasonCode; - pubAckPacket.ReasonString = interceptingPublishEventArgs.Response.ReasonString; - pubAckPacket.UserProperties = interceptingPublishEventArgs.Response.UserProperties; - } - + PacketIdentifier = publishPacket.PacketIdentifier, + ReasonCode = (MqttPubAckReasonCode)dispatchApplicationMessageResult.ReasonCode, + ReasonString = dispatchApplicationMessageResult.ReasonString, + UserProperties = dispatchApplicationMessageResult.UserProperties + }; + return pubAckPacket; } diff --git a/Source/MQTTnet/Formatter/MqttPubRecPacketFactory.cs b/Source/MQTTnet/Formatter/MqttPubRecPacketFactory.cs index 0a6bba243..9e825ab9d 100644 --- a/Source/MQTTnet/Formatter/MqttPubRecPacketFactory.cs +++ b/Source/MQTTnet/Formatter/MqttPubRecPacketFactory.cs @@ -25,10 +25,7 @@ public MqttPubRecPacket Create(MqttApplicationMessageReceivedEventArgs applicati return pubRecPacket; } - public MqttPacket Create( - MqttPublishPacket publishPacket, - InterceptingPublishEventArgs interceptingPublishEventArgs, - DispatchApplicationMessageResult dispatchApplicationMessageResult) + public MqttPacket Create(MqttPublishPacket publishPacket, DispatchApplicationMessageResult dispatchApplicationMessageResult) { if (publishPacket == null) { @@ -38,21 +35,11 @@ public MqttPubRecPacket Create(MqttApplicationMessageReceivedEventArgs applicati var pubRecPacket = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier, - ReasonCode = MqttPubRecReasonCode.Success + ReasonCode = (MqttPubRecReasonCode)dispatchApplicationMessageResult.ReasonCode, + ReasonString = dispatchApplicationMessageResult.ReasonString, + UserProperties = dispatchApplicationMessageResult.UserProperties }; - if (dispatchApplicationMessageResult.MatchingSubscribersCount == 0) - { - pubRecPacket.ReasonCode = MqttPubRecReasonCode.NoMatchingSubscribers; - } - - if (interceptingPublishEventArgs != null) - { - pubRecPacket.ReasonCode = (MqttPubRecReasonCode)(int)interceptingPublishEventArgs.Response.ReasonCode; - pubRecPacket.ReasonString = interceptingPublishEventArgs.Response.ReasonString; - pubRecPacket.UserProperties = interceptingPublishEventArgs.Response.UserProperties; - } - return pubRecPacket; } diff --git a/Source/MQTTnet/Server/Internal/DispatchApplicationMessageResult.cs b/Source/MQTTnet/Server/Internal/DispatchApplicationMessageResult.cs index bc4637a13..12527f4d7 100644 --- a/Source/MQTTnet/Server/Internal/DispatchApplicationMessageResult.cs +++ b/Source/MQTTnet/Server/Internal/DispatchApplicationMessageResult.cs @@ -2,15 +2,27 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Collections.Generic; +using MQTTnet.Packets; + namespace MQTTnet.Server { public sealed class DispatchApplicationMessageResult { - public DispatchApplicationMessageResult(int matchingSubscribersCount) + public DispatchApplicationMessageResult(int reasonCode, bool closeConnection, string reasonString, List userProperties) { - MatchingSubscribersCount = matchingSubscribersCount; + ReasonCode = reasonCode; + CloseConnection = closeConnection; + ReasonString = reasonString; + UserProperties = userProperties; } - public int MatchingSubscribersCount { get; } + public bool CloseConnection { get; } + + public int ReasonCode { get; } + + public string ReasonString { get; } + + public List UserProperties { get; } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/MqttClient.cs b/Source/MQTTnet/Server/Internal/MqttClient.cs index 6c4b9bc82..4a09cd90f 100644 --- a/Source/MQTTnet/Server/Internal/MqttClient.cs +++ b/Source/MQTTnet/Server/Internal/MqttClient.cs @@ -116,7 +116,7 @@ public async Task RunAsync() var willPublishPacket = MqttPacketFactories.Publish.Create(Session.LatestConnectPacket); var willApplicationMessage = MqttApplicationMessageFactory.Create(willPublishPacket); - _ = _sessionsManager.DispatchApplicationMessage(Id, willApplicationMessage); + _ = _sessionsManager.DispatchApplicationMessage(Id, Session.Items, willApplicationMessage, CancellationToken.None); Session.WillMessageSent = true; _logger.Info("Client '{0}': Published will message", Id); @@ -210,41 +210,17 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella { HandleTopicAlias(publishPacket); - InterceptingPublishEventArgs interceptingPublishEventArgs = null; var applicationMessage = MqttApplicationMessageFactory.Create(publishPacket); - var closeConnection = false; - var processPublish = true; - if (_eventContainer.InterceptingPublishEvent.HasHandlers) - { - interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, Id, Session.Items); - if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic)) - { - // This can happen if a topic alias us used but the topic is - // unknown to the server. - interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid; - interceptingPublishEventArgs.ProcessPublish = false; - } - - await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); - - applicationMessage = interceptingPublishEventArgs.ApplicationMessage; - closeConnection = interceptingPublishEventArgs.CloseConnection; - processPublish = interceptingPublishEventArgs.ProcessPublish; - } + var dispatchApplicationMessageResult = + await _sessionsManager.DispatchApplicationMessage(Id, Session.Items, applicationMessage, cancellationToken).ConfigureAwait(false); - if (closeConnection) + if (dispatchApplicationMessageResult.CloseConnection) { await StopAsync(MqttDisconnectReasonCode.UnspecifiedError); return; } - DispatchApplicationMessageResult dispatchResult = null; - if (processPublish && applicationMessage != null) - { - dispatchResult = await _sessionsManager.DispatchApplicationMessage(Id, applicationMessage).ConfigureAwait(false); - } - switch (publishPacket.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtMostOnce: @@ -254,13 +230,13 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella } case MqttQualityOfServiceLevel.AtLeastOnce: { - var pubAckPacket = MqttPacketFactories.PubAck.Create(publishPacket, interceptingPublishEventArgs, dispatchResult); + var pubAckPacket = MqttPacketFactories.PubAck.Create(publishPacket, dispatchApplicationMessageResult); Session.EnqueueControlPacket(new MqttPacketBusItem(pubAckPacket)); break; } case MqttQualityOfServiceLevel.ExactlyOnce: { - var pubRecPacket = MqttPacketFactories.PubRec.Create(publishPacket, interceptingPublishEventArgs, dispatchResult); + var pubRecPacket = MqttPacketFactories.PubRec.Create(publishPacket, dispatchApplicationMessageResult); Session.EnqueueControlPacket(new MqttPacketBusItem(pubRecPacket)); break; } @@ -478,16 +454,16 @@ async Task ReceivePackagesLoop(CancellationToken cancellationToken) } var logLevel = MqttNetLogLevel.Error; - + if (!IsRunning) { // There was an exception but the connection is already closed. So there is no chance to send a response to the client etc. logLevel = MqttNetLogLevel.Warning; } - + if (currentPacket == null) { - _logger.Publish(logLevel, exception, "Client '{0}': Error while receiving packets", Id); + _logger.Publish(logLevel, exception, "Client '{0}': Error while receiving packets", Id); } else { diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index 75dc3f9f0..a21b6926a 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -126,81 +126,118 @@ public async Task DeleteSessionAsync(string clientId) _logger.Verbose("Session for client '{0}' deleted.", clientId); } - - public async Task DispatchApplicationMessage(string senderId, MqttApplicationMessage applicationMessage) + + public async Task DispatchApplicationMessage( + string senderId, + IDictionary senderSessionItems, + MqttApplicationMessage applicationMessage, + CancellationToken cancellationToken) { - var matchingSubscribersCount = 0; - try + var processPublish = true; + var closeConnection = false; + string reasonString = null; + List userProperties = null; + var reasonCode = 0; // The reason code is later converted into several different but compatible enums! + + // Allow the user to intercept application message... + if (_eventContainer.InterceptingPublishEvent.HasHandlers) { - if (applicationMessage.Retain) - { - await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); - } - - List subscriberSessions; - lock (_sessionsManagementLock) + var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderSessionItems); + if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic)) { - subscriberSessions = _subscriberSessions.ToList(); + // This can happen if a topic alias us used but the topic is + // unknown to the server. + interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid; + interceptingPublishEventArgs.ProcessPublish = false; } - // Calculate application message topic hash once for subscription checks - MqttSubscription.CalculateTopicHash(applicationMessage.Topic, out var topicHash, out _, out _); + await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); - foreach (var session in subscriberSessions) + applicationMessage = interceptingPublishEventArgs.ApplicationMessage; + closeConnection = interceptingPublishEventArgs.CloseConnection; + processPublish = interceptingPublishEventArgs.ProcessPublish; + reasonString = interceptingPublishEventArgs.Response.ReasonString; + userProperties = interceptingPublishEventArgs.Response.UserProperties; + reasonCode = (int)interceptingPublishEventArgs.Response.ReasonCode; + } + + // Process the application message... + if (processPublish && applicationMessage != null) + { + var matchingSubscribersCount = 0; + try { - if(!session.TryCheckSubscriptions( - applicationMessage.Topic, - topicHash, - applicationMessage.QualityOfServiceLevel, - senderId, - out var checkSubscriptionsResult)) + if (applicationMessage.Retain) { - // Checking the subscriptions has failed for the session. The session - // will be ignored. - continue; + await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); } - if (!checkSubscriptionsResult.IsSubscribed) + List subscriberSessions; + lock (_sessionsManagementLock) { - continue; + subscriberSessions = _subscriberSessions.ToList(); } - var newPublishPacket = MqttPacketFactories.Publish.Create(applicationMessage); - newPublishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; - newPublishPacket.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; + // Calculate application message topic hash once for subscription checks + MqttSubscription.CalculateTopicHash(applicationMessage.Topic, out var topicHash, out _, out _); - if (newPublishPacket.QualityOfServiceLevel > 0) + foreach (var session in subscriberSessions) { - newPublishPacket.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); - } + if (!session.TryCheckSubscriptions( + applicationMessage.Topic, + topicHash, + applicationMessage.QualityOfServiceLevel, + senderId, + out var checkSubscriptionsResult)) + { + // Checking the subscriptions has failed for the session. The session + // will be ignored. + continue; + } - if (checkSubscriptionsResult.RetainAsPublished) - { - // Transfer the original retain state from the publisher. This is a MQTTv5 feature. - newPublishPacket.Retain = applicationMessage.Retain; + if (!checkSubscriptionsResult.IsSubscribed) + { + continue; + } + + var publishPacketCopy = MqttPacketFactories.Publish.Create(applicationMessage); + publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; + publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; + + if (publishPacketCopy.QualityOfServiceLevel > 0) + { + publishPacketCopy.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); + } + + if (checkSubscriptionsResult.RetainAsPublished) + { + // Transfer the original retain state from the publisher. This is a MQTTv5 feature. + publishPacketCopy.Retain = applicationMessage.Retain; + } + else + { + publishPacketCopy.Retain = false; + } + + session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy)); + matchingSubscribersCount++; + + _logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic); } - else + + if (matchingSubscribersCount == 0) { - newPublishPacket.Retain = false; + reasonCode = (int)MqttPubAckReasonCode.NoMatchingSubscribers; + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); } - - session.EnqueueDataPacket(new MqttPacketBusItem(newPublishPacket)); - matchingSubscribersCount++; - - _logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic); } - - if (matchingSubscribersCount == 0) + catch (Exception exception) { - await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + _logger.Error(exception, "Unhandled exception while processing next queued application message."); } } - catch (Exception exception) - { - _logger.Error(exception, "Unhandled exception while processing next queued application message."); - } - return new DispatchApplicationMessageResult(matchingSubscribersCount); + return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties); } public void Dispose() @@ -495,11 +532,11 @@ MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter cha { MqttSession oldSession; MqttClient oldClient; - + lock (_sessionsManagementLock) { MqttSession session; - + // Create a new session (if required). if (!_sessions.TryGetValue(connectPacket.ClientId, out oldSession)) { @@ -518,7 +555,7 @@ MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter cha _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); session = oldSession; oldSession = null; - + // Session persistence could change for MQTT 5 clients that reconnect with different SessionExpiryInterval session.IsPersistent = sessionShouldPersist; connAckPacket.IsSessionPresent = true; @@ -527,7 +564,7 @@ MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter cha } _sessions[connectPacket.ClientId] = session; - + // Create a new client (always required). _clients.TryGetValue(connectPacket.ClientId, out oldClient); if (oldClient != null) @@ -536,7 +573,7 @@ MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter cha // for a later DISCONNECT packet. oldClient.IsTakenOver = true; } - + client = CreateClient(connectPacket, channelAdapter, session); _clients[connectPacket.ClientId] = client; @@ -548,19 +585,15 @@ MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter cha var preparingSessionEventArgs = new PreparingSessionEventArgs(); await _eventContainer.PreparingSessionEvent.TryInvokeAsync(preparingSessionEventArgs, _logger).ConfigureAwait(false); } - + if (oldClient != null) { await oldClient.StopAsync(MqttDisconnectReasonCode.SessionTakenOver).ConfigureAwait(false); if (_eventContainer.ClientDisconnectedEvent.HasHandlers) { - var eventArgs = new ClientDisconnectedEventArgs( - oldClient.Id, - MqttClientDisconnectType.Takeover, - oldClient.Endpoint, - oldClient.Session.Items); - + var eventArgs = new ClientDisconnectedEventArgs(oldClient.Id, MqttClientDisconnectType.Takeover, oldClient.Endpoint, oldClient.Session.Items); + await _eventContainer.ClientDisconnectedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false); } } diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 2c5969e7d..6568f0b56 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -19,17 +19,17 @@ namespace MQTTnet.Server { public class MqttServer : Disposable { - readonly MqttServerEventContainer _eventContainer = new MqttServerEventContainer(); - - readonly IDictionary _sessionItems = new ConcurrentDictionary(); readonly ICollection _adapters; readonly MqttClientSessionsManager _clientSessionsManager; + readonly MqttServerEventContainer _eventContainer = new MqttServerEventContainer(); readonly MqttServerKeepAliveMonitor _keepAliveMonitor; readonly MqttNetSourceLogger _logger; readonly MqttServerOptions _options; readonly MqttRetainedMessagesManager _retainedMessagesManager; readonly IMqttNetLogger _rootLogger; + readonly IDictionary _sessionItems = new ConcurrentDictionary(); + CancellationTokenSource _cancellationTokenSource; public MqttServer(MqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) @@ -207,7 +207,7 @@ public Task> GetSessionsAsync() return _clientSessionsManager.GetSessionStatusAsync(); } - public async Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedApplicationMessage) + public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedApplicationMessage, CancellationToken cancellationToken = default) { if (injectedApplicationMessage == null) { @@ -223,34 +223,16 @@ public async Task InjectApplicationMessage(InjectedMqttApplicationMessage inject ThrowIfNotStarted(); - var processPublish = true; - var applicationMessage = injectedApplicationMessage.ApplicationMessage; - - if (_eventContainer.InterceptingPublishEvent.HasHandlers) - { - var interceptingPublishEventArgs = new InterceptingPublishEventArgs( - applicationMessage, - _cancellationTokenSource.Token, - injectedApplicationMessage.SenderClientId, - _sessionItems); - - await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); - - applicationMessage = interceptingPublishEventArgs.ApplicationMessage; - processPublish = interceptingPublishEventArgs.ProcessPublish; - } - - if (!processPublish) - { - return; - } - - if (string.IsNullOrEmpty(applicationMessage.Topic)) + if (string.IsNullOrEmpty(injectedApplicationMessage.ApplicationMessage.Topic)) { throw new NotSupportedException("Injected application messages must contain a topic. Topic alias is not supported."); } - - await _clientSessionsManager.DispatchApplicationMessage(injectedApplicationMessage.SenderClientId, applicationMessage).ConfigureAwait(false); + + return _clientSessionsManager.DispatchApplicationMessage( + injectedApplicationMessage.SenderClientId, + _sessionItems, + injectedApplicationMessage.ApplicationMessage, + cancellationToken); } public async Task StartAsync() diff --git a/Source/MQTTnet/Server/PublishResponse.cs b/Source/MQTTnet/Server/PublishResponse.cs index d6c7c11e6..049861737 100644 --- a/Source/MQTTnet/Server/PublishResponse.cs +++ b/Source/MQTTnet/Server/PublishResponse.cs @@ -14,6 +14,6 @@ public sealed class PublishResponse public string ReasonString { get; set; } - public List UserProperties { get; } + public List UserProperties { get; set; } } } \ No newline at end of file From b1f265ae1861714c95f402960c30383db8f0d7fa Mon Sep 17 00:00:00 2001 From: Christian <6939810+chkr1011@users.noreply.github.com> Date: Tue, 6 Dec 2022 19:38:39 +0100 Subject: [PATCH 2/2] Update ReleaseNotes.md --- .github/workflows/ReleaseNotes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ReleaseNotes.md b/.github/workflows/ReleaseNotes.md index 0b38d44bf..8c4732c3a 100644 --- a/.github/workflows/ReleaseNotes.md +++ b/.github/workflows/ReleaseNotes.md @@ -4,4 +4,5 @@ * [Server] Fix not properly reset statistics (#1587, thanks to @damikun). * [Server] Now using an empty string as the sender client ID for injected application messages (#1583, thanks to @xljiulang). * [Server] Improved memory usage for ASP.NET connections (#1582, thanks to @xljiulang). -* [Server] Improved memory usage and performance for ASP.NET integration (#1596, thanks to @xljiulang). \ No newline at end of file +* [Server] Improved memory usage and performance for ASP.NET integration (#1596, thanks to @xljiulang). +* [Server] Add support for interception of will messages (#1613). \ No newline at end of file