diff --git a/src/WorkflowCore/Services/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/MemoryPersistenceProvider.cs index fb4ba675f..1345b2bbe 100644 --- a/src/WorkflowCore/Services/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/MemoryPersistenceProvider.cs @@ -22,66 +22,90 @@ public class MemoryPersistenceProvider : IPersistenceProvider public async Task CreateNewWorkflow(WorkflowInstance workflow) { - workflow.Id = Guid.NewGuid().ToString(); - _instances.Add(workflow); - return workflow.Id; + lock (_instances) + { + workflow.Id = Guid.NewGuid().ToString(); + _instances.Add(workflow); + return workflow.Id; + } } public async Task PersistWorkflow(WorkflowInstance workflow) { - var existing = _instances.First(x => x.Id == workflow.Id); - _instances.Remove(existing); - _instances.Add(workflow); + lock (_instances) + { + var existing = _instances.First(x => x.Id == workflow.Id); + _instances.Remove(existing); + _instances.Add(workflow); + } } public async Task> GetRunnableInstances() { - var now = DateTime.Now.ToUniversalTime().Ticks; - return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id); + lock (_instances) + { + var now = DateTime.Now.ToUniversalTime().Ticks; + return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id); + } } public async Task GetWorkflowInstance(string Id) { - return _instances.First(x => x.Id == Id); + lock (_instances) + { + return _instances.First(x => x.Id == Id); + } } public async Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) { - var result = _instances.AsQueryable(); + lock (_instances) + { + var result = _instances.AsQueryable(); - if (status.HasValue) - result = result.Where(x => x.Status == status.Value); + if (status.HasValue) + result = result.Where(x => x.Status == status.Value); - if (!String.IsNullOrEmpty(type)) - result = result.Where(x => x.WorkflowDefinitionId == type); + if (!String.IsNullOrEmpty(type)) + result = result.Where(x => x.WorkflowDefinitionId == type); - if (createdFrom.HasValue) - result = result.Where(x => x.CreateTime >= createdFrom.Value); + if (createdFrom.HasValue) + result = result.Where(x => x.CreateTime >= createdFrom.Value); - if (createdTo.HasValue) - result = result.Where(x => x.CreateTime <= createdTo.Value); + if (createdTo.HasValue) + result = result.Where(x => x.CreateTime <= createdTo.Value); - return result.Skip(skip).Take(take).ToList(); + return result.Skip(skip).Take(take).ToList(); + } } public async Task CreateEventSubscription(EventSubscription subscription) { - subscription.Id = Guid.NewGuid().ToString(); - _subscriptions.Add(subscription); - return subscription.Id; + lock (_subscriptions) + { + subscription.Id = Guid.NewGuid().ToString(); + _subscriptions.Add(subscription); + return subscription.Id; + } } public async Task> GetSubcriptions(string eventName, string eventKey, DateTime asOf) { - return _subscriptions - .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf); + lock (_subscriptions) + { + return _subscriptions + .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf); + } } public async Task TerminateSubscription(string eventSubscriptionId) { - var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId); - _subscriptions.Remove(sub); + lock (_subscriptions) + { + var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId); + _subscriptions.Remove(sub); + } } public void EnsureStoreExists() @@ -90,51 +114,72 @@ public void EnsureStoreExists() public async Task CreateEvent(Event newEvent) { - newEvent.Id = Guid.NewGuid().ToString(); - _events.Add(newEvent); - return newEvent.Id; + lock (_events) + { + newEvent.Id = Guid.NewGuid().ToString(); + _events.Add(newEvent); + return newEvent.Id; + } } public async Task MarkEventProcessed(string id) { - var evt = _events.FirstOrDefault(x => x.Id == id); - if (evt != null) - evt.IsProcessed = true; + lock (_events) + { + var evt = _events.FirstOrDefault(x => x.Id == id); + if (evt != null) + evt.IsProcessed = true; + } } public async Task> GetRunnableEvents() { - return _events - .Where(x => !x.IsProcessed) - .Where(x => x.EventTime <= DateTime.Now.ToUniversalTime()) - .Select(x => x.Id) - .ToList(); + lock (_events) + { + return _events + .Where(x => !x.IsProcessed) + .Where(x => x.EventTime <= DateTime.Now.ToUniversalTime()) + .Select(x => x.Id) + .ToList(); + } } public async Task GetEvent(string id) { - return _events.FirstOrDefault(x => x.Id == id); + lock (_events) + { + return _events.FirstOrDefault(x => x.Id == id); + } } public async Task> GetEvents(string eventName, string eventKey, DateTime asOf) { - return _events + lock (_events) + { + return _events .Where(x => x.EventName == eventName && x.EventKey == eventKey) .Where(x => x.EventTime >= asOf) .Select(x => x.Id) .ToList(); + } } public async Task MarkEventUnprocessed(string id) { - var evt = _events.FirstOrDefault(x => x.Id == id); - if (evt != null) - evt.IsProcessed = false; + lock (_events) + { + var evt = _events.FirstOrDefault(x => x.Id == id); + if (evt != null) + evt.IsProcessed = false; + } } public async Task PersistErrors(IEnumerable errors) { - _errors.AddRange(errors); + lock (errors) + { + _errors.AddRange(errors); + } } } #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously