diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 8f430f7753ef..17b98b53051a 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -34,6 +34,7 @@ use datafusion::{ scalar::ScalarValue, }; use datafusion_catalog::memory::DataSourceExec; +use datafusion_common::JoinType; use datafusion_common::config::ConfigOptions; use datafusion_datasource::{ PartitionedFile, file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, @@ -53,6 +54,7 @@ use datafusion_physical_expr::{ use datafusion_physical_optimizer::{ PhysicalOptimizerRule, filter_pushdown::FilterPushdown, }; +use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion_physical_plan::{ ExecutionPlan, aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, @@ -3145,6 +3147,425 @@ fn test_pushdown_with_empty_group_by() { ); } +#[test] +fn test_pushdown_through_aggregate_with_reordered_input_columns() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + + // Reorder scan output from (a, b, c) to (c, a, b) + let reordered_schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Float64, false), + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let projection = Arc::new( + ProjectionExec::try_new( + vec![ + (col("c", &schema()).unwrap(), "c".to_string()), + (col("a", &schema()).unwrap(), "a".to_string()), + (col("b", &schema()).unwrap(), "b".to_string()), + ], + scan, + ) + .unwrap(), + ); + + let aggregate_expr = vec![ + AggregateExprBuilder::new( + count_udaf(), + vec![col("c", &reordered_schema).unwrap()], + ) + .schema(reordered_schema.clone()) + .alias("cnt") + .build() + .map(Arc::new) + .unwrap(), + ]; + + // Group by a@1, b@2 (input indices in reordered schema) + let group_by = PhysicalGroupBy::new_single(vec![ + (col("a", 
&reordered_schema).unwrap(), "a".to_string()), + (col("b", &reordered_schema).unwrap(), "b".to_string()), + ]); + + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + group_by, + aggregate_expr, + vec![None], + projection, + reordered_schema, + ) + .unwrap(), + ); + + // Filter on b@1 in aggregate's output schema (a@0, b@1, cnt@2) + // The grouping expr for b references input index 2, but output index is 1. + let agg_output_schema = aggregate.schema(); + let predicate = col_lit_predicate("b", "bar", &agg_output_schema); + let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap()); + + // The filter should be pushed down + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt] + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt], ordering_mode=PartiallySorted([1]) + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar + " + ); +} + +#[test] +fn test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + + let reordered_schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Float64, false), + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let projection = Arc::new( + ProjectionExec::try_new( + vec![ + (col("c", &schema()).unwrap(), "c".to_string()), + (col("a", &schema()).unwrap(), "a".to_string()), + (col("b", &schema()).unwrap(), "b".to_string()), + ], + scan, + ) + 
.unwrap(), + ); + + let aggregate_expr = vec![ + AggregateExprBuilder::new( + count_udaf(), + vec![col("c", &reordered_schema).unwrap()], + ) + .schema(reordered_schema.clone()) + .alias("cnt") + .build() + .map(Arc::new) + .unwrap(), + ]; + + let group_by = PhysicalGroupBy::new_single(vec![ + (col("a", &reordered_schema).unwrap(), "a".to_string()), + (col("b", &reordered_schema).unwrap(), "b".to_string()), + ]); + + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + group_by, + aggregate_expr, + vec![None], + projection, + reordered_schema, + ) + .unwrap(), + ); + + // Filter on cnt@2 (aggregate result, not a grouping column) + let agg_output_schema = aggregate.schema(); + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("cnt", &agg_output_schema).unwrap()), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int64(Some(5)))), + )) as Arc<dyn PhysicalExpr>; + let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap()); + + // The filter is not pushed down. 
+ insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: cnt@2 > 5 + - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt] + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: cnt@2 > 5 + - AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt] + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + " + ); +} + +#[test] +fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + + let reordered_schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Float64, false), + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let projection = Arc::new( + ProjectionExec::try_new( + vec![ + (col("c", &schema()).unwrap(), "c".to_string()), + (col("a", &schema()).unwrap(), "a".to_string()), + (col("b", &schema()).unwrap(), "b".to_string()), + ], + scan, + ) + .unwrap(), + ); + + let aggregate_expr = vec![ + AggregateExprBuilder::new( + count_udaf(), + vec![col("c", &reordered_schema).unwrap()], + ) + .schema(reordered_schema.clone()) + .alias("cnt") + .build() + .map(Arc::new) + .unwrap(), + ]; + + // Use grouping sets (a, b) and (b). 
+ let group_by = PhysicalGroupBy::new( + vec![ + (col("a", &reordered_schema).unwrap(), "a".to_string()), + (col("b", &reordered_schema).unwrap(), "b".to_string()), + ], + vec![ + ( + Arc::new(Literal::new(ScalarValue::Utf8(None))), + "a".to_string(), + ), + ( + Arc::new(Literal::new(ScalarValue::Utf8(None))), + "b".to_string(), + ), + ], + vec![vec![false, false], vec![true, false]], + true, + ); + + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + group_by, + aggregate_expr, + vec![None], + projection, + reordered_schema, + ) + .unwrap(), + ); + + let agg_output_schema = aggregate.schema(); + + // Filter on b (present in all grouping sets) should be pushed down + let predicate = col_lit_predicate("b", "bar", &agg_output_schema); + let plan = Arc::new(FilterExec::try_new(predicate, aggregate.clone()).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt] + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt], ordering_mode=PartiallySorted([1]) + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar + " + ); + + // Filter on a (missing from second grouping set) should not be pushed down + let predicate = col_lit_predicate("a", "foo", &agg_output_schema); + let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = 
foo + - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt] + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = foo + - AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt] + - ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + " + ); +} + +/// Regression test for https://github.com/apache/datafusion/issues/21065. +/// +/// Given a plan similar to the following, ensure that the filter is pushed down +/// through an AggregateExec whose input columns are reordered by a ProjectionExec. +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input() { + // Build side + let build_batches = vec![record_batch!(("a", Utf8, ["h1", "h2"])).unwrap()]; + let build_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Probe side + let probe_batches = vec![ + record_batch!( + ("a", Utf8, ["h1", "h2", "h3", "h4"]), + ("value", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap(), + ]; + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // ProjectionExec reorders (a, value) → (value, a) + let reordered_schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Float64, false), + Field::new("a", DataType::Utf8, false), + ])); + let projection = Arc::new( + ProjectionExec::try_new( + vec![ + 
(col("value", &probe_schema).unwrap(), "value".to_string()), + (col("a", &probe_schema).unwrap(), "a".to_string()), + ], + probe_scan, + ) + .unwrap(), + ); + + // AggregateExec: GROUP BY a@1, min(value@0) + let aggregate_expr = vec![ + AggregateExprBuilder::new( + min_udaf(), + vec![col("value", &reordered_schema).unwrap()], + ) + .schema(reordered_schema.clone()) + .alias("min_value") + .build() + .map(Arc::new) + .unwrap(), + ]; + let group_by = PhysicalGroupBy::new_single(vec![( + col("a", &reordered_schema).unwrap(), // a@1 in input + "a".to_string(), + )]); + + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Single, + group_by, + aggregate_expr, + vec![None], + projection, + reordered_schema, + ) + .unwrap(), + ); + + // Aggregate output schema: (a@0, min_value@1) + let agg_output_schema = aggregate.schema(); + + // Join the build and probe side + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + aggregate, + vec![( + col("a", &build_schema).unwrap(), + col("a", &agg_output_schema).unwrap(), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ) as Arc<dyn ExecutionPlan>; + + // The HashJoin's dynamic filter on `a` should push + // through the aggregate and reach the probe-side DataSource. 
+ insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a], file_type=test, pushdown_supported=true + - AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value] + - ProjectionExec: expr=[value@1 as value, a@0 as a] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, value], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a], file_type=test, pushdown_supported=true + - AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value] + - ProjectionExec: expr=[value@1 as value, a@0 as a] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, value], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + " + ); + + // Actually execute the plan to verify the dynamic filter is populated + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + + let session_config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + stream.next().await.unwrap().unwrap(); + + // After execution, the dynamic filter should be populated with values + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] 
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a], file_type=test, pushdown_supported=true + - AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value] + - ProjectionExec: expr=[value@1 as value, a@0 as a] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, value], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ] + " + ); +} + #[test] fn test_pushdown_with_computed_grouping_key() { // Test filter pushdown with computed grouping expression diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 42df1a8b07cd..32dd7b4b4824 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1473,11 +1473,12 @@ impl ExecutionPlan for AggregateExec { // This optimization is NOT safe for filters on aggregated columns (like filtering on // the result of SUM or COUNT), as those require computing all groups first. - let grouping_columns: HashSet<_> = self - .group_by - .expr() - .iter() - .flat_map(|(expr, _)| collect_columns(expr)) + // Build grouping columns using output indices because parent filters reference the + // AggregateExec's output schema, where grouping columns come first. The grouping + // expressions reference input columns whose indices may not match the output schema. + let output_schema = self.schema(); + let grouping_columns: HashSet<_> = (0..self.group_by.expr().len()) + .map(|i| Column::new(output_schema.field(i).name(), i)) .collect(); // Analyze each filter separately to determine if it can be pushed down @@ -1502,9 +1503,7 @@ impl ExecutionPlan for AggregateExec { let filter_column_indices: Vec<usize> = filter_columns .iter() .filter_map(|filter_col| { - self.group_by.expr().iter().position(|(expr, _)| { - collect_columns(expr).contains(filter_col) - }) + grouping_columns.get(filter_col).map(|col| col.index()) + }) + .collect();