From e740447358a1e04890f737cee2d3c0e57b5e01f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rn=20Moe?= Date: Mon, 16 Dec 2019 10:47:03 +0100 Subject: [PATCH] Add UtcNow to IDateTimeProvider, and use that throughout. Fix some assignments to EventTimeUtc. Old version with DateTime.Now.ToUniversalTime was less clear and above an order of magnitude slower. --- .../Interface/IDateTimeProvider.cs | 1 + .../Services/BackgroundTasks/EventConsumer.cs | 2 +- .../BackgroundTasks/WorkflowConsumer.cs | 4 ++-- src/WorkflowCore/Services/DateTimeProvider.cs | 1 + .../ErrorHandlers/CompensateHandler.cs | 2 +- .../Services/ErrorHandlers/RetryHandler.cs | 2 +- .../Services/ErrorHandlers/SuspendHandler.cs | 2 +- .../ErrorHandlers/TerminateHandler.cs | 2 +- .../Services/ExecutionResultProcessor.cs | 8 +++---- src/WorkflowCore/Services/WorkflowExecutor.cs | 24 +++++++++---------- .../ExecutionResultProcessorFixture.cs | 1 + .../Services/WorkflowExecutorFixture.cs | 1 + 12 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/WorkflowCore/Interface/IDateTimeProvider.cs b/src/WorkflowCore/Interface/IDateTimeProvider.cs index eec4026f3..13c5b86f3 100644 --- a/src/WorkflowCore/Interface/IDateTimeProvider.cs +++ b/src/WorkflowCore/Interface/IDateTimeProvider.cs @@ -5,5 +5,6 @@ namespace WorkflowCore.Interface public interface IDateTimeProvider { DateTime Now { get; } + DateTime UtcNow { get; } } } \ No newline at end of file diff --git a/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs index 71f6a9c86..0fa13ef84 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs @@ -37,7 +37,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance { cancellationToken.ThrowIfCancellationRequested(); var evt = await _persistenceStore.GetEvent(itemId); - if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime()) + if (evt.EventTime <= _datetimeProvider.UtcNow) { var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime); var toQueue = new List(); diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index 4cfa74716..1548ddd10 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance await persistenceStore.PersistErrors(result.Errors); - var readAheadTicks = _datetimeProvider.Now.Add(Options.PollInterval).ToUniversalTime().Ticks; + var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks; if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks) { @@ -109,7 +109,7 @@ private async void FutureQueue(WorkflowInstance workflow, CancellationToken canc return; } - var target = (workflow.NextExecution.Value - _datetimeProvider.Now.ToUniversalTime().Ticks); + var target = (workflow.NextExecution.Value - _datetimeProvider.UtcNow.Ticks); if (target > 0) { await Task.Delay(TimeSpan.FromTicks(target), cancellationToken); diff --git a/src/WorkflowCore/Services/DateTimeProvider.cs b/src/WorkflowCore/Services/DateTimeProvider.cs index 467b08248..93defc48f 100644 --- a/src/WorkflowCore/Services/DateTimeProvider.cs +++ b/src/WorkflowCore/Services/DateTimeProvider.cs @@ -6,5 +6,6 @@ namespace WorkflowCore.Services public class DateTimeProvider : IDateTimeProvider { public DateTime Now => DateTime.Now; + public DateTime UtcNow => DateTime.UtcNow; } } diff --git a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs index aca56d6b6..1f3142610 100644 --- a/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs @@ -60,7 +60,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP } scopePointer.Active = false; - scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); + scopePointer.EndTime = _datetimeProvider.UtcNow; scopePointer.Status = PointerStatus.Failed; if (scopeStep.CompensationStepId.HasValue) diff --git a/src/WorkflowCore/Services/ErrorHandlers/RetryHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/RetryHandler.cs index f934922f6..0a4d54810 100644 --- a/src/WorkflowCore/Services/ErrorHandlers/RetryHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/RetryHandler.cs @@ -22,7 +22,7 @@ public RetryHandler(IDateTimeProvider datetimeProvider, WorkflowOptions options) public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception exception, Queue bubbleUpQueue) { pointer.RetryCount++; - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? _options.ErrorRetryInterval); + pointer.SleepUntil = _datetimeProvider.UtcNow.Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? _options.ErrorRetryInterval); step.PrimeForRetry(pointer); } } diff --git a/src/WorkflowCore/Services/ErrorHandlers/SuspendHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/SuspendHandler.cs index b0aec84fd..1cf6f7941 100755 --- a/src/WorkflowCore/Services/ErrorHandlers/SuspendHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/SuspendHandler.cs @@ -24,7 +24,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP workflow.Status = WorkflowStatus.Suspended; _eventPublisher.PublishNotification(new WorkflowSuspended() { - EventTimeUtc = _datetimeProvider.Now, + EventTimeUtc = _datetimeProvider.UtcNow, Reference = workflow.Reference, WorkflowInstanceId = workflow.Id, WorkflowDefinitionId = workflow.WorkflowDefinitionId, diff --git a/src/WorkflowCore/Services/ErrorHandlers/TerminateHandler.cs b/src/WorkflowCore/Services/ErrorHandlers/TerminateHandler.cs index 3745fe483..d89f7f470 100755 --- a/src/WorkflowCore/Services/ErrorHandlers/TerminateHandler.cs +++ b/src/WorkflowCore/Services/ErrorHandlers/TerminateHandler.cs @@ -24,7 +24,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP workflow.Status = WorkflowStatus.Terminated; _eventPublisher.PublishNotification(new WorkflowTerminated() { - EventTimeUtc = _datetimeProvider.Now, + EventTimeUtc = _datetimeProvider.UtcNow, Reference = workflow.Reference, WorkflowInstanceId = workflow.Id, WorkflowDefinitionId = workflow.WorkflowDefinitionId, diff --git a/src/WorkflowCore/Services/ExecutionResultProcessor.cs b/src/WorkflowCore/Services/ExecutionResultProcessor.cs index 730dbc911..65ad3fda9 100755 --- a/src/WorkflowCore/Services/ExecutionResultProcessor.cs +++ b/src/WorkflowCore/Services/ExecutionResultProcessor.cs @@ -33,7 +33,7 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition pointer.Outcome = result.OutcomeValue; if (result.SleepFor.HasValue) { - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(result.SleepFor.Value); + pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value); pointer.Status = PointerStatus.Sleeping; } @@ -57,7 +57,7 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition if (result.Proceed) { pointer.Active = false; - pointer.EndTime = _datetimeProvider.Now.ToUniversalTime(); + pointer.EndTime = _datetimeProvider.UtcNow; pointer.Status = PointerStatus.Complete; foreach (var outcomeTarget in step.Outcomes.Where(x => object.Equals(x.GetValue(workflow.Data), result.OutcomeValue) || x.GetValue(workflow.Data) == null)) @@ -67,7 +67,7 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition _eventPublisher.PublishNotification(new StepCompleted() { - EventTimeUtc = _datetimeProvider.Now, + EventTimeUtc = _datetimeProvider.UtcNow, Reference = workflow.Reference, ExecutionPointerId = pointer.Id, StepId = step.Id, @@ -92,7 +92,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de { _eventPublisher.PublishNotification(new WorkflowError() { - EventTimeUtc = _datetimeProvider.Now, + EventTimeUtc = _datetimeProvider.UtcNow, Reference = workflow.Reference, WorkflowInstanceId = workflow.Id, WorkflowDefinitionId = workflow.WorkflowDefinitionId, diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index 9c28ce7f3..7597e7490 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -41,7 +41,7 @@ public async Task Execute(WorkflowInstance workflow) { var wfResult = new WorkflowExecutorResult(); - var exePointers = new List(workflow.ExecutionPointers.Where(x => x.Active && (!x.SleepUntil.HasValue || x.SleepUntil < _datetimeProvider.Now.ToUniversalTime()))); + var exePointers = new List(workflow.ExecutionPointers.Where(x => x.Active && (!x.SleepUntil.HasValue || x.SleepUntil < _datetimeProvider.UtcNow))); var def = _registry.GetDefinition(workflow.WorkflowDefinitionId, workflow.Version); if (def == null) { @@ -60,12 +60,12 @@ public async Task Execute(WorkflowInstance workflow) if (step == null) { _logger.LogError("Unable to find step {0} in workflow definition", pointer.StepId); - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval); + pointer.SleepUntil = _datetimeProvider.UtcNow.Add(_options.ErrorRetryInterval); wfResult.Errors.Add(new ExecutionError() { WorkflowId = workflow.Id, ExecutionPointerId = pointer.Id, - ErrorTime = _datetimeProvider.Now.ToUniversalTime(), + ErrorTime = _datetimeProvider.UtcNow, Message = $"Unable to find step {pointer.StepId} in workflow definition" }); continue; @@ -85,7 +85,7 @@ public async Task Execute(WorkflowInstance workflow) { WorkflowId = workflow.Id, ExecutionPointerId = pointer.Id, - ErrorTime = _datetimeProvider.Now.ToUniversalTime(), + ErrorTime = _datetimeProvider.UtcNow, Message = ex.Message }); @@ -108,7 +108,7 @@ private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, Workfl return false; case ExecutionPipelineDirective.EndWorkflow: workflow.Status = WorkflowStatus.Complete; - workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); + workflow.CompleteTime = _datetimeProvider.UtcNow; return false; } @@ -117,7 +117,7 @@ private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, Workfl pointer.Status = PointerStatus.Running; _publisher.PublishNotification(new StepStarted() { - EventTimeUtc = _datetimeProvider.Now, + EventTimeUtc = _datetimeProvider.UtcNow, Reference = workflow.Reference, ExecutionPointerId = pointer.Id, StepId = step.Id, @@ -129,7 +129,7 @@ private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, Workfl if (!pointer.StartTime.HasValue) { - pointer.StartTime = _datetimeProvider.Now.ToUniversalTime(); + pointer.StartTime = _datetimeProvider.UtcNow; } return true; @@ -146,12 +146,12 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe if (body == null) { _logger.LogError("Unable to construct step body {0}", step.BodyType.ToString()); - pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval); + pointer.SleepUntil = _datetimeProvider.UtcNow.Add(_options.ErrorRetryInterval); wfResult.Errors.Add(new ExecutionError() { WorkflowId = workflow.Id, ExecutionPointerId = pointer.Id, - ErrorTime = _datetimeProvider.Now.ToUniversalTime(), + ErrorTime = _datetimeProvider.UtcNow, Message = $"Unable to construct step body {step.BodyType.ToString()}" }); return; @@ -175,7 +175,7 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe return; case ExecutionPipelineDirective.EndWorkflow: workflow.Status = WorkflowStatus.Complete; - workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); + workflow.CompleteTime = _datetimeProvider.UtcNow; return; } @@ -245,10 +245,10 @@ private void DetermineNextExecutionTime(WorkflowInstance workflow) return; workflow.Status = WorkflowStatus.Complete; - workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime(); + workflow.CompleteTime = _datetimeProvider.UtcNow; _publisher.PublishNotification(new WorkflowCompleted() { - EventTimeUtc = _datetimeProvider.Now, + EventTimeUtc = _datetimeProvider.UtcNow, Reference = workflow.Reference, WorkflowInstanceId = workflow.Id, WorkflowDefinitionId = workflow.WorkflowDefinitionId, diff --git a/test/WorkflowCore.UnitTests/Services/ExecutionResultProcessorFixture.cs b/test/WorkflowCore.UnitTests/Services/ExecutionResultProcessorFixture.cs index 03eaa7cc7..d1378e2b3 100644 --- a/test/WorkflowCore.UnitTests/Services/ExecutionResultProcessorFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/ExecutionResultProcessorFixture.cs @@ -34,6 +34,7 @@ public ExecutionResultProcessorFixture() Options = new WorkflowOptions(A.Fake()); A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now); + A.CallTo(() => DateTimeProvider.UtcNow).Returns(DateTime.UtcNow); //config logging var loggerFactory = new LoggerFactory(); diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs index 781c04e8f..6d3f04499 100644 --- a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs @@ -48,6 +48,7 @@ public WorkflowExecutorFixture() A.CallTo(() => scope.ServiceProvider).Returns(ServiceProvider); A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now); + A.CallTo(() => DateTimeProvider.UtcNow).Returns(DateTime.UtcNow); //config logging var loggerFactory = new LoggerFactory();