Skip to content

Commit

Permalink
Refactor Task Socket.ConnectAsync methods to use AwaitableSocketAsync…
Browse files Browse the repository at this point in the history
…EventArgs
  • Loading branch information
tmds committed Dec 12, 2019
1 parent 89f3ff1 commit ffcc63a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,77 +119,47 @@ private Task<Socket> AcceptAsyncApm(Socket acceptSocket)

internal Task ConnectAsync(EndPoint remoteEP)
{
var tcs = new TaskCompletionSource<bool>(this);
BeginConnect(remoteEP, iar =>
// Use ValueTaskReceive so the AwaitableSocketAsyncEventArgs can be re-used later.
AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref EventArgs.ValueTaskReceive, () => new AwaitableSocketAsyncEventArgs());

// We don't expect concurrent users while calling ConnectAsync.
if (!saea.Reserve())
{
var innerTcs = (TaskCompletionSource<bool>)iar.AsyncState;
try
{
((Socket)innerTcs.Task.AsyncState).EndConnect(iar);
innerTcs.TrySetResult(true);
}
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
throw new InvalidOperationException(SR.Format(SR.net_socketopinprogress));
}

saea.RemoteEndPoint = remoteEP;
return saea.ConnectAsync(this).AsTask();
}

internal Task ConnectAsync(IPAddress address, int port)
{
var tcs = new TaskCompletionSource<bool>(this);
BeginConnect(address, port, iar =>
{
var innerTcs = (TaskCompletionSource<bool>)iar.AsyncState;
try
{
((Socket)innerTcs.Task.AsyncState).EndConnect(iar);
innerTcs.TrySetResult(true);
}
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
}
=> ConnectAsync(new IPEndPoint(address, port));

internal Task ConnectAsync(IPAddress[] addresses, int port)
internal async Task ConnectAsync(IPAddress[] addresses, int port)
{
var tcs = new TaskCompletionSource<bool>(this);
BeginConnect(addresses, port, iar =>
if (addresses == null)
{
var innerTcs = (TaskCompletionSource<bool>)iar.AsyncState;
try
{
((Socket)innerTcs.Task.AsyncState).EndConnect(iar);
innerTcs.TrySetResult(true);
}
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
throw new ArgumentNullException(nameof(addresses));
}
if (addresses.Length == 0)
{
throw new ArgumentException(SR.net_invalidAddressList, nameof(addresses));
}
foreach (var address in addresses)
{
await ConnectAsync(address, port).ConfigureAwait(false);
}
}

internal Task ConnectAsync(string host, int port)
{
var tcs = new TaskCompletionSource<bool>(this);
BeginConnect(host, port, iar =>
{
var innerTcs = (TaskCompletionSource<bool>)iar.AsyncState;
try
{
((Socket)innerTcs.Task.AsyncState).EndConnect(iar);
innerTcs.TrySetResult(true);
}
catch (Exception e) { innerTcs.TrySetException(e); }
}, tcs);
return tcs.Task;
}
=> ConnectAsync(new DnsEndPoint(host, port));

internal Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream)
{
ValidateBuffer(buffer);
return ReceiveAsync((Memory<byte>)buffer, socketFlags, fromNetworkStream, default).AsTask();
}

// TODO https://github.com/dotnet/corefx/issues/24430:
// Fully plumb cancellation down into socket operations.

internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -949,6 +919,24 @@ public ValueTask SendAsyncForNetworkStream(Socket socket, CancellationToken canc
new ValueTask(Task.FromException(CreateException(error)));
}

public ValueTask ConnectAsync(Socket socket)
{
Debug.Assert(Volatile.Read(ref _continuation) == null, $"Expected null continuation to indicate reserved for use");

if (socket.ConnectAsync(this))
{
return new ValueTask(this, _token);
}

SocketError error = SocketError;

Release();

return error == SocketError.Success ?
default :
new ValueTask(Task.FromException(CreateException(error)));
}

/// <summary>Gets the status of the operation.</summary>
public ValueTaskSourceStatus GetStatus(short token)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public async Task Connect_AfterDisconnect_Fails()
[PlatformSpecific(~(TestPlatforms.OSX | TestPlatforms.FreeBSD))] // Not supported on BSD like OSes.
public async Task ConnectGetsCanceledByDispose()
{
bool usesApm = UsesApm ||
(this is ConnectTask); // .NET Core ConnectAsync Task API is implemented using Apm
bool usesApm = UsesApm;

// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
Expand Down

0 comments on commit ffcc63a

Please sign in to comment.