Skip to content

Commit

Permalink
Fix Deflate/Brotli/CryptoStream handling of partial and zero-byte rea…
Browse files Browse the repository at this point in the history
…ds (#53644)

Stream.Read{Async} is supposed to return once at least a byte of data is available, and in particular, if there's any data already available, it shouldn't block.  But Read{Async} on DeflateStream (and thus also GZipStream and ZLibStream), BrotliStream, and CryptoStream won't return until either it hits the end of the stream or the caller's buffer is filled.  This makes it behave very unexpectedly when used in a context where the app is using a large read buffer but expects to be able to process data as it's available, e.g. in networked streaming scenarios where messages are being sent as part of bidirectional communication.

This fixes that by stopping looping once any data is consumed.  Just doing that, though, caused problems for zero-byte reads.  Zero-byte reads are typically used by code that's trying to delay-allocate a buffer for the read data until data will be available to read.  At present, however, zero-byte reads return immediately regardless of whether data is available to be consumed.  I've changed the flow to make it so that zero-byte reads don't return until there's at least some data available as input to the inflater/transform (this, though, doesn't 100% guarantee the inflater/transform will be able to produce output data).

Note that both of these changes have the potential to introduce breaks into an app that erroneously depended on these implementation details:
- If an app passing in a buffer of size N to Read{Async} depended on that call always producing the requested number of bytes (rather than what the Stream contract defines), they might experience behavioral changes.
- If an app passing in a zero-byte buffer expected it to return immediately, it might instead end up waiting until data was actually available.
  • Loading branch information
stephentoub committed Jun 11, 2021
1 parent ffcef4a commit 68dec6a
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 381 deletions.
Expand Up @@ -1562,11 +1562,6 @@ public abstract class ConnectedStreamConformanceTests : StreamConformanceTests
/// Gets whether the stream guarantees that all data written to it will be flushed as part of Flush{Async}.
/// </summary>
protected virtual bool FlushGuaranteesAllDataWritten => true;
/// <summary>
/// Gets whether a stream implements an aggressive read that tries to fill the supplied buffer and only
/// stops when it does so or hits EOF.
/// </summary>
protected virtual bool ReadsMayBlockUntilBufferFullOrEOF => false;
/// <summary>Gets whether reads for a count of 0 bytes block if no bytes are available to read.</summary>
protected virtual bool BlocksOnZeroByteReads => false;
/// <summary>
Expand Down Expand Up @@ -1709,6 +1704,10 @@ public virtual async Task ReadWriteByte_Success()
}
}

public static IEnumerable<object[]> ReadWrite_Modes =>
from mode in Enum.GetValues<ReadWriteMode>()
select new object[] { mode };

public static IEnumerable<object[]> ReadWrite_Success_MemberData() =>
from mode in Enum.GetValues<ReadWriteMode>()
from writeSize in new[] { 1, 42, 10 * 1024 }
Expand Down Expand Up @@ -1785,6 +1784,54 @@ public virtual async Task ReadWrite_Success(ReadWriteMode mode, int writeSize, b
}
}

[Theory]
[MemberData(nameof(ReadWrite_Modes))]
[ActiveIssue("https://github.com/dotnet/runtime/issues/51371", TestPlatforms.iOS | TestPlatforms.tvOS | TestPlatforms.MacCatalyst)]
public virtual async Task ReadWrite_MessagesSmallerThanReadBuffer_Success(ReadWriteMode mode)
{
if (!FlushGuaranteesAllDataWritten)
{
return;
}

foreach (CancellationToken nonCanceledToken in new[] { CancellationToken.None, new CancellationTokenSource().Token })
{
using StreamPair streams = await CreateConnectedStreamsAsync();

foreach ((Stream writeable, Stream readable) in GetReadWritePairs(streams))
{
byte[] writerBytes = RandomNumberGenerator.GetBytes(512);
var readerBytes = new byte[writerBytes.Length * 2];

// Repeatedly write then read a message smaller in size than the read buffer
for (int i = 0; i < 5; i++)
{
Task writes = Task.Run(async () =>
{
await WriteAsync(mode, writeable, writerBytes, 0, writerBytes.Length, nonCanceledToken);
if (FlushRequiredToWriteData)
{
await writeable.FlushAsync();
}
});

int n = 0;
while (n < writerBytes.Length)
{
int r = await ReadAsync(mode, readable, readerBytes, n, readerBytes.Length - n);
Assert.InRange(r, 1, writerBytes.Length - n);
n += r;
}

Assert.Equal(writerBytes.Length, n);
AssertExtensions.SequenceEqual(writerBytes, readerBytes.AsSpan(0, writerBytes.Length));

await writes;
}
}
}
}

