From b34ef1dc139b0a0cc9bd6636ebf1b01a367709ea Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Fri, 3 May 2024 14:23:58 +0100 Subject: [PATCH] ipfs metrics --- graph/src/components/link_resolver/ipfs.rs | 51 +++++++++++++++++++--- graph/src/endpoint.rs | 2 + graph/src/ipfs_client.rs | 2 +- node/src/main.rs | 22 +++++++--- node/src/manager/commands/run.rs | 6 ++- runtime/test/src/common.rs | 2 + tests/src/fixture/mod.rs | 1 + 7 files changed, 72 insertions(+), 14 deletions(-) diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 627c9a95412..8808428cc66 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -1,6 +1,8 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; +use crate::data::value::Word; +use crate::endpoint::{EndpointMetrics, RequestLabels}; use crate::env::EnvVars; use crate::futures01::{stream::poll_fn, try_ready}; use crate::futures01::{Async, Poll}; @@ -106,10 +108,15 @@ pub struct IpfsResolver { timeout: Duration, retry: bool, env_vars: Arc, + endpoint_metrics: Arc, } impl IpfsResolver { - pub fn new(clients: Vec, env_vars: Arc) -> Self { + pub fn new( + clients: Vec, + env_vars: Arc, + endpoint_metrics: Arc, + ) -> Self { Self { clients: Arc::new(clients.into_iter().collect()), cache: Arc::new(Mutex::new(LruCache::with_capacity( @@ -118,6 +125,7 @@ impl IpfsResolver { timeout: env_vars.mappings.ipfs_timeout, retry: false, env_vars, + endpoint_metrics, } } } @@ -171,15 +179,38 @@ impl LinkResolverTrait for IpfsResolver { let req_path = path.clone(); let timeout = self.timeout; + let metrics = self.endpoint_metrics.cheap_clone(); let data = retry_policy(self.retry, "ipfs.cat", logger) .run(move || { let path = req_path.clone(); let client = client.clone(); + let metrics = metrics.cheap_clone(); async move { - Ok(client + let provider = client.base.to_string().into(); + let req_type = Word::from("cat"); + + match client .cat_all(&path, Some(timeout), max_file_size) - .await? - .to_vec()) + .await + .map(|b| b.to_vec()) + { + Ok(res) => { + metrics.success(&RequestLabels { + provider, + req_type, + conn_type: crate::endpoint::ConnectionType::Ipfs, + }); + Ok(res) + } + Err(err) => { + metrics.failure(&RequestLabels { + provider, + req_type, + conn_type: crate::endpoint::ConnectionType::Ipfs, + }); + return Err(err); + } + } } }) .await?; @@ -335,7 +366,11 @@ mod tests { let file: &[u8] = &[0u8; 201]; let client = IpfsClient::localhost(); - let resolver = super::IpfsResolver::new(vec![client.clone()], Arc::new(env_vars)); + let resolver = super::IpfsResolver::new( + vec![client.clone()], + Arc::new(env_vars), + Arc::new(EndpointMetrics::mock()), + ); let logger = Logger::root(slog::Discard, o!()); @@ -354,7 +389,11 @@ mod tests { async fn json_round_trip(text: &'static str, env_vars: EnvVars) -> Result, Error> { let client = IpfsClient::localhost(); - let resolver = super::IpfsResolver::new(vec![client.clone()], Arc::new(env_vars)); + let resolver = super::IpfsResolver::new( + vec![client.clone()], + Arc::new(env_vars), + Arc::new(EndpointMetrics::mock()), + ); let logger = Logger::root(slog::Discard, o!()); let link = client.add(text.as_bytes().into()).await.unwrap().hash; diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index bff6b0c53f9..a40dcbd5382 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -35,6 +35,7 @@ pub enum ConnectionType { Firehose, Substreams, Rpc, + Ipfs, } impl Into<&str> for &ConnectionType { @@ -43,6 +44,7 @@ impl Into<&str> for &ConnectionType { ConnectionType::Firehose => "firehose", ConnectionType::Substreams => "substreams", ConnectionType::Rpc => "rpc", + ConnectionType::Ipfs => "ipfs", } } } diff --git a/graph/src/ipfs_client.rs b/graph/src/ipfs_client.rs index 07221e6dd6e..1d1e3fca64d 100644 --- a/graph/src/ipfs_client.rs +++ b/graph/src/ipfs_client.rs @@ -117,7 +117,7 @@ pub struct AddResponse { /// Reference type, clones will share the connection pool. #[derive(Clone, CheapClone)] pub struct IpfsClient { - base: Arc, + pub base: Arc, // reqwest::Client doesn't need to be `Arc` because it has one internally // already. client: reqwest::Client, diff --git a/node/src/main.rs b/node/src/main.rs index 28a637ea4c1..aa492cbcf00 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -262,18 +262,28 @@ async fn main() { n => FileSizeLimit::MaxBytes(n as u64), }, ); - - // Convert the clients into a link resolver. Since we want to get past - // possible temporary DNS failures, make the resolver retry - let link_resolver = Arc::new(IpfsResolver::new(ipfs_clients, env_vars.cheap_clone())); - let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); + let providers: Vec<_> = config + .chains + .providers() + .into_iter() + .chain(ipfs_clients.iter().map(|c| c.base.to_string())) + .collect(); let endpoint_metrics = Arc::new(EndpointMetrics::new( logger.clone(), - &config.chains.providers(), + &providers, metrics_registry.cheap_clone(), )); + // Convert the clients into a link resolver. Since we want to get past + // possible temporary DNS failures, make the resolver retry + let link_resolver = Arc::new(IpfsResolver::new( + ipfs_clients, + env_vars.cheap_clone(), + endpoint_metrics.cheap_clone(), + )); + let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); + // Ethereum clients; query nodes ignore all ethereum clients and never // connect to them directly let eth_networks = if query_only { diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 639b5c0e3d9..6b483b87140 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -98,7 +98,11 @@ pub async fn run( // Convert the clients into a link resolver. Since we want to get past // possible temporary DNS failures, make the resolver retry - let link_resolver = Arc::new(IpfsResolver::new(ipfs_clients, env_vars.cheap_clone())); + let link_resolver = Arc::new(IpfsResolver::new( + ipfs_clients, + env_vars.cheap_clone(), + endpoint_metrics.cheap_clone(), + )); let eth_rpc_metrics = Arc::new(ProviderEthRpcMetrics::new(metrics_registry.clone())); let eth_networks = create_ethereum_networks_for_chain( diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 46a17f54f22..27d0c85dc77 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -3,6 +3,7 @@ use graph::blockchain::BlockTime; use graph::components::store::DeploymentLocator; use graph::data::subgraph::*; use graph::data_source; +use graph::endpoint::EndpointMetrics; use graph::env::EnvVars; use graph::ipfs_client::IpfsClient; use graph::log; @@ -71,6 +72,7 @@ fn mock_host_exports( Arc::new(graph::prelude::IpfsResolver::new( vec![IpfsClient::localhost()], Arc::new(EnvVars::default()), + Arc::new(EndpointMetrics::mock()), )), ens_lookup, ) diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index c24f688f0f7..547962d9a56 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -460,6 +460,7 @@ pub async fn setup( let link_resolver = Arc::new(IpfsResolver::new( vec![ipfs.cheap_clone()], Default::default(), + Arc::new(EndpointMetrics::mock()), )); let ipfs_service = ipfs_service( ipfs.cheap_clone(),