
Add support for parallel order-preserving Parquet write #7375


Merged: 24 commits into duckdb:master, May 7, 2023

@Mytherin Mytherin commented May 5, 2023

Follow-up from #7368 - this PR adds support for parallel order-preserving writes to Parquet files.

Writing to Parquet files is less straightforward than writing to CSV files, as we need to write data in row groups of a specific size. For CSV files we can write any batch directly to disk regardless of how large it is. For Parquet files the desired row group size must be taken into account, and it is user-configurable. This complicates parallel writing with batches, as there is no guarantee of how many rows are in a batch.

In order to facilitate this we add a new callback to the copy function:

```cpp
idx_t CopyDesiredBatchSize(ClientContext &context, FunctionData &bind_data);
```

If this callback is specified, a separate operator is executed (PhysicalFixedBatchCopy). This operator ensures prepare_batch and flush_batch are only called with approximately the specified row group size (+/- STANDARD_VECTOR_SIZE, except for the final row group, which can have fewer tuples).
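As a rough illustration of how a copy function might implement this callback, here is a minimal sketch: the `ClientContext`, `FunctionData`, and `ParquetWriteBindData` stand-ins below are simplified stand-ins for illustration only (the real classes live inside DuckDB and carry far more state), but the shape matches the signature above — the bind data carries the user-configured row group size, and the callback simply reports it.

```cpp
#include <cassert>
#include <cstdint>

using idx_t = uint64_t; // DuckDB's row-count index type

// Hypothetical stand-ins for illustration; the real ClientContext and
// FunctionData are DuckDB-internal classes with much more state.
struct ClientContext {};
struct FunctionData {
    virtual ~FunctionData() = default;
};

// Sketch of bind data for a Parquet write, holding the user-configured
// row group size (122880 is DuckDB's default).
struct ParquetWriteBindData : FunctionData {
    idx_t row_group_size = 122880;
};

// Sketch of the new callback: the fixed batch copy operator calls this to
// learn how many rows each prepared/flushed batch should contain.
idx_t ParquetCopyDesiredBatchSize(ClientContext &context, FunctionData &bind_data) {
    auto &parquet_data = static_cast<ParquetWriteBindData &>(bind_data);
    return parquet_data.row_group_size;
}
```

Because the desired size comes from the bind data, a user-supplied ROW_GROUP_SIZE flows through to the batch copy operator without any Parquet-specific logic in the operator itself.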

The way this is done is by materializing data for the separate batches, and executing a separate repartitioning step which combines or splits up data from different batches so that each batch has approximately the desired size. The batches are then prepared in parallel, and flushed in sequential order, as is the case for the standard batch copy to file.
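The repartitioning step can be sketched as follows. This is not DuckDB's actual implementation — it ignores the ±STANDARD_VECTOR_SIZE tolerance and tracks only row counts rather than materialized data — but it shows the combine/split idea: buffer incoming batches of arbitrary size, emit a batch whenever a full row group's worth of rows has accumulated, and let only the final batch fall short.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified sketch of repartitioning: incoming batches of arbitrary row
// counts are combined or split so that every emitted batch holds exactly
// `target` rows, except possibly the last one.
std::vector<size_t> RepartitionBatchSizes(const std::vector<size_t> &batch_sizes,
                                          size_t target) {
    std::vector<size_t> out;
    size_t pending = 0; // rows buffered but not yet emitted
    for (size_t rows : batch_sizes) {
        pending += rows; // combine: add this batch to the buffer
        while (pending >= target) { // split: emit full row groups
            out.push_back(target);
            pending -= target;
        }
    }
    if (pending > 0) { // the final row group may have fewer tuples
        out.push_back(pending);
    }
    return out;
}
```

For example, with ClickBench-like input batches of 100352 rows each and a 122880-row target, three input batches repartition into {122880, 122880, 55296} — which is exactly the misalignment scenario the performance note below describes.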

Performance

Performance improves significantly over the single-threaded case, but does not quite reach that of the preserve_insertion_order=false case, due to the extra materialization & repartitioning step that is required. This is particularly noticeable when the batch sizes are not aligned with the desired row group size (e.g. when reading from one Parquet file and writing to another with a different row group size, or when filters are included). Nevertheless, performance is still far better than in the single-threaded case.

| Dataset | New | preserve_insertion_order=false | 1T | Input Batch Size | ROW_GROUP_SIZE |
|---|---|---|---|---|---|
| Lineitem SF1 | 1.05s | 0.93s | 5.14s | 122880 | 122880 |
| OnTime | 1.68s | 1.05s | 7.32s | 91555 | 122880 |
| ClickBench Small | 11.08s | 7.4s | 41.12s | 100352 | 122880 |

@Mytherin Mytherin merged commit b0677fb into duckdb:master May 7, 2023
@Mytherin Mytherin deleted the parallelparquetcopytofile branch July 5, 2023 03:41