diff --git a/BepInEx/ThreadingHelper.cs b/BepInEx/ThreadingHelper.cs index 8a199f8c..840ed43e 100644 --- a/BepInEx/ThreadingHelper.cs +++ b/BepInEx/ThreadingHelper.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.ComponentModel; using System.Linq; +using System.Reflection; using System.Threading; using BepInEx.Logging; using UnityEngine; @@ -142,7 +144,7 @@ object ISynchronizeInvoke.EndInvoke(IAsyncResult result) { var invokeResult = (InvokeResult)result; invokeResult.AsyncWaitHandle.WaitOne(); - + if (invokeResult.ExceptionThrown) throw (Exception)invokeResult.AsyncState; return invokeResult.AsyncState; @@ -170,7 +172,7 @@ public InvokeResult() public void Finish(object result, bool completedSynchronously) { - AsyncState = result; + AsyncState = result; CompletedSynchronously = completedSynchronously; IsCompleted = true; ((EventWaitHandle)AsyncWaitHandle).Set(); @@ -185,4 +187,103 @@ public void Finish(object result, bool completedSynchronously) #endregion } + + /// + /// Convenience extensions for utilizing multiple threads and using the . + /// + public static class ThreadingExtensions + { + /// + public static IEnumerable RunParallel(this IEnumerable data, Func work, int workerCount = -1) + { + foreach (var result in RunParallel(data.ToList(), work)) + yield return result; + } + + /// + /// Apply a function to a collection of data by spreading the work on multiple threads. + /// Outputs of the functions are returned to the current thread and yielded one by one. + /// + /// Type of the input values. + /// Type of the output values. + /// Input values for the work function. + /// Function to apply to the data on multiple threads at once. + /// Number of worker threads. By default SystemInfo.processorCount is used. + /// An exception was thrown inside one of the threads, and the operation was aborted. + /// Need at least 1 workerCount. + public static IEnumerable RunParallel(this IList data, Func work, int workerCount = -1) + { + if (workerCount < 0) + workerCount = Mathf.Max(2, SystemInfo.processorCount); + else if (workerCount == 0) + throw new ArgumentException("Need at least 1 worker", nameof(workerCount)); + + var perThreadCount = Mathf.CeilToInt(data.Count / (float)workerCount); + var doneCount = 0; + + var lockObj = new object(); + var are = new ManualResetEvent(false); + IEnumerable doneItems = null; + Exception exceptionThrown = null; + + // Start threads to process the data + for (var i = 0; i < workerCount; i++) + { + int first = i * perThreadCount; + int last = Mathf.Min(first + perThreadCount, data.Count); + ThreadPool.QueueUserWorkItem( + _ => + { + var results = new List(perThreadCount); + + try + { + for (int dataIndex = first; dataIndex < last; dataIndex++) + { + if (exceptionThrown != null) break; + results.Add(work(data[dataIndex])); + } + } + catch (Exception ex) + { + exceptionThrown = ex; + } + + lock (lockObj) + { + doneItems = doneItems == null ? results : results.Concat(doneItems); + doneCount++; + are.Set(); + } + }); + } + + // Main thread waits for results and returns them until all threads finish + while (true) + { + are.WaitOne(); + + IEnumerable toOutput; + bool isDone; + lock (lockObj) + { + toOutput = doneItems; + doneItems = null; + isDone = doneCount == workerCount; + } + + if (toOutput != null) + { + foreach (var doneItem in toOutput) + yield return doneItem; + } + + if (isDone) + break; + } + + if (exceptionThrown != null) + throw new TargetInvocationException("An exception was thrown inside one of the threads", exceptionThrown); + } + } } \ No newline at end of file