From dffd77b3df2ff1768f1908ec05b4ffd5160f778a Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Tue, 18 Nov 2025 12:06:12 +0100 Subject: [PATCH 1/6] Add field to DynamicPhysicalExpr to indicate its complete --- .../src/expressions/dynamic_filters.rs | 19 +++++++++++++++++++ .../src/joins/hash_join/shared_bounds.rs | 1 + datafusion/physical-plan/src/topk/mod.rs | 5 ++++- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 964a193db833..dfaaccc6e0a5 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -57,6 +57,8 @@ struct Inner { /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. generation: u64, expr: Arc, + /// Flag indicating whether all updates have been received and the filter is complete. + is_complete: bool, } impl Inner { @@ -66,6 +68,7 @@ impl Inner { // This is not currently used anywhere but it seems useful to have this simple distinction. generation: 1, expr, + is_complete: false, } } @@ -207,10 +210,26 @@ impl DynamicFilterPhysicalExpr { *current = Inner { generation: current.generation + 1, expr: new_expr, + is_complete: current.is_complete, }; Ok(()) } + /// Mark this dynamic filter as complete. + /// + /// This signals that all expected updates have been received. + pub fn mark_complete(&self) { + let mut current = self.inner.write(); + current.is_complete = true; + } + + /// Check if this dynamic filter is complete. + /// + /// Returns `true` if all expected updates have been received via [`Self::mark_complete`]. + pub fn is_complete(&self) -> bool { + self.inner.read().is_complete + } + fn render( &self, f: &mut std::fmt::Formatter<'_>, diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 25f7a0de31ac..e83f89e4e5d7 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -300,6 +300,7 @@ impl SharedBoundsAccumulator { self.create_filter_from_partition_bounds(&inner.bounds)?; self.dynamic_filter.update(filter_expr)?; } + self.dynamic_filter.mark_complete(); } Ok(()) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 0b5ab784df67..0b673e79cdc5 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -591,10 +591,13 @@ impl TopK { common_sort_prefix_converter: _, common_sort_prefix: _, finished: _, - filter: _, + filter, } = self; let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop + // Mark the dynamic filter as complete now that TopK processing is finished. + filter.read().expr().mark_complete(); + // break into record batches as needed let mut batches = vec![]; if let Some(mut batch) = heap.emit()? { From 278c3bb30f0c05d472569453727b5d14632546ca Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 19 Nov 2025 10:05:44 +0100 Subject: [PATCH 2/6] use watch instead --- Cargo.lock | 1 + datafusion/physical-expr/Cargo.toml | 1 + .../src/expressions/dynamic_filters.rs | 55 ++++++++++++++----- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b95390c24e9..2974b3f8145b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2496,6 +2496,7 @@ dependencies = [ "petgraph 0.8.3", "rand 0.9.2", "rstest", + "tokio", ] [[package]] diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 953a46929c39..b5b5c324a445 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] } parking_lot = { workspace = true } paste = "^1.0" petgraph = "0.8.3" +tokio = { workspace = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index dfaaccc6e0a5..921fa18cc332 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -17,6 +17,7 @@ use parking_lot::RwLock; use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; +use tokio::sync::watch; use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; @@ -44,6 +45,10 @@ pub struct DynamicFilterPhysicalExpr { remapped_children: Option>>, /// The source of dynamic filters. inner: Arc>, + /// Broadcasts completion state changes to all waiters. + completion_watch: watch::Sender, + /// Broadcasts update changes to all waiters. + update_watch: watch::Sender, /// For testing purposes track the data type and nullability to make sure they don't change. /// If they do, there's a bug in the implementation. /// But this can have overhead in production, so it's only included in our tests. @@ -57,8 +62,6 @@ struct Inner { /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. generation: u64, expr: Arc, - /// Flag indicating whether all updates have been received and the filter is complete. - is_complete: bool, } impl Inner { @@ -68,7 +71,6 @@ impl Inner { // This is not currently used anywhere but it seems useful to have this simple distinction. generation: 1, expr, - is_complete: false, } } @@ -137,10 +139,14 @@ impl DynamicFilterPhysicalExpr { children: Vec>, inner: Arc, ) -> Self { + let (completion_watch, _) = watch::channel(false); + let (update_watch, _) = watch::channel(1u64); Self { children, remapped_children: None, // Initially no remapped children inner: Arc::new(RwLock::new(Inner::new(inner))), + completion_watch, + update_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } @@ -188,7 +194,7 @@ impl DynamicFilterPhysicalExpr { Self::remap_children(&self.children, self.remapped_children.as_ref(), expr) } - /// Update the current expression. + /// Update the current expression and notify all waiters. /// Any children of this expression must be a subset of the original children /// passed to the constructor. /// This should be called e.g.: @@ -207,27 +213,48 @@ impl DynamicFilterPhysicalExpr { // Load the current inner, increment generation, and store the new one let mut current = self.inner.write(); + let new_generation = current.generation + 1; *current = Inner { - generation: current.generation + 1, + generation: new_generation, expr: new_expr, - is_complete: current.is_complete, }; + drop(current); // Release the lock before broadcasting + + // Broadcast the new generation to all waiters + let _ = self.update_watch.send(new_generation); Ok(()) } - /// Mark this dynamic filter as complete. + /// Mark this dynamic filter as complete and broadcast to all waiters. /// /// This signals that all expected updates have been received. + /// Waiters using [`Self::wait_complete`] will be notified. pub fn mark_complete(&self) { - let mut current = self.inner.write(); - current.is_complete = true; + // Broadcast completion to all waiters + let _ = self.completion_watch.send(true); + } + + /// Wait asynchronously for any update to this filter. + /// + /// This method will return when [`Self::update`] is called. + /// It does not guarantee that the filter is complete. + pub async fn wait_update(&self) { + let mut rx = self.update_watch.subscribe(); + // Wait for the generation to change + let _ = rx.changed().await; } - /// Check if this dynamic filter is complete. + /// Wait asynchronously until this dynamic filter is marked as complete. + /// + /// This method returns immediately if the filter is already complete. + /// Otherwise, it waits until [`Self::mark_complete`] is called. /// - /// Returns `true` if all expected updates have been received via [`Self::mark_complete`]. - pub fn is_complete(&self) -> bool { - self.inner.read().is_complete + /// Unlike [`Self::wait_update`], this method guarantees that when it returns, + /// the filter is fully complete with no more updates expected. + pub async fn wait_complete(&self) { + let mut rx = self.completion_watch.subscribe(); + // Wait until the completion flag becomes true + let _ = rx.wait_for(|&complete| complete).await; } fn render( @@ -272,6 +299,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { children: self.children.clone(), remapped_children: Some(children), inner: Arc::clone(&self.inner), + completion_watch: self.completion_watch.clone(), + update_watch: self.update_watch.clone(), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), })) From 32bbd7aa594a5dd26cf933b34ae3f9ae2bb4530a Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 19 Nov 2025 11:01:06 +0100 Subject: [PATCH 3/6] unify in one watch --- .../src/expressions/dynamic_filters.rs | 63 +++++++++++++------ 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 921fa18cc332..3f19a0b4a712 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -28,6 +28,24 @@ use datafusion_common::{ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; +/// State of a dynamic filter, tracking both updates and completion. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FilterState { + /// Filter is in progress and may receive more updates. + InProgress { generation: u64 }, + /// Filter is complete and will not receive further updates. + Complete { generation: u64 }, +} + +impl FilterState { + fn generation(&self) -> u64 { + match self { + FilterState::InProgress { generation } + | FilterState::Complete { generation } => *generation, + } + } +} + /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. /// /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also @@ -45,10 +63,8 @@ pub struct DynamicFilterPhysicalExpr { remapped_children: Option>>, /// The source of dynamic filters. inner: Arc>, - /// Broadcasts completion state changes to all waiters. - completion_watch: watch::Sender, - /// Broadcasts update changes to all waiters. - update_watch: watch::Sender, + /// Broadcasts filter state (updates and completion) to all waiters. + state_watch: watch::Sender, /// For testing purposes track the data type and nullability to make sure they don't change. /// If they do, there's a bug in the implementation. /// But this can have overhead in production, so it's only included in our tests. @@ -139,14 +155,12 @@ impl DynamicFilterPhysicalExpr { children: Vec>, inner: Arc, ) -> Self { - let (completion_watch, _) = watch::channel(false); - let (update_watch, _) = watch::channel(1u64); + let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 }); Self { children, remapped_children: None, // Initially no remapped children inner: Arc::new(RwLock::new(Inner::new(inner))), - completion_watch, - update_watch, + state_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } @@ -220,8 +234,10 @@ impl DynamicFilterPhysicalExpr { }; drop(current); // Release the lock before broadcasting - // Broadcast the new generation to all waiters - let _ = self.update_watch.send(new_generation); + // Broadcast the new state to all waiters + let _ = self.state_watch.send(FilterState::InProgress { + generation: new_generation, + }); Ok(()) } @@ -230,18 +246,24 @@ impl DynamicFilterPhysicalExpr { /// This signals that all expected updates have been received. /// Waiters using [`Self::wait_complete`] will be notified. pub fn mark_complete(&self) { + let current_generation = self.inner.read().generation; // Broadcast completion to all waiters - let _ = self.completion_watch.send(true); + let _ = self.state_watch.send(FilterState::Complete { + generation: current_generation, + }); } /// Wait asynchronously for any update to this filter. /// - /// This method will return when [`Self::update`] is called. + /// This method will return when [`Self::update`] is called and the generation increases. /// It does not guarantee that the filter is complete. pub async fn wait_update(&self) { - let mut rx = self.update_watch.subscribe(); - // Wait for the generation to change - let _ = rx.changed().await; + let mut rx = self.state_watch.subscribe(); + // Get the current generation + let current_gen = rx.borrow_and_update().generation(); + + // Wait until generation increases + let _ = rx.wait_for(|state| state.generation() > current_gen).await; } /// Wait asynchronously until this dynamic filter is marked as complete. @@ -252,9 +274,11 @@ impl DynamicFilterPhysicalExpr { /// Unlike [`Self::wait_update`], this method guarantees that when it returns, /// the filter is fully complete with no more updates expected. pub async fn wait_complete(&self) { - let mut rx = self.completion_watch.subscribe(); - // Wait until the completion flag becomes true - let _ = rx.wait_for(|&complete| complete).await; + let mut rx = self.state_watch.subscribe(); + // Wait until the state becomes Complete + let _ = rx + .wait_for(|state| matches!(state, FilterState::Complete { .. })) + .await; } fn render( @@ -299,8 +323,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { children: self.children.clone(), remapped_children: Some(children), inner: Arc::clone(&self.inner), - completion_watch: self.completion_watch.clone(), - update_watch: self.update_watch.clone(), + state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), })) From 43d9fb27ddf61917ad0daa3445d8206e38bcb3d2 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 19 Nov 2025 11:15:53 +0100 Subject: [PATCH 4/6] Fix cargo machete error --- datafusion/physical-expr/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b5b5c324a445..69670941ad4f 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -80,3 +80,6 @@ name = "is_null" [[bench]] harness = false name = "binary_op" + +[package.metadata.cargo-machete] +ignored = ["half"] From 3115cfb212b24c56d468c31b89a3a3a8fd3c9207 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 19 Nov 2025 11:44:38 +0100 Subject: [PATCH 5/6] Add tests --- .../src/expressions/dynamic_filters.rs | 31 +++++- .../physical-plan/src/joins/hash_join/exec.rs | 99 +++++++++++++++++++ datafusion/physical-plan/src/topk/mod.rs | 48 +++++++++ 3 files changed, 176 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 3f19a0b4a712..43a242472bec 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -78,6 +78,10 @@ struct Inner { /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. generation: u64, expr: Arc, + /// Flag for quick synchronous check if filter is complete. + /// This is redundant with the watch channel state, but allows us to return immediately + /// from `wait_complete()` without subscribing if already complete. + is_complete: bool, } impl Inner { @@ -87,6 +91,7 @@ impl Inner { // This is not currently used anywhere but it seems useful to have this simple distinction. generation: 1, expr, + is_complete: false, } } @@ -231,6 +236,7 @@ impl DynamicFilterPhysicalExpr { *current = Inner { generation: new_generation, expr: new_expr, + is_complete: current.is_complete, }; drop(current); // Release the lock before broadcasting @@ -246,7 +252,11 @@ impl DynamicFilterPhysicalExpr { /// This signals that all expected updates have been received. /// Waiters using [`Self::wait_complete`] will be notified. pub fn mark_complete(&self) { - let current_generation = self.inner.read().generation; + let mut current = self.inner.write(); + let current_generation = current.generation; + current.is_complete = true; + drop(current); + // Broadcast completion to all waiters let _ = self.state_watch.send(FilterState::Complete { generation: current_generation, @@ -274,8 +284,11 @@ impl DynamicFilterPhysicalExpr { /// Unlike [`Self::wait_update`], this method guarantees that when it returns, /// the filter is fully complete with no more updates expected. pub async fn wait_complete(&self) { + if self.inner.read().is_complete { + return; + } + let mut rx = self.state_watch.subscribe(); - // Wait until the state becomes Complete let _ = rx .wait_for(|state| matches!(state, FilterState::Complete { .. })) .await; @@ -580,4 +593,18 @@ mod test { "Expected err when evaluate is called after changing the expression." ); } + + #[tokio::test] + async fn test_wait_complete_already_complete() { + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(42) as Arc, + )); + + // Mark as complete immediately + dynamic_filter.mark_complete(); + + // wait_complete should return immediately + dynamic_filter.wait_complete().await; + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index d923acaea02b..0c51b9a108e3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4494,4 +4494,103 @@ mod tests { fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() } + + /// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table. + #[tokio::test] + async fn test_hash_join_marks_filter_complete() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 6]), + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), + ("c2", &vec![70, 80, 90]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + // Create a dynamic filter manually + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + // Create HashJoinExec with the dynamic filter + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + bounds_accumulator: OnceLock::new(), + }); + + // Execute the join + let stream = join.execute(0, task_ctx)?; + let _batches = common::collect(stream).await?; + + // After the join completes, the dynamic filter should be marked as complete + // wait_complete() should return immediately + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } + + /// This test verifies that the dynamic filter is marked as complete even when the build side is empty. + #[tokio::test] + async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + // Empty left side (build side) + let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), + ("c2", &vec![70, 80, 90]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + // Create a dynamic filter manually + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + // Create HashJoinExec with the dynamic filter + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + bounds_accumulator: OnceLock::new(), + }); + + // Execute the join + let stream = join.execute(0, task_ctx)?; + let _batches = common::collect(stream).await?; + + // Even with empty build side, the dynamic filter should be marked as complete + // wait_complete() should return immediately + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 0b673e79cdc5..96356e50d7ec 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -1201,4 +1201,52 @@ mod tests { Ok(()) } + + /// This test verifies that the dynamic filter is marked as complete after TopK processing finishes. + #[tokio::test] + async fn test_topk_marks_filter_complete() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + let sort_expr = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + + let full_expr = LexOrdering::from([sort_expr.clone()]); + let prefix = vec![sort_expr]; + + // Create a dummy runtime environment and metrics + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a dynamic filter that we'll check for completion + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + // Create a TopK instance + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + prefix, + full_expr, + 2, + 10, + runtime, + &metrics, + Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))), + )?; + + let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)])); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?; + topk.insert_batch(batch)?; + + // Call emit to finish TopK processing + let _results: Vec<_> = topk.emit()?.try_collect().await?; + + // After emit is called, the dynamic filter should be marked as complete + // wait_complete() should return immediately + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } } From e10669627e1b087cd60b481c1f654f93d7151b68 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Thu, 20 Nov 2025 09:37:59 +0100 Subject: [PATCH 6/6] minor conflicts --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index f2abb8d69ad9..680e4368ae70 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4524,7 +4524,7 @@ mod tests { )?; join.dynamic_filter = Some(HashJoinExecDynamicFilter { filter: dynamic_filter, - bounds_accumulator: OnceLock::new(), + build_accumulator: OnceLock::new(), }); // Execute the join @@ -4572,7 +4572,7 @@ mod tests { )?; join.dynamic_filter = Some(HashJoinExecDynamicFilter { filter: dynamic_filter, - bounds_accumulator: OnceLock::new(), + build_accumulator: OnceLock::new(), }); // Execute the join