-
Notifications
You must be signed in to change notification settings - Fork 5k
/
TaskFactoryExtensions_Iterate.cs
218 lines (200 loc) · 12.2 KB
/
TaskFactoryExtensions_Iterate.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
//
using System.Collections.Generic;
namespace System.Threading.Tasks
{
public static partial class TaskFactoryExtensions
{
#region No Object State Overloads
/// <summary>
/// Asynchronously iterates through an enumerable of tasks, using the cancellation token,
/// creation options and target scheduler of <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the token, options and scheduler that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    // This overload associates no asynchronous state with the returned task.
    object noState = null;
    var token = factory.CancellationToken;
    var options = factory.CreationOptions;
    var targetScheduler = factory.GetTargetScheduler();

    return Iterate(factory, source, noState, token, options, targetScheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks with an explicit cancellation token,
/// using the creation options and target scheduler of <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the creation options and scheduler that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the iteration.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source,
    CancellationToken cancellationToken)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    // This overload associates no asynchronous state with the returned task.
    object noState = null;
    var options = factory.CreationOptions;
    var targetScheduler = factory.GetTargetScheduler();

    return Iterate(factory, source, noState, cancellationToken, options, targetScheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks with explicit creation options,
/// using the cancellation token and target scheduler of <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the cancellation token and scheduler that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="creationOptions">Options that control the task's behavior.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source,
    TaskCreationOptions creationOptions)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    // This overload associates no asynchronous state with the returned task.
    object noState = null;
    var token = factory.CancellationToken;
    var targetScheduler = factory.GetTargetScheduler();

    return Iterate(factory, source, noState, token, creationOptions, targetScheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks on an explicit scheduler,
/// using the cancellation token and creation options of <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the cancellation token and creation options that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="scheduler">The scheduler to which tasks will be scheduled.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source,
    TaskScheduler scheduler)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    // This overload associates no asynchronous state with the returned task.
    object noState = null;
    var token = factory.CancellationToken;
    var options = factory.CreationOptions;

    return Iterate(factory, source, noState, token, options, scheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks with an explicit cancellation token,
/// creation options and scheduler, and no asynchronous state.
/// </summary>
/// <param name="factory">The target factory.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the iteration.</param>
/// <param name="creationOptions">Options that control the task's behavior.</param>
/// <param name="scheduler">The scheduler to which tasks will be scheduled.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source,
    CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskScheduler scheduler)
{
    // No validation here: the state-taking overload this forwards to checks
    // factory, source and scheduler for null itself.
    object noState = null;
    return Iterate(factory, source, noState, cancellationToken, creationOptions, scheduler);
}
#endregion
#region Object State Overloads and Full Implementation
/// <summary>
/// Asynchronously iterates through an enumerable of tasks, attaching <paramref name="state"/> to the
/// returned task and using the cancellation token, creation options and target scheduler of
/// <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the token, options and scheduler that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="state">The asynchronous state for the returned Task.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source, object state)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    var token = factory.CancellationToken;
    var options = factory.CreationOptions;
    var targetScheduler = factory.GetTargetScheduler();

    return Iterate(factory, source, state, token, options, targetScheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks, attaching <paramref name="state"/> to the
/// returned task, with an explicit cancellation token and the creation options and target scheduler of
/// <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the creation options and scheduler that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="state">The asynchronous state for the returned Task.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the iteration.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source, object state,
    CancellationToken cancellationToken)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    var options = factory.CreationOptions;
    var targetScheduler = factory.GetTargetScheduler();

    return Iterate(factory, source, state, cancellationToken, options, targetScheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks, attaching <paramref name="state"/> to the
/// returned task, with explicit creation options and the cancellation token and target scheduler of
/// <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the cancellation token and scheduler that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="state">The asynchronous state for the returned Task.</param>
/// <param name="creationOptions">Options that control the task's behavior.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source, object state,
    TaskCreationOptions creationOptions)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    var token = factory.CancellationToken;
    var targetScheduler = factory.GetTargetScheduler();

