Skip to content

Commit

Permalink
feat: add metrics to /ping response of event observer server (#297)
Browse files Browse the repository at this point in the history
Fixes #285
  • Loading branch information
MicaiahReid committed Jun 30, 2023
1 parent a2d3b14 commit 0e1ee7c
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 8 deletions.
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/service/mod.rs
Expand Up @@ -216,7 +216,7 @@ impl Service {
match event {
ObserverEvent::PredicateRegistered(spec) => {
// If start block specified, use it.
// I no start block specified, depending on the nature the hook, we'd like to retrieve:
// If no start block specified, depending on the nature the hook, we'd like to retrieve:
// - contract-id
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
Expand Down
2 changes: 1 addition & 1 deletion components/chainhook-sdk/Cargo.toml
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = "1"
serde = {version = "1", features = ["rc"]}
serde_json = { version = "1", features = ["arbitrary_precision"] }
serde-hex = "0.1.0"
serde_derive = "1"
Expand Down
9 changes: 6 additions & 3 deletions components/chainhook-sdk/src/observer/http.rs
Expand Up @@ -8,16 +8,19 @@ use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex, RwLock};

use super::{
BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand,
BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand, ObserverMetrics,
StacksChainMempoolEvent,
};

#[rocket::get("/ping", format = "application/json")]
pub fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
pub fn handle_ping(
ctx: &State<Context>,
metrics_rw_lock: &State<Arc<RwLock<ObserverMetrics>>>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "GET /ping"));
Json(json!({
"status": 200,
"result": "Ok",
"result": metrics_rw_lock.inner(),
}))
}

Expand Down
198 changes: 195 additions & 3 deletions components/chainhook-sdk/src/observer/mod.rs
Expand Up @@ -46,7 +46,7 @@ use std::str;
use std::str::FromStr;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(feature = "zeromq")]
use zeromq::{Socket, SocketRecv};

Expand Down Expand Up @@ -359,6 +359,35 @@ impl ChainhookStore {
}
}

#[derive(Debug, Default, Serialize, Clone)]
pub struct ReorgMetrics {
timestamp: i64,
applied_blocks: usize,
rolled_back_blocks: usize,
}

#[derive(Debug, Default, Serialize, Clone)]
pub struct ChainMetrics {
pub tip_height: u64,
pub last_reorg: Option<ReorgMetrics>,
pub last_block_ingestion_at: u128,
pub registered_predicates: usize,
pub deregistered_predicates: usize,
}

impl ChainMetrics {
pub fn deregister_prediate(&mut self) {
self.registered_predicates -= 1;
self.deregistered_predicates += 1;
}
}

#[derive(Debug, Default, Serialize, Clone)]
pub struct ObserverMetrics {
pub bitcoin: ChainMetrics,
pub stacks: ChainMetrics,
}

pub async fn start_event_observer(
mut config: EventObserverConfig,
observer_commands_tx: Sender<ObserverCommand>,
Expand Down Expand Up @@ -409,6 +438,18 @@ pub async fn start_event_observer(

let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));

let observer_metrics = ObserverMetrics {
bitcoin: ChainMetrics {
registered_predicates: chainhook_store.predicates.bitcoin_chainhooks.len(),
..Default::default()
},
stacks: ChainMetrics {
registered_predicates: chainhook_store.predicates.stacks_chainhooks.len(),
..Default::default()
},
};
let observer_metrics_rw_lock = Arc::new(RwLock::new(observer_metrics));

let limits = Limits::default().limit("json", 20.megabytes());
let mut shutdown_config = config::Shutdown::default();
shutdown_config.ctrlc = false;
Expand Down Expand Up @@ -451,6 +492,7 @@ pub async fn start_event_observer(
.manage(background_job_tx_mutex)
.manage(bitcoin_config)
.manage(ctx_cloned)
.manage(observer_metrics_rw_lock.clone())
.mount("/", routes)
.ignite()
.await?;
Expand All @@ -470,6 +512,7 @@ pub async fn start_event_observer(
observer_commands_rx,
observer_events_tx,
ingestion_shutdown,
observer_metrics_rw_lock.clone(),
ctx,
)
.await
Expand Down Expand Up @@ -653,6 +696,7 @@ pub async fn start_observer_commands_handler(
observer_commands_rx: Receiver<ObserverCommand>,
observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
ingestion_shutdown: Option<Shutdown>,
observer_metrics: Arc<RwLock<ObserverMetrics>>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
let mut chainhooks_occurrences_tracker: HashMap<String, u64> = HashMap::new();
Expand Down Expand Up @@ -728,6 +772,21 @@ pub async fn start_observer_commands_handler(
}
};
};
match observer_metrics.write() {
Ok(mut metrics) => {
if new_block.block_identifier.index > metrics.bitcoin.tip_height {
metrics.bitcoin.tip_height = new_block.block_identifier.index;
}
metrics.bitcoin.last_block_ingestion_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis()
.into();
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
};
bitcoin_block_store.insert(new_block.block_identifier.clone(), new_block);
}
ObserverCommand::CacheBitcoinBlock(block) => {
Expand Down Expand Up @@ -974,6 +1033,29 @@ pub async fn start_observer_commands_handler(
}
}

