
Adaptive compression #1327

Merged — 29 commits, merged Sep 26, 2018

Conversation

@Cyan4973 (Contributor) commented Sep 20, 2018

This patch brings adaptive compression level driven by I/O conditions,
enabled by the command --adapt.
This is a long-awaited feature request (#75).

This is a pure CLI feature, implemented in fileio.c,
though it also required creating or updating some reporting functions inside the library
to properly track MT progress.

Some notes on current behavior:

  • --adapt is currently incompatible with --single-thread mode: since it is based on comparing I/O speed with compression speed, both operations must run in parallel, hence at least 1 worker (the default mode)
  • In single-worker mode, adaptation works well, in both directions,
    • but adaptation is very slow when the compression level is very high
  • In multi-worker mode, accelerating works fine, as does lightly correcting speed when close to cruise regime,
    • however, when starting from a much too slow speed, it is not able to "correct" that and remains stuck at low speed

So it's far from perfect.

The issue is, I suspect a major engine update would be necessary to improve this behavior.

The major issues facing the current mechanism are latency and I/O bursts.

To smooth I/O bursts, adaptation only happens when at least one job is completed.
But one job is 32 MB by default, so at level 19, it's very slow.

Even then, it takes time before the order to accelerate goes through the pipeline and takes effect, due to existing jobs, stacked buffers and decisions. And during that time, statistics keep being collected as if they were measuring the speed of the new parameters, resulting in over-correction.
So, to tame that, correction is limited to one level per job.

Both issues combined cannot be compensated well at very low speeds, hence the problem with acceleration when starting from a speed that is too low.
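The "one level per job" correction described above can be sketched as follows (illustrative names, not the actual fileio.c code): on each completed job, speeds are compared and the level moves by at most one step, so in-flight jobs compressed with the old parameters cannot trigger an over-correction.

```c
/* Sketch of the per-job correction: lower level = faster compression.
 * If compression is the bottleneck, step the level down (speed up);
 * if I/O is the bottleneck, step it up (compress harder).
 * The step is clamped to 1 so stale measurements can't overshoot. */
static int adjustLevelOnJobDone(int currentLevel,
                                double compressionSpeed, /* MB/s since last job */
                                double ioSpeed)          /* MB/s since last job */
{
    if (compressionSpeed < ioSpeed) return currentLevel - 1; /* too slow: speed up */
    if (compressionSpeed > ioSpeed) return currentLevel + 1; /* headroom: compress more */
    return currentLevel;
}
```

Because one job is 32 MB, this is also why adaptation is slow at level 19: the loop only gets one step per completed job.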

A better engine would need to:

  • ensure its new parameters are applied immediately, or almost immediately,
    • if necessary by cancelling or shortening prior jobs stacked in the queue
  • ensure it knows which parameters it is measuring
  • compensate for the "queue of running jobs" in multi-threading mode, typically by measuring only the speed of the last job and extrapolating from there
  • even then, it will suffer from I/O bursts, and will require some smoothing capability
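The smoothing capability mentioned in the last bullet could, for instance, be an exponential moving average over speed samples (a sketch under that assumption, not the shipped code):

```c
/* One way to damp I/O bursts: track speed as an exponential moving
 * average, so a single fast or slow interval only nudges the estimate
 * instead of dominating it. alpha in (0,1] controls reactivity. */
static double smoothSpeed(double ema, double sample, double alpha)
{
    if (ema == 0.0) return sample;  /* first measurement: seed the average */
    return alpha * sample + (1.0 - alpha) * ema;
}
```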

These modifications look complex, and I'm not sure they can be completed in time for the next release.

So this leaves us with 2 possibilities:

  • merge the feature as it is today, warn about its limitations, improve later
  • do not merge now, improve the feature first

I suspect solution 1 is preferable.
The current algorithm nonetheless works correctly as long as the target speed is "fast enough", which should be the case for most datacenter connections.
It will serve some use cases, provide some insights, and give us opportunities to learn how to improve the feature more accurately.

Possible follow-up:

  • set a min and max level, to ensure the adaptation mechanism does not drift too far in either direction.

Notes on tests:

  • while the new command --adapt can be added to the test suite, verifying that it indeed adapts properly is in another complexity league. Such tests are currently done manually, typically using pv and tests/rateLimiter.py.
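The "no catch-up" behavior that makes tests/rateLimiter.py preferable to pv for these manual tests can be sketched as a budget whose accumulated credit is capped (illustrative C, not the actual Python script):

```c
/* Sketch of a no-catch-up rate budget: a classic token bucket grants
 * rate * elapsed bytes, so after a stall the consumer "catches up" in a
 * burst. Capping the accumulated credit to one slice keeps the observed
 * speed at the cap instead, which is what the adaptation test needs. */
static unsigned long long creditBytes(unsigned long long bytesPerSec,
                                      double elapsedSec,
                                      double maxSliceSec)
{
    double credit = (double)bytesPerSec * elapsedSec;
    double cap    = (double)bytesPerSec * maxSliceSec;
    if (credit > cap) credit = cap;   /* forget the stalled period */
    return (unsigned long long)credit;
}
```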

Commit notes:

  • this rate limiter avoids the problem of `pv`, which "catches up" after a blocked period instead of preserving a constant speed cap; it resists better to changing in/out conditions and limits risks of "catching up"
  • safer: this parameter is read-only; we don't want the original structure to be modified
  • only slows down when all buffers are full
  • newly added ZSTD_toFlushNow() tells in a non-blocking way if there is something ready to flush right now; only works with multi-threading for the time being; useful to know if flush speed will be limited by lack of production
  • --adapt can now work on multiple input files too; added an `--adapt` test in `playTests.sh`
  • when job->consumed == job->src.size, the compression job is presumed completed, so this must be the very last action done in the worker thread
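The ordering constraint in that last note can be sketched like this (simplified types and plain pthreads instead of zstd's ZSTD_PTHREAD wrappers): observers treat consumed == src.size as "job finished", so every other result field must be published before the final consumed update.

```c
#include <pthread.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t mutex;
    size_t consumed;  /* consumed == srcSize is read as "job completed" */
    size_t srcSize;
    size_t cSize;     /* compressed size result */
} Job;

/* Publish results first; the completion signal must be the very last
 * write, under the job mutex, or a reader could see a "completed" job
 * whose cSize is not yet set. */
static void workerFinishJob(Job* job, size_t cSize)
{
    pthread_mutex_lock(&job->mutex);
    job->cSize = cSize;             /* results before the signal */
    job->consumed = job->srcSize;   /* completion signal: last action */
    pthread_mutex_unlock(&job->mutex);
}
```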
@@ -711,7 +713,12 @@ void ZSTDMT_compressionJob(void* jobDescription)
assert(job->cSize == 0);
for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
if (ZSTD_isError(cSize)) {
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
Contributor:
There are quite a few more places that set job->cSize without a lock. Maybe make a helper function to lock, set the csize, and unlock.
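The helper this review asks for might look like the following (hypothetical name, plain pthreads standing in for the ZSTD_PTHREAD_MUTEX_LOCK wrapper used in zstdmt_compress.c):

```c
#include <pthread.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t job_mutex;
    size_t cSize;
} JobDesc;

/* Hypothetical helper wrapping the lock / set cSize / unlock pattern,
 * so every error path and success path updates cSize the same safe way. */
static void setJobCSize(JobDesc* job, size_t cSize)
{
    pthread_mutex_lock(&job->job_mutex);
    job->cSize = cSize;
    pthread_mutex_unlock(&job->job_mutex);
}
```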

@@ -1615,6 +1657,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
DEBUGLOG(5, "dstBuffer released")
Contributor:
Missing a semicolon.

@@ -1834,8 +1880,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* It is only possible for this operation to fail if there are
* still compression jobs ongoing.
*/
DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed")
Contributor:
Semicolon.

assert(zfp.produced >= previous_zfp_update.produced);
assert(g_nbWorkers >= 1);

if ( (zfp.ingested == previous_zfp_update.ingested) /* no data read : input buffer full */
Contributor:
Doesn't this condition suggest that compression is too slow? When would this happen if compression is too fast?

@Cyan4973 (Contributor, Author) replied Sep 21, 2018:

In the general case, yes: if ingestion is blocked, it means input is faster than compression can consume.

But this case is a bit different: we are actually testing whether both compression and input are blocked because output speed is too slow, resulting in all buffers being full.

In that case, it doesn't matter that input is faster: compression should nonetheless slow down in an attempt to match output speed.

I shall add a comment to explain the nature of this test.
Also, thinking about it again, I may not need to check the ingestion rate. The test could work with just the consumption rate and nbActiveWorkers (which is a recently added indicator).
I will run more tests to check that hypothesis.

@Cyan4973 (Contributor, Author):

I now remember that controlling the ingestion rate was meant to cover the initial phase,
when the compressor is waiting for at least one job to be filled,
and cannot even start as long as the first buffer is not completely filled.

In such a case, nbActiveWorkers == 0 and consumptionRate == 0. But that doesn't mean output speed is too slow. It's actually the reverse situation: input speed is too slow.

Note that the outcome is still the same: compression must slow down.

But input speed is too slow compared to what?
While waiting for the first job, it's only compared to the notification update rate.
We don't know yet if input speed is too slow compared to compression speed.
If the initial compression level is set to 19, it might very well be fast enough.
It's just that we haven't yet had a chance to compare both.

So, in order to avoid triggering a "slow down" order while waiting for the first job to fill up, I added a test looking at the ingestion rate.
Since then, the test order has been changed, and this test is now behind an if (zfp.currentJobID > 1) condition, which means the first job has long been started.

Now, a situation where all buffers are empty, causing compression to wait for more input, may still happen in the future: we could imagine a sudden prolonged drop in input speed. In that case, measuring consumptionRate and nbActiveWorkers will trigger this condition.
But this time, it really is because input is too slow. So we can slow down compression.

Hence this test can be simplified by removing the ingestionRate part.
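The simplified stall test described above might look like this (assumed shape, using a subset of the ZSTD_frameProgression fields discussed in the thread):

```c
#include <stddef.h>

typedef struct {   /* subset of the progression fields relevant here */
    unsigned long long consumed;
    unsigned nbActiveWorkers;
} Progress;

/* Simplified check: no bytes compressed since the last update AND no
 * worker currently running. Whether input or output is the bottleneck,
 * the reaction is the same: lower the compression level. */
static int compressionStalled(Progress now, Progress prev)
{
    return (now.consumed == prev.consumed) && (now.nbActiveWorkers == 0);
}
```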


if ( (zfp.ingested == previous_zfp_update.ingested) /* no data read : input buffer full */
&& (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
Contributor:

Do we need the previous 2 checks, or is this one sufficient?

@Cyan4973 (Contributor, Author):

I believe it's useful to check consumptionRate.

zfp.nbActiveWorkers is the current number of active workers, at the moment the update is done.
It doesn't prove that "no compression happened" since the last update.
Maybe some compression happened and completed, and at the exact moment of the update, there are no active workers.

Now, one could argue that the mere fact that nbActiveWorkers == 0, even for a short period, is grounds for a speed reduction, as it proves that compression is currently "waiting for something" (either input or output).

That's correct. In this context, ensuring that, on top of that, no compression actually happened since the last update acts as a kind of confirmation signal, reducing over-reactions.

Both strategies can be defended. I just selected the one that is a bit more cautious about taking this decision.

@Cyan4973 (Contributor, Author) commented Sep 25, 2018

Added adaptation limits, which can be selected through the command --adapt=min=#,max=#

Commit note: can supply min and max compression level through the advanced command
--adapt=min=#,max=#
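The bounds added by --adapt=min=#,max=# presumably amount to a clamp applied after each adaptation step (a sketch, not the actual fileio.c code):

```c
/* Whatever the feedback loop decides, the resulting compression level is
 * clamped into the user-supplied [minLevel, maxLevel] window, so the
 * mechanism cannot drift too far in either direction. */
static int clampLevel(int level, int minLevel, int maxLevel)
{
    if (level < minLevel) return minLevel;
    if (level > maxLevel) return maxLevel;
    return level;
}
```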
@X-Ryl669

I think you should add a generation counter that clocks everything. When you trigger an adaptation, you have to record the current generation in the request. If the request's generation is behind the global generation, then the adaptation is stale and you skip it. When the compression job takes a decision, it has to ensure the decision is current, that is, has the same generation as the global generation.
If it does, it increments the global generation. That way, whatever the timing of the different threads, you'll be at worst one step late, so you'll adapt in all cases.
This can be done completely atomically, without stalling anything, and should solve the issues you've mentioned.
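The generation-counter idea above can be sketched with C11 atomics (an illustration of the suggestion, not merged code; names are hypothetical):

```c
#include <stdatomic.h>

/* Each adaptation request records the generation it was issued under.
 * A worker only applies a request if that generation is still current,
 * then bumps the counter, so requests based on stale conditions are
 * discarded without any locking. */
static atomic_uint g_generation;   /* zero-initialized global clock */

typedef struct {
    unsigned generation;   /* generation observed when the request was made */
    int newLevel;          /* level the request wants to switch to */
} AdaptRequest;

/* returns 1 if the request was applied, 0 if it was stale */
static int tryApplyAdaptation(AdaptRequest req, int* level)
{
    unsigned expected = req.generation;
    /* succeeds only if no other adaptation happened since the request */
    if (!atomic_compare_exchange_strong(&g_generation, &expected, expected + 1))
        return 0;   /* stale: another adaptation won the race */
    *level = req.newLevel;
    return 1;
}
```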

@terrelln (Contributor):

I agree with landing it as-is. It would be nice if we adjusted down from high levels faster, but I think this will be a useful feature without it. If adjusting down is critical, then releasing will help us get that feedback.

I ran some tests to make sure that we are adapting to about the right compression level:

zstd -cq -X some-lage.file | zstd -dq | zstd --adapt -vo /dev/null
zstd --adapt -v -c | zstd -dq | zstd -cq -11 -o /dev/null

I'm very happy with the results when the target level isn't too high (say < 10), but once the target level is > 10, the adaptive compression ends up stabilizing a bit lower than the target level (7-9 for level 12). It could be an artifact of the test, such as the input being bursty, but that is expected in the real world. This just shows that there is room for improvement, which we already know.

4 participants