Skip to content

Commit

Permalink
clean up some logic around non-blocking sockets on Unix (#50985)
Browse files Browse the repository at this point in the history
Co-authored-by: Geoffrey Kizer <geoffrek@windows.microsoft.com>
  • Loading branch information
geoffkizer and Geoffrey Kizer committed Apr 15, 2021
1 parent d4818a3 commit 2aa0348
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public partial class SafeSocketHandle
private int _receiveTimeout = -1;
private int _sendTimeout = -1;
private bool _nonBlocking;
private bool _underlyingHandleNonBlocking;
private SocketAsyncContext? _asyncContext;

private TrackedSocketOptions _trackedOptions;
Expand Down Expand Up @@ -110,19 +109,12 @@ internal SocketAsyncContext AsyncContext
}
}

// This will set the underlying OS handle to be nonblocking, for whatever reason --
// performing an async operation or using a timeout will cause this to happen.
// Once the OS handle is nonblocking, it never transitions back to blocking.
private void SetHandleNonBlocking()
{
// We don't care about synchronization because this is idempotent
if (!_underlyingHandleNonBlocking)
{
AsyncContext.SetNonBlocking();
_underlyingHandleNonBlocking = true;
}
}

/// <summary>
/// This represents whether the Socket instance is blocking or non-blocking *from the user's point of view*,
/// i.e. it corresponds to the Socket.Blocking property (except in reverse).
/// Even if this is false, the underlying native socket may still be non-blocking if anything ever caused it to become non-blocking,
/// either by issuing an async operation or explicitly setting this property to true.
/// </summary>
internal bool IsNonBlocking
{
get
Expand All @@ -133,26 +125,18 @@ internal bool IsNonBlocking
{
_nonBlocking = value;

//
// If transitioning to non-blocking, we need to set the native socket to non-blocking mode.
// If we ever transition back to blocking, we keep the native socket in non-blocking mode, and emulate
// blocking. This avoids problems with switching to native blocking while there are pending async
// operations.
//
// If transitioning from blocking to non-blocking, we need to set the native socket to non-blocking mode.
// If transitioning from non-blocking to blocking, we keep the native socket in non-blocking mode, and emulate
// blocking operations within SocketAsyncContext on top of epoll/kqueue.
// This avoids problems with switching to native blocking while there are pending operations.
if (value)
{
SetHandleNonBlocking();
AsyncContext.SetHandleNonBlocking();
}
}
}

internal bool IsUnderlyingBlocking
{
get
{
return !_underlyingHandleNonBlocking;
}
}
internal bool IsUnderlyingHandleBlocking => !AsyncContext.IsHandleNonBlocking;

internal int ReceiveTimeout
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,7 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName]
private OperationQueue<WriteOperation> _sendQueue;
private SocketAsyncEngine? _asyncEngine;
private bool IsRegistered => _asyncEngine != null;
private bool _nonBlockingSet;
private bool _isHandleNonBlocking;

private readonly object _registerLock = new object();

Expand All @@ -1254,7 +1254,7 @@ public bool PreferInlineCompletions

