Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ itertools = { workspace = true, features = ["use_std"] }
log = { workspace = true }
parking_lot = { workspace = true }
paste = "^1.0"
petgraph = "0.8.2"
petgraph = "0.8.3"
tokio = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
Expand All @@ -77,3 +78,6 @@ name = "is_null"
[[bench]]
harness = false
name = "binary_op"

[package.metadata.cargo-machete]
ignored = ["half"]
102 changes: 100 additions & 2 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use parking_lot::RwLock;
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
use tokio::sync::watch;

use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
Expand All @@ -27,6 +28,24 @@ use datafusion_common::{
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};

/// Lifecycle state of a dynamic filter as published to waiters.
///
/// Both variants carry the generation number the state was published with,
/// so a waiter can tell a newer state from the one it has already seen.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
    /// The filter may still receive further updates.
    InProgress { generation: u64 },
    /// The filter is final; no further updates are expected.
    Complete { generation: u64 },
}

impl FilterState {
    /// Returns the generation carried by this state, whichever variant it is.
    fn generation(&self) -> u64 {
        // Every variant has a `generation` field, so the or-pattern is irrefutable.
        let (Self::InProgress { generation } | Self::Complete { generation }) = *self;
        generation
    }
}

/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
Expand All @@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
/// The source of dynamic filters.
inner: Arc<RwLock<Inner>>,
/// Broadcasts filter state (updates and completion) to all waiters.
state_watch: watch::Sender<FilterState>,
/// For testing purposes track the data type and nullability to make sure they don't change.
/// If they do, there's a bug in the implementation.
/// But this can have overhead in production, so it's only included in our tests.
Expand All @@ -57,6 +78,10 @@ struct Inner {
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
generation: u64,
expr: Arc<dyn PhysicalExpr>,
/// Flag for quick synchronous check if filter is complete.
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
}

impl Inner {
Expand All @@ -66,6 +91,7 @@ impl Inner {
// This is not currently used anywhere but it seems useful to have this simple distinction.
generation: 1,
expr,
is_complete: false,
}
}

Expand Down Expand Up @@ -135,10 +161,12 @@ impl DynamicFilterPhysicalExpr {
children: Vec<Arc<dyn PhysicalExpr>>,
inner: Arc<dyn PhysicalExpr>,
) -> Self {
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
Self {
children,
remapped_children: None, // Initially no remapped children
inner: Arc::new(RwLock::new(Inner::new(inner))),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
Expand Down Expand Up @@ -181,7 +209,7 @@ impl DynamicFilterPhysicalExpr {
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
}

/// Update the current expression.
/// Update the current expression and notify all waiters.
/// Any children of this expression must be a subset of the original children
/// passed to the constructor.
/// This should be called e.g.:
Expand All @@ -200,12 +228,67 @@ impl DynamicFilterPhysicalExpr {

// Load the current inner, increment generation, and store the new one
let mut current = self.inner.write();
let new_generation = current.generation + 1;
*current = Inner {
generation: current.generation + 1,
generation: new_generation,
expr: new_expr,
is_complete: current.is_complete,
};
drop(current); // Release the lock before broadcasting

// Broadcast the new state to all waiters
let _ = self.state_watch.send(FilterState::InProgress {
generation: new_generation,
});
Ok(())
}

/// Mark this dynamic filter as complete and broadcast to all waiters.
///
/// This signals that all expected updates have been received.
/// Waiters using [`Self::wait_complete`] will be notified.
///
/// Completion is recorded in two places: the `is_complete` flag on the shared inner
/// state, and the watch channel, which is set to `FilterState::Complete` carrying the
/// current (unchanged) generation.
pub fn mark_complete(&self) {
    // Scope the write guard so the lock is released before broadcasting.
    let current_generation = {
        let mut current = self.inner.write();
        current.is_complete = true;
        current.generation
    };

    // Broadcast completion to all waiters.
    //
    // `watch::Sender::send` returns an error *without storing the value* when no
    // receiver is currently subscribed. That would lose the `Complete` state for a
    // waiter that checked `is_complete` just before it was set and subscribes just
    // after this call, leaving it waiting forever. `send_replace` always stores the
    // value and still notifies every subscribed receiver.
    self.state_watch.send_replace(FilterState::Complete {
        generation: current_generation,
    });
}

/// Wait asynchronously until this filter publishes a newer generation.
///
/// A fresh subscription is taken on every call. The generation visible at that
/// moment is the baseline, and the future resolves once a state with a strictly
/// greater generation is observed, which is what [`Self::update`] publishes.
///
/// Returning from this method says nothing about completeness.
/// [`Self::mark_complete`] re-broadcasts the existing generation rather than a new
/// one, so this method should not be relied on to observe completion; use
/// [`Self::wait_complete`] for that.
pub async fn wait_update(&self) {
    let mut receiver = self.state_watch.subscribe();

    // Baseline: the generation already published when we subscribed.
    let baseline = receiver.borrow_and_update().generation();

    // An error only means the channel was closed; either way we stop waiting.
    let _ = receiver
        .wait_for(|state| baseline < state.generation())
        .await;
}

/// Wait asynchronously until this dynamic filter is marked as complete.
///
/// This method returns immediately if the filter is already complete.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
/// the filter is fully complete with no more updates expected.
pub async fn wait_complete(&self) {
    // Fast path: already complete, no need to subscribe at all.
    if self.inner.read().is_complete {
        return;
    }

    let mut rx = self.state_watch.subscribe();

    // Re-check now that a receiver exists. `mark_complete` may have run between the
    // first check and `subscribe()`. At that point there may have been no receiver,
    // in which case `watch::Sender::send` fails without storing the `Complete`
    // state, and waiting on the channel alone would never resolve. Any completion
    // that happens after this second check is sent while `rx` is subscribed, so
    // `wait_for` below is guaranteed to observe it.
    if self.inner.read().is_complete {
        return;
    }

    // An error only means the channel was closed; either way we stop waiting.
    let _ = rx
        .wait_for(|state| matches!(state, FilterState::Complete { .. }))
        .await;
}
}

impl PhysicalExpr for DynamicFilterPhysicalExpr {
Expand All @@ -229,6 +312,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
children: self.children.clone(),
remapped_children: Some(children),
inner: Arc::clone(&self.inner),
state_watch: self.state_watch.clone(),
data_type: Arc::clone(&self.data_type),
nullable: Arc::clone(&self.nullable),
}))
Expand Down Expand Up @@ -488,4 +572,18 @@ mod test {
"Expected err when evaluate is called after changing the expression."
);
}

