27 changes: 27 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -2312,6 +2312,33 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_predicate_no_pushdown_parquet() -> Result<()> {
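// `flag` is a column synthesized by the query (always the literal 's'), so the
// `flag = 'w'` predicate is not pushed down into the Parquet scan and the
// query must return no rows.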
let tmp_dir = TempDir::new()?;
let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
write_file(&path);
let ctx = SessionContext::new();
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
ctx.register_listing_table("base_table", path, opt, None, None)
.await
.unwrap();
let sql = "
with tmp as (
select *, 's' flag from base_table)
select * from tmp where flag = 'w';
";
let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
assert_eq!(batch.len(), 1);
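// the single batch returned is empty: the expected output below contains no data rows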
let expected = [
"+--------+----+------+------+",
"| struct | id | name | flag |",
"+--------+----+------+------+",
"+--------+----+------+------+",
];
crate::assert_batches_eq!(expected, &batch);
Ok(())
}

#[tokio::test]
async fn test_struct_filter_parquet_with_view_types() -> Result<()> {
let tmp_dir = TempDir::new().unwrap();
81 changes: 73 additions & 8 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -30,9 +30,11 @@ use crate::datasource::physical_plan::{
};
use crate::datasource::schema_adapter::SchemaAdapterFactory;
use crate::physical_optimizer::pruning::PruningPredicate;
use arrow_schema::{ArrowError, SchemaRef};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter::batch_filter;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use log::debug;
@@ -159,7 +161,9 @@ impl FileOpener for ParquetOpener {
);

// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
if let Some(predicate) =
pushdown_filters.then_some(predicate.clone()).flatten()
{
let row_filter = row_filter::build_row_filter(
&predicate,
&file_schema,
@@ -187,7 +191,7 @@
// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let file_metadata = Arc::clone(builder.metadata());
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let pruning_predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan =
@@ -198,12 +202,12 @@
row_groups.prune_by_range(rg_metadata, range);
}
// If there is a predicate that can be evaluated against the metadata
if let Some(predicate) = predicate.as_ref() {
if let Some(pruning_predicate) = pruning_predicate.as_ref() {
row_groups.prune_by_statistics(
&file_schema,
builder.parquet_schema(),
rg_metadata,
predicate,
pruning_predicate,
&file_metrics,
);

@@ -212,7 +216,7 @@
.prune_by_bloom_filters(
&file_schema,
&mut builder,
predicate,
pruning_predicate,
&file_metrics,
)
.await;
@@ -256,8 +260,24 @@
let adapted = stream
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
.map(move |maybe_batch| {
maybe_batch
.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
maybe_batch.and_then(|b| {
if !pushdown_filters {
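// Filter pushdown is disabled: apply the predicate here instead, after decoding,
// so rows that do not satisfy it are dropped from the returned batch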
if let Some(predicate) = predicate.clone() {
if let Some(updated_predicate) = update_predicate_index(
Arc::clone(&predicate),
b.schema_ref(),
)? {
let filtered_batch =
batch_filter(&b, &updated_predicate)?;

return schema_mapping
.map_batch(filtered_batch)
.map_err(Into::into);
}
}
}
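// Otherwise return the batch with only the schema mapping applied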
schema_mapping.map_batch(b).map_err(Into::into)
})
});

Ok(adapted.boxed())
@@ -297,3 +317,48 @@ fn create_initial_plan(
// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}

/// Returns the predicate with updated column indexes
///
/// The indexes of `Column` expressions in the predicate are based on the
/// file_schema / table_schema, which may differ from the schema of the
/// RecordBatch returned by the Parquet reader
///
/// Recursively finds all `Column` expressions and rewrites them with the
/// indexes of the RecordBatch schema, returning `None` if a referenced
/// column is not present in that schema
///
/// Returns an error if the predicate cannot be rebuilt with the updated columns
fn update_predicate_index(
predicate: Arc<dyn PhysicalExpr>,
schema: &Arc<Schema>,
) -> Result<Option<Arc<dyn PhysicalExpr>>, ArrowError> {
let children = predicate.children();

if children.is_empty() {
if let Some(column) = predicate.as_any().downcast_ref::<Column>() {
let name = column.name();
let new_index = match schema.index_of(name) {
Ok(new_index) => new_index,
Err(_) => return Ok(None),
};
let new_column = Column::new(name, new_index);
return Ok(Some(Arc::new(new_column)));
}
return Ok(Some(Arc::clone(&predicate)));
}

let mut new_children: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
for child in children {
let updated_child = match update_predicate_index(Arc::clone(child), schema)? {
Some(child) => child,
None => return Ok(None),
};
new_children.push(updated_child);
}

let updated_predicate = predicate
.with_new_children(new_children)
.map_err(Into::<ArrowError>::into)?;

Ok(Some(updated_predicate))
}
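// A minimal usage sketch of the helper above (editor's illustration, assuming an
// in-module test context; `BinaryExpr`, `Literal`, `Operator` and `ScalarValue` are
// the existing DataFusion/Arrow types, not part of this diff):
//
//     // Predicate `id = 5`, with `id` at index 1 of the table schema
//     let table_pred: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
//         Arc::new(Column::new("id", 1)),
//         Operator::Eq,
//         Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
//     ));
//     // The decoded batch only contains `id`, at index 0
//     let batch_schema = Arc::new(Schema::new(vec![Field::new(
//         "id",
//         DataType::Int32,
//         false,
//     )]));
//     // The rewritten predicate references `id@0`; a column absent from the batch
//     // schema would instead yield `Ok(None)` and filtering would be skipped
//     let remapped = update_predicate_index(table_pred, &batch_schema)?;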