Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize epoll events on thread pool and process events in the same thread #35330

Merged
merged 13 commits into from
May 3, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,21 @@ public void Dispatch()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I reading the code correctly that we'll likely end up calling Set twice for a sync operation (with the second call just being a nop)? I think that's probably fine, but it'd be worth a comment calling that out.

Copy link
Member Author

@kouvel kouvel Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you think it might be called twice? My intention was that either the epoll thread handles an event or it will queue it for processing in the background, but not both. If the epoll thread correctly sees that there is a pending sync operation next, then it would call set and not queue an operation for that. Otherwise, it would not process the event and queue it instead.

else
{
// Async operation. Process the IO on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
// Async operation.
Schedule();
}
}

// Queues this operation to the thread pool for background processing.
// Only valid for async operations: sync operations carry a wait Event and are
// completed by signaling that Event on the epoll thread instead.
public void Schedule()
{
// Sync operations (Event != null) must never be scheduled here.
Debug.Assert(Event == null);

// Async operation. Process the IO on the threadpool.
// NOTE(review): preferLocal: false — presumably so the work is visible to all
// pool threads rather than pinned to the queuing thread's local queue; confirm.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

// Runs this operation's work item inline on the calling thread — the same code
// path that executes when the operation is dispatched via the thread pool.
public void Process() => ((IThreadPoolWorkItem)this).Execute();

void IThreadPoolWorkItem.Execute()
{
// ReadOperation and WriteOperation, the only two types derived from
Expand Down Expand Up @@ -706,6 +716,7 @@ private enum QueueState : byte
// These fields define the queue state.

private QueueState _state; // See above
private bool _isNextOperationSynchronous;
private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification.
// It allows us to detect when a new epoll notification has arrived
// since the last time we checked the state of the queue.
Expand All @@ -720,6 +731,8 @@ private enum QueueState : byte

private LockToken Lock() => new LockToken(_queueLock);

public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous;

public void Init()
{
Debug.Assert(_queueLock == null);
Expand Down Expand Up @@ -779,7 +792,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
// Enqueue the operation.
Debug.Assert(operation.Next == operation, "Expected operation.Next == operation");

if (_tail != null)
if (_tail == null)
{
Debug.Assert(!_isNextOperationSynchronous);
_isNextOperationSynchronous = operation.Event != null;
}
else
{
operation.Next = _tail.Next;
_tail.Next = operation;
Expand Down Expand Up @@ -825,8 +843,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
}
}

// Called on the epoll thread whenever we receive an epoll notification.
public void HandleEvent(SocketAsyncContext context)
public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context)
{
AsyncOperation op;
using (Lock())
Expand All @@ -839,7 +856,7 @@ public void HandleEvent(SocketAsyncContext context)
Debug.Assert(_tail == null, "State == Ready but queue is not empty!");
_sequenceNumber++;
Trace(context, $"Exit (previously ready)");
return;
return null;

case QueueState.Waiting:
Debug.Assert(_tail != null, "State == Waiting but queue is empty!");
Expand All @@ -852,21 +869,31 @@ public void HandleEvent(SocketAsyncContext context)
Debug.Assert(_tail != null, "State == Processing but queue is empty!");
_sequenceNumber++;
Trace(context, $"Exit (currently processing)");
return;
return null;

case QueueState.Stopped:
Debug.Assert(_tail == null);
Trace(context, $"Exit (stopped)");
return;
return null;

default:
Environment.FailFast("unexpected queue state");
return;
return null;
}
}

// Dispatch the op so we can try to process it.
op.Dispatch();
ManualResetEventSlim? e = op.Event;
if (e != null)
{
// Sync operation. Signal waiting thread to continue processing.
e.Set();
return null;
}
else
{
// Async operation. The caller will figure out how to process the IO.
return op;
}
}

