Skip to content
Permalink
Browse files

[libzstd] Fix ZSTD_compress2() for multithreaded compression

`ZSTD_compress2()` wouldn't wait for multithreaded compression to
finish. We didn't find this because ZSTDMT will block when it can
compress all in one go, but it can't do that if it doesn't have enough
output space, or if `ZSTD_c_rsyncable` is enabled.

Since we will already sometimes block when using `ZSTD_e_end`, I've
changed `ZSTD_e_end` and `ZSTD_e_flush` to guarantee maximum forward
progress. This simplifies the API, and helps users avoid the easy bug
that was made in `ZSTD_compress2()`

* Found by the libfuzzer fuzzers.
* Added a test case that catches the problem.
* I will make the fuzzers sometimes allocate less than
  `ZSTD_compressBound()` output space.
  • Loading branch information...
terrelln committed Apr 9, 2019
1 parent 7a1fde2 commit 48a6427d22f290157b8acc3f7c03c0f762a768be
Showing with 40 additions and 6 deletions.
  1. +14 −4 lib/compress/zstd_compress.c
  2. +13 −2 lib/zstd.h
  3. +13 −0 tests/fuzzer.c
@@ -4179,18 +4179,28 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
/* compression stage */
#ifdef ZSTD_MULTITHREAD
if (cctx->appliedParams.nbWorkers > 0) {
int const forceMaxProgress = (endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
size_t flushMin;
assert(forceMaxProgress || endOp == ZSTD_e_continue /* Protection for a new flush type */);
if (cctx->cParamsChanged) {
ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
cctx->cParamsChanged = 0;
}
{ size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
do {
flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
}
DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
return flushMin;
} }
FORWARD_IF_ERROR(flushMin);
} while (forceMaxProgress && flushMin != 0 && output->pos < output->size);
DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
/* Either we don't require maximum forward progress, we've finished the
* flush, or we are out of output space.
*/
assert(!forceMaxProgress || flushMin == 0 || output->pos == output->size);
return flushMin;
}
#endif
FORWARD_IF_ERROR( ZSTD_compressStream_generic(cctx, output, input, endOp) );
DEBUGLOG(5, "completed ZSTD_compressStream2");
@@ -577,6 +577,11 @@ typedef struct ZSTD_outBuffer_s {
* The caller must check if input has been entirely consumed.
* If not, the caller must make some room to receive more compressed data,
* and then present again remaining input data.
* note: ZSTD_e_continue is guaranteed to make some forward progress when called,
* but doesn't guarantee maximal forward progress. This is especially relevant
* when compressing with multiple threads. The call won't block if it can
* consume some input, but if it can't it will wait for some, but not all,
* output to be flushed.
* @return : provides a minimum amount of data remaining to be flushed from internal buffers
* or an error code, which can be tested using ZSTD_isError().
*
@@ -586,6 +591,8 @@ typedef struct ZSTD_outBuffer_s {
* In which case, make some room to receive more compressed data, and call again ZSTD_compressStream2() with ZSTD_e_flush.
* You must continue calling ZSTD_compressStream2() with ZSTD_e_flush until it returns 0, at which point you can change the
* operation.
* note: ZSTD_e_flush will flush as much output as possible, meaning when compressing with multiple threads, it will
* block until the flush is complete or the output buffer is full.
* @return : 0 if internal buffers are entirely flushed,
* >0 if some data still present within internal buffer (the value is minimal estimation of remaining size),
* or an error code, which can be tested using ZSTD_isError().
@@ -596,6 +603,8 @@ typedef struct ZSTD_outBuffer_s {
* flush operation is the same, and follows same rules as calling ZSTD_compressStream2() with ZSTD_e_flush.
* You must continue calling ZSTD_compressStream2() with ZSTD_e_end until it returns 0, at which point you are free to
* start a new frame.
* note: ZSTD_e_end will flush as much output as possible, meaning when compressing with multiple threads, it will
* block until the flush is complete or the output buffer is full.
* @return : 0 if frame fully completed and fully flushed,
* >0 if some data still present within internal buffer (the value is minimal estimation of remaining size),
* or an error code, which can be tested using ZSTD_isError().
@@ -613,11 +622,13 @@ typedef enum {
ZSTD_e_continue=0, /* collect more data, encoder decides when to output compressed result, for optimal compression ratio */
ZSTD_e_flush=1, /* flush any data provided so far,
* it creates (at least) one new block, that can be decoded immediately on reception;
* frame will continue: any future data can still reference previously compressed data, improving compression. */
* frame will continue: any future data can still reference previously compressed data, improving compression.
* note : multithreaded compression will block to flush as much output as possible. */
ZSTD_e_end=2 /* flush any remaining data _and_ close current frame.
* note that frame is only closed after compressed data is fully flushed (return value == 0).
* After that point, any additional data starts a new frame.
* note : each frame is independent (does not reference any content from previous frame). */
* note : each frame is independent (does not reference any content from previous frame).
: note : multithreaded compression will block to flush as much output as possible. */
} ZSTD_EndDirective;

/*! ZSTD_compressStream2() :
@@ -880,6 +880,19 @@ static int basicUnitTests(U32 seed, double compressibility)
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : Multithreaded ZSTD_compress2() with rsyncable : ", testNb++)
{ ZSTD_CCtx* cctx = ZSTD_createCCtx();
/* Set rsyncable and don't give the ZSTD_compressBound(CNBuffSize) so
* ZSTDMT is forced to not take the shortcut.
*/
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 1) );
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 1) );
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_c_rsyncable, 1) );
CHECK( ZSTD_compress2(cctx, compressedBuffer, compressedBufferSize - 1, CNBuffer, CNBuffSize) );
ZSTD_freeCCtx(cctx);
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : setting multithreaded parameters : ", testNb++)
{ ZSTD_CCtx_params* params = ZSTD_createCCtxParams();
int value;

0 comments on commit 48a6427

Please sign in to comment.
You can’t perform that action at this time.