Permalink
Browse files

Make write non-blocking for async transport.

  • Loading branch information...
xinchen10 committed Sep 24, 2016
1 parent 5847267 commit 1c8da16195264cac589294e462dc2e78d02b35db
Showing with 19 additions and 8 deletions.
  1. +1 −1 src/Connection.cs
  2. +14 −3 src/Net/TaskExtensions.cs
  3. +2 −2 src/WinRT/TcpTransport.cs
  4. +2 −2 src/WinRT/WebSocketTransport.cs
@@ -352,7 +352,7 @@ void Connect(SaslProfile saslProfile, Open open)
if (WebSocketTransport.MatchScheme(address.Scheme))
{
WebSocketTransport wsTransport = new WebSocketTransport();
wsTransport.ConnectAsync(address, null).GetAwaiter().GetResult();
wsTransport.ConnectAsync(address, null).ConfigureAwait(false).GetAwaiter().GetResult();
transport = wsTransport;
}
else
@@ -266,13 +266,24 @@ public static Task<Message> ReceiveAsync(this ReceiverLink receiver, int timeout
internal static async Task<IAsyncTransport> OpenAsync(this SaslProfile saslProfile, string hostname,
IBufferManager bufferManager, IAsyncTransport transport)
{
ProtocolHeader header = saslProfile.Start(hostname, transport);
// if transport is closed, pump reader should throw exception
TransportWriter writer = new TransportWriter(transport, e => { });
ProtocolHeader myHeader = saslProfile.Start(hostname, writer);
AsyncPump pump = new AsyncPump(bufferManager, transport);
await pump.PumpAsync(
h => { saslProfile.OnHeader(header, h); return true; },
b => { SaslCode code; return saslProfile.OnFrame(transport, b, out code); });
header =>
{
saslProfile.OnHeader(myHeader, header);
return true;
},
buffer =>
{
SaslCode code;
return saslProfile.OnFrame(writer, buffer, out code);
});
return (IAsyncTransport)saslProfile.UpgradeTransportInternal(transport);
}
@@ -83,12 +83,12 @@ public void Close()
public void Send(ByteBuffer buffer)
{
this.SendAsync(new ByteBuffer[] { buffer }, buffer.Length).GetAwaiter().GetResult();
this.SendAsync(new ByteBuffer[] { buffer }, buffer.Length).ConfigureAwait(false).GetAwaiter().GetResult();
}
public int Receive(byte[] buffer, int offset, int count)
{
return this.ReceiveAsync(buffer, offset, count).GetAwaiter().GetResult();
return this.ReceiveAsync(buffer, offset, count).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
}
@@ -109,12 +109,12 @@ void ITransport.Close()
void ITransport.Send(ByteBuffer buffer)
{
((IAsyncTransport)this).SendAsync(new ByteBuffer[] { buffer }, buffer.Length).GetAwaiter().GetResult();
((IAsyncTransport)this).SendAsync(new ByteBuffer[] { buffer }, buffer.Length).ConfigureAwait(false).GetAwaiter().GetResult();
}
int ITransport.Receive(byte[] buffer, int offset, int count)
{
return ((IAsyncTransport)this).ReceiveAsync(buffer, offset, count).GetAwaiter().GetResult();
return ((IAsyncTransport)this).ReceiveAsync(buffer, offset, count).ConfigureAwait(false).GetAwaiter().GetResult();
}
static int GetDefaultPort(string scheme, int port)

0 comments on commit 1c8da16

Please sign in to comment.