/
BlockingCollectionExtensions.cs
205 lines (182 loc) · 10.2 KB
/
BlockingCollectionExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace WCell.Util.Threading.TaskParallel
{
/// <summary>Extension methods for BlockingCollection.</summary>
public static class BlockingCollectionExtensions
{
/// <summary>
/// Gets a partitioner for a BlockingCollection that consumes and yields the contents of the BlockingCollection.</summary>
/// <typeparam name="T">Specifies the type of data in the collection.</typeparam>
/// <param name="collection">The collection for which to create a partitioner.</param>
/// <returns>A partitioner that completely consumes and enumerates the contents of the collection.</returns>
/// <remarks>
/// Using this partitioner with a Parallel.ForEach loop or with PLINQ eliminates the need for those
/// constructs to do any additional locking. The only synchronization in place is that used by the
/// BlockingCollection internally.
/// </remarks>
public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
{
return new BlockingCollectionPartitioner<T>(collection);
}
/// <summary>Provides a partitioner that consumes a blocking collection and yields its contents.</summary>
/// <typeparam name="T">Specifies the type of data in the collection.</typeparam>
private class BlockingCollectionPartitioner<T> : Partitioner<T>
{
/// <summary>The target collection.</summary>
private BlockingCollection<T> _collection;
/// <summary>Initializes the partitioner.</summary>
/// <param name="collection">The collection to be enumerated and consumed.</param>
internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
{
if (collection == null) throw new ArgumentNullException("collection");
_collection = collection;
}
/// <summary>Gets whether additional partitions can be created dynamically.</summary>
public override bool SupportsDynamicPartitions { get { return true; } }
/// <summary>Partitions the underlying collection into the given number of partitions.</summary>
/// <param name="partitionCount">The number of partitions to create.</param>
/// <returns>A list containing partitionCount enumerators.</returns>
public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
{
if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
var dynamicPartitioner = GetDynamicPartitions();
return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
}
/// <summary>
/// Creates an object that can partition the underlying collection into a variable number of partitions.
/// </summary>
/// <returns>An object that can create partitions over the underlying data source.</returns>
public override IEnumerable<T> GetDynamicPartitions()
{
return _collection.GetConsumingEnumerable();
}
}
/// <summary>Adds the contents of an enumerable to the BlockingCollection.</summary>
/// <typeparam name="T">Specifies the type of the elements in the collection.</typeparam>
/// <param name="target">The target BlockingCollection to be augmented.</param>
/// <param name="source">The source enumerable containing the data to be added.</param>
/// <param name="completeAddingWhenDone">
/// Whether to mark the target BlockingCollection as complete for adding when
/// all elements of the source enumerable have been transfered.
/// </param>
public static void AddFromEnumerable<T>(this BlockingCollection<T> target, IEnumerable<T> source, bool completeAddingWhenDone)
{
try { foreach (var item in source) target.Add(item); }
finally { if (completeAddingWhenDone) target.CompleteAdding(); }
}
/// <summary>Adds the contents of an observable to the BlockingCollection.</summary>
/// <typeparam name="T">Specifies the type of the elements in the collection.</typeparam>
/// <param name="target">The target BlockingCollection to be augmented.</param>
/// <param name="source">The source observable containing the data to be added.</param>
/// <param name="completeAddingWhenDone">
/// Whether to mark the target BlockingCollection as complete for adding when
/// all elements of the source observable have been transfered.
/// </param>
/// <returns>An IDisposable that may be used to cancel the transfer.</returns>
public static IDisposable AddFromObservable<T>(this BlockingCollection<T> target, IObservable<T> source, bool completeAddingWhenDone)
{
if (target == null) throw new ArgumentNullException("target");
if (source == null) throw new ArgumentNullException("source");
return source.Subscribe(new DelegateBasedObserver<T>
(
onNext: item => target.Add(item),
onError: error => { if (completeAddingWhenDone) target.CompleteAdding(); },
onCompleted: () => { if (completeAddingWhenDone) target.CompleteAdding(); }
));
}
/// <summary>Creates an IProducerConsumerCollection-facade for a BlockingCollection instance.</summary>
/// <typeparam name="T">Specifies the type of the elements in the collection.</typeparam>
/// <param name="collection">The BlockingCollection.</param>
/// <returns>
/// An IProducerConsumerCollection that wraps the provided BlockingCollection.
/// </returns>
public static IProducerConsumerCollection<T> ToProducerConsumerCollection<T>(
this BlockingCollection<T> collection)
{
return ToProducerConsumerCollection(collection, Timeout.Infinite);
}
/// <summary>Creates an IProducerConsumerCollection-facade for a BlockingCollection instance.</summary>
/// <typeparam name="T">Specifies the type of the elements in the collection.</typeparam>
/// <param name="collection">The BlockingCollection.</param>
/// <param name="millisecondsTimeout">-1 for infinite blocking add and take operations. 0 for non-blocking, 1 or greater for blocking with timeout.</param>
/// <returns>An IProducerConsumerCollection that wraps the provided BlockingCollection.</returns>
public static IProducerConsumerCollection<T> ToProducerConsumerCollection<T>(
this BlockingCollection<T> collection, int millisecondsTimeout)
{
return new ProducerConsumerWrapper<T>(collection, millisecondsTimeout, new CancellationToken());
}
/// <summary>Creates an IProducerConsumerCollection-facade for a BlockingCollection instance.</summary>
/// <typeparam name="T">Specifies the type of the elements in the collection.</typeparam>
/// <param name="collection">The BlockingCollection.</param>
/// <param name="millisecondsTimeout">-1 for infinite blocking add and take operations. 0 for non-blocking, 1 or greater for blocking with timeout.</param>
/// <param name="cancellationToken">The CancellationToken to use for any blocking operations.</param>
/// <returns>An IProducerConsumerCollection that wraps the provided BlockingCollection.</returns>
public static IProducerConsumerCollection<T> ToProducerConsumerCollection<T>(
this BlockingCollection<T> collection, int millisecondsTimeout, CancellationToken cancellationToken)
{
return new ProducerConsumerWrapper<T>(collection, millisecondsTimeout, cancellationToken);
}
/// <summary>Provides a producer-consumer collection facade for a BlockingCollection.</summary>
/// <typeparam name="T">Specifies the type of the elements in the collection.</typeparam>
internal sealed class ProducerConsumerWrapper<T> : IProducerConsumerCollection<T>
{
private readonly BlockingCollection<T> _collection;
private readonly int _millisecondsTimeout;
private readonly CancellationToken _cancellationToken;
public ProducerConsumerWrapper(
BlockingCollection<T> collection, int millisecondsTimeout, CancellationToken cancellationToken)
{
if (collection == null) throw new ArgumentNullException("bc");
if (millisecondsTimeout < -1) throw new ArgumentOutOfRangeException("millisecondsTimeout");
_collection = collection;
_millisecondsTimeout = millisecondsTimeout;
_cancellationToken = cancellationToken;
}
public void CopyTo(T[] array, int index)
{
_collection.CopyTo(array, index);
}
public T[] ToArray()
{
return _collection.ToArray();
}
public bool TryAdd(T item)
{
return _collection.TryAdd(item, _millisecondsTimeout, _cancellationToken);
}
public bool TryTake(out T item)
{
return _collection.TryTake(out item, _millisecondsTimeout, _cancellationToken);
}
public IEnumerator<T> GetEnumerator()
{
return ((IEnumerable<T>)_collection).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void CopyTo(Array array, int index)
{
((ICollection)_collection).CopyTo(array, index);
}
public int Count
{
get { return _collection.Count; }
}
public bool IsSynchronized
{
get { return ((ICollection)_collection).IsSynchronized; }
}
public object SyncRoot
{
get { return ((ICollection)_collection).SyncRoot; }
}
}
}
}