diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index b5fe5ee5cda1..7d6eb6932359 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -29,6 +29,7 @@ use crate::filter_pushdown::{
 use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
 use crate::joins::hash_join::stream::{
     BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
+    ProbeSideBoundsAccumulator,
 };
 use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
 use crate::joins::utils::{
@@ -463,12 +464,25 @@ impl HashJoinExec {
         })
     }
 
-    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
-        // Extract the right-side keys (probe side keys) from the `on` clauses
-        // Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
-        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
+    fn join_exprs_for_side(on: &JoinOn, pushdown_side: JoinSide) -> Vec<PhysicalExprRef> {
+        match pushdown_side {
+            JoinSide::Left => on.iter().map(|(l, _)| Arc::clone(l)).collect(),
+            JoinSide::Right => on.iter().map(|(_, r)| Arc::clone(r)).collect(),
+            JoinSide::None => vec![],
+        }
+    }
+
+    fn create_dynamic_filter(
+        on: &JoinOn,
+        pushdown_side: JoinSide,
+    ) -> Result<Arc<DynamicFilterPhysicalExpr>> {
+        if pushdown_side == JoinSide::None {
+            return internal_err!("dynamic filter side must be specified");
+        }
+        // Extract the join key expressions from the side that will receive the dynamic filter
+        let keys = Self::join_exprs_for_side(on, pushdown_side);
         // Initialize with a placeholder expression (true) that will be updated when the hash table is built
-        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
+        Ok(Arc::new(DynamicFilterPhysicalExpr::new(keys, lit(true))))
     }
 
     /// left (build) side which gets hashed
@@ -527,12 +541,6 @@ impl HashJoinExec {
         ]
     }
 
-    /// Get probe side information for the hash join.
-    pub fn probe_side() -> JoinSide {
-        // In current implementation right side is always probe side.
-        JoinSide::Right
-    }
-
     /// Return whether the join contains a projection
     pub fn contains_projection(&self) -> bool {
         self.projection.is_some()
@@ -578,7 +586,7 @@ impl HashJoinExec {
             &join_type,
             Arc::clone(&schema),
             &Self::maintains_input_order(join_type),
-            Some(Self::probe_side()),
+            Some(find_filter_pushdown_sides(join_type)),
             on,
         )?;
 
@@ -780,6 +788,26 @@ impl DisplayAs for HashJoinExec {
     }
 }
 
+fn find_filter_pushdown_sides(join_type: JoinType) -> JoinSide {
+    // This returns the side that will receive the dynamic filter and apply the bounds.
+    // The bounds are collected from the opposite side: from the build (left) side when
+    // the filter targets the right side, and from the probe (right) side when it
+    // targets the left side. Only join key ranges from the ON clause are accumulated.
+    match join_type {
+        JoinType::Inner => JoinSide::Right,
+        JoinType::Left => JoinSide::Right,
+        JoinType::Right => JoinSide::Left,
+        JoinType::LeftSemi => JoinSide::Right,
+        JoinType::RightSemi => JoinSide::Left,
+        JoinType::LeftAnti => JoinSide::Right,
+        JoinType::RightAnti => JoinSide::Left,
+        JoinType::LeftMark => JoinSide::Right,
+        JoinType::RightMark => JoinSide::Left,
+        // Full outer join cannot have dynamic filter pushdown because all rows on both
+        // sides are preserved.
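+        // For example, `SELECT * FROM t1 FULL JOIN t2 ON t1.k = t2.k` must emit
+        // unmatched rows from both inputs, so pruning either input with the other
+        // side's min/max bounds would drop rows that belong in the output.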
+        JoinType::Full => JoinSide::None,
+    }
+}
+
 impl ExecutionPlan for HashJoinExec {
     fn name(&self) -> &'static str {
         "HashJoinExec"
@@ -929,6 +957,9 @@ impl ExecutionPlan for HashJoinExec {
         }
 
         let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
+        let probe_side = find_filter_pushdown_sides(self.join_type);
+        let report_build_bounds =
+            enable_dynamic_filter_pushdown && probe_side == JoinSide::Right;
 
         let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
         let left_fut = match self.mode {
@@ -946,7 +977,7 @@ impl ExecutionPlan for HashJoinExec {
                     reservation,
                     need_produce_result_in_final(self.join_type),
                     self.right().output_partitioning().partition_count(),
-                    enable_dynamic_filter_pushdown,
+                    report_build_bounds,
                 ))
             })?,
             PartitionMode::Partitioned => {
@@ -964,7 +995,7 @@ impl ExecutionPlan for HashJoinExec {
                     reservation,
                     need_produce_result_in_final(self.join_type),
                     1,
-                    enable_dynamic_filter_pushdown,
+                    report_build_bounds,
                 ))
             }
             PartitionMode::Auto => {
@@ -982,18 +1013,16 @@ impl ExecutionPlan for HashJoinExec {
             .then(|| {
                 self.dynamic_filter.as_ref().map(|df| {
                     let filter = Arc::clone(&df.filter);
-                    let on_right = self
-                        .on
-                        .iter()
-                        .map(|(_, right_expr)| Arc::clone(right_expr))
-                        .collect::<Vec<_>>();
+                    // Determine which side will receive the dynamic filter
+                    let probe_side = find_filter_pushdown_sides(self.join_type);
+                    let on_expressions = Self::join_exprs_for_side(&self.on, probe_side);
                     Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
                         Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
                             self.mode,
                             self.left.as_ref(),
                             self.right.as_ref(),
                             filter,
-                            on_right,
+                            on_expressions,
                         ))
                     })))
                 })
@@ -1004,6 +1033,7 @@ impl ExecutionPlan for HashJoinExec {
         // we have the batches and the hash map with their keys. We can now create a stream
        // over the right that uses this information to issue new batches.
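+        // (When the dynamic filter targets the left side instead, the probe stream
+        // also tracks min/max bounds for each join key; see the
+        // `ProbeSideBoundsAccumulator`s created below.)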
         let right_stream = self.right.execute(partition, context)?;
+        let right_schema = right_stream.schema();
 
         // update column indices to reflect the projection
         let column_indices_after_projection = match &self.projection {
@@ -1020,6 +1050,23 @@ impl ExecutionPlan for HashJoinExec {
             .map(|(_, right_expr)| Arc::clone(right_expr))
             .collect::<Vec<_>>();
 
+        let probe_bounds_accumulators =
+            if enable_dynamic_filter_pushdown && probe_side == JoinSide::Left {
+                Some(
+                    on_right
+                        .iter()
+                        .map(|expr| {
+                            ProbeSideBoundsAccumulator::try_new(
+                                Arc::clone(expr),
+                                &right_schema,
+                            )
+                        })
+                        .collect::<Result<Vec<_>>>()?,
+                )
+            } else {
+                None
+            };
+
         Ok(Box::pin(HashJoinStream::new(
             partition,
             self.schema(),
@@ -1037,6 +1084,8 @@ impl ExecutionPlan for HashJoinExec {
             vec![],
             self.right.output_ordering().is_some(),
             bounds_accumulator,
+            report_build_bounds,
+            probe_bounds_accumulators,
             self.mode,
         )))
     }
@@ -1126,7 +1175,7 @@ impl ExecutionPlan for HashJoinExec {
         }
 
         // Get basic filter descriptions for both children
-        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
+        let mut left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
             self.left(),
         )?;
@@ -1139,9 +1188,24 @@
         if matches!(phase, FilterPushdownPhase::Post)
             && config.optimizer.enable_join_dynamic_filter_pushdown
         {
-            // Add actual dynamic filter to right side (probe side)
-            let dynamic_filter = Self::create_dynamic_filter(&self.on);
-            right_child = right_child.with_self_filter(dynamic_filter);
+            let pushdown_side = find_filter_pushdown_sides(self.join_type);
+            match pushdown_side {
+                JoinSide::None => {
+                    // A join type that preserves both sides (e.g. FULL) cannot
+                    // leverage dynamic filters. Return early before attempting to
+                    // create one.
+                    return Ok(FilterDescription::new()
+                        .with_child(left_child)
+                        .with_child(right_child));
+                }
+                JoinSide::Left => {
+                    let dynamic_filter =
+                        Self::create_dynamic_filter(&self.on, pushdown_side)?;
+                    left_child = left_child.with_self_filter(dynamic_filter);
+                }
+                JoinSide::Right => {
+                    let dynamic_filter =
+                        Self::create_dynamic_filter(&self.on, pushdown_side)?;
+                    right_child = right_child.with_self_filter(dynamic_filter);
+                }
+            }
         }
 
         Ok(FilterDescription::new()
@@ -1159,7 +1223,8 @@
         // non-inner joins in `gather_filters_for_pushdown`.
         // However it's a cheap check and serves to inform future devs touching this function that they need to be really
         // careful pushing down filters through non-inner joins.
-        if self.join_type != JoinType::Inner {
+        let pushdown_side = find_filter_pushdown_sides(self.join_type);
+        if pushdown_side == JoinSide::None {
             // Other types of joins can support *some* filters, but restrictions are complex and error prone.
             // For now we don't support them.
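+            // (Since `find_filter_pushdown_sides` returns a concrete side for every
+            // non-FULL join type, only FULL joins reach this early return.)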
             // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
@@ -1170,9 +1235,13 @@
 
         let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
         assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
-        let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
-        // We expect 0 or 1 self filters
-        if let Some(filter) = right_child_self_filters.first() {
+        let self_filters = match pushdown_side {
+            JoinSide::Left => &child_pushdown_result.self_filters[0],
+            JoinSide::Right => &child_pushdown_result.self_filters[1],
+            JoinSide::None => unreachable!(),
+        };
+        // We expect 0 or 1 self filters
+        if let Some(filter) = self_filters.first() {
             // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said
             // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
             let predicate = Arc::clone(&filter.predicate);
@@ -4518,4 +4587,117 @@ mod tests {
     fn columns(schema: &Schema) -> Vec<String> {
         schema.fields().iter().map(|f| f.name().clone()).collect()
     }
+
+    #[test]
+    fn create_dynamic_filter_none_side_returns_error() {
+        let on: JoinOn = vec![];
+        let err = HashJoinExec::create_dynamic_filter(&on, JoinSide::None).unwrap_err();
+        assert_contains!(err.to_string(), "dynamic filter side must be specified");
+    }
+
+    #[test]
+    fn full_join_skips_dynamic_filter_creation() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use datafusion_physical_expr::expressions::col;
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![1]))],
+        )?;
+        let left =
+            TestMemoryExec::try_new(&[vec![batch.clone()]], Arc::clone(&schema), None)?;
+        let right = TestMemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None)?;
+
+        let on = vec![(col("a", &left.schema())?, col("a", &right.schema())?)];
+        let join = HashJoinExec::try_new(
+            Arc::new(left),
+            Arc::new(right),
+            on,
+            None,
+            &JoinType::Full,
+            None,
+            PartitionMode::CollectLeft,
+            NullEquality::NullEqualsNull,
+        )?;
+
+        let mut config = ConfigOptions::default();
+        config.optimizer.enable_dynamic_filter_pushdown = true;
+
+        let desc =
+            join.gather_filters_for_pushdown(FilterPushdownPhase::Post, vec![], &config)?;
+        assert!(desc.self_filters().iter().all(|f| f.is_empty()));
+        Ok(())
+    }
+
+    // This test verifies that when a HashJoinExec is created with a dynamic filter
+    // targeting the left side, the probe phase collects min/max bounds from the
+    // probe-side (right) input and reports them back into the dynamic filter that
+    // is applied to the left side. Concretely:
+    // - Left input has values [1, 3, 5]
+    // - Right (probe) input has values [2, 4, 6]
+    // - JoinType::Right is used so that the dynamic filter is attached to the left side expression.
+    // - After fully executing the join, the dynamic filter should be updated
+    //   with the observed bounds `a@0 >= 2 AND a@0 <= 6` (min=2, max=6).
+    // The test asserts that HashJoinExec correctly accumulates and reports these
+    // bounds so downstream consumers can use the dynamic predicate for pruning.
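+    // The loop below also exercises the symmetric case: JoinType::Left attaches
+    // the filter to the right side and expects `b@0 >= 1 AND b@0 <= 5`, derived
+    // from the left input's values [1, 3, 5].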
+    #[tokio::test]
+    async fn reports_bounds_when_dynamic_filter_side_left() -> Result<()> {
+        use datafusion_physical_expr::expressions::col;
+
+        let task_ctx = Arc::new(TaskContext::default());
+
+        let left_schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let left_batch = RecordBatch::try_new(
+            Arc::clone(&left_schema),
+            vec![Arc::new(Int32Array::from(vec![1, 3, 5]))],
+        )?;
+        let left = TestMemoryExec::try_new(&[vec![left_batch]], left_schema, None)?;
+
+        let right_schema =
+            Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
+        let right_batch = RecordBatch::try_new(
+            Arc::clone(&right_schema),
+            vec![Arc::new(Int32Array::from(vec![2, 4, 6]))],
+        )?;
+        let right = TestMemoryExec::try_new(&[vec![right_batch]], right_schema, None)?;
+
+        let on = vec![(col("a", &left.schema())?, col("b", &right.schema())?)];
+
+        let test_cases = vec![
+            (JoinType::Right, JoinSide::Left, "a@0 >= 2 AND a@0 <= 6"),
+            (JoinType::Left, JoinSide::Right, "b@0 >= 1 AND b@0 <= 5"),
+        ];
+        for (join_type, probe_side, expected_filter) in test_cases {
+            let mut join_exec = HashJoinExec::try_new(
+                Arc::new(left.clone()),
+                Arc::new(right.clone()),
+                on.clone(),
+                None,
+                &join_type,
+                None,
+                PartitionMode::CollectLeft,
+                NullEquality::NullEqualsNull,
+            )?;
+
+            let dynamic_filter: Arc<DynamicFilterPhysicalExpr> =
+                HashJoinExec::create_dynamic_filter(&join_exec.on, probe_side)?;
+            join_exec.dynamic_filter = Some(HashJoinExecDynamicFilter {
+                filter: Arc::clone(&dynamic_filter),
+                bounds_accumulator: OnceLock::new(),
+            });
+
+            let stream = join_exec.execute(0, Arc::clone(&task_ctx))?;
+            let _batches: Vec<RecordBatch> = stream.try_collect().await?;
+
+            assert_eq!(
+                format!("{}", dynamic_filter.current().unwrap()),
+                expected_filter
+            );
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 25f7a0de31ac..b442fd719077 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -107,8 +107,8 @@ pub(crate) struct SharedBoundsAccumulator {
     barrier: Barrier,
     /// Dynamic filter for pushdown to probe side
     dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
-    /// Right side join expressions needed for creating filter bounds
-    on_right: Vec<PhysicalExprRef>,
+    /// Join expressions for the side that will receive the dynamic filter
+    on_expressions: Vec<PhysicalExprRef>,
 }
 
 /// State protected by SharedBoundsAccumulator's mutex
@@ -149,7 +149,7 @@ impl SharedBoundsAccumulator {
         left_child: &dyn ExecutionPlan,
         right_child: &dyn ExecutionPlan,
         dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
-        on_right: Vec<PhysicalExprRef>,
+        on_expressions: Vec<PhysicalExprRef>,
     ) -> Self {
         // Troubleshooting: If partition counts are incorrect, verify this logic matches
         // the actual execution pattern in collect_build_side()
@@ -171,7 +171,7 @@
             }),
             barrier: Barrier::new(expected_calls),
             dynamic_filter,
-            on_right,
+            on_expressions,
         }
     }
 
@@ -199,16 +199,16 @@
         // Create range predicates for each join key in this partition
         let mut column_predicates = Vec::with_capacity(partition_bounds.len());
 
-        for (col_idx, right_expr) in self.on_right.iter().enumerate() {
+        for (col_idx, expr) in self.on_expressions.iter().enumerate() {
             if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) {
                 // Create predicate: col >= min AND col <= max
                 let min_expr = Arc::new(BinaryExpr::new(
-                    Arc::clone(right_expr),
+                    Arc::clone(expr),
                     Operator::GtEq,
                     lit(column_bounds.min.clone()),
                 )) as Arc<dyn PhysicalExpr>;
                let max_expr = Arc::new(BinaryExpr::new(
-                    Arc::clone(right_expr),
+                    Arc::clone(expr),
                     Operator::LtEq,
                     lit(column_bounds.max.clone()),
                 )) as Arc<dyn PhysicalExpr>;
@@ -311,3 +311,68 @@ impl fmt::Debug for SharedBoundsAccumulator {
         write!(f, "SharedBoundsAccumulator")
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::empty::EmptyExec;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common_runtime::SpawnedTask;
+    use datafusion_physical_expr::expressions::{col, lit, DynamicFilterPhysicalExpr};
+    use tokio::task;
+
+    // This test verifies the synchronization behavior of `SharedBoundsAccumulator`.
+    // It ensures that the dynamic filter is not updated until all expected
+    // partitions have reported their build-side bounds. One partition reports
+    // in a spawned task while the test reports another; the dynamic filter
+    // should remain the default until the final partition arrives, at which
+    // point the accumulated bounds are combined and the dynamic filter is
+    // updated exactly once with range predicates (>= and <=) for the join key.
+    #[tokio::test]
+    async fn waits_for_all_partitions_before_updating() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
+        let left = EmptyExec::new(Arc::clone(&schema)).with_partitions(2);
+        let right = EmptyExec::new(Arc::clone(&schema)).with_partitions(2);
+        let col_expr = col("a", &schema).unwrap();
+        let dynamic = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![Arc::clone(&col_expr)],
+            lit(true),
+        ));
+        let acc = Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+            PartitionMode::Partitioned,
+            &left,
+            &right,
+            Arc::clone(&dynamic),
+            vec![Arc::clone(&col_expr)],
+        ));
+
+        assert_eq!(format!("{}", dynamic.current().unwrap()), "true");
+
+        let acc0 = Arc::clone(&acc);
+        let handle = SpawnedTask::spawn(async move {
+            acc0.report_partition_bounds(
+                0,
+                Some(vec![ColumnBounds::new(
+                    ScalarValue::from(1i32),
+                    ScalarValue::from(2i32),
+                )]),
+            )
+            .await
+            .unwrap();
+        });
+        task::yield_now().await;
+        assert_eq!(format!("{}", dynamic.current().unwrap()), "true");
+        acc.report_partition_bounds(
+            1,
+            Some(vec![ColumnBounds::new(
+                ScalarValue::from(3i32),
+                ScalarValue::from(4i32),
+            )]),
+        )
+        .await
+        .unwrap();
+        handle.await.unwrap();
+        let updated = format!("{}", dynamic.current().unwrap());
+        assert!(updated.contains(">=") && updated.contains("<="));
+    }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 88c50c2eb2ce..d895def06b81 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use std::task::Poll;
 
 use crate::joins::hash_join::exec::JoinLeftData;
-use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
+use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
 use crate::joins::utils::{
     equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
 };
@@ -43,11 +43,13 @@ use crate::{
 };
 
 use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{
     internal_datafusion_err, internal_err, JoinSide, JoinType, NullEquality, Result,
 };
+use datafusion_expr::Accumulator;
+use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::PhysicalExprRef;
 
 use ahash::RandomState;
@@ -102,6 +104,56 @@ impl BuildSide {
     }
 }
 
+/// Accumulates probe-side column bounds for dynamic filter pushdown.
+///
+/// This mirrors the build-side accumulator used when collecting bounds from
+/// the left (build) side. Each accumulator tracks the minimum and maximum
+/// values for a single join key expression.
+pub(super) struct ProbeSideBoundsAccumulator {
+    expr: PhysicalExprRef,
+    min: MinAccumulator,
+    max: MaxAccumulator,
+}
+
+impl ProbeSideBoundsAccumulator {
+    /// Creates a new accumulator for the given join key expression.
+    pub(super) fn try_new(expr: PhysicalExprRef, schema: &SchemaRef) -> Result<Self> {
+        fn dictionary_value_type(data_type: &DataType) -> DataType {
+            match data_type {
+                DataType::Dictionary(_, value_type) => {
+                    dictionary_value_type(value_type.as_ref())
+                }
+                _ => data_type.clone(),
+            }
+        }
+
+        let data_type = expr
+            .data_type(schema)
+            .map(|dt| dictionary_value_type(&dt))?;
+        Ok(Self {
+            expr,
+            min: MinAccumulator::try_new(&data_type)?,
+            max: MaxAccumulator::try_new(&data_type)?,
+        })
+    }
+
+    /// Updates bounds using values from the provided batch.
+    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
+        self.min.update_batch(std::slice::from_ref(&array))?;
+        self.max.update_batch(std::slice::from_ref(&array))?;
+        Ok(())
+    }
+
+    /// Returns the final column bounds.
+    fn evaluate(mut self) -> Result<ColumnBounds> {
+        Ok(ColumnBounds::new(
+            self.min.evaluate()?,
+            self.max.evaluate()?,
+        ))
+    }
+}
+
 /// Represents state of HashJoinStream
 ///
 /// Expected state transitions performed by HashJoinStream are:
@@ -182,9 +234,9 @@ pub(super) struct HashJoinStream {
     schema: Arc<Schema>,
     /// equijoin columns from the right (probe side)
     on_right: Vec<PhysicalExprRef>,
-    /// optional join filter
+    /// Optional join filter
     filter: Option<JoinFilter>,
-    /// type of the join (left, right, semi, etc)
+    /// Type of the join (left, right, semi, etc)
     join_type: JoinType,
     /// right (probe) input
     right: SendableRecordBatchStream,
@@ -211,7 +263,12 @@
     /// Optional future to signal when bounds have been reported by all partitions
     /// and the dynamic filter has been updated
     bounds_waiter: Option<OnceFut<()>>,
-
+    /// Whether we should report bounds derived from the build (left) side
+    report_build_bounds: bool,
+    /// Accumulators for probe-side bounds when filtering the left side
+    probe_bounds_accumulators: Option<Vec<ProbeSideBoundsAccumulator>>,
+    /// Total number of probe-side rows processed (for bounds reporting)
+    probe_side_row_count: usize,
     /// Partitioning mode to use
     mode: PartitionMode,
 }
@@ -316,6 +373,8 @@ impl HashJoinStream {
         hashes_buffer: Vec<u64>,
         right_side_ordered: bool,
         bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
+        report_build_bounds: bool,
+        probe_bounds_accumulators: Option<Vec<ProbeSideBoundsAccumulator>>,
         mode: PartitionMode,
     ) -> Self {
         Self {
@@ -336,6 +395,9 @@
             right_side_ordered,
             bounds_accumulator,
             bounds_waiter: None,
+            report_build_bounds,
+            probe_bounds_accumulators,
+            probe_side_row_count: 0,
             mode,
         }
     }
@@ -390,6 +452,18 @@
         Poll::Ready(Ok(StatefulStreamResult::Continue))
     }
 
+    fn create_bounds_waiter(
+        bounds_accumulator: Arc<SharedBoundsAccumulator>,
+        partition_id: usize,
+        bounds: Option<Vec<ColumnBounds>>,
+    ) -> OnceFut<()> {
+        OnceFut::new(async move {
+            bounds_accumulator
+                .report_partition_bounds(partition_id, bounds)
+                .await
+        })
+    }
+
     /// Collects build-side data by polling `OnceFut` future from initialized build-side
     ///
     /// Updates build-side to `Ready`, and state to `FetchProbeSide`
@@ -410,25 +484,26 @@
         //
         // Dynamic filter coordination between partitions:
         // Report bounds to the accumulator which will handle synchronization and filter updates
+        let mut next_state = HashJoinStreamState::FetchProbeBatch;
         if let Some(ref bounds_accumulator) = self.bounds_accumulator {
-            let bounds_accumulator = Arc::clone(bounds_accumulator);
-
-            let left_side_partition_id = match self.mode {
-                PartitionMode::Partitioned => self.partition,
-                PartitionMode::CollectLeft => 0,
-                PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
-            };
-
-            let left_data_bounds = left_data.bounds.clone();
-            self.bounds_waiter = Some(OnceFut::new(async move {
-                bounds_accumulator
-                    .report_partition_bounds(left_side_partition_id, left_data_bounds)
-                    .await
-            }));
-            self.state = HashJoinStreamState::WaitPartitionBoundsReport;
-        } else {
-            self.state = HashJoinStreamState::FetchProbeBatch;
+            if self.report_build_bounds {
+                let partition_id = match self.mode {
+                    PartitionMode::Partitioned => self.partition,
+                    PartitionMode::CollectLeft => 0,
+                    PartitionMode::Auto => unreachable!(
+                        "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
+                    ),
+                };
+                let bounds = left_data.bounds.clone();
+                self.bounds_waiter = Some(Self::create_bounds_waiter(
+                    Arc::clone(bounds_accumulator),
+                    partition_id,
+                    bounds,
+                ));
+                next_state = HashJoinStreamState::WaitPartitionBoundsReport;
+            }
         }
+        self.state = next_state;
 
         self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
 
         Poll::Ready(Ok(StatefulStreamResult::Continue))
@@ -444,7 +519,27 @@
     ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
         match ready!(self.right.poll_next_unpin(cx)) {
             None => {
-                self.state = HashJoinStreamState::ExhaustedProbeSide;
+                let mut next_state = HashJoinStreamState::ExhaustedProbeSide;
+                if let Some(ref bounds_accumulator) = self.bounds_accumulator {
+                    if let Some(accs) = self.probe_bounds_accumulators.take() {
+                        let bounds = if self.probe_side_row_count > 0 {
+                            Some(
+                                accs.into_iter()
+                                    .map(|acc| acc.evaluate())
+                                    .collect::<Result<Vec<_>>>()?,
+                            )
+                        } else {
+                            None
+                        };
+                        self.bounds_waiter = Some(Self::create_bounds_waiter(
+                            Arc::clone(bounds_accumulator),
+                            self.partition,
+                            bounds,
+                        ));
+                        next_state = HashJoinStreamState::WaitPartitionBoundsReport;
+                    }
+                }
+                self.state = next_state;
             }
             Some(Ok(batch)) => {
                 // Precalculate hash values for fetched batch
@@ -454,6 +549,13 @@
                     .map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
                     .collect::<Result<Vec<_>>>()?;
 
+                if let Some(accumulators) = self.probe_bounds_accumulators.as_mut() {
+                    for acc in accumulators.iter_mut() {
+                        acc.update_batch(&batch)?;
+                    }
+                    self.probe_side_row_count += batch.num_rows();
+                }
+
                 self.hashes_buffer.clear();
                 self.hashes_buffer.resize(batch.num_rows(), 0);
                 create_hashes(&keys_values, &self.random_state, &mut self.hashes_buffer)?;
@@ -637,7 +739,7 @@
             let (left_side, right_side) = get_final_indices_from_shared_bitmap(
                 build_side.left_data.visited_indices_bitmap(),
                 self.join_type,
-                true,
+                false, // piecewise = false for regular hash join
             );
             let empty_right_batch = RecordBatch::new_empty(self.right.schema());
             // use the left and right indices to produce the batch result