Skip to content

Commit

Permalink
Adding amqps support to the rabbit mq tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartmcgillivray committed Nov 6, 2023
1 parent 1edc452 commit b373a05
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Security.Authentication;
using Elsa.Activities.RabbitMq.Helpers;
using Elsa.ActivityResults;
using Elsa.Attributes;
Expand Down Expand Up @@ -46,6 +47,21 @@ public class RabbitMqMessageReceived : Activity
Order = 2,
Category = PropertyCategories.Configuration)]
public string ConnectionString { get; set; } = default!;


[ActivityInput( Order = 3,Category = PropertyCategories.Configuration)]
public bool EnableSSL { get; set; }

[ActivityInput( Order = 4,Category = PropertyCategories.Configuration)]
public string SSLHost { get; set; }

[ActivityInput(
Order = 5,
Category = PropertyCategories.Configuration,
UIHint = ActivityInputUIHints.CheckList,
DefaultSyntax = SyntaxNames.Json,
Options = new[] { "Ssl2", "Ssl3", "Tls", "Tls11", "Tls12", "Tls13" })]
public IEnumerable<string> SslProtocols { get; set; }

public string ClientId => RabbitMqClientConfigurationHelper.GetClientId(Id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SendRabbitMqMessage(IMessageSenderClientFactory messageSenderClientFactor
Order = 3,
UIHint = ActivityInputUIHints.Dictionary,
DefaultSyntax = SyntaxNames.Json,
SupportedSyntaxes = new[] { SyntaxNames.Json, SyntaxNames.JavaScript })]
SupportedSyntaxes = new[] { SyntaxNames.Json, SyntaxNames.JavaScript })]
public Dictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();

[ActivityInput(
Expand All @@ -61,11 +61,33 @@ public SendRabbitMqMessage(IMessageSenderClientFactory messageSenderClientFactor
Category = PropertyCategories.Configuration)]
public string ConnectionString { get; set; } = default!;

[ActivityInput(
Order = 2,
SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid },
Category = PropertyCategories.Configuration)]
public bool EnableSSL { get; set; }

[ActivityInput(
Order = 3,
SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid },
Category = PropertyCategories.Configuration)]
public string SSLHost { get; set; }

[ActivityInput(
Order = 4,
SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid,SyntaxNames.Json },
Category = PropertyCategories.Configuration,
UIHint = ActivityInputUIHints.CheckList,
DefaultSyntax = SyntaxNames.Json,
Options = new[] { "Ssl2", "Ssl3", "Tls", "Tls11", "Tls12", "Tls13" }
)]
public HashSet<string> SslProtocols { get; set; } = new() { "Tls12", "Tls13" };

public string ClientId => RabbitMqClientConfigurationHelper.GetClientId(Id);

protected override async ValueTask<IActivityExecutionResult> OnExecuteAsync(ActivityExecutionContext context)
{
var config = new RabbitMqBusConfiguration(ConnectionString, ExchangeName, RoutingKey, Headers, ClientId);
var config = new RabbitMqBusConfiguration(ConnectionString, ExchangeName, RoutingKey, Headers, ClientId, EnableSSL, SSLHost, SslProtocols);

var client = await _messageSenderClientFactory.GetSenderAsync(config);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Elsa.Services;
using System.Collections.Generic;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -11,18 +12,24 @@ public MessageReceivedBookmark()
{
}

public MessageReceivedBookmark(string exchangeName, string routingKey, string connectionString, Dictionary<string, string> headers)
public MessageReceivedBookmark(string exchangeName, string routingKey, string connectionString, Dictionary<string, string> headers, bool sslEnabled, string sslHost, IEnumerable<string> sslProtocols)
{
ExchangeName = exchangeName;
RoutingKey = routingKey;
ConnectionString = connectionString;
SslEnabled = sslEnabled;
SSLHost = sslHost;
SslProtocols = sslProtocols;
Headers = headers ?? new Dictionary<string, string>();
}

public string ExchangeName { get; set; } = default!;
public string RoutingKey { get; set; } = default!;
public string ConnectionString { get; set; } = default!;
public bool SslEnabled { get; set; } = default!;
public string SSLHost { get; set; } = default!;
public Dictionary<string, string> Headers { get; set; } = default!;
public IEnumerable<string> SslProtocols { get; set; }
}

public class QueueMessageReceivedBookmarkProvider : BookmarkProvider<MessageReceivedBookmark, RabbitMqMessageReceived>
Expand All @@ -35,7 +42,10 @@ public class QueueMessageReceivedBookmarkProvider : BookmarkProvider<MessageRece
exchangeName: (await context.ReadActivityPropertyAsync(x => x.ExchangeName, cancellationToken))!,
routingKey: (await context.ReadActivityPropertyAsync(x => x.RoutingKey, cancellationToken))!,
connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!,
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!,
sslEnabled:(await context.ReadActivityPropertyAsync(x => x.EnableSSL, cancellationToken))!,
sslHost: (await context.ReadActivityPropertyAsync(x => x.SSLHost, cancellationToken))!,
sslProtocols: (await context.ReadActivityPropertyAsync(x => x.SslProtocols, cancellationToken))!
))
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;