match blocks_to_apply
.iter()
.max_by_key(|b| b.block_identifier.index)
{
Some(highest_tip_block) => match observer_metrics.write() {
Ok(mut metrics) => {
metrics.bitcoin.last_reorg = Some(ReorgMetrics {
timestamp: highest_tip_block.timestamp.into(),
applied_blocks: blocks_to_apply.len(),
rolled_back_blocks: blocks_to_rollback.len(),
});
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
},
None => {}
}

BitcoinChainEvent::ChainUpdatedWithReorg(BitcoinChainUpdatedWithReorgData {
blocks_to_apply,
blocks_to_rollback,
Expand Down Expand Up @@ -1108,6 +1190,17 @@ pub async fn start_observer_commands_handler(
ChainhookSpecification::Bitcoin(chainhook),
));
}

match observer_metrics.write() {
Ok(mut metrics) => metrics.bitcoin.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
}
}
}

Expand Down Expand Up @@ -1157,6 +1250,66 @@ pub async fn start_observer_commands_handler(
stacks_chainhooks.len()
)
});
// track stacks chain metrics
match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(update) => {
match update
.new_blocks
.iter()
.max_by_key(|b| b.block.block_identifier.index)
{
Some(highest_tip_update) => match observer_metrics.write() {
Ok(mut metrics) => {
if highest_tip_update.block.block_identifier.index
> metrics.stacks.tip_height
{
metrics.stacks.tip_height =
highest_tip_update.block.block_identifier.index;
}
metrics.stacks.last_block_ingestion_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis()
.into();
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
},
None => {}
}
}
StacksChainEvent::ChainUpdatedWithReorg(update) => {
match update
.blocks_to_apply
.iter()
.max_by_key(|b| b.block.block_identifier.index)
{
Some(highest_tip_update) => match observer_metrics.write() {
Ok(mut metrics) => {
metrics.stacks.last_reorg = Some(ReorgMetrics {
timestamp: highest_tip_update.block.timestamp.into(),
applied_blocks: update.blocks_to_apply.len(),
rolled_back_blocks: update.blocks_to_rollback.len(),
});
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
},
None => {}
}
}
_ => {}
}

// process hooks
let (predicates_triggered, predicates_evaluated) =
Expand Down Expand Up @@ -1241,6 +1394,17 @@ pub async fn start_observer_commands_handler(
ChainhookSpecification::Stacks(chainhook),
));
}

match observer_metrics.write() {
Ok(mut metrics) => metrics.stacks.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
}
}
}

Expand Down Expand Up @@ -1286,7 +1450,7 @@ pub async fn start_observer_commands_handler(
.predicates
.register_full_specification(networks, spec)
{
Ok(uuid) => uuid,
Ok(spec) => spec,
Err(e) => {
ctx.try_log(|logger| {
slog::error!(
Expand All @@ -1300,11 +1464,25 @@ pub async fn start_observer_commands_handler(
};
ctx.try_log(|logger| slog::info!(logger, "Registering chainhook {}", spec.uuid(),));
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::PredicateRegistered(spec));
let _ = tx.send(ObserverEvent::PredicateRegistered(spec.clone()));
} else {
ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid()));
chainhook_store.predicates.enable_specification(&mut spec);
}

match observer_metrics.write() {
Ok(mut metrics) => match spec {
ChainhookSpecification::Bitcoin(_) => {
metrics.bitcoin.registered_predicates += 1
}
ChainhookSpecification::Stacks(_) => {
metrics.stacks.registered_predicates += 1
}
},
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
};
}
ObserverCommand::EnablePredicate(mut spec) => {
ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid()));
Expand All @@ -1323,6 +1501,13 @@ pub async fn start_observer_commands_handler(
ChainhookSpecification::Stacks(hook),
));
}

match observer_metrics.write() {
Ok(mut metrics) => metrics.stacks.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
}
}
ObserverCommand::DeregisterBitcoinPredicate(hook_uuid) => {
ctx.try_log(|logger| {
Expand All @@ -1335,6 +1520,13 @@ pub async fn start_observer_commands_handler(
let _ = tx.send(ObserverEvent::PredicateDeregistered(
ChainhookSpecification::Bitcoin(hook),
));

match observer_metrics.write() {
Ok(mut metrics) => metrics.bitcoin.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
}
}
}
}
Expand Down

0 comments on commit 0e1ee7c

Please sign in to comment.