diff --git a/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs b/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs index c016fa279d7..5a64b4b9920 100644 --- a/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs +++ b/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs @@ -86,8 +86,8 @@ internal SocketConnection(Socket socket, MemoryPool pool) // TODO: Make this configurable // Dispatch to avoid deadlocks - _input = new Pipe(new PipeOptions(pool, Scheduler.TaskRun, Scheduler.TaskRun)); - _output = new Pipe(new PipeOptions(pool, Scheduler.TaskRun, Scheduler.TaskRun)); + _input = new Pipe(new PipeOptions(pool, Scheduler.ThreadPool, Scheduler.ThreadPool)); + _output = new Pipe(new PipeOptions(pool, Scheduler.ThreadPool, Scheduler.ThreadPool)); _receiveTask = ReceiveFromSocketAndPushToWriterAsync(); _sendTask = ReadFromReaderAndWriteToSocketAsync(); diff --git a/src/System.IO.Pipelines/System.IO.Pipelines.csproj b/src/System.IO.Pipelines/System.IO.Pipelines.csproj index f7f4ff09035..dd3c7bbcca7 100644 --- a/src/System.IO.Pipelines/System.IO.Pipelines.csproj +++ b/src/System.IO.Pipelines/System.IO.Pipelines.csproj @@ -2,7 +2,7 @@ An abstraction for doing efficient asynchronous IO - netstandard1.1 + netstandard1.1;netstandard2.0;netcoreapp2.1 true http://go.microsoft.com/fwlink/?linkid=833199 diff --git a/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs b/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs index b65eb70480e..ba9fe000cfc 100644 --- a/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/InlineScheduler.cs @@ -4,7 +4,7 @@ namespace System.Threading { - internal class InlineScheduler : Scheduler + internal sealed class InlineScheduler : Scheduler { public override void Schedule(Action action) { diff --git a/src/System.IO.Pipelines/System/Threading/Scheduler.cs b/src/System.IO.Pipelines/System/Threading/Scheduler.cs index 6685dcda0fe..a2edfeeafa7 100644 --- a/src/System.IO.Pipelines/System/Threading/Scheduler.cs +++ b/src/System.IO.Pipelines/System/Threading/Scheduler.cs @@ -5,8 +5,11 @@ namespace System.Threading { public abstract class Scheduler { - public static Scheduler TaskRun { get; } = new TaskRunScheduler(); - public static Scheduler Inline { get; } = new InlineScheduler(); + private static ThreadPoolScheduler _threadPoolScheduler = new ThreadPoolScheduler(); + private static InlineScheduler _inlineScheduler = new InlineScheduler(); + + public static Scheduler ThreadPool => _threadPoolScheduler; + public static Scheduler Inline => _inlineScheduler; public abstract void Schedule(Action action); public abstract void Schedule(Action action, object state); diff --git a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs b/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs deleted file mode 100644 index e2cdd3bc04d..00000000000 --- a/src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs +++ /dev/null @@ -1,21 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System.Threading.Tasks; - -namespace System.Threading -{ - internal class TaskRunScheduler : Scheduler - { - public override void Schedule(Action action) - { - Task.Factory.StartNew(action); - } - - public override void Schedule(Action action, object state) - { - Task.Factory.StartNew(action, state); - } - } -} diff --git a/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs b/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs new file mode 100644 index 00000000000..4de228df9d0 --- /dev/null +++ b/src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs @@ -0,0 +1,55 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading +{ + internal sealed class ThreadPoolScheduler : Scheduler + { + public override void Schedule(Action action) + { +#if NETCOREAPP2_1 + // Queue to low contention local ThreadPool queue; rather than global queue as per Task + Threading.ThreadPool.QueueUserWorkItem(_actionWaitCallback, action, preferLocal: true); +#elif NETSTANDARD2_0 + Threading.ThreadPool.QueueUserWorkItem(_actionWaitCallback, action); +#else + Task.Factory.StartNew(action); +#endif + } + + public override void Schedule(Action action, object state) + { +#if NETCOREAPP2_1 + // Queue to low contention local ThreadPool queue; rather than global queue as per Task + Threading.ThreadPool.QueueUserWorkItem(_actionObjectWaitCallback, new ActionObjectAsWaitCallback(action, state), preferLocal: true); +#elif NETSTANDARD2_0 + Threading.ThreadPool.QueueUserWorkItem(_actionObjectWaitCallback, new ActionObjectAsWaitCallback(action, state)); +#else + Task.Factory.StartNew(action, state); +#endif + } + +#if NETCOREAPP2_1 || NETSTANDARD2_0 + private readonly static WaitCallback _actionWaitCallback = state => ((Action)state)(); + + private readonly static WaitCallback _actionObjectWaitCallback = state => ((ActionObjectAsWaitCallback)state).Run(); + + private sealed class ActionObjectAsWaitCallback + { + private Action _action; + private object _state; + + public ActionObjectAsWaitCallback(Action action, object state) + { + _action = action; + _state = state; + } + + public void Run() => _action(_state); + } +#endif + } +}