diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 195a19c54070..f426bc9d3c3c 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -64,7 +64,7 @@ use super::{
     SendableRecordBatchStream,
 };
 use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::PhysicalExpr;
+use crate::physical_plan::{PhysicalExpr, SQLMetric};
 use log::debug;
 
 // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
@@ -102,6 +102,35 @@ pub struct HashJoinExec {
     random_state: RandomState,
     /// Partitioning mode to use
     mode: PartitionMode,
+    /// Metrics
+    metrics: Arc<HashJoinMetrics>,
+}
+
+/// Metrics for HashJoinExec
+#[derive(Debug)]
+struct HashJoinMetrics {
+    /// Total time for joining probe-side batches to the build-side batches
+    join_time: Arc<SQLMetric>,
+    /// Number of batches consumed by this operator
+    input_batches: Arc<SQLMetric>,
+    /// Number of rows consumed by this operator
+    input_rows: Arc<SQLMetric>,
+    /// Number of batches produced by this operator
+    output_batches: Arc<SQLMetric>,
+    /// Number of rows produced by this operator
+    output_rows: Arc<SQLMetric>,
+}
+
+impl HashJoinMetrics {
+    fn new() -> Self {
+        Self {
+            join_time: SQLMetric::time_nanos(),
+            input_batches: SQLMetric::counter(),
+            input_rows: SQLMetric::counter(),
+            output_batches: SQLMetric::counter(),
+            output_rows: SQLMetric::counter(),
+        }
+    }
 }
 
 #[derive(Clone, Copy, Debug, PartialEq)]
@@ -154,6 +183,7 @@ impl HashJoinExec {
             build_side: Arc::new(Mutex::new(None)),
             random_state,
             mode: partition_mode,
+            metrics: Arc::new(HashJoinMetrics::new()),
         })
     }
 
@@ -394,6 +424,7 @@ impl ExecutionPlan for HashJoinExec {
                     column_indices,
                     self.random_state.clone(),
                     visited_left_side,
+                    self.metrics.clone(),
                 )))
             }
 
@@ -412,6 +443,22 @@ impl ExecutionPlan for HashJoinExec {
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone());
+        metrics.insert(
+            "inputBatches".to_owned(),
+            (*self.metrics.input_batches).clone(),
+        );
+        metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
+        metrics.insert(
+            "outputBatches".to_owned(),
+            (*self.metrics.output_batches).clone(),
+        );
+        metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
+        metrics
+    }
 }
 
 /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
@@ -467,22 +514,14 @@ struct HashJoinStream {
     right: SendableRecordBatchStream,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
-    /// number of input batches
-    num_input_batches: usize,
-    /// number of input rows
-    num_input_rows: usize,
-    /// number of batches produced
-    num_output_batches: usize,
-    /// number of rows produced
-    num_output_rows: usize,
-    /// total time for joining probe-side batches to the build-side batches
-    join_time: usize,
     /// Random state used for hashing initialization
     random_state: RandomState,
     /// Keeps track of the left side rows whether they are visited
    visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
     /// There is nothing to process anymore and left side is processed in case of left join
     is_exhausted: bool,
+    /// Metrics
+    metrics: Arc<HashJoinMetrics>,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -497,6 +536,7 @@ impl HashJoinStream {
         column_indices: Vec<ColumnIndex>,
         random_state: RandomState,
         visited_left_side: Vec<bool>,
+        metrics: Arc<HashJoinMetrics>,
     ) -> Self {
         HashJoinStream {
             schema,
@@ -506,14 +546,10 @@ impl HashJoinStream {
             left_data,
             right,
             column_indices,
-            num_input_batches: 0,
-            num_input_rows: 0,
-            num_output_batches: 0,
-            num_output_rows: 0,
-            join_time: 0,
             random_state,
             visited_left_side,
             is_exhausted: false,
+            metrics,
         }
     }
 }
@@ -1215,12 +1251,14 @@ impl Stream for HashJoinStream {
                        &self.column_indices,
                        &self.random_state,
                    );
-                   self.num_input_batches += 1;
-                   self.num_input_rows += batch.num_rows();
+                   self.metrics.input_batches.add(1);
+                   self.metrics.input_rows.add(batch.num_rows());
                    if let Ok((ref batch, ref left_side)) = result {
-                       self.join_time += start.elapsed().as_millis() as usize;
-                       self.num_output_batches += 1;
-                       self.num_output_rows += batch.num_rows();
+                       self.metrics
+                           .join_time
+                           .add(start.elapsed().as_millis() as usize);
+                       self.metrics.output_batches.add(1);
+                       self.metrics.output_rows.add(batch.num_rows());
 
                        match self.join_type {
                            JoinType::Left
@@ -1254,13 +1292,14 @@ impl Stream for HashJoinStream {
                                self.join_type != JoinType::Semi,
                            );
                            if let Ok(ref batch) = result {
-                               self.num_input_batches += 1;
-                               self.num_input_rows += batch.num_rows();
+                               self.metrics.input_batches.add(1);
+                               self.metrics.input_rows.add(batch.num_rows());
                                if let Ok(ref batch) = result {
-                                   self.join_time +=
-                                       start.elapsed().as_millis() as usize;
-                                   self.num_output_batches += 1;
-                                   self.num_output_rows += batch.num_rows();
+                                   self.metrics
+                                       .join_time
+                                       .add(start.elapsed().as_millis() as usize);
+                                   self.metrics.output_batches.add(1);
+                                   self.metrics.output_rows.add(batch.num_rows());
                                }
                            }
                            self.is_exhausted = true;
@@ -1274,16 +1313,6 @@ impl Stream for HashJoinStream {
                            | JoinType::Right => {}
                        }
 
-                       // End of right batch, print stats in debug mode
-                       debug!(
-                           "Processed {} probe-side input batches containing {} rows and \
-                           produced {} output batches containing {} rows in {} ms",
-                           self.num_input_batches,
-                           self.num_input_rows,
-                           self.num_output_batches,
-                           self.num_output_rows,
-                           self.join_time
-                       );
                        other
                    }
                })
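
With this change the per-stream counters are no longer logged once via `debug!` and discarded; they accumulate in shared `SQLMetric`s and can be read back through the `metrics()` override added above. Below is a minimal sketch of how a caller might inspect them after driving the plan to completion. The `report_join_metrics` helper is hypothetical (not part of this patch), and it assumes the `SQLMetric::value()` accessor from the DataFusion revision this patch targets:

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper (not part of this patch): print the counters and
/// the timer that HashJoinExec registers, after the plan has been
/// executed to completion.
fn report_join_metrics(plan: &Arc<dyn ExecutionPlan>) {
    // `metrics()` is the accessor added in this diff; it returns a
    // HashMap<String, SQLMetric> keyed by the names registered above.
    let metrics = plan.metrics();
    for key in &[
        "inputBatches",
        "inputRows",
        "outputBatches",
        "outputRows",
        "joinTime",
    ] {
        if let Some(metric) = metrics.get(*key) {
            // Assumes `SQLMetric::value()` exposes the accumulated value.
            println!("{} = {}", key, metric.value());
        }
    }
}
```

Because `metrics()` clones each `SQLMetric` out of the operator, the returned map is a point-in-time snapshot; reading it mid-execution would show partial counts.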