Skip to content

Commit

Permalink
Fix super block compression and stream raw blocks in decompression (#…
Browse files Browse the repository at this point in the history
…1947)

Super blocks must never violate the zstd block bound of input_size + ZSTD_blockHeaderSize. The individual sub-blocks may, but not the super block. If the superblock violates the block bound we are liable to violate ZSTD_compressBound(), which we must not do. Whenever the super block violates the block bound we instead emit an uncompressed block.

This means we increase the latency because of the single uncompressed block. I fix this by enabling streaming an uncompressed block, so the latency of an uncompressed block is 1 byte. This doesn't reduce the latency of the buffer-less API, but I don't think we really care.

* I added a test case that verifies that the decompression has 1 byte latency.
* I rely on existing zstreamtest / fuzzer / libfuzzer regression tests for correctness. During development I had several correctness bugs, and they easily caught them.
* The added assert that the superblock doesn't violate the block bound will help us discover any missed conditions (though I think I got them all).

Credit to OSS-Fuzz.
  • Loading branch information
terrelln committed Jan 11, 2020
1 parent f25a6e9 commit 036b30b
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 86 deletions.
87 changes: 40 additions & 47 deletions lib/compress/zstd_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
* Note that the result from this function is only compatible with the "normal"
* full-block strategy.
* When there are a lot of small blocks due to frequent flush in streaming mode
* or targetCBlockSize, the overhead of headers can make the compressed data to
* be larger than the return value of ZSTD_compressBound().
* the overhead of headers can make the compressed data to be larger than the
* return value of ZSTD_compressBound().
*/
size_t ZSTD_compressBound(size_t srcSize) {
return ZSTD_COMPRESSBOUND(srcSize);
Expand Down Expand Up @@ -2385,6 +2385,13 @@ static int ZSTD_isRLE(const BYTE *ip, size_t length) {
return 1;
}

static void ZSTD_confirmRepcodesAndEntropyTables(ZSTD_CCtx* zc)
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}

static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize, U32 frame)
Expand Down Expand Up @@ -2435,10 +2442,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,

out:
if (!ZSTD_isError(cSize) && cSize > 1) {
/* confirm repcodes and entropy tables when emitting a compressed block */
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
ZSTD_confirmRepcodesAndEntropyTables(zc);
}
/* We check that dictionaries have offset codes available for the first
* block. After the first block, the offcode table might not have large
Expand All @@ -2450,57 +2454,45 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
return cSize;
}

static void ZSTD_confirmRepcodesAndEntropyTables(ZSTD_CCtx* zc)
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}

static size_t ZSTD_compressBlock_targetCBlockSize_body(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const size_t bss, U32 lastBlock)
{
DEBUGLOG(6, "Attempting ZSTD_compressSuperBlock()");
/* Attempt superblock compression and return early if successful */
if (bss == ZSTDbss_compress) {
/* Attempt superblock compression.
*
* Note that compressed size of ZSTD_compressSuperBlock() is not bound by the
* standard ZSTD_compressBound(). This is a problem, because even if we have
* space now, taking an extra byte now could cause us to run out of space later
* and violate ZSTD_compressBound().
*
* Define blockBound(blockSize) = blockSize + ZSTD_blockHeaderSize.
*
* In order to respect ZSTD_compressBound() we must attempt to emit a raw
* uncompressed block in these cases:
* * cSize == 0: Return code for an uncompressed block.
* * cSize == dstSize_tooSmall: We may have expanded beyond blockBound(srcSize).
* ZSTD_noCompressBlock() will return dstSize_tooSmall if we are really out of
* output space.
* * cSize >= blockBound(srcSize): We have expanded the block too much so
* emit an uncompressed block.
*/
size_t const cSize = ZSTD_compressSuperBlock(zc, dst, dstCapacity, lastBlock);
FORWARD_IF_ERROR(cSize);
if (cSize != 0) {
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize;
}
}

