diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 14d30026e..b1fd0b6db 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -38,6 +38,31 @@ public async Task Inject_Application_Message_At_Session_Level() } } + [TestMethod] + public async Task Inject_ApplicationMessage_To_Client_At_Server_Level() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServer(); + + var receiver = await testEnvironment.ConnectClient(); + + var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + + await receiver.SubscribeAsync("#"); + + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + + var serverEx = (IMqttServerExtensibility)server; + await serverEx.MqttClientSessionsManager.DispatchApplicationMessageToClient(receiver.Options.ClientId, "InjectionSender", server.ServerSessionItems, injectedApplicationMessage, default); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count); + Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); + } + } + [TestMethod] public async Task Inject_ApplicationMessage_At_Server_Level() { diff --git a/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs b/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs new file mode 100644 index 000000000..30b66a5e8 --- /dev/null +++ b/Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Text; + +namespace MQTTnet.Server +{ + public interface IMqttServerExtensibility + { + + MqttClientSessionsManager MqttClientSessionsManager { get; } + + } +} diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index e3dee096e..8ba505194 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -240,6 +240,132 @@ public async Task DispatchApplicationMessage( return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties); } + public async Task DispatchApplicationMessageToClient( + string clientId, + string senderId, + IDictionary senderSessionItems, + MqttApplicationMessage applicationMessage, + CancellationToken cancellationToken) + { + var processPublish = true; + var closeConnection = false; + string reasonString = null; + List userProperties = null; + var reasonCode = 0; // The reason code is later converted into several different but compatible enums! + + // Allow the user to intercept application message... + if (_eventContainer.InterceptingPublishEvent.HasHandlers) + { + var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderSessionItems); + if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic)) + { + // This can happen if a topic alias us used but the topic is + // unknown to the server. + interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid; + interceptingPublishEventArgs.ProcessPublish = false; + } + + await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); + + applicationMessage = interceptingPublishEventArgs.ApplicationMessage; + closeConnection = interceptingPublishEventArgs.CloseConnection; + processPublish = interceptingPublishEventArgs.ProcessPublish; + reasonString = interceptingPublishEventArgs.Response.ReasonString; + userProperties = interceptingPublishEventArgs.Response.UserProperties; + reasonCode = (int)interceptingPublishEventArgs.Response.ReasonCode; + } + + // Process the application message... + if (processPublish && applicationMessage != null) + { + try + { + if (applicationMessage.Retain) + { + await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); + } + + MqttSession session; + bool foundSession; + lock (_sessionsManagementLock) + { + foundSession = _sessions.TryGetValue(clientId, out session); + } + if (!foundSession) + { + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + + return new DispatchApplicationMessageResult( + (int)MqttPubAckReasonCode.NoMatchingSubscribers, + false, + reasonString, + userProperties); + } + + // Calculate application message topic hash once for subscription checks + MqttSubscription.CalculateTopicHash(applicationMessage.Topic, out var topicHash, out _, out _); + + if (!session.TryCheckSubscriptions( + applicationMessage.Topic, + topicHash, + applicationMessage.QualityOfServiceLevel, + senderId, + out var checkSubscriptionsResult)) + { + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + + // Checking the subscriptions has failed for the session. The session + // will be ignored. + return new DispatchApplicationMessageResult( + (int)MqttPubAckReasonCode.NoMatchingSubscribers, + false, + reasonString, + userProperties); + } + + if (!checkSubscriptionsResult.IsSubscribed) + { + await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false); + + return new DispatchApplicationMessageResult( + (int)MqttPubAckReasonCode.NoMatchingSubscribers, + false, + reasonString, + userProperties); + } + + var publishPacketCopy = MqttPacketFactories.Publish.Create(applicationMessage); + publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; + publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; + + if (publishPacketCopy.QualityOfServiceLevel > 0) + { + publishPacketCopy.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); + } + + if (checkSubscriptionsResult.RetainAsPublished) + { + // Transfer the original retain state from the publisher. This is a MQTTv5 feature. + publishPacketCopy.Retain = applicationMessage.Retain; + } + else + { + publishPacketCopy.Retain = false; + } + + session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy)); + + _logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic); + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while processing next queued application message."); + } + } + + return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties); + } + public void Dispose() { _createConnectionSyncRoot.Dispose(); diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index d84826140..e1a659661 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Server { - public class MqttServer : Disposable + public class MqttServer : Disposable, IMqttServerExtensibility { readonly ICollection _adapters; readonly MqttClientSessionsManager _clientSessionsManager; @@ -165,6 +165,8 @@ public event Func ValidatingConnectionAsync public bool IsStarted => _cancellationTokenSource != null; + MqttClientSessionsManager IMqttServerExtensibility.MqttClientSessionsManager => _clientSessionsManager; + /// /// Gives access to the session items which belong to this server. This session items are passed /// to several events instead of the client session items if the event is caused by the server instead of a client. @@ -232,7 +234,7 @@ public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedAppl throw new NotSupportedException("Injected application messages must contain a topic. Topic alias is not supported."); } - var sessionItems = injectedApplicationMessage.CustomSessionItems ?? ServerSessionItems; + var sessionItems = ServerSessionItems; return _clientSessionsManager.DispatchApplicationMessage( injectedApplicationMessage.SenderClientId,