Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,67 @@ fn test_filter_pushdown_through_union() {
);
}

#[test]
fn test_filter_pushdown_through_union_mixed_support() {
// Test case where one child supports filter pushdown and one doesn't
let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
let scan2 = TestScanBuilder::new(schema()).with_support(false).build();

let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();

let predicate = col_lit_predicate("a", "foo", &schema());
let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());

insta::assert_snapshot!(
OptimizationTest::new(plan, FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- UnionExec
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- UnionExec
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
Comment on lines +1825 to +1836
Copy link
Contributor Author

@haohuaijin haohuaijin Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is main purpose for this pr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one really seems like it should be reproducible from an SLT test

Copy link
Contributor Author

@haohuaijin haohuaijin Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me try, maybe one memory and one parquet file can reproduce

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"
);
}

#[test]
fn test_filter_pushdown_through_union_does_not_support() {
// Test case where one child supports filter pushdown and one doesn't
let scan1 = TestScanBuilder::new(schema()).with_support(false).build();
let scan2 = TestScanBuilder::new(schema()).with_support(false).build();

let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();

let predicate = col_lit_predicate("a", "foo", &schema());
let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());

insta::assert_snapshot!(
OptimizationTest::new(plan, FilterPushdown::new(), true),
@"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- UnionExec
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- UnionExec
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
Comment on lines +1855 to +1867
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think even for this case, pushdown filter do not have some bad effect.

"
);
}

/// Schema:
/// a: String
/// b: String
Expand Down
84 changes: 82 additions & 2 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ use crate::execution_plan::{
InvariantLevel, boundedness_from_children, check_default_invariants,
emission_type_from_children,
};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::filter::FilterExec;
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, PushedDown,
};
use crate::metrics::BaselineMetrics;
use crate::projection::{ProjectionExec, make_with_child};
use crate::stream::ObservedStream;
Expand All @@ -49,7 +53,9 @@ use datafusion_common::{
Result, assert_or_internal_err, exec_err, internal_datafusion_err,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, calculate_union};
use datafusion_physical_expr::{
EquivalenceProperties, PhysicalExpr, calculate_union, conjunction,
};

use futures::Stream;
use itertools::Itertools;
Expand Down Expand Up @@ -370,6 +376,80 @@ impl ExecutionPlan for UnionExec {
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}

