From adba2f93769c951fea51d961c3a3d159deb41748 Mon Sep 17 00:00:00 2001 From: Kamil Popielarz Date: Mon, 15 Jan 2024 08:02:24 +0000 Subject: [PATCH] feat(Consensus): [CON-1151] Add metrics for the size of the certification pools --- rs/artifact_pool/src/canister_http_pool.rs | 10 +++- rs/artifact_pool/src/certification_pool.rs | 47 +++++++++++++++ rs/artifact_pool/src/dkg_pool.rs | 14 ++++- rs/artifact_pool/src/ingress_pool.rs | 16 +++-- rs/artifact_pool/src/metrics.rs | 68 +++++++++++++--------- rs/artifact_pool/src/pool_common.rs | 25 ++++++-- 6 files changed, 139 insertions(+), 41 deletions(-) diff --git a/rs/artifact_pool/src/canister_http_pool.rs b/rs/artifact_pool/src/canister_http_pool.rs index 0da6ad3ed94..14af9f40ccd 100644 --- a/rs/artifact_pool/src/canister_http_pool.rs +++ b/rs/artifact_pool/src/canister_http_pool.rs @@ -1,10 +1,8 @@ //! Canister Http Artifact Pool implementation. -// TODO: Remove -#![allow(dead_code)] use crate::{ metrics::{POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED}, - pool_common::PoolSection, + pool_common::{HasLabel, PoolSection}, }; use ic_interfaces::{ canister_http::{CanisterHttpChangeAction, CanisterHttpChangeSet, CanisterHttpPool}, @@ -179,6 +177,12 @@ impl ValidatedPoolReader for CanisterHttpPoolImpl { } } +impl HasLabel for CanisterHttpResponse { + fn label(&self) -> &str { + "canister_http_response" + } +} + #[cfg(test)] mod tests { use ic_logger::replica_logger::no_op_logger; diff --git a/rs/artifact_pool/src/certification_pool.rs b/rs/artifact_pool/src/certification_pool.rs index d54a2a70d0a..13936ce12e1 100644 --- a/rs/artifact_pool/src/certification_pool.rs +++ b/rs/artifact_pool/src/certification_pool.rs @@ -1,5 +1,6 @@ use crate::height_index::HeightIndex; use crate::metrics::{PoolMetrics, POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED}; +use crate::pool_common::HasLabel; use ic_config::artifact_pool::{ArtifactPoolConfig, PersistentPoolBackend}; use ic_interfaces::{ certification::{CertificationPool, ChangeAction, ChangeSet}, @@ -45,6 +46,8 @@ pub struct CertificationPoolImpl { } const POOL_CERTIFICATION: &str = "certification"; +const CERTIFICATION_ARTIFACT_TYPE: &str = "certification"; +const CERTIFICATION_SHARE_ARTIFACT_TYPE: &str = "certification_share"; impl CertificationPoolImpl { pub fn new( @@ -116,6 +119,28 @@ impl CertificationPoolImpl { .insert(CertificationMessage::Certification(certification)) } } + + fn update_metrics(&self) { + // Validated artifacts metrics + self.validated_pool_metrics + .pool_artifacts + .with_label_values(&[CERTIFICATION_ARTIFACT_TYPE]) + .set(self.persistent_pool.certifications().size() as i64); + self.validated_pool_metrics + .pool_artifacts + .with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE]) + .set(self.persistent_pool.certification_shares().size() as i64); + + // Unvalidated artifacts metrics + self.unvalidated_pool_metrics + .pool_artifacts + .with_label_values(&[CERTIFICATION_ARTIFACT_TYPE]) + .set(self.unvalidated_certifications.size() as i64); + self.unvalidated_pool_metrics + .pool_artifacts + .with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE]) + .set(self.unvalidated_shares.size() as i64); + } } impl MutablePool for CertificationPoolImpl { @@ -123,11 +148,13 @@ impl MutablePool for CertificationPoolImpl { fn insert(&mut self, msg: UnvalidatedArtifact) { let height = msg.message.height(); + let label = msg.message.label(); match &msg.message { CertificationMessage::CertificationShare(share) => { if self.unvalidated_shares.insert(height, share) { self.unvalidated_pool_metrics .received_artifact_bytes + .with_label_values(&[label]) .observe(std::mem::size_of_val(share) as f64); } } @@ -135,6 +162,7 @@ impl MutablePool for CertificationPoolImpl { if self.unvalidated_certifications.insert(height, cert) { self.unvalidated_pool_metrics .received_artifact_bytes + .with_label_values(&[label]) .observe(std::mem::size_of_val(cert) as f64); } } @@ -160,11 +188,13 @@ impl MutablePool for CertificationPoolImpl { let changed = !change_set.is_empty(); let mut adverts = Vec::new(); let mut purged = Vec::new(); + change_set.into_iter().for_each(|action| match action { ChangeAction::AddToValidated(msg) => { adverts.push(CertificationArtifact::message_to_advert(&msg)); self.validated_pool_metrics .received_artifact_bytes + .with_label_values(&[msg.label()]) .observe(std::mem::size_of_val(&msg) as f64); self.persistent_pool.insert(msg); } @@ -174,11 +204,13 @@ impl MutablePool for CertificationPoolImpl { adverts.push(CertificationArtifact::message_to_advert(&msg)); } let height = msg.height(); + let label = msg.label().to_owned(); match msg { CertificationMessage::CertificationShare(share) => { self.unvalidated_shares.remove(height, &share); self.validated_pool_metrics .received_artifact_bytes + .with_label_values(&[&label]) .observe(std::mem::size_of_val(&share) as f64); self.persistent_pool .insert(CertificationMessage::CertificationShare(share)); @@ -187,6 +219,7 @@ impl MutablePool for CertificationPoolImpl { self.unvalidated_certifications.remove(height, &cert); self.validated_pool_metrics .received_artifact_bytes + .with_label_values(&[&label]) .observe(std::mem::size_of_val(&cert) as f64); self.insert_validated_certification(cert); } @@ -228,6 +261,11 @@ impl MutablePool for CertificationPoolImpl { }; } }); + + if changed { + self.update_metrics(); + } + ChangeResult { purged, adverts, @@ -393,6 +431,15 @@ impl ValidatedPoolReader for CertificationPoolImpl { } } +impl HasLabel for CertificationMessage { + fn label(&self) -> &str { + match self { + CertificationMessage::Certification(_) => CERTIFICATION_ARTIFACT_TYPE, + CertificationMessage::CertificationShare(_) => CERTIFICATION_SHARE_ARTIFACT_TYPE, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rs/artifact_pool/src/dkg_pool.rs b/rs/artifact_pool/src/dkg_pool.rs index 6921abb05fd..bd3d5a7ceea 100644 --- a/rs/artifact_pool/src/dkg_pool.rs +++ b/rs/artifact_pool/src/dkg_pool.rs @@ -1,6 +1,6 @@ use crate::{ metrics::{POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED}, - pool_common::PoolSection, + pool_common::{HasLabel, PoolSection}, }; use ic_interfaces::{ dkg::{ChangeAction, ChangeSet, DkgPool}, @@ -175,6 +175,18 @@ impl DkgPool for DkgPoolImpl { } } +impl HasLabel for dkg::Message { + fn label(&self) -> &str { + "dkg_message" + } +} + +impl HasLabel for UnvalidatedArtifact { + fn label(&self) -> &str { + self.message.label() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/rs/artifact_pool/src/ingress_pool.rs b/rs/artifact_pool/src/ingress_pool.rs index c8247fc3a75..bef10673ff0 100644 --- a/rs/artifact_pool/src/ingress_pool.rs +++ b/rs/artifact_pool/src/ingress_pool.rs @@ -30,6 +30,8 @@ use prometheus::IntCounter; use std::collections::BTreeMap; use std::sync::Arc; +const INGRESS_MESSAGE_ARTIFACT_TYPE: &str = "ingress_message"; + #[derive(Clone)] struct IngressPoolSection> { /// Do not insert or remove elements in this map directly. Use this struct's @@ -63,12 +65,14 @@ impl> IngressPoolSection { .with_label_values(&["insert"]) .start_timer(); let new_artifact_size = artifact.as_ref().count_bytes(); - self.metrics.observe_insert(new_artifact_size); + self.metrics + .observe_insert(new_artifact_size, INGRESS_MESSAGE_ARTIFACT_TYPE); if let Some(previous) = self.artifacts.insert(message_id, artifact) { let prev_size = previous.as_ref().count_bytes(); self.byte_size -= prev_size; self.byte_size += new_artifact_size; - self.metrics.observe_duplicate(prev_size); + self.metrics + .observe_duplicate(prev_size, INGRESS_MESSAGE_ARTIFACT_TYPE); } else { self.byte_size += new_artifact_size; } @@ -85,7 +89,10 @@ impl> IngressPoolSection { let removed = self.artifacts.remove(message_id); if let Some(artifact) = &removed { self.byte_size -= artifact.as_ref().count_bytes(); - self.metrics.observe_remove(artifact.as_ref().count_bytes()); + self.metrics.observe_remove( + artifact.as_ref().count_bytes(), + INGRESS_MESSAGE_ARTIFACT_TYPE, + ); } // SAFETY: Checking byte size invariant section_ok(self); @@ -116,7 +123,8 @@ impl> IngressPoolSection { for artifact in to_remove.values() { let artifact_size = artifact.as_ref().count_bytes(); self.byte_size -= artifact_size; - self.metrics.observe_remove(artifact_size); + self.metrics + .observe_remove(artifact_size, INGRESS_MESSAGE_ARTIFACT_TYPE); } // SAFETY: Checking byte size invariant section_ok(self); diff --git a/rs/artifact_pool/src/metrics.rs b/rs/artifact_pool/src/metrics.rs index f5f8b0ad935..3e6b22ead56 100644 --- a/rs/artifact_pool/src/metrics.rs +++ b/rs/artifact_pool/src/metrics.rs @@ -1,23 +1,21 @@ use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero}; use ic_metrics::MetricsRegistry; -use prometheus::{ - histogram_opts, labels, opts, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, - IntGaugeVec, -}; +use prometheus::{histogram_opts, labels, opts, HistogramVec, IntCounterVec, IntGaugeVec}; pub const LABEL_POOL: &str = "pool"; pub const LABEL_POOL_TYPE: &str = "pool_type"; pub const POOL_TYPE_VALIDATED: &str = "validated"; pub const POOL_TYPE_UNVALIDATED: &str = "unvalidated"; +pub const ARTIFACT_TYPE: &str = "artifact_type"; /// Metrics for a given artifact pool's validated/unvalidated section. #[derive(Clone)] pub struct PoolMetrics { pub op_duration: HistogramVec, - pub received_artifact_bytes: Histogram, - received_duplicate_artifacts: IntCounter, - pub pool_artifacts: IntGauge, - pub pool_size_bytes: IntGauge, + pub received_artifact_bytes: HistogramVec, + received_duplicate_artifacts: IntCounterVec, + pub pool_artifacts: IntGaugeVec, + pub pool_size_bytes: IntGaugeVec, } impl PoolMetrics { @@ -37,59 +35,75 @@ impl PoolMetrics { .unwrap(), ), received_artifact_bytes: metrics_registry.register( - Histogram::with_opts(histogram_opts!( + HistogramVec::new(histogram_opts!( "artifact_pool_received_artifact_bytes", "The byte size of all artifacts received by the given pool", // 0, 1B - 50MB decimal_buckets_with_zero(0, 7), labels! {LABEL_POOL.to_string() => pool.to_string(), LABEL_POOL_TYPE.to_string() => pool_type.to_string()} - )) + ), &[ARTIFACT_TYPE]) .unwrap(), ), received_duplicate_artifacts: metrics_registry.register( - IntCounter::with_opts(opts!( + IntCounterVec::new(opts!( "artifact_pool_received_duplicate_artifacts", "Duplicate artifacts received by the given pool", labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type} - )) + ), &[ARTIFACT_TYPE]) .unwrap(), ), pool_artifacts: metrics_registry.register( - IntGauge::with_opts(opts!( + IntGaugeVec::new(opts!( "artifact_pool_artifacts", "Current number of artifacts in the given pool", labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type} - )) + ), &[ARTIFACT_TYPE]) .unwrap(), ), pool_size_bytes: { metrics_registry.register( - IntGauge::with_opts(opts!( + IntGaugeVec::new(opts!( "artifact_pool_artifact_bytes", "Current byte size of artifacts in the given pool", labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type} - )) + ), &[ARTIFACT_TYPE]) .unwrap(), ) }, } } - pub fn observe_insert(&self, size_bytes: usize) { - self.received_artifact_bytes.observe(size_bytes as f64); - self.pool_artifacts.inc(); - self.pool_size_bytes.add(size_bytes as i64); + pub fn observe_insert(&self, size_bytes: usize, artifact_type: &str) { + self.received_artifact_bytes + .with_label_values(&[artifact_type]) + .observe(size_bytes as f64); + self.pool_artifacts + .with_label_values(&[artifact_type]) + .inc(); + self.pool_size_bytes + .with_label_values(&[artifact_type]) + .add(size_bytes as i64); } - pub fn observe_duplicate(&self, size_bytes: usize) { - self.received_duplicate_artifacts.inc(); - self.pool_artifacts.dec(); - self.pool_size_bytes.sub(size_bytes as i64); + pub fn observe_duplicate(&self, size_bytes: usize, artifact_type: &str) { + self.received_duplicate_artifacts + .with_label_values(&[artifact_type]) + .inc(); + self.pool_artifacts + .with_label_values(&[artifact_type]) + .dec(); + self.pool_size_bytes + .with_label_values(&[artifact_type]) + .sub(size_bytes as i64); } - pub fn observe_remove(&self, size_bytes: usize) { - self.pool_artifacts.dec(); - self.pool_size_bytes.sub(size_bytes as i64); + pub fn observe_remove(&self, size_bytes: usize, artifact_type: &str) { + self.pool_artifacts + .with_label_values(&[artifact_type]) + .dec(); + self.pool_size_bytes + .with_label_values(&[artifact_type]) + .sub(size_bytes as i64); } } diff --git a/rs/artifact_pool/src/pool_common.rs b/rs/artifact_pool/src/pool_common.rs index d73562009ed..9b5fee9530f 100644 --- a/rs/artifact_pool/src/pool_common.rs +++ b/rs/artifact_pool/src/pool_common.rs @@ -5,13 +5,23 @@ use crate::metrics::PoolMetrics; const MESSAGE_SIZE_BYTES: usize = 0; +pub(crate) trait HasLabel { + fn label(&self) -> &str; +} + +impl HasLabel for () { + fn label(&self) -> &str { + "" + } +} + /// Wrapper around `BTreeMap`, instrumenting insertions and removals. pub(crate) struct PoolSection { messages: BTreeMap, metrics: PoolMetrics, } -impl PoolSection { +impl PoolSection { pub(crate) fn new(metrics_registry: MetricsRegistry, pool: &str, pool_type: &str) -> Self { Self { messages: Default::default(), @@ -20,18 +30,21 @@ impl PoolSection { } pub(crate) fn insert(&mut self, key: K, value: V) -> Option { - self.metrics.observe_insert(MESSAGE_SIZE_BYTES); + self.metrics + .observe_insert(MESSAGE_SIZE_BYTES, value.label()); let replaced = self.messages.insert(key, value); - if replaced.is_some() { - self.metrics.observe_duplicate(MESSAGE_SIZE_BYTES); + if let Some(replaced) = &replaced { + self.metrics + .observe_duplicate(MESSAGE_SIZE_BYTES, replaced.label()); } replaced } pub(crate) fn remove(&mut self, key: &K) -> Option { let removed = self.messages.remove(key); - if removed.is_some() { - self.metrics.observe_remove(MESSAGE_SIZE_BYTES); + if let Some(removed) = &removed { + self.metrics + .observe_remove(MESSAGE_SIZE_BYTES, removed.label()); } removed }