Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pushdown RowFilter in ParquetExec #3380

Merged
merged 14 commits into from Sep 13, 2022
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Expand Up @@ -44,6 +44,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};

use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
Expand Up @@ -25,6 +25,7 @@ mod delimited_stream;
mod file_stream;
mod json;
mod parquet;
mod row_filter;

pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
Expand Down
190 changes: 166 additions & 24 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Expand Up @@ -29,6 +29,7 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::row_filter::build_row_filter;
use crate::physical_plan::file_format::FileMeta;
use crate::{
error::{DataFusionError, Result},
Expand Down Expand Up @@ -67,6 +68,30 @@ use parquet::file::{
};
use parquet::schema::types::ColumnDescriptor;

#[derive(Debug, Clone, Default)]
/// Specify options for the parquet scan
pub struct ParquetScanOptions {
/// If true, any available `pruning_predicate` will be converted to a `RowFilter`
/// and pushed down to the `ParquetRecordBatchStream`. This will enable row level
/// filter at the decoder level. Defaults to false
pushdown_filters: bool,
/// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try and optimize
/// the cost of filter evaluation.
reorder_predicates: bool,
}

impl ParquetScanOptions {
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
self.pushdown_filters = pushdown_filters;
self
}

pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self {
self.reorder_predicates = reorder_predicates;
self
}
}

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
Expand All @@ -81,6 +106,8 @@ pub struct ParquetExec {
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
/// Options to specify behavior of parquet scan
scan_options: ParquetScanOptions,
}

impl ParquetExec {
Expand Down Expand Up @@ -121,6 +148,7 @@ impl ParquetExec {
pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory: None,
scan_options: ParquetScanOptions::default(),
}
}

Expand Down Expand Up @@ -148,6 +176,12 @@ impl ParquetExec {
self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
self
}

/// Configure `ParquetScanOptions`
pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> Self {
self.scan_options = scan_options;
self
}
}

/// Stores metrics about the parquet execution for a particular parquet file.
Expand Down Expand Up @@ -258,6 +292,7 @@ impl ExecutionPlan for ParquetExec {
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
parquet_file_reader_factory,
scan_options: self.scan_options.clone(),
};

let stream = FileStream::new(
Expand Down Expand Up @@ -319,6 +354,7 @@ struct ParquetOpener {
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
scan_options: ParquetScanOptions,
}

impl FileOpener for ParquetOpener {
Expand Down Expand Up @@ -347,9 +383,12 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let pruning_predicate = self.pruning_predicate.clone();
let table_schema = self.table_schema.clone();
let reorder_predicates = self.scan_options.reorder_predicates;
let pushdown_filters = self.scan_options.pushdown_filters;

Ok(Box::pin(async move {
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;

Expand All @@ -358,6 +397,21 @@ impl FileOpener for ParquetOpener {
adapted_projections.iter().cloned(),
);

if let Some(predicate) = pushdown_filters
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
if let Ok(Some(filter)) = build_row_filter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is an error when build the row filter, why not throw the error out?

predicate.clone(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
reorder_predicates,
) {
builder = builder.with_row_filter(filter);
}
};

let groups = builder.metadata().row_groups();
let row_groups =
prune_row_groups(groups, file_range, pruning_predicate, &metrics);
Expand Down Expand Up @@ -839,6 +893,7 @@ mod tests {
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in a follow on PR we can adjust the tests to do the same scan both with and without pushdown

) -> Result<Vec<RecordBatch>> {
let file_schema = match schema {
Some(schema) => schema,
Expand All @@ -851,7 +906,7 @@ mod tests {
let file_groups = meta.into_iter().map(Into::into).collect();

// prepare the scan
let parquet_exec = ParquetExec::new(
let mut parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
Expand All @@ -865,6 +920,14 @@ mod tests {
None,
);

if pushdown_predicate {
parquet_exec = parquet_exec.with_scan_options(
ParquetScanOptions::default()
.with_pushdown_filters(true)
.with_reorder_predicates(true),
);
}

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
collect(Arc::new(parquet_exec), task_ctx).await
Expand Down Expand Up @@ -912,9 +975,10 @@ mod tests {
let batch3 = add_to_batch(&batch1, "c3", c3);

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None)
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None, false)
.await
.unwrap();
let expected = vec![
"+-----+----+----+",
"| c1 | c2 | c3 |",
Expand Down Expand Up @@ -953,7 +1017,7 @@ mod tests {
let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None)
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
.await
.unwrap();
let expected = vec![
Expand Down Expand Up @@ -987,7 +1051,7 @@ mod tests {
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None)
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
.await
.unwrap();
let expected = vec![
Expand Down Expand Up @@ -1020,24 +1084,60 @@ mod tests {
// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

let filter = col("c2").eq(lit(0_i64));
let filter = col("c2").eq(lit(2_i64));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
.await
.unwrap();
let expected = vec![
"+-----+----+----+",
"| c1 | c3 | c2 |",
"+-----+----+----+",
"| Foo | 10 | |",
"| | | |",
"| | 10 | 1 |",
"| | 20 | |",
"| | 20 | 2 |",
"| Foo | 10 | |",
"| bar | | |",
"+-----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_intersection_filter_with_filter_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));

// batch1: c1(string), c3(int8)
let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);

// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

let filter = col("c2").eq(lit(2_i64));

// read/write them files:
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
.await
.unwrap();
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
"+----+----+----+",
"| | 20 | 2 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_projection() {
let c1: ArrayRef =
Expand All @@ -1061,10 +1161,15 @@ mod tests {
let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);

// read/write them files:
let read =
round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None, None)
.await
.unwrap();
let read = round_trip_to_parquet(
vec![batch1, batch2],
Some(vec![0, 3]),
None,
None,
false,
)
.await
.unwrap();
let expected = vec![
"+-----+-----+",
"| c1 | c4 |",
Expand Down Expand Up @@ -1102,9 +1207,10 @@ mod tests {
let filter = col("c3").eq(lit(0_i8));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
.await
.unwrap();

// Predicate should prune all row groups
assert_eq!(read.len(), 0);
Expand All @@ -1123,12 +1229,13 @@ mod tests {
// batch2: c2(int64)
let batch2 = create_batch(vec![("c2", c2)]);

let filter = col("c2").eq(lit(0_i64));
let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
.await
.unwrap();

// This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`
// but parquet pruning is not exact. If the min/max values are not defined (which they are not in this case since the it is
Expand All @@ -1139,14 +1246,48 @@ mod tests {
"+-----+----+",
"| c1 | c2 |",
"+-----+----+",
"| Foo | |",
"| | |",
"| | |",
"| | 1 |",
"| | 2 |",
"| Foo | |",
"| bar | |",
"+-----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

// batch1: c1(string)
let batch1 = create_batch(vec![("c1", c1.clone())]);

// batch2: c2(int64)
let batch2 = create_batch(vec![("c2", c2)]);

let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
.await
.unwrap();

let expected = vec![
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| | 1 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
Expand Down Expand Up @@ -1181,6 +1322,7 @@ mod tests {
None,
Some(Arc::new(schema)),
None,
false,
)
.await;
assert_contains!(read.unwrap_err().to_string(),
Expand Down