internal void ProcessAsyncOperation(TOperation op)
Expand Down Expand Up @@ -991,6 +1018,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
{
// No more operations to process
_tail = null;
_isNextOperationSynchronous = false;
_state = QueueState.Ready;
_sequenceNumber++;
Trace(context, $"Exit (finished queue)");
Expand All @@ -999,6 +1027,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
{
// Pop current operation and advance to next
nextOp = _tail.Next = op.Next;
_isNextOperationSynchronous = nextOp.Event != null;
}
}
}
Expand Down Expand Up @@ -1033,11 +1062,13 @@ public void CancelAndContinueProcessing(TOperation op)
{
// No more operations
_tail = null;
_isNextOperationSynchronous = false;
}
else
{
// Pop current operation and advance to next
_tail.Next = op.Next;
_isNextOperationSynchronous = op.Next.Event != null;
}

// We're the first op in the queue.
Expand Down Expand Up @@ -1112,6 +1143,7 @@ public bool StopAndAbort(SocketAsyncContext context)
}

_tail = null;
_isNextOperationSynchronous = false;

Trace(context, $"Exit");
}
Expand Down Expand Up @@ -1946,6 +1978,41 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co
return SocketError.IOPending;
}

// Called on the epoll thread, speculatively tries to process synchronous events and errors for synchronous events, and
returns any events that remain to be processed. Taking a lock for each operation queue to deterministically
// handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. On the other
// hand, the speculative checks make it nondeterministic, where it would be possible for the epoll thread to think that
// the next operation in a queue is not synchronous when it is (due to a race, old caches, etc.) and cause the event to
// be scheduled instead. It's not functionally incorrect to schedule the release of a synchronous operation, just it may
// lead to thread pool starvation issues if the synchronous operations are blocking thread pool threads (typically not
// advised) and more threads are not immediately available to run work items that would release those operations.
public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
{
// Set the Read and Write flags; the processing for these events
// will pick up the error.
events ^= Interop.Sys.SocketEvents.Error;
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

// Speculatively (unlocked, possibly stale read of _isNextOperationSynchronous)
// try to handle the Read event on the epoll thread when the next receive
// operation appears synchronous. A null result means the event was fully
// consumed (sync op signaled, or queue had nothing to do), so clear the flag.
// NOTE(review): per the review thread below, if the speculation is wrong and a
// non-null async op is returned here, that op is discarded without being
// dispatched while the queue has moved to Processing — reported as a bug and
// acknowledged ("Nice catch, will fix"); confirm against the follow-up fix.
if ((events & Interop.Sys.SocketEvents.Read) != 0 &&
_receiveQueue.IsNextOperationSynchronous_Speculative &&
_receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) == null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kouvel @stephentoub @adamsitnik I think there is an issue when IsNextOperationSynchronous_Speculative is true but the operation is not really a SyncEvent operation. The queue moves to Processing but no-one dispatches the operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, will fix

{
events ^= Interop.Sys.SocketEvents.Read;
}

// Same speculative handling for the Write event against the send queue.
if ((events & Interop.Sys.SocketEvents.Write) != 0 &&
_sendQueue.IsNextOperationSynchronous_Speculative &&
_sendQueue.ProcessSyncEventOrGetAsyncEvent(this) == null)
{
events ^= Interop.Sys.SocketEvents.Write;
}

// Flags still set here must be processed later (HandleEvents on a pool thread).
return events;
}

public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
Expand All @@ -1955,14 +2022,23 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

if ((events & Interop.Sys.SocketEvents.Read) != 0)
AsyncOperation? receiveOperation =
stephentoub marked this conversation as resolved.
Show resolved Hide resolved
(events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;
AsyncOperation? sendOperation =
(events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;

// This method is called from a thread pool thread. When we have only one operation to process, process it
// synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both
// synchronously may delay the second operation, so schedule one onto the thread pool and process the other
// synchronously. There might be better ways of doing this.
if (sendOperation == null)
{
_receiveQueue.HandleEvent(this);
receiveOperation?.Process();
}

if ((events & Interop.Sys.SocketEvents.Write) != 0)
else
{
_sendQueue.HandleEvent(this);
receiveOperation?.Schedule();
sendOperation.Process();
}
}

Expand Down
Loading