Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 247 additions & 15 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{

use arrow::array::{
new_null_array, Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions,
UInt64Array,
UInt32Array, UInt64Array,
};
use arrow::buffer::BooleanBuffer;
use arrow::compute::{
Expand Down Expand Up @@ -1269,11 +1269,51 @@ impl NestedLoopJoinStream {
// and push the result into output_buffer
// ========

// Special case:
// When the right batch is very small, join with multiple left rows at once,
//
// The regular implementation is not efficient if the plan's right child is
// very small (e.g. 1 row total), because inside the inner loop of NLJ, it's
// handling one input right batch at once, if it's not large enough, the
// overheads like filter evaluation can't be amortized through vectorization.
debug_assert_ne!(
right_batch.num_rows(),
0,
"When fetching the right batch, empty batches will be skipped"
);

let l_row_cnt_ratio = self.batch_size / right_batch.num_rows();
if l_row_cnt_ratio > 10 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be follow up PR to make this fine tuning param be configurable by user?

// Calculate max left rows to handle at once. This operator tries to handle
// up to `datafusion.execution.batch_size` rows at once in the intermediate
// batch.
let l_row_count = std::cmp::min(
l_row_cnt_ratio,
left_data.batch().num_rows() - self.left_probe_idx,
);

debug_assert!(l_row_count != 0, "This function should only be entered when there are remaining left rows to process");
let joined_batch = self.process_left_range_join(
&left_data,
&right_batch,
self.left_probe_idx,
l_row_count,
)?;

if let Some(batch) = joined_batch {
self.output_buffer.push_batch(batch)?;
}

self.left_probe_idx += l_row_count;

return Ok(true);
}

let l_idx = self.left_probe_idx;
let join_batch =
let joined_batch =
self.process_single_left_row_join(&left_data, &right_batch, l_idx)?;

if let Some(batch) = join_batch {
if let Some(batch) = joined_batch {
self.output_buffer.push_batch(batch)?;
}

Expand All @@ -1286,8 +1326,196 @@ impl NestedLoopJoinStream {
Ok(true)
}

/// Process [l_start_index, l_start_index + l_count) JOIN right_batch
/// Returns a RecordBatch containing the join results (None if empty)
///
/// Side Effect: If the join type requires, left or right side matched bitmap
/// will be set for matched indices.
fn process_left_range_join(
&mut self,
left_data: &JoinLeftData,
right_batch: &RecordBatch,
l_start_index: usize,
l_row_count: usize,
) -> Result<Option<RecordBatch>> {
// Construct the Cartesian product between the specified range of left rows
// and the entire right_batch. First, it calculates the index vectors, then
// materializes the intermediate batch, and finally applies the join filter
// to it.
// -----------------------------------------------------------
let right_rows = right_batch.num_rows();
let total_rows = l_row_count * right_rows;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like cartesian?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Added more comments to make it clear


// Build index arrays for cartesian product: left_range X right_batch
let left_indices: UInt32Array =
UInt32Array::from_iter_values((0..l_row_count).flat_map(|i| {
std::iter::repeat_n((l_start_index + i) as u32, right_rows)
}));
let right_indices: UInt32Array = UInt32Array::from_iter_values(
(0..l_row_count).flat_map(|_| 0..right_rows as u32),
);

debug_assert!(
left_indices.len() == right_indices.len()
&& right_indices.len() == total_rows,
"The length or cartesian product should be (left_size * right_size)",
);

// Evaluate the join filter (if any) over an intermediate batch built
// using the filter's own schema/column indices.
let bitmap_combined = if let Some(filter) = &self.join_filter {
// Build the intermediate batch for filter evaluation
let intermediate_batch = if filter.schema.fields().is_empty() {
// Constant predicate (e.g., TRUE/FALSE). Use an empty schema with row_count
create_record_batch_with_empty_schema(
Arc::new((*filter.schema).clone()),
total_rows,
)?
} else {
let mut filter_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(filter.column_indices().len());
for column_index in filter.column_indices() {
let array = if column_index.side == JoinSide::Left {
let col = left_data.batch().column(column_index.index);
take(col.as_ref(), &left_indices, None)?
} else {
let col = right_batch.column(column_index.index);
take(col.as_ref(), &right_indices, None)?
};
filter_columns.push(array);
}

RecordBatch::try_new(Arc::new((*filter.schema).clone()), filter_columns)?
};

let filter_result = filter
.expression()
.evaluate(&intermediate_batch)?
.into_array(intermediate_batch.num_rows())?;
let filter_arr = as_boolean_array(&filter_result)?;

// Combine with null bitmap to get a unified mask
boolean_mask_from_filter(filter_arr)
} else {
// No filter: all pairs match
BooleanArray::from(vec![true; total_rows])
};

// Update the global left or right bitmap for matched indices
// -----------------------------------------------------------

// None means we don't have to update left bitmap for this join type
let mut left_bitmap = if need_produce_result_in_final(self.join_type) {
Some(left_data.bitmap().lock())
} else {
None
};

// 'local' meaning: we want to collect 'is_matched' flag for the current
// right batch, after it has joining all of the left buffer, here it's only
// the partial result for joining given left range
let mut local_right_bitmap = if self.should_track_unmatched_right {
let mut current_right_batch_bitmap = BooleanBufferBuilder::new(right_rows);
// Ensure builder has logical length so set_bit is in-bounds
current_right_batch_bitmap.append_n(right_rows, false);
Some(current_right_batch_bitmap)
} else {
None
};

// Set the matched bit for left and right side bitmap
for (i, is_matched) in bitmap_combined.iter().enumerate() {
let is_matched = is_matched.ok_or_else(|| {
internal_datafusion_err!("Must be Some after the previous combining step")
})?;

let l_index = l_start_index + i / right_rows;
let r_index = i % right_rows;

if let Some(bitmap) = left_bitmap.as_mut() {
if is_matched {
// Map local index back to absolute left index within the batch
bitmap.set_bit(l_index, true);
}
}

if let Some(bitmap) = local_right_bitmap.as_mut() {
if is_matched {
bitmap.set_bit(r_index, true);
}
}
}

// Apply the local right bitmap to the global bitmap
if self.should_track_unmatched_right {
// Remember to put it back after update
let global_right_bitmap =
std::mem::take(&mut self.current_right_batch_matched).ok_or_else(
|| internal_datafusion_err!("right batch's bitmap should be present"),
)?;
let (buf, nulls) = global_right_bitmap.into_parts();
debug_assert!(nulls.is_none());

let current_right_bitmap = local_right_bitmap
.ok_or_else(|| {
internal_datafusion_err!(
"Should be Some if the current join type requires right bitmap"
)
})?
.finish();
let updated_global_right_bitmap = buf.bitor(&current_right_bitmap);

self.current_right_batch_matched =
Some(BooleanArray::new(updated_global_right_bitmap, None));
}

// For the following join types: only bitmaps are updated; do not emit rows now
if matches!(
self.join_type,
JoinType::LeftAnti
| JoinType::LeftSemi
| JoinType::LeftMark
| JoinType::RightAnti
| JoinType::RightMark
| JoinType::RightSemi
) {
return Ok(None);
}

// Build the projected output batch (using output schema/column_indices),
// then apply the bitmap filter to it.
if self.output_schema.fields().is_empty() {
// Empty projection: only row count matters
let row_count = bitmap_combined.true_count();
return Ok(Some(create_record_batch_with_empty_schema(
Arc::clone(&self.output_schema),
row_count,
)?));
}

let mut out_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(self.output_schema.fields().len());
for column_index in &self.column_indices {
let array = if column_index.side == JoinSide::Left {
let col = left_data.batch().column(column_index.index);
take(col.as_ref(), &left_indices, None)?
} else {
let col = right_batch.column(column_index.index);
take(col.as_ref(), &right_indices, None)?
};
out_columns.push(array);
}
let pre_filtered =
RecordBatch::try_new(Arc::clone(&self.output_schema), out_columns)?;
let filtered = filter_record_batch(&pre_filtered, &bitmap_combined)?;
Ok(Some(filtered))
}

/// Process a single left row join with the current right batch.
/// Returns a RecordBatch containing the join results (None if empty)
///
/// Side Effect: If the join type requires, left or right side matched bitmap
/// will be set for matched indices.
fn process_single_left_row_join(
&mut self,
left_data: &JoinLeftData,
Expand Down Expand Up @@ -1584,22 +1812,26 @@ fn apply_filter_to_row_join_batch(
.into_array(intermediate_batch.num_rows())?;
let filter_arr = as_boolean_array(&filter_result)?;

// [Caution] This step has previously introduced bugs
// The filter result is NOT a bitmap; it contains true/false/null values.
// For example, 1 < NULL is evaluated to NULL. Therefore, we must combine (AND)
// the boolean array with its null bitmap to construct a unified bitmap.
let (is_filtered, nulls) = filter_arr.clone().into_parts();
let bitmap_combined = match nulls {
Some(nulls) => {
let combined = nulls.inner() & &is_filtered;
BooleanArray::new(combined, None)
}
None => BooleanArray::new(is_filtered, None),
};
// Convert boolean array with potential nulls into a unified mask bitmap
let bitmap_combined = boolean_mask_from_filter(filter_arr);

Ok(bitmap_combined)
}

/// Convert a boolean filter array into a unified mask bitmap.
///
/// Caution: The filter result is NOT a bitmap; it contains true/false/null values.
/// For example, `1 < NULL` evaluates to NULL. Therefore, we must combine (AND)
/// the boolean array with its null bitmap to construct a unified bitmap.
#[inline]
fn boolean_mask_from_filter(filter_arr: &BooleanArray) -> BooleanArray {
let (values, nulls) = filter_arr.clone().into_parts();
match nulls {
Some(nulls) => BooleanArray::new(nulls.inner() & &values, None),
None => BooleanArray::new(values, None),
}
}

/// This function performs the following steps:
/// 1. Apply filter to probe-side batch
/// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the
Expand Down
Loading