diff --git a/other/Playground/Program.cs b/other/Playground/Program.cs index 3a04c7c..d610c6a 100644 --- a/other/Playground/Program.cs +++ b/other/Playground/Program.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TheFlow; @@ -9,31 +10,34 @@ public static class Program { public static void Main() { - var output = false; + var e1 = false; + var e2 = false; + var e3 = false; + var model = ProcessModel.Create() .AddEventCatcher("start") - .AddExclusiveGateway("If") - .AddActivity("True", () => output = true) - .AddActivity("False", () => output = false) - .AddExclusiveGateway("EndIf") + .AddActivity("a1", () => { }) + .AddActivity("c1", () => e1 = true) + .AttachAsCompensationActivity("c1", "a1") + .AddActivity("a2", () => throw new Exception()) + .AddActivity("c2", () => e2 = true) + .AttachAsCompensationActivity("c2", "a2") + .AddActivity("a3", () => { }) + .AddActivity("c3", () => e3 = true) + .AttachAsCompensationActivity("c3", "a3") .AddEventThrower("end") - .AddSequenceFlow("start", "If") - .AddConditionalSequenceFlow("If", "True", true) - .AddConditionalSequenceFlow("If", "False", false) - .AddSequenceFlow("True", "EndIf") - .AddSequenceFlow("False", "EndIf") - .AddSequenceFlow("EndIf", "end"); + .AddSequenceFlow("start", "a1", "a2", "a3", "end"); var models = new InMemoryProcessModelsStore(model); var instances = new InMemoryProcessInstancesStore(); var sc = new ServiceCollection(); sc.AddLogging(cfg => { cfg.AddConsole(); }); - + var manager = new ProcessManager(models, instances, sc.BuildServiceProvider()); + var result = manager.HandleEvent(null).First(); - manager.HandleEvent(true); + var relatedInstance = manager.InstancesStore.GetById(result.ProcessInstanceId); - Console.WriteLine(output); } } \ No newline at end of file diff --git a/src/TheFlow/CoreConcepts/Association.cs b/src/TheFlow/CoreConcepts/Association.cs new file mode 100644 index 0000000..740ad0c --- /dev/null +++ b/src/TheFlow/CoreConcepts/Association.cs @@ -0,0 +1,25 @@ +namespace TheFlow.CoreConcepts +{ + public class Association + { + public string FirstElement { get; } + public string SecondElement { get; } + public AssociationType AssociationType { get; } + + public Association( + string firstElement, + string secondElement, + AssociationType associationType + ) + { + FirstElement = firstElement; + SecondElement = secondElement; + AssociationType = associationType; + } + } + + public enum AssociationType + { + Compensation + } +} diff --git a/src/TheFlow/CoreConcepts/HistoryItem.cs b/src/TheFlow/CoreConcepts/HistoryItem.cs index c2557c3..1538946 100644 --- a/src/TheFlow/CoreConcepts/HistoryItem.cs +++ b/src/TheFlow/CoreConcepts/HistoryItem.cs @@ -25,4 +25,12 @@ public static HistoryItem Create(Token token, string action) public static HistoryItem Create(Token token, object payload, string action) => new HistoryItem(DateTime.Now, token.Id, token.ExecutionPoint, payload, action); } + + public static class HistoryItemActions + { + public static readonly string EventThrown = "eventThrown"; + public static readonly string ActitvityStarted = "activityStarted"; + public static readonly string ActivityCompleted = "activityCompleted"; + public static readonly string EventCatched = "eventCatched"; + } } \ No newline at end of file diff --git a/src/TheFlow/CoreConcepts/ProcessInstance.cs b/src/TheFlow/CoreConcepts/ProcessInstance.cs index cc75192..241705c 100644 --- a/src/TheFlow/CoreConcepts/ProcessInstance.cs +++ b/src/TheFlow/CoreConcepts/ProcessInstance.cs @@ -133,7 +133,7 @@ object eventData @event.Element.Handle(ctx, eventData); _history.Add(new HistoryItem( - DateTime.UtcNow, context.Token.Id, context.Token.ExecutionPoint, eventData, "eventCatched" + DateTime.UtcNow, context.Token.Id, context.Token.ExecutionPoint, eventData, HistoryItemActions.EventCatched )); var connections = ctx.Model @@ -197,7 +197,6 @@ ILogger logger // TODO: Ensure model is valid (all connections are valid) var e = context.Model.GetElementByName(context.Token.ExecutionPoint); var element = e.Element; - //logger?.LogInformation($"Performing {e.Name} ..."); switch (element) { @@ -222,7 +221,7 @@ ILogger logger ILogger logger, Activity activity) { - _history.Add(HistoryItem.Create(context.Token, "activityStarted")); + _history.Add(HistoryItem.Create(context.Token, HistoryItemActions.ActitvityStarted)); logger?.LogInformation($"Activity {context.Token.ExecutionPoint} execution will start now."); activity.Run(context.WithRunningElement(activity)); @@ -232,7 +231,7 @@ ILogger logger ILogger logger, IEventThrower eventThrower) { - _history.Add(HistoryItem.Create(context.Token, "eventThrow")); + _history.Add(HistoryItem.Create(context.Token, HistoryItemActions.EventThrown)); eventThrower.Throw(context.WithRunningElement(eventThrower)); logger?.LogInformation($"Event {context.Token.ExecutionPoint} was thrown."); @@ -250,7 +249,7 @@ ILogger logger ).ToArray(); // TODO: Move this to the model validation - if (connections.Count() != 1) + if (connections.Length != 1) { throw new NotSupportedException(); } @@ -260,13 +259,20 @@ ILogger logger } } - - public IEnumerable HandleActivityCompletion(ExecutionContext context, object completionData) + // TODO: Invalidate parallel tokens (?!) + // TODO: Wait for pending tokens (?!) + public IEnumerable HandleActivityFailure(ExecutionContext context, object failureData) { - var logger = context.ServiceProvider? .GetService>(); + context.Token.ExecutionPoint = "__compensation_start__"; + MoveOn(context, logger); + return context.Token.GetActionableTokens(); + } + + public IEnumerable HandleActivityCompletion(ExecutionContext context, object completionData) + { if (!IsRunning) { return Enumerable.Empty(); @@ -283,7 +289,7 @@ public IEnumerable HandleActivityCompletion(ExecutionContext context, obj } // TODO: Handle Exceptions - _history.Add(HistoryItem.Create(context.Token, completionData, "activityCompleted")); + _history.Add(HistoryItem.Create(context.Token, completionData, HistoryItemActions.ActivityCompleted)); var connections = context.Model .GetOutcomingConnections(activity.Name) @@ -324,8 +330,11 @@ public IEnumerable HandleActivityCompletion(ExecutionContext context, obj else { context.Token.ExecutionPoint = connections.First().Element.To; + + var logger = context.ServiceProvider? + .GetService>(); MoveOn(context, logger); - + } return context.Token.GetActionableTokens(); } @@ -335,5 +344,13 @@ public IEnumerable HandleActivityCompletion(ExecutionContext context, obj ? this : null; + public bool WasActivityCompleted(string activityId) + { + var result = _history.Any(item => + item.ExecutionPoint == activityId && + item.Action == HistoryItemActions.ActivityCompleted + ); + return result; + } } } \ No newline at end of file diff --git a/src/TheFlow/CoreConcepts/ProcessModel.Add.cs b/src/TheFlow/CoreConcepts/ProcessModel.Add.cs index 294f22e..e5471a3 100644 --- a/src/TheFlow/CoreConcepts/ProcessModel.Add.cs +++ b/src/TheFlow/CoreConcepts/ProcessModel.Add.cs @@ -13,22 +13,27 @@ public partial class ProcessModel public ProcessModel AddNullElement(string name, NullElement n) => AddElement(NamedProcessElement.Create(name, n)); - public ProcessModel AddEventCatcher(string name) + public ProcessModel AddEventCatcher(string name) => AddEventCatcher(name, CatchAnyEventCatcher.Create()); public ProcessModel AddEventCatcher(string name, IEventCatcher catcher) => AddEventCatcher(NamedProcessElement.Create(name, catcher)); - - public ProcessModel AddEventCatcher(NamedProcessElement catcher) - => AddElement(catcher); + + public ProcessModel AddEventCatcher(NamedProcessElement catcher) + => AddElement(catcher); + + public ProcessModel AddBoundaryEventCatcher( + NamedProcessElement catcher, + string activityName + ) => AddElement(catcher); public ProcessModel AddEventThrower(string name) => AddEventThrower(name, SilentEventThrower.Instance); - + public ProcessModel AddEventThrower(string name, IEventThrower thrower) => AddEventThrower(NamedProcessElement.Create(name, thrower)); - - public ProcessModel AddEventThrower(NamedProcessElement thrower) + + public ProcessModel AddEventThrower(NamedProcessElement thrower) => AddElement(thrower); // validate null @@ -42,7 +47,7 @@ DataAssociation association public ProcessModel AddSequenceFlow( string from, string to, - params string [] path) + params string[] path) { var result = AddSequenceFlow(SequenceFlow.Create(from, to)); from = to; @@ -53,20 +58,20 @@ DataAssociation association } return result; } - + public ProcessModel AddSequenceFlow(SequenceFlow sequenceFlow) => AddSequenceFlow(ProcessElement.Create(sequenceFlow)); - + public ProcessModel AddSequenceFlow(ProcessElement sequenceFlow) => AddElement(sequenceFlow); public ProcessModel AddActivity(string name, Activity activity) => AddElement(ProcessElement.Create(name, activity)); - - public ProcessModel AddActivity(NamedProcessElement activity) + + public ProcessModel AddActivity(NamedProcessElement activity) => AddElement(activity); - + public ProcessModel AddParallelGateway(string name) => AddElement(NamedProcessElement.Create(name, new ParallelGateway())); @@ -79,9 +84,47 @@ public ProcessModel AddActivity(string name, Action activity) public ProcessModel AddConditionalSequenceFlow(string @from, string @to, object filterValue) => AddElement(ProcessElement.Create(SequenceFlow.Create(@from, @to, filterValue))); - + private ProcessModel AddElement(IProcessElement element) - => new ProcessModel(Id, Version + 1, Elements.Add(element)); + => new ProcessModel(Id, Version + 1, Elements.Add(element), Associations); + // TODO: Ensure that COMPENSATION has no incoming or outcoming sequence flow + public ProcessModel AttachAsCompensationActivity(string compensation, string to) + { + var that = this; + + if (that.GetElementByName("__compensation_start__") == null) + { + that = that + .AddParallelGateway("__compensation_start__") + .AddParallelGateway("__compensation_end__") + .AddEventThrower("__process_failure__") + .AddSequenceFlow("__compensation_end__", "__process_failure__") + ; + } + + + var preifname = $"__compensation_pre_if_{to}__"; + var ifname = $"__compensation_if_{to}_was_completed__"; + var endifname = $"__compensation_endif_{to}_was_completed__"; + + return new ProcessModel(Id, that.Version + 1, that.Elements, that.Associations.Add(new Association( + compensation, + to, + AssociationType.Compensation + ))) + .AddActivity(preifname, LambdaActivity.Create((act, ctx) => + { + var shouldRun = ctx.Instance.WasActivityCompleted(to); + act.GetDataOutputByName("default").Update(ctx, ctx.Token.ExecutionPoint, shouldRun); + })) + .AddExclusiveGateway(ifname) + .AddExclusiveGateway(endifname) + .AddSequenceFlow("__compensation_start__", preifname, ifname) + .AddConditionalSequenceFlow(ifname, compensation, true) + .AddSequenceFlow(compensation, endifname) + .AddConditionalSequenceFlow(ifname, endifname, false) + .AddSequenceFlow(endifname, "__compensation_end__"); + } } } \ No newline at end of file diff --git a/src/TheFlow/CoreConcepts/ProcessModel.cs b/src/TheFlow/CoreConcepts/ProcessModel.cs index cff98e8..b36f690 100644 --- a/src/TheFlow/CoreConcepts/ProcessModel.cs +++ b/src/TheFlow/CoreConcepts/ProcessModel.cs @@ -2,11 +2,9 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; -using System.Xml.Schema; using TheFlow.Elements; using TheFlow.Elements.Activities; using TheFlow.Elements.Connections; -using TheFlow.Elements.Data; using TheFlow.Elements.Events; namespace TheFlow.CoreConcepts @@ -17,12 +15,14 @@ public partial class ProcessModel : IProcessModelProvider public int Version { get; } public ImmutableList> Elements { get; } + public ImmutableList Associations { get; } #region Constructor and Empty Object public ProcessModel( string id, int version, - ImmutableList> elements) + ImmutableList> elements, + ImmutableList associations) { if (elements.HasDuplicatedNames()) { @@ -31,6 +31,7 @@ public partial class ProcessModel : IProcessModelProvider Id = id; Version = version; Elements = elements; + Associations = associations ?? ImmutableList.Create(); } // @@ -39,8 +40,6 @@ public partial class ProcessModel : IProcessModelProvider // ); #endregion - - public IEnumerable> GetIncomingConnections( string elementName ) @@ -104,12 +103,14 @@ public static ProcessModel Create(Guid guid) { throw new ArgumentException("ProcessModel's Id cannot be Empty", nameof(guid)); } - + return new ProcessModel( guid.ToString(), 0, - ImmutableList.Create>() + ImmutableList.Create>(), + ImmutableList.Create() ); + } public bool CanStartWith(ExecutionContext context, object eventData) => diff --git a/src/TheFlow/Elements/Activities/LambdaActivity.cs b/src/TheFlow/Elements/Activities/LambdaActivity.cs index d33b82e..bc7c6a0 100644 --- a/src/TheFlow/Elements/Activities/LambdaActivity.cs +++ b/src/TheFlow/Elements/Activities/LambdaActivity.cs @@ -52,10 +52,18 @@ Action action ExecutionContext context ) { - Action(this, context); + try + { + Action(this, context); + context.Instance + .HandleActivityCompletion(context, null); + } + catch (Exception e) + { + context.Instance + .HandleActivityFailure(context, e); + } - context.Instance - .HandleActivityCompletion(context, null); } } diff --git a/src/TheFlow/Elements/Events/SilentEventThrower.cs b/src/TheFlow/Elements/Events/SilentEventThrower.cs index ec15c36..19babd2 100644 --- a/src/TheFlow/Elements/Events/SilentEventThrower.cs +++ b/src/TheFlow/Elements/Events/SilentEventThrower.cs @@ -13,7 +13,7 @@ private SilentEventThrower() public void Throw(ExecutionContext context) { - // This implementation will not throw any event. It is silent! + // This implementation will not throw any event. It is silent! } } } \ No newline at end of file diff --git a/src/TheFlow/Elements/Gateways/ParallelGateway.cs b/src/TheFlow/Elements/Gateways/ParallelGateway.cs index f4141eb..c303504 100644 --- a/src/TheFlow/Elements/Gateways/ParallelGateway.cs +++ b/src/TheFlow/Elements/Gateways/ParallelGateway.cs @@ -22,7 +22,7 @@ public override void Run(ExecutionContext context) .ToArray(); - if (incomingConnections.Length == 1) + if (incomingConnections.Length <= 1) { context.Instance .HandleActivityCompletion(context.WithRunningElement(null), null); diff --git a/src/TheFlow/IProcessManager.cs b/src/TheFlow/IProcessManager.cs index f7ae9f3..725994d 100644 --- a/src/TheFlow/IProcessManager.cs +++ b/src/TheFlow/IProcessManager.cs @@ -13,7 +13,8 @@ public interface IProcessManager string GetExecutionPoint(Guid instanceId, Guid tokenId); HandleResult HandleEvent(Guid processInstanceId, Guid tokenId, object eventData); HandleResult HandleActivityCompletion(Guid processInstanceId, Guid tokenId, object completionData); - + HandleResult HandleActivityFailure(Guid processInstanceId, Guid tokenId, object failureData); + IEnumerable HandleEvent(object e); void Attach(ProcessInstance instance); diff --git a/src/TheFlow/ProcessManager.cs b/src/TheFlow/ProcessManager.cs index 0ed49f0..108ce10 100644 --- a/src/TheFlow/ProcessManager.cs +++ b/src/TheFlow/ProcessManager.cs @@ -118,6 +118,15 @@ Guid tokenId ?.Token.FindById(tokenId)?.ExecutionPoint; } + public HandleResult HandleActivityFailure( + Guid processInstanceId, + Guid tokenId, + object failureData + ) + { + throw new NotImplementedException(); + } + public HandleResult HandleActivityCompletion( Guid processInstanceId, Guid tokenId, diff --git a/test/TheFlow.Tests/Functional/DataCommunications.cs b/test/TheFlow.Tests/Functional/DataCommunications.cs index f0bd63a..18b36aa 100644 --- a/test/TheFlow.Tests/Functional/DataCommunications.cs +++ b/test/TheFlow.Tests/Functional/DataCommunications.cs @@ -1,7 +1,4 @@ -using System; -using System.Diagnostics; -using FluentAssertions; -using TheFlow; +using FluentAssertions; using TheFlow.CoreConcepts; using TheFlow.Elements.Activities; using TheFlow.Elements.Data; @@ -9,8 +6,6 @@ using TheFlow.Infrastructure.Stores; using Xunit; -using static System.Console; - namespace TheFlow.Tests.Functional { public class DataCommunications diff --git a/test/TheFlow.Tests/Functional/Transactions.cs b/test/TheFlow.Tests/Functional/Transactions.cs new file mode 100644 index 0000000..4121a8d --- /dev/null +++ b/test/TheFlow.Tests/Functional/Transactions.cs @@ -0,0 +1,74 @@ +using System; +using System.Data; +using System.Linq; +using FluentAssertions; +using TheFlow.CoreConcepts; +using TheFlow.Infrastructure.Stores; +using Xunit; + +namespace TheFlow.Tests.Functional +{ + public class Transactions + { + [Fact] + public void WhenTheProcessFailsCompensationActivitiesRun() + { + var data = 0; + var model = ProcessModel.Create() + .AddEventCatcher("start") + .AddActivity("regular", () => data = 10) + .AddActivity("compensation", () => data -= 5) + .AttachAsCompensationActivity("compensation", "regular") + .AddActivity("failing", () => throw new Exception()) + .AddEventCatcher("end") + .AddSequenceFlow("start", "regular", "failing", "end"); + + var models = new InMemoryProcessModelsStore(model); + var instances = new InMemoryProcessInstancesStore(); + + var manager = new ProcessManager(models, instances); + + manager.HandleEvent(null); + data.Should().Be(5); + } + + [Fact] + public void WhenProcessFailCompensationsAreExecutedOnlyForActivitiesThatWerePerformed() + { + var e1 = false; + var e2 = false; + var e3 = false; + + var model = ProcessModel.Create() + .AddEventCatcher("start") + .AddActivity("a1", () => { }) + .AddActivity("c1", () => e1 = true) + .AttachAsCompensationActivity("c1", "a1") + .AddActivity("a2", () => throw new Exception()) + .AddActivity("c2", () => e2 = true) + .AttachAsCompensationActivity("c2", "a2") + .AddActivity("a3", () => { }) + .AddActivity("c3", () => e3 = true) + .AttachAsCompensationActivity("c3", "a3") + .AddEventThrower("end") + .AddSequenceFlow("start", "a1", "a2", "a3", "end"); + + var models = new InMemoryProcessModelsStore(model); + var instances = new InMemoryProcessInstancesStore(); + + var manager = new ProcessManager(models, instances); + + var result = manager.HandleEvent(null).First(); + var relatedInstance = manager.InstancesStore.GetById(result.ProcessInstanceId); + + relatedInstance.WasActivityCompleted("a1").Should().BeTrue(); + relatedInstance.WasActivityCompleted("a2").Should().BeFalse(); + relatedInstance.WasActivityCompleted("a3").Should().BeFalse(); + + e1.Should().BeTrue(); + e2.Should().BeFalse(); + e3.Should().BeFalse(); + } + + } +} diff --git a/test/TheFlow.Tests/Unit/ProcessModelShould.cs b/test/TheFlow.Tests/Unit/ProcessModelShould.cs index f0a4eb7..9005fb6 100644 --- a/test/TheFlow.Tests/Unit/ProcessModelShould.cs +++ b/test/TheFlow.Tests/Unit/ProcessModelShould.cs @@ -1,7 +1,5 @@ using System; -using System.Linq; using FluentAssertions; -using Newtonsoft.Json.Bson; using TheFlow.CoreConcepts; using TheFlow.Elements; using TheFlow.Elements.Activities;