Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ElemarJR/TheFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
ElemarJR committed Aug 19, 2018
2 parents eb356a6 + 26d8be1 commit 390c354
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 60 deletions.
32 changes: 18 additions & 14 deletions other/Playground/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TheFlow;
Expand All @@ -9,31 +10,34 @@ public static class Program
{
public static void Main()
{
var output = false;
var e1 = false;
var e2 = false;
var e3 = false;

var model = ProcessModel.Create()
.AddEventCatcher("start")
.AddExclusiveGateway("If")
.AddActivity("True", () => output = true)
.AddActivity("False", () => output = false)
.AddExclusiveGateway("EndIf")
.AddActivity("a1", () => { })
.AddActivity("c1", () => e1 = true)
.AttachAsCompensationActivity("c1", "a1")
.AddActivity("a2", () => throw new Exception())
.AddActivity("c2", () => e2 = true)
.AttachAsCompensationActivity("c2", "a2")
.AddActivity("a3", () => { })
.AddActivity("c3", () => e3 = true)
.AttachAsCompensationActivity("c3", "a3")
.AddEventThrower("end")
.AddSequenceFlow("start", "If")
.AddConditionalSequenceFlow("If", "True", true)
.AddConditionalSequenceFlow("If", "False", false)
.AddSequenceFlow("True", "EndIf")
.AddSequenceFlow("False", "EndIf")
.AddSequenceFlow("EndIf", "end");
.AddSequenceFlow("start", "a1", "a2", "a3", "end");

var models = new InMemoryProcessModelsStore(model);
var instances = new InMemoryProcessInstancesStore();

var sc = new ServiceCollection();
sc.AddLogging(cfg => { cfg.AddConsole(); });

var manager = new ProcessManager(models, instances, sc.BuildServiceProvider());
var result = manager.HandleEvent(null).First();

manager.HandleEvent(true);
var relatedInstance = manager.InstancesStore.GetById(result.ProcessInstanceId);

Console.WriteLine(output);
}
}
25 changes: 25 additions & 0 deletions src/TheFlow/CoreConcepts/Association.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace TheFlow.CoreConcepts
{
public class Association
{
public string FirstElement { get; }
public string SecondElement { get; }
public AssociationType AssociationType { get; }

public Association(
string firstElement,
string secondElement,
AssociationType associationType
)
{
FirstElement = firstElement;
SecondElement = secondElement;
AssociationType = associationType;
}
}

public enum AssociationType
{
Compensation
}
}
8 changes: 8 additions & 0 deletions src/TheFlow/CoreConcepts/HistoryItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,12 @@ public static HistoryItem Create(Token token, string action)
public static HistoryItem Create(Token token, object payload, string action)
=> new HistoryItem(DateTime.Now, token.Id, token.ExecutionPoint, payload, action);
}

public static class HistoryItemActions
{
public static readonly string EventThrown = "eventThrown";
public static readonly string ActitvityStarted = "activityStarted";
public static readonly string ActivityCompleted = "activityCompleted";
public static readonly string EventCatched = "eventCatched";
}
}
37 changes: 27 additions & 10 deletions src/TheFlow/CoreConcepts/ProcessInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ object eventData
@event.Element.Handle(ctx, eventData);

_history.Add(new HistoryItem(
DateTime.UtcNow, context.Token.Id, context.Token.ExecutionPoint, eventData, "eventCatched"
DateTime.UtcNow, context.Token.Id, context.Token.ExecutionPoint, eventData, HistoryItemActions.EventCatched
));

var connections = ctx.Model
Expand Down Expand Up @@ -197,7 +197,6 @@ ILogger logger
// TODO: Ensure model is valid (all connections are valid)
var e = context.Model.GetElementByName(context.Token.ExecutionPoint);
var element = e.Element;
//logger?.LogInformation($"Performing {e.Name} ...");

switch (element)
{
Expand All @@ -222,7 +221,7 @@ ILogger logger
ILogger logger,
Activity activity)
{
_history.Add(HistoryItem.Create(context.Token, "activityStarted"));
_history.Add(HistoryItem.Create(context.Token, HistoryItemActions.ActitvityStarted));

logger?.LogInformation($"Activity {context.Token.ExecutionPoint} execution will start now.");
activity.Run(context.WithRunningElement(activity));
Expand All @@ -232,7 +231,7 @@ ILogger logger
ILogger logger,
IEventThrower eventThrower)
{
_history.Add(HistoryItem.Create(context.Token, "eventThrow"));
_history.Add(HistoryItem.Create(context.Token, HistoryItemActions.EventThrown));

eventThrower.Throw(context.WithRunningElement(eventThrower));
logger?.LogInformation($"Event {context.Token.ExecutionPoint} was thrown.");
Expand All @@ -250,7 +249,7 @@ ILogger logger
).ToArray();

