Skip to content

Commit

Permalink
feat(Consensus): [CON-1151] Add metrics for the size of the certifica…
Browse files Browse the repository at this point in the history
…tion pools
  • Loading branch information
kpop-dfinity committed Jan 15, 2024
1 parent 9307900 commit adba2f9
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 41 deletions.
10 changes: 7 additions & 3 deletions rs/artifact_pool/src/canister_http_pool.rs
@@ -1,10 +1,8 @@
//! Canister Http Artifact Pool implementation.

// TODO: Remove
#![allow(dead_code)]
use crate::{
metrics::{POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED},
pool_common::PoolSection,
pool_common::{HasLabel, PoolSection},
};
use ic_interfaces::{
canister_http::{CanisterHttpChangeAction, CanisterHttpChangeSet, CanisterHttpPool},
Expand Down Expand Up @@ -179,6 +177,12 @@ impl ValidatedPoolReader<CanisterHttpArtifact> for CanisterHttpPoolImpl {
}
}

/// Every `CanisterHttpResponse` reports the same fixed label.
impl HasLabel for CanisterHttpResponse {
    fn label(&self) -> &str {
        // The label does not depend on the contents of the response.
        const LABEL: &str = "canister_http_response";
        LABEL
    }
}

#[cfg(test)]
mod tests {
use ic_logger::replica_logger::no_op_logger;
Expand Down
47 changes: 47 additions & 0 deletions rs/artifact_pool/src/certification_pool.rs
@@ -1,5 +1,6 @@
use crate::height_index::HeightIndex;
use crate::metrics::{PoolMetrics, POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED};
use crate::pool_common::HasLabel;
use ic_config::artifact_pool::{ArtifactPoolConfig, PersistentPoolBackend};
use ic_interfaces::{
certification::{CertificationPool, ChangeAction, ChangeSet},
Expand Down Expand Up @@ -45,6 +46,8 @@ pub struct CertificationPoolImpl {
}

const POOL_CERTIFICATION: &str = "certification";
const CERTIFICATION_ARTIFACT_TYPE: &str = "certification";
const CERTIFICATION_SHARE_ARTIFACT_TYPE: &str = "certification_share";

impl CertificationPoolImpl {
pub fn new(
Expand Down Expand Up @@ -116,25 +119,50 @@ impl CertificationPoolImpl {
.insert(CertificationMessage::Certification(certification))
}
}

/// Sets the `pool_artifacts` gauges of both pool sections to the current
/// `size()` of the corresponding certification and certification-share
/// collections, one gauge per artifact-type label.
fn update_metrics(&self) {
    let validated = &self.validated_pool_metrics.pool_artifacts;
    let unvalidated = &self.unvalidated_pool_metrics.pool_artifacts;

    // (gauge family, artifact-type label, current size) -- validated entries
    // first, then unvalidated ones.
    let gauge_updates = [
        (
            validated,
            CERTIFICATION_ARTIFACT_TYPE,
            self.persistent_pool.certifications().size() as i64,
        ),
        (
            validated,
            CERTIFICATION_SHARE_ARTIFACT_TYPE,
            self.persistent_pool.certification_shares().size() as i64,
        ),
        (
            unvalidated,
            CERTIFICATION_ARTIFACT_TYPE,
            self.unvalidated_certifications.size() as i64,
        ),
        (
            unvalidated,
            CERTIFICATION_SHARE_ARTIFACT_TYPE,
            self.unvalidated_shares.size() as i64,
        ),
    ];

    for (gauges, artifact_type, count) in gauge_updates {
        gauges.with_label_values(&[artifact_type]).set(count);
    }
}
}

impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
type ChangeSet = ChangeSet;

fn insert(&mut self, msg: UnvalidatedArtifact<CertificationMessage>) {
let height = msg.message.height();
let label = msg.message.label();
match &msg.message {
CertificationMessage::CertificationShare(share) => {
if self.unvalidated_shares.insert(height, share) {
self.unvalidated_pool_metrics
.received_artifact_bytes
.with_label_values(&[label])
.observe(std::mem::size_of_val(share) as f64);
}
}
CertificationMessage::Certification(cert) => {
if self.unvalidated_certifications.insert(height, cert) {
self.unvalidated_pool_metrics
.received_artifact_bytes
.with_label_values(&[label])
.observe(std::mem::size_of_val(cert) as f64);
}
}
Expand All @@ -160,11 +188,13 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
let changed = !change_set.is_empty();
let mut adverts = Vec::new();
let mut purged = Vec::new();

change_set.into_iter().for_each(|action| match action {
ChangeAction::AddToValidated(msg) => {
adverts.push(CertificationArtifact::message_to_advert(&msg));
self.validated_pool_metrics
.received_artifact_bytes
.with_label_values(&[msg.label()])
.observe(std::mem::size_of_val(&msg) as f64);
self.persistent_pool.insert(msg);
}
Expand All @@ -174,11 +204,13 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
adverts.push(CertificationArtifact::message_to_advert(&msg));
}
let height = msg.height();
let label = msg.label().to_owned();
match msg {
CertificationMessage::CertificationShare(share) => {
self.unvalidated_shares.remove(height, &share);
self.validated_pool_metrics
.received_artifact_bytes
.with_label_values(&[&label])
.observe(std::mem::size_of_val(&share) as f64);
self.persistent_pool
.insert(CertificationMessage::CertificationShare(share));
Expand All @@ -187,6 +219,7 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
self.unvalidated_certifications.remove(height, &cert);
self.validated_pool_metrics
.received_artifact_bytes
.with_label_values(&[&label])
.observe(std::mem::size_of_val(&cert) as f64);
self.insert_validated_certification(cert);
}
Expand Down Expand Up @@ -228,6 +261,11 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
};
}
});

if changed {
self.update_metrics();
}

ChangeResult {
purged,
adverts,
Expand Down Expand Up @@ -393,6 +431,15 @@ impl ValidatedPoolReader<CertificationArtifact> for CertificationPoolImpl {
}
}

/// Maps each `CertificationMessage` variant to its artifact-type label.
impl HasLabel for CertificationMessage {
    fn label(&self) -> &str {
        // Exhaustive on purpose: a new variant must be given its own label.
        match self {
            Self::CertificationShare(_) => CERTIFICATION_SHARE_ARTIFACT_TYPE,
            Self::Certification(_) => CERTIFICATION_ARTIFACT_TYPE,
        }
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
14 changes: 13 additions & 1 deletion rs/artifact_pool/src/dkg_pool.rs
@@ -1,6 +1,6 @@
use crate::{
metrics::{POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED},
pool_common::PoolSection,
pool_common::{HasLabel, PoolSection},
};
use ic_interfaces::{
dkg::{ChangeAction, ChangeSet, DkgPool},
Expand Down Expand Up @@ -175,6 +175,18 @@ impl DkgPool for DkgPoolImpl {
}
}

/// Every `dkg::Message` reports the same fixed label.
impl HasLabel for dkg::Message {
    fn label(&self) -> &str {
        // The label does not depend on the contents of the message.
        const LABEL: &str = "dkg_message";
        LABEL
    }
}

/// An unvalidated DKG artifact reports the label of the message it wraps.
impl HasLabel for UnvalidatedArtifact<dkg::Message> {
    fn label(&self) -> &str {
        let wrapped = &self.message;
        wrapped.label()
    }
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
16 changes: 12 additions & 4 deletions rs/artifact_pool/src/ingress_pool.rs
Expand Up @@ -30,6 +30,8 @@ use prometheus::IntCounter;
use std::collections::BTreeMap;
use std::sync::Arc;

const INGRESS_MESSAGE_ARTIFACT_TYPE: &str = "ingress_message";

#[derive(Clone)]
struct IngressPoolSection<T: AsRef<IngressPoolObject>> {
/// Do not insert or remove elements in this map directly. Use this struct's
Expand Down Expand Up @@ -63,12 +65,14 @@ impl<T: AsRef<IngressPoolObject>> IngressPoolSection<T> {
.with_label_values(&["insert"])
.start_timer();
let new_artifact_size = artifact.as_ref().count_bytes();
self.metrics.observe_insert(new_artifact_size);
self.metrics
.observe_insert(new_artifact_size, INGRESS_MESSAGE_ARTIFACT_TYPE);
if let Some(previous) = self.artifacts.insert(message_id, artifact) {
let prev_size = previous.as_ref().count_bytes();
self.byte_size -= prev_size;
self.byte_size += new_artifact_size;
self.metrics.observe_duplicate(prev_size);
self.metrics
.observe_duplicate(prev_size, INGRESS_MESSAGE_ARTIFACT_TYPE);
} else {
self.byte_size += new_artifact_size;
}
Expand All @@ -85,7 +89,10 @@ impl<T: AsRef<IngressPoolObject>> IngressPoolSection<T> {
let removed = self.artifacts.remove(message_id);
if let Some(artifact) = &removed {
self.byte_size -= artifact.as_ref().count_bytes();
self.metrics.observe_remove(artifact.as_ref().count_bytes());
self.metrics.observe_remove(
artifact.as_ref().count_bytes(),
INGRESS_MESSAGE_ARTIFACT_TYPE,
);
}
// SAFETY: Checking byte size invariant
section_ok(self);
Expand Down Expand Up @@ -116,7 +123,8 @@ impl<T: AsRef<IngressPoolObject>> IngressPoolSection<T> {
for artifact in to_remove.values() {
let artifact_size = artifact.as_ref().count_bytes();
self.byte_size -= artifact_size;
self.metrics.observe_remove(artifact_size);
self.metrics
.observe_remove(artifact_size, INGRESS_MESSAGE_ARTIFACT_TYPE);
}
// SAFETY: Checking byte size invariant
section_ok(self);
Expand Down
68 changes: 41 additions & 27 deletions rs/artifact_pool/src/metrics.rs
@@ -1,23 +1,21 @@
use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero};
use ic_metrics::MetricsRegistry;
use prometheus::{
histogram_opts, labels, opts, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec,
};
use prometheus::{histogram_opts, labels, opts, HistogramVec, IntCounterVec, IntGaugeVec};

pub const LABEL_POOL: &str = "pool";
pub const LABEL_POOL_TYPE: &str = "pool_type";
pub const POOL_TYPE_VALIDATED: &str = "validated";
pub const POOL_TYPE_UNVALIDATED: &str = "unvalidated";
pub const ARTIFACT_TYPE: &str = "artifact_type";

/// Metrics for a given artifact pool's validated/unvalidated section.
#[derive(Clone)]
pub struct PoolMetrics {
pub op_duration: HistogramVec,
pub received_artifact_bytes: Histogram,
received_duplicate_artifacts: IntCounter,
pub pool_artifacts: IntGauge,
pub pool_size_bytes: IntGauge,
pub received_artifact_bytes: HistogramVec,
received_duplicate_artifacts: IntCounterVec,
pub pool_artifacts: IntGaugeVec,
pub pool_size_bytes: IntGaugeVec,
}

impl PoolMetrics {
Expand All @@ -37,59 +35,75 @@ impl PoolMetrics {
.unwrap(),
),
received_artifact_bytes: metrics_registry.register(
Histogram::with_opts(histogram_opts!(
HistogramVec::new(histogram_opts!(
"artifact_pool_received_artifact_bytes",
"The byte size of all artifacts received by the given pool",
// 0, 1B - 50MB
decimal_buckets_with_zero(0, 7),
labels! {LABEL_POOL.to_string() => pool.to_string(), LABEL_POOL_TYPE.to_string() => pool_type.to_string()}
))
), &[ARTIFACT_TYPE])
.unwrap(),
),
received_duplicate_artifacts: metrics_registry.register(
IntCounter::with_opts(opts!(
IntCounterVec::new(opts!(
"artifact_pool_received_duplicate_artifacts",
"Duplicate artifacts received by the given pool",
labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type}
))
), &[ARTIFACT_TYPE])
.unwrap(),
),
pool_artifacts: metrics_registry.register(
IntGauge::with_opts(opts!(
IntGaugeVec::new(opts!(
"artifact_pool_artifacts",
"Current number of artifacts in the given pool",
labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type}
))
), &[ARTIFACT_TYPE])
.unwrap(),
),
pool_size_bytes: {
metrics_registry.register(
IntGauge::with_opts(opts!(
IntGaugeVec::new(opts!(
"artifact_pool_artifact_bytes",
"Current byte size of artifacts in the given pool",
labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type}
))
), &[ARTIFACT_TYPE])
.unwrap(),
)
},
}
}

pub fn observe_insert(&self, size_bytes: usize) {
self.received_artifact_bytes.observe(size_bytes as f64);
self.pool_artifacts.inc();
self.pool_size_bytes.add(size_bytes as i64);
pub fn observe_insert(&self, size_bytes: usize, artifact_type: &str) {
self.received_artifact_bytes
.with_label_values(&[artifact_type])
.observe(size_bytes as f64);
self.pool_artifacts
.with_label_values(&[artifact_type])
.inc();
self.pool_size_bytes
.with_label_values(&[artifact_type])
.add(size_bytes as i64);
}

pub fn observe_duplicate(&self, size_bytes: usize) {
self.received_duplicate_artifacts.inc();
self.pool_artifacts.dec();
self.pool_size_bytes.sub(size_bytes as i64);
pub fn observe_duplicate(&self, size_bytes: usize, artifact_type: &str) {
self.received_duplicate_artifacts
.with_label_values(&[artifact_type])
.inc();
self.pool_artifacts
.with_label_values(&[artifact_type])
.dec();
self.pool_size_bytes
.with_label_values(&[artifact_type])
.sub(size_bytes as i64);
}

pub fn observe_remove(&self, size_bytes: usize) {
self.pool_artifacts.dec();
self.pool_size_bytes.sub(size_bytes as i64);
pub fn observe_remove(&self, size_bytes: usize, artifact_type: &str) {
self.pool_artifacts
.with_label_values(&[artifact_type])
.dec();
self.pool_size_bytes
.with_label_values(&[artifact_type])
.sub(size_bytes as i64);
}
}

Expand Down

0 comments on commit adba2f9

Please sign in to comment.