Add ParquetMetadataWriter
allow ad-hoc encoding of ParquetMetadata
#6000
base: 53.0.0-dev
Conversation
encode_metadata
function to mirror decode_metadata
and allow ad-hoc encoding of ParquetMetadata
Thanks @adriangb -- this PR looks good to me and I think we could proceed with this design.
I did file #6002 to track a potentially more flexible API that I think is worth considering. However, adding this API to mirror decode_metadata I think would also be fine (and we could make a more complex API later)
parquet/src/file/footer.rs
Outdated
let encoded = encode_metadata(&metadata).unwrap();
let decoded = decode_metadata(&encoded).unwrap();
assert_eq!(
Can you simply just assert that encoded == decoded?
parquet/src/file/footer.rs
Outdated
{
    assert_eq!(a, b);
}
// TODO: add encoding and decoding of column and offset indexes (aka page indexes)
I agree that encoding/decoding of these structures doesn't have to be present in the initial PR, however given they are stored out of line / slightly differently than the other structures I think it would be good to ensure we could encode them using this same API
parquet/src/file/footer.rs
Outdated
/// specified by the [Parquet Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn encode_metadata(metadata: &ParquetMetaData) -> Result<Vec<u8>> {
Is it possible to switch the existing writers to use this API as well? Not only would that avoid code duplication, it would ensure the API is general enough
For example, I wonder if it would make sense for this function signature to be more like
/// Write the metadata to the target `std::io::Write`, returning the number of bytes written
pub fn encode_metadata<W: Write>(metadata: &ParquetMetaData, writer: W) -> Result<usize> {
    ...
}
That would allow writing into a Vec but also allow writing into various other targets and perhaps avoid buffering
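To make the suggestion concrete, here is a minimal sketch of a function that is generic over `std::io::Write`. `FooterStandIn` and the byte layout are hypothetical stand-ins chosen for illustration; they are not the `ParquetMetaData` type or the Thrift footer format, and the signature is only one possible shape for such an API.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for `ParquetMetaData`; the real type is far richer.
struct FooterStandIn {
    num_rows: i64,
    created_by: String,
}

/// Write a toy encoding of `meta` to any `Write` target and return the
/// number of bytes written. This is NOT the Thrift footer format.
fn encode_metadata<W: Write>(meta: &FooterStandIn, mut writer: W) -> io::Result<usize> {
    let created_by = meta.created_by.as_bytes();
    // Fixed-width row count, then a length-prefixed string.
    writer.write_all(&meta.num_rows.to_le_bytes())?;
    writer.write_all(&(created_by.len() as u32).to_le_bytes())?;
    writer.write_all(created_by)?;
    Ok(8 + 4 + created_by.len())
}

/// Encode into an in-memory buffer. A `File` or `io::sink()` can be passed
/// to `encode_metadata` in exactly the same way.
fn encode_to_vec(meta: &FooterStandIn) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    encode_metadata(meta, &mut buf)?;
    Ok(buf)
}
```

Because `&mut Vec<u8>` implements `Write`, the `Vec<u8>`-returning form can be a thin wrapper over the generic one, so both call styles could coexist.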
@alamb I pushed a fluentish API version of this. I got bogged down implementing the page index writing because there doesn't seem to be a clean path to go from a I do think the readers could be merged. For this encoder to make sense I think it should have an option to handle page indexes and have it enabled and working by default (like the writers do).
One thing I can do to avoid blocking on my lack of knowledge of encoding the page index stuff is to design the API first and implement it later. E.g. we can add
Thanks @adriangb -- I will try and review this PR today
Working through the list of PRs in arrow-rs is on my list of things to do tomorrow
Thanks @adriangb -- this is looking like a good start
I think we should try and structure the code so the existing writer uses this new MetadataEncoder, which would keep metadata writing consistent as well as enable usecases like encoding bloom filters, etc.
Let me know what you think.
cc @sunchao @tustvold @Jefffrey @liukun4515 @nevi-me for any thoughts you might have on this API / approach
@@ -86,7 +86,7 @@ pub type ParquetOffsetIndex = Vec<Vec<Vec<PageLocation>>>;
 ///
 /// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
 /// [`parse_metadata`]: crate::file::footer::parse_metadata
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
👍
parquet/src/file/footer.rs
Outdated
let column_orders = encode_column_orders(metadata.file_metadata().column_orders());
let schema = types::to_thrift(&metadata.file_metadata().schema().clone())?;

let t_file_metadata = TFileMetaData {
I noticed that this is not quite the same code as used in the actual writer (specifically, the way column order is handled is not the same), so I worry it would be inconsistent or drift over time from the actual writer
arrow-rs/parquet/src/file/writer.rs
Lines 352 to 375 in 22e0b44
// We only include ColumnOrder for leaf nodes.
// Currently only supported ColumnOrder is TypeDefinedOrder so we set this
// for all leaf nodes.
// Even if the column has an undefined sort order, such as INTERVAL, this
// is still technically the defined TYPEORDER so it should still be set.
let column_orders = (0..self.schema_descr().num_columns())
    .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {}))
    .collect();
// This field is optional, perhaps in cases where no min/max fields are set
// in any Statistics or ColumnIndex object in the whole file.
// But for simplicity we always set this field.
let column_orders = Some(column_orders);
let file_metadata = parquet::FileMetaData {
    num_rows,
    row_groups,
    key_value_metadata,
    version: self.props.writer_version().as_num(),
    schema: types::to_thrift(self.schema.as_ref())?,
    created_by: Some(self.props.created_by().to_owned()),
    column_orders,
    encryption_algorithm: None,
    footer_signing_key_metadata: None,
};
Thus what I suggest we do here is change writer.rs to use the ParquetMetadataEncoder
and refactor the code from there into this function. That would be a bit more involved but I think would set us up nicely so that metadata encoding remains consistent.
I completely agree. That's just a much bigger chunk to bite off. I can give it a shot but I may need support to get there.
afa975d
to
d7a4156
Compare
I've made some progress. I made a (very rough) metadata writer that is used internally by
I think this is looking quite nice @adriangb and I think we should try and proceed with this approach.
I think it would be easier to make progress if we can work on the approach incrementally as multiple smaller PRs rather than one large one (it will be easier for me to give you timely feedback)
Also, it is probably good to know of #5486 from @etseidl which could conflict as we change the metadata.
Given we are now being careful about breaking changes (see https://github.com/apache/arrow-rs/blob/master/CONTRIBUTING.md#breaking-changes) I am worried that these PRs will interact / cause conflicts with each other
What do you think of this idea: #6050 ?
    Some(self.props.created_by().to_string()),
    self.props.writer_version().as_num(),
);
encoder.finish()
👍
parquet/src/file/writer.rs
Outdated
let mut row_groups = self
    .row_groups
    .as_slice()
    .iter()
    .map(|v| v.to_thrift())
    .collect::<Vec<_>>();

self.write_bloom_filters(&mut row_groups)?;
FWIW #5933 also contains changes for bloom filter writing
@@ -791,23 +710,274 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
    }
}

struct ThriftMetadataWriter<'a, W: Write> {
I always get confused when reading the parquet code between what are the generated Thrift structures from the structures in https://docs.rs/parquet/latest/parquet/file/metadata/index.html
I like how you have split out writing of the thrift structures here from the writing of the parquet::file structures
    Ok(())
}

fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
I was looking around for another copy of this code and I now see that this is the first time we are going from Index --> ColumnIndex
Makes sense to me. I think this type of structure could really help clean up some of the tests too (but I am getting ahead of myself)
parquet/src/file/writer.rs
Outdated
let file_metadata = parquet::FileMetaData {
    num_rows,
let encoder = ThriftMetadataWriter::new(
This might read nicer like this:
let encoder = ThriftMetadataWriter::new()
    .with_schema(&self.schema)
    .with_descr(&self.descr)
    .with_row_groups(row_groups)
    ...;
// encode the data to buf
encoder.encode(&mut buf)
Though I realize many of these fields are required
Maybe something like
let encoder = ThriftMetadataWriter::new(
    &self.schema,
    &self.descr,
    ...
)
.with_column_indexes(&self.column_indexes)
.with_offset_indexes(&self.offset_indexes);
encoder.encode(&mut buf)
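The second shape (required inputs in `new`, optional page indexes through `with_*` methods) can be sketched in isolation as follows. `MetadataWriterSketch`, its fields, and the toy byte layout are all hypothetical and exist only to show the calling pattern; they do not reflect the actual `ThriftMetadataWriter` in this PR.

```rust
/// Hypothetical, simplified writer: required inputs go through `new`,
/// optional page indexes through `with_*` methods.
struct MetadataWriterSketch<'a> {
    schema: &'a str,
    row_groups: Vec<u64>,
    column_indexes: Option<&'a [u8]>,
    offset_indexes: Option<&'a [u8]>,
}

impl<'a> MetadataWriterSketch<'a> {
    /// Everything the footer cannot be written without is a constructor argument.
    fn new(schema: &'a str, row_groups: Vec<u64>) -> Self {
        Self { schema, row_groups, column_indexes: None, offset_indexes: None }
    }

    /// Optional: attach column indexes.
    fn with_column_indexes(mut self, indexes: &'a [u8]) -> Self {
        self.column_indexes = Some(indexes);
        self
    }

    /// Optional: attach offset indexes.
    fn with_offset_indexes(mut self, indexes: &'a [u8]) -> Self {
        self.offset_indexes = Some(indexes);
        self
    }

    /// Toy encoding: any indexes first, then the schema bytes, then the
    /// row group count. Returns the number of bytes appended to `buf`.
    fn encode(&self, buf: &mut Vec<u8>) -> usize {
        let start = buf.len();
        if let Some(ci) = self.column_indexes {
            buf.extend_from_slice(ci);
        }
        if let Some(oi) = self.offset_indexes {
            buf.extend_from_slice(oi);
        }
        buf.extend_from_slice(self.schema.as_bytes());
        buf.push(self.row_groups.len() as u8);
        buf.len() - start
    }
}
```

With this split, a caller cannot forget a required field (it will not compile), while callers that have no page indexes simply skip the `with_*` calls.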
parquet/src/file/writer.rs
Outdated
if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
    (0..self.metadata.row_groups().len())
        .map(|rg_idx| {
            let column_indexes = &row_group_offset_indexes[rg_idx];
Minor nit: could this be named offset_indexes?
parquet/src/file/page_index/index.rs
Outdated
let null_counts = self
    .indexes
    .iter()
    .map(|x| x.null_count())
    .collect::<Option<Vec<_>>>()
    .unwrap_or_else(|| vec![0; self.indexes.len()]);
While merging with #5486, I noticed this. IIUC, if on read the optional thrift ColumnIndex::null_counts is not present, then the PageIndex::null_count will be None. When converting back to a thrift ColumnIndex, it appears that this will convert the missing null_counts into a vector of num_pages zeros. I don't know if this is the correct behavior, mostly because the spec is (AFAICT) silent on the interpretation of a non-present null_counts. Is it not present as an optimization when there are no nulls, or is it not present due to a lack of information (say a V1 encoder doesn't keep null counts since the V1 page header doesn't require them)? Due to that ambiguity I think null_counts here should be None if any or all of the PageIndex::null_count fields is None. Perhaps stop after the collect() and pass null_counts directly below.
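The difference between the two behaviors can be shown with a small self-contained sketch. `PageEntry` is a hypothetical stand-in that models only the `null_count` field discussed above; the function names are illustrative and are not part of the parquet crate.

```rust
/// Hypothetical stand-in for one page's index entry; only the field
/// discussed above is modeled.
struct PageEntry {
    null_count: Option<i64>,
}

/// Behavior in the diff under review: if any page lacks a count, the
/// whole vector is replaced by zeros, one per page.
fn null_counts_zero_filled(pages: &[PageEntry]) -> Vec<i64> {
    pages
        .iter()
        .map(|p| p.null_count)
        .collect::<Option<Vec<_>>>()
        .unwrap_or_else(|| vec![0; pages.len()])
}

/// Suggested alternative: stop after `collect()`, so a missing count on
/// any page yields `None` instead of fabricated zeros.
fn null_counts_optional(pages: &[PageEntry]) -> Option<Vec<i64>> {
    pages.iter().map(|p| p.null_count).collect()
}
```

Collecting an iterator of `Option<T>` into `Option<Vec<T>>` short-circuits to `None` on the first missing element, which is what lets the second function preserve the "unknown" state on a round trip.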
Update here is I plan to make a
Hi @adriangb -- I changed this PR to point at the Again, I am really sorry for the delay in reviewing. I think this is a really important feature but I have been overwhelmed with reviews for the last week or two
Thank you @alamb! No need to apologize; you have such a diverse and impactful contribution to open source, your time management is really quite inspiring. If anything I need to apologize for lagging on applying feedback. I will go over this PR and incorporate feedback (hopefully before your review tomorrow).
Here is how I suggest we proceed with this PR:
- Let's create an example with the usecase described in API for encoding/decoding ParquetMetadata with more control #6002 (comment) (I will try to do this later today). I think this will motivate what the API looks like
- In parallel we could pull out some of the simple usability changes (like adding PartialEq and pub use thrift stuff) into their own PR so we can merge that.
I started on a basic example here: #6081 -- tomorrow I'll try and find time to rebase it on this PR and see if I can do what is needed
encode_metadata
function to mirror decode_metadata
and allow ad-hoc encoding of ParquetMetadata
ParquetMetadataWriter
allow ad-hoc encoding of ParquetMetadata
67545a6
to
96fa84d
Compare
Thanks again @adriangb -- I think this is looking really close
While looking through the API for #6097 I had a few more suggestions, but then I think this would be ready. What do you think @etseidl ? I feel this is close to what is proposed in #6095 and related PRs. I feel we are quite close to having some sort of reasonable API for reading / writing these structures:
- Metadata is stored in ParquetFileMetadata and associated structures
- Write ParquetFileMetadata to bytes using ThriftMetadataWriter (and maybe there will be an async version)
- (Eventually) we can have an equivalent ThriftMetadataReader (and the async version `MetadataLoader`)
parquet/src/file/page_index/index.rs
Outdated
@@ -168,6 +168,38 @@ impl<T: ParquetValueType> NativeIndex<T> {
            boundary_order: index.boundary_order,
        })
    }

    pub(crate) fn to_column_index(&self) -> ColumnIndex {
I think calling this method to_thrift might be more consistent with other APIs like https://docs.rs/parquet/latest/parquet/file/metadata/struct.RowGroupMetaData.html#method.to_thrift
The naming is already pretty confusing
👍🏻
buf: &'a mut TrackedWrite<W>,
schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
row_groups: Vec<RowGroup>,
Rather than storing these fields separately, I wonder if it would be possible simply to store a https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html
(can be done as a follow on PR)
Something like
struct ThriftMetadataWriter<'a, W: Write> {
    buf: &'a mut TrackedWrite<W>,
    parquet_metadata: &'a ParquetMetadata, // or maybe Arc<ParquetMetadata> 🤔
}
And then add the various builder APIs like with_key_value_metadata directly to ParquetMetadata
This would make it easier to work with / manipulate ParquetMetadata in general and would make the responsibility of the writer clearer (handle the details of coordinating the writing of the thrift encoded structures and their indexes)
I agree, but let's do that in a followup PR since (I think) that would not be breaking in any way and this is quite large already
Yes, I think this is ready to merge into 53.0.0-dev. The unencoded size info is still separate from the page locations, so hopefully this will merge in fairly cleanly. From a practical standpoint, I think it will be easier to merge this before #6095, and then I can make the needed changes to the new offset index struct to implement
I think I've addressed the feedback and updated the branch :)
I'm not sure why the test is failing (it was before, I don't think it's from a merge). Need to investigate.
I think you'll need to merge 53.0.0-dev again to pick up the latest changes to the offset index, and then reformat (some new names are longer and changed how the linter wants lines wrapped).
b7943fc
to
b38ccf7
Compare
b38ccf7
to
b41173f
Compare
I've updated the branch and cleaned up; the test is still failing. It seems the reading part is trying to access byte 0 of the file, which doesn't make sense and makes me think there's a bug somewhere (could be in the test since there's a lot of shim in there): https://github.com/apache/arrow-rs/actions/runs/10082832978/job/27878006690?pr=6000#step:6:761
let data = buf.into_inner().freeze();

let decoded_metadata = load_metadata_from_bytes(metadata.file_size, data).await;
- let decoded_metadata = load_metadata_from_bytes(metadata.file_size, data).await;
+ let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
This will load the page indexes, but then the assert below fails because the offset_index_offset and column_index_offset fields of the column chunk are different. Might have to write an equals that accounts for that.
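One way such a comparison could look, as a rough sketch: clear the two offset fields on both sides before comparing. `ChunkMeta` is a hypothetical stand-in that models only a handful of fields, and `eq_ignoring_index_offsets` is an illustrative name, not an existing helper in the crate.

```rust
/// Hypothetical stand-in for a column chunk's metadata; only a few
/// fields are modeled.
#[derive(Debug, Clone, PartialEq)]
struct ChunkMeta {
    num_values: i64,
    column_index_offset: Option<i64>,
    offset_index_offset: Option<i64>,
}

/// Compare two chunks while ignoring where their page indexes sit in the
/// file, since those offsets legitimately change when metadata is re-encoded.
fn eq_ignoring_index_offsets(a: &ChunkMeta, b: &ChunkMeta) -> bool {
    // Normalize both sides by clearing the position-dependent fields.
    let strip = |c: &ChunkMeta| ChunkMeta {
        column_index_offset: None,
        offset_index_offset: None,
        ..c.clone()
    };
    strip(a) == strip(b)
}
```

Normalizing and then reusing the derived `PartialEq` means any field added later is compared automatically, which a hand-written field-by-field comparison would not guarantee.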
I merged the 53 dev branch Update: I restored the branch
I wrote up some thoughts that were floating in my head in #6129. I am hoping to spend some more time today looking at this PR in detail. Thank you again for your patience
A step towards #5988, #6002