1 change: 1 addition & 0 deletions .env.example
@@ -10,6 +10,7 @@ TIPS_INGRESS_KAFKA_AUDIT_TOPIC=tips-audit
TIPS_INGRESS_LOG_LEVEL=info
TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800
TIPS_INGRESS_RPC_SIMULATION=http://localhost:8549
TIPS_INGRESS_METRICS_ADDR=0.0.0.0:9002

# Audit service configuration
TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties
79 changes: 79 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -63,3 +63,8 @@ backon = "1.5.2"
op-revm = { version = "12.0.0", default-features = false }
revm-context-interface = "10.2.0"
alloy-signer-local = "1.0.36"

# Misc
metrics = "0.24.1"
metrics-derive = "0.1"
metrics-exporter-prometheus = { version = "0.17.0", features = ["http-listener"]}
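
These three additions split the metrics responsibilities: `metrics` is the facade whose macros record values against whatever global recorder is installed, `metrics-derive` generates typed metric structs from annotated fields, and `metrics-exporter-prometheus` provides that recorder plus an HTTP listener serving the Prometheus text format. A minimal sketch of how the facade and exporter fit together (not part of this diff; the metric name and address are illustrative only):

use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;

fn main() {
    // Install the Prometheus recorder and serve /metrics on the given address.
    let addr: SocketAddr = "0.0.0.0:9002".parse().expect("valid socket address");
    PrometheusBuilder::new()
        .with_http_listener(addr)
        .install()
        .expect("failed to install Prometheus exporter");

    // Every `metrics` macro call now flows into the installed recorder.
    metrics::histogram!("example_duration_seconds").record(0.42);

    // Keep the process alive so the endpoint can be scraped.
    std::thread::sleep(std::time::Duration::from_secs(60));
}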
3 changes: 3 additions & 0 deletions crates/ingress-rpc/Cargo.toml
@@ -35,3 +35,6 @@ op-revm.workspace = true
revm-context-interface.workspace = true
alloy-signer-local.workspace = true
reth-optimism-evm.workspace = true
metrics.workspace = true
metrics-derive.workspace = true
metrics-exporter-prometheus.workspace = true
16 changes: 14 additions & 2 deletions crates/ingress-rpc/src/bin/main.rs
@@ -4,10 +4,11 @@ use jsonrpsee::server::Server;
use op_alloy_network::Optimism;
use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
use std::net::IpAddr;
use std::net::{IpAddr, SocketAddr};
use tips_audit::KafkaBundleEventPublisher;
use tips_core::kafka::load_kafka_config_from_file;
use tips_core::logger::init_logger;
use tips_ingress_rpc::metrics::init_prometheus_exporter;
use tips_ingress_rpc::queue::KafkaQueuePublisher;
use tips_ingress_rpc::service::{IngressApiServer, IngressService};
use tracing::info;
@@ -70,6 +71,14 @@ struct Config {
/// URL of the simulation RPC service for bundle metering
#[arg(long, env = "TIPS_INGRESS_RPC_SIMULATION")]
simulation_rpc: Url,

/// Address to bind the Prometheus metrics server to
#[arg(
long,
env = "TIPS_INGRESS_METRICS_ADDR",
default_value = "0.0.0.0:9002"
)]
metrics_addr: SocketAddr,
}

#[tokio::main]
@@ -80,12 +89,15 @@ async fn main() -> anyhow::Result<()> {

init_logger(&config.log_level);

init_prometheus_exporter(config.metrics_addr).expect("Failed to install Prometheus exporter");

info!(
message = "Starting ingress service",
address = %config.address,
port = config.port,
mempool_url = %config.mempool_url,
simulation_rpc = %config.simulation_rpc
simulation_rpc = %config.simulation_rpc,
metrics_address = %config.metrics_addr,
);

let provider: RootProvider<Optimism> = ProviderBuilder::new()
1 change: 1 addition & 0 deletions crates/ingress-rpc/src/lib.rs
@@ -1,3 +1,4 @@
pub mod metrics;
pub mod queue;
pub mod service;
pub mod validation;
40 changes: 40 additions & 0 deletions crates/ingress-rpc/src/metrics.rs
@@ -0,0 +1,40 @@
use metrics::Histogram;
use metrics_derive::Metrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;
use tokio::time::Duration;

/// `record_histogram` records the latency of an upstream RPC call, labelled by the `rpc` name.
pub fn record_histogram(rpc_latency: Duration, rpc: String) {
metrics::histogram!("tips_ingress_rpc_rpc_latency", "rpc" => rpc)
.record(rpc_latency.as_secs_f64());
}

/// Metrics for the `tips_ingress_rpc` component.
/// Conventions:
/// - Durations are recorded in seconds (histograms).
/// - Counters are monotonic event counts.
/// - Gauges reflect the current value/state.
#[derive(Metrics, Clone)]
#[metrics(scope = "tips_ingress_rpc")]
pub struct Metrics {
#[metric(describe = "Duration of validate_tx")]
pub validate_tx_duration: Histogram,

#[metric(describe = "Duration of validate_bundle")]
pub validate_bundle_duration: Histogram,

#[metric(describe = "Duration of meter_bundle")]
pub meter_bundle_duration: Histogram,

#[metric(describe = "Duration of send_raw_transaction")]
pub send_raw_transaction_duration: Histogram,
}

/// Initialize Prometheus metrics exporter
pub fn init_prometheus_exporter(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
PrometheusBuilder::new()
.with_http_listener(addr)
.install()
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
}
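
With `scope = "tips_ingress_rpc"`, the derived histograms are registered under names such as `tips_ingress_rpc_validate_tx_duration`, while `record_histogram` adds an `rpc` label to a shared `tips_ingress_rpc_rpc_latency` series. A hedged sketch of exercising this module without the HTTP listener, not part of this change and assuming `PrometheusBuilder::install_recorder` and `PrometheusHandle::render` from `metrics-exporter-prometheus` behave as documented:

use metrics_exporter_prometheus::PrometheusBuilder;
use tips_ingress_rpc::metrics::{Metrics, record_histogram};
use tokio::time::Duration;

fn main() {
    // Install only the recorder; render() returns the Prometheus text format directly.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install recorder");

    // Record one sample through the derived struct and one through the labelled helper.
    let metrics = Metrics::default();
    metrics.validate_tx_duration.record(0.012);
    record_histogram(Duration::from_millis(35), "base_meterBundle".to_string());

    // Both series should appear in the rendered Prometheus exposition text.
    let rendered = handle.render();
    assert!(rendered.contains("tips_ingress_rpc_validate_tx_duration"));
    assert!(rendered.contains("tips_ingress_rpc_rpc_latency"));
}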
18 changes: 18 additions & 0 deletions crates/ingress-rpc/src/service.rs
@@ -16,8 +16,10 @@ use tips_core::{
AcceptedBundle, BLOCK_TIME, Bundle, BundleExtensions, BundleHash, CancelBundle,
MeterBundleResponse,
};
use tokio::time::Instant;
use tracing::{info, warn};

use crate::metrics::{Metrics, record_histogram};
use crate::queue::QueuePublisher;
use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_bundle, validate_tx};

@@ -43,6 +45,7 @@ pub struct IngressService<Queue, Audit> {
bundle_queue: Queue,
audit_publisher: Audit,
send_transaction_default_lifetime_seconds: u64,
metrics: Metrics,
}

impl<Queue, Audit> IngressService<Queue, Audit> {
@@ -61,6 +64,7 @@ impl<Queue, Audit> IngressService<Queue, Audit> {
bundle_queue: queue,
audit_publisher,
send_transaction_default_lifetime_seconds,
metrics: Metrics::default(),
}
}
}
@@ -116,6 +120,7 @@
}

async fn send_raw_transaction(&self, data: Bytes) -> RpcResult<B256> {
let start = Instant::now();
let transaction = self.validate_tx(&data).await?;

let expiry_timestamp = SystemTime::now()
@@ -175,6 +180,9 @@
warn!(message = "Failed to publish audit event", bundle_id = %accepted_bundle.uuid(), error = %e);
}

self.metrics
.send_raw_transaction_duration
.record(start.elapsed().as_secs_f64());
Ok(transaction.tx_hash())
}
}
@@ -185,6 +193,7 @@
Audit: BundleEventPublisher + Sync + Send + 'static,
{
async fn validate_tx(&self, data: &Bytes) -> RpcResult<Recovered<OpTxEnvelope>> {
let start = Instant::now();
if data.is_empty() {
return Err(EthApiError::EmptyRawTransactionData.into_rpc_err());
}
@@ -204,10 +213,14 @@
.await?;
validate_tx(account, &transaction, data, &mut l1_block_info).await?;

self.metrics
.validate_tx_duration
.record(start.elapsed().as_secs_f64());
Ok(transaction)
}

async fn validate_bundle(&self, bundle: &Bundle) -> RpcResult<()> {
let start = Instant::now();
if bundle.txs.is_empty() {
return Err(
EthApiError::InvalidParams("Bundle cannot have empty transactions".into())
@@ -224,19 +237,24 @@
}
validate_bundle(bundle, total_gas, tx_hashes)?;

self.metrics
.validate_bundle_duration
.record(start.elapsed().as_secs_f64());
Ok(())
}

/// `meter_bundle` determines how long a bundle will take to execute. For a bundle that
/// executes within `BLOCK_TIME`, the resulting `MeterBundleResponse` can be passed along
/// to the builder.
async fn meter_bundle(&self, bundle: &Bundle) -> RpcResult<MeterBundleResponse> {
let start = Instant::now();
let res: MeterBundleResponse = self
.simulation_provider
.client()
.request("base_meterBundle", (bundle,))
.await
.map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;
record_histogram(start.elapsed(), "base_meterBundle".to_string());

// we can save some builder payload building computation by not including bundles
// that we know will take longer than the block time to execute
8 changes: 8 additions & 0 deletions crates/ingress-rpc/src/validation.rs
@@ -12,8 +12,11 @@ use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, SignError};
use std::collections::HashSet;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tips_core::Bundle;
use tokio::time::Instant;
use tracing::warn;

use crate::metrics::record_histogram;

const MAX_BUNDLE_GAS: u64 = 25_000_000;

/// Account info for a given address
@@ -33,10 +36,13 @@
#[async_trait]
impl AccountInfoLookup for RootProvider<Optimism> {
async fn fetch_account_info(&self, address: Address) -> RpcResult<AccountInfo> {
let start = Instant::now();
let account = self
.get_account(address)
.await
.map_err(|_| EthApiError::Signing(SignError::NoAccount))?;
record_histogram(start.elapsed(), "eth_getAccount".to_string());

Ok(AccountInfo {
balance: account.balance,
nonce: account.nonce,
@@ -55,6 +61,7 @@ pub trait L1BlockInfoLookup: Send + Sync {
#[async_trait]
impl L1BlockInfoLookup for RootProvider<Optimism> {
async fn fetch_l1_block_info(&self) -> RpcResult<L1BlockInfo> {
let start = Instant::now();
let block = self
.get_block(BlockId::Number(BlockNumberOrTag::Latest))
.full()
@@ -67,6 +74,7 @@ impl L1BlockInfoLookup for RootProvider<Optimism> {
warn!(message = "empty latest block returned");
EthApiError::InternalEthError.into_rpc_err()
})?;
record_histogram(start.elapsed(), "eth_getBlockByNumber".to_string());

let txs = block.transactions.clone();
let first_tx = txs.first_transaction().ok_or_else(|| {
1 change: 1 addition & 0 deletions docker-compose.tips.yml
@@ -8,6 +8,7 @@ services:
container_name: tips-ingress-rpc
ports:
- "8080:8080"
- "9002:9002"
env_file:
- .env.docker
volumes: