Skip to content

Commit

Permalink
fix(zip): fully implement async deflate (#813)
Browse files Browse the repository at this point in the history
  • Loading branch information
piksel committed Dec 16, 2022
1 parent c4009fd commit 68e2f92
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/ICSharpCode.SharpZipLib/GZip/GzipInputStream.cs
Expand Up @@ -334,7 +334,7 @@ private void ReadFooter()
int crcval = (footer[0] & 0xff) | ((footer[1] & 0xff) << 8) | ((footer[2] & 0xff) << 16) | (footer[3] << 24);
if (crcval != (int)crc.Value)
{
throw new GZipException("GZIP crc sum mismatch, theirs \"" + crcval + "\" and ours \"" + (int)crc.Value);
throw new GZipException($"GZIP crc sum mismatch, theirs \"{crcval:x8}\" and ours \"{(int)crc.Value:x8}\"");
}

// NOTE The total here is the original total modulo 2 ^ 32.
Expand Down
70 changes: 52 additions & 18 deletions src/ICSharpCode.SharpZipLib/GZip/GzipOutputStream.cs
Expand Up @@ -138,6 +138,11 @@ public string FileName
}
}

/// <summary>
/// If defined, will use this time instead of the current for the output header
/// </summary>
public DateTime? ModifiedTime { get; set; }

#endregion Public API

#region Stream overrides
Expand All @@ -149,21 +154,47 @@ public string FileName
/// <param name="offset">Offset of first byte in buf to write</param>
/// <param name="count">Number of bytes to write</param>
public override void Write(byte[] buffer, int offset, int count)
=> WriteSyncOrAsync(buffer, offset, count, null).GetAwaiter().GetResult();

private async Task WriteSyncOrAsync(byte[] buffer, int offset, int count, CancellationToken? ct)
{
if (state_ == OutputState.Header)
{
WriteHeader();
if (ct.HasValue)
{
await WriteHeaderAsync(ct.Value).ConfigureAwait(false);
}
else
{
WriteHeader();
}
}

if (state_ != OutputState.Footer)
{
throw new InvalidOperationException("Write not permitted in current state");
}


crc.Update(new ArraySegment<byte>(buffer, offset, count));
base.Write(buffer, offset, count);

if (ct.HasValue)
{
await base.WriteAsync(buffer, offset, count, ct.Value).ConfigureAwait(false);
}
else
{
base.Write(buffer, offset, count);
}
}

/// <summary>
/// Asynchronously write given buffer to output updating crc
/// </summary>
/// <param name="buffer">Buffer to write</param>
/// <param name="offset">Offset of first byte in buf to write</param>
/// <param name="count">Number of bytes to write</param>
/// <param name="ct">The token to monitor for cancellation requests</param>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
=> await WriteSyncOrAsync(buffer, offset, count, ct).ConfigureAwait(false);

/// <summary>
/// Writes remaining compressed output data to the output stream
/// and closes it.
Expand All @@ -187,7 +218,7 @@ protected override void Dispose(bool disposing)
}
}

#if NETSTANDARD2_1_OR_GREATER
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_1_OR_GREATER
/// <inheritdoc cref="DeflaterOutputStream.Dispose"/>
public override async ValueTask DisposeAsync()
{
Expand Down Expand Up @@ -225,6 +256,16 @@ public override void Flush()
base.Flush();
}

/// <inheritdoc cref="Flush"/>
public override async Task FlushAsync(CancellationToken ct)
{
if (state_ == OutputState.Header)
{
await WriteHeaderAsync(ct).ConfigureAwait(false);
}
await base.FlushAsync(ct).ConfigureAwait(false);
}

#endregion Stream overrides

#region DeflaterOutputStream overrides
Expand All @@ -249,21 +290,13 @@ public override void Finish()
}
}

/// <inheritdoc cref="Flush"/>
public override async Task FlushAsync(CancellationToken ct)
{
await WriteHeaderAsync().ConfigureAwait(false);
await base.FlushAsync(ct).ConfigureAwait(false);
}


