
ARROW-11014: [Rust] [DataFusion] Use correct statistics for ParquetExec #8992

Closed · 10 commits

Conversation

@andygrove (Member) commented Dec 22, 2020

ParquetExec represents multiple files but we were calculating statistics based on the first file.

I stumbled across this when working on https://issues.apache.org/jira/browse/ARROW-10995
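For context, the aggregation the fix performs looks roughly like this (a sketch only; aggregate_statistics is a hypothetical name, and the real change lives in rust/datafusion/src/physical_plan/parquet.rs):

use std::error::Error;
use std::fs::File;
use std::sync::Arc;
use parquet::file::reader::{FileReader, SerializedFileReader};

/// Sum row counts and byte sizes over *all* files, not just the first one.
fn aggregate_statistics(filenames: &[String]) -> Result<(i64, i64), Box<dyn Error>> {
    let mut num_rows = 0;
    let mut total_byte_size = 0;
    for filename in filenames {
        let file = File::open(filename)?;
        let file_reader = Arc::new(SerializedFileReader::new(file)?);
        // Each file's Parquet footer already carries per-row-group counts,
        // so no data pages need to be read here.
        for row_group in file_reader.metadata().row_groups() {
            num_rows += row_group.num_rows();
            total_byte_size += row_group.total_byte_size();
        }
    }
    Ok((num_rows, total_byte_size))
}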

@andygrove (Member Author):
@seddonm1 fyi

codecov-io commented Dec 22, 2020

Codecov Report

Merging #8992 (0f21d1a) into master (0519c4c) will increase coverage by 0.01%.
The diff coverage is 85.41%.


@@            Coverage Diff             @@
##           master    #8992      +/-   ##
==========================================
+ Coverage   82.64%   82.66%   +0.01%     
==========================================
  Files         200      200              
  Lines       49730    49798      +68     
==========================================
+ Hits        41098    41164      +66     
- Misses       8632     8634       +2     
Impacted Files Coverage Δ
rust/datafusion/src/datasource/datasource.rs 100.00% <ø> (ø)
rust/datafusion/src/physical_plan/parquet.rs 80.31% <84.44%> (+0.74%) ⬆️
rust/datafusion/src/datasource/parquet.rs 96.92% <100.00%> (+1.30%) ⬆️
rust/parquet/src/file/metadata.rs 91.05% <0.00%> (-0.78%) ⬇️
rust/parquet/src/schema/types.rs 89.93% <0.00%> (-0.27%) ⬇️
rust/parquet/src/encodings/encoding.rs 95.43% <0.00%> (+0.19%) ⬆️
rust/datafusion/src/physical_plan/expressions.rs 84.49% <0.00%> (+0.31%) ⬆️
rust/arrow/src/util/test_util.rs 90.90% <0.00%> (+15.90%) ⬆️

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 081728f...0f21d1a.

@seddonm1 (Contributor) commented Dec 22, 2020

Thanks. Good find. 🤦 by me

@alamb (Contributor) left a comment:

I wonder if there is some way to test this code -- I also remember being confused about the fact that ParquetExec can actually take a directory full of files rather than a single one.

use async_trait::async_trait;
use futures::stream::Stream;

-/// Execution plan for scanning a Parquet file
+/// Execution plan for scanning one or more Parquet files
Contributor: 👍

let mut total_byte_size = 0;
for file in &filenames {
    let file = File::open(file)?;
    let file_reader = Arc::new(SerializedFileReader::new(file)?);
Contributor:
It probably doesn't matter but we are creating arrow_readers several times for the same file -- like here we create them just to read metadata, and then right below we (re)open the first one again to read the schema. And then we open them again to actually read data...

@andygrove (Member Author):
This is a good point. I've pushed another change here to collect unique schemas during the scan of the files to avoid the separate read. This now also implements a check to make sure the schemas are the same. I have wasted time in the past tracking down issues due to incompatible files. I added a reference to the issue for implementing schema merging, which would be a nice addition.
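A sketch of that consistency check, assuming arrow's Schema type and a hypothetical helper name (the actual check lives in the ParquetExec constructor):

use arrow::datatypes::Schema;

/// Collect the distinct schemas seen while scanning the files and reject the
/// data set if they disagree. Schema merging is left to the follow-up issue
/// referenced above.
fn single_schema(schemas: Vec<Schema>) -> Result<Schema, String> {
    let mut unique: Vec<Schema> = vec![];
    for schema in schemas {
        if !unique.contains(&schema) {
            unique.push(schema);
        }
    }
    if unique.len() == 1 {
        Ok(unique.remove(0))
    } else {
        Err(format!(
            "The Parquet files have {} different schemas; all files in the \
             data set must share the same schema",
            unique.len()
        ))
    }
}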

@andygrove (Member Author):
I've gone a little further and introduced a ParquetPartition struct to make it more explicit how partitioning works, and added references to related issues about changing the partitioning strategy. I also improved an error message and added more documentation.
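The shape of that struct, roughly (the field names and the Statistics import path are assumptions based on the files this PR touches, not a copy of the merged code):

// `Statistics` here is the struct from rust/datafusion/src/datasource/datasource.rs,
// which this PR also touches; the exact path is an assumption for this sketch.
use datafusion::datasource::datasource::Statistics;

/// One execution partition of a ParquetExec: the group of files a single
/// task reads, together with the aggregate statistics for those files.
pub struct ParquetPartition {
    /// The files that make up this partition
    pub filenames: Vec<String>,
    /// Statistics gathered from the files' Parquet metadata
    pub statistics: Statistics,
}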

@andygrove (Member Author):
> I wonder if there is some way to test this code -- I also remember being confused about the fact that ParquetExec can actually take a directory full of files rather than a single one.

Yes, tests are definitely lacking here. I will take this on as a follow-up task for the release: https://issues.apache.org/jira/browse/ARROW-11020
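Such a test might look roughly like this (a sketch only: write_parquet, the try_from_path signature, and the statistics() accessor are hypothetical, not the merged API):

// Regression-test sketch: write two small files and assert the statistics
// cover both, not just the first. Uses the `tempfile` crate for a scratch dir.
#[test]
fn statistics_cover_all_files() -> Result<(), Box<dyn std::error::Error>> {
    let dir = tempfile::tempdir()?;
    write_parquet(&dir.path().join("a.parquet"), 10)?; // 10 rows
    write_parquet(&dir.path().join("b.parquet"), 5)?; // 5 rows
    let exec = ParquetExec::try_from_path(dir.path().to_str().unwrap(), 1024)?;
    // Before this PR, only the first file's count (10) would be reported.
    assert_eq!(exec.statistics().num_rows, Some(15));
    Ok(())
}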

@andygrove (Member Author):
@alamb @jorgecarleitao I got a bit carried away with some other improvements in this PR, but I am going to stop now. I filed a follow-up issue to add more comprehensive tests before we release 3.0.0.

@andygrove (Member Author):

Also @Dandandan, this starts to introduce some per-partition stats now.

@Dandandan (Contributor):
Makes sense. Maybe we can reuse the summing stats for all partitions for different source types if it gets more complex.

// build a list of Parquet partitions with statistics and gather all unique schemas
// used in this data set
let mut schemas: Vec<Schema> = vec![];
let mut partitions = vec![];
Contributor:
Could use with_capacity
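Applied to the hunk above, that suggestion would read (a sketch, assuming at most one schema and one partition per file):

// Reserve capacity up front to avoid incremental reallocation.
let mut schemas: Vec<Schema> = Vec::with_capacity(filenames.len());
let mut partitions: Vec<ParquetPartition> = Vec::with_capacity(filenames.len());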

}
let statistics = Statistics {
    num_rows: if num_rows == 0 {
        None
Contributor:
Why map rows and byte size to None here and not Some(0)?

Contributor:
I think if the result is 0, it is best to know that there are 0 records/bytes.

@andygrove (Member Author):
Yes, that was a bit sloppy. This is now fixed.
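The fix maps the counts directly (the same hunk appears further down), so an empty scan reports Some(0), which is real information, rather than None, which means "unknown":

let statistics = Statistics {
    num_rows: Some(num_rows as usize),
    total_byte_size: Some(total_byte_size as usize),
};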

pub fn new(
-    filenames: Vec<String>,
+    partitions: Vec<ParquetPartition>,
Contributor:
I thought that part of the reason for allowing a list of files was to support similar behavior to DeltaLake where an external file contains a list of filenames representing a version of data rather than grouping them by directory structure?

@andygrove (Member Author):
Interesting. I was not aware of this use case. Perhaps we need a specific constructor for that use case. I'll take a look.

Contributor:
It may be premature to support that use case anyway until more of the core engine works.

@andygrove (Member Author):
I pushed a change so there are now try_from_path and try_from_files constructors.
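Roughly, the relationship between the two entry points (the parameter lists here are assumptions, not the merged signatures):

use datafusion::error::Result;

impl ParquetExec {
    /// Scan all Parquet files found under `path` (a file or a directory).
    pub fn try_from_path(path: &str, batch_size: usize) -> Result<Self> {
        // Enumerate the files under `path`, then delegate to try_from_files
        // (directory walking omitted in this sketch).
        todo!()
    }

    /// Scan an explicit list of files, e.g. one supplied by a DeltaLake-style
    /// manifest rather than discovered through directory structure.
    pub fn try_from_files(filenames: Vec<String>, batch_size: usize) -> Result<Self> {
        todo!()
    }
}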

Contributor:
Looks good 👍

@Dandandan (Contributor):

One comment that I think should be resolved; the rest looks good. Great that this is fixed.
Agree also that there should be some tests 👍

@andygrove closed this in f9efa02 on Dec 23, 2020
@alamb (Contributor) left a comment:
Nice

    num_rows: Some(num_rows as usize),
    total_byte_size: Some(total_byte_size as usize),
};
partitions.push(ParquetPartition {
Contributor: 👍

GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021
Closes apache#8992 from andygrove/ARROW-11014

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>