Refactor parquet row group pruning into a struct (use new statistics API, part 1) (#10607)

* Refactor parquet row group pruning into a struct

* Port tests

* improve docs

* fix msrv
alamb committed May 23, 2024
1 parent 7757d63 commit 656da83
Showing 4 changed files with 366 additions and 282 deletions.
7 changes: 7 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
@@ -48,6 +48,13 @@ pub struct FileRange {
    pub end: i64,
}

+impl FileRange {
+    /// returns true if this file range contains the specified offset
+    pub fn contains(&self, offset: i64) -> bool {
+        offset >= self.start && offset < self.end
+    }
+}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
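As a quick illustration of the new helper, here is a minimal sketch of how a half-open check like `FileRange::contains` can decide which row groups a scan partition reads. The struct is reproduced from the diff above; the byte offsets and the "first byte decides ownership" policy are assumptions for illustration, not code from this commit:

```rust
/// Reproduced from the diff above: a half-open byte range `[start, end)`.
pub struct FileRange {
    pub start: i64,
    pub end: i64,
}

impl FileRange {
    /// returns true if this file range contains the specified offset
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

fn main() {
    // Hypothetical starting byte offsets for three row groups in one file
    let row_group_offsets = [0_i64, 4096, 8192];
    // This scan partition is assigned bytes [4096, 8192) of the file
    let range = FileRange { start: 4096, end: 8192 };

    // Keep only the row groups whose first byte falls inside the range, so
    // each row group belongs to exactly one partition; this prints "1"
    for (i, offset) in row_group_offsets.iter().enumerate() {
        if range.contains(*offset) {
            println!("{i}");
        }
    }
}
```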
53 changes: 29 additions & 24 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -70,6 +70,7 @@ mod row_groups;
mod schema_adapter;
mod statistics;

+use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
pub use statistics::{RequestedStatistics, StatisticsConverter};
@@ -556,32 +557,36 @@ impl FileOpener for ParquetOpener {
                };
            };

-            // Row group pruning by statistics: attempt to skip entire row_groups
-            // using metadata on the row groups
+            // Determine which row groups to actually read. The idea is to skip
+            // as many row groups as possible based on the metadata and query
            let file_metadata = builder.metadata().clone();
            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
-            let mut row_groups = row_groups::prune_row_groups_by_statistics(
-                &file_schema,
-                builder.parquet_schema(),
-                file_metadata.row_groups(),
-                file_range,
-                predicate,
-                &file_metrics,
-            );
+            let rg_metadata = file_metadata.row_groups();
+            // track which row groups to actually read
+            let mut row_groups = RowGroupSet::new(rg_metadata.len());
+            // if there is a range restricting what parts of the file to read
+            if let Some(range) = file_range.as_ref() {
+                row_groups.prune_by_range(rg_metadata, range);
+            }
+            // If there is a predicate that can be evaluated against the metadata
+            if let Some(predicate) = predicate.as_ref() {
+                row_groups.prune_by_statistics(
+                    &file_schema,
+                    builder.parquet_schema(),
+                    rg_metadata,
+                    predicate,
+                    &file_metrics,
+                );

-            // Bloom filter pruning: if bloom filters are enabled and then attempt to skip entire row_groups
-            // using bloom filters on the row groups
-            if enable_bloom_filter && !row_groups.is_empty() {
-                if let Some(predicate) = predicate {
-                    row_groups = row_groups::prune_row_groups_by_bloom_filters(
-                        &file_schema,
-                        &mut builder,
-                        &row_groups,
-                        file_metadata.row_groups(),
-                        predicate,
-                        &file_metrics,
-                    )
-                    .await;
+                if enable_bloom_filter && !row_groups.is_empty() {
+                    row_groups
+                        .prune_by_bloom_filters(
+                            &file_schema,
+                            &mut builder,
+                            predicate,
+                            &file_metrics,
+                        )
+                        .await;
                }
            }

@@ -610,7 +615,7 @@ impl FileOpener for ParquetOpener {
            let stream = builder
                .with_projection(mask)
                .with_batch_size(batch_size)
-                .with_row_groups(row_groups)
+                .with_row_groups(row_groups.indexes())
                .build()?;

            let adapted = stream
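Taken together, the new opener code turns pruning into a sequence of narrowing steps on one mutable set, then asks the set for the surviving indexes. The method names below come from the diff; the internals (a simple boolean mask over row group indexes) are an assumption sketched to show the shape of the API, not the actual implementation:

```rust
/// A stand-in for the `RowGroupSet` used above: one flag per row group,
/// true while the row group is still scheduled to be read.
struct RowGroupSet {
    should_read: Vec<bool>,
}

impl RowGroupSet {
    /// Start with every row group selected
    fn new(num_row_groups: usize) -> Self {
        Self { should_read: vec![true; num_row_groups] }
    }

    /// Mark a row group as pruned; each prune_by_* step would call
    /// something like this when its check rules a row group out
    fn do_not_read(&mut self, idx: usize) {
        self.should_read[idx] = false;
    }

    /// true if every row group has been pruned
    fn is_empty(&self) -> bool {
        self.should_read.iter().all(|keep| !keep)
    }

    /// Indexes still selected, in order: what `with_row_groups` receives
    fn indexes(&self) -> Vec<usize> {
        self.should_read
            .iter()
            .enumerate()
            .filter_map(|(idx, &keep)| keep.then_some(idx))
            .collect()
    }
}

fn main() {
    // Three row groups; suppose statistics pruning ruled out the middle one
    let mut row_groups = RowGroupSet::new(3);
    row_groups.do_not_read(1);
    assert!(!row_groups.is_empty());
    assert_eq!(row_groups.indexes(), vec![0, 2]);
}
```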
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -42,6 +42,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
+use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::physical_plan::parquet::statistics::{
from_bytes_to_i128, parquet_column,
};
@@ -99,7 +100,7 @@ use super::metrics::ParquetFileMetrics;
///
/// Using `A > 35`: can rule out all of values in Page 1 (rows 0 -> 199)
///
-/// Using `B = 'F'`: can rule out all vaues in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299)
+/// Using `B = 'F'`: can rule out all values in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299)
///
/// So we can entirely skip rows 0->199 and 250->299 as we know they
/// can not contain rows that match the predicate.
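The arithmetic in that doc comment maps directly onto the parquet crate's `RowSelector`/`RowSelection` types that this file already uses. A minimal sketch of the selection those two predicates would produce over the 300 rows (the row ranges are the ones from the example above):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // 300 rows total: `A > 35` rules out rows 0 -> 199 and `B = 'F'`
    // rules out rows 250 -> 299, so only rows 200 -> 249 are decoded
    let selection = RowSelection::from(vec![
        RowSelector::skip(200),  // rows 0 -> 199 pruned
        RowSelector::select(50), // rows 200 -> 249 still needed
        RowSelector::skip(50),   // rows 250 -> 299 pruned
    ]);
    assert!(selection.selects_any());
}
```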
@@ -133,7 +134,7 @@ impl PagePruningPredicate {
        &self,
        arrow_schema: &Schema,
        parquet_schema: &SchemaDescriptor,
-        row_groups: &[usize],
+        row_groups: &RowGroupSet,
        file_metadata: &ParquetMetaData,
        file_metrics: &ParquetFileMetrics,
    ) -> Result<Option<RowSelection>> {
@@ -172,10 +173,10 @@
        let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
        let mut selectors = Vec::with_capacity(row_groups.len());
        for r in row_groups.iter() {
-            let row_group_metadata = &groups[*r];
+            let row_group_metadata = &groups[r];

-            let rg_offset_indexes = file_offset_indexes.get(*r);
-            let rg_page_indexes = file_page_indexes.get(*r);
+            let rg_offset_indexes = file_offset_indexes.get(r);
+            let rg_page_indexes = file_page_indexes.get(r);
            if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) =
                (rg_page_indexes, rg_offset_indexes, col_idx)
            {
@@ -185,7 +186,7 @@
                    predicate,
                    rg_offset_indexes.get(col_idx),
                    rg_page_indexes.get(col_idx),
-                    groups[*r].column(col_idx).column_descr(),
+                    groups[r].column(col_idx).column_descr(),
                    file_metrics,
                )
                .map_err(|e| {
@@ -201,7 +202,7 @@
                );
                // fallback select all rows
                let all_selected =
-                    vec![RowSelector::select(groups[*r].num_rows() as usize)];
+                    vec![RowSelector::select(groups[r].num_rows() as usize)];
                selectors.push(all_selected);
            }
        }
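The remaining hunks are mechanical fallout of the signature change: iterating a `&[usize]` yields references (hence the old `*r`), while the new set's iterator evidently yields owned `usize` indexes (hence plain `r`). A tiny standalone sketch of that difference, with placeholder data:

```rust
fn main() {
    let groups = ["rg0", "rg1", "rg2"]; // placeholder row group metadata

    // Old shape: a slice of indexes; `iter()` yields `&usize`, so
    // indexing requires a dereference: `groups[*r]`
    let as_slice: &[usize] = &[0, 2];
    for r in as_slice.iter() {
        let _metadata = &groups[*r];
    }

    // New shape: an iterator of owned `usize` (as a `RowGroupSet`-style
    // `iter()` presumably provides), so plain `groups[r]` works
    let as_owned: Vec<usize> = vec![0, 2];
    for r in as_owned.into_iter() {
        let _metadata = &groups[r];
    }
}
```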
