Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class MyWorkflow : IWorkflow
}
```

* Resilient service orchestration
* Saga Transactions

```c#
public class MyWorkflow : IWorkflow
Expand All @@ -70,6 +70,21 @@ public class MyWorkflow : IWorkflow
}
```

```c#
builder
.StartWith<LogStart>()
.Saga(saga => saga
.StartWith<Task1>()
.CompensateWith<UndoTask1>()
.Then<Task2>()
.CompensateWith<UndoTask2>()
.Then<Task3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(10))
.Then<LogEnd>();
```

## Persistence

Since workflows are typically long running processes, they will need to be persisted to storage between steps.
Expand Down Expand Up @@ -106,6 +121,8 @@ There are several persistence providers available as separate Nuget packages.

* [Parallel Tasks](src/samples/WorkflowCore.Sample13)

* [Saga Transactions (with compensation)](src/samples/WorkflowCore.Sample17)

* [Scheduled Background Tasks](src/samples/WorkflowCore.Sample16)

* [Recurring Background Tasks](src/samples/WorkflowCore.Sample14)
Expand Down
141 changes: 141 additions & 0 deletions ReleaseNotes/1.6.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Workflow Core 1.6.0


* Added Saga transaction feature
* Added `.CompensateWith` feature


#### Specifying compensation steps for each component of a saga transaction

In this sample, if `Task2` throws an exception, then `UndoTask2` and `UndoTask1` will be triggered.

```c#
builder
.StartWith<SayHello>()
.CompensateWith<UndoHello>()
.Saga(saga => saga
.StartWith<DoTask1>()
.CompensateWith<UndoTask1>()
.Then<DoTask2>()
.CompensateWith<UndoTask2>()
.Then<DoTask3>()
.CompensateWith<UndoTask3>()
)
.Then<SayGoodbye>();
```

#### Retrying a failed transaction

This particular example will retry the entire saga every 5 seconds

```c#
builder
.StartWith<SayHello>()
.CompensateWith<UndoHello>()
.Saga(saga => saga
.StartWith<DoTask1>()
.CompensateWith<UndoTask1>()
.Then<DoTask2>()
.CompensateWith<UndoTask2>()
.Then<DoTask3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
.Then<SayGoodbye>();
```

#### Compensating the entire transaction

You could also only specify a master compensation step, as follows

```c#
builder
.StartWith<SayHello>()
.CompensateWith<UndoHello>()
.Saga(saga => saga
.StartWith<DoTask1>()
.Then<DoTask2>()
.Then<DoTask3>()
)
.CompensateWithSequence(comp => comp
.StartWith<UndoTask1>()
.Then<UndoTask2>()
.Then<UndoTask3>()
)
.Then<SayGoodbye>();
```

#### Passing parameters

Parameters can be passed to a compensation step as follows

```c#
builder
.StartWith<SayHello>()
.CompensateWith<PrintMessage>(compensate =>
{
compensate.Input(step => step.Message, data => "undoing...");
})
```


### Expressing a saga in JSON

A saga transaction can be expressed in JSON, by using the `WorkflowCore.Primitives.Sequence` step and setting the `Saga` parameter to `true`.

The compensation steps can be defined by specifying the `CompensateWith` parameter.

```json
{
"Id": "Saga-Sample",
"Version": 1,
"DataType": "MyApp.MyDataClass, MyApp",
"Steps": [
{
"Id": "Hello",
"StepType": "MyApp.HelloWorld, MyApp",
"NextStepId": "MySaga"
},
{
"Id": "MySaga",
"StepType": "WorkflowCore.Primitives.Sequence, WorkflowCore",
"NextStepId": "Bye",
"Saga": true,
"Do": [
[
{
"Id": "do1",
"StepType": "MyApp.Task1, MyApp",
"NextStepId": "do2",
"CompensateWith": [
{
"Id": "undo1",
"StepType": "MyApp.UndoTask1, MyApp"
}
]
},
{
"Id": "do2",
"StepType": "MyApp.Task2, MyApp",
"CompensateWith": [
{
"Id": "undo2-1",
"NextStepId": "undo2-2",
"StepType": "MyApp.UndoTask2, MyApp"
},
{
"Id": "undo2-2",
"StepType": "MyApp.DoSomethingElse, MyApp"
}
]
}
]
]
},
{
"Id": "Bye",
"StepType": "MyApp.GoodbyeWorld, MyApp"
}
]
}
```
10 changes: 9 additions & 1 deletion WorkflowCore.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2008
VisualStudioVersion = 15.0.27130.2010
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E399-451C-BDE8-E92AAD3BD761}"
EndProject
Expand Down Expand Up @@ -90,6 +90,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
ReleaseNotes\1.3.2.md = ReleaseNotes\1.3.2.md
ReleaseNotes\1.3.3.md = ReleaseNotes\1.3.3.md
ReleaseNotes\1.4.0.md = ReleaseNotes\1.4.0.md
ReleaseNotes\1.6.0.md = ReleaseNotes\1.6.0.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"
Expand All @@ -106,6 +107,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample16", "sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchPad\ScratchPad.csproj", "{6396453F-4D0E-4CD4-BC89-87E8970F2A80}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample17", "src\samples\WorkflowCore.Sample17\WorkflowCore.Sample17.csproj", "{42F475BC-95F4-42E1-8CCD-7B9C27487E33}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -268,6 +271,10 @@ Global
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.Build.0 = Release|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.Build.0 = Debug|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.ActiveCfg = Release|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -315,6 +322,7 @@ Global
{9B7811AC-68D6-4D19-B1E9-65423393ED83} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
{0C9617A9-C8B7-45F6-A54A-261A23AC881B} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
{6396453F-4D0E-4CD4-BC89-87E8970F2A80} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
{42F475BC-95F4-42E1-8CCD-7B9C27487E33} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}
Expand Down
12 changes: 12 additions & 0 deletions src/WorkflowCore/Interface/IExecutionPointerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
public interface IExecutionPointerFactory
{
ExecutionPointer BuildStartingPointer(WorkflowDefinition def);
ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, ExecutionPointer pointer, ExecutionPointer exceptionPointer, int compensationStepId);
ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointer pointer, StepOutcome outcomeTarget);
ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch);
}
}
11 changes: 11 additions & 0 deletions src/WorkflowCore/Interface/IExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
public interface IExecutionResultProcessor
{
void HandleStepException(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step);
void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult);
}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IStepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace WorkflowCore.Interface
{
public interface IStepBody
{
Task<ExecutionResult> RunAsync(IStepExecutionContext context);
Task<ExecutionResult> RunAsync(IStepExecutionContext context);
}
}
37 changes: 37 additions & 0 deletions src/WorkflowCore/Interface/IStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ public interface IStepBuilder<TData, TStepBody>
/// <returns></returns>
IParallelStepBuilder<TData, Sequence> Parallel();

/// <summary>
/// Execute a sequence of steps in a container
/// </summary>
/// <returns></returns>
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

/// <summary>
/// Schedule a block of steps to execute in parallel sometime in the future
/// </summary>
Expand All @@ -177,5 +183,36 @@ public interface IStepBuilder<TData, TStepBody>
/// <param name="until">Resolves a condition to stop the recurring task</param>
/// <returns></returns>
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);


/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <typeparam name="TStep">The type of the step to execute</typeparam>
/// <param name="stepSetup">Configure additional parameters for this step</param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);

/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);

/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <param name="builder"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowExecutor
{
Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, WorkflowOptions options);
Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public class StepSourceV1

public List<List<StepSourceV1>> Do { get; set; } = new List<List<StepSourceV1>>();

public List<StepSourceV1> CompensateWith { get; set; } = new List<StepSourceV1>();

public bool Saga { get; set; } = false;

public string NextStepId { get; set; }

public Dictionary<string, string> Inputs { get; set; } = new Dictionary<string, string>();
Expand Down
16 changes: 16 additions & 0 deletions src/WorkflowCore/Models/ExecutionPointer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,21 @@ public class ExecutionPointer
public string PredecessorId { get; set; }

public object Outcome { get; set; }

public PointerStatus Status { get; set; } = PointerStatus.Legacy;

public Stack<string> Scope { get; set; } = new Stack<string>();
}

public enum PointerStatus
{
Legacy = 0,
Pending = 1,
Running = 2,
Complete = 3,
Sleeping = 4,
WaitingForEvent = 5,
Failed = 6,
Compensated = 7
}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/StepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public abstract class StepBody : IStepBody
public Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
return Task.FromResult(Run(context));
}
}

protected ExecutionResult OutcomeResult(object value)
{
Expand Down
3 changes: 2 additions & 1 deletion src/WorkflowCore/Models/WorkflowDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public enum WorkflowErrorHandling
{
Retry = 0,
Suspend = 1,
Terminate = 2
Terminate = 2,
Compensate = 3
}
}
12 changes: 11 additions & 1 deletion src/WorkflowCore/Models/WorkflowStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ public abstract class WorkflowStep

public WorkflowErrorHandling? ErrorBehavior { get; set; }

public TimeSpan? RetryInterval { get; set; }
public TimeSpan? RetryInterval { get; set; }

public int? CompensationStepId { get; set; }

public virtual bool ResumeChildrenAfterCompensation => true;

public virtual bool RevertChildrenAfterCompensation => false;

public virtual ExecutionPipelineDirective InitForExecution(WorkflowExecutorResult executorResult, WorkflowDefinition defintion, WorkflowInstance workflow, ExecutionPointer executionPointer)
{
Expand All @@ -41,6 +47,10 @@ public virtual void AfterExecute(WorkflowExecutorResult executorResult, IStepExe
{
}

public virtual void PrimeForRetry(ExecutionPointer pointer)
{
}

/// <summary>
/// Called after every workflow execution round,
/// every exectuon pointer with no end time, even if this step was not executed in this round
Expand Down
Loading