Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - Workflow Middleware #684

Merged
merged 1 commit into from Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions WorkflowCore.sln
Expand Up @@ -147,6 +147,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchP
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.QueueProviders.RabbitMQ", "test\WorkflowCore.Tests.QueueProviders.RabbitMQ\WorkflowCore.Tests.QueueProviders.RabbitMQ.csproj", "{54DE20BA-EBA7-4BF0-9BD9-F03766849716}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample19", "src\samples\WorkflowCore.Sample19\WorkflowCore.Sample19.csproj", "{1223ED47-3E5E-4960-B70D-DFAF550F6666}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -361,6 +363,10 @@ Global
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54DE20BA-EBA7-4BF0-9BD9-F03766849716}.Release|Any CPU.Build.0 = Release|Any CPU
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1223ED47-3E5E-4960-B70D-DFAF550F6666}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -421,6 +427,7 @@ Global
{E32CF21A-29CC-46D1-8044-FCC327F2B281} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
{51BB7DCD-01DD-453D-A1E7-17E5E3DBB14C} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
{54DE20BA-EBA7-4BF0-9BD9-F03766849716} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
{1223ED47-3E5E-4960-B70D-DFAF550F6666} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}
Expand Down
4 changes: 3 additions & 1 deletion docs/samples.md
Expand Up @@ -32,4 +32,6 @@

[Exposing a REST API](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WebApiSample)

[Human(User) Workflow](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample08)
[Human(User) Workflow](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample08)

[Workflow Middleware](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample19)
279 changes: 279 additions & 0 deletions docs/workflow-middleware.md
@@ -0,0 +1,279 @@
# Workflow Middleware

