Skip to content

Commit

Permalink
ARROW-11076: [Rust][DataFusion] Refactor usage of right indices in ha…
Browse files Browse the repository at this point in the history
…sh join

This refactors `build_batch_from_indices` to make further changes easier, e.g. solving https://issues.apache.org/jira/browse/ARROW-11030

* This starts handling the right (1) batch and the left (many) batches differently, because we can use `take` directly on the right batch. This should be more efficient anyway, and it also makes it possible in the future to build the index array directly instead of doing extra copying.
* Use `indices.len()` for the capacity parameter, rather than the number of rows on the left side. This matters at larger sizes (e.g. SF 100); see #9036. Rather than estimating the capacity from previous batches, this derives it from the (known) number of resulting rows.
* Reuse "computed" right indices across multiple columns.
* The refactoring makes it easier to apply changes needed for https://issues.apache.org/jira/browse/ARROW-11030 where we need to remove the n*n work that is done for the build side
* The changes don't have a big impact on local TPC-H performance at a small scale factor, but I believe they should have an effect similar to #9036 at SF=100, because they use `indices.len()` rather than the number of rows on the build side.

FYI @jorgecarleitao @andygrove

Closes #9048 from Dandandan/join_right_refactor

Authored-by: Heres, Daniel <danielheres@gmail.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
  • Loading branch information
Dandandan authored and jorgecarleitao committed Dec 31, 2020
1 parent 2440c9e commit 25b7302
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions rust/datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Defines the join plan for executing partitions in parallel and then joining the results
//! into a set of partitions.

use arrow::array::ArrayRef;
use arrow::{array::ArrayRef, compute};
use std::sync::Arc;
use std::{any::Any, collections::HashSet};

Expand Down Expand Up @@ -55,7 +55,7 @@ type Index = (usize, usize);
// as a left join may issue None indices, in which case
type JoinIndex = Option<(usize, usize)>;
// An index of row uniquely identifying a row in a batch
type RightIndex = Option<usize>;
type RightIndex = Option<u32>;

// Maps ["on" value] -> [list of indices with this key's value]
// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
Expand Down Expand Up @@ -275,56 +275,57 @@ fn build_batch_from_indices(
if left.is_empty() {
todo!("Create empty record batch");
}
// this is just for symmetry of the code below.
let right = vec![right.clone()];

let (primary_is_left, primary, secondary) = match join_type {
JoinType::Inner | JoinType::Left => (true, left, &right),
JoinType::Right => (false, &right, left),
let (primary_is_left, primary_schema, secondary_schema) = match join_type {
JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()),
JoinType::Right => (false, right.schema(), left[0].schema()),
};

// build the columns of the new [RecordBatch]:
// 1. pick whether the column is from the left or right
// 2. based on the pick, `take` items from the different recordBatches
let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());

let right_indices = indices.iter().map(|(_, join_index)| join_index).collect();

for field in schema.fields() {
// pick the column (left or right) based on the field name.
// Note that we take `.data_ref()` to gather the [ArrayData] of each array.
let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
let (is_primary, column_index) = match primary_schema.index_of(field.name()) {
Ok(i) => Ok((true, i)),
Err(_) => {
match secondary[0].schema().index_of(field.name()) {
Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
match secondary_schema.index_of(field.name()) {
Ok(i) => Ok((false, i)),
_ => Err(DataFusionError::Internal(
format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
))
}
}
}.map_err(DataFusionError::into_arrow_external_error)?;

let capacity = arrays.iter().map(|array| array.len()).sum();
let mut mutable = MutableArrayData::new(arrays, true, capacity);

let is_left =
(is_primary && primary_is_left) || (!is_primary && !primary_is_left);
if is_left {

let array = if is_left {
// Note that we take `.data_ref()` to gather the [ArrayData] of each array.
let arrays = left
.iter()
.map(|batch| batch.column(column_index).data_ref().as_ref())
.collect::<Vec<_>>();

let mut mutable = MutableArrayData::new(arrays, true, indices.len());
// use the left indices
for (join_index, _) in indices {
match join_index {
Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
None => mutable.extend_nulls(1),
}
}
make_array(Arc::new(mutable.freeze()))
} else {
// use the right indices
for (_, join_index) in indices {
match join_index {
Some(row) => mutable.extend(0, *row, *row + 1),
None => mutable.extend_nulls(1),
}
}
let array = right.column(column_index);
compute::take(array.as_ref(), &right_indices, None)?
};
let array = make_array(Arc::new(mutable.freeze()));
columns.push(array);
}
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
Expand Down Expand Up @@ -400,7 +401,7 @@ fn build_batch(
) -> ArrowResult<RecordBatch> {
let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap();

build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices)
build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices)
}

/// returns a vector with (index from left, index from right).
Expand Down Expand Up @@ -456,7 +457,7 @@ fn build_join_indexes(
// for every item on the left and right with this key, add the respective pair
left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
// on an inner join, left and right indices are present
indexes.push((Some(*x), Some(row)));
indexes.push((Some(*x), Some(row as u32)));
})
}
Ok(indexes)
Expand All @@ -477,7 +478,7 @@ fn build_join_indexes(
is_visited.insert(key.clone());

indices.iter().for_each(|x| {
indexes.push((Some(*x), Some(row)));
indexes.push((Some(*x), Some(row as u32)));
})
};
}
Expand All @@ -502,12 +503,12 @@ fn build_join_indexes(
match left_indices {
Some(indices) => {
indices.iter().for_each(|x| {
indexes.push((Some(*x), Some(row)));
indexes.push((Some(*x), Some(row as u32)));
});
}
None => {
// when no match, add the row with None for the left side
indexes.push((None, Some(row)));
indexes.push((None, Some(row as u32)));
}
}
}
Expand Down

0 comments on commit 25b7302

Please sign in to comment.