Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions ReleaseNotes/1.3.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Workflow Core 1.3.0

* Added support for async steps

Simply inherit from `StepBodyAsync` instead of `StepBody`

```c#
public class DoSomething : StepBodyAsync
{
public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
await Task.Delay(2000);
return ExecutionResult.Next();
}
}
```

* Migrated from managing own thread pool to TPL datablocks for queue consumers

* After executing a workflow, will determine if it is scheduled to run before the next poll, if so, will delay queue it
81 changes: 41 additions & 40 deletions WorkflowCore.sln

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

namespace WorkflowCore.Interface
{
public interface IBackgroundWorker
public interface IBackgroundTask
{
void Start();
void Stop();
Expand Down
9 changes: 9 additions & 0 deletions src/WorkflowCore/Interface/IDateTimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;

namespace WorkflowCore.Interface
{
public interface IDateTimeProvider
{
DateTime Now { get; }
}
}
3 changes: 2 additions & 1 deletion src/WorkflowCore/Interface/IDistributedLockProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace WorkflowCore.Interface
Expand All @@ -11,7 +12,7 @@ namespace WorkflowCore.Interface
/// </remarks>
public interface IDistributedLockProvider
{
Task<bool> AcquireLock(string Id);
Task<bool> AcquireLock(string Id, CancellationToken cancellationToken);

Task ReleaseLock(string Id);

Expand Down
7 changes: 0 additions & 7 deletions src/WorkflowCore/Interface/IEventThread.cs

This file was deleted.

4 changes: 2 additions & 2 deletions src/WorkflowCore/Interface/IPersistenceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IPersistenceProvider

Task PersistWorkflow(WorkflowInstance workflow);

Task<IEnumerable<string>> GetRunnableInstances();
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt);

Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take);

Expand All @@ -32,7 +32,7 @@ public interface IPersistenceProvider

Task<Event> GetEvent(string id);

Task<IEnumerable<string>> GetRunnableEvents();
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt);

Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf);

Expand Down
5 changes: 4 additions & 1 deletion src/WorkflowCore/Interface/IQueueProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using WorkflowCore.Models;

Expand All @@ -25,7 +26,9 @@ public interface IQueueProvider : IDisposable
/// If the queue is empty, NULL is returned
/// </summary>
/// <returns></returns>
Task<string> DequeueWork(QueueType queue);
Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken);

bool IsDequeueBlocking { get; }

Task Start();
Task Stop();
Expand Down
7 changes: 0 additions & 7 deletions src/WorkflowCore/Interface/IRunnablePoller.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IStepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ namespace WorkflowCore.Interface
{
public interface IStepBody
{
ExecutionResult Run(IStepExecutionContext context);
Task<ExecutionResult> RunAsync(IStepExecutionContext context);
}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowExecutor
{
WorkflowExecutorResult Execute(WorkflowInstance workflow, WorkflowOptions options);
Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, WorkflowOptions options);
}
}
7 changes: 0 additions & 7 deletions src/WorkflowCore/Interface/IWorkflowThread.cs

This file was deleted.

7 changes: 6 additions & 1 deletion src/WorkflowCore/Models/StepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ namespace WorkflowCore.Models
{
public abstract class StepBody : IStepBody
{

public abstract ExecutionResult Run(IStepExecutionContext context);

public Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
return Task.FromResult(Run(context));
}

protected ExecutionResult OutcomeResult(object value)
{
return new ExecutionResult()
Expand Down
14 changes: 14 additions & 0 deletions src/WorkflowCore/Models/StepBodyAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using WorkflowCore.Interface;

namespace WorkflowCore.Models
{
public abstract class StepBodyAsync : IStepBody
{
public abstract Task<ExecutionResult> RunAsync(IStepExecutionContext context);

}
}
2 changes: 2 additions & 0 deletions src/WorkflowCore/Models/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class WorkflowInstance

public string Description { get; set; }

public string Reference { get; set; }

public List<ExecutionPointer> ExecutionPointers { get; set; } = new List<ExecutionPointer>();

public long? NextExecution { get; set; }
Expand Down
9 changes: 1 addition & 8 deletions src/WorkflowCore/Models/WorkflowOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ public class WorkflowOptions
internal Func<IServiceProvider, IPersistenceProvider> PersistanceFactory;
internal Func<IServiceProvider, IQueueProvider> QueueFactory;
internal Func<IServiceProvider, IDistributedLockProvider> LockFactory;
internal int ThreadCount;
internal TimeSpan PollInterval;
internal TimeSpan IdleTime;
internal TimeSpan ErrorRetryInterval;

public WorkflowOptions()
{
//set defaults
ThreadCount = Environment.ProcessorCount;
PollInterval = TimeSpan.FromSeconds(10);
IdleTime = TimeSpan.FromMilliseconds(500);
IdleTime = TimeSpan.FromMilliseconds(100);
ErrorRetryInterval = TimeSpan.FromSeconds(60);

QueueFactory = new Func<IServiceProvider, IQueueProvider>(sp => new SingleNodeQueueProvider());
Expand All @@ -45,11 +43,6 @@ public void UseQueueProvider(Func<IServiceProvider, IQueueProvider> factory)
QueueFactory = factory;
}

public void UseThreads(int count)
{
ThreadCount = count;
}

public void UsePollInterval(TimeSpan interval)
{
PollInterval = interval;
Expand Down
1 change: 0 additions & 1 deletion src/WorkflowCore/Models/WorkflowStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public virtual IStepBody ConstructBody(IServiceProvider serviceProvider)
}
return body;
}

}

