Skip to content

Commit

Permalink
Reset counters
Browse files Browse the repository at this point in the history
  • Loading branch information
andll committed Sep 13, 2022
1 parent b82b7f8 commit 52157a1
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions crates/sui-core/src/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

use futures::FutureExt;
use parking_lot::Mutex;
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec,
IntGaugeVec, Registry,
};
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -35,12 +36,17 @@ pub struct HistogramVec {
channel: mpsc::UnboundedSender<HistogramMessage>,
}

struct HistogramCollector {
reporter: Arc<Mutex<HistogramReporter>>,
channel: mpsc::UnboundedReceiver<HistogramMessage>,
}

struct HistogramReporter {
gauge: IntGaugeVec,
sum: IntCounterVec,
count: IntCounterVec,
percentiles: Arc<Vec<usize>>,
channel: mpsc::UnboundedReceiver<HistogramMessage>,
known_labels: HashSet<HistogramLabels>,
percentiles: Vec<usize>,
}

type HistogramLabels = Arc<HistogramLabelsInner>;
Expand Down Expand Up @@ -117,15 +123,19 @@ impl HistogramVec {
// data points being dropped way before it will produce any measurable memory pressure
#[allow(clippy::disallowed_methods)]
let (sender, receiver) = mpsc::unbounded_channel();
let percentiles = Arc::new(percentiles);
let reporter = HistogramReporter {
gauge,
sum,
count,
percentiles,
known_labels: Default::default(),
};
let reporter = Arc::new(Mutex::new(reporter));
let collector = HistogramCollector {
reporter,
channel: receiver,
};
Handle::current().spawn(reporter.run());
Handle::current().spawn(collector.run());
Self { channel: sender }
}

Expand Down Expand Up @@ -178,7 +188,7 @@ impl Histogram {
}
}

impl HistogramReporter {
impl HistogramCollector {
pub async fn run(mut self) {
let mut deadline = Instant::now();
loop {
Expand Down Expand Up @@ -209,41 +219,30 @@ impl HistogramReporter {
}
}
}
if labeled_data.is_empty() {
return Ok(());
}
if Arc::strong_count(&self.percentiles) != 1 {
// Not processing new data point if we have not finished processing previous
error!("Histogram data overflow - we receive histogram data faster then can process. Some histogram data is dropped")
if Arc::strong_count(&self.reporter) != 1 {
error!("Histogram data overflow - we receive histogram data faster then can process. Some histogram data is dropped");
} else {
let percentiles = self.percentiles.clone();
let gauge = self.gauge.clone();
let sum = self.sum.clone();
let count = self.count.clone();
// Histogram calculation can be CPU intensive, running in tokio blocking thread pool
Handle::current()
.spawn_blocking(move || Self::report(percentiles, gauge, sum, count, labeled_data));
let reporter = self.reporter.clone();
Handle::current().spawn_blocking(move || reporter.lock().report(labeled_data));
}
Ok(())
}
}

fn report(
percentiles: Arc<Vec<usize>>,
gauge: IntGaugeVec,
sum_counter: IntCounterVec,
count_counter: IntCounterVec,
labeled_data: HashMap<HistogramLabels, Vec<Point>>,
) {
impl HistogramReporter {
pub fn report(&mut self, labeled_data: HashMap<HistogramLabels, Vec<Point>>) {
let mut reset_labels = self.known_labels.clone();
for (label, mut data) in labeled_data {
self.known_labels.insert(label.clone());
reset_labels.remove(&label);
assert!(!data.is_empty());
data.sort_unstable();
for pct1000 in percentiles.iter() {
for pct1000 in self.percentiles.iter() {
let index = Self::pct1000_index(data.len(), *pct1000);
let point = *data.get(index).unwrap();
let pct_str = Self::format_pct1000(*pct1000);
let labels = label.labels.iter().map(|s| &s[..]).chain([&pct_str[..]]);
let labels: Vec<_> = labels.collect();
let metric = gauge.with_label_values(&labels);
let labels = Self::gauge_labels(&label, &pct_str);
let metric = self.gauge.with_label_values(&labels);
metric.set(point as i64);
}
let mut sum = 0u64;
Expand All @@ -252,11 +251,25 @@ impl HistogramReporter {
sum += point;
}
let labels: Vec<_> = label.labels.iter().map(|s| &s[..]).collect();
sum_counter.with_label_values(&labels).inc_by(sum);
count_counter.with_label_values(&labels).inc_by(count);
self.sum.with_label_values(&labels).inc_by(sum);
self.count.with_label_values(&labels).inc_by(count);
}

for reset_label in reset_labels {
for pct1000 in self.percentiles.iter() {
let pct_str = Self::format_pct1000(*pct1000);
let labels = Self::gauge_labels(&reset_label, &pct_str);
let metric = self.gauge.with_label_values(&labels);
metric.set(0);
}
}
}

fn gauge_labels<'a>(label: &'a HistogramLabels, pct_str: &'a str) -> Vec<&'a str> {
let labels = label.labels.iter().map(|s| &s[..]).chain([pct_str]);
labels.collect()
}

/// Returns value in range [0; len)
fn pct1000_index(len: usize, pct1000: usize) -> usize {
len * pct1000 / 1000
Expand Down

0 comments on commit 52157a1

Please sign in to comment.