Skip to content

Commit

Permalink
fb
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Jan 25, 2021
1 parent 55fa14c commit 5c0693f
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 32 deletions.
8 changes: 4 additions & 4 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(string methodName, obj
{
while (reader.TryRead(out var item))
{
yield return (T)item;
yield return (T)item!;
}
}
}
Expand Down Expand Up @@ -662,7 +662,7 @@ async Task OnStreamCanceled(InvocationRequest irq)
}

var id = connectionState.GetNextId();
readers[id] = args[i];
readers[id] = arg;
streamIds.Add(id);

Log.StartingStream(_logger, id);
Expand All @@ -675,8 +675,8 @@ async Task OnStreamCanceled(InvocationRequest irq)
}

var newArgs = newArgsCount > 0
? new object[newArgsCount]
: Array.Empty<object>();
? new object?[newArgsCount]
: Array.Empty<object?>();
int newArgsIndex = 0;

for (var i = 0; i < args.Length; i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net;
using System.Net.Http;
Expand Down Expand Up @@ -69,9 +70,11 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio

private async Task ProcessAsync(Uri url)
{
Debug.Assert(_application != null);

// Start sending and polling (ask for binary if the server supports it)
var receiving = Poll(url, _transportCts.Token);
var sending = SendUtils.SendMessages(url, _application!, _httpClient, _logger);
var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger);

// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending);
Expand All @@ -85,7 +88,7 @@ private async Task ProcessAsync(Uri url)
// 2. Waiting for an outgoing send (this should be instantaneous)

// Cancel the application so that ReadAsync yields
_application!.Input.CancelPendingRead();
_application.Input.CancelPendingRead();

await sending;
}
Expand All @@ -98,7 +101,7 @@ private async Task ProcessAsync(Uri url)
_transportCts.Cancel();

// Cancel any pending flush so that we can quit
_application!.Output.CancelPendingFlush();
_application.Output.CancelPendingFlush();

await receiving;

Expand Down Expand Up @@ -137,10 +140,12 @@ public async Task StopAsync()

private async Task Poll(Uri pollUrl, CancellationToken cancellationToken)
{
Debug.Assert(_application != null);

Log.StartReceive(_logger);

// Allocate this once for the duration of the transport so we can continuously write to it
var applicationStream = new PipeWriterStream(_application!.Output);
var applicationStream = new PipeWriterStream(_application.Output);

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Http;
using System.Net.Http.Headers;
Expand Down Expand Up @@ -92,9 +93,11 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio

private async Task ProcessAsync(Uri url, HttpResponseMessage response)
{
Debug.Assert(_application != null);

// Start sending and polling (ask for binary if the server supports it)
var receiving = ProcessEventStream(response, _transportCts.Token);
var sending = SendUtils.SendMessages(url, _application!, _httpClient, _logger, _inputCts.Token);
var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger, _inputCts.Token);

// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending);
Expand All @@ -108,7 +111,7 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response)
_inputCts.Cancel();

// Cancel the application so that ReadAsync yields
_application!.Input.CancelPendingRead();
_application.Input.CancelPendingRead();

await sending;
}
Expand All @@ -120,14 +123,16 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response)
_transportCts.Cancel();

// Cancel any pending flush so that we can quit
_application!.Output.CancelPendingFlush();
_application.Output.CancelPendingFlush();

await receiving;
}
}

