From 859d3c61c3f85dbd38a84f995e28ce9ef529e5bd Mon Sep 17 00:00:00 2001
From: Qianqian <130200611+Sevenannn@users.noreply.github.com>
Date: Thu, 24 Oct 2024 17:29:12 -0700
Subject: [PATCH 1/2] Fix omitted predicate in parquet reading

---
 .../datasource/physical_plan/parquet/mod.rs   | 27 +++++++
 .../physical_plan/parquet/opener.rs           | 70 ++++++++++++++++---
 2 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 743dd5896986..f768499f793f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -2312,6 +2312,33 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_predicate_no_pushdown_parquet() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
+        write_file(&path);
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        ctx.register_listing_table("base_table", path, opt, None, None)
+            .await
+            .unwrap();
+        let sql = "
+            with tmp as (
+                select *, 's' flag from base_table)
+            select * from tmp where flag = 'w';
+        ";
+        let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
+        assert_eq!(batch.len(), 1);
+        let expected = [
+            "+--------+----+------+------+",
+            "| struct | id | name | flag |",
+            "+--------+----+------+------+",
+            "+--------+----+------+------+",
+        ];
+        crate::assert_batches_eq!(expected, &batch);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_struct_filter_parquet_with_view_types() -> Result<()> {
         let tmp_dir = TempDir::new().unwrap();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 4990cb4dd735..83ef161d175a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -30,9 +30,11 @@ use crate::datasource::physical_plan::{
 };
 use crate::datasource::schema_adapter::SchemaAdapterFactory;
 use crate::physical_optimizer::pruning::PruningPredicate;
-use arrow_schema::{ArrowError, SchemaRef};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
 use datafusion_common::{exec_err, Result};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::filter::batch_filter;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
@@ -159,7 +161,9 @@ impl FileOpener for ParquetOpener {
         );
 
         // Filter pushdown: evaluate predicates during scan
-        if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
+        if let Some(predicate) =
+            pushdown_filters.then_some(predicate.clone()).flatten()
+        {
             let row_filter = row_filter::build_row_filter(
                 &predicate,
                 &file_schema,
@@ -187,7 +191,7 @@ impl FileOpener for ParquetOpener {
         // Determine which row groups to actually read. The idea is to skip
         // as many row groups as possible based on the metadata and query
         let file_metadata = Arc::clone(builder.metadata());
-        let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+        let pruning_predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
         let rg_metadata = file_metadata.row_groups();
         // track which row groups to actually read
         let access_plan =
@@ -198,12 +202,12 @@ impl FileOpener for ParquetOpener {
             row_groups.prune_by_range(rg_metadata, range);
         }
         // If there is a predicate that can be evaluated against the metadata
-        if let Some(predicate) = predicate.as_ref() {
+        if let Some(pruning_predicate) = pruning_predicate.as_ref() {
             row_groups.prune_by_statistics(
                 &file_schema,
                 builder.parquet_schema(),
                 rg_metadata,
-                predicate,
+                pruning_predicate,
                 &file_metrics,
             );
 
@@ -212,7 +216,7 @@ impl FileOpener for ParquetOpener {
                 .prune_by_bloom_filters(
                     &file_schema,
                     &mut builder,
-                    predicate,
+                    pruning_predicate,
                     &file_metrics,
                 )
                 .await;
@@ -256,8 +260,21 @@ impl FileOpener for ParquetOpener {
         let adapted = stream
             .map_err(|e| ArrowError::ExternalError(Box::new(e)))
             .map(move |maybe_batch| {
-                maybe_batch
-                    .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
+                maybe_batch.and_then(|b| {
+                    if !pushdown_filters {
+                        if let Some(predicate) = predicate.clone() {
+                            let updated_predicate = update_predicate_index(
+                                Arc::clone(&predicate),
+                                b.schema_ref(),
+                            )?;
+
+                            let b = batch_filter(&b, &updated_predicate)?;
+
+                            return schema_mapping.map_batch(b).map_err(Into::into);
+                        }
+                    }
+                    schema_mapping.map_batch(b).map_err(Into::into)
+                })
             });
 
         Ok(adapted.boxed())
@@ -297,3 +314,40 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Return the predicate with updated column indexes
+///
+/// The indexes of Column PhysicalExpr in the predicate are based on the
+/// file_schema / table_schema, which might be different from the
+/// RecordBatch schema returned by the Parquet reader
+///
+/// Recursively finds all Column PhysicalExpr nodes and updates them
+/// with the indexes of the RecordBatch schema
+///
+/// Returns an error if a Column index cannot be updated
+fn update_predicate_index(
+    predicate: Arc<dyn PhysicalExpr>,
+    schema: &Arc<Schema>,
+) -> std::result::Result<Arc<dyn PhysicalExpr>, ArrowError> {
+    let children = predicate.children();
+
+    if children.len() == 0 {
+        if let Some(column) = predicate.as_any().downcast_ref::<Column>() {
+            let name = column.name();
+            let new_index = schema.index_of(name)?;
+            let new_column = Column::new(name, new_index);
+            return Ok(Arc::new(new_column));
+        }
+        return Ok(Arc::clone(&predicate));
+    }
+
+    let mut new_children: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
+    for child in children {
+        let updated_child = update_predicate_index(Arc::clone(child), schema)?;
+        new_children.push(updated_child);
+    }
+
+    predicate
+        .with_new_children(new_children)
+        .map_err(Into::into)
+}

From 907d93d1376e41afa41d63bcbd0093df6b44e58b Mon Sep 17 00:00:00 2001
From: Sevenannn
Date: Fri, 25 Oct 2024 19:48:23 -0700
Subject: [PATCH 2/2] fix

---
 .../physical_plan/parquet/opener.rs           | 37 ++++++++++++-------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 83ef161d175a..b90c1253b4b3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -263,14 +263,17 @@ impl FileOpener for ParquetOpener {
                 maybe_batch.and_then(|b| {
                     if !pushdown_filters {
                         if let Some(predicate) = predicate.clone() {
-                            let updated_predicate = update_predicate_index(
+                            if let Some(updated_predicate) = update_predicate_index(
                                 Arc::clone(&predicate),
                                 b.schema_ref(),
-                            )?;
-
-                            let b = batch_filter(&b, &updated_predicate)?;
-
-                            return schema_mapping.map_batch(b).map_err(Into::into);
+                            )? {
+                                let filtered_batch =
+                                    batch_filter(&b, &updated_predicate)?;
+
+                                return schema_mapping
+                                    .map_batch(filtered_batch)
+                                    .map_err(Into::into);
+                            }
                         }
                     }
                     schema_mapping.map_batch(b).map_err(Into::into)
@@ -328,26 +331,34 @@ fn create_initial_plan(
 fn update_predicate_index(
     predicate: Arc<dyn PhysicalExpr>,
     schema: &Arc<Schema>,
-) -> std::result::Result<Arc<dyn PhysicalExpr>, ArrowError> {
+) -> Result<Option<Arc<dyn PhysicalExpr>>, ArrowError> {
     let children = predicate.children();
 
     if children.len() == 0 {
         if let Some(column) = predicate.as_any().downcast_ref::<Column>() {
             let name = column.name();
-            let new_index = schema.index_of(name)?;
+            let new_index = match schema.index_of(name) {
+                Ok(new_index) => new_index,
+                Err(_) => return Ok(None),
+            };
             let new_column = Column::new(name, new_index);
-            return Ok(Arc::new(new_column));
+            return Ok(Some(Arc::new(new_column)));
         }
-        return Ok(Arc::clone(&predicate));
+        return Ok(Some(Arc::clone(&predicate)));
     }
 
     let mut new_children: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
     for child in children {
-        let updated_child = update_predicate_index(Arc::clone(child), schema)?;
+        let updated_child = match update_predicate_index(Arc::clone(child), schema)? {
+            Some(child) => child,
+            None => return Ok(None),
+        };
        new_children.push(updated_child);
     }
 
-    predicate
+    let updated_predicate = predicate
         .with_new_children(new_children)
-        .map_err(Into::into)
+        .map_err(Into::<ArrowError>::into)?;
+
+    Ok(Some(updated_predicate))
 }
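
For readers of the series, the standalone sketch below shows the column-index remapping that `update_predicate_index` performs as of PATCH 2/2: `Column` indexes planned against the table schema are rewritten to match the schema of the batch the reader actually produced, and `None` signals that a referenced column is absent so the scan-level filter must be skipped. The helper body is copied from the patch so the sketch compiles on its own (assuming a DataFusion version contemporary with this patch, where `PhysicalExpr::children` returns borrowed children); the schemas, the `flag = 'w'` predicate, and the `main` driver are invented for illustration and are not part of the change.

// Sketch only: demonstrates the remapping outside the Parquet opener.
use std::sync::Arc;

use arrow_schema::{ArrowError, DataType, Field, Schema};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

// Copied from the patch (PATCH 2/2 version) so this example is self-contained.
fn update_predicate_index(
    predicate: Arc<dyn PhysicalExpr>,
    schema: &Arc<Schema>,
) -> Result<Option<Arc<dyn PhysicalExpr>>, ArrowError> {
    let children = predicate.children();

    if children.is_empty() {
        if let Some(column) = predicate.as_any().downcast_ref::<Column>() {
            // Leaf column: look its name up in the batch schema. A miss means
            // the predicate cannot be evaluated against this batch at all.
            let new_index = match schema.index_of(column.name()) {
                Ok(new_index) => new_index,
                Err(_) => return Ok(None),
            };
            return Ok(Some(Arc::new(Column::new(column.name(), new_index))));
        }
        // Non-column leaf (e.g. a literal): keep it unchanged.
        return Ok(Some(Arc::clone(&predicate)));
    }

    // Inner node: rewrite every child, bailing out if any column is missing.
    let mut new_children: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
    for child in children {
        match update_predicate_index(Arc::clone(child), schema)? {
            Some(child) => new_children.push(child),
            None => return Ok(None),
        }
    }

    predicate
        .with_new_children(new_children)
        .map(Some)
        .map_err(Into::into)
}

fn main() -> Result<(), ArrowError> {
    // Hypothetical predicate `flag = 'w'`, planned against a table schema
    // where `flag` is the fourth column (index 3).
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("flag", 3)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::from("w"))),
    ));

    // Hypothetical schema of a batch as the reader returns it: `flag` sits
    // at index 0 here, so the planned index 3 would be wrong (or out of
    // bounds) if the filter were applied to the batch directly.
    let batch_schema = Arc::new(Schema::new(vec![
        Field::new("flag", DataType::Utf8, true),
        Field::new("id", DataType::Int32, false),
    ]));

    if let Some(remapped) = update_predicate_index(predicate, &batch_schema)? {
        println!("{remapped}"); // prints `flag@0 = w`
    }
    Ok(())
}

Returning Ok(None) instead of an error, which is the PATCH 2/2 change, means a predicate that references a column absent from the batch schema simply skips the scan-level filter rather than failing the query.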