Skip to content

Commit

Permalink
Improve prometheus utils api
Browse files Browse the repository at this point in the history
This makes things a little easier to use and collapses
more of the functions into a single main entry point
`.publish()` with the exception of `.publish_stats()`,
because we want the user to understand when they publish
slices of numbers it is treated as a stat.
  • Loading branch information
allada committed Jul 19, 2023
1 parent f274dcf commit 87bd0e6
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 59 deletions.
16 changes: 8 additions & 8 deletions cas/store/filesystem_store.rs
Expand Up @@ -643,27 +643,27 @@ impl<Fe: FileEntry> StoreTrait for FilesystemStore<Fe> {
}

impl<Fe: FileEntry> MetricsComponent for FilesystemStore<Fe> {
fn gather_metrics(&self, collector: &mut CollectorState) {
collector.publish(
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"read_buff_size",
self.read_buffer_size,
&self.read_buffer_size,
"Size of the configured read buffer size",
);
collector.publish(
c.publish(
"active_drop_spawns",
self.shared_context.active_drop_spawns.load(Ordering::Relaxed),
&self.shared_context.active_drop_spawns,
"Number of active drop spawns",
);
collector.publish_text(
c.publish(
"temp_path",
&self.shared_context.temp_path,
"Path to the configured temp path",
);
collector.publish_text(
c.publish(
"content_path",
&self.shared_context.content_path,
"Path to the configured content path",
);
collector.publish_child(Some("evicting_map"), &self.evicting_map);
c.publish("evicting_map", &self.evicting_map, "");
}
}
4 changes: 2 additions & 2 deletions cas/store/memory_store.rs
Expand Up @@ -144,7 +144,7 @@ impl StoreTrait for MemoryStore {
}

impl MetricsComponent for MemoryStore {
fn gather_metrics(&self, collector: &mut CollectorState) {
collector.publish_child(Some("evicting_map"), &self.evicting_map);
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish("evicting_map", &self.evicting_map, "");
}
}
18 changes: 9 additions & 9 deletions cas/store/verify_store.rs
Expand Up @@ -172,25 +172,25 @@ impl StoreTrait for VerifyStore {
}

impl MetricsComponent for VerifyStore {
fn gather_metrics(&self, collector: &mut CollectorState) {
collector.publish(
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"verify_size",
self.verify_size,
&self.verify_size,
"If the verification store is verifying the size of the data",
);
collector.publish(
c.publish(
"verify_hash",
self.verify_hash,
&self.verify_hash,
"If the verification store is verifying the hash of the data",
);
collector.publish(
c.publish(
"size_verification_failures",
self.size_verification_failures.load(Ordering::Relaxed),
&self.size_verification_failures,
"Number of failures the verification store had due to size mismatches",
);
collector.publish(
c.publish(
"hash_verification_failures",
self.hash_verification_failures.load(Ordering::Relaxed),
&self.hash_verification_failures,
"Number of failures the verification store had due to hash mismatches",
);
}
Expand Down
32 changes: 16 additions & 16 deletions util/evicting_map.rs
Expand Up @@ -351,38 +351,38 @@ where

impl<T: LenEntry + Debug, I: InstantWrapper> MetricsComponent for EvictingMap<T, I> {
fn gather_metrics(&self, collector: &mut CollectorState) {
collector.publish("max_bytes", self.max_bytes, "Maximum size of the store in bytes");
collector.publish("max_bytes", &self.max_bytes, "Maximum size of the store in bytes");
collector.publish(
"evict_bytes",
self.evict_bytes,
&self.evict_bytes,
"Number of bytes to evict when the store is full",
);
collector.publish(
"anchor_time",
self.anchor_time.unix_timestamp(),
&self.anchor_time.unix_timestamp(),
"Anchor time for the store",
);
collector.publish(
"max_seconds",
self.max_seconds,
&self.max_seconds,
"Maximum number of seconds to keep an item in the store",
);
collector.publish(
"max_count",
self.max_count,
&self.max_count,
"Maximum number of items to keep in the store",
);
futures::executor::block_on(async move {
let state = self.state.lock().await;
collector.publish(
"sum_store_size",
state.sum_store_size,
&state.sum_store_size,
"Total size of all items in the store",
);
collector.publish("items_in_store", state.lru.len(), "Mumber of items in the store");
collector.publish("items_in_store", &state.lru.len(), "Mumber of items in the store");
collector.publish(
"oldest_item_timestamp",
state
&state
.lru
.peek_lru()
.map(|(_, v)| v.seconds_since_anchor as i64 + self.anchor_time.unix_timestamp() as i64)
Expand All @@ -391,7 +391,7 @@ impl<T: LenEntry + Debug, I: InstantWrapper> MetricsComponent for EvictingMap<T,
);
collector.publish(
"oldest_item_age",
state
&state
.lru
.peek_lru()
.map(|(_, v)| v.seconds_since_anchor as i64 + self.anchor_time.elapsed().as_secs() as i64)
Expand All @@ -400,37 +400,37 @@ impl<T: LenEntry + Debug, I: InstantWrapper> MetricsComponent for EvictingMap<T,
);
collector.publish(
"evicted_items",
state.evicted_items,
&state.evicted_items,
"Number of items evicted from the store",
);
collector.publish(
"evicted_bytes",
state.evicted_bytes,
&state.evicted_bytes,
"Number of bytes evicted from the store",
);
collector.publish(
"lifetime_inserted_bytes",
state.lifetime_inserted_bytes,
&state.lifetime_inserted_bytes,
"Number of bytes inserted into the store since it was created",
);
collector.publish(
"replaced_bytes",
state.replaced_bytes,
&state.replaced_bytes,
"Number of bytes replaced in the store",
);
collector.publish(
"replaced_items",
state.replaced_items,
&state.replaced_items,
"Number of items replaced in the store",
);
collector.publish(
"removed_bytes",
state.removed_bytes,
&state.removed_bytes,
"Number of bytes explicitly removed from the store",
);
collector.publish(
"removed_items",
state.removed_items,
&state.removed_items,
"Number of items explicitly removed from the store",
);
collector.publish_stats(
Expand Down
124 changes: 100 additions & 24 deletions util/prometheus_utils.rs
Expand Up @@ -15,6 +15,10 @@
use std::borrow::Cow;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::atomic::{
AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicU16, AtomicU32, AtomicU64, AtomicU8, AtomicUsize,
Ordering,
};
use std::sync::{Arc, Weak};

use prometheus_client::collector::Collector as PrometheusCollector;
Expand Down Expand Up @@ -48,8 +52,17 @@ pub struct CollectorState {
}

impl CollectorState {
/// Publish a numerical metric.
pub fn publish<N, T>(&mut self, name: impl Into<String>, value: T, help: impl Into<String>)
/// Publishes a value. This should be the primary way a metric is published.
/// Any special types that want metrics published should implement `MetricPublisher`
/// for that type.
#[inline]
pub fn publish(&mut self, name: impl Into<String>, value: impl MetricPublisher, help: impl Into<String>) {
value.publish(self, name.into(), help.into());
}

/// Publish a numerical metric. Usually used by `MetricPublisher` to publish metrics.
#[inline]
pub fn publish_number<N, T>(&mut self, name: impl Into<String>, value: T, help: impl Into<String>)
where
N: Debug + 'static,
T: Into<NumericalMetric<N>>,
Expand All @@ -60,33 +73,16 @@ impl CollectorState {
}

/// Publish a static text metric. Generally these are used for labels and don't
/// change during runtime.
/// change during runtime. Usually used by `MetricPublisher` to publish metrics.
#[inline]
pub fn publish_text(&mut self, name: impl Into<String>, value: impl Into<String>, help: impl Into<String>) {
self.text.push((name.into(), help.into(), value.into()));
}

/// Publish a child module. The child module must implement the `MetricsComponent`.
/// The child module will have all of its metrics published prefixed with the
/// parent's name.
pub fn publish_child(&mut self, module_name: Option<impl Into<String>>, module: &impl MetricsComponent) {
let mut state = CollectorState {
module_name: match (&self.module_name, module_name) {
(Some(parent), None) => Some(parent.clone()),
(Some(parent), Some(child)) => Some(format!("{parent}_{}", child.into())),
(None, Some(child)) => Some(child.into()),
(None, None) => None,
},
metrics: Vec::default(),
text: Vec::default(),
children: Vec::default(),
};
module.gather_metrics(&mut state);
self.children.push(state);
}

/// Publish a histogram metric. Be careful not to have the iterator take too
/// much data or this will consume a lot of memory because we need to collect
/// all the data and sort them to calculate the percentiles.
#[inline]
pub fn publish_stats<N, T>(
&mut self,
name: impl std::fmt::Display,
Expand All @@ -107,7 +103,7 @@ impl CollectorState {
let index = (i * data_len) as usize;
let value = data.get(index).unwrap();
let p = i * 100.0;
self.publish(format!("{}_p{:02}", name, p), *value, format!("{} p{:02}", help, p));
self.publish_number(format!("{}_p{:02}", name, p), *value, format!("{} p{:02}", help, p));
}
}

Expand Down Expand Up @@ -188,27 +184,107 @@ impl<S: MetricsComponent + Sync + Send + 'static> PrometheusCollector for Collec
}
}

pub trait MetricPublisher {
/// Publish a gague metric.
fn publish(&self, state: &mut CollectorState, name: String, help: String);
}

/// Implements MetricPublisher for string types.
impl MetricPublisher for &String {
#[inline]
fn publish(&self, state: &mut CollectorState, name: String, help: String) {
state.publish_text(name, *self, help);
}
}

/// Implements MetricPublisher for string types.
impl<T> MetricPublisher for &T
where
T: MetricsComponent,
{
#[inline]
fn publish(&self, parent_state: &mut CollectorState, module_name: String, _help: String) {
let module_name = if module_name.is_empty() {
None
} else {
Some(module_name)
};
let mut state = CollectorState {
module_name: match (&parent_state.module_name, module_name) {
(Some(parent), None) => Some(parent.clone()),
(Some(parent), Some(child)) => Some(format!("{parent}_{}", child)),
(None, child) => child,
},
metrics: Vec::default(),
text: Vec::default(),
children: Vec::default(),
};
self.gather_metrics(&mut state);
parent_state.children.push(state);
}
}

macro_rules! impl_publish_atomic {
($($t:ty),*) => {
$(
impl MetricPublisher for &$t {
#[inline]
fn publish(&self, state: &mut CollectorState, name: String, help: String) {
state.publish_number(name, &self.load(Ordering::Relaxed), help);
}
}
)*
};
}

impl_publish_atomic!(
AtomicU8,
AtomicU16,
AtomicU32,
AtomicU64,
AtomicUsize,
AtomicI8,
AtomicI16,
AtomicI32,
AtomicI64,
AtomicIsize
);

#[derive(Debug)]
pub struct NumericalMetric<T>(T);

macro_rules! impl_numerical {
($($t:ty),*) => {
$(
impl From<$t> for NumericalMetric<$t> {
#[inline]
fn from(t: $t) -> Self {
NumericalMetric(t)
}
}
impl From<&$t> for NumericalMetric<$t> {
#[inline]
fn from(t: &$t) -> Self {
NumericalMetric(*t)
}
}
)*
};
}

// Regsiter all the numerical types to be converted into Numerical.
impl_numerical!(u8, bool, u16, u32, u64, usize, i8, i16, i32, i64, isize);
impl_numerical!(u8, bool, u16, u32, u64, usize, i8, i16, i32, i64, isize, f32, f64);

macro_rules! impl_numerical_metric {
($u:ty,$($t:ty),*) => {
$(
impl MetricPublisher for &$t {
#[inline]
fn publish(&self, state: &mut CollectorState, name: String, help: String) {
state.publish_number(name, *self, help);
}
}

impl EncodeMetric for NumericalMetric<$t> {
fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> {
encoder.encode_gauge(&TryInto::<$u>::try_into(self.0).map_err(|_| std::fmt::Error::default())?)
Expand Down

0 comments on commit 87bd0e6

Please sign in to comment.