
Move parallel parquet serialization to blocking threads #9605

Conversation

devinjdangelo
Contributor

Which issue does this PR close?

Related to #9493

Rationale for this change

Serialization is CPU intensive and could starve the object store writer futures, resulting in failed or timed-out writes.

I believe that all parts of parallel parquet writing apart from concatenate_parallel_row_groups (which does the object store put) could be moved to a sync/blocking thread. However, I think serialization is the only step CPU-intensive enough between calls to .await to potentially cause issues.

What changes are included in this PR?

Moves parquet column serialization to blocking threads via the spawn_blocking method.
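
For illustration, here is a minimal sketch of the spawn_blocking pattern this change applies. The encode_column function is a hypothetical stand-in for the real parquet column encoder, not DataFusion's actual API:

```rust
use tokio::task::spawn_blocking;

// Hypothetical stand-in for the CPU-heavy parquet column encoding step.
fn encode_column(values: Vec<i64>) -> Vec<u8> {
    values.iter().flat_map(|v| v.to_le_bytes()).collect()
}

async fn serialize_column(values: Vec<i64>) -> Vec<u8> {
    // spawn_blocking moves the CPU-bound encoding onto tokio's blocking pool,
    // so IO futures on the async worker threads keep getting polled promptly.
    spawn_blocking(move || encode_column(values))
        .await
        .expect("blocking serialization task panicked")
}

#[tokio::main]
async fn main() {
    let bytes = serialize_column((0..1024i64).collect()).await;
    println!("encoded {} bytes", bytes.len());
}
```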

Are these changes tested?

Yes, by existing tests

Are there any user-facing changes?

No

@github-actions bot added the core (Core DataFusion crate) label on Mar 14, 2024
@devinjdangelo changed the title from "Move parallel parquet serialization to blocking threadpool" to "Move parallel parquet serialization to blocking threads" on Mar 14, 2024
@tustvold
Contributor

So yes, in an ideal world all CPU computation would be spawned to rayon or a similar blocking threadpool, as in this PR. Unfortunately, however, this isn't the way DF has been implemented.

Instead, the compromise we use in IOx is to spawn the IO off to a separate runtime and accept that the runtime DF is running in will not have good poll latencies.

Therefore, rather than spawning the CPU-bound work, we instead want to spawn the IO of the multipart upload.
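
For context, a rough sketch of that alternative (illustrative names only, not the actual IOx or DataFusion code): keep the CPU-bound work on the runtime DataFusion executes on, and ship the upload IO to a dedicated tokio runtime whose workers are never blocked by encoding.

```rust
use std::sync::OnceLock;
use tokio::runtime::{Builder, Runtime};

// Lazily-built runtime reserved for IO; CPU-heavy plan execution stays on the
// runtime DataFusion is already running in.
fn io_runtime() -> &'static Runtime {
    static IO_RUNTIME: OnceLock<Runtime> = OnceLock::new();
    IO_RUNTIME.get_or_init(|| {
        Builder::new_multi_thread()
            .worker_threads(2)
            .thread_name("io-runtime")
            .enable_all()
            .build()
            .expect("failed to build IO runtime")
    })
}

// Hypothetical wrapper: push one part of a multipart upload onto the IO
// runtime and await only its completion from the CPU runtime.
async fn upload_part(part: Vec<u8>) {
    io_runtime()
        .spawn(async move {
            // A real implementation would call the object_store multipart
            // upload here; this sketch just simulates the IO wait.
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
            drop(part);
        })
        .await
        .expect("upload task panicked");
}

#[tokio::main]
async fn main() {
    upload_part(vec![0u8; 1024]).await;
    println!("part uploaded");
}
```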

@alamb
Contributor

alamb commented Mar 14, 2024

> So yes, in an ideal world all CPU computation would be spawned to rayon or a similar blocking threadpool, as in this PR. Unfortunately, however, this isn't the way DF has been implemented.

I will continue to agree to disagree on this point. But I think @tustvold may be trolling me 🤔 :)

Anyhow, for the record, I think using 2 separate tokio runtimes is perfectly acceptable and good, for reasons I have soapboxed about at length. However, using tokio for CPU-bound threads means it is very easy to schedule CPU and IO on the same thread pool, which will become a bottleneck in some scenarios.

So in the sense that using a different threadpool API would make it impossible to mix IO and CPU work, it would be an improvement. However, I think it would be a heavy price to pay.
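
As a toy illustration of that bottleneck (not DataFusion code): on a single-threaded runtime, CPU work run inline on the async thread delays a pending IO-style future well past its expected wake-up time.

```rust
use std::time::{Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = Instant::now();
    let io_task = tokio::spawn(async move {
        // Stand-in for an IO future, e.g. one poll of a multipart upload.
        tokio::time::sleep(Duration::from_millis(10)).await;
        println!("IO-style task finished after {:?}", start.elapsed());
    });

    // CPU-bound work run inline on the async thread (the anti-pattern):
    // the spawned task above cannot make progress until this loop yields,
    // so the printed latency is far greater than 10ms.
    let mut acc: u64 = 0;
    for i in 0..500_000_000u64 {
        acc = acc.wrapping_add(i);
    }
    println!("CPU work done: {acc}");

    io_task.await.unwrap();
}
```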

@alamb
Contributor

alamb commented Mar 14, 2024

> Therefore, rather than spawning the CPU-bound work, we instead want to spawn the IO of the multipart upload.

I agree with this assessment -- basically, the threadpool running the DataFusion plan should be doing CPU work and ideally not also IO work.

@devinjdangelo
Contributor Author

devinjdangelo commented Mar 14, 2024

> I agree with this assessment -- basically, the threadpool running the DataFusion plan should be doing CPU work and ideally not also IO work.

I understand the context here for influxdb, but it would also be interesting to have a deeper discussion on this in the context of DataFusion as a standalone execution engine. That is, should we be doing anything differently to make sure users of datafusion-cli running a query like

COPY (select * from 's3://bucket/table') to 's3://bucket/parquet.file'

won't run into poll latency issues reading/writing from remote object stores. Perhaps one of two things is true:

1. Poll latency actually isn't that big of an issue for multipart object store writes. The batch sizes are small enough that the time between .awaits will not negatively impact a streaming multipart write workload.
2. Poll latency does cause unpredictable job failure, and DataFusion should perhaps manage two tokio runtimes itself for IO/CPU or make more consistent use of spawn_blocking.

I have tested queries like the above myself (albeit unscientifically) and have not run into any issues. It may be a good idea to test this more thoroughly.

@tustvold
Contributor

tustvold commented Mar 14, 2024

> Poll latency does cause unpredictable job failure, and DataFusion should perhaps manage two tokio runtimes itself for IO/CPU or make more consistent use of spawn_blocking.

Certainly the lancedb users have reported issues related to this (apache/arrow-rs#5366), and we have seen similar issues on the read side in IOx in the past, before we split out to use a separate runtime.

I suspect in most cases it will just limit throughput; it would take a very contended system for it to result in an actual failure.
