Skip to content

Commit

Permalink
feat(consensus): [CON-1151] Add an artifact_type label to `consensu…
Browse files Browse the repository at this point in the history
…s_pool_size` metric.
  • Loading branch information
kpop-dfinity committed Jan 11, 2024
1 parent dbc2731 commit 1951764
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 40 deletions.
76 changes: 37 additions & 39 deletions rs/artifact_pool/src/consensus_pool.rs
Expand Up @@ -74,9 +74,10 @@ pub trait MutablePoolSection<T>: PoolSection<T> {
}

struct PerTypeMetrics<T> {
max_height: prometheus::IntGauge,
min_height: prometheus::IntGauge,
count: Histogram,
max_height: IntGauge,
min_height: IntGauge,
count: IntGauge,
count_per_height: Histogram,
phantom: PhantomData<T>,
}

Expand All @@ -88,6 +89,7 @@ impl<T> PerTypeMetrics<T> {
const NAME: &str = "artifact_pool_consensus_height_stat";
const HELP: &str =
"The height of objects in a consensus pool, by pool type, object type and stat";

Self {
max_height: registry.register(
IntGauge::with_opts(opts!(
Expand All @@ -106,6 +108,14 @@ impl<T> PerTypeMetrics<T> {
.unwrap(),
),
count: registry.register(
IntGauge::with_opts(opts!(
"consensus_pool_size",
"The number of artifacts in a consensus pool, by pool type and object type",
labels! {LABEL_POOL_TYPE => pool_portion, LABEL_TYPE => type_name}
))
.unwrap(),
),
count_per_height: registry.register(
Histogram::with_opts(histogram_opts!(
"artifact_pool_consensus_count_per_height",
"The number of artifacts of the given height in a consensus pool, by pool type \
Expand All @@ -120,29 +130,26 @@ impl<T> PerTypeMetrics<T> {
}
}

fn update_from_height_indexed_pool(&self, index: &dyn HeightIndexedPool<T>) {
let (min, max) = index
.height_range()
.map_or((-1, -1), |r| (r.min.get() as i64, r.max.get() as i64));
if min >= 0 {
self.min_height.set(min);
}
if max >= 0 {
self.max_height.set(max);
fn update_from_height_indexed_pool(&self, pool: &dyn HeightIndexedPool<T>) {
if let Some(height_range) = pool.height_range() {
self.min_height.set(height_range.min.get() as i64);
self.max_height.set(height_range.max.get() as i64);
}

self.count.set(pool.size() as i64);
}

/// Updates the number of artifacts for each height in [last_height, new_height)
fn update_count(
fn update_count_per_height(
&self,
index: &dyn HeightIndexedPool<T>,
pool: &dyn HeightIndexedPool<T>,
last_height: Height,
new_height: Height,
) {
let mut height = last_height;
while height < new_height {
let count = index.get_by_height(height).count();
self.count.observe(count as f64);
let count = pool.get_by_height(height).count();
self.count_per_height.observe(count as f64);
height.inc_assign();
}
}
Expand All @@ -160,7 +167,6 @@ struct PoolMetrics {
notarization_share: PerTypeMetrics<NotarizationShare>,
finalization_share: PerTypeMetrics<FinalizationShare>,
catch_up_package_share: PerTypeMetrics<CatchUpPackageShare>,
total_size: prometheus::IntGauge,
}

impl PoolMetrics {
Expand All @@ -185,14 +191,6 @@ impl PoolMetrics {
pool_portion,
"catch_up_package_share",
),
total_size: registry.register(
IntGauge::with_opts(opts!(
"consensus_pool_size",
"The total size of a consensus pool",
labels! {LABEL_POOL_TYPE => pool_portion}
))
.unwrap(),
),
}
}

Expand All @@ -219,34 +217,33 @@ impl PoolMetrics {
.update_from_height_indexed_pool(pool_section.finalization_share());
self.catch_up_package_share
.update_from_height_indexed_pool(pool_section.catch_up_package_share());
self.total_size.set(pool_section.size() as i64)
}

fn update_count<T>(
fn update_count_per_height<T>(
&mut self,
pool_section: &dyn PoolSection<T>,
last_height: Height,
new_height: Height,
) {
macro_rules! update_count {
macro_rules! update_count_per_height {
($artifact_name:ident) => {
self.$artifact_name.update_count(
self.$artifact_name.update_count_per_height(
pool_section.$artifact_name(),
last_height,
new_height,
);
};
}

update_count!(random_beacon);
update_count!(random_tape);
update_count!(block_proposal);
update_count!(notarization);
update_count!(finalization);
update_count!(random_beacon_share);
update_count!(random_tape_share);
update_count!(notarization_share);
update_count!(finalization_share);
update_count_per_height!(random_beacon);
update_count_per_height!(random_tape);
update_count_per_height!(block_proposal);
update_count_per_height!(notarization);
update_count_per_height!(finalization);
update_count_per_height!(random_beacon_share);
update_count_per_height!(random_tape_share);
update_count_per_height!(notarization_share);
update_count_per_height!(finalization_share);
}
}

Expand Down Expand Up @@ -474,10 +471,11 @@ impl ConsensusPoolImpl {
let new_height = self.validated.pool_section().finalization().max_height();

self.validated_metrics.update(self.validated.pool_section());

// Update the metrics if necessary.
if let (Some(last_height), Some(new_height)) = (last_height, new_height) {
if new_height != last_height {
self.validated_metrics.update_count(
self.validated_metrics.update_count_per_height(
self.validated.pool_section(),
last_height,
new_height,
Expand Down
4 changes: 4 additions & 0 deletions rs/artifact_pool/src/height_index.rs
Expand Up @@ -85,6 +85,10 @@ impl<T: Eq + Clone> HeightIndex<T> {
Box::new(self.buckets.values().flat_map(|bucket| bucket.iter()))
}

pub fn size(&self) -> usize {
self.buckets.values().map(Vec::len).sum()
}

/// Returns all heights of the index, in sorted order.
pub fn heights(&self) -> Box<dyn Iterator<Item = &Height> + '_> {
Box::new(self.buckets.keys())
Expand Down
4 changes: 4 additions & 0 deletions rs/artifact_pool/src/inmemory_pool.rs
Expand Up @@ -208,6 +208,10 @@ where
Box::new(std::iter::empty())
}
}

fn size(&self) -> usize {
self.select_index::<CryptoHashOf<T>>().size()
}
}

impl<T: IntoInner<ConsensusMessage> + HasTimestamp + Clone> PoolSection<T>
Expand Down
35 changes: 34 additions & 1 deletion rs/artifact_pool/src/lmdb_pool.rs
Expand Up @@ -412,7 +412,7 @@ impl<Artifact: PoolArtifact> PersistentHeightIndexedPool<Artifact> {
}

/// Get the index database of the given type_key.
/// Each index database maps HeightKey to a list of IdKey.
/// Each index database maps [`HeightKey`] to a list of [`IdKey`].
fn get_index_db(&self, type_key: &TypeKey) -> Database {
self.indices
.iter()
Expand Down Expand Up @@ -738,6 +738,19 @@ where
_ => Err(OnlyError::MultipleValues),
}
}

fn size(&self) -> usize {
let index_db = self.get_index_db(&Message::type_key());
let Some(tx) = log_err!(self.db_env.begin_ro_txn(), &self.log, "begin_ro_txn") else {
return 0;
};
let Some(mut cursor) = log_err!(tx.open_ro_cursor(index_db), &self.log, "open_ro_cursor")
else {
return 0;
};

cursor.iter().count()
}
}

///////////////////////////// Consensus Pool /////////////////////////////
Expand Down Expand Up @@ -2107,9 +2120,29 @@ mod tests {
});
}

fn assert_count_consistency_<T>(pool: &dyn HeightIndexedPool<T>) {
assert_eq!(pool.size(), pool.get_all().count());
}

fn assert_count_consistency(pool: &PersistentHeightIndexedPool<ConsensusMessage>) {
assert_count_consistency_(pool.random_beacon());
assert_count_consistency_(pool.random_tape());
assert_count_consistency_(pool.block_proposal());
assert_count_consistency_(pool.notarization());
assert_count_consistency_(pool.finalization());
assert_count_consistency_(pool.random_beacon_share());
assert_count_consistency_(pool.random_tape_share());
assert_count_consistency_(pool.notarization_share());
assert_count_consistency_(pool.finalization_share());
assert_count_consistency_(pool.catch_up_package());
assert_count_consistency_(pool.catch_up_package_share());
}

// Assert that entries in artifacts db are reflected by index db and vice versa.
// Each entry should have a join partner when joining on IdKey.
fn assert_consistency(pool: &PersistentHeightIndexedPool<ConsensusMessage>) {
assert_count_consistency(pool);

let tx = pool.db_env.begin_ro_txn().unwrap();
// get all ids from all indices
let mut ids_index = pool
Expand Down
9 changes: 9 additions & 0 deletions rs/artifact_pool/src/rocksdb_pool.rs
Expand Up @@ -722,6 +722,11 @@ impl<Message: ConsensusMessageHashable + PerTypeCFInfo + 'static> HeightIndexedP
_ => Err(OnlyError::MultipleValues),
}
}

// not implemented
fn size(&self) -> usize {
0
}
}

pub fn new_pool_snapshot_iterator<Message: ConsensusMessageHashable + PerTypeCFInfo>(
Expand Down Expand Up @@ -1172,6 +1177,10 @@ impl<Message: CertificationType + PerTypeCFInfo + 'static> HeightIndexedPool<Mes
_ => Err(OnlyError::MultipleValues),
}
}

fn size(&self) -> usize {
0
}
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions rs/interfaces/src/consensus_pool.rs
Expand Up @@ -298,6 +298,9 @@ pub trait HeightIndexedPool<T> {
/// Return an iterator over instances of artifact of type T at the highest
/// height currently in the pool.
fn get_highest_iter(&self) -> Box<dyn Iterator<Item = T>>;

/// Return the number of artifacts of type T in the pool.
fn size(&self) -> usize;
}
// end::interface[]

Expand Down

0 comments on commit 1951764

Please sign in to comment.