Skip to content

Commit

Permalink
fix: add block number value to the metrics (#2219)
Browse files Browse the repository at this point in the history
* fix: add block number value to the metrics

* parse block_number as BlockNumber (which is u64)
  • Loading branch information
kmd-fl committed Apr 11, 2024
1 parent d3d5f27 commit 39f71d7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
22 changes: 17 additions & 5 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloy_primitives::{Address, FixedBytes, Uint, U256};
use alloy_primitives::{Address, BlockNumber, FixedBytes, Uint, U256};
use alloy_sol_types::SolEvent;
use backoff::Error::Permanent;
use std::collections::{BTreeMap, BTreeSet, HashMap};
Expand Down Expand Up @@ -648,7 +648,7 @@ impl ChainListener {
let header = event.ok_or(eyre!("Failed to process newHeads event: got None"))?;

let (block_timestamp, block_number) = Self::parse_block_header(header?)?;
self.observe(|m| m.observe_new_block(block_number.to_string()));
self.observe(|m| m.observe_new_block(block_number));

// `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration`
let epoch_number =
Expand Down Expand Up @@ -685,7 +685,7 @@ impl ChainListener {
}
}
}
self.observe(|m| m.observe_processed_block());
self.observe(|m| m.observe_processed_block(block_number));
Ok(())
}

Expand Down Expand Up @@ -1162,7 +1162,16 @@ impl ChainListener {
}
}

fn parse_block_header(header: Value) -> eyre::Result<(U256, U256)> {
fn parse_block_number(block_number: &str) -> eyre::Result<BlockNumber> {
let block_number_ = block_number.strip_prefix("0x").ok_or(eyre::eyre!(
"newHeads: block number is not hex; got {block_number}"
))?;
BlockNumber::from_str_radix(block_number_, 16).map_err(|err| {
eyre::eyre!("Failed to parse block number: {err}, block_number {block_number}")
})
}

fn parse_block_header(header: Value) -> eyre::Result<(U256, BlockNumber)> {
let obj = header.as_object().ok_or(eyre::eyre!(
"newHeads: header is not an object; got {header}"
))?;
Expand All @@ -1183,7 +1192,10 @@ impl ChainListener {
))?
.to_string();

Ok((U256::from_str(&timestamp)?, U256::from_str(&block_number)?))
Ok((
U256::from_str(&timestamp)?,
Self::parse_block_number(&block_number)?,
))
}

async fn poll_deal_statuses(&mut self) -> eyre::Result<()> {
Expand Down
36 changes: 25 additions & 11 deletions crates/peer-metrics/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ use crate::{execution_time_buckets, register};
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::exemplar::CounterWithExemplar;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct BlockLabel {
block_number: String,
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct TxLabel {
tx_hash: String,
Expand All @@ -33,9 +29,11 @@ pub struct ChainListenerMetrics {
// how many proofs transaction are failed
ccp_proofs_tx_failed: CounterWithExemplar<TxLabel>,
// How many blocks we have received from the newHead subscription
blocks_seen: CounterWithExemplar<BlockLabel>,
blocks_seen: Counter,
last_seen_block: Gauge,
// How many block we manage to process while processing the block
blocks_processed: Counter,
last_process_block: Gauge,
}

impl ChainListenerMetrics {
Expand Down Expand Up @@ -93,7 +91,7 @@ impl ChainListenerMetrics {

let blocks_seen = register(
sub_registry,
CounterWithExemplar::default(),
Counter::default(),
"blocks_seen",
"Total number of blocks seen from the newHead subscription",
);
Expand All @@ -105,6 +103,19 @@ impl ChainListenerMetrics {
"Total number of blocks processed",
);

let last_seen_block = register(
sub_registry,
Gauge::default(),
"last_seen_block",
"Last block seen from the newHead subscription",
);
let last_process_block = register(
sub_registry,
Gauge::default(),
"last_process_block",
"Last processed block from the newHead subscription",
);

Self {
ccp_requests_total,
ccp_replies_total,
Expand All @@ -114,7 +125,9 @@ impl ChainListenerMetrics {
ccp_proofs_tx_success,
ccp_proofs_tx_failed,
blocks_seen,
last_seen_block,
blocks_processed,
last_process_block,
}
}

Expand Down Expand Up @@ -144,12 +157,13 @@ impl ChainListenerMetrics {
.inc_by(1, Some(TxLabel { tx_hash }));
}

pub fn observe_new_block(&self, block_number: String) {
self.blocks_seen
.inc_by(1, Some(BlockLabel { block_number }));
pub fn observe_new_block(&self, block_number: u64) {
self.blocks_seen.inc();
self.last_seen_block.set(block_number as i64);
}

pub fn observe_processed_block(&self) {
pub fn observe_processed_block(&self, block_number: u64) {
self.blocks_processed.inc();
self.last_process_block.set(block_number as i64);
}
}

0 comments on commit 39f71d7

Please sign in to comment.