Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions WorkflowCore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Providers.Elas
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Tests.Elasticsearch", "test\WorkflowCore.Tests.Elasticsearch\WorkflowCore.Tests.Elasticsearch.csproj", "{44644716-0CE8-4837-B189-AB65AE2106AA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.Redis", "test\WorkflowCore.Tests.Redis\WorkflowCore.Tests.Redis.csproj", "{78217204-B873-40B9-8875-E3925B2FBCEC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -328,6 +330,10 @@ Global
{44644716-0CE8-4837-B189-AB65AE2106AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{44644716-0CE8-4837-B189-AB65AE2106AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{44644716-0CE8-4837-B189-AB65AE2106AA}.Release|Any CPU.Build.0 = Release|Any CPU
{78217204-B873-40B9-8875-E3925B2FBCEC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{78217204-B873-40B9-8875-E3925B2FBCEC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78217204-B873-40B9-8875-E3925B2FBCEC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78217204-B873-40B9-8875-E3925B2FBCEC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -384,6 +390,7 @@ Global
{435C6263-C6F8-4E93-B417-D861E9C22E18} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
{F6348170-B695-4D97-BAE6-4F0F643F3BEF} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
{44644716-0CE8-4837-B189-AB65AE2106AA} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
{78217204-B873-40B9-8875-E3925B2FBCEC} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace WorkflowCore.Providers.AWS.Services
public class DynamoDbProvisioner : IDynamoDbProvisioner
{
private readonly ILogger _logger;
private readonly AmazonDynamoDBClient _client;
private readonly IAmazonDynamoDB _client;
private readonly string _tablePrefix;

public DynamoDbProvisioner(AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix, ILoggerFactory logFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace WorkflowCore.Providers.AWS.Services
public class DynamoLockProvider : IDistributedLockProvider
{
private readonly ILogger _logger;
private readonly AmazonDynamoDBClient _client;
private readonly IAmazonDynamoDB _client;
private readonly string _tableName;
private readonly string _nodeId;
private readonly long _ttl = 30000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace WorkflowCore.Providers.AWS.Services
public class DynamoPersistenceProvider : IPersistenceProvider
{
private readonly ILogger _logger;
private readonly AmazonDynamoDBClient _client;
private readonly IAmazonDynamoDB _client;
private readonly string _tablePrefix;
private readonly IDynamoDbProvisioner _provisioner;

Expand Down
7 changes: 5 additions & 2 deletions src/providers/WorkflowCore.Providers.Redis/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Redis providers for Workflow Core

* Provides Persistence support on [Workflow Core](../../README.md) backed by Redis.
* Provides Queueing support on [Workflow Core](../../README.md) backed by Redis.
* Provides Distributed locking support on [Workflow Core](../../README.md) backed by Redis.
* Provides event hub support on [Workflow Core](../../README.md) backed by Redis.
Expand All @@ -23,15 +24,17 @@ dotnet add package WorkflowCore.Providers.Redis
## Usage

Use the `IServiceCollection` extension methods when building your service provider
* .UseRedisPersistence
* .UseRedisQueues
* .UseRedisLocking
* .UseRedisEventHub

```C#
services.AddWorkflow(cfg =>
{
cfg.UseRedisPersistence("localhost:6379", "app-name");
cfg.UseRedisLocking("localhost:6379");
cfg.UseRedisQueues("localhost:6379", "my-app");
cfg.UseRedisEventHub("localhost:6379", "my-channel")
cfg.UseRedisQueues("localhost:6379", "app-name");
cfg.UseRedisEventHub("localhost:6379", "channel-name")
});
```
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public static WorkflowOptions UseRedisLocking(this WorkflowOptions options, stri
return options;
}

public static WorkflowOptions UseRedisPersistence(this WorkflowOptions options, string connectionString, string prefix)
{
options.UsePersistence(sp => new RedisPersistenceProvider(connectionString, prefix, sp.GetService<ILoggerFactory>()));
return options;
}

public static WorkflowOptions UseRedisEventHub(this WorkflowOptions options, string connectionString, string channel)
{
options.UseEventHub(sp => new RedisLifeCycleEventHub(connectionString, channel, sp.GetService<ILoggerFactory>()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using StackExchange.Redis;
using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Redis.Services
{
public class RedisPersistenceProvider : IPersistenceProvider
{
private readonly ILogger _logger;
private readonly string _connectionString;
private readonly string _prefix;
private const string WORKFLOW_SET = "workflows";
private const string SUBSCRIPTION_SET = "subscriptions";
private const string EVENT_SET = "events";
private const string RUNNABLE_INDEX = "runnable";
private const string EVENTSLUG_INDEX = "eventslug";
private readonly IConnectionMultiplexer _multiplexer;
private readonly IDatabase _redis;

private readonly JsonSerializerSettings _serializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };

public RedisPersistenceProvider(string connectionString, string prefix, ILoggerFactory logFactory)
{
_connectionString = connectionString;
_prefix = prefix;
_logger = logFactory.CreateLogger(GetType());
_multiplexer = ConnectionMultiplexer.Connect(_connectionString);
_redis = _multiplexer.GetDatabase();
}

public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
{
workflow.Id = Guid.NewGuid().ToString();
await PersistWorkflow(workflow);
return workflow.Id;
}

public async Task PersistWorkflow(WorkflowInstance workflow)
{
var str = JsonConvert.SerializeObject(workflow, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{WORKFLOW_SET}", workflow.Id, str);

if ((workflow.Status == WorkflowStatus.Runnable) && (workflow.NextExecution.HasValue))
await _redis.SortedSetAddAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", workflow.Id, workflow.NextExecution.Value);
else
await _redis.SortedSetRemoveAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", workflow.Id);
}

public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
{
var result = new List<string>();
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", -1, DateTime.UtcNow.Ticks);

foreach (var item in data)
result.Add(item);

return result;
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip,
int take)
{
throw new NotImplementedException();
}

public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
{
var raw = await _redis.HashGetAsync($"{_prefix}.{WORKFLOW_SET}", Id);
return JsonConvert.DeserializeObject<WorkflowInstance>(raw, _serializerSettings);
}

public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
var str = JsonConvert.SerializeObject(subscription, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{SUBSCRIPTION_SET}", subscription.Id, str);
await _redis.SortedSetAddAsync($"{_prefix}.{SUBSCRIPTION_SET}.{EVENTSLUG_INDEX}.{subscription.EventName}-{subscription.EventKey}", subscription.Id, subscription.SubscribeAsOf.Ticks);

return subscription.Id;
}

public async Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf)
{
var result = new List<EventSubscription>();
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{SUBSCRIPTION_SET}.{EVENTSLUG_INDEX}.{eventName}-{eventKey}", -1, asOf.Ticks);

foreach (var id in data)
{
var raw = await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", id);
if (raw.HasValue)
result.Add(JsonConvert.DeserializeObject<EventSubscription>(raw, _serializerSettings));
}

return result;
}

public async Task TerminateSubscription(string eventSubscriptionId)
{
var existingRaw = await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId);
var existing = JsonConvert.DeserializeObject<EventSubscription>(existingRaw, _serializerSettings);
await _redis.HashDeleteAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId);
await _redis.SortedSetRemoveAsync($"{_prefix}.{SUBSCRIPTION_SET}.{EVENTSLUG_INDEX}.{existing.EventName}-{existing.EventKey}", eventSubscriptionId);
}

public async Task<string> CreateEvent(Event newEvent)
{
newEvent.Id = Guid.NewGuid().ToString();
var str = JsonConvert.SerializeObject(newEvent, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", newEvent.Id, str);
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{EVENTSLUG_INDEX}.{newEvent.EventName}-{newEvent.EventKey}", newEvent.Id, newEvent.EventTime.Ticks);

if (newEvent.IsProcessed)
await _redis.SortedSetRemoveAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", newEvent.Id);
else
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", newEvent.Id, newEvent.EventTime.Ticks);

return newEvent.Id;
}

public async Task<Event> GetEvent(string id)
{
var raw = await _redis.HashGetAsync($"{_prefix}.{EVENT_SET}", id);
return JsonConvert.DeserializeObject<Event>(raw, _serializerSettings);
}

public async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt)
{
var result = new List<string>();
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", -1, asAt.Ticks);

foreach (var item in data)
result.Add(item);

return result;
}

public async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf)
{
var result = new List<string>();
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{EVENT_SET}.{EVENTSLUG_INDEX}.{eventName}-{eventKey}", asOf.Ticks);

foreach (var id in data)
result.Add(id);

return result;
}

public async Task MarkEventProcessed(string id)
{
var evt = await GetEvent(id);
evt.IsProcessed = true;
var str = JsonConvert.SerializeObject(evt, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str);
await _redis.SortedSetRemoveAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", id);
}

public async Task MarkEventUnprocessed(string id)
{
var evt = await GetEvent(id);
evt.IsProcessed = false;
var str = JsonConvert.SerializeObject(evt, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str);
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", evt.Id, evt.EventTime.Ticks);
}

public Task PersistErrors(IEnumerable<ExecutionError> errors)
{
return Task.CompletedTask;
}

public void EnsureStoreExists()
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Version>1.7.0</Version>
<Version>1.8.0</Version>
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageProjectUrl>https://github.com/danielgerlag/workflow-core</PackageProjectUrl>
<Description>Redis providers for Workflow Core

- Provides Queueing support on Workflow Core
- Provides distributed locking support on Workflow Core</Description>
<Description>Redis providers for Workflow Core (Persistence, queueing, distributed locking and event hubs)
</Description>
</PropertyGroup>

<ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions src/samples/WorkflowCore.Sample04/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private static IServiceProvider ConfigureServices()

//services.AddWorkflow(cfg =>
//{
// cfg.UseRedisPersistence("localhost:6379", "sample4");
// cfg.UseRedisLocking("localhost:6379");
// cfg.UseRedisQueues("localhost:6379", "sample4");
// cfg.UseRedisEventHub("localhost:6379", "channel1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoBasicScenario : BasicScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoCompensationScenario : CompensationScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoDataScenario : DataIOScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoDynamicDataScenario : DynamicDataIOScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoEventScenario : EventScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoForeachScenario : ForeachScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoIfScenario : IfScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoRetrySagaScenario : RetrySagaScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoSagaScenario : SagaScenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using WorkflowCore.Tests.DynamoDB;
using Xunit;

namespace WorkflowCore.Tests.MongoDB.Scenarios
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoWhileScenario : WhileScenario
Expand Down
Loading