diff --git a/WorkflowCore.sln b/WorkflowCore.sln index 6fdcee18b..2514b44a6 100644 --- a/WorkflowCore.sln +++ b/WorkflowCore.sln @@ -86,6 +86,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.Sqlite", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.LockProviders.SqlServer", "src\providers\WorkflowCore.LockProviders.SqlServer\WorkflowCore.LockProviders.SqlServer.csproj", "{AAE2E9F9-37EF-4AE1-A200-D37417C9040C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample13", "src\samples\WorkflowCore.Sample13\WorkflowCore.Sample13.csproj", "{77C49ACA-203E-428C-A4DB-114DFE454988}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -228,6 +230,10 @@ Global {AAE2E9F9-37EF-4AE1-A200-D37417C9040C}.Debug|Any CPU.Build.0 = Debug|Any CPU {AAE2E9F9-37EF-4AE1-A200-D37417C9040C}.Release|Any CPU.ActiveCfg = Release|Any CPU {AAE2E9F9-37EF-4AE1-A200-D37417C9040C}.Release|Any CPU.Build.0 = Release|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Debug|Any CPU.Build.0 = Debug|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.ActiveCfg = Release|Any CPU + {77C49ACA-203E-428C-A4DB-114DFE454988}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -270,5 +276,6 @@ Global {BB776411-D279-419F-8697-5C6F52BCD5CD} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} {F9F8F9CD-01D9-468B-856D-6A87F0762A01} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB} {AAE2E9F9-37EF-4AE1-A200-D37417C9040C} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2} + {77C49ACA-203E-428C-A4DB-114DFE454988} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} EndGlobalSection EndGlobal diff --git a/src/WorkflowCore/Interface/IParallelStepBuilder.cs b/src/WorkflowCore/Interface/IParallelStepBuilder.cs new file mode 100644 index 000000000..fa50cfab9 --- /dev/null +++ b/src/WorkflowCore/Interface/IParallelStepBuilder.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Primitives; + +namespace WorkflowCore.Interface +{ + public interface IParallelStepBuilder + where TStepBody : IStepBody + { + IParallelStepBuilder Do(Action> builder); + IStepBuilder Join(); + } +} diff --git a/src/WorkflowCore/Interface/IStepBuilder.cs b/src/WorkflowCore/Interface/IStepBuilder.cs index 172c28e01..c70b7af59 100644 --- a/src/WorkflowCore/Interface/IStepBuilder.cs +++ b/src/WorkflowCore/Interface/IStepBuilder.cs @@ -101,5 +101,7 @@ public interface IStepBuilder /// /// IContainerStepBuilder When(Expression> outcomeValue, string label = null); + + IParallelStepBuilder Parallel(); } } \ No newline at end of file diff --git a/src/WorkflowCore/Interface/IWorkflowBuilder.cs b/src/WorkflowCore/Interface/IWorkflowBuilder.cs index 599a7c68d..f23a96009 100644 --- a/src/WorkflowCore/Interface/IWorkflowBuilder.cs +++ b/src/WorkflowCore/Interface/IWorkflowBuilder.cs @@ -8,13 +8,14 @@ namespace WorkflowCore.Interface { public interface IWorkflowBuilder { - int InitialStep { get; set; } + int LastStep { get; } IWorkflowBuilder UseData(); WorkflowDefinition Build(string id, int version); - void AddStep(WorkflowStep step); + void AddStep(WorkflowStep step); + } public interface IWorkflowBuilder : IWorkflowBuilder diff --git a/src/WorkflowCore/Services/ParallelStepBuilder.cs b/src/WorkflowCore/Services/ParallelStepBuilder.cs new file mode 100644 index 000000000..93f34055d --- /dev/null +++ b/src/WorkflowCore/Services/ParallelStepBuilder.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Threading.Tasks; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Primitives; + +namespace WorkflowCore.Services +{ + public class ParallelStepBuilder : IParallelStepBuilder + where TStepBody : IStepBody + { + private readonly IStepBuilder _referenceBuilder; + private readonly IStepBuilder _stepBuilder; + + public IWorkflowBuilder WorkflowBuilder { get; private set; } + + public WorkflowStep Step { get; set; } + + public ParallelStepBuilder(IWorkflowBuilder workflowBuilder, IStepBuilder stepBuilder, IStepBuilder referenceBuilder) + { + WorkflowBuilder = workflowBuilder; + Step = stepBuilder.Step; + _stepBuilder = stepBuilder; + _referenceBuilder = referenceBuilder; + } + + public IParallelStepBuilder Do(Action> builder) + { + int lastStep = WorkflowBuilder.LastStep; + builder.Invoke(WorkflowBuilder); + Step.Children.Add(lastStep + 1); //TODO: make more elegant + + return this; + } + + public IStepBuilder Join() + { + return _referenceBuilder; + } + } +} diff --git a/src/WorkflowCore/Services/StepBuilder.cs b/src/WorkflowCore/Services/StepBuilder.cs index 7bcd777d9..5fac10ba8 100644 --- a/src/WorkflowCore/Services/StepBuilder.cs +++ b/src/WorkflowCore/Services/StepBuilder.cs @@ -250,6 +250,18 @@ public IContainerStepBuilder When(Expression Parallel() + { + var newStep = new WorkflowStep(); + var newBuilder = new StepBuilder(WorkflowBuilder, newStep); + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new ParallelStepBuilder(WorkflowBuilder, newBuilder, newBuilder); + + Step.Outcomes.Add(new StepOutcome() { NextStep = newStep.Id }); + + return stepBuilder; + } + public IStepBuilder Do(Action> builder) { builder.Invoke(WorkflowBuilder); diff --git a/src/WorkflowCore/Services/WorkflowBuilder.cs b/src/WorkflowCore/Services/WorkflowBuilder.cs index 6b80ce9c1..af6793a28 100644 --- a/src/WorkflowCore/Services/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/WorkflowBuilder.cs @@ -10,14 +10,14 @@ namespace WorkflowCore.Services { public class WorkflowBuilder : IWorkflowBuilder { - public int InitialStep { get; set; } - protected List Steps { get; set; } = new List(); protected WorkflowErrorHandling DefaultErrorBehavior = WorkflowErrorHandling.Retry; protected TimeSpan? DefaultErrorRetryInterval; + public int LastStep => Steps.Max(x => x.Id); + public IWorkflowBuilder UseData() { IWorkflowBuilder result = new WorkflowBuilder(Steps); diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index 3fa629142..1f7e3f11e 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -18,9 +18,10 @@ false false Workflow Core is a light weight workflow engine targeting .NET Standard. - 1.2.5 - 1.2.5.0 - 1.2.5.0 + 1.2.6 + 1.2.6.0 + 1.2.6.0 + Added Parallel steps diff --git a/src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs b/src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs new file mode 100644 index 000000000..d885f9f55 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/ParallelWorkflow.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class ParallelWorkflow : IWorkflow + { + public string Id => "parallel-sample"; + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith() + .Parallel() + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 1.1") + .Then() + .Input(step => step.Message, data => "Item 1.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 2.1") + .Then() + .Input(step => step.Message, data => "Item 2.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 3.1") + .Then() + .Input(step => step.Message, data => "Item 3.2")) + .Join() + .Then(); + } + } + + public class MyData + { + public int Counter { get; set; } + } +} diff --git a/src/samples/WorkflowCore.Sample13/Program.cs b/src/samples/WorkflowCore.Sample13/Program.cs new file mode 100644 index 000000000..df2a97a9c --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Program.cs @@ -0,0 +1,45 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using WorkflowCore.Interface; + +namespace WorkflowCore.Sample13 +{ + class Program + { + public static void Main(string[] args) + { + IServiceProvider serviceProvider = ConfigureServices(); + + //start the workflow host + var host = serviceProvider.GetService(); + host.RegisterWorkflow(); + host.Start(); + + Console.WriteLine("Starting workflow..."); + string workflowId = host.StartWorkflow("parallel-sample").Result; + + Console.ReadLine(); + host.Stop(); + } + + private static IServiceProvider ConfigureServices() + { + //setup dependency injection + IServiceCollection services = new ServiceCollection(); + services.AddLogging(); + services.AddWorkflow(); + //services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow-test002")); + //services.AddWorkflow(x => x.UseSqlServer(@"Server=.\SQLEXPRESS;Database=WorkflowCoreTest001;Trusted_Connection=True;", true, true)); + //services.AddWorkflow(x => x.UseSqlite(@"Data Source=wfc001.db;", true)); + + + var serviceProvider = services.BuildServiceProvider(); + + //config logging + var loggerFactory = serviceProvider.GetService(); + //loggerFactory.AddDebug(LogLevel.Debug); + return serviceProvider; + } + } +} \ No newline at end of file diff --git a/src/samples/WorkflowCore.Sample13/README.md b/src/samples/WorkflowCore.Sample13/README.md new file mode 100644 index 000000000..f7eceff2e --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/README.md @@ -0,0 +1,27 @@ +# Parallel sample + +Illustrates how to run several branches of steps in parallel, and then join once all are complete. + + +```c# +builder + .StartWith() + .Parallel() + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 1.1") + .Then() + .Input(step => step.Message, data => "Item 1.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 2.1") + .Then() + .Input(step => step.Message, data => "Item 2.2")) + .Do(then => + then.StartWith() + .Input(step => step.Message, data => "Item 3.1") + .Then() + .Input(step => step.Message, data => "Item 3.2")) + .Join() + .Then(); +``` diff --git a/src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs b/src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs new file mode 100644 index 000000000..dbb59578f --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Steps/PrintMessage.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class PrintMessage : StepBody + { + public string Message { get; set; } + + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine(Message); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs b/src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs new file mode 100644 index 000000000..b7779d3b8 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Steps/SayGoodbye.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class SayGoodbye : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Goodbye"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample13/Steps/SayHello.cs b/src/samples/WorkflowCore.Sample13/Steps/SayHello.cs new file mode 100644 index 000000000..bf174e0c0 --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/Steps/SayHello.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Sample13 +{ + public class SayHello : StepBody + { + public override ExecutionResult Run(IStepExecutionContext context) + { + Console.WriteLine("Hello"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj b/src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj new file mode 100644 index 000000000..87628eb2f --- /dev/null +++ b/src/samples/WorkflowCore.Sample13/WorkflowCore.Sample13.csproj @@ -0,0 +1,18 @@ + + + + Exe + netcoreapp1.1 + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs new file mode 100644 index 000000000..4a248eb44 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/ParallelScenario.cs @@ -0,0 +1,120 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Linq; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class ParallelScenario : BaseScenario + { + private static int StartStepTicker = 0; + private static int EndStepTicker = 0; + + private static int Step11Ticker = 0; + private static int Step12Ticker = 0; + private static int Step21Ticker = 0; + private static int Step22Ticker = 0; + private static int Step31Ticker = 0; + private static int Step32Ticker = 0; + + public class MyDataClass + { + } + + public class ParallelWorkflow : IWorkflow + { + public string Id => "ParallelWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(x => + { + StartStepTicker++; + return ExecutionResult.Next(); + }) + .Parallel() + .Do(then => + then.StartWith(x => + { + Step11Ticker++; + return ExecutionResult.Next(); + }) + .Then(x => + { + Step12Ticker++; + return ExecutionResult.Next(); + })) + .Do(then => + then.StartWith(x => + { + Step21Ticker++; + return ExecutionResult.Next(); + }) + .WaitFor("MyEvent", data => "0") + .Then(x => + { + Step22Ticker++; + return ExecutionResult.Next(); + })) + .Do(then => + then.StartWith(x => + { + Step31Ticker++; + return ExecutionResult.Next(); + }) + .Then(x => + { + Step32Ticker++; + return ExecutionResult.Next(); + })) + .Join() + .Then(x => + { + EndStepTicker++; + return ExecutionResult.Next(); + }); + } + } + + [Fact] + public void Scenario() + { + var workflowId = Host.StartWorkflow("ParallelWorkflow", new MyDataClass()).Result; + + int counter = 0; + while ((Step12Ticker == 0) && (Step32Ticker == 0) && (PersistenceProvider.GetSubcriptions("MyEvent", "0", DateTime.MaxValue).Result.Count() == 0) && (counter < 150)) + { + System.Threading.Thread.Sleep(200); + counter++; + } + + Step22Ticker.Should().Be(0); + + Host.PublishEvent("MyEvent", "0", "Pass"); + + var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result; + counter = 0; + while ((instance.Status == WorkflowStatus.Runnable) && (counter < 150)) + { + System.Threading.Thread.Sleep(200); + counter++; + instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result; + } + + instance.Status.Should().Be(WorkflowStatus.Complete); + StartStepTicker.Should().Be(1); + EndStepTicker.Should().Be(1); + Step11Ticker.Should().Be(1); + Step12Ticker.Should().Be(1); + Step21Ticker.Should().Be(1); + Step22Ticker.Should().Be(1); + Step31Ticker.Should().Be(1); + Step32Ticker.Should().Be(1); + } + } +}