diff --git a/README.md b/README.md index 5b66ba4a3..9ad87fb24 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ public class MyWorkflow : IWorkflow } ``` -* Resilient service orchestration +* Saga Transactions ```c# public class MyWorkflow : IWorkflow @@ -70,6 +70,21 @@ public class MyWorkflow : IWorkflow } ``` +```c# +builder + .StartWith() + .Saga(saga => saga + .StartWith() + .CompensateWith() + .Then() + .CompensateWith() + .Then() + .CompensateWith() + ) + .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(10)) + .Then(); +``` + ## Persistence Since workflows are typically long running processes, they will need to be persisted to storage between steps. @@ -106,6 +121,8 @@ There are several persistence providers available as separate Nuget packages. * [Parallel Tasks](src/samples/WorkflowCore.Sample13) +* [Saga Transactions (with compensation)](src/samples/WorkflowCore.Sample17) + * [Scheduled Background Tasks](src/samples/WorkflowCore.Sample16) * [Recurring Background Tasks](src/samples/WorkflowCore.Sample14) diff --git a/ReleaseNotes/1.6.0.md b/ReleaseNotes/1.6.0.md new file mode 100644 index 000000000..01a065991 --- /dev/null +++ b/ReleaseNotes/1.6.0.md @@ -0,0 +1,141 @@ +# Workflow Core 1.6.0 + + +* Added Saga transaction feature +* Added `.CompensateWith` feature + + +#### Specifying compensation steps for each component of a saga transaction + +In this sample, if `Task2` throws an exception, then `UndoTask2` and `UndoTask1` will be triggered. + +```c# +builder + .StartWith() + .CompensateWith() + .Saga(saga => saga + .StartWith() + .CompensateWith() + .Then() + .CompensateWith() + .Then() + .CompensateWith() + ) + .Then(); +``` + +#### Retrying a failed transaction + +This particular example will retry the entire saga every 5 seconds + +```c# +builder + .StartWith() + .CompensateWith() + .Saga(saga => saga + .StartWith() + .CompensateWith() + .Then() + .CompensateWith() + .Then() + .CompensateWith() + ) + .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5)) + .Then(); +``` + +#### Compensating the entire transaction + +You could also only specify a master compensation step, as follows + +```c# +builder + .StartWith() + .CompensateWith() + .Saga(saga => saga + .StartWith() + .Then() + .Then() + ) + .CompensateWithSequence(comp => comp + .StartWith() + .Then() + .Then() + ) + .Then(); +``` + +#### Passing parameters + +Parameters can be passed to a compensation step as follows + +```c# +builder + .StartWith() + .CompensateWith(compensate => + { + compensate.Input(step => step.Message, data => "undoing..."); + }) +``` + + +### Expressing a saga in JSON + +A saga transaction can be expressed in JSON, by using the `WorkflowCore.Primitives.Sequence` step and setting the `Saga` parameter to `true`. + +The compensation steps can be defined by specifying the `CompensateWith` parameter. + +```json +{ + "Id": "Saga-Sample", + "Version": 1, + "DataType": "MyApp.MyDataClass, MyApp", + "Steps": [ + { + "Id": "Hello", + "StepType": "MyApp.HelloWorld, MyApp", + "NextStepId": "MySaga" + }, + { + "Id": "MySaga", + "StepType": "WorkflowCore.Primitives.Sequence, WorkflowCore", + "NextStepId": "Bye", + "Saga": true, + "Do": [ + [ + { + "Id": "do1", + "StepType": "MyApp.Task1, MyApp", + "NextStepId": "do2", + "CompensateWith": [ + { + "Id": "undo1", + "StepType": "MyApp.UndoTask1, MyApp" + } + ] + }, + { + "Id": "do2", + "StepType": "MyApp.Task2, MyApp", + "CompensateWith": [ + { + "Id": "undo2-1", + "NextStepId": "undo2-2", + "StepType": "MyApp.UndoTask2, MyApp" + }, + { + "Id": "undo2-2", + "StepType": "MyApp.DoSomethingElse, MyApp" + } + ] + } + ] + ] + }, + { + "Id": "Bye", + "StepType": "MyApp.GoodbyeWorld, MyApp" + } + ] +} +``` diff --git a/WorkflowCore.sln b/WorkflowCore.sln index 06e23da0f..e027e43a5 100644 --- a/WorkflowCore.sln +++ b/WorkflowCore.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.27004.2008 +VisualStudioVersion = 15.0.27130.2010 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E399-451C-BDE8-E92AAD3BD761}" EndProject @@ -90,6 +90,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote ReleaseNotes\1.3.2.md = ReleaseNotes\1.3.2.md ReleaseNotes\1.3.3.md = ReleaseNotes\1.3.3.md ReleaseNotes\1.4.0.md = ReleaseNotes\1.4.0.md + ReleaseNotes\1.6.0.md = ReleaseNotes\1.6.0.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}" @@ -106,6 +107,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample16", "sr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchPad\ScratchPad.csproj", "{6396453F-4D0E-4CD4-BC89-87E8970F2A80}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample17", "src\samples\WorkflowCore.Sample17\WorkflowCore.Sample17.csproj", "{42F475BC-95F4-42E1-8CCD-7B9C27487E33}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -268,6 +271,10 @@ Global {6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Debug|Any CPU.Build.0 = Debug|Any CPU {6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.ActiveCfg = Release|Any CPU {6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.Build.0 = Release|Any CPU + {42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.Build.0 = Debug|Any CPU + {42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.ActiveCfg = Release|Any CPU + {42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -315,6 +322,7 @@ Global {9B7811AC-68D6-4D19-B1E9-65423393ED83} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} {0C9617A9-C8B7-45F6-A54A-261A23AC881B} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} {6396453F-4D0E-4CD4-BC89-87E8970F2A80} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB} + {42F475BC-95F4-42E1-8CCD-7B9C27487E33} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4} diff --git a/src/WorkflowCore/Interface/IExecutionPointerFactory.cs b/src/WorkflowCore/Interface/IExecutionPointerFactory.cs new file mode 100644 index 000000000..f224dc43e --- /dev/null +++ b/src/WorkflowCore/Interface/IExecutionPointerFactory.cs @@ -0,0 +1,12 @@ +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + public interface IExecutionPointerFactory + { + ExecutionPointer BuildStartingPointer(WorkflowDefinition def); + ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, ExecutionPointer pointer, ExecutionPointer exceptionPointer, int compensationStepId); + ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointer pointer, StepOutcome outcomeTarget); + ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch); + } +} \ No newline at end of file diff --git a/src/WorkflowCore/Interface/IExecutionResultProcessor.cs b/src/WorkflowCore/Interface/IExecutionResultProcessor.cs new file mode 100644 index 000000000..7d5e67c03 --- /dev/null +++ b/src/WorkflowCore/Interface/IExecutionResultProcessor.cs @@ -0,0 +1,11 @@ +using System; +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + public interface IExecutionResultProcessor + { + void HandleStepException(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step); + void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult); + } +} \ No newline at end of file diff --git a/src/WorkflowCore/Interface/IStepBody.cs b/src/WorkflowCore/Interface/IStepBody.cs index d5c23dd30..cc786fd87 100644 --- a/src/WorkflowCore/Interface/IStepBody.cs +++ b/src/WorkflowCore/Interface/IStepBody.cs @@ -5,6 +5,6 @@ namespace WorkflowCore.Interface { public interface IStepBody { - Task RunAsync(IStepExecutionContext context); + Task RunAsync(IStepExecutionContext context); } } diff --git a/src/WorkflowCore/Interface/IStepBuilder.cs b/src/WorkflowCore/Interface/IStepBuilder.cs index c5f32496f..a9fc7b299 100644 --- a/src/WorkflowCore/Interface/IStepBuilder.cs +++ b/src/WorkflowCore/Interface/IStepBuilder.cs @@ -163,6 +163,12 @@ public interface IStepBuilder /// IParallelStepBuilder Parallel(); + /// + /// Execute a sequence of steps in a container + /// + /// + IStepBuilder Saga(Action> builder); + /// /// Schedule a block of steps to execute in parallel sometime in the future /// @@ -177,5 +183,36 @@ public interface IStepBuilder /// Resolves a condition to stop the recurring task /// IContainerStepBuilder Recur(Expression> interval, Expression> until); + + + /// + /// Undo step if unhandled exception is thrown by this step + /// + /// The type of the step to execute + /// Configure additional parameters for this step + /// + IStepBuilder CompensateWith(Action> stepSetup = null) where TStep : IStepBody; + + /// + /// Undo step if unhandled exception is thrown by this step + /// + /// + /// + IStepBuilder CompensateWith(Func body); + + /// + /// Undo step if unhandled exception is thrown by this step + /// + /// + /// + IStepBuilder CompensateWith(Action body); + + /// + /// Undo step if unhandled exception is thrown by this step + /// + /// + /// + IStepBuilder CompensateWithSequence(Action> builder); + } } \ No newline at end of file diff --git a/src/WorkflowCore/Interface/IWorkflowExecutor.cs b/src/WorkflowCore/Interface/IWorkflowExecutor.cs index 710bc037c..798057fdc 100644 --- a/src/WorkflowCore/Interface/IWorkflowExecutor.cs +++ b/src/WorkflowCore/Interface/IWorkflowExecutor.cs @@ -5,6 +5,6 @@ namespace WorkflowCore.Interface { public interface IWorkflowExecutor { - Task Execute(WorkflowInstance workflow, WorkflowOptions options); + Task Execute(WorkflowInstance workflow); } } \ No newline at end of file diff --git a/src/WorkflowCore/Models/DefinitionStorage/v1/StepSourceV1.cs b/src/WorkflowCore/Models/DefinitionStorage/v1/StepSourceV1.cs index 66e3787ee..fe351fa0f 100644 --- a/src/WorkflowCore/Models/DefinitionStorage/v1/StepSourceV1.cs +++ b/src/WorkflowCore/Models/DefinitionStorage/v1/StepSourceV1.cs @@ -20,6 +20,10 @@ public class StepSourceV1 public List> Do { get; set; } = new List>(); + public List CompensateWith { get; set; } = new List(); + + public bool Saga { get; set; } = false; + public string NextStepId { get; set; } public Dictionary Inputs { get; set; } = new Dictionary(); diff --git a/src/WorkflowCore/Models/ExecutionPointer.cs b/src/WorkflowCore/Models/ExecutionPointer.cs index dffef4d2b..0bb15fbee 100644 --- a/src/WorkflowCore/Models/ExecutionPointer.cs +++ b/src/WorkflowCore/Models/ExecutionPointer.cs @@ -40,5 +40,21 @@ public class ExecutionPointer public string PredecessorId { get; set; } public object Outcome { get; set; } + + public PointerStatus Status { get; set; } = PointerStatus.Legacy; + + public Stack Scope { get; set; } = new Stack(); + } + + public enum PointerStatus + { + Legacy = 0, + Pending = 1, + Running = 2, + Complete = 3, + Sleeping = 4, + WaitingForEvent = 5, + Failed = 6, + Compensated = 7 } } diff --git a/src/WorkflowCore/Models/StepBody.cs b/src/WorkflowCore/Models/StepBody.cs index 06f4476ae..5ec927528 100644 --- a/src/WorkflowCore/Models/StepBody.cs +++ b/src/WorkflowCore/Models/StepBody.cs @@ -11,7 +11,7 @@ public abstract class StepBody : IStepBody public Task RunAsync(IStepExecutionContext context) { return Task.FromResult(Run(context)); - } + } protected ExecutionResult OutcomeResult(object value) { diff --git a/src/WorkflowCore/Models/WorkflowDefinition.cs b/src/WorkflowCore/Models/WorkflowDefinition.cs index 71b696083..df0695087 100644 --- a/src/WorkflowCore/Models/WorkflowDefinition.cs +++ b/src/WorkflowCore/Models/WorkflowDefinition.cs @@ -26,6 +26,7 @@ public enum WorkflowErrorHandling { Retry = 0, Suspend = 1, - Terminate = 2 + Terminate = 2, + Compensate = 3 } } diff --git a/src/WorkflowCore/Models/WorkflowStep.cs b/src/WorkflowCore/Models/WorkflowStep.cs index 1c1a35601..6d9bb012a 100644 --- a/src/WorkflowCore/Models/WorkflowStep.cs +++ b/src/WorkflowCore/Models/WorkflowStep.cs @@ -25,7 +25,13 @@ public abstract class WorkflowStep public WorkflowErrorHandling? ErrorBehavior { get; set; } - public TimeSpan? RetryInterval { get; set; } + public TimeSpan? RetryInterval { get; set; } + + public int? CompensationStepId { get; set; } + + public virtual bool ResumeChildrenAfterCompensation => true; + + public virtual bool RevertChildrenAfterCompensation => false; public virtual ExecutionPipelineDirective InitForExecution(WorkflowExecutorResult executorResult, WorkflowDefinition defintion, WorkflowInstance workflow, ExecutionPointer executionPointer) { @@ -41,6 +47,10 @@ public virtual void AfterExecute(WorkflowExecutorResult executorResult, IStepExe { } + public virtual void PrimeForRetry(ExecutionPointer pointer) + { + } + /// /// Called after every workflow execution round, /// every exectuon pointer with no end time, even if this step was not executed in this round diff --git a/src/WorkflowCore/Primitives/SagaContainer.cs b/src/WorkflowCore/Primitives/SagaContainer.cs new file mode 100644 index 000000000..2034f6bd4 --- /dev/null +++ b/src/WorkflowCore/Primitives/SagaContainer.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; +using WorkflowCore.Exceptions; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Primitives +{ + public class SagaContainer : WorkflowStep + where TStepBody : IStepBody + { + public override bool ResumeChildrenAfterCompensation => false; + public override bool RevertChildrenAfterCompensation => true; + + public override void PrimeForRetry(ExecutionPointer pointer) + { + base.PrimeForRetry(pointer); + pointer.PersistenceData = null; + } + } +} diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs index 3ac784332..203ad535d 100644 --- a/src/WorkflowCore/ServiceCollectionExtensions.cs +++ b/src/WorkflowCore/ServiceCollectionExtensions.cs @@ -38,6 +38,8 @@ public static void AddWorkflow(this IServiceCollection services, Action(); services.AddTransient(); services.AddTransient(); + services.AddTransient(); + services.AddTransient(); services.AddTransient, InjectedObjectPoolPolicy>(); services.AddTransient, InjectedObjectPoolPolicy>(); diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index c7384f909..296f8deb3 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -44,7 +44,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance var executor = _executorPool.Get(); try { - result = await executor.Execute(workflow, Options); + result = await executor.Execute(workflow); } finally { diff --git a/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs b/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs index 2897470fc..28b1a3643 100644 --- a/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs +++ b/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs @@ -59,7 +59,8 @@ private List ConvertSteps(ICollection source, Type d int i = 0; var stack = new Stack(source.Reverse()); var parents = new List(); - + var compensatables = new List(); + while (stack.Count > 0) { var nextStep = stack.Pop(); @@ -76,7 +77,13 @@ private List ConvertSteps(ICollection source, Type d var cancelExpr = DynamicExpressionParser.ParseLambda(new[] { dataParameter }, typeof(bool), nextStep.CancelCondition); targetStep = (containerType.GetConstructor(new Type[] { cancelExprType }).Invoke(new[] { cancelExpr }) as WorkflowStep); } - + + if (nextStep.Saga) //TODO: cancellable saga??? + { + containerType = typeof(SagaContainer<>).MakeGenericType(stepType); + targetStep = (containerType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep); + } + targetStep.Id = i; targetStep.Name = nextStep.Name; targetStep.ErrorBehavior = nextStep.ErrorBehavior; @@ -97,7 +104,16 @@ private List ConvertSteps(ICollection source, Type d if (nextStep.Do.Count > 0) parents.Add(nextStep); } - + + if (nextStep.CompensateWith != null) + { + foreach (var compChild in nextStep.CompensateWith.Reverse()) + stack.Push(compChild); + + if (nextStep.CompensateWith.Count > 0) + compensatables.Add(nextStep); + } + if (!string.IsNullOrEmpty(nextStep.NextStepId)) targetStep.Outcomes.Add(new StepOutcome() { Tag = $"{nextStep.NextStepId}" }); @@ -135,6 +151,18 @@ private List ConvertSteps(ICollection source, Type d } } + foreach (var item in compensatables) + { + var target = result.Single(x => x.Tag == item.Id); + var tag = item.CompensateWith.Select(x => x.Id).FirstOrDefault(); + if (tag != null) + { + var compStep = result.FirstOrDefault(x => x.Tag == tag); + if (compStep != null) + target.CompensationStepId = compStep.Id; + } + } + return result; } diff --git a/src/WorkflowCore/Services/ExecutionPointerFactory.cs b/src/WorkflowCore/Services/ExecutionPointerFactory.cs new file mode 100644 index 000000000..ae74b6821 --- /dev/null +++ b/src/WorkflowCore/Services/ExecutionPointerFactory.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services +{ + public class ExecutionPointerFactory : IExecutionPointerFactory + { + + public ExecutionPointer BuildStartingPointer(WorkflowDefinition def) + { + return new ExecutionPointer + { + Id = GenerateId(), + StepId = 0, + Active = true, + Status = PointerStatus.Pending, + StepName = Enumerable.First(def.Steps, x => x.Id == 0).Name + }; + } + + public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointer pointer, StepOutcome outcomeTarget) + { + var nextId = GenerateId(); + return new ExecutionPointer() + { + Id = nextId, + PredecessorId = pointer.Id, + StepId = outcomeTarget.NextStep, + Active = true, + ContextItem = pointer.ContextItem, + Status = PointerStatus.Pending, + StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name, + Scope = new Stack(pointer.Scope) + }; + } + + public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch) + { + var childPointerId = GenerateId(); + var childScope = new Stack(pointer.Scope); + childScope.Push(pointer.Id); + pointer.Children.Add(childPointerId); + + return new ExecutionPointer() + { + Id = childPointerId, + PredecessorId = pointer.Id, + StepId = childDefinitionId, + Active = true, + ContextItem = branch, + Status = PointerStatus.Pending, + StepName = def.Steps.First(x => x.Id == childDefinitionId).Name, + Scope = childScope + }; + } + + public ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, ExecutionPointer pointer, ExecutionPointer exceptionPointer, int compensationStepId) + { + var nextId = GenerateId(); + return new ExecutionPointer() + { + Id = nextId, + PredecessorId = exceptionPointer.Id, + StepId = compensationStepId, + Active = true, + ContextItem = pointer.ContextItem, + Status = PointerStatus.Pending, + StepName = def.Steps.First(x => x.Id == compensationStepId).Name, + Scope = new Stack(pointer.Scope) + }; + } + + private string GenerateId() + { + return Guid.NewGuid().ToString(); + } + } +} diff --git a/src/WorkflowCore/Services/ExecutionResultProcessor.cs b/src/WorkflowCore/Services/ExecutionResultProcessor.cs new file mode 100644 index 000000000..8dc946ae8 --- /dev/null +++ b/src/WorkflowCore/Services/ExecutionResultProcessor.cs @@ -0,0 +1,187 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services +{ + public class ExecutionResultProcessor : IExecutionResultProcessor + { + private readonly IExecutionPointerFactory _pointerFactory; + private readonly IDateTimeProvider _datetimeProvider; + private readonly ILogger _logger; + private readonly WorkflowOptions _options; + + public ExecutionResultProcessor(IExecutionPointerFactory pointerFactory, IDateTimeProvider datetimeProvider, WorkflowOptions options, ILoggerFactory loggerFactory) + { + _pointerFactory = pointerFactory; + _datetimeProvider = datetimeProvider; + _options = options; + _logger = loggerFactory.CreateLogger(); + } + + public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult) + { + pointer.PersistenceData = result.PersistenceData; + pointer.Outcome = result.OutcomeValue; + if (result.SleepFor.HasValue) + { + pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(result.SleepFor.Value); + pointer.Status = PointerStatus.Sleeping; + } + + if (!string.IsNullOrEmpty(result.EventName)) + { + pointer.EventName = result.EventName; + pointer.EventKey = result.EventKey; + pointer.Active = false; + pointer.Status = PointerStatus.WaitingForEvent; + + workflowResult.Subscriptions.Add(new EventSubscription() + { + WorkflowId = workflow.Id, + StepId = pointer.StepId, + EventName = pointer.EventName, + EventKey = pointer.EventKey, + SubscribeAsOf = result.EventAsOf + }); + } + + if (result.Proceed) + { + pointer.Active = false; + pointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); + pointer.Status = PointerStatus.Complete; + + foreach (var outcomeTarget in step.Outcomes.Where(x => object.Equals(x.GetValue(workflow.Data), result.OutcomeValue) || x.GetValue(workflow.Data) == null)) + { + workflow.ExecutionPointers.Add(_pointerFactory.BuildNextPointer(def, pointer, outcomeTarget)); + } + } + else + { + foreach (var branch in result.BranchValues) + { + foreach (var childDefId in step.Children) + { + workflow.ExecutionPointers.Add(_pointerFactory.BuildChildPointer(def, pointer, childDefId, branch)); + } + } + } + } + + public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step) + { + pointer.Status = PointerStatus.Failed; + var compensatingStepId = FindScopeCompensationStepId(workflow, def, pointer); + var errorOption = (step.ErrorBehavior ?? (compensatingStepId.HasValue ? WorkflowErrorHandling.Compensate : def.DefaultErrorBehavior)); + SelectErrorStrategy(errorOption, workflow, def, pointer, step); + } + + private void SelectErrorStrategy(WorkflowErrorHandling errorOption, WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step) + { + switch (errorOption) + { + case WorkflowErrorHandling.Retry: + pointer.RetryCount++; + pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? _options.ErrorRetryInterval); + step.PrimeForRetry(pointer); + break; + case WorkflowErrorHandling.Suspend: + workflow.Status = WorkflowStatus.Suspended; + break; + case WorkflowErrorHandling.Terminate: + workflow.Status = WorkflowStatus.Terminated; + break; + case WorkflowErrorHandling.Compensate: + Compensate(workflow, def, pointer); + break; + } + } + + private void Compensate(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer exceptionPointer) + { + var scope = new Stack(exceptionPointer.Scope); + scope.Push(exceptionPointer.Id); + + exceptionPointer.Active = false; + exceptionPointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); + exceptionPointer.Status = PointerStatus.Failed; + + while (scope.Any()) + { + var pointerId = scope.Pop(); + var pointer = workflow.ExecutionPointers.First(x => x.Id == pointerId); + var step = def.Steps.First(x => x.Id == pointer.StepId); + + var resume = true; + var revert = false; + + if (scope.Any()) + { + var parentId = scope.Peek(); + var parentPointer = workflow.ExecutionPointers.First(x => x.Id == parentId); + var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId); + resume = parentStep.ResumeChildrenAfterCompensation; + revert = parentStep.RevertChildrenAfterCompensation; + } + + if ((step.ErrorBehavior ?? WorkflowErrorHandling.Compensate) != WorkflowErrorHandling.Compensate) + { + SelectErrorStrategy(step.ErrorBehavior ?? WorkflowErrorHandling.Retry, workflow, def, pointer, step); + continue; + } + + if (step.CompensationStepId.HasValue) + { + pointer.Active = false; + pointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); + pointer.Status = PointerStatus.Compensated; + + var compensationPointer = _pointerFactory.BuildCompensationPointer(def, pointer, exceptionPointer, step.CompensationStepId.Value); + workflow.ExecutionPointers.Add(compensationPointer); + + if (resume) + { + foreach (var outcomeTarget in step.Outcomes.Where(x => x.GetValue(workflow.Data) == null)) + workflow.ExecutionPointers.Add(_pointerFactory.BuildNextPointer(def, pointer, outcomeTarget)); + } + } + + if (revert) + { + var prevSiblings = workflow.ExecutionPointers.Where(x => pointer.Scope.SequenceEqual(x.Scope) && x.Id != pointer.Id && x.Status == PointerStatus.Complete).ToList(); + foreach (var siblingPointer in prevSiblings) + { + var siblingStep = def.Steps.First(x => x.Id == siblingPointer.StepId); + if (siblingStep.CompensationStepId.HasValue) + { + var compensationPointer = _pointerFactory.BuildCompensationPointer(def, siblingPointer, exceptionPointer, siblingStep.CompensationStepId.Value); + workflow.ExecutionPointers.Add(compensationPointer); + siblingPointer.Status = PointerStatus.Compensated; + } + } + } + } + } + + private int? FindScopeCompensationStepId(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer currentPointer) + { + var scope = new Stack(currentPointer.Scope); + scope.Push(currentPointer.Id); + + while (scope.Count > 0) + { + var pointerId = scope.Pop(); + var pointer = workflow.ExecutionPointers.First(x => x.Id == pointerId); + var step = def.Steps.First(x => x.Id == pointer.StepId); + if (step.CompensationStepId.HasValue) + return step.CompensationStepId.Value; + } + + return null; + } + } +} \ No newline at end of file diff --git a/src/WorkflowCore/Services/FluentBuilders/ReturnStepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/ReturnStepBuilder.cs index 30c010b6c..0777533b5 100644 --- a/src/WorkflowCore/Services/FluentBuilders/ReturnStepBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/ReturnStepBuilder.cs @@ -24,7 +24,7 @@ public ReturnStepBuilder(IWorkflowBuilder workflowBuilder, WorkflowStep Do(Action> builder) { builder.Invoke(WorkflowBuilder); - Step.Children.Add(Step.Id + 1); //TODO: make more elegant + Step.Children.Add(Step.Id + 1); //TODO: make more elegant return _referenceBuilder; } diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs index 2a2417f3c..25cd3b6c7 100644 --- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs @@ -333,6 +333,18 @@ public IContainerStepBuilder When(Expression Saga(Action> builder) + { + var newStep = new SagaContainer(); + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + Step.Outcomes.Add(new StepOutcome() { NextStep = newStep.Id }); + builder.Invoke(WorkflowBuilder); + stepBuilder.Step.Children.Add(stepBuilder.Step.Id + 1); //TODO: make more elegant + + return stepBuilder; + } + public IParallelStepBuilder Parallel() { var newStep = new WorkflowStep(); @@ -383,7 +395,56 @@ public IContainerStepBuilder Recur(Expression Do(Action> builder) { builder.Invoke(WorkflowBuilder); - Step.Children.Add(Step.Id + 1); //TODO: make more elegant + Step.Children.Add(Step.Id + 1); //TODO: make more elegant + + return this; + } + + public IStepBuilder CompensateWith(Action> stepSetup = null) where TStep : IStepBody + { + WorkflowStep newStep = new WorkflowStep(); + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + + if (stepSetup != null) + { + stepSetup.Invoke(stepBuilder); + } + + newStep.Name = newStep.Name ?? typeof(TStep).Name; + Step.CompensationStepId = newStep.Id; + + return this; + } + + public IStepBuilder CompensateWith(Func body) + { + WorkflowStepInline newStep = new WorkflowStepInline(); + newStep.Body = body; + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + Step.CompensationStepId = newStep.Id; + return this; + } + + public IStepBuilder CompensateWith(Action body) + { + var newStep = new WorkflowStep(); + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + stepBuilder.Input(x => x.Body, x => body); + Step.CompensationStepId = newStep.Id; + return this; + } + + public IStepBuilder CompensateWithSequence(Action> builder) + { + var newStep = new WorkflowStep(); + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + Step.CompensationStepId = newStep.Id; + builder.Invoke(WorkflowBuilder); + stepBuilder.Step.Children.Add(stepBuilder.Step.Id + 1); //TODO: make more elegant return this; } diff --git a/src/WorkflowCore/Services/WorkflowController.cs b/src/WorkflowCore/Services/WorkflowController.cs index 9938aa58a..c9f68ccdf 100644 --- a/src/WorkflowCore/Services/WorkflowController.cs +++ b/src/WorkflowCore/Services/WorkflowController.cs @@ -16,15 +16,17 @@ public class WorkflowController : IWorkflowController private readonly IDistributedLockProvider _lockProvider; private readonly IWorkflowRegistry _registry; private readonly IQueueProvider _queueProvider; + private readonly IExecutionPointerFactory _pointerFactory; private readonly ILogger _logger; - public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, ILoggerFactory loggerFactory) + public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, IExecutionPointerFactory pointerFactory, ILoggerFactory loggerFactory) { _persistenceStore = persistenceStore; _lockProvider = lockProvider; _registry = registry; _queueProvider = queueProvider; + _pointerFactory = pointerFactory; _logger = loggerFactory.CreateLogger(); } @@ -70,13 +72,7 @@ public async Task StartWorkflow(string workflowId, int? version, wf.Data = TypeExtensions.GetConstructor(def.DataType, new Type[] { }).Invoke(null); } - wf.ExecutionPointers.Add(new ExecutionPointer - { - Id = Guid.NewGuid().ToString(), - StepId = 0, - Active = true, - StepName = Enumerable.First(def.Steps, x => x.Id == 0).Name - }); + wf.ExecutionPointers.Add(_pointerFactory.BuildStartingPointer(def)); string id = await _persistenceStore.CreateNewWorkflow(wf); await _queueProvider.QueueWork(id, QueueType.Workflow); diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index 0e8e85602..53c5918b6 100644 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -13,23 +13,26 @@ namespace WorkflowCore.Services { public class WorkflowExecutor : IWorkflowExecutor { - protected readonly IWorkflowRegistry _registry; protected readonly IServiceProvider _serviceProvider; protected readonly IDateTimeProvider _datetimeProvider; protected readonly ILogger _logger; + private readonly IExecutionResultProcessor _executionResultProcessor; + private readonly WorkflowOptions _options; private IWorkflowHost Host => _serviceProvider.GetService(); - public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProvider, IDateTimeProvider datetimeProvider, ILoggerFactory loggerFactory) + public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProvider, IDateTimeProvider datetimeProvider, IExecutionResultProcessor executionResultProcessor, WorkflowOptions options, ILoggerFactory loggerFactory) { _serviceProvider = serviceProvider; _registry = registry; _datetimeProvider = datetimeProvider; + _options = options; _logger = loggerFactory.CreateLogger(); + _executionResultProcessor = executionResultProcessor; } - public async Task Execute(WorkflowInstance workflow, WorkflowOptions options) + public async Task Execute(WorkflowInstance workflow) { var wfResult = new WorkflowExecutorResult(); @@ -48,6 +51,7 @@ public async Task Execute(WorkflowInstance workflow, Wor { try { + pointer.Status = PointerStatus.Running; switch (step.InitForExecution(wfResult, def, workflow, pointer)) { case ExecutionPipelineDirective.Defer: @@ -70,7 +74,7 @@ public async Task Execute(WorkflowInstance workflow, Wor if (body == null) { _logger.LogError("Unable to construct step body {0}", step.BodyType.ToString()); - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(options.ErrorRetryInterval); + pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval); wfResult.Errors.Add(new ExecutionError() { WorkflowId = workflow.Id, @@ -109,12 +113,11 @@ public async Task Execute(WorkflowInstance workflow, Wor ProcessOutputs(workflow, step, body); } - ProcessExecutionResult(workflow, def, pointer, step, result, wfResult); + _executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult); step.AfterExecute(wfResult, context, result, pointer); } catch (Exception ex) { - pointer.RetryCount++; _logger.LogError("Workflow {0} raised error on step {1} Message: {2}", workflow.Id, pointer.StepId, ex.Message); wfResult.Errors.Add(new ExecutionError() { @@ -124,26 +127,14 @@ public async Task Execute(WorkflowInstance workflow, Wor Message = ex.Message }); - switch (step.ErrorBehavior ?? def.DefaultErrorBehavior) - { - case WorkflowErrorHandling.Retry: - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? options.ErrorRetryInterval); - break; - case WorkflowErrorHandling.Suspend: - workflow.Status = WorkflowStatus.Suspended; - break; - case WorkflowErrorHandling.Terminate: - workflow.Status = WorkflowStatus.Terminated; - break; - } - + _executionResultProcessor.HandleStepException(workflow, def, pointer, step); Host.ReportStepError(workflow, step, ex); } } else { _logger.LogError("Unable to find step {0} in workflow definition", pointer.StepId); - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(options.ErrorRetryInterval); + pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval); wfResult.Errors.Add(new ExecutionError() { WorkflowId = workflow.Id, @@ -160,73 +151,6 @@ public async Task Execute(WorkflowInstance workflow, Wor return wfResult; } - private void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult) - { - //TODO: refactor this into it's own class - pointer.PersistenceData = result.PersistenceData; - pointer.Outcome = result.OutcomeValue; - if (result.SleepFor.HasValue) - { - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(result.SleepFor.Value); - } - - if (!string.IsNullOrEmpty(result.EventName)) - { - pointer.EventName = result.EventName; - pointer.EventKey = result.EventKey; - pointer.Active = false; - - workflowResult.Subscriptions.Add(new EventSubscription() - { - WorkflowId = workflow.Id, - StepId = pointer.StepId, - EventName = pointer.EventName, - EventKey = pointer.EventKey, - SubscribeAsOf = result.EventAsOf - }); - } - - if (result.Proceed) - { - pointer.Active = false; - pointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); - - foreach (var outcomeTarget in step.Outcomes.Where(x => object.Equals(x.GetValue(workflow.Data), result.OutcomeValue) || x.GetValue(workflow.Data) == null)) - { - workflow.ExecutionPointers.Add(new ExecutionPointer() - { - Id = Guid.NewGuid().ToString(), - PredecessorId = pointer.Id, - StepId = outcomeTarget.NextStep, - Active = true, - ContextItem = pointer.ContextItem, - StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name - }); - } - } - else - { - foreach (var branch in result.BranchValues) - { - foreach (var childDefId in step.Children) - { - var childPointerId = Guid.NewGuid().ToString(); - workflow.ExecutionPointers.Add(new ExecutionPointer() - { - Id = childPointerId, - PredecessorId = pointer.Id, - StepId = childDefId, - Active = true, - ContextItem = branch, - StepName = def.Steps.First(x => x.Id == childDefId).Name - }); - - pointer.Children.Add(childPointerId); - } - } - } - } - private void ProcessInputs(WorkflowInstance workflow, WorkflowStep step, IStepBody body, IStepExecutionContext context) { //TODO: Move to own class diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index 65a034753..a2b677f4a 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -16,9 +16,9 @@ false false Workflow Core is a light weight workflow engine targeting .NET Standard. - 1.4.1 - 1.4.1.0 - 1.4.1.0 + 1.6.0 + 1.6.0.0 + 1.6.0.0 diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs index 936999f6d..9018bf73d 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs @@ -59,6 +59,11 @@ internal static PersistedWorkflow ToPersistable(this WorkflowInstance instance, persistedEP.EventPublished = ep.EventPublished; persistedEP.EventData = JsonConvert.SerializeObject(ep.EventData, SerializerSettings); persistedEP.Outcome = JsonConvert.SerializeObject(ep.Outcome, SerializerSettings); + persistedEP.Status = ep.Status; + + persistedEP.Scope = string.Empty; + foreach (var item in ep.Scope) + persistedEP.Scope += item + ";"; foreach (var attr in ep.ExtensionAttributes) { @@ -96,7 +101,7 @@ internal static PersistedSubscription ToPersistable(this EventSubscription insta result.EventName = instance.EventName; result.StepId = instance.StepId; result.WorkflowId = instance.WorkflowId; - result.SubscribeAsOf = instance.SubscribeAsOf; + result.SubscribeAsOf = DateTime.SpecifyKind(instance.SubscribeAsOf, DateTimeKind.Utc); return result; } @@ -107,7 +112,7 @@ internal static PersistedEvent ToPersistable(this Event instance) result.EventId = new Guid(instance.Id); result.EventKey = instance.EventKey; result.EventName = instance.EventName; - result.EventTime = instance.EventTime; + result.EventTime = DateTime.SpecifyKind(instance.EventTime, DateTimeKind.Utc); result.IsProcessed = instance.IsProcessed; result.EventData = JsonConvert.SerializeObject(instance.EventData, SerializerSettings); @@ -125,9 +130,10 @@ internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow insta result.Version = instance.Version; result.WorkflowDefinitionId = instance.WorkflowDefinitionId; result.Status = instance.Status; - result.CreateTime = instance.CreateTime; - result.CompleteTime = instance.CompleteTime; - + result.CreateTime = DateTime.SpecifyKind(instance.CreateTime, DateTimeKind.Utc); + if (result.CompleteTime.HasValue) + result.CompleteTime = DateTime.SpecifyKind(instance.CompleteTime.Value, DateTimeKind.Utc); + foreach (var ep in instance.ExecutionPointers) { var pointer = new ExecutionPointer(); @@ -136,10 +142,18 @@ internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow insta pointer.Id = ep.Id; pointer.StepId = ep.StepId; pointer.Active = ep.Active; - pointer.SleepUntil = ep.SleepUntil; + + if (ep.SleepUntil.HasValue) + pointer.SleepUntil = DateTime.SpecifyKind(ep.SleepUntil.Value, DateTimeKind.Utc); + pointer.PersistenceData = JsonConvert.DeserializeObject(ep.PersistenceData ?? string.Empty, SerializerSettings); - pointer.StartTime = ep.StartTime; - pointer.EndTime = ep.EndTime; + + if (ep.StartTime.HasValue) + pointer.StartTime = DateTime.SpecifyKind(ep.StartTime.Value, DateTimeKind.Utc); + + if (ep.EndTime.HasValue) + pointer.EndTime = DateTime.SpecifyKind(ep.EndTime.Value, DateTimeKind.Utc); + pointer.StepName = ep.StepName; pointer.RetryCount = ep.RetryCount; @@ -154,6 +168,10 @@ internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow insta pointer.EventPublished = ep.EventPublished; pointer.EventData = JsonConvert.DeserializeObject(ep.EventData ?? string.Empty, SerializerSettings); pointer.Outcome = JsonConvert.DeserializeObject(ep.Outcome ?? string.Empty, SerializerSettings); + pointer.Status = ep.Status; + + if (!string.IsNullOrEmpty(ep.Scope)) + pointer.Scope = new Stack(ep.Scope.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)); foreach (var attr in ep.ExtensionAttributes) { @@ -172,7 +190,7 @@ internal static EventSubscription ToEventSubscription(this PersistedSubscription result.EventName = instance.EventName; result.StepId = instance.StepId; result.WorkflowId = instance.WorkflowId; - result.SubscribeAsOf = instance.SubscribeAsOf; + result.SubscribeAsOf = DateTime.SpecifyKind(instance.SubscribeAsOf, DateTimeKind.Utc); return result; } @@ -183,7 +201,7 @@ internal static Event ToEvent(this PersistedEvent instance) result.Id = instance.EventId.ToString(); result.EventKey = instance.EventKey; result.EventName = instance.EventName; - result.EventTime = instance.EventTime; + result.EventTime = DateTime.SpecifyKind(instance.EventTime, DateTimeKind.Utc); result.IsProcessed = instance.IsProcessed; result.EventData = JsonConvert.DeserializeObject(instance.EventData, SerializerSettings); diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedExecutionPointer.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedExecutionPointer.cs index 2f0b1500c..984e27735 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedExecutionPointer.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedExecutionPointer.cs @@ -58,5 +58,9 @@ public class PersistedExecutionPointer public string PredecessorId { get; set; } public string Outcome { get; set; } + + public PointerStatus Status { get; set; } = PointerStatus.Legacy; + + public string Scope { get; set; } } } diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index 5477d9c12..aa095a281 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -250,6 +250,7 @@ public async Task> GetSubcriptions(string eventNa _mutex.WaitOne(); try { + asOf = asOf.ToUniversalTime(); var raw = await Set() .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf) .ToListAsync(); @@ -306,6 +307,7 @@ public async Task> GetRunnableEvents(DateTime asAt) _mutex.WaitOne(); try { + asAt = asAt.ToUniversalTime(); var raw = await Set() .Where(x => !x.IsProcessed) .Where(x => x.EventTime <= now) diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj b/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj index ff685184c..851918adb 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj @@ -15,10 +15,10 @@ false false false - 1.5.0 + 1.6.0 Base package for Workflow-core peristence providers using entity framework - 1.5.0.0 - 1.5.0.0 + 1.6.0.0 + 1.6.0.0 diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index 53e41d493..fea94b5bb 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -15,7 +15,6 @@ namespace WorkflowCore.Persistence.MongoDB.Services { public class MongoPersistenceProvider : IPersistenceProvider { - private readonly IMongoDatabase _database; public MongoPersistenceProvider(IMongoDatabase database) diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/WorkflowCore.Persistence.MongoDB.csproj b/src/providers/WorkflowCore.Persistence.MongoDB/WorkflowCore.Persistence.MongoDB.csproj index 4a4e88a8a..6d006caf0 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/WorkflowCore.Persistence.MongoDB.csproj +++ b/src/providers/WorkflowCore.Persistence.MongoDB/WorkflowCore.Persistence.MongoDB.csproj @@ -17,10 +17,10 @@ false false false - 1.4.0 + 1.6.0 Provides support to persist workflows running on Workflow Core to a MongoDB database. - 1.4.0.0 - 1.4.0.0 + 1.6.0.0 + 1.6.0.0 diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20171223020844_StepScope.Designer.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20171223020844_StepScope.Designer.cs new file mode 100644 index 000000000..8aaa6a901 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20171223020844_StepScope.Designer.cs @@ -0,0 +1,244 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using System; +using WorkflowCore.Models; +using WorkflowCore.Persistence.PostgreSQL; + +namespace WorkflowCore.Persistence.PostgreSQL.Migrations +{ + [DbContext(typeof(PostgresPersistenceProvider))] + [Migration("20171223020844_StepScope")] + partial class StepScope + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd(); + + b.Property("EventData"); + + b.Property("EventId"); + + b.Property("EventKey") + .HasMaxLength(200); + + b.Property("EventName") + .HasMaxLength(200); + + b.Property("EventTime"); + + b.Property("IsProcessed"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventId") + .IsUnique(); + + b.HasIndex("EventTime"); + + b.HasIndex("IsProcessed"); + + b.HasIndex("EventName", "EventKey"); + + b.ToTable("Event","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd(); + + b.Property("ErrorTime"); + + b.Property("ExecutionPointerId") + .HasMaxLength(100); + + b.Property("Message"); + + b.Property("WorkflowId") + .HasMaxLength(100); + + b.HasKey("PersistenceId"); + + b.ToTable("ExecutionError","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd(); + + b.Property("Active"); + + b.Property("Children"); + + b.Property("ContextItem"); + + b.Property("EndTime"); + + b.Property("EventData"); + + b.Property("EventKey") + .HasMaxLength(100); + + b.Property("EventName") + .HasMaxLength(100); + + b.Property("EventPublished"); + + b.Property("Id") + .HasMaxLength(50); + + b.Property("Outcome"); + + b.Property("PersistenceData"); + + b.Property("PredecessorId") + .HasMaxLength(100); + + b.Property("RetryCount"); + + b.Property("Scope"); + + b.Property("SleepUntil"); + + b.Property("StartTime"); + + b.Property("Status"); + + b.Property("StepId"); + + b.Property("StepName") + .HasMaxLength(100); + + b.Property("WorkflowId"); + + b.HasKey("PersistenceId"); + + b.HasIndex("WorkflowId"); + + b.ToTable("ExecutionPointer","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd(); + + b.Property("AttributeKey") + .HasMaxLength(100); + + b.Property("AttributeValue"); + + b.Property("ExecutionPointerId"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecutionPointerId"); + + b.ToTable("ExtensionAttribute","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd(); + + b.Property("EventKey") + .HasMaxLength(200); + + b.Property("EventName") + .HasMaxLength(200); + + b.Property("StepId"); + + b.Property("SubscribeAsOf"); + + b.Property("SubscriptionId") + .HasMaxLength(200); + + b.Property("WorkflowId") + .HasMaxLength(200); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventKey"); + + b.HasIndex("EventName"); + + b.HasIndex("SubscriptionId") + .IsUnique(); + + b.ToTable("Subscription","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd(); + + b.Property("CompleteTime"); + + b.Property("CreateTime"); + + b.Property("Data"); + + b.Property("Description") + .HasMaxLength(500); + + b.Property("InstanceId") + .HasMaxLength(200); + + b.Property("NextExecution"); + + b.Property("Reference") + .HasMaxLength(200); + + b.Property("Status"); + + b.Property("Version"); + + b.Property("WorkflowDefinitionId") + .HasMaxLength(200); + + b.HasKey("PersistenceId"); + + b.HasIndex("InstanceId") + .IsUnique(); + + b.HasIndex("NextExecution"); + + b.ToTable("Workflow","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", "Workflow") + .WithMany("ExecutionPointers") + .HasForeignKey("WorkflowId") + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", "ExecutionPointer") + .WithMany("ExtensionAttributes") + .HasForeignKey("ExecutionPointerId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20171223020844_StepScope.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20171223020844_StepScope.cs new file mode 100644 index 000000000..439bfd655 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20171223020844_StepScope.cs @@ -0,0 +1,39 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using System; +using System.Collections.Generic; + +namespace WorkflowCore.Persistence.PostgreSQL.Migrations +{ + public partial class StepScope : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "Scope", + schema: "wfc", + table: "ExecutionPointer", + nullable: true); + + migrationBuilder.AddColumn( + name: "Status", + schema: "wfc", + table: "ExecutionPointer", + nullable: false, + defaultValue: 0); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropColumn( + name: "Scope", + schema: "wfc", + table: "ExecutionPointer"); + + migrationBuilder.DropColumn( + name: "Status", + schema: "wfc", + table: "ExecutionPointer"); + + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs index 70df92086..7fc91a776 100644 --- a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs @@ -1,10 +1,13 @@ -using System; +// using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Metadata; using Microsoft.EntityFrameworkCore.Migrations; -using WorkflowCore.Persistence.PostgreSQL; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using System; using WorkflowCore.Models; +using WorkflowCore.Persistence.PostgreSQL; namespace WorkflowCore.Persistence.PostgreSQL.Migrations { @@ -13,9 +16,10 @@ partial class PostgresPersistenceProviderModelSnapshot : ModelSnapshot { protected override void BuildModel(ModelBuilder modelBuilder) { +#pragma warning disable 612, 618 modelBuilder .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.SerialColumn) - .HasAnnotation("ProductVersion", "1.1.2"); + .HasAnnotation("ProductVersion", "2.0.1-rtm-125"); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => { @@ -47,11 +51,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("EventName", "EventKey"); - b.ToTable("PersistedEvent"); - - b.HasAnnotation("Npgsql:Schema", "wfc"); - - b.HasAnnotation("Npgsql:TableName", "Event"); + b.ToTable("Event","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => @@ -71,11 +71,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("PersistenceId"); - b.ToTable("PersistedExecutionError"); - - b.HasAnnotation("Npgsql:Schema", "wfc"); - - b.HasAnnotation("Npgsql:TableName", "ExecutionError"); + b.ToTable("ExecutionError","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => @@ -113,10 +109,14 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("RetryCount"); + b.Property("Scope"); + b.Property("SleepUntil"); b.Property("StartTime"); + b.Property("Status"); + b.Property("StepId"); b.Property("StepName") @@ -128,11 +128,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("WorkflowId"); - b.ToTable("PersistedExecutionPointer"); - - b.HasAnnotation("Npgsql:Schema", "wfc"); - - b.HasAnnotation("Npgsql:TableName", "ExecutionPointer"); + b.ToTable("ExecutionPointer","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => @@ -151,11 +147,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("ExecutionPointerId"); - b.ToTable("PersistedExtensionAttribute"); - - b.HasAnnotation("Npgsql:Schema", "wfc"); - - b.HasAnnotation("Npgsql:TableName", "ExtensionAttribute"); + b.ToTable("ExtensionAttribute","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => @@ -188,11 +180,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("SubscriptionId") .IsUnique(); - b.ToTable("PersistedSubscription"); - - b.HasAnnotation("Npgsql:Schema", "wfc"); - - b.HasAnnotation("Npgsql:TableName", "Subscription"); + b.ToTable("Subscription","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => @@ -231,11 +219,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("NextExecution"); - b.ToTable("PersistedWorkflow"); - - b.HasAnnotation("Npgsql:Schema", "wfc"); - - b.HasAnnotation("Npgsql:TableName", "Workflow"); + b.ToTable("Workflow","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => @@ -253,6 +237,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasForeignKey("ExecutionPointerId") .OnDelete(DeleteBehavior.Cascade); }); +#pragma warning restore 612, 618 } } } diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/WorkflowCore.Persistence.PostgreSQL.csproj b/src/providers/WorkflowCore.Persistence.PostgreSQL/WorkflowCore.Persistence.PostgreSQL.csproj index 33e16bb50..6dcd7f3b2 100644 --- a/src/providers/WorkflowCore.Persistence.PostgreSQL/WorkflowCore.Persistence.PostgreSQL.csproj +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/WorkflowCore.Persistence.PostgreSQL.csproj @@ -16,9 +16,9 @@ false false Provides support to persist workflows running on Workflow Core to a PostgreSQL database. - 1.5.0 - 1.5.0.0 - 1.5.0.0 + 1.6.0 + 1.6.0.0 + 1.6.0.0 diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20171223020645_StepScope.Designer.cs b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20171223020645_StepScope.Designer.cs new file mode 100644 index 000000000..21ede0209 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20171223020645_StepScope.Designer.cs @@ -0,0 +1,250 @@ +// +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using System; +using WorkflowCore.Models; +using WorkflowCore.Persistence.SqlServer; + +namespace WorkflowCore.Persistence.SqlServer.Migrations +{ + [DbContext(typeof(SqlServerPersistenceProvider))] + [Migration("20171223020645_StepScope")] + partial class StepScope + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "2.0.1-rtm-125") + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("EventData"); + + b.Property("EventId"); + + b.Property("EventKey") + .HasMaxLength(200); + + b.Property("EventName") + .HasMaxLength(200); + + b.Property("EventTime"); + + b.Property("IsProcessed"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventId") + .IsUnique(); + + b.HasIndex("EventTime"); + + b.HasIndex("IsProcessed"); + + b.HasIndex("EventName", "EventKey"); + + b.ToTable("Event","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("ErrorTime"); + + b.Property("ExecutionPointerId") + .HasMaxLength(100); + + b.Property("Message"); + + b.Property("WorkflowId") + .HasMaxLength(100); + + b.HasKey("PersistenceId"); + + b.ToTable("ExecutionError","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("Active"); + + b.Property("Children"); + + b.Property("ContextItem"); + + b.Property("EndTime"); + + b.Property("EventData"); + + b.Property("EventKey") + .HasMaxLength(100); + + b.Property("EventName") + .HasMaxLength(100); + + b.Property("EventPublished"); + + b.Property("Id") + .HasMaxLength(50); + + b.Property("Outcome"); + + b.Property("PersistenceData"); + + b.Property("PredecessorId") + .HasMaxLength(100); + + b.Property("RetryCount"); + + b.Property("Scope"); + + b.Property("SleepUntil"); + + b.Property("StartTime"); + + b.Property("Status"); + + b.Property("StepId"); + + b.Property("StepName") + .HasMaxLength(100); + + b.Property("WorkflowId"); + + b.HasKey("PersistenceId"); + + b.HasIndex("WorkflowId"); + + b.ToTable("ExecutionPointer","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("AttributeKey") + .HasMaxLength(100); + + b.Property("AttributeValue"); + + b.Property("ExecutionPointerId"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecutionPointerId"); + + b.ToTable("ExtensionAttribute","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("EventKey") + .HasMaxLength(200); + + b.Property("EventName") + .HasMaxLength(200); + + b.Property("StepId"); + + b.Property("SubscribeAsOf"); + + b.Property("SubscriptionId") + .HasMaxLength(200); + + b.Property("WorkflowId") + .HasMaxLength(200); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventKey"); + + b.HasIndex("EventName"); + + b.HasIndex("SubscriptionId") + .IsUnique(); + + b.ToTable("Subscription","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("CompleteTime"); + + b.Property("CreateTime"); + + b.Property("Data"); + + b.Property("Description") + .HasMaxLength(500); + + b.Property("InstanceId") + .HasMaxLength(200); + + b.Property("NextExecution"); + + b.Property("Reference") + .HasMaxLength(200); + + b.Property("Status"); + + b.Property("Version"); + + b.Property("WorkflowDefinitionId") + .HasMaxLength(200); + + b.HasKey("PersistenceId"); + + b.HasIndex("InstanceId") + .IsUnique(); + + b.HasIndex("NextExecution"); + + b.ToTable("Workflow","wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", "Workflow") + .WithMany("ExecutionPointers") + .HasForeignKey("WorkflowId") + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", "ExecutionPointer") + .WithMany("ExtensionAttributes") + .HasForeignKey("ExecutionPointerId") + .OnDelete(DeleteBehavior.Cascade); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20171223020645_StepScope.cs b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20171223020645_StepScope.cs new file mode 100644 index 000000000..e9ff500e1 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20171223020645_StepScope.cs @@ -0,0 +1,39 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using System; +using System.Collections.Generic; + +namespace WorkflowCore.Persistence.SqlServer.Migrations +{ + public partial class StepScope : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "Scope", + schema: "wfc", + table: "ExecutionPointer", + nullable: true); + + migrationBuilder.AddColumn( + name: "Status", + schema: "wfc", + table: "ExecutionPointer", + nullable: false, + defaultValue: 0); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropColumn( + name: "Scope", + schema: "wfc", + table: "ExecutionPointer"); + + migrationBuilder.DropColumn( + name: "Status", + schema: "wfc", + table: "ExecutionPointer"); + + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs index e230f1026..82e959442 100644 --- a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs +++ b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs @@ -1,10 +1,13 @@ -using System; +// using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Metadata; using Microsoft.EntityFrameworkCore.Migrations; -using WorkflowCore.Persistence.SqlServer; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore.Storage.Internal; +using System; using WorkflowCore.Models; +using WorkflowCore.Persistence.SqlServer; namespace WorkflowCore.Persistence.SqlServer.Migrations { @@ -13,8 +16,9 @@ partial class SqlServerPersistenceProviderModelSnapshot : ModelSnapshot { protected override void BuildModel(ModelBuilder modelBuilder) { +#pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "1.1.2") + .HasAnnotation("ProductVersion", "2.0.1-rtm-125") .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => @@ -48,11 +52,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("EventName", "EventKey"); - b.ToTable("PersistedEvent"); - - b.HasAnnotation("SqlServer:Schema", "wfc"); - - b.HasAnnotation("SqlServer:TableName", "Event"); + b.ToTable("Event","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => @@ -73,11 +73,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("PersistenceId"); - b.ToTable("PersistedExecutionError"); - - b.HasAnnotation("SqlServer:Schema", "wfc"); - - b.HasAnnotation("SqlServer:TableName", "ExecutionError"); + b.ToTable("ExecutionError","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => @@ -116,10 +112,14 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("RetryCount"); + b.Property("Scope"); + b.Property("SleepUntil"); b.Property("StartTime"); + b.Property("Status"); + b.Property("StepId"); b.Property("StepName") @@ -131,11 +131,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("WorkflowId"); - b.ToTable("PersistedExecutionPointer"); - - b.HasAnnotation("SqlServer:Schema", "wfc"); - - b.HasAnnotation("SqlServer:TableName", "ExecutionPointer"); + b.ToTable("ExecutionPointer","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => @@ -155,11 +151,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("ExecutionPointerId"); - b.ToTable("PersistedExtensionAttribute"); - - b.HasAnnotation("SqlServer:Schema", "wfc"); - - b.HasAnnotation("SqlServer:TableName", "ExtensionAttribute"); + b.ToTable("ExtensionAttribute","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => @@ -193,11 +185,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("SubscriptionId") .IsUnique(); - b.ToTable("PersistedSubscription"); - - b.HasAnnotation("SqlServer:Schema", "wfc"); - - b.HasAnnotation("SqlServer:TableName", "Subscription"); + b.ToTable("Subscription","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => @@ -237,11 +225,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("NextExecution"); - b.ToTable("PersistedWorkflow"); - - b.HasAnnotation("SqlServer:Schema", "wfc"); - - b.HasAnnotation("SqlServer:TableName", "Workflow"); + b.ToTable("Workflow","wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => @@ -259,6 +243,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasForeignKey("ExecutionPointerId") .OnDelete(DeleteBehavior.Cascade); }); +#pragma warning restore 612, 618 } } } diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/WorkflowCore.Persistence.SqlServer.csproj b/src/providers/WorkflowCore.Persistence.SqlServer/WorkflowCore.Persistence.SqlServer.csproj index 4747087fb..717973bbf 100644 --- a/src/providers/WorkflowCore.Persistence.SqlServer/WorkflowCore.Persistence.SqlServer.csproj +++ b/src/providers/WorkflowCore.Persistence.SqlServer/WorkflowCore.Persistence.SqlServer.csproj @@ -15,10 +15,10 @@ false false false - 1.5.0 + 1.6.0 Provides support to persist workflows running on Workflow Core to a SQL Server database. - 1.5.0.0 - 1.5.0.0 + 1.6.0.0 + 1.6.0.0 diff --git a/src/providers/WorkflowCore.Persistence.Sqlite/WorkflowCore.Persistence.Sqlite.csproj b/src/providers/WorkflowCore.Persistence.Sqlite/WorkflowCore.Persistence.Sqlite.csproj index 8f90b7231..51964723b 100644 --- a/src/providers/WorkflowCore.Persistence.Sqlite/WorkflowCore.Persistence.Sqlite.csproj +++ b/src/providers/WorkflowCore.Persistence.Sqlite/WorkflowCore.Persistence.Sqlite.csproj @@ -16,9 +16,9 @@ false false Provides support to persist workflows running on Workflow Core to a Sqlite database. - 1.5.0 - 1.5.0.0 - 1.5.0.0 + 1.6.0 + 1.6.0.0 + 1.6.0.0 diff --git a/src/samples/WorkflowCore.Sample17/CompensatingWorkflow.cs b/src/samples/WorkflowCore.Sample17/CompensatingWorkflow.cs new file mode 100644 index 000000000..94db0806e --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/CompensatingWorkflow.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using WorkflowCore.Interface; +using WorkflowCore.Sample17.Steps; + +namespace WorkflowCore.Sample17 +{ + class CompensatingWorkflow : IWorkflow + { + public string Id => "compensate-sample"; + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => Console.WriteLine("Begin")) + .Saga(saga => saga + .StartWith() + .CompensateWith() + .Then() + .CompensateWith() + .Then() + .CompensateWith() + ) + .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5)) + .Then(context => Console.WriteLine("End")); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Program.cs b/src/samples/WorkflowCore.Sample17/Program.cs new file mode 100644 index 000000000..534c42e97 --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Program.cs @@ -0,0 +1,39 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.Interface; + +namespace WorkflowCore.Sample17 +{ + class Program + { + static void Main(string[] args) + { + var serviceProvider = ConfigureServices(); + + //start the workflow host + var host = serviceProvider.GetService(); + host.RegisterWorkflow(); + host.Start(); + + Console.WriteLine("Starting workflow..."); + var workflowId = host.StartWorkflow("compensate-sample").Result; + + Console.ReadLine(); + host.Stop(); + } + + private static IServiceProvider ConfigureServices() + { + //setup dependency injection + IServiceCollection services = new ServiceCollection(); + services.AddLogging(); + services.AddWorkflow(); + //services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow")); + //services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true)); + //services.AddWorkflow(x => x.UsePostgreSQL(@"Server=127.0.0.1;Port=5432;Database=workflow;User Id=postgres;", true, true)); + + var serviceProvider = services.BuildServiceProvider(); + return serviceProvider; + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/README.md b/src/samples/WorkflowCore.Sample17/README.md new file mode 100644 index 000000000..1926e829e --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/README.md @@ -0,0 +1,129 @@ +# Saga transaction with compensation sample + +Illustrates how to encapsulate a sequence of steps within a saga transaction and specify compensation steps for each. + +In the sample, `Task2` will throw an exception, then `UndoTask2` and `UndoTask1` will be triggered. + +```c# +builder + .StartWith(context => Console.WriteLine("Begin")) + .Saga(saga => saga + .StartWith() + .CompensateWith() + .Then() + .CompensateWith() + .Then() + .CompensateWith() + ) + .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5)) + .Then(context => Console.WriteLine("End")); +``` + +## Retry policy for failed saga transaction + +This particular example will retry the saga every 5 seconds, but you could also simply fail completely, and process a master compensation task for the whole saga. + +```c# +builder + .StartWith(context => Console.WriteLine("Begin")) + .Saga(saga => saga + .StartWith() + .CompensateWith() + .Then() + .CompensateWith() + .Then() + .CompensateWith() + ) + .CompensateWith() + .Then(context => Console.WriteLine("End")); +``` + +## Compensate entire saga transaction + +You could also only specify a master compensation step, as follows + +```c# +builder + .StartWith(context => Console.WriteLine("Begin")) + .Saga(saga => saga + .StartWith() + .Then() + .Then() + ) + .CompensateWith() + .Then(context => Console.WriteLine("End")); +``` + +## Passing parameters to compensation steps + +Parameters can be passed to a compensation step as follows + +```c# +builder + .StartWith() + .CompensateWith(compensate => + { + compensate.Input(step => step.Message, data => "undoing..."); + }) +``` + +## Expressing a saga in JSON + +A saga transaction can be expressed in JSON, by using the `WorkflowCore.Primitives.Sequence` step and setting the `Saga` parameter to `true`. + +The compensation steps can be defined by specifying the `CompensateWith` parameter. + +```json +{ + "Id": "Saga-Sample", + "Version": 1, + "DataType": "MyApp.MyDataClass, MyApp", + "Steps": [ + { + "Id": "Hello", + "StepType": "MyApp.HelloWorld, MyApp", + "NextStepId": "MySaga" + }, + { + "Id": "MySaga", + "StepType": "WorkflowCore.Primitives.Sequence, WorkflowCore", + "NextStepId": "Bye", + "Saga": true, + "Do": [ + [ + { + "Id": "do1", + "StepType": "MyApp.Task1, MyApp", + "NextStepId": "do2", + "CompensateWith": [ + { + "Id": "undo1", + "StepType": "MyApp.UndoTask1, MyApp" + } + ] + }, + { + "Id": "do2", + "StepType": "MyApp.Task2, MyApp", + "CompensateWith": [ + { + "Id": "undo2-1", + "NextStepId": "undo2-2", + "StepType": "MyApp.UndoTask2, MyApp" + }, + { + "Id": "undo2-2", + "StepType": "MyApp.DoSomethingElse, MyApp" + } + ] + } + ] + ] + }, + { + "Id": "Bye", + "StepType": "MyApp.GoodbyeWorld, MyApp" + } + ] +} +``` \ No newline at end of file diff --git a/src/samples/WorkflowCore.Sample17/Steps/CustomMessage.cs b/src/samples/WorkflowCore.Sample17/Steps/CustomMessage.cs new file mode 100644 index 000000000..4eeb68067 --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/CustomMessage.cs @@ -0,0 +1,17 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class CustomMessage : StepBody + { + public string Message { get; set; } + + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine(Message); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Steps/Task1.cs b/src/samples/WorkflowCore.Sample17/Steps/Task1.cs new file mode 100644 index 000000000..a3bf79bbc --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/Task1.cs @@ -0,0 +1,15 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class Task1 : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Doing Task 1"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Steps/Task2.cs b/src/samples/WorkflowCore.Sample17/Steps/Task2.cs new file mode 100644 index 000000000..d4c7e460a --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/Task2.cs @@ -0,0 +1,15 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class Task2 : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Doing Task 2"); + throw new Exception(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Steps/Task3.cs b/src/samples/WorkflowCore.Sample17/Steps/Task3.cs new file mode 100644 index 000000000..2f0d73bfc --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/Task3.cs @@ -0,0 +1,15 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class Task3 : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Doing Task 3"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Steps/UndoTask1.cs b/src/samples/WorkflowCore.Sample17/Steps/UndoTask1.cs new file mode 100644 index 000000000..6cf208d27 --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/UndoTask1.cs @@ -0,0 +1,15 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class UndoTask1 : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Undoing Task 1"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Steps/UndoTask2.cs b/src/samples/WorkflowCore.Sample17/Steps/UndoTask2.cs new file mode 100644 index 000000000..25ac22ced --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/UndoTask2.cs @@ -0,0 +1,15 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class UndoTask2 : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Undoing Task 2"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/Steps/UndoTask3.cs b/src/samples/WorkflowCore.Sample17/Steps/UndoTask3.cs new file mode 100644 index 000000000..d0041ee8c --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/Steps/UndoTask3.cs @@ -0,0 +1,15 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample17.Steps +{ + public class UndoTask3 : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Undoing Task 3"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample17/WorkflowCore.Sample17.csproj b/src/samples/WorkflowCore.Sample17/WorkflowCore.Sample17.csproj new file mode 100644 index 000000000..e58dcab19 --- /dev/null +++ b/src/samples/WorkflowCore.Sample17/WorkflowCore.Sample17.csproj @@ -0,0 +1,19 @@ + + + + Exe + netcoreapp2.0 + + + + + + + + + + + + + + diff --git a/test/ScratchPad/HelloWorld.json b/test/ScratchPad/HelloWorld.json index 50909b312..257cddcdc 100644 --- a/test/ScratchPad/HelloWorld.json +++ b/test/ScratchPad/HelloWorld.json @@ -18,48 +18,54 @@ { "Id": "Print", "StepType": "ScratchPad.PrintMessage, ScratchPad", - "NextStepId": "If", + "NextStepId": "saga", "Inputs": { "Message": "data.Value3 + \" - \" + DateTime.Now.ToString()" } }, { - "Id": "If", - "StepType": "WorkflowCore.Primitives.If, WorkflowCore", + "Id": "saga", + "StepType": "WorkflowCore.Primitives.Sequence, WorkflowCore", "NextStepId": "Bye", - "Inputs": { "Condition": "true" }, + "Saga": true, "Do": [ [ { "Id": "do1", "StepType": "ScratchPad.PrintMessage, ScratchPad", "NextStepId": "do2", - "Inputs": { "Message": "\"inner 1\"" } + "Inputs": { "Message": "\"inner 1\"" }, + "CompensateWith": [ + { + "Id": "comp0", + "StepType": "ScratchPad.PrintMessage, ScratchPad", + "Inputs": { "Message": "\"undoing do1\"" } + } + ] }, { "Id": "do2", - "StepType": "ScratchPad.PrintMessage, ScratchPad", - "Inputs": { "Message": "\"inner 2\"" } - } - ], - [ - { - "Id": "Wait", - "StepType": "WorkflowCore.Primitives.WaitFor, WorkflowCore", - "NextStepId": "Wait2", - "CancelCondition": "false", - "Inputs": { - "EventName": "\"Event1\"", - "EventKey": "\"Key1\"", - "EffectiveDate": "DateTime.Now" - } + "StepType": "ScratchPad.Throw, ScratchPad", + "NextStepId": "do3", + "CompensateWith": [ + { + "Id": "comp1", + "NextStepId": "comp2", + "StepType": "ScratchPad.PrintMessage, ScratchPad", + "Inputs": { "Message": "\"undoing do2\"" } + }, + { + "Id": "comp2", + "StepType": "ScratchPad.PrintMessage, ScratchPad", + "Inputs": { "Message": "\"still undoing do2\"" } + } + ] }, { - "Id": "Wait2", + "Id": "do3", "StepType": "ScratchPad.PrintMessage, ScratchPad", - "Inputs": { "Message": "\"wait 2\"" } + "Inputs": { "Message": "\"inner 3\"" } } ] - ] }, diff --git a/test/ScratchPad/Program.cs b/test/ScratchPad/Program.cs index 8201ffa85..31a869b92 100644 --- a/test/ScratchPad/Program.cs +++ b/test/ScratchPad/Program.cs @@ -31,10 +31,7 @@ public static void Main(string[] args) host.Start(); host.StartWorkflow("HelloWorld", 1, new MyDataClass() { Value3 = "hi there" }); - - Console.WriteLine("Enter value to publish"); - string value = Console.ReadLine(); - host.PublishEvent("Event1", "Key1", value); + Console.ReadLine(); host.Stop(); @@ -76,6 +73,15 @@ public override ExecutionResult Run(IStepExecutionContext context) } } + public class Throw : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("throwing..."); + throw new Exception("up"); + } + } + public class PrintMessage : StepBody { public string Message { get; set; } diff --git a/test/ScratchPad/ScratchPad.csproj b/test/ScratchPad/ScratchPad.csproj index dad3e4a99..c2e79a161 100644 --- a/test/ScratchPad/ScratchPad.csproj +++ b/test/ScratchPad/ScratchPad.csproj @@ -1,25 +1,26 @@  - netcoreapp1.0 + netcoreapp2.0 ScratchPad Exe ScratchPad - 1.0.3 - $(PackageTargetFallback);dnxcore50 false false false + + - - - + + + + diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/CompensationScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/CompensationScenario.cs new file mode 100644 index 000000000..55c336020 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/CompensationScenario.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; +using WorkflowCore.Testing; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class CompensationScenario : WorkflowTest + { + public class MyDataClass + { + public bool ThrowException { get; set; } + } + + public class Workflow : IWorkflow + { + public static bool Event1Fired = false; + public static bool Event2Fired = false; + public static bool TailEventFired = false; + public static bool CompensationFired = false; + + public string Id => "CompensationWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .Then(context => + { + Event1Fired = true; + if ((context.Workflow.Data as MyDataClass).ThrowException) + throw new Exception(); + Event2Fired = true; + }) + .CompensateWith(context => CompensationFired = true) + .Then(context => TailEventFired = true); + } + } + + public CompensationScenario() + { + Setup(); + Workflow.Event1Fired = false; + Workflow.Event2Fired = false; + Workflow.CompensationFired = false; + Workflow.TailEventFired = false; + } + + [Fact] + public void NoExceptionScenario() + { + var workflowId = StartWorkflow(new MyDataClass() { ThrowException = false }); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + Workflow.Event1Fired.Should().BeTrue(); + Workflow.Event2Fired.Should().BeTrue(); + Workflow.CompensationFired.Should().BeFalse(); + Workflow.TailEventFired.Should().BeTrue(); + } + + [Fact] + public void ExceptionScenario() + { + var workflowId = StartWorkflow(new MyDataClass() { ThrowException = true }); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(1); + Workflow.Event1Fired.Should().BeTrue(); + Workflow.Event2Fired.Should().BeFalse(); + Workflow.CompensationFired.Should().BeTrue(); + Workflow.TailEventFired.Should().BeTrue(); + } + } +} diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/EventScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/EventScenario.cs index d40652259..86a6703df 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/EventScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/EventScenario.cs @@ -14,7 +14,8 @@ public class EventScenario : WorkflowTest @@ -25,8 +26,10 @@ public void Build(IWorkflowBuilder builder) { builder .StartWith(context => ExecutionResult.Next()) - .WaitFor("MyEvent", data => data.StrValue) - .Output(data => data.StrValue, step => step.EventData); + .WaitFor("MyEvent", data => data.StrValue1, data => DateTime.Now) + .Output(data => data.StrValue1, step => step.EventData) + .WaitFor("MyEvent2", data => data.StrValue2) + .Output(data => data.StrValue2, step => step.EventData); } } @@ -39,14 +42,18 @@ public EventScenario() public void Scenario() { var eventKey = Guid.NewGuid().ToString(); - var workflowId = StartWorkflow(new MyDataClass() { StrValue = eventKey }); + var workflowId = StartWorkflow(new MyDataClass() { StrValue1 = eventKey, StrValue2 = eventKey }); WaitForEventSubscription("MyEvent", eventKey, TimeSpan.FromSeconds(30)); - Host.PublishEvent("MyEvent", eventKey, "Pass"); + Host.PublishEvent("MyEvent", eventKey, "Pass1"); + WaitForEventSubscription("MyEvent2", eventKey, TimeSpan.FromSeconds(30)); + Host.PublishEvent("MyEvent2", eventKey, "Pass2"); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); UnhandledStepErrors.Count.Should().Be(0); - GetData(workflowId).StrValue.Should().Be("Pass"); + GetData(workflowId).StrValue1.Should().Be("Pass1"); + GetData(workflowId).StrValue2.Should().Be("Pass2"); } } } diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaScenario.cs new file mode 100644 index 000000000..70ce71510 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaScenario.cs @@ -0,0 +1,87 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; +using WorkflowCore.Testing; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class RetrySagaScenario : WorkflowTest + { + public class MyDataClass + { + } + + public class Workflow : IWorkflow + { + public static int Event1Fired; + public static int Event2Fired; + public static int Event3Fired; + public static int TailEventFired; + public static int Compensation1Fired; + public static int Compensation2Fired; + public static int Compensation3Fired; + public static int Compensation4Fired; + + public string Id => "RetrySagaWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation1Fired++) + .Saga(x => x + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation2Fired++) + .Then(context => + { + Event1Fired++; + if (Event1Fired < 3) + throw new Exception(); + Event2Fired++; + }) + .CompensateWith(context => Compensation3Fired++) + .Then(context => Event3Fired++) + .CompensateWith(context => Compensation4Fired++) + ) + .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1)) + .Then(context => TailEventFired++); + } + } + + public RetrySagaScenario() + { + Setup(); + Workflow.Event1Fired = 0; + Workflow.Event2Fired = 0; + Workflow.Event3Fired = 0; + Workflow.Compensation1Fired = 0; + Workflow.Compensation2Fired = 0; + Workflow.Compensation3Fired = 0; + Workflow.Compensation4Fired = 0; + Workflow.TailEventFired = 0; + } + + [Fact] + public void Scenario() + { + var workflowId = StartWorkflow(new MyDataClass()); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(2); + Workflow.Event1Fired.Should().Be(3); + Workflow.Event2Fired.Should().Be(1); + Workflow.Event3Fired.Should().Be(1); + Workflow.Compensation1Fired.Should().Be(0); + Workflow.Compensation2Fired.Should().Be(2); + Workflow.Compensation3Fired.Should().Be(2); + Workflow.Compensation4Fired.Should().Be(0); + Workflow.TailEventFired.Should().Be(1); + } + } +} diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/SagaScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/SagaScenario.cs new file mode 100644 index 000000000..fea1e4402 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/SagaScenario.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; +using WorkflowCore.Testing; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class SagaScenario : WorkflowTest + { + public class MyDataClass + { + public bool ThrowException { get; set; } + } + + public class Workflow : IWorkflow + { + public static bool Event1Fired = false; + public static bool Event2Fired = false; + public static bool Event3Fired = false; + public static bool TailEventFired = false; + public static bool Compensation1Fired = false; + public static bool Compensation2Fired = false; + public static bool Compensation3Fired = false; + public static bool Compensation4Fired = false; + public static bool Compensation5Fired = false; + public static bool Compensation6Fired = false; + + public string Id => "SagaWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation1Fired = true) + .Saga(x => x + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation2Fired = true) + .Then(context => + { + Event1Fired = true; + if ((context.Workflow.Data as MyDataClass).ThrowException) + throw new Exception(); + Event2Fired = true; + }) + .CompensateWith(context => Compensation3Fired = true) + .Then(context => Event3Fired = true) + .CompensateWith(context => Compensation4Fired = true) + ) + .CompensateWith(context => Compensation5Fired = true) + .Then(context => TailEventFired = true) + .CompensateWith(context => Compensation6Fired = true); + } + } + + public SagaScenario() + { + Setup(); + Workflow.Event1Fired = false; + Workflow.Event2Fired = false; + Workflow.Event3Fired = false; + Workflow.Compensation1Fired = false; + Workflow.Compensation2Fired = false; + Workflow.Compensation3Fired = false; + Workflow.Compensation4Fired = false; + Workflow.Compensation5Fired = false; + Workflow.Compensation6Fired = false; + Workflow.TailEventFired = false; + } + + [Fact] + public void NoExceptionScenario() + { + var workflowId = StartWorkflow(new MyDataClass() { ThrowException = false }); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + Workflow.Event1Fired.Should().BeTrue(); + Workflow.Event2Fired.Should().BeTrue(); + Workflow.Event3Fired.Should().BeTrue(); + Workflow.Compensation1Fired.Should().BeFalse(); + Workflow.Compensation2Fired.Should().BeFalse(); + Workflow.Compensation3Fired.Should().BeFalse(); + Workflow.Compensation4Fired.Should().BeFalse(); + Workflow.Compensation5Fired.Should().BeFalse(); + Workflow.Compensation6Fired.Should().BeFalse(); + Workflow.TailEventFired.Should().BeTrue(); + } + + [Fact] + public void ExceptionScenario() + { + var workflowId = StartWorkflow(new MyDataClass() { ThrowException = true }); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(1); + Workflow.Event1Fired.Should().BeTrue(); + Workflow.Event2Fired.Should().BeFalse(); + Workflow.Event3Fired.Should().BeFalse(); + Workflow.Compensation1Fired.Should().BeFalse(); + Workflow.Compensation2Fired.Should().BeTrue(); + Workflow.Compensation3Fired.Should().BeTrue(); + Workflow.Compensation4Fired.Should().BeFalse(); + Workflow.Compensation5Fired.Should().BeTrue(); + Workflow.Compensation6Fired.Should().BeFalse(); + Workflow.TailEventFired.Should().BeTrue(); + } + } +} diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoCompensationScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoCompensationScenario.cs new file mode 100644 index 000000000..00fbec0c9 --- /dev/null +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoCompensationScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MongoDB.Scenarios +{ + [Collection("Mongo collection")] + public class MongoCompensationScenario : CompensationScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, "integration-tests")); + } + } +} diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoRetrySagaScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoRetrySagaScenario.cs new file mode 100644 index 000000000..311043260 --- /dev/null +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoRetrySagaScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MongoDB.Scenarios +{ + [Collection("Mongo collection")] + public class MongoRetrySagaScenario : RetrySagaScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, "integration-tests")); + } + } +} diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoSagaScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoSagaScenario.cs new file mode 100644 index 000000000..1e5e384f9 --- /dev/null +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoSagaScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MongoDB.Scenarios +{ + [Collection("Mongo collection")] + public class MongoSagaScenario : SagaScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, "integration-tests")); + } + } +} diff --git a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresRetrySagaScenario.cs b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresRetrySagaScenario.cs new file mode 100644 index 000000000..f8c792aa6 --- /dev/null +++ b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresRetrySagaScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.PostgreSQL.Scenarios +{ + [Collection("Postgres collection")] + public class PostgresRetrySagaScenario : RetrySagaScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UsePostgreSQL(PostgresDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresSagaScenario.cs b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresSagaScenario.cs new file mode 100644 index 000000000..38f304029 --- /dev/null +++ b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresSagaScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.PostgreSQL.Scenarios +{ + [Collection("Postgres collection")] + public class PostgresSagaScenario : SagaScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UsePostgreSQL(PostgresDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerCompenstationScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerCompenstationScenario.cs new file mode 100644 index 000000000..2114b5d19 --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerCompenstationScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerCompenstationScenario : CompensationScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerRetrySagaScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerRetrySagaScenario.cs new file mode 100644 index 000000000..258ca298f --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerRetrySagaScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerRetrySagaScenario : RetrySagaScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSagaScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSagaScenario.cs new file mode 100644 index 000000000..2f1bf58ae --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSagaScenario.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerSagaScenario : SagaScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs b/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs index 3c22ed1be..a315eba59 100644 --- a/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs +++ b/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs @@ -55,7 +55,8 @@ public void GetWorkflowInstance() { Id = Guid.NewGuid().ToString(), Active = true, - StepId = 0 + StepId = 0, + SleepUntil = new DateTime(2000, 1, 1).ToUniversalTime() }); var workflowId = Subject.CreateNewWorkflow(workflow).Result; diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs index 735fb206d..3705152d3 100644 --- a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs @@ -58,6 +58,7 @@ public void Build(IWorkflowBuilder builder) protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected IWorkflowRegistry Registry; + protected IExecutionResultProcessor ResultProcesser; protected WorkflowOptions Options; public WorkflowExecutorFixture() @@ -66,10 +67,15 @@ public WorkflowExecutorFixture() IServiceCollection services = new ServiceCollection(); services.AddLogging(); + //TODO: mock these dependencies to make true unit tests Options = new WorkflowOptions(); + services.AddSingleton(Options); services.AddTransient(); services.AddTransient(); - + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + Host = A.Fake(); PersistenceProvider = A.Fake(); var serviceProvider = services.BuildServiceProvider(); @@ -79,8 +85,9 @@ public WorkflowExecutorFixture() loggerFactory.AddConsole(LogLevel.Debug); Registry = serviceProvider.GetService(); + ResultProcesser = serviceProvider.GetService(); - Subject = new WorkflowExecutor(Registry, serviceProvider, new DateTimeProvider(), loggerFactory); + Subject = new WorkflowExecutor(Registry, serviceProvider, new DateTimeProvider(), ResultProcesser, Options, loggerFactory); } [Fact] @@ -106,7 +113,7 @@ public void EventSubscribe() instance.ExecutionPointers.Add(executionPointer); //act - Subject.Execute(instance, Options); + Subject.Execute(instance); //assert executionPointer.EventName.Should().Be("MyEvent"); @@ -135,8 +142,8 @@ public void StepExecution() }); //act - Subject.Execute(instance, Options); - Subject.Execute(instance, Options); + Subject.Execute(instance); + Subject.Execute(instance); //assert StepExecutionTestWorkflow.Step1StepTicker.Should().Be(1);