Skip to content

Commit 0894d35

Browse files
authored
Merge pull request #360 from BepInEx/feature-runparallel
Add ThreadingExtensions.ForEachParallel
2 parents 1504d80 + 9efa51b commit 0894d35

File tree

1 file changed

+65
-2
lines changed

1 file changed

+65
-2
lines changed

BepInEx.Unity/ThreadingHelper.cs

+65-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.ComponentModel;
44
using System.Linq;
@@ -214,7 +214,7 @@ public static IEnumerable<TOut> RunParallel<TIn, TOut>(this IEnumerable<TIn> dat
214214
/// <typeparam name="TOut">Type of the output values.</typeparam>
215215
/// <param name="data">Input values for the work function.</param>
216216
/// <param name="work">Function to apply to the data on multiple threads at once.</param>
217-
/// <param name="workerCount">Number of worker threads. By default SystemInfo.processorCount is used.</param>
217+
/// <param name="workerCount">Number of worker threads. By default Environment.ProcessorCount is used.</param>
218218
/// <exception cref="TargetInvocationException">
219219
/// An exception was thrown inside one of the threads, and the operation was
220220
/// aborted.
@@ -297,4 +297,67 @@ public static IEnumerable<TOut> RunParallel<TIn, TOut>(this IList<TIn> data,
297297
throw new TargetInvocationException("An exception was thrown inside one of the threads",
298298
exceptionThrown);
299299
}
300+
301+
/// <summary>
302+
/// Apply a function to a collection of data by spreading the work on multiple threads.
303+
/// Lower overhead than RunParallel but it blocks the main thread until all work is completed or an exception has been thrown.
304+
/// </summary>
305+
/// <typeparam name="T">Type of the input values.</typeparam>
306+
/// <param name="data">Input values for the work function.</param>
307+
/// <param name="work">Function to apply to the data on multiple threads at once.</param>
308+
/// <param name="workerCount">Number of worker threads. By default Environment.ProcessorCount is used.</param>
309+
/// <exception cref="TargetInvocationException">An exception was thrown inside one of the threads, and the operation was aborted.</exception>
310+
/// <exception cref="ArgumentException">Need at least 1 workerCount.</exception>
311+
public static void ForEachParallel<T>(this IList<T> data, Action<T> work, int workerCount = -1)
312+
{
313+
if (workerCount < 0)
314+
workerCount = Mathf.Max(2, Environment.ProcessorCount);
315+
else if (workerCount == 0)
316+
throw new ArgumentException("Need at least 1 worker", nameof(workerCount));
317+
318+
var currentIndex = data.Count;
319+
320+
var are = new ManualResetEvent(false);
321+
var runningCount = workerCount;
322+
Exception exceptionThrown = null;
323+
324+
void DoWork(object _)
325+
{
326+
try
327+
{
328+
while (true)
329+
{
330+
if (exceptionThrown != null)
331+
return;
332+
333+
var decrementedIndex = Interlocked.Decrement(ref currentIndex);
334+
if (decrementedIndex < 0)
335+
return;
336+
337+
work(data[decrementedIndex]);
338+
}
339+
}
340+
catch (Exception ex)
341+
{
342+
exceptionThrown = ex;
343+
}
344+
finally
345+
{
346+
var decCount = Interlocked.Decrement(ref runningCount);
347+
if (decCount <= 0)
348+
are.Set();
349+
}
350+
}
351+
352+
// Start threads to process the data
353+
for (var i = 0; i < workerCount - 1; i++)
354+
ThreadPool.QueueUserWorkItem(DoWork);
355+
356+
DoWork(null);
357+
358+
are.WaitOne();
359+
360+
if (exceptionThrown != null)
361+
throw new TargetInvocationException("An exception was thrown inside one of the threads", exceptionThrown);
362+
}
300363
}

0 commit comments

Comments
 (0)