Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,10 @@ impl HashJoinExec {
false,
matches!(
join_type,
JoinType::Inner | JoinType::RightAnti | JoinType::RightSemi
JoinType::Inner
| JoinType::Right
| JoinType::RightAnti
| JoinType::RightSemi
),
]
}
Expand Down Expand Up @@ -779,6 +782,7 @@ impl ExecutionPlan for HashJoinExec {
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
batch_size,
hashes_buffer: vec![],
right_side_ordered: self.right.output_ordering().is_some(),
}))
}

Expand Down Expand Up @@ -1107,6 +1111,8 @@ struct HashJoinStream {
batch_size: usize,
/// Scratch space for computing hashes
hashes_buffer: Vec<u64>,
/// Specifies whether the right side has an ordering to potentially preserve
right_side_ordered: bool,
}

impl RecordBatchStream for HashJoinStream {
Expand Down Expand Up @@ -1444,6 +1450,7 @@ impl HashJoinStream {
right_indices,
index_alignment_range_start..index_alignment_range_end,
self.join_type,
self.right_side_ordered,
);

let result = build_batch_from_indices(
Expand Down Expand Up @@ -1537,7 +1544,6 @@ impl Stream for HashJoinStream {

#[cfg(test)]
mod tests {

use super::*;
use crate::{
common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ fn join_left_and_right_batch(
right_side,
0..right_batch.num_rows(),
join_type,
false,
);

build_batch_from_indices(
Expand Down Expand Up @@ -647,7 +648,6 @@ impl RecordBatchStream for NestedLoopJoinStream {

#[cfg(test)]
mod tests {

use super::*;
use crate::{
common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
Expand Down
135 changes: 107 additions & 28 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use arrow::array::{
UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
};
use arrow::compute;
use arrow::datatypes::{Field, Schema, SchemaBuilder};
use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use arrow_array::builder::UInt64Builder;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use datafusion_common::cast::as_boolean_array;
Expand Down Expand Up @@ -1284,6 +1285,7 @@ pub(crate) fn adjust_indices_by_join_type(
right_indices: UInt32Array,
adjust_range: Range<usize>,
join_type: JoinType,
preserve_order_for_right: bool,
) -> (UInt64Array, UInt32Array) {
match join_type {
JoinType::Inner => {
Expand All @@ -1295,12 +1297,17 @@ pub(crate) fn adjust_indices_by_join_type(
(left_indices, right_indices)
// unmatched left rows will be produced at the end of the loop, and this has been recorded in the left visited bitmap
}
JoinType::Right | JoinType::Full => {
// matched
// unmatched right row will be produced in this batch
let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices);
JoinType::Right => {
// combine the matched and unmatched right result together
append_right_indices(left_indices, right_indices, right_unmatched_indices)
append_right_indices(
left_indices,
right_indices,
adjust_range,
preserve_order_for_right,
)
}
JoinType::Full => {
append_right_indices(left_indices, right_indices, adjust_range, false)
}
JoinType::RightSemi => {
// need to remove the duplicated record in the right side
Expand All @@ -1326,30 +1333,48 @@ pub(crate) fn adjust_indices_by_join_type(
}
}

/// Appends the `right_unmatched_indices` to the `right_indices`,
/// and fills Null to tail of `left_indices` to
/// keep the length of `right_indices` and `left_indices` consistent.
/// Appends right indices to left indices based on the specified order mode.
///
/// The function operates in two modes:
/// 1. If `preserve_order_for_right` is true, probe matched and unmatched indices
/// are inserted in order using the `append_probe_indices_in_order()` method.
/// 2. Otherwise, unmatched probe indices are simply appended after matched ones.
///
/// # Parameters
/// - `left_indices`: UInt64Array of left indices.
/// - `right_indices`: UInt32Array of right indices.
/// - `adjust_range`: Range to adjust the right indices.
/// - `preserve_order_for_right`: Boolean flag to determine the mode of operation.
///
/// # Returns
/// A tuple of updated `UInt64Array` and `UInt32Array`.
pub(crate) fn append_right_indices(
left_indices: UInt64Array,
right_indices: UInt32Array,
right_unmatched_indices: UInt32Array,
adjust_range: Range<usize>,
preserve_order_for_right: bool,
) -> (UInt64Array, UInt32Array) {
// left_indices, right_indices and right_unmatched_indices must not contain the null value
if right_unmatched_indices.is_empty() {
(left_indices, right_indices)
if preserve_order_for_right {
append_probe_indices_in_order(left_indices, right_indices, adjust_range)
} else {
let unmatched_size = right_unmatched_indices.len();
// the new left indices: left_indices + null array
// the new right indices: right_indices + right_unmatched_indices
let new_left_indices = left_indices
.iter()
.chain(std::iter::repeat(None).take(unmatched_size))
.collect::<UInt64Array>();
let new_right_indices = right_indices
.iter()
.chain(right_unmatched_indices.iter())
.collect::<UInt32Array>();
(new_left_indices, new_right_indices)
let right_unmatched_indices = get_anti_indices(adjust_range, &right_indices);

if right_unmatched_indices.is_empty() {
(left_indices, right_indices)
} else {
let unmatched_size = right_unmatched_indices.len();
// the new left indices: left_indices + null array
// the new right indices: right_indices + right_unmatched_indices
let new_left_indices = left_indices
.iter()
.chain(std::iter::repeat(None).take(unmatched_size))
.collect();
let new_right_indices = right_indices
.iter()
.chain(right_unmatched_indices.iter())
.collect();
(new_left_indices, new_right_indices)
}
}
}

Expand Down Expand Up @@ -1379,7 +1404,7 @@ where
.filter_map(|idx| {
(!bitmap.get_bit(idx - offset)).then_some(T::Native::from_usize(idx))
})
.collect::<PrimitiveArray<T>>()
.collect()
}

/// Returns intersection of `range` and `input_indices` omitting duplicates
Expand Down Expand Up @@ -1408,7 +1433,61 @@ where
.filter_map(|idx| {
(bitmap.get_bit(idx - offset)).then_some(T::Native::from_usize(idx))
})
.collect::<PrimitiveArray<T>>()
.collect()
}

/// Appends probe indices in order by considering the given build indices.
///
/// This function constructs new build and probe indices by walking the matched
/// `(build, probe)` pairs and, before each pair, emitting every probe index
/// between the previously emitted one and the current one paired with a null
/// build index. Probe indices after the last match, up to `range.end`, are
/// emitted the same way.
///
/// The gap filling only works as described when `probe_indices` is in
/// non-decreasing order; if a probe index is smaller than the one before it,
/// no gap is emitted for that pair.
///
/// # Parameters
///
/// - `build_indices`: `PrimitiveArray` of `UInt64Type` containing build indices.
/// - `probe_indices`: `PrimitiveArray` of `UInt32Type` containing probe indices;
///   must have the same length as `build_indices`.
/// - `range`: The range of probe indices to consider.
///
/// # Returns
///
/// A tuple of two arrays:
/// - A `PrimitiveArray` of `UInt64Type` with the newly constructed build indices.
/// - A `PrimitiveArray` of `UInt32Type` with the newly constructed probe indices.
fn append_probe_indices_in_order(
    build_indices: PrimitiveArray<UInt64Type>,
    probe_indices: PrimitiveArray<UInt32Type>,
    range: Range<usize>,
) -> (PrimitiveArray<UInt64Type>, PrimitiveArray<UInt32Type>) {
    debug_assert_eq!(build_indices.len(), probe_indices.len());

    // Every matched pair is emitted, so the output holds at least
    // `probe_indices.len()` entries; for ordered input it also holds at least
    // one entry per row of `range`. Reserve the larger up front (this is only
    // a capacity hint, the builders still grow if needed).
    let capacity = range.len().max(probe_indices.len());
    let mut new_build_indices = UInt64Builder::with_capacity(capacity);
    let mut new_probe_indices = UInt32Builder::with_capacity(capacity);

    // First probe index that has not been emitted yet.
    let mut next_unemitted = range.start as u32;

    let matched_pairs = build_indices
        .values()
        .iter()
        .zip(probe_indices.values().iter());
    for (&build_index, &probe_index) in matched_pairs {
        // Probe rows skipped since the previous match get a null build index:
        for unmatched in next_unemitted..probe_index {
            new_probe_indices.append_value(unmatched);
            new_build_indices.append_null();
        }
        // The matched pair itself:
        new_probe_indices.append_value(probe_index);
        new_build_indices.append_value(build_index);
        // Resume gap filling right after the current probe index:
        next_unemitted = probe_index + 1;
    }

    // Probe rows after the last match, up to the end of the range, get a null
    // build index as well.
    for unmatched in next_unemitted..range.end as u32 {
        new_probe_indices.append_value(unmatched);
        new_build_indices.append_null();
    }

    (new_build_indices.finish(), new_probe_indices.finish())
}

/// Metrics for build & probe joins
Expand Down Expand Up @@ -2475,7 +2554,7 @@ mod tests {
&on_columns,
left_columns_len,
maintains_input_order,
probe_side
probe_side,
),
expected[i]
);
Expand Down
Loading