From 0831f0469119dab96be84edc96ab5ea806eca95d Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Sun, 10 Mar 2019 08:26:58 -0700 Subject: [PATCH 1/2] wip --- src/WorkflowCore/Services/CancellationProcessor.cs | 10 +++++----- src/WorkflowCore/Services/WorkflowExecutor.cs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/WorkflowCore/Services/CancellationProcessor.cs b/src/WorkflowCore/Services/CancellationProcessor.cs index d32d43f1f..e75d94f05 100644 --- a/src/WorkflowCore/Services/CancellationProcessor.cs +++ b/src/WorkflowCore/Services/CancellationProcessor.cs @@ -39,15 +39,15 @@ public void ProcessCancellations(WorkflowInstance workflow, WorkflowDefinition w foreach (var ptr in toCancel) { - ptr.EndTime = DateTime.Now.ToUniversalTime(); - ptr.Active = false; - ptr.Status = PointerStatus.Cancelled; - if (step.ProceedOnCancel) { _executionResultProcessor.ProcessExecutionResult(workflow, workflowDef, ptr, step, ExecutionResult.Next(), executionResult); } - + + ptr.EndTime = DateTime.Now.ToUniversalTime(); + ptr.Active = false; + ptr.Status = PointerStatus.Cancelled; + foreach (var descendent in workflow.ExecutionPointers.FindByScope(ptr.Id).Where(x => x.Status != PointerStatus.Complete && x.Status != PointerStatus.Cancelled)) { descendent.EndTime = DateTime.Now.ToUniversalTime(); diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index 46f05d4f7..b66507767 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -51,7 +51,7 @@ public async Task Execute(WorkflowInstance workflow) foreach (var pointer in exePointers) { - if (pointer.Status == PointerStatus.Cancelled) + if (!pointer.Active) continue; var step = def.Steps.FindById(pointer.StepId); From 901c2b7534c47e0762f7fb5288458cbf90257466 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Tue, 12 Mar 2019 20:19:17 -0700 Subject: [PATCH 2/2] readability --- src/WorkflowCore/Services/WorkflowExecutor.cs | 189 +++++++++--------- 1 file changed, 100 insertions(+), 89 deletions(-) diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index b66507767..ce52e26b7 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -48,6 +48,8 @@ public async Task Execute(WorkflowInstance workflow) _logger.LogError("Workflow {0} version {1} is not registered", workflow.WorkflowDefinitionId, workflow.Version); return wfResult; } + + _cancellationProcessor.ProcessCancellations(workflow, def, wfResult); foreach (var pointer in exePointers) { @@ -64,97 +66,17 @@ public async Task Execute(WorkflowInstance workflow) WorkflowId = workflow.Id, ExecutionPointerId = pointer.Id, ErrorTime = _datetimeProvider.Now.ToUniversalTime(), - Message = String.Format("Unable to find step {0} in workflow definition", pointer.StepId) + Message = $"Unable to find step {pointer.StepId} in workflow definition" }); continue; } try - { - switch (step.InitForExecution(wfResult, def, workflow, pointer)) - { - case ExecutionPipelineDirective.Defer: - continue; - case ExecutionPipelineDirective.EndWorkflow: - workflow.Status = WorkflowStatus.Complete; - workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); - continue; - } - - if (pointer.Status != PointerStatus.Running) - { - pointer.Status = PointerStatus.Running; - _publisher.PublishNotification(new StepStarted() - { - EventTimeUtc = _datetimeProvider.Now, - Reference = workflow.Reference, - ExecutionPointerId = pointer.Id, - StepId = step.Id, - WorkflowInstanceId = workflow.Id, - WorkflowDefinitionId = workflow.WorkflowDefinitionId, - Version = workflow.Version - }); - } - - if (!pointer.StartTime.HasValue) - { - pointer.StartTime = _datetimeProvider.Now.ToUniversalTime(); - } + { + if (!InitializeStep(workflow, step, wfResult, def, pointer)) + continue; - using (var scope = _scopeProvider.CreateScope()) - { - _logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id); - - IStepBody body = step.ConstructBody(scope.ServiceProvider); - - if (body == null) - { - _logger.LogError("Unable to construct step body {0}", step.BodyType.ToString()); - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval); - wfResult.Errors.Add(new ExecutionError() - { - WorkflowId = workflow.Id, - ExecutionPointerId = pointer.Id, - ErrorTime = _datetimeProvider.Now.ToUniversalTime(), - Message = String.Format("Unable to construct step body {0}", step.BodyType.ToString()) - }); - continue; - } - - IStepExecutionContext context = new StepExecutionContext() - { - Workflow = workflow, - Step = step, - PersistenceData = pointer.PersistenceData, - ExecutionPointer = pointer, - Item = pointer.ContextItem - }; - - foreach (var input in step.Inputs) - input.AssignInput(workflow.Data, body, context); - - - switch (step.BeforeExecute(wfResult, context, pointer, body)) - { - case ExecutionPipelineDirective.Defer: - continue; - case ExecutionPipelineDirective.EndWorkflow: - workflow.Status = WorkflowStatus.Complete; - workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); - continue; - } - - var result = await body.RunAsync(context); - - if (result.Proceed) - { - foreach (var output in step.Outputs) - output.AssignOutput(workflow.Data, body, context); - } - - _executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult); - step.AfterExecute(wfResult, context, result, pointer); - } + await ExecuteStep(workflow, step, pointer, wfResult, def); } catch (Exception ex) { @@ -171,16 +93,105 @@ public async Task Execute(WorkflowInstance workflow) Host.ReportStepError(workflow, step, ex); } _cancellationProcessor.ProcessCancellations(workflow, def, wfResult); - - - } ProcessAfterExecutionIteration(workflow, def, wfResult); DetermineNextExecutionTime(workflow); return wfResult; } - + + private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, WorkflowExecutorResult wfResult, WorkflowDefinition def, ExecutionPointer pointer) + { + switch (step.InitForExecution(wfResult, def, workflow, pointer)) + { + case ExecutionPipelineDirective.Defer: + return false; + case ExecutionPipelineDirective.EndWorkflow: + workflow.Status = WorkflowStatus.Complete; + workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); + return false; + } + + if (pointer.Status != PointerStatus.Running) + { + pointer.Status = PointerStatus.Running; + _publisher.PublishNotification(new StepStarted() + { + EventTimeUtc = _datetimeProvider.Now, + Reference = workflow.Reference, + ExecutionPointerId = pointer.Id, + StepId = step.Id, + WorkflowInstanceId = workflow.Id, + WorkflowDefinitionId = workflow.WorkflowDefinitionId, + Version = workflow.Version + }); + } + + if (!pointer.StartTime.HasValue) + { + pointer.StartTime = _datetimeProvider.Now.ToUniversalTime(); + } + + return true; + } + + private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, ExecutionPointer pointer, WorkflowExecutorResult wfResult, WorkflowDefinition def) + { + using (var scope = _scopeProvider.CreateScope()) + { + _logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id); + + IStepBody body = step.ConstructBody(scope.ServiceProvider); + + if (body == null) + { + _logger.LogError("Unable to construct step body {0}", step.BodyType.ToString()); + pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval); + wfResult.Errors.Add(new ExecutionError() + { + WorkflowId = workflow.Id, + ExecutionPointerId = pointer.Id, + ErrorTime = _datetimeProvider.Now.ToUniversalTime(), + Message = $"Unable to construct step body {step.BodyType.ToString()}" + }); + return; + } + + IStepExecutionContext context = new StepExecutionContext() + { + Workflow = workflow, + Step = step, + PersistenceData = pointer.PersistenceData, + ExecutionPointer = pointer, + Item = pointer.ContextItem + }; + + foreach (var input in step.Inputs) + input.AssignInput(workflow.Data, body, context); + + switch (step.BeforeExecute(wfResult, context, pointer, body)) + { + case ExecutionPipelineDirective.Defer: + return; + case ExecutionPipelineDirective.EndWorkflow: + workflow.Status = WorkflowStatus.Complete; + workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); + return; + } + + var result = await body.RunAsync(context); + + if (result.Proceed) + { + foreach (var output in step.Outputs) + output.AssignOutput(workflow.Data, body, context); + } + + _executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult); + step.AfterExecute(wfResult, context, result, pointer); + } + } + private void ProcessAfterExecutionIteration(WorkflowInstance workflow, WorkflowDefinition workflowDef, WorkflowExecutorResult workflowResult) { var pointers = workflow.ExecutionPointers.Where(x => x.EndTime == null);