Skip to content

Commit

Permalink
Fix CachingWorkflowRegistry to use cached workflow blueprints
Browse files Browse the repository at this point in the history
  • Loading branch information
sfmskywalker committed Jun 21, 2021
1 parent fdb8173 commit 6e2601e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 19 deletions.
Expand Up @@ -104,7 +104,7 @@ private async IAsyncEnumerable<(string topicName, string subscriptionName)> GetT

private static async Task<bool> WorkflowHasNonFinishedWorkflowsAsync(IWorkflowBlueprint workflowBlueprint, IWorkflowInstanceStore workflowInstanceStore, CancellationToken cancellationToken)
{
var count = await workflowInstanceStore.CountAsync(new NonFinalizedWorkflowSpecification().WithWorkflowDefinition(workflowBlueprint.Id), cancellationToken);
var count = await workflowInstanceStore.CountAsync(new UnfinishedWorkflowSpecification().WithWorkflowDefinition(workflowBlueprint.Id), cancellationToken);
return count > 0;
}
}
Expand Down
37 changes: 31 additions & 6 deletions src/core/Elsa.Core/Decorators/CachingWorkflowRegistry.cs
@@ -1,11 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Elsa.Caching;
using Elsa.Events;
using Elsa.Models;
using Elsa.Persistence;
using Elsa.Persistence.Specifications.WorkflowInstances;
using Elsa.Services;
using Elsa.Services.Models;
using MediatR;
Expand All @@ -20,40 +23,62 @@ public class CachingWorkflowRegistry : IWorkflowRegistry, INotificationHandler<W
private readonly IWorkflowRegistry _workflowRegistry;
private readonly IMemoryCache _memoryCache;
private readonly ICacheSignal _cacheSignal;
private readonly IWorkflowInstanceStore _workflowInstanceStore;

public CachingWorkflowRegistry(IWorkflowRegistry workflowRegistry, IMemoryCache memoryCache, ICacheSignal cacheSignal)
public CachingWorkflowRegistry(IWorkflowRegistry workflowRegistry, IMemoryCache memoryCache, ICacheSignal cacheSignal, IWorkflowInstanceStore workflowInstanceStore)
{
_workflowRegistry = workflowRegistry;
_memoryCache = memoryCache;
_cacheSignal = cacheSignal;
_workflowInstanceStore = workflowInstanceStore;
}

public async Task<IEnumerable<IWorkflowBlueprint>> ListAsync(CancellationToken cancellationToken) => await GetWorkflowBlueprints(cancellationToken);
public Task<IEnumerable<IWorkflowBlueprint>> ListActiveAsync(CancellationToken cancellationToken = default) => _workflowRegistry.ListActiveAsync(cancellationToken);
public async Task<IEnumerable<IWorkflowBlueprint>> ListAsync(CancellationToken cancellationToken) => await ListInternalAsync(cancellationToken);
public async Task<IEnumerable<IWorkflowBlueprint>> ListActiveAsync(CancellationToken cancellationToken = default) => await ListActiveInternalAsync(cancellationToken).ToListAsync(cancellationToken);

public async Task<IWorkflowBlueprint?> GetAsync(string id, string? tenantId, VersionOptions version, CancellationToken cancellationToken) =>
await FindAsync(x => x.Id == id && x.TenantId == tenantId && x.WithVersion(version), cancellationToken);

public async Task<IEnumerable<IWorkflowBlueprint>> FindManyAsync(Func<IWorkflowBlueprint, bool> predicate, CancellationToken cancellationToken)
{
var workflows = await GetWorkflowBlueprints(cancellationToken);
var workflows = await ListInternalAsync(cancellationToken);
return workflows.Where(predicate);
}

public async Task<IWorkflowBlueprint?> FindAsync(Func<IWorkflowBlueprint, bool> predicate, CancellationToken cancellationToken)
{
var workflows = await GetWorkflowBlueprints(cancellationToken);
var workflows = await ListInternalAsync(cancellationToken);
return workflows.FirstOrDefault(predicate);
}

private async Task<ICollection<IWorkflowBlueprint>> GetWorkflowBlueprints(CancellationToken cancellationToken)
private async Task<ICollection<IWorkflowBlueprint>> ListInternalAsync(CancellationToken cancellationToken)
{
return await _memoryCache.GetOrCreateAsync(CacheKey, async entry =>
{
entry.Monitor(_cacheSignal.GetToken(CacheKey));
return await _workflowRegistry.ListAsync(cancellationToken).ToList();
});
}

private async IAsyncEnumerable<IWorkflowBlueprint> ListActiveInternalAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
var workflows = await ListInternalAsync(cancellationToken);

foreach (var workflow in workflows)
{
// If a workflow is not published, only consider it for processing if it has at least one non-ended workflow instance.
if (!workflow.IsPublished && !await WorkflowHasUnfinishedWorkflowsAsync(workflow, cancellationToken))
continue;

yield return workflow;
}
}

