Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric tap agent #161

Merged
merged 10 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ confy = "0.5.1"
dotenvy = "0.15.7"
ethereum-types = "0.14.1"
eventuals = "0.6.7"
log = "0.4.19"
prometheus = "0.13.3"
axum = "0.6.18"
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
futures-util = "0.3.28"
indexer-common = { version = "0.1.0", path = "../common" }
jsonrpsee = { version = "0.20.2", features = ["http-client", "macros"] }
lazy_static = "1.4.0"
Expand Down
16 changes: 16 additions & 0 deletions tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashSet;
use std::{collections::HashMap, str::FromStr};

use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::lazy_static;
use alloy_sol_types::Eip712Domain;
use anyhow::anyhow;
use anyhow::Result;
Expand All @@ -18,9 +19,20 @@ use thegraph::types::Address;
use tokio::select;
use tracing::{error, warn};

use prometheus::{register_counter_vec, CounterVec};

use super::sender_account::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
use crate::config;

lazy_static! {
static ref RECEIPTS_PER_SENDER_ALLOCATION: CounterVec = register_counter_vec!(
format!("aggregations_per_sender_x_allocation"),
"Unggregated Fees",
&["sender_allocation"]
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
)
.unwrap();
}

#[derive(Deserialize, Debug)]
pub struct NewReceiptNotification {
pub id: u64,
Expand Down Expand Up @@ -444,6 +456,7 @@ async fn new_receipts_watcher(
};

let allocation_id = &new_receipt_notification.allocation_id;
let sender_allocation_str = sender_address.to_string() + "-" + &allocation_id.to_string();

let actor_name = format!(
"{}{sender_address}:{allocation_id}",
Expand All @@ -469,6 +482,9 @@ async fn new_receipts_watcher(
allocation_id
);
}
RECEIPTS_PER_SENDER_ALLOCATION
.with_label_values(&[&sender_allocation_str])
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
.inc();
}
}

