From 57b2ced886073b99f411b45ec0a17ebc06fbd563 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 15 Sep 2025 13:52:02 +0800 Subject: [PATCH 1/3] Improve NLJ for very small right side case --- benchmarks/src/nlj.rs | 9 + .../src/joins/nested_loop_join.rs | 264 +++++++++++++++++- 2 files changed, 258 insertions(+), 15 deletions(-) diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index e412c0ade8a8..071375dcda5b 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -146,6 +146,15 @@ const NLJ_QUERIES: &[&str] = &[ FULL JOIN range(30000) AS t2 ON (t1.value > t2.value); "#, + // Q13: INNER 200K x 1 | Low 0.1% | very small probe side + // TODO: ensure the optimizer won't swap order after we're able to turn off join + // reordering from configuration. + r#" + SELECT * + FROM range(200000) AS t1 + FULL JOIN range(1) AS t2 + ON ((t1.value + t2.value) % 1000) = 1 + "#, ]; impl RunOpt { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index ced9078d956b..4bd94c489758 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -48,9 +48,12 @@ use crate::{ use arrow::array::{ new_null_array, Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions, + UInt32Array, }; use arrow::buffer::BooleanBuffer; -use arrow::compute::{concat_batches, filter, filter_record_batch, not, BatchCoalescer}; +use arrow::compute::{ + concat_batches, filter, filter_record_batch, not, take, BatchCoalescer, +}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; @@ -1232,11 +1235,49 @@ impl NestedLoopJoinStream { // and push the result into output_buffer // ======== + // Special case: + // When the right batch is very small, join with multiple left rows at once, + // + // The regular implementation is not efficient if the plan's right child is + // very small (e.g. 1 row total), because inside the inner loop of NLJ, it's + // handling one input right batch at once, if it's not large enough, the + // overheads like filter evalaution can't be amortized through vectorization. + debug_assert_ne!( + right_batch.num_rows(), + 0, + "When fetching the right batch, empty batches will be skipped" + ); + if (self.batch_size / right_batch.num_rows()) > 10 { + // Calculate max left rows to handle at once. This opeator tries to handle + // up to `datafusion.execution.batch_size` rows at once in the intermediate + // batch. + let l_row_count = self.batch_size / right_batch.num_rows(); + let l_row_count = std::cmp::min( + l_row_count, + left_data.batch().num_rows() - self.left_probe_idx, + ); + + let joined_batch = self.process_left_range_join( + &left_data, + &right_batch, + self.left_probe_idx, + l_row_count, + )?; + + if let Some(batch) = joined_batch { + self.output_buffer.push_batch(batch)?; + } + + self.left_probe_idx += l_row_count; + + return Ok(true); + } + let l_idx = self.left_probe_idx; - let join_batch = + let joined_batch = self.process_single_left_row_join(&left_data, &right_batch, l_idx)?; - if let Some(batch) = join_batch { + if let Some(batch) = joined_batch { self.output_buffer.push_batch(batch)?; } @@ -1249,8 +1290,197 @@ impl NestedLoopJoinStream { Ok(true) } + /// Process [l_start_index, l_start_index + l_count) JOIN right_batch + /// Returns a RecordBatch containing the join results (None if empty) + /// + /// Side Effect: If the join type requires, left or right side matched bitmap + /// will be set for matched indices. + fn process_left_range_join( + &mut self, + left_data: &JoinLeftData, + right_batch: &RecordBatch, + l_start_index: usize, + l_row_count: usize, + ) -> Result> { + // Construct the Cartesian product between the specified range of left rows and the entire right_batch. + // Do not apply filters or update any bitmaps here. + let right_rows = right_batch.num_rows(); + if l_row_count == 0 || right_rows == 0 { + return Ok(None); + } + + let total_rows = l_row_count * right_rows; + + // Build index arrays for cartesian product: left_range X right_batch + let left_indices: UInt32Array = + UInt32Array::from_iter_values((0..l_row_count).flat_map(|i| { + std::iter::repeat_n((l_start_index + i) as u32, right_rows) + })); + let right_indices: UInt32Array = UInt32Array::from_iter_values( + (0..l_row_count).flat_map(|_| (0..right_rows as u32)), + ); + + debug_assert!( + left_indices.len() == right_indices.len() + && right_indices.len() == total_rows, + "The length or cartesian product should be (left_size * right_size)", + ); + + // Evaluate the join filter (if any) over an intermediate batch built + // using the filter's own schema/column indices. + let bitmap_combined = if let Some(filter) = &self.join_filter { + // Build the intermediate batch for filter evaluation + let intermediate_batch = if filter.schema.fields().is_empty() { + // Constant predicate (e.g., TRUE/FALSE). Use an empty schema with row_count + create_record_batch_with_empty_schema( + Arc::new((*filter.schema).clone()), + total_rows, + )? + } else { + let mut filter_columns: Vec> = + Vec::with_capacity(filter.column_indices().len()); + for column_index in filter.column_indices() { + let array = if column_index.side == JoinSide::Left { + let col = left_data.batch().column(column_index.index); + take(col.as_ref(), &left_indices, None)? + } else { + let col = right_batch.column(column_index.index); + take(col.as_ref(), &right_indices, None)? + }; + filter_columns.push(array); + } + + RecordBatch::try_new(Arc::new((*filter.schema).clone()), filter_columns)? + }; + + let filter_result = filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())?; + let filter_arr = as_boolean_array(&filter_result)?; + + // Combine with null bitmap to get a unified mask + boolean_mask_from_filter(filter_arr) + } else { + // No filter: all pairs match + BooleanArray::from(vec![true; total_rows]) + }; + + // Update the global left or right bitmap for matched indices + // ----------------------------------------------------------- + + // None means we don't have to update left bitmap for this join type + let mut left_bitmap = if need_produce_result_in_final(self.join_type) { + Some(left_data.bitmap().lock()) + } else { + None + }; + + // 'local' meaning: we want to collect 'is_matched' flag for the current + // right batch, after it has joining all of the left buffer, here it's only + // the partial result for joining given left range + let mut local_right_bitmap = if self.should_track_unmatched_right { + let mut current_right_batch_bitmap = BooleanBufferBuilder::new(right_rows); + // Ensure builder has logical length so set_bit is in-bounds + current_right_batch_bitmap.append_n(right_rows, false); + Some(current_right_batch_bitmap) + } else { + None + }; + + // Set the matched bit for left and right side bitmap + for (i, is_matched) in bitmap_combined.iter().enumerate() { + let is_matched = is_matched.ok_or_else(|| { + internal_datafusion_err!("Must be Some after the previous combining step") + })?; + + let l_index = l_start_index + i / right_rows; + let r_index = i % right_rows; + + if let Some(bitmap) = left_bitmap.as_mut() { + if is_matched { + // Map local index back to absolute left index within the batch + bitmap.set_bit(l_index, true); + } + } + + if let Some(bitmap) = local_right_bitmap.as_mut() { + if is_matched { + bitmap.set_bit(r_index, true); + } + } + } + + // Apply the local right bitmap to the global bitmap + if self.should_track_unmatched_right { + // Remember to put it back after update + let global_right_bitmap = + std::mem::take(&mut self.current_right_batch_matched).ok_or_else( + || internal_datafusion_err!("right batch's bitmap should be present"), + )?; + let (buf, nulls) = global_right_bitmap.into_parts(); + debug_assert!(nulls.is_none()); + + let current_right_bitmap = local_right_bitmap + .ok_or_else(|| { + internal_datafusion_err!( + "Should be Some if the current join type requries right bitmap" + ) + })? + .finish(); + let updated_global_right_bitmap = buf.bitor(¤t_right_bitmap); + + self.current_right_batch_matched = + Some(BooleanArray::new(updated_global_right_bitmap, None)); + } + + // For the following join types: only bitmaps are updated; do not emit rows now + if matches!( + self.join_type, + JoinType::LeftAnti + | JoinType::LeftSemi + | JoinType::LeftMark + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ) { + return Ok(None); + } + + // Build the projected output batch (using output schema/column_indices), + // then apply the bitmap filter to it. + if self.output_schema.fields().is_empty() { + // Empty projection: only row count matters + let row_count = bitmap_combined.true_count(); + return Ok(Some(create_record_batch_with_empty_schema( + Arc::clone(&self.output_schema), + row_count, + )?)); + } + + let mut out_columns: Vec> = + Vec::with_capacity(self.output_schema.fields().len()); + for column_index in &self.column_indices { + let array = if column_index.side == JoinSide::Left { + let col = left_data.batch().column(column_index.index); + take(col.as_ref(), &left_indices, None)? + } else { + let col = right_batch.column(column_index.index); + take(col.as_ref(), &right_indices, None)? + }; + out_columns.push(array); + } + let pre_filtered = + RecordBatch::try_new(Arc::clone(&self.output_schema), out_columns)?; + let filtered = filter_record_batch(&pre_filtered, &bitmap_combined)?; + Ok(Some(filtered)) + } + /// Process a single left row join with the current right batch. /// Returns a RecordBatch containing the join results (None if empty) + /// + /// Side Effect: If the join type requires, left or right side matched bitmap + /// will be set for matched indices. fn process_single_left_row_join( &mut self, left_data: &JoinLeftData, @@ -1547,22 +1777,26 @@ fn apply_filter_to_row_join_batch( .into_array(intermediate_batch.num_rows())?; let filter_arr = as_boolean_array(&filter_result)?; - // [Caution] This step has previously introduced bugs - // The filter result is NOT a bitmap; it contains true/false/null values. - // For example, 1 < NULL is evaluated to NULL. Therefore, we must combine (AND) - // the boolean array with its null bitmap to construct a unified bitmap. - let (is_filtered, nulls) = filter_arr.clone().into_parts(); - let bitmap_combined = match nulls { - Some(nulls) => { - let combined = nulls.inner() & &is_filtered; - BooleanArray::new(combined, None) - } - None => BooleanArray::new(is_filtered, None), - }; + // Convert boolean array with potential nulls into a unified mask bitmap + let bitmap_combined = boolean_mask_from_filter(filter_arr); Ok(bitmap_combined) } +/// Convert a boolean filter array into a unified mask bitmap. +/// +/// Caution: The filter result is NOT a bitmap; it contains true/false/null values. +/// For example, `1 < NULL` evaluates to NULL. Therefore, we must combine (AND) +/// the boolean array with its null bitmap to construct a unified bitmap. +#[inline] +fn boolean_mask_from_filter(filter_arr: &BooleanArray) -> BooleanArray { + let (values, nulls) = filter_arr.clone().into_parts(); + match nulls { + Some(nulls) => BooleanArray::new(nulls.inner() & &values, None), + None => BooleanArray::new(values, None), + } +} + /// This function performs the following steps: /// 1. Apply filter to probe-side batch /// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the From b5cc2a62474b8db68e5bdb10f1fe23c2e98d2fc8 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 15 Sep 2025 14:07:37 +0800 Subject: [PATCH 2/3] fix typos --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 4bd94c489758..11a3e3d96a6c 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1248,7 +1248,7 @@ impl NestedLoopJoinStream { "When fetching the right batch, empty batches will be skipped" ); if (self.batch_size / right_batch.num_rows()) > 10 { - // Calculate max left rows to handle at once. This opeator tries to handle + // Calculate max left rows to handle at once. This operator tries to handle // up to `datafusion.execution.batch_size` rows at once in the intermediate // batch. let l_row_count = self.batch_size / right_batch.num_rows(); @@ -1424,7 +1424,7 @@ impl NestedLoopJoinStream { let current_right_bitmap = local_right_bitmap .ok_or_else(|| { internal_datafusion_err!( - "Should be Some if the current join type requries right bitmap" + "Should be Some if the current join type requires right bitmap" ) })? .finish(); From bfc9c6e97627aa7a8c21a93e9ba8586e7a9da19c Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 16 Sep 2025 12:13:43 +0800 Subject: [PATCH 3/3] review --- .../src/joins/nested_loop_join.rs | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 11a3e3d96a6c..832973b33ea4 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1241,22 +1241,24 @@ impl NestedLoopJoinStream { // The regular implementation is not efficient if the plan's right child is // very small (e.g. 1 row total), because inside the inner loop of NLJ, it's // handling one input right batch at once, if it's not large enough, the - // overheads like filter evalaution can't be amortized through vectorization. + // overheads like filter evaluation can't be amortized through vectorization. debug_assert_ne!( right_batch.num_rows(), 0, "When fetching the right batch, empty batches will be skipped" ); - if (self.batch_size / right_batch.num_rows()) > 10 { + + let l_row_cnt_ratio = self.batch_size / right_batch.num_rows(); + if l_row_cnt_ratio > 10 { // Calculate max left rows to handle at once. This operator tries to handle // up to `datafusion.execution.batch_size` rows at once in the intermediate // batch. - let l_row_count = self.batch_size / right_batch.num_rows(); let l_row_count = std::cmp::min( - l_row_count, + l_row_cnt_ratio, left_data.batch().num_rows() - self.left_probe_idx, ); + debug_assert!(l_row_count != 0, "This function should only be entered when there are remaining left rows to process"); let joined_batch = self.process_left_range_join( &left_data, &right_batch, @@ -1302,13 +1304,12 @@ impl NestedLoopJoinStream { l_start_index: usize, l_row_count: usize, ) -> Result> { - // Construct the Cartesian product between the specified range of left rows and the entire right_batch. - // Do not apply filters or update any bitmaps here. + // Construct the Cartesian product between the specified range of left rows + // and the entire right_batch. First, it calculates the index vectors, then + // materializes the intermediate batch, and finally applies the join filter + // to it. + // ----------------------------------------------------------- let right_rows = right_batch.num_rows(); - if l_row_count == 0 || right_rows == 0 { - return Ok(None); - } - let total_rows = l_row_count * right_rows; // Build index arrays for cartesian product: left_range X right_batch