Skip to content

Commit

Permalink
Merge branch 'release/2.1' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
davidfowl committed Mar 29, 2018
2 parents 2f9942e + 7a53e07 commit 80b842e
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 68 deletions.
19 changes: 15 additions & 4 deletions src/Common/PipeWriterStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,28 @@ public override void Write(byte[] buffer, int offset, int count)

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Write(buffer, offset, count);
return Task.CompletedTask;
return WriteCoreAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
}

#if NETCOREAPP2_1
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
_pipeWriter.Write(source.Span);
return WriteCoreAsync(source, cancellationToken);
}
#endif

private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
_length += source.Length;
var task = _pipeWriter.WriteAsync(source);
if (!task.IsCompletedSuccessfully)
{
return WriteSlowAsync(task);
}

return default;

async ValueTask WriteSlowAsync(ValueTask<FlushResult> flushTask) => await flushTask;
}
#endif
}
}
13 changes: 9 additions & 4 deletions src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ private async Task HandshakeAsync()
{
var result = await _connectionState.Connection.Transport.Input.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.Start;
var examined = buffer.End;

try
{
Expand All @@ -524,6 +526,12 @@ private async Task HandshakeAsync()
{
if (HandshakeProtocol.TryParseResponseMessage(ref buffer, out var message))
{
// Adjust consumed and examined to point to the end of the handshake
// response, this handles the case where invocations are sent in the same payload
// as the the negotiate response.
consumed = buffer.Start;
examined = consumed;

if (message.Error != null)
{
Log.HandshakeServerError(_logger, message.Error);
Expand All @@ -543,10 +551,7 @@ private async Task HandshakeAsync()
}
finally
{
// The buffer was sliced up to where it was consumed, so we can just advance to the start.
// We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
// before yielding the read again.
_connectionState.Connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
_connectionState.Connection.Transport.Input.AdvanceTo(consumed, examined);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ internal async Task<bool> HandshakeAsync(TimeSpan timeout, IList<string> support
{
using (var cts = new CancellationTokenSource())
{
cts.CancelAfter(timeout);
if (!Debugger.IsAttached)
{
cts.CancelAfter(timeout);
}

while (true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ private async Task Poll(Uri pollUrl, CancellationToken cancellationToken)

var stream = new PipeWriterStream(_application.Output);
await response.Content.CopyToAsync(stream);
await _application.Output.FlushAsync();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,10 @@ private async Task ProcessSend(HttpContext context, HttpConnectionOptions option
return;
}

// Until the parsers are incremental, we buffer the entire request body before
// flushing the buffer. Using CopyToAsync allows us to avoid allocating a single giant
// buffer before writing.
var pipeWriterStream = new PipeWriterStream(connection.Application.Output);
await context.Request.Body.CopyToAsync(pipeWriterStream);

Log.ReceivedBytes(_logger, pipeWriterStream.Length);
await connection.Application.Output.FlushAsync();
}

private async Task<bool> EnsureConnectionStateAsync(HttpConnectionContext connection, HttpContext context, TransportType transportType, TransportType supportedTransports, ConnectionLogScope logScope, HttpConnectionOptions options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,13 @@ private async Task StartReceiving(WebSocket socket)

_application.Output.Advance(receiveResult.Count);

if (receiveResult.EndOfMessage)
{
var flushResult = await _application.Output.FlushAsync();
var flushResult = await _application.Output.FlushAsync();

// We canceled in the middle of applying back pressure
// or if the consumer is done
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
// We canceled in the middle of applying back pressure
// or if the consumer is done
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public async Task ReceiveCloseMessageWithoutErrorWillCloseHubConnection()
{
await hubConnection.StartAsync().OrTimeout();

await connection.ReceiveJsonMessage(new {type = 7}).OrTimeout();
await connection.ReceiveJsonMessage(new { type = 7 }).OrTimeout();

Exception closeException = await closedTcs.Task.OrTimeout();
Assert.Null(closeException);
Expand All @@ -127,7 +127,7 @@ public async Task ReceiveCloseMessageWithErrorWillCloseHubConnection()
{
await hubConnection.StartAsync().OrTimeout();

await connection.ReceiveJsonMessage(new {type = 7, error = "Error!"}).OrTimeout();
await connection.ReceiveJsonMessage(new { type = 7, error = "Error!" }).OrTimeout();

Exception closeException = await closedTcs.Task.OrTimeout();
Assert.NotNull(closeException);
Expand Down Expand Up @@ -156,7 +156,7 @@ public async Task StreamSendsAnInvocationMessage()
Assert.Equal("{\"type\":4,\"invocationId\":\"1\",\"target\":\"Foo\",\"arguments\":[]}", invokeMessage);

// Complete the channel
await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
await channel.Completion;
}
finally
Expand All @@ -177,7 +177,7 @@ public async Task InvokeCompletedWhenCompletionMessageReceived()

var invokeTask = hubConnection.InvokeAsync("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();

await invokeTask.OrTimeout();
}
Expand All @@ -199,7 +199,7 @@ public async Task StreamCompletesWhenCompletionMessageIsReceived()

var channel = await hubConnection.StreamAsChannelAsync<int>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();

Assert.Empty(await channel.ReadAllAsync());
}
Expand All @@ -221,7 +221,7 @@ public async Task InvokeYieldsResultWhenCompletionMessageReceived()

var invokeTask = hubConnection.InvokeAsync<int>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3, result = 42}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = 42 }).OrTimeout();

Assert.Equal(42, await invokeTask.OrTimeout());
}
Expand All @@ -243,7 +243,7 @@ public async Task InvokeFailsWithExceptionWhenCompletionWithErrorReceived()

var invokeTask = hubConnection.InvokeAsync<int>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3, error = "An error occurred"}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout();

var ex = await Assert.ThrowsAsync<HubException>(() => invokeTask).OrTimeout();
Assert.Equal("An error occurred", ex.Message);
Expand All @@ -266,7 +266,7 @@ public async Task StreamFailsIfCompletionMessageHasPayload()

var channel = await hubConnection.StreamAsChannelAsync<string>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3, result = "Oops"}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = "Oops" }).OrTimeout();

var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await channel.ReadAllAsync().OrTimeout());
Assert.Equal("Server provided a result in a completion response to a streamed invocation.", ex.Message);
Expand All @@ -289,7 +289,7 @@ public async Task StreamFailsWithExceptionWhenCompletionWithErrorReceived()

var channel = await hubConnection.StreamAsChannelAsync<int>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3, error = "An error occurred"}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout();

