Fix super block compression and stream raw blocks in decompression #1947

Merged · 1 commit · Jan 11, 2020
87 changes: 40 additions & 47 deletions lib/compress/zstd_compress.c
@@ -38,8 +38,8 @@
* Note that the result from this function is only compatible with the "normal"
* full-block strategy.
* When there are a lot of small blocks due to frequent flush in streaming mode
* or targetCBlockSize, the overhead of headers can make the compressed data to
* be larger than the return value of ZSTD_compressBound().
* the overhead of headers can make the compressed data to be larger than the
* return value of ZSTD_compressBound().
*/
size_t ZSTD_compressBound(size_t srcSize) {
return ZSTD_COMPRESSBOUND(srcSize);
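
Aside: the caveat in the comment above is easy to quantify. The sketch below is illustrative only (the flush size and printed figures are assumptions, and the 3-byte header mirrors ZSTD_blockHeaderSize):

#include <stdio.h>
#include <zstd.h>

/* Illustration of the caveat above: incompressible data flushed in many
 * small blocks pays a 3-byte block header per flush, which can exceed
 * ZSTD_compressBound(). Numbers are an example, not a measurement. */
int main(void) {
    size_t const srcSize   = 10000;                      /* incompressible input */
    size_t const flushSize = 100;                        /* bytes per flushed block */
    size_t const nbBlocks  = (srcSize + flushSize - 1) / flushSize;
    size_t const worstCase = srcSize + nbBlocks * 3;     /* raw payload + headers */
    printf("bound=%zu worst=%zu\n", ZSTD_compressBound(srcSize), worstCase);
    /* with the current bound formula, roughly: bound=10098 worst=10300 */
    return 0;
}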
@@ -2385,6 +2385,13 @@ static int ZSTD_isRLE(const BYTE *ip, size_t length) {
return 1;
}

static void ZSTD_confirmRepcodesAndEntropyTables(ZSTD_CCtx* zc)
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}

static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize, U32 frame)
@@ -2435,10 +2442,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,

out:
if (!ZSTD_isError(cSize) && cSize > 1) {
/* confirm repcodes and entropy tables when emitting a compressed block */
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
ZSTD_confirmRepcodesAndEntropyTables(zc);
}
/* We check that dictionaries have offset codes available for the first
* block. After the first block, the offcode table might not have large
@@ -2450,57 +2454,45 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
return cSize;
}

static void ZSTD_confirmRepcodesAndEntropyTables(ZSTD_CCtx* zc)
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}

static size_t ZSTD_compressBlock_targetCBlockSize_body(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const size_t bss, U32 lastBlock)
{
DEBUGLOG(6, "Attempting ZSTD_compressSuperBlock()");
/* Attempt superblock compression and return early if successful */
if (bss == ZSTDbss_compress) {
/* Attempt superblock compression.
*
* Note that compressed size of ZSTD_compressSuperBlock() is not bound by the
* standard ZSTD_compressBound(). This is a problem, because even if we have
* space now, taking an extra byte now could cause us to run out of space later
* and violate ZSTD_compressBound().
*
* Define blockBound(blockSize) = blockSize + ZSTD_blockHeaderSize.
*
* In order to respect ZSTD_compressBound() we must attempt to emit a raw
* uncompressed block in these cases:
* * cSize == 0: Return code for an uncompressed block.
* * cSize == dstSize_tooSmall: We may have expanded beyond blockBound(srcSize).
* ZSTD_noCompressBlock() will return dstSize_tooSmall if we are really out of
* output space.
* * cSize >= blockBound(srcSize): We have expanded the block too much so
* emit an uncompressed block.
*/
size_t const cSize = ZSTD_compressSuperBlock(zc, dst, dstCapacity, lastBlock);
FORWARD_IF_ERROR(cSize);
if (cSize != 0) {
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize;
}
}

DEBUGLOG(6, "Attempting ZSTD_noCompressSuperBlock()");
/* Superblock compression failed, attempt to emit noCompress superblocks
* and return early if that is successful and we have enough room for checksum */
{
size_t const cSize = ZSTD_noCompressSuperBlock(dst, dstCapacity, src, srcSize, zc->appliedParams.targetCBlockSize, lastBlock);
if (cSize != ERROR(dstSize_tooSmall) && (dstCapacity - cSize) >= 4)
return cSize;
}

DEBUGLOG(6, "Attempting ZSTD_compressSequences() on superblock");
/* noCompress superblock emission failed. Attempt to compress normally
* and return early if that is successful */
{
size_t const cSize = ZSTD_compressSequences(&zc->seqStore,
&zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy,
&zc->appliedParams, (BYTE*)dst+ZSTD_blockHeaderSize, dstCapacity-ZSTD_blockHeaderSize,
srcSize, zc->entropyWorkspace, HUF_WORKSPACE_SIZE, zc->bmi2);
FORWARD_IF_ERROR(cSize);
if (cSize != 0) {
U32 const cBlockHeader24 = lastBlock + (((U32)bt_compressed)<<1) + (U32)(cSize << 3);
MEM_writeLE24((BYTE*)dst, cBlockHeader24);
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize + ZSTD_blockHeaderSize;
if (cSize != ERROR(dstSize_tooSmall)) {
FORWARD_IF_ERROR(cSize);
if (cSize != 0 && cSize < srcSize + ZSTD_blockHeaderSize) {
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize;
}
}
}

DEBUGLOG(6, "Resorting to ZSTD_noCompressBlock() on superblock");
/* Everything failed. Just emit a regular noCompress block */
DEBUGLOG(6, "Resorting to ZSTD_noCompressBlock()");
/* Superblock compression failed, attempt to emit a single no compress block.
* The decoder will be able to stream this block since it is uncompressed.
*/
return ZSTD_noCompressBlock(dst, dstCapacity, src, srcSize, lastBlock);
}
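
Aside: the acceptance rule above can be restated as a standalone predicate. In the sketch below, the helper name and the worked sizes are invented for illustration; the 3-byte constant mirrors ZSTD_blockHeaderSize:

#include <stddef.h>

/* Hypothetical restatement of the acceptance rule in
 * ZSTD_compressBlock_targetCBlockSize_body(): a superblock result is kept
 * only if it is non-empty and strictly smaller than blockBound(srcSize).
 * Worked example: srcSize = 1000 -> blockBound = 1003; a superblock result
 * of 1005 is rejected, and a raw block of exactly 1003 bytes is emitted. */
static int superblockResultBeatsRaw(size_t cSize, size_t srcSize)
{
    size_t const blockHeaderSize = 3;   /* ZSTD_blockHeaderSize */
    return cSize != 0 && cSize < srcSize + blockHeaderSize;
}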

@@ -2593,6 +2585,8 @@ static size_t ZSTD_compress_frameChunk (ZSTD_CCtx* cctx,
if (ZSTD_useTargetCBlockSize(&cctx->appliedParams)) {
cSize = ZSTD_compressBlock_targetCBlockSize(cctx, op, dstCapacity, ip, blockSize, lastBlock);
FORWARD_IF_ERROR(cSize);
assert(cSize > 0);
assert(cSize <= blockSize + ZSTD_blockHeaderSize);
} else {
cSize = ZSTD_compressBlock_internal(cctx,
op+ZSTD_blockHeaderSize, dstCapacity-ZSTD_blockHeaderSize,
@@ -3796,7 +3790,6 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,

case zcss_load:
if ( (flushMode == ZSTD_e_end)
&& !ZSTD_useTargetCBlockSize(&zcs->appliedParams)
&& ((size_t)(oend-op) >= ZSTD_compressBound(iend-ip)) /* enough dstCapacity */
&& (zcs->inBuffPos == 0) ) {
/* shortcut to compression pass directly into output buffer */
26 changes: 1 addition & 25 deletions lib/compress/zstd_compress_superblock.c
@@ -658,7 +658,7 @@ static size_t ZSTD_compressSubBlock_multi(const seqStore_t* seqStorePtr,
/* I think there is an optimization opportunity here.
* Calling ZSTD_estimateSubBlockSize for every sequence can be wasteful
* since it recalculates estimate from scratch.
* For example, it would recount literal distribution and symbol codes everytime.
* For example, it would recount literal distribution and symbol codes everytime.
*/
cBlockSizeEstimate = ZSTD_estimateSubBlockSize(lp, litSize, ofCodePtr, llCodePtr, mlCodePtr, seqCount,
entropy, entropyMetadata,
@@ -716,27 +716,3 @@ size_t ZSTD_compressSuperBlock(ZSTD_CCtx* zc,
zc->bmi2, lastBlock,
zc->entropyWorkspace, HUF_WORKSPACE_SIZE /* statically allocated in resetCCtx */);
}

size_t ZSTD_noCompressSuperBlock(void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
size_t targetCBlockSize,
unsigned lastBlock) {
const BYTE* const istart = (const BYTE*)src;
const BYTE* const iend = istart + srcSize;
const BYTE* ip = istart;
BYTE* const ostart = (BYTE*)dst;
BYTE* const oend = ostart + dstCapacity;
BYTE* op = ostart;
DEBUGLOG(5, "ZSTD_noCompressSuperBlock (dstCapacity=%zu, srcSize=%zu, targetCBlockSize=%zu)",
dstCapacity, srcSize, targetCBlockSize);
while (ip < iend) {
size_t remaining = iend-ip;
unsigned lastSubBlock = remaining <= targetCBlockSize;
size_t blockSize = lastSubBlock ? remaining : targetCBlockSize;
size_t cSize = ZSTD_noCompressBlock(op, oend-op, ip, blockSize, lastSubBlock && lastBlock);
FORWARD_IF_ERROR(cSize);
ip += blockSize;
op += cSize;
}
return op-ostart;
}
10 changes: 0 additions & 10 deletions lib/compress/zstd_compress_superblock.h
@@ -28,14 +28,4 @@ size_t ZSTD_compressSuperBlock(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
unsigned lastBlock);

/* ZSTD_noCompressSuperBlock() :
* Used to break a super block into multiple uncompressed sub blocks
* when targetCBlockSize is being used.
* The given block will be broken into multiple uncompressed sub blocks that are
* around targetCBlockSize. */
size_t ZSTD_noCompressSuperBlock(void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
size_t targetCBlockSize,
unsigned lastBlock);

#endif /* ZSTD_COMPRESS_ADVANCED_H */
39 changes: 35 additions & 4 deletions lib/decompress/zstd_decompress.c
@@ -845,6 +845,24 @@ size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t sr
****************************************/
size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx) { return dctx->expected; }

/**
 * Similar to ZSTD_nextSrcSizeToDecompress(), but when a block input can be streamed,
* we allow taking a partial block as the input. Currently only raw uncompressed blocks can
* be streamed.
*
* For blocks that can be streamed, this allows us to reduce the latency until we produce
* output, and avoid copying the input.
*
* @param inputSize - The total amount of input that the caller currently has.
*/
static size_t ZSTD_nextSrcSizeToDecompressWithInputSize(ZSTD_DCtx* dctx, size_t inputSize) {
if (!(dctx->stage == ZSTDds_decompressBlock || dctx->stage == ZSTDds_decompressLastBlock))
return dctx->expected;
if (dctx->bType != bt_raw)
return dctx->expected;
return MIN(MAX(inputSize, 1), dctx->expected);
}
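
With this helper in place, a buffer-less API caller may pass fewer bytes than ZSTD_nextSrcSizeToDecompress() reports while the decoder is inside a raw block. Below is a minimal sketch of such a feed loop; feedRawBlockPiecewise is a hypothetical helper (not part of the library), it assumes the raw block header has already been consumed, and error handling is elided:

#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_decompressContinue() is an advanced API */
#include <zstd.h>

#define MIN(a,b) ((a) < (b) ? (a) : (b))

/* Hypothetical helper: feed one raw block to ZSTD_decompressContinue() in
 * small pieces. Assumes `src` holds the raw block payload and `dst` has at
 * least `blockSize` bytes of room; error checks elided for brevity. */
static size_t feedRawBlockPiecewise(ZSTD_DCtx* dctx,
                                    void* dst, size_t dstCapacity,
                                    const void* src, size_t blockSize)
{
    size_t inPos = 0, outPos = 0;
    while (inPos < blockSize) {
        size_t const chunk = MIN(blockSize - inPos, (size_t)100);  /* arbitrary partial feed */
        size_t const produced = ZSTD_decompressContinue(dctx,
                        (char*)dst + outPos, dstCapacity - outPos,
                        (const char*)src + inPos, chunk);
        /* with this patch, raw blocks return produced == chunk */
        inPos += chunk;
        outPos += produced;
    }
    return outPos;
}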

ZSTD_nextInputType_e ZSTD_nextInputType(ZSTD_DCtx* dctx) {
switch(dctx->stage)
{
@@ -877,7 +895,7 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
{
DEBUGLOG(5, "ZSTD_decompressContinue (srcSize:%u)", (unsigned)srcSize);
/* Sanity check */
RETURN_ERROR_IF(srcSize != dctx->expected, srcSize_wrong, "not allowed");
RETURN_ERROR_IF(srcSize != ZSTD_nextSrcSizeToDecompressWithInputSize(dctx, srcSize), srcSize_wrong, "not allowed");
if (dstCapacity) ZSTD_checkContinuity(dctx, dst);

switch (dctx->stage)
@@ -944,22 +962,34 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
case bt_compressed:
DEBUGLOG(5, "ZSTD_decompressContinue: case bt_compressed");
rSize = ZSTD_decompressBlock_internal(dctx, dst, dstCapacity, src, srcSize, /* frame */ 1);
dctx->expected = 0; /* Streaming not supported */
Review comment (Contributor):
Note: it changes the meaning and role of dctx->expected.

Instead of being the amount of data in the next block that a user must provide for decompression to happen, it is now an indication of what the user should provide, though less may also be accepted. More importantly, this variable becomes a stage validator (it must be == 0 to move to the next stage).

Nothing wrong with that, but it is a fairly consequential change of meaning on a long-lived variable that is pretty close to global state, so it deserves careful attention, since it could interfere with other parts of the code base where dctx->expected is read or written.
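
The lifecycle described in this comment can be traced concretely. A hypothetical walk-through of dctx->expected for a 100-byte raw block fed in two pieces (plain C that models the bookkeeping, not a call into the library):

#include <stdio.h>

int main(void) {
    unsigned expected = 100;                 /* set when the raw block header is decoded */
    unsigned const feeds[] = { 30, 70 };     /* caller supplies the block in two pieces */
    for (int i = 0; i < 2; ++i) {
        expected -= feeds[i];                /* mirrors: dctx->expected -= rSize */
        printf("fed %u, expected=%u -> %s\n", feeds[i], expected,
               expected ? "stay in ZSTDds_decompressBlock"
                        : "advance to the next block header");
    }
    return 0;
}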

break;
case bt_raw :
assert(srcSize <= dctx->expected);
rSize = ZSTD_copyRawBlock(dst, dstCapacity, src, srcSize);
FORWARD_IF_ERROR(rSize);
assert(rSize == srcSize);
dctx->expected -= rSize;
break;
case bt_rle :
rSize = ZSTD_setRleBlock(dst, dstCapacity, *(const BYTE*)src, dctx->rleSize);
dctx->expected = 0; /* Streaming not supported */
break;
case bt_reserved : /* should never happen */
default:
RETURN_ERROR(corruption_detected);
}
if (ZSTD_isError(rSize)) return rSize;
FORWARD_IF_ERROR(rSize);
RETURN_ERROR_IF(rSize > dctx->fParams.blockSizeMax, corruption_detected, "Decompressed Block Size Exceeds Maximum");
DEBUGLOG(5, "ZSTD_decompressContinue: decoded size from block : %u", (unsigned)rSize);
dctx->decodedSize += rSize;
if (dctx->fParams.checksumFlag) XXH64_update(&dctx->xxhState, dst, rSize);
dctx->previousDstEnd = (char*)dst + rSize;

/* Stay on the same stage until we are finished streaming the block. */
if (dctx->expected > 0) {
return rSize;
}

if (dctx->stage == ZSTDds_decompressLastBlock) { /* end of frame */
DEBUGLOG(4, "ZSTD_decompressContinue: decoded size from frame : %u", (unsigned)dctx->decodedSize);
@@ -977,7 +1007,6 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
} else {
dctx->stage = ZSTDds_decodeBlockHeader;
dctx->expected = ZSTD_blockHeaderSize;
dctx->previousDstEnd = (char*)dst + rSize;
}
return rSize;
}
@@ -1645,7 +1674,7 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB

case zdss_read:
DEBUGLOG(5, "stage zdss_read");
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompress(zds);
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompressWithInputSize(zds, iend - ip);
DEBUGLOG(5, "neededInSize = %u", (U32)neededInSize);
if (neededInSize==0) { /* end of frame */
zds->streamStage = zdss_init;
@@ -1673,6 +1702,8 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB
size_t const toLoad = neededInSize - zds->inPos;
int const isSkipFrame = ZSTD_isSkipFrame(zds);
size_t loadedSize;
/* At this point we shouldn't be decompressing a block that we can stream. */
assert(neededInSize == ZSTD_nextSrcSizeToDecompressWithInputSize(zds, iend - ip));
if (isSkipFrame) {
loadedSize = MIN(toLoad, (size_t)(iend-ip));
} else {
37 changes: 37 additions & 0 deletions tests/zstreamtest.c
@@ -1273,6 +1273,43 @@ static int basicUnitTests(U32 seed, double compressibility)
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : raw block can be streamed: ", testNb++);
{ size_t const inputSize = 10000;
size_t const compCapacity = ZSTD_compressBound(inputSize);
BYTE* const input = (BYTE*)malloc(inputSize);
BYTE* const comp = (BYTE*)malloc(compCapacity);
BYTE* const decomp = (BYTE*)malloc(inputSize);

CHECK(input == NULL || comp == NULL || decomp == NULL, "failed to alloc buffers");

RDG_genBuffer(input, inputSize, 0.0, 0.0, seed);
{ size_t const compSize = ZSTD_compress(comp, compCapacity, input, inputSize, -(int)inputSize);
ZSTD_inBuffer in = { comp, 0, 0 };
ZSTD_outBuffer out = { decomp, 0, 0 };
CHECK_Z(compSize);
CHECK_Z( ZSTD_DCtx_reset(zd, ZSTD_reset_session_and_parameters) );
while (in.size < compSize) {
in.size = MIN(in.size + 100, compSize);
while (in.pos < in.size) {
size_t const outPos = out.pos;
if (out.pos == out.size) {
out.size = MIN(out.size + 10, inputSize);
}
CHECK_Z( ZSTD_decompressStream(zd, &out, &in) );
CHECK(!(out.pos > outPos), "We are not streaming (no output generated)");
}
}
CHECK(in.pos != compSize, "Not all input consumed!");
CHECK(out.pos != inputSize, "Not all output produced!");
}
CHECK(memcmp(input, decomp, inputSize), "round trip failed!");

free(input);
free(comp);
free(decomp);
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : dictionary + uncompressible block + reusing tables checks offset table validity: ", testNb++);
{ ZSTD_CDict* const cdict = ZSTD_createCDict_advanced(
dictionary.start, dictionary.filled,