Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ impl MetricsSet {
pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
self.sum(|m| match m.value() {
MetricValue::Count { name, .. } => name == metric_name,
MetricValue::Time { name, .. } => name == metric_name,
MetricValue::Time { name, .. } | MetricValue::MaxTime { name, .. } => {
name == metric_name
}
MetricValue::OutputRows(_) => false,
MetricValue::ElapsedCompute(_) => false,
MetricValue::SpillCount(_) => false,
Expand Down
75 changes: 69 additions & 6 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ use std::{
};

use chrono::{DateTime, Utc};
use datafusion_common::instant::Instant;
use parking_lot::Mutex;

use datafusion_common::instant::Instant;

/// A counter to record things such as number of input or output rows
///
/// Note `clone`ing counters update the same underlying metrics
Expand Down Expand Up @@ -221,6 +222,11 @@ impl Time {
pub fn value(&self) -> usize {
self.nanos.load(Ordering::Relaxed)
}

/// Raise this time to `other`'s value when that value is larger, leaving it
/// unchanged otherwise. Returns the value held before the update.
pub fn max(&self, other: &Self) -> usize {
    // Read the candidate first, then fold it into our counter atomically.
    let candidate = other.value();
    self.nanos.fetch_max(candidate, Ordering::Relaxed)
}
}

/// Stores a single timestamp, stored as the number of nanoseconds
Expand Down Expand Up @@ -396,6 +402,14 @@ pub enum MetricValue {
/// The value of the metric
time: Time,
},
/// Behaves the same as [`Time`](Self::Time) except when aggregated,
/// it returns the maximum instead.
MaxTime {
/// The provided name of this metric
name: Cow<'static, str>,
/// The value of the metric
time: Time,
},
/// The time at which execution started
StartTimestamp(Timestamp),
/// The time at which execution ended
Expand All @@ -414,7 +428,7 @@ impl MetricValue {
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Gauge { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
Self::Time { name, .. } | Self::MaxTime { name, .. } => name.borrow(),
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
}
Expand All @@ -431,7 +445,7 @@ impl MetricValue {
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Gauge { gauge, .. } => gauge.value(),
Self::Time { time, .. } => time.value(),
Self::Time { time, .. } | Self::MaxTime { time, .. } => time.value(),
Self::StartTimestamp(timestamp) => timestamp
.value()
.and_then(|ts| ts.timestamp_nanos_opt())
Expand Down Expand Up @@ -467,6 +481,10 @@ impl MetricValue {
name: name.clone(),
time: Time::new(),
},
Self::MaxTime { name, .. } => Self::MaxTime {
name: name.clone(),
time: Time::new(),
},
Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
}
Expand Down Expand Up @@ -507,6 +525,14 @@ impl MetricValue {
time: other_time, ..
},
) => time.add(other_time),
(
Self::MaxTime { time, .. },
Self::MaxTime {
time: other_time, ..
},
) => {
time.max(other_time);
}
// timestamps are aggregated by min/max
(Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
timestamp.update_to_min(other_timestamp);
Expand Down Expand Up @@ -537,8 +563,9 @@ impl MetricValue {
Self::Count { .. } => 6,
Self::Gauge { .. } => 7,
Self::Time { .. } => 8,
Self::StartTimestamp(_) => 9, // show timestamps last
Self::EndTimestamp(_) => 10,
Self::MaxTime { .. } => 9,
Self::StartTimestamp(_) => 10, // show timestamps last
Self::EndTimestamp(_) => 11,
}
}

Expand All @@ -562,7 +589,9 @@ impl Display for MetricValue {
Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
write!(f, "{gauge}")
}
Self::ElapsedCompute(time) | Self::Time { time, .. } => {
Self::ElapsedCompute(time)
| Self::Time { time, .. }
| Self::MaxTime { time, .. } => {
// distinguish between no time recorded and very small
// amount of time recorded
if time.value() > 0 {
Expand Down Expand Up @@ -614,6 +643,10 @@ mod tests {
name: "my_time".into(),
time: time.clone(),
},
MetricValue::MaxTime {
name: "my_time".into(),
time: time.clone(),
},
];

// if time is not set, it should not be reported as zero
Expand Down Expand Up @@ -649,4 +682,34 @@ mod tests {
);
}
}

#[test]
fn test_maxtime_agg() {
    // Build a `MaxTime` metric that has already recorded `nanos` nanoseconds.
    let max_time = |name: &'static str, nanos: u64| {
        let time = Time::new();
        time.add_duration(Duration::from_nanos(nanos));
        MetricValue::MaxTime {
            name: name.into(),
            time,
        }
    };

    let mut aggregated = max_time("1", 117);
    let others = [max_time("2", 2048), max_time("3", 1337)];

    // Fold the remaining metrics into the first one, in order.
    for other in &others {
        aggregated.aggregate(other);
    }

    // Aggregating `MaxTime` values keeps the largest one rather than the sum.
    assert_eq!(aggregated.as_usize(), 2048);
}
}