Proposed additions to GrainCancellationTokenSource #7977
See also: #8267 |
I spent some time looking into this. The proposed solution to use
My vote to solve this problem goes to
For now, we are working around this problem by creating a task from the incoming CancellationToken.
This is what the helper class looks like:

using System.Diagnostics; // UnreachableException (.NET 7+)
using Orleans;
using Polly;
using Polly.Retry;

public static class CancellableGrainMethodInvoker
{
    /// <summary>
    /// The retry policy to propagate cancellation to the grain
    /// </summary>
    private static readonly Lazy<AsyncRetryPolicy> CancellationRetryPolicy =
        new Lazy<AsyncRetryPolicy>(() => Policy.Handle<Exception>().RetryAsync(3));

    /// <summary>
    /// Invokes a method on a grain (<see cref="global::Orleans.IGrain"/>) with support for cancellation.
    /// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
    /// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
    /// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
    /// </summary>
    /// <param name="grain">The grain that will be invoked</param>
    /// <param name="grainMethod">The grain method that will be invoked</param>
    /// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
    /// <typeparam name="TGrain">The type of grain</typeparam>
    /// <typeparam name="TResult">The type of result</typeparam>
    /// <returns>The result of the grain method</returns>
    /// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
    public static Task<TResult> InvokeAsync<TGrain, TResult>(
        TGrain grain,
        Func<TGrain, GrainCancellationToken, Task<TResult>> grainMethod,
        CancellationToken cancellationToken)
        where TGrain : IGrain =>
        InvokeAsync(
            grain,
            grainMethod,
            static (grain, grainMethod, grainCancellationToken) => grainMethod(grain, grainCancellationToken),
            cancellationToken);

    /// <summary>
    /// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
    /// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
    /// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
    /// </summary>
    /// <param name="grain">The grain that will be invoked</param>
    /// <param name="grainMethod">The grain method that will be invoked</param>
    /// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
    /// <typeparam name="TGrain">The type of grain</typeparam>
    /// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
    public static Task InvokeAsync<TGrain>(
        TGrain grain,
        Func<TGrain, GrainCancellationToken, Task> grainMethod,
        CancellationToken cancellationToken)
        where TGrain : IGrain =>
        InvokeAsync(
            grain,
            grainMethod,
            static (grain, grainMethod, grainCancellationToken) => grainMethod(grain, grainCancellationToken),
            cancellationToken);

    /// <summary>
    /// Invokes a method on a grain (<see cref="global::Orleans.IGrain"/>) with support for cancellation.
    /// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
    /// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
    /// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
    /// </summary>
    /// <param name="grain">The grain that will be invoked</param>
    /// <param name="state">The state to pass in to avoid closures (this allows you to make your callback static)</param>
    /// <param name="grainMethod">The grain method that will be invoked</param>
    /// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
    /// <typeparam name="TGrain">The type of grain</typeparam>
    /// <typeparam name="TState">The type of state</typeparam>
    /// <typeparam name="TResult">The type of result</typeparam>
    /// <returns>The result of the grain method</returns>
    /// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
    public static async Task<TResult> InvokeAsync<TGrain, TState, TResult>(
        TGrain grain,
        TState state,
        Func<TGrain, TState, GrainCancellationToken, Task<TResult>> grainMethod,
        CancellationToken cancellationToken)
        where TGrain : IGrain
    {
        // Immediately throw an OperationCanceledException and do not even invoke the grain if the cancellation token is already cancelled
        cancellationToken.ThrowIfCancellationRequested();

        // Create a task that will throw an OperationCanceledException when the cancellationToken is canceled
        await using var cancellationTokenTaskSource = new CancellationTokenTaskSource(cancellationToken);
        var cancellationTask = cancellationTokenTaskSource.Task;

        // Create a grain cancellation token to communicate a possible cancellation event to the grain
        using var grainCancellationTokenSource = new GrainCancellationTokenSource();
        var grainCancellationToken = grainCancellationTokenSource.Token;

        var grainMethodTask = grainMethod.Invoke(grain, state, grainCancellationToken);
        if (await Task.WhenAny(grainMethodTask, cancellationTask) == grainMethodTask)
        {
            // The grain method completed first, await it to propagate inner exceptions or cancellations
            return await grainMethodTask;
        }

        // This will propagate the cancellation to the grain, which can be running on a different machine
        // Since this is a possible over-the-wire operation, we use a retry policy to subdue transient errors
        await CancellationRetryPolicy.Value.ExecuteAsync(() => grainCancellationTokenSource.Cancel());

        // This will propagate the OperationCanceledException up the call stack
        await cancellationTask;
        throw new UnreachableException("At this point, the OperationCanceledException should have been thrown already");
    }

    /// <summary>
    /// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
    /// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
    /// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
    /// </summary>
    /// <param name="grain">The grain that will be invoked</param>
    /// <param name="state">The state to pass in to avoid closures (this allows you to make your callback static)</param>
    /// <param name="grainMethod">The grain method that will be invoked</param>
    /// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
    /// <typeparam name="TGrain">The type of grain</typeparam>
    /// <typeparam name="TState">The type of state</typeparam>
    /// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
    public static async Task InvokeAsync<TGrain, TState>(
        TGrain grain,
        TState state,
        Func<TGrain, TState, GrainCancellationToken, Task> grainMethod,
        CancellationToken cancellationToken)
        where TGrain : IGrain
    {
        // Immediately throw an OperationCanceledException and do not even invoke the grain if the cancellation token is already cancelled
        cancellationToken.ThrowIfCancellationRequested();

        // Create a task that will throw an OperationCanceledException when the cancellationToken is canceled
        await using var cancellationTokenTaskSource = new CancellationTokenTaskSource(cancellationToken);
        var cancellationTask = cancellationTokenTaskSource.Task;

        // Create a grain cancellation token to communicate a possible cancellation event to the grain
        using var grainCancellationTokenSource = new GrainCancellationTokenSource();
        var grainCancellationToken = grainCancellationTokenSource.Token;

        var grainMethodTask = grainMethod.Invoke(grain, state, grainCancellationToken);
        if (await Task.WhenAny(grainMethodTask, cancellationTask) == grainMethodTask)
        {
            // The grain method completed first, await it to propagate inner exceptions or cancellations
            await grainMethodTask;
            return;
        }

        // This will propagate the cancellation to the grain, which can be running on a different machine
        // Since this is a possible over-the-wire operation, we use a retry policy to subdue transient errors
        await CancellationRetryPolicy.Value.ExecuteAsync(() => grainCancellationTokenSource.Cancel());

        // This will propagate the OperationCanceledException up the call stack
        await cancellationTask;
    }
}

And this is what CancellationTokenTaskSource looks like:

/// <summary>
/// Holds the task for a cancellation token, as well as the token registration. The registration is disposed when this instance is disposed.
/// </summary>
public sealed class CancellationTokenTaskSource : IDisposable, IAsyncDisposable
{
    /// <summary>
    /// The cancellation token registration, if any. This is <c>null</c> if the registration was not necessary.
    /// </summary>
    private readonly CancellationTokenRegistration? _cancellationTokenRegistration;

    /// <summary>
    /// Creates a task for the specified cancellation token, registering with the token if necessary.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token to observe.</param>
    public CancellationTokenTaskSource(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            Task = Task.FromCanceled(cancellationToken);
            return;
        }

        var tcs = new TaskCompletionSource();
        _cancellationTokenRegistration = cancellationToken.Register(
            static (state, innerCancellationToken) => ((TaskCompletionSource?)state)!.SetCanceled(innerCancellationToken),
            tcs
        );
        Task = tcs.Task;
    }

    /// <summary>
    /// Gets the task for the source cancellation token.
    /// </summary>
    public Task Task { get; }

    /// <summary>
    /// Disposes the cancellation token registration, if any.
    /// Note that this may cause <see cref="Task"/> to never complete.
    /// </summary>
    public void Dispose() => _cancellationTokenRegistration?.Dispose();

    /// <summary>
    /// Disposes the cancellation token registration, if any.
    /// Note that this may cause <see cref="Task"/> to never complete.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_cancellationTokenRegistration != null)
        {
            await _cancellationTokenRegistration.Value.DisposeAsync();
        }
    }
}

Finally, this is how the helper class can be used:

// IGrainWithIntegerKey (which extends IGrain) is needed for GetGrain<IMyGrain>(0) below
interface IMyGrain : IGrainWithIntegerKey
{
    Task DoSomethingAsync(GrainCancellationToken grainCancellationToken);
}

class MyGrain : Grain, IMyGrain
{
    public Task DoSomethingAsync(GrainCancellationToken grainCancellationToken)
    {
        // TODO
        return Task.CompletedTask; // placeholder so the example compiles
    }
}

class HelloWorldController
{
    private readonly IGrainFactory _grainFactory;

    public HelloWorldController(IGrainFactory grainFactory)
    {
        _grainFactory = grainFactory;
    }

    public async Task<IActionResult> Index(CancellationToken cancellationToken)
    {
        var myGrain = _grainFactory.GetGrain<IMyGrain>(0);
        await CancellableGrainMethodInvoker.InvokeAsync(
            myGrain,
            static (grain, grainCancellationToken) => grain.DoSomethingAsync(grainCancellationToken),
            cancellationToken);
        return new OkResult(); // placeholder result so the example compiles
    }
}

These overloads of CancellableGrainMethodInvoker.InvokeAsync are designed to avoid allocations: you can pass a state parameter, which is then passed into the lambda, so you don't need closures. This should allow you to make your lambdas static. |
Good point. I had somehow missed that. I guess then it should actually be:

using var cts = new GrainCancellationTokenSource();
using (cancellationToken.Register(
    static cts => ((GrainCancellationTokenSource)cts!).Cancel().Wait(), cts))
{
    await grain.Method(..., cts.Token);
}

Unfortunately, however, this is "sync-over-async". We don't generally expect the
The only way we can get that exception to propagate to where the
If I understand @amoerie's suggestion, the exception would instead propagate to where the grain method is invoked. If that is acceptable, then that could also be achieved as follows:

Task? cancellationTask = null;
using var cts = new GrainCancellationTokenSource();
using (cancellationToken.Register(
    cts => cancellationTask = ((GrainCancellationTokenSource)cts!).Cancel(), cts))
{
    await grain.Method(..., cts.Token);
    if (cancellationTask is not null)
    {
        await cancellationTask;
    }
}

The above approach does, however, require a heap allocation for the lambda passed to the
Another approach (based on @amoerie's suggestion) would be:

using var cts = new GrainCancellationTokenSource();
var task = grain.Method(..., cts.Token); // Invoke the grain method
try
{
    // Wait for the grain method to complete or cancellationToken to be cancelled.
    await task.WaitAsync(cancellationToken);
}
catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken)
{
    // cancellationToken was cancelled, so cancel the GrainCancellationToken and then await the
    // grain method to propagate the exception.
    await cts.Cancel();
    await task;
}

I think this shows that a solution like a |
This is unacceptable for us.
Could it fail if the remote grain has since been deactivated, or gone offline unexpectedly due to network issues?
Are exceptions thrown on the remote grain propagated back to the client if you call GrainCancellationToken.Cancel?
You understand correctly, and this is definitely acceptable for us. This behavior feels like idiomatic C#: an async method that is canceled throws an OperationCanceledException.
Ah, I like how you used Task.WaitAsync to combine the task and the cancellation token. This looks like a good equivalent to my helper class, yes. |
Maybe, I'm not sure. @ReubenBond? I certainly think that it shouldn't fail. i.e., I don't think any delegate registered with the
A method accepting a cancellation token may ignore the cancellation; but it should never cause the
If a remote grain has been deactivated, then we expect any method invoked on that grain to fail; but we don't expect the
A grain method can end prematurely due to either an error occurring (e.g., a network error) or the given
But, I don't know whether this is true of the Orleans runtime and |
I assume you mean if you call
If an exception has been thrown by the remote grain before you call
If you invoke
I suspect in that case the local grain method invocation will be cancelled when the |
Perhaps the best solution for this issue would be for Orleans to permit a cancelable grain method to accept a
That would arguably obviate the need for |
An implementation may want to repeatedly retry cancellations if there is a failure (for example, if it's a longer-running or expensive operation that would really benefit from being cancelled), or it may want to send one retry, or perhaps just attempt to cancel it once and log (or ignore) any failure to send the cancellation. One potential, simple implementation from the Discord, though a more resilient implementation with retries would also be an option if a use case merits that:
|
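The three options described above (retry repeatedly, retry once, or try once and log) can be sketched in plain C#. This is not the Discord snippet referred to above. `CancellationRetry`, `CancelWithRetriesAsync`, `maxAttempts` and `onFailure` are hypothetical names introduced only for illustration, and `cancel` stands in for a call such as `GrainCancellationTokenSource.Cancel()`.

```csharp
using System;
using System.Threading.Tasks;

public static class CancellationRetry
{
    // Hypothetical helper (not part of Orleans): invokes `cancel` up to `maxAttempts`
    // times and reports each failure through `onFailure` instead of throwing.
    // Returns true if one of the attempts completed without an exception.
    public static async Task<bool> CancelWithRetriesAsync(
        Func<Task> cancel,
        int maxAttempts,
        Action<Exception, int> onFailure)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await cancel();
                return true;
            }
            catch (Exception ex)
            {
                // Log (or ignore) the failure, then fall through to the next attempt.
                onFailure(ex, attempt);
            }
        }

        return false;
    }
}
```

With `maxAttempts` set to 1 this is "attempt once and log"; larger values give a bounded number of retries. A production version would probably add a delay between attempts.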
Okay yes, I just noticed that is also mentioned in the docs. But the fact remains that invoking an Orleans grain method from a non-Orleans service (e.g., from a gRPC or HTTP/REST service in ASP.NET) means we must wire up a
i.e., we must wire up an asynchronous cancellation to a synchronous trigger, and that is completely inescapable, i.e., outside our control. We definitely should not be doing "sync-over-async" (as explained here), so that makes our options somewhat limited. I also think it is problematic that the
I can see two problems with the proposed
I think the correct way to do "fire and forget" cancellations would be:

using var cts = new GrainCancellationTokenSource();
using (context.CancellationToken.Register(
    static cts => Task.Run(async () =>
    {
        try
        {
            await ((GrainCancellationTokenSource)cts!).Cancel();
        }
        catch
        {
        }
    }), cts))
{
    await grain.Method(..., cts.Token);
}

We need the delegate to swallow all exceptions so we don't get an unobserved task exception if/when the
I don't think retrying
I think it would be better if each grain cancellation operation were to be independently retried if/when necessary. However, that would require an enhancement to Orleans. Note that the only reason not to do "fire and forget" cancellations is if you want/need to retry failed cancellations, and I suspect that isn't a common requirement. I'd actually be happy to just immediately cancel all client-side grain methods and not bother sending cancellation messages at all, which would be possible with the enhancement I proposed in #8634. Note that this is how REST and gRPC remote calls are cancelled because the HTTP protocol has no way to cancel a request. |
@bill-poole Just as an add-on to your answer, you can also simply use |
Thanks @amoerie. I didn't realize Orleans provided an

using var cts = new GrainCancellationTokenSource();
using (cancellationToken.Register(
    static cts => ((GrainCancellationTokenSource)cts!).Cancel().Ignore(), cts))
{
    await grain.Method(..., cts.Token);
} |
It's good practice to make grain methods cancelable by passing a GrainCancellationToken to each grain method. However, if we are calling a grain method via an Orleans client, then we have a CancellationToken, not a GrainCancellationToken.
Ideally, grain methods would just accept a CancellationToken rather than a GrainCancellationToken, which would make grains more "POCO". However, in the absence of that, either or both of the following two additions would be helpful:
- A GrainCancellationTokenSource.CreateLinkedTokenSource(CancellationToken) method like what exists on the CancellationTokenSource class. This would allow us to more easily create a GrainCancellationTokenSource linked to an existing CancellationToken.
- A conversion from a CancellationToken to a GrainCancellationToken so we can pass an existing CancellationToken directly to a grain method accepting a GrainCancellationToken.
In the absence of either of the above, we are forced to write code like below.
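What follows is not the code from the original post. It is a minimal sketch of the linking behaviour that a CreateLinkedTokenSource-style method would need to provide, written against plain .NET types so that it is self-contained. `LinkedCancellation` and `Link` are hypothetical names, and `cancelAsync` is an assumption standing in for an asynchronous cancel operation such as `GrainCancellationTokenSource.Cancel`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedCancellation
{
    // Hypothetical helper (not part of Orleans): when `token` is cancelled, start
    // `cancelAsync` without blocking the cancelling thread, and observe any failure
    // so that it does not surface as an unobserved task exception.
    public static CancellationTokenRegistration Link(
        CancellationToken token,
        Func<Task> cancelAsync)
    {
        return token.Register(static state =>
        {
            var cancel = (Func<Task>)state!;
            try
            {
                // Reading Exception marks a faulted task as observed.
                cancel().ContinueWith(
                    static t => _ = t.Exception,
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted,
                    TaskScheduler.Default);
            }
            catch
            {
                // cancelAsync threw synchronously; swallow it so that cancelling
                // the source token does not throw.
            }
        }, cancelAsync);
    }
}
```

With Orleans, `cancelAsync` could be `cts.Cancel` for a GrainCancellationTokenSource named `cts`, with the returned registration disposed after the grain call completes and `cts.Token` passed to the grain method, as in the other snippets in this thread. This is a fire-and-forget design: failures to deliver the cancellation are dropped rather than retried.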