private bool TryRegister(out Interop.Error error)
{
Debug.Assert(_nonBlockingSet);
Debug.Assert(_isHandleNonBlocking);
lock (_registerLock)
{
if (_asyncEngine == null)
Expand Down Expand Up @@ -1306,7 +1306,7 @@ public bool StopAndAbort()
return aborted;
}

public void SetNonBlocking()
public void SetHandleNonBlocking()
{
//
// Our sockets may start as blocking, and later transition to non-blocking, either because the user
Expand All @@ -1317,17 +1317,19 @@ public void SetNonBlocking()
// Note that there's no synchronization here, so we may set the non-blocking option multiple times
// in a race. This should be fine.
//
if (!_nonBlockingSet)
if (!_isHandleNonBlocking)
{
if (Interop.Sys.Fcntl.SetIsNonBlocking(_socket, 1) != 0)
{
throw new SocketException((int)SocketPal.GetSocketErrorForErrorCode(Interop.Sys.GetLastError()));
}

_nonBlockingSet = true;
_isHandleNonBlocking = true;
}
}

public bool IsHandleNonBlocking => _isHandleNonBlocking;

private void PerformSyncOperation<TOperation>(ref OperationQueue<TOperation> queue, TOperation operation, int timeout, int observedSequenceNumber)
where TOperation : AsyncOperation
{
Expand Down Expand Up @@ -1389,7 +1391,7 @@ private void PerformSyncOperation<TOperation>(ref OperationQueue<TOperation> que

private bool ShouldRetrySyncOperation(out SocketError errorCode)
{
if (_nonBlockingSet)
if (_isHandleNonBlocking)
{
errorCode = SocketError.Success; // Will be ignored
return true;
Expand Down Expand Up @@ -1437,7 +1439,7 @@ public SocketError AcceptAsync(byte[] socketAddress, ref int socketAddressLen, o
Debug.Assert(socketAddressLen > 0, $"Unexpected socketAddressLen: {socketAddressLen}");
Debug.Assert(callback != null, "Expected non-null callback");

SetNonBlocking();
SetHandleNonBlocking();

SocketError errorCode;
int observedSequenceNumber;
Expand Down Expand Up @@ -1504,7 +1506,7 @@ public SocketError ConnectAsync(byte[] socketAddress, int socketAddressLen, Acti
Debug.Assert(socketAddressLen > 0, $"Unexpected socketAddressLen: {socketAddressLen}");
Debug.Assert(callback != null, "Expected non-null callback");

SetNonBlocking();
SetHandleNonBlocking();

// Connect is different than the usual "readiness" pattern of other operations.
// We need to initiate the connect before we try to complete it.
Expand Down Expand Up @@ -1616,7 +1618,7 @@ public unsafe SocketError ReceiveFrom(Span<byte> buffer, ref SocketFlags flags,

public SocketError ReceiveAsync(Memory<byte> buffer, SocketFlags flags, out int bytesReceived, Action<int, byte[]?, int, SocketFlags, SocketError> callback, CancellationToken cancellationToken = default)
{
SetNonBlocking();
SetHandleNonBlocking();

SocketError errorCode;
int observedSequenceNumber;
Expand Down Expand Up @@ -1649,7 +1651,7 @@ public SocketError ReceiveAsync(Memory<byte> buffer, SocketFlags flags, out int

public SocketError ReceiveFromAsync(Memory<byte> buffer, SocketFlags flags, byte[]? socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[]?, int, SocketFlags, SocketError> callback, CancellationToken cancellationToken = default)
{
SetNonBlocking();
SetHandleNonBlocking();

SocketError errorCode;
int observedSequenceNumber;
Expand Down Expand Up @@ -1726,7 +1728,7 @@ public SocketError ReceiveFrom(IList<ArraySegment<byte>> buffers, ref SocketFlag

public SocketError ReceiveFromAsync(IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[]? socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[]?, int, SocketFlags, SocketError> callback)
{
SetNonBlocking();
SetHandleNonBlocking();

SocketError errorCode;
int observedSequenceNumber;
Expand Down Expand Up @@ -1837,7 +1839,7 @@ public SocketError ReceiveFromAsync(IList<ArraySegment<byte>> buffers, SocketFla

public SocketError ReceiveMessageFromAsync(Memory<byte> buffer, IList<ArraySegment<byte>>? buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out int bytesReceived, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, Action<int, byte[], int, SocketFlags, IPPacketInformation, SocketError> callback, CancellationToken cancellationToken = default)
{
SetNonBlocking();
SetHandleNonBlocking();

SocketError errorCode;
int observedSequenceNumber;
Expand Down Expand Up @@ -1956,7 +1958,7 @@ public unsafe SocketError SendTo(ReadOnlySpan<byte> buffer, SocketFlags flags, b

public SocketError SendToAsync(Memory<byte> buffer, int offset, int count, SocketFlags flags, byte[]? socketAddress, ref int socketAddressLen, out int bytesSent, Action<int, byte[]?, int, SocketFlags, SocketError> callback, CancellationToken cancellationToken = default)
{
SetNonBlocking();
SetHandleNonBlocking();

bytesSent = 0;
SocketError errorCode;
Expand Down Expand Up @@ -2035,7 +2037,7 @@ public SocketError SendTo(IList<ArraySegment<byte>> buffers, SocketFlags flags,

public SocketError SendToAsync(IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[]? socketAddress, ref int socketAddressLen, out int bytesSent, Action<int, byte[]?, int, SocketFlags, SocketError> callback)
{
SetNonBlocking();
SetHandleNonBlocking();

bytesSent = 0;
int bufferIndex = 0;
Expand Down Expand Up @@ -2100,7 +2102,7 @@ public SocketError SendFile(SafeFileHandle fileHandle, long offset, long count,

public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long count, out long bytesSent, Action<long, SocketError> callback)
{
SetNonBlocking();
SetHandleNonBlocking();

bytesSent = 0;
SocketError errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ public static bool TryCompleteSendTo(SafeSocketHandle socket, IList<ArraySegment
public static bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan<byte> buffer, IList<ArraySegment<byte>>? buffers, ref int bufferIndex, ref int offset, ref int count, SocketFlags flags, byte[]? socketAddress, int socketAddressLen, ref int bytesSent, out SocketError errorCode)
{
bool successfulSend = false;
long start = socket.IsUnderlyingBlocking && socket.SendTimeout > 0 ? Environment.TickCount64 : 0; // Get ticks only if timeout is set and socket is blocking.
long start = socket.IsUnderlyingHandleBlocking && socket.SendTimeout > 0 ? Environment.TickCount64 : 0; // Get ticks only if timeout is set and socket is blocking.

while (true)
{
Expand Down Expand Up @@ -965,7 +965,7 @@ public static bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan<byte>
return true;
}

if (socket.IsUnderlyingBlocking && socket.SendTimeout > 0 && (Environment.TickCount64 - start) >= socket.SendTimeout)
if (socket.IsUnderlyingHandleBlocking && socket.SendTimeout > 0 && (Environment.TickCount64 - start) >= socket.SendTimeout)
{
// When socket is truly in blocking mode, we depend on OS to enforce send timeout.
// When we are here we had partial send when we neither completed or failed.
Expand Down

0 comments on commit 2aa0348

Please sign in to comment.