-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Expand file tree
/
Copy pathParallelEnumerable.cs
More file actions
5965 lines (5482 loc) · 322 KB
/
ParallelEnumerable.cs
File metadata and controls
5965 lines (5482 loc) · 322 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ParallelEnumerable.cs
//
// The standard IEnumerable-based LINQ-to-Objects query provider. This class basically
// mirrors the System.Linq.Enumerable class, but (1) takes as input a special "parallel
// enumerable" data type and (2) uses an alternative implementation of the operators.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
using System.Linq.Parallel;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;
namespace System.Linq
{
//-----------------------------------------------------------------------------------
// Languages like C# and VB that support query comprehensions translate queries
// into calls to a query provider which creates executable representations of the
// query. The LINQ-to-Objects provider is implemented as a static class with an
// extension method per-query operator; when invoked, these return enumerable
// objects that implement the querying behavior.
//
// We have a new sequence class for two reasons:
//
// (1) Developers can opt in to parallel query execution piecemeal, by using
// a special AsParallel API to wrap the data source.
// (2) Parallel LINQ uses a new representation for queries when compared to LINQ,
// which we must return from the new sequence operator implementations.
//
// Comments and documentation will be somewhat light in this file. Please refer
// to the "official" Standard Query Operators specification for details on each API:
// http://download.microsoft.com/download/5/8/6/5868081c-68aa-40de-9a45-a3803d8134b8/Standard_Query_Operators.doc
//
// Notes:
// The Standard Query Operators herein should be semantically equivalent to
// the specification linked to above. In some cases, we offer operators that
// aren't available in the sequential LINQ library; in each case, we will note
// why this is needed.
//
/// <summary>
/// Provides a set of methods for querying objects that implement
/// ParallelQuery{TSource}. This is the parallel equivalent of
/// <see cref="System.Linq.Enumerable"/>.
/// </summary>
public static class ParallelEnumerable
{
// We pass this string constant to an attribute constructor. Unfortunately, we cannot access resources from
// an attribute constructor, so we have to store this string in source code.
private const string RIGHT_SOURCE_NOT_PARALLEL_STR =
"The second data source of a binary operator must be of type System.Linq.ParallelQuery<T> rather than "
+ "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
+ "to convert the right data source to System.Linq.ParallelQuery<T>.";
// When running in single partition mode, PLINQ operations will occur on a single partition and will not
// be executed in parallel, but will retain PLINQ semantics (exceptions wrapped as aggregates, etc).
[System.Runtime.Versioning.SupportedOSPlatformGuard("browser")]
internal static bool SinglePartitionMode => OperatingSystem.IsBrowser();
//-----------------------------------------------------------------------------------
// Converts any IEnumerable<TSource> into something that can be the target of parallel
// query execution.
//
// Arguments:
// source - the enumerable data source
// options - query analysis options to override the defaults
// degreeOfParallelism - the DOP to use instead of the system default, if any
//
// Notes:
// If the argument is already a parallel enumerable, such as a query operator,
// no new objects are allocated. Otherwise, a very simple wrapper is instantiated
// that exposes the IEnumerable as a ParallelQuery.
//
/// <summary>
/// Enables parallelization of a query.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">An <see cref="System.Collections.Generic.IEnumerable{T}"/>
/// to convert to a <see cref="System.Linq.ParallelQuery{T}"/>.</param>
/// <returns>The source as a <see cref="System.Linq.ParallelQuery{T}"/> to bind to
/// ParallelEnumerable extension methods.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source)
{
ArgumentNullException.ThrowIfNull(source);
return new ParallelEnumerableWrapper<TSource>(source);
}
/// <summary>
/// Enables parallelization of a query, as sourced by a partitioner
/// responsible for splitting the input sequence into partitions.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">A partitioner over the input sequence.</param>
/// <returns>The <paramref name="source"/> as a ParallelQuery to bind to ParallelEnumerable extension methods.</returns>
/// <remarks>
/// The source partitioner's GetOrderedPartitions method is used when ordering is enabled,
/// whereas the partitioner's GetPartitions is used if ordering is not enabled (the default).
/// The source partitioner's GetDynamicPartitions and GetDynamicOrderedPartitions are not used.
/// </remarks>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TSource> AsParallel<TSource>(this Partitioner<TSource> source)
{
ArgumentNullException.ThrowIfNull(source);
return new PartitionerQueryOperator<TSource>(source);
}
/// <summary>
/// Enables treatment of a data source as if it was ordered, overriding the default of unordered.
/// AsOrdered may only be invoked on sequences returned by AsParallel, ParallelEnumerable.Range,
/// and ParallelEnumerable.Repeat.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">The input sequence.</param>
/// <exception cref="System.InvalidOperationException">
/// Thrown if <paramref name="source"/> is not one of AsParallel, ParallelEnumerable.Range, or ParallelEnumerable.Repeat.
/// </exception>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <remarks>
/// A natural tension exists between performance and preserving order in parallel processing. By default,
/// a parallelized query behaves as if the ordering of the results is arbitrary
/// unless AsOrdered is applied or there is an explicit OrderBy operator in the query.
/// </remarks>
/// <returns>The source sequence which will maintain ordering in the query.</returns>
public static ParallelQuery<TSource> AsOrdered<TSource>(this ParallelQuery<TSource> source)
{
ArgumentNullException.ThrowIfNull(source);
if (!(source is ParallelEnumerableWrapper<TSource> || source is IParallelPartitionable<TSource>))
{
if (source is PartitionerQueryOperator<TSource> partitionerOp)
{
if (!partitionerOp.Orderable)
{
throw new InvalidOperationException(SR.ParallelQuery_PartitionerNotOrderable);
}
}
else
{
throw new InvalidOperationException(SR.ParallelQuery_InvalidAsOrderedCall);
}
}
return new OrderingQueryOperator<TSource>(QueryOperator<TSource>.AsQueryOperator(source), true);
}
/// <summary>
/// Enables treatment of a data source as if it was ordered, overriding the default of unordered.
/// AsOrdered may only be invoked on sequences returned by AsParallel, ParallelEnumerable.Range,
/// and ParallelEnumerable.Repeat.
/// </summary>
/// <param name="source">The input sequence.</param>
/// <exception cref="InvalidOperationException">
/// Thrown if the <paramref name="source"/> is not one of AsParallel, ParallelEnumerable.Range, or ParallelEnumerable.Repeat.
/// </exception>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <remarks>
/// A natural tension exists between performance and preserving order in parallel processing. By default,
/// a parallelized query behaves as if the ordering of the results is arbitrary unless AsOrdered
/// is applied or there is an explicit OrderBy operator in the query.
/// </remarks>
/// <returns>The source sequence which will maintain ordering in the query.</returns>
public static ParallelQuery AsOrdered(this ParallelQuery source)
{
ArgumentNullException.ThrowIfNull(source);
ParallelEnumerableWrapper? wrapper = source as ParallelEnumerableWrapper;
if (wrapper == null)
{
throw new InvalidOperationException(SR.ParallelQuery_InvalidNonGenericAsOrderedCall);
}
return new OrderingQueryOperator<object?>(QueryOperator<object?>.AsQueryOperator(wrapper), true);
}
/// <summary>
/// Allows an intermediate query to be treated as if no ordering is implied among the elements.
/// </summary>
/// <remarks>
/// AsUnordered may provide
/// performance benefits when ordering is not required in a portion of a query.
/// </remarks>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">The input sequence.</param>
/// <returns>The source sequence with arbitrary order.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TSource> AsUnordered<TSource>(this ParallelQuery<TSource> source)
{
ArgumentNullException.ThrowIfNull(source);
return new OrderingQueryOperator<TSource>(QueryOperator<TSource>.AsQueryOperator(source), false);
}
/// <summary>
/// Enables parallelization of a query.
/// </summary>
/// <param name="source">An <see cref="System.Collections.Generic.IEnumerable{T}"/> to convert
/// to a <see cref="System.Linq.ParallelQuery{T}"/>.</param>
/// <returns>
/// The source as a ParallelQuery to bind to
/// ParallelEnumerable extension methods.
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery AsParallel(this IEnumerable source)
{
ArgumentNullException.ThrowIfNull(source);
return new ParallelEnumerableWrapper(source);
}
//-----------------------------------------------------------------------------------
// Converts a parallel enumerable into something that forces sequential execution.
//
// Arguments:
// source - the parallel enumerable data source
//
/// <summary>
/// Converts a <see cref="ParallelQuery{T}"/> into an
/// <see cref="System.Collections.Generic.IEnumerable{T}"/> to force sequential
/// evaluation of the query.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">A <see cref="ParallelQuery{T}"/> to convert to an <see cref="System.Collections.Generic.IEnumerable{T}"/>.</param>
/// <returns>The source as an <see cref="System.Collections.Generic.IEnumerable{T}"/>
/// to bind to sequential extension methods.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static IEnumerable<TSource> AsSequential<TSource>(this ParallelQuery<TSource> source)
{
ArgumentNullException.ThrowIfNull(source);
// Ditch the wrapper, if there is one.
if (source is ParallelEnumerableWrapper<TSource> wrapper)
{
return wrapper.WrappedEnumerable;
}
else
{
return source;
}
}
/// <summary>
/// Sets the degree of parallelism to use in a query. Degree of parallelism is the maximum number of concurrently
/// executing tasks that will be used to process the query.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">A ParallelQuery on which to set the limit on the degrees of parallelism.</param>
/// <param name="degreeOfParallelism">The degree of parallelism for the query.</param>
/// <returns>ParallelQuery representing the same query as source, with the limit on the degrees of parallelism set.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <exception cref="System.InvalidOperationException">
/// WithDegreeOfParallelism is used multiple times in the query.
/// </exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// <paramref name="degreeOfParallelism"/> is less than 1 or greater than 512.
/// </exception>
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(this ParallelQuery<TSource> source, int degreeOfParallelism)
{
ArgumentNullException.ThrowIfNull(source);
if (degreeOfParallelism < 1 || degreeOfParallelism > Scheduling.MAX_SUPPORTED_DOP)
{
throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));
}
QuerySettings settings = QuerySettings.Empty;
settings.DegreeOfParallelism = degreeOfParallelism;
return new QueryExecutionOption<TSource>(
QueryOperator<TSource>.AsQueryOperator(source), settings);
}
/// <summary>
/// Sets the <see cref="System.Threading.CancellationToken"/> to associate with the query.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">A ParallelQuery on which to set the option.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>ParallelQuery representing the same query as source, but with the <seealso cref="System.Threading.CancellationToken"/>
/// registered.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <exception cref="System.InvalidOperationException">
/// WithCancellation is used multiple times in the query.
/// </exception>
public static ParallelQuery<TSource> WithCancellation<TSource>(this ParallelQuery<TSource> source, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(source);
QuerySettings settings = QuerySettings.Empty;
settings.CancellationState = new CancellationState(cancellationToken);
return new QueryExecutionOption<TSource>(
QueryOperator<TSource>.AsQueryOperator(source), settings);
}
/// <summary>
/// Sets the execution mode of the query.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">A ParallelQuery on which to set the option.</param>
/// <param name="executionMode">The mode in which to execute the query.</param>
/// <returns>ParallelQuery representing the same query as source, but with the
/// <seealso cref="System.Linq.ParallelExecutionMode"/> registered.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <exception cref="System.ArgumentException">
/// <paramref name="executionMode"/> is not a valid <see cref="System.Linq.ParallelExecutionMode"/> value.
/// </exception>
/// <exception cref="System.InvalidOperationException">
/// WithExecutionMode is used multiple times in the query.
/// </exception>
public static ParallelQuery<TSource> WithExecutionMode<TSource>(this ParallelQuery<TSource> source, ParallelExecutionMode executionMode)
{
ArgumentNullException.ThrowIfNull(source);
if (executionMode != ParallelExecutionMode.Default && executionMode != ParallelExecutionMode.ForceParallelism)
{
throw new ArgumentException(SR.ParallelEnumerable_WithQueryExecutionMode_InvalidMode);
}
QuerySettings settings = QuerySettings.Empty;
settings.ExecutionMode = executionMode;
return new QueryExecutionOption<TSource>(
QueryOperator<TSource>.AsQueryOperator(source), settings);
}
/// <summary>
/// Sets the merge options for this query, which specify how the query will buffer output.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">A ParallelQuery on which to set the option.</param>
/// <param name="mergeOptions">The merge options to set for this query.</param>
/// <returns>ParallelQuery representing the same query as source, but with the
/// <seealso cref="ParallelMergeOptions"/> registered.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <exception cref="System.ArgumentException">
/// <paramref name="mergeOptions"/> is not a valid <see cref="System.Linq.ParallelMergeOptions"/> value.
/// </exception>
/// <exception cref="System.InvalidOperationException">
/// WithMergeOptions is used multiple times in the query.
/// </exception>
public static ParallelQuery<TSource> WithMergeOptions<TSource>(this ParallelQuery<TSource> source, ParallelMergeOptions mergeOptions)
{
ArgumentNullException.ThrowIfNull(source);
if (mergeOptions != ParallelMergeOptions.Default
&& mergeOptions != ParallelMergeOptions.AutoBuffered
&& mergeOptions != ParallelMergeOptions.NotBuffered
&& mergeOptions != ParallelMergeOptions.FullyBuffered)
{
throw new ArgumentException(SR.ParallelEnumerable_WithMergeOptions_InvalidOptions);
}
QuerySettings settings = QuerySettings.Empty;
settings.MergeOptions = mergeOptions;
return new QueryExecutionOption<TSource>(
QueryOperator<TSource>.AsQueryOperator(source), settings);
}
//-----------------------------------------------------------------------------------
// Range generates a sequence of numbers that can be used as input to a query.
//
/// <summary>
/// Generates a parallel sequence of integral numbers within a specified range.
/// </summary>
/// <param name="start">The value of the first integer in the sequence.</param>
/// <param name="count">The number of sequential integers to generate.</param>
/// <returns>An <b>IEnumerable<Int32></b> in C# or <B>IEnumerable(Of Int32)</B> in
/// Visual Basic that contains a range of sequential integral numbers.</returns>
/// <exception cref="System.ArgumentOutOfRangeException">
/// <paramref name="count"/> is less than 0
/// -or-
/// <paramref name="start"/> + <paramref name="count"/> - 1 is larger than <see cref="int.MaxValue"/>.
/// </exception>
public static ParallelQuery<int> Range(int start, int count)
{
if (count < 0 || (count > 0 && int.MaxValue - (count - 1) < start)) throw new ArgumentOutOfRangeException(nameof(count));
return new RangeEnumerable(start, count);
}
//-----------------------------------------------------------------------------------
// Repeat just generates a sequence of size 'count' containing 'element'.
//
/// <summary>
/// Generates a parallel sequence that contains one repeated value.
/// </summary>
/// <typeparam name="TResult">The type of the value to be repeated in the result sequence.</typeparam>
/// <param name="element">The value to be repeated.</param>
/// <param name="count">The number of times to repeat the value in the generated sequence.</param>
/// <returns>A sequence that contains a repeated value.</returns>
/// <exception cref="System.ArgumentOutOfRangeException">
/// <paramref name="count"/> is less than 0.
/// </exception>
public static ParallelQuery<TResult> Repeat<TResult>(TResult element, int count)
{
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
return new RepeatEnumerable<TResult>(element, count);
}
//-----------------------------------------------------------------------------------
// Returns an always-empty sequence.
//
/// <summary>
/// Returns an empty ParallelQuery{TResult} that has the specified type argument.
/// </summary>
/// <typeparam name="TResult">The type to assign to the type parameter of the returned
/// generic sequence.</typeparam>
/// <returns>An empty sequence whose type argument is <typeparamref name="TResult"/>.</returns>
public static ParallelQuery<TResult> Empty<TResult>()
{
return System.Linq.Parallel.EmptyEnumerable<TResult>.Instance;
}
//-----------------------------------------------------------------------------------
// A new query operator that allows an arbitrary user-specified "action" to be
// tacked on to the query tree. The action will be invoked for every element in the
// underlying data source, avoiding a costly final merge in the query's execution,
// which can lead to much better scalability. The caveat is that these occur in
// parallel, so the user providing an action must take care to eliminate shared state
// accesses or to synchronize as appropriate.
//
// Arguments:
// source - the data source over which the actions will be invoked
// action - a delegate representing the per-element action to be invoked
//
// Notes:
// Neither source nor action may be null, otherwise this method throws.
//
/// <summary>
/// Invokes in parallel the specified action for each element in the <paramref name="source"/>.
/// </summary>
/// <remarks>
/// This is an efficient way to process the output from a parallelized query because it does
/// not require a merge step at the end. However, order of execution is non-deterministic.
/// </remarks>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <param name="source">The <see cref="ParallelQuery{T}"/> whose elements will be processed by
/// <paramref name="action"/>.</param>
/// <param name="action">An Action to invoke on each element.</param>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <exception cref="System.AggregateException">
/// One or more exceptions occurred during the evaluation of the query.
/// </exception>
/// <exception cref="System.OperationCanceledException">
/// The query was canceled.
/// </exception>
public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(action);
// We just instantiate the forall operator and invoke it synchronously on this thread.
// By the time it returns, the entire query has been executed and the actions run..
new ForAllOperator<TSource>(source, action).RunSynchronously();
}
/*===================================================================================
* BASIC OPERATORS
*===================================================================================*/
//-----------------------------------------------------------------------------------
// Where is an operator that filters any elements from the data source for which the
// user-supplied predicate returns false.
//
/// <summary>
/// Filters in parallel a sequence of values based on a predicate.
/// </summary>
/// <typeparam name="TSource">The type of the elements of source.</typeparam>
/// <param name="source">A sequence to filter.</param>
/// <param name="predicate">A function to test each element for a condition.</param>
/// <returns>A sequence that contains elements from the input sequence that satisfy
/// the condition.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TSource> Where<TSource>(this ParallelQuery<TSource> source, Func<TSource, bool> predicate)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(predicate);
return new WhereQueryOperator<TSource>(source, predicate);
}
/// <summary>
/// Filters in parallel a sequence of values based on a predicate. Each element's index is used in the logic of the predicate function.
/// </summary>
/// <typeparam name="TSource">The type of the elements of source.</typeparam>
/// <param name="source">A sequence to filter.</param>
/// <param name="predicate">A function to test each element for a condition.</param>
/// <returns>A sequence that contains elements from the input sequence that satisfy the condition.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TSource> Where<TSource>(this ParallelQuery<TSource> source, Func<TSource, int, bool> predicate)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(predicate);
return new IndexedWhereQueryOperator<TSource>(source, predicate);
}
//-----------------------------------------------------------------------------------
// Select merely maps a selector delegate over each element in the data source.
//
/// <summary>
/// Projects in parallel each element of a sequence into a new form.
/// </summary>
/// <typeparam name="TSource">The type of the elements of <paramref name="source"/>.</typeparam>
/// <typeparam name="TResult">The type of elements returned by <b>selector</b>.</typeparam>
/// <param name="source">A sequence of values to invoke a transform function on.</param>
/// <param name="selector">A transform function to apply to each element.</param>
/// <returns>A sequence whose elements are the result of invoking the transform function on each
/// element of <paramref name="source"/>.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> Select<TSource, TResult>(
this ParallelQuery<TSource> source, Func<TSource, TResult> selector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(selector);
return new SelectQueryOperator<TSource, TResult>(source, selector);
}
/// <summary>
/// Projects in parallel each element of a sequence into a new form by incorporating the element's index.
/// </summary>
/// <typeparam name="TSource">The type of the elements of <paramref name="source"/>.</typeparam>
/// <typeparam name="TResult">The type of elements returned by <b>selector</b>.</typeparam>
/// <param name="source">A sequence of values to invoke a transform function on.</param>
/// <param name="selector">A transform function to apply to each element.</param>
/// <returns>A sequence whose elements are the result of invoking the transform function on each
/// element of <paramref name="source"/>.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> Select<TSource, TResult>(
this ParallelQuery<TSource> source, Func<TSource, int, TResult> selector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(selector);
return new IndexedSelectQueryOperator<TSource, TResult>(source, selector);
}
//-----------------------------------------------------------------------------------
// Zip combines an outer and inner data source into a single output data stream.
//
/// <summary>
/// Merges in parallel two sequences by using the specified predicate function.
/// </summary>
/// <typeparam name="TFirst">The type of the elements of the first sequence.</typeparam>
/// <typeparam name="TSecond">The type of the elements of the second sequence.</typeparam>
/// <typeparam name="TResult">The type of the return elements.</typeparam>
/// <param name="first">The first sequence to zip.</param>
/// <param name="second">The second sequence to zip.</param>
/// <param name="resultSelector">A function to create a result element from two matching elements.</param>
/// <returns>
/// A sequence that has elements of type <typeparamref name="TResult"/> that are obtained by performing
/// resultSelector pairwise on two sequences. If the sequence lengths are unequal, this truncates
/// to the length of the shorter sequence.
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="first"/> or <paramref name="second"/> or <paramref name="resultSelector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult>(
this ParallelQuery<TFirst> first, ParallelQuery<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
ArgumentNullException.ThrowIfNull(first);
ArgumentNullException.ThrowIfNull(second);
ArgumentNullException.ThrowIfNull(resultSelector);
return new ZipQueryOperator<TFirst, TSecond, TResult>(first, second, resultSelector);
}
/// <summary>
/// This Zip overload should never be called.
/// This method is marked as obsolete and always throws
/// <see cref="System.NotSupportedException"/> when invoked.
/// </summary>
/// <typeparam name="TFirst">This type parameter is not used.</typeparam>
/// <typeparam name="TSecond">This type parameter is not used.</typeparam>
/// <typeparam name="TResult">This type parameter is not used.</typeparam>
/// <param name="first">This parameter is not used.</param>
/// <param name="second">This parameter is not used.</param>
/// <param name="resultSelector">This parameter is not used.</param>
/// <returns>This overload always throws a <see cref="System.NotSupportedException"/>.</returns>
/// <exception cref="System.NotSupportedException">The exception that occurs when this method is called.</exception>
/// <remarks>
/// This overload exists to disallow usage of Zip with a left data source of type
/// <see cref="System.Linq.ParallelQuery{TFirst}"/> and a right data source of type <see cref="System.Collections.Generic.IEnumerable{TSecond}"/>.
/// Otherwise, the Zip operator would appear to be bind to the parallel implementation, but would in reality bind to the sequential implementation.
/// </remarks>
[Obsolete(RIGHT_SOURCE_NOT_PARALLEL_STR)]
public static ParallelQuery<TResult> Zip<TFirst, TSecond, TResult>(
this ParallelQuery<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
throw new NotSupportedException(SR.ParallelEnumerable_BinaryOpMustUseAsParallel);
}
//-----------------------------------------------------------------------------------
// Join is an inner join operator, i.e. elements from outer with no inner matches
// will yield no results in the output data stream.
//
/// <summary>
/// Correlates in parallel the elements of two sequences based on matching keys.
/// The default equality comparer is used to compare keys.
/// </summary>
/// <typeparam name="TOuter">The type of the elements of the first sequence.</typeparam>
/// <typeparam name="TInner">The type of the elements of the second sequence.</typeparam>
/// <typeparam name="TKey">The type of the keys returned by the key selector functions.</typeparam>
/// <typeparam name="TResult">The type of the result elements.</typeparam>
/// <param name="outer">The first sequence to join.</param>
/// <param name="inner">The sequence to join to the first sequence.</param>
/// <param name="outerKeySelector">A function to extract the join key from each element of
/// the first sequence.</param>
/// <param name="innerKeySelector">A function to extract the join key from each element of
/// the second sequence.</param>
/// <param name="resultSelector">A function to create a result element from two matching elements.</param>
/// <returns>A sequence that has elements of type <typeparamref name="TResult"/> that are obtained by performing
/// an inner join on two sequences.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="outer"/> or <paramref name="inner"/> or <paramref name="outerKeySelector"/> or
/// <paramref name="innerKeySelector"/> or <paramref name="resultSelector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, ParallelQuery<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, TInner, TResult> resultSelector)
{
return Join<TOuter, TInner, TKey, TResult>(
outer, inner, outerKeySelector, innerKeySelector, resultSelector, null);
}
/// <summary>
/// This Join overload should never be called.
/// This method is marked as obsolete and always throws <see cref="System.NotSupportedException"/> when invoked.
/// </summary>
/// <typeparam name="TOuter">This type parameter is not used.</typeparam>
/// <typeparam name="TInner">This type parameter is not used.</typeparam>
/// <typeparam name="TKey">This type parameter is not used.</typeparam>
/// <typeparam name="TResult">This type parameter is not used.</typeparam>
/// <param name="outer">This parameter is not used.</param>
/// <param name="inner">This parameter is not used.</param>
/// <param name="outerKeySelector">This parameter is not used.</param>
/// <param name="innerKeySelector">This parameter is not used.</param>
/// <param name="resultSelector">This parameter is not used.</param>
/// <returns>This overload always throws a <see cref="System.NotSupportedException"/>.</returns>
/// <exception cref="System.NotSupportedException">The exception that occurs when this method is called.</exception>
/// <remarks>
/// This overload exists to disallow usage Join with a left data source of type
/// <see cref="System.Linq.ParallelQuery{TOuter}"/> and a right data source of type <see cref="System.Collections.Generic.IEnumerable{TInner}"/>.
/// Otherwise, the Join operator would appear to be binding to the parallel implementation, but would in reality bind to the sequential implementation.
/// </remarks>
[Obsolete(RIGHT_SOURCE_NOT_PARALLEL_STR)]
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, IEnumerable<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, TInner, TResult> resultSelector)
{
throw new NotSupportedException(SR.ParallelEnumerable_BinaryOpMustUseAsParallel);
}
/// <summary>
/// Correlates in parallel the elements of two sequences based on matching keys.
/// A specified IEqualityComparer{T} is used to compare keys.
/// </summary>
/// <typeparam name="TOuter">The type of the elements of the first sequence.</typeparam>
/// <typeparam name="TInner">The type of the elements of the second sequence.</typeparam>
/// <typeparam name="TKey">The type of the keys returned by the key selector functions.</typeparam>
/// <typeparam name="TResult">The type of the result elements.</typeparam>
/// <param name="outer">The first sequence to join.</param>
/// <param name="inner">The sequence to join to the first sequence.</param>
/// <param name="outerKeySelector">A function to extract the join key from each element
/// of the first sequence.</param>
/// <param name="innerKeySelector">A function to extract the join key from each element
/// of the second sequence.</param>
/// <param name="resultSelector">A function to create a result element from two matching elements.</param>
/// <param name="comparer">An IEqualityComparer<(Of <(T>)>) to hash and compare keys.</param>
/// <returns>A sequence that has elements of type <typeparamref name="TResult"/> that are obtained by performing
/// an inner join on two sequences.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="outer"/> or <paramref name="inner"/> or <paramref name="outerKeySelector"/> or
/// <paramref name="innerKeySelector"/> or <paramref name="resultSelector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, ParallelQuery<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey>? comparer)
{
ArgumentNullException.ThrowIfNull(outer);
ArgumentNullException.ThrowIfNull(inner);
ArgumentNullException.ThrowIfNull(outerKeySelector);
ArgumentNullException.ThrowIfNull(innerKeySelector);
ArgumentNullException.ThrowIfNull(resultSelector);
return new JoinQueryOperator<TOuter, TInner, TKey, TResult>(
outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
}
/// <summary>
/// This Join overload should never be called.
/// This method is marked as obsolete and always throws <see cref="System.NotSupportedException"/> when invoked.
/// </summary>
/// <typeparam name="TOuter">This type parameter is not used.</typeparam>
/// <typeparam name="TInner">This type parameter is not used.</typeparam>
/// <typeparam name="TKey">This type parameter is not used.</typeparam>
/// <typeparam name="TResult">This type parameter is not used.</typeparam>
/// <param name="outer">This parameter is not used.</param>
/// <param name="inner">This parameter is not used.</param>
/// <param name="outerKeySelector">This parameter is not used.</param>
/// <param name="innerKeySelector">This parameter is not used.</param>
/// <param name="resultSelector">This parameter is not used.</param>
/// <param name="comparer">This parameter is not used.</param>
/// <returns>This overload always throws a <see cref="System.NotSupportedException"/>.</returns>
/// <exception cref="System.NotSupportedException">The exception that occurs when this method is called.</exception>
/// <remarks>
/// This overload exists to disallow usage of Join with a left data source of type
/// <see cref="System.Linq.ParallelQuery{TOuter}"/> and a right data source of type <see cref="System.Collections.Generic.IEnumerable{TInner}"/>.
/// Otherwise, the Join operator would appear to be binding to the parallel implementation, but would in reality bind to the sequential implementation.
/// </remarks>
[Obsolete(RIGHT_SOURCE_NOT_PARALLEL_STR)]
public static ParallelQuery<TResult> Join<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, IEnumerable<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey>? comparer)
{
throw new NotSupportedException(SR.ParallelEnumerable_BinaryOpMustUseAsParallel);
}
//-----------------------------------------------------------------------------------
// GroupJoin is an outer join operator, i.e. elements from outer with no inner matches
// will yield results (empty lists) in the output data stream.
//
/// <summary>
/// Correlates in parallel the elements of two sequences based on equality of keys and groups the results.
/// The default equality comparer is used to compare keys.
/// </summary>
/// <typeparam name="TOuter">The type of the elements of the first sequence.</typeparam>
/// <typeparam name="TInner">The type of the elements of the second sequence.</typeparam>
/// <typeparam name="TKey">The type of the keys returned by the key selector functions.</typeparam>
/// <typeparam name="TResult">The type of the result elements.</typeparam>
/// <param name="outer">The first sequence to join.</param>
/// <param name="inner">The sequence to join to the first sequence.</param>
/// <param name="outerKeySelector">A function to extract the join key from each element
/// of the first sequence.</param>
/// <param name="innerKeySelector">A function to extract the join key from each element
/// of the second sequence.</param>
/// <param name="resultSelector">A function to create a result element from an element from
/// the first sequence and a collection of matching elements from the second sequence.</param>
/// <returns>A sequence that has elements of type <typeparamref name="TResult"/> that are obtained by performing
/// a grouped join on two sequences.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="outer"/> or <paramref name="inner"/> or <paramref name="outerKeySelector"/> or
/// <paramref name="innerKeySelector"/> or <paramref name="resultSelector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, ParallelQuery<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
{
return GroupJoin<TOuter, TInner, TKey, TResult>(
outer, inner, outerKeySelector, innerKeySelector, resultSelector, null);
}
/// <summary>
/// This GroupJoin overload should never be called.
/// This method is marked as obsolete and always throws <see cref="System.NotSupportedException"/> when called.
/// </summary>
/// <typeparam name="TOuter">This type parameter is not used.</typeparam>
/// <typeparam name="TInner">This type parameter is not used.</typeparam>
/// <typeparam name="TKey">This type parameter is not used.</typeparam>
/// <typeparam name="TResult">This type parameter is not used.</typeparam>
/// <param name="outer">This parameter is not used.</param>
/// <param name="inner">This parameter is not used.</param>
/// <param name="outerKeySelector">This parameter is not used.</param>
/// <param name="innerKeySelector">This parameter is not used.</param>
/// <param name="resultSelector">This parameter is not used.</param>
/// <returns>This overload always throws a <see cref="System.NotSupportedException"/>.</returns>
/// <exception cref="System.NotSupportedException">The exception that occurs when this method is called.</exception>
/// <remarks>
/// This overload exists to disallow usage of GroupJoin with a left data source of type
/// <see cref="System.Linq.ParallelQuery{TOuter}"/> and a right data source of type <see cref="System.Collections.Generic.IEnumerable{TInner}"/>.
/// Otherwise, the GroupJoin operator would appear to be binding to the parallel implementation,
/// but would in reality bind to the sequential implementation.
///</remarks>
[Obsolete(RIGHT_SOURCE_NOT_PARALLEL_STR)]
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, IEnumerable<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, IEnumerable<TInner>, TResult> resultSelector)
{
throw new NotSupportedException(SR.ParallelEnumerable_BinaryOpMustUseAsParallel);
}
/// <summary>
/// Correlates in parallel the elements of two sequences based on key equality and groups the results.
/// A specified IEqualityComparer{T} is used to compare keys.
/// </summary>
/// <typeparam name="TOuter">The type of the elements of the first sequence.</typeparam>
/// <typeparam name="TInner">The type of the elements of the second sequence.</typeparam>
/// <typeparam name="TKey">The type of the keys returned by the key selector functions.</typeparam>
/// <typeparam name="TResult">The type of the result elements.</typeparam>
/// <param name="outer">The first sequence to join.</param>
/// <param name="inner">The sequence to join to the first sequence.</param>
/// <param name="outerKeySelector">A function to extract the join key from each element
/// of the first sequence.</param>
/// <param name="innerKeySelector">A function to extract the join key from each element
/// of the second sequence.</param>
/// <param name="resultSelector">A function to create a result element from an element from
/// the first sequence and a collection of matching elements from the second sequence.</param>
/// <param name="comparer">An IEqualityComparer<(Of <(T>)>) to hash and compare keys.</param>
/// <returns>A sequence that has elements of type <typeparamref name="TResult"/> that are obtained by performing
/// a grouped join on two sequences.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="outer"/> or <paramref name="inner"/> or <paramref name="outerKeySelector"/> or
/// <paramref name="innerKeySelector"/> or <paramref name="resultSelector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, ParallelQuery<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, IEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey>? comparer)
{
ArgumentNullException.ThrowIfNull(outer);
ArgumentNullException.ThrowIfNull(inner);
ArgumentNullException.ThrowIfNull(outerKeySelector);
ArgumentNullException.ThrowIfNull(innerKeySelector);
ArgumentNullException.ThrowIfNull(resultSelector);
return new GroupJoinQueryOperator<TOuter, TInner, TKey, TResult>(outer, inner,
outerKeySelector, innerKeySelector, resultSelector, comparer);
}
/// <summary>
/// This GroupJoin overload should never be called.
/// This method is marked as obsolete and always throws <see cref="System.NotSupportedException"/> when called.
/// </summary>
/// <typeparam name="TOuter">This type parameter is not used.</typeparam>
/// <typeparam name="TInner">This type parameter is not used.</typeparam>
/// <typeparam name="TKey">This type parameter is not used.</typeparam>
/// <typeparam name="TResult">This type parameter is not used.</typeparam>
/// <param name="outer">This parameter is not used.</param>
/// <param name="inner">This parameter is not used.</param>
/// <param name="outerKeySelector">This parameter is not used.</param>
/// <param name="innerKeySelector">This parameter is not used.</param>
/// <param name="resultSelector">This parameter is not used.</param>
/// <param name="comparer">This parameter is not used.</param>
/// <returns>This overload always throws a <see cref="System.NotSupportedException"/>.</returns>
/// <exception cref="System.NotSupportedException">The exception that occurs when this method is called.</exception>
/// <remarks>
/// This overload exists to disallow usage of GroupJoin with a left data source of type
/// <see cref="System.Linq.ParallelQuery{TOuter}"/> and a right data source of type <see cref="System.Collections.Generic.IEnumerable{TInner}"/>.
/// Otherwise, the GroupJoin operator would appear to be binding to the parallel implementation,
/// but would in reality bind to the sequential implementation.
/// </remarks>
[Obsolete(RIGHT_SOURCE_NOT_PARALLEL_STR)]
public static ParallelQuery<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(
this ParallelQuery<TOuter> outer, IEnumerable<TInner> inner,
Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector,
Func<TOuter, IEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey>? comparer)
{
throw new NotSupportedException(SR.ParallelEnumerable_BinaryOpMustUseAsParallel);
}
//-----------------------------------------------------------------------------------
// SelectMany is a kind of nested loops join. For each element in the outer data
// source, we enumerate each element in the inner data source, yielding the result
// with some kind of selection routine. A few different flavors are supported.
//
/// <summary>
/// Projects in parallel each element of a sequence to an IEnumerable{T}
/// and flattens the resulting sequences into one sequence.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <typeparam name="TResult">The type of the elements of the sequence returned by <B>selector</B>.</typeparam>
/// <param name="source">A sequence of values to project.</param>
/// <param name="selector">A transform function to apply to each element.</param>
/// <returns>A sequence whose elements are the result of invoking the one-to-many transform
/// function on each element of the input sequence.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> SelectMany<TSource, TResult>(
this ParallelQuery<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(selector);
return new SelectManyQueryOperator<TSource, TResult, TResult>(source, selector, null, null);
}
/// <summary>
/// Projects in parallel each element of a sequence to an IEnumerable{T}, and flattens the resulting
/// sequences into one sequence. The index of each source element is used in the projected form of
/// that element.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <typeparam name="TResult">The type of the elements of the sequence returned by <B>selector</B>.</typeparam>
/// <param name="source">A sequence of values to project.</param>
/// <param name="selector">A transform function to apply to each element.</param>
/// <returns>A sequence whose elements are the result of invoking the one-to-many transform
/// function on each element of the input sequence.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> SelectMany<TSource, TResult>(
this ParallelQuery<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(selector);
return new SelectManyQueryOperator<TSource, TResult, TResult>(source, null, selector, null);
}
/// <summary>
/// Projects each element of a sequence to an IEnumerable{T},
/// flattens the resulting sequences into one sequence, and invokes a result selector
/// function on each element therein.
/// </summary>
/// <typeparam name="TSource">The type of elements of <paramref name="source"/>.</typeparam>
/// <typeparam name="TCollection">The type of the intermediate elements collected by <paramref name="collectionSelector"/>.</typeparam>
/// <typeparam name="TResult"></typeparam>
/// <param name="source">A sequence of values to project.</param>
/// <param name="collectionSelector">A transform function to apply to each source element;
/// the second parameter of the function represents the index of the source element.</param>
/// <param name="resultSelector">A function to create a result element from an element from
/// the first sequence and a collection of matching elements from the second sequence.</param>
/// <returns>A sequence whose elements are the result of invoking the one-to-many transform
/// function <paramref name="collectionSelector"/> on each element of <paramref name="source"/> and then mapping
/// each of those sequence elements and their corresponding source element to a result element.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="source"/> or <paramref name="collectionSelector"/> or
/// <paramref name="resultSelector"/> is a null reference (Nothing in Visual Basic).
/// </exception>
public static ParallelQuery<TResult> SelectMany<TSource, TCollection, TResult>(
this ParallelQuery<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector,
Func<TSource, TCollection, TResult> resultSelector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(collectionSelector);
ArgumentNullException.ThrowIfNull(resultSelector);
return new SelectManyQueryOperator<TSource, TCollection, TResult>(source, collectionSelector, null, resultSelector);
}