diff --git a/src/WorkflowCore/Interface/IPersistenceProvider.cs b/src/WorkflowCore/Interface/IPersistenceProvider.cs index a71128f49..2c2d707ef 100644 --- a/src/WorkflowCore/Interface/IPersistenceProvider.cs +++ b/src/WorkflowCore/Interface/IPersistenceProvider.cs @@ -7,7 +7,7 @@ namespace WorkflowCore.Interface { /// /// The implemention of this interface will be responsible for - /// persisiting running workflow instances to a durable store + /// persisting running workflow instances to a durable store /// public interface IPersistenceProvider { @@ -22,6 +22,8 @@ public interface IPersistenceProvider Task GetWorkflowInstance(string Id); + Task> GetWorkflowInstances(IEnumerable ids); + Task CreateEventSubscription(EventSubscription subscription); Task> GetSubcriptions(string eventName, string eventKey, DateTime asOf); diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index 2818fb5f9..d69984506 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -8,22 +8,22 @@ namespace WorkflowCore.Services { - + public interface ISingletonMemoryProvider : IPersistenceProvider { } - #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously /// /// In-memory implementation of IPersistenceProvider for demo and testing purposes /// public class MemoryPersistenceProvider : ISingletonMemoryProvider - { + { private readonly List _instances = new List(); private readonly List _subscriptions = new List(); private readonly List _events = new List(); private readonly List _errors = new List(); - + public async Task CreateNewWorkflow(WorkflowInstance workflow) { lock (_instances) @@ -61,6 +61,19 @@ public async Task GetWorkflowInstance(string Id) } } + public async Task> GetWorkflowInstances(IEnumerable ids) + { + if (ids == null) + { + return new List(); + } + + lock (_instances) + { + return _instances.Where(x => ids.Contains(x.Id)); + } + } + public async Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) { lock (_instances) @@ -121,7 +134,7 @@ public async Task TerminateSubscription(string eventSubscriptionId) } public void EnsureStoreExists() - { + { } public async Task CreateEvent(Event newEvent) @@ -133,7 +146,7 @@ public async Task CreateEvent(Event newEvent) return newEvent.Id; } } - + public async Task MarkEventProcessed(string id) { lock (_events) @@ -197,5 +210,5 @@ public async Task PersistErrors(IEnumerable errors) } } - #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously } diff --git a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs index 5a2bf288c..599119557 100644 --- a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs @@ -36,6 +36,8 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) public Task GetWorkflowInstance(string Id) => _innerService.GetWorkflowInstance(Id); + public Task> GetWorkflowInstances(IEnumerable ids) => _innerService.GetWorkflowInstances(ids); + public Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take); public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id); diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index f04f35a12..4531f92b1 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -26,7 +26,7 @@ public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFacto _canCreateDB = canCreateDB; _canMigrateDB = canMigrateDB; } - + public async Task CreateEventSubscription(EventSubscription subscription) { using (var db = ConstructDbContext()) @@ -96,7 +96,7 @@ public async Task> GetWorkflowInstances(WorkflowSt return result; } } - + public async Task GetWorkflowInstance(string Id) { using (var db = ConstructDbContext()) @@ -115,6 +115,26 @@ public async Task GetWorkflowInstance(string Id) } } + public async Task> GetWorkflowInstances(IEnumerable ids) + { + if (ids == null) + { + return new List(); + } + + using (var db = ConstructDbContext()) + { + var uids = ids.Select(i => new Guid(i)); + var raw = db.Set() + .Include(wf => wf.ExecutionPointers) + .ThenInclude(ep => ep.ExtensionAttributes) + .Include(wf => wf.ExecutionPointers) + .Where(x => uids.Contains(x.InstanceId)); + + return (await raw.ToListAsync()).Select(i => i.ToWorkflowInstance()); + } + } + public async Task PersistWorkflow(WorkflowInstance workflow) { using (var db = ConstructDbContext()) @@ -143,7 +163,7 @@ public async Task TerminateSubscription(string eventSubscriptionId) await db.SaveChangesAsync(); } } - + public virtual void EnsureStoreExists() { using (var context = ConstructDbContext()) @@ -283,7 +303,7 @@ public async Task PersistErrors(IEnumerable errors) } } } - + private WorkflowDbContext ConstructDbContext() { return _contextFactory.Build(); diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index bae6f9bcd..e59c1230a 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -26,8 +26,8 @@ public MongoPersistenceProvider(IMongoDatabase database) static MongoPersistenceProvider() { BsonClassMap.RegisterClassMap(x => - { - x.MapIdProperty(y => y.Id) + { + x.MapIdProperty(y => y.Id) .SetIdGenerator(new StringObjectIdGenerator()); x.MapProperty(y => y.Data) .SetSerializer(new DataObjectSerializer()); @@ -48,9 +48,9 @@ static MongoPersistenceProvider() .SetIdGenerator(new StringObjectIdGenerator()); x.MapProperty(y => y.EventName); x.MapProperty(y => y.EventKey); - x.MapProperty(y => y.StepId); + x.MapProperty(y => y.StepId); x.MapProperty(y => y.WorkflowId); - x.MapProperty(y => y.SubscribeAsOf); + x.MapProperty(y => y.SubscribeAsOf); }); BsonClassMap.RegisterClassMap(x => @@ -113,7 +113,18 @@ public async Task GetWorkflowInstance(string Id) var result = await WorkflowInstances.FindAsync(x => x.Id == Id); return await result.FirstAsync(); } - + + public async Task> GetWorkflowInstances(IEnumerable ids) + { + if (ids == null) + { + return new List(); + } + + var result = await WorkflowInstances.FindAsync(x => ids.Contains(x.Id)); + return await result.ToListAsync(); + } + public async Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) { IQueryable result = WorkflowInstances.AsQueryable(); @@ -138,7 +149,7 @@ public async Task CreateEventSubscription(EventSubscription subscription await EventSubscriptions.InsertOneAsync(subscription); return subscription.Id; } - + public async Task TerminateSubscription(string eventSubscriptionId) { await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId); @@ -146,9 +157,9 @@ public async Task TerminateSubscription(string eventSubscriptionId) public void EnsureStoreExists() { - - } - + + } + public async Task> GetSubcriptions(string eventName, string eventKey, DateTime asOf) { var query = EventSubscriptions @@ -192,7 +203,7 @@ public async Task> GetEvents(string eventName, string eventK var query = Events .Find(x => x.EventName == eventName && x.EventKey == eventKey && x.EventTime >= asOf) .Project(x => x.Id); - + return await query.ToListAsync(); } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index 1a6c7bf81..bbe61ae18 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading.Tasks; using WorkflowCore.Providers.AWS.Interface; @@ -86,7 +87,7 @@ public async Task> GetRunnableInstances(DateTime asAt) }, ScanIndexForward = true }; - + var response = await _client.QueryAsync(request); foreach (var item in response.Items) @@ -117,6 +118,57 @@ public async Task GetWorkflowInstance(string Id) return response.Item.ToWorkflowInstance(); } + public async Task> GetWorkflowInstances(IEnumerable ids) + { + if (ids == null) + { + return new List(); + } + + var keys = new KeysAndAttributes() { Keys = new List>() }; + foreach (var id in ids) + { + var key = new Dictionary() + { + { + "id", new AttributeValue { S = id } + } + }; + keys.Keys.Add(key); + } + + var request = new BatchGetItemRequest + { + RequestItems = new Dictionary() + { + { + $"{_tablePrefix}-{WORKFLOW_TABLE}", keys + } + } + }; + + var result = new List>(); + BatchGetItemResponse response; + do + { + // Making request + response = await _client.BatchGetItemAsync(request); + + // Check the response + var responses = response.Responses; // Attribute list in the response. + foreach (var tableResponse in responses) + { + result.AddRange(tableResponse.Value); + } + + // Any unprocessed keys? could happen if you exceed ProvisionedThroughput or some other error. + Dictionary unprocessedKeys = response.UnprocessedKeys; + request.RequestItems = unprocessedKeys; + } while (response.UnprocessedKeys.Count > 0); + + return result.Select(i => i.ToWorkflowInstance()); + } + public async Task CreateEventSubscription(EventSubscription subscription) { subscription.Id = Guid.NewGuid().ToString(); @@ -326,4 +378,4 @@ public void EnsureStoreExists() _provisioner.ProvisionTables().Wait(); } } -} +} \ No newline at end of file diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs index 57f64c4ef..373ac14a6 100644 --- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -75,6 +76,17 @@ public async Task GetWorkflowInstance(string Id) return JsonConvert.DeserializeObject(raw, _serializerSettings); } + public async Task> GetWorkflowInstances(IEnumerable ids) + { + if (ids == null) + { + return new List(); + } + + var raw = await _redis.HashGetAsync($"{_prefix}.{WORKFLOW_SET}", Array.ConvertAll(ids.ToArray(), x => (RedisValue)x)); + return raw.Select(r => JsonConvert.DeserializeObject(r, _serializerSettings)); + } + public async Task CreateEventSubscription(EventSubscription subscription) { subscription.Id = Guid.NewGuid().ToString(); diff --git a/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs b/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs index 14847b1f1..87b733a2d 100644 --- a/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs +++ b/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs @@ -1,12 +1,12 @@ using System; using System.Collections.Generic; -using System.Text; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; using WorkflowCore.Interface; using WorkflowCore.Models; -using Xunit; -using FluentAssertions; using WorkflowCore.TestAssets; -using System.Threading.Tasks; +using Xunit; namespace WorkflowCore.UnitTests { @@ -50,7 +50,7 @@ public void GetWorkflowInstance_should_retrieve_workflow() NextExecution = 0, Version = 1, WorkflowDefinitionId = "My Workflow", - Reference = "My Reference" + Reference = "My Reference" }; workflow.ExecutionPointers.Add(new ExecutionPointer() { @@ -69,6 +69,89 @@ public void GetWorkflowInstance_should_retrieve_workflow() .Scope.Should().ContainInOrder(workflow.ExecutionPointers.FindById("1").Scope); } + [Fact] + public void GetWorkflowInstances_should_retrieve_workflows() + { + var workflow01 = new WorkflowInstance() + { + Data = new TestData() { Value1 = 7 }, + Description = "My Description", + Status = WorkflowStatus.Runnable, + NextExecution = 0, + Version = 1, + WorkflowDefinitionId = "My Workflow", + Reference = "My Reference" + }; + workflow01.ExecutionPointers.Add(new ExecutionPointer() + { + Id = "1", + Active = true, + StepId = 0, + SleepUntil = new DateTime(2000, 1, 1).ToUniversalTime(), + Scope = new List() { "4", "3", "2", "1" } + }); + var workflowId01 = Subject.CreateNewWorkflow(workflow01).Result; + + var workflow02 = new WorkflowInstance() + { + Data = new TestData() { Value1 = 7 }, + Description = "My Description", + Status = WorkflowStatus.Runnable, + NextExecution = 0, + Version = 1, + WorkflowDefinitionId = "My Workflow", + Reference = "My Reference" + }; + workflow02.ExecutionPointers.Add(new ExecutionPointer() + { + Id = "1", + Active = true, + StepId = 0, + SleepUntil = new DateTime(2000, 1, 1).ToUniversalTime(), + Scope = new List() { "4", "3", "2", "1" } + }); + var workflowId02 = Subject.CreateNewWorkflow(workflow02).Result; + + var workflow03 = new WorkflowInstance() + { + Data = new TestData() { Value1 = 7 }, + Description = "My Description", + Status = WorkflowStatus.Runnable, + NextExecution = 0, + Version = 1, + WorkflowDefinitionId = "My Workflow", + Reference = "My Reference" + }; + workflow03.ExecutionPointers.Add(new ExecutionPointer() + { + Id = "1", + Active = true, + StepId = 0, + SleepUntil = new DateTime(2000, 1, 1).ToUniversalTime(), + Scope = new List() { "4", "3", "2", "1" } + }); + var workflowId03 = Subject.CreateNewWorkflow(workflow03).Result; + + var retrievedWorkflows = Subject.GetWorkflowInstances(new[] { workflowId01, workflowId02, workflowId03 }).Result; + + retrievedWorkflows.Count().ShouldBeEquivalentTo(3); + + var retrievedWorkflow01 = retrievedWorkflows.Single(o => o.Id == workflowId01); + retrievedWorkflow01.ShouldBeEquivalentTo(workflow01); + retrievedWorkflow01.ExecutionPointers.FindById("1") + .Scope.Should().ContainInOrder(workflow01.ExecutionPointers.FindById("1").Scope); + + var retrievedWorkflow02 = retrievedWorkflows.Single(o => o.Id == workflowId02); + retrievedWorkflow02.ShouldBeEquivalentTo(workflow02); + retrievedWorkflow02.ExecutionPointers.FindById("1") + .Scope.Should().ContainInOrder(workflow02.ExecutionPointers.FindById("1").Scope); + + var retrievedWorkflow03 = retrievedWorkflows.Single(o => o.Id == workflowId03); + retrievedWorkflow03.ShouldBeEquivalentTo(workflow03); + retrievedWorkflow03.ExecutionPointers.FindById("1") + .Scope.Should().ContainInOrder(workflow03.ExecutionPointers.FindById("1").Scope); + } + [Fact] public void PersistWorkflow() { @@ -150,4 +233,4 @@ public class TestData { public int Value1 { get; set; } } -} +} \ No newline at end of file