/// <inheritdoc cref="Finish"/>
public override async Task FinishAsync(CancellationToken ct)
{
// If no data has been written a header should be added.
if (state_ == OutputState.Header)
{
await WriteHeaderAsync().ConfigureAwait(false);
await WriteHeaderAsync(ct).ConfigureAwait(false);
}

if (state_ == OutputState.Footer)
Expand Down Expand Up @@ -305,7 +338,8 @@ private byte[] GetFooter()

private byte[] GetHeader()
{
var modTime = (int)((DateTime.Now.Ticks - new DateTime(1970, 1, 1).Ticks) / 10000000L); // Ticks give back 100ns intervals
var modifiedUtc = ModifiedTime?.ToUniversalTime() ?? DateTime.UtcNow;
var modTime = (int)((modifiedUtc - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).Ticks / 10000000L); // Ticks give back 100ns intervals
byte[] gzipHeader = {
// The two magic bytes
GZipConstants.ID1,
Expand Down Expand Up @@ -351,12 +385,12 @@ private void WriteHeader()
baseOutputStream_.Write(gzipHeader, 0, gzipHeader.Length);
}

private async Task WriteHeaderAsync()
private async Task WriteHeaderAsync(CancellationToken ct)
{
if (state_ != OutputState.Header) return;
state_ = OutputState.Footer;
var gzipHeader = GetHeader();
await baseOutputStream_.WriteAsync(gzipHeader, 0, gzipHeader.Length).ConfigureAwait(false);
await baseOutputStream_.WriteAsync(gzipHeader, 0, gzipHeader.Length, ct).ConfigureAwait(false);
}

#endregion Support Routines
Expand Down
Expand Up @@ -240,11 +240,9 @@ protected void EncryptBlock(byte[] buffer, int offset, int length)
/// are processed.
/// </summary>
protected void Deflate()
{
Deflate(false);
}
=> DeflateSyncOrAsync(false, null).GetAwaiter().GetResult();

private void Deflate(bool flushing)
private async Task DeflateSyncOrAsync(bool flushing, CancellationToken? ct)
{
while (flushing || !deflater_.IsNeedingInput)
{
Expand All @@ -257,7 +255,14 @@ private void Deflate(bool flushing)

EncryptBlock(buffer_, 0, deflateCount);

baseOutputStream_.Write(buffer_, 0, deflateCount);
if (ct.HasValue)
{
await baseOutputStream_.WriteAsync(buffer_, 0, deflateCount, ct.Value).ConfigureAwait(false);
}
else
{
baseOutputStream_.Write(buffer_, 0, deflateCount);
}
}

if (!deflater_.IsNeedingInput)
Expand Down Expand Up @@ -383,10 +388,18 @@ public override int Read(byte[] buffer, int offset, int count)
public override void Flush()
{
deflater_.Flush();
Deflate(true);
DeflateSyncOrAsync(true, null).GetAwaiter().GetResult();
baseOutputStream_.Flush();
}

/// <inheritdoc/>
public override async Task FlushAsync(CancellationToken cancellationToken)
{
deflater_.Flush();
await DeflateSyncOrAsync(true, cancellationToken).ConfigureAwait(false);
await baseOutputStream_.FlushAsync(cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Calls <see cref="Finish"/> and closes the underlying
/// stream when <see cref="IsStreamOwner"></see> is true.
Expand Down Expand Up @@ -491,6 +504,13 @@ public override void Write(byte[] buffer, int offset, int count)
Deflate();
}

/// <inheritdoc />
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
{
deflater_.SetInput(buffer, offset, count);
await DeflateSyncOrAsync(false, ct).ConfigureAwait(false);
}

#endregion Stream Overrides

#region Instance Fields
Expand Down
35 changes: 27 additions & 8 deletions src/ICSharpCode.SharpZipLib/Zip/ZipOutputStream.cs
Expand Up @@ -552,7 +552,7 @@ public async Task PutNextEntryAsync(ZipEntry entry, CancellationToken ct = defau
public void CloseEntry()
{
// Note: This method will run synchronously
FinishCompression(null).Wait();
FinishCompressionSyncOrAsync(null).GetAwaiter().GetResult();
WriteEntryFooter(baseOutputStream_);

// Patch the header if possible
Expand All @@ -566,7 +566,7 @@ public void CloseEntry()
curEntry = null;
}

private async Task FinishCompression(CancellationToken? ct)
private async Task FinishCompressionSyncOrAsync(CancellationToken? ct)
{
// Compression handled externally
if (entryIsPassthrough) return;
Expand Down Expand Up @@ -600,7 +600,7 @@ private async Task FinishCompression(CancellationToken? ct)
/// <inheritdoc cref="CloseEntry"/>
public async Task CloseEntryAsync(CancellationToken ct)
{
await FinishCompression(ct).ConfigureAwait(false);
await FinishCompressionSyncOrAsync(ct).ConfigureAwait(false);
await baseOutputStream_.WriteProcToStreamAsync(WriteEntryFooter, ct).ConfigureAwait(false);

// Patch the header if possible
Expand Down Expand Up @@ -767,9 +767,7 @@ private byte[] CreateZipCryptoHeader(long crcValue)
private void InitializeZipCryptoPassword(string password)
{
var pkManaged = new PkzipClassicManaged();
Console.WriteLine($"Output Encoding: {ZipCryptoEncoding.EncodingName}");
byte[] key = PkzipClassic.GenerateKeys(ZipCryptoEncoding.GetBytes(password));
Console.WriteLine($"Output Bytes: {string.Join(", ", key.Select(b => $"{b:x2}").ToArray())}");
cryptoTransform_ = pkManaged.CreateEncryptor(key, null);
}

Expand All @@ -782,6 +780,13 @@ private void InitializeZipCryptoPassword(string password)
/// <exception cref="ZipException">Archive size is invalid</exception>
/// <exception cref="System.InvalidOperationException">No entry is active.</exception>
public override void Write(byte[] buffer, int offset, int count)
=> WriteSyncOrAsync(buffer, offset, count, null).GetAwaiter().GetResult();

/// <inheritdoc />
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
=> await WriteSyncOrAsync(buffer, offset, count, ct).ConfigureAwait(false);

private async Task WriteSyncOrAsync(byte[] buffer, int offset, int count, CancellationToken? ct)
{
if (curEntry == null)
{
Expand Down Expand Up @@ -816,20 +821,34 @@ public override void Write(byte[] buffer, int offset, int count)

size += count;

if(curMethod == CompressionMethod.Stored || entryIsPassthrough)
if (curMethod == CompressionMethod.Stored || entryIsPassthrough)
{
if (Password != null)
{
CopyAndEncrypt(buffer, offset, count);
}
else
{
baseOutputStream_.Write(buffer, offset, count);
if (ct.HasValue)
{
await baseOutputStream_.WriteAsync(buffer, offset, count, ct.Value).ConfigureAwait(false);
}
else
{
baseOutputStream_.Write(buffer, offset, count);
}
}
}
else
{
base.Write(buffer, offset, count);
if (ct.HasValue)
{
await base.WriteAsync(buffer, offset, count, ct.Value).ConfigureAwait(false);
}
else
{
base.Write(buffer, offset, count);
}
}
}

Expand Down
48 changes: 45 additions & 3 deletions test/ICSharpCode.SharpZipLib.Tests/GZip/GZipAsyncTests.cs
@@ -1,4 +1,5 @@
using System.IO;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using ICSharpCode.SharpZipLib.GZip;
Expand All @@ -7,8 +8,6 @@

namespace ICSharpCode.SharpZipLib.Tests.GZip
{


[TestFixture]
public class GZipAsyncTests
{
Expand Down Expand Up @@ -140,5 +139,48 @@ await using (var outStream = new GZipOutputStream(ms) { IsStreamOwner = false })
Assert.IsEmpty(content);
}
}

[Test]
[Category("GZip")]
[Category("Async")]
public async Task WriteGZipStreamToAsyncOnlyStream()
{
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
var content = Encoding.ASCII.GetBytes("a");
var modTime = DateTime.UtcNow;

await using (var msAsync = new MemoryStreamWithoutSync())
{
await using (var outStream = new GZipOutputStream(msAsync) { IsStreamOwner = false })
{
outStream.ModifiedTime = modTime;
await outStream.WriteAsync(content);
}

using var msSync = new MemoryStream();
using (var outStream = new GZipOutputStream(msSync) { IsStreamOwner = false })
{
outStream.ModifiedTime = modTime;
outStream.Write(content);
}

var syncBytes = string.Join(' ', msSync.ToArray());
var asyncBytes = string.Join(' ', msAsync.ToArray());

Assert.AreEqual(syncBytes, asyncBytes, "Sync and Async compressed streams are not equal");

// Since GZipInputStream isn't async yet we need to read from it from a regular MemoryStream
using (var readStream = new MemoryStream(msAsync.ToArray()))
using (var inStream = new GZipInputStream(readStream))
using (var reader = new StreamReader(inStream))
{
Assert.AreEqual(content, await reader.ReadToEndAsync());
}
}
#else
await Task.CompletedTask;
Assert.Ignore("AsyncDispose is not supported");
#endif
}
}
}
8 changes: 5 additions & 3 deletions test/ICSharpCode.SharpZipLib.Tests/Zip/ZipStreamAsyncTests.cs
Expand Up @@ -124,17 +124,19 @@ public async Task WriteReadOnlyZipStreamAsync ()
[Test]
[Category("Zip")]
[Category("Async")]
public async Task WriteZipStreamToAsyncOnlyStream ()
[TestCase(12, Description = "Small files")]
[TestCase(12000, Description = "Large files")]
public async Task WriteZipStreamToAsyncOnlyStream (int fileSize)
{
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
await using(var ms = new MemoryStreamWithoutSync()){
await using(var outStream = new ZipOutputStream(ms) { IsStreamOwner = false })
{
await outStream.PutNextEntryAsync(new ZipEntry("FirstFile"));
await Utils.WriteDummyDataAsync(outStream, 12);
await Utils.WriteDummyDataAsync(outStream, fileSize);

await outStream.PutNextEntryAsync(new ZipEntry("SecondFile"));
await Utils.WriteDummyDataAsync(outStream, 12);
await Utils.WriteDummyDataAsync(outStream, fileSize);

await outStream.FinishAsync(CancellationToken.None);
await outStream.DisposeAsync();
Expand Down

0 comments on commit 68e2f92

Please sign in to comment.