Skip to content

Commit

Permalink
notify active merge nodes when a branch finishies execution
Browse files Browse the repository at this point in the history
  • Loading branch information
mihainradu committed Mar 20, 2024
1 parent 5821f50 commit 949dd0c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
23 changes: 18 additions & 5 deletions src/UiPath.Workflow.Runtime/Statements/FlowMerge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,20 @@ MergeState GetJoinState(Func<MergeState> add = null)
joinStates.TryGetValue(key, out var joinState);
if (joinState is null)
{
joinState = add();
joinState = new()
{
CompletedNodeIndeces = new HashSet<int>(),
};
joinStates.Add(key, joinState);
}
return joinState;
}
internal override void Execute(FlowNode predecessorNode)
{
var joinState = GetJoinState(() => new()
{
CompletedNodeIndeces = new HashSet<int>(),
});
var joinState = GetJoinState();
var branch = Owner.GetBranch(predecessorNode);
if (!ConnectedBranches.Contains(branch))
return;
joinState.CompletedNodeIndeces.Add(branch.NodeIndex);
if (Completion is not null)
{
Expand All @@ -120,7 +122,10 @@ protected override void OnCompletionCallback(bool result)
}

if (incompleteBranches.Any())
{
Owner.MarkDoNotCompleteNode();
return;
}

joinState.Done = true;
Owner.EnqueueNodeExecution(Next);
Expand All @@ -146,4 +151,12 @@ void Cancel(List<int> toCancel)
}
}
}

internal void OnBranchEnded(FlowNode current)
{
var branch = Owner.GetBranch(current);

if (ConnectedBranches.Contains(branch) && !GetJoinState().Done)
Owner.EnqueueNodeExecution(this);
}
}
2 changes: 2 additions & 0 deletions src/UiPath.Workflow.Runtime/Statements/FlowNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ internal virtual void OnCompletionCallback<T>(T result)
case bool b:
OnCompletionCallback(b);
break;
default:
throw new NotSupportedException();
}
}

Expand Down
40 changes: 30 additions & 10 deletions src/UiPath.Workflow.Runtime/Statements/Flowchart.Execution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ protected override void Execute(NativeActivityContext context)
private Dictionary<int, NodeState> NodesStatesByIndex
=> GetPersistableState<Dictionary<int, NodeState>>("_nodesStatesByIndex");
private NativeActivityContext _activeContext;
private bool _doNotCompleteNode;

private IDisposable WithContext(NativeActivityContext context, ActivityInstance completedInstance)
{
Expand Down Expand Up @@ -149,10 +150,9 @@ private void RecordLinks(FlowNode predecessor, List<FlowNode> successors)

private bool IsCancelRequested()
{
var node = _current;
if (node == null)
if (_current == null)
return false;
return GetNodeState(node.Index).IsCancelRequested;
return GetNodeState(_current.Index).IsCancelRequested;
}

private NodeState GetNodeState(int index)
Expand Down Expand Up @@ -224,29 +224,32 @@ internal void EnqueueNodeExecution(FlowNode node, bool isNewBranch = false)
private void ExecuteQueue()
{
SetNodeCompleted();

while (_executionQueue.TryDequeue(out var next))
{
var state = GetNodeState(next.Index);
state.IsRunning = true;
ExecuteNode(next);
}
}
internal void MarkDoNotCompleteNode()
{
_doNotCompleteNode = true;
}
private void SetNodeCompleted()
{
var state = GetNodeState(_current.Index);
state.ActivityInstanceIds.Remove(_completedInstance?.Id);
if (!state.ActivityInstanceIds.Any())
{
state.IsCompleted = true;
state.IsCompleted = !_doNotCompleteNode;
if (_completedInstance is not null)
{
state.IsCancelRequested = _completedInstance.IsCancellationRequested;
}
}
_completedInstance = null;
_doNotCompleteNode = false;
}

private void ExecuteNode(FlowNode node)
{
var previousNode = _current;
Expand Down Expand Up @@ -282,12 +285,29 @@ private void ExecuteNode(FlowNode node)
node.Execute(previousNode);
}

private List<FlowMerge> GetRunningMerges()
{
var runningNodes = _reachableNodes
.OfType<FlowMerge>()
.Where(n =>
{
var state = GetNodeState(n.Index);
return state != null
&& state.IsRunning
&& !state.IsCompleted
;
}).ToList();
return runningNodes;
}

private void OnCurrentBranchCancelled()
{
var merge = GetBranch(_current)?.Split.MergeNode;
if (merge is null)
return;
EnqueueNodeExecution(merge);
var runningMerges = GetRunningMerges()
.OfType<FlowMerge>();
foreach (var runningMerge in runningMerges)
{
runningMerge.OnBranchEnded(_current);
}
}

private class NodeState
Expand Down

0 comments on commit 949dd0c

Please sign in to comment.