// TODO: Move this to the model validation
if (connections.Count() != 1)
if (connections.Length != 1)
{
throw new NotSupportedException();
}
Expand All @@ -260,13 +259,20 @@ ILogger logger
}
}


public IEnumerable<Token> HandleActivityCompletion(ExecutionContext context, object completionData)
// TODO: Invalidate parallel tokens (?!)
// TODO: Wait for pending tokens (?!)
public IEnumerable<Token> HandleActivityFailure(ExecutionContext context, object failureData)
{

var logger = context.ServiceProvider?
.GetService<ILogger<ProcessInstance>>();

context.Token.ExecutionPoint = "__compensation_start__";
MoveOn(context, logger);
return context.Token.GetActionableTokens();
}

public IEnumerable<Token> HandleActivityCompletion(ExecutionContext context, object completionData)
{
if (!IsRunning)
{
return Enumerable.Empty<Token>();
Expand All @@ -283,7 +289,7 @@ public IEnumerable<Token> HandleActivityCompletion(ExecutionContext context, obj
}

// TODO: Handle Exceptions
_history.Add(HistoryItem.Create(context.Token, completionData, "activityCompleted"));
_history.Add(HistoryItem.Create(context.Token, completionData, HistoryItemActions.ActivityCompleted));

var connections = context.Model
.GetOutcomingConnections(activity.Name)
Expand Down Expand Up @@ -324,8 +330,11 @@ public IEnumerable<Token> HandleActivityCompletion(ExecutionContext context, obj
else
{
context.Token.ExecutionPoint = connections.First().Element.To;

var logger = context.ServiceProvider?
.GetService<ILogger<ProcessInstance>>();
MoveOn(context, logger);

}
return context.Token.GetActionableTokens();
}
Expand All @@ -335,5 +344,13 @@ public IEnumerable<Token> HandleActivityCompletion(ExecutionContext context, obj
? this
: null;

public bool WasActivityCompleted(string activityId)
{
var result = _history.Any(item =>
item.ExecutionPoint == activityId &&
item.Action == HistoryItemActions.ActivityCompleted
);
return result;
}
}
}
73 changes: 58 additions & 15 deletions src/TheFlow/CoreConcepts/ProcessModel.Add.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,27 @@ public partial class ProcessModel
public ProcessModel AddNullElement(string name, NullElement n)
=> AddElement(NamedProcessElement<NullElement>.Create(name, n));

public ProcessModel AddEventCatcher(string name)
public ProcessModel AddEventCatcher(string name)
=> AddEventCatcher(name, CatchAnyEventCatcher.Create());

public ProcessModel AddEventCatcher(string name, IEventCatcher catcher)
=> AddEventCatcher(NamedProcessElement<IEventCatcher>.Create(name, catcher));

public ProcessModel AddEventCatcher(NamedProcessElement<IEventCatcher> catcher)
=> AddElement(catcher);

public ProcessModel AddEventCatcher(NamedProcessElement<IEventCatcher> catcher)
=> AddElement(catcher);

public ProcessModel AddBoundaryEventCatcher(
NamedProcessElement<IEventCatcher> catcher,
string activityName
) => AddElement(catcher);

public ProcessModel AddEventThrower(string name)
=> AddEventThrower(name, SilentEventThrower.Instance);

public ProcessModel AddEventThrower(string name, IEventThrower thrower)
=> AddEventThrower(NamedProcessElement<IEventThrower>.Create(name, thrower));
public ProcessModel AddEventThrower(NamedProcessElement<IEventThrower> thrower)

public ProcessModel AddEventThrower(NamedProcessElement<IEventThrower> thrower)
=> AddElement(thrower);

// validate null
Expand All @@ -42,7 +47,7 @@ DataAssociation association
public ProcessModel AddSequenceFlow(
string from,
string to,
params string [] path)
params string[] path)
{
var result = AddSequenceFlow(SequenceFlow.Create(from, to));
from = to;
Expand All @@ -53,20 +58,20 @@ DataAssociation association
}
return result;
}


public ProcessModel AddSequenceFlow(SequenceFlow sequenceFlow)
=> AddSequenceFlow(ProcessElement<SequenceFlow>.Create(sequenceFlow));