Expand Down
95 changes: 93 additions & 2 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use alloy_primitives::hex::ToHex;
use alloy_sol_types::Eip712Domain;
Expand All @@ -10,6 +13,10 @@ use bigdecimal::num_bigint::BigInt;
use eventuals::Eventual;
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
use prometheus::{
register_counter, register_counter_vec, register_gauge_vec, register_histogram_vec, Counter,
CounterVec, GaugeVec, HistogramVec,
};
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use sqlx::{types::BigDecimal, PgPool};
use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse;
Expand All @@ -24,6 +31,8 @@ use tap_core::{
use thegraph::types::Address;
use tracing::{error, warn};

use crate::lazy_static;

use crate::agent::sender_account::SenderAccountMessage;
use crate::agent::sender_accounts_manager::NewReceiptNotification;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
Expand All @@ -34,6 +43,55 @@ use crate::{
tap::{context::checks::AllocationId, escrow_adapter::EscrowAdapter},
};

lazy_static! {
static ref UNAGGREGATED_FEE_PER_SENDER_X_ALLOCATION: GaugeVec = register_gauge_vec!(
format!("unagregated_fee_per_sender_x_allocation"),
"Unggregated Fees",
&["sender_allocation"]
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
)
.unwrap();
}

lazy_static! {
static ref RAV_VALUE: GaugeVec =
register_gauge_vec!(format!("rav_value"), "RAV Updated value", &["allocation"]).unwrap();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's missing the sender label.

Suggested change
lazy_static! {
static ref RAV_VALUE: GaugeVec =
register_gauge_vec!(format!("rav_value"), "RAV Updated value", &["allocation"]).unwrap();
}
lazy_static! {
static ref RAV_VALUE: GaugeVec =
register_gauge_vec!(format!("rav_value"), "Value of the last RAV per sender-allocation", &["sender", "allocation"]).unwrap();
}


lazy_static! {
static ref CLOSED_ALLOCATIONS: Counter = register_counter!(
format!("amount_of_closed_allocations"),
"allocations closed",
)
.unwrap();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be more specific as to what this is counting:

Suggested change
lazy_static! {
static ref CLOSED_ALLOCATIONS: Counter = register_counter!(
format!("amount_of_closed_allocations"),
"allocations closed",
)
.unwrap();
}
lazy_static! {
static ref CLOSED_SENDER_ALLOCATIONS: Counter = register_counter!(
format!("closed_sender_allocation_total"),
"Total count of sender-allocation managers closed.",
)
.unwrap();
}


lazy_static! {
static ref RAVS_PER_SENDER_ALLOCATION: CounterVec = register_counter_vec!(
format!("ravs_created_per_sender_allocation"),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent with other labels. Sometimes you use sender_x_allocation and sometimes sender_allocation. Just pick either one and use it everywhere

"RAVs updated or created per sender allocation",
&["sender_allocation"]
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
)
.unwrap();
}

lazy_static! {
static ref RAVS_FAILED_PER_SENDER_ALLOCATION: CounterVec = register_counter_vec!(
format!("ravs_failed_per_sender_allocation"),
"RAVs failed when created or updated per sender allocation",
&["sender_allocation"]
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
)
.unwrap();
}

lazy_static! {
static ref RAV_RESPONSE_TIME_PER_SENDER: HistogramVec = register_histogram_vec!(
format!("rav_response_time_per_sender"),
"RAV response time per sender",
&["sender"]
)
.unwrap();
}

type TapManager = tap_core::manager::Manager<TapAgentContext>;

/// Manages unaggregated fees and the TAP lifecyle for a specific (allocation, sender) pair.
Expand Down Expand Up @@ -99,6 +157,11 @@ impl Actor for SenderAllocation {
allocation_id = %state.allocation_id,
"SenderAllocation created!",
);
let sender_allocation = state.sender.to_string() + "-" + &state.allocation_id.to_string();

UNAGGREGATED_FEE_PER_SENDER_X_ALLOCATION
.with_label_values(&[&sender_allocation])
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
.set(state.unaggregated_fees.value as f64);

Ok(state)
}
Expand All @@ -115,14 +178,17 @@ impl Actor for SenderAllocation {
allocation_id = %state.allocation_id,
"Closing SenderAllocation, triggering last rav",
);

let sender_allocation = state.sender.to_string() + "-" + &state.allocation_id.to_string();
// Request a RAV and mark the allocation as final.
if state.unaggregated_fees.value > 0 {
state.rav_requester_single().await.inspect_err(|e| {
error!(
"Error while requesting RAV for sender {} and allocation {}: {}",
state.sender, state.allocation_id, e
);
RAVS_FAILED_PER_SENDER_ALLOCATION
.with_label_values(&[&sender_allocation])
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
.inc();
})?;
}
state.mark_rav_last().await.inspect_err(|e| {
Expand All @@ -132,6 +198,13 @@ impl Actor for SenderAllocation {
);
})?;

//Since this is only triggered after allocation is closed will be counted here
CLOSED_ALLOCATIONS.inc();

RAVS_PER_SENDER_ALLOCATION
.with_label_values(&[&sender_allocation])
.inc();
gusinacio marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

Expand Down Expand Up @@ -179,6 +252,12 @@ impl Actor for SenderAllocation {
match state.rav_requester_single().await {
Ok(_) => {
state.unaggregated_fees = state.calculate_unaggregated_fee().await?;
let sender_allocation =
state.sender.to_string() + "-" + &state.allocation_id.to_string();

UNAGGREGATED_FEE_PER_SENDER_X_ALLOCATION
.with_label_values(&[&sender_allocation])
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
.set(state.unaggregated_fees.value as f64);
}
Err(e) => {
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
error! (
Expand Down Expand Up @@ -363,6 +442,7 @@ impl SenderAllocationState {
self.config.tap.rav_request_timeout_secs,
))
.build(&self.sender_aggregator_endpoint)?;
let rav_response_time_start = Instant::now();
let response: JsonRpcResponse<EIP712SignedMessage<ReceiptAggregateVoucher>> = client
.request(
"aggregate_receipts",
Expand All @@ -373,6 +453,12 @@ impl SenderAllocationState {
),
)
.await?;

let rav_response_time = rav_response_time_start.elapsed();
RAV_RESPONSE_TIME_PER_SENDER
.with_label_values(&[&self.sender.to_string()])
.observe(rav_response_time.as_secs_f64());

if let Some(warnings) = response.warnings {
warn!("Warnings from sender's TAP aggregator: {:?}", warnings);
}
Expand Down Expand Up @@ -408,6 +494,11 @@ impl SenderAllocationState {
anyhow::bail!("Error while verifying and storing RAV: {:?}", e);
}
}

RAV_VALUE
.with_label_values(&[&self.allocation_id.to_string()])
.set(expected_rav.clone().valueAggregate as f64);
//TODO: ADD RAV VALUE
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

Expand Down
8 changes: 8 additions & 0 deletions tap-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ pub struct EscrowSubgraph {
#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
#[group(required = true, multiple = true)]
pub struct Tap {
#[clap(
long,
value_name = "tap-metrics-port",
env = "TAP_METRICS_PORT",
default_value_t = 7303,
help = "Port to serve tap agent Prometheus metrics at"
)]
pub metrics_port: u16,
#[clap(
long,
value_name = "rav-request-trigger-value",
Expand Down
1 change: 1 addition & 0 deletions tap-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ pub mod agent;
pub mod aggregator_endpoints;
pub mod config;
pub mod database;
pub mod metrics;
pub mod tap;
5 changes: 4 additions & 1 deletion tap-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ractor::ActorStatus;
use tokio::signal::unix::{signal, SignalKind};
use tracing::{debug, error, info};

use indexer_tap_agent::{agent, CONFIG};
use indexer_tap_agent::{agent, metrics, CONFIG};

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -17,6 +17,9 @@ async fn main() -> Result<()> {
let (manager, handler) = agent::start_agent().await;
info!("TAP Agent started.");

tokio::spawn(metrics::run_server(CONFIG.tap.metrics_port));
info!("Metrics port opened");

// Have tokio wait for SIGTERM or SIGINT.
let mut signal_sigint = signal(SignalKind::interrupt())?;
let mut signal_sigterm = signal(SignalKind::terminate())?;
Expand Down
59 changes: 59 additions & 0 deletions tap-agent/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{net::SocketAddr, panic};

use axum::{http::StatusCode, response::IntoResponse, routing::get, Router, Server};
use futures_util::FutureExt;
use jsonrpsee::tracing::error;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I can't remember why I used jsonrpsee::tracing::error here https://github.com/semiotic-ai/timeline-aggregation-protocol/blob/c179dfedee1ed8078b358de3d35f4d051f74c9c1/tap_aggregator/src/metrics.rs#L8
But in any case, it should look less weird with tracing::error instead.

use log::{debug, info};
use prometheus::TextEncoder;

async fn handler_metrics() -> (StatusCode, String) {
let metric_families = prometheus::gather();
let encoder = TextEncoder::new();

match encoder.encode_to_string(&metric_families) {
Ok(s) => (StatusCode::OK, s),
Err(e) => {
error!("Error encoding metrics: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error encoding metrics: {}", e),
)
}
}
}

async fn handler_404() -> impl IntoResponse {
(StatusCode::NOT_FOUND, "404 Not Found")
}

async fn _run_server(port: u16) {
let app = Router::new()
.route("/metrics", get(handler_metrics))
.fallback(handler_404);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let server = Server::bind(&addr).serve(app.into_make_service());

info!("Metrics server listening on {}", addr);

let res = server.await;

debug!("Metrics server stopped");

if let Err(err) = res {
panic!("Metrics server error: {:#?}", err);
};
}

pub async fn run_server(port: u16) {
// Code here is to abort program if there is a panic in _run_server
// Otherwise, when spawning the task, the panic will be silently ignored
let res = panic::AssertUnwindSafe(_run_server(port))
.catch_unwind()
.await;
if res.is_err() {
std::process::abort();
}
}