From df3a00bf9df5ad1eb0fc3ec6fe87cda351f5d948 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Wed, 13 Feb 2019 17:34:34 -0800 Subject: [PATCH 1/2] wip --- .../ErrorHandlers/CompensateHandler.cs | 18 +- .../Scenarios/NestedRetrySagaScenario.cs | 90 ++++++++++ .../RetrySagaWithUserTaskScenario.cs | 160 ++++++++++++++++++ 3 files changed, 263 insertions(+), 5 deletions(-) create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs diff --git a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs index de3e98c01..128b7aba5 100644 --- a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs @@ -45,11 +45,19 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP if (scope.Any()) { - var parentId = scope.Peek(); - var parentPointer = workflow.ExecutionPointers.FindById(parentId); - var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId); - resume = parentStep.ResumeChildrenAfterCompensation; - revert = parentStep.RevertChildrenAfterCompensation; + var txnStack = new Stack(scope.Reverse()); + while (txnStack.Count > 0) + { + var parentId = txnStack.Pop(); + var parentPointer = workflow.ExecutionPointers.FindById(parentId); + var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId); + if ((resume != parentStep.ResumeChildrenAfterCompensation) || (revert != parentStep.RevertChildrenAfterCompensation)) + { + resume = parentStep.ResumeChildrenAfterCompensation; + revert = parentStep.RevertChildrenAfterCompensation; + break; + } + } } if ((scopeStep.ErrorBehavior ?? WorkflowErrorHandling.Compensate) != WorkflowErrorHandling.Compensate) diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs new file mode 100644 index 000000000..eee0bf9f4 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; +using WorkflowCore.Testing; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class NestedRetrySagaScenario : WorkflowTest + { + public class MyDataClass + { + } + + public class Workflow : IWorkflow + { + public static int Event1Fired; + public static int Event2Fired; + public static int Event3Fired; + public static int TailEventFired; + public static int Compensation1Fired; + public static int Compensation2Fired; + public static int Compensation3Fired; + public static int Compensation4Fired; + + public string Id => "RetrySagaWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation1Fired++) + .Saga(x => x + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation2Fired++) + .If(data => true) + .Do(i => i + .StartWith(context => + { + Event1Fired++; + if (Event1Fired < 3) + throw new Exception(); + Event2Fired++; + }) + .CompensateWith(context => Compensation3Fired++) + ) + .Then(context => Event3Fired++) + .CompensateWith(context => Compensation4Fired++) + ) + .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1)) + .Then(context => TailEventFired++); + } + } + + public NestedRetrySagaScenario() + { + Setup(); + Workflow.Event1Fired = 0; + Workflow.Event2Fired = 0; + Workflow.Event3Fired = 0; + Workflow.Compensation1Fired = 0; + Workflow.Compensation2Fired = 0; + Workflow.Compensation3Fired = 0; + Workflow.Compensation4Fired = 0; + Workflow.TailEventFired = 0; + } + + [Fact] + public void Scenario() + { + var workflowId = StartWorkflow(new MyDataClass()); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(2); + Workflow.Event1Fired.Should().Be(3); + Workflow.Event2Fired.Should().Be(1); + Workflow.Event3Fired.Should().Be(1); + Workflow.Compensation1Fired.Should().Be(0); + Workflow.Compensation2Fired.Should().Be(2); + Workflow.Compensation3Fired.Should().Be(2); + Workflow.Compensation4Fired.Should().Be(0); + Workflow.TailEventFired.Should().Be(1); + } + } +} diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs new file mode 100644 index 000000000..52ab2b948 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; +using System.Threading.Tasks; + +using WorkflowCore.Testing; +using WorkflowCore.Users.Models; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class RetrySagaWithUserTaskScenario : WorkflowTest + { + public class MyDataClass + { + } + + public class Workflow : IWorkflow + { + public static int Event1Fired; + public static int Event2Fired; + public static int Event3Fired; + public static int TailEventFired; + public static int Compensation1Fired; + public static int Compensation2Fired; + public static int Compensation3Fired; + public static int Compensation4Fired; + + public string Id => "RetrySagaWithUserTaskWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation1Fired++) + .Saga(x => x + .StartWith(context => ExecutionResult.Next()) + .CompensateWith(context => Compensation2Fired++) + .UserTask("prompt", data => "assigner") + .WithOption("a", "Option A") + .Do(wb => wb + .StartWith(context => ExecutionResult.Next()) + .Then(context => + { + Event1Fired++; + if (Event1Fired < 3) + throw new Exception(); + Event2Fired++; + }) + .Name("Event1+2") + .CompensateWith(context => Compensation3Fired++) + .Then(context => + { + Event3Fired++; + }).Name("Event3") + + .CompensateWith(context => Compensation4Fired++) + ).Name("Do") + ).Name("saga") + .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1)) + .Then(context => TailEventFired++); + } + } + + public RetrySagaWithUserTaskScenario() + { + Setup(); + Workflow.Event1Fired = 0; + Workflow.Event2Fired = 0; + Workflow.Event3Fired = 0; + Workflow.Compensation1Fired = 0; + Workflow.Compensation2Fired = 0; + Workflow.Compensation3Fired = 0; + Workflow.Compensation4Fired = 0; + Workflow.TailEventFired = 0; + } + + [Fact] + public async Task Scenario() + { + var workflowId = StartWorkflow(new MyDataClass()); + var instance = await Host.PersistenceStore.GetWorkflowInstance(workflowId); + + string oldUserOptionKey = null; + for (var i = 0; i != 3; ++i) + { + var userOptions = await WaitForDifferentUserStepAsync(instance, TimeSpan.FromSeconds(1), oldUserOptionKey); + userOptions.Count.Should().Be(1); + + var userOption = userOptions.Single(); + userOption.Prompt.Should().Be("prompt"); + userOption.AssignedPrincipal.Should().Be("assigner"); + userOption.Options.Count.Should().Be(1); + + var selectionOption = userOption.Options.Single(); + selectionOption.Key.Should().Be("Option A"); + selectionOption.Value.Should().Be("a"); + await Host.PublishUserAction(userOption.Key, string.Empty, selectionOption.Value); + + oldUserOptionKey = userOption.Key; + } + + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(2); + Workflow.Event1Fired.Should().Be(3); + Workflow.Event2Fired.Should().Be(1); + Workflow.Event3Fired.Should().Be(1); + Workflow.Compensation1Fired.Should().Be(0); + Workflow.Compensation2Fired.Should().Be(2); + Workflow.Compensation3Fired.Should().Be(2); + Workflow.Compensation4Fired.Should().Be(0); + Workflow.TailEventFired.Should().Be(1); + } + + private static async Task> WaitForDifferentUserStepAsync( + WorkflowInstance instance, + TimeSpan timeout, + string oldUserActionKey = null) + { + var startTime = DateTime.UtcNow; + + while (DateTime.UtcNow - startTime <= timeout) + { + var userActions = await WaitForUserStepAsync(instance); + + if (oldUserActionKey != null && userActions.Any(x => x.Key == oldUserActionKey)) + { + continue; + } + + return userActions; + } + + return Array.Empty(); + } + + private static async Task> WaitForUserStepAsync(WorkflowInstance instance) + { + var delayCount = 200; + var openActions = instance.GetOpenUserActions()?.ToList(); + while ((openActions?.Count ?? 0) == 0) + { + await Task.Delay(TimeSpan.FromMilliseconds(10)); + openActions = instance.GetOpenUserActions()?.ToList(); + if (delayCount-- == 0) + { + break; + } + } + + return openActions; + } + } +} \ No newline at end of file From 6a97665e5f6228c00abe87ef0b1fb30fc297f223 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Thu, 14 Feb 2019 17:24:15 -0800 Subject: [PATCH 2/2] wip --- .../ErrorHandlers/CompensateHandler.cs | 37 ++++++++---------- .../Services/ExecutionPointerFactory.cs | 5 +-- src/WorkflowCore/WorkflowCore.csproj | 6 +-- .../WorkflowCore.Users/Primitives/UserTask.cs | 4 +- .../WorkflowCore.Users.csproj | 6 +-- .../Scenarios/NestedRetrySagaScenario.cs | 2 +- .../RetrySagaWithUserTaskScenario.cs | 39 ++++++++----------- 7 files changed, 44 insertions(+), 55 deletions(-) diff --git a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs index 128b7aba5..a6708ad08 100644 --- a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs @@ -27,13 +27,9 @@ public CompensateHandler(IExecutionPointerFactory pointerFactory, ILifeCycleEven public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer exceptionPointer, WorkflowStep exceptionStep, Exception exception, Queue bubbleUpQueue) { - var scope = new Stack(exceptionPointer.Scope); + var scope = new Stack(exceptionPointer.Scope.Reverse()); scope.Push(exceptionPointer.Id); - - exceptionPointer.Active = false; - exceptionPointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); - exceptionPointer.Status = PointerStatus.Failed; - + while (scope.Any()) { var pointerId = scope.Pop(); @@ -42,21 +38,18 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP var resume = true; var revert = false; - - if (scope.Any()) + + var txnStack = new Stack(scope.Reverse()); + while (txnStack.Count > 0) { - var txnStack = new Stack(scope.Reverse()); - while (txnStack.Count > 0) + var parentId = txnStack.Pop(); + var parentPointer = workflow.ExecutionPointers.FindById(parentId); + var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId); + if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation)) { - var parentId = txnStack.Pop(); - var parentPointer = workflow.ExecutionPointers.FindById(parentId); - var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId); - if ((resume != parentStep.ResumeChildrenAfterCompensation) || (revert != parentStep.RevertChildrenAfterCompensation)) - { - resume = parentStep.ResumeChildrenAfterCompensation; - revert = parentStep.RevertChildrenAfterCompensation; - break; - } + resume = parentStep.ResumeChildrenAfterCompensation; + revert = parentStep.RevertChildrenAfterCompensation; + break; } } @@ -66,10 +59,12 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP continue; } + scopePointer.Active = false; + scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); + scopePointer.Status = PointerStatus.Failed; + if (scopeStep.CompensationStepId.HasValue) { - scopePointer.Active = false; - scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); scopePointer.Status = PointerStatus.Compensated; var compensationPointer = _pointerFactory.BuildCompensationPointer(def, scopePointer, exceptionPointer, scopeStep.CompensationStepId.Value); diff --git a/src/WorkflowCore/Services/ExecutionPointerFactory.cs b/src/WorkflowCore/Services/ExecutionPointerFactory.cs index 927e06e4e..43ff0b5cc 100644 --- a/src/WorkflowCore/Services/ExecutionPointerFactory.cs +++ b/src/WorkflowCore/Services/ExecutionPointerFactory.cs @@ -9,7 +9,6 @@ namespace WorkflowCore.Services { public class ExecutionPointerFactory : IExecutionPointerFactory { - public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def) { return new ExecutionPointer @@ -41,8 +40,8 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch) { var childPointerId = GenerateId(); - var childScope = new Stack(pointer.Scope); - childScope.Push(pointer.Id); + var childScope = new List(pointer.Scope); + childScope.Insert(0, pointer.Id); pointer.Children.Add(childPointerId); return new ExecutionPointer() diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index aaef3c388..55b2e2337 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -15,9 +15,9 @@ false false Workflow Core is a light weight workflow engine targeting .NET Standard. - 1.8.1 - 1.8.1.0 - 1.8.1.0 + 1.8.2 + 1.8.2.0 + 1.8.2.0 https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png diff --git a/src/extensions/WorkflowCore.Users/Primitives/UserTask.cs b/src/extensions/WorkflowCore.Users/Primitives/UserTask.cs index 536e56d84..5f67ca443 100644 --- a/src/extensions/WorkflowCore.Users/Primitives/UserTask.cs +++ b/src/extensions/WorkflowCore.Users/Primitives/UserTask.cs @@ -80,7 +80,9 @@ private void SetupEscalations(IStepExecutionContext context) Id = Guid.NewGuid().ToString(), PredecessorId = context.ExecutionPointer.Id, StepId = esc.Id, - StepName = esc.Name + StepName = esc.Name, + Status = PointerStatus.Pending, + Scope = new List(context.ExecutionPointer.Scope) }); } } diff --git a/src/extensions/WorkflowCore.Users/WorkflowCore.Users.csproj b/src/extensions/WorkflowCore.Users/WorkflowCore.Users.csproj index 8c7bd192d..9bb4c502f 100644 --- a/src/extensions/WorkflowCore.Users/WorkflowCore.Users.csproj +++ b/src/extensions/WorkflowCore.Users/WorkflowCore.Users.csproj @@ -18,9 +18,9 @@ false false Provides extensions for Workflow Core to enable human workflows. - 1.8.0 - 1.8.0.0 - 1.8.0.0 + 1.8.2 + 1.8.2.0 + 1.8.2.0 diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs index eee0bf9f4..dd4d2d720 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/NestedRetrySagaScenario.cs @@ -27,7 +27,7 @@ public class Workflow : IWorkflow public static int Compensation3Fired; public static int Compensation4Fired; - public string Id => "RetrySagaWorkflow"; + public string Id => "NestedRetrySagaWorkflow"; public int Version => 1; public void Build(IWorkflowBuilder builder) { diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs index 52ab2b948..cf397fd8c 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/RetrySagaWithUserTaskScenario.cs @@ -1,13 +1,11 @@ using System; using System.Collections.Generic; -using System.Text; using WorkflowCore.Interface; using WorkflowCore.Models; using Xunit; using FluentAssertions; using System.Linq; using System.Threading.Tasks; - using WorkflowCore.Testing; using WorkflowCore.Users.Models; @@ -39,28 +37,23 @@ public void Build(IWorkflowBuilder builder) .CompensateWith(context => Compensation1Fired++) .Saga(x => x .StartWith(context => ExecutionResult.Next()) - .CompensateWith(context => Compensation2Fired++) + .CompensateWith(context => Compensation2Fired++) .UserTask("prompt", data => "assigner") - .WithOption("a", "Option A") - .Do(wb => wb - .StartWith(context => ExecutionResult.Next()) - .Then(context => - { - Event1Fired++; - if (Event1Fired < 3) - throw new Exception(); - Event2Fired++; - }) - .Name("Event1+2") - .CompensateWith(context => Compensation3Fired++) - .Then(context => - { - Event3Fired++; - }).Name("Event3") - - .CompensateWith(context => Compensation4Fired++) - ).Name("Do") - ).Name("saga") + .WithOption("a", "Option A") + .Do(wb => wb + .StartWith(context => ExecutionResult.Next()) + .Then(context => + { + Event1Fired++; + if (Event1Fired < 3) + throw new Exception(); + Event2Fired++; + }) + .CompensateWith(context => Compensation3Fired++) + .Then(context => Event3Fired++) + .CompensateWith(context => Compensation4Fired++) + ) + ) .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1)) .Then(context => TailEventFired++); }