Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 88 additions & 43 deletions src/WorkflowCore/Services/MemoryPersistenceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,66 +22,90 @@ public class MemoryPersistenceProvider : IPersistenceProvider

public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
{
workflow.Id = Guid.NewGuid().ToString();
_instances.Add(workflow);
return workflow.Id;
lock (_instances)
{
workflow.Id = Guid.NewGuid().ToString();
_instances.Add(workflow);
return workflow.Id;
}
}

public async Task PersistWorkflow(WorkflowInstance workflow)
{
var existing = _instances.First(x => x.Id == workflow.Id);
_instances.Remove(existing);
_instances.Add(workflow);
lock (_instances)
{
var existing = _instances.First(x => x.Id == workflow.Id);
_instances.Remove(existing);
_instances.Add(workflow);
}
}

public async Task<IEnumerable<string>> GetRunnableInstances()
{
var now = DateTime.Now.ToUniversalTime().Ticks;
return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id);
lock (_instances)
{
var now = DateTime.Now.ToUniversalTime().Ticks;
return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id);
}
}

public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
{
return _instances.First(x => x.Id == Id);
lock (_instances)
{
return _instances.First(x => x.Id == Id);
}
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
{
var result = _instances.AsQueryable();
lock (_instances)
{
var result = _instances.AsQueryable();

if (status.HasValue)
result = result.Where(x => x.Status == status.Value);
if (status.HasValue)
result = result.Where(x => x.Status == status.Value);

if (!String.IsNullOrEmpty(type))
result = result.Where(x => x.WorkflowDefinitionId == type);
if (!String.IsNullOrEmpty(type))
result = result.Where(x => x.WorkflowDefinitionId == type);

if (createdFrom.HasValue)
result = result.Where(x => x.CreateTime >= createdFrom.Value);
if (createdFrom.HasValue)
result = result.Where(x => x.CreateTime >= createdFrom.Value);

if (createdTo.HasValue)
result = result.Where(x => x.CreateTime <= createdTo.Value);
if (createdTo.HasValue)
result = result.Where(x => x.CreateTime <= createdTo.Value);

return result.Skip(skip).Take(take).ToList();
return result.Skip(skip).Take(take).ToList();
}
}


public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
_subscriptions.Add(subscription);
return subscription.Id;
lock (_subscriptions)
{
subscription.Id = Guid.NewGuid().ToString();
_subscriptions.Add(subscription);
return subscription.Id;
}
}

public async Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf)
{
return _subscriptions
.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf);
lock (_subscriptions)
{
return _subscriptions
.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf);
}
}

public async Task TerminateSubscription(string eventSubscriptionId)
{
var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId);
_subscriptions.Remove(sub);
lock (_subscriptions)
{
var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId);
_subscriptions.Remove(sub);
}
}

public void EnsureStoreExists()
Expand All @@ -90,51 +114,72 @@ public void EnsureStoreExists()

public async Task<string> CreateEvent(Event newEvent)
{
newEvent.Id = Guid.NewGuid().ToString();
_events.Add(newEvent);
return newEvent.Id;
lock (_events)
{
newEvent.Id = Guid.NewGuid().ToString();
_events.Add(newEvent);
return newEvent.Id;
}
}

public async Task MarkEventProcessed(string id)
{
var evt = _events.FirstOrDefault(x => x.Id == id);
if (evt != null)
evt.IsProcessed = true;
lock (_events)
{
var evt = _events.FirstOrDefault(x => x.Id == id);
if (evt != null)
evt.IsProcessed = true;
}
}

public async Task<IEnumerable<string>> GetRunnableEvents()
{
return _events
.Where(x => !x.IsProcessed)
.Where(x => x.EventTime <= DateTime.Now.ToUniversalTime())
.Select(x => x.Id)
.ToList();
lock (_events)
{
return _events
.Where(x => !x.IsProcessed)
.Where(x => x.EventTime <= DateTime.Now.ToUniversalTime())
.Select(x => x.Id)
.ToList();
}
}

public async Task<Event> GetEvent(string id)
{
return _events.FirstOrDefault(x => x.Id == id);
lock (_events)
{
return _events.FirstOrDefault(x => x.Id == id);
}
}

public async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf)
{
return _events
lock (_events)
{
return _events
.Where(x => x.EventName == eventName && x.EventKey == eventKey)
.Where(x => x.EventTime >= asOf)
.Select(x => x.Id)
.ToList();
}
}

public async Task MarkEventUnprocessed(string id)
{
var evt = _events.FirstOrDefault(x => x.Id == id);
if (evt != null)
evt.IsProcessed = false;
lock (_events)
{
var evt = _events.FirstOrDefault(x => x.Id == id);
if (evt != null)
evt.IsProcessed = false;
}
}

public async Task PersistErrors(IEnumerable<ExecutionError> errors)
{
_errors.AddRange(errors);
lock (errors)
{
_errors.AddRange(errors);
}
}
}
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
Expand Down