diff --git a/Cargo.lock b/Cargo.lock
index 1afa1e349167..e0f811f16f1b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2494,6 +2494,7 @@ dependencies = [
  "petgraph 0.8.3",
  "rand 0.9.2",
  "rstest",
+ "tokio",
 ]
 
 [[package]]
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 953a46929c39..69670941ad4f 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] }
 parking_lot = { workspace = true }
 paste = "^1.0"
 petgraph = "0.8.3"
+tokio = { workspace = true }
 
 [dev-dependencies]
 arrow = { workspace = true, features = ["test_utils"] }
@@ -79,3 +80,6 @@ name = "is_null"
 [[bench]]
 harness = false
 name = "binary_op"
+
+[package.metadata.cargo-machete]
+ignored = ["half"]
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 964a193db833..43a242472bec 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -17,6 +17,7 @@
 use parking_lot::RwLock;
 use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
+use tokio::sync::watch;
 
 use crate::PhysicalExpr;
 use arrow::datatypes::{DataType, Schema};
@@ -27,6 +28,24 @@ use datafusion_common::{
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
 
+/// State of a dynamic filter, tracking both updates and completion.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum FilterState {
+    /// Filter is in progress and may receive more updates.
+    InProgress { generation: u64 },
+    /// Filter is complete and will not receive further updates.
+    Complete { generation: u64 },
+}
+
+impl FilterState {
+    fn generation(&self) -> u64 {
+        match self {
+            FilterState::InProgress { generation }
+            | FilterState::Complete { generation } => *generation,
+        }
+    }
+}
+
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
 ///
 /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
@@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
     remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
     /// The source of dynamic filters.
     inner: Arc<RwLock<Inner>>,
+    /// Broadcasts filter state (updates and completion) to all waiters.
+    state_watch: watch::Sender<FilterState>,
     /// For testing purposes track the data type and nullability to make sure they don't change.
     /// If they do, there's a bug in the implementation.
     /// But this can have overhead in production, so it's only included in our tests.
@@ -57,6 +78,10 @@ struct Inner {
     /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
     generation: u64,
     expr: Arc<dyn PhysicalExpr>,
+    /// Flag for quick synchronous check if filter is complete.
+    /// This is redundant with the watch channel state, but allows us to return immediately
+    /// from `wait_complete()` without subscribing if already complete.
+    is_complete: bool,
 }
 
 impl Inner {
@@ -66,6 +91,7 @@ impl Inner {
             // This is not currently used anywhere but it seems useful to have this simple distinction.
             generation: 1,
             expr,
+            is_complete: false,
         }
     }
@@ -134,10 +160,12 @@
         children: Vec<Arc<dyn PhysicalExpr>>,
         inner: Arc<dyn PhysicalExpr>,
     ) -> Self {
+        let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
         Self {
             children,
             remapped_children: None, // Initially no remapped children
             inner: Arc::new(RwLock::new(Inner::new(inner))),
+            state_watch,
             data_type: Arc::new(RwLock::new(None)),
             nullable: Arc::new(RwLock::new(None)),
         }
     }
@@ -185,7 +213,7 @@
         Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
     }
 
-    /// Update the current expression.
+    /// Update the current expression and notify all waiters.
     /// Any children of this expression must be a subset of the original children
     /// passed to the constructor.
     /// This should be called e.g.:
@@ -204,13 +232,68 @@ impl DynamicFilterPhysicalExpr {
         // Load the current inner, increment generation, and store the new one
         let mut current = self.inner.write();
+        let new_generation = current.generation + 1;
         *current = Inner {
-            generation: current.generation + 1,
+            generation: new_generation,
             expr: new_expr,
+            is_complete: current.is_complete,
         };
+        drop(current); // Release the lock before broadcasting
+
+        // Broadcast the new state to all waiters
+        let _ = self.state_watch.send(FilterState::InProgress {
+            generation: new_generation,
+        });
         Ok(())
     }
 
+    /// Mark this dynamic filter as complete and broadcast to all waiters.
+    ///
+    /// This signals that all expected updates have been received.
+    /// Waiters using [`Self::wait_complete`] will be notified.
+    pub fn mark_complete(&self) {
+        let mut current = self.inner.write();
+        let current_generation = current.generation;
+        current.is_complete = true;
+        drop(current);
+
+        // Broadcast completion to all waiters
+        let _ = self.state_watch.send(FilterState::Complete {
+            generation: current_generation,
+        });
+    }
+
+    /// Wait asynchronously for any update to this filter.
+    ///
+    /// This method will return when [`Self::update`] is called and the generation increases.
+    /// It does not guarantee that the filter is complete.
+    pub async fn wait_update(&self) {
+        let mut rx = self.state_watch.subscribe();
+        // Get the current generation
+        let current_gen = rx.borrow_and_update().generation();
+
+        // Wait until generation increases
+        let _ = rx.wait_for(|state| state.generation() > current_gen).await;
+    }
+
+    /// Wait asynchronously until this dynamic filter is marked as complete.
+    ///
+    /// This method returns immediately if the filter is already complete.
+    /// Otherwise, it waits until [`Self::mark_complete`] is called.
+    ///
+    /// Unlike [`Self::wait_update`], this method guarantees that when it returns,
+    /// the filter is fully complete with no more updates expected.
+    pub async fn wait_complete(&self) {
+        if self.inner.read().is_complete {
+            return;
+        }
+
+        let mut rx = self.state_watch.subscribe();
+        let _ = rx
+            .wait_for(|state| matches!(state, FilterState::Complete { .. }))
+            .await;
+    }
+
     fn render(
         &self,
         f: &mut std::fmt::Formatter<'_>,
@@ -253,6 +336,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
             children: self.children.clone(),
             remapped_children: Some(children),
             inner: Arc::clone(&self.inner),
+            state_watch: self.state_watch.clone(),
             data_type: Arc::clone(&self.data_type),
             nullable: Arc::clone(&self.nullable),
         }))
@@ -509,4 +593,18 @@ mod test {
             "Expected err when evaluate is called after changing the expression."
         );
     }
+
+    #[tokio::test]
+    async fn test_wait_complete_already_complete() {
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![],
+            lit(42) as Arc<dyn PhysicalExpr>,
+        ));
+
+        // Mark as complete immediately
+        dynamic_filter.mark_complete();
+
+        // wait_complete should return immediately
+        dynamic_filter.wait_complete().await;
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 109ea479e488..680e4368ae70 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -4486,4 +4486,103 @@ mod tests {
     fn columns(schema: &Schema) -> Vec<String> {
         schema.fields().iter().map(|f| f.name().clone()).collect()
     }
+
+    /// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
+    #[tokio::test]
+    async fn test_hash_join_marks_filter_complete() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let left = build_table(
+            ("a1", &vec![1, 2, 3]),
+            ("b1", &vec![4, 5, 6]),
+            ("c1", &vec![7, 8, 9]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 20, 30]),
+            ("b1", &vec![4, 5, 6]),
+            ("c2", &vec![70, 80, 90]),
+        );
+
+        let on = vec![(
+            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+        )];
+
+        // Create a dynamic filter manually
+        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
+        let dynamic_filter_clone = Arc::clone(&dynamic_filter);
+
+        // Create HashJoinExec with the dynamic filter
+        let mut join = HashJoinExec::try_new(
+            left,
+            right,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            NullEquality::NullEqualsNothing,
+        )?;
+        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
+            filter: dynamic_filter,
+            build_accumulator: OnceLock::new(),
+        });
+
+        // Execute the join
+        let stream = join.execute(0, task_ctx)?;
+        let _batches = common::collect(stream).await?;
+
+        // After the join completes, the dynamic filter should be marked as complete
+        // wait_complete() should return immediately
+        dynamic_filter_clone.wait_complete().await;
+
+        Ok(())
+    }
+
+    /// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
+    #[tokio::test]
+    async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        // Empty left side (build side)
+        let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
+        let right = build_table(
+            ("a2", &vec![10, 20, 30]),
+            ("b1", &vec![4, 5, 6]),
+            ("c2", &vec![70, 80, 90]),
+        );
+
+        let on = vec![(
+            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+        )];
+
+        // Create a dynamic filter manually
+        let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
+        let dynamic_filter_clone = Arc::clone(&dynamic_filter);
+
+        // Create HashJoinExec with the dynamic filter
+        let mut join = HashJoinExec::try_new(
+            left,
+            right,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            NullEquality::NullEqualsNothing,
+        )?;
+        join.dynamic_filter = Some(HashJoinExecDynamicFilter {
+            filter: dynamic_filter,
+            build_accumulator: OnceLock::new(),
+        });
+
+        // Execute the join
+        let stream = join.execute(0, task_ctx)?;
+        let _batches = common::collect(stream).await?;
+
+        // Even with empty build side, the dynamic filter should be marked as complete
+        // wait_complete() should return immediately
+        dynamic_filter_clone.wait_complete().await;
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 7cf192bdf6c1..cb727f40a20a 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -410,6 +410,7 @@ impl SharedBuildAccumulator {
                     self.dynamic_filter.update(case_expr)?;
                 }
             }
+            self.dynamic_filter.mark_complete();
         }
 
         Ok(())
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 0b5ab784df67..96356e50d7ec 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -591,10 +591,13 @@ impl TopK {
             common_sort_prefix_converter: _,
             common_sort_prefix: _,
             finished: _,
-            filter: _,
+            filter,
         } = self;
         let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
 
+        // Mark the dynamic filter as complete now that TopK processing is finished.
+        filter.read().expr().mark_complete();
+
         // break into record batches as needed
         let mut batches = vec![];
         if let Some(mut batch) = heap.emit()? {
@@ -1198,4 +1201,52 @@ mod tests {
         Ok(())
     }
+
+    /// This test verifies that the dynamic filter is marked as complete after TopK processing finishes.
+    #[tokio::test]
+    async fn test_topk_marks_filter_complete() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+        let sort_expr = PhysicalSortExpr {
+            expr: col("a", schema.as_ref())?,
+            options: SortOptions::default(),
+        };
+
+        let full_expr = LexOrdering::from([sort_expr.clone()]);
+        let prefix = vec![sort_expr];
+
+        // Create a dummy runtime environment and metrics
+        let runtime = Arc::new(RuntimeEnv::default());
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        // Create a dynamic filter that we'll check for completion
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));
+        let dynamic_filter_clone = Arc::clone(&dynamic_filter);
+
+        // Create a TopK instance
+        let mut topk = TopK::try_new(
+            0,
+            Arc::clone(&schema),
+            prefix,
+            full_expr,
+            2,
+            10,
+            runtime,
+            &metrics,
+            Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))),
+        )?;
+
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)]));
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
+        topk.insert_batch(batch)?;
+
+        // Call emit to finish TopK processing
+        let _results: Vec<_> = topk.emit()?.try_collect().await?;
+
+        // After emit is called, the dynamic filter should be marked as complete
+        // wait_complete() should return immediately
+        dynamic_filter_clone.wait_complete().await;
+
+        Ok(())
+    }
 }
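To make the producer/consumer protocol above concrete, here is a minimal usage sketch, assuming a standalone tokio binary and the `datafusion_physical_expr::expressions` re-exports for `lit` and `DynamicFilterPhysicalExpr`; the `lit(false)` predicate is just a stand-in for a real bounds expression, not something this patch produces. A build-side task refines the filter with `update()` and then calls `mark_complete()`, while a consumer awaits `wait_complete()` before relying on the final predicate.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};

#[tokio::main]
async fn main() -> Result<()> {
    // Shared dynamic filter, starting from a placeholder `true` predicate.
    let filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));

    // Producer (e.g. the build side of a hash join): push a refined predicate,
    // then broadcast that no further updates will arrive.
    let producer = Arc::clone(&filter);
    let build = tokio::spawn(async move {
        producer
            .update(lit(false)) // stand-in for a real bounds expression (assumption)
            .expect("update of a column-free expression should succeed");
        producer.mark_complete();
    });

    // Consumer (e.g. a probe-side scan): block until the filter is final.
    // This returns immediately if `mark_complete` has already run, via the
    // `is_complete` fast path and the watch channel's retained state.
    filter.wait_complete().await;

    build.await.expect("build task panicked");
    Ok(())
}
```

Because `tokio::sync::watch` retains the most recent value, the completion signal is level-triggered: a waiter that subscribes after `mark_complete` has already fired still observes `FilterState::Complete` through `wait_for`, so `wait_complete` cannot miss the notification, whereas `wait_update` only resolves on a generation bump that happens after it subscribes.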