var ex = await Assert.ThrowsAsync<HubException>(async () => await channel.ReadAllAsync().OrTimeout());
Assert.Equal("An error occurred", ex.Message);
Expand All @@ -312,7 +312,7 @@ public async Task InvokeFailsWithErrorWhenStreamingItemReceived()

var invokeTask = hubConnection.InvokeAsync<int>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 2, item = 42}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 42 }).OrTimeout();

var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => invokeTask).OrTimeout();
Assert.Equal("Streaming hub methods must be invoked with the 'HubConnection.StreamAsChannelAsync' method.", ex.Message);
Expand All @@ -335,14 +335,14 @@ public async Task StreamYieldsItemsAsTheyArrive()

var channel = await hubConnection.StreamAsChannelAsync<string>("Foo").OrTimeout();

await connection.ReceiveJsonMessage(new {invocationId = "1", type = 2, item = "1"}).OrTimeout();
await connection.ReceiveJsonMessage(new {invocationId = "1", type = 2, item = "2"}).OrTimeout();
await connection.ReceiveJsonMessage(new {invocationId = "1", type = 2, item = "3"}).OrTimeout();
await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "2" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "3" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();

var notifications = await channel.ReadAllAsync().OrTimeout();

Assert.Equal(new[] {"1", "2", "3",}, notifications.ToArray());
Assert.Equal(new[] { "1", "2", "3", }, notifications.ToArray());
}
finally
{
Expand All @@ -361,10 +361,10 @@ public async Task HandlerRegisteredWithOnIsFiredWhenInvocationReceived()
{
await hubConnection.StartAsync().OrTimeout();

hubConnection.On<int, string, float>("Foo", (r1, r2, r3) => handlerCalled.TrySetResult(new object[] {r1, r2, r3}));
hubConnection.On<int, string, float>("Foo", (r1, r2, r3) => handlerCalled.TrySetResult(new object[] { r1, r2, r3 }));

var args = new object[] {1, "Foo", 2.0f};
await connection.ReceiveJsonMessage(new {invocationId = "1", type = 1, target = "Foo", arguments = args}).OrTimeout();
var args = new object[] { 1, "Foo", 2.0f };
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 1, target = "Foo", arguments = args }).OrTimeout();

