Skip to content

Commit

Permalink
Add workflow update RPC timeout/canceled exception (temporalio#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Jun 21, 2024
1 parent 6f54944 commit 8b430c4
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 31 deletions.
14 changes: 12 additions & 2 deletions src/Temporalio/Client/TemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,18 @@ public override async Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInp
UpdateWorkflowExecutionResponse resp;
do
{
resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
try
{
resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
}
catch (Exception e) when (e is OperationCanceledException ||
(e is RpcException rpcErr && (
rpcErr.Code == RpcException.StatusCode.DeadlineExceeded ||
rpcErr.Code == RpcException.StatusCode.Cancelled)))
{
throw new WorkflowUpdateRpcTimeoutOrCanceledException(e);
}
}
while (resp.Stage < req.WaitPolicy.LifecycleStage &&
resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted);
Expand Down
5 changes: 2 additions & 3 deletions src/Temporalio/Client/WorkflowHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,9 @@ public record WorkflowHandle(
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync(
public async Task<WorkflowUpdateHandle> StartUpdateAsync(
string update, IReadOnlyCollection<object?> args, WorkflowUpdateStartOptions options) =>
StartUpdateAsync<ValueTuple>(update, args, options).ContinueWith<WorkflowUpdateHandle>(
t => t.Result, TaskScheduler.Current);
await StartUpdateAsync<ValueTuple>(update, args, options).ConfigureAwait(false);

/// <summary>
/// Start a workflow update using its name.
Expand Down
24 changes: 17 additions & 7 deletions src/Temporalio/Client/WorkflowUpdateHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,25 @@ internal async Task PollUntilOutcomeAsync(RpcOptions? rpcOptions = null)
// Continually retry to poll while we get an empty response
while (KnownOutcome == null)
{
var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync(
req, rpcOptions).ConfigureAwait(false);
try
{
var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync(
req, rpcOptions).ConfigureAwait(false);
#pragma warning disable CA1508
// .NET incorrectly assumes KnownOutcome cannot be null here because they assume a
// single thread. We accept there is technically a race condition here since this is
// not an atomic CAS operation, but outcome is the same server side for the same
// update.
KnownOutcome ??= resp.Outcome;
// .NET incorrectly assumes KnownOutcome cannot be null here because they assume
// a single thread. We accept there is technically a race condition here since
// this is not an atomic CAS operation, but outcome is the same server side for
// the same update.
KnownOutcome ??= resp.Outcome;
#pragma warning restore CA1508
}
catch (Exception e) when (e is OperationCanceledException ||
(e is RpcException rpcErr && (
rpcErr.Code == RpcException.StatusCode.DeadlineExceeded ||
rpcErr.Code == RpcException.StatusCode.Cancelled)))
{
throw new WorkflowUpdateRpcTimeoutOrCanceledException(e);
}
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/Temporalio/Exceptions/RpcTimeoutOrCanceledException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

namespace Temporalio.Exceptions
{
/// <summary>
/// Exception representing timeout or cancellation on some client calls.
/// </summary>
public class RpcTimeoutOrCanceledException : TemporalException
{
/// <summary>
/// Initializes a new instance of the <see cref="RpcTimeoutOrCanceledException"/> class.
/// </summary>
/// <param name="message">Message.</param>
/// <param name="inner">Cause of the exception.</param>
public RpcTimeoutOrCanceledException(string message, Exception? inner)
: base(message, inner)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;

namespace Temporalio.Exceptions
{
/// <summary>
/// Exception representing when an update RPC timeout or cancellation. This is not to be
/// confused with an update itself timing out or being canceled, this is just the call.
/// </summary>
public class WorkflowUpdateRpcTimeoutOrCanceledException : RpcTimeoutOrCanceledException
{
/// <summary>
/// Initializes a new instance of the
/// <see cref="WorkflowUpdateRpcTimeoutOrCanceledException"/> class.
/// </summary>
/// <param name="inner">Cause of the exception.</param>
public WorkflowUpdateRpcTimeoutOrCanceledException(Exception? inner)
: base("Timeout or cancellation waiting for update", inner)
{
}
}
}
57 changes: 38 additions & 19 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3890,25 +3890,6 @@ public async Task ExecuteWorkflowAsync_Updates_DuplicateMemoized()
});
}

[Fact]
public async Task ExecuteWorkflowAsync_Updates_GetResultCancellation()
{
await ExecuteWorkerAsync<UpdateWorkflow>(async worker =>
{
// Start the workflow
var handle = await Env.Client.StartWorkflowAsync(
(UpdateWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
var updateHandle = await handle.StartUpdateAsync(
wf => wf.DoUpdateLongWaitAsync(), new(WorkflowUpdateStage.Accepted));
// Ask for the result but only for 1 second
using var tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(TimeSpan.FromSeconds(1));
await Assert.ThrowsAsync<OperationCanceledException>(() =>
updateHandle.GetResultAsync(new() { CancellationToken = tokenSource.Token }));
});
}

[Fact]
public async Task ExecuteWorkflowAsync_Updates_ValidatorCreatesCommands()
{
Expand Down Expand Up @@ -3994,6 +3975,44 @@ await foreach (var evt in handle.FetchHistoryEventsAsync())
});
}

[Fact]
public async Task ExecuteWorkflowAsync_Updates_ClientCanceled()
{
await ExecuteWorkerAsync<UpdateWorkflow>(async worker =>
{
var handle = await Env.Client.StartWorkflowAsync(
(UpdateWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
using var updateCancelSource = new CancellationTokenSource();
// Start update that waits until started and confirm it reached workflow
var updateTask = Task.Run(
() => handle.ExecuteUpdateAsync(
wf => wf.DoUpdateLongWaitAsync(),
new() { Rpc = new() { CancellationToken = updateCancelSource.Token } }));
await AssertMore.EqualEventuallyAsync(true, () => handle.QueryAsync(wf => wf.Waiting));
// Cancel and confirm error
updateCancelSource.Cancel();
await Assert.ThrowsAsync<WorkflowUpdateRpcTimeoutOrCanceledException>(() => updateTask);
});
}

[Fact]
public async Task ExecuteWorkflowAsync_Updates_ClientTimedOut()
{
// Don't even need to run worker for this one
var handle = await Env.Client.StartWorkflowAsync(
(UpdateWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: $"tq-{Guid.NewGuid()}"));

// Run update that times out
await Assert.ThrowsAsync<WorkflowUpdateRpcTimeoutOrCanceledException>(() =>
handle.ExecuteUpdateAsync(
wf => wf.DoUpdateLongWaitAsync(),
new() { Rpc = new() { Timeout = TimeSpan.FromMilliseconds(50) } }));
}

[Workflow]
public class ImmediatelyCompleteUpdateAndWorkflow
{
Expand Down

0 comments on commit 8b430c4

Please sign in to comment.