private async Task ProcessEventStream(HttpResponseMessage response, CancellationToken cancellationToken)
{
Debug.Assert(_application != null);

Log.StartReceive(_logger);

static void CancelReader(object? state) => ((PipeReader)state!).CancelPendingRead();
Expand Down Expand Up @@ -168,7 +173,7 @@ private async Task ProcessEventStream(HttpResponseMessage response, Cancellation
case ServerSentEventsMessageParser.ParseResult.Completed:
Log.MessageToApplication(_logger, message!.Length);

flushResult = await _application!.Output.WriteAsync(message);
flushResult = await _application.Output.WriteAsync(message);

_parser.Reset();
break;
Expand Down Expand Up @@ -205,7 +210,7 @@ private async Task ProcessEventStream(HttpResponseMessage response, Cancellation
}
finally
{
_application!.Output.Complete(_error);
_application.Output.Complete(_error);

Log.ReceiveStopped(_logger);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio

private async Task ProcessSocketAsync(WebSocket socket)
{
Debug.Assert(_application != null);

using (socket)
{
// Begin sending and receiving.
Expand All @@ -193,7 +195,7 @@ private async Task ProcessSocketAsync(WebSocket socket)
// 2. Waiting for a websocket send to complete

// Cancel the application so that ReadAsync yields
_application!.Input.CancelPendingRead();
_application.Input.CancelPendingRead();

using (var delayCts = new CancellationTokenSource())
{
Expand Down Expand Up @@ -225,13 +227,15 @@ private async Task ProcessSocketAsync(WebSocket socket)
socket.Abort();

// Cancel any pending flush so that we can quit
_application!.Output.CancelPendingFlush();
_application.Output.CancelPendingFlush();
}
}
}

private async Task StartReceiving(WebSocket socket)
{
Debug.Assert(_application != null);

try
{
while (true)
Expand All @@ -252,7 +256,7 @@ private async Task StartReceiving(WebSocket socket)
return;
}
#endif
var memory = _application!.Output.GetMemory();
var memory = _application.Output.GetMemory();
#if NETSTANDARD2_1 || NETCOREAPP
// Because we checked the CloseStatus from the 0 byte read above, we don't need to check again after reading
var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None);
Expand Down Expand Up @@ -280,9 +284,9 @@ private async Task StartReceiving(WebSocket socket)

Log.MessageReceived(_logger, receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage);

_application!.Output.Advance(receiveResult.Count);
_application.Output.Advance(receiveResult.Count);

var flushResult = await _application!.Output.FlushAsync();
var flushResult = await _application.Output.FlushAsync();

// We canceled in the middle of applying back pressure
// or if the consumer is done
Expand All @@ -300,27 +304,29 @@ private async Task StartReceiving(WebSocket socket)
{
if (!_aborted)
{
_application!.Output.Complete(ex);
_application.Output.Complete(ex);
}
}
finally
{
// We're done writing
_application!.Output.Complete();
_application.Output.Complete();

Log.ReceiveStopped(_logger);
}
}

private async Task StartSending(WebSocket socket)
{
Debug.Assert(_application != null);

Exception? error = null;

try
{
while (true)
{
var result = await _application!.Input.ReadAsync();
var result = await _application.Input.ReadAsync();
var buffer = result.Buffer;

// Get a frame from the application
Expand Down Expand Up @@ -363,7 +369,7 @@ private async Task StartSending(WebSocket socket)
}
finally
{
_application!.Input.AdvanceTo(buffer.End);
_application.Input.AdvanceTo(buffer.End);
}
}
}
Expand All @@ -386,7 +392,7 @@ private async Task StartSending(WebSocket socket)
}
}

_application!.Input.Complete();
_application.Input.Complete();

Log.SendStopped(_logger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ internal class HttpConnectionContext : ConnectionContext,
/// Creates the DefaultConnectionContext without Pipes to avoid upfront allocations.
/// The caller is expected to set the <see cref="Transport"/> and <see cref="Application"/> pipes manually.
/// </summary>
/// <param name="connectionId"></param>
/// <param name="connectionToken"></param>
/// <param name="logger"></param>
/// <param name="transport"></param>
/// <param name="application"></param>
public HttpConnectionContext(string connectionId, string connectionToken, ILogger logger, IDuplexPipe transport, IDuplexPipe application)
{
Transport = transport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
reader.Read();
while (reader.TokenType != JsonTokenType.EndArray)
{
newStreamIds.Add(reader.GetString() ?? throw new InvalidDataException($"Null value for {StreamIdsPropertyName} is not valid."));
newStreamIds.Add(reader.GetString() ?? throw new InvalidDataException($"Null value for '{StreamIdsPropertyName}' is not valid."));
reader.Read();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
reader.Read();
while (reader.TokenType != JsonToken.EndArray)
{
newStreamIds.Add(reader.Value?.ToString() ?? throw new InvalidDataException($"Null value for {StreamIdsPropertyName} is not valid."));
newStreamIds.Add(reader.Value?.ToString() ?? throw new InvalidDataException($"Null value for '{StreamIdsPropertyName}' is not valid."));
reader.Read();
}

Expand Down
4 changes: 2 additions & 2 deletions src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ internal partial class DefaultHubDispatcher<THub> : HubDispatcher<THub> where TH

public override async Task OnConnectedAsync(HubConnectionContext connection)
{
IServiceScope scope = _serviceScopeFactory.CreateScope();
var scope = _serviceScopeFactory.CreateScope();

try
{
Expand Down Expand Up @@ -105,7 +105,7 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)

public override async Task OnDisconnectedAsync(HubConnectionContext connection, Exception? exception)
{
IServiceScope scope = _serviceScopeFactory.CreateScope();
var scope = _serviceScopeFactory.CreateScope();

try
{
Expand Down

0 comments on commit 5c0693f

Please sign in to comment.