Skip to content

Commit

Permalink
Add timeout and cancellation tokens to AsyncDelegatePump (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shane32 committed Jul 19, 2023
1 parent ef8e7a2 commit e652aa3
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 60 deletions.
2 changes: 2 additions & 0 deletions src/AsyncResetEvents/AsyncAutoResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async Task<bool> WaitOrFalseAsync(Task task, int millisecondsTimeout, Cancellati
#endif
}

#if !NET6_0_OR_GREATER
private sealed class TaskContinuationState
{
public Task Task { get; }
Expand All @@ -138,6 +139,7 @@ public TaskContinuationState(Task task, AsyncAutoResetEvent asyncAutoResetEvent)
AsyncAutoResetEvent = asyncAutoResetEvent;
}
}
#endif

/// <summary>
/// Returns a task that will complete when the reset event has been signaled.
Expand Down
106 changes: 54 additions & 52 deletions src/AsyncResetEvents/AsyncDelegatePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,74 +4,76 @@ namespace Shane32.AsyncResetEvents;
/// An asynchronous delegate pump, where queued asynchronous delegates are
/// executed in order.
/// </summary>
public class AsyncDelegatePump : AsyncMessagePump<Func<Task>>
public class AsyncDelegatePump : AsyncMessagePump<IDelegateTuple>
{
/// <summary>
/// Initializes a new instance.
/// </summary>
public AsyncDelegatePump() : base(d => d())
public AsyncDelegatePump() : base(static info => info.ExecuteAsync())
{
}

/// <summary>
/// Executes a delegate in order and returns its result.
/// </summary>
public Task SendAsync(Func<Task> action)
{
#if NET5_0_OR_GREATER
var tcs = new TaskCompletionSource();
#else
var tcs = new TaskCompletionSource<byte>();
#endif
Post(() => {
Task task;
try {
task = action();
} catch (Exception ex) {
tcs.SetException(ex);
throw;
}
task.ContinueWith(task2 => {
if (task2.IsFaulted)
tcs.SetException(task2.Exception!.GetBaseException());
else if (task2.IsCanceled)
tcs.SetCanceled();
else
#if NET5_0_OR_GREATER
tcs.SetResult();
#else
tcs.SetResult(0);
#endif
}, TaskContinuationOptions.ExecuteSynchronously);
return task;
});
return tcs.Task;
}
=> SendAsync(action, Timeout.InfiniteTimeSpan, default);

/// <summary>
/// Executes a delegate in order and returns its result.
/// If the cancellation token is signaled before the delegate
/// starts executing, the delegate will not be started, and the
/// task will be canceled.
/// </summary>
public Task SendAsync(Func<Task> action, CancellationToken cancellationToken)
=> SendAsync(action, Timeout.InfiniteTimeSpan, cancellationToken);

/// <summary>
/// Executes a delegate in order and returns its result.
/// If the cancellation token is signaled before the delegate
/// starts executing, the delegate will not be started, and the
/// task will be canceled. If the timeout is reached before the
/// delegate starts executing, the delegate will not be started,
/// and the task will throw <see cref="TimeoutException"/>.
/// </summary>
public Task SendAsync(Func<Task> action, TimeSpan timeout, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var info = new DelegateTuple(action, timeout, cancellationToken);
Post(info);
return info.Task;
}

/// <inheritdoc cref="SendAsync(Func{Task})"/>
public Task<T> SendAsync<T>(Func<Task<T>> action)
=> SendAsync(action, Timeout.InfiniteTimeSpan, default);

/// <inheritdoc cref="SendAsync(Func{Task}, CancellationToken)"/>
public Task<T> SendAsync<T>(Func<Task<T>> action, CancellationToken cancellationToken)
=> SendAsync(action, Timeout.InfiniteTimeSpan, cancellationToken);

/// <inheritdoc cref="SendAsync(Func{Task}, TimeSpan, CancellationToken)"/>
public Task<T> SendAsync<T>(Func<Task<T>> action, TimeSpan timeout, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var info = new DelegateTuple<T>(action, timeout, cancellationToken);
Post(info);
return info.Task;
}

/// <inheritdoc cref="AsyncMessagePump{T}.Post(T)"/>
public void Post(Func<Task> action)
{
Post(new SimpleTuple(action));
}

private class SimpleTuple : IDelegateTuple
{
var tcs = new TaskCompletionSource<T>();
Post(() => {
Task<T> task;
try {
task = action();
} catch (Exception ex) {
tcs.SetException(ex);
throw;
}
task.ContinueWith(task2 => {
if (task2.IsFaulted)
tcs.SetException(task2.Exception!.GetBaseException());
else if (task2.IsCanceled)
tcs.SetCanceled();
else
tcs.SetResult(task2.Result);
}, TaskContinuationOptions.ExecuteSynchronously);
return task;
});
return tcs.Task;
private readonly Func<Task> _action;
public SimpleTuple(Func<Task> action)
{
_action = action;
}
public Task ExecuteAsync() => _action();
}
}
14 changes: 6 additions & 8 deletions src/AsyncResetEvents/AsyncMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ private struct MessageTuple
private TaskCompletionSource<bool>? _drainTask;
#endif