Workflows can be extended with Middleware that run before/after workflows start/complete as well as around workflow steps to provide flexibility in implementing cross-cutting concerns such as [log correlation](https://www.frakkingsweet.com/net-core-log-correlation-easy-access-to-headers/), [retries](https://docs.microsoft.com/en-us/dotnet/architecture/microservices/implement-resilient-applications/implement-http-call-retries-exponential-backoff-polly), and other use-cases.

This is done by implementing and registering `IWorkflowMiddleware` for workflows or `IWorkflowStepMiddleware` for steps.

## Step Middleware

Step middleware lets you run additional code around the execution of a given step and alter its behavior. Implementing a step middleware should look familiar to anyone familiar with [ASP.NET Core's middleware pipeline](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/middleware/?view=aspnetcore-3.1) or [`HttpClient`'s `DelegatingHandler` middleware](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/http-requests?view=aspnetcore-3.1#outgoing-request-middleware).

### Usage

First, create your own middleware class that implements `IWorkflowStepMiddleware`. Here's an example of a middleware that adds workflow ID and step ID to the log correlation context of every workflow step in your app.

**Important:** You must make sure to call `next()` as part of your middleware. If you do not do this, your step will never run.

```cs
public class LogCorrelationStepMiddleware : IWorkflowStepMiddleware
{
private readonly ILogger<LogCorrelationStepMiddleware> _log;

public LogCorrelationStepMiddleware(
ILogger<LogCorrelationStepMiddleware> log)
{
_log = log;
}

public async Task<ExecutionResult> HandleAsync(
IStepExecutionContext context,
IStepBody body,
WorkflowStepDelegate next)
{
var workflowId = context.Workflow.Id;
var stepId = context.Step.Id;

// Uses log scope to add a few attributes to the scope
using (_log.BeginScope("{@WorkflowId}", workflowId))
using (_log.BeginScope("{@StepId}", stepId))
{
// Calling next ensures step gets executed
return await next();
}
}
}
```

Here's another example of a middleware that uses the [Polly](https://github.com/App-vNext/Polly) dotnet resiliency library to implement retries on workflow steps based off a custom retry policy.

```cs
public class PollyRetryStepMiddleware : IWorkflowStepMiddleware
{
private const string StepContextKey = "WorkflowStepContext";
private const int MaxRetries = 3;
private readonly ILogger<PollyRetryStepMiddleware> _log;

public PollyRetryMiddleware(ILogger<PollyRetryStepMiddleware> log)
{
_log = log;
}

// Consult Polly's docs for more information on how to build
// retry policies:
// https://github.com/App-vNext/Polly
public IAsyncPolicy<ExecutionResult> GetRetryPolicy() =>
Policy<ExecutionResult>
.Handle<TimeoutException>()
.RetryAsync(
MaxRetries,
(result, retryCount, context) =>
UpdateRetryCount(
result.Exception,
retryCount,
context[StepContextKey] as IStepExecutionContext)
);

public async Task<ExecutionResult> HandleAsync(
IStepExecutionContext context,
IStepBody body,
WorkflowStepDelegate next
)
{
return await GetRetryPolicy().ExecuteAsync(
ctx => next(),
// The step execution context gets passed down so that
// the step is accessible within the retry policy
new Dictionary<string, object>
{
{ StepContextKey, context }
});
}

private Task UpdateRetryCount(
Exception exception,
int retryCount,
IStepExecutionContext stepContext)
{
var stepInstance = stepContext.ExecutionPointer;
stepInstance.RetryCount = retryCount;
return Task.CompletedTask;
}
}
```

## Pre/Post Workflow Middleware

Workflow middleware run either before a workflow starts or after a workflow completes and can be used to hook into the workflow lifecycle or alter the workflow itself before it is started.

### Pre Workflow Middleware

These middleware get run before the workflow is started and can potentially alter properties on the `WorkflowInstance`.

The following example illustrates setting the `Description` property on the `WorkflowInstance` using a middleware that interprets the data on the passed workflow. This is useful in cases where you want the description of the workflow to be derived from the data passed to the workflow.

Note that you use `WorkflowMiddlewarePhase.PreWorkflow` to specify that it runs before the workflow starts.

**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs.

```cs
// AddDescriptionWorkflowMiddleware.cs
public class AddDescriptionWorkflowMiddleware : IWorkflowMiddleware
{
public WorkflowMiddlewarePhase Phase =>
WorkflowMiddlewarePhase.PreWorkflow;

public Task HandleAsync(
WorkflowInstance workflow,
WorkflowDelegate next
)
{
if (workflow.Data is IDescriptiveWorkflowParams descriptiveParams)
{
workflow.Description = descriptiveParams.Description;
}

return next();
}
}

// IDescriptiveWorkflowParams.cs
public interface IDescriptiveWorkflowParams
{
string Description { get; }
}

// MyWorkflowParams.cs
public MyWorkflowParams : IDescriptiveWorkflowParams
{
public string Description => $"Run task '{TaskName}'";

public string TaskName { get; set; }
}
```

### Exception Handling in Pre Workflow Middleware

Pre workflow middleware exception handling gets treated differently from post workflow middleware. Since the middleware runs before the workflow starts, any exceptions thrown within a pre workflow middleware will bubble up to the `StartWorkflow` method and it is up to the caller of `StartWorkflow` to handle the exception and act accordingly.

```cs
public async Task MyMethodThatStartsAWorkflow()
{
try
{
await host.StartWorkflow("HelloWorld", 1, null);
}
catch(Exception ex)
{
// Handle the exception appropriately
}
}
```

### Post Workflow Middleware

These middleware get run after the workflow has completed and can be used to perform additional actions for all workflows in your app.

The following example illustrates how you can use a post workflow middleware to print a summary of the workflow to console.

Note that you use `WorkflowMiddlewarePhase.PostWorkflow` to specify that it runs after the workflow completes.

**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs.

```cs
public class PrintWorkflowSummaryMiddleware : IWorkflowMiddleware
{
private readonly ILogger<PrintWorkflowSummaryMiddleware> _log;

public PrintWorkflowSummaryMiddleware(
ILogger<PrintWorkflowSummaryMiddleware> log
)
{
_log = log;
}

public WorkflowMiddlewarePhase Phase =>
WorkflowMiddlewarePhase.PostWorkflow;

public Task HandleAsync(
WorkflowInstance workflow,
WorkflowDelegate next
)
{
if (!workflow.CompleteTime.HasValue)
{
return next();
}

var duration = workflow.CompleteTime.Value - workflow.CreateTime;
_log.LogInformation($@"Workflow {workflow.Description} completed in {duration:g}");

foreach (var step in workflow.ExecutionPointers)
{
var stepName = step.StepName;
var stepDuration = (step.EndTime - step.StartTime) ?? TimeSpan.Zero;
_log.LogInformation($" - Step {stepName} completed in {stepDuration:g}");
}

return next();
}
}
```

### Exception Handling in Post Workflow Middleware

Post workflow middleware exception handling gets treated differently from pre workflow middleware. At the time that the workflow completes, your workflow has ran already so an uncaught exception would be difficult to act on.

By default, if a workflow middleware throws an exception, it will be logged and the workflow will complete as normal. This behavior can be changed, however.

To override the default post workflow error handling for all workflows in your app, just register a new `IWorkflowMiddlewareErrorHandler` in the dependency injection framework with your custom behavior as follows.

```cs
// CustomMiddlewareErrorHandler.cs
public class CustomHandler : IWorkflowMiddlewareErrorHandler
{
public Task HandleAsync(Exception ex)
{
// Handle your error asynchronously
}
}

// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// Other workflow configuration
services.AddWorkflow();

// Should go after .AddWorkflow()
services.AddTransient<IWorkflowMiddlewareErrorHandler, CustomHandler>();
}
```

## Registering Middleware

In order for middleware to take effect, they must be registered with the built-in dependency injection framework using the convenience helpers.

**Note:** Middleware will be run in the order that they are registered with middleware that are registered earlier running earlier in the chain and finishing later in the chain. For pre/post workflow middleware, all pre middleware will be run before a workflow starts and all post middleware will be run after a workflow completes.

```cs
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
...

// Add workflow middleware
services.AddWorkflowMiddleware<AddDescriptionWorkflowMiddleware>();
services.AddWorkflowMiddleware<PrintWorkflowSummaryMiddleware>();

// Add step middleware
services.AddWorkflowStepMiddleware<LogCorrelationStepMiddleware>();
services.AddWorkflowStepMiddleware<PollyRetryMiddleware>();

...
}
}
```

## More Information

See the [Workflow Middleware](https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample19) sample for full examples of workflow middleware in action.
1 change: 1 addition & 0 deletions mkdocs.yml
Expand Up @@ -9,6 +9,7 @@ nav:
- Saga transactions: sagas.md
- JSON / YAML Definitions: json-yaml.md
- Persistence: persistence.md
- Middleware: workflow-middleware.md
- Multi-node clusters: multi-node-clusters.md
- ASP.NET Core: using-with-aspnet-core.md
- Elasticsearch plugin: elastic-search.md
Expand Down
1 change: 0 additions & 1 deletion src/WorkflowCore.DSL/Models/v1/DefinitionSourceV1.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.DefinitionStorage.v1
{
Expand Down
4 changes: 2 additions & 2 deletions src/WorkflowCore.DSL/Services/DefinitionLoader.cs
Expand Up @@ -93,7 +93,7 @@ private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Ty

AttachInputs(nextStep, dataType, stepType, targetStep);
AttachOutputs(nextStep, dataType, stepType, targetStep);

if (nextStep.Do != null)
{
foreach (var branch in nextStep.Do)
Expand Down Expand Up @@ -242,7 +242,7 @@ private void AttachOutcomes(StepSourceV1 source, Type dataType, WorkflowStep ste
var outcomeParameter = Expression.Parameter(typeof(object), "outcome");

foreach (var nextStep in source.SelectNextStep)
{
{
var sourceDelegate = DynamicExpressionParser.ParseLambda(new[] { dataParameter, outcomeParameter }, typeof(object), nextStep.Value).Compile();
Expression<Func<object, object, bool>> sourceExpr = (data, outcome) => System.Convert.ToBoolean(sourceDelegate.DynamicInvoke(data, outcome));
step.Outcomes.Add(new ExpressionOutcome<object>(sourceExpr)
Expand Down
22 changes: 22 additions & 0 deletions src/WorkflowCore/Interface/IStepExecutor.cs
@@ -0,0 +1,22 @@
using System.Threading.Tasks;
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
/// <summary>
/// Executes a workflow step.
/// </summary>
public interface IStepExecutor
{
/// <summary>
/// Runs the passed <see cref="IStepBody"/> in the given <see cref="IStepExecutionContext"/>.
/// </summary>
/// <param name="context">The <see cref="IStepExecutionContext"/> in which to execute the step.</param>
/// <param name="body">The <see cref="IStepBody"/> body.</param>
/// <returns>A <see cref="Task{ExecutionResult}"/> to wait for the result of running the step</returns>
Task<ExecutionResult> ExecuteStep(
IStepExecutionContext context,
IStepBody body
);
}
}
7 changes: 3 additions & 4 deletions src/WorkflowCore/Interface/IWorkflowBuilder.cs
Expand Up @@ -9,7 +9,7 @@ public interface IWorkflowBuilder
{
List<WorkflowStep> Steps { get; }

int LastStep { get; }
int LastStep { get; }

IWorkflowBuilder<T> UseData<T>();

Expand All @@ -21,7 +21,7 @@ public interface IWorkflowBuilder
}

public interface IWorkflowBuilder<TData> : IWorkflowBuilder, IWorkflowModifier<TData, InlineStepBody>
{
{
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);
Expand All @@ -33,6 +33,5 @@ public interface IWorkflowBuilder<TData> : IWorkflowBuilder, IWorkflowModifier<T
IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);

IWorkflowBuilder<TData> CreateBranch();

}
}
}