diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index ec9632c4d1f25..9e3f41ee8236a 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -9,7 +9,8 @@ Commonly Used Types: System.IO.Pipelines.Pipe System.IO.Pipelines.PipeWriter System.IO.Pipelines.PipeReader - 2 + true + 3 FlushAsync(CancellationToken cancellationToken) ValueTask result; lock (SyncObj) { - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, completionData); @@ -355,7 +355,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) return result; } - private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) + private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) { var completeReader = CommitUnsynchronized(); @@ -691,6 +691,9 @@ internal ValueTask ReadAtLeastAsync(int minimumBytes, CancellationTo // We also need to flip the reading state off _operationState.EndRead(); + + // Begin read again to wire up cancellation token + _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); } // If the writer is currently paused and we are about the wait for more data then this would deadlock. @@ -1057,7 +1060,7 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella WriteMultiSegment(source.Span); } - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, completionData); diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs index c12d3b88253cf..64762de5ec8ce 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs @@ -162,5 +162,36 @@ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync() Assert.True(result.IsCanceled); PipeReader.AdvanceTo(buffer.End); } + + [Fact] + public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData() + { + CancellationTokenSource cts = new CancellationTokenSource(); + ValueTask task = PipeReader.ReadAtLeastAsync(1, cts.Token); + cts.Cancel(); + return Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task ReadAtLeastAsyncCancelableAfterReadingSome() + { + CancellationTokenSource cts = new CancellationTokenSource(); + await Pipe.WriteAsync(new byte[10], default); + ValueTask task = PipeReader.ReadAtLeastAsync(11, cts.Token); + cts.Cancel(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead() + { + CancellationTokenSource cts = new CancellationTokenSource(); + await Pipe.WriteAsync(new byte[10], default); + ValueTask task = PipeReader.ReadAtLeastAsync(12, cts.Token); + // Write, but not enough to unblock ReadAtLeastAsync + await Pipe.WriteAsync(new byte[1], default); + cts.Cancel(); + await Assert.ThrowsAnyAsync(async () => await task); + } } }