-
-
Notifications
You must be signed in to change notification settings - Fork 358
/
AsyncContext.cs
253 lines (223 loc) · 11.8 KB
/
AsyncContext.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx.Synchronous;
namespace Nito.AsyncEx
{
/// <summary>
/// Provides a context for asynchronous operations. This class is threadsafe.
/// </summary>
/// <remarks>
/// <para><see cref="Execute()"/> may only be called once. After <see cref="Execute()"/> returns, the async context should be disposed.</para>
/// </remarks>
[DebuggerDisplay("Id = {Id}, OperationCount = {_outstandingOperations}")]
[DebuggerTypeProxy(typeof(DebugView))]
public sealed partial class AsyncContext : IDisposable
{
/// <summary>
/// The queue holding the actions to run.
/// </summary>
private readonly TaskQueue _queue;
/// <summary>
/// The <see cref="SynchronizationContext"/> for this <see cref="AsyncContext"/>.
/// </summary>
private readonly AsyncContextSynchronizationContext _synchronizationContext;
/// <summary>
/// The <see cref="TaskScheduler"/> for this <see cref="AsyncContext"/>.
/// </summary>
private readonly AsyncContextTaskScheduler _taskScheduler;
/// <summary>
/// The <see cref="TaskFactory"/> for this <see cref="AsyncContext"/>.
/// </summary>
private readonly TaskFactory _taskFactory;
/// <summary>
/// The number of outstanding operations, including actions in the queue.
/// </summary>
private int _outstandingOperations;
/// <summary>
/// Initializes a new instance of the <see cref="AsyncContext"/> class. This is an advanced operation; most people should use one of the static <c>Run</c> methods instead.
/// </summary>
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public AsyncContext()
{
_queue = new TaskQueue();
_synchronizationContext = new AsyncContextSynchronizationContext(this);
_taskScheduler = new AsyncContextTaskScheduler(this);
_taskFactory = new TaskFactory(CancellationToken.None, TaskCreationOptions.HideScheduler, TaskContinuationOptions.HideScheduler, _taskScheduler);
}
/// <summary>
/// Gets a semi-unique identifier for this asynchronous context. This is the same identifier as the context's <see cref="TaskScheduler"/>.
/// </summary>
public int Id => _taskScheduler.Id;
/// <summary>
/// Increments the outstanding asynchronous operation count.
/// </summary>
private void OperationStarted()
{
var newCount = Interlocked.Increment(ref _outstandingOperations);
}
/// <summary>
/// Decrements the outstanding asynchronous operation count.
/// </summary>
private void OperationCompleted()
{
var newCount = Interlocked.Decrement(ref _outstandingOperations);
if (newCount == 0)
_queue.CompleteAdding();
}
/// <summary>
/// Queues a task for execution by <see cref="Execute"/>. If all tasks have been completed and the outstanding asynchronous operation count is zero, then this method has undefined behavior.
/// </summary>
/// <param name="task">The task to queue. May not be <c>null</c>.</param>
/// <param name="propagateExceptions">A value indicating whether exceptions on this task should be propagated out of the main loop.</param>
private void Enqueue(Task task, bool propagateExceptions)
{
OperationStarted();
task.ContinueWith(_ => OperationCompleted(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskScheduler);
_queue.TryAdd(task, propagateExceptions);
// If we fail to add to the queue, just drop the Task. This is the same behavior as the TaskScheduler.FromCurrentSynchronizationContext(WinFormsSynchronizationContext).
}
/// <summary>
/// Disposes all resources used by this class. This method should NOT be called while <see cref="Execute"/> is executing.
/// </summary>
public void Dispose()
{
_queue.Dispose();
}
/// <summary>
/// Executes all queued actions. This method returns when all tasks have been completed and the outstanding asynchronous operation count is zero. This method will unwrap and propagate errors from tasks that are supposed to propagate errors.
/// </summary>
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public void Execute()
{
SynchronizationContextSwitcher.ApplyContext(_synchronizationContext, () =>
{
var tasks = _queue.GetConsumingEnumerable();
foreach (var task in tasks)
{
_taskScheduler.DoTryExecuteTask(task.Item1);
// Propagate exception if necessary.
if (task.Item2)
task.Item1.WaitAndUnwrapException();
}
});
}
/// <summary>
/// Queues a task for execution, and begins executing all tasks in the queue. This method returns when all tasks have been completed and the outstanding asynchronous operation count is zero. This method will unwrap and propagate errors from the task.
/// </summary>
/// <param name="action">The action to execute. May not be <c>null</c>.</param>
public static void Run(Action action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));
using (var context = new AsyncContext())
{
var task = context._taskFactory.Run(action);
context.Execute();
task.WaitAndUnwrapException();
}
}
/// <summary>
/// Queues a task for execution, and begins executing all tasks in the queue. This method returns when all tasks have been completed and the outstanding asynchronous operation count is zero. Returns the result of the task. This method will unwrap and propagate errors from the task.
/// </summary>
/// <typeparam name="TResult">The result type of the task.</typeparam>
/// <param name="action">The action to execute. May not be <c>null</c>.</param>
public static TResult Run<TResult>(Func<TResult> action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));
using (var context = new AsyncContext())
{
var task = context._taskFactory.Run(action);
context.Execute();
return task.WaitAndUnwrapException();
}
}
/// <summary>
/// Queues a task for execution, and begins executing all tasks in the queue. This method returns when all tasks have been completed and the outstanding asynchronous operation count is zero. This method will unwrap and propagate errors from the task proxy.
/// </summary>
/// <param name="action">The action to execute. May not be <c>null</c>.</param>
public static void Run(Func<Task> action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));
// ReSharper disable AccessToDisposedClosure
using (var context = new AsyncContext())
{
context.OperationStarted();
var task = context._taskFactory.Run(action).ContinueWith(t =>
{
context.OperationCompleted();
t.WaitAndUnwrapException();
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, context._taskScheduler);
context.Execute();
task.WaitAndUnwrapException();
}
// ReSharper restore AccessToDisposedClosure
}
/// <summary>
/// Queues a task for execution, and begins executing all tasks in the queue. This method returns when all tasks have been completed and the outstanding asynchronous operation count is zero. Returns the result of the task proxy. This method will unwrap and propagate errors from the task proxy.
/// </summary>
/// <typeparam name="TResult">The result type of the task.</typeparam>
/// <param name="action">The action to execute. May not be <c>null</c>.</param>
public static TResult Run<TResult>(Func<Task<TResult>> action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));
// ReSharper disable AccessToDisposedClosure
using (var context = new AsyncContext())
{
context.OperationStarted();
var task = context._taskFactory.Run(action).ContinueWith(t =>
{
context.OperationCompleted();
return t.WaitAndUnwrapException();
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, context._taskScheduler);
context.Execute();
return task.WaitAndUnwrapException();
}
// ReSharper restore AccessToDisposedClosure
}
/// <summary>
/// Gets the current <see cref="AsyncContext"/> for this thread, or <c>null</c> if this thread is not currently running in an <see cref="AsyncContext"/>.
/// </summary>
public static AsyncContext? Current
{
get
{
var syncContext = SynchronizationContext.Current as AsyncContextSynchronizationContext;
return syncContext?.Context;
}
}
/// <summary>
/// Gets the <see cref="SynchronizationContext"/> for this <see cref="AsyncContext"/>. From inside <see cref="Execute"/>, this value is always equal to <see cref="System.Threading.SynchronizationContext.Current"/>.
/// </summary>
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public SynchronizationContext SynchronizationContext => _synchronizationContext;
/// <summary>
/// Gets the <see cref="TaskScheduler"/> for this <see cref="AsyncContext"/>. From inside <see cref="Execute"/>, this value is always equal to <see cref="TaskScheduler.Current"/>.
/// </summary>
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public TaskScheduler Scheduler => _taskScheduler;
#pragma warning disable CA1200 // Avoid using cref tags with a prefix
/// <summary>
/// Gets the <see cref="TaskFactory"/> for this <see cref="AsyncContext"/>. Note that this factory has the <see cref="TaskCreationOptions.HideScheduler"/> option set. Be careful with async delegates; you may need to call <see cref="M:System.Threding.SynchronizationContext.OperationStarted"/> and <see cref="M:System.Threading.SynchronizationContext.OperationCompleted"/> to prevent early termination of this <see cref="AsyncContext"/>.
/// </summary>
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
#pragma warning restore CA1200 // Avoid using cref tags with a prefix
public TaskFactory Factory => _taskFactory;
[DebuggerNonUserCode]
#pragma warning disable CA1812 // Avoid uninstantiated internal classes
internal sealed class DebugView
#pragma warning restore CA1812 // Avoid uninstantiated internal classes
{
private readonly AsyncContext _context;
public DebugView(AsyncContext context)
{
_context = context;
}
public TaskScheduler TaskScheduler => _context._taskScheduler;
}
}
}