-
Notifications
You must be signed in to change notification settings - Fork 2k
/
AsyncPipeline.cs
226 lines (200 loc) · 7.64 KB
/
AsyncPipeline.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Orleans.Runtime
{
/// <summary>
/// A utility interface that allows controlling the rate of generation of asynchronous activities.
/// </summary>
/// <seealso cref="AsyncPipeline"/>
public interface IPipeline
{
    /// <summary>Adds a new task to the pipeline.</summary>
    /// <param name="task">The task to add.</param>
    void Add(Task task);
    /// <summary>Waits until all currently queued asynchronous operations are done. Blocks the calling thread.</summary>
    void Wait();
    /// <summary>The number of items currently enqueued into this pipeline.</summary>
    int Count { get; }
}
/// <summary>
/// A helper utility class that allows controlling the rate of generation of asynchronous activities.
/// Maintains a pipeline of asynchronous operations up to a given maximal capacity and blocks the calling
/// thread if the pipeline gets too deep, until enough of the enqueued operations have finished.
/// Effectively adds back-pressure to the caller.
/// This is mainly useful for stress-testing grains under controlled load and should never be used from within grain code!
/// </summary>
public class AsyncPipeline : IPipeline
{
    /// <summary>
    /// The Default Capacity of this AsyncPipeline. Equals to 10.
    /// </summary>
    public const int DEFAULT_CAPACITY = 10;

    // Tasks currently in flight; its size is bounded by 'capacity'.
    private readonly HashSet<Task> running;
    private readonly int capacity;
    // FIFO queue of tasks admitted while the pipeline was full, paired with the
    // completion source that unblocks the producer thread waiting in Add().
    private readonly LinkedList<Tuple<Task,TaskCompletionSource<bool>>> waiting;
    // Guards both 'running' and 'waiting'.
    private readonly object lockable;

    /// <summary>
    /// The maximal number of async in-flight operations that can be enqueued into this async pipeline.
    /// </summary>
    public int Capacity { get { return capacity; } }

    /// <summary>
    /// The number of items currently enqueued into this async pipeline.
    /// </summary>
    public int Count { get { return running.Count; } }

    /// <summary>
    /// Constructs an empty AsyncPipeline with capacity equal to the DefaultCapacity.
    /// </summary>
    public AsyncPipeline() :
        this(DEFAULT_CAPACITY)
    {}

    /// <summary>
    /// Constructs an empty AsyncPipeline with a given capacity.
    /// </summary>
    /// <param name="capacity">The maximal capacity of this AsyncPipeline.</param>
    /// <exception cref="ArgumentOutOfRangeException">If <paramref name="capacity"/> is less than 1.</exception>
    public AsyncPipeline(int capacity)
    {
        if (capacity < 1)
            throw new ArgumentOutOfRangeException(nameof(capacity), "The pipeline size must be larger than 0.");

        running = new HashSet<Task>();
        waiting = new LinkedList<Tuple<Task, TaskCompletionSource<bool>>>();
        this.capacity = capacity;
        lockable = new object();
    }

    /// <summary>
    /// Adds a new task to this AsyncPipeline. Blocks the calling thread while the pipeline is at capacity.
    /// </summary>
    /// <param name="task">A task to add to this AsyncPipeline.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="task"/> is null.</exception>
    public void Add(Task task)
    {
        Add(task, whiteBox: null);
    }

    /// <summary>
    /// Adds a collection of tasks to this AsyncPipeline.
    /// </summary>
    /// <param name="tasks">A collection of tasks to add to this AsyncPipeline.</param>
    public void AddRange(IEnumerable<Task> tasks)
    {
        foreach (var i in tasks)
            Add(i);
    }

    /// <summary>
    /// Adds a collection of tasks to this AsyncPipeline.
    /// </summary>
    /// <param name="tasks">A collection of tasks to add to this AsyncPipeline.</param>
    public void AddRange<T>(IEnumerable<Task<T>> tasks)
    {
        foreach (var i in tasks)
            Add(i);
    }

    /// <summary>
    /// Waits until all currently queued asynchronous operations are done.
    /// Blocks the calling thread.
    /// </summary>
    public void Wait()
    {
        Wait(null);
    }

    internal void Wait(WhiteBox whiteBox)
    {
        // Snapshot both in-flight tasks and the gate tasks of queued producers under the lock,
        // then wait outside the lock so completions can drain the pipeline.
        var tasks = new List<Task>();
        lock (lockable)
        {
            tasks.AddRange(running);
            foreach (var i in waiting)
                tasks.Add(i.Item2.Task);
        }

        Task.WhenAll(tasks).Wait();

        if (null != whiteBox)
        {
            whiteBox.Reset();
            whiteBox.PipelineSize = 0;
        }
    }

    private bool IsFull
    {
        get
        {
            return Count >= capacity;
        }
    }

    internal void Add(Task task, WhiteBox whiteBox)
    {
        if (null == task)
            throw new ArgumentNullException(nameof(task));

        // whitebox testing results-- we initialize pipeSz with an inconsistent copy of Count because it's better
        // than nothing and will reflect that the pipeline size was in a valid state during some portion of this
        // method, even if it isn't at a properly synchronized moment.
        int pipeSz = Count;
        var full = false;
        // we should be using a try...finally to execute the whitebox testing logic here but it apparently adds too
        // much latency to be palatable for AsyncPipelineSimpleTest(), which is sensitive to latency.
        try
        {
            TaskCompletionSource<bool> tcs;
            lock (lockable)
            {
                // Fast path: room available AND no one queued ahead of us (the second condition
                // preserves FIFO fairness for producers already blocked in the slow path).
                if (!IsFull && waiting.Count == 0)
                {
                    task.ContinueWith(OnTaskCompletion).Ignore();
                    running.Add(task);
                    pipeSz = Count;
                    return;
                }

                // Slow path: queue the task and a gate that UnblockWaiting() will open once capacity frees up.
                full = true;
                tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                waiting.AddLast(Tuple.Create(task, tcs));
            }

            // Block the producer until our task has been moved into the running set (or found already complete).
            tcs.Task.Wait();
            // the following quantity is an inconsistent value but i don't have a means to get one in this part of
            // the code because the actual add has already been performed from within a continuation.
            pipeSz = Count;
        }
        finally
        {
            if (whiteBox != null)
            {
                whiteBox.Reset();
                whiteBox.PipelineSize = pipeSz;
                whiteBox.PipelineFull = full;
            }
        }
    }

    private void OnTaskCompletion(Task task)
    {
        lock (lockable)
        {
            running.Remove(task);
            UnblockWaiting();
        }
    }

    // Must be called under 'lockable'. Promotes queued tasks into 'running' while capacity allows,
    // releasing their producers' gates. Safe to complete the TCS while holding the lock because it
    // was created with RunContinuationsAsynchronously, so no continuation runs inline here.
    private void UnblockWaiting()
    {
        while (!IsFull && waiting.Count > 0)
        {
            // LinkedList<T>.First property is O(1) and allocation-free (unlike the LINQ First() extension).
            Tuple<Task,TaskCompletionSource<bool>> next = waiting.First.Value;
            waiting.RemoveFirst();
            Task task = next.Item1;
            // A task that already completed while queued never occupies a pipeline slot.
            if(!task.IsCompleted)
            {
                task.ContinueWith(OnTaskCompletion).Ignore();
                running.Add(task);
            }
            next.Item2.SetResult(true);
        }
    }

    // Observation channel for whitebox tests: records what Add()/Wait() saw without affecting behavior.
    internal class WhiteBox
    {
        public bool PipelineFull { get; internal set; }
        public int PipelineSize { get; internal set; }
        public bool FastPathed { get; internal set; }

        public WhiteBox()
        {
            Reset();
        }

        public void Reset()
        {
            PipelineFull = false;
            PipelineSize = 0;
        }
    }
}
}