Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions datafusion/physical-expr-common/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{
borrow::Cow,
fmt::{self, Debug, Display},
sync::Arc,
vec::IntoIter,
};

// public exports
Expand Down Expand Up @@ -432,6 +433,38 @@ impl Display for MetricsSet {
}
}

impl IntoIterator for MetricsSet {
Comment thread
gabotechs marked this conversation as resolved.
type Item = Arc<Metric>;
type IntoIter = IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.metrics.into_iter()
}
}

Comment thread
gabotechs marked this conversation as resolved.
impl<'a> IntoIterator for &'a MetricsSet {
type Item = &'a Arc<Metric>;
type IntoIter = std::slice::Iter<'a, Arc<Metric>>;

fn into_iter(self) -> Self::IntoIter {
self.metrics.iter()
}
}

impl Extend<Arc<Metric>> for MetricsSet {
fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
self.metrics.extend(iter);
}
}

impl FromIterator<Arc<Metric>> for MetricsSet {
fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
Self {
metrics: iter.into_iter().collect(),
}
}
}

/// A set of [`Metric`]s for an individual operator.
///
/// This structure is intended as a convenience for execution plan
Expand Down Expand Up @@ -465,6 +498,14 @@ impl ExecutionPlanMetricsSet {
}
}

impl From<MetricsSet> for ExecutionPlanMetricsSet {
fn from(metrics: MetricsSet) -> Self {
Self {
inner: Arc::new(Mutex::new(metrics)),
}
}
}

/// `name=value` pairs identifying a metric. This concept is called various things
/// in various different systems:
///
Expand Down Expand Up @@ -752,6 +793,52 @@ mod tests {
};
}

#[test]
fn test_extend() {
let mut metrics = MetricsSet::new();
let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

metrics.extend([Arc::clone(&m1), Arc::clone(&m2)]);
assert_eq!(metrics.iter().count(), 2);

let m3 = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None));
metrics.extend(std::iter::once(Arc::clone(&m3)));
assert_eq!(metrics.iter().count(), 3);
}

#[test]
fn test_collect() {
let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

let metrics: MetricsSet =
vec![Arc::clone(&m1), Arc::clone(&m2)].into_iter().collect();
assert_eq!(metrics.iter().count(), 2);

let empty: MetricsSet = std::iter::empty().collect();
assert_eq!(empty.iter().count(), 0);
}

#[test]
fn test_into_iterator_by_ref() {
let mut metrics = MetricsSet::new();
metrics.push(Arc::new(Metric::new(
MetricValue::OutputRows(Count::new()),
None,
)));
metrics.push(Arc::new(Metric::new(
MetricValue::SpillCount(Count::new()),
None,
)));

let mut count = 0;
for _m in &metrics {
count += 1;
}
assert_eq!(count, 2);
}

#[test]
fn test_sorted_for_display() {
let metrics = ExecutionPlanMetricsSet::new();
Expand Down
Loading