Skip to content

Commit

Permalink
Added RunParallel extension method for enumerables
Browse files Browse the repository at this point in the history
  • Loading branch information
ManlyMarco committed Nov 28, 2019
1 parent 16dbd1e commit 92dd69e
Showing 1 changed file with 103 additions and 2 deletions.
105 changes: 103 additions & 2 deletions BepInEx/ThreadingHelper.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Reflection;
using System.Threading;
using BepInEx.Logging;
using UnityEngine;
Expand Down Expand Up @@ -142,7 +144,7 @@ object ISynchronizeInvoke.EndInvoke(IAsyncResult result)
{
var invokeResult = (InvokeResult)result;
invokeResult.AsyncWaitHandle.WaitOne();

if (invokeResult.ExceptionThrown)
throw (Exception)invokeResult.AsyncState;
return invokeResult.AsyncState;
Expand Down Expand Up @@ -170,7 +172,7 @@ public InvokeResult()

public void Finish(object result, bool completedSynchronously)
{
AsyncState = result;
AsyncState = result;
CompletedSynchronously = completedSynchronously;
IsCompleted = true;
((EventWaitHandle)AsyncWaitHandle).Set();
Expand All @@ -185,4 +187,103 @@ public void Finish(object result, bool completedSynchronously)

#endregion
}

/// <summary>
/// Convenience extensions for utilizing multiple threads and using the <see cref="ThreadingHelper"/>.
/// </summary>
public static class ThreadingExtensions
{
/// <inheritdoc cref="RunParallel{TIn,TOut}(IList{TIn},Func{TIn,TOut},int)"/>
public static IEnumerable<TOut> RunParallel<TIn, TOut>(this IEnumerable<TIn> data, Func<TIn, TOut> work, int workerCount = -1)
{
foreach (var result in RunParallel(data.ToList(), work))
yield return result;
}

/// <summary>
/// Apply a function to a collection of data by spreading the work on multiple threads.
/// Outputs of the functions are returned to the current thread and yielded one by one.
/// </summary>
/// <typeparam name="TIn">Type of the input values.</typeparam>
/// <typeparam name="TOut">Type of the output values.</typeparam>
/// <param name="data">Input values for the work function.</param>
/// <param name="work">Function to apply to the data on multiple threads at once.</param>
/// <param name="workerCount">Number of worker threads. By default SystemInfo.processorCount is used.</param>
/// <exception cref="TargetInvocationException">An exception was thrown inside one of the threads, and the operation was aborted.</exception>
/// <exception cref="ArgumentException">Need at least 1 workerCount.</exception>
public static IEnumerable<TOut> RunParallel<TIn, TOut>(this IList<TIn> data, Func<TIn, TOut> work, int workerCount = -1)
{
if (workerCount < 0)
workerCount = Mathf.Max(2, SystemInfo.processorCount);
else if (workerCount == 0)
throw new ArgumentException("Need at least 1 worker", nameof(workerCount));

var perThreadCount = Mathf.CeilToInt(data.Count / (float)workerCount);
var doneCount = 0;

var lockObj = new object();
var are = new ManualResetEvent(false);
IEnumerable<TOut> doneItems = null;
Exception exceptionThrown = null;

// Start threads to process the data
for (var i = 0; i < workerCount; i++)
{
int first = i * perThreadCount;
int last = Mathf.Min(first + perThreadCount, data.Count);
ThreadPool.QueueUserWorkItem(
_ =>
{
var results = new List<TOut>(perThreadCount);
try
{
for (int dataIndex = first; dataIndex < last; dataIndex++)
{
if (exceptionThrown != null) break;
results.Add(work(data[dataIndex]));
}
}
catch (Exception ex)
{
exceptionThrown = ex;
}
lock (lockObj)
{
doneItems = doneItems == null ? results : results.Concat(doneItems);
doneCount++;
are.Set();
}
});
}

// Main thread waits for results and returns them until all threads finish
while (true)
{
are.WaitOne();

IEnumerable<TOut> toOutput;
bool isDone;
lock (lockObj)
{
toOutput = doneItems;
doneItems = null;
isDone = doneCount == workerCount;
}

if (toOutput != null)
{
foreach (var doneItem in toOutput)
yield return doneItem;
}

if (isDone)
break;
}

if (exceptionThrown != null)
throw new TargetInvocationException("An exception was thrown inside one of the threads", exceptionThrown);
}
}
}

0 comments on commit 92dd69e

Please sign in to comment.