#[tokio::test]
async fn test_wait_complete_already_complete() {
    // A filter with no children wrapping a constant expression.
    let constant: Arc<dyn PhysicalExpr> = lit(42);
    let filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], constant));

    // Complete the filter before anyone waits on it.
    filter.mark_complete();

    // Waiting on an already-complete filter must resolve without blocking.
    filter.wait_complete().await;
}
}
99 changes: 99 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4500,4 +4500,103 @@ mod tests {
fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}

/// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
///
/// The filter is installed directly on the join's `dynamic_filter` field, the join is
/// executed for partition 0 and fully collected, and then `wait_complete()` is awaited
/// on a second handle to the same filter.
#[tokio::test]
async fn test_hash_join_marks_filter_complete() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
// Left input: three rows; `b1` is the join key column.
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 6]),
("c1", &vec![7, 8, 9]),
);
// Right input: `b1` holds the same key values as the left side.
let right = build_table(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]),
("c2", &vec![70, 80, 90]),
);

// Equi-join condition: left.b1 = right.b1.
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];

// Create a dynamic filter manually
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
// Keep a second handle: the original is moved into the join below.
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
)?;
// Attach the filter by assigning the field directly; the bounds accumulator starts
// as an empty `OnceLock` (presumably initialized during execution — verify in
// `HashJoinExec::execute`).
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
});

// Execute the join
let stream = join.execute(0, task_ctx)?;
// Drain the stream; the output batches themselves are not inspected.
let _batches = common::collect(stream).await?;

// After the join completes, the dynamic filter should be marked as complete
// wait_complete() should return immediately
// NOTE(review): there is no assertion here. If the filter were never marked
// complete this await would presumably not resolve, so a regression would show up
// as a hanging test rather than a failed assertion.
dynamic_filter_clone.wait_complete().await;

Ok(())
}

/// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
///
/// Same flow as `test_hash_join_marks_filter_complete`, except that the left (build)
/// input has no rows.
#[tokio::test]
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
// Empty left side (build side)
let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
// Right input: three rows; `b1` is the join key column.
let right = build_table(
("a2", &vec![10, 20, 30]),
("b1", &vec![4, 5, 6]),
("c2", &vec![70, 80, 90]),
);

// Equi-join condition: left.b1 = right.b1.
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];

// Create a dynamic filter manually
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
// Keep a second handle: the original is moved into the join below.
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
)?;
// Attach the filter by assigning the field directly; the bounds accumulator starts
// as an empty `OnceLock`.
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
});

// Execute the join
let stream = join.execute(0, task_ctx)?;
// Drain the stream; the output batches themselves are not inspected.
let _batches = common::collect(stream).await?;

// Even with empty build side, the dynamic filter should be marked as complete
// wait_complete() should return immediately
// NOTE(review): there is no assertion here. If the filter were never marked
// complete this await would presumably not resolve, so a regression would show up
// as a hanging test rather than a failed assertion.
dynamic_filter_clone.wait_complete().await;

Ok(())
}
}
10 changes: 7 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,13 @@ impl SharedBoundsAccumulator {
// Critical synchronization point: Only update the filter when ALL partitions are complete
// Troubleshooting: If you see "completed > total_partitions", check partition
// count calculation in new_from_partition_mode() - it may not match actual execution calls
if completed == total_partitions && !inner.bounds.is_empty() {
let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?;
self.dynamic_filter.update(filter_expr)?;
if completed == total_partitions {
if !inner.bounds.is_empty() {
let filter_expr =
self.create_filter_from_partition_bounds(&inner.bounds)?;
self.dynamic_filter.update(filter_expr)?;
}
self.dynamic_filter.mark_complete();
}

Ok(())
Expand Down
Loading
Loading