diff --git a/Cargo.lock b/Cargo.lock
index d82ead2..7f25384 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3224,6 +3224,7 @@ dependencies = [
  "alloy-sol-types",
  "anyhow",
  "async-trait",
+ "axum 0.7.5",
  "bigdecimal 0.4.2",
  "clap",
  "confy",
@@ -3234,9 +3235,12 @@ dependencies = [
  "ethers-signers",
  "eventuals",
  "futures",
+ "futures-util",
  "indexer-common",
  "jsonrpsee 0.20.2",
  "lazy_static",
+ "log",
+ "prometheus",
  "ractor",
  "reqwest 0.12.3",
  "serde",
diff --git a/tap-agent/Cargo.toml b/tap-agent/Cargo.toml
index 80eb04e..2076f39 100644
--- a/tap-agent/Cargo.toml
+++ b/tap-agent/Cargo.toml
@@ -19,6 +19,10 @@ confy = "0.5.1"
 dotenvy = "0.15.7"
 ethereum-types = "0.14.1"
 eventuals = "0.6.7"
+log = "0.4.19"
+prometheus = "0.13.3"
+axum = "0.7.5"
+futures-util = "0.3.28"
 indexer-common = { version = "0.1.0", path = "../common" }
 jsonrpsee = { version = "0.20.2", features = ["http-client", "macros"] }
 lazy_static = "1.4.0"
diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs
index a4d75b0..19f83a4 100644
--- a/tap-agent/src/agent/sender_accounts_manager.rs
+++ b/tap-agent/src/agent/sender_accounts_manager.rs
@@ -5,6 +5,7 @@ use std::collections::HashSet;
 use std::{collections::HashMap, str::FromStr};
 
 use crate::agent::sender_allocation::SenderAllocationMessage;
+use crate::lazy_static;
 use alloy_sol_types::Eip712Domain;
 use anyhow::anyhow;
 use anyhow::Result;
@@ -18,9 +19,20 @@ use thegraph::types::Address;
 use tokio::select;
 use tracing::{error, warn};
 
+use prometheus::{register_counter_vec, CounterVec};
+
 use super::sender_account::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
 use crate::config;
 
+lazy_static! {
+    static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!(
+        "receipts_created",
+        "Receipts created per sender allocation",
+        &["sender", "allocation"]
+    )
+    .unwrap();
+}
+
 #[derive(Deserialize, Debug)]
 pub struct NewReceiptNotification {
     pub id: u64,
@@ -444,6 +456,7 @@ async fn new_receipts_watcher(
         };
 
         let allocation_id = &new_receipt_notification.allocation_id;
+        let allocation_str = allocation_id.to_string();
 
         let actor_name = format!(
             "{}{sender_address}:{allocation_id}",
@@ -469,6 +482,9 @@ async fn new_receipts_watcher(
                 allocation_id
             );
         }
+        RECEIPTS_CREATED
+            .with_label_values(&[&sender_address.to_string(), &allocation_str])
+            .inc();
     }
 }
 
diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs
index a1ebbe8..b19e8af 100644
--- a/tap-agent/src/agent/sender_allocation.rs
+++ b/tap-agent/src/agent/sender_allocation.rs
@@ -1,7 +1,10 @@
 // Copyright 2023-, GraphOps and Semiotic Labs.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{sync::Arc, time::Duration};
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use alloy_primitives::hex::ToHex;
 use alloy_sol_types::Eip712Domain;
@@ -10,6 +13,10 @@ use bigdecimal::num_bigint::BigInt;
 use eventuals::Eventual;
 use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
 use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
+use prometheus::{
+    register_counter, register_counter_vec, register_gauge_vec, register_histogram_vec, Counter,
+    CounterVec, GaugeVec, HistogramVec,
+};
 use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
 use sqlx::{types::BigDecimal, PgPool};
 use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse;
@@ -24,6 +31,8 @@ use tap_core::{
 use thegraph::types::Address;
 use tracing::{error, warn};
 
+use crate::lazy_static;
+
 use crate::agent::sender_account::SenderAccountMessage;
 use crate::agent::sender_accounts_manager::NewReceiptNotification;
 use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
@@ -34,6 +43,59 @@ use crate::{
     tap::{context::checks::AllocationId, escrow_adapter::EscrowAdapter},
 };
 
+lazy_static! {
+    static ref UNAGGREGATED_FEES: GaugeVec = register_gauge_vec!(
+        "unaggregated_fees",
+        "Unaggregated Fees",
+        &["sender", "allocation"]
+    )
+    .unwrap();
+}
+
+lazy_static! {
+    static ref RAV_VALUE: GaugeVec = register_gauge_vec!(
+        "rav_value",
+        "RAV Updated value",
+        &["sender", "allocation"]
+    )
+    .unwrap();
+}
+
+lazy_static! {
+    static ref CLOSED_SENDER_ALLOCATIONS: Counter = register_counter!(
+        "closed_sender_allocation_total",
+        "Total count of sender-allocation managers closed.",
+    )
+    .unwrap();
+}
+
+lazy_static! {
+    static ref RAVS_CREATED: CounterVec = register_counter_vec!(
+        "ravs_created",
+        "RAVs updated or created per sender allocation",
+        &["sender", "allocation"]
+    )
+    .unwrap();
+}
+
+lazy_static! {
+    static ref RAVS_FAILED: CounterVec = register_counter_vec!(
+        "ravs_failed",
+        "RAVs failed when created or updated per sender allocation",
+        &["sender", "allocation"]
+    )
+    .unwrap();
+}
+
+lazy_static! {
+    static ref RAV_RESPONSE_TIME: HistogramVec = register_histogram_vec!(
+        "rav_response_time",
+        "RAV response time per sender",
+        &["sender"]
+    )
+    .unwrap();
+}
+
 type TapManager = tap_core::manager::Manager;
 
 /// Manages unaggregated fees and the TAP lifecyle for a specific (allocation, sender) pair.
@@ -100,6 +162,10 @@ impl Actor for SenderAllocation {
             "SenderAllocation created!",
         );
 
+        UNAGGREGATED_FEES
+            .with_label_values(&[&state.sender.to_string(), &state.allocation_id.to_string()])
+            .set(state.unaggregated_fees.value as f64);
+
         Ok(state)
     }
 
@@ -115,7 +181,6 @@ impl Actor for SenderAllocation {
             allocation_id = %state.allocation_id,
             "Closing SenderAllocation, triggering last rav",
         );
-
         // Request a RAV and mark the allocation as final.
if state.unaggregated_fees.value > 0 { state.rav_requester_single().await.inspect_err(|e| { @@ -123,6 +188,12 @@ impl Actor for SenderAllocation { "Error while requesting RAV for sender {} and allocation {}: {}", state.sender, state.allocation_id, e ); + RAVS_FAILED + .with_label_values(&[ + &state.sender.to_string(), + &state.allocation_id.to_string(), + ]) + .inc(); })?; } state.mark_rav_last().await.inspect_err(|e| { @@ -132,6 +203,9 @@ impl Actor for SenderAllocation { ); })?; + //Since this is only triggered after allocation is closed will be counted here + CLOSED_SENDER_ALLOCATIONS.inc(); + Ok(()) } @@ -179,8 +253,22 @@ impl Actor for SenderAllocation { match state.rav_requester_single().await { Ok(_) => { state.unaggregated_fees = state.calculate_unaggregated_fee().await?; + + UNAGGREGATED_FEES + .with_label_values(&[ + &state.sender.to_string(), + &state.allocation_id.to_string(), + ]) + .set(state.unaggregated_fees.value as f64); } Err(e) => { + RAVS_FAILED + .with_label_values(&[ + &state.sender.to_string(), + &state.allocation_id.to_string(), + ]) + .inc(); + error! 
( %state.sender, %state.allocation_id, @@ -363,6 +451,7 @@ impl SenderAllocationState { self.config.tap.rav_request_timeout_secs, )) .build(&self.sender_aggregator_endpoint)?; + let rav_response_time_start = Instant::now(); let response: JsonRpcResponse> = client .request( "aggregate_receipts", @@ -373,6 +462,12 @@ impl SenderAllocationState { ), ) .await?; + + let rav_response_time = rav_response_time_start.elapsed(); + RAV_RESPONSE_TIME + .with_label_values(&[&self.sender.to_string()]) + .observe(rav_response_time.as_secs_f64()); + if let Some(warnings) = response.warnings { warn!("Warnings from sender's TAP aggregator: {:?}", warnings); } @@ -408,6 +503,14 @@ impl SenderAllocationState { anyhow::bail!("Error while verifying and storing RAV: {:?}", e); } } + + RAV_VALUE + .with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()]) + .set(expected_rav.clone().valueAggregate as f64); + RAVS_CREATED + .with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()]) + .inc(); + Ok(()) } diff --git a/tap-agent/src/config.rs b/tap-agent/src/config.rs index 1718ef1..773f1ca 100644 --- a/tap-agent/src/config.rs +++ b/tap-agent/src/config.rs @@ -224,6 +224,14 @@ pub struct EscrowSubgraph { #[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] #[group(required = true, multiple = true)] pub struct Tap { + #[clap( + long, + value_name = "tap-metrics-port", + env = "TAP_METRICS_PORT", + default_value_t = 7303, + help = "Port to serve tap agent Prometheus metrics at" + )] + pub metrics_port: u16, #[clap( long, value_name = "rav-request-trigger-value", diff --git a/tap-agent/src/lib.rs b/tap-agent/src/lib.rs index 3e82585..84b14a5 100644 --- a/tap-agent/src/lib.rs +++ b/tap-agent/src/lib.rs @@ -20,4 +20,5 @@ pub mod agent; pub mod aggregator_endpoints; pub mod config; pub mod database; +pub mod metrics; pub mod tap; diff --git a/tap-agent/src/main.rs b/tap-agent/src/main.rs index b53b13a..029ef2e 100644 --- a/tap-agent/src/main.rs 
+++ b/tap-agent/src/main.rs
@@ -6,7 +6,7 @@ use ractor::ActorStatus;
 use tokio::signal::unix::{signal, SignalKind};
 use tracing::{debug, error, info};
 
-use indexer_tap_agent::{agent, CONFIG};
+use indexer_tap_agent::{agent, metrics, CONFIG};
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -17,6 +17,9 @@ async fn main() -> Result<()> {
     let (manager, handler) = agent::start_agent().await;
     info!("TAP Agent started.");
 
+    tokio::spawn(metrics::run_server(CONFIG.tap.metrics_port));
+    info!("Metrics port opened");
+
     // Have tokio wait for SIGTERM or SIGINT.
     let mut signal_sigint = signal(SignalKind::interrupt())?;
     let mut signal_sigterm = signal(SignalKind::terminate())?;
diff --git a/tap-agent/src/metrics.rs b/tap-agent/src/metrics.rs
new file mode 100644
index 0000000..f3cbb02
--- /dev/null
+++ b/tap-agent/src/metrics.rs
@@ -0,0 +1,74 @@
+// Copyright 2023-, Semiotic AI, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{net::SocketAddr, panic};
+
+use axum::{http::StatusCode, response::IntoResponse, routing::get, Router};
+use futures_util::FutureExt;
+use prometheus::TextEncoder;
+use tracing::{debug, error, info};
+
+/// Serves `GET /metrics`: gathers the metrics returned by `prometheus::gather()`
+/// and renders them with the Prometheus `TextEncoder`.
+///
+/// Responds with `500 Internal Server Error` and the error text if encoding fails.
+async fn handler_metrics() -> (StatusCode, String) {
+    let metric_families = prometheus::gather();
+    let encoder = TextEncoder::new();
+
+    match encoder.encode_to_string(&metric_families) {
+        Ok(s) => (StatusCode::OK, s),
+        Err(e) => {
+            error!("Error encoding metrics: {}", e);
+            (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                format!("Error encoding metrics: {}", e),
+            )
+        }
+    }
+}
+
+/// Fallback handler: every route other than `/metrics` gets a plain-text 404.
+async fn handler_404() -> impl IntoResponse {
+    (StatusCode::NOT_FOUND, "404 Not Found")
+}
+
+/// Binds `0.0.0.0:<port>` and serves the metrics router until the server stops.
+///
+/// # Panics
+///
+/// Panics if the address cannot be bound or if the server returns an error.
+async fn _run_server(port: u16) {
+    let app = Router::new()
+        .route("/metrics", get(handler_metrics))
+        .fallback(handler_404);
+    let addr = SocketAddr::from(([0, 0, 0, 0], port));
+    let listener = tokio::net::TcpListener::bind(addr)
+        .await
+        .expect("Failed to bind metrics address");
+    let server = axum::serve(listener, app.into_make_service());
+
+    info!("Metrics server listening on {}", addr);
+
+    let res = server.await;
+
+    debug!("Metrics server stopped");
+
+    if let Err(err) = res {
+        panic!("Metrics server error: {:#?}", err);
+    };
+}
+
+/// Runs the Prometheus metrics HTTP server on `port`.
+///
+/// Meant to be spawned as a background task: a panic inside a spawned task would
+/// otherwise be silently ignored, so any panic from `_run_server` aborts the process.
+pub async fn run_server(port: u16) {
+    // Catch the panic here and escalate it to an abort instead of losing it.
+    let res = panic::AssertUnwindSafe(_run_server(port))
+        .catch_unwind()
+        .await;
+    if res.is_err() {
+        std::process::abort();
+    }
+}