103 changes: 66 additions & 37 deletions datafusion/src/physical_plan/hash_join.rs
@@ -64,7 +64,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::PhysicalExpr;
use crate::physical_plan::{PhysicalExpr, SQLMetric};
use log::debug;

// Maps a `u64` hash value based on the left join-key ("on") values to a list of row indices with this key's value.
@@ -102,6 +102,35 @@ pub struct HashJoinExec {
random_state: RandomState,
/// Partitioning mode to use
mode: PartitionMode,
/// Metrics
metrics: Arc<HashJoinMetrics>,
}

/// Metrics for HashJoinExec
#[derive(Debug)]
struct HashJoinMetrics {
/// Total time for joining probe-side batches to the build-side batches
join_time: Arc<SQLMetric>,
/// Number of batches consumed by this operator
input_batches: Arc<SQLMetric>,
/// Number of rows consumed by this operator
input_rows: Arc<SQLMetric>,
/// Number of batches produced by this operator
output_batches: Arc<SQLMetric>,
/// Number of rows produced by this operator
output_rows: Arc<SQLMetric>,
}

impl HashJoinMetrics {
fn new() -> Self {
Self {
join_time: SQLMetric::time_nanos(),
input_batches: SQLMetric::counter(),
input_rows: SQLMetric::counter(),
output_batches: SQLMetric::counter(),
output_rows: SQLMetric::counter(),
}
}
}
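
For context, `SQLMetric::counter()` and `SQLMetric::time_nanos()` produce shared, thread-safe accumulators that the stream can bump from inside `poll_next`. A minimal stand-in sketch of that counter pattern, assuming an `AtomicUsize`-backed value (the real `SQLMetric` also carries a metric kind and more):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for `SQLMetric`: a shared, thread-safe counter.
#[derive(Debug)]
struct MetricSketch {
    value: AtomicUsize,
}

impl MetricSketch {
    fn counter() -> Arc<Self> {
        Arc::new(Self {
            value: AtomicUsize::new(0),
        })
    }

    /// Add `n` to the accumulated value, as `SQLMetric::add` does.
    fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

Cloning the `Arc` hands the same counters to both the operator (for reporting) and its stream (for updates), which is why `HashJoinMetrics` is wrapped in an `Arc` above.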

#[derive(Clone, Copy, Debug, PartialEq)]
@@ -154,6 +183,7 @@ impl HashJoinExec {
build_side: Arc::new(Mutex::new(None)),
random_state,
mode: partition_mode,
metrics: Arc::new(HashJoinMetrics::new()),
})
}

@@ -394,6 +424,7 @@ impl ExecutionPlan for HashJoinExec {
column_indices,
self.random_state.clone(),
visited_left_side,
self.metrics.clone(),
)))
}

@@ -412,6 +443,22 @@
}
}
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone());
metrics.insert(
"inputBatches".to_owned(),
(*self.metrics.input_batches).clone(),
);
metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
metrics.insert(
"outputBatches".to_owned(),
(*self.metrics.output_batches).clone(),
);
metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
metrics
}
}
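
Downstream, anything holding the plan can pull these named counters out of the `HashMap` returned by `metrics()`. A sketch, assuming only the signature shown above; the helper name is hypothetical, and since `SQLMetric` implements `Debug` (it is held in the `#[derive(Debug)]` struct above), debug-printing suffices for illustration:

// Hypothetical helper: dump an operator's metrics after execution.
fn print_metrics(plan: &dyn ExecutionPlan) {
    for (name, metric) in plan.metrics() {
        println!("{}: {:?}", name, metric);
    }
}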

/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
@@ -467,22 +514,14 @@ struct HashJoinStream {
right: SendableRecordBatchStream,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// number of input batches
num_input_batches: usize,
/// number of input rows
num_input_rows: usize,
/// number of batches produced
num_output_batches: usize,
/// number of rows produced
num_output_rows: usize,
/// total time for joining probe-side batches to the build-side batches
join_time: usize,
/// Random state used for hashing initialization
random_state: RandomState,
/// Keeps track of which left-side rows have been visited
visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
/// True when there is nothing more to process and, in the case of a left join, the left side has been fully emitted
is_exhausted: bool,
/// Metrics
metrics: Arc<HashJoinMetrics>,
}

#[allow(clippy::too_many_arguments)]
@@ -497,6 +536,7 @@ impl HashJoinStream {
column_indices: Vec<ColumnIndex>,
random_state: RandomState,
visited_left_side: Vec<bool>,
metrics: Arc<HashJoinMetrics>,
) -> Self {
HashJoinStream {
schema,
@@ -506,14 +546,10 @@ impl HashJoinStream {
left_data,
right,
column_indices,
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
num_output_rows: 0,
join_time: 0,
random_state,
visited_left_side,
is_exhausted: false,
metrics,
}
}
}
@@ -1215,12 +1251,14 @@ impl Stream for HashJoinStream {
&self.column_indices,
&self.random_state,
);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
self.metrics.input_batches.add(1);
self.metrics.input_rows.add(batch.num_rows());
if let Ok((ref batch, ref left_side)) = result {
self.join_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
self.metrics
.join_time
.add(start.elapsed().as_millis() as usize);
self.metrics.output_batches.add(1);
self.metrics.output_rows.add(batch.num_rows());

match self.join_type {
JoinType::Left
Expand Down Expand Up @@ -1254,13 +1292,14 @@ impl Stream for HashJoinStream {
self.join_type != JoinType::Semi,
);
if let Ok(ref batch) = result {
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
self.metrics.input_batches.add(1);
self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
self.join_time +=
start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
self.metrics
.join_time
.add(start.elapsed().as_millis() as usize);
self.metrics.output_batches.add(1);
self.metrics.output_rows.add(batch.num_rows());
}
}
self.is_exhausted = true;
@@ -1274,16 +1313,6 @@
| JoinType::Right => {}
}

// End of right batch, print stats in debug mode
debug!(
"Processed {} probe-side input batches containing {} rows and \
produced {} output batches containing {} rows in {} ms",
self.num_input_batches,
self.num_input_rows,
self.num_output_batches,
self.num_output_rows,
self.join_time
);
other
}
})
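
The timing pattern above, isolated as a standalone sketch: take an `Instant` before probing, then fold the elapsed time into a shared accumulator once the output batch is ready. Since the metric is created with `SQLMetric::time_nanos()`, a sketch at nanosecond granularity records `elapsed().as_nanos()`; the type and names below are illustrative only:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

/// Illustrative nanosecond accumulator for total join time.
struct JoinTimer {
    nanos: AtomicUsize,
}

impl JoinTimer {
    fn new() -> Self {
        Self {
            nanos: AtomicUsize::new(0),
        }
    }

    /// Run `f`, adding its elapsed wall-clock time to the running total.
    fn time<T>(&self, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        self.nanos
            .fetch_add(start.elapsed().as_nanos() as usize, Ordering::Relaxed);
        out
    }
}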