Skip to content

Commit adba2f9

Browse files
committed
feat(Consensus): [CON-1151] Add metrics for the size of the certification pools
1 parent 9307900 commit adba2f9

File tree

6 files changed

+139
-41
lines changed

6 files changed

+139
-41
lines changed

rs/artifact_pool/src/canister_http_pool.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
//! Canister Http Artifact Pool implementation.
22
3-
// TODO: Remove
4-
#![allow(dead_code)]
53
use crate::{
64
metrics::{POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED},
7-
pool_common::PoolSection,
5+
pool_common::{HasLabel, PoolSection},
86
};
97
use ic_interfaces::{
108
canister_http::{CanisterHttpChangeAction, CanisterHttpChangeSet, CanisterHttpPool},
@@ -179,6 +177,12 @@ impl ValidatedPoolReader<CanisterHttpArtifact> for CanisterHttpPoolImpl {
179177
}
180178
}
181179

180+
impl HasLabel for CanisterHttpResponse {
181+
fn label(&self) -> &str {
182+
"canister_http_response"
183+
}
184+
}
185+
182186
#[cfg(test)]
183187
mod tests {
184188
use ic_logger::replica_logger::no_op_logger;

rs/artifact_pool/src/certification_pool.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::height_index::HeightIndex;
22
use crate::metrics::{PoolMetrics, POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED};
3+
use crate::pool_common::HasLabel;
34
use ic_config::artifact_pool::{ArtifactPoolConfig, PersistentPoolBackend};
45
use ic_interfaces::{
56
certification::{CertificationPool, ChangeAction, ChangeSet},
@@ -45,6 +46,8 @@ pub struct CertificationPoolImpl {
4546
}
4647

4748
const POOL_CERTIFICATION: &str = "certification";
49+
const CERTIFICATION_ARTIFACT_TYPE: &str = "certification";
50+
const CERTIFICATION_SHARE_ARTIFACT_TYPE: &str = "certification_share";
4851

4952
impl CertificationPoolImpl {
5053
pub fn new(
@@ -116,25 +119,50 @@ impl CertificationPoolImpl {
116119
.insert(CertificationMessage::Certification(certification))
117120
}
118121
}
122+
123+
fn update_metrics(&self) {
124+
// Validated artifacts metrics
125+
self.validated_pool_metrics
126+
.pool_artifacts
127+
.with_label_values(&[CERTIFICATION_ARTIFACT_TYPE])
128+
.set(self.persistent_pool.certifications().size() as i64);
129+
self.validated_pool_metrics
130+
.pool_artifacts
131+
.with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE])
132+
.set(self.persistent_pool.certification_shares().size() as i64);
133+
134+
// Unvalidated artifacts metrics
135+
self.unvalidated_pool_metrics
136+
.pool_artifacts
137+
.with_label_values(&[CERTIFICATION_ARTIFACT_TYPE])
138+
.set(self.unvalidated_certifications.size() as i64);
139+
self.unvalidated_pool_metrics
140+
.pool_artifacts
141+
.with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE])
142+
.set(self.unvalidated_shares.size() as i64);
143+
}
119144
}
120145

121146
impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
122147
type ChangeSet = ChangeSet;
123148

