From a21b9a2dd4c31cf5bd37626562b7612faf21cee6 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 13 Apr 2022 09:36:31 -0700 Subject: [PATCH] [release/6.0] Fix cancel in PipeReader.ReadAtLeastAsync (#66870) * Fix cancel in PipeReader.ReadAtLeastAsync * Add missing GeneratePackageOnBuild and bump version Co-authored-by: Brennan Co-authored-by: Carlos Sanchez <1175054+carlossanlop@users.noreply.github.com> --- .../src/System.IO.Pipelines.csproj | 3 +- .../src/System/IO/Pipelines/Pipe.cs | 9 ++++-- .../tests/PipeReaderReadAtLeastAsyncTests.cs | 31 +++++++++++++++++++ 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index ec9632c4d1f25..9e3f41ee8236a 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -9,7 +9,8 @@ Commonly Used Types: System.IO.Pipelines.Pipe System.IO.Pipelines.PipeWriter System.IO.Pipelines.PipeReader - 2 + true + 3 FlushAsync(CancellationToken cancellationToken) ValueTask result; lock (SyncObj) { - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, completionData); @@ -355,7 +355,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) return result; } - private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) + private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) { var completeReader = CommitUnsynchronized(); @@ -691,6 +691,9 @@ internal ValueTask ReadAtLeastAsync(int minimumBytes, CancellationTo // We also need to flip the reading state off _operationState.EndRead(); + + // Begin read again to wire up cancellation token + _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); } // If the writer is currently paused and we are about the wait for more data then this would deadlock. @@ -1057,7 +1060,7 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella WriteMultiSegment(source.Span); } - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, completionData); diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs index c12d3b88253cf..64762de5ec8ce 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs @@ -162,5 +162,36 @@ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync() Assert.True(result.IsCanceled); PipeReader.AdvanceTo(buffer.End); } + + [Fact] + public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData() + { + CancellationTokenSource cts = new CancellationTokenSource(); + ValueTask task = PipeReader.ReadAtLeastAsync(1, cts.Token); + cts.Cancel(); + return Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task ReadAtLeastAsyncCancelableAfterReadingSome() + { + CancellationTokenSource cts = new CancellationTokenSource(); + await Pipe.WriteAsync(new byte[10], default); + ValueTask task = PipeReader.ReadAtLeastAsync(11, cts.Token); + cts.Cancel(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead() + { + CancellationTokenSource cts = new CancellationTokenSource(); + await Pipe.WriteAsync(new byte[10], default); + ValueTask task = PipeReader.ReadAtLeastAsync(12, cts.Token); + // Write, but not enough to unblock ReadAtLeastAsync + await Pipe.WriteAsync(new byte[1], default); + cts.Cancel(); + await Assert.ThrowsAnyAsync(async () => await task); + } } }