DEBUGLOG(6, "Attempting ZSTD_noCompressSuperBlock()");
/* Superblock compression failed, attempt to emit noCompress superblocks
* and return early if that is successful and we have enough room for checksum */
{
size_t const cSize = ZSTD_noCompressSuperBlock(dst, dstCapacity, src, srcSize, zc->appliedParams.targetCBlockSize, lastBlock);
if (cSize != ERROR(dstSize_tooSmall) && (dstCapacity - cSize) >= 4)
return cSize;
}

DEBUGLOG(6, "Attempting ZSTD_compressSequences() on superblock");
/* noCompress superblock emission failed. Attempt to compress normally
* and return early if that is successful */
{
size_t const cSize = ZSTD_compressSequences(&zc->seqStore,
&zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy,
&zc->appliedParams, (BYTE*)dst+ZSTD_blockHeaderSize, dstCapacity-ZSTD_blockHeaderSize,
srcSize, zc->entropyWorkspace, HUF_WORKSPACE_SIZE, zc->bmi2);
FORWARD_IF_ERROR(cSize);
if (cSize != 0) {
U32 const cBlockHeader24 = lastBlock + (((U32)bt_compressed)<<1) + (U32)(cSize << 3);
MEM_writeLE24((BYTE*)dst, cBlockHeader24);
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize + ZSTD_blockHeaderSize;
if (cSize != ERROR(dstSize_tooSmall)) {
FORWARD_IF_ERROR(cSize);
if (cSize != 0 && cSize < srcSize + ZSTD_blockHeaderSize) {
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize;
}
}
}

DEBUGLOG(6, "Resorting to ZSTD_noCompressBlock() on superblock");
/* Everything failed. Just emit a regular noCompress block */
DEBUGLOG(6, "Resorting to ZSTD_noCompressBlock()");
/* Superblock compression failed, attempt to emit a single no compress block.
* The decoder will be able to stream this block since it is uncompressed.
*/
return ZSTD_noCompressBlock(dst, dstCapacity, src, srcSize, lastBlock);
}

