From 2aaf7bddef4eac9c0f3f6ad205b9090c3ec40d21 Mon Sep 17 00:00:00 2001 From: DanilF Date: Mon, 19 Oct 2020 16:18:20 -0400 Subject: [PATCH] Add middleware runner, step executor, and ability to run middleware around workflow steps and before/after workflow Add sample with a sample middleware for retrying and log correlation as well as workflow pre/post samples Add async overloads for StartWorkflow and WaitForWorkflowToComplete in integration tests Add error handling of post workflow middleware Add docs for workflow middleware --- WorkflowCore.sln | 7 + docs/samples.md | 4 +- docs/workflow-middleware.md | 279 ++++++++++++++++++ mkdocs.yml | 1 + .../Models/v1/DefinitionSourceV1.cs | 1 - .../Services/DefinitionLoader.cs | 4 +- src/WorkflowCore/Interface/IStepExecutor.cs | 22 ++ .../Interface/IWorkflowBuilder.cs | 7 +- .../Interface/IWorkflowMiddleware.cs | 41 +++ .../IWorkflowMiddlewareErrorHandler.cs | 18 ++ .../Interface/IWorkflowMiddlewareRunner.cs | 33 +++ .../Interface/IWorkflowStepMiddleware.cs | 28 ++ src/WorkflowCore/Models/WorkflowDefinition.cs | 12 +- src/WorkflowCore/Models/WorkflowDelegate.cs | 9 + .../Models/WorkflowStepDelegate.cs | 9 + .../ServiceCollectionExtensions.cs | 39 ++- .../DefaultWorkflowMiddlewareErrorHandler.cs | 32 ++ .../FluentBuilders/WorkflowBuilder.cs | 2 +- src/WorkflowCore/Services/StepExecutor.cs | 52 ++++ .../Services/WorkflowController.cs | 10 +- src/WorkflowCore/Services/WorkflowExecutor.cs | 34 ++- .../Services/WorkflowMiddlewareRunner.cs | 96 ++++++ .../FlakyConnectionParams.cs | 9 + .../FlakyConnectionWorkflow.cs | 25 ++ .../IDescriptiveWorkflowParams.cs | 7 + .../AddDescriptionWorkflowMiddleware.cs | 20 ++ .../LogCorrelationStepMiddleware.cs | 38 +++ .../Middleware/PollyRetryMiddleware.cs | 63 ++++ .../PrintWorkflowSummaryMiddleware.cs | 41 +++ src/samples/WorkflowCore.Sample19/Program.cs | 66 +++++ .../Steps/FlakyConnection.cs | 27 ++ .../WorkflowCore.Sample19/Steps/LogMessage.cs | 29 ++ .../WorkflowCore.Sample19.csproj | 20 ++ .../Scenarios/MiddlewareScenario.cs | 167 +++++++++++ .../Scenarios/StoredJsonScenario.cs | 3 +- .../Scenarios/StoredYamlScenario.cs | 3 +- .../DistributedLockProviderTests.cs | 11 +- .../stored-definition.json | 2 +- test/WorkflowCore.Testing/JsonWorkflowTest.cs | 4 +- test/WorkflowCore.Testing/WorkflowTest.cs | 24 +- test/WorkflowCore.Testing/YamlWorkflowTest.cs | 3 +- .../Services/StepExecutorTests.cs | 148 ++++++++++ .../Services/WorkflowExecutorFixture.cs | 45 ++- .../Services/WorkflowMiddlewareRunnerTests.cs | 276 +++++++++++++++++ 44 files changed, 1713 insertions(+), 58 deletions(-) create mode 100644 docs/workflow-middleware.md create mode 100644 src/WorkflowCore/Interface/IStepExecutor.cs create mode 100644 src/WorkflowCore/Interface/IWorkflowMiddleware.cs create mode 100644 src/WorkflowCore/Interface/IWorkflowMiddlewareErrorHandler.cs create mode 100644 src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs create mode 100644 src/WorkflowCore/Interface/IWorkflowStepMiddleware.cs create mode 100644 src/WorkflowCore/Models/WorkflowDelegate.cs create mode 100644 src/WorkflowCore/Models/WorkflowStepDelegate.cs create mode 100644 src/WorkflowCore/Services/DefaultWorkflowMiddlewareErrorHandler.cs create mode 100644 src/WorkflowCore/Services/StepExecutor.cs create mode 100644 src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs create mode 100644 src/samples/WorkflowCore.Sample19/FlakyConnectionParams.cs create mode 100644 src/samples/WorkflowCore.Sample19/FlakyConnectionWorkflow.cs create mode 100644 src/samples/WorkflowCore.Sample19/IDescriptiveWorkflowParams.cs create mode 100644 src/samples/WorkflowCore.Sample19/Middleware/AddDescriptionWorkflowMiddleware.cs create mode 100644 src/samples/WorkflowCore.Sample19/Middleware/LogCorrelationStepMiddleware.cs create mode 100644 src/samples/WorkflowCore.Sample19/Middleware/PollyRetryMiddleware.cs create mode 100644 src/samples/WorkflowCore.Sample19/Middleware/PrintWorkflowSummaryMiddleware.cs create mode 100644 src/samples/WorkflowCore.Sample19/Program.cs create mode 100644 src/samples/WorkflowCore.Sample19/Steps/FlakyConnection.cs create mode 100644 src/samples/WorkflowCore.Sample19/Steps/LogMessage.cs create mode 100644 src/samples/WorkflowCore.Sample19/WorkflowCore.Sample19.csproj create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/MiddlewareScenario.cs create mode 100644 test/WorkflowCore.UnitTests/Services/StepExecutorTests.cs create mode 100644 test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs diff --git a/WorkflowCore.sln b/WorkflowCore.sln index 9231faa76..09cda3de8 100644 --- a/WorkflowCore.sln +++ b/WorkflowCore.sln @@ -147,6 +147,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchP EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.QueueProviders.RabbitMQ", "test\WorkflowCore.Tests.QueueProviders.RabbitMQ\WorkflowCore.Tests.QueueProviders.RabbitMQ.csproj", "{54DE20BA-EBA7-4BF0-9BD9-F03766849716}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample19", "src\samples\WorkflowCore.Sample19\WorkflowCore.Sample19.csproj", "{1223ED47-3E5E-4960-B70D-DFAF550F6666}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -361,6 +363,10 @@ Global {54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Debug|Any CPU.Build.0 = Debug|Any CPU {54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.ActiveCfg = Release|Any CPU {54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.Build.0 = Release|Any CPU + {1223ED47-3E5E-4960-B70D-DFAF550F6666}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1223ED47-3E5E-4960-B70D-DFAF550F6666}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1223ED47-3E5E-4960-B70D-DFAF550F6666}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1223ED47-3E5E-4960-B70D-DFAF550F6666}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -421,6 +427,7 @@ Global {E32CF21A-29CC-46D1-8044-FCC327F2B281} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} {51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB} {54DE20BA-EBA7-4BF0-9BD9-F03766849716} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB} + {1223ED47-3E5E-4960-B70D-DFAF550F6666} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4} diff --git a/docs/samples.md b/docs/samples.md index a1b862799..f69290c57 100644 --- a/docs/samples.md +++ b/docs/samples.md @@ -32,4 +32,6 @@ [Exposing a REST API](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WebApiSample) -[Human(User) Workflow](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample08) \ No newline at end of file +[Human(User) Workflow](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample08) + +[Workflow Middleware](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample19) diff --git a/docs/workflow-middleware.md b/docs/workflow-middleware.md new file mode 100644 index 000000000..488d0c7ad --- /dev/null +++ b/docs/workflow-middleware.md @@ -0,0 +1,279 @@ +# Workflow Middleware + +Workflows can be extended with Middleware that run before/after workflows start/complete as well as around workflow steps to provide flexibility in implementing cross-cutting concerns such as [log correlation](https://www.frakkingsweet.com/net-core-log-correlation-easy-access-to-headers/), [retries](https://docs.microsoft.com/en-us/dotnet/architecture/microservices/implement-resilient-applications/implement-http-call-retries-exponential-backoff-polly), and other use-cases. + +This is done by implementing and registering `IWorkflowMiddleware` for workflows or `IWorkflowStepMiddleware` for steps. + +## Step Middleware + +Step middleware lets you run additional code around the execution of a given step and alter its behavior. Implementing a step middleware should look familiar to anyone familiar with [ASP.NET Core's middleware pipeline](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/middleware/?view=aspnetcore-3.1) or [`HttpClient`'s `DelegatingHandler` middleware](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/http-requests?view=aspnetcore-3.1#outgoing-request-middleware). + +### Usage + +First, create your own middleware class that implements `IWorkflowStepMiddleware`. Here's an example of a middleware that adds workflow ID and step ID to the log correlation context of every workflow step in your app. + +**Important:** You must make sure to call `next()` as part of your middleware. If you do not do this, your step will never run. + +```cs +public class LogCorrelationStepMiddleware : IWorkflowStepMiddleware +{ + private readonly ILogger _log; + + public LogCorrelationStepMiddleware( + ILogger log) + { + _log = log; + } + + public async Task HandleAsync( + IStepExecutionContext context, + IStepBody body, + WorkflowStepDelegate next) + { + var workflowId = context.Workflow.Id; + var stepId = context.Step.Id; + + // Uses log scope to add a few attributes to the scope + using (_log.BeginScope("{@WorkflowId}", workflowId)) + using (_log.BeginScope("{@StepId}", stepId)) + { + // Calling next ensures step gets executed + return await next(); + } + } +} +``` + +Here's another example of a middleware that uses the [Polly](https://github.com/App-vNext/Polly) dotnet resiliency library to implement retries on workflow steps based off a custom retry policy. + +```cs +public class PollyRetryStepMiddleware : IWorkflowStepMiddleware +{ + private const string StepContextKey = "WorkflowStepContext"; + private const int MaxRetries = 3; + private readonly ILogger _log; + + public PollyRetryMiddleware(ILogger log) + { + _log = log; + } + + // Consult Polly's docs for more information on how to build + // retry policies: + // https://github.com/App-vNext/Polly + public IAsyncPolicy GetRetryPolicy() => + Policy + .Handle() + .RetryAsync( + MaxRetries, + (result, retryCount, context) => + UpdateRetryCount( + result.Exception, + retryCount, + context[StepContextKey] as IStepExecutionContext) + ); + + public async Task HandleAsync( + IStepExecutionContext context, + IStepBody body, + WorkflowStepDelegate next + ) + { + return await GetRetryPolicy().ExecuteAsync( + ctx => next(), + // The step execution context gets passed down so that + // the step is accessible within the retry policy + new Dictionary + { + { StepContextKey, context } + }); + } + + private Task UpdateRetryCount( + Exception exception, + int retryCount, + IStepExecutionContext stepContext) + { + var stepInstance = stepContext.ExecutionPointer; + stepInstance.RetryCount = retryCount; + return Task.CompletedTask; + } +} +``` + +## Pre/Post Workflow Middleware + +Workflow middleware run either before a workflow starts or after a workflow completes and can be used to hook into the workflow lifecycle or alter the workflow itself before it is started. + +### Pre Workflow Middleware + +These middleware get run before the workflow is started and can potentially alter properties on the `WorkflowInstance`. + +The following example illustrates setting the `Description` property on the `WorkflowInstance` using a middleware that interprets the data on the passed workflow. This is useful in cases where you want the description of the workflow to be derived from the data passed to the workflow. + +Note that you use `WorkflowMiddlewarePhase.PreWorkflow` to specify that it runs before the workflow starts. + +**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs. + +```cs +// AddDescriptionWorkflowMiddleware.cs +public class AddDescriptionWorkflowMiddleware : IWorkflowMiddleware +{ + public WorkflowMiddlewarePhase Phase => + WorkflowMiddlewarePhase.PreWorkflow; + + public Task HandleAsync( + WorkflowInstance workflow, + WorkflowDelegate next + ) + { + if (workflow.Data is IDescriptiveWorkflowParams descriptiveParams) + { + workflow.Description = descriptiveParams.Description; + } + + return next(); + } +} + +// IDescriptiveWorkflowParams.cs +public interface IDescriptiveWorkflowParams +{ + string Description { get; } +} + +// MyWorkflowParams.cs +public MyWorkflowParams : IDescriptiveWorkflowParams +{ + public string Description => $"Run task '{TaskName}'"; + + public string TaskName { get; set; } +} +``` + +### Exception Handling in Pre Workflow Middleware + +Pre workflow middleware exception handling gets treated differently from post workflow middleware. Since the middleware runs before the workflow starts, any exceptions thrown within a pre workflow middleware will bubble up to the `StartWorkflow` method and it is up to the caller of `StartWorkflow` to handle the exception and act accordingly. + +```cs +public async Task MyMethodThatStartsAWorkflow() +{ + try + { + await host.StartWorkflow("HelloWorld", 1, null); + } + catch(Exception ex) + { + // Handle the exception appropriately + } +} +``` + +### Post Workflow Middleware + +These middleware get run after the workflow has completed and can be used to perform additional actions for all workflows in your app. + +The following example illustrates how you can use a post workflow middleware to print a summary of the workflow to console. + +Note that you use `WorkflowMiddlewarePhase.PostWorkflow` to specify that it runs after the workflow completes. + +**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs. + +```cs +public class PrintWorkflowSummaryMiddleware : IWorkflowMiddleware +{ + private readonly ILogger _log; + + public PrintWorkflowSummaryMiddleware( + ILogger log + ) + { + _log = log; + } + + public WorkflowMiddlewarePhase Phase => + WorkflowMiddlewarePhase.PostWorkflow; + + public Task HandleAsync( + WorkflowInstance workflow, + WorkflowDelegate next + ) + { + if (!workflow.CompleteTime.HasValue) + { + return next(); + } + + var duration = workflow.CompleteTime.Value - workflow.CreateTime; + _log.LogInformation($@"Workflow {workflow.Description} completed in {duration:g}"); + + foreach (var step in workflow.ExecutionPointers) + { + var stepName = step.StepName; + var stepDuration = (step.EndTime - step.StartTime) ?? TimeSpan.Zero; + _log.LogInformation($" - Step {stepName} completed in {stepDuration:g}"); + } + + return next(); + } +} +``` + +### Exception Handling in Post Workflow Middleware + +Post workflow middleware exception handling gets treated differently from pre workflow middleware. At the time that the workflow completes, your workflow has ran already so an uncaught exception would be difficult to act on. + +By default, if a workflow middleware throws an exception, it will be logged and the workflow will complete as normal. This behavior can be changed, however. + +To override the default post workflow error handling for all workflows in your app, just register a new `IWorkflowMiddlewareErrorHandler` in the dependency injection framework with your custom behavior as follows. + +```cs +// CustomMiddlewareErrorHandler.cs +public class CustomHandler : IWorkflowMiddlewareErrorHandler +{ + public Task HandleAsync(Exception ex) + { + // Handle your error asynchronously + } +} + +// Startup.cs +public void ConfigureServices(IServiceCollection services) +{ + // Other workflow configuration + services.AddWorkflow(); + + // Should go after .AddWorkflow() + services.AddTransient(); +} +``` + +## Registering Middleware + +In order for middleware to take effect, they must be registered with the built-in dependency injection framework using the convenience helpers. + +**Note:** Middleware will be run in the order that they are registered with middleware that are registered earlier running earlier in the chain and finishing later in the chain. For pre/post workflow middleware, all pre middleware will be run before a workflow starts and all post middleware will be run after a workflow completes. + +```cs +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + ... + + // Add workflow middleware + services.AddWorkflowMiddleware(); + services.AddWorkflowMiddleware(); + + // Add step middleware + services.AddWorkflowStepMiddleware(); + services.AddWorkflowStepMiddleware(); + + ... + } +} +``` + +## More Information + +See the [Workflow Middleware](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample19) sample for full examples of workflow middleware in action. diff --git a/mkdocs.yml b/mkdocs.yml index fcbf9e4a8..57ed12c94 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -9,6 +9,7 @@ nav: - Saga transactions: sagas.md - JSON / YAML Definitions: json-yaml.md - Persistence: persistence.md + - Middleware: workflow-middleware.md - Multi-node clusters: multi-node-clusters.md - ASP.NET Core: using-with-aspnet-core.md - Elasticsearch plugin: elastic-search.md diff --git a/src/WorkflowCore.DSL/Models/v1/DefinitionSourceV1.cs b/src/WorkflowCore.DSL/Models/v1/DefinitionSourceV1.cs index 1de64c35e..46f0521ca 100644 --- a/src/WorkflowCore.DSL/Models/v1/DefinitionSourceV1.cs +++ b/src/WorkflowCore.DSL/Models/v1/DefinitionSourceV1.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Text; namespace WorkflowCore.Models.DefinitionStorage.v1 { diff --git a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs index 810d6338b..0f5a89558 100644 --- a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs +++ b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs @@ -93,7 +93,7 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty AttachInputs(nextStep, dataType, stepType, targetStep); AttachOutputs(nextStep, dataType, stepType, targetStep); - + if (nextStep.Do != null) { foreach (var branch in nextStep.Do) @@ -242,7 +242,7 @@ private void AttachOutcomes(StepSourceV1 source, Type dataType, WorkflowStep ste var outcomeParameter = Expression.Parameter(typeof(object), "outcome"); foreach (var nextStep in source.SelectNextStep) - { + { var sourceDelegate = DynamicExpressionParser.ParseLambda(new[] { dataParameter, outcomeParameter }, typeof(object), nextStep.Value).Compile(); Expression> sourceExpr = (data, outcome) => System.Convert.ToBoolean(sourceDelegate.DynamicInvoke(data, outcome)); step.Outcomes.Add(new ExpressionOutcome(sourceExpr) diff --git a/src/WorkflowCore/Interface/IStepExecutor.cs b/src/WorkflowCore/Interface/IStepExecutor.cs new file mode 100644 index 000000000..aed3d3026 --- /dev/null +++ b/src/WorkflowCore/Interface/IStepExecutor.cs @@ -0,0 +1,22 @@ +using System.Threading.Tasks; +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + /// + /// Executes a workflow step. + /// + public interface IStepExecutor + { + /// + /// Runs the passed in the given . + /// + /// The in which to execute the step. + /// The body. + /// A to wait for the result of running the step + Task ExecuteStep( + IStepExecutionContext context, + IStepBody body + ); + } +} diff --git a/src/WorkflowCore/Interface/IWorkflowBuilder.cs b/src/WorkflowCore/Interface/IWorkflowBuilder.cs index ec7b7bad2..b7b371882 100644 --- a/src/WorkflowCore/Interface/IWorkflowBuilder.cs +++ b/src/WorkflowCore/Interface/IWorkflowBuilder.cs @@ -9,7 +9,7 @@ public interface IWorkflowBuilder { List Steps { get; } - int LastStep { get; } + int LastStep { get; } IWorkflowBuilder UseData(); @@ -21,7 +21,7 @@ public interface IWorkflowBuilder } public interface IWorkflowBuilder : IWorkflowBuilder, IWorkflowModifier - { + { IStepBuilder StartWith(Action> stepSetup = null) where TStep : IStepBody; IStepBuilder StartWith(Func body); @@ -33,6 +33,5 @@ public interface IWorkflowBuilder : IWorkflowBuilder, IWorkflowModifier UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null); IWorkflowBuilder CreateBranch(); - } -} \ No newline at end of file +} diff --git a/src/WorkflowCore/Interface/IWorkflowMiddleware.cs b/src/WorkflowCore/Interface/IWorkflowMiddleware.cs new file mode 100644 index 000000000..71781b30d --- /dev/null +++ b/src/WorkflowCore/Interface/IWorkflowMiddleware.cs @@ -0,0 +1,41 @@ +using System.Threading.Tasks; +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + /// + /// Determines at which point to run the middleware. + /// + public enum WorkflowMiddlewarePhase + { + /// + /// The middleware should run before a workflow starts. + /// + PreWorkflow, + + /// + /// The middleware should run after a workflow completes. + /// + PostWorkflow + } + + /// + /// Middleware that can run before a workflow starts or after a workflow completes. + /// + public interface IWorkflowMiddleware + { + /// + /// The phase in the workflow execution to run this middleware in + /// + WorkflowMiddlewarePhase Phase { get; } + + /// + /// Runs the middleware on the given . + /// + /// The . + /// The next middleware in the chain. + /// A that completes asynchronously once the + /// middleware chain finishes running. + Task HandleAsync(WorkflowInstance workflow, WorkflowDelegate next); + } +} diff --git a/src/WorkflowCore/Interface/IWorkflowMiddlewareErrorHandler.cs b/src/WorkflowCore/Interface/IWorkflowMiddlewareErrorHandler.cs new file mode 100644 index 000000000..5522bae39 --- /dev/null +++ b/src/WorkflowCore/Interface/IWorkflowMiddlewareErrorHandler.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; + +namespace WorkflowCore.Interface +{ + /// + /// Handles exceptions within workflow middleware. + /// + public interface IWorkflowMiddlewareErrorHandler + { + /// + /// Asynchronously handle the given exception. + /// + /// The exception to handle + /// A task that completes when handling is done. + Task HandleAsync(Exception ex); + } +} diff --git a/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs b/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs new file mode 100644 index 000000000..96aab8b74 --- /dev/null +++ b/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs @@ -0,0 +1,33 @@ +using System.Threading.Tasks; +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + /// + /// Runs workflow pre/post middleware. + /// + public interface IWorkflowMiddlewareRunner + { + /// + /// Runs workflow-level middleware that is set to run at the + /// phase. Middleware will be run in the + /// order in which they were registered with DI with middleware declared earlier starting earlier and + /// completing later. + /// + /// The to run for. + /// The definition. + /// A task that will complete when all middleware has run. + Task RunPreMiddleware(WorkflowInstance workflow, WorkflowDefinition def); + + /// + /// Runs workflow-level middleware that is set to run at the + /// phase. Middleware will be run in the + /// order in which they were registered with DI with middleware declared earlier starting earlier and + /// completing later. + /// + /// The to run for. + /// The definition. + /// A task that will complete when all middleware has run. + Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def); + } +} diff --git a/src/WorkflowCore/Interface/IWorkflowStepMiddleware.cs b/src/WorkflowCore/Interface/IWorkflowStepMiddleware.cs new file mode 100644 index 000000000..91f888589 --- /dev/null +++ b/src/WorkflowCore/Interface/IWorkflowStepMiddleware.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + /// + /// Middleware that runs around a workflow step and can enhance or alter + /// the steps behavior. + /// + public interface IWorkflowStepMiddleware + { + /// + /// Handle the workflow step and return an + /// asynchronously. It is important to invoke at some point + /// in the middleware. Not doing so will prevent the workflow step from ever + /// getting executed. + /// + /// The step's context. + /// An instance of the step body that is going to be run. + /// The next middleware in the chain. + /// A of the workflow result. + Task HandleAsync( + IStepExecutionContext context, + IStepBody body, + WorkflowStepDelegate next + ); + } +} diff --git a/src/WorkflowCore/Models/WorkflowDefinition.cs b/src/WorkflowCore/Models/WorkflowDefinition.cs index f39a563ed..207457e19 100644 --- a/src/WorkflowCore/Models/WorkflowDefinition.cs +++ b/src/WorkflowCore/Models/WorkflowDefinition.cs @@ -18,14 +18,16 @@ public class WorkflowDefinition public WorkflowErrorHandling DefaultErrorBehavior { get; set; } - public TimeSpan? DefaultErrorRetryInterval { get; set; } + public Type OnPostMiddlewareError { get; set; } + + public TimeSpan? DefaultErrorRetryInterval { get; set; } } - public enum WorkflowErrorHandling - { - Retry = 0, - Suspend = 1, + public enum WorkflowErrorHandling + { + Retry = 0, + Suspend = 1, Terminate = 2, Compensate = 3 } diff --git a/src/WorkflowCore/Models/WorkflowDelegate.cs b/src/WorkflowCore/Models/WorkflowDelegate.cs new file mode 100644 index 000000000..2d8876163 --- /dev/null +++ b/src/WorkflowCore/Models/WorkflowDelegate.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace WorkflowCore.Models +{ + /// + /// Represents a function that executes before or after a workflow starts or completes. + /// + public delegate Task WorkflowDelegate(); +} diff --git a/src/WorkflowCore/Models/WorkflowStepDelegate.cs b/src/WorkflowCore/Models/WorkflowStepDelegate.cs new file mode 100644 index 000000000..21befb575 --- /dev/null +++ b/src/WorkflowCore/Models/WorkflowStepDelegate.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace WorkflowCore.Models +{ + /// + /// Represents a function that executes a workflow step and returns a result. + /// + public delegate Task WorkflowStepDelegate(); +} diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs index 6fe511845..8598abe03 100644 --- a/src/WorkflowCore/ServiceCollectionExtensions.cs +++ b/src/WorkflowCore/ServiceCollectionExtensions.cs @@ -35,7 +35,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A services.AddSingleton(); services.AddSingleton(options); - services.AddSingleton(); + services.AddSingleton(); services.AddTransient(); services.AddTransient(); @@ -51,6 +51,9 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddTransient(); services.AddTransient(); @@ -69,6 +72,40 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A return services; } + + /// + /// Adds a middleware that will run around the execution of a workflow step. + /// + /// The services collection. + /// Optionally configure using your own factory. + /// The type of middleware. + /// It must implement . + /// The services collection for chaining. + public static IServiceCollection AddWorkflowStepMiddleware( + this IServiceCollection services, + Func factory = null) + where TMiddleware : class, IWorkflowStepMiddleware => + factory == null + ? services.AddTransient() + : services.AddTransient(factory); + + /// + /// Adds a middleware that will run either before a workflow is kicked off or after + /// a workflow completes. Specify the phase of the workflow execution process that + /// you want to execute this middleware using . + /// + /// The services collection. + /// Optionally configure using your own factory. + /// The type of middleware. + /// It must implement . + /// The services collection for chaining. + public static IServiceCollection AddWorkflowMiddleware( + this IServiceCollection services, + Func factory = null) + where TMiddleware : class, IWorkflowMiddleware => + factory == null + ? services.AddTransient() + : services.AddTransient(factory); } } diff --git a/src/WorkflowCore/Services/DefaultWorkflowMiddlewareErrorHandler.cs b/src/WorkflowCore/Services/DefaultWorkflowMiddlewareErrorHandler.cs new file mode 100644 index 000000000..99c4652ff --- /dev/null +++ b/src/WorkflowCore/Services/DefaultWorkflowMiddlewareErrorHandler.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; + +namespace WorkflowCore.Services +{ + /// + /// Default implementation of . Just logs the + /// thrown exception and moves on. + /// + public class DefaultWorkflowMiddlewareErrorHandler : IWorkflowMiddlewareErrorHandler + { + private readonly ILogger _log; + + public DefaultWorkflowMiddlewareErrorHandler(ILogger log) + { + _log = log; + } + + /// + /// Asynchronously handle the given exception. + /// + /// The exception to handle + /// A task that completes when handling is done. + public Task HandleAsync(Exception ex) + { + _log.LogError(ex, "An error occurred running workflow middleware: {Message}", ex.Message); + return Task.CompletedTask; + } + } +} diff --git a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs index f5acd7694..98788fa0c 100644 --- a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs @@ -225,7 +225,7 @@ public IContainerStepBuilder ForEach(Expression ForEach(Expression> collection, Expression> runParallel) { return Start().ForEach(collection, runParallel); diff --git a/src/WorkflowCore/Services/StepExecutor.cs b/src/WorkflowCore/Services/StepExecutor.cs new file mode 100644 index 000000000..4e45574e5 --- /dev/null +++ b/src/WorkflowCore/Services/StepExecutor.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services +{ + /// + /// Executes the workflow step and applies any to the step. + /// + public class StepExecutor : IStepExecutor + { + private readonly IEnumerable _stepMiddleware; + + public StepExecutor( + IEnumerable stepMiddleware + ) + { + _stepMiddleware = stepMiddleware; + } + + /// + /// Runs the passed in the given while applying + /// any registered in the system. Middleware will be run in the + /// order in which they were registered with DI with middleware declared earlier starting earlier and + /// completing later. + /// + /// The in which to execute the step. + /// The body. + /// A to wait for the result of running the step + public async Task ExecuteStep( + IStepExecutionContext context, + IStepBody body + ) + { + // Build the middleware chain by reducing over all the middleware in reverse starting with step body + // and building step delegates that call out to the next delegate in the chain + Task Step() => body.RunAsync(context); + var middlewareChain = _stepMiddleware + .Reverse() + .Aggregate( + (WorkflowStepDelegate) Step, + (previous, middleware) => () => middleware.HandleAsync(context, body, previous) + ); + + // Run the middleware chain + return await middlewareChain(); + } + } +} diff --git a/src/WorkflowCore/Services/WorkflowController.cs b/src/WorkflowCore/Services/WorkflowController.cs index 203da3139..20b7621d9 100755 --- a/src/WorkflowCore/Services/WorkflowController.cs +++ b/src/WorkflowCore/Services/WorkflowController.cs @@ -21,9 +21,10 @@ public class WorkflowController : IWorkflowController private readonly IExecutionPointerFactory _pointerFactory; private readonly ILifeCycleEventHub _eventHub; private readonly IServiceProvider _serviceProvider; + private readonly IWorkflowMiddlewareRunner _middlewareRunner; private readonly ILogger _logger; - public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, IExecutionPointerFactory pointerFactory, ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory, IServiceProvider serviceProvider) + public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, IExecutionPointerFactory pointerFactory, ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowMiddlewareRunner middlewareRunner) { _persistenceStore = persistenceStore; _lockProvider = lockProvider; @@ -32,6 +33,7 @@ public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLoc _pointerFactory = pointerFactory; _eventHub = eventHub; _serviceProvider = serviceProvider; + _middlewareRunner = middlewareRunner; _logger = loggerFactory.CreateLogger(); } @@ -45,7 +47,7 @@ public Task StartWorkflow(string workflowId, int? version, object data = return StartWorkflow(workflowId, version, data, reference); } - public Task StartWorkflow(string workflowId, TData data = null, string reference=null) + public Task StartWorkflow(string workflowId, TData data = null, string reference=null) where TData : class, new() { return StartWorkflow(workflowId, null, data, reference); @@ -83,6 +85,8 @@ public async Task StartWorkflow(string workflowId, int? version, wf.ExecutionPointers.Add(_pointerFactory.BuildGenesisPointer(def)); + await _middlewareRunner.RunPreMiddleware(wf, def); + string id = await _persistenceStore.CreateNewWorkflow(wf); await _queueProvider.QueueWork(id, QueueType.Workflow); await _queueProvider.QueueWork(id, QueueType.Index); @@ -230,4 +234,4 @@ public void RegisterWorkflow() _registry.RegisterWorkflow(wf); } } -} \ No newline at end of file +} diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index e557a2bb2..a70de87a2 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -22,10 +22,12 @@ public class WorkflowExecutor : IWorkflowExecutor private readonly ICancellationProcessor _cancellationProcessor; private readonly ILifeCycleEventPublisher _publisher; private readonly WorkflowOptions _options; + private readonly IStepExecutor _stepExecutor; + private readonly IWorkflowMiddlewareRunner _middlewareRunner; private IWorkflowHost Host => _serviceProvider.GetService(); - public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProvider, IScopeProvider scopeProvider, IDateTimeProvider datetimeProvider, IExecutionResultProcessor executionResultProcessor, ILifeCycleEventPublisher publisher, ICancellationProcessor cancellationProcessor, WorkflowOptions options, ILoggerFactory loggerFactory) + public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProvider, IScopeProvider scopeProvider, IDateTimeProvider datetimeProvider, IExecutionResultProcessor executionResultProcessor, ILifeCycleEventPublisher publisher, ICancellationProcessor cancellationProcessor, WorkflowOptions options, IWorkflowMiddlewareRunner middlewareRunner, IStepExecutor stepExecutor, ILoggerFactory loggerFactory) { _serviceProvider = serviceProvider; _scopeProvider = scopeProvider; @@ -36,6 +38,8 @@ public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProv _options = options; _logger = loggerFactory.CreateLogger(); _executionResultProcessor = executionResultProcessor; + _middlewareRunner = middlewareRunner; + _stepExecutor = stepExecutor; } public async Task Execute(WorkflowInstance workflow, CancellationToken cancellationToken = default) @@ -49,7 +53,7 @@ public async Task Execute(WorkflowInstance workflow, Can _logger.LogError("Workflow {0} version {1} is not registered", workflow.WorkflowDefinitionId, workflow.Version); return wfResult; } - + _cancellationProcessor.ProcessCancellations(workflow, def, wfResult); foreach (var pointer in exePointers) @@ -71,10 +75,10 @@ public async Task Execute(WorkflowInstance workflow, Can }); continue; } - + try { - if (!InitializeStep(workflow, step, wfResult, def, pointer)) + if (!InitializeStep(workflow, step, wfResult, def, pointer)) continue; await ExecuteStep(workflow, step, pointer, wfResult, def, cancellationToken); @@ -89,14 +93,14 @@ public async Task Execute(WorkflowInstance workflow, Can ErrorTime = _datetimeProvider.UtcNow, Message = ex.Message }); - + _executionResultProcessor.HandleStepException(workflow, def, pointer, step, ex); Host.ReportStepError(workflow, step, ex); } _cancellationProcessor.ProcessCancellations(workflow, def, wfResult); } ProcessAfterExecutionIteration(workflow, def, wfResult); - DetermineNextExecutionTime(workflow); + await DetermineNextExecutionTime(workflow, def); return wfResult; } @@ -147,7 +151,7 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe Item = pointer.ContextItem, CancellationToken = cancellationToken }; - + using (var scope = _scopeProvider.CreateScope(context)) { _logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id); @@ -181,7 +185,7 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe return; } - var result = await body.RunAsync(context); + var result = await _stepExecutor.ExecuteStep(context, body); if (result.Proceed) { @@ -205,7 +209,7 @@ private void ProcessAfterExecutionIteration(WorkflowInstance workflow, WorkflowD } } - private void DetermineNextExecutionTime(WorkflowInstance workflow) + private async Task DetermineNextExecutionTime(WorkflowInstance workflow, WorkflowDefinition def) { //TODO: move to own class workflow.NextExecution = null; @@ -229,9 +233,9 @@ private void DetermineNextExecutionTime(WorkflowInstance workflow) { foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count > 0)) { - if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue)) + if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue)) continue; - + if (!pointer.SleepUntil.HasValue) { workflow.NextExecution = 0; @@ -243,11 +247,14 @@ private void DetermineNextExecutionTime(WorkflowInstance workflow) } } - if ((workflow.NextExecution != null) || (workflow.ExecutionPointers.Any(x => x.EndTime == null))) + if ((workflow.NextExecution != null) || (workflow.ExecutionPointers.Any(x => x.EndTime == null))) return; - + workflow.Status = WorkflowStatus.Complete; workflow.CompleteTime = _datetimeProvider.UtcNow; + + await _middlewareRunner.RunPostMiddleware(workflow, def); + _publisher.PublishNotification(new WorkflowCompleted() { EventTimeUtc = _datetimeProvider.UtcNow, @@ -257,6 +264,5 @@ private void DetermineNextExecutionTime(WorkflowInstance workflow) Version = workflow.Version }); } - } } diff --git a/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs b/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs new file mode 100644 index 000000000..6eb6968b8 --- /dev/null +++ b/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services +{ + /// + public class WorkflowMiddlewareRunner : IWorkflowMiddlewareRunner + { + private static readonly WorkflowDelegate NoopWorkflowDelegate = () => Task.CompletedTask; + private readonly IEnumerable _middleware; + private readonly IServiceProvider _serviceProvider; + + public WorkflowMiddlewareRunner( + IEnumerable middleware, + IServiceProvider serviceProvider + ) + { + _middleware = middleware; + _serviceProvider = serviceProvider; + } + + + /// + /// Runs workflow-level middleware that is set to run at the + /// phase. Middleware will be run in the + /// order in which they were registered with DI with middleware declared earlier starting earlier and + /// completing later. + /// + /// The to run for. + /// The definition. + /// A task that will complete when all middleware has run. + public async Task RunPreMiddleware(WorkflowInstance workflow, WorkflowDefinition def) + { + var preMiddleware = _middleware + .Where(m => m.Phase == WorkflowMiddlewarePhase.PreWorkflow) + .ToArray(); + + await RunWorkflowMiddleware(workflow, preMiddleware); + } + + /// + /// Runs workflow-level middleware that is set to run at the + /// phase. Middleware will be run in the + /// order in which they were registered with DI with middleware declared earlier starting earlier and + /// completing later. + /// + /// The to run for. + /// The definition. + /// A task that will complete when all middleware has run. + public async Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def) + { + var postMiddleware = _middleware + .Where(m => m.Phase == WorkflowMiddlewarePhase.PostWorkflow) + .ToArray(); + + try + { + await RunWorkflowMiddleware(workflow, postMiddleware); + } + catch (Exception exception) + { + // On error, determine which error handler to run and then run it + var errorHandlerType = def.OnPostMiddlewareError ?? typeof(IWorkflowMiddlewareErrorHandler); + using (var scope = _serviceProvider.CreateScope()) + { + var typeInstance = scope.ServiceProvider.GetService(errorHandlerType); + if (typeInstance != null && typeInstance is IWorkflowMiddlewareErrorHandler handler) + { + await handler.HandleAsync(exception); + } + } + } + } + + private static async Task RunWorkflowMiddleware( + WorkflowInstance workflow, + IEnumerable middlewareCollection + ) + { + // Build the middleware chain + var middlewareChain = middlewareCollection + .Reverse() + .Aggregate( + NoopWorkflowDelegate, + (previous, middleware) => () => middleware.HandleAsync(workflow, previous) + ); + + await middlewareChain(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/FlakyConnectionParams.cs b/src/samples/WorkflowCore.Sample19/FlakyConnectionParams.cs new file mode 100644 index 000000000..0fda1a91f --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/FlakyConnectionParams.cs @@ -0,0 +1,9 @@ +using System; + +namespace WorkflowCore.Sample19 +{ + public class FlakyConnectionParams : IDescriptiveWorkflowParams + { + public string Description { get; set; } + } +} diff --git a/src/samples/WorkflowCore.Sample19/FlakyConnectionWorkflow.cs b/src/samples/WorkflowCore.Sample19/FlakyConnectionWorkflow.cs new file mode 100644 index 000000000..a388930d2 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/FlakyConnectionWorkflow.cs @@ -0,0 +1,25 @@ +using WorkflowCore.Interface; +using WorkflowCore.Sample19.Steps; + +namespace WorkflowCore.Sample19 +{ + public class FlakyConnectionWorkflow : IWorkflow + { + public string Id => "flaky-sample"; + + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith() + .Input(x => x.Message, _ => "Starting workflow") + + .Then() + .Input(x => x.SucceedAfterAttempts, _ => 3) + + .Then() + .Input(x => x.Message, _ => "Finishing workflow"); + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/IDescriptiveWorkflowParams.cs b/src/samples/WorkflowCore.Sample19/IDescriptiveWorkflowParams.cs new file mode 100644 index 000000000..73bd26892 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/IDescriptiveWorkflowParams.cs @@ -0,0 +1,7 @@ +namespace WorkflowCore.Sample19 +{ + public interface IDescriptiveWorkflowParams + { + string Description { get; } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Middleware/AddDescriptionWorkflowMiddleware.cs b/src/samples/WorkflowCore.Sample19/Middleware/AddDescriptionWorkflowMiddleware.cs new file mode 100644 index 000000000..8c6080c12 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Middleware/AddDescriptionWorkflowMiddleware.cs @@ -0,0 +1,20 @@ +using System.Threading.Tasks; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample19.Middleware +{ + public class AddDescriptionWorkflowMiddleware : IWorkflowMiddleware + { + public WorkflowMiddlewarePhase Phase => WorkflowMiddlewarePhase.PreWorkflow; + public Task HandleAsync(WorkflowInstance workflow, WorkflowDelegate next) + { + if (workflow.Data is IDescriptiveWorkflowParams descriptiveParams) + { + workflow.Description = descriptiveParams.Description; + } + + return next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Middleware/LogCorrelationStepMiddleware.cs b/src/samples/WorkflowCore.Sample19/Middleware/LogCorrelationStepMiddleware.cs new file mode 100644 index 000000000..38fefc362 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Middleware/LogCorrelationStepMiddleware.cs @@ -0,0 +1,38 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample19.Middleware +{ + /// + /// Loosely based off this article: + /// https://www.frakkingsweet.com/net-core-log-correlation-easy-access-to-headers/ + /// + public class AddMetadataToLogsMiddleware: IWorkflowStepMiddleware + { + private readonly ILogger _log; + + public AddMetadataToLogsMiddleware(ILogger log) + { + _log = log; + } + + public async Task HandleAsync( + IStepExecutionContext context, + IStepBody body, + WorkflowStepDelegate next) + { + var workflowId = context.Workflow.Id; + var stepId = context.Step.Id; + + using (_log.BeginScope("WorkflowId => {@WorkflowId}", workflowId)) + using (_log.BeginScope("StepId => {@StepId}", stepId)) + { + return await next(); + } + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Middleware/PollyRetryMiddleware.cs b/src/samples/WorkflowCore.Sample19/Middleware/PollyRetryMiddleware.cs new file mode 100644 index 000000000..24e7621e7 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Middleware/PollyRetryMiddleware.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Polly; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample19.Middleware +{ + public class PollyRetryMiddleware : IWorkflowStepMiddleware + { + private const string StepContextKey = "WorkflowStepContext"; + private const int MaxRetries = 3; + private readonly ILogger _log; + + public PollyRetryMiddleware(ILogger log) + { + _log = log; + } + + public IAsyncPolicy GetRetryPolicy() => + Policy + .Handle() + .RetryAsync( + MaxRetries, + (result, retryCount, context) => + UpdateRetryCount(result.Exception, retryCount, context[StepContextKey] as IStepExecutionContext) + ); + + public async Task HandleAsync( + IStepExecutionContext context, + IStepBody body, + WorkflowStepDelegate next + ) + { + return await GetRetryPolicy().ExecuteAsync(ctx => next(), new Dictionary + { + { StepContextKey, context } + }); + } + + private Task UpdateRetryCount( + Exception exception, + int retryCount, + IStepExecutionContext stepContext) + { + var stepInstance = stepContext.ExecutionPointer; + stepInstance.RetryCount = retryCount; + + _log.LogWarning( + exception, + "Exception occurred in step {StepId}. Retrying [{RetryCount}/{MaxCount}]", + stepInstance.Id, + retryCount, + MaxRetries + ); + + // TODO: Come up with way to persist workflow + return Task.CompletedTask; + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Middleware/PrintWorkflowSummaryMiddleware.cs b/src/samples/WorkflowCore.Sample19/Middleware/PrintWorkflowSummaryMiddleware.cs new file mode 100644 index 000000000..2cdba963e --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Middleware/PrintWorkflowSummaryMiddleware.cs @@ -0,0 +1,41 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample19.Middleware +{ + public class PrintWorkflowSummaryMiddleware : IWorkflowMiddleware + { + private readonly ILogger _log; + + public PrintWorkflowSummaryMiddleware(ILogger log) + { + _log = log; + } + + public WorkflowMiddlewarePhase Phase => WorkflowMiddlewarePhase.PostWorkflow; + + public Task HandleAsync(WorkflowInstance workflow, WorkflowDelegate next) + { + if (!workflow.CompleteTime.HasValue) + { + return next(); + } + + var duration = workflow.CompleteTime.Value - workflow.CreateTime; + _log.LogInformation($@"Workflow {workflow.Description} completed in {duration:g}"); + + foreach (var step in workflow.ExecutionPointers) + { + var stepName = step.StepName; + var stepDuration = (step.EndTime - step.StartTime) ?? TimeSpan.Zero; + _log.LogInformation($" - Step {stepName} completed in {stepDuration:g}"); + } + + return next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Program.cs b/src/samples/WorkflowCore.Sample19/Program.cs new file mode 100644 index 000000000..1d6e38a48 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Program.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using Microsoft.Extensions.Logging.Console; +using WorkflowCore.Interface; +using WorkflowCore.Sample19.Middleware; +using WorkflowCore.Sample19.Steps; + +namespace WorkflowCore.Sample19 +{ + class Program + { + static void Main(string[] args) + { + var serviceProvider = ConfigureServices(); + + // Start the workflow host + var host = serviceProvider.GetService(); + host.RegisterWorkflow(); + host.Start(); + + var workflowParams = new FlakyConnectionParams + { + Description = "Flaky connection workflow" + }; + var workflowId = host.StartWorkflow("flaky-sample", workflowParams).Result; + Console.WriteLine($"Kicked off workflow {workflowId}"); + + Console.ReadLine(); + host.Stop(); + } + + private static IServiceProvider ConfigureServices() + { + // Setup dependency injection + IServiceCollection services = new ServiceCollection(); + services.AddWorkflow(); + + // Add step middleware + // Note that middleware will get executed in the order in which they were registered + services.AddWorkflowStepMiddleware(); + services.AddWorkflowStepMiddleware(); + + // Add some pre workflow middleware + // This middleware will run before the workflow starts + services.AddWorkflowMiddleware(); + + // Add some post workflow middleware + // This middleware will run after the workflow completes + services.AddWorkflowMiddleware(); + + // Add workflow steps + services.AddTransient(); + services.AddTransient(); + + services.AddLogging(cfg => + { + cfg.AddConsole(x => x.IncludeScopes = true); + cfg.AddDebug(); + }); + + var serviceProvider = services.BuildServiceProvider(); + return serviceProvider; + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Steps/FlakyConnection.cs b/src/samples/WorkflowCore.Sample19/Steps/FlakyConnection.cs new file mode 100644 index 000000000..f04f33f11 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Steps/FlakyConnection.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading.Tasks; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample19.Steps +{ + public class FlakyConnection : StepBodyAsync + { + private static readonly TimeSpan Delay = TimeSpan.FromSeconds(1); + private int _currentCallCount = 0; + + public int? SucceedAfterAttempts { get; set; } = 3; + + public override async Task RunAsync(IStepExecutionContext context) + { + if (SucceedAfterAttempts.HasValue && _currentCallCount >= SucceedAfterAttempts.Value) + { + return ExecutionResult.Next(); + } + + _currentCallCount++; + await Task.Delay(Delay); + throw new TimeoutException("A call has timed out"); + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/Steps/LogMessage.cs b/src/samples/WorkflowCore.Sample19/Steps/LogMessage.cs new file mode 100644 index 000000000..ac8f4b678 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/Steps/LogMessage.cs @@ -0,0 +1,29 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample19.Steps +{ + public class LogMessage : StepBodyAsync + { + private readonly ILogger _log; + + public LogMessage(ILogger log) + { + _log = log; + } + + public string Message { get; set; } + + public override Task RunAsync(IStepExecutionContext context) + { + if (Message != null) + { + _log.LogInformation(Message); + } + + return Task.FromResult(ExecutionResult.Next()); + } + } +} diff --git a/src/samples/WorkflowCore.Sample19/WorkflowCore.Sample19.csproj b/src/samples/WorkflowCore.Sample19/WorkflowCore.Sample19.csproj new file mode 100644 index 000000000..b2a7796c3 --- /dev/null +++ b/src/samples/WorkflowCore.Sample19/WorkflowCore.Sample19.csproj @@ -0,0 +1,20 @@ + + + + Exe + netcoreapp3.0 + + + + + + + + + + + + + + + diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/MiddlewareScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/MiddlewareScenario.cs new file mode 100644 index 000000000..1c6929209 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/MiddlewareScenario.cs @@ -0,0 +1,167 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Testing; +using Xunit; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class MiddlewareScenario : WorkflowTest + { + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(60); + private static readonly TimeSpan Delay = TimeSpan.FromMilliseconds(5); + private readonly List _workflowMiddleware = new List(); + private readonly List _stepMiddleware = new List(); + private readonly TestStep _step = new TestStep(); + + public MiddlewareScenario() + { + Setup(); + } + + public TestWorkflowMiddleware[] PreMiddleware => _workflowMiddleware + .Where(x => x.Phase == WorkflowMiddlewarePhase.PreWorkflow) + .ToArray(); + + public TestWorkflowMiddleware[] PostMiddleware => _workflowMiddleware + .Where(x => x.Phase == WorkflowMiddlewarePhase.PostWorkflow) + .ToArray(); + + public class MyWorkflow: IWorkflow + { + public string Id => nameof(MyWorkflow); + + public int Version => 1; + + public void Build(IWorkflowBuilder builder) => + builder.StartWith(); + } + + public class TestStep : StepBodyAsync + { + + public DateTime? StartTime { get; private set; } + public DateTime? EndTime { get; private set; } + public bool HasCompleted => StartTime.HasValue && EndTime.HasValue; + + public override async Task RunAsync(IStepExecutionContext context) + { + StartTime = DateTime.UtcNow; + await Task.Delay(Delay); + EndTime = DateTime.UtcNow; + return ExecutionResult.Next(); + } + } + + public class TestWorkflowMiddleware : IWorkflowMiddleware + { + public TestWorkflowMiddleware(WorkflowMiddlewarePhase phase) + { + Phase = phase; + } + + public WorkflowMiddlewarePhase Phase { get; } + + public DateTime? StartTime { get; private set; } + public DateTime? EndTime { get; private set; } + public bool HasCompleted => StartTime.HasValue && EndTime.HasValue; + + public async Task HandleAsync(WorkflowInstance workflow, WorkflowDelegate next) + { + StartTime = DateTime.UtcNow; + await Task.Delay(Delay); + await next(); + await Task.Delay(Delay); + EndTime = DateTime.UtcNow; + } + } + + public class TestStepMiddleware : IWorkflowStepMiddleware + { + public DateTime? StartTime { get; private set; } + public DateTime? EndTime { get; private set; } + + public bool HasCompleted => StartTime.HasValue && EndTime.HasValue; + + public async Task HandleAsync(IStepExecutionContext context, IStepBody body, WorkflowStepDelegate next) + { + StartTime = DateTime.UtcNow; + await Task.Delay(Delay); + var result = await next(); + await Task.Delay(Delay); + EndTime = DateTime.UtcNow; + return result; + } + } + + protected override void ConfigureServices(IServiceCollection services) + { + base.ConfigureServices(services); + + services.AddTransient(_ => _step); + + // Configure 3 middleware of each type + const int middlewareCount = 3; + foreach (var _ in Enumerable.Range(0, middlewareCount)) + { + var preMiddleware = new TestWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow); + var postMiddleware = new TestWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow); + _workflowMiddleware.Add(preMiddleware); + _workflowMiddleware.Add(postMiddleware); + services.AddWorkflowMiddleware(p => preMiddleware); + services.AddWorkflowMiddleware(p => postMiddleware); + } + + // Configure 3 step middleware + foreach (var _ in Enumerable.Range(0, middlewareCount)) + { + var middleware = new TestStepMiddleware(); + services.AddWorkflowStepMiddleware(p => middleware); + _stepMiddleware.Add(middleware); + } + + } + + [Fact(DisplayName = "Should run all workflow and step middleware")] + public async Task Should_run_all_workflow_and_step_middleware() + { + var workflowId = await StartWorkflowAsync(new object()); + var status = await WaitForWorkflowToCompleteAsync(workflowId, Timeout); + + // Workflow should complete without errors + status.Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + + // Each middleware should have run + _workflowMiddleware.Should() + .HaveCount(6).And + .OnlyContain(x => x.HasCompleted); + _stepMiddleware.Should() + .HaveCount(3) + .And + .OnlyContain(x => x.HasCompleted); + + // Step middleware should have been run in order + _stepMiddleware.Should().BeInAscendingOrder(x => x.StartTime); + _stepMiddleware.Should().BeInDescendingOrder(x => x.EndTime); + + // Step should have been called after all step middleware + _step.HasCompleted.Should().BeTrue(); + _step.StartTime.Should().BeAfter(_stepMiddleware.Last().StartTime.Value); + _step.EndTime.Should().BeBefore(_stepMiddleware.Last().EndTime.Value); + + // Pre workflow middleware should have been run in order + PreMiddleware.Should().BeInAscendingOrder(x => x.StartTime); + PreMiddleware.Should().BeInDescendingOrder(x => x.EndTime); + + // Post workflow middleware should have been run in order + PostMiddleware.Should().BeInAscendingOrder(x => x.StartTime); + PostMiddleware.Should().BeInDescendingOrder(x => x.EndTime); + } + } +} diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs index fb7d2eed1..7d8a0117a 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs @@ -5,13 +5,14 @@ using WorkflowCore.Models; using Xunit; using FluentAssertions; +using WorkflowCore.Services.DefinitionStorage; using WorkflowCore.Testing; using WorkflowCore.TestAssets.DataTypes; namespace WorkflowCore.IntegrationTests.Scenarios { public class StoredJsonScenario : JsonWorkflowTest - { + { public StoredJsonScenario() { Setup(); diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StoredYamlScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StoredYamlScenario.cs index 820f86bd8..3b37d2eb7 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/StoredYamlScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StoredYamlScenario.cs @@ -5,13 +5,14 @@ using WorkflowCore.Models; using Xunit; using FluentAssertions; +using WorkflowCore.Services.DefinitionStorage; using WorkflowCore.Testing; using WorkflowCore.TestAssets.DataTypes; namespace WorkflowCore.IntegrationTests.Scenarios { public class StoredYamlScenario : YamlWorkflowTest - { + { public StoredYamlScenario() { Setup(); diff --git a/test/WorkflowCore.TestAssets/LockProvider/DistributedLockProviderTests.cs b/test/WorkflowCore.TestAssets/LockProvider/DistributedLockProviderTests.cs index 7f1eebf1d..677fb511c 100644 --- a/test/WorkflowCore.TestAssets/LockProvider/DistributedLockProviderTests.cs +++ b/test/WorkflowCore.TestAssets/LockProvider/DistributedLockProviderTests.cs @@ -2,12 +2,13 @@ using System.Collections.Generic; using System.Text; using System.Threading; +using System.Threading.Tasks; using WorkflowCore.Interface; using FluentAssertions; using NUnit.Framework; namespace WorkflowCore.TestAssets.LockProvider -{ +{ public abstract class DistributedLockProviderTests { protected IDistributedLockProvider Subject; @@ -19,10 +20,10 @@ public void Setup() Subject.Start(); } - protected abstract IDistributedLockProvider CreateProvider(); + protected abstract IDistributedLockProvider CreateProvider(); [Test] - public async void AcquiresLock() + public async Task AcquiresLock() { const string lock1 = "lock1"; const string lock2 = "lock2"; @@ -34,7 +35,7 @@ public async void AcquiresLock() } [Test] - public async void DoesNotAcquireWhenLocked() + public async Task DoesNotAcquireWhenLocked() { const string lock1 = "lock1"; await Subject.AcquireLock(lock1, new CancellationToken()); @@ -45,7 +46,7 @@ public async void DoesNotAcquireWhenLocked() } [Test] - public async void ReleasesLock() + public async Task ReleasesLock() { const string lock1 = "lock1"; await Subject.AcquireLock(lock1, new CancellationToken()); diff --git a/test/WorkflowCore.TestAssets/stored-definition.json b/test/WorkflowCore.TestAssets/stored-definition.json index 56f36ac5e..1db3a9174 100644 --- a/test/WorkflowCore.TestAssets/stored-definition.json +++ b/test/WorkflowCore.TestAssets/stored-definition.json @@ -90,4 +90,4 @@ } ] -} \ No newline at end of file +} diff --git a/test/WorkflowCore.Testing/JsonWorkflowTest.cs b/test/WorkflowCore.Testing/JsonWorkflowTest.cs index 16fd7c232..760966a52 100644 --- a/test/WorkflowCore.Testing/JsonWorkflowTest.cs +++ b/test/WorkflowCore.Testing/JsonWorkflowTest.cs @@ -17,6 +17,7 @@ public abstract class JsonWorkflowTest : IDisposable protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected IDefinitionLoader DefinitionLoader; + protected IWorkflowRegistry Registry; protected List UnhandledStepErrors = new List(); protected virtual void Setup() @@ -34,6 +35,7 @@ protected virtual void Setup() PersistenceProvider = serviceProvider.GetService(); DefinitionLoader = serviceProvider.GetService(); + Registry = serviceProvider.GetService(); Host = serviceProvider.GetService(); Host.OnStepError += Host_OnStepError; Host.Start(); @@ -106,5 +108,5 @@ public void Dispose() Host.Stop(); } } - + } diff --git a/test/WorkflowCore.Testing/WorkflowTest.cs b/test/WorkflowCore.Testing/WorkflowTest.cs index 1a0dc8d81..f61d6be0a 100644 --- a/test/WorkflowCore.Testing/WorkflowTest.cs +++ b/test/WorkflowCore.Testing/WorkflowTest.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Text; using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using WorkflowCore.Interface; @@ -18,7 +19,7 @@ public abstract class WorkflowTest : IDisposable protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected List UnhandledStepErrors = new List(); - + protected virtual void Setup() { //setup dependency injection @@ -61,6 +62,13 @@ public string StartWorkflow(TData data) return workflowId; } + public async Task StartWorkflowAsync(TData data) + { + var def = new TWorkflow(); + var workflowId = await Host.StartWorkflow(def.Id, data); + return workflowId; + } + protected void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut) { var status = GetStatus(workflowId); @@ -73,6 +81,20 @@ protected void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut) } } + protected async Task WaitForWorkflowToCompleteAsync(string workflowId, TimeSpan timeOut) + { + var status = GetStatus(workflowId); + var counter = 0; + while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100))) + { + await Task.Delay(100); + counter++; + status = GetStatus(workflowId); + } + + return status; + } + protected IEnumerable GetActiveSubscriptons(string eventName, string eventKey) { return PersistenceProvider.GetSubscriptions(eventName, eventKey, DateTime.MaxValue).Result; diff --git a/test/WorkflowCore.Testing/YamlWorkflowTest.cs b/test/WorkflowCore.Testing/YamlWorkflowTest.cs index 58cbb3cad..b16d07a6d 100644 --- a/test/WorkflowCore.Testing/YamlWorkflowTest.cs +++ b/test/WorkflowCore.Testing/YamlWorkflowTest.cs @@ -17,6 +17,7 @@ public abstract class YamlWorkflowTest : IDisposable protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected IDefinitionLoader DefinitionLoader; + protected IWorkflowRegistry Registry; protected List UnhandledStepErrors = new List(); protected virtual void Setup() @@ -34,6 +35,7 @@ protected virtual void Setup() PersistenceProvider = serviceProvider.GetService(); DefinitionLoader = serviceProvider.GetService(); + Registry = serviceProvider.GetService(); Host = serviceProvider.GetService(); Host.OnStepError += Host_OnStepError; Host.Start(); @@ -106,5 +108,4 @@ public void Dispose() Host.Stop(); } } - } diff --git a/test/WorkflowCore.UnitTests/Services/StepExecutorTests.cs b/test/WorkflowCore.UnitTests/Services/StepExecutorTests.cs new file mode 100644 index 000000000..86d2bfcee --- /dev/null +++ b/test/WorkflowCore.UnitTests/Services/StepExecutorTests.cs @@ -0,0 +1,148 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Threading.Tasks; +using FakeItEasy; +using FluentAssertions; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Services; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.UnitTests.Services +{ + public class StepExecutorTests + { + protected List Middleware { get; } + protected IStepBody Body { get; } + protected IStepExecutionContext Context { get; } + protected IStepExecutor Runner { get; } + protected ExecutionResult DummyResult { get; } = ExecutionResult.Persist(null); + protected ITestOutputHelper Out { get; } + + public StepExecutorTests(ITestOutputHelper output) + { + Middleware = new List(); + Body = A.Fake(); + Context = A.Fake(); + Out = output; + Runner = new StepExecutor(Middleware); + + A + .CallTo(() => Body.RunAsync(A._)) + .Invokes(() => Out.WriteLine("Called step body")) + .Returns(DummyResult); + } + + [Fact(DisplayName = "ExecuteStep should run step when no middleware")] + public async Task ExecuteStep_should_run_step_when_no_middleware() + { + // Act + var result = await Runner.ExecuteStep(Context, Body); + + // Assert + result.Should().Be(DummyResult); + } + + [Fact(DisplayName = "ExecuteStep should run middleware and step when one middleware")] + public async Task ExecuteStep_should_run_middleware_and_step_when_one_middleware() + { + // Arrange + var middleware = BuildStepMiddleware(); + Middleware.Add(middleware); + + // Act + var result = await Runner.ExecuteStep(Context, Body); + + // Assert + result.Should().Be(DummyResult); + A + .CallTo(RunMethodFor(Body)) + .MustHaveHappenedOnceExactly() + .Then( + A.CallTo(HandleMethodFor(middleware)) + .MustHaveHappenedOnceExactly()); + } + + [Fact(DisplayName = + "ExecuteStep should run middleware chain completing in reverse order and step when multiple middleware")] + public async Task + ExecuteStep_should_run_middleware_chain_completing_in_reverse_order_and_step_when_multiple_middleware() + { + // Arrange + var middleware1 = BuildStepMiddleware(1); + var middleware2 = BuildStepMiddleware(2); + var middleware3 = BuildStepMiddleware(3); + Middleware.AddRange(new[] { middleware1, middleware2, middleware3 }); + + // Act + var result = await Runner.ExecuteStep(Context, Body); + + // Assert + result.Should().Be(DummyResult); + A + .CallTo(RunMethodFor(Body)) + .MustHaveHappenedOnceExactly() + .Then(A + .CallTo(HandleMethodFor(middleware3)) + .MustHaveHappenedOnceExactly()) + .Then(A + .CallTo(HandleMethodFor(middleware2)) + .MustHaveHappenedOnceExactly()) + .Then(A + .CallTo(HandleMethodFor(middleware1)) + .MustHaveHappenedOnceExactly()); + } + + [Fact(DisplayName = "ExecuteStep should bubble up exceptions in middleware")] + public void ExecuteStep_should_bubble_up_exceptions_in_middleware() + { + // Arrange + var middleware1 = BuildStepMiddleware(1); + var middleware2 = BuildStepMiddleware(2); + var middleware3 = BuildStepMiddleware(3); + Middleware.AddRange(new[] { middleware1, middleware2, middleware3 }); + A + .CallTo(HandleMethodFor(middleware2)) + .Throws(new ApplicationException("Failed")); + + // Act + Func> action = async () => await Runner.ExecuteStep(Context, Body); + + // Assert + action + .ShouldThrow() + .WithMessage("Failed"); + } + + #region Helpers + + private IWorkflowStepMiddleware BuildStepMiddleware(int id = 0) + { + var middleware = A.Fake(); + A + .CallTo(HandleMethodFor(middleware)) + .ReturnsLazily(async call => + { + Out.WriteLine($@"Before step middleware {id}"); + var result = await call.Arguments[2].As().Invoke(); + Out.WriteLine($@"After step middleware {id}"); + return result; + }); + + return middleware; + } + + private static Expression>> HandleMethodFor(IWorkflowStepMiddleware middleware) => + () => middleware.HandleAsync( + A._, + A._, + A._); + + private static Expression>> RunMethodFor(IStepBody body) => + () => body.RunAsync(A._); + + #endregion + } +} diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs index 622af03ff..8b6f22e22 100644 --- a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs @@ -3,6 +3,8 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Threading.Tasks; +using FluentAssertions; using WorkflowCore.Interface; using WorkflowCore.Models; using WorkflowCore.Services; @@ -22,6 +24,8 @@ public class WorkflowExecutorFixture protected IServiceProvider ServiceProvider; protected IScopeProvider ScopeProvider; protected IDateTimeProvider DateTimeProvider; + protected IStepExecutor StepExecutor; + protected IWorkflowMiddlewareRunner MiddlewareRunner; protected WorkflowOptions Options; public WorkflowExecutorFixture() @@ -35,6 +39,8 @@ public WorkflowExecutorFixture() EventHub = A.Fake(); CancellationProcessor = A.Fake(); DateTimeProvider = A.Fake(); + MiddlewareRunner = A.Fake(); + StepExecutor = A.Fake(); Options = new WorkflowOptions(A.Fake()); @@ -45,17 +51,26 @@ public WorkflowExecutorFixture() A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now); A.CallTo(() => DateTimeProvider.UtcNow).Returns(DateTime.UtcNow); + A.CallTo(() => MiddlewareRunner + .RunPostMiddleware(A._, A._)) + .Returns(Task.CompletedTask); + + A.CallTo(() => StepExecutor.ExecuteStep(A._, A._)) + .ReturnsLazily(call => + call.Arguments[1].As().RunAsync( + call.Arguments[0].As())); + //config logging var loggerFactory = new LoggerFactory(); - //loggerFactory.AddConsole(LogLevel.Debug); + //loggerFactory.AddConsole(LogLevel.Debug); - Subject = new WorkflowExecutor(Registry, ServiceProvider, ScopeProvider, DateTimeProvider, ResultProcesser, EventHub, CancellationProcessor, Options, loggerFactory); + Subject = new WorkflowExecutor(Registry, ServiceProvider, ScopeProvider, DateTimeProvider, ResultProcesser, EventHub, CancellationProcessor, Options, MiddlewareRunner, StepExecutor, loggerFactory); } [Fact(DisplayName = "Should execute active step")] public void should_execute_active_step() { - //arrange + //arrange var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Next()); WorkflowStep step1 = BuildFakeStep(step1Body); @@ -72,7 +87,7 @@ public void should_execute_active_step() { new ExecutionPointer() { Id = "1", Active = true, StepId = 0 } }) - }; + }; //act Subject.Execute(instance); @@ -85,7 +100,7 @@ public void should_execute_active_step() [Fact(DisplayName = "Should trigger step hooks")] public void should_trigger_step_hooks() { - //arrange + //arrange var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Next()); WorkflowStep step1 = BuildFakeStep(step1Body); @@ -116,7 +131,7 @@ public void should_trigger_step_hooks() [Fact(DisplayName = "Should not execute inactive step")] public void should_not_execute_inactive_step() { - //arrange + //arrange var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Next()); WorkflowStep step1 = BuildFakeStep(step1Body); @@ -134,7 +149,7 @@ public void should_not_execute_inactive_step() new ExecutionPointer() { Id = "1", Active = false, StepId = 0 } }) }; - + //act Subject.Execute(instance); @@ -148,7 +163,7 @@ public void should_map_inputs() //arrange var param = A.Fake(); - var step1Body = A.Fake(); + var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Next()); WorkflowStep step1 = BuildFakeStep(step1Body, new List() { @@ -183,7 +198,7 @@ public void should_map_inputs() [Fact(DisplayName = "Should map outputs")] public void should_map_outputs() { - //arrange + //arrange var param = A.Fake(); var step1Body = A.Fake(); @@ -212,7 +227,7 @@ public void should_map_outputs() new ExecutionPointer() { Id = "1", Active = true, StepId = 0 } }) }; - + //act Subject.Execute(instance); @@ -221,12 +236,12 @@ public void should_map_outputs() .MustHaveHappened(); } - + [Fact(DisplayName = "Should handle step exception")] public void should_handle_step_exception() { - //arrange + //arrange var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Throws(); WorkflowStep step1 = BuildFakeStep(step1Body); @@ -251,13 +266,13 @@ public void should_handle_step_exception() //assert A.CallTo(() => step1Body.RunAsync(A.Ignored)).MustHaveHappened(); A.CallTo(() => ResultProcesser.HandleStepException(instance, A.Ignored, A.Ignored, step1, A.Ignored)).MustHaveHappened(); - A.CallTo(() => ResultProcesser.ProcessExecutionResult(instance, A.Ignored, A.Ignored, step1, A.Ignored, A.Ignored)).MustNotHaveHappened(); + A.CallTo(() => ResultProcesser.ProcessExecutionResult(instance, A.Ignored, A.Ignored, step1, A.Ignored, A.Ignored)).MustNotHaveHappened(); } [Fact(DisplayName = "Should process after execution iteration")] public void should_process_after_execution_iteration() { - //arrange + //arrange var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Persist(null)); WorkflowStep step1 = BuildFakeStep(step1Body); @@ -286,7 +301,7 @@ public void should_process_after_execution_iteration() [Fact(DisplayName = "Should process cancellations")] public void should_process_cancellations() { - //arrange + //arrange var step1Body = A.Fake(); A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Persist(null)); WorkflowStep step1 = BuildFakeStep(step1Body); diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs b/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs new file mode 100644 index 000000000..991acfb23 --- /dev/null +++ b/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs @@ -0,0 +1,276 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Threading.Tasks; +using FakeItEasy; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Services; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.UnitTests.Services +{ + public class WorkflowMiddlewareRunnerTests + { + protected List Middleware { get; } + protected WorkflowInstance Workflow { get; } + protected WorkflowDefinition Definition { get; } + protected IServiceProvider ServiceProvider { get; } + protected IWorkflowMiddlewareErrorHandler TopLevelErrorHandler { get; } + protected IDefLevelErrorHandler DefLevelErrorHandler { get; } + protected IWorkflowMiddlewareRunner Runner { get; } + protected ITestOutputHelper Out { get; } + + public WorkflowMiddlewareRunnerTests(ITestOutputHelper output) + { + Out = output; + Middleware = new List(); + Workflow = new WorkflowInstance(); + Definition = new WorkflowDefinition(); + TopLevelErrorHandler = A.Fake(); + DefLevelErrorHandler = A.Fake(); + ServiceProvider = new ServiceCollection() + .AddTransient(_ => TopLevelErrorHandler) + .AddTransient(_ => DefLevelErrorHandler) + .BuildServiceProvider(); + + A + .CallTo(HandleMethodFor(TopLevelErrorHandler)) + .Returns(Task.CompletedTask); + A + .CallTo(HandleMethodFor(DefLevelErrorHandler)) + .Returns(Task.CompletedTask); + + Runner = new WorkflowMiddlewareRunner(Middleware, ServiceProvider); + } + + + [Fact(DisplayName = "RunPreMiddleware should run nothing when no middleware")] + public void RunPreMiddleware_should_run_nothing_when_no_middleware() + { + // Act + Func action = async () => await Runner.RunPreMiddleware(Workflow, Definition); + + // Assert + action.ShouldNotThrow(); + } + + [Fact(DisplayName = "RunPreMiddleware should run middleware when one middleware")] + public async Task RunPreMiddleware_should_run_middleware_when_one_middleware() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow); + Middleware.Add(middleware); + + // Act + await Runner.RunPreMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(middleware)) + .MustHaveHappenedOnceExactly(); + } + + [Fact(DisplayName = "RunPreMiddleware should run all middleware when multiple middleware")] + public async Task RunPreMiddleware_should_run_all_middleware_when_multiple_middleware() + { + // Arrange + var middleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 1); + var middleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 2); + var middleware3 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 3); + Middleware.AddRange(new[] { middleware1, middleware2, middleware3 }); + + // Act + await Runner.RunPreMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(middleware3)) + .MustHaveHappenedOnceExactly() + .Then(A + .CallTo(HandleMethodFor(middleware2)) + .MustHaveHappenedOnceExactly()) + .Then(A + .CallTo(HandleMethodFor(middleware1)) + .MustHaveHappenedOnceExactly()); + } + + [Fact(DisplayName = "RunPreMiddleware should only run middleware in PreWorkflow phase")] + public async Task RunPreMiddleware_should_only_run_middleware_in_PreWorkflow_phase() + { + // Arrange + var preMiddleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 1); + var preMiddleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 2); + var postMiddleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 3); + var postMiddleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 4); + Middleware.AddRange(new[] { postMiddleware1, postMiddleware2, preMiddleware1, preMiddleware2 }); + + // Act + await Runner.RunPreMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(preMiddleware2)) + .MustHaveHappenedOnceExactly() + .Then(A + .CallTo(HandleMethodFor(preMiddleware1)) + .MustHaveHappenedOnceExactly()); + + A.CallTo(HandleMethodFor(postMiddleware1)).MustNotHaveHappened(); + A.CallTo(HandleMethodFor(postMiddleware2)).MustNotHaveHappened(); + } + + [Fact(DisplayName = "RunPostMiddleware should run nothing when no middleware")] + public void RunPostMiddleware_should_run_nothing_when_no_middleware() + { + // Act + Func action = async () => await Runner.RunPostMiddleware(Workflow, Definition); + + // Assert + action.ShouldNotThrow(); + } + + [Fact(DisplayName = "RunPostMiddleware should run middleware when one middleware")] + public async Task RunPostMiddleware_should_run_middleware_when_one_middleware() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow); + Middleware.Add(middleware); + + // Act + await Runner.RunPostMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(middleware)) + .MustHaveHappenedOnceExactly(); + } + + [Fact(DisplayName = "RunPostMiddleware should run all middleware when multiple middleware")] + public async Task RunPostMiddleware_should_run_all_middleware_when_multiple_middleware() + { + // Arrange + var middleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 1); + var middleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 2); + var middleware3 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 3); + Middleware.AddRange(new[] { middleware1, middleware2, middleware3 }); + + // Act + await Runner.RunPostMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(middleware3)) + .MustHaveHappenedOnceExactly() + .Then(A + .CallTo(HandleMethodFor(middleware2)) + .MustHaveHappenedOnceExactly()) + .Then(A + .CallTo(HandleMethodFor(middleware1)) + .MustHaveHappenedOnceExactly()); + } + + [Fact(DisplayName = "RunPostMiddleware should only run middleware in PostWorkflow phase")] + public async Task RunPostMiddleware_should_only_run_middleware_in_PostWorkflow_phase() + { + // Arrange + var postMiddleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 1); + var postMiddleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 2); + var preMiddleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 3); + var preMiddleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 4); + Middleware.AddRange(new[] { preMiddleware1, postMiddleware1, preMiddleware2, postMiddleware2 }); + + // Act + await Runner.RunPostMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(postMiddleware2)) + .MustHaveHappenedOnceExactly() + .Then(A + .CallTo(HandleMethodFor(postMiddleware1)) + .MustHaveHappenedOnceExactly()); + + A.CallTo(HandleMethodFor(preMiddleware1)).MustNotHaveHappened(); + A.CallTo(HandleMethodFor(preMiddleware1)).MustNotHaveHappened(); + } + + [Fact(DisplayName = "RunPostMiddleware should call top level error handler when middleware throws")] + public async Task RunPostMiddleware_should_call_top_level_error_handler_when_middleware_throws() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 1); + A.CallTo(HandleMethodFor(middleware)).ThrowsAsync(new ApplicationException("Something went wrong")); + Middleware.AddRange(new[] { middleware }); + + // Act + await Runner.RunPostMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(TopLevelErrorHandler)) + .MustHaveHappenedOnceExactly(); + } + + [Fact(DisplayName = + "RunPostMiddleware should call error handler on workflow def when middleware throws and def has handler defined")] + public async Task + RunPostMiddleware_should_call_error_handler_on_workflow_def_when_middleware_throws_and_def_has_handler() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 1); + A.CallTo(HandleMethodFor(middleware)).ThrowsAsync(new ApplicationException("Something went wrong")); + Middleware.AddRange(new[] { middleware }); + Definition.OnPostMiddlewareError = typeof(IDefLevelErrorHandler); + + // Act + await Runner.RunPostMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(TopLevelErrorHandler)) + .MustNotHaveHappened(); + A + .CallTo(HandleMethodFor(DefLevelErrorHandler)) + .MustHaveHappenedOnceExactly(); + } + + #region Helpers + + private IWorkflowMiddleware BuildWorkflowMiddleware( + WorkflowMiddlewarePhase phase, + int id = 0 + ) + { + var middleware = A.Fake(); + A.CallTo(() => middleware.Phase).Returns(phase); + A + .CallTo(HandleMethodFor(middleware)) + .ReturnsLazily(async call => + { + Out.WriteLine($@"Before workflow middleware {id}"); + await call.Arguments[1].As().Invoke(); + Out.WriteLine($@"After workflow middleware {id}"); + }); + + return middleware; + } + + private static Expression> HandleMethodFor(IWorkflowMiddleware middleware) => + () => middleware.HandleAsync( + A._, + A._); + + private static Expression> HandleMethodFor(IWorkflowMiddlewareErrorHandler errorHandler) => + () => errorHandler.HandleAsync(A._); + + public interface IDefLevelErrorHandler : IWorkflowMiddlewareErrorHandler + { + } + + #endregion + } +}