124149
fn insert(&mut self, msg: UnvalidatedArtifact<CertificationMessage>) {
125150
let height = msg.message.height();
151+
let label = msg.message.label();
126152
match &msg.message {
127153
CertificationMessage::CertificationShare(share) => {
128154
if self.unvalidated_shares.insert(height, share) {
129155
self.unvalidated_pool_metrics
130156
.received_artifact_bytes
157+
.with_label_values(&[label])
131158
.observe(std::mem::size_of_val(share) as f64);
132159
}
133160
}
134161
CertificationMessage::Certification(cert) => {
135162
if self.unvalidated_certifications.insert(height, cert) {
136163
self.unvalidated_pool_metrics
137164
.received_artifact_bytes
165+
.with_label_values(&[label])
138166
.observe(std::mem::size_of_val(cert) as f64);
139167
}
140168
}
@@ -160,11 +188,13 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
160188
let changed = !change_set.is_empty();
161189
let mut adverts = Vec::new();
162190
let mut purged = Vec::new();
191+
163192
change_set.into_iter().for_each(|action| match action {
164193
ChangeAction::AddToValidated(msg) => {
165194
adverts.push(CertificationArtifact::message_to_advert(&msg));
166195
self.validated_pool_metrics
167196
.received_artifact_bytes
197+
.with_label_values(&[msg.label()])
168198
.observe(std::mem::size_of_val(&msg) as f64);
169199
self.persistent_pool.insert(msg);
170200
}
@@ -174,11 +204,13 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
174204
adverts.push(CertificationArtifact::message_to_advert(&msg));
175205
}
176206
let height = msg.height();
207+
let label = msg.label().to_owned();
177208
match msg {
178209
CertificationMessage::CertificationShare(share) => {
179210
self.unvalidated_shares.remove(height, &share);
180211
self.validated_pool_metrics
181212
.received_artifact_bytes
213+
.with_label_values(&[&label])
182214
.observe(std::mem::size_of_val(&share) as f64);
183215
self.persistent_pool
184216
.insert(CertificationMessage::CertificationShare(share));
@@ -187,6 +219,7 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
187219
self.unvalidated_certifications.remove(height, &cert);
188220
self.validated_pool_metrics
189221
.received_artifact_bytes
222+
.with_label_values(&[&label])
190223
.observe(std::mem::size_of_val(&cert) as f64);
191224
self.insert_validated_certification(cert);
192225
}
@@ -228,6 +261,11 @@ impl MutablePool<CertificationArtifact> for CertificationPoolImpl {
228261
};
229262
}
230263
});
264+
265+
if changed {
266+
self.update_metrics();
267+
}
268+
231269
ChangeResult {
232270
purged,
233271
adverts,
@@ -393,6 +431,15 @@ impl ValidatedPoolReader<CertificationArtifact> for CertificationPoolImpl {
393431
}
394432
}
395433

434+
impl HasLabel for CertificationMessage {
435+
fn label(&self) -> &str {
436+
match self {
437+
CertificationMessage::Certification(_) => CERTIFICATION_ARTIFACT_TYPE,
438+
CertificationMessage::CertificationShare(_) => CERTIFICATION_SHARE_ARTIFACT_TYPE,
439+
}
440+
}
441+
}
442+
396443
#[cfg(test)]
397444
mod tests {
398445
use super::*;

rs/artifact_pool/src/dkg_pool.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
metrics::{POOL_TYPE_UNVALIDATED, POOL_TYPE_VALIDATED},
3-
pool_common::PoolSection,
3+
pool_common::{HasLabel, PoolSection},
44
};
55
use ic_interfaces::{
66
dkg::{ChangeAction, ChangeSet, DkgPool},
@@ -175,6 +175,18 @@ impl DkgPool for DkgPoolImpl {
175175
}
176176
}
177177

178+
impl HasLabel for dkg::Message {
179+
fn label(&self) -> &str {
180+
"dkg_message"
181+
}
182+
}
183+
184+
impl HasLabel for UnvalidatedArtifact<dkg::Message> {
185+
fn label(&self) -> &str {
186+
self.message.label()
187+
}
188+
}
189+
178190
#[cfg(test)]
179191
mod test {
180192
use super::*;

rs/artifact_pool/src/ingress_pool.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use prometheus::IntCounter;
3030
use std::collections::BTreeMap;
3131
use std::sync::Arc;
3232

33+
const INGRESS_MESSAGE_ARTIFACT_TYPE: &str = "ingress_message";
34+
3335
#[derive(Clone)]
3436
struct IngressPoolSection<T: AsRef<IngressPoolObject>> {
3537
/// Do not insert or remove elements in this map directly. Use this struct's
@@ -63,12 +65,14 @@ impl<T: AsRef<IngressPoolObject>> IngressPoolSection<T> {
6365
.with_label_values(&["insert"])
6466
.start_timer();
6567
let new_artifact_size = artifact.as_ref().count_bytes();
66-
self.metrics.observe_insert(new_artifact_size);
68+
self.metrics
69+
.observe_insert(new_artifact_size, INGRESS_MESSAGE_ARTIFACT_TYPE);
6770
if let Some(previous) = self.artifacts.insert(message_id, artifact) {
6871
let prev_size = previous.as_ref().count_bytes();
6972
self.byte_size -= prev_size;
7073
self.byte_size += new_artifact_size;
71-
self.metrics.observe_duplicate(prev_size);
74+
self.metrics
75+
.observe_duplicate(prev_size, INGRESS_MESSAGE_ARTIFACT_TYPE);
7276
} else {
7377
self.byte_size += new_artifact_size;
7478
}
@@ -85,7 +89,10 @@ impl<T: AsRef<IngressPoolObject>> IngressPoolSection<T> {
8589
let removed = self.artifacts.remove(message_id);
8690
if let Some(artifact) = &removed {
8791
self.byte_size -= artifact.as_ref().count_bytes();
88-
self.metrics.observe_remove(artifact.as_ref().count_bytes());
92+
self.metrics.observe_remove(
93+
artifact.as_ref().count_bytes(),
94+
INGRESS_MESSAGE_ARTIFACT_TYPE,
95+
);
8996
}
9097
// SAFETY: Checking byte size invariant
9198
section_ok(self);
@@ -116,7 +123,8 @@ impl<T: AsRef<IngressPoolObject>> IngressPoolSection<T> {
116123
for artifact in to_remove.values() {
117124
let artifact_size = artifact.as_ref().count_bytes();
118125
self.byte_size -= artifact_size;
119-
self.metrics.observe_remove(artifact_size);
126+
self.metrics
127+
.observe_remove(artifact_size, INGRESS_MESSAGE_ARTIFACT_TYPE);
120128
}
121129
// SAFETY: Checking byte size invariant
122130
section_ok(self);

rs/artifact_pool/src/metrics.rs

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero};
22
use ic_metrics::MetricsRegistry;
3-
use prometheus::{
4-
histogram_opts, labels, opts, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
5-
IntGaugeVec,
6-
};
3+
use prometheus::{histogram_opts, labels, opts, HistogramVec, IntCounterVec, IntGaugeVec};
74

85
pub const LABEL_POOL: &str = "pool";
96
pub const LABEL_POOL_TYPE: &str = "pool_type";
107
pub const POOL_TYPE_VALIDATED: &str = "validated";
118
pub const POOL_TYPE_UNVALIDATED: &str = "unvalidated";
9+
pub const ARTIFACT_TYPE: &str = "artifact_type";
1210

1311
/// Metrics for a given artifact pool's validated/unvalidated section.
1412
#[derive(Clone)]
1513
pub struct PoolMetrics {
1614
pub op_duration: HistogramVec,
17-
pub received_artifact_bytes: Histogram,
18-
received_duplicate_artifacts: IntCounter,
19-
pub pool_artifacts: IntGauge,
20-
pub pool_size_bytes: IntGauge,
15+
pub received_artifact_bytes: HistogramVec,
16+
received_duplicate_artifacts: IntCounterVec,
17+
pub pool_artifacts: IntGaugeVec,
18+
pub pool_size_bytes: IntGaugeVec,
2119
}
2220

2321
impl PoolMetrics {
@@ -37,59 +35,75 @@ impl PoolMetrics {
3735
.unwrap(),
3836
),
3937
received_artifact_bytes: metrics_registry.register(
40-
Histogram::with_opts(histogram_opts!(
38+
HistogramVec::new(histogram_opts!(
4139
"artifact_pool_received_artifact_bytes",
4240
"The byte size of all artifacts received by the given pool",
4341
// 0, 1B - 50MB
4442
decimal_buckets_with_zero(0, 7),
4543
labels! {LABEL_POOL.to_string() => pool.to_string(), LABEL_POOL_TYPE.to_string() => pool_type.to_string()}
46-
))
44+
), &[ARTIFACT_TYPE])
4745
.unwrap(),
4846
),
4947
received_duplicate_artifacts: metrics_registry.register(
50-
IntCounter::with_opts(opts!(
48+
IntCounterVec::new(opts!(
5149
"artifact_pool_received_duplicate_artifacts",
5250
"Duplicate artifacts received by the given pool",
5351
labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type}
54-
))
52+
), &[ARTIFACT_TYPE])
5553
.unwrap(),
5654
),
5755
pool_artifacts: metrics_registry.register(
58-
IntGauge::with_opts(opts!(
56+
IntGaugeVec::new(opts!(
5957
"artifact_pool_artifacts",
6058
"Current number of artifacts in the given pool",
6159
labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type}
62-
))
60+
), &[ARTIFACT_TYPE])
6361
.unwrap(),
6462
),
6563
pool_size_bytes: {
6664
metrics_registry.register(
67-
IntGauge::with_opts(opts!(
65+
IntGaugeVec::new(opts!(
6866
"artifact_pool_artifact_bytes",
6967
"Current byte size of artifacts in the given pool",
7068
labels! {LABEL_POOL => pool, LABEL_POOL_TYPE => pool_type}
71-
))
69+
), &[ARTIFACT_TYPE])
7270
.unwrap(),
7371
)
7472
},
7573
}
7674
}
7775

78-
pub fn observe_insert(&self, size_bytes: usize) {
79-
self.received_artifact_bytes.observe(size_bytes as f64);
80-
self.pool_artifacts.inc();
81-
self.pool_size_bytes.add(size_bytes as i64);
76+
pub fn observe_insert(&self, size_bytes: usize, artifact_type: &str) {
77+
self.received_artifact_bytes
78+
.with_label_values(&[artifact_type])
79+
.observe(size_bytes as f64);
80+
self.pool_artifacts
81+
.with_label_values(&[artifact_type])
82+
.inc();
83+
self.pool_size_bytes
84+
.with_label_values(&[artifact_type])
85+
.add(size_bytes as i64);
8286
}
8387

84-
pub fn observe_duplicate(&self, size_bytes: usize) {
85-
self.received_duplicate_artifacts.inc();
86-
self.pool_artifacts.dec();
87-
self.pool_size_bytes.sub(size_bytes as i64);
88+
pub fn observe_duplicate(&self, size_bytes: usize, artifact_type: &str) {
89+
self.received_duplicate_artifacts
90+
.with_label_values(&[artifact_type])
91+
.inc();
92+
self.pool_artifacts
93+
.with_label_values(&[artifact_type])
94+
.dec();
95+
self.pool_size_bytes
96+
.with_label_values(&[artifact_type])
97+
.sub(size_bytes as i64);
8898
}
8999

90-
pub fn observe_remove(&self, size_bytes: usize) {
91-
self.pool_artifacts.dec();
92-
self.pool_size_bytes.sub(size_bytes as i64);
100+
pub fn observe_remove(&self, size_bytes: usize, artifact_type: &str) {
101+
self.pool_artifacts
102+
.with_label_values(&[artifact_type])
103+
.dec();
104+
self.pool_size_bytes
105+
.with_label_values(&[artifact_type])
106+
.sub(size_bytes as i64);
93107
}
94108
}
95109

0 commit comments

Comments
 (0)