From 650105a1fc95d9a1f34280ecbee26fdcef031cf2 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Wed, 31 May 2017 20:43:40 -0700 Subject: [PATCH 1/3] Parallel steps --- WorkflowCore.sln | 7 +++ .../Interface/IParallelStepBuilder.cs | 14 ++++++ src/WorkflowCore/Interface/IStepBuilder.cs | 2 + .../Services/ParallelStepBuilder.cs | 45 +++++++++++++++++++ src/WorkflowCore/Services/StepBuilder.cs | 10 +++++ .../WorkflowCore.Sample13/ParallelWorkflow.cs | 43 ++++++++++++++++++ src/samples/WorkflowCore.Sample13/Program.cs | 45 +++++++++++++++++++ .../Steps/PrintMessage.cs | 19 ++++++++ .../WorkflowCore.Sample13/Steps/SayGoodbye.cs | 17 +++++++ .../WorkflowCore.Sample13/Steps/SayHello.cs | 17 +++++++ .../WorkflowCore.Sample13.csproj | 18 ++++++++ 11 files changed, 237 insertions(+) create mode 100644 src/WorkflowCore/Interface/IParallelStepBuilder.cs create mode 100644 src/WorkflowCore/Services/ParallelStepBuilder.cs create mode 100644 src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs create mode 100644 src/samples/WorkflowCore.Sample13/Program.cs create mode 100644 src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs create mode 100644 src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs create mode 100644 src/samples/WorkflowCore.Sample13/Steps/SayHello.cs create mode 100644 src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj diff --git a/WorkflowCore.sln b/WorkflowCore.sln index 6fdcee18b..2514b44a6 100644 --- a/WorkflowCore.sln +++ b/WorkflowCore.sln @@ -86,6 +86,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.Sqlite", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.LockProviders.SqlServer", "src\providers\WorkflowCore.LockProviders.SqlServer\WorkflowCore.LockProviders.SqlServer.csproj", "{AAE2E9F9-37EF-4AE1-A200-D37417C9040C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample13", "src\samples\WorkflowCore.Sample13\WorkflowCore.Sample13.csproj", "{77C49ACA-203E-428C-A4DB-114DFE454988}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -228,6 +230,10 @@ Global {AAE2E9F9-37EF-4AE1-A200-D37417C9040C}.Debug|Any CPU.Build.0 = Debug|Any CPU {AAE2E9F9-37EF-4AE1-A200-D37417C9040C}.Release|Any CPU.ActiveCfg = Release|Any CPU {AAE2E9F9-37EF-4AE1-A200-D37417C9040C}.Release|Any CPU.Build.0 = Release|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Debug|Any CPU.Build.0 = Debug|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.ActiveCfg = Release|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -270,5 +276,6 @@ Global {BB776411-D279-419F-8697-5C6F52BCD5CD} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} {F9F8F9CD-01D9-468B-856D-6A87F0762A01} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB} {AAE2E9F9-37EF-4AE1-A200-D37417C9040C} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2} + {77C49ACA-203E-428C-A4DB-114DFE454988} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} EndGlobalSection EndGlobal diff --git a/src/WorkflowCore/Interface/IParallelStepBuilder.cs b/src/WorkflowCore/Interface/IParallelStepBuilder.cs new file mode 100644 index 000000000..c8e40936c --- /dev/null +++ b/src/WorkflowCore/Interface/IParallelStepBuilder.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkflowCore.Interface +{ + public interface IParallelStepBuilder + where TStepBody : IStepBody + where TReturnStep : IStepBody + { + IParallelStepBuilder Do(Action> builder); + IStepBuilder Join(); + } +} diff --git a/src/WorkflowCore/Interface/IStepBuilder.cs b/src/WorkflowCore/Interface/IStepBuilder.cs index 172c28e01..da73441e2 100644 --- a/src/WorkflowCore/Interface/IStepBuilder.cs +++ b/src/WorkflowCore/Interface/IStepBuilder.cs @@ -101,5 +101,7 @@ public interface IStepBuilder /// /// IContainerStepBuilder When(Expression> outcomeValue, string label = null); + + IParallelStepBuilder Parallel(); } } \ No newline at end of file diff --git a/src/WorkflowCore/Services/ParallelStepBuilder.cs b/src/WorkflowCore/Services/ParallelStepBuilder.cs new file mode 100644 index 000000000..f6f395fbd --- /dev/null +++ b/src/WorkflowCore/Services/ParallelStepBuilder.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Threading.Tasks; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Primitives; + +namespace WorkflowCore.Services +{ + public class ParallelStepBuilder : IParallelStepBuilder + where TStepBody : IStepBody + where TParentStep : IStepBody + { + private readonly IStepBuilder _referenceBuilder; + private readonly IStepBuilder _stepBuilder; + + public IWorkflowBuilder WorkflowBuilder { get; private set; } + + public WorkflowStep Step { get; set; } + + public ParallelStepBuilder(IWorkflowBuilder workflowBuilder, IStepBuilder stepBuilder, IStepBuilder referenceBuilder) + { + WorkflowBuilder = workflowBuilder; + Step = stepBuilder.Step; + _stepBuilder = stepBuilder; + _referenceBuilder = referenceBuilder; + } + + public IParallelStepBuilder Do(Action> builder) + { + builder.Invoke(WorkflowBuilder); + Step.Children.Add(Step.Id + 1); //TODO: make more elegant + + return this; + } + + public IStepBuilder Join() + { + return _referenceBuilder; + } + } +} diff --git a/src/WorkflowCore/Services/StepBuilder.cs b/src/WorkflowCore/Services/StepBuilder.cs index 7bcd777d9..432d5a32d 100644 --- a/src/WorkflowCore/Services/StepBuilder.cs +++ b/src/WorkflowCore/Services/StepBuilder.cs @@ -250,6 +250,16 @@ public IContainerStepBuilder When(Expression Parallel() + { + var newStep = new WorkflowStep(); + var newBuilder = new StepBuilder(WorkflowBuilder, newStep); + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new ParallelStepBuilder(WorkflowBuilder, newBuilder, this); + + return stepBuilder; + } + public IStepBuilder Do(Action> builder) { builder.Invoke(WorkflowBuilder); diff --git a/src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs b/src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs new file mode 100644 index 000000000..d885f9f55 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class ParallelWorkflow : IWorkflow + { + public string Id => "parallel-sample"; + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith() + .Parallel() + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 1.1") + .Then() + .Input(step => step.Message, data => "Item 1.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 2.1") + .Then() + .Input(step => step.Message, data => "Item 2.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 3.1") + .Then() + .Input(step => step.Message, data => "Item 3.2")) + .Join() + .Then(); + } + } + + public class MyData + { + public int Counter { get; set; } + } +} diff --git a/src/samples/WorkflowCore.Sample13/Program.cs b/src/samples/WorkflowCore.Sample13/Program.cs new file mode 100644 index 000000000..81d03eed1 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Program.cs @@ -0,0 +1,45 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; + +namespace WorkflowCore.Sample13 +{ + class Program + { + public static void Main(string[] args) + { + IServiceProvider serviceProvider = ConfigureServices(); + + //start the workflow host + var host = serviceProvider.GetService(); + host.RegisterWorkflow(); + host.Start(); + + Console.WriteLine("Starting workflow..."); + string workflowId = host.StartWorkflow("parallel-sample").Result; + + Console.ReadLine(); + host.Stop(); + } + + private static IServiceProvider ConfigureServices() + { + //setup dependency injection + IServiceCollection services = new ServiceCollection(); + services.AddLogging(); + services.AddWorkflow(); + //services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow-test002")); + //services.AddWorkflow(x => x.UseSqlServer(@"Server=.\SQLEXPRESS;Database=WorkflowCoreTest001;Trusted_Connection=True;", true, true)); + //services.AddWorkflow(x => x.UseSqlite(@"Data Source=wfc001.db;", true)); + + + var serviceProvider = services.BuildServiceProvider(); + + //config logging + var loggerFactory = serviceProvider.GetService(); + //loggerFactory.AddDebug(LogLevel.Debug); + return serviceProvider; + } + } +} \ No newline at end of file diff --git a/src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs b/src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs new file mode 100644 index 000000000..dbb59578f --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class PrintMessage : StepBody + { + public string Message { get; set; } + + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine(Message); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs b/src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs new file mode 100644 index 000000000..b7779d3b8 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class SayGoodbye : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Goodbye"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample13/Steps/SayHello.cs b/src/samples/WorkflowCore.Sample13/Steps/SayHello.cs new file mode 100644 index 000000000..bf174e0c0 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Steps/SayHello.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class SayHello : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Hello"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj b/src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj new file mode 100644 index 000000000..87628eb2f --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj @@ -0,0 +1,18 @@ + + + + Exe + netcoreapp1.1 + + + + + + + + + + + + + \ No newline at end of file From 4ce7524fff98d8bf2d8c390d09b3be243209f63c Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Sat, 3 Jun 2017 10:03:14 -0700 Subject: [PATCH 2/3] parallel steps --- .../Interface/IParallelStepBuilder.cs | 8 ++++---- src/WorkflowCore/Interface/IStepBuilder.cs | 2 +- src/WorkflowCore/Interface/IWorkflowBuilder.cs | 5 +++-- src/WorkflowCore/Services/ParallelStepBuilder.cs | 16 ++++++++-------- src/WorkflowCore/Services/StepBuilder.cs | 8 +++++--- src/WorkflowCore/Services/WorkflowBuilder.cs | 4 ++-- 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/WorkflowCore/Interface/IParallelStepBuilder.cs b/src/WorkflowCore/Interface/IParallelStepBuilder.cs index c8e40936c..fa50cfab9 100644 --- a/src/WorkflowCore/Interface/IParallelStepBuilder.cs +++ b/src/WorkflowCore/Interface/IParallelStepBuilder.cs @@ -1,14 +1,14 @@ using System; using System.Collections.Generic; using System.Text; +using WorkflowCore.Primitives; namespace WorkflowCore.Interface { - public interface IParallelStepBuilder + public interface IParallelStepBuilder where TStepBody : IStepBody - where TReturnStep : IStepBody { - IParallelStepBuilder Do(Action> builder); - IStepBuilder Join(); + IParallelStepBuilder Do(Action> builder); + IStepBuilder Join(); } } diff --git a/src/WorkflowCore/Interface/IStepBuilder.cs b/src/WorkflowCore/Interface/IStepBuilder.cs index da73441e2..c70b7af59 100644 --- a/src/WorkflowCore/Interface/IStepBuilder.cs +++ b/src/WorkflowCore/Interface/IStepBuilder.cs @@ -102,6 +102,6 @@ public interface IStepBuilder /// IContainerStepBuilder When(Expression> outcomeValue, string label = null); - IParallelStepBuilder Parallel(); + IParallelStepBuilder Parallel(); } } \ No newline at end of file diff --git a/src/WorkflowCore/Interface/IWorkflowBuilder.cs b/src/WorkflowCore/Interface/IWorkflowBuilder.cs index 599a7c68d..f23a96009 100644 --- a/src/WorkflowCore/Interface/IWorkflowBuilder.cs +++ b/src/WorkflowCore/Interface/IWorkflowBuilder.cs @@ -8,13 +8,14 @@ namespace WorkflowCore.Interface { public interface IWorkflowBuilder { - int InitialStep { get; set; } + int LastStep { get; } IWorkflowBuilder UseData(); WorkflowDefinition Build(string id, int version); - void AddStep(WorkflowStep step); + void AddStep(WorkflowStep step); + } public interface IWorkflowBuilder : IWorkflowBuilder diff --git a/src/WorkflowCore/Services/ParallelStepBuilder.cs b/src/WorkflowCore/Services/ParallelStepBuilder.cs index f6f395fbd..93f34055d 100644 --- a/src/WorkflowCore/Services/ParallelStepBuilder.cs +++ b/src/WorkflowCore/Services/ParallelStepBuilder.cs @@ -10,18 +10,17 @@ namespace WorkflowCore.Services { - public class ParallelStepBuilder : IParallelStepBuilder + public class ParallelStepBuilder : IParallelStepBuilder where TStepBody : IStepBody - where TParentStep : IStepBody { - private readonly IStepBuilder _referenceBuilder; + private readonly IStepBuilder _referenceBuilder; private readonly IStepBuilder _stepBuilder; public IWorkflowBuilder WorkflowBuilder { get; private set; } public WorkflowStep Step { get; set; } - public ParallelStepBuilder(IWorkflowBuilder workflowBuilder, IStepBuilder stepBuilder, IStepBuilder referenceBuilder) + public ParallelStepBuilder(IWorkflowBuilder workflowBuilder, IStepBuilder stepBuilder, IStepBuilder referenceBuilder) { WorkflowBuilder = workflowBuilder; Step = stepBuilder.Step; @@ -29,15 +28,16 @@ public ParallelStepBuilder(IWorkflowBuilder workflowBuilder, IStepBuilder _referenceBuilder = referenceBuilder; } - public IParallelStepBuilder Do(Action> builder) + public IParallelStepBuilder Do(Action> builder) { - builder.Invoke(WorkflowBuilder); - Step.Children.Add(Step.Id + 1); //TODO: make more elegant + int lastStep = WorkflowBuilder.LastStep; + builder.Invoke(WorkflowBuilder); + Step.Children.Add(lastStep + 1); //TODO: make more elegant return this; } - public IStepBuilder Join() + public IStepBuilder Join() { return _referenceBuilder; } diff --git a/src/WorkflowCore/Services/StepBuilder.cs b/src/WorkflowCore/Services/StepBuilder.cs index 432d5a32d..5fac10ba8 100644 --- a/src/WorkflowCore/Services/StepBuilder.cs +++ b/src/WorkflowCore/Services/StepBuilder.cs @@ -250,13 +250,15 @@ public IContainerStepBuilder When(Expression Parallel() + public IParallelStepBuilder Parallel() { var newStep = new WorkflowStep(); var newBuilder = new StepBuilder(WorkflowBuilder, newStep); WorkflowBuilder.AddStep(newStep); - var stepBuilder = new ParallelStepBuilder(WorkflowBuilder, newBuilder, this); - + var stepBuilder = new ParallelStepBuilder(WorkflowBuilder, newBuilder, newBuilder); + + Step.Outcomes.Add(new StepOutcome() { NextStep = newStep.Id }); + return stepBuilder; } diff --git a/src/WorkflowCore/Services/WorkflowBuilder.cs b/src/WorkflowCore/Services/WorkflowBuilder.cs index 6b80ce9c1..af6793a28 100644 --- a/src/WorkflowCore/Services/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/WorkflowBuilder.cs @@ -10,14 +10,14 @@ namespace WorkflowCore.Services { public class WorkflowBuilder : IWorkflowBuilder { - public int InitialStep { get; set; } - protected List Steps { get; set; } = new List(); protected WorkflowErrorHandling DefaultErrorBehavior = WorkflowErrorHandling.Retry; protected TimeSpan? DefaultErrorRetryInterval; + public int LastStep => Steps.Max(x => x.Id); + public IWorkflowBuilder UseData() { IWorkflowBuilder result = new WorkflowBuilder(Steps); From 724a95d6fb32b858e9c339e071b75e760ee5bf66 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Sat, 3 Jun 2017 14:21:49 -0700 Subject: [PATCH 3/3] Parallel steps --- src/WorkflowCore/WorkflowCore.csproj | 7 +- src/samples/WorkflowCore.Sample13/Program.cs | 2 +- src/samples/WorkflowCore.Sample13/README.md | 27 ++++ .../Scenarios/ParallelScenario.cs | 120 ++++++++++++++++++ 4 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 src/samples/WorkflowCore.Sample13/README.md create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index 3fa629142..1f7e3f11e 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -18,9 +18,10 @@ false false Workflow Core is a light weight workflow engine targeting .NET Standard. - 1.2.5 - 1.2.5.0 - 1.2.5.0 + 1.2.6 + 1.2.6.0 + 1.2.6.0 + Added Parallel steps diff --git a/src/samples/WorkflowCore.Sample13/Program.cs b/src/samples/WorkflowCore.Sample13/Program.cs index 81d03eed1..df2a97a9c 100644 --- a/src/samples/WorkflowCore.Sample13/Program.cs +++ b/src/samples/WorkflowCore.Sample13/Program.cs @@ -18,7 +18,7 @@ public static void Main(string[] args) Console.WriteLine("Starting workflow..."); string workflowId = host.StartWorkflow("parallel-sample").Result; - + Console.ReadLine(); host.Stop(); } diff --git a/src/samples/WorkflowCore.Sample13/README.md b/src/samples/WorkflowCore.Sample13/README.md new file mode 100644 index 000000000..f7eceff2e --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/README.md @@ -0,0 +1,27 @@ +# Parallel sample + +Illustrates how to run several branches of steps in parallel, and then join once all are complete. + + +```c# +builder + .StartWith() + .Parallel() + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 1.1") + .Then() + .Input(step => step.Message, data => "Item 1.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 2.1") + .Then() + .Input(step => step.Message, data => "Item 2.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 3.1") + .Then() + .Input(step => step.Message, data => "Item 3.2")) + .Join() + .Then(); +``` diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs new file mode 100644 index 000000000..4a248eb44 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs @@ -0,0 +1,120 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class ParallelScenario : BaseScenario + { + private static int StartStepTicker = 0; + private static int EndStepTicker = 0; + + private static int Step11Ticker = 0; + private static int Step12Ticker = 0; + private static int Step21Ticker = 0; + private static int Step22Ticker = 0; + private static int Step31Ticker = 0; + private static int Step32Ticker = 0; + + public class MyDataClass + { + } + + public class ParallelWorkflow : IWorkflow + { + public string Id => "ParallelWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(x => + { + StartStepTicker++; + return ExecutionResult.Next(); + }) + .Parallel() + .Do(then => + then.StartWith(x => + { + Step11Ticker++; + return ExecutionResult.Next(); + }) + .Then(x => + { + Step12Ticker++; + return ExecutionResult.Next(); + })) + .Do(then => + then.StartWith(x => + { + Step21Ticker++; + return ExecutionResult.Next(); + }) + .WaitFor("MyEvent", data => "0") + .Then(x => + { + Step22Ticker++; + return ExecutionResult.Next(); + })) + .Do(then => + then.StartWith(x => + { + Step31Ticker++; + return ExecutionResult.Next(); + }) + .Then(x => + { + Step32Ticker++; + return ExecutionResult.Next(); + })) + .Join() + .Then(x => + { + EndStepTicker++; + return ExecutionResult.Next(); + }); + } + } + + [Fact] + public void Scenario() + { + var workflowId = Host.StartWorkflow("ParallelWorkflow", new MyDataClass()).Result; + + int counter = 0; + while ((Step12Ticker == 0) && (Step32Ticker == 0) && (PersistenceProvider.GetSubcriptions("MyEvent", "0", DateTime.MaxValue).Result.Count() == 0) && (counter < 150)) + { + System.Threading.Thread.Sleep(200); + counter++; + } + + Step22Ticker.Should().Be(0); + + Host.PublishEvent("MyEvent", "0", "Pass"); + + var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result; + counter = 0; + while ((instance.Status == WorkflowStatus.Runnable) && (counter < 150)) + { + System.Threading.Thread.Sleep(200); + counter++; + instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result; + } + + instance.Status.Should().Be(WorkflowStatus.Complete); + StartStepTicker.Should().Be(1); + EndStepTicker.Should().Be(1); + Step11Ticker.Should().Be(1); + Step12Ticker.Should().Be(1); + Step21Ticker.Should().Be(1); + Step22Ticker.Should().Be(1); + Step31Ticker.Should().Be(1); + Step32Ticker.Should().Be(1); + } + } +}