Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added server extensibility interface #1640

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions Source/MQTTnet.Tests/Server/Injection_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@ public async Task Inject_Application_Message_At_Session_Level()
}
}

[TestMethod]
public async Task Inject_ApplicationMessage_To_Client_At_Server_Level()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();

var receiver = await testEnvironment.ConnectClient();

var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver);

await receiver.SubscribeAsync("#");

var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build();

var serverEx = (IMqttServerExtensibility)server;
await serverEx.MqttClientSessionsManager.DispatchApplicationMessageToClient(receiver.Options.ClientId, "InjectionSender", server.ServerSessionItems, injectedApplicationMessage, default);

await LongTestDelay();

Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count);
Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic);
}
}

[TestMethod]
public async Task Inject_ApplicationMessage_At_Server_Level()
{
Expand Down
14 changes: 14 additions & 0 deletions Source/MQTTnet/Server/Internal/IMqttServerExtensibility.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;

namespace MQTTnet.Server
{
public interface IMqttServerExtensibility
{

MqttClientSessionsManager MqttClientSessionsManager { get; }

}
}
126 changes: 126 additions & 0 deletions Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,132 @@ public async Task DeleteSessionAsync(string clientId)
return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties);
}

public async Task<DispatchApplicationMessageResult> DispatchApplicationMessageToClient(
string clientId,
string senderId,
IDictionary senderSessionItems,
MqttApplicationMessage applicationMessage,
CancellationToken cancellationToken)
{
var processPublish = true;
var closeConnection = false;
string reasonString = null;
List<MqttUserProperty> userProperties = null;
var reasonCode = 0; // The reason code is later converted into several different but compatible enums!

// Allow the user to intercept application message...
if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{
var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderSessionItems);
if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic))
{
// This can happen if a topic alias us used but the topic is
// unknown to the server.
interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid;
interceptingPublishEventArgs.ProcessPublish = false;
}

await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false);

applicationMessage = interceptingPublishEventArgs.ApplicationMessage;
closeConnection = interceptingPublishEventArgs.CloseConnection;
processPublish = interceptingPublishEventArgs.ProcessPublish;
reasonString = interceptingPublishEventArgs.Response.ReasonString;
userProperties = interceptingPublishEventArgs.Response.UserProperties;
reasonCode = (int)interceptingPublishEventArgs.Response.ReasonCode;
}

// Process the application message...
if (processPublish && applicationMessage != null)
{
try
{
if (applicationMessage.Retain)
{
await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false);
}

MqttSession session;
bool foundSession;
lock (_sessionsManagementLock)
{
foundSession = _sessions.TryGetValue(clientId, out session);
}
if (!foundSession)
{
await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false);

return new DispatchApplicationMessageResult(
(int)MqttPubAckReasonCode.NoMatchingSubscribers,
false,
reasonString,
userProperties);
}

// Calculate application message topic hash once for subscription checks
MqttSubscription.CalculateTopicHash(applicationMessage.Topic, out var topicHash, out _, out _);

if (!session.TryCheckSubscriptions(
applicationMessage.Topic,
topicHash,
applicationMessage.QualityOfServiceLevel,
senderId,
out var checkSubscriptionsResult))
{
await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false);

// Checking the subscriptions has failed for the session. The session
// will be ignored.
return new DispatchApplicationMessageResult(
(int)MqttPubAckReasonCode.NoMatchingSubscribers,
false,
reasonString,
userProperties);
}

if (!checkSubscriptionsResult.IsSubscribed)
{
await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false);

return new DispatchApplicationMessageResult(
(int)MqttPubAckReasonCode.NoMatchingSubscribers,
false,
reasonString,
userProperties);
}

var publishPacketCopy = MqttPacketFactories.Publish.Create(applicationMessage);
publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;
publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers;

if (publishPacketCopy.QualityOfServiceLevel > 0)
{
publishPacketCopy.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier();
}

if (checkSubscriptionsResult.RetainAsPublished)
{
// Transfer the original retain state from the publisher. This is a MQTTv5 feature.
publishPacketCopy.Retain = applicationMessage.Retain;
}
else
{
publishPacketCopy.Retain = false;
}

session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));

_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic);
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while processing next queued application message.");
}
}

return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties);
}

public void Dispose()
{
_createConnectionSyncRoot.Dispose();
Expand Down
6 changes: 4 additions & 2 deletions Source/MQTTnet/Server/MqttServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace MQTTnet.Server
{
public class MqttServer : Disposable
public class MqttServer : Disposable, IMqttServerExtensibility
{
readonly ICollection<IMqttServerAdapter> _adapters;
readonly MqttClientSessionsManager _clientSessionsManager;
Expand Down Expand Up @@ -165,6 +165,8 @@ public MqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> ada

public bool IsStarted => _cancellationTokenSource != null;

MqttClientSessionsManager IMqttServerExtensibility.MqttClientSessionsManager => _clientSessionsManager;

/// <summary>
/// Gives access to the session items which belong to this server. This session items are passed
/// to several events instead of the client session items if the event is caused by the server instead of a client.
Expand Down Expand Up @@ -232,7 +234,7 @@ public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedAppl
throw new NotSupportedException("Injected application messages must contain a topic. Topic alias is not supported.");
}

var sessionItems = injectedApplicationMessage.CustomSessionItems ?? ServerSessionItems;
var sessionItems = ServerSessionItems;

return _clientSessionsManager.DispatchApplicationMessage(
injectedApplicationMessage.SenderClientId,
Expand Down