Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async workflow state exporter for DB workflow instance #3291

Merged
merged 3 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ packages/

#Rider
.idea
.run

#wwwroot
wwwroot/
Expand All @@ -59,6 +60,7 @@ App_Data/
*.db
*.db-shm
*.db-wal
.DS_Store

# Fody - auto-generated XML schema
FodyWeavers.xsd
Expand Down
5 changes: 1 addition & 4 deletions Elsa.sln
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.Console3", "sr
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "worker", "worker", "{389D40B8-005F-46A1-9493-1FE6065F04FD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.Worker1", "src\samples\Elsa.Samples.Worker1\Elsa.Samples.Worker1.csproj", "{197223FB-2472-442C-BD10-2E1F931285BC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Persistence.EntityFrameworkCore", "src\modules\Elsa.Persistence.EntityFrameworkCore\Elsa.Persistence.EntityFrameworkCore.csproj", "{7CC95512-7F44-45C3-A669-95B7048D4730}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Persistence.EntityFrameworkCore.Sqlite", "src\modules\Elsa.Persistence.EntityFrameworkCore.Sqlite\Elsa.Persistence.EntityFrameworkCore.Sqlite.csproj", "{CAC22763-4A1C-4FC5-AC0D-6148927596D6}"
Expand Down Expand Up @@ -282,8 +280,6 @@ Global
{26087ED2-9249-4FF3-8869-C11C405E441A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{517B703A-7653-4036-AAB5-7B4293D39D0F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{517B703A-7653-4036-AAB5-7B4293D39D0F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{197223FB-2472-442C-BD10-2E1F931285BC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{197223FB-2472-442C-BD10-2E1F931285BC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CC95512-7F44-45C3-A669-95B7048D4730}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7CC95512-7F44-45C3-A669-95B7048D4730}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7CC95512-7F44-45C3-A669-95B7048D4730}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -343,5 +339,6 @@ Global
{26087ED2-9249-4FF3-8869-C11C405E441A} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79}
{7CC95512-7F44-45C3-A669-95B7048D4730} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79}
{CAC22763-4A1C-4FC5-AC0D-6148927596D6} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79}
{389D40B8-005F-46A1-9493-1FE6065F04FD} = {155227F0-A33B-40AA-A4B4-06F813EB921B}
EndGlobalSection
EndGlobal
7 changes: 6 additions & 1 deletion src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
using Elsa.Workflows.Management.Extensions;
using Elsa.Workflows.Management.Services;
using Elsa.Workflows.Runtime.Extensions;
using Elsa.Workflows.Runtime.Implementations;
using Elsa.WorkflowServer.Web.Jobs;
using FastEndpoints;
using Microsoft.AspNetCore.Authentication.JwtBearer;
Expand Down Expand Up @@ -68,7 +69,11 @@
identity.IdentityOptions = options => identitySection.Bind(options);
})
//.UseRuntime(runtime => runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString))))
.UseRuntime(runtime => runtime.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)))
.UseRuntime(runtime =>
{
runtime.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString));
runtime.WorkflowStateExporter = sp => sp.GetRequiredService<AsyncWorkflowStateExporter>();
})
.UseLabels(labels => labels.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)))
.UseActivityDefinitions(feature => feature.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)))
.UseJobActivities()
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.JavaScript/Elsa.JavaScript.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Jint" Version="3.0.0-beta-2083" />
<PackageReference Include="Jint" Version="3.0.0-preview-328" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Workflows.Runtime.Commands;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Services;
using MassTransit;
Expand Down
1 change: 1 addition & 0 deletions src/modules/Elsa.Mediator/Elsa.Mediator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
using Elsa.Features.Services;
using Elsa.Mediator.Features;
using Elsa.Mediator.HostedServices;
using Elsa.Mediator.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Elsa.Mediator.Extensions;

Expand All @@ -24,8 +27,10 @@ public static IModule AddMediator(this IModule module, Action<MediatorFeature>?
return module;
}

