Skip to content

Commit

Permalink
ipfs metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed May 3, 2024
1 parent ec176b4 commit b34ef1d
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 14 deletions.
51 changes: 45 additions & 6 deletions graph/src/components/link_resolver/ipfs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::data::value::Word;
use crate::endpoint::{EndpointMetrics, RequestLabels};
use crate::env::EnvVars;
use crate::futures01::{stream::poll_fn, try_ready};
use crate::futures01::{Async, Poll};
Expand Down Expand Up @@ -106,10 +108,15 @@ pub struct IpfsResolver {
timeout: Duration,
retry: bool,
env_vars: Arc<EnvVars>,
endpoint_metrics: Arc<EndpointMetrics>,
}

impl IpfsResolver {
pub fn new(clients: Vec<IpfsClient>, env_vars: Arc<EnvVars>) -> Self {
pub fn new(
clients: Vec<IpfsClient>,
env_vars: Arc<EnvVars>,
endpoint_metrics: Arc<EndpointMetrics>,
) -> Self {
Self {
clients: Arc::new(clients.into_iter().collect()),
cache: Arc::new(Mutex::new(LruCache::with_capacity(
Expand All @@ -118,6 +125,7 @@ impl IpfsResolver {
timeout: env_vars.mappings.ipfs_timeout,
retry: false,
env_vars,
endpoint_metrics,
}
}
}
Expand Down Expand Up @@ -171,15 +179,38 @@ impl LinkResolverTrait for IpfsResolver {

let req_path = path.clone();
let timeout = self.timeout;
let metrics = self.endpoint_metrics.cheap_clone();
let data = retry_policy(self.retry, "ipfs.cat", logger)
.run(move || {
let path = req_path.clone();
let client = client.clone();
let metrics = metrics.cheap_clone();
async move {
Ok(client
let provider = client.base.to_string().into();
let req_type = Word::from("cat");

match client
.cat_all(&path, Some(timeout), max_file_size)
.await?
.to_vec())
.await
.map(|b| b.to_vec())
{
Ok(res) => {
metrics.success(&RequestLabels {
provider,
req_type,
conn_type: crate::endpoint::ConnectionType::Ipfs,
});
Ok(res)
}
Err(err) => {
metrics.failure(&RequestLabels {
provider,
req_type,
conn_type: crate::endpoint::ConnectionType::Ipfs,
});
return Err(err);
}
}
}
})
.await?;
Expand Down Expand Up @@ -335,7 +366,11 @@ mod tests {

let file: &[u8] = &[0u8; 201];
let client = IpfsClient::localhost();
let resolver = super::IpfsResolver::new(vec![client.clone()], Arc::new(env_vars));
let resolver = super::IpfsResolver::new(
vec![client.clone()],
Arc::new(env_vars),
Arc::new(EndpointMetrics::mock()),
);

let logger = Logger::root(slog::Discard, o!());

Expand All @@ -354,7 +389,11 @@ mod tests {

async fn json_round_trip(text: &'static str, env_vars: EnvVars) -> Result<Vec<Value>, Error> {
let client = IpfsClient::localhost();
let resolver = super::IpfsResolver::new(vec![client.clone()], Arc::new(env_vars));
let resolver = super::IpfsResolver::new(
vec![client.clone()],
Arc::new(env_vars),
Arc::new(EndpointMetrics::mock()),
);

let logger = Logger::root(slog::Discard, o!());
let link = client.add(text.as_bytes().into()).await.unwrap().hash;
Expand Down
2 changes: 2 additions & 0 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum ConnectionType {
Firehose,
Substreams,
Rpc,
Ipfs,
}

impl Into<&str> for &ConnectionType {
Expand All @@ -43,6 +44,7 @@ impl Into<&str> for &ConnectionType {
ConnectionType::Firehose => "firehose",
ConnectionType::Substreams => "substreams",
ConnectionType::Rpc => "rpc",
ConnectionType::Ipfs => "ipfs",
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion graph/src/ipfs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub struct AddResponse {
/// Reference type, clones will share the connection pool.
#[derive(Clone, CheapClone)]
pub struct IpfsClient {
base: Arc<Uri>,
pub base: Arc<Uri>,
// reqwest::Client doesn't need to be `Arc` because it has one internally
// already.
client: reqwest::Client,
Expand Down
22 changes: 16 additions & 6 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,28 @@ async fn main() {
n => FileSizeLimit::MaxBytes(n as u64),
},
);

// Convert the clients into a link resolver. Since we want to get past
// possible temporary DNS failures, make the resolver retry
let link_resolver = Arc::new(IpfsResolver::new(ipfs_clients, env_vars.cheap_clone()));
let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());
let providers: Vec<_> = config
.chains
.providers()
.into_iter()
.chain(ipfs_clients.iter().map(|c| c.base.to_string()))
.collect();

let endpoint_metrics = Arc::new(EndpointMetrics::new(
logger.clone(),
&config.chains.providers(),
&providers,
metrics_registry.cheap_clone(),
));

// Convert the clients into a link resolver. Since we want to get past
// possible temporary DNS failures, make the resolver retry
let link_resolver = Arc::new(IpfsResolver::new(
ipfs_clients,
env_vars.cheap_clone(),
endpoint_metrics.cheap_clone(),
));
let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());

// Ethereum clients; query nodes ignore all ethereum clients and never
// connect to them directly
let eth_networks = if query_only {
Expand Down
6 changes: 5 additions & 1 deletion node/src/manager/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ pub async fn run(

// Convert the clients into a link resolver. Since we want to get past
// possible temporary DNS failures, make the resolver retry
let link_resolver = Arc::new(IpfsResolver::new(ipfs_clients, env_vars.cheap_clone()));
let link_resolver = Arc::new(IpfsResolver::new(
ipfs_clients,
env_vars.cheap_clone(),
endpoint_metrics.cheap_clone(),
));

let eth_rpc_metrics = Arc::new(ProviderEthRpcMetrics::new(metrics_registry.clone()));
let eth_networks = create_ethereum_networks_for_chain(
Expand Down
2 changes: 2 additions & 0 deletions runtime/test/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use graph::blockchain::BlockTime;
use graph::components::store::DeploymentLocator;
use graph::data::subgraph::*;
use graph::data_source;
use graph::endpoint::EndpointMetrics;
use graph::env::EnvVars;
use graph::ipfs_client::IpfsClient;
use graph::log;
Expand Down Expand Up @@ -71,6 +72,7 @@ fn mock_host_exports(
Arc::new(graph::prelude::IpfsResolver::new(
vec![IpfsClient::localhost()],
Arc::new(EnvVars::default()),
Arc::new(EndpointMetrics::mock()),
)),
ens_lookup,
)
Expand Down
1 change: 1 addition & 0 deletions tests/src/fixture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ pub async fn setup<C: Blockchain>(
let link_resolver = Arc::new(IpfsResolver::new(
vec![ipfs.cheap_clone()],
Default::default(),
Arc::new(EndpointMetrics::mock()),
));
let ipfs_service = ipfs_service(
ipfs.cheap_clone(),
Expand Down

0 comments on commit b34ef1d

Please sign in to comment.