Parallelize epoll events on thread pool and process events in the same thread (dotnet#35330)

* Parallelize epoll events on thread pool and process events in the same thread

* Use interlocked write instead of volatile write

* Upon epoll notification for reads and writes to a socket, queue read work and process write work on the same thread

* Make the Event struct readonly and rename it to SocketIOEvent

* Track and speculatively handle epoll events for synchronous operations on the epoll thread

* Prevent event scheduling threads from becoming long-running

* Non-speculatively schedule a work item to process epoll events upon first dequeue, delegating scheduling of more work items to other threads
kouvel committed May 3, 2020
1 parent 82eff51 commit 8157271
Showing 2 changed files with 213 additions and 21 deletions.
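
Before the diff itself, a minimal self-contained C# sketch of the dispatch pattern described above may help orient the reader: on an epoll notification, a synchronous waiter is released by signaling its event on the spot, while an asynchronous operation is handed back so the caller can either run it inline or queue it to the thread pool. The names below (PendingOperation, EventDispatchSketch, Dispatch) are invented for illustration; they are not the dotnet/runtime types shown in the diff.

#nullable enable
using System;
using System.Threading;

internal sealed class PendingOperation : IThreadPoolWorkItem
{
    // Non-null only for synchronous (blocking) operations waiting to be signaled.
    public ManualResetEventSlim? Event { get; }

    public PendingOperation(ManualResetEventSlim? syncEvent = null) => Event = syncEvent;

    // Queue this operation to the thread pool (mirrors the new Schedule method below).
    public void Schedule() => ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);

    // Run this operation on the current thread (mirrors the new Process method below).
    public void Process() => ((IThreadPoolWorkItem)this).Execute();

    void IThreadPoolWorkItem.Execute() => Console.WriteLine("processing I/O for this operation");
}

internal static class EventDispatchSketch
{
    // Same shape as ProcessSyncEventOrGetAsyncEvent in the diff: signal a synchronous
    // waiter in place, hand an asynchronous operation back to the caller.
    public static PendingOperation? SignalSyncOrReturnAsync(PendingOperation op)
    {
        if (op.Event is ManualResetEventSlim e)
        {
            e.Set();   // wake the blocked caller; nothing more to do on this thread
            return null;
        }
        return op;     // async: the caller decides whether to Process() or Schedule()
    }

    // Same shape as the new HandleEvents logic: with one async operation run it inline;
    // with two, schedule the receive side and run the send side on the current thread.
    public static void Dispatch(PendingOperation? receiveOp, PendingOperation? sendOp)
    {
        if (sendOp == null)
        {
            receiveOp?.Process();
        }
        else
        {
            receiveOp?.Schedule();
            sendOp.Process();
        }
    }
}

Processing a lone operation inline avoids an extra thread pool work item, while scheduling one of two ready operations keeps the second from being delayed behind the first; that is the trade-off the HandleEvents comment in the diff spells out.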
@@ -274,11 +274,21 @@ public void Dispatch()
}
else
{
// Async operation. Process the IO on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
// Async operation.
Schedule();
}
}

public void Schedule()
{
Debug.Assert(Event == null);

// Async operation. Process the IO on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

public void Process() => ((IThreadPoolWorkItem)this).Execute();

void IThreadPoolWorkItem.Execute()
{
// ReadOperation and WriteOperation, the only two types derived from
@@ -706,6 +716,7 @@ private enum QueueState : byte
// These fields define the queue state.

private QueueState _state; // See above
private bool _isNextOperationSynchronous;
private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification.
// It allows us to detect when a new epoll notification has arrived
// since the last time we checked the state of the queue.
@@ -720,6 +731,8 @@ private enum QueueState : byte

private LockToken Lock() => new LockToken(_queueLock);

public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous;

public void Init()
{
Debug.Assert(_queueLock == null);
@@ -779,7 +792,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
// Enqueue the operation.
Debug.Assert(operation.Next == operation, "Expected operation.Next == operation");

if (_tail != null)
if (_tail == null)
{
Debug.Assert(!_isNextOperationSynchronous);
_isNextOperationSynchronous = operation.Event != null;
}
else
{
operation.Next = _tail.Next;
_tail.Next = operation;
@@ -825,8 +843,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
}
}

// Called on the epoll thread whenever we receive an epoll notification.
public void HandleEvent(SocketAsyncContext context)
public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context)
{
AsyncOperation op;
using (Lock())
@@ -839,7 +856,7 @@ public void HandleEvent(SocketAsyncContext context)
Debug.Assert(_tail == null, "State == Ready but queue is not empty!");
_sequenceNumber++;
Trace(context, $"Exit (previously ready)");
return;
return null;

case QueueState.Waiting:
Debug.Assert(_tail != null, "State == Waiting but queue is empty!");
@@ -852,21 +869,31 @@ public void HandleEvent(SocketAsyncContext context)
Debug.Assert(_tail != null, "State == Processing but queue is empty!");
_sequenceNumber++;
Trace(context, $"Exit (currently processing)");
return;
return null;

case QueueState.Stopped:
Debug.Assert(_tail == null);
Trace(context, $"Exit (stopped)");
return;
return null;

default:
Environment.FailFast("unexpected queue state");
return;
return null;
}
}

// Dispatch the op so we can try to process it.
op.Dispatch();
ManualResetEventSlim? e = op.Event;
if (e != null)
{
// Sync operation. Signal waiting thread to continue processing.
e.Set();
return null;
}
else
{
// Async operation. The caller will figure out how to process the IO.
return op;
}
}

internal void ProcessAsyncOperation(TOperation op)
@@ -991,6 +1018,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
{
// No more operations to process
_tail = null;
_isNextOperationSynchronous = false;
_state = QueueState.Ready;
_sequenceNumber++;
Trace(context, $"Exit (finished queue)");
@@ -999,6 +1027,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
{
// Pop current operation and advance to next
nextOp = _tail.Next = op.Next;
_isNextOperationSynchronous = nextOp.Event != null;
}
}
}
@@ -1033,11 +1062,13 @@ public void CancelAndContinueProcessing(TOperation op)
{
// No more operations
_tail = null;
_isNextOperationSynchronous = false;
}
else
{
// Pop current operation and advance to next
_tail.Next = op.Next;
_isNextOperationSynchronous = op.Next.Event != null;
}

// We're the first op in the queue.
@@ -1112,6 +1143,7 @@ public bool StopAndAbort(SocketAsyncContext context)
}

_tail = null;
_isNextOperationSynchronous = false;

Trace(context, $"Exit");
}
@@ -1946,6 +1978,41 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co
return SocketError.IOPending;
}

// Called on the epoll thread, speculatively tries to process synchronous events and errors for synchronous events, and
// returns any remaining events that remain to be processed. Taking a lock for each operation queue to deterministically
// handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. On the other
// hand, the speculative checks make it nondeterministic, where it would be possible for the epoll thread to think that
// the next operation in a queue is not synchronous when it is (due to a race, old caches, etc.) and cause the event to
// be scheduled instead. It's not functionally incorrect to schedule the release of a synchronous operation, just it may
// lead to thread pool starvation issues if the synchronous operations are blocking thread pool threads (typically not
// advised) and more threads are not immediately available to run work items that would release those operations.
public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
{
// Set the Read and Write flags; the processing for these events
// will pick up the error.
events ^= Interop.Sys.SocketEvents.Error;
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

if ((events & Interop.Sys.SocketEvents.Read) != 0 &&
_receiveQueue.IsNextOperationSynchronous_Speculative &&
_receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) == null)
{
events ^= Interop.Sys.SocketEvents.Read;
}

if ((events & Interop.Sys.SocketEvents.Write) != 0 &&
_sendQueue.IsNextOperationSynchronous_Speculative &&
_sendQueue.ProcessSyncEventOrGetAsyncEvent(this) == null)
{
events ^= Interop.Sys.SocketEvents.Write;
}

return events;
}

public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
@@ -1955,14 +2022,23 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

if ((events & Interop.Sys.SocketEvents.Read) != 0)
AsyncOperation? receiveOperation =
(events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;
AsyncOperation? sendOperation =
(events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;

// This method is called from a thread pool thread. When we have only one operation to process, process it
// synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both
// synchronously may delay the second operation, so schedule one onto the thread pool and process the other
// synchronously. There might be better ways of doing this.
if (sendOperation == null)
{
_receiveQueue.HandleEvent(this);
receiveOperation?.Process();
}

if ((events & Interop.Sys.SocketEvents.Write) != 0)
else
{
_sendQueue.HandleEvent(this);
receiveOperation?.Schedule();
sendOperation.Process();
}
}
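
The second changed file (the epoll event loop side) does not render on this page. As a rough idea of how the two new entry points are meant to be consumed, the hypothetical sketch below has the epoll thread call HandleSyncEventsSpeculatively first, so that blocked synchronous callers are released without leaving the epoll thread, and only queues thread pool work for whatever events remain; those remaining events are then handled by HandleEvents on a thread pool thread. The IoEvents enum, IIoContext interface, and OnEpollNotification helper are stand-ins invented for this sketch, and the real engine additionally batches events from epoll_wait and spreads their processing across work items, as the commit message bullets describe.

using System;
using System.Threading;

// Stand-ins for Interop.Sys.SocketEvents and SocketAsyncContext, invented for this sketch.
[Flags]
internal enum IoEvents
{
    None  = 0,
    Read  = 1,
    Write = 2,
    Error = 4,
}

internal interface IIoContext
{
    // Epoll thread: speculatively release synchronous waiters, return whatever is left.
    IoEvents HandleSyncEventsSpeculatively(IoEvents events);

    // Thread pool thread: process the remaining (asynchronous) events.
    void HandleEvents(IoEvents events);
}

internal static class EpollLoopSketch
{
    // Hypothetical per-notification handling on the epoll thread.
    public static void OnEpollNotification(IIoContext context, IoEvents events)
    {
        // Cheap path: wake blocked synchronous callers without leaving the epoll thread.
        events = context.HandleSyncEventsSpeculatively(events);

        // Anything left is asynchronous work; hand it to the thread pool so the epoll
        // thread can go straight back to waiting for notifications.
        if (events != IoEvents.None)
        {
            ThreadPool.UnsafeQueueUserWorkItem(
                static state => state.context.HandleEvents(state.events),
                (context, events),
                preferLocal: false);
        }
    }
}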

Expand Down
Loading

0 comments on commit 8157271

Please sign in to comment.