diff --git a/crates/sui-core/src/histogram.rs b/crates/sui-core/src/histogram.rs index 7b878a566aef3..754892f627277 100644 --- a/crates/sui-core/src/histogram.rs +++ b/crates/sui-core/src/histogram.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use futures::FutureExt; +use parking_lot::Mutex; use prometheus::{ register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec, IntGaugeVec, Registry, }; use std::collections::hash_map::DefaultHasher; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::time::Duration; @@ -35,12 +36,17 @@ pub struct HistogramVec { channel: mpsc::UnboundedSender, } +struct HistogramCollector { + reporter: Arc>, + channel: mpsc::UnboundedReceiver, +} + struct HistogramReporter { gauge: IntGaugeVec, sum: IntCounterVec, count: IntCounterVec, - percentiles: Arc>, - channel: mpsc::UnboundedReceiver, + known_labels: HashSet, + percentiles: Vec, } type HistogramLabels = Arc; @@ -117,15 +123,19 @@ impl HistogramVec { // data points being dropped way before it will produce any measurable memory pressure #[allow(clippy::disallowed_methods)] let (sender, receiver) = mpsc::unbounded_channel(); - let percentiles = Arc::new(percentiles); let reporter = HistogramReporter { gauge, sum, count, percentiles, + known_labels: Default::default(), + }; + let reporter = Arc::new(Mutex::new(reporter)); + let collector = HistogramCollector { + reporter, channel: receiver, }; - Handle::current().spawn(reporter.run()); + Handle::current().spawn(collector.run()); Self { channel: sender } } @@ -178,7 +188,7 @@ impl Histogram { } } -impl HistogramReporter { +impl HistogramCollector { pub async fn run(mut self) { let mut deadline = Instant::now(); loop { @@ -209,41 +219,30 @@ impl HistogramReporter { } } } - if labeled_data.is_empty() { - return Ok(()); - } - if Arc::strong_count(&self.percentiles) != 1 { - // Not processing new data point if we have not finished processing previous - error!("Histogram data overflow - we receive histogram data faster then can process. Some histogram data is dropped") + if Arc::strong_count(&self.reporter) != 1 { + error!("Histogram data overflow - we receive histogram data faster then can process. Some histogram data is dropped"); } else { - let percentiles = self.percentiles.clone(); - let gauge = self.gauge.clone(); - let sum = self.sum.clone(); - let count = self.count.clone(); - // Histogram calculation can be CPU intensive, running in tokio blocking thread pool - Handle::current() - .spawn_blocking(move || Self::report(percentiles, gauge, sum, count, labeled_data)); + let reporter = self.reporter.clone(); + Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data)); } Ok(()) } +} - fn report( - percentiles: Arc>, - gauge: IntGaugeVec, - sum_counter: IntCounterVec, - count_counter: IntCounterVec, - labeled_data: HashMap>, - ) { +impl HistogramReporter { + pub fn report(&mut self, labeled_data: HashMap>) { + let mut reset_labels = self.known_labels.clone(); for (label, mut data) in labeled_data { + self.known_labels.insert(label.clone()); + reset_labels.remove(&label); assert!(!data.is_empty()); data.sort_unstable(); - for pct1000 in percentiles.iter() { + for pct1000 in self.percentiles.iter() { let index = Self::pct1000_index(data.len(), *pct1000); let point = *data.get(index).unwrap(); let pct_str = Self::format_pct1000(*pct1000); - let labels = label.labels.iter().map(|s| &s[..]).chain([&pct_str[..]]); - let labels: Vec<_> = labels.collect(); - let metric = gauge.with_label_values(&labels); + let labels = Self::gauge_labels(&label, &pct_str); + let metric = self.gauge.with_label_values(&labels); metric.set(point as i64); } let mut sum = 0u64; @@ -252,11 +251,25 @@ impl HistogramReporter { sum += point; } let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect(); - sum_counter.with_label_values(&labels).inc_by(sum); - count_counter.with_label_values(&labels).inc_by(count); + self.sum.with_label_values(&labels).inc_by(sum); + self.count.with_label_values(&labels).inc_by(count); + } + + for reset_label in reset_labels { + for pct1000 in self.percentiles.iter() { + let pct_str = Self::format_pct1000(*pct1000); + let labels = Self::gauge_labels(&reset_label, &pct_str); + let metric = self.gauge.with_label_values(&labels); + metric.set(0); + } } } + fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> { + let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]); + labels.collect() + } + /// Returns value in range [0; len) fn pct1000_index(len: usize, pct1000: usize) -> usize { len * pct1000 / 1000