public enum ExecutionPipelineDirective { Next = 0, Defer = 1, EndWorkflow = 2 }
Expand Down
16 changes: 12 additions & 4 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
using WorkflowCore.Services;
using WorkflowCore.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using WorkflowCore.Primitives;
using WorkflowCore.Services.BackgroundTasks;

namespace Microsoft.Extensions.DependencyInjection
{
Expand All @@ -23,13 +25,19 @@ public static void AddWorkflow(this IServiceCollection services, Action<Workflow
services.AddSingleton<IDistributedLockProvider>(options.LockFactory);
services.AddSingleton<IWorkflowRegistry, WorkflowRegistry>();
services.AddSingleton<WorkflowOptions>(options);

services.AddTransient<IBackgroundTask, WorkflowConsumer>();
services.AddTransient<IBackgroundTask, EventConsumer>();
services.AddTransient<IBackgroundTask, RunnablePoller>();

services.AddSingleton<IWorkflowHost, WorkflowHost>();
services.AddTransient<IWorkflowExecutor, WorkflowExecutor>();
services.AddTransient<IWorkflowBuilder, WorkflowBuilder>();
services.AddTransient<IWorkflowThread, WorkflowThread>();
services.AddTransient<IEventThread, EventThread>();
services.AddTransient<IRunnablePoller, RunnablePoller>();

services.AddTransient<IDateTimeProvider, DateTimeProvider>();

services.AddTransient<IPooledObjectPolicy<IPersistenceProvider>, InjectedObjectPoolPolicy<IPersistenceProvider>>();
services.AddTransient<IPooledObjectPolicy<IWorkflowExecutor>, InjectedObjectPoolPolicy<IWorkflowExecutor>>();

services.AddTransient<Foreach>();
}
}
Expand Down
95 changes: 95 additions & 0 deletions src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCore.Services.BackgroundTasks
{
internal class EventConsumer : QueueConsumer, IBackgroundTask
{
private readonly IPersistenceProvider _persistenceStore;
private readonly IDistributedLockProvider _lockProvider;
private readonly IDateTimeProvider _datetimeProvider;

protected override QueueType Queue => QueueType.Event;

public EventConsumer(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
: base(queueProvider, loggerFactory, options)
{
_persistenceStore = persistenceStore;
_lockProvider = lockProvider;
_datetimeProvider = datetimeProvider;
}

protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
{
if (await _lockProvider.AcquireLock($"evt:{itemId}", cancellationToken))
{
try
{
cancellationToken.ThrowIfCancellationRequested();
var evt = await _persistenceStore.GetEvent(itemId);
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
{
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
var success = true;

foreach (var sub in subs.ToList())
success = success && await SeedSubscription(evt, sub, cancellationToken);

if (success)
await _persistenceStore.MarkEventProcessed(itemId);
}
}
finally
{
await _lockProvider.ReleaseLock($"evt:{itemId}");
}
}
else
{
Logger.LogInformation($"Event locked {itemId}");
}
}

private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, CancellationToken cancellationToken)
{
if (await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
{
try
{
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished);
foreach (var p in pointers)
{
p.EventData = evt.EventData;
p.EventPublished = true;
p.Active = true;
}
workflow.NextExecution = 0;
await _persistenceStore.PersistWorkflow(workflow);
await _persistenceStore.TerminateSubscription(sub.Id);
return true;
}
catch (Exception ex)
{
Logger.LogError(ex.Message);
return false;
}
finally
{
await _lockProvider.ReleaseLock(sub.WorkflowId);
await QueueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
}
}
else
{
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
return false;
}
}
}
}
Loading