Skip to content

Commit

Permalink
Elsa.Activities.Mqtt.Activities.MqttMessage substitute MQTTnet.MqttAp…
Browse files Browse the repository at this point in the history
…plicationMessage problem of serialization (#4675)

some classes bcome sealed for better performance.
  • Loading branch information
mircotamburini committed Dec 26, 2023
1 parent a2384ba commit b719d4b
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 30 deletions.
10 changes: 10 additions & 0 deletions src/activities/Elsa.Activities.Mqtt/Activities/MqttMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Elsa.Activities.Mqtt.Activities
{
public record MqttMessage ( string Topic, string Payload);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Services.Models;
using MQTTnet;


namespace Elsa.Activities.Mqtt.Activities.MqttMessageReceived
Expand Down Expand Up @@ -34,10 +33,10 @@ public MqttMessageReceived(IMessageReceiverClientFactory messageReceiver)

private IActivityExecutionResult ExecuteInternalAsync(ActivityExecutionContext context)
{
if (context.Input != null && context.Input.GetType() == typeof(MqttApplicationMessage))
if (context.Input != null && context.Input.GetType() == typeof(MqttMessage))
{
var message = (MqttApplicationMessage)context.Input;
Output = System.Text.Encoding.UTF8.GetString(message.Payload);
var message = (MqttMessage)context.Input;
Output = message.Payload;
TopicReceived = message.Topic;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Elsa.Activities.Mqtt.Options
{
public class MqttClientOptions
public sealed class MqttClientOptions
{
public string Topic { get; }
public string Host { get; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using Microsoft.Extensions.DependencyInjection;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Activities.Mqtt.Services
{
public class BusClientFactory : IMessageReceiverClientFactory, IMessageSenderClientFactory
public sealed class BusClientFactory : IMessageReceiverClientFactory, IMessageSenderClientFactory
{
private readonly IServiceProvider _serviceProvider;
private readonly IDictionary<int, IMqttClientWrapper> _senders = new Dictionary<int, IMqttClientWrapper>();
Expand Down Expand Up @@ -53,13 +52,13 @@ public async Task<IMqttClientWrapper> GetReceiverAsync(Options.MqttClientOptions
{
return messageReceiverDateTime;
}


var mqttFactory = new MqttFactory();
var newClient = mqttFactory.CreateMqttClient();
var newMessageReceiver = ActivatorUtilities.CreateInstance<MqttClientWrapper>(_serviceProvider, newClient, options);

_receivers.Add(options.GetHashCode(), newMessageReceiver );
_receivers.Add(options.GetHashCode(), newMessageReceiver);
return newMessageReceiver;
}
finally
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Elsa.Activities.Mqtt.Options;
using MQTTnet;
using Elsa.Activities.Mqtt.Activities;
using MQTTnet.Client;
using System;
using System.Threading.Tasks;
Expand All @@ -11,6 +10,6 @@ public interface IMqttClientWrapper : IDisposable
IMqttClient Client { get; }
Options.MqttClientOptions Options { get; }
Task PublishMessageAsync(string topic, string message);
Task SetMessageHandlerAsync(Func<MqttApplicationMessage, Task> handler);
Task SetMessageHandlerAsync(Func<MqttMessage, Task> handler);
}
}
19 changes: 13 additions & 6 deletions src/activities/Elsa.Activities.Mqtt/Services/MqttClientWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Activities.Mqtt.Activities;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
Expand All @@ -10,10 +11,10 @@

namespace Elsa.Activities.Mqtt.Services
{
public class MqttClientWrapper : IMqttClientWrapper
public sealed class MqttClientWrapper : IMqttClientWrapper
{
private readonly ILogger _logger;
private Func<MqttApplicationMessage, Task>? _messageHandler;
private Func<MqttMessage, Task>? _messageHandler;
private readonly SemaphoreSlim _semaphore = new(1);
public IMqttClient Client { get; }
public Options.MqttClientOptions Options { get; }
Expand All @@ -25,8 +26,8 @@ public MqttClientWrapper(IMqttClient client, Options.MqttClientOptions options,
_logger = logger;
}

private async Task SubscribeAsync(string topic, Func<MqttApplicationMessage, Task> handler)

private async Task SubscribeAsync(string topic, Func<MqttMessage, Task> handler)
{
if (!Client.IsConnected)
{
Expand All @@ -39,7 +40,13 @@ private async Task SubscribeAsync(string topic, Func<MqttApplicationMessage, Tas
Client.ApplicationMessageReceivedAsync += async e =>
{
if (_messageHandler != null)
await _messageHandler(e.ApplicationMessage);
{
var appmsg = e.ApplicationMessage;
var msg = new MqttMessage(
appmsg.Topic,
appmsg.Payload == null ? null : System.Text.Encoding.UTF8.GetString(appmsg.Payload));
await _messageHandler(msg);
}
else
_logger.LogWarning("Attempted to subscribe to topic {Topic}, but no message handler was set.", Options.Topic);
};
Expand All @@ -57,7 +64,7 @@ public async Task PublishMessageAsync(string topic, string message)
await DisconnectAsync();
}

public async Task SetMessageHandlerAsync(Func<MqttApplicationMessage, Task> handler)
public async Task SetMessageHandlerAsync(Func<MqttMessage, Task> handler)
{
await SubscribeAsync(Options.Topic, handler);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Elsa.Activities.Mqtt.Bookmarks;
using Elsa.Activities.Mqtt.Options;
using Elsa.Models;
using Elsa.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand All @@ -8,22 +10,18 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Elsa.Activities.Mqtt.Bookmarks;
using Elsa.Models;
using Elsa.Services.Models;
using Elsa.Services.WorkflowStorage;

namespace Elsa.Activities.Mqtt.Services
{
public class MqttTopicsStarter : IMqttTopicsStarter
public sealed class MqttTopicsStarter : IMqttTopicsStarter
{
private readonly SemaphoreSlim _semaphore = new(1);
private readonly IMessageReceiverClientFactory _receiverFactory;
private readonly IBookmarkSerializer _bookmarkSerializer;
private readonly IServiceScopeFactory _scopeFactory;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<MqttTopicsStarter> _logger;
private readonly IDictionary<int,Worker> _workers;
private readonly IDictionary<int, Worker> _workers;

public MqttTopicsStarter(
IMessageReceiverClientFactory receiverFactory,
Expand Down Expand Up @@ -127,7 +125,7 @@ public async IAsyncEnumerable<MqttClientOptions> GetConfigurationsAsync([Enumera

private MqttClientOptions CreateConfigurationFromBookmark(MessageReceivedBookmark bookmark, string activityId)
{
return new MqttClientOptions(bookmark.Topic,bookmark.Host,bookmark.Port,bookmark.Username,bookmark.Password,bookmark.Qos);
return new MqttClientOptions(bookmark.Topic, bookmark.Host, bookmark.Port, bookmark.Username, bookmark.Password, bookmark.Qos);
}

private async Task DisposeExistingWorkersAsync()
Expand All @@ -141,6 +139,6 @@ private async Task DisposeExistingWorkersAsync()

private async Task DisposeReceiverAsync(IMqttClientWrapper messageReceiver) => await _receiverFactory.DisposeReceiverAsync(messageReceiver);


}
}
8 changes: 4 additions & 4 deletions src/activities/Elsa.Activities.Mqtt/Services/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
using Elsa.Activities.Mqtt.Activities;
using Elsa.Activities.Mqtt.Activities.MqttMessageReceived;
using Elsa.Activities.Mqtt.Bookmarks;
using Elsa.Activities.Mqtt.Options;
using Elsa.Models;
using Elsa.Services;
using Elsa.Services.Models;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Activities.Mqtt.Services
{
public class Worker
public sealed class Worker
{
private readonly Func<IMqttClientWrapper, Task> _disposeReceiverAction;
private readonly IMqttClientWrapper _receiverClient;
Expand Down Expand Up @@ -40,7 +40,7 @@ public void Ping()

private IBookmark CreateBookmark(MqttClientOptions options) => new MessageReceivedBookmark(options.Topic, options.Host, options.Port, options.Username, options.Password, options.QualityOfService);

private async Task TriggerWorkflowsAsync(MqttApplicationMessage message, CancellationToken cancellationToken)
private async Task TriggerWorkflowsAsync(MqttMessage message, CancellationToken cancellationToken)
{
var bookmark = CreateBookmark(_receiverClient.Options);
var launchContext = new WorkflowsQuery(ActivityType, bookmark);
Expand All @@ -49,7 +49,7 @@ private async Task TriggerWorkflowsAsync(MqttApplicationMessage message, Cancell
await workflowLaunchpad.CollectAndDispatchWorkflowsAsync(launchContext, new WorkflowInput(message), cancellationToken);
}

private async Task OnMessageReceived(MqttApplicationMessage message) => await TriggerWorkflowsAsync(message, CancellationToken.None);
private async Task OnMessageReceived(MqttMessage message) => await TriggerWorkflowsAsync(message, CancellationToken.None);

}
}

0 comments on commit b719d4b

Please sign in to comment.