Skip to content

Commit

Permalink
Clean up change
Browse files Browse the repository at this point in the history
  • Loading branch information
kouvel committed Apr 23, 2020
1 parent 46dc10d commit df0caf0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,21 +264,22 @@ public bool TryCancel()
return true;
}

public void Dispatch(bool inlineAsync)
public void Dispatch(bool processAsyncOperationSynchronously)
{
ManualResetEventSlim? e = Event;
if (e != null)
{
// Sync operation. Signal waiting thread to continue processing.
e.Set();
}
else if (inlineAsync)
else if (processAsyncOperationSynchronously)
{
// Async operation. Process the IO and callback on the current thread synchronously as requested.
((IThreadPoolWorkItem)this).Execute();
}
else
{
// Async operation. Process the IO on the threadpool.
// Async operation. Process the IO and callback on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}
Expand Down Expand Up @@ -1001,7 +1002,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
}
}

nextOp?.Dispatch(inlineAsync: false);
nextOp?.Dispatch(processAsyncOperationSynchronously: false);

return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled);
}
Expand Down Expand Up @@ -1080,7 +1081,7 @@ public void CancelAndContinueProcessing(TOperation op)
}
}

nextOp?.Dispatch(inlineAsync: false);
nextOp?.Dispatch(processAsyncOperationSynchronously: false);
}

// Called when the socket is closed.
Expand Down Expand Up @@ -1957,14 +1958,19 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
(events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.HandleEvent(this) : null;
AsyncOperation? sendOperation =
(events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.HandleEvent(this) : null;

// This method is called from a thread pool thread. When we have only one operation to process, process it
// synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both
// synchronously may delay the second operation, so schedule one onto the thread pool and process the other
// synchronously. There might be better ways of doing this.
if (sendOperation == null)
{
receiveOperation?.Dispatch(inlineAsync: true);
receiveOperation?.Dispatch(processAsyncOperationSynchronously: true);
}
else
{
receiveOperation?.Dispatch(inlineAsync: false);
sendOperation.Dispatch(inlineAsync: true);
receiveOperation?.Dispatch(processAsyncOperationSynchronously: false);
sendOperation.Dispatch(processAsyncOperationSynchronously: true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -130,7 +131,16 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
//
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContext> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContext>();

//
// Queue of events generated by EventLoop() that would be processed by the thread pool
//
private readonly ConcurrentQueue<Event> _eventQueue = new ConcurrentQueue<Event>();

//
// This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is
// set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is
// not scheduled. Changes are protected by atomic operations as appropriate.
//
private int _eventQueueProcessingRequested;

//
Expand Down Expand Up @@ -324,7 +334,7 @@ private void EventLoop()
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

bool scheduleProcessing = false;
bool enqueuedEvent = false;
for (int i = 0; i < numEvents; i++)
{
IntPtr handle = _buffer[i].Data;
Expand All @@ -336,13 +346,13 @@ private void EventLoop()
{
Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}");
eventQueue.Enqueue(new Event(handle, _buffer[i].Events));
scheduleProcessing = true;
enqueuedEvent = true;
}
}

if (scheduleProcessing && Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0)
if (enqueuedEvent)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
ScheduleToProcessEvents();
}
}

Expand All @@ -354,8 +364,24 @@ private void EventLoop()
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ScheduleToProcessEvents()
{
// Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid
// over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item
// to be scheduled for parallelizing processing of events.
if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}

void IThreadPoolWorkItem.Execute()
{
// Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer
// threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an
// enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is
// the last thread processing events, it must see the event queued by the enqueuer.
Interlocked.Exchange(ref _eventQueueProcessingRequested, 0);

ConcurrentDictionary<IntPtr, SocketAsyncContext> handleToContextMap = _handleToContextMap;
Expand All @@ -368,10 +394,12 @@ void IThreadPoolWorkItem.Execute()
continue;
}

if (_eventQueueProcessingRequested == 0 &&
Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0)
// An event was successfully dequeued, and as there may be more events to process, speculatively schedule a work
// item to parallelize processing of events. Since this is only for additional parallelization, doing so
// speculatively is ok.
if (_eventQueueProcessingRequested == 0)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
ScheduleToProcessEvents();
}

context.HandleEvents(ev.Events);
Expand Down Expand Up @@ -420,8 +448,8 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err

private struct Event
{
public IntPtr Handle { get; private set; }
public Interop.Sys.SocketEvents Events { get; private set; }
public IntPtr Handle { get; }
public Interop.Sys.SocketEvents Events { get; }

public Event(IntPtr handle, Interop.Sys.SocketEvents events)
{
Expand Down

0 comments on commit df0caf0

Please sign in to comment.