Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Default Workflow Runtime Race condition #4362

Merged
merged 4 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ public class ActivityExecutionRecord : Entity
/// <summary>
/// The state of the activity at the time this record is created or last updated.
/// </summary>
public IDictionary<string, object>? ActivityState { get; set; }
public IDictionary<string, object?>? ActivityState { get; set; }

/// <summary>
/// Any additional payload associated with the log record.
/// </summary>
public IDictionary<string, object>? Payload { get; set; }
public IDictionary<string, object?>? Payload { get; set; }

/// <summary>
/// Any outputs provided by the activity.
/// </summary>
public IDictionary<string, object>? Outputs { get; set; }
public IDictionary<string, object?>? Outputs { get; set; }

/// <summary>
/// Gets or sets the exception that occurred during the activity execution.
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/AnswerCallBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ protected AnswerCallBase(string? source = default, int? line = default) : base(s
/// The call control ID to answer. Leave blank when the workflow is driven by an incoming call and you wish to pick up that one.
/// </summary>
[Input(DisplayName = "Call Control ID", Description = "The call control ID of the call to answer.", Category = "Advanced")]
public Input<string?>? CallControlId { get; set; }
public Input<string> CallControlId { get; set; } = default!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);

var request = new AnswerCallRequest
{
Expand Down
12 changes: 6 additions & 6 deletions src/modules/Elsa.Telnyx/Activities/BridgeCallsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ protected BridgeCallsBase(string? source = default, int? line = default) : base(
/// The source call control ID of one of the call to bridge with. Leave empty to use the ambient inbound call control Id, if there is one.
/// </summary>
[Input(DisplayName = "Call Control ID A", Description = "The source call control ID of one of the call to bridge with. Leave empty to use the ambient inbound call control Id, if there is one.")]
public Input<string?>? CallControlIdA { get; set; }
public Input<string> CallControlIdA { get; set; } = default!;

/// <summary>
/// The destination call control ID of the call you want to bridge with.
/// </summary>
[Input(DisplayName = "Call Control ID B", Description = "The destination call control ID of the call you want to bridge with.")]
public Input<string?>? CallControlIdB { get; set; }
public Input<string> CallControlIdB { get; set; } = default!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var callControlIdA = context.GetPrimaryCallControlId(CallControlIdA) ?? throw new Exception("CallControlA is required");
var callControlIdB = context.GetSecondaryCallControlId(CallControlIdB) ?? throw new Exception("CallControlB is required");
var callControlIdA = CallControlIdA.Get(context);
var callControlIdB = CallControlIdB.Get(context);
var request = new BridgeCallsRequest(callControlIdB, ClientState: context.CreateCorrelatingClientState(context.Id));
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

Expand Down Expand Up @@ -80,8 +80,8 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
private async ValueTask ResumeAsync(ActivityExecutionContext context)
{
var payload = context.GetInput<CallBridgedPayload>()!;
var callControlIdA = context.GetPrimaryCallControlId(CallControlIdA);
var callControlIdB = context.GetSecondaryCallControlId(CallControlIdB);
var callControlIdA = CallControlIdA.Get(context);
var callControlIdB = CallControlIdB.Get(context);;

if (payload.CallControlId == callControlIdA) context.SetProperty("CallBridgedPayloadA", payload);
if (payload.CallControlId == callControlIdB) context.SetProperty("CallBridgedPayloadB", payload);
Expand Down
6 changes: 3 additions & 3 deletions src/modules/Elsa.Telnyx/Activities/GatherUsingAudio.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public GatherUsingAudio([CallerFilePath] string? source = default, [CallerLineNu
/// The call control ID of the call from which to gather input. Leave empty to use the ambient call control ID, if there is any.
/// </summary>
[Input(DisplayName = "Call Control ID", Description = "The call control ID of the call from which to gather input. Leave empty to use the ambient call control ID, if there is any.", Category = "Advanced")]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// The URL of a file to be played back at the beginning of each prompt. The URL can point to either a WAV or MP3 file.
Expand Down Expand Up @@ -127,7 +127,7 @@ public async ValueTask BookmarksPersistedAsync(ActivityExecutionContext context)
ValidDigits.Get(context).EmptyToNull()
);

var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required");
var callControlId = CallControlId.Get(context);
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

try
Expand All @@ -144,7 +144,7 @@ public async ValueTask BookmarksPersistedAsync(ActivityExecutionContext context)
/// <inheritdoc />
protected override void Execute(ActivityExecutionContext context)
{
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required");
var callControlId = CallControlId.Get(context);
context.CreateBookmark(new WebhookEventBookmarkPayload(WebhookEventTypes.CallGatherEnded, callControlId), ResumeAsync);
}

Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/GatherUsingSpeak.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public GatherUsingSpeak([CallerFilePath] string? source = default, [CallerLineNu
/// The call control ID of the call from which to gather input. Leave empty to use the ambient call control ID, if there is any.
/// </summary>
[Input(DisplayName = "Call Control ID", Description = "The call control ID of the call from which to gather input. Leave empty to use the ambient call control ID, if there is any.", Category = "Advanced")]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// The language you want spoken.
Expand Down Expand Up @@ -144,7 +144,7 @@ public GatherUsingSpeak([CallerFilePath] string? source = default, [CallerLineNu
/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required");
var callControlId = CallControlId.Get(context);

var request = new GatherUsingSpeakRequest(
Language.Get(context) ?? throw new Exception("Language is required."),
Expand Down
5 changes: 3 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/HangupCallBase.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Extensions;
using Elsa.Telnyx.Client.Models;
using Elsa.Telnyx.Client.Services;
using Elsa.Telnyx.Extensions;
Expand All @@ -23,12 +24,12 @@ protected HangupCallBase(string? source = default, int? line = default) : base(s
/// Unique identifier and token for controlling the call.
/// </summary>
[Input(DisplayName = "Call Control ID", Description = "Unique identifier and token for controlling the call.", Category = "Advanced")]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);
var request = new HangupCallRequest(ClientState: context.CreateCorrelatingClientState());
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

Expand Down
16 changes: 4 additions & 12 deletions src/modules/Elsa.Telnyx/Activities/IncomingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class IncomingCall : Trigger<CallInitiatedPayload>
public IncomingCall([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
{
}

/// <summary>
/// A list of destination numbers to respond to.
/// </summary>
Expand Down Expand Up @@ -67,26 +67,18 @@ private async ValueTask ResumeAsync(ActivityExecutionContext context)
var webhookModel = context.GetInput<TelnyxWebhook>(WebhookSerializerOptions.Create());
var callInitiatedPayload = (CallInitiatedPayload)webhookModel.Data.Payload;

// Correlate workflow with call session ID.
// TODO: Add support for multiple correlation ID keys.
context.WorkflowExecutionContext.CorrelationId = callInitiatedPayload.CallSessionId;

// Associate workflow with inbound call control ID and from number.
context.SetPrimaryCallControlId(callInitiatedPayload.CallControlId);
context.SetFrom(callInitiatedPayload.From);

// Store webhook payload as output.
context.Set(Result, callInitiatedPayload);
Result.Set(context, callInitiatedPayload);

await context.CompleteActivityAsync();
}

private IEnumerable<object> GetBookmarkPayloads(ExpressionExecutionContext context)
{
var from = context.Get(From) ?? ArraySegment<string>.Empty;
var to = context.Get(To) ?? ArraySegment<string>.Empty;
var catchAll = context.Get(CatchAll);

foreach (var phoneNumber in from) yield return new IncomingCallFromBookmarkPayload(phoneNumber);
foreach (var phoneNumber in to) yield return new IncomingCallToBookmarkPayload(phoneNumber);

Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/PlayAudioBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected PlayAudioBase([CallerFilePath] string? source = default, [CallerLineNu
Description = "Unique identifier and token for controlling the call.",
Category = "Advanced"
)]
public Input<string?>? CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// The URL of a file to be played back at the beginning of each prompt. The URL can point to either a WAV or MP3 file.
Expand Down Expand Up @@ -89,7 +89,7 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
ClientState: context.CreateCorrelatingClientState(context.Id)
);

var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

try
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/SpeakTextBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected SpeakTextBase(string? source = default, int? line = default) : base(so
Description = "Unique identifier and token for controlling the call.",
Category = "Advanced"
)]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// The language you want spoken.
Expand Down Expand Up @@ -97,7 +97,7 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
ClientState: context.CreateCorrelatingClientState(context.Id)
);

var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

try
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/StartRecordingBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected StartRecordingBase(string? source = default, int? line = default) : ba
Description = "Unique identifier and token for controlling the call.",
Category = "Advanced"
)]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// When 'dual', final audio file will be stereo recorded with the first leg on channel A, and the rest on channel B.
Expand Down Expand Up @@ -72,7 +72,7 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
ClientState: context.CreateCorrelatingClientState()
);

var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

try
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/StopAudioPlaybackBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected StopAudioPlaybackBase(string? source = default, int? line = default) :
Description = "Unique identifier and token for controlling the call.",
Category = "Advanced"
)]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// Use 'current' to stop only the current audio or 'all' to stop all audios in the queue.
Expand All @@ -44,7 +44,7 @@ protected StopAudioPlaybackBase(string? source = default, int? line = default) :
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var request = new StopAudioPlaybackRequest(Stop.Get(context), context.CreateCorrelatingClientState(context.Id));
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

try
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/StopRecording.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public StopRecording([CallerFilePath] string? source = default, [CallerLineNumbe
Description = "Unique identifier and token for controlling the call.",
Category = "Advanced"
)]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var request = new StopRecordingRequest(context.CreateCorrelatingClientState());
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required");
var callControlId = CallControlId.Get(context);
var telnyxClient = context.GetRequiredService<ITelnyxClient>();

try
Expand Down
4 changes: 2 additions & 2 deletions src/modules/Elsa.Telnyx/Activities/TransferCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public TransferCall([CallerFilePath] string? source = default, [CallerLineNumber
Description = "Unique identifier and token for controlling the call.",
Category = "Advanced"
)]
public Input<string?> CallControlId { get; set; } = default!;
public Input<string> CallControlId { get; set; } = default!;

/// <summary>
/// The DID or SIP URI to dial out and bridge to the given call.
Expand Down Expand Up @@ -129,7 +129,7 @@ private async ValueTask HangupAsync(ActivityExecutionContext context)

private async ValueTask TransferCallAsync(ActivityExecutionContext context)
{
var callControlId = context.GetPrimaryCallControlId(CallControlId) ?? throw new Exception("CallControlId is required.");
var callControlId = CallControlId.Get(context);

var request = new TransferCallRequest(
To.Get(context) ?? throw new Exception("To is required."),
Expand Down
29 changes: 7 additions & 22 deletions src/modules/Elsa.Telnyx/Extensions/ActivityExecutionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Elsa.Extensions;
using Elsa.Telnyx.Models;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Models;

namespace Elsa.Telnyx.Extensions;

Expand All @@ -10,24 +8,11 @@ namespace Elsa.Telnyx.Extensions;
/// </summary>
public static class ActivityExecutionExtensions
{
private const string PrimaryCallControlIdKey = "telnyx:primary-callcontrol-id";
private const string SecondaryCallControlIdKey = "telnyx:secondary-callcontrol-id";
private const string FromKey = "telnyx:from";

public static void SetPrimaryCallControlId(this ActivityExecutionContext context, string value) => context.WorkflowExecutionContext.SetProperty(PrimaryCallControlIdKey, value);
public static string? GetPrimaryCallControlId(this ActivityExecutionContext context) => context.WorkflowExecutionContext.GetProperty<string>(PrimaryCallControlIdKey);
public static string? GetPrimaryCallControlId(this ActivityExecutionContext context, string? callControlId) => string.IsNullOrWhiteSpace(callControlId) ? context.GetPrimaryCallControlId() : callControlId;
public static string? GetPrimaryCallControlId(this ActivityExecutionContext context, Input<string?>? callControlId) => context.GetPrimaryCallControlId(callControlId.GetOrDefault(context));
public static bool HasPrimaryCallControlId(this ActivityExecutionContext context) => context.WorkflowExecutionContext.HasProperty(PrimaryCallControlIdKey);

public static void SetSecondaryCallControlId(this ActivityExecutionContext context, string value) => context.WorkflowExecutionContext.SetProperty(SecondaryCallControlIdKey, value);
public static string? GetSecondaryCallControlId(this ActivityExecutionContext context) => context.WorkflowExecutionContext.GetProperty<string>(SecondaryCallControlIdKey);
public static string? GetSecondaryCallControlId(this ActivityExecutionContext context, string? callControlId) => string.IsNullOrWhiteSpace(callControlId) ? context.GetSecondaryCallControlId() : callControlId;
public static string? GetSecondaryCallControlId(this ActivityExecutionContext context, Input<string?>? callControlId) => context.GetSecondaryCallControlId(callControlId.Get(context));
public static bool HasSecondaryCallControlId(this ActivityExecutionContext context) => context.WorkflowExecutionContext.HasProperty(SecondaryCallControlIdKey);
public static string CreateCorrelatingClientState(this ActivityExecutionContext context, string? activityInstanceId = default) => new ClientStatePayload(context.WorkflowExecutionContext.CorrelationId!, activityInstanceId).ToBase64();

public static void SetFrom(this ActivityExecutionContext context, string value) => context.WorkflowExecutionContext.SetProperty(FromKey, value);
public static string? GetFrom(this ActivityExecutionContext context) => context.WorkflowExecutionContext.GetProperty<string>(FromKey);
public static bool HasFrom(this ActivityExecutionContext context) => context.WorkflowExecutionContext.HasProperty(FromKey);
/// <summary>
/// Creates a correlating client state.
/// </summary>
public static string CreateCorrelatingClientState(this ActivityExecutionContext context, string? activityInstanceId = default)
{
return new ClientStatePayload(context.WorkflowExecutionContext.Id, activityInstanceId).ToBase64();
}
}
Loading
Loading