diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 87f1c691c8930..008d2a1ae594e 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -264,7 +264,7 @@ public bool TryCancel() return true; } - public void Dispatch(bool inlineAsync) + public void Dispatch(bool processAsyncOperationSynchronously) { ManualResetEventSlim? e = Event; if (e != null) @@ -272,13 +272,14 @@ public void Dispatch(bool inlineAsync) // Sync operation. Signal waiting thread to continue processing. e.Set(); } - else if (inlineAsync) + else if (processAsyncOperationSynchronously) { + // Async operation. Process the IO and callback on the current thread synchronously as requested. ((IThreadPoolWorkItem)this).Execute(); } else { - // Async operation. Process the IO on the threadpool. + // Async operation. Process the IO and callback on the threadpool. ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } } @@ -1001,7 +1002,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) } } - nextOp?.Dispatch(inlineAsync: false); + nextOp?.Dispatch(processAsyncOperationSynchronously: false); return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } @@ -1080,7 +1081,7 @@ public void CancelAndContinueProcessing(TOperation op) } } - nextOp?.Dispatch(inlineAsync: false); + nextOp?.Dispatch(processAsyncOperationSynchronously: false); } // Called when the socket is closed. @@ -1957,14 +1958,19 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) (events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.HandleEvent(this) : null; AsyncOperation? sendOperation = (events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.HandleEvent(this) : null; + + // This method is called from a thread pool thread. When we have only one operation to process, process it + // synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both + // synchronously may delay the second operation, so schedule one onto the thread pool and process the other + // synchronously. There might be better ways of doing this. if (sendOperation == null) { - receiveOperation?.Dispatch(inlineAsync: true); + receiveOperation?.Dispatch(processAsyncOperationSynchronously: true); } else { - receiveOperation?.Dispatch(inlineAsync: false); - sendOperation.Dispatch(inlineAsync: true); + receiveOperation?.Dispatch(processAsyncOperationSynchronously: false); + sendOperation.Dispatch(processAsyncOperationSynchronously: true); } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 04dc86812bd57..5258c4f67260c 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -130,7 +131,16 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private readonly ConcurrentDictionary _handleToContextMap = new ConcurrentDictionary(); + // + // Queue of events generated by EventLoop() that would be processed by the thread pool + // private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); + + // + // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is + // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is + // not scheduled. Changes are protected by atomic operations as appropriate. + // private int _eventQueueProcessingRequested; // @@ -324,7 +334,7 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); - bool scheduleProcessing = false; + bool enqueuedEvent = false; for (int i = 0; i < numEvents; i++) { IntPtr handle = _buffer[i].Data; @@ -336,13 +346,13 @@ private void EventLoop() { Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}"); eventQueue.Enqueue(new Event(handle, _buffer[i].Events)); - scheduleProcessing = true; + enqueuedEvent = true; } } - if (scheduleProcessing && Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + if (enqueuedEvent) { - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + ScheduleToProcessEvents(); } } @@ -354,8 +364,24 @@ private void EventLoop() } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ScheduleToProcessEvents() + { + // Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid + // over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item + // to be scheduled for parallelizing processing of events. + if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + { + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + } + void IThreadPoolWorkItem.Execute() { + // Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer + // threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an + // enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is + // the last thread processing events, it must see the event queued by the enqueuer. Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); ConcurrentDictionary handleToContextMap = _handleToContextMap; @@ -368,10 +394,12 @@ void IThreadPoolWorkItem.Execute() continue; } - if (_eventQueueProcessingRequested == 0 && - Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + // An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work + // item to parallelize processing of events. Since this is only for additional parallelization, doing so + // speculatively is ok. + if (_eventQueueProcessingRequested == 0) { - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + ScheduleToProcessEvents(); } context.HandleEvents(ev.Events); @@ -420,8 +448,8 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err private struct Event { - public IntPtr Handle { get; private set; } - public Interop.Sys.SocketEvents Events { get; private set; } + public IntPtr Handle { get; } + public Interop.Sys.SocketEvents Events { get; } public Event(IntPtr handle, Interop.Sys.SocketEvents events) {