fn handle_child_pushdown_result(
&self,
phase: FilterPushdownPhase,
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
// For non-Pre phase, use default behavior
if !matches!(phase, FilterPushdownPhase::Pre) {
Comment on lines +386 to +387
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the Pre / Post phase need different behavior?

Copy link
Contributor Author

@haohuaijin haohuaijin Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm thinking the purpose for this pr can only happen in the pre phase, post phase is for dynamic filter, seem like not related, so i keep the default behavior

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. The intuition here is to let the creator of the filter decide what to do with it.
I don't like that it makes assumptions about the implementation / what the creators of the filter want to do, but I don't see a better way to handle this.
I don't think forcing creation of the FilterExec would be good at least as things currently stand.

But we should add a comment explaining this.

return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
}

// UnionExec needs specialized filter pushdown handling when children have
// heterogeneous pushdown support. Without this, when some children support
// pushdown and others don't, the default behavior would leave FilterExec
// above UnionExec, re-applying filters to outputs of all children—including
// those that already applied the filters via pushdown. This specialized
// implementation adds FilterExec only to children that don't support
// pushdown, avoiding redundant filtering and improving performance.
//
// Example: Given Child1 (no pushdown support) and Child2 (has pushdown support)
// Default behavior: This implementation:
// FilterExec UnionExec
// UnionExec FilterExec
// Child1 Child1
// Child2(filter) Child2(filter)

// Collect unsupported filters for each child
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle we could make UnionExec transparent similar to CoalesceBatchesExec.

Help me understand why we are adding the FilterExec here.
My guess is that you are trying to address the case of Child2 supporting pushdown but Child1 not supporting it.
Without this specialized implementation we would get:

FilterExec
  UnionExec
     Child1
     Child2

i.e. no changes to the plan, not incorrect but we are applying filters to the output of Child2 that are unnecessary (it already applied these filters)

With this logic we get:

UnionExec
  FilterExec
    Child1
  Child2

Which skips re-applying filters to the output of Child2.

Is this interpretation correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, thank you for such good explain👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add something along these lines as a comment justifying the added complexity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add in ded7d9b

let mut unsupported_filters_per_child = vec![Vec::new(); self.inputs.len()];
for parent_filter_result in child_pushdown_result.parent_filters.iter() {
for (child_idx, &child_result) in
parent_filter_result.child_results.iter().enumerate()
{
if matches!(child_result, PushedDown::No) {
unsupported_filters_per_child[child_idx]
.push(Arc::clone(&parent_filter_result.filter));
}
}
}

// Wrap children that have unsupported filters with FilterExec
let mut new_children = self.inputs.clone();
for (child_idx, unsupported_filters) in
unsupported_filters_per_child.iter().enumerate()
{
if !unsupported_filters.is_empty() {
let combined_filter = conjunction(unsupported_filters.clone());
new_children[child_idx] = Arc::new(FilterExec::try_new(
combined_filter,
Arc::clone(&self.inputs[child_idx]),
)?);
}
}

// Check if any children were modified
let children_modified = new_children
.iter()
.zip(self.inputs.iter())
.any(|(new, old)| !Arc::ptr_eq(new, old));

let all_filters_pushed =
vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()];
let propagation = if children_modified {
let updated_node = UnionExec::try_new(new_children)?;
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
.with_updated_node(updated_node)
} else {
FilterPushdownPropagation::with_parent_pushdown_result(all_filters_pushed)
};

// Report all parent filters as supported since we've ensured they're applied
// on all children (either pushed down or via FilterExec)
Ok(propagation)
}
}

/// Combines multiple input streams by interleaving them.
Expand Down
63 changes: 63 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,66 @@ logical_plan
physical_plan
01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]

###
# Test filter pushdown through UNION with mixed support
# This tests the case where one child supports filter pushdown (parquet) and one doesn't (memory table)
###

# enable filter pushdown
statement ok
set datafusion.execution.parquet.pushdown_filters = true;

statement ok
set datafusion.optimizer.max_passes = 0;

# Create memory table with matching schema (a: VARCHAR, b: BIGINT)
statement ok
CREATE TABLE t_union_mem(a VARCHAR, b BIGINT) AS VALUES ('qux', 4), ('quux', 5);

# Create parquet table with matching schema
statement ok
CREATE EXTERNAL TABLE t_union_parquet(a VARCHAR, b BIGINT) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet';

# Query results combining memory table and Parquet with filter
query I rowsort
SELECT b FROM (
SELECT a, b FROM t_union_mem
UNION ALL
SELECT a, b FROM t_union_parquet
) WHERE b > 2;
----
3
4
5
50

# Explain the union query - filter should be pushed to parquet but not memory table
query TT
EXPLAIN SELECT b FROM (
SELECT a, b FROM t_union_mem
UNION ALL
SELECT a, b FROM t_union_parquet
) WHERE b > 2;
----
logical_plan
01)Projection: b
02)--Filter: b > Int64(2)
03)----Union
04)------Projection: t_union_mem.a, t_union_mem.b
05)--------TableScan: t_union_mem
06)------Projection: t_union_parquet.a, t_union_parquet.b
07)--------TableScan: t_union_parquet
physical_plan
01)UnionExec
02)--FilterExec: b@0 > 2
03)----DataSourceExec: partitions=1, partition_sizes=[1]
04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet]]}, projection=[b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]

# Clean up union test tables
statement ok
DROP TABLE t_union_mem;

statement ok
DROP TABLE t_union_parquet;