Skip to content

Commit

Permalink
Implement ZLibStream and fix SocketsHttpHandler deflate support (#42717)
Browse files Browse the repository at this point in the history
* Implement ZLibStream and fix SocketsHttpHandler deflate support

- Implements ZLibStream, exposes it in the ref, and add tests
- Fixes SocketsHttpHandler to use ZLibStream instead of DeflateStream

* Add comment about deflate content encoding

* Apply suggestions from code review

Co-authored-by: Carlos Sanchez <1175054+carlossanlop@users.noreply.github.com>

* Fix netfx build

Co-authored-by: Carlos Sanchez <1175054+carlossanlop@users.noreply.github.com>
  • Loading branch information
stephentoub and carlossanlop committed Oct 9, 2020
1 parent c5b6881 commit d3beb60
Show file tree
Hide file tree
Showing 16 changed files with 520 additions and 234 deletions.
Expand Up @@ -1254,6 +1254,59 @@ public async Task Parallel_CompressDecompressMultipleStreamsConcurrently()
Assert.Equal(sourceData, decompressedStream.ToArray());
})));
}

[Fact]
public void Precancellation()
{
var ms = new MemoryStream();
using (Stream compressor = CreateStream(ms, CompressionMode.Compress, leaveOpen: true))
{
Assert.True(compressor.WriteAsync(new byte[1], 0, 1, new CancellationToken(true)).IsCanceled);
Assert.True(compressor.FlushAsync(new CancellationToken(true)).IsCanceled);
}
using (Stream decompressor = CreateStream(ms, CompressionMode.Decompress, leaveOpen: true))
{
Assert.True(decompressor.ReadAsync(new byte[1], 0, 1, new CancellationToken(true)).IsCanceled);
}
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task DisposeAsync_Flushes(bool leaveOpen)
{
var ms = new MemoryStream();
var cs = CreateStream(ms, CompressionMode.Compress, leaveOpen);
cs.WriteByte(1);
await cs.FlushAsync();

long pos = ms.Position;
cs.WriteByte(1);
Assert.Equal(pos, ms.Position);

await cs.DisposeAsync();
Assert.InRange(ms.ToArray().Length, pos + 1, int.MaxValue);
if (leaveOpen)
{
Assert.InRange(ms.Position, pos + 1, int.MaxValue);
}
else
{
Assert.Throws<ObjectDisposedException>(() => ms.Position);
}
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task DisposeAsync_MultipleCallsAllowed(bool leaveOpen)
{
using (var cs = CreateStream(new MemoryStream(), CompressionMode.Compress, leaveOpen))
{
await cs.DisposeAsync();
await cs.DisposeAsync();
}
}
}

internal sealed class BadWrappedStream : MemoryStream
Expand Down
Expand Up @@ -33,46 +33,55 @@ public static IEnumerable<object[]> RemoteServersAndCompressionUris()
foreach (Configuration.Http.RemoteServer remoteServer in Configuration.Http.RemoteServers)
{
yield return new object[] { remoteServer, remoteServer.GZipUri };
yield return new object[] { remoteServer, remoteServer.DeflateUri };

// Remote deflate endpoint isn't correctly following the deflate protocol.
//yield return new object[] { remoteServer, remoteServer.DeflateUri };
}
}

public static IEnumerable<object[]> DecompressedResponse_MethodSpecified_DecompressedContentReturned_MemberData()
[Theory]
[InlineData("gzip", false)]
[InlineData("gzip", true)]
[InlineData("deflate", false)]
[InlineData("deflate", true)]
[InlineData("br", false)]
[InlineData("br", true)]
public async Task DecompressedResponse_MethodSpecified_DecompressedContentReturned(string encodingName, bool all)
{
foreach (bool specifyAllMethods in new[] { false, true })
Func<Stream, Stream> compress;
DecompressionMethods methods;
switch (encodingName)
{
yield return new object[]
{
"deflate",
new Func<Stream, Stream>(s => new DeflateStream(s, CompressionLevel.Optimal, leaveOpen: true)),
specifyAllMethods ? DecompressionMethods.Deflate : _all
};
yield return new object[]
{
"gzip",
new Func<Stream, Stream>(s => new GZipStream(s, CompressionLevel.Optimal, leaveOpen: true)),
specifyAllMethods ? DecompressionMethods.GZip : _all
};
case "gzip":
compress = s => new GZipStream(s, CompressionLevel.Optimal, leaveOpen: true);
methods = all ? DecompressionMethods.GZip : _all;
break;

#if !NETFRAMEWORK
yield return new object[]
{
"br",
new Func<Stream, Stream>(s => new BrotliStream(s, CompressionLevel.Optimal, leaveOpen: true)),
specifyAllMethods ? DecompressionMethods.Brotli : _all
};
case "br":
if (IsWinHttpHandler)
{
// Brotli only supported on SocketsHttpHandler.
return;
}

compress = s => new BrotliStream(s, CompressionLevel.Optimal, leaveOpen: true);
methods = all ? DecompressionMethods.Brotli : _all;
break;

case "deflate":
// WinHttpHandler continues to use DeflateStream as it doesn't have a newer build than netstandard2.0
// and doesn't have access to ZLibStream.
compress = IsWinHttpHandler ?
new Func<Stream, Stream>(s => new DeflateStream(s, CompressionLevel.Optimal, leaveOpen: true)) :
new Func<Stream, Stream>(s => new ZLibStream(s, CompressionLevel.Optimal, leaveOpen: true));
methods = all ? DecompressionMethods.Deflate : _all;
break;
#endif
}
}

