From 1d328a2ae5f33d1a31542666270f7607e2c63572 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 6 Jan 2018 05:01:34 +0000 Subject: [PATCH 1/5] Support Scheduler devirtualization --- .../System/Threading/InlineScheduler.cs | 2 +- src/System.IO.Pipelines/System/Threading/Scheduler.cs | 7 +++++-- .../System/Threading/TaskRunScheduler.cs | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs b/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs index b65eb70480e..ba9fe000cfc 100644 --- a/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs @@ -4,7 +4,7 @@ namespace System.Threading { - internal class InlineScheduler : Scheduler + internal sealed class InlineScheduler : Scheduler { public override void Schedule(Action action) { diff --git a/src/System.IO.Pipelines/System/Threading/Scheduler.cs b/src/System.IO.Pipelines/System/Threading/Scheduler.cs index 6685dcda0fe..689949c68ff 100644 --- a/src/System.IO.Pipelines/System/Threading/Scheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/Scheduler.cs @@ -5,8 +5,11 @@ namespace System.Threading { public abstract class Scheduler { - public static Scheduler TaskRun { get; } = new TaskRunScheduler(); - public static Scheduler Inline { get; } = new InlineScheduler(); + private static TaskRunScheduler _taskRunScheduler = new TaskRunScheduler(); + private static InlineScheduler _inlineScheduler = new InlineScheduler(); + + public static Scheduler TaskRun => _taskRunScheduler; + public static Scheduler Inline => _inlineScheduler; public abstract void Schedule(Action action); public abstract void Schedule(Action action, object state); diff --git a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs b/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs index e2cdd3bc04d..234783dcc56 100644 --- a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs @@ -6,7 +6,7 @@ namespace System.Threading { - internal class TaskRunScheduler : Scheduler + internal sealed class TaskRunScheduler : Scheduler { public override void Schedule(Action action) { From d32726abb4825e90d9f090a929d5883a5221518d Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 6 Jan 2018 05:09:20 +0000 Subject: [PATCH 2/5] Fast fire-and-forget Tasks --- .../System.IO.Pipelines.csproj | 2 +- .../System/Threading/TaskRunScheduler.cs | 62 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/System.IO.Pipelines/System.IO.Pipelines.csproj b/src/System.IO.Pipelines/System.IO.Pipelines.csproj index f7f4ff09035..dd3c7bbcca7 100644 --- a/src/System.IO.Pipelines/System.IO.Pipelines.csproj +++ b/src/System.IO.Pipelines/System.IO.Pipelines.csproj @@ -2,7 +2,7 @@ An abstraction for doing efficient asynchronous IO - netstandard1.1 + netstandard1.1;netstandard2.0;netcoreapp2.1 true http://go.microsoft.com/fwlink/?linkid=833199 diff --git a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs b/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs index 234783dcc56..75b355728ca 100644 --- a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs @@ -10,12 +10,74 @@ internal sealed class TaskRunScheduler : Scheduler { public override void Schedule(Action action) { +#if NETCOREAPP2_1 + // Queue to low contention local ThreadPool queue; rather than global queue as per Task + ThreadPool.QueueUserWorkItem(_actionAsTask, action, preferLocal: true); +#elif NETSTANDARD2_0 + ThreadPool.QueueUserWorkItem(_actionAsTask, action); +#else Task.Factory.StartNew(action); +#endif } public override void Schedule(Action action, object state) { +#if NETCOREAPP2_1 + // Queue to low contention local ThreadPool queue; rather than global queue as per Task + ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state), preferLocal: true); +#elif NETSTANDARD2_0 + ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state)); +#else Task.Factory.StartNew(action, state); +#endif } + +#if NETCOREAPP2_1 || NETSTANDARD2_0 + // Catches only the exception into a failed Task, so the fire-and-forget action + // can be queued directly to the ThreadPool without the extra overhead of as Task + private readonly static WaitCallback _actionAsTask = state => + { + try + { + ((Action)state)(); + } + catch (Exception ex) + { + // Create faulted Task for the TaskScheulder to handle exception + // rather than letting it escape onto the ThreadPool and crashing the process + Task.FromException(ex); + } + }; + + private readonly static WaitCallback _actionObjectAsTask = state => ((ActionObjectAsTask)state).Run(); + + private sealed class ActionObjectAsTask + { + private Action _action; + private object _state; + + public ActionObjectAsTask(Action action, object state) + { + _action = action; + _state = state; + } + + // Catches only the exception into a failed Task, so the fire-and-forget action + // can be queued directly to the ThreadPool without the extra overhead of as Task + public void Run() + { + try + { + _action(_state); + } + catch (Exception ex) + { + // Create faulted Task for the TaskScheulder to handle exception + // rather than letting it escape onto the ThreadPool and crashing the process + Task.FromException(ex); + } + } + } +#endif } } From cc7549e2a4f86ce110c646aac7df642e6371f9d5 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 6 Jan 2018 06:08:28 +0000 Subject: [PATCH 3/5] Rename TaskRun -> ThreadPool --- src/System.IO.Pipelines/System/Threading/Scheduler.cs | 4 ++-- .../{TaskRunScheduler.cs => ThreadPoolScheduler.cs} | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) rename src/System.IO.Pipelines/System/Threading/{TaskRunScheduler.cs => ThreadPoolScheduler.cs} (84%) diff --git a/src/System.IO.Pipelines/System/Threading/Scheduler.cs b/src/System.IO.Pipelines/System/Threading/Scheduler.cs index 689949c68ff..a2edfeeafa7 100644 --- a/src/System.IO.Pipelines/System/Threading/Scheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/Scheduler.cs @@ -5,10 +5,10 @@ namespace System.Threading { public abstract class Scheduler { - private static TaskRunScheduler _taskRunScheduler = new TaskRunScheduler(); + private static ThreadPoolScheduler _threadPoolScheduler = new ThreadPoolScheduler(); private static InlineScheduler _inlineScheduler = new InlineScheduler(); - public static Scheduler TaskRun => _taskRunScheduler; + public static Scheduler ThreadPool => _threadPoolScheduler; public static Scheduler Inline => _inlineScheduler; public abstract void Schedule(Action action); diff --git a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs b/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs similarity index 84% rename from src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs rename to src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs index 75b355728ca..7a82dd43025 100644 --- a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs @@ -6,15 +6,15 @@ namespace System.Threading { - internal sealed class TaskRunScheduler : Scheduler + internal sealed class ThreadPoolScheduler : Scheduler { public override void Schedule(Action action) { #if NETCOREAPP2_1 // Queue to low contention local ThreadPool queue; rather than global queue as per Task - ThreadPool.QueueUserWorkItem(_actionAsTask, action, preferLocal: true); + Threading.ThreadPool.QueueUserWorkItem(_actionAsTask, action, preferLocal: true); #elif NETSTANDARD2_0 - ThreadPool.QueueUserWorkItem(_actionAsTask, action); + Threading.ThreadPool.QueueUserWorkItem(_actionAsTask, action); #else Task.Factory.StartNew(action); #endif @@ -24,9 +24,9 @@ public override void Schedule(Action action, object state) { #if NETCOREAPP2_1 // Queue to low contention local ThreadPool queue; rather than global queue as per Task - ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state), preferLocal: true); + Threading.ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state), preferLocal: true); #elif NETSTANDARD2_0 - ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state)); + Threading.ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state)); #else Task.Factory.StartNew(action, state); #endif From f3ffc2b024e305abc4aad27eb1467ef70cd3a542 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Thu, 11 Jan 2018 01:17:38 +0000 Subject: [PATCH 4/5] Fix build --- .../SocketConnection.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs b/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs index c016fa279d7..5a64b4b9920 100644 --- a/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs +++ b/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs @@ -86,8 +86,8 @@ internal SocketConnection(Socket socket, MemoryPool pool) // TODO: Make this configurable // Dispatch to avoid deadlocks - _input = new Pipe(new PipeOptions(pool, Scheduler.TaskRun, Scheduler.TaskRun)); - _output = new Pipe(new PipeOptions(pool, Scheduler.TaskRun, Scheduler.TaskRun)); + _input = new Pipe(new PipeOptions(pool, Scheduler.ThreadPool, Scheduler.ThreadPool)); + _output = new Pipe(new PipeOptions(pool, Scheduler.ThreadPool, Scheduler.ThreadPool)); _receiveTask = ReceiveFromSocketAndPushToWriterAsync(); _sendTask = ReadFromReaderAndWriteToSocketAsync(); From 89322e75c95654ed83ca61ae94372dc4b3cdc18a Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Thu, 11 Jan 2018 05:46:37 +0000 Subject: [PATCH 5/5] Don't catch throw --- .../System/Threading/ThreadPoolScheduler.cs | 46 ++++--------------- 1 file changed, 9 insertions(+), 37 deletions(-) diff --git a/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs b/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs index 7a82dd43025..4de228df9d0 100644 --- a/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs @@ -12,9 +12,9 @@ public override void Schedule(Action action) { #if NETCOREAPP2_1 // Queue to low contention local ThreadPool queue; rather than global queue as per Task - Threading.ThreadPool.QueueUserWorkItem(_actionAsTask, action, preferLocal: true); + Threading.ThreadPool.QueueUserWorkItem(_actionWaitCallback, action, preferLocal: true); #elif NETSTANDARD2_0 - Threading.ThreadPool.QueueUserWorkItem(_actionAsTask, action); + Threading.ThreadPool.QueueUserWorkItem(_actionWaitCallback, action); #else Task.Factory.StartNew(action); #endif @@ -24,59 +24,31 @@ public override void Schedule(Action action, object state) { #if NETCOREAPP2_1 // Queue to low contention local ThreadPool queue; rather than global queue as per Task - Threading.ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state), preferLocal: true); + Threading.ThreadPool.QueueUserWorkItem(_actionObjectWaitCallback, new ActionObjectAsWaitCallback(action, state), preferLocal: true); #elif NETSTANDARD2_0 - Threading.ThreadPool.QueueUserWorkItem(_actionObjectAsTask, new ActionObjectAsTask(action, state)); + Threading.ThreadPool.QueueUserWorkItem(_actionObjectWaitCallback, new ActionObjectAsWaitCallback(action, state)); #else Task.Factory.StartNew(action, state); #endif } #if NETCOREAPP2_1 || NETSTANDARD2_0 - // Catches only the exception into a failed Task, so the fire-and-forget action - // can be queued directly to the ThreadPool without the extra overhead of as Task - private readonly static WaitCallback _actionAsTask = state => - { - try - { - ((Action)state)(); - } - catch (Exception ex) - { - // Create faulted Task for the TaskScheulder to handle exception - // rather than letting it escape onto the ThreadPool and crashing the process - Task.FromException(ex); - } - }; + private readonly static WaitCallback _actionWaitCallback = state => ((Action)state)(); - private readonly static WaitCallback _actionObjectAsTask = state => ((ActionObjectAsTask)state).Run(); + private readonly static WaitCallback _actionObjectWaitCallback = state => ((ActionObjectAsWaitCallback)state).Run(); - private sealed class ActionObjectAsTask + private sealed class ActionObjectAsWaitCallback { private Action _action; private object _state; - public ActionObjectAsTask(Action action, object state) + public ActionObjectAsWaitCallback(Action action, object state) { _action = action; _state = state; } - // Catches only the exception into a failed Task, so the fire-and-forget action - // can be queued directly to the ThreadPool without the extra overhead of as Task - public void Run() - { - try - { - _action(_state); - } - catch (Exception ex) - { - // Create faulted Task for the TaskScheulder to handle exception - // rather than letting it escape onto the ThreadPool and crashing the process - Task.FromException(ex); - } - } + public void Run() => _action(_state); } #endif }