Skip to content

Commit

Permalink
Small HubConnection cleanup (#8643)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidfowl authored and mikaelm12 committed Mar 21, 2019
1 parent 0c4b3fb commit 0d6e063
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Expand Up @@ -59,6 +59,13 @@ public partial class HubConnection

private readonly ConnectionLogScope _logScope;

// The receive loop has a single reader and single writer at a time so optimize the channel for that
private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = true
};

// Transient state to a connection
private ConnectionState _connectionState;
private int _serverProtocolMinorVersion;
Expand Down Expand Up @@ -823,14 +830,16 @@ private async Task HandshakeAsync(ConnectionState startingConnectionState, Cance
throw new InvalidOperationException("The server disconnected before the handshake was completed");
}

var input = startingConnectionState.Connection.Transport.Input;

try
{
using (var handshakeCts = new CancellationTokenSource(HandshakeTimeout))
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, handshakeCts.Token))
{
while (true)
{
var result = await startingConnectionState.Connection.Transport.Input.ReadAsync(cts.Token);
var result = await input.ReadAsync(cts.Token);

var buffer = result.Buffer;
var consumed = buffer.Start;
Expand Down Expand Up @@ -871,7 +880,7 @@ private async Task HandshakeAsync(ConnectionState startingConnectionState, Cance
}
finally
{
startingConnectionState.Connection.Transport.Input.AdvanceTo(consumed, examined);
input.AdvanceTo(consumed, examined);
}
}
}
Expand Down Expand Up @@ -899,11 +908,11 @@ private async Task ReceiveLoop(ConnectionState connectionState)
// Performs periodic tasks -- here sending pings and checking timeout
// Disposed with `timer.Stop()` in the finally block below
var timer = new TimerAwaitable(TickRate, TickRate);
_ = TimerLoop(timer);
var timerTask = TimerLoop(timer);

var uploadStreamSource = new CancellationTokenSource();
_uploadStreamToken = uploadStreamSource.Token;
var invocationMessageChannel = Channel.CreateUnbounded<InvocationMessage>();
var invocationMessageChannel = Channel.CreateUnbounded<InvocationMessage>(_receiveLoopOptions);
var invocationMessageReceiveTask = StartProcessingInvocationMessages(invocationMessageChannel.Reader);

async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> invocationMessageChannelReader)
Expand All @@ -917,11 +926,13 @@ async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> in
}
}

var input = connectionState.Connection.Transport.Input;

try
{
while (true)
{
var result = await connectionState.Connection.Transport.Input.ReadAsync();
var result = await input.ReadAsync();
var buffer = result.Buffer;

try
Expand Down Expand Up @@ -972,7 +983,7 @@ async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> in
// The buffer was sliced up to where it was consumed, so we can just advance to the start.
// We mark examined as `buffer.End` so that if we didn't receive a full frame, we'll wait for more data
// before yielding the read again.
connectionState.Connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}
Expand All @@ -986,6 +997,7 @@ async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> in
invocationMessageChannel.Writer.TryComplete();
await invocationMessageReceiveTask;
timer.Stop();
await timerTask;
uploadStreamSource.Cancel();
}

Expand Down

0 comments on commit 0d6e063

Please sign in to comment.