Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 210 additions & 28 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::filter_pushdown::{
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
ProbeSideBoundsAccumulator,
};
use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
use crate::joins::utils::{
Expand Down Expand Up @@ -463,12 +464,25 @@ impl HashJoinExec {
})
}

fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
// Extract the right-side keys (probe side keys) from the `on` clauses
// Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
/// Collects the join-key expressions that belong to `pushdown_side`.
///
/// For every `(left, right)` pair in `on`, `JoinSide::Left` selects the left
/// expression and `JoinSide::Right` the right one, preserving the order of `on`.
/// `JoinSide::None` selects nothing, so the result is empty.
fn join_exprs_for_side(on: &JoinOn, pushdown_side: JoinSide) -> Vec<PhysicalExprRef> {
    on.iter()
        .filter_map(|(left_expr, right_expr)| match pushdown_side {
            JoinSide::Left => Some(Arc::clone(left_expr)),
            JoinSide::Right => Some(Arc::clone(right_expr)),
            JoinSide::None => None,
        })
        .collect()
}

/// Builds the placeholder dynamic filter for the side of the join selected by
/// `pushdown_side`.
///
/// The filter is keyed on that side's expressions from `on` (see
/// `join_exprs_for_side`) and starts out as the literal `true`.
///
/// # Errors
///
/// Returns an internal error when `pushdown_side` is `JoinSide::None`, so callers
/// must only invoke this once a concrete side is known.
fn create_dynamic_filter(
on: &JoinOn,
pushdown_side: JoinSide,
) -> Result<Arc<DynamicFilterPhysicalExpr>> {
if pushdown_side == JoinSide::None {
return internal_err!("dynamic filter side must be specified");
}
// Extract the join key expressions from the side that will receive the dynamic filter
let keys = Self::join_exprs_for_side(on, pushdown_side);
// Initialize with a placeholder expression (true) that will be updated when the hash table is built
Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
Ok(Arc::new(DynamicFilterPhysicalExpr::new(keys, lit(true))))
}

/// left (build) side which gets hashed
Expand Down Expand Up @@ -527,12 +541,6 @@ impl HashJoinExec {
]
}

/// Get probe side information for the hash join.
///
/// Always returns `JoinSide::Right`; the value does not depend on the join type.
pub fn probe_side() -> JoinSide {
// In current implementation right side is always probe side.
JoinSide::Right
}

/// Return whether the join contains a projection
pub fn contains_projection(&self) -> bool {
self.projection.is_some()
Expand Down Expand Up @@ -578,7 +586,7 @@ impl HashJoinExec {
&join_type,
Arc::clone(&schema),
&Self::maintains_input_order(join_type),
Some(Self::probe_side()),
Some(find_filter_pushdown_sides(join_type)),
on,
)?;

Expand Down Expand Up @@ -780,6 +788,26 @@ impl DisplayAs for HashJoinExec {
}
}

/// Returns the side of the join that receives the dynamic filter for `join_type`.
///
/// The returned side is the one the bounds are applied to; the bounds themselves
/// are collected from the opposite side. Only the join-key ranges from the ON
/// clause are accumulated. `JoinSide::None` means no dynamic filter is pushed down.
fn find_filter_pushdown_sides(join_type: JoinType) -> JoinSide {
    match join_type {
        // Inner and all left-flavoured joins: the filter goes to the right input.
        JoinType::Inner
        | JoinType::Left
        | JoinType::LeftSemi
        | JoinType::LeftAnti
        | JoinType::LeftMark => JoinSide::Right,
        // Right-flavoured joins: the filter goes to the left input.
        JoinType::Right
        | JoinType::RightSemi
        | JoinType::RightAnti
        | JoinType::RightMark => JoinSide::Left,
        // Full outer join cannot have dynamic filter pushdown because all rows on
        // both sides are preserved.
        JoinType::Full => JoinSide::None,
    }
}