private async Task<bool> WorkflowHasUnfinishedWorkflowsAsync(IWorkflowBlueprint workflowBlueprint, CancellationToken cancellationToken)
{
var count = await _workflowInstanceStore.CountAsync(new UnfinishedWorkflowSpecification().WithWorkflowDefinition(workflowBlueprint.Id), cancellationToken);
return count > 0;
}

Task INotificationHandler<WorkflowDefinitionSaved>.Handle(WorkflowDefinitionSaved notification, CancellationToken cancellationToken)
{
Expand Down
Expand Up @@ -7,7 +7,7 @@ namespace Elsa.Persistence.Specifications.WorkflowInstances
/// <summary>
/// Matches all workflow instances that are idle, running or suspended.
/// </summary>
public class NonFinalizedWorkflowSpecification : Specification<WorkflowInstance>
public class UnfinishedWorkflowSpecification : Specification<WorkflowInstance>
{
public override Expression<Func<WorkflowInstance, bool>> ToExpression() => x => x.WorkflowStatus == WorkflowStatus.Idle || x.WorkflowStatus == WorkflowStatus.Running || x.WorkflowStatus == WorkflowStatus.Suspended;
}
Expand Down
22 changes: 11 additions & 11 deletions src/core/Elsa.Core/Services/Workflows/WorkflowRegistry.cs
Expand Up @@ -24,8 +24,8 @@ public WorkflowRegistry(IEnumerable<IWorkflowProvider> workflowProviders, IWorkf
_workflowInstanceStore = workflowInstanceStore;
}

public async Task<IEnumerable<IWorkflowBlueprint>> ListAsync(CancellationToken cancellationToken) => await GetWorkflowsInternalAsync(cancellationToken).ToListAsync(cancellationToken);
public async Task<IEnumerable<IWorkflowBlueprint>> ListActiveAsync(CancellationToken cancellationToken) => await ListActiveWorkflowsAsync(cancellationToken).ToListAsync(cancellationToken);
public async Task<IEnumerable<IWorkflowBlueprint>> ListAsync(CancellationToken cancellationToken) => await ListInternalAsync(cancellationToken).ToListAsync(cancellationToken);
public async Task<IEnumerable<IWorkflowBlueprint>> ListActiveAsync(CancellationToken cancellationToken) => await ListActiveInternalAsync(cancellationToken).ToListAsync(cancellationToken);

public async Task<IWorkflowBlueprint?> GetAsync(string id, string? tenantId, VersionOptions version, CancellationToken cancellationToken) =>
await FindAsync(x => x.Id == id && x.TenantId == tenantId && x.WithVersion(version), cancellationToken);
Expand All @@ -36,27 +36,21 @@ public WorkflowRegistry(IEnumerable<IWorkflowProvider> workflowProviders, IWorkf
public async Task<IWorkflowBlueprint?> FindAsync(Func<IWorkflowBlueprint, bool> predicate, CancellationToken cancellationToken) =>
(await ListAsync(cancellationToken).Where(predicate).OrderByDescending(x => x.Version)).FirstOrDefault();

private async IAsyncEnumerable<IWorkflowBlueprint> ListActiveWorkflowsAsync([EnumeratorCancellation] CancellationToken cancellationToken)
private async IAsyncEnumerable<IWorkflowBlueprint> ListActiveInternalAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
var workflows = await ListAsync(cancellationToken);

foreach (var workflow in workflows)
{
// If a workflow is not published, only consider it for processing if it has at least one non-ended workflow instance.
if (!workflow.IsPublished && !await WorkflowHasNonFinishedWorkflowsAsync(workflow, cancellationToken))
if (!workflow.IsPublished && !await WorkflowHasUnfinishedWorkflowsAsync(workflow, cancellationToken))
continue;

yield return workflow;
}
}

private async Task<bool> WorkflowHasNonFinishedWorkflowsAsync(IWorkflowBlueprint workflowBlueprint, CancellationToken cancellationToken)
{
var count = await _workflowInstanceStore.CountAsync(new NonFinalizedWorkflowSpecification().WithWorkflowDefinition(workflowBlueprint.Id), cancellationToken);
return count > 0;
}

private async IAsyncEnumerable<IWorkflowBlueprint> GetWorkflowsInternalAsync([EnumeratorCancellation] CancellationToken cancellationToken)
private async IAsyncEnumerable<IWorkflowBlueprint> ListInternalAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
var providers = _workflowProviders;

Expand All @@ -66,5 +60,11 @@ await foreach (var workflow in provider.GetWorkflowsAsync(cancellationToken).Wit
yield return workflow;
}
}

private async Task<bool> WorkflowHasUnfinishedWorkflowsAsync(IWorkflowBlueprint workflowBlueprint, CancellationToken cancellationToken)
{
var count = await _workflowInstanceStore.CountAsync(new UnfinishedWorkflowSpecification().WithWorkflowDefinition(workflowBlueprint.Id), cancellationToken);
return count > 0;
}
}
}

0 comments on commit 6e2601e

Please sign in to comment.