    return Iterate(factory, source, state, token, creationOptions, targetScheduler);
}
/// <summary>
/// Asynchronously iterates through an enumerable of tasks, attaching <paramref name="state"/> to the
/// returned task, on an explicit scheduler and with the cancellation token and creation options of
/// <paramref name="factory"/>.
/// </summary>
/// <param name="factory">The target factory; supplies the cancellation token and creation options that are forwarded.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="state">The asynchronous state for the returned Task.</param>
/// <param name="scheduler">The scheduler to which tasks will be scheduled.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source, object state,
    TaskScheduler scheduler)
{
    if (factory is null)
    {
        throw new ArgumentNullException(nameof(factory));
    }

    var token = factory.CancellationToken;
    var options = factory.CreationOptions;

    return Iterate(factory, source, state, token, options, scheduler);
}
/// <summary>Asynchronously iterates through an enumerable of tasks.</summary>
/// <remarks>
/// The enumerator obtained from <paramref name="source"/> is advanced one step at a time. When it yields a
/// <see cref="Task"/>, the next step runs as a continuation of that task. When it yields a
/// <see cref="TaskScheduler"/>, the next step is started on that scheduler. Any other item (including
/// <c>null</c>) faults the returned task with an <see cref="InvalidOperationException"/>. When the enumerator
/// is exhausted, the returned task completes successfully. The enumerator is disposed when the returned task
/// completes, or immediately if the iteration could not be started.
/// </remarks>
/// <param name="factory">The target factory, used to start the first iteration step.</param>
/// <param name="source">The enumerable containing the tasks to be iterated through.</param>
/// <param name="state">The asynchronous state for the returned Task.</param>
/// <param name="cancellationToken">
/// The cancellation token associated with the iteration. If advancing the enumerator throws an
/// <see cref="OperationCanceledException"/> carrying this token, the returned task is canceled rather than
/// faulted. The token is not polled by this method itself.
/// </param>
/// <param name="creationOptions">Options that control the returned task's behavior.</param>
/// <param name="scheduler">The scheduler on which the first iteration step is started.</param>
/// <returns>A Task that represents the complete asynchronous operation.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="factory"/>, <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
/// </exception>
/// <exception cref="InvalidOperationException">
/// <paramref name="source"/> returned a <c>null</c> enumerator.
/// </exception>
public static Task Iterate(
    this TaskFactory factory,
    IEnumerable<object> source, object state,
    CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskScheduler scheduler)
{
    // Validate parameters
    if (factory == null) throw new ArgumentNullException(nameof(factory));
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (scheduler == null) throw new ArgumentNullException(nameof(scheduler));

    // Create the task to be returned to the caller BEFORE acquiring the enumerator. The
    // TaskCompletionSource constructor can throw (e.g. for creation options it does not
    // support), and at that point nothing would be around to dispose the enumerator.
    var trs = new TaskCompletionSource<object>(state, creationOptions);

    // Get an enumerator from the enumerable
    var enumerator = source.GetEnumerator();
    if (enumerator == null) throw new InvalidOperationException("Invalid enumerable - GetEnumerator returned null");

    // Ensure that when everything is done, the enumerator is cleaned up.
    trs.Task.ContinueWith(_ => enumerator.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    // This will be called every time more work can be done.
    Action<Task> recursiveBody = null;
    recursiveBody = antecedent =>
    {
        try
        {
            // If there's more to iterate over, arrange for the next step to run once the
            // item just yielded from the enumerator allows it.
            if (enumerator.MoveNext())
            {
                var nextItem = enumerator.Current;

                // If we got a Task, continue from it to continue iterating
                if (nextItem is Task nextTask)
                {
                    // NOTE(review): IgnoreExceptions is a helper defined outside this file; presumably it
                    // marks the task's exceptions as observed - confirm.
                    nextTask.IgnoreExceptions(); // TODO: Is this a good idea?
                    nextTask.ContinueWith(recursiveBody).IgnoreExceptions();
                }
                // If we got a scheduler, continue iterating under the new scheduler,
                // enabling hopping between contexts.
                else if (nextItem is TaskScheduler nextScheduler)
                {
                    Task.Factory.StartNew(() => recursiveBody(null), CancellationToken.None, TaskCreationOptions.None, nextScheduler).IgnoreExceptions();
                }
                else
                {
                    trs.TrySetException(new InvalidOperationException("Task or TaskScheduler object expected in Iterate"));
                }
            }
            // Otherwise, we're done!
            else
            {
                trs.TrySetResult(null);
            }
        }
        // If iterating throws an exception, propagate that to the user,
        // either as cancellation (recording the token that caused it) or as a fault.
        catch (OperationCanceledException oce) when (oce.CancellationToken == cancellationToken)
        {
            trs.TrySetCanceled(cancellationToken);
        }
        catch (Exception exc)
        {
            trs.TrySetException(exc);
        }
    };

    // Get things started by launching the first step. If it cannot even be started, the
    // returned task will never complete and the clean-up continuation above will never run,
    // so dispose the enumerator here before letting the failure reach the caller.
    Task firstStep;
    try
    {
        firstStep = factory.StartNew(() => recursiveBody(null), CancellationToken.None, TaskCreationOptions.None, scheduler);
    }
    catch
    {
        enumerator.Dispose();
        throw;
    }
    firstStep.IgnoreExceptions();

    // Return the representative task to the user
    return trs.Task;
}
#endregion
}
}