From 859af91af309ce469e7e3d2dbd8d7bd5fa98a0db Mon Sep 17 00:00:00 2001 From: Gabriel <45515538+gabotechs@users.noreply.github.com> Date: Wed, 22 Apr 2026 15:13:48 +0200 Subject: [PATCH] Improve ergonomics for ExecutionPlanMetricsSet and MetricsSet (#21762) ## Which issue does this PR close? - None ## Rationale for this change Sometimes, when an `ExecutionPlan` implementation is complex, different metrics are collected from different structs that compose the whole execution plan. These metrics need to eventually be served from the single entrypoint `ExecutionPlan::metrics()` or `DataSource::metrics()`, and the current api does not have good methods for merging several `ExecutionPlanMetricsSet` coming from different sources into a single one. ## What changes are included in this PR? Add some basic conversion and iteration methods for `MetricsSet` and `ExecutionPlanMetricsSet`, in order to improve ergonomics around these structs. ## Are these changes tested? This is purely just basic std trait implementations and method exposure, so as long as the code compiles, I don't think it needs further tests. ## Are there any user-facing changes? People will see some more available methods in the `MetricsSet` and `ExecutionPlanMetricsSet` structs for ergonomics. (cherry picked from commit ff844be198821863f013c47d527cf879ba783636) --- .../physical-expr-common/src/metrics/mod.rs | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs b/datafusion/physical-expr-common/src/metrics/mod.rs index 18dafa41276d9..f760fa8d62faf 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -29,6 +29,7 @@ use std::{ borrow::Cow, fmt::{Debug, Display}, sync::Arc, + vec::IntoIter, }; // public exports @@ -398,6 +399,38 @@ impl Display for MetricsSet { } } +impl IntoIterator for MetricsSet { + type Item = Arc; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.metrics.into_iter() + } +} + +impl<'a> IntoIterator for &'a MetricsSet { + type Item = &'a Arc; + type IntoIter = std::slice::Iter<'a, Arc>; + + fn into_iter(self) -> Self::IntoIter { + self.metrics.iter() + } +} + +impl Extend> for MetricsSet { + fn extend>>(&mut self, iter: I) { + self.metrics.extend(iter); + } +} + +impl FromIterator> for MetricsSet { + fn from_iter>>(iter: T) -> Self { + Self { + metrics: iter.into_iter().collect(), + } + } +} + /// A set of [`Metric`]s for an individual operator. /// /// This structure is intended as a convenience for execution plan @@ -431,6 +464,14 @@ impl ExecutionPlanMetricsSet { } } +impl From for ExecutionPlanMetricsSet { + fn from(metrics: MetricsSet) -> Self { + Self { + inner: Arc::new(Mutex::new(metrics)), + } + } +} + /// `name=value` pairs identifying a metric. This concept is called various things /// in various different systems: /// @@ -718,6 +759,52 @@ mod tests { }; } + #[test] + fn test_extend() { + let mut metrics = MetricsSet::new(); + let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None)); + let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None)); + + metrics.extend([Arc::clone(&m1), Arc::clone(&m2)]); + assert_eq!(metrics.iter().count(), 2); + + let m3 = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None)); + metrics.extend(std::iter::once(Arc::clone(&m3))); + assert_eq!(metrics.iter().count(), 3); + } + + #[test] + fn test_collect() { + let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None)); + let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None)); + + let metrics: MetricsSet = + vec![Arc::clone(&m1), Arc::clone(&m2)].into_iter().collect(); + assert_eq!(metrics.iter().count(), 2); + + let empty: MetricsSet = std::iter::empty().collect(); + assert_eq!(empty.iter().count(), 0); + } + + #[test] + fn test_into_iterator_by_ref() { + let mut metrics = MetricsSet::new(); + metrics.push(Arc::new(Metric::new( + MetricValue::OutputRows(Count::new()), + None, + ))); + metrics.push(Arc::new(Metric::new( + MetricValue::SpillCount(Count::new()), + None, + ))); + + let mut count = 0; + for _m in &metrics { + count += 1; + } + assert_eq!(count, 2); + } + #[test] fn test_sorted_for_display() { let metrics = ExecutionPlanMetricsSet::new();