From 37876054cf77053b0f24fffa93f190af9e5926bf Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 28 Dec 2020 15:46:56 -0800 Subject: [PATCH 1/3] [SignalR] Ignore ODE with AsyncEnumerables --- .../test/UnitTests/HubConnectionTests.cs | 30 +++++++++++++++++++ .../common/Shared/AsyncEnumerableAdapters.cs | 12 +++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs index 618d6f14a92b..c1090a0df66b 100644 --- a/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs +++ b/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs @@ -254,6 +254,36 @@ public async Task StreamAsyncCanceledWhenPassedCanceledToken() } } + [Fact] + public async Task CanCancelTokenAfterStreamIsCompleted() + { + using (StartVerifiableLog()) + { + var connection = new TestConnection(); + var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory); + + await hubConnection.StartAsync().OrTimeout(); + + var asyncEnumerable = hubConnection.StreamAsync("Stream", 1); + using var cts = new CancellationTokenSource(); + await using var e = asyncEnumerable.GetAsyncEnumerator(cts.Token); + var task = e.MoveNextAsync(); + + var item = await connection.ReadSentJsonAsync().OrTimeout(); + await connection.ReceiveJsonMessage( + new { type = HubProtocolConstants.CompletionMessageType, invocationId = item["invocationId"] } + ).OrTimeout(); + + await task.OrTimeout(); + + while (await e.MoveNextAsync().OrTimeout()) + { + } + // Cancel after stream is completed but before the AsyncEnumerator is disposed + cts.Cancel(); + } + } + [Fact] public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages() { diff --git a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs index 8139ca0eeb0f..b6bc1cb4df98 100644 --- a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs +++ b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs @@ -3,6 +3,7 @@ #nullable disable +using System; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; @@ -66,7 +67,16 @@ public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellati { var registration = cancellationToken.Register((ctsState) => { - ((CancellationTokenSource)ctsState).Cancel(); + try + { + ((CancellationTokenSource)ctsState).Cancel(); + } + catch (ObjectDisposedException) + { + // cancellationToken is passed in by the user, and if they call cancel after the + // enumerator is finished but before the enumerator is disposed + // then _cts might already be disposed by our wrapping code + } }, _cts); return new CancelableEnumerator(enumerator, registration); From d69bc42055581168bcf71ec241b938052ccdc219 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Wed, 6 Jan 2021 15:03:26 -0800 Subject: [PATCH 2/3] dont dispose --- .../clients/csharp/Client.Core/src/HubConnection.cs | 13 +++---------- .../common/Shared/AsyncEnumerableAdapters.cs | 11 +---------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs index 6a09abb8d026..9d5b908af61d 100644 --- a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs +++ b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs @@ -565,20 +565,13 @@ public IAsyncEnumerable StreamAsyncCore(string methodName, obj private async IAsyncEnumerable CastIAsyncEnumerable(string methodName, object[] args, CancellationTokenSource cts) { var reader = await StreamAsChannelCoreAsync(methodName, typeof(T), args, cts.Token); - try + while (await reader.WaitToReadAsync(cts.Token)) { - while (await reader.WaitToReadAsync(cts.Token)) + while (reader.TryRead(out var item)) { - while (reader.TryRead(out var item)) - { - yield return (T)item; - } + yield return (T)item; } } - finally - { - cts.Dispose(); - } } private async Task> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken) diff --git a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs index b6bc1cb4df98..7d6d6623302c 100644 --- a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs +++ b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs @@ -67,16 +67,7 @@ public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellati { var registration = cancellationToken.Register((ctsState) => { - try - { - ((CancellationTokenSource)ctsState).Cancel(); - } - catch (ObjectDisposedException) - { - // cancellationToken is passed in by the user, and if they call cancel after the - // enumerator is finished but before the enumerator is disposed - // then _cts might already be disposed by our wrapping code - } + ((CancellationTokenSource)ctsState).Cancel(); }, _cts); return new CancelableEnumerator(enumerator, registration); From 61ddcbace01d738c59a9ec56fd855299053afaf1 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Wed, 6 Jan 2021 15:13:44 -0800 Subject: [PATCH 3/3] using --- src/SignalR/common/Shared/AsyncEnumerableAdapters.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs index 7d6d6623302c..8139ca0eeb0f 100644 --- a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs +++ b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs @@ -3,7 +3,6 @@ #nullable disable -using System; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices;