Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add datafusion based parquet reader #312

Closed
wants to merge 13 commits into from

Conversation

jiacai2050
Copy link
Contributor

Which issue does this PR close?

Closes #291

Rationale for this change

As described in #291, this PR also fix object store cache isn't working.
After #14, parquet reader will read all bytes out, ignoring whether if it's already cached.

What changes are included in this PR?

Replace hand-rolled parquet reader with datafusion's ParquetExec, and add CachableParquetFileReader to implement row-group level cache

Are there any user-facing changes?

No

How does this change test

Using existing UT

@@ -264,7 +264,14 @@ mod tests {
};

let mut reader = ParquetSstReader::new(&sst_file_path, &store, &sst_reader_options);
assert_eq!(reader.meta_data().await.unwrap(), &sst_meta);
let sst_meta_readback = {
// size of SstMetaData is not what this file's size, so overwrite it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add FIXME: prefix.


use super::encoding::{self, ParquetDecoder};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid the relative importing path.

self.metadata_size_hint, self.cache_hit, self.cache_miss, self.metrics.bytes_scanned.value()
);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Miss one newline here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


/// Creates expression like:
/// start <= time && time < end
pub fn df_expr(&self, column_name: impl AsRef<str>) -> Expr {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn df_expr(&self, column_name: impl AsRef<str>) -> Expr {
pub fn to_df_expr(&self, column_name: impl AsRef<str>) -> Expr {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@jiacai2050 jiacai2050 force-pushed the new-parquet-reader branch 3 times, most recently from c1f7c51 to 6d3851d Compare October 26, 2022 07:38
@jiacai2050
Copy link
Contributor Author

jiacai2050 commented Oct 27, 2022

In my local environment, the performance have regression when adopt this new reader, so further investigation is required before merge this.

Tested sst file: 104,022,899 rows

old: 11709ms(this exclude file read, since it's read out in advance)
new: 38986ms
new without read:23208ms

Related issue: apache/arrow-rs#2916

@jiacai2050 jiacai2050 changed the title feat: add datafusion based parquet reader WIP: add datafusion based parquet reader Oct 27, 2022
@jiacai2050 jiacai2050 closed this Nov 9, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

replace custom ParquetSstReader to DataFusion's ParquetExec
2 participants