Skip to content

Commit

Permalink
Remove redundant Trigger parameter from workflow selection
Browse files Browse the repository at this point in the history
  • Loading branch information
sfmskywalker committed Jul 12, 2021
1 parent 0850f0a commit cbfb21c
Show file tree
Hide file tree
Showing 24 changed files with 39 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ private async Task TriggerWorkflowsAsync(Message message, CancellationToken canc
};

var bookmark = CreateBookmark(message);
var trigger = CreateTrigger(message);
var launchContext = new CollectWorkflowsContext(ActivityType, bookmark, trigger, correlationId);
var launchContext = new CollectWorkflowsContext(ActivityType, bookmark, correlationId);

await _workflowLaunchpad.UseServiceAsync(service => service.CollectAndDispatchWorkflowsAsync(launchContext, model, cancellationToken));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public class Dispatch : ControllerBase
public async Task<IActionResult> Handle(string eventName, EventModel model)
{
var bookmark = new EventBookmark(eventName.ToLowerInvariant());
var trigger = new EventBookmark(eventName.ToLowerInvariant());
var context = new CollectWorkflowsContext(nameof(EventReceived), bookmark, trigger, model.CorrelationId, model.WorkflowInstanceId);
var context = new CollectWorkflowsContext(nameof(EventReceived), bookmark, model.CorrelationId, model.WorkflowInstanceId);
var pendingWorkflows = await _workflowLaunchpad.CollectAndDispatchWorkflowsAsync(context, model);

return Accepted(pendingWorkflows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public class Execute : ControllerBase
public async Task<IActionResult> Handle(string eventName, EventModel model)
{
var bookmark = new EventBookmark(eventName);
var trigger = new EventBookmark(eventName);
var context = new CollectWorkflowsContext(nameof(EventReceived), bookmark, trigger, model.CorrelationId, model.WorkflowInstanceId);
var context = new CollectWorkflowsContext(nameof(EventReceived), bookmark, model.CorrelationId, model.WorkflowInstanceId);
var pendingWorkflows = await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(context, model);

return Accepted(pendingWorkflows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public class Dispatch : ControllerBase
public async Task<IActionResult> Handle(string taskName, TaskResultModel model)
{
var bookmark = new TaskBookmark(taskName.ToLowerInvariant());
var trigger = new TaskBookmark(taskName.ToLowerInvariant());
var context = new CollectWorkflowsContext(nameof(RunTask), bookmark, trigger, model.CorrelationId, model.WorkflowInstanceId);
var context = new CollectWorkflowsContext(nameof(RunTask), bookmark, model.CorrelationId, model.WorkflowInstanceId);
var pendingWorkflows = await _workflowLaunchpad.CollectAndDispatchWorkflowsAsync(context, model);

return Accepted(pendingWorkflows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ public class Execute : ControllerBase
public async Task<IActionResult> Handle(string taskName, TaskResultModel model)
{
var bookmark = new TaskBookmark(taskName.ToLowerInvariant());
var trigger = new TaskBookmark(taskName.ToLowerInvariant());
var context = new CollectWorkflowsContext(nameof(RunTask), bookmark, trigger, model.CorrelationId, model.WorkflowInstanceId);
var context = new CollectWorkflowsContext(nameof(RunTask), bookmark, model.CorrelationId, model.WorkflowInstanceId);
var pendingWorkflows = await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(context, model);

return Accepted(pendingWorkflows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ namespace Elsa.Activities.Entity.Bookmarks
{
public class EntityChangedBookmark : IBookmark
{
public EntityChangedBookmark(string? entityName, EntityChangedAction? action, string? contextId, string? correlationId)
public EntityChangedBookmark(string? entityName, EntityChangedAction? action, string? contextId)
{
EntityName = entityName;
Action = action;
ContextId = contextId;
CorrelationId = correlationId;
}

public string? EntityName { get; }
public EntityChangedAction? Action { get; }
public string? ContextId { get; }
public string? CorrelationId { get; }
}

public class EntityChangedWorkflowTriggerProvider : BookmarkProvider<EntityChangedBookmark, EntityChanged>
Expand All @@ -27,10 +25,9 @@ public class EntityChangedWorkflowTriggerProvider : BookmarkProvider<EntityChang
new[]
{
Result(new EntityChangedBookmark(
entityName: await context.ReadActivityPropertyAsync(x => x.EntityName, cancellationToken),
action: await context.ReadActivityPropertyAsync(x => x.Action, cancellationToken),
contextId: context.ActivityExecutionContext.WorkflowExecutionContext.WorkflowInstance.ContextId,
correlationId: context.ActivityExecutionContext.WorkflowExecutionContext.WorkflowInstance.CorrelationId
await context.ReadActivityPropertyAsync(x => x.EntityName, cancellationToken),
await context.ReadActivityPropertyAsync(x => x.Action, cancellationToken),
context.ActivityExecutionContext.WorkflowExecutionContext.WorkflowInstance.ContextId
))
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Elsa.Activities.Entity.Extensions
{
public static class WorkflowRunnerExtensions
{
// TODO: Design multi-tenancy
// TODO: Design multi-tenancy.
private const string? TenantId = default;

public static async Task TriggerEntityChangedWorkflowsAsync(
Expand All @@ -23,21 +23,13 @@ public static class WorkflowRunnerExtensions
const string activityType = nameof(EntityChanged);
var input = new EntityChangedContext(entityId, entityName, changedAction);

var trigger = new EntityChangedBookmark(
entityName,
changedAction,
contextId,
null
);

var bookmark = new EntityChangedBookmark(
entityName,
changedAction,
contextId,
correlationId
contextId
);

await workflowDispatcher.DispatchAsync(new TriggerWorkflowsRequest(activityType, bookmark, trigger, input, correlationId, default, contextId, TenantId), cancellationToken);
await workflowDispatcher.DispatchAsync(new TriggerWorkflowsRequest(activityType, bookmark, input, correlationId, default, contextId, TenantId), cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public class HttpEndpointMiddleware
request.TryGetCorrelationId(out var correlationId);

const string activityType = nameof(HttpEndpoint);
var trigger = new HttpEndpointBookmark(path, method);
var bookmark = new HttpEndpointBookmark(path, method);
var collectWorkflowsContext = new CollectWorkflowsContext(activityType, bookmark, trigger, correlationId, default, default, TenantId);
var collectWorkflowsContext = new CollectWorkflowsContext(activityType, bookmark, correlationId, default, default, TenantId);
var pendingWorkflows = await workflowLaunchpad.CollectWorkflowsAsync(collectWorkflowsContext, cancellationToken).ToList();

if (!pendingWorkflows.Any())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Elsa.Activities.MassTransit.Bookmarks
public class MessageReceivedBookmark : IBookmark
{
public string MessageType { get; set; } = default!;
public string? CorrelationId { get; set; }
}

public class MessageReceivedTriggerProvider : BookmarkProvider<MessageReceivedBookmark, ReceiveMassTransitMessage>
Expand All @@ -18,8 +17,7 @@ public class MessageReceivedTriggerProvider : BookmarkProvider<MessageReceivedBo
{
Result(new MessageReceivedBookmark
{
MessageType = (await context.ReadActivityPropertyAsync(x => x.MessageType, cancellationToken))!.Name,
CorrelationId = context.ActivityExecutionContext.WorkflowExecutionContext.CorrelationId
MessageType = (await context.ReadActivityPropertyAsync(x => x.MessageType, cancellationToken))!.Name
})
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,13 @@ public async Task Consume(ConsumeContext<T> context)
break;

var bookmark = new MessageReceivedBookmark
{
MessageType = message.GetType().Name,
CorrelationId = correlationId.ToString()
};
var trigger = new MessageReceivedBookmark
{
MessageType = message.GetType().Name
};

await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new CollectWorkflowsContext(
nameof(ReceiveMassTransitMessage),
bookmark,
trigger,
correlationId.ToString()
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Elsa.Activities.Rebus.Bookmarks
public class MessageReceivedBookmark : IBookmark
{
public string MessageType { get; set; } = default!;
public string? CorrelationId { get; set; }
}

public class MessageReceivedTriggerProvider : BookmarkProvider<MessageReceivedBookmark, RebusMessageReceived>
Expand All @@ -18,8 +17,7 @@ public class MessageReceivedTriggerProvider : BookmarkProvider<MessageReceivedBo
{
Result(new MessageReceivedBookmark
{
MessageType = (await context.ReadActivityPropertyAsync(x => x.MessageType, cancellationToken))!.Name,
CorrelationId = context.ActivityExecutionContext.WorkflowExecutionContext.CorrelationId
MessageType = (await context.ReadActivityPropertyAsync(x => x.MessageType, cancellationToken))!.Name
})
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public async Task Handle(T message)
var correlationId = MessageContext.Current.TransportMessage.Headers.GetValueOrNull(Headers.CorrelationId);
await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new CollectWorkflowsContext(
nameof(RebusMessageReceived),
new MessageReceivedBookmark { MessageType = message.GetType().Name, CorrelationId = correlationId },
new MessageReceivedBookmark { MessageType = message.GetType().Name },
correlationId,
default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ public async Task Handle(TelnyxWebhookReceived notification, CancellationToken c
return;

var correlationId = GetCorrelationId(receivedPayload);
var trigger = CreateBookmark();
var bookmark = CreateBookmark();
var context = new CollectWorkflowsContext(ActivityTypeName, bookmark, trigger, correlationId);
var context = new CollectWorkflowsContext(ActivityTypeName, bookmark, correlationId);
await _workflowLaunchpad.CollectAndDispatchWorkflowsAsync(context, receivedPayload, cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public async Task Handle(TelnyxWebhookReceived notification, CancellationToken c

var correlationId = GetCorrelationId(payload);
var bookmark = new NotificationBookmark(eventType);
var trigger = new NotificationBookmark(eventType);
var context = new CollectWorkflowsContext(activityType, bookmark, trigger, correlationId);
var context = new CollectWorkflowsContext(activityType, bookmark, correlationId);

await _workflowLaunchpad.CollectAndDispatchWorkflowsAsync(context, webhook, cancellationToken);
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Elsa.Abstractions/Services/Dispatch/Models.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Elsa.Services
{
public record TriggerWorkflowsRequest(string ActivityType, IBookmark Bookmark, IBookmark Trigger, object? Input = default, string? CorrelationId = default, string? WorkflowInstanceId = default, string? ContextId = default, string? TenantId = default);
public record TriggerWorkflowsRequest(string ActivityType, IBookmark Bookmark, object? Input = default, string? CorrelationId = default, string? WorkflowInstanceId = default, string? ContextId = default, string? TenantId = default);

public record ExecuteWorkflowDefinitionRequest(string WorkflowDefinitionId, string? ActivityId = default, object? Input = default, string? CorrelationId = default, string? ContextId = default, string? TenantId = default);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public interface IWorkflowLaunchpad
Task<IEnumerable<CollectedWorkflow>> CollectAndDispatchWorkflowsAsync(CollectWorkflowsContext context, object? input = default, CancellationToken cancellationToken = default);
}

public record CollectWorkflowsContext(string ActivityType, IBookmark? Bookmark, IBookmark? Trigger = default, string? CorrelationId = default, string? WorkflowInstanceId = default, string? ContextId = default, string? TenantId = default);
public record CollectWorkflowsContext(string ActivityType, IBookmark? Bookmark, string? CorrelationId = default, string? WorkflowInstanceId = default, string? ContextId = default, string? TenantId = default);

public record CollectStartableWorkflowsContext(string WorkflowDefinitionId, string? ActivityId = default, string? CorrelationId = default, string? ContextId = default, string? TenantId = default);
}
28 changes: 15 additions & 13 deletions src/core/Elsa.Core/Activities/Signaling/Services/Signaler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string sign
return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new CollectWorkflowsContext(
nameof(SignalReceived),
new SignalReceivedBookmark { Signal = normalizedSignal },
new SignalReceivedBookmark { Signal = normalizedSignal },
correlationId,
workflowInstanceId,
default,
Expand All @@ -53,17 +52,20 @@ public async Task<IEnumerable<CollectedWorkflow>> DispatchSignalTokenAsync(strin
return await DispatchSignalAsync(signal.Name, input, signal.WorkflowInstanceId, cancellationToken: cancellationToken);
}

public async Task<IEnumerable<CollectedWorkflow>> DispatchSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default) =>
await _workflowLaunchpad.CollectAndDispatchWorkflowsAsync(new CollectWorkflowsContext(
nameof(SignalReceived),
new SignalReceivedBookmark { Signal = signal },
new SignalReceivedBookmark { Signal = signal },
correlationId,
workflowInstanceId,
default,
TenantId
),
new Signal(signal, input),
cancellationToken);
public async Task<IEnumerable<CollectedWorkflow>> DispatchSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
{
var normalizedSignal = signal.ToLowerInvariant();

return await _workflowLaunchpad.CollectAndDispatchWorkflowsAsync(new CollectWorkflowsContext(
nameof(SignalReceived),
new SignalReceivedBookmark { Signal = normalizedSignal },
correlationId,
workflowInstanceId,
default,
TenantId
),
new Signal(normalizedSignal, input),
cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ public async Task Handle(TriggerWorkflowsRequest message)
{
var pendingWorkflows = await _workflowLaunchpad.CollectWorkflowsAsync(new CollectWorkflowsContext(
message.ActivityType,
message.Bookmark,
message.Trigger,
message.Bookmark,
message.CorrelationId,
message.WorkflowInstanceId,
message.ContextId,
Expand Down
2 changes: 1 addition & 1 deletion src/core/Elsa.Core/Services/Workflows/WorkflowLaunchpad.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private async Task<IEnumerable<StartableWorkflow>> CollectStartableWorkflowsInte
{
_logger.LogDebug("Triggering workflows using {ActivityType}", context.ActivityType);

var filter = context.Trigger ?? context.Bookmark;
var filter = context.Bookmark;
var triggers = filter != null ? (await _triggerFinder.FindTriggersAsync(context.ActivityType, filter, context.TenantId, cancellationToken)).ToList() : new List<TriggerFinderResult>();
var startableWorkflows = new List<StartableWorkflow>();

Expand Down
2 changes: 1 addition & 1 deletion src/server/Elsa.Server.Api/Endpoints/Workflows/Models.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public record TriggeredWorkflowModel(string WorkflowInstanceId, string? Activity
public record DispatchTriggerWorkflowsRequestModel(string ActivityType, IBookmark? Bookmark, IBookmark? Trigger, string? CorrelationId, string? WorkflowInstanceId, string? ContextId, object? Input);

public record DispatchTriggerWorkflowsResponseModel(ICollection<CollectedWorkflow> PendingWorkflows);
public record TriggerWorkflowsRequestModel(string ActivityType, IBookmark? Bookmark, IBookmark? Trigger, string? CorrelationId, string? WorkflowInstanceId, string? ContextId, object? Input, bool Dispatch);
public record TriggerWorkflowsRequestModel(string ActivityType, IBookmark? Bookmark, string? CorrelationId, string? WorkflowInstanceId, string? ContextId, object? Input, bool Dispatch);

public record TriggerWorkflowsResponseModel(ICollection<TriggeredWorkflowModel> TriggeredWorkflows);
}
2 changes: 1 addition & 1 deletion src/server/Elsa.Server.Api/Endpoints/Workflows/Trigger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public Trigger(IWorkflowLaunchpad workflowLaunchpad,ITenantAccessor tenantAccess
public async Task<IActionResult> Handle(TriggerWorkflowsRequestModel request, CancellationToken cancellationToken = default)
{
var tenantId = await _tenantAccessor.GetTenantIdAsync(cancellationToken);
var context = new CollectWorkflowsContext(request.ActivityType, request.Bookmark, request.Trigger, request.CorrelationId, request.WorkflowInstanceId, request.ContextId, tenantId);
var context = new CollectWorkflowsContext(request.ActivityType, request.Bookmark, request.CorrelationId, request.WorkflowInstanceId, request.ContextId, tenantId);
ICollection<TriggeredWorkflowModel> triggeredWorkflows;

if (request.Dispatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public class CorrelatedWorkflowDefinitionJob
public async Task ExecuteAsync(TriggerWorkflowsRequest request, CancellationToken cancellationToken = default) => await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new CollectWorkflowsContext(
request.ActivityType,
request.Bookmark,
request.Trigger,
request.CorrelationId,
request.WorkflowInstanceId,
request.ContextId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class CorrelatedWorkflowDefinitionGrain : Grain, ICorrelatedWorkflowGrain
public async Task ExecutedCorrelatedWorkflowAsync(TriggerWorkflowsRequest request, CancellationToken cancellationToken = default) => await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new CollectWorkflowsContext(
request.ActivityType,
request.Bookmark,
request.Trigger,
request.CorrelationId,
request.WorkflowInstanceId,
request.ContextId,
Expand Down

0 comments on commit cbfb21c

Please sign in to comment.