Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock-free IOQueue #6154

Merged
merged 5 commits into from Feb 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 29 additions & 17 deletions src/Servers/Kestrel/Transport.Sockets/src/Internal/IOQueue.cs
Expand Up @@ -10,23 +10,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
public class IOQueue : PipeScheduler, IThreadPoolWorkItem
{
private readonly object _workSync = new object();
private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
private bool _doingWork;
private int _doingWork;

public override void Schedule(Action<object> action, object state)
{
var work = new Work(action, state);
_workItems.Enqueue(new Work(action, state));

_workItems.Enqueue(work);

lock (_workSync)
// Set working if it wasn't (via atomic Interlocked).
if (Interlocked.CompareExchange(ref _doingWork, 1, 0) == 0)
{
if (!_doingWork)
{
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
_doingWork = true;
}
// Wasn't working, schedule.
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}

Expand All @@ -39,14 +34,31 @@ void IThreadPoolWorkItem.Execute()
item.Callback(item.State);
}

lock (_workSync)
// All work done.

// Set _doingWork (0 == false) prior to checking IsEmpty to catch any missed work in interim.
// This doesn't need to be volatile due to the following barrier (i.e. it is volatile).
_doingWork = 0;
benaadams marked this conversation as resolved.
Show resolved Hide resolved

// Ensure _doingWork is written before IsEmpty is read.
// As they are two different memory locations, we insert a barrier to guarantee ordering.
Thread.MemoryBarrier();

// Check if there is work to do
if (_workItems.IsEmpty)
benaadams marked this conversation as resolved.
Show resolved Hide resolved
{
// Nothing to do, exit.
break;
}

// Is work, can we set it as active again (via atomic Interlocked), prior to scheduling?
benaadams marked this conversation as resolved.
Show resolved Hide resolved
if (Interlocked.Exchange(ref _doingWork, 1) == 1)
{
if (_workItems.IsEmpty)
{
_doingWork = false;
return;
}
// Execute has been rescheduled already, exit.
break;
}

// Is work, wasn't already scheduled so continue loop.
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/Servers/Kestrel/Transport.Sockets/src/SocketTransport.cs
Expand Up @@ -19,8 +19,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
internal sealed class SocketTransport : ITransport
{
private static readonly PipeScheduler[] ThreadPoolSchedulerArray = new PipeScheduler[] { PipeScheduler.ThreadPool };

private readonly MemoryPool<byte> _memoryPool;
private readonly IEndPointInformation _endPointInformation;
private readonly IConnectionDispatcher _dispatcher;
Expand Down Expand Up @@ -65,8 +63,9 @@ internal sealed class SocketTransport : ITransport
}
else
{
_numSchedulers = ThreadPoolSchedulerArray.Length;
_schedulers = ThreadPoolSchedulerArray;
var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool };
_numSchedulers = directScheduler.Length;
_schedulers = directScheduler;
benaadams marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
119 changes: 119 additions & 0 deletions src/Servers/Kestrel/perf/Kestrel.Performance/SchedulerBenchmark.cs
@@ -0,0 +1,119 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;

namespace Microsoft.AspNetCore.Server.Kestrel.Performance
{
public class SchedulerBenchmark
{
private const int InnerLoopCount = 1024;
private const int OuterLoopCount = 64;
private const int OperationsPerInvoke = InnerLoopCount * OuterLoopCount;

private readonly static int IOQueueCount = Math.Min(Environment.ProcessorCount, 16);

private PipeScheduler[] _ioQueueSchedulers;
private PipeScheduler[] _threadPoolSchedulers;
private PipeScheduler[] _inlineSchedulers;

private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private int _totalToReport;
private PaddedInteger[] _counters = new PaddedInteger[OuterLoopCount];

private Func<int, ParallelLoopState, PipeScheduler[], PipeScheduler[]> _parallelAction;
private Action<object> _action;

[GlobalSetup]
public void Setup()
{
_parallelAction = ParallelBody;
_action = new Action<object>(ScheduledAction);

_inlineSchedulers = new PipeScheduler[IOQueueCount];
for (var i = 0; i < _inlineSchedulers.Length; i++)
{
_inlineSchedulers[i] = PipeScheduler.Inline;
}

_threadPoolSchedulers = new PipeScheduler[IOQueueCount];
for (var i = 0; i < _threadPoolSchedulers.Length; i++)
{
_threadPoolSchedulers[i] = PipeScheduler.ThreadPool;
}

_ioQueueSchedulers = new PipeScheduler[IOQueueCount];
for (var i = 0; i < _ioQueueSchedulers.Length; i++)
{
_ioQueueSchedulers[i] = new IOQueue();
}
}

[IterationSetup]
public void IterationSetup()
{
_totalToReport = OuterLoopCount;

for (var i = 0; i < _counters.Length; i++)
{
_counters[i].Remaining = InnerLoopCount;
}
}

[Benchmark(OperationsPerInvoke = OperationsPerInvoke, Baseline = true)]
public void ThreadPoolScheduler() => Schedule(_threadPoolSchedulers);

[Benchmark(OperationsPerInvoke = OperationsPerInvoke)]
public void IOQueueScheduler() => Schedule(_ioQueueSchedulers);

[Benchmark(OperationsPerInvoke = OperationsPerInvoke)]
public void InlineScheduler() => Schedule(_inlineSchedulers);

private void Schedule(PipeScheduler[] schedulers)
{
Parallel.For(0, OuterLoopCount, () => schedulers, _parallelAction, (s) => { });

while (_totalToReport > 0)
{
_semaphore.Wait();
_totalToReport--;
}
}

private void ScheduledAction(object o)
{
var counter = (int)o;
var result = Interlocked.Decrement(ref _counters[counter].Remaining);
if (result == 0)
{
_semaphore.Release();
}
}

private PipeScheduler[] ParallelBody(int i, ParallelLoopState state, PipeScheduler[] schedulers)
{
PipeScheduler pipeScheduler = schedulers[i % schedulers.Length];
object counter = i;
for (var t = 0; t < InnerLoopCount; t++)
{
pipeScheduler.Schedule(_action, counter);
}

return schedulers;
}

[StructLayout(LayoutKind.Explicit, Size = 128)]
private struct PaddedInteger
{
// Padded to avoid false sharing
[FieldOffset(64)]
public int Remaining;
}
}
}