Skip to content

Commit

Permalink
Fixed per instance distributed messaging (#5154)
Browse files Browse the repository at this point in the history
- Configured MassTransitWorkflowCancellationDispatcher
- Fixed RabbitMQ endpoint configuration
- Fixed consumer configuration when running InMemory
  • Loading branch information
raymonddenhaan committed Mar 29, 2024
1 parent c3686bf commit 20f0134
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public override void Configure()
RegisterConsumers(consumers);
configure.AddServiceBusMessageScheduler();
// Consumers need to be added before the UsingAzureServiceBus statement to prevent exceptions.
foreach (var consumer in temporaryConsumers)
configure.AddConsumer(consumer.ConsumerType).ExcludeFromConfigureEndpoints();
configure.UsingAzureServiceBus((context, configurator) =>
{
Expand All @@ -72,13 +76,11 @@ public override void Configure()
configurator.UseServiceBusMessageScheduler();
configurator.SetupWorkflowDispatcherEndpoints(context);
ConfigureServiceBus?.Invoke(configurator);
configurator.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("Elsa", false));
var options = context.GetRequiredService<IOptions<MassTransitWorkflowDispatcherOptions>>().Value;
var instanceNameProvider = context.GetRequiredService<IApplicationInstanceNameProvider>();
foreach (var consumer in temporaryConsumers)
{
configure.AddConsumer(consumer.ConsumerType).ExcludeFromConfigureEndpoints();
configurator.ReceiveEndpoint($"Elsa-{instanceNameProvider.GetName()}-{consumer.Name}", endpointConfigurator =>
{
endpointConfigurator.AutoDeleteOnIdle = options.TemporaryQueueTtl ?? TimeSpan.FromHours(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ public override void Configure()
{
massTransitFeature.BusConfigurator = configure =>
{
var tempConsumers = massTransitFeature.GetConsumers()
var temporaryConsumers = massTransitFeature.GetConsumers()
.Where(c => c.IsTemporary)
.ToList();
// Consumers need to be added before the UsingRabbitMq statement to prevent exceptions.
foreach (var consumer in temporaryConsumers)
configure.AddConsumer(consumer.ConsumerType).ExcludeFromConfigureEndpoints();
configure.UsingRabbitMq((context, configurator) =>
{
var options = context.GetRequiredService<IOptions<MassTransitWorkflowDispatcherOptions>>().Value;
Expand All @@ -58,16 +62,17 @@ public override void Configure()
ConfigureServiceBus?.Invoke(configurator);
foreach (var consumer in tempConsumers)
foreach (var consumer in temporaryConsumers)
{
configure.AddConsumer(consumer.ConsumerType).ExcludeFromConfigureEndpoints();
configurator.ReceiveEndpoint($"{instanceNameProvider.GetName()}-{consumer.Name}",
configurator =>
endpointConfigurator =>
{
configurator.QueueExpiration = options.TemporaryQueueTtl ?? TimeSpan.FromHours(1);
configurator.ConcurrentMessageLimit = options.ConcurrentMessageLimit;
configurator.ConfigureConsumer<DispatchCancelWorkflowsRequestConsumer>(context);
endpointConfigurator.QueueExpiration = options.TemporaryQueueTtl ?? TimeSpan.FromHours(1);
endpointConfigurator.ConcurrentMessageLimit = options.ConcurrentMessageLimit;
endpointConfigurator.ConfigureConsumer(context, consumer.ConsumerType);
});
}
Expand Down
5 changes: 4 additions & 1 deletion src/modules/Elsa.MassTransit/Features/MassTransitFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace Elsa.MassTransit.Features;
/// </summary>
public class MassTransitFeature : FeatureBase
{
private bool _runInMemory;

/// <inheritdoc />
public MassTransitFeature(IModule module) : base(module)
{
Expand All @@ -48,6 +50,7 @@ public override void Apply()

Services.Configure<MassTransitWorkflowDispatcherOptions>(x => { });
Services.AddActivityProvider<MassTransitActivityTypeProvider>();
_runInMemory = BusConfigurator is null;
var busConfigurator = BusConfigurator ??= ConfigureInMemoryTransport;
AddMassTransit(busConfigurator);

Expand Down Expand Up @@ -81,7 +84,7 @@ private void AddMassTransit(Action<IBusRegistrationConfigurator> busConfigurator

// Concatenate the manually registered consumers with the workflow message consumers.
var consumerTypeDefinitions = this.GetConsumers()
.Where(c => !c.IsTemporary)
.Where(c => !c.IsTemporary || _runInMemory)
.Concat(workflowMessageConsumers)
.ToArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@ public override void Configure()
{
Module.AddMassTransitConsumer<DispatchWorkflowRequestConsumer, DispatchWorkflowRequestConsumerDefinition>();
Module.AddMassTransitConsumer<DispatchCancelWorkflowsRequestConsumer>("elsa-dispatch-cancel-workflow", true);
Module.Configure<WorkflowRuntimeFeature>(f => f.WorkflowDispatcher = sp =>
Module.Configure<WorkflowRuntimeFeature>(f =>
{
var decoratedService = ActivatorUtilities.CreateInstance<MassTransitWorkflowDispatcher>(sp);
return ActivatorUtilities.CreateInstance<ValidatingWorkflowDispatcher>(sp, decoratedService);
f.WorkflowDispatcher = sp =>
{
var decoratedService = ActivatorUtilities.CreateInstance<MassTransitWorkflowDispatcher>(sp);
return ActivatorUtilities.CreateInstance<ValidatingWorkflowDispatcher>(sp, decoratedService);
};
f.WorkflowCancellationDispatcher = sp => sp.GetRequiredService<MassTransitWorkflowCancellationDispatcher>();
});
}

Expand All @@ -58,5 +63,6 @@ public override void Apply()
options.Configure(ConfigureDispatcherOptions);

Services.AddSingleton(ChannelQueueFormatterFactory);
Services.AddScoped<MassTransitWorkflowCancellationDispatcher>();
}
}

0 comments on commit 20f0134

Please sign in to comment.