Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -565,20 +565,13 @@ public IAsyncEnumerable<TResult> StreamAsyncCore<TResult>(string methodName, obj
private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(string methodName, object[] args, CancellationTokenSource cts)
{
var reader = await StreamAsChannelCoreAsync(methodName, typeof(T), args, cts.Token);
try
while (await reader.WaitToReadAsync(cts.Token))
{
while (await reader.WaitToReadAsync(cts.Token))
while (reader.TryRead(out var item))
{
while (reader.TryRead(out var item))
{
yield return (T)item;
}
yield return (T)item;
}
}
finally
{
cts.Dispose();
}
}

private async Task<ChannelReader<object>> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,36 @@ public async Task StreamAsyncCanceledWhenPassedCanceledToken()
}
}

[Fact]
public async Task CanCancelTokenAfterStreamIsCompleted()
{
using (StartVerifiableLog())
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);

await hubConnection.StartAsync().OrTimeout();

var asyncEnumerable = hubConnection.StreamAsync<int>("Stream", 1);
using var cts = new CancellationTokenSource();
await using var e = asyncEnumerable.GetAsyncEnumerator(cts.Token);
var task = e.MoveNextAsync();

var item = await connection.ReadSentJsonAsync().OrTimeout();
await connection.ReceiveJsonMessage(
new { type = HubProtocolConstants.CompletionMessageType, invocationId = item["invocationId"] }
).OrTimeout();

await task.OrTimeout();

while (await e.MoveNextAsync().OrTimeout())
{
}
// Cancel after stream is completed but before the AsyncEnumerator is disposed
cts.Cancel();
}
}

[Fact]
public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages()
{
Expand Down