#if NETSTANDARD1_0
private static readonly Task _completedTask = Task.FromResult<byte>(0);
#endif

/// <summary>
/// Initializes a new instances with the specified asynchronous callback delegate.
/// </summary>
Expand All @@ -55,7 +51,7 @@ public AsyncMessagePump(Action<T> callback)
_callback = message => {
callback(message);
#if NETSTANDARD1_0
return _completedTask;
return DelegateTuple.CompletedTask;
#else
return Task.CompletedTask;
#endif
Expand Down Expand Up @@ -123,7 +119,9 @@ private async void CompleteAsync()
var message = messageTuple.Delegate != null
? await messageTuple.Delegate.ConfigureAwait(false)
: messageTuple.Value!;
await _callback(message).ConfigureAwait(false);
var callbackTask = _callback(message);
if (callbackTask != null)
await callbackTask.ConfigureAwait(false);
} catch (Exception ex) {
try {
await HandleErrorAsync(ex).ConfigureAwait(false);
Expand Down Expand Up @@ -165,7 +163,7 @@ public Task DrainAsync()
lock (_queue) {
if (_queue.Count == 0)
#if NETSTANDARD1_0
return _completedTask;
return DelegateTuple.CompletedTask;
#else
return Task.CompletedTask;
#endif
Expand All @@ -179,7 +177,7 @@ public Task DrainAsync()
/// </summary>
protected virtual Task HandleErrorAsync(Exception exception)
#if NETSTANDARD1_0
=> _completedTask;
=> DelegateTuple.CompletedTask;
#else
=> Task.CompletedTask;
#endif
Expand Down
110 changes: 110 additions & 0 deletions src/AsyncResetEvents/DelegateTuple.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
namespace Shane32.AsyncResetEvents;

/// <summary>
/// Holds state information for <see cref="AsyncDelegatePump"/>.
/// </summary>
#pragma warning disable CA1001 // Types that own disposable fields should be disposable
internal sealed class DelegateTuple : IDelegateTuple
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
private readonly Func<Task> _action;

#if NET5_0_OR_GREATER
private readonly TaskCompletionSource _taskCompletionSource = new();
#else
private readonly TaskCompletionSource<byte> _taskCompletionSource = new();
#endif

private readonly CancellationTokenRegistration? _cancellationTokenRegistration;
#if !NETSTANDARD1_0
private readonly Timer? _timer;
#endif
private int _started;

#if NETSTANDARD1_0
internal static readonly Task CompletedTask = Task.FromResult<byte>(0);
#endif

internal Task Task => _taskCompletionSource.Task;

public DelegateTuple(Func<Task> action, TimeSpan timeout, CancellationToken cancellationToken)
{
if (timeout == TimeSpan.Zero) {
throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be greater than zero.");
}

_action = action;

if (cancellationToken.CanBeCanceled) {
// register cancellation callback that will cancel the task if it hasn't started yet
_cancellationTokenRegistration = cancellationToken.Register(static state => {
var info = (DelegateTuple)state!;
if (Interlocked.Exchange(ref info._started, 1) == 0) {
#if !NETSTANDARD1_0
info._timer?.Dispose();
#endif
// we don't need to dispose the registration because it's already been invoked
info._taskCompletionSource.SetCanceled();
}
}, this);
}

if (timeout != Timeout.InfiniteTimeSpan) {
#if !NETSTANDARD1_0
_timer = new Timer(static state => {
var info = (DelegateTuple)state!;
if (Interlocked.Exchange(ref info._started, 1) == 0) {
info._timer?.Dispose();
info._cancellationTokenRegistration?.Dispose();
info._taskCompletionSource.SetException(new TimeoutException());
}
}, this, timeout, Timeout.InfiniteTimeSpan);
#else
Task.Delay(timeout, default).ContinueWith(static (t, state) => {
var info = (DelegateTuple)state!;
if (Interlocked.Exchange(ref info._started, 1) == 0) {
info._cancellationTokenRegistration?.Dispose();
info._taskCompletionSource.SetException(new TimeoutException());
}
}, this, default(CancellationToken));
#endif
}
}

public Task ExecuteAsync()
{
if (Interlocked.Exchange(ref _started, 1) == 1) {
// already canceled, so do nothing
#if NETSTANDARD1_0
return CompletedTask;
#else
return Task.CompletedTask;
#endif
}
#if !NETSTANDARD1_0
_timer?.Dispose();
#endif
_cancellationTokenRegistration?.Dispose();
Task task;
try {
task = _action.Invoke();
} catch (Exception ex) {
_taskCompletionSource.SetException(ex);
throw;
}
task.ContinueWith(static (task2, state) => {
var info = (DelegateTuple)state!;
if (task2.IsFaulted)
info._taskCompletionSource.SetException(task2.Exception!.GetBaseException());
else if (task2.IsCanceled)
info._taskCompletionSource.SetCanceled();
else
#if NET5_0_OR_GREATER
info._taskCompletionSource.SetResult();
#else
info._taskCompletionSource.SetResult(0);
#endif
}, this, TaskContinuationOptions.ExecuteSynchronously);
return task;
}
}
Loading

0 comments on commit e652aa3

Please sign in to comment.