impl ExecutionPlan for HashJoinExec {
fn name(&self) -> &'static str {
"HashJoinExec"
Expand Down Expand Up @@ -929,6 +957,9 @@ impl ExecutionPlan for HashJoinExec {
}

let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
let probe_side = find_filter_pushdown_sides(self.join_type);
let report_build_bounds =
enable_dynamic_filter_pushdown && probe_side == JoinSide::Right;

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
Expand All @@ -946,7 +977,7 @@ impl ExecutionPlan for HashJoinExec {
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
enable_dynamic_filter_pushdown,
report_build_bounds,
))
})?,
PartitionMode::Partitioned => {
Expand All @@ -964,7 +995,7 @@ impl ExecutionPlan for HashJoinExec {
reservation,
need_produce_result_in_final(self.join_type),
1,
enable_dynamic_filter_pushdown,
report_build_bounds,
))
}
PartitionMode::Auto => {
Expand All @@ -982,18 +1013,16 @@ impl ExecutionPlan for HashJoinExec {
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
let on_right = self
.on
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
// Determine which side will receive the dynamic filter
let probe_side = find_filter_pushdown_sides(self.join_type);
let on_expressions = Self::join_exprs_for_side(&self.on, probe_side);
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
on_expressions,
))
})))
})
Expand All @@ -1004,6 +1033,7 @@ impl ExecutionPlan for HashJoinExec {
// we have the batches and the hash map with their keys. We can how create a stream
// over the right that uses this information to issue new batches.
let right_stream = self.right.execute(partition, context)?;
let right_schema = right_stream.schema();

// update column indices to reflect the projection
let column_indices_after_projection = match &self.projection {
Expand All @@ -1020,6 +1050,23 @@ impl ExecutionPlan for HashJoinExec {
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();

let probe_bounds_accumulators =
if enable_dynamic_filter_pushdown && probe_side == JoinSide::Left {
Some(
on_right
.iter()
.map(|expr| {
ProbeSideBoundsAccumulator::try_new(
Arc::clone(expr),
&right_schema,
)
})
.collect::<Result<Vec<_>>>()?,
)
} else {
None
};

Ok(Box::pin(HashJoinStream::new(
partition,
self.schema(),
Expand All @@ -1037,6 +1084,8 @@ impl ExecutionPlan for HashJoinExec {
vec![],
self.right.output_ordering().is_some(),
bounds_accumulator,
report_build_bounds,
probe_bounds_accumulators,
self.mode,
)))
}
Expand Down Expand Up @@ -1126,7 +1175,7 @@ impl ExecutionPlan for HashJoinExec {
}

// Get basic filter descriptions for both children
let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
let mut left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
&parent_filters,
self.left(),
)?;
Expand All @@ -1139,9 +1188,24 @@ impl ExecutionPlan for HashJoinExec {
if matches!(phase, FilterPushdownPhase::Post)
&& config.optimizer.enable_join_dynamic_filter_pushdown
{
// Add actual dynamic filter to right side (probe side)
let dynamic_filter = Self::create_dynamic_filter(&self.on);
right_child = right_child.with_self_filter(dynamic_filter);
// Decide which child (if any) receives the dynamic filter *before* creating it:
// `create_dynamic_filter` returns an internal error for `JoinSide::None`, so it
// must only be called once a concrete side is known. Calling it up front would
// turn every FULL join into an error instead of a no-op.
let pushdown_side = find_filter_pushdown_sides(self.join_type);
match pushdown_side {
    JoinSide::None => {
        // A join type that preserves both sides (e.g. FULL) cannot
        // leverage dynamic filters. Return early without creating one;
        // neither child gets a self filter.
        return Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child));
    }
    JoinSide::Left => {
        let dynamic_filter = Self::create_dynamic_filter(&self.on, pushdown_side)?;
        left_child = left_child.with_self_filter(dynamic_filter);
    }
    JoinSide::Right => {
        let dynamic_filter = Self::create_dynamic_filter(&self.on, pushdown_side)?;
        right_child = right_child.with_self_filter(dynamic_filter);
    }
}
}

Ok(FilterDescription::new()
Expand All @@ -1159,7 +1223,8 @@ impl ExecutionPlan for HashJoinExec {
// non-inner joins in `gather_filters_for_pushdown`.
// However it's a cheap check and serves to inform future devs touching this function that they need to be really
// careful pushing down filters through non-inner joins.
if self.join_type != JoinType::Inner {
let pushdown_side = find_filter_pushdown_sides(self.join_type);
if pushdown_side == JoinSide::None {
// Other types of joins can support *some* filters, but restrictions are complex and error prone.
// For now we don't support them.
// See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
Expand All @@ -1170,9 +1235,13 @@ impl ExecutionPlan for HashJoinExec {

let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
// We expect 0 or 1 self filters
if let Some(filter) = right_child_self_filters.first() {
let self_filters = match pushdown_side {
JoinSide::Left => &child_pushdown_result.self_filters[0],
JoinSide::Right => &child_pushdown_result.self_filters[1],
JoinSide::None => unreachable!(),
};
// We expect 0 or 1 self filters
if let Some(filter) = self_filters.first() {
// Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
// "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
let predicate = Arc::clone(&filter.predicate);
Expand Down Expand Up @@ -4518,4 +4587,117 @@ mod tests {
/// Returns the name of every field in `schema`, in schema order.
fn columns(schema: &Schema) -> Vec<String> {
    let mut names = Vec::new();
    for field in schema.fields().iter() {
        names.push(field.name().clone());
    }
    names
}

/// `create_dynamic_filter` must refuse to build a filter when no pushdown side
/// is given, reporting the reason in the error message.
#[test]
fn create_dynamic_filter_none_side_returns_error() {
    let empty_on: JoinOn = vec![];
    let outcome = HashJoinExec::create_dynamic_filter(&empty_on, JoinSide::None);
    let message = outcome.unwrap_err().to_string();
    assert_contains!(message, "dynamic filter side must be specified");
}

/// A FULL join has no dynamic-filter pushdown side, so gathering filters in the
/// `Post` phase must succeed and must not attach a self filter to either child.
#[test]
fn full_join_skips_dynamic_filter_creation() -> Result<()> {
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;

    // Both inputs share the same single-column schema and a single one-row batch.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1]))],
    )?;
    let left =
        TestMemoryExec::try_new(&[vec![batch.clone()]], Arc::clone(&schema), None)?;
    let right = TestMemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None)?;

    let on = vec![(col("a", &left.schema())?, col("a", &right.schema())?)];
    let join = HashJoinExec::try_new(
        Arc::new(left),
        Arc::new(right),
        on,
        None,
        &JoinType::Full,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNull,
    )?;

    let mut config = ConfigOptions::default();
    config.optimizer.enable_dynamic_filter_pushdown = true;
    // `gather_filters_for_pushdown` gates dynamic filter creation on the
    // join-specific flag, so enable it explicitly instead of relying on its
    // default; otherwise this test would not exercise the FULL-join path.
    config.optimizer.enable_join_dynamic_filter_pushdown = true;

    let desc =
        join.gather_filters_for_pushdown(FilterPushdownPhase::Post, vec![], &config)?;
    assert!(desc.self_filters().iter().all(|f| f.is_empty()));
    Ok(())
}

// This test verifies that a HashJoinExec with a dynamic filter attached reports
// the min/max join-key bounds observed on one input into the filter that targets
// the *other* input, for both possible filter sides:
// - Left input has column `a` with values [1, 3, 5]
// - Right input has column `b` with values [2, 4, 6]
// - JoinType::Right: the dynamic filter is built on the left expression `a@0`
//   and must end up with the bounds of the right input's values:
//   `a@0 >= 2 AND a@0 <= 6` (min=2, max=6).
// - JoinType::Left: the dynamic filter is built on the right expression `b@0`
//   and must end up with the bounds of the left input's values:
//   `b@0 >= 1 AND b@0 <= 5` (min=1, max=5).
// The filter is checked only after the join stream has been fully drained.
#[tokio::test]
async fn reports_bounds_when_dynamic_filter_side_left() -> Result<()> {
    use datafusion_physical_expr::expressions::col;

    let task_ctx = Arc::new(TaskContext::default());

    let left_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let left_batch = RecordBatch::try_new(
        Arc::clone(&left_schema),
        vec![Arc::new(Int32Array::from(vec![1, 3, 5]))],
    )?;
    let left = TestMemoryExec::try_new(&[vec![left_batch]], left_schema, None)?;

    let right_schema =
        Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
    let right_batch = RecordBatch::try_new(
        Arc::clone(&right_schema),
        vec![Arc::new(Int32Array::from(vec![2, 4, 6]))],
    )?;
    let right = TestMemoryExec::try_new(&[vec![right_batch]], right_schema, None)?;

    let on = vec![(col("a", &left.schema())?, col("b", &right.schema())?)];

    // (join type, side that receives the dynamic filter, expected final filter)
    let test_cases = vec![
        (JoinType::Right, JoinSide::Left, "a@0 >= 2 AND a@0 <= 6"),
        (JoinType::Left, JoinSide::Right, "b@0 >= 1 AND b@0 <= 5"),
    ];
    for (join_type, probe_side, expected_filter) in test_cases {
        let mut join_exec = HashJoinExec::try_new(
            Arc::new(left.clone()),
            Arc::new(right.clone()),
            on.clone(),
            None,
            &join_type,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNull,
        )?;

        // Install the dynamic filter by hand so the test can inspect it afterwards.
        let dynamic_filter: Arc<DynamicFilterPhysicalExpr> =
            HashJoinExec::create_dynamic_filter(&join_exec.on, probe_side)?;
        join_exec.dynamic_filter = Some(HashJoinExecDynamicFilter {
            filter: Arc::clone(&dynamic_filter),
            bounds_accumulator: OnceLock::new(),
        });

        // Drain the stream completely; the bounds are only final once the join is done.
        let stream = join_exec.execute(0, Arc::clone(&task_ctx))?;
        let _batches: Vec<RecordBatch> = stream.try_collect().await?;

        assert_eq!(
            dynamic_filter.current().unwrap().to_string(),
            expected_filter,
            "unexpected dynamic filter for join type {join_type:?}"
        );
    }

    Ok(())
}
}
Loading
Loading