Expand Down Expand Up @@ -2593,6 +2585,8 @@ static size_t ZSTD_compress_frameChunk (ZSTD_CCtx* cctx,
if (ZSTD_useTargetCBlockSize(&cctx->appliedParams)) {
cSize = ZSTD_compressBlock_targetCBlockSize(cctx, op, dstCapacity, ip, blockSize, lastBlock);
FORWARD_IF_ERROR(cSize);
assert(cSize > 0);
assert(cSize <= blockSize + ZSTD_blockHeaderSize);
} else {
cSize = ZSTD_compressBlock_internal(cctx,
op+ZSTD_blockHeaderSize, dstCapacity-ZSTD_blockHeaderSize,
Expand Down Expand Up @@ -3796,7 +3790,6 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,

case zcss_load:
if ( (flushMode == ZSTD_e_end)
&& !ZSTD_useTargetCBlockSize(&zcs->appliedParams)
&& ((size_t)(oend-op) >= ZSTD_compressBound(iend-ip)) /* enough dstCapacity */
&& (zcs->inBuffPos == 0) ) {
/* shortcut to compression pass directly into output buffer */
Expand Down
26 changes: 1 addition & 25 deletions lib/compress/zstd_compress_superblock.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ static size_t ZSTD_compressSubBlock_multi(const seqStore_t* seqStorePtr,
/* I think there is an optimization opportunity here.
* Calling ZSTD_estimateSubBlockSize for every sequence can be wasteful
* since it recalculates estimate from scratch.
* For example, it would recount literal distribution and symbol codes everytime.
* For example, it would recount literal distribution and symbol codes everytime.
*/
cBlockSizeEstimate = ZSTD_estimateSubBlockSize(lp, litSize, ofCodePtr, llCodePtr, mlCodePtr, seqCount,
entropy, entropyMetadata,
Expand Down Expand Up @@ -716,27 +716,3 @@ size_t ZSTD_compressSuperBlock(ZSTD_CCtx* zc,
zc->bmi2, lastBlock,
zc->entropyWorkspace, HUF_WORKSPACE_SIZE /* statically allocated in resetCCtx */);
}

size_t ZSTD_noCompressSuperBlock(void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
size_t targetCBlockSize,
unsigned lastBlock) {
const BYTE* const istart = (const BYTE*)src;
const BYTE* const iend = istart + srcSize;
const BYTE* ip = istart;
BYTE* const ostart = (BYTE*)dst;
BYTE* const oend = ostart + dstCapacity;
BYTE* op = ostart;
DEBUGLOG(5, "ZSTD_noCompressSuperBlock (dstCapacity=%zu, srcSize=%zu, targetCBlockSize=%zu)",
dstCapacity, srcSize, targetCBlockSize);
while (ip < iend) {
size_t remaining = iend-ip;
unsigned lastSubBlock = remaining <= targetCBlockSize;
size_t blockSize = lastSubBlock ? remaining : targetCBlockSize;
size_t cSize = ZSTD_noCompressBlock(op, oend-op, ip, blockSize, lastSubBlock && lastBlock);
FORWARD_IF_ERROR(cSize);
ip += blockSize;
op += cSize;
}
return op-ostart;
}
10 changes: 0 additions & 10 deletions lib/compress/zstd_compress_superblock.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,4 @@ size_t ZSTD_compressSuperBlock(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
unsigned lastBlock);

/* ZSTD_noCompressSuperBlock() :
* Used to break a super block into multiple uncompressed sub blocks
* when targetCBlockSize is being used.
* The given block will be broken into multiple uncompressed sub blocks that are
* around targetCBlockSize. */
size_t ZSTD_noCompressSuperBlock(void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
size_t targetCBlockSize,
unsigned lastBlock);

#endif /* ZSTD_COMPRESS_ADVANCED_H */
39 changes: 35 additions & 4 deletions lib/decompress/zstd_decompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,24 @@ size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t sr
****************************************/
size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx) { return dctx->expected; }

/**
* Similar to ZSTD_nextSrcSizeToDecompress(), but when when a block input can be streamed,
* we allow taking a partial block as the input. Currently only raw uncompressed blocks can
* be streamed.
*
* For blocks that can be streamed, this allows us to reduce the latency until we produce
* output, and avoid copying the input.
*
* @param inputSize - The total amount of input that the caller currently has.
*/
static size_t ZSTD_nextSrcSizeToDecompressWithInputSize(ZSTD_DCtx* dctx, size_t inputSize) {
if (!(dctx->stage == ZSTDds_decompressBlock || dctx->stage == ZSTDds_decompressLastBlock))
return dctx->expected;
if (dctx->bType != bt_raw)
return dctx->expected;
return MIN(MAX(inputSize, 1), dctx->expected);
}

ZSTD_nextInputType_e ZSTD_nextInputType(ZSTD_DCtx* dctx) {
switch(dctx->stage)
{
Expand Down Expand Up @@ -877,7 +895,7 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
{
DEBUGLOG(5, "ZSTD_decompressContinue (srcSize:%u)", (unsigned)srcSize);
/* Sanity check */
RETURN_ERROR_IF(srcSize != dctx->expected, srcSize_wrong, "not allowed");
RETURN_ERROR_IF(srcSize != ZSTD_nextSrcSizeToDecompressWithInputSize(dctx, srcSize), srcSize_wrong, "not allowed");
if (dstCapacity) ZSTD_checkContinuity(dctx, dst);

switch (dctx->stage)
Expand Down Expand Up @@ -944,22 +962,34 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
case bt_compressed:
DEBUGLOG(5, "ZSTD_decompressContinue: case bt_compressed");
rSize = ZSTD_decompressBlock_internal(dctx, dst, dstCapacity, src, srcSize, /* frame */ 1);
dctx->expected = 0; /* Streaming not supported */
break;
case bt_raw :
assert(srcSize <= dctx->expected);
rSize = ZSTD_copyRawBlock(dst, dstCapacity, src, srcSize);
FORWARD_IF_ERROR(rSize);
assert(rSize == srcSize);
dctx->expected -= rSize;
break;
case bt_rle :
rSize = ZSTD_setRleBlock(dst, dstCapacity, *(const BYTE*)src, dctx->rleSize);
dctx->expected = 0; /* Streaming not supported */
break;
case bt_reserved : /* should never happen */
default:
RETURN_ERROR(corruption_detected);
}
if (ZSTD_isError(rSize)) return rSize;
FORWARD_IF_ERROR(rSize);
RETURN_ERROR_IF(rSize > dctx->fParams.blockSizeMax, corruption_detected, "Decompressed Block Size Exceeds Maximum");
DEBUGLOG(5, "ZSTD_decompressContinue: decoded size from block : %u", (unsigned)rSize);
dctx->decodedSize += rSize;
if (dctx->fParams.checksumFlag) XXH64_update(&dctx->xxhState, dst, rSize);
dctx->previousDstEnd = (char*)dst + rSize;

/* Stay on the same stage until we are finished streaming the block. */
if (dctx->expected > 0) {
return rSize;
}

if (dctx->stage == ZSTDds_decompressLastBlock) { /* end of frame */
DEBUGLOG(4, "ZSTD_decompressContinue: decoded size from frame : %u", (unsigned)dctx->decodedSize);
Expand All @@ -977,7 +1007,6 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
} else {
dctx->stage = ZSTDds_decodeBlockHeader;
dctx->expected = ZSTD_blockHeaderSize;
dctx->previousDstEnd = (char*)dst + rSize;
}
return rSize;
}
Expand Down Expand Up @@ -1645,7 +1674,7 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB

case zdss_read:
DEBUGLOG(5, "stage zdss_read");
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompress(zds);
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompressWithInputSize(zds, iend - ip);
DEBUGLOG(5, "neededInSize = %u", (U32)neededInSize);
if (neededInSize==0) { /* end of frame */
zds->streamStage = zdss_init;
Expand Down Expand Up @@ -1673,6 +1702,8 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB
size_t const toLoad = neededInSize - zds->inPos;
int const isSkipFrame = ZSTD_isSkipFrame(zds);
size_t loadedSize;
/* At this point we shouldn't be decompressing a block that we can stream. */
assert(neededInSize == ZSTD_nextSrcSizeToDecompressWithInputSize(zds, iend - ip));
if (isSkipFrame) {
loadedSize = MIN(toLoad, (size_t)(iend-ip));
} else {
Expand Down
37 changes: 37 additions & 0 deletions tests/zstreamtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,43 @@ static int basicUnitTests(U32 seed, double compressibility)
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : raw block can be streamed: ", testNb++);
{ size_t const inputSize = 10000;
size_t const compCapacity = ZSTD_compressBound(inputSize);
BYTE* const input = (BYTE*)malloc(inputSize);
BYTE* const comp = (BYTE*)malloc(compCapacity);
BYTE* const decomp = (BYTE*)malloc(inputSize);

CHECK(input == NULL || comp == NULL || decomp == NULL, "failed to alloc buffers");

RDG_genBuffer(input, inputSize, 0.0, 0.0, seed);
{ size_t const compSize = ZSTD_compress(comp, compCapacity, input, inputSize, -(int)inputSize);
ZSTD_inBuffer in = { comp, 0, 0 };
ZSTD_outBuffer out = { decomp, 0, 0 };
CHECK_Z(compSize);
CHECK_Z( ZSTD_DCtx_reset(zd, ZSTD_reset_session_and_parameters) );
while (in.size < compSize) {
in.size = MIN(in.size + 100, compSize);
while (in.pos < in.size) {
size_t const outPos = out.pos;
if (out.pos == out.size) {
out.size = MIN(out.size + 10, inputSize);
}
CHECK_Z( ZSTD_decompressStream(zd, &out, &in) );
CHECK(!(out.pos > outPos), "We are not streaming (no output generated)");
}
}
CHECK(in.pos != compSize, "Not all input consumed!");
CHECK(out.pos != inputSize, "Not all output produced!");
}
CHECK(memcmp(input, decomp, inputSize), "round trip failed!");

free(input);
free(comp);
free(decomp);
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : dictionary + uncompressible block + reusing tables checks offset table validity: ", testNb++);
{ ZSTD_CDict* const cdict = ZSTD_createCDict_advanced(
dictionary.start, dictionary.filled,
Expand Down

0 comments on commit 036b30b

Please sign in to comment.