namespace Elsa.Activities.RabbitMq.Configuration
{
Expand All @@ -12,12 +15,19 @@ public class RabbitMqBusConfiguration
public string ClientId { get; }
public bool AutoDeleteQueue { get; }

public RabbitMqBusConfiguration(string connectionString, string exchangeName, string routingKey, Dictionary<string, string> headers, string clientId, bool autoDeleteQueue = false)
public bool EnableSSL { get; }
public string SslHost { get; }
public SslProtocols SslProtocols { get; }

public RabbitMqBusConfiguration(string connectionString, string exchangeName, string routingKey, Dictionary<string, string> headers, string clientId, bool enableSsl, string sslHost, IEnumerable<string> sslProtocols, bool autoDeleteQueue = false)
{
ConnectionString = connectionString;
ExchangeName = exchangeName;
RoutingKey = routingKey;
Headers = headers ?? new Dictionary<string, string>();
EnableSSL = enableSsl;
SslHost = sslHost;
SslProtocols = ResolveSslProtocols(sslProtocols);
ClientId = clientId;
AutoDeleteQueue = autoDeleteQueue;
}
Expand All @@ -30,5 +40,29 @@ public override int GetHashCode()
}

public string TopicFullName => string.IsNullOrEmpty(ExchangeName) ? RoutingKey : $"{RoutingKey}@{ExchangeName}";

public IEnumerable<string> SslProtocolsString => Enum.GetValues(typeof(SslProtocols))
.Cast<SslProtocols>()
.Where(c => SslProtocols.HasFlag(c) && c != SslProtocols.None)
.Select(c => c.ToString());

private SslProtocols ResolveSslProtocols(IEnumerable<string> sslProtocols)
{
var parsed = sslProtocols
.Select(s =>
{
var val = (SslProtocols)Enum.Parse(typeof(System.Security.Authentication.SslProtocols), s);
return val;
}).ToList();

SslProtocols values = SslProtocols.None;

foreach (var sslProtocol in parsed)
{
values |= sslProtocol;
}
return values;
}
}
}
12 changes: 10 additions & 2 deletions src/activities/Elsa.Activities.RabbitMq/Services/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Rebus.RabbitMq;

namespace Elsa.Activities.RabbitMq.Services
{
Expand Down Expand Up @@ -41,7 +43,9 @@ public void SubscribeWithHandler(Func<TransportMessage, CancellationToken, Task>
}))
.Transport(t =>
{
t.UseRabbitMq(Configuration.ConnectionString, Configuration.ClientId).InputQueueOptions(x => x.SetAutoDelete(Configuration.AutoDeleteQueue));
t.UseRabbitMq(Configuration.ConnectionString, Configuration.ClientId)
.InputQueueOptions(x => x.SetAutoDelete(Configuration.AutoDeleteQueue))
.Ssl(new SslSettings(Configuration.EnableSSL, Configuration.SslHost, version: Configuration.SslProtocols));
})
.Start();

Expand All @@ -64,7 +68,11 @@ private void ConfigureAsOneWayClient()
{
_bus = Configure
.With(_activator)
.Transport(t => t.UseRabbitMqAsOneWayClient(Configuration.ConnectionString).InputQueueOptions(o => o.SetAutoDelete(autoDelete: true)))
.Transport(t =>
{
t.UseRabbitMqAsOneWayClient(Configuration.ConnectionString).InputQueueOptions(o => o.SetAutoDelete(autoDelete: true))
.Ssl(new SslSettings(Configuration.EnableSSL, Configuration.SslHost, version: Configuration.SslProtocols));
})
.Start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ private RabbitMqBusConfiguration CreateConfigurationFromBookmark(MessageReceived
var exchangeName = bookmark.ExchangeName;
var routingKey = bookmark.RoutingKey;
var headers = bookmark.Headers;
var enableSsl = bookmark.SslEnabled;
var sslHost = bookmark.SSLHost;
var sslProtocols = bookmark.SslProtocols;
var clientId = RabbitMqClientConfigurationHelper.GetClientId(activityId);

return new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers, clientId);
return new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers, clientId, enableSsl, sslHost, sslProtocols);
}

private async Task DisposeExistingWorkersAsync()
Expand Down
4 changes: 3 additions & 1 deletion src/activities/Elsa.Activities.RabbitMq/Services/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using Microsoft.Extensions.Logging;
using Rebus.Messages;
using System;
using System.Linq;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -50,7 +52,7 @@ private async Task TriggerWorkflowsAsync(TransportMessage message, CancellationT

var config = _client.Configuration;

var bookmark = new MessageReceivedBookmark(config.ExchangeName, config.RoutingKey, config.ConnectionString, config.Headers);
var bookmark = new MessageReceivedBookmark(config.ExchangeName, config.RoutingKey, config.ConnectionString, config.Headers, config.EnableSSL, config.SslHost, config.SslProtocolsString);
var launchContext = new WorkflowsQuery(ActivityType, bookmark);

using var scope = _scopeFactory.CreateScope();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ private async IAsyncEnumerable<RabbitMqBusConfiguration> GetConfigurationsAsync(
var routingKey = await activity.EvaluatePropertyValueAsync(x => x.RoutingKey, cancellationToken);
var exchangeName = await activity.EvaluatePropertyValueAsync(x => x.ExchangeName, cancellationToken);
var headers = await activity.EvaluatePropertyValueAsync(x => x.Headers, cancellationToken);
var enableSSL = await activity.EvaluatePropertyValueAsync(x => x.EnableSSL, cancellationToken);
var sslHost = await activity.EvaluatePropertyValueAsync(x => x.SSLHost, cancellationToken);
var sslProtocols = await activity.EvaluatePropertyValueAsync(x => x.SslProtocols, cancellationToken);
var clientId = RabbitMqClientConfigurationHelper.GetTestClientId(activity.ActivityBlueprint.Id);

var config = new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers!, clientId, autoDeleteQueue: true);
var config = new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers!, clientId, enableSSL, sslHost, sslProtocols, autoDeleteQueue: true);

yield return config;
}
Expand Down

0 comments on commit b373a05

Please sign in to comment.