Skip to content

Commit

Permalink
add OrderByCompletion
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenCleary committed Aug 26, 2017
1 parent eeff91d commit c01321d
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 0 deletions.
86 changes: 86 additions & 0 deletions src/Nito.AsyncEx.Tasks/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -133,5 +134,90 @@ public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> @

return Task.WhenAll(@this);
}

/// <summary>
/// Creates a new collection of tasks that complete in order.
/// </summary>
/// <typeparam name="T">The type of the results of the tasks.</typeparam>
/// <param name="this">The tasks to order by completion. May not be <c>null</c>.</param>
public static List<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> @this)
{
if (@this == null)
throw new ArgumentNullException(nameof(@this));

// This is a combination of Jon Skeet's approach and Stephen Toub's approach:
// http://msmvps.com/blogs/jon_skeet/archive/2012/01/16/eduasync-part-19-ordering-by-completion-ahead-of-time.aspx
// http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx

// Reify the source task sequence. TODO: better reification.
var taskArray = @this.ToArray();

// Allocate a TCS array and an array of the resulting tasks.
var numTasks = taskArray.Length;
var tcs = new TaskCompletionSource<T>[numTasks];
var ret = new List<Task<T>>(numTasks);

// As each task completes, complete the next tcs.
var lastIndex = -1;
// ReSharper disable once ConvertToLocalFunction
Action<Task<T>> continuation = task =>
{
var index = Interlocked.Increment(ref lastIndex);
tcs[index].TryCompleteFromCompletedTask(task);
};

// Fill out the arrays and attach the continuations.
for (var i = 0; i != numTasks; ++i)
{
tcs[i] = new TaskCompletionSource<T>();
ret.Add(tcs[i].Task);
taskArray[i].ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}

return ret;
}

/// <summary>
/// Creates a new collection of tasks that complete in order.
/// </summary>
/// <param name="this">The tasks to order by completion. May not be <c>null</c>.</param>
public static List<Task> OrderByCompletion(this IEnumerable<Task> @this)
{
if (@this == null)
throw new ArgumentNullException(nameof(@this));

// This is a combination of Jon Skeet's approach and Stephen Toub's approach:
// http://msmvps.com/blogs/jon_skeet/archive/2012/01/16/eduasync-part-19-ordering-by-completion-ahead-of-time.aspx
// http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx

// Reify the source task sequence. TODO: better reification.
var taskArray = @this.ToArray();

// Allocate a TCS array and an array of the resulting tasks.
var numTasks = taskArray.Length;
var tcs = new TaskCompletionSource<object>[numTasks];
var ret = new List<Task>(numTasks);

// As each task completes, complete the next tcs.
var lastIndex = -1;
// ReSharper disable once ConvertToLocalFunction
Action<Task> continuation = task =>
{
var index = Interlocked.Increment(ref lastIndex);
tcs[index].TryCompleteFromCompletedTask(task, NullResultFunc);
};

// Fill out the arrays and attach the continuations.
for (var i = 0; i != numTasks; ++i)
{
tcs[i] = new TaskCompletionSource<object>();
ret.Add(tcs[i].Task);
taskArray[i].ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}

return ret;
}

private static Func<object> NullResultFunc { get; } = () => null;
}
}
102 changes: 102 additions & 0 deletions test/AsyncEx.Tasks.UnitTests/TaskExtensionsUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,108 @@ public async Task WhenAll_TaskCompletes_CompletesTask()
await task;
}

[Fact]
public async Task OrderByCompletion_OrdersByCompletion()
{
var tcs = new TaskCompletionSource<int>[] {new TaskCompletionSource<int>(), new TaskCompletionSource<int>()};
var results = tcs.Select(x => x.Task).OrderByCompletion();

Assert.False(results[0].IsCompleted);
Assert.False(results[1].IsCompleted);

tcs[1].SetResult(13);
var result0 = await results[0];
Assert.False(results[1].IsCompleted);
Assert.Equal(13, result0);

tcs[0].SetResult(17);
var result1 = await results[1];
Assert.Equal(13, result0);
Assert.Equal(17, result1);
}

[Fact]
public async Task OrderByCompletion_PropagatesFaultOnFirstCompletion()
{
var tcs = new TaskCompletionSource<int>[] {new TaskCompletionSource<int>(), new TaskCompletionSource<int>()};
var results = tcs.Select(x => x.Task).OrderByCompletion();

tcs[1].SetException(new InvalidOperationException("test message"));
try
{
await results[0];
}
catch (InvalidOperationException ex)
{
Assert.Equal("test message", ex.Message);
return;
}

Assert.True(false);
}

[Fact]
public async Task OrderByCompletion_PropagatesFaultOnSecondCompletion()
{
var tcs = new TaskCompletionSource<int>[] {new TaskCompletionSource<int>(), new TaskCompletionSource<int>()};
var results = tcs.Select(x => x.Task).OrderByCompletion();

tcs[0].SetResult(13);
tcs[1].SetException(new InvalidOperationException("test message"));
await results[0];
try
{
await results[1];
}
catch (InvalidOperationException ex)
{
Assert.Equal("test message", ex.Message);
return;
}

Assert.True(false);
}

[Fact]
public async Task OrderByCompletion_PropagatesCancelOnFirstCompletion()
{
var tcs = new TaskCompletionSource<int>[] {new TaskCompletionSource<int>(), new TaskCompletionSource<int>()};
var results = tcs.Select(x => x.Task).OrderByCompletion();

tcs[1].SetCanceled();
try
{
await results[0];
}
catch (OperationCanceledException)
{
return;
}

Assert.True(false);
}

[Fact]
public async Task OrderByCompletion_PropagatesCancelOnSecondCompletion()
{
var tcs = new TaskCompletionSource<int>[] {new TaskCompletionSource<int>(), new TaskCompletionSource<int>()};
var results = tcs.Select(x => x.Task).OrderByCompletion();

tcs[0].SetResult(13);
tcs[1].SetCanceled();
await results[0];
try
{
await results[1];
}
catch (OperationCanceledException)
{
return;
}

Assert.True(false);
}

private static CancellationToken GetCancellationTokenFromTask(Task task)
{
try
Expand Down

0 comments on commit c01321d

Please sign in to comment.