public ProcessModel AddSequenceFlow(ProcessElement<SequenceFlow> sequenceFlow)
=> AddElement(sequenceFlow);

public ProcessModel AddActivity(string name, Activity activity)
=> AddElement(ProcessElement<Activity>.Create(name, activity));
public ProcessModel AddActivity(NamedProcessElement<Activity> activity)

public ProcessModel AddActivity(NamedProcessElement<Activity> activity)
=> AddElement(activity);

public ProcessModel AddParallelGateway(string name)
=> AddElement(NamedProcessElement<ParallelGateway>.Create(name, new ParallelGateway()));

Expand All @@ -79,9 +84,47 @@ public ProcessModel AddActivity(string name, Action activity)

public ProcessModel AddConditionalSequenceFlow(string @from, string @to, object filterValue)
=> AddElement(ProcessElement<SequenceFlow>.Create(SequenceFlow.Create(@from, @to, filterValue)));

private ProcessModel AddElement(IProcessElement<IElement> element)
=> new ProcessModel(Id, Version + 1, Elements.Add(element));
=> new ProcessModel(Id, Version + 1, Elements.Add(element), Associations);

// TODO: Ensure that COMPENSATION has no incoming or outcoming sequence flow
public ProcessModel AttachAsCompensationActivity(string compensation, string to)
{
var that = this;

if (that.GetElementByName("__compensation_start__") == null)
{
that = that
.AddParallelGateway("__compensation_start__")
.AddParallelGateway("__compensation_end__")
.AddEventThrower("__process_failure__")
.AddSequenceFlow("__compensation_end__", "__process_failure__")
;
}


var preifname = $"__compensation_pre_if_{to}__";
var ifname = $"__compensation_if_{to}_was_completed__";
var endifname = $"__compensation_endif_{to}_was_completed__";

return new ProcessModel(Id, that.Version + 1, that.Elements, that.Associations.Add(new Association(
compensation,
to,
AssociationType.Compensation
)))
.AddActivity(preifname, LambdaActivity.Create((act, ctx) =>
{
var shouldRun = ctx.Instance.WasActivityCompleted(to);
act.GetDataOutputByName("default").Update(ctx, ctx.Token.ExecutionPoint, shouldRun);
}))
.AddExclusiveGateway(ifname)
.AddExclusiveGateway(endifname)
.AddSequenceFlow("__compensation_start__", preifname, ifname)
.AddConditionalSequenceFlow(ifname, compensation, true)
.AddSequenceFlow(compensation, endifname)
.AddConditionalSequenceFlow(ifname, endifname, false)
.AddSequenceFlow(endifname, "__compensation_end__");
}
}
}
15 changes: 8 additions & 7 deletions src/TheFlow/CoreConcepts/ProcessModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Xml.Schema;
using TheFlow.Elements;
using TheFlow.Elements.Activities;
using TheFlow.Elements.Connections;
using TheFlow.Elements.Data;
using TheFlow.Elements.Events;

namespace TheFlow.CoreConcepts
Expand All @@ -17,12 +15,14 @@ public partial class ProcessModel : IProcessModelProvider
public int Version { get; }

public ImmutableList<IProcessElement<IElement>> Elements { get; }
public ImmutableList<Association> Associations { get; }

#region Constructor and Empty Object
public ProcessModel(
string id,
int version,
ImmutableList<IProcessElement<IElement>> elements)
ImmutableList<IProcessElement<IElement>> elements,
ImmutableList<Association> associations)
{
if (elements.HasDuplicatedNames())
{
Expand All @@ -31,6 +31,7 @@ public partial class ProcessModel : IProcessModelProvider
Id = id;
Version = version;
Elements = elements;
Associations = associations ?? ImmutableList.Create<Association>();
}

//
Expand All @@ -39,8 +40,6 @@ public partial class ProcessModel : IProcessModelProvider
// );
#endregion



public IEnumerable<IProcessElement<IConnectionElement>> GetIncomingConnections(
string elementName
)
Expand Down Expand Up @@ -104,12 +103,14 @@ public static ProcessModel Create(Guid guid)
{
throw new ArgumentException("ProcessModel's Id cannot be Empty", nameof(guid));
}

return new ProcessModel(
guid.ToString(),
0,
ImmutableList.Create<IProcessElement<IElement>>()
ImmutableList.Create<IProcessElement<IElement>>(),
ImmutableList.Create<Association>()
);

}

public bool CanStartWith(ExecutionContext context, object eventData) =>
Expand Down

0 comments on commit 390c354

Please sign in to comment.