Parallelize epoll events on thread pool and process events in the same thread #35330

Merged · 13 commits · May 3, 2020
@@ -274,11 +274,21 @@ public void Dispatch()
}
Member:
Am I reading the code correctly that we'll likely end up calling Set twice for a sync operation (with the second call just being a nop)? I think that's probably fine, but it'd be worth a comment calling that out.

Member Author (@kouvel, Apr 29, 2020):

Why do you think it might be called twice? My intention was that either the epoll thread handles an event or it queues it for processing in the background, but not both. If the epoll thread correctly sees that there is a pending sync operation next, it would call Set and not queue an operation for that. Otherwise, it would not process the event and would queue it instead.

else
{
// Async operation. Process the IO on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
// Async operation.
Schedule();
}
}

public void Schedule()
{
Debug.Assert(Event == null);

// Async operation. Process the IO on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

public void Process() => ((IThreadPoolWorkItem)this).Execute();

void IThreadPoolWorkItem.Execute()
{
// ReadOperation and WriteOperation, the only two types derived from
@@ -706,6 +716,7 @@ private enum QueueState : byte
// These fields define the queue state.

private QueueState _state; // See above
private bool _isNextOperationSynchronous;
private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification.
// It allows us to detect when a new epoll notification has arrived
// since the last time we checked the state of the queue.
@@ -720,6 +731,8 @@ private enum QueueState : byte

private LockToken Lock() => new LockToken(_queueLock);

public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous;

public void Init()
{
Debug.Assert(_queueLock == null);
@@ -779,7 +792,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
// Enqueue the operation.
Debug.Assert(operation.Next == operation, "Expected operation.Next == operation");

if (_tail != null)
if (_tail == null)
{
Debug.Assert(!_isNextOperationSynchronous);
_isNextOperationSynchronous = operation.Event != null;
}
else
{
operation.Next = _tail.Next;
_tail.Next = operation;
@@ -825,8 +843,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
}
}

// Called on the epoll thread whenever we receive an epoll notification.
public void HandleEvent(SocketAsyncContext context)
public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context)
{
AsyncOperation op;
using (Lock())
@@ -839,7 +856,7 @@ public void HandleEvent(SocketAsyncContext context)
Debug.Assert(_tail == null, "State == Ready but queue is not empty!");
_sequenceNumber++;
Trace(context, $"Exit (previously ready)");
return;
return null;

case QueueState.Waiting:
Debug.Assert(_tail != null, "State == Waiting but queue is empty!");
@@ -852,21 +869,31 @@ public void HandleEvent(SocketAsyncContext context)
Debug.Assert(_tail != null, "State == Processing but queue is empty!");
_sequenceNumber++;
Trace(context, $"Exit (currently processing)");
return;
return null;

case QueueState.Stopped:
Debug.Assert(_tail == null);
Trace(context, $"Exit (stopped)");
return;
return null;

default:
Environment.FailFast("unexpected queue state");
return;
return null;
}
}

// Dispatch the op so we can try to process it.
op.Dispatch();
ManualResetEventSlim? e = op.Event;
if (e != null)
{
// Sync operation. Signal waiting thread to continue processing.
e.Set();
return null;
}
else
{
// Async operation. The caller will figure out how to process the IO.
return op;
}
}

internal void ProcessAsyncOperation(TOperation op)
@@ -991,6 +1018,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
{
// No more operations to process
_tail = null;
_isNextOperationSynchronous = false;
_state = QueueState.Ready;
_sequenceNumber++;
Trace(context, $"Exit (finished queue)");
@@ -999,6 +1027,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
{
// Pop current operation and advance to next
nextOp = _tail.Next = op.Next;
_isNextOperationSynchronous = nextOp.Event != null;
}
}
}
@@ -1033,11 +1062,13 @@ public void CancelAndContinueProcessing(TOperation op)
{
// No more operations
_tail = null;
_isNextOperationSynchronous = false;
}
else
{
// Pop current operation and advance to next
_tail.Next = op.Next;
_isNextOperationSynchronous = op.Next.Event != null;
}

// We're the first op in the queue.
@@ -1112,6 +1143,7 @@ public bool StopAndAbort(SocketAsyncContext context)
}

_tail = null;
_isNextOperationSynchronous = false;

Trace(context, $"Exit");
}
@@ -1946,6 +1978,36 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co
return SocketError.IOPending;
}

// Called on the epoll thread. Speculatively tries to process synchronous events (and errors for synchronous
// events), and returns any events that remain to be processed. Taking a lock on each operation queue to
// deterministically handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks.
Member:

Are there situations where we may not signal a sync op synchronously? What do those look like and how likely are they to happen?

Member Author (@kouvel, Apr 29, 2020):

The check done on the epoll thread is speculative. The value that is checked is updated under the lock, and since the epoll thread does not take the lock, the value read may be stale. That is very unlikely to happen, because a wait of any sort would typically involve a memory barrier (often even if it does not actually end up waiting), and the value read would be at least as recent as when the epoll wait was released. Even if the wait did not block, the call sequence involved would likely include a memory barrier of some sort. Never mind the estimations on memory barriers, though: the fact is that at worst we are relying on the latency of processor cache coherence here, and how bad that can be depends entirely on the processor. Some old (especially ARM) processors don't have any sort of cache coherence and rely entirely on software to do the right thing. When a processor has cache coherence, the whole idea is that it shouldn't take an inordinate amount of time to make caches consistent, otherwise it would defeat the purpose. For example, using Volatile.Write to exit a lock relies entirely on processor cache-coherence latency for it to work reasonably well. My stance remains that, considering the alternative is not functionally incorrect, this should be good enough for the purpose. That is up for debate though; we can sacrifice some perf to guarantee that sync operations are signaled on the epoll thread.
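The Volatile.Write point can be illustrated with a minimal spinlock sketch (hypothetical code for illustration only; this is not the runtime's lock):

using System.Threading;

// Minimal spinlock sketch. The unlocking store in Exit is a plain release-store,
// so waiting threads observe it only as fast as the hardware propagates the write.
internal sealed class SimpleSpinLock
{
    private int _lockTaken; // 0 = free, 1 = held

    public void Enter()
    {
        // Atomically take the lock; spin until the release-store in Exit becomes visible.
        while (Interlocked.CompareExchange(ref _lockTaken, 1, 0) != 0)
        {
            Thread.SpinWait(1);
        }
    }

    public void Exit()
    {
        // Release the lock with a volatile (release) store; how quickly other cores
        // observe it depends on processor cache-coherence latency.
        Volatile.Write(ref _lockTaken, 0);
    }
}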

Member Author (@kouvel):

The feasibility of any speculative code like this, and the feasibility of the general change itself, may depend on various dynamics. I'm not an expert in this area. Generally I would recommend that a subject-matter expert review this change. I see that @geoffkizer made some changes here before. Who would be appropriate to review this change?

Member:

@kouvel, besides Geoff, @stephentoub has the most expertise.
@antonfirsov may be able to chime in as well ...

Member:

> we can sacrifice some perf to guarantee that sync operations are signaled on the epoll thread.

@adamsitnik benchmarked the non-speculative version at 1090k RPS vs. 1139k RPS for json plaintext (due to taking the lock).
Coincidentally we're fully loading the single epoll thread on the Citrine machine with that benchmark, so not taking the lock avoids the epoll bottleneck.

Does adding a Volatile.Read in IsNextOperationSynchronous_Speculative have an effect on performance? It would make more clear at what point we're picking a value that got set on another thread.

Member Author (@kouvel):

Ok great, thanks. I think this is an open issue currently, whether it is ok to miss releasing a synchronous operation from the epoll thread sometimes. It could happen due to races, processor cache issues, etc., and could lead to thread pool starvation if all thread pool threads are blocked by the synchronous operations, perhaps with more synchronous operations waiting in the queue, and probably more likely if no more epoll notifications come in for the relevant sockets.

Member Author (@kouvel):

> Does adding a Volatile.Read in IsNextOperationSynchronous_Speculative have an effect on performance? It would make more clear at what point we're picking a value that got set on another thread.

It wouldn't have any effect on x64; I would have to check on arm64. On arm64 I think the memory barrier would usually be redundant (and in the wrong place, after the read instead of before), and the overhead would be incurred for each event and each queue. Adding an explicit memory barrier before the loop may be better, but I suspect it would be difficult to quantify how much it would help.
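For concreteness, the variant being discussed would be a one-line change (a sketch, assuming the backing field stays a plain bool):

// Hypothetical variant, not what this PR ships: an explicit volatile read.
// On x64 this JITs to an ordinary load; on arm64 it adds an acquire barrier,
// which sits after the load rather than before it, as noted above.
public bool IsNextOperationSynchronous_Speculative => Volatile.Read(ref _isNextOperationSynchronous);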

Member Author (@kouvel, Apr 30, 2020):

[Edited] Actually, the explicit memory barrier may not help much. There is an interlocked operation to schedule a thread to process events. If events were not queued, then at least one operation queue lock would have been taken, and maybe a barrier from the epoll call as well. So caches would likely already be coherent and the speculative read would be reading a recent value, just not under a lock, so there could still be races.

public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
{
// Set the Read and Write flags; the processing for these events
// will pick up the error.
events ^= Interop.Sys.SocketEvents.Error;
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

if ((events & Interop.Sys.SocketEvents.Read) != 0 &&
_receiveQueue.IsNextOperationSynchronous_Speculative &&
_receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) == null)
Member:

@kouvel @stephentoub @adamsitnik I think there is an issue when IsNextOperationSynchronous_Speculative is true but the operation is not really a SyncEvent operation. The queue moves to Processing but no-one dispatches the operation.

Member Author (@kouvel):

Nice catch, will fix

{
events ^= Interop.Sys.SocketEvents.Read;
}

if ((events & Interop.Sys.SocketEvents.Write) != 0 &&
_sendQueue.IsNextOperationSynchronous_Speculative &&
_sendQueue.ProcessSyncEventOrGetAsyncEvent(this) == null)
{
events ^= Interop.Sys.SocketEvents.Write;
}

return events;
}
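One way the race described in the thread above could be closed (a hypothetical sketch, not the actual follow-up commit): if the speculative flag turns out to be stale and the call returns an async operation, that operation has already been claimed for processing and must be scheduled rather than dropped.

// Hypothetical reshaping of the Read branch above; the Write branch would mirror it.
if ((events & Interop.Sys.SocketEvents.Read) != 0 &&
    _receiveQueue.IsNextOperationSynchronous_Speculative)
{
    AsyncOperation? op = _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this);
    if (op != null)
    {
        // Stale speculation: the next operation was actually async. The queue has
        // already transitioned to Processing, so the operation must be scheduled
        // here instead of being lost.
        op.Schedule();
    }
    events ^= Interop.Sys.SocketEvents.Read;
}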

public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
@@ -1955,14 +2017,23 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

if ((events & Interop.Sys.SocketEvents.Read) != 0)
AsyncOperation? receiveOperation =
(events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;
AsyncOperation? sendOperation =
(events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;

// This method is called from a thread pool thread. When we have only one operation to process, process it
// synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both
// synchronously may delay the second operation, so schedule one onto the thread pool and process the other
// synchronously. There might be better ways of doing this.
if (sendOperation == null)
{
_receiveQueue.HandleEvent(this);
receiveOperation?.Process();
}

if ((events & Interop.Sys.SocketEvents.Write) != 0)
else
{
_sendQueue.HandleEvent(this);
receiveOperation?.Schedule();
sendOperation.Process();
}
}

@@ -5,13 +5,14 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net.Sockets
{
internal sealed unsafe class SocketAsyncEngine
internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem
{
//
// Encapsulates a particular SocketAsyncContext object's access to a SocketAsyncEngine.
@@ -130,6 +131,18 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
//
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContext> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContext>();

//
// Queue of events generated by EventLoop() that would be processed by the thread pool
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

//
// This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is
// set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is
// not scheduled. Changes are protected by atomic operations as appropriate.
//
private int _eventQueueProcessingRequested;

//
// True if we've reached the handle value limit for this event port, and thus must allocate a new event port
// on the next handle allocation.
@@ -308,10 +321,13 @@ private void EventLoop()
try
{
bool shutdown = false;
Interop.Sys.SocketEvent* buffer = _buffer;
ConcurrentDictionary<IntPtr, SocketAsyncContext> handleToContextMap = _handleToContextMap;
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
while (!shutdown)
{
int numEvents = EventBufferCount;
Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, _buffer, &numEvents);
Interop.Error err = Interop.Sys.WaitForSocketEvents(_port, buffer, &numEvents);
if (err != Interop.Error.SUCCESS)
{
throw new InternalException(err);
@@ -320,24 +336,46 @@ private void EventLoop()
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

bool enqueuedEvent = false;
for (int i = 0; i < numEvents; i++)
{
IntPtr handle = _buffer[i].Data;
IntPtr handle = buffer[i].Data;
if (handle == ShutdownHandle)
{
shutdown = true;
}
else
{
Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}");
_handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context);
handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context);
if (context != null)
{
context.HandleEvents(_buffer[i].Events);
Interop.Sys.SocketEvents events = buffer[i].Events;
events = context.HandleSyncEventsSpeculatively(events);
if (events != Interop.Sys.SocketEvents.None)
{
var ev = new SocketIOEvent(context, events);
eventQueue.Enqueue(ev);
enqueuedEvent = true;

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
ev = default;
}

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
context = null;
}
}
}

if (enqueuedEvent)
{
ScheduleToProcessEvents();
}
}

FreeNativeResources();
@@ -348,6 +386,59 @@ private void EventLoop()
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ScheduleToProcessEvents()
{
// Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid
// over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item
// to be scheduled for parallelizing processing of events.
if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}

void IThreadPoolWorkItem.Execute()
{
// Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer
// threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an
// enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is
// the last thread processing events, it must see the event queued by the enqueuer.
Interlocked.Exchange(ref _eventQueueProcessingRequested, 0);

ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
int startTimeMs = Environment.TickCount;
Member:

nit: maybe move startTimeMs assignment below TryDequeue.

Member Author (@kouvel):

Yeah, it would avoid the TickCount read when the queue is empty; will do.

if (!eventQueue.TryDequeue(out SocketIOEvent ev))
{
return;
}

// An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize
// processing of events, before processing more events. Following this, it is the responsibility of the new work
// item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if
// the user callback as part of handling the event blocks for some reason that may have a dependency on other queued
// socket events.
ScheduleToProcessEvents();

while (true)
{
ev.Context.HandleEvents(ev.Events);

if (Environment.TickCount - startTimeMs >= 15)
Member:

TickCount is sufficiently fast to call per work item?

Also, what is "15" here? Is that just a reasonable quantum? A comment would be helpful.

Member Author (@kouvel):

It is probably a measurable perf difference, maybe 2-3 K RPS with many measurements to weed out error, but that was more than countered by the change to remove the speculative work-item scheduling, which also has the effect of decreasing the rate of parallelization; its counteracting improvement is similarly difficult to measure. Roughly break-even between the two.

Member Author (@kouvel, Apr 29, 2020):

I tried a bunch of values, including using Stopwatch instead. Considering that 15 ms is the max latency if such work items were to use up the whole thread pool budget, I figured it's a reasonable value. The average latency to run another queued work item as a result of this work item being long-running would be much less, something like 15 ms / proc count in ideal circumstances, and even then it's kind of a corner case.

Member Author (@kouvel, Apr 29, 2020):

I thought the comment I had following that path was sufficient; is there something specific that you would like to be mentioned in the comment?

Member:

> is there something specific that you would like to be mentioned in the comment?

Just why you chose 15 specifically. It's not clear to me from the comment.

Member Author (@kouvel):

Oh ok, done

Comment:

You may need some sign coercion, as TickCount overflows every 49-ish days of host uptime (its range is roughly -2.1 billion to +2.1 billion). Or ignore it, if things remain correct-but-non-optimal for that once-every-49-days event? :)

Member Author (@kouvel):

That would only be possible if a user callback takes a vastly inordinate amount of time to run. TickCount overflow would not impact the check done here: the check is on the difference between the current tick count and the start tick count, and that signed difference would take far longer to overflow, enough that IMO it would already be incredibly unreasonable for user code to cause such a thing before it would be worth considering here.

Member:

But even if a single continuation did take more than 49 days, this check wouldn't hurt anything. This is just an optimization to try to avoid tying up a single thread pool thread by running too many small work items; if a user has a 49-day work item, this doesn't matter anyway.
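For reference, the difference-based check is wraparound-safe. A small sketch with hypothetical tick values on either side of the wrap:

// Environment.TickCount wraps from int.MaxValue to int.MinValue after ~49.7 days.
// Subtracting the start sample from the current sample in 32-bit signed arithmetic
// still yields the true elapsed milliseconds across the wrap, as long as the real
// elapsed time stays far below ~24.8 days.
int startTimeMs = int.MaxValue - 5;             // sampled just before the wrap
int nowMs = int.MinValue + 10;                  // sampled just after the wrap
int elapsedMs = unchecked(nowMs - startTimeMs); // 16: the true elapsed milliseconds
Console.WriteLine(elapsedMs >= 15);             // True: the 15 ms check still fires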

{
break;
}

if (!eventQueue.TryDequeue(out ev))
{
return;
}
}

// Yield the thread to allow the thread pool to run other work items
ScheduleToProcessEvents();
}

private void RequestEventLoopShutdown()
{
//
Expand Down Expand Up @@ -387,5 +478,17 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, handle);
return error == Interop.Error.SUCCESS;
}

private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }
public Interop.Sys.SocketEvents Events { get; }

public SocketIOEvent(SocketAsyncContext context, Interop.Sys.SocketEvents events)
{
Context = context;
Events = events;
}
}
}
}