From 4af92f3b69ddd34d54a3a9fcef370bdd681eba07 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Tue, 19 Feb 2019 19:52:57 -0800 Subject: [PATCH 1/3] wip --- src/WorkflowCore/Models/WorkflowDefinition.cs | 2 +- .../Models/WorkflowStepCollection.cs | 81 +++++++++++++++++++ .../DefinitionStorage/DefinitionLoader.cs | 4 +- .../ErrorHandlers/CompensateHandler.cs | 6 +- .../Services/ExecutionPointerFactory.cs | 8 +- .../Services/ExecutionResultProcessor.cs | 4 +- .../FluentBuilders/WorkflowBuilder.cs | 2 +- src/WorkflowCore/Services/WorkflowExecutor.cs | 4 +- .../Services/WorkflowExecutorFixture.cs | 2 +- 9 files changed, 97 insertions(+), 16 deletions(-) create mode 100644 src/WorkflowCore/Models/WorkflowStepCollection.cs diff --git a/src/WorkflowCore/Models/WorkflowDefinition.cs b/src/WorkflowCore/Models/WorkflowDefinition.cs index df0695087..f39a563ed 100644 --- a/src/WorkflowCore/Models/WorkflowDefinition.cs +++ b/src/WorkflowCore/Models/WorkflowDefinition.cs @@ -12,7 +12,7 @@ public class WorkflowDefinition public string Description { get; set; } - public List Steps { get; set; } + public WorkflowStepCollection Steps { get; set; } = new WorkflowStepCollection(); public Type DataType { get; set; } diff --git a/src/WorkflowCore/Models/WorkflowStepCollection.cs b/src/WorkflowCore/Models/WorkflowStepCollection.cs new file mode 100644 index 000000000..90c2e4710 --- /dev/null +++ b/src/WorkflowCore/Models/WorkflowStepCollection.cs @@ -0,0 +1,81 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace WorkflowCore.Models +{ + public class WorkflowStepCollection : ICollection + { + private readonly Dictionary _dictionary = new Dictionary(); + + public WorkflowStepCollection() + { + } + + public WorkflowStepCollection(int capacity) + { + _dictionary = new Dictionary(capacity); + } + + public WorkflowStepCollection(ICollection steps) + { + foreach (var step in steps) + { + Add(step); + } + } + + public IEnumerator GetEnumerator() + { + return _dictionary.Values.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public WorkflowStep FindById(int id) + { + if (!_dictionary.ContainsKey(id)) + return null; + + return _dictionary[id]; + } + + public void Add(WorkflowStep item) + { + _dictionary.Add(item.Id, item); + } + + public void Clear() + { + _dictionary.Clear(); + } + + public bool Contains(WorkflowStep item) + { + return _dictionary.ContainsValue(item); + } + + public void CopyTo(WorkflowStep[] array, int arrayIndex) + { + _dictionary.Values.CopyTo(array, arrayIndex); + } + + public bool Remove(WorkflowStep item) + { + return _dictionary.Remove(item.Id); + } + + public WorkflowStep Find(Predicate match) + { + return _dictionary.Values.FirstOrDefault(x => match(x)); + } + + public int Count => _dictionary.Count; + public bool IsReadOnly => false; + } +} diff --git a/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs b/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs index 6959990f6..18552ddf7 100644 --- a/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs +++ b/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs @@ -53,9 +53,9 @@ private WorkflowDefinition Convert(DefinitionSourceV1 source) } - private List ConvertSteps(ICollection source, Type dataType) + private WorkflowStepCollection ConvertSteps(ICollection source, Type dataType) { - var result = new List(); + var result = new WorkflowStepCollection(); int i = 0; var stack = new Stack(source.Reverse()); var parents = new List(); diff --git a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs index a6708ad08..aca56d6b6 100644 --- a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs @@ -34,7 +34,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP { var pointerId = scope.Pop(); var scopePointer = workflow.ExecutionPointers.FindById(pointerId); - var scopeStep = def.Steps.First(x => x.Id == scopePointer.StepId); + var scopeStep = def.Steps.FindById(scopePointer.StepId); var resume = true; var revert = false; @@ -44,7 +44,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP { var parentId = txnStack.Pop(); var parentPointer = workflow.ExecutionPointers.FindById(parentId); - var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId); + var parentStep = def.Steps.FindById(parentPointer.StepId); if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation)) { resume = parentStep.ResumeChildrenAfterCompensation; @@ -86,7 +86,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP foreach (var siblingPointer in prevSiblings) { - var siblingStep = def.Steps.First(x => x.Id == siblingPointer.StepId); + var siblingStep = def.Steps.FindById(siblingPointer.StepId); if (siblingStep.CompensationStepId.HasValue) { var compensationPointer = _pointerFactory.BuildCompensationPointer(def, siblingPointer, exceptionPointer, siblingStep.CompensationStepId.Value); diff --git a/src/WorkflowCore/Services/ExecutionPointerFactory.cs b/src/WorkflowCore/Services/ExecutionPointerFactory.cs index 43ff0b5cc..0ed6fe553 100644 --- a/src/WorkflowCore/Services/ExecutionPointerFactory.cs +++ b/src/WorkflowCore/Services/ExecutionPointerFactory.cs @@ -17,7 +17,7 @@ public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def) StepId = 0, Active = true, Status = PointerStatus.Pending, - StepName = Enumerable.First(def.Steps, x => x.Id == 0).Name + StepName = def.Steps.FindById(0).Name }; } @@ -32,7 +32,7 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe Active = true, ContextItem = pointer.ContextItem, Status = PointerStatus.Pending, - StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name, + StepName = def.Steps.FindById(outcomeTarget.NextStep).Name, Scope = new List(pointer.Scope) }; } @@ -52,7 +52,7 @@ public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPoint Active = true, ContextItem = branch, Status = PointerStatus.Pending, - StepName = def.Steps.First(x => x.Id == childDefinitionId).Name, + StepName = def.Steps.FindById(childDefinitionId).Name, Scope = new List(childScope) }; } @@ -68,7 +68,7 @@ public ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, Executi Active = true, ContextItem = pointer.ContextItem, Status = PointerStatus.Pending, - StepName = def.Steps.First(x => x.Id == compensationStepId).Name, + StepName = def.Steps.FindById(compensationStepId).Name, Scope = new List(pointer.Scope) }; } diff --git a/src/WorkflowCore/Services/ExecutionResultProcessor.cs b/src/WorkflowCore/Services/ExecutionResultProcessor.cs index 9c8bad6d9..cdf0a9b42 100755 --- a/src/WorkflowCore/Services/ExecutionResultProcessor.cs +++ b/src/WorkflowCore/Services/ExecutionResultProcessor.cs @@ -109,7 +109,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de while (queue.Count > 0) { var exceptionPointer = queue.Dequeue(); - var exceptionStep = def.Steps.Find(x => x.Id == exceptionPointer.StepId); + var exceptionStep = def.Steps.FindById(exceptionPointer.StepId); var compensatingStepId = FindScopeCompensationStepId(workflow, def, exceptionPointer); var errorOption = (exceptionStep.ErrorBehavior ?? (compensatingStepId.HasValue ? WorkflowErrorHandling.Compensate : def.DefaultErrorBehavior)); @@ -129,7 +129,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de { var pointerId = scope.Pop(); var pointer = workflow.ExecutionPointers.FindById(pointerId); - var step = def.Steps.First(x => x.Id == pointer.StepId); + var step = def.Steps.FindById(pointer.StepId); if (step.CompensationStepId.HasValue) return step.CompensationStepId.Value; } diff --git a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs index a98a34185..47d45dbf0 100644 --- a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs @@ -28,7 +28,7 @@ public virtual WorkflowDefinition Build(string id, int version) WorkflowDefinition result = new WorkflowDefinition(); result.Id = id; result.Version = version; - result.Steps = this.Steps; + result.Steps = new WorkflowStepCollection(Steps); result.DefaultErrorBehavior = DefaultErrorBehavior; result.DefaultErrorRetryInterval = DefaultErrorRetryInterval; return result; diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index 45716b83d..b31debc3b 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -54,7 +54,7 @@ public async Task Execute(WorkflowInstance workflow) if (pointer.Status == PointerStatus.Cancelled) continue; - var step = def.Steps.First(x => x.Id == pointer.StepId); + var step = def.Steps.FindById(pointer.StepId); if (step != null) { try @@ -186,7 +186,7 @@ private void ProcessAfterExecutionIteration(WorkflowInstance workflow, WorkflowD foreach (var pointer in pointers) { - var step = workflowDef.Steps.First(x => x.Id == pointer.StepId); + var step = workflowDef.Steps.FindById(pointer.StepId); step?.AfterWorkflowIteration(workflowResult, workflowDef, workflow, pointer); } } diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs index 7597532d5..fec58c547 100644 --- a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs @@ -324,7 +324,7 @@ private void Given1StepWorkflow(WorkflowStep step1, string id, int version) Id = id, Version = version, DataType = typeof(object), - Steps = new List() + Steps = new WorkflowStepCollection() { step1 } From c62746061b7f1b7575764dc69c0a5270e24e9cc6 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Wed, 20 Feb 2019 07:53:16 -0800 Subject: [PATCH 2/3] wip --- src/WorkflowCore/Interface/IStepBuilder.cs | 14 ++++ src/WorkflowCore/Models/StepOutcome.cs | 2 +- src/WorkflowCore/Models/WorkflowStep.cs | 2 +- .../DefinitionStorage/DefinitionLoader.cs | 22 +++---- .../Services/FluentBuilders/StepBuilder.cs | 24 ++++++- .../FluentBuilders/WorkflowBuilder.cs | 32 +++++++--- src/WorkflowCore/WorkflowCore.csproj | 6 +- .../Scenarios/AttachScenario.cs | 64 +++++++++++++++++++ .../DefinitionLoaderTests.cs | 4 +- 9 files changed, 141 insertions(+), 29 deletions(-) create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs diff --git a/src/WorkflowCore/Interface/IStepBuilder.cs b/src/WorkflowCore/Interface/IStepBuilder.cs index df6f92404..174956a1d 100644 --- a/src/WorkflowCore/Interface/IStepBuilder.cs +++ b/src/WorkflowCore/Interface/IStepBuilder.cs @@ -21,6 +21,13 @@ public interface IStepBuilder /// IStepBuilder Name(string name); + /// + /// Specifies a custom Id to reference this step + /// + /// A custom Id to reference this step + /// + IStepBuilder Id(string id); + /// /// Specify the next step in the workflow /// @@ -51,6 +58,13 @@ public interface IStepBuilder /// IStepBuilder Then(Action body); + /// + /// Specify the next step in the workflow by Id + /// + /// + /// + IStepBuilder Attach(string id); + /// /// Configure an outcome for this step, then wire it to another step /// diff --git a/src/WorkflowCore/Models/StepOutcome.cs b/src/WorkflowCore/Models/StepOutcome.cs index 3c5b32dcb..cf4e1a0b6 100644 --- a/src/WorkflowCore/Models/StepOutcome.cs +++ b/src/WorkflowCore/Models/StepOutcome.cs @@ -16,7 +16,7 @@ public Expression> Value public string Label { get; set; } - public string Tag { get; set; } + public string ExternalNextStepId { get; set; } public object GetValue(object data) { diff --git a/src/WorkflowCore/Models/WorkflowStep.cs b/src/WorkflowCore/Models/WorkflowStep.cs index 00bcf26c4..73fc37b77 100644 --- a/src/WorkflowCore/Models/WorkflowStep.cs +++ b/src/WorkflowCore/Models/WorkflowStep.cs @@ -14,7 +14,7 @@ public abstract class WorkflowStep public virtual string Name { get; set; } - public virtual string Tag { get; set; } + public virtual string ExternalId { get; set; } public virtual List Children { get; set; } = new List(); diff --git a/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs b/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs index 18552ddf7..fd44f7873 100644 --- a/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs +++ b/src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs @@ -87,7 +87,7 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty targetStep.Name = nextStep.Name; targetStep.ErrorBehavior = nextStep.ErrorBehavior; targetStep.RetryInterval = nextStep.RetryInterval; - targetStep.Tag = $"{nextStep.Id}"; + targetStep.ExternalId = $"{nextStep.Id}"; AttachInputs(nextStep, dataType, stepType, targetStep); AttachOutputs(nextStep, dataType, stepType, targetStep); @@ -114,7 +114,7 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty } if (!string.IsNullOrEmpty(nextStep.NextStepId)) - targetStep.Outcomes.Add(new StepOutcome() { Tag = $"{nextStep.NextStepId}" }); + targetStep.Outcomes.Add(new StepOutcome() { ExternalNextStepId = $"{nextStep.NextStepId}" }); result.Add(targetStep); @@ -123,26 +123,26 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty foreach (var step in result) { - if (result.Any(x => x.Tag == step.Tag && x.Id != step.Id)) - throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.Tag}"); + if (result.Any(x => x.ExternalId == step.ExternalId && x.Id != step.Id)) + throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.ExternalId}"); foreach (var outcome in step.Outcomes) { - if (result.All(x => x.Tag != outcome.Tag)) - throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.Tag}"); + if (result.All(x => x.ExternalId != outcome.ExternalNextStepId)) + throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.ExternalNextStepId}"); - outcome.NextStep = result.Single(x => x.Tag == outcome.Tag).Id; + outcome.NextStep = result.Single(x => x.ExternalId == outcome.ExternalNextStepId).Id; } } foreach (var parent in parents) { - var target = result.Single(x => x.Tag == parent.Id); + var target = result.Single(x => x.ExternalId == parent.Id); foreach (var branch in parent.Do) { var childTags = branch.Select(x => x.Id).ToList(); target.Children.AddRange(result - .Where(x => childTags.Contains(x.Tag)) + .Where(x => childTags.Contains(x.ExternalId)) .OrderBy(x => x.Id) .Select(x => x.Id) .Take(1) @@ -152,11 +152,11 @@ private WorkflowStepCollection ConvertSteps(ICollection source, Ty foreach (var item in compensatables) { - var target = result.Single(x => x.Tag == item.Id); + var target = result.Single(x => x.ExternalId == item.Id); var tag = item.CompensateWith.Select(x => x.Id).FirstOrDefault(); if (tag != null) { - var compStep = result.FirstOrDefault(x => x.Tag == tag); + var compStep = result.FirstOrDefault(x => x.ExternalId == tag); if (compStep != null) target.CompensationStepId = compStep.Id; } diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs index 37ca0cd29..d65262ac9 100644 --- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs @@ -26,6 +26,12 @@ public IStepBuilder Name(string name) return this; } + public IStepBuilder Id(string id) + { + Step.ExternalId = id; + return this; + } + public IStepBuilder Then(Action> stepSetup = null) where TStep : IStepBody { @@ -72,11 +78,23 @@ public IStepBuilder Then(Action bo return stepBuilder; } + public IStepBuilder Attach(string id) + { + Step.Outcomes.Add(new StepOutcome() + { + ExternalNextStepId = id + }); + + return this; + } + public IStepOutcomeBuilder When(object outcomeValue, string label = null) { - StepOutcome result = new StepOutcome(); - result.Value = x => outcomeValue; - result.Label = label; + StepOutcome result = new StepOutcome + { + Value = x => outcomeValue, + Label = label + }; Step.Outcomes.Add(result); var outcomeBuilder = new StepOutcomeBuilder(WorkflowBuilder, result); return outcomeBuilder; diff --git a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs index 47d45dbf0..195231c10 100644 --- a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs @@ -25,20 +25,36 @@ public IWorkflowBuilder UseData() public virtual WorkflowDefinition Build(string id, int version) { - WorkflowDefinition result = new WorkflowDefinition(); - result.Id = id; - result.Version = version; - result.Steps = new WorkflowStepCollection(Steps); - result.DefaultErrorBehavior = DefaultErrorBehavior; - result.DefaultErrorRetryInterval = DefaultErrorRetryInterval; - return result; + AttachExternalIds(); + return new WorkflowDefinition + { + Id = id, + Version = version, + Steps = new WorkflowStepCollection(Steps), + DefaultErrorBehavior = DefaultErrorBehavior, + DefaultErrorRetryInterval = DefaultErrorRetryInterval + }; } public void AddStep(WorkflowStep step) { step.Id = Steps.Count(); Steps.Add(step); - } + } + + private void AttachExternalIds() + { + foreach (var step in Steps) + { + foreach (var outcome in step.Outcomes.Where(x => !string.IsNullOrEmpty(x.ExternalNextStepId))) + { + if (Steps.All(x => x.ExternalId != outcome.ExternalNextStepId)) + throw new KeyNotFoundException($"Cannot find step id {outcome.ExternalNextStepId}"); + + outcome.NextStep = Steps.Single(x => x.ExternalId == outcome.ExternalNextStepId).Id; + } + } + } } diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index 55b2e2337..df3e383f0 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -15,9 +15,9 @@ false false Workflow Core is a light weight workflow engine targeting .NET Standard. - 1.8.2 - 1.8.2.0 - 1.8.2.0 + 1.8.3 + 1.8.3.0 + 1.8.3.0 https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs new file mode 100644 index 000000000..9ad8445d3 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using System.Threading; +using WorkflowCore.Testing; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class AttachScenario : WorkflowTest + { + internal static int Step1Ticker = 0; + internal static int Step2Ticker = 0; + + public class MyDataClass + { + } + + public class GotoWorkflow : IWorkflow + { + public string Id => "GotoWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => + { + Step1Ticker++; + return ExecutionResult.Next(); + }) + .Id("step1") + .If(data => Step1Ticker < 3).Do(then => then + .StartWith(context => + { + Step2Ticker++; + return ExecutionResult.Next(); + }) + .Attach("step1") + ); + } + } + + public AttachScenario() + { + Setup(); + } + + [Fact] + public void Scenario() + { + var workflowId = StartWorkflow(new MyDataClass()); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + Step1Ticker.Should().Be(3); + Step2Ticker.Should().Be(3); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + } + } +} diff --git a/test/WorkflowCore.UnitTests/Services/DefinitionStorage/DefinitionLoaderTests.cs b/test/WorkflowCore.UnitTests/Services/DefinitionStorage/DefinitionLoaderTests.cs index fc92a874c..99a9b49ce 100644 --- a/test/WorkflowCore.UnitTests/Services/DefinitionStorage/DefinitionLoaderTests.cs +++ b/test/WorkflowCore.UnitTests/Services/DefinitionStorage/DefinitionLoaderTests.cs @@ -63,8 +63,8 @@ public void ParseDefinitionDynamic() private bool MatchTestDefinition(WorkflowDefinition def) { //TODO: make this better - var step1 = def.Steps.Single(s => s.Tag == "Step1"); - var step2 = def.Steps.Single(s => s.Tag == "Step2"); + var step1 = def.Steps.Single(s => s.ExternalId == "Step1"); + var step2 = def.Steps.Single(s => s.ExternalId == "Step2"); step1.Outcomes.Count.Should().Be(1); step1.Inputs.Count.Should().Be(1); From f289f572b5b57208866d9b50dfb927c5e661aea1 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Sat, 23 Feb 2019 07:28:19 -0800 Subject: [PATCH 3/3] wip --- src/WorkflowCore/Interface/IQueueProvider.cs | 2 +- .../SingleNodeQueueProvider.cs | 33 +++++++---------- .../Services/SQSQueueProvider.cs | 32 ++++------------ .../WorkflowCore.Providers.AWS.csproj | 4 +- .../Services/AzureStorageQueueProvider.cs | 37 ++++++------------- .../WorkflowCore.Providers.Azure.csproj | 6 +-- .../Services/RedisQueueProvider.cs | 26 +++++-------- .../WorkflowCore.Providers.Redis.csproj | 2 +- .../Services/RabbitMQProvider.cs | 2 + ...orkflowCore.QueueProviders.RabbitMQ.csproj | 6 +-- .../Services/QueueConfigProvider.cs | 29 +++++---------- .../SqlServerQueueProviderMigrator.cs | 3 +- ...rkflowCore.QueueProviders.SqlServer.csproj | 2 +- .../Scenarios/AttachScenario.cs | 4 +- 14 files changed, 68 insertions(+), 120 deletions(-) diff --git a/src/WorkflowCore/Interface/IQueueProvider.cs b/src/WorkflowCore/Interface/IQueueProvider.cs index dfbc2f14e..7c73607e0 100644 --- a/src/WorkflowCore/Interface/IQueueProvider.cs +++ b/src/WorkflowCore/Interface/IQueueProvider.cs @@ -32,5 +32,5 @@ public interface IQueueProvider : IDisposable Task Stop(); } - public enum QueueType { Workflow = 0, Event = 1 } + public enum QueueType { Workflow = 0, Event = 1, Index = 2 } } diff --git a/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs b/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs index 2f3b5bb45..38a8c5c4f 100644 --- a/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using WorkflowCore.Interface; @@ -12,48 +13,42 @@ namespace WorkflowCore.Services /// public class SingleNodeQueueProvider : IQueueProvider { - - private readonly BlockingCollection _runQueue = new BlockingCollection(); - private readonly BlockingCollection _eventQueue = new BlockingCollection(); + + private readonly Dictionary> _queues = new Dictionary>() + { + [QueueType.Workflow] = new BlockingCollection(), + [QueueType.Event] = new BlockingCollection(), + [QueueType.Index] = new BlockingCollection() + }; public bool IsDequeueBlocking => true; public async Task QueueWork(string id, QueueType queue) { - SelectQueue(queue).Add(id); + _queues[queue].Add(id); } public async Task DequeueWork(QueueType queue, CancellationToken cancellationToken) { - if (SelectQueue(queue).TryTake(out string id, 100, cancellationToken)) + if (_queues[queue].TryTake(out string id, 100, cancellationToken)) return id; return null; } - public async Task Start() + public Task Start() { + return Task.CompletedTask; } - public async Task Stop() + public Task Stop() { + return Task.CompletedTask; } public void Dispose() { } - - private BlockingCollection SelectQueue(QueueType queue) - { - switch (queue) - { - case QueueType.Workflow: - return _runQueue; - case QueueType.Event: - return _eventQueue; - } - return null; - } } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs index 6d52de105..6daed010b 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -15,8 +16,7 @@ public class SQSQueueProvider : IQueueProvider private const int WaitTime = 5; private readonly ILogger _logger; private readonly IAmazonSQS _client; - private string _workflowQueue; - private string _eventQueue; + private readonly Dictionary _queues = new Dictionary(); public bool IsDequeueBlocking => true; @@ -28,32 +28,14 @@ public SQSQueueProvider(AWSCredentials credentials, AmazonSQSConfig config, ILog public async Task QueueWork(string id, QueueType queue) { - var queueUrl = string.Empty; - switch (queue) - { - case QueueType.Workflow: - queueUrl = _workflowQueue; - break; - case QueueType.Event: - queueUrl = _eventQueue; - break; - } + var queueUrl = _queues[queue]; await _client.SendMessageAsync(new SendMessageRequest(queueUrl, id)); } public async Task DequeueWork(QueueType queue, CancellationToken cancellationToken) { - var queueUrl = string.Empty; - switch (queue) - { - case QueueType.Workflow: - queueUrl = _workflowQueue; - break; - case QueueType.Event: - queueUrl = _eventQueue; - break; - } + var queueUrl = _queues[queue]; var result = await _client.ReceiveMessageAsync(new ReceiveMessageRequest(queueUrl) { @@ -74,9 +56,11 @@ public async Task Start() { var workflowQueue = await _client.CreateQueueAsync(new CreateQueueRequest("workflowcore-workflows")); var eventQueue = await _client.CreateQueueAsync(new CreateQueueRequest("workflowcore-events")); + var indexQueue = await _client.CreateQueueAsync(new CreateQueueRequest("workflowcore-index")); - _workflowQueue = workflowQueue.QueueUrl; - _eventQueue = eventQueue.QueueUrl; + _queues[QueueType.Workflow] = workflowQueue.QueueUrl; + _queues[QueueType.Event] = eventQueue.QueueUrl; + _queues[QueueType.Index] = indexQueue.QueueUrl; } public Task Stop() => Task.CompletedTask; diff --git a/src/providers/WorkflowCore.Providers.AWS/WorkflowCore.Providers.AWS.csproj b/src/providers/WorkflowCore.Providers.AWS/WorkflowCore.Providers.AWS.csproj index 70c6896ef..b7e3ddddc 100644 --- a/src/providers/WorkflowCore.Providers.AWS/WorkflowCore.Providers.AWS.csproj +++ b/src/providers/WorkflowCore.Providers.AWS/WorkflowCore.Providers.AWS.csproj @@ -11,8 +11,8 @@ https://github.com/danielgerlag/workflow-core https://github.com/danielgerlag/workflow-core.git git - 1.8.1 - 1.8.1.0 + 1.8.2 + 1.8.2.0 diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/AzureStorageQueueProvider.cs b/src/providers/WorkflowCore.Providers.Azure/Services/AzureStorageQueueProvider.cs index 4c297a8ee..5b104ab30 100644 --- a/src/providers/WorkflowCore.Providers.Azure/Services/AzureStorageQueueProvider.cs +++ b/src/providers/WorkflowCore.Providers.Azure/Services/AzureStorageQueueProvider.cs @@ -13,8 +13,8 @@ namespace WorkflowCore.Providers.Azure.Services public class AzureStorageQueueProvider : IQueueProvider { private readonly ILogger _logger; - private readonly CloudQueue _workflowQueue; - private readonly CloudQueue _eventQueue; + + private readonly Dictionary _queues = new Dictionary(); public bool IsDequeueBlocking => false; @@ -24,37 +24,20 @@ public AzureStorageQueueProvider(string connectionString, ILoggerFactory logFact var account = CloudStorageAccount.Parse(connectionString); var client = account.CreateCloudQueueClient(); - _workflowQueue = client.GetQueueReference("workflowcore-workflows"); - _eventQueue = client.GetQueueReference("workflowcore-events"); + _queues[QueueType.Workflow] = client.GetQueueReference("workflowcore-workflows"); + _queues[QueueType.Event] = client.GetQueueReference("workflowcore-events"); + _queues[QueueType.Index] = client.GetQueueReference("workflowcore-index"); } public async Task QueueWork(string id, QueueType queue) { var msg = new CloudQueueMessage(id); - - switch (queue) - { - case QueueType.Workflow: - await _workflowQueue.AddMessageAsync(msg); - break; - case QueueType.Event: - await _eventQueue.AddMessageAsync(msg); - break; - } + await _queues[queue].AddMessageAsync(msg); } public async Task DequeueWork(QueueType queue, CancellationToken cancellationToken) { - CloudQueue cloudQueue = null; - switch (queue) - { - case QueueType.Workflow: - cloudQueue = _workflowQueue; - break; - case QueueType.Event: - cloudQueue = _eventQueue; - break; - } + CloudQueue cloudQueue = _queues[queue]; if (cloudQueue == null) return null; @@ -70,8 +53,10 @@ public async Task DequeueWork(QueueType queue, CancellationToken cancell public async Task Start() { - await _workflowQueue.CreateIfNotExistsAsync(); - await _eventQueue.CreateIfNotExistsAsync(); + foreach (var queue in _queues.Values) + { + await queue.CreateIfNotExistsAsync(); + } } public Task Stop() => Task.CompletedTask; diff --git a/src/providers/WorkflowCore.Providers.Azure/WorkflowCore.Providers.Azure.csproj b/src/providers/WorkflowCore.Providers.Azure/WorkflowCore.Providers.Azure.csproj index f07725052..b5e058c85 100644 --- a/src/providers/WorkflowCore.Providers.Azure/WorkflowCore.Providers.Azure.csproj +++ b/src/providers/WorkflowCore.Providers.Azure/WorkflowCore.Providers.Azure.csproj @@ -7,15 +7,15 @@ - Provides distributed lock management on Workflow Core - Provides Queueing support on Workflow Core workflow workflowcore dlm - 1.7.0 + 1.8.2 $(PackageTargetFallback);dnxcore50 https://github.com/danielgerlag/workflow-core https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md git https://github.com/danielgerlag/workflow-core.git Daniel Gerlag - 1.7.0.0 - 1.7.0.0 + 1.8.2.0 + 1.8.2.0 diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisQueueProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisQueueProvider.cs index fdfb50240..dbed9fa98 100644 --- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisQueueProvider.cs +++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisQueueProvider.cs @@ -14,11 +14,17 @@ public class RedisQueueProvider : IQueueProvider private readonly ILogger _logger; private readonly string _connectionString; private readonly string _prefix; - private const string WORKFLOW_QUEUE = "workflows"; - private const string EVENT_QUEUE = "events"; + private IConnectionMultiplexer _multiplexer; private IDatabase _redis; + private readonly Dictionary _queues = new Dictionary() + { + [QueueType.Workflow] = "workflows", + [QueueType.Event] = "events", + [QueueType.Index] = "index" + }; + public RedisQueueProvider(string connectionString, string prefix, ILoggerFactory logFactory) { _connectionString = connectionString; @@ -66,20 +72,6 @@ public void Dispose() { } - private string GetQueueName(QueueType queue) - { - var queueName = string.Empty; - switch (queue) - { - case QueueType.Workflow: - queueName = $"{_prefix}-{WORKFLOW_QUEUE}"; - break; - case QueueType.Event: - queueName = $"{_prefix}-{EVENT_QUEUE}"; - break; - } - - return queueName; - } + private string GetQueueName(QueueType queue) => $"{_prefix}-{_queues[queue]}"; } } diff --git a/src/providers/WorkflowCore.Providers.Redis/WorkflowCore.Providers.Redis.csproj b/src/providers/WorkflowCore.Providers.Redis/WorkflowCore.Providers.Redis.csproj index 910b79be1..bb0169a34 100644 --- a/src/providers/WorkflowCore.Providers.Redis/WorkflowCore.Providers.Redis.csproj +++ b/src/providers/WorkflowCore.Providers.Redis/WorkflowCore.Providers.Redis.csproj @@ -2,7 +2,7 @@ netstandard2.0 - 1.8.0 + 1.8.2 https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md https://github.com/danielgerlag/workflow-core.git git diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs index c09c63166..c28091092 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs @@ -96,6 +96,8 @@ private string GetQueueName(QueueType queue) return "wfc.workflow_queue"; case QueueType.Event: return "wfc.event_queue"; + case QueueType.Index: + return "wfc.index_queue"; } return null; } diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj index 5d13307c4..c24f2285c 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj @@ -17,10 +17,10 @@ false false false - 1.7.0 + 1.8.2 Queue provider for Workflow-core using RabbitMQ - 1.7.0.0 - 1.7.0.0 + 1.8.2.0 + 1.8.2.0 diff --git a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/QueueConfigProvider.cs b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/QueueConfigProvider.cs index 605b2e8a6..691e6561b 100644 --- a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/QueueConfigProvider.cs +++ b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/QueueConfigProvider.cs @@ -1,6 +1,8 @@ #region using using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using WorkflowCore.Interface; using WorkflowCore.QueueProviders.SqlServer.Interfaces; @@ -15,26 +17,13 @@ namespace WorkflowCore.QueueProviders.SqlServer.Services /// public class QueueConfigProvider : IQueueConfigProvider { - private readonly QueueConfig _workflowQueueConfig; - private readonly QueueConfig _eventQueueConfig; - - public QueueConfigProvider() - { - _workflowQueueConfig = new QueueConfig("workflow"); - _eventQueueConfig = new QueueConfig("event"); - } - - public QueueConfig GetByQueue(QueueType queue) + private readonly Dictionary _queues = new Dictionary() { - switch (queue) - { - case QueueType.Workflow: - return _workflowQueueConfig; - case QueueType.Event: - return _eventQueueConfig; - default: - throw new ArgumentOutOfRangeException(nameof(queue), queue, null); - } - } + [QueueType.Workflow] = new QueueConfig("workflow"), + [QueueType.Event] = new QueueConfig("event"), + [QueueType.Index] = new QueueConfig("indexq") + }; + + public QueueConfig GetByQueue(QueueType queue) => _queues[queue]; } } \ No newline at end of file diff --git a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs index 7853d7800..46e91b79f 100644 --- a/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs +++ b/src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProviderMigrator.cs @@ -40,7 +40,8 @@ public void MigrateDb() var queueConfigurations = new[] { _configProvider.GetByQueue(QueueType.Workflow), - _configProvider.GetByQueue(QueueType.Event) + _configProvider.GetByQueue(QueueType.Event), + _configProvider.GetByQueue(QueueType.Index) }; foreach (var item in queueConfigurations) diff --git a/src/providers/WorkflowCore.QueueProviders.SqlServer/WorkflowCore.QueueProviders.SqlServer.csproj b/src/providers/WorkflowCore.QueueProviders.SqlServer/WorkflowCore.QueueProviders.SqlServer.csproj index 6ef3bf079..8b600c89a 100644 --- a/src/providers/WorkflowCore.QueueProviders.SqlServer/WorkflowCore.QueueProviders.SqlServer.csproj +++ b/src/providers/WorkflowCore.QueueProviders.SqlServer/WorkflowCore.QueueProviders.SqlServer.csproj @@ -5,7 +5,7 @@ Roberto Paterlini Queue provider for Workflow-core using SQL Server Service Broker - 1.0.1-alpha + 1.0.2-alpha diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs index 9ad8445d3..9f7f93245 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/AttachScenario.cs @@ -32,7 +32,7 @@ public void Build(IWorkflowBuilder builder) return ExecutionResult.Next(); }) .Id("step1") - .If(data => Step1Ticker < 3).Do(then => then + .If(data => Step1Ticker < 4).Do(then => then .StartWith(context => { Step2Ticker++; @@ -54,7 +54,7 @@ public void Scenario() var workflowId = StartWorkflow(new MyDataClass()); WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); - Step1Ticker.Should().Be(3); + Step1Ticker.Should().Be(4); Step2Ticker.Should().Be(3); GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);