/
BufferBlock.cs
482 lines (418 loc) · 25.6 KB
/
BufferBlock.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BufferBlock.cs
//
//
// A propagator block that provides support for unbounded and bounded FIFO buffers.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Security;
using System.Threading.Tasks.Dataflow.Internal;
using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a buffer for storing data.</summary>
/// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BufferBlock<>.DebugView))]
public sealed class BufferBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
{
/// <summary>Source half of the block; buffered messages and the public completion task are delegated to it.</summary>
private readonly SourceCore<T> _source;
/// <summary>Bounding bookkeeping; only assigned when a positive bounded capacity is configured, otherwise null.</summary>
private readonly BoundingStateWithPostponedAndTask<T>? _boundingState;
/// <summary>Set once the target side must decline every future message.</summary>
private bool _targetDecliningPermanently;
/// <summary>Set once the target's completion routine has been claimed, so that it is started at most once.</summary>
private bool _targetCompletionReserved;
/// <summary>Gets the object locked to synchronize incoming requests (the source core instance).</summary>
private object IncomingLock => _source;
/// <summary>Initializes the <see cref="BufferBlock{T}"/> using the default block options.</summary>
public BufferBlock()
    : this(DataflowBlockOptions.Default)
{
}
/// <summary>Initializes the <see cref="BufferBlock{T}"/> with the specified <see cref="DataflowBlockOptions"/>.</summary>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BufferBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException"><paramref name="dataflowBlockOptions"/> is null.</exception>
public BufferBlock(DataflowBlockOptions dataflowBlockOptions)
{
    if (dataflowBlockOptions is null) throw new ArgumentNullException(nameof(dataflowBlockOptions));

    // From here on, work with options the caller can no longer change.
    dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

    // Bounding bookkeeping, and the removal callback that feeds it, exist only for a positive capacity.
    int capacity = dataflowBlockOptions.BoundedCapacity;
    Action<ISourceBlock<T>, int>? itemsRemovedCallback = null;
    if (capacity > 0)
    {
        _boundingState = new BoundingStateWithPostponedAndTask<T>(capacity);
        itemsRemovedCallback = static (owner, removedCount) => ((BufferBlock<T>)owner).OnItemsRemoved(removedCount);
    }

    // Create the source half; the first callback completes this block, the second reports removed items.
    _source = new SourceCore<T>(
        this,
        dataflowBlockOptions,
        static owner => ((BufferBlock<T>)owner).Complete(),
        itemsRemovedCallback);

    // The source half can fault by itself (for example because of a task scheduler exception).
    // When that happens the target half is faulted too, so it drops buffered messages and
    // releases reservations. Completion requests are idempotent, so this cannot loop forever.
    _source.Completion.ContinueWith(static (sourceCompletion, state) =>
    {
        IDataflowBlock owner = (BufferBlock<T>)state!;
        Debug.Assert(sourceCompletion.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        owner.Fault(sourceCompletion.Exception!);
    }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

    // A cancellation request is translated into a Complete() call on this block.
    Common.WireCancellationToComplete(
        dataflowBlockOptions.CancellationToken,
        _source.Completion,
        static (owner, _) => ((BufferBlock<T>)owner!).Complete(),
        this);

    // Report the creation of the block when ETW tracing is enabled.
    DataflowEtwProvider etw = DataflowEtwProvider.Log;
    if (etw.IsEnabled())
    {
        etw.DataflowBlockCreated(this, dataflowBlockOptions);
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
{
    // Arguments are validated before the incoming lock is taken.
    if (!messageHeader.IsValid)
    {
        throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
    }
    if (consumeToAccept && source == null)
    {
        throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
    }

    lock (IncomingLock)
    {
        // Nothing is accepted once we decline permanently; use the call to try to finish completion.
        if (_targetDecliningPermanently)
        {
            CompleteTargetIfPossible();
            return DataflowMessageStatus.DecliningPermanently;
        }

        // A message may go straight into the buffer when the block is unbounded, or when it is bounded
        // and has spare room, has no postponed messages (accepting now would break ordering), and has
        // no processing task in flight (which could race with us while consuming postponed messages).
        bool acceptDirectly =
            _boundingState == null ||
            (_boundingState.CountIsLessThanBound &&
             _boundingState.PostponedMessages.Count == 0 &&
             _boundingState.TaskForInputProcessing == null);

        if (!acceptDirectly)
        {
            // Postponing needs a source to go back to later; without one the offer is declined.
            if (source == null)
            {
                return DataflowMessageStatus.Declined;
            }

            Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
                "PostponedMessages must have been initialized during construction in bounding mode.");
            _boundingState.PostponedMessages.Push(source, messageHeader);
            return DataflowMessageStatus.Postponed;
        }

        // When asked to, obtain the actual value from the source before storing it.
        if (consumeToAccept)
        {
            Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
            messageValue = source.ConsumeMessage(messageHeader, this, out bool consumed)!;
            if (!consumed)
            {
                return DataflowMessageStatus.NotAvailable;
            }
        }

        // Hand the value to the source half, then update the mirrored count when bounding.
        _source.AddMessage(messageValue!);
        if (_boundingState != null)
        {
            _boundingState.CurrentCount++;
        }
        return DataflowMessageStatus.Accepted;
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete() => CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
    if (exception is null) throw new ArgumentNullException(nameof(exception));

    // A fault requested through this entry point is not recorded once the target already declines permanently.
    CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
}
/// <summary>Shared implementation of completion and faulting: optionally records an exception, then starts target completion.</summary>
/// <param name="exception">The exception to record on the source half, or null for a plain completion.</param>
/// <param name="storeExceptionEvenIfAlreadyCompleting">true to record the exception even when the target is already declining permanently.</param>
/// <param name="revertProcessingState">
/// true to clear the in-flight input processing task reference; only valid together with
/// <paramref name="storeExceptionEvenIfAlreadyCompleting"/>.
/// </param>
private void CompleteCore(Exception? exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
{
    Debug.Assert(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
        "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");

    lock (IncomingLock)
    {
        // An exception is recorded while the target is still accepting, or unconditionally when the caller insists.
        bool mayRecordException = storeExceptionEvenIfAlreadyCompleting || !_targetDecliningPermanently;
        if (exception != null && mayRecordException)
        {
            _source.AddException(exception);
        }

        // Clear the processing task reference when the caller reports that it is stale.
        if (revertProcessingState)
        {
            Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
                "The processing state must be dirty when revertProcessingState==true.");
            _boundingState.TaskForInputProcessing = null;
        }

        // From now on every offer is declined; finish completion if nothing else is pending.
        _targetDecliningPermanently = true;
        CompleteTargetIfPossible();
    }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) => _source.LinkTo(target, linkOptions);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public bool TryReceive(Predicate<T>? filter, [MaybeNullWhen(false)] out T item) => _source.TryReceive(filter, out item);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public bool TryReceiveAll([NotNullWhen(true)] out IList<T>? items) => _source.TryReceiveAll(out items);
/// <summary>Gets the number of items currently stored in the buffer, as reported by the source half.</summary>
public int Count => _source.OutputCount;
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion => _source.Completion;
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
T? ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    => _source.ConsumeMessage(messageHeader, target, out messageConsumed);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    => _source.ReserveMessage(messageHeader, target);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    => _source.ReleaseReservation(messageHeader, target);
/// <summary>Callback invoked when one or more items have been removed from the source half's queue.</summary>
/// <param name="numItemsRemoved">The number of items removed; expected to be positive.</param>
/// <remarks>Must be called without the incoming lock held; the lock is taken here when bounding is enabled.</remarks>
private void OnItemsRemoved(int numItemsRemoved)
{
    Debug.Assert(numItemsRemoved > 0, "A positive number of items to remove is required.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // Only a bounded block mirrors the queue count, so an unbounded block has nothing to update.
    if (_boundingState == null)
    {
        return;
    }

    lock (IncomingLock)
    {
        // Keep the mirrored count in step with the source half's queue.
        Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
            "It should be impossible to have a negative number of items.");
        _boundingState.CurrentCount -= numItemsRemoved;

        // The freed room may allow postponed messages to be consumed, or completion to proceed.
        ConsumeAsyncIfNecessary();
        CompleteTargetIfPossible();
    }
}
/// <summary>Starts a task that consumes postponed messages, when there is work to do and no such task is already running.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
/// <remarks>Must be called with the incoming lock held and only in bounded mode.</remarks>
internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    Debug.Assert(_boundingState != null, "Must be in bounded mode.");

    // Bail out when declining, when a task is already in flight, when nothing is postponed, or when there is no room.
    bool nothingToDo =
        _targetDecliningPermanently ||
        _boundingState.TaskForInputProcessing != null ||
        _boundingState.PostponedMessages.Count <= 0 ||
        !_boundingState.CountIsLessThanBound;
    if (nothingToDo)
    {
        return;
    }

    // Publish the task in the bounding state before scheduling it, so that the
    // loop body already sees it as the in-flight task when it starts running.
    var processingTask = new Task(
        static state => ((BufferBlock<T>)state!).ConsumeMessagesLoopCore(),
        this,
        Common.GetCreationOptionsForTask(isReplacementReplica));
    _boundingState.TaskForInputProcessing = processingTask;

    DataflowEtwProvider etw = DataflowEtwProvider.Log;
    if (etw.IsEnabled())
    {
        etw.TaskLaunchedForMessageHandling(
            this, processingTask, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
            _boundingState.PostponedMessages.Count);
    }

    // Schedule the task; a scheduling failure is returned rather than thrown.
    Exception? schedulingFailure = Common.StartTaskSafe(processingTask, _source.DataflowBlockOptions.TaskScheduler);
    if (schedulingFailure != null)
    {
        // Fault the block from a separate task so we get out from under the locks currently held;
        // CompleteCore re-acquires what it needs and also clears the stale processing task.
        Task.Factory.StartNew(
            failure => CompleteCore(exception: (Exception)failure!, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
            schedulingFailure, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
    }
}
/// <summary>Body of the input processing task: consumes postponed messages until the per-task limit is hit or no more can be consumed.</summary>
/// <remarks>Runs without the incoming lock held; the lock is taken only for the final bookkeeping.</remarks>
private void ConsumeMessagesLoopCore()
{
    Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    try
    {
        // Consume at most the configured number of messages in this task.
        int budget = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
        int consumedSoFar = 0;
        while (consumedSoFar < budget && ConsumeAndStoreOneMessageIfAvailable())
        {
            consumedSoFar++;
        }
    }
    catch (Exception exc)
    {
        // Fault the block, which also prevents new processing tasks from being created.
        CompleteCore(exc, storeExceptionEvenIfAlreadyCompleting: true);
    }
    finally
    {
        lock (IncomingLock)
        {
            // This task is finished, so it is no longer the in-flight processing task.
            _boundingState.TaskForInputProcessing = null;

            // The loop may have stopped because of the per-task limit rather than lack of work;
            // if so, a replacement task is started to carry on.
            ConsumeAsyncIfNecessary(isReplacementReplica: true);

            // If instead no more work will ever arrive, completion can now proceed.
            CompleteTargetIfPossible();
        }
    }
}
/// <summary>
/// Retrieves one postponed message if there's room and if we can consume a postponed message.
/// Stores any consumed message into the source half.
/// </summary>
/// <returns>
/// true if a message could be consumed and stored; otherwise, false. false is returned when the target is
/// declining permanently, when the bound has been reached, or when no postponed messages remain.
/// </returns>
/// <remarks>
/// This must only be called from the asynchronous processing loop, without the incoming lock held.
/// The lock is taken only around the bookkeeping; the call into the postponing source is made outside it.
/// </remarks>
private bool ConsumeAndStoreOneMessageIfAvailable()
{
Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"May only be called in bounded mode and when a task is in flight.");
Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
"This must only be called from the in-flight processing task.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Loop through the postponed messages until we get one. A postponed message that the source
// no longer makes available is skipped and the next one is tried.
while (true)
{
// Get the next item to retrieve. If there are no more, bail.
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
lock (IncomingLock)
{
// Stop when completing, when the buffer is full, or when nothing is postponed.
if (_targetDecliningPermanently) return false;
if (!_boundingState.CountIsLessThanBound) return false;
if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;
// Optimistically assume we're going to get the item. This avoids taking the lock
// again if we're right. If we're wrong, we decrement it later under lock.
_boundingState.CurrentCount++;
}
// Consume the item. This calls back into the source, so it is done outside the lock.
bool consumed = false;
try
{
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
// The count was already incremented above, so only the value needs to be stored.
_source.AddMessage(consumedValue!);
return true;
}
}
finally
{
// We didn't get the item, so decrement the count to counteract our optimistic assumption.
// Being in a finally block, this also runs if an exception escapes while 'consumed' is still false.
if (!consumed)
{
lock (IncomingLock) _boundingState.CurrentCount--;
}
}
}
}
/// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
/// <remarks>
/// Must be called with the incoming lock held. Completion proceeds only when the target is declining
/// permanently, completion has not already been reserved, and (in bounded mode) no input processing
/// task is in flight. The reservation flag ensures the completion routine is started at most once.
/// </remarks>
private void CompleteTargetIfPossible()
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (_targetDecliningPermanently &&
!_targetCompletionReserved &&
(_boundingState == null || _boundingState.TaskForInputProcessing == null))
{
// Claim the completion routine so that later calls do not start it again.
_targetCompletionReserved = true;
// If we're in bounding mode and we have any postponed messages, we need to clear them,
// which means calling back to the source, which means we need to escape the incoming lock.
if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
{
// Run the release on a separate task (default scheduler) instead of inline under the lock.
Task.Factory.StartNew(static state =>
{
var thisBufferBlock = (BufferBlock<T>)state!;
// Release any postponed messages
List<Exception>? exceptions = null;
if (thisBufferBlock._boundingState != null)
{
// Note: No locks should be held at this point
Common.ReleaseAllPostponedMessages(thisBufferBlock,
thisBufferBlock._boundingState.PostponedMessages,
ref exceptions);
}
if (exceptions != null)
{
// It is important to migrate these exceptions to the source part of the owning buffer block,
// because that is the completion task that is publicly exposed.
thisBufferBlock._source.AddExceptions(exceptions);
}
// Complete the source half only after the postponed messages have been released.
thisBufferBlock._source.Complete();
}, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
// Otherwise, there is nothing to release and we can complete the source half directly.
else
{
_source.Complete();
}
}
}
/// <summary>Gets the number of messages in the buffer. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int CountForDebugger => _source.GetDebuggingInformation().OutputCount;
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() => Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
/// <summary>Gets the text shown by the debugger display attribute: the block's debugger name followed by its item count.</summary>
private object DebuggerDisplayContent
{
    get
    {
        return $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, Count = {CountForDebugger}";
    }
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content => DebuggerDisplayContent;
/// <summary>Provides a debugger type proxy for the BufferBlock.</summary>
private sealed class DebugView
{
    /// <summary>The block being inspected.</summary>
    private readonly BufferBlock<T> _block;
    /// <summary>Debugging information obtained from the block's source half when the view was created.</summary>
    private readonly SourceCore<T>.DebuggingInformation _sourceInfo;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="bufferBlock">The BufferBlock being viewed.</param>
    public DebugView(BufferBlock<T> bufferBlock)
    {
        Debug.Assert(bufferBlock != null, "Need a block with which to construct the debug view.");
        _block = bufferBlock;
        _sourceInfo = bufferBlock._source.GetDebuggingInformation();
    }

    /// <summary>Gets the collection of postponed message headers, or null when the block is not bounded.</summary>
    public QueuedMap<ISourceBlock<T>, DataflowMessageHeader>? PostponedMessages => _block._boundingState?.PostponedMessages;

    /// <summary>Gets the messages in the buffer.</summary>
    public IEnumerable<T> Queue => _sourceInfo.OutputQueue;

    /// <summary>Gets the task used to process postponed input messages, or null when there is none.</summary>
    public Task? TaskForInputProcessing => _block._boundingState?.TaskForInputProcessing;

    /// <summary>Gets the task being used for output processing.</summary>
    public Task? TaskForOutputProcessing => _sourceInfo.TaskForOutputProcessing;

    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    public DataflowBlockOptions DataflowBlockOptions => _sourceInfo.DataflowBlockOptions;

    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently => _block._targetDecliningPermanently;

    /// <summary>Gets whether the block is completed.</summary>
    public bool IsCompleted => _sourceInfo.IsCompleted;

    /// <summary>Gets the block's Id.</summary>
    public int Id => Common.GetBlockId(_block);

    /// <summary>Gets the set of all targets linked from this block.</summary>
    public TargetRegistry<T> LinkedTargets => _sourceInfo.LinkedTargets;

    /// <summary>Gets the target for which the next message is reserved, as reported by the source half; null if there is none.</summary>
    public ITargetBlock<T>? NextMessageReservedFor => _sourceInfo.NextMessageReservedFor;
}
}
}