diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 3017df623f1d..82f4ec7c2c35 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -149,7 +149,9 @@ pub struct HashJoinExec { #[derive(Debug)] struct HashJoinMetrics { /// Total time for joining probe-side batches to the build-side batches - join_time: metrics::Time, + probe_time: metrics::Time, + /// Total time for building hashmap + build_time: metrics::Time, /// Number of batches consumed by this operator input_batches: metrics::Count, /// Number of rows consumed by this operator @@ -162,7 +164,9 @@ struct HashJoinMetrics { impl HashJoinMetrics { pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { - let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition); + let probe_time = MetricBuilder::new(metrics).subset_time("probe_time", partition); + + let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition); let input_batches = MetricBuilder::new(metrics).counter("input_batches", partition); @@ -175,7 +179,8 @@ impl HashJoinMetrics { let output_rows = MetricBuilder::new(metrics).output_rows(partition); Self { - join_time, + probe_time, + build_time, input_batches, input_rows, output_batches, @@ -1487,10 +1492,12 @@ impl HashJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>> { + let build_timer = self.join_metrics.build_time.timer(); let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, Err(e) => return Poll::Ready(Some(Err(e))), }; + build_timer.done(); let visited_left_side = self.visited_left_side.get_or_insert_with(|| { let num_rows = left_data.1.num_rows(); @@ -1516,7 +1523,7 @@ impl HashJoinStream { .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { Some(Ok(batch)) => { - let timer = self.join_metrics.join_time.timer(); + let timer = self.join_metrics.probe_time.timer(); let result = build_batch( &batch, left_data, @@ -1532,7 +1539,6 @@ impl HashJoinStream { self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(batch.num_rows()); if let Ok((ref batch, ref left_side)) = result { - timer.done(); self.join_metrics.output_batches.add(1); self.join_metrics.output_rows.add(batch.num_rows()); @@ -1551,11 +1557,13 @@ impl HashJoinStream { | JoinType::RightAnti => {} } } - Some(result.map(|x| x.0)) + let final_result = Some(result.map(|x| x.0)); + timer.done(); + final_result } Some(err) => Some(err), None => { - let timer = self.join_metrics.join_time.timer(); + let timer = self.join_metrics.probe_time.timer(); // For the left join, produce rows for unmatched rows match self.join_type { JoinType::Left