ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches #9036

Status: Closed (3 commits)
81 changes: 71 additions & 10 deletions rust/datafusion/src/physical_plan/hash_join.rs
@@ -20,6 +20,7 @@

use arrow::array::ArrayRef;
use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, collections::HashSet};

use async_trait::async_trait;
@@ -47,6 +48,7 @@ use crate::error::{DataFusionError, Result};

use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
use ahash::RandomState;
use log::debug;

// An index of (batch, row) uniquely identifying a row in a part.
type Index = (usize, usize);
@@ -160,6 +162,8 @@ impl ExecutionPlan for HashJoinExec {
match build_side.as_ref() {
Some(stream) => stream.clone(),
None => {
let start = Instant::now();

// merge all left parts into a single stream
let merge = MergeExec::new(self.left.clone());
let stream = merge.execute(0).await?;
@@ -186,8 +190,18 @@
})
.await?;

let num_rows: usize =
left_data.1.iter().map(|batch| batch.num_rows()).sum();

let left_side = Arc::new((left_data.0, left_data.1));
*build_side = Some(left_side.clone());

debug!(
"Built build-side of hash join containing {} rows in {} ms",
num_rows,
start.elapsed().as_millis()
);

left_side
}
}
@@ -208,6 +222,11 @@ impl ExecutionPlan for HashJoinExec {
join_type: self.join_type,
left_data,
right: stream,
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
num_output_rows: 0,
join_time: 0,
}))
}
}
@@ -252,6 +271,16 @@ struct HashJoinStream {
left_data: JoinLeftData,
/// right
right: SendableRecordBatchStream,
/// number of input batches
num_input_batches: usize,
/// number of input rows
num_input_rows: usize,
/// number of batches produced
num_output_batches: usize,
/// number of rows produced
num_output_rows: usize,
/// total time for joining probe-side batches to the build-side batches
join_time: usize,
}

impl RecordBatchStream for HashJoinStream {
@@ -271,6 +300,7 @@ fn build_batch_from_indices(
right: &RecordBatch,
join_type: &JoinType,
indices: &[(JoinIndex, RightIndex)],
capacity: usize,
) -> ArrowResult<RecordBatch> {
if left.is_empty() {
todo!("Create empty record batch");
@@ -307,7 +337,6 @@
.iter()
.map(|array| array.as_ref())
.collect::<Vec<_>>();
let capacity = arrays.iter().map(|array| array.len()).sum();
let mut mutable = MutableArrayData::new(arrays, true, capacity);

let is_left =
@@ -402,10 +431,11 @@ fn build_batch(
on_right: &HashSet<String>,
join_type: &JoinType,
schema: &Schema,
capacity: usize,
) -> ArrowResult<RecordBatch> {
let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap();

build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices)
build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices, capacity)
}

/// returns a vector with (index from left, index from right).
@@ -531,14 +561,45 @@ impl Stream for HashJoinStream {
self.right
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => Some(build_batch(
&batch,
&self.left_data,
&self.on_right,
&self.join_type,
&self.schema,
)),
other => other,
Some(Ok(batch)) => {
let start = Instant::now();
let capacity = if self.num_output_batches == 0 {
Reviewer comment (Contributor): In #9032, initializing with capacity 0 currently benchmarked slightly faster than supplying the correct capacity upfront. I think that is more a curiosity and will hopefully change, but the same might be the case here. What is the performance if we initialize with 0 everywhere?

1024
} else {
self.num_output_rows / self.num_output_batches + 1024
};
let result = build_batch(
&batch,
&self.left_data,
&self.on_right,
&self.join_type,
&self.schema,
capacity,
);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
match result {
Ok(ref batch) => {
self.join_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
}
_ => {}
}
Some(result)
}
other => {
debug!(
"Processed {} probe-side input batches containing {} rows and \
produced {} output batches containing {} rows in {} ms",
self.num_input_batches,
self.num_input_rows,
self.num_output_batches,
self.num_output_rows,
self.join_time
);
other
}
})
}
}
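For readers skimming the diff, the heart of this change is how the output batch capacity is chosen. Previously, `build_batch_from_indices` summed the lengths of every source array, which can badly overestimate the output size of a selective join; now the caller estimates capacity from the batches already produced: a fixed guess of 1024 rows for the first batch, then the running average of rows per output batch plus 1024 of headroom. A minimal standalone sketch of that heuristic (the helper name `estimate_output_capacity` is hypothetical; the PR inlines this logic in `HashJoinStream::poll_next`):

```rust
/// Sketch of the dynamic capacity heuristic added in this PR. Hypothetical
/// helper; the actual change inlines this in `HashJoinStream::poll_next`.
fn estimate_output_capacity(num_output_rows: usize, num_output_batches: usize) -> usize {
    if num_output_batches == 0 {
        // No history yet: fall back to a fixed guess for the first batch.
        1024
    } else {
        // Running average of rows per output batch, padded by 1024 so a
        // slightly larger batch does not immediately force a reallocation.
        num_output_rows / num_output_batches + 1024
    }
}

fn main() {
    // First batch: no history, use the fixed guess.
    assert_eq!(estimate_output_capacity(0, 0), 1024);
    // After 3 batches totalling 9_000 rows: 9_000 / 3 + 1024 = 4_024.
    assert_eq!(estimate_output_capacity(9_000, 3), 4_024);
}
```

The estimate feeds straight into `MutableArrayData::new(arrays, true, capacity)`, so a good guess avoids repeated buffer growth, while a conservative one merely wastes a little memory.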
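A note on seeing the new metrics: the `debug!` lines only appear if the consuming binary installs a logger. A hedged sketch of a local harness, assuming `env_logger` as the logger and `datafusion::physical_plan::hash_join` as the module path (both assumptions about the environment, not part of this PR):

```rust
// Hypothetical local harness for surfacing the new debug output.
// Assumes the `env_logger` crate is a dependency of the test binary and
// that no other logger has been installed.
fn main() {
    // Restrict the filter to the hash join module so unrelated debug
    // output stays quiet; env_logger reads RUST_LOG at init time.
    std::env::set_var("RUST_LOG", "datafusion::physical_plan::hash_join=debug");
    env_logger::init();

    // ... build and execute a DataFusion query containing a join here;
    // the build-side and probe-side timing lines are written to stderr.
}
```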