Skip to content

Commit

Permalink
Fix WorkflowLaunchpad correlation locking
Browse files Browse the repository at this point in the history
This fixes an issue where there was a brief moment of opportunity for callers to create duplicate workflow instances.
It also fixes duplicate workflow instances by not only querying for suspended workflows, but any other "unfinished" state workflows
  • Loading branch information
sfmskywalker committed Jun 7, 2021
1 parent 6d0fc33 commit 26bf218
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Elsa.Activities.AzureServiceBus.Options;
using Elsa.Bookmarks;
using Elsa.Dispatch;
using Elsa.Exceptions;
using Elsa.Services;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
Expand Down Expand Up @@ -101,6 +102,10 @@ private async Task OnMessageReceived(Message message, CancellationToken cancella
{
_logger.LogDebug("Message received with ID {MessageId}", message.MessageId);
await TriggerWorkflowsAsync(message, CancellationToken.None);

if (ReceiverClient.IsClosedOrClosing)
throw new WorkflowException("Can't handle message with closed receiver");

await ReceiverClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Linq.Expressions;
using Elsa.Models;

namespace Elsa.Persistence.Specifications.WorkflowInstances
{
public class WorkflowFinishedStatusSpecification : Specification<WorkflowInstance>
{
public override Expression<Func<WorkflowInstance, bool>> ToExpression() => x => x.WorkflowStatus == WorkflowStatus.Cancelled || x.WorkflowStatus == WorkflowStatus.Faulted || x.WorkflowStatus == WorkflowStatus.Finished;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Linq.Expressions;
using Elsa.Models;

namespace Elsa.Persistence.Specifications.WorkflowInstances
{
public class WorkflowUnfinishedStatusSpecification : Specification<WorkflowInstance>
{
public override Expression<Func<WorkflowInstance, bool>> ToExpression() => x => x.WorkflowStatus == WorkflowStatus.Idle || x.WorkflowStatus == WorkflowStatus.Running || x.WorkflowStatus == WorkflowStatus.Suspended;
}
}
175 changes: 101 additions & 74 deletions src/core/Elsa.Core/Services/WorkflowLaunchpad.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,14 @@ public async Task<IEnumerable<PendingWorkflow>> CollectWorkflowsAsync(CollectWor

public async Task<IEnumerable<StartableWorkflow>> CollectStartableWorkflowsAsync(CollectWorkflowsContext context, CancellationToken cancellationToken = default)
{
_logger.LogDebug("Triggering workflows using {ActivityType}", context.ActivityType);

var filter = context.Trigger;
var triggers = filter != null ? (await _triggerFinder.FindTriggersAsync(context.ActivityType, filter, context.TenantId, cancellationToken)).ToList() : new List<TriggerFinderResult>();
var startableWorkflows = new List<StartableWorkflow>();

foreach (var trigger in triggers)
{
var workflowBlueprint = trigger.WorkflowBlueprint;
var startableWorkflow = await CollectStartableWorkflowAsync(workflowBlueprint, trigger.ActivityId, context.CorrelationId, context.ContextId, context.TenantId, cancellationToken);

if (startableWorkflow != null)
startableWorkflows.Add(startableWorkflow);
}

return startableWorkflows;
var correlationId = context.CorrelationId ?? Guid.NewGuid().ToString("N");
var updatedContext = context with {CorrelationId = correlationId};
await using var lockHandle = await AcquireLockAsync(correlationId, cancellationToken);
return await CollectStartableWorkflowsInternalAsync(updatedContext, cancellationToken);
}

public async Task<StartableWorkflow?> CollectStartableWorkflowAsync(string workflowDefinitionId, string? activityId, string? correlationId = default, string? contextId = default, string? tenantId = default, CancellationToken cancellationToken = default)
public async Task<StartableWorkflow?> CollectStartableWorkflowAsync(string workflowDefinitionId, string? activityId, string? correlationId = default, string? contextId = default, string? tenantId = default,
CancellationToken cancellationToken = default)
{
var workflowBlueprint = await _workflowRegistry.GetAsync(workflowDefinitionId, tenantId, VersionOptions.Published, cancellationToken);

Expand All @@ -104,51 +93,24 @@ public async Task<IEnumerable<StartableWorkflow>> CollectStartableWorkflowsAsync
return await CollectStartableWorkflowAsync(workflowBlueprint, activityId, correlationId, contextId, tenantId, cancellationToken);
}

public async Task<StartableWorkflow?> CollectStartableWorkflowAsync(IWorkflowBlueprint workflowBlueprint, string? activityId, string? correlationId = default, string? contextId = default, string? tenantId = default, CancellationToken cancellationToken = default)
public async Task<StartableWorkflow?> CollectStartableWorkflowAsync(
IWorkflowBlueprint workflowBlueprint,
string? activityId,
string? correlationId = default,
string? contextId = default,
string? tenantId = default,
CancellationToken cancellationToken = default)
{
var workflowDefinitionId = workflowBlueprint.Id;

if (!ValidatePreconditions(workflowDefinitionId, workflowBlueprint))
return null;

correlationId ??= Guid.NewGuid().ToString("N");

// Acquire a lock on correlation ID to prevent duplicate workflow instances from being created.
await using var correlationLockHandle = await AcquireLockAsync(correlationId, cancellationToken);

// Acquire a lock on the workflow definition so that we can ensure singleton-workflows never execute more than one instance.
var lockKey = $"execute-workflow-definition:tenant:{tenantId}:workflow-definition:{workflowDefinitionId}";
await using var workflowDefinitionHandle = await AcquireLockAsync(lockKey, cancellationToken);

if (workflowBlueprint.IsSingleton)
{
if (await GetWorkflowIsAlreadyExecutingAsync(tenantId, workflowDefinitionId))
{
_logger.LogDebug("Workflow {WorkflowDefinitionId} is a singleton workflow and is already running", workflowDefinitionId);
return null;
}
}

var startActivities = _getsStartActivities.GetStartActivities(workflowBlueprint).Select(x => x.Id).ToHashSet();
var startActivityId = activityId == null ? startActivities.FirstOrDefault() : startActivities.Contains(activityId) ? activityId : default;

if (startActivityId == null)
{
_logger.LogWarning("Cannot start workflow {WorkflowDefinitionId} with version {WorkflowDefinitionVersion} because it has no starting activities", workflowBlueprint.Id, workflowBlueprint.Version);
return null;
}

var workflowInstance = await _workflowFactory.InstantiateAsync(
workflowBlueprint,
correlationId,
contextId,
cancellationToken);

await _workflowInstanceStore.SaveAsync(workflowInstance, cancellationToken);
return new StartableWorkflow(workflowBlueprint, workflowInstance, startActivityId);
return await CollectStartableWorkflowInternalAsync(workflowBlueprint, activityId, correlationId, contextId, tenantId, cancellationToken);
}

public async Task CollectAndExecuteStartableWorkflowAsync(string workflowDefinitionId, string? activityId, string? correlationId = default, string? contextId = default, object? input = default, string? tenantId = default, CancellationToken cancellationToken = default)
public async Task CollectAndExecuteStartableWorkflowAsync(string workflowDefinitionId, string? activityId, string? correlationId = default, string? contextId = default, object? input = default, string? tenantId = default,
CancellationToken cancellationToken = default)
{
var workflowBlueprint = await _workflowRegistry.GetAsync(workflowDefinitionId, tenantId, VersionOptions.Published, cancellationToken);

Expand All @@ -161,7 +123,8 @@ public async Task CollectAndExecuteStartableWorkflowAsync(string workflowDefinit
await CollectAndExecuteStartableWorkflowAsync(workflowBlueprint, activityId, correlationId, contextId, input, cancellationToken);
}

public async Task<RunWorkflowResult> CollectAndExecuteStartableWorkflowAsync(IWorkflowBlueprint workflowBlueprint, string? activityId, string? correlationId = default, string? contextId = default, object? input = default, CancellationToken cancellationToken = default)
public async Task<RunWorkflowResult> CollectAndExecuteStartableWorkflowAsync(IWorkflowBlueprint workflowBlueprint, string? activityId, string? correlationId = default, string? contextId = default, object? input = default,
CancellationToken cancellationToken = default)
{
var workflowDefinitionId = workflowBlueprint.Id;
var tenantId = workflowBlueprint.TenantId;
Expand Down Expand Up @@ -195,7 +158,8 @@ public async Task DispatchPendingWorkflowsAsync(IEnumerable<PendingWorkflow> pen
public async Task DispatchPendingWorkflowAsync(PendingWorkflow pendingWorkflow, object? input, CancellationToken cancellationToken = default) =>
await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(pendingWorkflow.WorkflowInstanceId, pendingWorkflow.ActivityId, input), cancellationToken);

public Task DispatchPendingWorkflowAsync(string workflowInstanceId, string? activityId, object? input, CancellationToken cancellationToken = default) => DispatchPendingWorkflowAsync(new PendingWorkflow(workflowInstanceId, activityId), input, cancellationToken);
public Task DispatchPendingWorkflowAsync(string workflowInstanceId, string? activityId, object? input, CancellationToken cancellationToken = default) =>
DispatchPendingWorkflowAsync(new PendingWorkflow(workflowInstanceId, activityId), input, cancellationToken);

public async Task<RunWorkflowResult> ExecuteStartableWorkflowAsync(StartableWorkflow startableWorkflow, object? input, CancellationToken cancellationToken = default) =>
await _workflowRunner.RunWorkflowAsync(startableWorkflow.WorkflowBlueprint, startableWorkflow.WorkflowInstance, startableWorkflow.ActivityId, input, cancellationToken);
Expand Down Expand Up @@ -231,6 +195,71 @@ public async Task<IEnumerable<PendingWorkflow>> CollectResumableAndStartableWork

return pendingWorkflows;
}

private async Task<IEnumerable<StartableWorkflow>> CollectStartableWorkflowsInternalAsync(CollectWorkflowsContext context, CancellationToken cancellationToken = default)
{
_logger.LogDebug("Triggering workflows using {ActivityType}", context.ActivityType);

var filter = context.Trigger;
var triggers = filter != null ? (await _triggerFinder.FindTriggersAsync(context.ActivityType, filter, context.TenantId, cancellationToken)).ToList() : new List<TriggerFinderResult>();
var startableWorkflows = new List<StartableWorkflow>();

foreach (var trigger in triggers)
{
var workflowBlueprint = trigger.WorkflowBlueprint;
var startableWorkflow = await CollectStartableWorkflowInternalAsync(workflowBlueprint, trigger.ActivityId, context.CorrelationId!, context.ContextId, context.TenantId, cancellationToken);

if (startableWorkflow != null)
startableWorkflows.Add(startableWorkflow);
}

return startableWorkflows;
}

private async Task<StartableWorkflow?> CollectStartableWorkflowInternalAsync(
IWorkflowBlueprint workflowBlueprint,
string? activityId,
string correlationId,
string? contextId = default,
string? tenantId = default,
CancellationToken cancellationToken = default)
{
var workflowDefinitionId = workflowBlueprint.Id;

if (!ValidatePreconditions(workflowDefinitionId, workflowBlueprint))
return null;

// Acquire a lock on the workflow definition so that we can ensure singleton-workflows never execute more than one instance.
var lockKey = $"execute-workflow-definition:tenant:{tenantId}:workflow-definition:{workflowDefinitionId}";
await using var workflowDefinitionHandle = await AcquireLockAsync(lockKey, cancellationToken);

if (workflowBlueprint.IsSingleton)
{
if (await GetWorkflowIsAlreadyExecutingAsync(tenantId, workflowDefinitionId))
{
_logger.LogDebug("Workflow {WorkflowDefinitionId} is a singleton workflow and is already running", workflowDefinitionId);
return null;
}
}

var startActivities = _getsStartActivities.GetStartActivities(workflowBlueprint).Select(x => x.Id).ToHashSet();
var startActivityId = activityId == null ? startActivities.FirstOrDefault() : startActivities.Contains(activityId) ? activityId : default;

if (startActivityId == null)
{
_logger.LogWarning("Cannot start workflow {WorkflowDefinitionId} with version {WorkflowDefinitionVersion} because it has no starting activities", workflowBlueprint.Id, workflowBlueprint.Version);
return null;
}

var workflowInstance = await _workflowFactory.InstantiateAsync(
workflowBlueprint,
correlationId,
contextId,
cancellationToken);

await _workflowInstanceStore.SaveAsync(workflowInstance, cancellationToken);
return new StartableWorkflow(workflowBlueprint, workflowInstance, startActivityId);
}

private async Task<IEnumerable<PendingWorkflow>> CollectSpecificWorkflowInstanceAsync(CollectWorkflowsContext context, CancellationToken cancellationToken)
{
Expand All @@ -247,32 +276,30 @@ private async Task<IEnumerable<PendingWorkflow>> CollectResumableOrStartableCorr
var correlationId = context.CorrelationId!;
var lockKey = correlationId;

await using (var handle = await AcquireLockAsync(lockKey, cancellationToken))
{
var correlatedWorkflowInstanceCount = !string.IsNullOrWhiteSpace(correlationId)
? await _workflowInstanceStore.CountAsync(new CorrelationIdSpecification<WorkflowInstance>(correlationId).WithStatus(WorkflowStatus.Suspended), cancellationToken)
: 0;
await using var handle = await AcquireLockAsync(lockKey, cancellationToken);
var correlatedWorkflowInstanceCount = !string.IsNullOrWhiteSpace(correlationId)
? await _workflowInstanceStore.CountAsync(new CorrelationIdSpecification<WorkflowInstance>(correlationId).And(new WorkflowUnfinishedStatusSpecification()), cancellationToken)
: 0;

_logger.LogDebug("Found {CorrelatedWorkflowCount} workflows with correlation ID {CorrelationId}", correlatedWorkflowInstanceCount, correlationId);
_logger.LogDebug("Found {CorrelatedWorkflowCount} workflows with correlation ID {CorrelationId}", correlatedWorkflowInstanceCount, correlationId);

if (correlatedWorkflowInstanceCount > 0)
{
var bookmarkResults = context.Bookmark != null
? await _bookmarkFinder.FindBookmarksAsync(context.ActivityType, context.Bookmark, correlationId, context.TenantId, cancellationToken).ToList()
: new List<BookmarkFinderResult>();
_logger.LogDebug("Found {BookmarkCount} bookmarks for activity type {ActivityType}", bookmarkResults.Count, context.ActivityType);
return bookmarkResults.Select(x => new PendingWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList();
}
} // This ensures the lock handle is released before calling the next line (CollectStartableWorkflowsAsync), which also acquires a lock on the correlation ID.
if (correlatedWorkflowInstanceCount > 0)
{
var bookmarkResults = context.Bookmark != null
? await _bookmarkFinder.FindBookmarksAsync(context.ActivityType, context.Bookmark, correlationId, context.TenantId, cancellationToken).ToList()
: new List<BookmarkFinderResult>();
_logger.LogDebug("Found {BookmarkCount} bookmarks for activity type {ActivityType}", bookmarkResults.Count, context.ActivityType);
return bookmarkResults.Select(x => new PendingWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList();
}

var startableWorkflows = await CollectStartableWorkflowsAsync(context, cancellationToken);
var startableWorkflows = await CollectStartableWorkflowsInternalAsync(context, cancellationToken);
return startableWorkflows.Select(x => new PendingWorkflow(x.WorkflowInstance.Id, x.ActivityId)).ToList();
}

private async Task<IDistributedSynchronizationHandle> AcquireLockAsync(string resource, CancellationToken cancellationToken)
{
var handle = await _distributedLockProvider.AcquireLockAsync(resource, _elsaOptions.DistributedLockTimeout, cancellationToken);

if (handle == null)
throw new LockAcquisitionException($"Failed to acquire a lock on {resource}");

Expand Down

0 comments on commit 26bf218

Please sign in to comment.