Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/WorkflowCore/Interface/IDateTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ namespace WorkflowCore.Interface
public interface IDateTimeProvider
{
DateTime Now { get; }
DateTime UtcNow { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
{
cancellationToken.ThrowIfCancellationRequested();
var evt = await _persistenceStore.GetEvent(itemId);
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
if (evt.EventTime <= _datetimeProvider.UtcNow)
{
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
var toQueue = new List<string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance

await persistenceStore.PersistErrors(result.Errors);

var readAheadTicks = _datetimeProvider.Now.Add(Options.PollInterval).ToUniversalTime().Ticks;
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;

if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
{
Expand Down Expand Up @@ -109,7 +109,7 @@ private async void FutureQueue(WorkflowInstance workflow, CancellationToken canc
return;
}

var target = (workflow.NextExecution.Value - _datetimeProvider.Now.ToUniversalTime().Ticks);
var target = (workflow.NextExecution.Value - _datetimeProvider.UtcNow.Ticks);
if (target > 0)
{
await Task.Delay(TimeSpan.FromTicks(target), cancellationToken);
Expand Down
1 change: 1 addition & 0 deletions src/WorkflowCore/Services/DateTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ namespace WorkflowCore.Services
public class DateTimeProvider : IDateTimeProvider
{
public DateTime Now => DateTime.Now;
public DateTime UtcNow => DateTime.UtcNow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
}

scopePointer.Active = false;
scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
scopePointer.EndTime = _datetimeProvider.UtcNow;
scopePointer.Status = PointerStatus.Failed;

if (scopeStep.CompensationStepId.HasValue)
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Services/ErrorHandlers/RetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public RetryHandler(IDateTimeProvider datetimeProvider, WorkflowOptions options)
public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception exception, Queue<ExecutionPointer> bubbleUpQueue)
{
pointer.RetryCount++;
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? _options.ErrorRetryInterval);
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? _options.ErrorRetryInterval);
step.PrimeForRetry(pointer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
workflow.Status = WorkflowStatus.Suspended;
_eventPublisher.PublishNotification(new WorkflowSuspended()
{
EventTimeUtc = _datetimeProvider.Now,
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
WorkflowInstanceId = workflow.Id,
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
workflow.Status = WorkflowStatus.Terminated;
_eventPublisher.PublishNotification(new WorkflowTerminated()
{
EventTimeUtc = _datetimeProvider.Now,
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
WorkflowInstanceId = workflow.Id,
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Expand Down
8 changes: 4 additions & 4 deletions src/WorkflowCore/Services/ExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
pointer.Outcome = result.OutcomeValue;
if (result.SleepFor.HasValue)
{
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(result.SleepFor.Value);
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value);
pointer.Status = PointerStatus.Sleeping;
}

Expand All @@ -57,7 +57,7 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
if (result.Proceed)
{
pointer.Active = false;
pointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
pointer.EndTime = _datetimeProvider.UtcNow;
pointer.Status = PointerStatus.Complete;

foreach (var outcomeTarget in step.Outcomes.Where(x => object.Equals(x.GetValue(workflow.Data), result.OutcomeValue) || x.GetValue(workflow.Data) == null))
Expand All @@ -67,7 +67,7 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition

_eventPublisher.PublishNotification(new StepCompleted()
{
EventTimeUtc = _datetimeProvider.Now,
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
ExecutionPointerId = pointer.Id,
StepId = step.Id,
Expand All @@ -92,7 +92,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
{
_eventPublisher.PublishNotification(new WorkflowError()
{
EventTimeUtc = _datetimeProvider.Now,
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
WorkflowInstanceId = workflow.Id,
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Expand Down
24 changes: 12 additions & 12 deletions src/WorkflowCore/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
{
var wfResult = new WorkflowExecutorResult();

var exePointers = new List<ExecutionPointer>(workflow.ExecutionPointers.Where(x => x.Active && (!x.SleepUntil.HasValue || x.SleepUntil < _datetimeProvider.Now.ToUniversalTime())));
var exePointers = new List<ExecutionPointer>(workflow.ExecutionPointers.Where(x => x.Active && (!x.SleepUntil.HasValue || x.SleepUntil < _datetimeProvider.UtcNow)));
var def = _registry.GetDefinition(workflow.WorkflowDefinitionId, workflow.Version);
if (def == null)
{
Expand All @@ -60,12 +60,12 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
if (step == null)
{
_logger.LogError("Unable to find step {0} in workflow definition", pointer.StepId);
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval);
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(_options.ErrorRetryInterval);
wfResult.Errors.Add(new ExecutionError()
{
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
ErrorTime = _datetimeProvider.UtcNow,
Message = $"Unable to find step {pointer.StepId} in workflow definition"
});
continue;
Expand All @@ -85,7 +85,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
{
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
ErrorTime = _datetimeProvider.UtcNow,
Message = ex.Message
});

Expand All @@ -108,7 +108,7 @@ private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, Workfl
return false;
case ExecutionPipelineDirective.EndWorkflow:
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
workflow.CompleteTime = _datetimeProvider.UtcNow;
return false;
}

Expand All @@ -117,7 +117,7 @@ private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, Workfl
pointer.Status = PointerStatus.Running;
_publisher.PublishNotification(new StepStarted()
{
EventTimeUtc = _datetimeProvider.Now,
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
ExecutionPointerId = pointer.Id,
StepId = step.Id,
Expand All @@ -129,7 +129,7 @@ private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, Workfl

if (!pointer.StartTime.HasValue)
{
pointer.StartTime = _datetimeProvider.Now.ToUniversalTime();
pointer.StartTime = _datetimeProvider.UtcNow;
}

return true;
Expand All @@ -146,12 +146,12 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe
if (body == null)
{
_logger.LogError("Unable to construct step body {0}", step.BodyType.ToString());
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval);
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(_options.ErrorRetryInterval);
wfResult.Errors.Add(new ExecutionError()
{
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
ErrorTime = _datetimeProvider.UtcNow,
Message = $"Unable to construct step body {step.BodyType.ToString()}"
});
return;
Expand All @@ -175,7 +175,7 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe
return;
case ExecutionPipelineDirective.EndWorkflow:
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
workflow.CompleteTime = _datetimeProvider.UtcNow;
return;
}

Expand Down Expand Up @@ -245,10 +245,10 @@ private void DetermineNextExecutionTime(WorkflowInstance workflow)
return;

workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
workflow.CompleteTime = _datetimeProvider.UtcNow;
_publisher.PublishNotification(new WorkflowCompleted()
{
EventTimeUtc = _datetimeProvider.Now,
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
WorkflowInstanceId = workflow.Id,
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public ExecutionResultProcessorFixture()
Options = new WorkflowOptions(A.Fake<IServiceCollection>());

A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now);
A.CallTo(() => DateTimeProvider.UtcNow).Returns(DateTime.UtcNow);

//config logging
var loggerFactory = new LoggerFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public WorkflowExecutorFixture()
A.CallTo(() => scope.ServiceProvider).Returns(ServiceProvider);

A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now);
A.CallTo(() => DateTimeProvider.UtcNow).Returns(DateTime.UtcNow);

//config logging
var loggerFactory = new LoggerFactory();
Expand Down