[Theory]
[MemberData(nameof(DecompressedResponse_MethodSpecified_DecompressedContentReturned_MemberData))]
public async Task DecompressedResponse_MethodSpecified_DecompressedContentReturned(
string encodingName, Func<Stream, Stream> compress, DecompressionMethods methods)
{
// Brotli only supported on SocketsHttpHandler.
if (IsWinHttpHandler && encodingName == "br")
{
return;
default:
Assert.Contains(encodingName, new[] { "br", "deflate", "gzip" });
return;
}

var expectedContent = new byte[12345];
Expand Down Expand Up @@ -104,15 +113,15 @@ public static IEnumerable<object[]> DecompressedResponse_MethodNotSpecified_Orig
{
yield return new object[]
{
"deflate",
new Func<Stream, Stream>(s => new DeflateStream(s, CompressionLevel.Optimal, leaveOpen: true)),
"gzip",
new Func<Stream, Stream>(s => new GZipStream(s, CompressionLevel.Optimal, leaveOpen: true)),
DecompressionMethods.None
};
#if !NETFRAMEWORK
yield return new object[]
{
"gzip",
new Func<Stream, Stream>(s => new GZipStream(s, CompressionLevel.Optimal, leaveOpen: true)),
"deflate",
new Func<Stream, Stream>(s => new ZLibStream(s, CompressionLevel.Optimal, leaveOpen: true)),
DecompressionMethods.Brotli
};
yield return new object[]
Expand Down Expand Up @@ -186,6 +195,26 @@ public async Task GetAsync_SetAutomaticDecompression_ContentDecompressed(Configu
}
}

// The remote server endpoint was written to use DeflateStream, which isn't actually a correct
// implementation of the deflate protocol (the deflate protocol requires the zlib wrapper around
// deflate). Until we can get that updated (and deal with previous releases still testing it
// via a DeflateStream-based implementation), we utilize httpbin.org to help validate behavior.
[OuterLoop("Uses external servers")]
[Theory]
[InlineData("http://httpbin.org/deflate", "\"deflated\": true")]
[InlineData("https://httpbin.org/deflate", "\"deflated\": true")]
[InlineData("http://httpbin.org/gzip", "\"gzipped\": true")]
[InlineData("https://httpbin.org/gzip", "\"gzipped\": true")]
public async Task GetAsync_SetAutomaticDecompression_ContentDecompressed(string uri, string expectedContent)
{
HttpClientHandler handler = CreateHttpClientHandler();
handler.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
using (HttpClient client = CreateHttpClient(handler))
{
Assert.Contains(expectedContent, await client.GetStringAsync(uri));
}
}

[OuterLoop("Uses external server")]
[Theory, MemberData(nameof(RemoteServersAndCompressionUris))]
public async Task GetAsync_SetAutomaticDecompression_HeadersRemoved(Configuration.Http.RemoteServer remoteServer, Uri uri)
Expand Down
Expand Up @@ -23,42 +23,6 @@ public class BrotliStreamUnitTests : CompressionStreamUnitTestBase

protected override string CompressedTestFile(string uncompressedPath) => Path.Combine("BrotliTestData", Path.GetFileName(uncompressedPath) + ".br");

[Fact]
public void Precancellation()
{
var ms = new MemoryStream();
using (Stream compressor = new BrotliStream(ms, CompressionMode.Compress, leaveOpen: true))
{
Assert.True(compressor.WriteAsync(new byte[1], 0, 1, new CancellationToken(true)).IsCanceled);
Assert.True(compressor.FlushAsync(new CancellationToken(true)).IsCanceled);
}
using (Stream decompressor = CreateStream(ms, CompressionMode.Decompress, leaveOpen: true))
{
Assert.True(decompressor.ReadAsync(new byte[1], 0, 1, new CancellationToken(true)).IsCanceled);
}
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task DisposeAsync_Flushes(bool leaveOpen)
{
var ms = new MemoryStream();
var bs = new BrotliStream(ms, CompressionMode.Compress, leaveOpen);
bs.WriteByte(1);
Assert.Equal(0, ms.Position);
await bs.DisposeAsync();
Assert.InRange(ms.ToArray().Length, 1, int.MaxValue);
if (leaveOpen)
{
Assert.InRange(ms.Position, 1, int.MaxValue);
}
else
{
Assert.Throws<ObjectDisposedException>(() => ms.Position);
}
}

[Fact]
[OuterLoop("Test takes ~6 seconds to run")]
public override void FlushAsync_DuringWriteAsync() { base.FlushAsync_DuringWriteAsync(); }
Expand Down
35 changes: 35 additions & 0 deletions src/libraries/System.IO.Compression/ref/System.IO.Compression.cs
Expand Up @@ -121,4 +121,39 @@ public enum ZipArchiveMode
Create = 1,
Update = 2,
}
public sealed partial class ZLibStream : System.IO.Stream
{
public ZLibStream(System.IO.Stream stream, System.IO.Compression.CompressionLevel compressionLevel) { }
public ZLibStream(System.IO.Stream stream, System.IO.Compression.CompressionLevel compressionLevel, bool leaveOpen) { }
public ZLibStream(System.IO.Stream stream, System.IO.Compression.CompressionMode mode) { }
public ZLibStream(System.IO.Stream stream, System.IO.Compression.CompressionMode mode, bool leaveOpen) { }
public System.IO.Stream BaseStream { get { throw null; } }
public override bool CanRead { get { throw null; } }
public override bool CanSeek { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public override System.IAsyncResult BeginRead(byte[] array, int offset, int count, System.AsyncCallback? asyncCallback, object? asyncState) { throw null; }
public override System.IAsyncResult BeginWrite(byte[] array, int offset, int count, System.AsyncCallback? asyncCallback, object? asyncState) { throw null; }
public override void CopyTo(System.IO.Stream destination, int bufferSize) { }
public override System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, int bufferSize, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override void Dispose(bool disposing) { }
public override System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
public override int EndRead(System.IAsyncResult asyncResult) { throw null; }
public override void EndWrite(System.IAsyncResult asyncResult) { }
public override void Flush() { }
public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public override int Read(byte[] array, int offset, int count) { throw null; }
public override int Read(System.Span<byte> buffer) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] array, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int ReadByte() { throw null; }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public override void Write(byte[] array, int offset, int count) { }
public override void Write(System.ReadOnlySpan<byte> buffer) { }
public override void WriteByte(byte value) { }
public override System.Threading.Tasks.Task WriteAsync(byte[] array, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
}
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>$(NetCoreAppCurrent)-Windows_NT;$(NetCoreAppCurrent)-Unix;$(NetCoreAppCurrent)-Browser</TargetFrameworks>
Expand Down Expand Up @@ -33,6 +33,7 @@
<Compile Include="System\IO\Compression\Crc32Helper.ZLib.cs" />
<Compile Include="System\IO\Compression\GZipStream.cs" />
<Compile Include="System\IO\Compression\PositionPreservingWriteOnlyStreamWrapper.cs" />
<Compile Include="System\IO\Compression\ZLibStream.cs" />
<Compile Include="$(CommonPath)System\IO\StreamHelpers.CopyValidation.cs"
Link="Common\System\IO\StreamHelpers.CopyValidation.cs" />
<Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs"
Expand Down
Expand Up @@ -162,7 +162,7 @@ private void EnsureNotDisposed()

private static void ThrowStreamClosedException()
{
throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
throw new ObjectDisposedException(nameof(DeflateStream), SR.ObjectDisposed_StreamClosed);
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? asyncCallback, object? asyncState) =>
Expand Down
Expand Up @@ -320,7 +320,7 @@ private void EnsureNotDisposed()

private static void ThrowStreamClosedException()
{
throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
throw new ObjectDisposedException(nameof(DeflateStream), SR.ObjectDisposed_StreamClosed);
}

private void EnsureDecompressionMode()
Expand Down
Expand Up @@ -114,6 +114,14 @@ public enum CompressionMethod : int
public const int Deflate_DefaultWindowBits = -15; // Legal values are 8..15 and -8..-15. 15 is the window size,
// negative val causes deflate to produce raw deflate data (no zlib header).

/// <summary>
/// <p><strong>From the ZLib manual:</strong></p>
/// <p>ZLib's <code>windowBits</code> parameter is the base two logarithm of the window size (the size of the history buffer).
/// It should be in the range 8..15 for this version of the library. Larger values of this parameter result in better compression
/// at the expense of memory usage. The default value is 15 if deflateInit is used instead.<br /></p>
/// </summary>
public const int ZLib_DefaultWindowBits = 15;

/// <summary>
/// <p>Zlib's <code>windowBits</code> parameter is the base two logarithm of the window size (the size of the history buffer).
/// For GZip header encoding, <code>windowBits</code> should be equal to a value between 8..15 (to specify Window Size) added to
Expand Down
Expand Up @@ -234,7 +234,7 @@ private void CheckDeflateStream()

private static void ThrowStreamClosedException()
{
throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
throw new ObjectDisposedException(nameof(GZipStream), SR.ObjectDisposed_StreamClosed);
}
}
}

0 comments on commit d3beb60

Please sign in to comment.