Assert.Equal(args, await handlerCalled.Task.OrTimeout());
}
Expand All @@ -389,10 +389,10 @@ public async Task AcceptsPingMessages()
var invokeTask = hubConnection.InvokeAsync("Foo").OrTimeout();

// Receive the ping mid-invocation so we can see that the rest of the flow works fine
await connection.ReceiveJsonMessage(new {type = 6}).OrTimeout();
await connection.ReceiveJsonMessage(new { type = 6 }).OrTimeout();

// Receive a completion
await connection.ReceiveJsonMessage(new {invocationId = "1", type = 3}).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();

// Ensure the invokeTask completes properly
await invokeTask.OrTimeout();
Expand All @@ -403,6 +403,100 @@ public async Task AcceptsPingMessages()
await connection.DisposeAsync().OrTimeout();
}
}

[Fact]
public async Task PartialHandshakeResponseWorks()
{
var connection = new TestConnection(synchronousCallbacks: true, autoNegotiate: false);
var hubConnection = CreateHubConnection(connection);
try
{
var task = hubConnection.StartAsync();

await connection.ReceiveTextAsync("{");

Assert.False(task.IsCompleted);

await connection.ReceiveTextAsync("}");

Assert.False(task.IsCompleted);

await connection.ReceiveTextAsync("\u001e");

await task.OrTimeout();
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}

[Fact]
public async Task HandshakeAndInvocationInSameBufferWorks()
{
var payload = "{}\u001e{\"type\":1, \"target\": \"Echo\", \"arguments\":[\"hello\"]}\u001e";
var connection = new TestConnection(synchronousCallbacks: true, autoNegotiate: false);
var hubConnection = CreateHubConnection(connection);
try
{
var tcs = new TaskCompletionSource<string>();
hubConnection.On<string>("Echo", data =>
{
tcs.TrySetResult(data);
});

await connection.ReceiveTextAsync(payload);

await hubConnection.StartAsync();

var response = await tcs.Task.OrTimeout();
Assert.Equal("hello", response);
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}

[Fact]
public async Task PartialInvocationWorks()
{
var connection = new TestConnection(synchronousCallbacks: true);
var hubConnection = CreateHubConnection(connection);
try
{
var tcs = new TaskCompletionSource<string>();
hubConnection.On<string>("Echo", data =>
{
tcs.TrySetResult(data);
});

await hubConnection.StartAsync().OrTimeout();

await connection.ReceiveTextAsync("{\"type\":1, ");

Assert.False(tcs.Task.IsCompleted);

await connection.ReceiveTextAsync("\"target\": \"Echo\", \"arguments\"");

Assert.False(tcs.Task.IsCompleted);

await connection.ReceiveTextAsync(":[\"hello\"]}\u001e");

Assert.True(tcs.Task.IsCompleted);

var response = await tcs.Task;

Assert.Equal("hello", response);
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
}
}
}
17 changes: 15 additions & 2 deletions test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ internal class TestConnection : IConnection
public IFeatureCollection Features { get; } = new FeatureCollection();
public int DisposeCount => _disposeCount;

public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoNegotiate = true)
public TestConnection(Func<Task> onStart = null, Func<Task> onDispose = null, bool autoNegotiate = true, bool synchronousCallbacks = false)
{
_autoNegotiate = autoNegotiate;
_onStart = onStart ?? (() => Task.CompletedTask);
_onDispose = onDispose ?? (() => Task.CompletedTask);

var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var scheduler = synchronousCallbacks ? PipeScheduler.Inline : null;
var options = new PipeOptions(readerScheduler: scheduler, writerScheduler: scheduler, useSynchronizationContext: false);

var pair = DuplexPipe.CreateConnectionPair(options, options);
Application = pair.Application;
Transport = pair.Transport;

Expand Down Expand Up @@ -88,6 +91,16 @@ public Task ReceiveJsonMessage(object jsonObject)
return Application.Output.WriteAsync(bytes).AsTask();
}

public Task ReceiveTextAsync(string rawText)
{
return ReceiveBytesAsync(Encoding.UTF8.GetBytes(rawText));
}

public Task ReceiveBytesAsync(byte[] bytes)
{
return Application.Output.WriteAsync(bytes).AsTask();
}

public async Task<string> ReadSentTextMessageAsync()
{
// Read a single text message from the Application Input pipe
Expand Down
Loading

0 comments on commit 80b842e

Please sign in to comment.