diff --git a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs index d525769f7a09..a34dd16b8245 100644 --- a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs +++ b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs @@ -569,7 +569,7 @@ private async IAsyncEnumerable CastIAsyncEnumerable(string methodName, obj { while (reader.TryRead(out var item)) { - yield return (T)item; + yield return (T)item!; } } } @@ -662,7 +662,7 @@ async Task OnStreamCanceled(InvocationRequest irq) } var id = connectionState.GetNextId(); - readers[id] = args[i]; + readers[id] = arg; streamIds.Add(id); Log.StartingStream(_logger, id); @@ -675,8 +675,8 @@ async Task OnStreamCanceled(InvocationRequest irq) } var newArgs = newArgsCount > 0 - ? new object[newArgsCount] - : Array.Empty(); + ? new object?[newArgsCount] + : Array.Empty(); int newArgsIndex = 0; for (var i = 0; i < args.Length; i++) diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/LongPollingTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/LongPollingTransport.cs index e331a54e00d7..11539dfa72f6 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/LongPollingTransport.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/LongPollingTransport.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.IO.Pipelines; using System.Net; using System.Net.Http; @@ -69,9 +70,11 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio private async Task ProcessAsync(Uri url) { + Debug.Assert(_application != null); + // Start sending and polling (ask for binary if the server supports it) var receiving = Poll(url, _transportCts.Token); - var sending = SendUtils.SendMessages(url, _application!, _httpClient, _logger); + var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger); // Wait for send or receive to complete var trigger = await Task.WhenAny(receiving, sending); @@ -85,7 +88,7 @@ private async Task ProcessAsync(Uri url) // 2. Waiting for an outgoing send (this should be instantaneous) // Cancel the application so that ReadAsync yields - _application!.Input.CancelPendingRead(); + _application.Input.CancelPendingRead(); await sending; } @@ -98,7 +101,7 @@ private async Task ProcessAsync(Uri url) _transportCts.Cancel(); // Cancel any pending flush so that we can quit - _application!.Output.CancelPendingFlush(); + _application.Output.CancelPendingFlush(); await receiving; @@ -137,10 +140,12 @@ public async Task StopAsync() private async Task Poll(Uri pollUrl, CancellationToken cancellationToken) { + Debug.Assert(_application != null); + Log.StartReceive(_logger); // Allocate this once for the duration of the transport so we can continuously write to it - var applicationStream = new PipeWriterStream(_application!.Output); + var applicationStream = new PipeWriterStream(_application.Output); try { diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs index 071d0291c24d..89000d26ddb6 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.IO.Pipelines; using System.Net.Http; using System.Net.Http.Headers; @@ -92,9 +93,11 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio private async Task ProcessAsync(Uri url, HttpResponseMessage response) { + Debug.Assert(_application != null); + // Start sending and polling (ask for binary if the server supports it) var receiving = ProcessEventStream(response, _transportCts.Token); - var sending = SendUtils.SendMessages(url, _application!, _httpClient, _logger, _inputCts.Token); + var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger, _inputCts.Token); // Wait for send or receive to complete var trigger = await Task.WhenAny(receiving, sending); @@ -108,7 +111,7 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response) _inputCts.Cancel(); // Cancel the application so that ReadAsync yields - _application!.Input.CancelPendingRead(); + _application.Input.CancelPendingRead(); await sending; } @@ -120,7 +123,7 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response) _transportCts.Cancel(); // Cancel any pending flush so that we can quit - _application!.Output.CancelPendingFlush(); + _application.Output.CancelPendingFlush(); await receiving; } @@ -128,6 +131,8 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response) private async Task ProcessEventStream(HttpResponseMessage response, CancellationToken cancellationToken) { + Debug.Assert(_application != null); + Log.StartReceive(_logger); static void CancelReader(object? state) => ((PipeReader)state!).CancelPendingRead(); @@ -168,7 +173,7 @@ private async Task ProcessEventStream(HttpResponseMessage response, Cancellation case ServerSentEventsMessageParser.ParseResult.Completed: Log.MessageToApplication(_logger, message!.Length); - flushResult = await _application!.Output.WriteAsync(message); + flushResult = await _application.Output.WriteAsync(message); _parser.Reset(); break; @@ -205,7 +210,7 @@ private async Task ProcessEventStream(HttpResponseMessage response, Cancellation } finally { - _application!.Output.Complete(_error); + _application.Output.Complete(_error); Log.ReceiveStopped(_logger); diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs index 59727854d544..3ab6ddda2bbf 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs @@ -177,6 +177,8 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio private async Task ProcessSocketAsync(WebSocket socket) { + Debug.Assert(_application != null); + using (socket) { // Begin sending and receiving. @@ -193,7 +195,7 @@ private async Task ProcessSocketAsync(WebSocket socket) // 2. Waiting for a websocket send to complete // Cancel the application so that ReadAsync yields - _application!.Input.CancelPendingRead(); + _application.Input.CancelPendingRead(); using (var delayCts = new CancellationTokenSource()) { @@ -225,13 +227,15 @@ private async Task ProcessSocketAsync(WebSocket socket) socket.Abort(); // Cancel any pending flush so that we can quit - _application!.Output.CancelPendingFlush(); + _application.Output.CancelPendingFlush(); } } } private async Task StartReceiving(WebSocket socket) { + Debug.Assert(_application != null); + try { while (true) @@ -252,7 +256,7 @@ private async Task StartReceiving(WebSocket socket) return; } #endif - var memory = _application!.Output.GetMemory(); + var memory = _application.Output.GetMemory(); #if NETSTANDARD2_1 || NETCOREAPP // Because we checked the CloseStatus from the 0 byte read above, we don't need to check again after reading var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None); @@ -280,9 +284,9 @@ private async Task StartReceiving(WebSocket socket) Log.MessageReceived(_logger, receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage); - _application!.Output.Advance(receiveResult.Count); + _application.Output.Advance(receiveResult.Count); - var flushResult = await _application!.Output.FlushAsync(); + var flushResult = await _application.Output.FlushAsync(); // We canceled in the middle of applying back pressure // or if the consumer is done @@ -300,13 +304,13 @@ private async Task StartReceiving(WebSocket socket) { if (!_aborted) { - _application!.Output.Complete(ex); + _application.Output.Complete(ex); } } finally { // We're done writing - _application!.Output.Complete(); + _application.Output.Complete(); Log.ReceiveStopped(_logger); } @@ -314,13 +318,15 @@ private async Task StartReceiving(WebSocket socket) private async Task StartSending(WebSocket socket) { + Debug.Assert(_application != null); + Exception? error = null; try { while (true) { - var result = await _application!.Input.ReadAsync(); + var result = await _application.Input.ReadAsync(); var buffer = result.Buffer; // Get a frame from the application @@ -363,7 +369,7 @@ private async Task StartSending(WebSocket socket) } finally { - _application!.Input.AdvanceTo(buffer.End); + _application.Input.AdvanceTo(buffer.End); } } } @@ -386,7 +392,7 @@ private async Task StartSending(WebSocket socket) } } - _application!.Input.Complete(); + _application.Input.Complete(); Log.SendStopped(_logger); } diff --git a/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs b/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs index dd13c417b37c..39e40f865f6f 100644 --- a/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs +++ b/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs @@ -59,11 +59,6 @@ internal class HttpConnectionContext : ConnectionContext, /// Creates the DefaultConnectionContext without Pipes to avoid upfront allocations. /// The caller is expected to set the and pipes manually. /// - /// - /// - /// - /// - /// public HttpConnectionContext(string connectionId, string connectionToken, ILogger logger, IDuplexPipe transport, IDuplexPipe application) { Transport = transport; diff --git a/src/SignalR/common/Protocols.Json/src/Protocol/JsonHubProtocol.cs b/src/SignalR/common/Protocols.Json/src/Protocol/JsonHubProtocol.cs index d2e29952eb10..d17094f6ef74 100644 --- a/src/SignalR/common/Protocols.Json/src/Protocol/JsonHubProtocol.cs +++ b/src/SignalR/common/Protocols.Json/src/Protocol/JsonHubProtocol.cs @@ -178,7 +178,7 @@ public ReadOnlyMemory GetMessageBytes(HubMessage message) reader.Read(); while (reader.TokenType != JsonTokenType.EndArray) { - newStreamIds.Add(reader.GetString() ?? throw new InvalidDataException($"Null value for {StreamIdsPropertyName} is not valid.")); + newStreamIds.Add(reader.GetString() ?? throw new InvalidDataException($"Null value for '{StreamIdsPropertyName}' is not valid.")); reader.Read(); } diff --git a/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs b/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs index 696ebcfb5947..6c10a86ef229 100644 --- a/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs +++ b/src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs @@ -179,7 +179,7 @@ public ReadOnlyMemory GetMessageBytes(HubMessage message) reader.Read(); while (reader.TokenType != JsonToken.EndArray) { - newStreamIds.Add(reader.Value?.ToString() ?? throw new InvalidDataException($"Null value for {StreamIdsPropertyName} is not valid.")); + newStreamIds.Add(reader.Value?.ToString() ?? throw new InvalidDataException($"Null value for '{StreamIdsPropertyName}' is not valid.")); reader.Read(); } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 886c2c440e04..6a4ea35db16c 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -72,7 +72,7 @@ internal partial class DefaultHubDispatcher : HubDispatcher where TH public override async Task OnConnectedAsync(HubConnectionContext connection) { - IServiceScope scope = _serviceScopeFactory.CreateScope(); + var scope = _serviceScopeFactory.CreateScope(); try { @@ -105,7 +105,7 @@ public override async Task OnConnectedAsync(HubConnectionContext connection) public override async Task OnDisconnectedAsync(HubConnectionContext connection, Exception? exception) { - IServiceScope scope = _serviceScopeFactory.CreateScope(); + var scope = _serviceScopeFactory.CreateScope(); try {