public static IServiceCollection AddMediator(this IServiceCollection services)
{
public static IServiceCollection AddMediator(this IServiceCollection services, Action<MediatorOptions>? configure = default)
{
services.Configure(configure ?? (_ => { }));

return services
.AddSingleton<IMediator, DefaultMediator>()
.AddSingleton<IRequestSender>(sp => sp.GetRequiredService<IMediator>())
Expand All @@ -36,13 +41,38 @@ public static IServiceCollection AddMediator(this IServiceCollection services)
.AddSingleton<IRequestPipeline, RequestPipeline>()
.AddSingleton<ICommandPipeline, CommandPipeline>()
.AddSingleton<INotificationPipeline, NotificationPipeline>()
.AddHostedService<BackgroundCommandSenderHostedService>()
.AddHostedService<BackgroundEventPublisherHostedService>()

.AddHostedService(sp =>
{
var options = sp.GetRequiredService<IOptions<MediatorOptions>>().Value;
return ActivatorUtilities.CreateInstance<BackgroundCommandSenderHostedService>(sp, options.CommandWorkerCount);
})

.AddHostedService(sp =>
{
var options = sp.GetRequiredService<IOptions<MediatorOptions>>().Value;
return ActivatorUtilities.CreateInstance<BackgroundEventPublisherHostedService>(sp, options.NotificationWorkerCount);
})

.CreateChannel<ICommand>()
.CreateChannel<INotification>()
;
}

public static IServiceCollection AddConsumer<T, TConsumer>(this IServiceCollection services, int workers = 1) where TConsumer : class, IConsumer<T>
{
services.AddSingleton<IConsumer<T>, TConsumer>();

services.AddHostedService(sp =>
{
var channel = Channel.CreateUnbounded<T>();
var consumer = sp.GetRequiredService<IConsumer<T>>();
var logger = sp.GetRequiredService<ILogger<MessageProcessorHostedService<T>>>();
return new MessageProcessorHostedService<T>(workers, channel, consumer, logger);
});
return services;
}

public static IServiceCollection AddCommandHandler<THandler, TCommand>(this IServiceCollection services)
where THandler : class, ICommandHandler<TCommand>
where TCommand : ICommand<Unit> =>
Expand Down Expand Up @@ -88,10 +118,9 @@ public static IServiceCollection AddHandlersFrom(this IServiceCollection service
public static IServiceCollection CreateChannel<T>(this IServiceCollection services) =>
services
.AddSingleton(CreateChannel<T>())
.AddSingleton(CreateChannelReader<T>)
.AddSingleton(CreateChannelWriter<T>);
.AddTransient(CreateChannelReader<T>)
.AddTransient(CreateChannelWriter<T>);


private static IServiceCollection AddHandlersFromInternal<TService, TMarker>(this IServiceCollection services) => services.AddHandlersFromInternal<TService>(typeof(TMarker));
private static IServiceCollection AddHandlersFromInternal<TService>(this IServiceCollection services, Type assemblyMarkerType) => services.AddHandlersFromInternal<TService>(assemblyMarkerType.Assembly);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,57 @@ namespace Elsa.Mediator.HostedServices;
/// </summary>
public class BackgroundCommandSenderHostedService : BackgroundService
{
private readonly int _workerCount;
private readonly ChannelReader<ICommand> _channelReader;
private readonly ICommandSender _commandSender;
private readonly IList<Channel<ICommand>> _outputs;
private readonly ILogger _logger;

public BackgroundCommandSenderHostedService(ChannelReader<ICommand> channelReader, ICommandSender commandSender, ILogger<BackgroundCommandSenderHostedService> logger)
public BackgroundCommandSenderHostedService(int workerCount, ChannelReader<ICommand> channelReader, ICommandSender commandSender, ILogger<BackgroundCommandSenderHostedService> logger)
{
_workerCount = workerCount;
_channelReader = channelReader;
_commandSender = commandSender;
_logger = logger;
_outputs = new List<Channel<ICommand>>(workerCount);
}

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
await foreach (var request in _channelReader.ReadAllAsync(cancellationToken))
var index = 0;

for (var i = 0; i < _workerCount; i++)
{
var output = Channel.CreateUnbounded<ICommand>();
_outputs.Add(output);
_ = ReadOutputAsync(output, cancellationToken);
}

await foreach (var command in _channelReader.ReadAllAsync(cancellationToken))
{
var output = _outputs[index];
await output.Writer.WriteAsync(command, cancellationToken);
index = (index + 1) % _workerCount;
}

foreach (var output in _outputs)
{
output.Writer.Complete();
}
}

private async Task ReadOutputAsync(Channel<ICommand> output, CancellationToken cancellationToken)
{
await foreach (var command in output.Reader.ReadAllAsync(cancellationToken))
{
try
{
await _commandSender.ExecuteAsync(request, cancellationToken);
await _commandSender.ExecuteAsync(command, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "An unhandled exception occured while processing the queue");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,44 @@ public class BackgroundEventPublisherHostedService : BackgroundService
{
private readonly ChannelReader<INotification> _channelReader;
private readonly IEventPublisher _eventPublisher;
private readonly IList<Channel<INotification>> _outputs;
private readonly ILogger _logger;

public BackgroundEventPublisherHostedService(ChannelReader<INotification> channelReader, IEventPublisher eventPublisher, ILogger<BackgroundEventPublisherHostedService> logger)
public BackgroundEventPublisherHostedService(int workerCount, ChannelReader<INotification> channelReader, IEventPublisher eventPublisher, ILogger<BackgroundEventPublisherHostedService> logger)
{
_channelReader = channelReader;
_eventPublisher = eventPublisher;
_logger = logger;
_outputs = new List<Channel<INotification>>(workerCount);
}

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var index = 0;

for (var i = 0; i < _outputs.Count; i++)
{
var output = Channel.CreateUnbounded<INotification>();
_outputs[i] = output;
_ = ReadOutputAsync(output, cancellationToken);
}

await foreach (var notification in _channelReader.ReadAllAsync(cancellationToken))
{
var output = _outputs[index];
await output.Writer.WriteAsync(notification, cancellationToken);
index = (index + 1) % _outputs.Count;
}

foreach (var output in _outputs)
{
output.Writer.Complete();
}
}

private async Task ReadOutputAsync(Channel<INotification> output, CancellationToken cancellationToken)
{
await foreach (var notification in output.Reader.ReadAllAsync(cancellationToken))
{
try
{
Expand All @@ -32,7 +58,7 @@ await foreach (var notification in _channelReader.ReadAllAsync(cancellationToken
catch (Exception e)
{
_logger.LogError(e, "An unhandled exception occured while processing the queue");
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Threading.Channels;
using Elsa.Mediator.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Elsa.Mediator.HostedServices;

/// <summary>
/// Continuously reads from a channel to which commands can be sent, executing each received command.
/// </summary>
public class MessageProcessorHostedService<T> : BackgroundService
{
private readonly Channel<T> _channel;
private readonly IConsumer<T> _consumer;
private readonly ILogger _logger;
private readonly IList<MessageWorker<T>> _workers;

public MessageProcessorHostedService(int workerCount, Channel<T> channel, IConsumer<T> consumer, ILogger<MessageProcessorHostedService<T>> logger)
{
_channel = channel;
_consumer = consumer;
_logger = logger;
_workers = new List<MessageWorker<T>>(workerCount);
}

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var index = 0;

for (var i = 0; i < _workers.Count; i++)
{
var worker = new MessageWorker<T>(Channel.CreateUnbounded<T>(), _consumer);
_workers[i] = worker;
_ = worker.StartAsync(cancellationToken);
}

await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken))
{
var worker = _workers[index];
await worker.DeliverMessageAsync(message, cancellationToken);
index = (index + 1) % _workers.Count;
}

foreach (var worker in _workers)
worker.Complete();

_workers.Clear();
}
}

public class MessageWorker<T>
{
private readonly Channel<T> _channel;
private readonly IConsumer<T> _consumer;

public MessageWorker(Channel<T> channel, IConsumer<T> consumer)
{
_channel = channel;
_consumer = consumer;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken))
await _consumer.ConsumeAsync(message, cancellationToken);
}

public async Task DeliverMessageAsync(T message, CancellationToken cancellationToken)
{
await _channel.Writer.WriteAsync(message, cancellationToken);
}

public void Complete()
{
_channel.Writer.Complete();
}
}
21 changes: 21 additions & 0 deletions src/modules/Elsa.Mediator/Implementations/DefaultMessageSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System.Threading.Channels;
using Elsa.Mediator.Services;
using Microsoft.Extensions.DependencyInjection;

namespace Elsa.Mediator.Implementations;

public class DefaultMessageSender : IMessageSender
{
private readonly IServiceProvider _serviceProvider;

public DefaultMessageSender(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public async Task SendAsync<T>(T message, CancellationToken cancellationToken)
{
var channel = _serviceProvider.GetRequiredService<Channel<T>>();
await channel.Writer.WriteAsync(message, cancellationToken);
}
}
7 changes: 7 additions & 0 deletions src/modules/Elsa.Mediator/Options/MediatorOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Elsa.Mediator.Options;

public class MediatorOptions
{
public int CommandWorkerCount { get; set; } = 4;
public int NotificationWorkerCount { get; set; } = 4;
}
6 changes: 6 additions & 0 deletions src/modules/Elsa.Mediator/Services/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Elsa.Mediator.Services;

public interface IConsumer<in T>
{
ValueTask ConsumeAsync(T message, CancellationToken cancellationToken);
}
Loading