diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index b6556c24..1b595864 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -216,7 +216,7 @@ impl Service { match event { ObserverEvent::PredicateRegistered(spec) => { // If start block specified, use it. - // I no start block specified, depending on the nature the hook, we'd like to retrieve: + // If no start block specified, depending on the nature the hook, we'd like to retrieve: // - contract-id if let PredicatesApi::On(ref config) = self.config.http_api { let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) diff --git a/components/chainhook-sdk/Cargo.toml b/components/chainhook-sdk/Cargo.toml index d1247e71..1d5aced6 100644 --- a/components/chainhook-sdk/Cargo.toml +++ b/components/chainhook-sdk/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = "1" +serde = {version = "1", features = ["rc"]} serde_json = { version = "1", features = ["arbitrary_precision"] } serde-hex = "0.1.0" serde_derive = "1" diff --git a/components/chainhook-sdk/src/observer/http.rs b/components/chainhook-sdk/src/observer/http.rs index 53bbff1d..5cce36e3 100644 --- a/components/chainhook-sdk/src/observer/http.rs +++ b/components/chainhook-sdk/src/observer/http.rs @@ -8,16 +8,19 @@ use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex, RwLock}; use super::{ - BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand, + BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand, ObserverMetrics, StacksChainMempoolEvent, }; #[rocket::get("/ping", format = "application/json")] -pub fn handle_ping(ctx: &State) -> Json { +pub fn handle_ping( + ctx: &State, + metrics_rw_lock: &State>>, +) -> Json { ctx.try_log(|logger| slog::info!(logger, "GET /ping")); Json(json!({ "status": 200, - "result": "Ok", + "result": metrics_rw_lock.inner(), })) } diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 4473e776..8293a819 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -46,7 +46,7 @@ use std::str; use std::str::FromStr; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[cfg(feature = "zeromq")] use zeromq::{Socket, SocketRecv}; @@ -359,6 +359,35 @@ impl ChainhookStore { } } +#[derive(Debug, Default, Serialize, Clone)] +pub struct ReorgMetrics { + timestamp: i64, + applied_blocks: usize, + rolled_back_blocks: usize, +} + +#[derive(Debug, Default, Serialize, Clone)] +pub struct ChainMetrics { + pub tip_height: u64, + pub last_reorg: Option, + pub last_block_ingestion_at: u128, + pub registered_predicates: usize, + pub deregistered_predicates: usize, +} + +impl ChainMetrics { + pub fn deregister_prediate(&mut self) { + self.registered_predicates -= 1; + self.deregistered_predicates += 1; + } +} + +#[derive(Debug, Default, Serialize, Clone)] +pub struct ObserverMetrics { + pub bitcoin: ChainMetrics, + pub stacks: ChainMetrics, +} + pub async fn start_event_observer( mut config: EventObserverConfig, observer_commands_tx: Sender, @@ -409,6 +438,18 @@ pub async fn start_event_observer( let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone())); + let observer_metrics = ObserverMetrics { + bitcoin: ChainMetrics { + registered_predicates: chainhook_store.predicates.bitcoin_chainhooks.len(), + ..Default::default() + }, + stacks: ChainMetrics { + registered_predicates: chainhook_store.predicates.stacks_chainhooks.len(), + ..Default::default() + }, + }; + let observer_metrics_rw_lock = Arc::new(RwLock::new(observer_metrics)); + let limits = Limits::default().limit("json", 20.megabytes()); let mut shutdown_config = config::Shutdown::default(); shutdown_config.ctrlc = false; @@ -451,6 +492,7 @@ pub async fn start_event_observer( .manage(background_job_tx_mutex) .manage(bitcoin_config) .manage(ctx_cloned) + .manage(observer_metrics_rw_lock.clone()) .mount("/", routes) .ignite() .await?; @@ -470,6 +512,7 @@ pub async fn start_event_observer( observer_commands_rx, observer_events_tx, ingestion_shutdown, + observer_metrics_rw_lock.clone(), ctx, ) .await @@ -653,6 +696,7 @@ pub async fn start_observer_commands_handler( observer_commands_rx: Receiver, observer_events_tx: Option>, ingestion_shutdown: Option, + observer_metrics: Arc>, ctx: Context, ) -> Result<(), Box> { let mut chainhooks_occurrences_tracker: HashMap = HashMap::new(); @@ -728,6 +772,21 @@ pub async fn start_observer_commands_handler( } }; }; + match observer_metrics.write() { + Ok(mut metrics) => { + if new_block.block_identifier.index > metrics.bitcoin.tip_height { + metrics.bitcoin.tip_height = new_block.block_identifier.index; + } + metrics.bitcoin.last_block_ingestion_at = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Could not get current time in ms") + .as_millis() + .into(); + } + Err(e) => ctx.try_log(|logger| { + slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) + }), + }; bitcoin_block_store.insert(new_block.block_identifier.clone(), new_block); } ObserverCommand::CacheBitcoinBlock(block) => { @@ -974,6 +1033,29 @@ pub async fn start_observer_commands_handler( } } + match blocks_to_apply + .iter() + .max_by_key(|b| b.block_identifier.index) + { + Some(highest_tip_block) => match observer_metrics.write() { + Ok(mut metrics) => { + metrics.bitcoin.last_reorg = Some(ReorgMetrics { + timestamp: highest_tip_block.timestamp.into(), + applied_blocks: blocks_to_apply.len(), + rolled_back_blocks: blocks_to_rollback.len(), + }); + } + Err(e) => ctx.try_log(|logger| { + slog::warn!( + logger, + "unable to acquire observer_metrics_rw_lock:{}", + e + ) + }), + }, + None => {} + } + BitcoinChainEvent::ChainUpdatedWithReorg(BitcoinChainUpdatedWithReorgData { blocks_to_apply, blocks_to_rollback, @@ -1108,6 +1190,17 @@ pub async fn start_observer_commands_handler( ChainhookSpecification::Bitcoin(chainhook), )); } + + match observer_metrics.write() { + Ok(mut metrics) => metrics.bitcoin.deregister_prediate(), + Err(e) => ctx.try_log(|logger| { + slog::warn!( + logger, + "unable to acquire observer_metrics_rw_lock:{}", + e + ) + }), + } } } @@ -1157,6 +1250,66 @@ pub async fn start_observer_commands_handler( stacks_chainhooks.len() ) }); + // track stacks chain metrics + match &chain_event { + StacksChainEvent::ChainUpdatedWithBlocks(update) => { + match update + .new_blocks + .iter() + .max_by_key(|b| b.block.block_identifier.index) + { + Some(highest_tip_update) => match observer_metrics.write() { + Ok(mut metrics) => { + if highest_tip_update.block.block_identifier.index + > metrics.stacks.tip_height + { + metrics.stacks.tip_height = + highest_tip_update.block.block_identifier.index; + } + metrics.stacks.last_block_ingestion_at = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Could not get current time in ms") + .as_millis() + .into(); + } + Err(e) => ctx.try_log(|logger| { + slog::warn!( + logger, + "unable to acquire observer_metrics_rw_lock:{}", + e + ) + }), + }, + None => {} + } + } + StacksChainEvent::ChainUpdatedWithReorg(update) => { + match update + .blocks_to_apply + .iter() + .max_by_key(|b| b.block.block_identifier.index) + { + Some(highest_tip_update) => match observer_metrics.write() { + Ok(mut metrics) => { + metrics.stacks.last_reorg = Some(ReorgMetrics { + timestamp: highest_tip_update.block.timestamp.into(), + applied_blocks: update.blocks_to_apply.len(), + rolled_back_blocks: update.blocks_to_rollback.len(), + }); + } + Err(e) => ctx.try_log(|logger| { + slog::warn!( + logger, + "unable to acquire observer_metrics_rw_lock:{}", + e + ) + }), + }, + None => {} + } + } + _ => {} + } // process hooks let (predicates_triggered, predicates_evaluated) = @@ -1241,6 +1394,17 @@ pub async fn start_observer_commands_handler( ChainhookSpecification::Stacks(chainhook), )); } + + match observer_metrics.write() { + Ok(mut metrics) => metrics.stacks.deregister_prediate(), + Err(e) => ctx.try_log(|logger| { + slog::warn!( + logger, + "unable to acquire observer_metrics_rw_lock:{}", + e + ) + }), + } } } @@ -1286,7 +1450,7 @@ pub async fn start_observer_commands_handler( .predicates .register_full_specification(networks, spec) { - Ok(uuid) => uuid, + Ok(spec) => spec, Err(e) => { ctx.try_log(|logger| { slog::error!( @@ -1300,11 +1464,25 @@ pub async fn start_observer_commands_handler( }; ctx.try_log(|logger| slog::info!(logger, "Registering chainhook {}", spec.uuid(),)); if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateRegistered(spec)); + let _ = tx.send(ObserverEvent::PredicateRegistered(spec.clone())); } else { ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid())); chainhook_store.predicates.enable_specification(&mut spec); } + + match observer_metrics.write() { + Ok(mut metrics) => match spec { + ChainhookSpecification::Bitcoin(_) => { + metrics.bitcoin.registered_predicates += 1 + } + ChainhookSpecification::Stacks(_) => { + metrics.stacks.registered_predicates += 1 + } + }, + Err(e) => ctx.try_log(|logger| { + slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) + }), + }; } ObserverCommand::EnablePredicate(mut spec) => { ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid())); @@ -1323,6 +1501,13 @@ pub async fn start_observer_commands_handler( ChainhookSpecification::Stacks(hook), )); } + + match observer_metrics.write() { + Ok(mut metrics) => metrics.stacks.deregister_prediate(), + Err(e) => ctx.try_log(|logger| { + slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) + }), + } } ObserverCommand::DeregisterBitcoinPredicate(hook_uuid) => { ctx.try_log(|logger| { @@ -1335,6 +1520,13 @@ pub async fn start_observer_commands_handler( let _ = tx.send(ObserverEvent::PredicateDeregistered( ChainhookSpecification::Bitcoin(hook), )); + + match observer_metrics.write() { + Ok(mut metrics) => metrics.bitcoin.deregister_prediate(), + Err(e) => ctx.try_log(|logger| { + slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e) + }), + } } } } diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index 8016270b..6f11f857 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -11,6 +11,7 @@ use crate::indexer::tests::helpers::{ }; use crate::observer::{ start_observer_commands_handler, ChainhookStore, EventObserverConfig, ObserverCommand, + ObserverMetrics, }; use crate::utils::{AbstractBlock, Context}; use chainhook_types::{ @@ -20,6 +21,7 @@ use chainhook_types::{ use hiro_system_kit; use std::collections::BTreeMap; use std::sync::mpsc::{channel, Sender}; +use std::sync::{Arc, RwLock}; use super::ObserverEvent; @@ -201,6 +203,8 @@ fn generate_and_register_new_bitcoin_chainhook( fn test_stacks_chainhook_register_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); + let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); + let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -210,6 +214,7 @@ fn test_stacks_chainhook_register_deregister() { observer_commands_rx, Some(observer_events_tx), None, + observer_metrics_rw_lock_moved, Context::empty(), )); }); @@ -223,6 +228,16 @@ fn test_stacks_chainhook_register_deregister() { "increment", ); + // registering stacks chainhook should increment the observer_metric's registered stacks hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .stacks + .registered_predicates + ); + // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_stacks_contract_call( 0, @@ -373,6 +388,25 @@ fn test_stacks_chainhook_register_deregister() { _ => false, }); + // deregistering stacks chainhook should decrement the observer_metric's registered stacks hooks + assert_eq!( + 0, + observer_metrics_rw_lock + .read() + .unwrap() + .stacks + .registered_predicates + ); + // and increment the deregistered hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .stacks + .deregistered_predicates + ); + // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_stacks_contract_call( 2, @@ -443,6 +477,8 @@ fn test_stacks_chainhook_register_deregister() { fn test_stacks_chainhook_auto_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); + let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); + let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -452,6 +488,7 @@ fn test_stacks_chainhook_auto_deregister() { observer_commands_rx, Some(observer_events_tx), None, + observer_metrics_rw_lock_moved, Context::empty(), )); }); @@ -479,6 +516,15 @@ fn test_stacks_chainhook_auto_deregister() { } _ => false, }); + // registering stacks chainhook should increment the observer_metric's registered stacks hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .stacks + .registered_predicates + ); // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_stacks_contract_call( @@ -591,6 +637,25 @@ fn test_stacks_chainhook_auto_deregister() { _ => false, }); + // deregistering stacks chainhook should decrement the observer_metric's registered stacks hooks + assert_eq!( + 0, + observer_metrics_rw_lock + .read() + .unwrap() + .stacks + .registered_predicates + ); + // and increment the deregistered hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .stacks + .deregistered_predicates + ); + // Should propagate block assert!(match observer_events_rx.recv() { Ok(ObserverEvent::StacksChainEvent(_)) => { @@ -614,6 +679,8 @@ fn test_stacks_chainhook_auto_deregister() { fn test_bitcoin_chainhook_register_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); + let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); + let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -623,6 +690,7 @@ fn test_bitcoin_chainhook_register_deregister() { observer_commands_rx, Some(observer_events_tx), None, + observer_metrics_rw_lock_moved, Context::empty(), )); }); @@ -636,6 +704,16 @@ fn test_bitcoin_chainhook_register_deregister() { None, ); + // registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .bitcoin + .registered_predicates + ); + // Simulate a block that does not include a trigger (wallet_1 to wallet_3) let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer( 0, @@ -785,6 +863,25 @@ fn test_bitcoin_chainhook_register_deregister() { _ => false, }); + // deregistering bitcoin chainhook should decrement the observer_metric's registered bitcoin hooks + assert_eq!( + 0, + observer_metrics_rw_lock + .read() + .unwrap() + .bitcoin + .registered_predicates + ); + // and increment the deregistered hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .bitcoin + .deregistered_predicates + ); + // Simulate a block that does not include a trigger let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer( 2, @@ -854,6 +951,8 @@ fn test_bitcoin_chainhook_register_deregister() { fn test_bitcoin_chainhook_auto_deregister() { let (observer_commands_tx, observer_commands_rx) = channel(); let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded(); + let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default())); + let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone(); let handle = std::thread::spawn(move || { let (config, chainhook_store) = generate_test_config(); @@ -863,6 +962,7 @@ fn test_bitcoin_chainhook_auto_deregister() { observer_commands_rx, Some(observer_events_tx), None, + observer_metrics_rw_lock_moved, Context::empty(), )); }); @@ -876,6 +976,16 @@ fn test_bitcoin_chainhook_auto_deregister() { Some(1), ); + // registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .bitcoin + .registered_predicates + ); + // Simulate a block that does not include a trigger (wallet_1 to wallet_3) let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer( 0, @@ -1012,6 +1122,25 @@ fn test_bitcoin_chainhook_auto_deregister() { _ => false, }); + // deregistering bitcoin chainhook should decrement the observer_metric's registered bitcoin hooks + assert_eq!( + 0, + observer_metrics_rw_lock + .read() + .unwrap() + .bitcoin + .registered_predicates + ); + // and increment the deregistered hooks + assert_eq!( + 1, + observer_metrics_rw_lock + .read() + .unwrap() + .bitcoin + .deregistered_predicates + ); + // Should propagate block assert!(match observer_events_rx.recv() { Ok(ObserverEvent::BitcoinChainEvent(_)) => {