[Theory]
[MemberData(nameof(AllReadWriteModesAndValue), false)]
[MemberData(nameof(AllReadWriteModesAndValue), true)]
Expand Down Expand Up @@ -2160,6 +2207,10 @@ public virtual async Task ZeroByteRead_BlocksUntilDataAvailableOrNops(ReadWriteM
});
Assert.Equal(0, await zeroByteRead);

// Perform a second zero-byte read.
await Task.Run(() => ReadAsync(mode, readable, Array.Empty<byte>(), 0, 0));

// Now consume all the data.
var readBytes = new byte[5];
int count = 0;
while (count < readBytes.Length)
Expand Down Expand Up @@ -2684,7 +2735,7 @@ public virtual async Task Flush_FlushesUnderlyingStream(bool flushAsync)
[InlineData(true, true)]
public virtual async Task Dispose_Flushes(bool useAsync, bool leaveOpen)
{
if (leaveOpen && (!SupportsLeaveOpen || ReadsMayBlockUntilBufferFullOrEOF))
if (leaveOpen && !SupportsLeaveOpen)
{
return;
}
Expand Down
Expand Up @@ -54,6 +54,6 @@ protected override Task<StreamPair> CreateConnectedStreamsAsync()
protected override Type UnsupportedReadWriteExceptionType => typeof(InvalidOperationException);
protected override bool WrappedUsableAfterClose => false;
protected override bool FlushRequiredToWriteData => true;
protected override bool FlushGuaranteesAllDataWritten => false;
protected override bool BlocksOnZeroByteReads => true;
}
}
17 changes: 14 additions & 3 deletions src/libraries/Common/tests/System/IO/Compression/ZipTestHelper.cs
Expand Up @@ -65,6 +65,17 @@ public static void ReadBytes(Stream stream, byte[] buffer, long bytesToRead)
}
}

public static int ReadAllBytes(Stream stream, byte[] buffer, int offset, int count)
{
int bytesRead;
int totalRead = 0;
while ((bytesRead = stream.Read(buffer, offset + totalRead, count - totalRead)) != 0)
{
totalRead += bytesRead;
}
return totalRead;
}

public static bool ArraysEqual<T>(T[] a, T[] b) where T : IComparable<T>
{
if (a.Length != b.Length) return false;
Expand Down Expand Up @@ -111,8 +122,8 @@ public static void StreamsEqual(Stream ast, Stream bst, int blocksToRead)
if (blocksToRead != -1 && blocksRead >= blocksToRead)
break;

ac = ast.Read(ad, 0, 4096);
bc = bst.Read(bd, 0, 4096);
ac = ReadAllBytes(ast, ad, 0, 4096);
bc = ReadAllBytes(bst, bd, 0, 4096);

if (ac != bc)
{
Expand Down Expand Up @@ -170,7 +181,7 @@ public static void IsZipSameAsDir(Stream archiveFile, string directory, ZipArchi
var buffer = new byte[entry.Length];
using (Stream entrystream = entry.Open())
{
entrystream.Read(buffer, 0, buffer.Length);
ReadAllBytes(entrystream, buffer, 0, buffer.Length);
#if NETCOREAPP
uint zipcrc = entry.Crc32;
Assert.Equal(CRC.CalculateCRC(buffer), zipcrc);
Expand Down
Expand Up @@ -173,21 +173,19 @@ private void EnsureNoActiveAsyncOperation()

private void AsyncOperationStarting()
{
if (Interlocked.CompareExchange(ref _activeAsyncOperation, 1, 0) != 0)
if (Interlocked.Exchange(ref _activeAsyncOperation, 1) != 0)
{
ThrowInvalidBeginCall();
}
}

private void AsyncOperationCompleting()
{
int oldValue = Interlocked.CompareExchange(ref _activeAsyncOperation, 0, 1);
Debug.Assert(oldValue == 1, $"Expected {nameof(_activeAsyncOperation)} to be 1, got {oldValue}");
Debug.Assert(_activeAsyncOperation == 1);
Volatile.Write(ref _activeAsyncOperation, 0);
}

private static void ThrowInvalidBeginCall()
{
private static void ThrowInvalidBeginCall() =>
throw new InvalidOperationException(SR.InvalidBeginCall);
}
}
}

0 comments on commit 68dec6a

Please sign in to comment.