diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index 9232865aa09c..00a8d9592448 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -253,7 +253,9 @@ impl MetricsSet {
     pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
         self.sum(|m| match m.value() {
             MetricValue::Count { name, .. } => name == metric_name,
-            MetricValue::Time { name, .. } => name == metric_name,
+            MetricValue::Time { name, .. } | MetricValue::MaxTime { name, .. } => {
+                name == metric_name
+            }
             MetricValue::OutputRows(_) => false,
             MetricValue::ElapsedCompute(_) => false,
             MetricValue::SpillCount(_) => false,
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 22db8f1e4e88..2750f3f5c6a3 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -28,9 +28,10 @@ use std::{
 };
 
 use chrono::{DateTime, Utc};
-use datafusion_common::instant::Instant;
 use parking_lot::Mutex;
 
+use datafusion_common::instant::Instant;
+
 /// A counter to record things such as number of input or output rows
 ///
 /// Note `clone`ing counters update the same underlying metrics
@@ -221,6 +222,11 @@ impl Time {
     pub fn value(&self) -> usize {
         self.nanos.load(Ordering::Relaxed)
     }
+
+    /// Set the time to the maximum of itself and `other` and return the previous value
+    pub fn max(&self, other: &Self) -> usize {
+        self.nanos.fetch_max(other.value(), Ordering::Relaxed)
+    }
 }
 
 /// Stores a single timestamp, stored as the number of nanoseconds
@@ -396,6 +402,14 @@ pub enum MetricValue {
         /// The value of the metric
         time: Time,
     },
+    /// Behaves the same as [`Time`](Self::Time) except when aggregated,
+    /// it returns the maximum instead.
+    MaxTime {
+        /// The provided name of this metric
+        name: Cow<'static, str>,
+        /// The value of the metric
+        time: Time,
+    },
     /// The time at which execution started
     StartTimestamp(Timestamp),
     /// The time at which execution ended
@@ -414,7 +428,7 @@ impl MetricValue {
             Self::ElapsedCompute(_) => "elapsed_compute",
             Self::Count { name, .. } => name.borrow(),
             Self::Gauge { name, .. } => name.borrow(),
-            Self::Time { name, .. } => name.borrow(),
+            Self::Time { name, .. } | Self::MaxTime { name, .. } => name.borrow(),
             Self::StartTimestamp(_) => "start_timestamp",
             Self::EndTimestamp(_) => "end_timestamp",
         }
@@ -431,7 +445,7 @@ impl MetricValue {
             Self::ElapsedCompute(time) => time.value(),
             Self::Count { count, .. } => count.value(),
             Self::Gauge { gauge, .. } => gauge.value(),
-            Self::Time { time, .. } => time.value(),
+            Self::Time { time, .. } | Self::MaxTime { time, .. } => time.value(),
             Self::StartTimestamp(timestamp) => timestamp
                 .value()
                 .and_then(|ts| ts.timestamp_nanos_opt())
@@ -467,6 +481,10 @@ impl MetricValue {
                 name: name.clone(),
                 time: Time::new(),
             },
+            Self::MaxTime { name, .. } => Self::MaxTime {
+                name: name.clone(),
+                time: Time::new(),
+            },
             Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
             Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
         }
@@ -507,6 +525,14 @@ impl MetricValue {
                     time: other_time, ..
                 },
             ) => time.add(other_time),
+            (
+                Self::MaxTime { time, .. },
+                Self::MaxTime {
+                    time: other_time, ..
+                },
+            ) => {
+                time.max(other_time);
+            }
             // timestamps are aggregated by min/max
             (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
                 timestamp.update_to_min(other_timestamp);
@@ -537,8 +563,9 @@ impl MetricValue {
             Self::Count { .. } => 6,
             Self::Gauge { .. } => 7,
             Self::Time { .. } => 8,
-            Self::StartTimestamp(_) => 9, // show timestamps last
-            Self::EndTimestamp(_) => 10,
+            Self::MaxTime { .. } => 9,
+            Self::StartTimestamp(_) => 10, // show timestamps last
+            Self::EndTimestamp(_) => 11,
         }
     }
 
@@ -562,7 +589,9 @@ impl Display for MetricValue {
             Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
                 write!(f, "{gauge}")
             }
-            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
+            Self::ElapsedCompute(time)
+            | Self::Time { time, .. }
+            | Self::MaxTime { time, .. } => {
                 // distinguish between no time recorded and very small
                 // amount of time recorded
                 if time.value() > 0 {
@@ -614,6 +643,10 @@ mod tests {
                 name: "my_time".into(),
                 time: time.clone(),
             },
+            MetricValue::MaxTime {
+                name: "my_time".into(),
+                time: time.clone(),
+            },
         ];
 
         // if time is not set, it should not be reported as zero
@@ -649,4 +682,34 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_maxtime_agg() {
+        let time1 = Time::new();
+        time1.add_duration(Duration::from_nanos(117));
+
+        let time2 = Time::new();
+        time2.add_duration(Duration::from_nanos(2048));
+
+        let time3 = Time::new();
+        time3.add_duration(Duration::from_nanos(1337));
+
+        let mut mt1 = MetricValue::MaxTime {
+            name: "1".into(),
+            time: time1,
+        };
+        let mt2 = MetricValue::MaxTime {
+            name: "2".into(),
+            time: time2,
+        };
+        let mt3 = MetricValue::MaxTime {
+            name: "3".into(),
+            time: time3,
+        };
+
+        mt1.aggregate(&mt2);
+        mt1.aggregate(&mt3);
+
+        assert_eq!(mt1.as_usize(), 2048);
+    }
 }