Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Scheduler improvements (#2026)
Browse files Browse the repository at this point in the history
- Support Scheduler de-virtualization
- Fast fire-and-forget Tasks
- Rename TaskRun -> ThreadPool
  • Loading branch information
benaadams authored and davidfowl committed Jan 11, 2018
1 parent 6d1c3c2 commit addc27e
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ internal SocketConnection(Socket socket, MemoryPool<byte> pool)

// TODO: Make this configurable
// Dispatch to avoid deadlocks
_input = new Pipe(new PipeOptions(pool, Scheduler.TaskRun, Scheduler.TaskRun));
_output = new Pipe(new PipeOptions(pool, Scheduler.TaskRun, Scheduler.TaskRun));
_input = new Pipe(new PipeOptions(pool, Scheduler.ThreadPool, Scheduler.ThreadPool));
_output = new Pipe(new PipeOptions(pool, Scheduler.ThreadPool, Scheduler.ThreadPool));

_receiveTask = ReceiveFromSocketAndPushToWriterAsync();
_sendTask = ReadFromReaderAndWriteToSocketAsync();
Expand Down
2 changes: 1 addition & 1 deletion src/System.IO.Pipelines/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<Import Project="..\..\tools\common.props" />
<PropertyGroup>
<Description>An abstraction for doing efficient asynchronous IO</Description>
<TargetFramework>netstandard1.1</TargetFramework>
<TargetFrameworks>netstandard1.1;netstandard2.0;netcoreapp2.1</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageIconUrl>http://go.microsoft.com/fwlink/?linkid=833199</PackageIconUrl>
<!--<DefineConstants Condition="'$(Configuration)' == 'Debug'">$(DefineConstants);BLOCK_LEASE_TRACKING</DefineConstants>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace System.Threading
{
internal class InlineScheduler : Scheduler
internal sealed class InlineScheduler : Scheduler
{
public override void Schedule(Action action)
{
Expand Down
7 changes: 5 additions & 2 deletions src/System.IO.Pipelines/System/Threading/Scheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ namespace System.Threading
{
public abstract class Scheduler
{
public static Scheduler TaskRun { get; } = new TaskRunScheduler();
public static Scheduler Inline { get; } = new InlineScheduler();
private static ThreadPoolScheduler _threadPoolScheduler = new ThreadPoolScheduler();
private static InlineScheduler _inlineScheduler = new InlineScheduler();

public static Scheduler ThreadPool => _threadPoolScheduler;
public static Scheduler Inline => _inlineScheduler;

public abstract void Schedule(Action action);
public abstract void Schedule(Action<object> action, object state);
Expand Down
21 changes: 0 additions & 21 deletions src/System.IO.Pipelines/System/Threading/TaskRunScheduler.cs

This file was deleted.

55 changes: 55 additions & 0 deletions src/System.IO.Pipelines/System/Threading/ThreadPoolScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;

namespace System.Threading
{
internal sealed class ThreadPoolScheduler : Scheduler
{
public override void Schedule(Action action)
{
#if NETCOREAPP2_1
// Queue to low contention local ThreadPool queue; rather than global queue as per Task
Threading.ThreadPool.QueueUserWorkItem(_actionWaitCallback, action, preferLocal: true);
#elif NETSTANDARD2_0
Threading.ThreadPool.QueueUserWorkItem(_actionWaitCallback, action);
#else
Task.Factory.StartNew(action);
#endif
}

public override void Schedule(Action<object> action, object state)
{
#if NETCOREAPP2_1
// Queue to low contention local ThreadPool queue; rather than global queue as per Task
Threading.ThreadPool.QueueUserWorkItem(_actionObjectWaitCallback, new ActionObjectAsWaitCallback(action, state), preferLocal: true);
#elif NETSTANDARD2_0
Threading.ThreadPool.QueueUserWorkItem(_actionObjectWaitCallback, new ActionObjectAsWaitCallback(action, state));
#else
Task.Factory.StartNew(action, state);
#endif
}

#if NETCOREAPP2_1 || NETSTANDARD2_0
private readonly static WaitCallback _actionWaitCallback = state => ((Action)state)();

private readonly static WaitCallback _actionObjectWaitCallback = state => ((ActionObjectAsWaitCallback)state).Run();

private sealed class ActionObjectAsWaitCallback
{
private Action<object> _action;
private object _state;

public ActionObjectAsWaitCallback(Action<object> action, object state)
{
_action = action;
_state = state;
}

public void Run() => _action(_state);
}
#endif
}
}

0 comments on commit addc27e

Please sign in to comment.