Skip to content

Commit

Permalink
[release/6.0] Fix cancel in PipeReader.ReadAtLeastAsync (dotnet#66870)
Browse files Browse the repository at this point in the history
* Fix cancel in PipeReader.ReadAtLeastAsync

* Add missing GeneratePackageOnBuild and bump version

Co-authored-by: Brennan <brecon@microsoft.com>
Co-authored-by: Carlos Sanchez <1175054+carlossanlop@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 13, 2022
1 parent 49c4a15 commit a21b9a2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ Commonly Used Types:
System.IO.Pipelines.Pipe
System.IO.Pipelines.PipeWriter
System.IO.Pipelines.PipeReader</PackageDescription>
<ServicingVersion>2</ServicingVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<ServicingVersion>3</ServicingVersion>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,15 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
ValueTask<FlushResult> result;
lock (SyncObj)
{
PrepareFlush(out completionData, out result, cancellationToken);
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}

TrySchedule(ReaderScheduler, completionData);

return result;
}

private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
var completeReader = CommitUnsynchronized();

Expand Down Expand Up @@ -691,6 +691,9 @@ internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationTo

// We also need to flip the reading state off
_operationState.EndRead();

// Begin read again to wire up cancellation token
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
}

// If the writer is currently paused and we are about the wait for more data then this would deadlock.
Expand Down Expand Up @@ -1057,7 +1060,7 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
WriteMultiSegment(source.Span);
}

PrepareFlush(out completionData, out result, cancellationToken);
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}

TrySchedule(ReaderScheduler, completionData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,36 @@ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync()
Assert.True(result.IsCanceled);
PipeReader.AdvanceTo(buffer.End);
}

[Fact]
public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData()
{
CancellationTokenSource cts = new CancellationTokenSource();
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(1, cts.Token);
cts.Cancel();
return Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
}

[Fact]
public async Task ReadAtLeastAsyncCancelableAfterReadingSome()
{
CancellationTokenSource cts = new CancellationTokenSource();
await Pipe.WriteAsync(new byte[10], default);
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11, cts.Token);
cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
}

[Fact]
public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead()
{
CancellationTokenSource cts = new CancellationTokenSource();
await Pipe.WriteAsync(new byte[10], default);
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(12, cts.Token);
// Write, but not enough to unblock ReadAtLeastAsync
await Pipe.WriteAsync(new byte[1], default);
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await task);
}
}
}

0 comments on commit a21b9a2

Please sign in to comment.