diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index def1cc09a17f4..9ac7447a8abde 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,7 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::array::ArrayRef; +use arrow::{array::ArrayRef, compute}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -55,7 +55,7 @@ type Index = (usize, usize); // as a left join may issue None indices, in which case type JoinIndex = Option<(usize, usize)>; // An index of row uniquely identifying a row in a batch -type RightIndex = Option; +type RightIndex = Option; // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true @@ -275,26 +275,26 @@ fn build_batch_from_indices( if left.is_empty() { todo!("Create empty record batch"); } - // this is just for symmetry of the code below. - let right = vec![right.clone()]; - let (primary_is_left, primary, secondary) = match join_type { - JoinType::Inner | JoinType::Left => (true, left, &right), - JoinType::Right => (false, &right, left), + let (primary_is_left, primary_schema, secondary_schema) = match join_type { + JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()), + JoinType::Right => (false, right.schema(), left[0].schema()), }; // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + + let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); + for field in schema.fields() { // pick the column (left or right) based on the field name. - // Note that we take `.data_ref()` to gather the [ArrayData] of each array. - let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) { - Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::>())), + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), Err(_) => { - match secondary[0].schema().index_of(field.name()) { - Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::>())), + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), _ => Err(DataFusionError::Internal( format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() )) @@ -302,12 +302,17 @@ fn build_batch_from_indices( } }.map_err(DataFusionError::into_arrow_external_error)?; - let capacity = arrays.iter().map(|array| array.len()).sum(); - let mut mutable = MutableArrayData::new(arrays, true, capacity); - let is_left = (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - if is_left { + + let array = if is_left { + // Note that we take `.data_ref()` to gather the [ArrayData] of each array. + let arrays = left + .iter() + .map(|batch| batch.column(column_index).data_ref().as_ref()) + .collect::>(); + + let mut mutable = MutableArrayData::new(arrays, true, indices.len()); // use the left indices for (join_index, _) in indices { match join_index { @@ -315,16 +320,12 @@ fn build_batch_from_indices( None => mutable.extend_nulls(1), } } + make_array(Arc::new(mutable.freeze())) } else { // use the right indices - for (_, join_index) in indices { - match join_index { - Some(row) => mutable.extend(0, *row, *row + 1), - None => mutable.extend_nulls(1), - } - } + let array = right.column(column_index); + compute::take(array.as_ref(), &right_indices, None)? }; - let array = make_array(Arc::new(mutable.freeze())); columns.push(array); } Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) @@ -400,7 +401,7 @@ fn build_batch( ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices) + build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices) } /// returns a vector with (index from left, index from right). @@ -456,7 +457,7 @@ fn build_join_indexes( // for every item on the left and right with this key, add the respective pair left_indexes.unwrap_or(&vec![]).iter().for_each(|x| { // on an inner join, left and right indices are present - indexes.push((Some(*x), Some(row))); + indexes.push((Some(*x), Some(row as u32))); }) } Ok(indexes) @@ -477,7 +478,7 @@ fn build_join_indexes( is_visited.insert(key.clone()); indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row))); + indexes.push((Some(*x), Some(row as u32))); }) }; } @@ -502,12 +503,12 @@ fn build_join_indexes( match left_indices { Some(indices) => { indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row))); + indexes.push((Some(*x), Some(row as u32))); }); } None => { // when no match, add the row with None for the left side - indexes.push((None, Some(row))); + indexes.push((None, Some(row as u32))); } } }