Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Encoding Parquet Columns in Parallel #4871

Merged
merged 7 commits into from
Oct 1, 2023

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Sep 27, 2023

Which issue does this PR close?

Related to: #1718
Enables: apache/datafusion#7655
closes #4871

Rationale for this change

Inspired by #4859 but exposing a slightly different API

I have confirmed this does not appear to have any impact on benchmarks, likely because it doesn't alter any of the "hot" loops

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label Sep 27, 2023
@@ -558,10 +564,25 @@ pub(crate) struct LevelInfo {

/// The maximum repetition for this leaf column
max_rep_level: i16,

/// The arrow array
array: ArrayRef,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major meat of the PR, by bundling the leaf array with the corresponding metadata, we get an API that is very amenable to parallelization

pub struct ArrowLeafColumn(ArrayLevels);

/// Computes the [`ArrowLeafColumn`] for a given potentially nested [`ArrayRef`]
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
Copy link
Contributor Author

@tustvold tustvold Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the API that allows data to be written encoded in parallel. This method takes a single array so that:

}
}
}

/// Encodes [`RecordBatch`] to a parquet row group
pub struct ArrowRowGroupWriter {
writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
struct ArrowRowGroupWriter {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer needs to be public, as this hasn't been released yet, I opted to partially revert the change in (#4850)

@tustvold
Copy link
Contributor Author

Working on adding an example

@tustvold tustvold changed the title Facilitate parallel parquet writing Support Encoding Parquet Columns in Parallel Sep 27, 2023
@@ -115,8 +115,7 @@ pub type OnCloseRowGroup<'a> = Box<
Vec<Option<ColumnIndex>>,
Vec<Option<OffsetIndex>>,
) -> Result<()>
+ 'a
+ Send,
+ 'a,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unreleased change from #4850 is no longer necessary

@devinjdangelo
Copy link
Contributor

This looks great! I will take a pass at updating apache/datafusion#7655 to use this to test it out and report back.

@devinjdangelo
Copy link
Contributor

This worked like a charm! I updated apache/datafusion#7655 to use this branch. Your example comment was very helpful as well. I believe the datafusion PR should be handling nested columns correctly now. Performance metrics are within a margin of error vs. the previous API.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really neat @tustvold and @devinjdangelo

I had some suggestions that might make the example and the APIs a little easier to work with for mere mortals like myself 😅 -- but nothing that couldn't be done as a follow on either.

I took a shot at using this API to make an example program that writes parquet in parallel (both row groups and column), both as a way to document how to use it / make it hopefully more discoverable as well as figuring it out myself.

I hope to make a follow on PR with this example and some additional documentation suggestions

///
/// // Spawn work to encode columns
/// let mut worker_iter = workers.iter_mut();
/// for (a, f) in to_write.iter().zip(&schema.fields) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know single letter variable names are concise, but I would find this much easier to follow if it had names like

Suggested change
/// for (a, f) in to_write.iter().zip(&schema.fields) {
/// for (arr, field) in to_write.iter().zip(&schema.fields) {

parquet/src/arrow/arrow_writer/mod.rs Show resolved Hide resolved
/// // Spawn work to encode columns
/// let mut worker_iter = workers.iter_mut();
/// for (a, f) in to_write.iter().zip(&schema.fields) {
/// for c in compute_leaves(f, a).unwrap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// for c in compute_leaves(f, a).unwrap() {
/// for leaves in compute_leaves(f, a).unwrap() {

/// for (handle, send) in workers {
/// drop(send); // Drop send side to signal termination
/// let (chunk, result) = handle.join().unwrap().unwrap();
/// row_group.append_column(&chunk, result).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will result in copying the chunk data, but I suppose that is inevitable -- as this code effectively encoding columns chunks into memory buffers somewhere that then need to be copied directly into destination parquet file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will just write the bytes to the Write implementation. In this case that is a Vec but if it were a File it "technically" wouldn't be a copy... FWIW this is the same as master

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry - I should have been clear that I don't see anything wrong. I was just observing / validating my understanding.

}

/// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`]
pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was playing with this API, I found this combination of (ArrowColumnChunk, ColumnCloseResult) to be awkward to use because you always need both of them, but the api requires passing them as a pair

What would you think about wrapping them in a named struct, something like this, perhaps?

EncodedRowGroup((ArrowColumnChunk, ColumnCloseResult))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldn't be usable with append_column unless you first exploded it into parts, at which point...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps there could be a light wrapper around append_column that accepts EncodedRowGroup as @alamb defined it?

It isn't a big deal, but I agree with @alamb that it is confusing at first since the consumer of this API does not really need to concern themself with either of these two structs individually. They are more internal implementation details of arrow-rs. EncodedRowGroup would be more clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to address this in a4c17a9 PTAL

@alamb
Copy link
Contributor

alamb commented Sep 28, 2023

Can we also implement Debug for ArrowColumnWriter?

93 | #[derive(Debug, Clone)]
   |          ----- in this derive macro expansion
...
98 |     col_writers: Vec<ArrowColumnWriter>
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `ArrowColumnWriter` cannot be formatted using `{:?}` because it doesn't implement `Debug`
   |

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants