Skip to content

Commit

Permalink
Use a dynamically sized threadpool for the I/Os
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Gosse committed Feb 26, 2019
1 parent 0a987aa commit dda2c4d
Showing 1 changed file with 145 additions and 27 deletions.
172 changes: 145 additions & 27 deletions src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Concurrent;
using Microsoft.Win32.SafeHandles;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down Expand Up @@ -32,7 +33,7 @@ namespace System.Net.Sockets

internal sealed class SocketAsyncContext
{
private static FixedSizeThreadPool IOThreadPool = new FixedSizeThreadPool(Environment.ProcessorCount * 2);
private static DynamicThreadPool IOThreadPool = new DynamicThreadPool(Environment.ProcessorCount * 2);

// Cached operation instances for operations commonly repeated on the same socket instance,
// e.g. async accepts, sends/receives with single and multiple buffers. More can be
Expand Down Expand Up @@ -1954,70 +1955,187 @@ public static void OutputTrace(string s)
public static string IdOf(object o) => o == null ? "(null)" : $"{o.GetType().Name}#{o.GetHashCode():X2}";
}

// A small thread pool dedicated to socket I/O callbacks. It starts with a fixed
// number of permanent workers and grows on demand (growth is throttled to one new
// worker per StartWorkerDelay ms); extra workers exit after sitting idle for
// WorkerTimeout ms, so the pool shrinks back down when load drops.
internal class DynamicThreadPool
{
    // How long (ms) a non-permanent worker waits for work before exiting.
    private const int WorkerTimeout = 20 * 1000;

    // Minimum interval (ms) between starting two new workers; also the polling
    // period of the gate thread.
    private const int StartWorkerDelay = 500;

    // Pending work items; workers drain this queue.
    private readonly ConcurrentQueue<(WaitCallback callback, object state)> _queue;

    // Workers that declared themselves idle and can be woken up.
    private readonly ConcurrentStack<Worker> _pendingWorkers;

    // Environment.TickCount when the last extra worker was started.
    // Read/updated with Interlocked to rate-limit pool growth.
    private int _lastTickCount;

    /// <summary>
    /// Creates the pool with <paramref name="minSize"/> permanent workers (they
    /// wait forever for work and never exit) plus a gate thread that grows the
    /// pool when work is left sitting in the queue.
    /// </summary>
    public DynamicThreadPool(int minSize)
    {
        _queue = new ConcurrentQueue<(WaitCallback callback, object state)>();
        _pendingWorkers = new ConcurrentStack<Worker>();

        new Thread(GateThread) { IsBackground = true }.Start();

        for (int i = 0; i < minSize; i++)
        {
            // Timeout.Infinite: the initial workers never time out.
            StartWorker(Timeout.Infinite);
        }
    }

    /// <summary>
    /// Enqueues a callback and ensures some worker will eventually run it:
    /// wake an idle worker if there is one, otherwise try to start a new one.
    /// </summary>
    public void QueueWorkItem(WaitCallback callback, object state)
    {
        _queue.Enqueue((callback, state));

        while (_pendingWorkers.TryPop(out var worker))
        {
            if (worker.TryWakeUp())
            {
                // At least one worker is awake, the action will be dequeued eventually
                return;
            }

            if (_queue.IsEmpty)
            {
                // The work got processed, no need to wake anybody anymore
                return;
            }
        }

        // No worker, try to start one
        StartWorkerIfNeeded();
    }

    private void GateThread()
    {
        // There's a subtle race-condition with workers: if an item gets enqueued while a worker prepares to
        // sleep, or right after its sleep timed out, the worker will be in the pending-workers stack yet busy
        // executing some work. It's possible that this work depends synchronously on another item enqueued to
        // the same threadpool, and this same worker gets woken up to execute it, leading to a deadlock.
        // The gate thread is there as a secondary mechanism to make the threadpool grow if that happens.
        while (true)
        {
            Thread.Sleep(StartWorkerDelay);
            StartWorkerIfNeeded();
        }
    }

    // Starts one additional worker if there is queued work and no worker was
    // started within the last StartWorkerDelay milliseconds.
    private void StartWorkerIfNeeded()
    {
        if (_queue.IsEmpty)
        {
            return;
        }

        while (true)
        {
            var tickCount = Environment.TickCount;
            var lastTickCount = _lastTickCount;

            // Make sure enough time elapsed since we last started a worker.
            // The subtraction is wrap-safe for Environment.TickCount overflow.
            if (tickCount - _lastTickCount > StartWorkerDelay)
            {
                if (Interlocked.CompareExchange(ref _lastTickCount, tickCount, lastTickCount) != lastTickCount)
                {
                    // Another thread beat us, retry
                    continue;
                }

                StartWorker(WorkerTimeout);

                return;
            }

            return;
        }
    }

    // Spawns a background worker thread whose idle wait uses the given timeout (ms).
    private void StartWorker(int timeout)
    {
        new Thread(() => WorkerThread(timeout)) { IsBackground = true }.Start();
    }

    // Main loop of a worker: drain the queue, then park until woken up or until
    // the timeout elapses, in which case the worker tries to exit.
    private void WorkerThread(int timeout)
    {
        var worker = new Worker();

        while (true)
        {
            if (TryExecuteItem())
            {
                continue;
            }

            // Nothing left in the queue, prepare to sleep
            worker.Event.Reset();
            _pendingWorkers.Push(worker);

            // We need to check the queue again before sleeping, as an item could have been added in the meantime.
            if (TryExecuteItem())
            {
                continue;
            }

            if (worker.Event.Wait(timeout))
            {
                continue;
            }

            // The wait timed out. Scale down the threadpool
            if (worker.TryExit())
            {
                return;
            }

            // If we got here, we lost the race and got signaled as we tried to exit. Resume the loop.
        }
    }

    // Dequeues and runs a single work item. Returns false when the queue was empty.
    private bool TryExecuteItem()
    {
        if (_queue.TryDequeue(out var item))
        {
            try
            {
                item.callback(item.state);
            }
            catch (Exception)
            {
                // Swallowed so a faulty callback cannot kill a pool thread.
                // NOTE(review): silent swallow hides user bugs — consider logging or failing fast.
            }

            return true;
        }

        return false;
    }

    // Handshake object between QueueWorkItem (which wakes idle workers) and
    // WorkerThread (which parks and may exit). The lock on Event arbitrates
    // the wake-up vs. exit race.
    internal class Worker
    {
        public readonly ManualResetEventSlim Event = new ManualResetEventSlim(false);
        private bool _hasExited;

        // Signals the worker unless it has already exited.
        // Returns false if the worker is gone and another one must be woken/started.
        public bool TryWakeUp()
        {
            lock (Event)
            {
                if (!_hasExited)
                {
                    Event.Set();
                    return true;
                }
            }

            return false;
        }

        // Marks the worker as exited, unless it was signaled in the meantime
        // (Event.IsSet), in which case the worker must resume its loop instead.
        public bool TryExit()
        {
            lock (Event)
            {
                if (!Event.IsSet)
                {
                    _hasExited = true;
                    Event.Dispose();
                    return true;
                }
            }

            return false;
        }
    }
}
Expand Down

0 comments on commit dda2c4d

Please sign in to comment.