Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion::{
};
use datafusion_catalog::memory::DataSourceExec;
use datafusion_common::config::ConfigOptions;
use datafusion_common::JoinType;
use datafusion_datasource::{
file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, PartitionedFile,
};
Expand All @@ -55,6 +56,7 @@ use datafusion_physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
collect,
filter::FilterExec,
joins::{HashJoinExec, PartitionMode},
repartition::RepartitionExec,
sorts::sort::SortExec,
ExecutionPlan,
Expand Down Expand Up @@ -2625,3 +2627,52 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
];
assert_batches_eq!(&expected, &batches);
}

#[test]
fn test_hash_join_dynamic_filter_with_unsupported_scan() {
    // Verifies that running the post-optimization `FilterPushdown` rule over a
    // `HashJoinExec` does not leave a dynamic filter placeholder on the probe side
    // when the probe-side scan reports that it cannot use filters at all.
    let schema = schema();

    // Build side: test scan that accepts pushed-down filters.
    let build_scan = TestScanBuilder::new(schema.clone())
        .with_support(true)
        .build();

    // Probe side: test scan that does not support filter pushdown.
    let probe_scan = TestScanBuilder::new(schema.clone())
        .with_support(false)
        .build();

    // Inner equi-join on column `a` of both inputs, collecting the left (build) side.
    let join = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            probe_scan,
            vec![(
                Arc::new(Column::new_with_schema("a", &schema).unwrap()),
                Arc::new(Column::new_with_schema("a", &schema).unwrap()),
            )],
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            datafusion_common::NullEquality::NullEqualsNothing,
        )
        .unwrap(),
    );

    // `config` is not needed once the context exists, so it is moved rather than cloned.
    let config = SessionConfig::new();
    let rule = FilterPushdown::new_post_optimization();
    let ctx = OptimizerContext::new(config);
    let plan = rule.optimize_plan(join, &ctx).unwrap();

    // Optimized plan should not have a DynamicFilter placeholder in the probe node:
    // both scans are expected to render without any pushed-down predicate.
    // NOTE(review): the relative indentation of child nodes inside the snapshot below
    // may have been collapsed in the copy this was reviewed from -- confirm against
    // the output of `cargo insta review`.
    insta::assert_snapshot!(
        format_plan_for_test(&plan),
        @r"
    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
    "
    );
}
11 changes: 6 additions & 5 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ impl FileSource for TestSource {
..self.clone()
});
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::Yes; filters.len()],
vec![PushedDown::Exact; filters.len()],
)
.with_updated_node(new_node))
} else {
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
vec![PushedDown::Unsupported; filters.len()],
))
}
}
Expand Down Expand Up @@ -538,8 +538,8 @@ impl ExecutionPlan for TestNode {
let first_pushdown_result = self_pushdown_result[0].clone();

match &first_pushdown_result.discriminant {
PushedDown::No => {
// We have a filter to push down
PushedDown::Unsupported | PushedDown::Inexact => {
// We have a filter that wasn't exactly pushed down, so we need to apply it
let new_child = FilterExec::try_new(
Arc::clone(&first_pushdown_result.predicate),
Arc::clone(&self.input),
Expand All @@ -551,7 +551,8 @@ impl ExecutionPlan for TestNode {
res.updated_node = Some(Arc::new(new_self) as Arc<dyn ExecutionPlan>);
Ok(res)
}
PushedDown::Yes => {
PushedDown::Exact => {
// Filter was exactly pushed down, no need to apply it again
let res = FilterPushdownPropagation::if_all(child_pushdown_result);
Ok(res)
}
Expand Down
26 changes: 20 additions & 6 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,9 @@ impl FileSource for ParquetSource {
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
// When pushdown_filters is true, filters will be used for row-level filtering
// When pushdown_filters is false, filters will be used for stats pruning only
// We'll mark them as supported here and adjust later based on pushdown_filters
PushedDownPredicate::supported(filter)
} else {
PushedDownPredicate::unsupported(filter)
Expand All @@ -737,19 +740,19 @@ impl FileSource for ParquetSource {
.collect();
if filters
.iter()
.all(|f| matches!(f.discriminant, PushedDown::No))
.all(|f| matches!(f.discriminant, PushedDown::Unsupported))
{
// No filters can be pushed down, so we can just return the remaining filters
// and avoid replacing the source in the physical plan.
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
vec![PushedDown::Unsupported; filters.len()],
));
}
let allowed_filters = filters
.iter()
.filter_map(|f| match f.discriminant {
PushedDown::Yes => Some(Arc::clone(&f.predicate)),
PushedDown::No => None,
PushedDown::Exact => Some(Arc::clone(&f.predicate)),
PushedDown::Inexact | PushedDown::Unsupported => None,
})
.collect_vec();
let predicate = match source.predicate {
Expand All @@ -762,13 +765,24 @@ impl FileSource for ParquetSource {
source = source.with_pushdown_filters(pushdown_filters);
let source = Arc::new(source);
// If pushdown_filters is false we tell our parents that they still have to handle the filters,
// even if we updated the predicate to include the filters (they will only be used for stats pruning).
// even though we updated the predicate to include the filters (they will be used for stats pruning only).
// In this case, we return Inexact for filters that can be pushed down (used for stats pruning)
// and Unsupported for filters that cannot be pushed down at all.
if !pushdown_filters {
let result_discriminants = filters
.iter()
.map(|f| match f.discriminant {
PushedDown::Exact => PushedDown::Inexact, // Downgrade to Inexact (stats pruning only)
PushedDown::Unsupported => PushedDown::Unsupported, // Keep as Unsupported
_ => unreachable!("Only Exact or Unsupported expected at this point"),
})
.collect();
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
result_discriminants,
)
.with_updated_node(source));
}
// If pushdown_filters is true, we return the original discriminants (Exact for supported filters)
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
filters.iter().map(|f| f.discriminant).collect(),
)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ pub trait FileSource: Send + Sync {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
// Default implementation: don't support filter pushdown
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
vec![PushedDown::Unsupported; filters.len()],
))
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ pub trait DataSource: Send + Sync + Debug {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
// Default implementation: don't support filter pushdown
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
vec![PushedDown::Unsupported; filters.len()],
))
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ impl Default for SessionConfig {
}

/// A type map for storing extensions.
///
///
/// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one
/// will be kept.
///
///
/// Extensions are opaque objects that are unknown to DataFusion itself but can be downcast by optimizer rules,
/// execution plans, or other components that have access to the session config.
/// They provide a flexible way to attach extra data or behavior to the session config.
Expand Down
13 changes: 7 additions & 6 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,10 @@ fn push_down_filters(
let mut all_predicates = self_filtered.items().to_vec();

// Apply second filter pass: collect indices of parent filters that can be pushed down
let parent_filters_for_child = parent_filtered
.chain_filter_slice(&parent_filters, |filter| {
matches!(filter.discriminant, PushedDown::Yes)
// Push down filters that are either Exact or Inexact (both mean the child will use them)
let parent_filters_for_child =
parent_filtered.chain_filter_slice(&parent_filters, |filter| {
matches!(filter.discriminant, PushedDown::Exact | PushedDown::Inexact)
});

// Add the filtered parent predicates to all_predicates
Expand Down Expand Up @@ -549,7 +550,7 @@ fn push_down_filters(
.collect_vec();
// Map the results from filtered self filters back to their original positions using FilteredVec
let mapped_self_results =
self_filtered.map_results_to_original(all_filters, PushedDown::No);
self_filtered.map_results_to_original(all_filters, PushedDown::Unsupported);

// Wrap each result with its corresponding expression
let self_filter_results: Vec<_> = mapped_self_results
Expand All @@ -562,7 +563,7 @@ fn push_down_filters(

// Start by marking all parent filters as unsupported for this child
for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
parent_filter_pushdown_support.push(PushedDown::No);
parent_filter_pushdown_support.push(PushedDown::Unsupported);
assert_eq!(
parent_filter_pushdown_support.len(),
child_idx + 1,
Expand All @@ -571,7 +572,7 @@ fn push_down_filters(
}
// Map results from pushed-down filters back to original parent filter indices
let mapped_parent_results = parent_filters_for_child
.map_results_to_original(parent_filters, PushedDown::No);
.map_results_to_original(parent_filters, PushedDown::Unsupported);

// Update parent_filter_pushdown_supports with the mapped results
// mapped_parent_results already has the results at their original indices
Expand Down
14 changes: 9 additions & 5 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,19 +521,20 @@ impl ExecutionPlan for FilterExec {
if !matches!(phase, FilterPushdownPhase::Pre) {
return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
}
// We absorb any parent filters that were not handled by our children
// We absorb any parent filters that were not handled by our children with exact filtering
let unsupported_parent_filters =
child_pushdown_result.parent_filters.iter().filter_map(|f| {
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
// Keep filters that weren't handled with exact filtering (Inexact or Unsupported)
(!matches!(f.all(), PushedDown::Exact)).then_some(Arc::clone(&f.filter))
});
let unsupported_self_filters = child_pushdown_result
.self_filters
.first()
.expect("we have exactly one child")
.iter()
.filter_map(|f| match f.discriminant {
PushedDown::Yes => None,
PushedDown::No => Some(&f.predicate),
PushedDown::Exact => None,
PushedDown::Inexact | PushedDown::Unsupported => Some(&f.predicate),
})
.cloned();

Expand Down Expand Up @@ -592,8 +593,11 @@ impl ExecutionPlan for FilterExec {
Some(Arc::new(new) as _)
};

// FilterExec always applies filters exactly.
// Even if the child returned Inexact or Unsupported, we absorb those filters
// and apply them, so from the parent's perspective, the filter is handled exactly.
Ok(FilterPushdownPropagation {
filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
filters: vec![PushedDown::Exact; child_pushdown_result.parent_filters.len()],
updated_node,
})
}
Expand Down
Loading