diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index 7cfb1153434..68a6f2371b9 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -1,10 +1,12 @@ use anyhow::{anyhow, Error}; use anyhow::{ensure, Context}; use graph::blockchain::{BlockPtr, TriggerWithHandler}; +use graph::components::link_resolver::LinkResolverContext; use graph::components::metrics::subgraph::SubgraphInstanceMetrics; use graph::components::store::{EthereumCallCache, StoredDynamicDataSource}; use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError}; use graph::components::trigger_processor::RunnableTriggers; +use graph::data::subgraph::DeploymentHash; use graph::data_source::common::{ AbiJson, CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedCallDecls, UnresolvedMappingABI, @@ -1198,6 +1200,7 @@ pub struct UnresolvedDataSource { impl blockchain::UnresolvedDataSource for UnresolvedDataSource { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -1212,7 +1215,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { context, } = self; - let mapping = mapping.resolve(resolver, logger, spec_version).await.with_context(|| { + let mapping = mapping.resolve(deployment_hash, resolver, logger, spec_version).await.with_context(|| { format!( "failed to resolve data source {} with source_address {:?} and source_start_block {}", name, source.address, source.start_block @@ -1246,6 +1249,7 @@ pub struct DataSourceTemplate { impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTemplate { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -1260,7 +1264,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem } = self; let mapping = mapping - .resolve(resolver, logger, spec_version) + .resolve(deployment_hash, resolver, logger, spec_version) .await .with_context(|| format!("failed to resolve data source template {}", name))?; @@ -1358,6 +1362,7 @@ impl FindMappingABI for Mapping { impl UnresolvedMapping { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, spec_version: &semver::Version, @@ -1380,12 +1385,18 @@ impl UnresolvedMapping { // resolve each abi abis.into_iter() .map(|unresolved_abi| async { - Result::<_, Error>::Ok(unresolved_abi.resolve(resolver, logger).await?) 
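[Editor's sketch, not part of the patch] A minimal illustration of the calling convention this change introduces and then repeats across every chain: `resolve` gains a `deployment_hash` parameter, and each bare `resolver.cat(logger, link)` call becomes a `cat` against a per-deployment `LinkResolverContext`. The helper name `fetch_module` and its wiring are assumptions for illustration; `LinkResolverContext::new` and `LinkResolver::cat` mirror the signatures added elsewhere in this diff.

use std::sync::Arc;

use anyhow::Error;
use graph::components::link_resolver::{LinkResolver, LinkResolverContext};
use graph::data::subgraph::DeploymentHash;
use graph::prelude::{Link, Logger};

// Hypothetical helper: the deployment hash now travels alongside the logger,
// so every IPFS fetch can be attributed to the subgraph that requested it.
async fn fetch_module(
    deployment_hash: &DeploymentHash,
    resolver: &Arc<dyn LinkResolver>,
    logger: &Logger,
    link: &Link,
) -> Result<Vec<u8>, Error> {
    let ctx = LinkResolverContext::new(deployment_hash, logger);
    resolver.cat(&ctx, link).await
}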
+ Result::<_, Error>::Ok( + unresolved_abi + .resolve(deployment_hash, resolver, logger) + .await?, + ) }) .collect::>() .try_collect::>(), async { - let module_bytes = resolver.cat(logger, &link).await?; + let module_bytes = resolver + .cat(&LinkResolverContext::new(deployment_hash, logger), &link) + .await?; Ok(Arc::new(module_bytes)) }, ) diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs index 46d84dc1e3c..6eac3e2d92d 100644 --- a/chain/near/src/data_source.rs +++ b/chain/near/src/data_source.rs @@ -1,8 +1,9 @@ use graph::anyhow::Context; use graph::blockchain::{Block, TriggerWithHandler}; +use graph::components::link_resolver::LinkResolverContext; use graph::components::store::StoredDynamicDataSource; use graph::components::subgraph::InstanceDSTemplateInfo; -use graph::data::subgraph::DataSourceContext; +use graph::data::subgraph::{DataSourceContext, DeploymentHash}; use graph::prelude::SubgraphManifestValidationError; use graph::{ anyhow::{anyhow, Error}, @@ -330,6 +331,7 @@ pub struct UnresolvedDataSource { impl blockchain::UnresolvedDataSource for UnresolvedDataSource { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, _manifest_idx: u32, @@ -344,7 +346,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { context, } = self; - let mapping = mapping.resolve(resolver, logger).await.with_context(|| { + let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| { format!( "failed to resolve data source {} with source_account {:?} and source_start_block {}", name, source.account, source.start_block @@ -370,6 +372,7 @@ pub type DataSourceTemplate = BaseDataSourceTemplate; impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTemplate { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, _manifest_idx: u32, @@ -383,7 +386,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem } = self; let mapping = mapping - .resolve(resolver, logger) + .resolve(deployment_hash, resolver, logger) .await .with_context(|| format!("failed to resolve data source template {}", name))?; @@ -434,6 +437,7 @@ pub struct UnresolvedMapping { impl UnresolvedMapping { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, ) -> Result { @@ -449,7 +453,7 @@ impl UnresolvedMapping { let api_version = semver::Version::parse(&api_version)?; let module_bytes = resolver - .cat(logger, &link) + .cat(&LinkResolverContext::new(deployment_hash, logger), &link) .await .with_context(|| format!("failed to resolve mapping {}", link.link))?; diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index 4895d76a8c6..a85f9a8d6cf 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error}; use graph::{ blockchain, cheap_clone::CheapClone, - components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo}, + components::{ + link_resolver::{LinkResolver, LinkResolverContext}, + subgraph::InstanceDSTemplateInfo, + }, + data::subgraph::DeploymentHash, prelude::{async_trait, BlockNumber, Link}, slog::Logger, }; @@ -184,12 +188,18 @@ pub struct UnresolvedMapping { impl blockchain::UnresolvedDataSource for UnresolvedDataSource { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, _manifest_idx: u32, _spec_version: &semver::Version, ) -> Result { - 
let content = resolver.cat(logger, &self.source.package.file).await?; + let content = resolver + .cat( + &LinkResolverContext::new(deployment_hash, logger), + &self.source.package.file, + ) + .await?; let mut package = graph::substreams::Package::decode(content.as_ref())?; @@ -235,7 +245,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { let handler = match (self.mapping.handler, self.mapping.file) { (Some(handler), Some(file)) => { let module_bytes = resolver - .cat(logger, &file) + .cat(&LinkResolverContext::new(deployment_hash, logger), &file) .await .with_context(|| format!("failed to resolve mapping {}", file.link))?; @@ -315,6 +325,7 @@ impl blockchain::DataSourceTemplate for NoopDataSourceTemplate { impl blockchain::UnresolvedDataSourceTemplate for NoopDataSourceTemplate { async fn resolve( self, + _deployment_hash: &DeploymentHash, _resolver: &Arc, _logger: &Logger, _manifest_idx: u32, @@ -331,8 +342,8 @@ mod test { use anyhow::Error; use graph::{ blockchain::{DataSource as _, UnresolvedDataSource as _}, - components::link_resolver::LinkResolver, - data::subgraph::{LATEST_VERSION, SPEC_VERSION_1_2_0}, + components::link_resolver::{LinkResolver, LinkResolverContext}, + data::subgraph::{DeploymentHash, LATEST_VERSION, SPEC_VERSION_1_2_0}, prelude::{async_trait, serde_yaml, JsonValueStream, Link}, slog::{o, Discard, Logger}, substreams::{ @@ -436,7 +447,13 @@ mod test { let link_resolver: Arc = Arc::new(NoopLinkResolver {}); let logger = Logger::root(Discard, o!()); let ds: DataSource = ds - .resolve(&link_resolver, &logger, 0, &SPEC_VERSION_1_2_0) + .resolve( + &DeploymentHash::default(), + &link_resolver, + &logger, + 0, + &SPEC_VERSION_1_2_0, + ) .await .unwrap(); let expected = DataSource { @@ -476,7 +493,13 @@ mod test { let link_resolver: Arc = Arc::new(NoopLinkResolver {}); let logger = Logger::root(Discard, o!()); let ds: DataSource = ds - .resolve(&link_resolver, &logger, 0, &SPEC_VERSION_1_2_0) + .resolve( + &DeploymentHash::default(), + &link_resolver, + &logger, + 0, + &SPEC_VERSION_1_2_0, + ) .await .unwrap(); let expected = DataSource { @@ -717,17 +740,21 @@ mod test { unimplemented!() } - async fn cat(&self, _logger: &Logger, _link: &Link) -> Result, Error> { + async fn cat(&self, _ctx: &LinkResolverContext, _link: &Link) -> Result, Error> { Ok(gen_package().encode_to_vec()) } - async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, Error> { + async fn get_block( + &self, + _ctx: &LinkResolverContext, + _link: &Link, + ) -> Result, Error> { unimplemented!() } async fn json_stream( &self, - _logger: &Logger, + _ctx: &LinkResolverContext, _link: &Link, ) -> Result { unimplemented!() diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs index 86a5feef0ab..b02578c0ed5 100644 --- a/core/src/polling_monitor/ipfs_service.rs +++ b/core/src/polling_monitor/ipfs_service.rs @@ -5,13 +5,17 @@ use anyhow::anyhow; use anyhow::Error; use bytes::Bytes; use graph::futures03::future::BoxFuture; -use graph::ipfs::ContentPath; -use graph::ipfs::IpfsClient; -use graph::ipfs::RetryPolicy; +use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy}; use graph::{derive::CheapClone, prelude::CheapClone}; use tower::{buffer::Buffer, ServiceBuilder, ServiceExt}; -pub type IpfsService = Buffer, Error>>>; +pub type IpfsService = Buffer, Error>>>; + +#[derive(Debug, Clone, CheapClone)] +pub struct IpfsRequest { + pub ctx: IpfsContext, + pub path: ContentPath, +} pub fn ipfs_service( client: Arc, @@ -43,7 +47,10 @@ 
struct IpfsServiceInner { } impl IpfsServiceInner { - async fn call_inner(self, path: ContentPath) -> Result, Error> { + async fn call_inner( + self, + IpfsRequest { ctx, path }: IpfsRequest, + ) -> Result, Error> { let multihash = path.cid().hash().code(); if !SAFE_MULTIHASHES.contains(&multihash) { return Err(anyhow!("CID multihash {} is not allowed", multihash)); @@ -52,6 +59,7 @@ impl IpfsServiceInner { let res = self .client .cat( + &ctx, &path, self.max_file_size, Some(self.timeout), @@ -99,8 +107,7 @@ mod test { use graph::components::link_resolver::ArweaveResolver; use graph::data::value::Word; use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; - use graph::ipfs::IpfsRpcClient; - use graph::ipfs::ServerAddress; + use graph::ipfs::{IpfsContext, IpfsMetrics, IpfsRpcClient, ServerAddress}; use graph::log::discard; use graph::tokio; use tower::ServiceExt; @@ -126,14 +133,24 @@ mod test { let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash; - let client = - IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard()) - .unwrap(); + let client = IpfsRpcClient::new_unchecked( + ServerAddress::local_rpc_api(), + IpfsMetrics::test(), + &graph::log::discard(), + ) + .unwrap(); let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10); let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap(); - let content = svc.oneshot(path).await.unwrap().unwrap(); + let content = svc + .oneshot(IpfsRequest { + ctx: IpfsContext::test(), + path, + }) + .await + .unwrap() + .unwrap(); assert_eq!(content.to_vec(), random_bytes); } @@ -157,7 +174,8 @@ mod test { const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; let server = MockServer::start().await; - let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); + let ipfs_client = + IpfsRpcClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()).unwrap(); let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1); let path = ContentPath::new(CID).unwrap(); @@ -179,6 +197,12 @@ mod test { .await; // This means that we never reached the successful response. - ipfs_service.oneshot(path).await.unwrap_err(); + ipfs_service + .oneshot(IpfsRequest { + ctx: IpfsContext::test(), + path, + }) + .await + .unwrap_err(); } } diff --git a/core/src/polling_monitor/mod.rs b/core/src/polling_monitor/mod.rs index ffa36f63b09..7bf4726e7c3 100644 --- a/core/src/polling_monitor/mod.rs +++ b/core/src/polling_monitor/mod.rs @@ -1,6 +1,7 @@ mod arweave_service; mod ipfs_service; mod metrics; +mod request; use std::collections::HashMap; use std::fmt::Display; @@ -24,9 +25,11 @@ use tower::retry::backoff::{Backoff, ExponentialBackoff, ExponentialBackoffMaker use tower::util::rng::HasherRng; use tower::{Service, ServiceExt}; +use self::request::RequestId; + pub use self::metrics::PollingMonitorMetrics; pub use arweave_service::{arweave_service, ArweaveService}; -pub use ipfs_service::{ipfs_service, IpfsService}; +pub use ipfs_service::{ipfs_service, IpfsRequest, IpfsService}; const MIN_BACKOFF: Duration = Duration::from_secs(5); @@ -97,15 +100,15 @@ impl Queue { /// /// The service returns the request ID along with errors or responses. The response is an /// `Option`, to represent the object not being found. 
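[Editor's sketch, not part of the patch] How the generalized monitor below is driven once full requests, rather than bare IDs, flow through it. The setup bindings (`ipfs_service`, `deployment_hash`, `logger`, `metrics`) and the `process_file` handler are assumptions; `spawn_monitor`, `IpfsRequest`, and `IpfsContext::new` follow the shapes introduced in this diff.

let (tx, mut rx) = mpsc::unbounded_channel::<(IpfsRequest, Bytes)>();
let monitor = spawn_monitor(ipfs_service, tx, logger.clone(), metrics);

// Each request carries its own per-deployment context for logs and metrics;
// backoff bookkeeping keys on `request_id()` (here, the `ContentPath`) alone.
monitor.monitor(IpfsRequest {
    ctx: IpfsContext::new(&deployment_hash, &logger),
    path: ContentPath::new("QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn")?,
});

// Responses echo the whole request, so the receiver can rebuild a trigger
// from `req.path` without a side table mapping IDs back to contexts.
while let Some((req, bytes)) = rx.recv().await {
    process_file(req.path, bytes); // `process_file` is hypothetical
}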
-pub fn spawn_monitor( +pub fn spawn_monitor( service: S, - response_sender: mpsc::UnboundedSender<(ID, Res)>, + response_sender: mpsc::UnboundedSender<(Req, Res)>, logger: Logger, metrics: Arc, -) -> PollingMonitor +) -> PollingMonitor where - S: Service, Error = E> + Send + 'static, - ID: Display + Clone + Default + Eq + Send + Sync + Hash + 'static, + S: Service, Error = E> + Send + 'static, + Req: RequestId + Clone + Send + Sync + 'static, E: Display + Send + 'static, S::Future: Send, { @@ -125,9 +128,9 @@ where break None; } - let id = queue.pop_front(); - match id { - Some(id) => break Some((id, ())), + let req = queue.pop_front(); + match req { + Some(req) => break Some((req, ())), // Nothing on the queue, wait for a queue wake up or cancellation. None => { @@ -154,36 +157,39 @@ where // the `CallAll` from being polled. This can cause starvation as those requests may // be holding on to resources such as slots for concurrent calls. match response { - Ok((id, Some(response))) => { - backoffs.remove(&id); - let send_result = response_sender.send((id, response)); + Ok((req, Some(response))) => { + backoffs.remove(req.request_id()); + let send_result = response_sender.send((req, response)); if send_result.is_err() { // The receiver has been dropped, cancel this task. break; } } - // Object not found, push the id to the back of the queue. - Ok((id, None)) => { - debug!(logger, "not found on polling"; "object_id" => id.to_string()); - + // Object not found, push the request to the back of the queue. + Ok((req, None)) => { + debug!(logger, "not found on polling"; "object_id" => req.request_id().to_string()); metrics.not_found.inc(); // We'll try again after a backoff. - backoff(id, &queue, &mut backoffs); + backoff(req, &queue, &mut backoffs); } - // Error polling, log it and push the id to the back of the queue. - Err((id, e)) => { - debug!(logger, "error polling"; - "error" => format!("{:#}", e), - "object_id" => id.to_string()); + // Error polling, log it and push the request to the back of the queue. + Err((Some(req), e)) => { + debug!(logger, "error polling"; "error" => format!("{:#}", e), "object_id" => req.request_id().to_string()); metrics.errors.inc(); // Requests that return errors could mean there is a permanent issue with // fetching the given item, or could signal the endpoint is overloaded. // Either way a backoff makes sense. - backoff(id, &queue, &mut backoffs); + backoff(req, &queue, &mut backoffs); + } + + // poll_ready call failure + Err((None, e)) => { + debug!(logger, "error polling"; "error" => format!("{:#}", e)); + metrics.errors.inc(); } } } @@ -193,28 +199,28 @@ where PollingMonitor { queue } } -fn backoff(id: ID, queue: &Arc>, backoffs: &mut Backoffs) +fn backoff(req: Req, queue: &Arc>, backoffs: &mut Backoffs) where - ID: Eq + Hash + Clone + Send + 'static, + Req: RequestId + Send + Sync + 'static, { let queue = queue.cheap_clone(); - let backoff = backoffs.next_backoff(id.clone()); + let backoff = backoffs.next_backoff(req.request_id().clone()); graph::spawn(async move { backoff.await; - queue.push_back(id); + queue.push_back(req); }); } /// Handle for adding objects to be monitored. -pub struct PollingMonitor { - queue: Arc>, +pub struct PollingMonitor { + queue: Arc>, } -impl PollingMonitor { - /// Add an object id to the polling queue. New requests have priority and are pushed to the +impl PollingMonitor { + /// Add a request to the polling queue. New requests have priority and are pushed to the /// front of the queue. 
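[Editor's sketch, not part of the patch] One step of reasoning behind the `ReturnRequest` change just below: a `poll_ready` failure happens before any request has been taken off the queue, so the old code had to conjure a placeholder via `Req::default()`. Widening the error type to `(Option<Req>, E)` drops that `Default` bound and lets the response loop tell the failure modes apart. A condensed sketch of the four cases, with the handler names (`on_response`, `on_not_found`, `on_call_error`, `on_service_down`) as hypothetical stand-ins for the loop's inline logic:

match response {
    // Found: forward the response together with its originating request.
    Ok((req, Some(resp))) => on_response(req, resp),
    // Not found: requeue `req` after a backoff keyed on its request ID.
    Ok((req, None)) => on_not_found(req),
    // `call` failed for a concrete request: also back off and retry it.
    Err((Some(req), err)) => on_call_error(req, err),
    // `poll_ready` failed: there is no request to attach, only the error.
    Err((None, err)) => on_service_down(err),
}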
- pub fn monitor(&self, id: ID) { - self.queue.push_front(id); + pub fn monitor(&self, req: Req) { + self.queue.push_front(req); } } @@ -225,17 +231,16 @@ struct ReturnRequest { impl Service for ReturnRequest where S: Service, - Req: Clone + Default + Send + Sync + 'static, + Req: Clone + Send + Sync + 'static, S::Error: Send, S::Future: Send + 'static, { type Response = (Req, S::Response); - type Error = (Req, S::Error); + type Error = (Option, S::Error); type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - // `Req::default` is a value that won't be used since if `poll_ready` errors, the service is shot anyways. - self.service.poll_ready(cx).map_err(|e| (Req::default(), e)) + self.service.poll_ready(cx).map_err(|e| (None, e)) } fn call(&mut self, req: Req) -> Self::Future { @@ -243,7 +248,7 @@ where self.service .call(req.clone()) .map_ok(move |x| (req, x)) - .map_err(move |e| (req1, e)) + .map_err(move |e| (Some(req1), e)) .boxed() } } diff --git a/core/src/polling_monitor/request.rs b/core/src/polling_monitor/request.rs new file mode 100644 index 00000000000..42375fb38fb --- /dev/null +++ b/core/src/polling_monitor/request.rs @@ -0,0 +1,39 @@ +use std::fmt::Display; +use std::hash::Hash; + +use graph::{data_source::offchain::Base64, ipfs::ContentPath}; + +use crate::polling_monitor::ipfs_service::IpfsRequest; + +/// Request ID is used to create backoffs on request failures. +pub trait RequestId { + type Id: Clone + Display + Eq + Hash + Send + Sync + 'static; + + /// Returns the ID of the request. + fn request_id(&self) -> &Self::Id; +} + +impl RequestId for IpfsRequest { + type Id = ContentPath; + + fn request_id(&self) -> &ContentPath { + &self.path + } +} + +impl RequestId for Base64 { + type Id = Base64; + + fn request_id(&self) -> &Base64 { + self + } +} + +#[cfg(debug_assertions)] +impl RequestId for &'static str { + type Id = &'static str; + + fn request_id(&self) -> &Self::Id { + self + } +} diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index 3f35d570a7d..78a3c1d83c3 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -1,7 +1,7 @@ mod instance; use crate::polling_monitor::{ - spawn_monitor, ArweaveService, IpfsService, PollingMonitor, PollingMonitorMetrics, + spawn_monitor, ArweaveService, IpfsRequest, IpfsService, PollingMonitor, PollingMonitorMetrics, }; use anyhow::{self, Error}; use bytes::Bytes; @@ -18,7 +18,7 @@ use graph::{ CausalityRegion, DataSource, DataSourceTemplate, }, derive::CheapClone, - ipfs::ContentPath, + ipfs::IpfsContext, prelude::{ BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash, MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics, @@ -31,7 +31,6 @@ use std::sync::{Arc, RwLock}; use std::{collections::HashMap, time::Instant}; use self::instance::SubgraphInstance; - use super::Decoder; #[derive(Clone, CheapClone, Debug)] @@ -224,10 +223,12 @@ impl> IndexingContext { } pub struct OffchainMonitor { - ipfs_monitor: PollingMonitor, - ipfs_monitor_rx: mpsc::UnboundedReceiver<(ContentPath, Bytes)>, + ipfs_monitor: PollingMonitor, + ipfs_monitor_rx: mpsc::UnboundedReceiver<(IpfsRequest, Bytes)>, arweave_monitor: PollingMonitor, arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>, + deployment_hash: DeploymentHash, + logger: Logger, } impl OffchainMonitor { @@ -251,18 +252,29 @@ impl OffchainMonitor { metrics.cheap_clone(), ); - let arweave_monitor = 
spawn_monitor(arweave_service, arweave_monitor_tx, logger, metrics); + let arweave_monitor = spawn_monitor( + arweave_service, + arweave_monitor_tx, + logger.cheap_clone(), + metrics, + ); + Self { ipfs_monitor, ipfs_monitor_rx, arweave_monitor, arweave_monitor_rx, + deployment_hash: subgraph_hash.to_owned(), + logger, } } fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> { match source { - offchain::Source::Ipfs(cid_file) => self.ipfs_monitor.monitor(cid_file), + offchain::Source::Ipfs(path) => self.ipfs_monitor.monitor(IpfsRequest { + ctx: IpfsContext::new(&self.deployment_hash, &self.logger), + path, + }), offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64), }; Ok(()) @@ -274,8 +286,8 @@ impl OffchainMonitor { let mut triggers = vec![]; loop { match self.ipfs_monitor_rx.try_recv() { - Ok((cid_file, data)) => triggers.push(offchain::TriggerData { - source: offchain::Source::Ipfs(cid_file), + Ok((req, data)) => triggers.push(offchain::TriggerData { + source: offchain::Source::Ipfs(req.path), data: Arc::new(data), }), Err(TryRecvError::Disconnected) => { diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index cdf3688c574..81c1a3ccd1a 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -11,6 +11,7 @@ use std::collections::BTreeSet; use crate::subgraph::runner::SubgraphRunner; use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper}; use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; +use graph::components::link_resolver::LinkResolverContext; use graph::components::metrics::gas::GasMetrics; use graph::components::metrics::subgraph::DeploymentStatusMetric; use graph::components::store::SourceableStore; @@ -83,7 +84,10 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< .map_err(SubgraphAssignmentProviderError::ResolveError)?; let file_bytes = link_resolver - .cat(&logger, &loc.hash.to_ipfs_link()) + .cat( + &LinkResolverContext::new(&loc.hash, &logger), + &loc.hash.to_ipfs_link(), + ) .await .map_err(SubgraphAssignmentProviderError::ResolveError)?; @@ -299,7 +303,10 @@ impl SubgraphInstanceManager { if self.subgraph_store.is_deployed(&graft.base)? 
{ let file_bytes = self .link_resolver - .cat(&logger, &graft.base.to_ipfs_link()) + .cat( + &LinkResolverContext::new(&deployment.hash, &logger), + &graft.base.to_ipfs_link(), + ) .await?; let yaml = String::from_utf8(file_bytes)?; @@ -315,7 +322,12 @@ impl SubgraphInstanceManager { ); let manifest = manifest - .resolve(&link_resolver, &logger, ENV_VARS.max_spec_version.clone()) + .resolve( + &deployment.hash, + &link_resolver, + &logger, + ENV_VARS.max_spec_version.clone(), + ) .await?; { diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7fd7bf7c0f2..87a7ebe4663 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use graph::blockchain::Blockchain; use graph::blockchain::BlockchainKind; use graph::blockchain::BlockchainMap; +use graph::components::link_resolver::LinkResolverContext; use graph::components::store::{DeploymentId, DeploymentLocator, SubscriptionManager}; use graph::components::subgraph::Settings; use graph::data::subgraph::schema::DeploymentCreate; @@ -296,15 +297,17 @@ where let raw = { let mut raw: serde_yaml::Mapping = { - let file_bytes = - resolver - .cat(&logger, &hash.to_ipfs_link()) - .await - .map_err(|e| { - SubgraphRegistrarError::ResolveError( - SubgraphManifestResolveError::ResolveError(e), - ) - })?; + let file_bytes = resolver + .cat( + &LinkResolverContext::new(&hash, &logger), + &hash.to_ipfs_link(), + ) + .await + .map_err(|e| { + SubgraphRegistrarError::ResolveError( + SubgraphManifestResolveError::ResolveError(e), + ) + })?; serde_yaml::from_slice(&file_bytes) .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? diff --git a/gnd/src/main.rs b/gnd/src/main.rs index 8837560fde2..4c34a59317e 100644 --- a/gnd/src/main.rs +++ b/gnd/src/main.rs @@ -162,7 +162,9 @@ async fn run_graph_node( ) -> Result<()> { let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); - let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) + let (prometheus_registry, metrics_registry) = launcher::setup_metrics(logger); + + let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &metrics_registry, &logger) .await .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); @@ -180,6 +182,8 @@ async fn run_graph_node( ipfs_service, link_resolver, Some(subgraph_updates_channel), + prometheus_registry, + metrics_registry, ) .await; Ok(()) diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 8ee6ddf6fec..8f0bc565e6c 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -9,7 +9,7 @@ use crate::{ }, subgraph::InstanceDSTemplateInfo, }, - data::subgraph::UnifiedMappingApiVersion, + data::subgraph::{DeploymentHash, UnifiedMappingApiVersion}, data_source, prelude::{ transaction_receipt::LightTransactionReceipt, BlockHash, ChainStore, @@ -190,6 +190,7 @@ pub struct MockUnresolvedDataSource; impl UnresolvedDataSource for MockUnresolvedDataSource { async fn resolve( self, + _deployment_hash: &DeploymentHash, _resolver: &Arc, _logger: &slog::Logger, _manifest_idx: u32, @@ -241,6 +242,7 @@ pub struct MockUnresolvedDataSourceTemplate; impl UnresolvedDataSourceTemplate for MockUnresolvedDataSourceTemplate { async fn resolve( self, + _deployment_hash: &DeploymentHash, _resolver: &Arc, _logger: &slog::Logger, _manifest_idx: u32, diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index eab4ef8ea7c..3bf54f64659 100644 --- a/graph/src/blockchain/mod.rs +++ 
b/graph/src/blockchain/mod.rs @@ -375,6 +375,7 @@ pub trait UnresolvedDataSourceTemplate: { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -405,6 +406,7 @@ pub trait UnresolvedDataSource: { async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, diff --git a/graph/src/components/link_resolver/file.rs b/graph/src/components/link_resolver/file.rs index 3d78bb9244d..f743efae1d2 100644 --- a/graph/src/components/link_resolver/file.rs +++ b/graph/src/components/link_resolver/file.rs @@ -4,8 +4,8 @@ use std::time::Duration; use anyhow::anyhow; use async_trait::async_trait; -use slog::Logger; +use crate::components::link_resolver::LinkResolverContext; use crate::data::subgraph::Link; use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait}; @@ -123,17 +123,17 @@ impl LinkResolverTrait for FileLinkResolver { Box::new(self.clone()) } - async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { + async fn cat(&self, ctx: &LinkResolverContext, link: &Link) -> Result, Error> { let link = remove_prefix(&link.link); let path = self.resolve_path(&link); - slog::debug!(logger, "File resolver: reading file"; + slog::debug!(ctx.logger, "File resolver: reading file"; "path" => path.to_string_lossy().to_string()); match tokio::fs::read(&path).await { Ok(data) => Ok(data), Err(e) => { - slog::error!(logger, "Failed to read file"; + slog::error!(ctx.logger, "Failed to read file"; "path" => path.to_string_lossy().to_string(), "error" => e.to_string()); Err(anyhow!("Failed to read file {}: {}", path.display(), e).into()) @@ -145,11 +145,15 @@ impl LinkResolverTrait for FileLinkResolver { Ok(Box::new(self.clone_for_manifest(manifest_path)?)) } - async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, Error> { + async fn get_block(&self, _ctx: &LinkResolverContext, _link: &Link) -> Result, Error> { Err(anyhow!("get_block is not implemented for FileLinkResolver").into()) } - async fn json_stream(&self, _logger: &Logger, _link: &Link) -> Result { + async fn json_stream( + &self, + _ctx: &LinkResolverContext, + _link: &Link, + ) -> Result { Err(anyhow!("json_stream is not implemented for FileLinkResolver").into()) } } @@ -177,20 +181,22 @@ mod tests { // Create a resolver without a base directory let resolver = FileLinkResolver::default(); - let logger = slog::Logger::root(slog::Discard, slog::o!()); // Test valid path resolution let link = Link { link: test_file_path.to_string_lossy().to_string(), }; - let result = resolver.cat(&logger, &link).await.unwrap(); + let result = resolver + .cat(&LinkResolverContext::test(), &link) + .await + .unwrap(); assert_eq!(result, test_content); // Test path with leading slash that likely doesn't exist let link = Link { link: "/test.txt".to_string(), }; - let result = resolver.cat(&logger, &link).await; + let result = resolver.cat(&LinkResolverContext::test(), &link).await; assert!( result.is_err(), "Reading /test.txt should fail as it doesn't exist" @@ -217,27 +223,32 @@ mod tests { // Create a resolver with a base directory let resolver = FileLinkResolver::with_base_dir(&temp_dir); - let logger = slog::Logger::root(slog::Discard, slog::o!()); // Test relative path (no leading slash) let link = Link { link: "test.txt".to_string(), }; - let result = resolver.cat(&logger, &link).await.unwrap(); + let result = resolver + .cat(&LinkResolverContext::test(), &link) + .await + .unwrap(); assert_eq!(result, 
test_content); // Test absolute path let link = Link { link: test_file_path.to_string_lossy().to_string(), }; - let result = resolver.cat(&logger, &link).await.unwrap(); + let result = resolver + .cat(&LinkResolverContext::test(), &link) + .await + .unwrap(); assert_eq!(result, test_content); // Test missing file let link = Link { link: "missing.txt".to_string(), }; - let result = resolver.cat(&logger, &link).await; + let result = resolver.cat(&LinkResolverContext::test(), &link).await; assert!(result.is_err()); // Clean up @@ -270,19 +281,24 @@ mod tests { // Create resolver with aliases let resolver = FileLinkResolver::new(Some(temp_dir.clone()), aliases); - let logger = slog::Logger::root(slog::Discard, slog::o!()); // Test resolving by aliases let link1 = Link { link: "alias1".to_string(), }; - let result1 = resolver.cat(&logger, &link1).await.unwrap(); + let result1 = resolver + .cat(&LinkResolverContext::test(), &link1) + .await + .unwrap(); assert_eq!(result1, test_content1); let link2 = Link { link: "alias2".to_string(), }; - let result2 = resolver.cat(&logger, &link2).await.unwrap(); + let result2 = resolver + .cat(&LinkResolverContext::test(), &link2) + .await + .unwrap(); assert_eq!(result2, test_content2); // Test that the alias works in for_deployment as well diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 8cf717130b2..bd609247458 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -17,10 +17,10 @@ use crate::futures01::stream::Stream; use crate::futures01::try_ready; use crate::futures01::Async; use crate::futures01::Poll; -use crate::ipfs::ContentPath; -use crate::ipfs::IpfsClient; -use crate::ipfs::RetryPolicy; -use crate::prelude::{LinkResolver as LinkResolverTrait, *}; +use crate::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy}; +use crate::prelude::*; + +use super::{LinkResolver, LinkResolverContext}; #[derive(Clone, CheapClone, Derivative)] #[derivative(Debug)] @@ -51,24 +51,29 @@ impl IpfsResolver { } #[async_trait] -impl LinkResolverTrait for IpfsResolver { - fn with_timeout(&self, timeout: Duration) -> Box { +impl LinkResolver for IpfsResolver { + fn with_timeout(&self, timeout: Duration) -> Box { let mut s = self.cheap_clone(); s.timeout = timeout; Box::new(s) } - fn with_retries(&self) -> Box { + fn with_retries(&self) -> Box { let mut s = self.cheap_clone(); s.retry = true; Box::new(s) } - fn for_manifest(&self, _manifest_path: &str) -> Result, Error> { + fn for_manifest(&self, _manifest_path: &str) -> Result, Error> { Ok(Box::new(self.cheap_clone())) } - async fn cat(&self, _logger: &Logger, link: &Link) -> Result, Error> { + async fn cat(&self, ctx: &LinkResolverContext, link: &Link) -> Result, Error> { + let LinkResolverContext { + deployment_hash, + logger, + } = ctx; + let path = ContentPath::new(&link.link)?; let timeout = self.timeout; let max_file_size = self.max_file_size; @@ -79,17 +84,26 @@ impl LinkResolverTrait for IpfsResolver { (Some(timeout), RetryPolicy::Networking) }; + let ctx = IpfsContext { + deployment_hash: deployment_hash.cheap_clone(), + logger: logger.cheap_clone(), + }; let data = self .client .clone() - .cat(&path, max_file_size, timeout, retry_policy) + .cat(&ctx, &path, max_file_size, timeout, retry_policy) .await? 
.to_vec(); Ok(data) } - async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error> { + async fn get_block(&self, ctx: &LinkResolverContext, link: &Link) -> Result, Error> { + let LinkResolverContext { + deployment_hash, + logger, + } = ctx; + let path = ContentPath::new(&link.link)?; let timeout = self.timeout; @@ -101,17 +115,30 @@ impl LinkResolverTrait for IpfsResolver { (Some(timeout), RetryPolicy::Networking) }; + let ctx = IpfsContext { + deployment_hash: deployment_hash.cheap_clone(), + logger: logger.cheap_clone(), + }; let data = self .client .clone() - .get_block(&path, timeout, retry_policy) + .get_block(&ctx, &path, timeout, retry_policy) .await? .to_vec(); Ok(data) } - async fn json_stream(&self, logger: &Logger, link: &Link) -> Result { + async fn json_stream( + &self, + ctx: &LinkResolverContext, + link: &Link, + ) -> Result { + let LinkResolverContext { + deployment_hash, + logger, + } = ctx; + let path = ContentPath::new(&link.link)?; let max_map_file_size = self.max_map_file_size; let timeout = self.timeout; @@ -124,10 +151,14 @@ impl LinkResolverTrait for IpfsResolver { (Some(timeout), RetryPolicy::Networking) }; + let ctx = IpfsContext { + deployment_hash: deployment_hash.cheap_clone(), + logger: logger.cheap_clone(), + }; let mut stream = self .client .clone() - .cat_stream(&path, timeout, retry_policy) + .cat_stream(&ctx, &path, timeout, retry_policy) .await? .fuse() .boxed() @@ -213,8 +244,7 @@ mod tests { use super::*; use crate::env::EnvVars; use crate::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; - use crate::ipfs::IpfsRpcClient; - use crate::ipfs::ServerAddress; + use crate::ipfs::{IpfsMetrics, IpfsRpcClient, ServerAddress}; #[tokio::test] async fn max_file_size() { @@ -231,12 +261,21 @@ mod tests { let logger = crate::log::discard(); - let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger).unwrap(); + let client = IpfsRpcClient::new_unchecked( + ServerAddress::local_rpc_api(), + IpfsMetrics::test(), + &logger, + ) + .unwrap(); let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars)); - let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() }) - .await - .unwrap_err(); + let err = IpfsResolver::cat( + &resolver, + &LinkResolverContext::test(), + &Link { link: cid.clone() }, + ) + .await + .unwrap_err(); assert_eq!( err.to_string(), @@ -250,10 +289,16 @@ mod tests { .to_owned(); let logger = crate::log::discard(); - let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?; + let client = IpfsRpcClient::new_unchecked( + ServerAddress::local_rpc_api(), + IpfsMetrics::test(), + &logger, + )?; let resolver = IpfsResolver::new(Arc::new(client), Arc::new(env_vars)); - let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?; + let stream = + IpfsResolver::json_stream(&resolver, &LinkResolverContext::test(), &Link { link: cid }) + .await?; stream.map_ok(|sv| sv.value).try_collect().await } diff --git a/graph/src/components/link_resolver/mod.rs b/graph/src/components/link_resolver/mod.rs index 4788a9bd51f..5ec9ecaea61 100644 --- a/graph/src/components/link_resolver/mod.rs +++ b/graph/src/components/link_resolver/mod.rs @@ -1,10 +1,13 @@ -use std::time::Duration; +use std::{fmt::Debug, sync::Arc, time::Duration}; use slog::Logger; -use crate::data::subgraph::Link; -use crate::prelude::Error; -use std::fmt::Debug; +use crate::{ + cheap_clone::CheapClone, + data::subgraph::{DeploymentHash, Link}, + derive::CheapClone, + 
prelude::Error, +}; mod arweave; mod file; @@ -25,10 +28,10 @@ pub trait LinkResolver: Send + Sync + 'static + Debug { fn with_retries(&self) -> Box; /// Fetches the link contents as bytes. - async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error>; + async fn cat(&self, ctx: &LinkResolverContext, link: &Link) -> Result, Error>; /// Fetches the IPLD block contents as bytes. - async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error>; + async fn get_block(&self, ctx: &LinkResolverContext, link: &Link) -> Result, Error>; /// Creates a new resolver scoped to a specific subgraph manifest. /// @@ -48,5 +51,32 @@ pub trait LinkResolver: Send + Sync + 'static + Debug { /// values. The values must each be on a single line; newlines are significant /// as they are used to split the file contents and each line is deserialized /// separately. - async fn json_stream(&self, logger: &Logger, link: &Link) -> Result; + async fn json_stream( + &self, + ctx: &LinkResolverContext, + link: &Link, + ) -> Result; +} + +#[derive(Debug, Clone, CheapClone)] +pub struct LinkResolverContext { + pub deployment_hash: Arc, + pub logger: Logger, +} + +impl LinkResolverContext { + pub fn new(deployment_hash: &DeploymentHash, logger: &Logger) -> Self { + Self { + deployment_hash: deployment_hash.as_str().into(), + logger: logger.cheap_clone(), + } + } + + #[cfg(debug_assertions)] + pub fn test() -> Self { + Self { + deployment_hash: "test".into(), + logger: crate::log::discard(), + } + } } diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 068b390bf64..25287a94e95 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -35,7 +35,7 @@ use crate::{ bail, blockchain::{BlockPtr, Blockchain}, components::{ - link_resolver::LinkResolver, + link_resolver::{LinkResolver, LinkResolverContext}, store::{StoreError, SubgraphStore}, }, data::{ @@ -475,13 +475,17 @@ pub struct UnresolvedSchema { impl UnresolvedSchema { pub async fn resolve( self, + deployment_hash: &DeploymentHash, spec_version: &Version, id: DeploymentHash, resolver: &Arc, logger: &Logger, ) -> Result { let schema_bytes = resolver - .cat(logger, &self.file) + .cat( + &LinkResolverContext::new(deployment_hash, logger), + &self.file, + ) .await .with_context(|| format!("failed to resolve schema {}", &self.file.link))?; InputSchema::parse(spec_version, &String::from_utf8(schema_bytes)?, id) @@ -891,9 +895,9 @@ impl SubgraphManifest { logger: &Logger, max_spec_version: semver::Version, ) -> Result { - let unresolved = UnresolvedSubgraphManifest::parse(id, raw)?; + let unresolved = UnresolvedSubgraphManifest::parse(id.cheap_clone(), raw)?; let resolved = unresolved - .resolve(resolver, logger, max_spec_version) + .resolve(&id, resolver, logger, max_spec_version) .await?; Ok(resolved) } @@ -1031,6 +1035,7 @@ impl UnresolvedSubgraphManifest { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, max_spec_version: semver::Version, @@ -1067,14 +1072,16 @@ impl UnresolvedSubgraphManifest { } let schema = schema - .resolve(&spec_version, id.clone(), resolver, logger) + .resolve(&id, &spec_version, id.clone(), resolver, logger) .await?; let (data_sources, templates) = try_join( data_sources .into_iter() .enumerate() - .map(|(idx, ds)| ds.resolve(resolver, logger, idx as u32, &spec_version)) + .map(|(idx, ds)| { + ds.resolve(deployment_hash, resolver, logger, idx as u32, &spec_version) + }) .collect::>() .try_collect::>(), templates @@ -1082,6 +1089,7 
@@ impl UnresolvedSubgraphManifest { .enumerate() .map(|(idx, template)| { template.resolve( + deployment_hash, resolver, &schema, logger, diff --git a/graph/src/data_source/common.rs b/graph/src/data_source/common.rs index 9f22f109860..344253cebdf 100644 --- a/graph/src/data_source/common.rs +++ b/graph/src/data_source/common.rs @@ -1,7 +1,12 @@ use crate::blockchain::block_stream::EntitySourceOperation; use crate::data::subgraph::SPEC_VERSION_1_4_0; use crate::prelude::{BlockPtr, Value}; -use crate::{components::link_resolver::LinkResolver, data::value::Word, prelude::Link}; +use crate::{ + components::link_resolver::{LinkResolver, LinkResolverContext}, + data::subgraph::DeploymentHash, + data::value::Word, + prelude::Link, +}; use anyhow::{anyhow, Context, Error}; use ethabi::{Address, Contract, Function, LogParam, ParamType, Token}; use graph_derive::CheapClone; @@ -337,15 +342,22 @@ pub struct UnresolvedMappingABI { impl UnresolvedMappingABI { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, ) -> Result<(MappingABI, AbiJson), anyhow::Error> { - let contract_bytes = resolver.cat(logger, &self.file).await.with_context(|| { - format!( - "failed to resolve ABI {} from {}", - self.name, self.file.link + let contract_bytes = resolver + .cat( + &LinkResolverContext::new(deployment_hash, logger), + &self.file, ) - })?; + .await + .with_context(|| { + format!( + "failed to resolve ABI {} from {}", + self.name, self.file.link + ) + })?; let contract = Contract::load(&*contract_bytes) .with_context(|| format!("failed to load ABI {}", self.name))?; diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index 51abe638b42..e7fc22228ea 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -3,6 +3,8 @@ pub mod common; pub mod offchain; pub mod subgraph; +use crate::data::subgraph::DeploymentHash; + pub use self::DataSource as DataSourceEnum; pub use causality_region::CausalityRegion; @@ -329,6 +331,7 @@ pub enum UnresolvedDataSource { impl UnresolvedDataSource { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -336,11 +339,23 @@ impl UnresolvedDataSource { ) -> Result, anyhow::Error> { match self { Self::Onchain(unresolved) => unresolved - .resolve(resolver, logger, manifest_idx, spec_version) + .resolve( + deployment_hash, + resolver, + logger, + manifest_idx, + spec_version, + ) .await .map(DataSource::Onchain), Self::Subgraph(unresolved) => unresolved - .resolve::(resolver, logger, manifest_idx, spec_version) + .resolve::( + deployment_hash, + resolver, + logger, + manifest_idx, + spec_version, + ) .await .map(DataSource::Subgraph), Self::Offchain(_unresolved) => { @@ -459,6 +474,7 @@ impl Default for UnresolvedDataSourceTemplate { impl UnresolvedDataSourceTemplate { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, schema: &InputSchema, logger: &Logger, @@ -467,15 +483,27 @@ impl UnresolvedDataSourceTemplate { ) -> Result, Error> { match self { Self::Onchain(ds) => ds - .resolve(resolver, logger, manifest_idx, spec_version) + .resolve( + deployment_hash, + resolver, + logger, + manifest_idx, + spec_version, + ) .await .map(|ti| DataSourceTemplate::Onchain(ti)), Self::Offchain(ds) => ds - .resolve(resolver, logger, manifest_idx, schema) + .resolve(deployment_hash, resolver, logger, manifest_idx, schema) .await .map(DataSourceTemplate::Offchain), Self::Subgraph(ds) => ds - .resolve(resolver, logger, 
manifest_idx, spec_version) + .resolve( + deployment_hash, + resolver, + logger, + manifest_idx, + spec_version, + ) .await .map(DataSourceTemplate::Subgraph), } diff --git a/graph/src/data_source/offchain.rs b/graph/src/data_source/offchain.rs index bde499fc2d1..70459a86692 100644 --- a/graph/src/data_source/offchain.rs +++ b/graph/src/data_source/offchain.rs @@ -2,11 +2,15 @@ use crate::{ bail, blockchain::{BlockPtr, BlockTime, Blockchain}, components::{ - link_resolver::LinkResolver, + link_resolver::{LinkResolver, LinkResolverContext}, store::{BlockNumber, StoredDynamicDataSource}, subgraph::{InstanceDSTemplate, InstanceDSTemplateInfo}, }, - data::{store::scalar::Bytes, subgraph::SPEC_VERSION_0_0_7, value::Word}, + data::{ + store::scalar::Bytes, + subgraph::{DeploymentHash, SPEC_VERSION_0_0_7}, + value::Word, + }, data_source, ipfs::ContentPath, prelude::{DataSourceContext, Link}, @@ -377,6 +381,7 @@ pub struct UnresolvedMapping { impl UnresolvedMapping { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, schema: &InputSchema, logger: &Logger, @@ -400,7 +405,14 @@ impl UnresolvedMapping { api_version: semver::Version::parse(&self.api_version)?, entities, handler: self.handler, - runtime: Arc::new(resolver.cat(logger, &self.file).await?), + runtime: Arc::new( + resolver + .cat( + &LinkResolverContext::new(deployment_hash, logger), + &self.file, + ) + .await?, + ), link: self.file, }) } @@ -446,6 +458,7 @@ impl Into for DataSourceTemplate { impl UnresolvedDataSourceTemplate { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -455,7 +468,7 @@ impl UnresolvedDataSourceTemplate { let mapping = self .mapping - .resolve(resolver, schema, logger) + .resolve(deployment_hash, resolver, schema, logger) .await .with_context(|| format!("failed to resolve data source template {}", self.name))?; diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index cd728aa6a43..9f20260c6de 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -1,6 +1,9 @@ use crate::{ blockchain::{block_stream::EntitySourceOperation, Block, Blockchain}, - components::{link_resolver::LinkResolver, store::BlockNumber}, + components::{ + link_resolver::{LinkResolver, LinkResolverContext}, + store::BlockNumber, + }, data::{ subgraph::{ calls_host_fn, SubgraphManifest, UnresolvedSubgraphManifest, LATEST_VERSION, @@ -281,13 +284,17 @@ impl UnresolvedDataSource { async fn resolve_source_manifest( &self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, ) -> Result>, Error> { let resolver: Arc = Arc::from(resolver.for_manifest(&self.source.address.to_string())?); let source_raw = resolver - .cat(logger, &self.source.address.to_ipfs_link()) + .cat( + &LinkResolverContext::new(deployment_hash, logger), + &self.source.address.to_ipfs_link(), + ) .await .context(format!( "Failed to resolve source subgraph [{}] manifest", @@ -302,16 +309,17 @@ impl UnresolvedDataSource { let deployment_hash = self.source.address.clone(); - let source_manifest = UnresolvedSubgraphManifest::::parse(deployment_hash, source_raw) - .context(format!( - "Failed to parse source subgraph [{}] manifest", - self.source.address - ))?; + let source_manifest = + UnresolvedSubgraphManifest::::parse(deployment_hash.cheap_clone(), source_raw) + .context(format!( + "Failed to parse source subgraph [{}] manifest", + self.source.address + ))?; let resolver: Arc = 
Arc::from(resolver.for_manifest(&self.source.address.to_string())?); source_manifest - .resolve(&resolver, logger, LATEST_VERSION.clone()) + .resolve(&deployment_hash, &resolver, logger, LATEST_VERSION.clone()) .await .context(format!( "Failed to resolve source subgraph [{}] manifest", @@ -343,7 +351,10 @@ impl UnresolvedDataSource { // If there's a graft, recursively verify it if let Some(graft) = &manifest.graft { let graft_raw = resolver - .cat(logger, &graft.base.to_ipfs_link()) + .cat( + &LinkResolverContext::new(&manifest.id, logger), + &graft.base.to_ipfs_link(), + ) .await .context("Failed to resolve graft base manifest")?; @@ -353,7 +364,7 @@ impl UnresolvedDataSource { let graft_manifest = UnresolvedSubgraphManifest::::parse(graft.base.clone(), graft_raw) .context("Failed to parse graft base manifest")? - .resolve(resolver, logger, LATEST_VERSION.clone()) + .resolve(&manifest.id, resolver, logger, LATEST_VERSION.clone()) .await .context("Failed to resolve graft base manifest")?; @@ -371,6 +382,7 @@ impl UnresolvedDataSource { pub(super) async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -383,7 +395,9 @@ impl UnresolvedDataSource { ); let kind = self.kind.clone(); - let source_manifest = self.resolve_source_manifest::(resolver, logger).await?; + let source_manifest = self + .resolve_source_manifest::(deployment_hash, resolver, logger) + .await?; let source_spec_version = &source_manifest.spec_version; if source_spec_version < &SPEC_VERSION_1_3_0 { return Err(anyhow!( @@ -435,7 +449,10 @@ impl UnresolvedDataSource { name: self.name, network: self.network, source, - mapping: self.mapping.resolve(resolver, logger, spec_version).await?, + mapping: self + .mapping + .resolve(deployment_hash, resolver, logger, spec_version) + .await?, context: Arc::new(self.context), creation_block: None, }) @@ -445,6 +462,7 @@ impl UnresolvedDataSource { impl UnresolvedMapping { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, spec_version: &semver::Version, @@ -459,7 +477,9 @@ impl UnresolvedMapping { let resolver = Arc::clone(resolver); let logger = logger.clone(); async move { - let resolved_abi = unresolved_abi.resolve(&resolver, &logger).await?; + let resolved_abi = unresolved_abi + .resolve(deployment_hash, &resolver, &logger) + .await?; Ok::<_, Error>(resolved_abi) } }) @@ -510,7 +530,14 @@ impl UnresolvedMapping { entities: self.entities, handlers: resolved_handlers, abis: mapping_abis, - runtime: Arc::new(resolver.cat(logger, &self.file).await?), + runtime: Arc::new( + resolver + .cat( + &LinkResolverContext::new(deployment_hash, logger), + &self.file, + ) + .await?, + ), link: self.file, }) } @@ -556,6 +583,7 @@ impl Into for DataSourceTemplate { impl UnresolvedDataSourceTemplate { pub async fn resolve( self, + deployment_hash: &DeploymentHash, resolver: &Arc, logger: &Logger, manifest_idx: u32, @@ -565,7 +593,7 @@ impl UnresolvedDataSourceTemplate { let mapping = self .mapping - .resolve(resolver, logger, spec_version) + .resolve(deployment_hash, resolver, logger, spec_version) .await .with_context(|| format!("failed to resolve data source template {}", self.name))?; diff --git a/graph/src/ipfs/cache.rs b/graph/src/ipfs/cache.rs index 4c15e2cbc3d..e0e256a7c22 100644 --- a/graph/src/ipfs/cache.rs +++ b/graph/src/ipfs/cache.rs @@ -20,7 +20,8 @@ use tokio::sync::Mutex as AsyncMutex; use crate::{env::ENV_VARS, prelude::CheapClone}; use super::{ - ContentPath, IpfsClient, IpfsError, 
IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy, + ContentPath, IpfsClient, IpfsContext, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, + IpfsResult, RetryPolicy, }; struct RedisClient { @@ -217,39 +218,38 @@ pub struct CachingClient { } impl CachingClient { - pub async fn new(client: Arc) -> IpfsResult { + pub async fn new(client: Arc, logger: &Logger) -> IpfsResult { let env = &ENV_VARS.mappings; let cache = Cache::new( - client.logger(), + logger, env.max_ipfs_cache_size as usize, env.max_ipfs_cache_file_size, env.ipfs_cache_location.clone(), ) .await?; + Ok(CachingClient { client, cache }) } - async fn with_cache(&self, path: &ContentPath, f: F) -> IpfsResult + async fn with_cache(&self, logger: Logger, path: &ContentPath, f: F) -> IpfsResult where F: AsyncFnOnce() -> IpfsResult, { - if let Some(data) = self.cache.find(self.logger(), path).await { + if let Some(data) = self.cache.find(&logger, path).await { return Ok(data); } let data = f().await?; - self.cache - .insert(self.logger(), path.clone(), data.clone()) - .await; + self.cache.insert(&logger, path.clone(), data.clone()).await; Ok(data) } } #[async_trait] impl IpfsClient for CachingClient { - fn logger(&self) -> &Logger { - self.client.logger() + fn metrics(&self) -> &IpfsMetrics { + self.client.metrics() } async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { @@ -258,16 +258,17 @@ impl IpfsClient for CachingClient { async fn cat( self: Arc, + ctx: &IpfsContext, path: &ContentPath, max_size: usize, timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { - self.with_cache(path, async || { + self.with_cache(ctx.logger(path), path, async || { { self.client .cheap_clone() - .cat(path, max_size, timeout, retry_policy) + .cat(ctx, path, max_size, timeout, retry_policy) .await } }) @@ -276,14 +277,15 @@ impl IpfsClient for CachingClient { async fn get_block( self: Arc, + ctx: &IpfsContext, path: &ContentPath, timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { - self.with_cache(path, async || { + self.with_cache(ctx.logger(path), path, async || { self.client .cheap_clone() - .get_block(path, timeout, retry_policy) + .get_block(ctx, path, timeout, retry_policy) .await }) .await diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs index 65b5a74a9f6..06bf7aee99c 100644 --- a/graph/src/ipfs/client.rs +++ b/graph/src/ipfs/client.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use bytes::Bytes; @@ -10,16 +10,16 @@ use futures03::StreamExt; use futures03::TryStreamExt; use slog::Logger; -use crate::ipfs::ContentPath; -use crate::ipfs::IpfsError; -use crate::ipfs::IpfsResult; -use crate::ipfs::RetryPolicy; +use crate::cheap_clone::CheapClone as _; +use crate::data::subgraph::DeploymentHash; +use crate::derive::CheapClone; +use crate::ipfs::{ContentPath, IpfsError, IpfsMetrics, IpfsResult, RetryPolicy}; /// A read-only connection to an IPFS server. #[async_trait] pub trait IpfsClient: Send + Sync + 'static { - /// Returns the logger associated with the client. - fn logger(&self) -> &Logger; + /// Returns the metrics associated with the IPFS client. + fn metrics(&self) -> &IpfsMetrics; /// Sends a request to the IPFS server and returns a raw response. async fn call(self: Arc, req: IpfsRequest) -> IpfsResult; @@ -32,21 +32,32 @@ pub trait IpfsClient: Send + Sync + 'static { /// The timeout is not propagated to the resulting stream. 
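[Editor's sketch, not part of the patch] A call site for the reworked `IpfsClient` trait shown below: every fetch now supplies an `IpfsContext`, which the default methods use to label retries, logs, and per-deployment metrics. The surrounding bindings (`client: Arc<dyn IpfsClient>`, `deployment_hash`, `logger`, `max_file_size`, `timeout`) are assumed to be in scope.

let ctx = IpfsContext::new(&deployment_hash, &logger);
let path = ContentPath::new("QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn")?;

// The context rides along instead of a bare logger; the retry wrapper and
// `run_with_metrics` pick the deployment label out of it on every attempt.
let bytes = client
    .cheap_clone()
    .cat(&ctx, &path, max_file_size, Some(timeout), RetryPolicy::Networking)
    .await?;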
async fn cat_stream( self: Arc, + ctx: &IpfsContext, path: &ContentPath, timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult>> { let fut = retry_policy - .create(format!("IPFS.cat_stream[{}]", path), self.logger()) + .create("IPFS.cat_stream", &ctx.logger(path)) .no_timeout() .run({ - let path = path.to_owned(); + let path = path.cheap_clone(); + let deployment_hash = ctx.deployment_hash(); move || { - let path = path.clone(); - let client = self.clone(); + let client = self.cheap_clone(); + let metrics = self.metrics().cheap_clone(); + let deployment_hash = deployment_hash.cheap_clone(); + let path = path.cheap_clone(); - async move { client.call(IpfsRequest::Cat(path)).await } + async move { + run_with_metrics( + client.call(IpfsRequest::Cat(path)), + deployment_hash, + metrics, + ) + .await + } } }); @@ -61,27 +72,34 @@ pub trait IpfsClient: Send + Sync + 'static { /// does not return a response within the specified amount of time. async fn cat( self: Arc, + ctx: &IpfsContext, path: &ContentPath, max_size: usize, timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { let fut = retry_policy - .create(format!("IPFS.cat[{}]", path), self.logger()) + .create("IPFS.cat", &ctx.logger(path)) .no_timeout() .run({ - let path = path.to_owned(); + let path = path.cheap_clone(); + let deployment_hash = ctx.deployment_hash(); move || { - let path = path.clone(); - let client = self.clone(); + let client = self.cheap_clone(); + let metrics = self.metrics().cheap_clone(); + let deployment_hash = deployment_hash.cheap_clone(); + let path = path.cheap_clone(); async move { - client - .call(IpfsRequest::Cat(path)) - .await? - .bytes(Some(max_size)) - .await + run_with_metrics( + client.call(IpfsRequest::Cat(path)), + deployment_hash, + metrics, + ) + .await? + .bytes(Some(max_size)) + .await } } }); @@ -95,26 +113,33 @@ pub trait IpfsClient: Send + Sync + 'static { /// does not return a response within the specified amount of time. async fn get_block( self: Arc, + ctx: &IpfsContext, path: &ContentPath, timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { let fut = retry_policy - .create(format!("IPFS.get_block[{}]", path), self.logger()) + .create("IPFS.get_block", &ctx.logger(path)) .no_timeout() .run({ - let path = path.to_owned(); + let path = path.cheap_clone(); + let deployment_hash = ctx.deployment_hash(); move || { - let path = path.clone(); - let client = self.clone(); + let client = self.cheap_clone(); + let metrics = self.metrics().cheap_clone(); + let deployment_hash = deployment_hash.cheap_clone(); + let path = path.cheap_clone(); async move { - client - .call(IpfsRequest::GetBlock(path)) - .await? - .bytes(None) - .await + run_with_metrics( + client.call(IpfsRequest::GetBlock(path)), + deployment_hash, + metrics, + ) + .await? 
+ .bytes(None) + .await } } }); @@ -123,6 +148,39 @@ pub trait IpfsClient: Send + Sync + 'static { } } +#[derive(Clone, Debug, CheapClone)] +pub struct IpfsContext { + pub deployment_hash: Arc, + pub logger: Logger, +} + +impl IpfsContext { + pub fn new(deployment_hash: &DeploymentHash, logger: &Logger) -> Self { + Self { + deployment_hash: deployment_hash.as_str().into(), + logger: logger.cheap_clone(), + } + } + + pub(super) fn deployment_hash(&self) -> Arc { + self.deployment_hash.cheap_clone() + } + + pub(super) fn logger(&self, path: &ContentPath) -> Logger { + self.logger.new( + slog::o!("deployment" => self.deployment_hash.to_string(), "path" => path.to_string()), + ) + } + + #[cfg(debug_assertions)] + pub fn test() -> Self { + Self { + deployment_hash: "test".into(), + logger: crate::log::discard(), + } + } +} + /// Describes a request to an IPFS server. #[derive(Clone, Debug)] pub enum IpfsRequest { @@ -193,3 +251,27 @@ where None => fut.await, } } + +async fn run_with_metrics( + fut: F, + deployment_hash: Arc, + metrics: IpfsMetrics, +) -> IpfsResult +where + F: Future>, +{ + let timer = Instant::now(); + metrics.add_request(&deployment_hash); + + fut.await + .inspect(|_resp| { + metrics.observe_request_duration(&deployment_hash, timer.elapsed().as_secs_f64()) + }) + .inspect_err(|err| { + if err.is_timeout() { + metrics.add_not_found(&deployment_hash) + } else { + metrics.add_error(&deployment_hash) + } + }) +} diff --git a/graph/src/ipfs/content_path.rs b/graph/src/ipfs/content_path.rs index 2032526b6ae..39c8b95d29e 100644 --- a/graph/src/ipfs/content_path.rs +++ b/graph/src/ipfs/content_path.rs @@ -1,60 +1,112 @@ +use std::sync::Arc; + use anyhow::anyhow; use cid::Cid; +use url::Url; -use crate::ipfs::IpfsError; -use crate::ipfs::IpfsResult; +use crate::{ + derive::CheapClone, + ipfs::{IpfsError, IpfsResult}, +}; /// Represents a path to some data on IPFS. -#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, CheapClone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ContentPath { + inner: Arc, +} + +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct Inner { cid: Cid, path: Option, } impl ContentPath { /// Creates a new [ContentPath] from the specified input. 
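[Editor's sketch, not part of the patch] The broadened `ContentPath` parser below accepts gateway and RPC URLs in addition to bare CIDs and the `/ipfs/` and `ipfs://` prefixes. An illustrative round-trip over the accepted shapes, mirroring the test cases later in this diff (the CID value is just an example):

let cid = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";

for input in [
    format!("{cid}/readme.md"),                                 // bare CID plus path
    format!("/ipfs/{cid}/readme.md"),                           // /ipfs/ prefix
    format!("ipfs://{cid}/readme.md"),                          // ipfs:// scheme
    format!("https://ipfs.com/ipfs/{cid}/readme.md"),           // gateway URL
    format!("https://ipfs.com/api/v0/cat?arg={cid}/readme.md"), // RPC URL
] {
    // All five shapes normalize to the same CID and relative path.
    let path = ContentPath::new(&input)?;
    assert_eq!(path.cid().to_string(), cid);
    assert_eq!(path.path(), Some("readme.md"));
}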
diff --git a/graph/src/ipfs/content_path.rs b/graph/src/ipfs/content_path.rs
index 2032526b6ae..39c8b95d29e 100644
--- a/graph/src/ipfs/content_path.rs
+++ b/graph/src/ipfs/content_path.rs
@@ -1,60 +1,112 @@
+use std::sync::Arc;
+
 use anyhow::anyhow;
 use cid::Cid;
+use url::Url;

-use crate::ipfs::IpfsError;
-use crate::ipfs::IpfsResult;
+use crate::{
+    derive::CheapClone,
+    ipfs::{IpfsError, IpfsResult},
+};

 /// Represents a path to some data on IPFS.
-#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Debug, Clone, CheapClone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub struct ContentPath {
+    inner: Arc<Inner>,
+}
+
+#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+struct Inner {
     cid: Cid,
     path: Option<String>,
 }

 impl ContentPath {
     /// Creates a new [ContentPath] from the specified input.
+    ///
+    /// Supports the following formats:
+    /// - <CID>[/<path>]
+    /// - /ipfs/<CID>[/<path>]
+    /// - ipfs://<CID>[/<path>]
+    /// - http[s]://.../ipfs/<CID>[/<path>]
+    /// - http[s]://.../api/v0/cat?arg=<CID>[/<path>]
     pub fn new(input: impl AsRef<str>) -> IpfsResult<Self> {
-        let input = input.as_ref();
+        let input = input.as_ref().trim();

         if input.is_empty() {
             return Err(IpfsError::InvalidContentPath {
-                input: "".to_owned(),
-                source: anyhow!("path is empty"),
+                input: "".to_string(),
+                source: anyhow!("content path is empty"),
             });
         }

-        let (cid, path) = input
-            .strip_prefix("/ipfs/")
-            .unwrap_or(input)
-            .split_once('/')
-            .unwrap_or((input, ""));
+        if input.starts_with("http://") || input.starts_with("https://") {
+            return Self::parse_from_url(input);
+        }
+
+        Self::parse_from_cid_and_path(input)
+    }
+
+    fn parse_from_url(input: &str) -> IpfsResult<Self> {
+        let url = Url::parse(input).map_err(|_err| IpfsError::InvalidContentPath {
+            input: input.to_string(),
+            source: anyhow!("input is not a valid URL"),
+        })?;
+
+        if let Some((_, x)) = url.query_pairs().find(|(key, _)| key == "arg") {
+            return Self::parse_from_cid_and_path(&x);
+        }
+
+        if let Some((_, x)) = url.path().split_once("/ipfs/") {
+            return Self::parse_from_cid_and_path(x);
+        }
+
+        Self::parse_from_cid_and_path(url.path())
+    }
+
+    fn parse_from_cid_and_path(mut input: &str) -> IpfsResult<Self> {
+        input = input.trim_matches('/');
+
+        for prefix in ["ipfs/", "ipfs://"] {
+            if let Some(input_without_prefix) = input.strip_prefix(prefix) {
+                input = input_without_prefix
+            }
+        }
+
+        let (cid, path) = input.split_once('/').unwrap_or((input, ""));

         let cid = cid
             .parse::<Cid>()
             .map_err(|err| IpfsError::InvalidContentPath {
-                input: input.to_owned(),
+                input: input.to_string(),
                 source: anyhow::Error::from(err).context("invalid CID"),
             })?;

         if path.contains('?') {
             return Err(IpfsError::InvalidContentPath {
-                input: input.to_owned(),
+                input: input.to_string(),
                 source: anyhow!("query parameters not allowed"),
             });
         }

         Ok(Self {
-            cid,
-            path: (!path.is_empty()).then_some(path.to_owned()),
+            inner: Arc::new(Inner {
+                cid,
+                path: if path.is_empty() {
+                    None
+                } else {
+                    Some(path.to_string())
+                },
+            }),
         })
     }

     pub fn cid(&self) -> &Cid {
-        &self.cid
+        &self.inner.cid
     }

     pub fn path(&self) -> Option<&str> {
-        self.path.as_deref()
+        self.inner.path.as_deref()
     }
 }

@@ -81,9 +133,9 @@ impl TryFrom for ContentPath {
 impl std::fmt::Display for ContentPath {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let cid = &self.cid;
+        let cid = &self.inner.cid;

-        match self.path {
+        match self.inner.path {
             Some(ref path) => write!(f, "{cid}/{path}"),
             None => write!(f, "{cid}"),
         }
@@ -97,13 +149,22 @@ mod tests {
     const CID_V0: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
     const CID_V1: &str = "bafybeiczsscdsbs7ffqz55asqdf3smv6klcw3gofszvwlyarci47bgf354";

+    fn make_path(cid: &str, path: Option<&str>) -> ContentPath {
+        ContentPath {
+            inner: Arc::new(Inner {
+                cid: cid.parse().unwrap(),
+                path: path.map(ToOwned::to_owned),
+            }),
+        }
+    }
+
     #[test]
     fn fails_on_empty_input() {
         let err = ContentPath::new("").unwrap_err();

         assert_eq!(
             err.to_string(),
-            "'' is not a valid IPFS content path: path is empty",
+            "'' is not a valid IPFS content path: content path is empty",
         );
     }

@@ -119,75 +180,37 @@
     #[test]
     fn accepts_a_valid_cid_v0() {
         let path = ContentPath::new(CID_V0).unwrap();
-
-        assert_eq!(
-            path,
-            ContentPath {
-                cid: CID_V0.parse().unwrap(),
-                path: None,
-            }
-        );
+        assert_eq!(path, make_path(CID_V0, None));
     }

     #[test]
     fn accepts_a_valid_cid_v1() {
         let path =
ContentPath::new(CID_V1).unwrap(); - - assert_eq!( - path, - ContentPath { - cid: CID_V1.parse().unwrap(), - path: None, - } - ); + assert_eq!(path, make_path(CID_V1, None)); } #[test] - fn fails_on_a_leading_slash_followed_by_a_valid_cid() { - let err = ContentPath::new(format!("/{CID_V0}")).unwrap_err(); + fn accepts_and_removes_leading_slashes() { + let path = ContentPath::new(format!("/{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); - assert!(err.to_string().starts_with(&format!( - "'/{CID_V0}' is not a valid IPFS content path: invalid CID: " - ))); + let path = ContentPath::new(format!("///////{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); } #[test] - fn ignores_the_first_slash_after_the_cid() { + fn accepts_and_removes_trailing_slashes() { let path = ContentPath::new(format!("{CID_V0}/")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); - assert_eq!( - path, - ContentPath { - cid: CID_V0.parse().unwrap(), - path: None, - } - ); + let path = ContentPath::new(format!("{CID_V0}///////")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); } #[test] fn accepts_a_path_after_the_cid() { let path = ContentPath::new(format!("{CID_V0}/readme.md")).unwrap(); - - assert_eq!( - path, - ContentPath { - cid: CID_V0.parse().unwrap(), - path: Some("readme.md".to_owned()), - } - ); - } - - #[test] - fn accepts_multiple_consecutive_slashes_after_the_cid() { - let path = ContentPath::new(format!("{CID_V0}//")).unwrap(); - - assert_eq!( - path, - ContentPath { - cid: CID_V0.parse().unwrap(), - path: Some("/".to_owned()), - } - ); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); } #[test] @@ -214,23 +237,67 @@ mod tests { #[test] fn accepts_and_removes_the_ipfs_prefix() { let path = ContentPath::new(format!("/ipfs/{CID_V0}")).unwrap(); - - assert_eq!( - path, - ContentPath { - cid: CID_V0.parse().unwrap(), - path: None, - } - ); + assert_eq!(path, make_path(CID_V0, None)); let path = ContentPath::new(format!("/ipfs/{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + } - assert_eq!( - path, - ContentPath { - cid: CID_V0.parse().unwrap(), - path: Some("readme.md".to_owned()), - } - ); + #[test] + fn accepts_and_removes_the_ipfs_schema() { + let path = ContentPath::new(format!("ipfs://{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!("ipfs://{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + } + + #[test] + fn accepts_and_parses_ipfs_rpc_urls() { + let path = ContentPath::new(format!("http://ipfs.com/api/v0/cat?arg={CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = + ContentPath::new(format!("http://ipfs.com/api/v0/cat?arg={CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + + let path = ContentPath::new(format!("https://ipfs.com/api/v0/cat?arg={CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!( + "https://ipfs.com/api/v0/cat?arg={CID_V0}/readme.md" + )) + .unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + } + + #[test] + fn accepts_and_parses_ipfs_gateway_urls() { + let path = ContentPath::new(format!("http://ipfs.com/ipfs/{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!("http://ipfs.com/ipfs/{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + + let path = 
ContentPath::new(format!("https://ipfs.com/ipfs/{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!("https://ipfs.com/ipfs/{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + } + + #[test] + fn accepts_and_parses_paths_from_urls() { + let path = ContentPath::new(format!("http://ipfs.com/{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!("http://ipfs.com/{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); + + let path = ContentPath::new(format!("https://ipfs.com/{CID_V0}")).unwrap(); + assert_eq!(path, make_path(CID_V0, None)); + + let path = ContentPath::new(format!("https://ipfs.com/{CID_V0}/readme.md")).unwrap(); + assert_eq!(path, make_path(CID_V0, Some("readme.md"))); } } diff --git a/graph/src/ipfs/error.rs b/graph/src/ipfs/error.rs index 1722b02f467..6553813628b 100644 --- a/graph/src/ipfs/error.rs +++ b/graph/src/ipfs/error.rs @@ -50,7 +50,7 @@ pub enum IpfsError { #[error(transparent)] RequestFailed(RequestError), - #[error("Invalid cache configuration: {source}")] + #[error("Invalid cache configuration: {source:#}")] InvalidCacheConfig { source: anyhow::Error }, } diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index 4b190cb0133..5c2da25daff 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -5,17 +5,14 @@ use async_trait::async_trait; use derivative::Derivative; use http::header::ACCEPT; use http::header::CACHE_CONTROL; -use reqwest::StatusCode; +use reqwest::{redirect::Policy as RedirectPolicy, StatusCode}; use slog::Logger; use crate::env::ENV_VARS; -use crate::ipfs::IpfsClient; -use crate::ipfs::IpfsError; -use crate::ipfs::IpfsRequest; -use crate::ipfs::IpfsResponse; -use crate::ipfs::IpfsResult; -use crate::ipfs::RetryPolicy; -use crate::ipfs::ServerAddress; +use crate::ipfs::{ + IpfsClient, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy, + ServerAddress, +}; /// A client that connects to an IPFS gateway. /// @@ -28,14 +25,19 @@ pub struct IpfsGatewayClient { #[derivative(Debug = "ignore")] http_client: reqwest::Client, + metrics: IpfsMetrics, logger: Logger, } impl IpfsGatewayClient { /// Creates a new [IpfsGatewayClient] with the specified server address. /// Verifies that the server is responding to IPFS gateway requests. - pub(crate) async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { - let client = Self::new_unchecked(server_address, logger)?; + pub(crate) async fn new( + server_address: impl AsRef, + metrics: IpfsMetrics, + logger: &Logger, + ) -> IpfsResult { + let client = Self::new_unchecked(server_address, metrics, logger)?; client .send_test_request() @@ -50,10 +52,20 @@ impl IpfsGatewayClient { /// Creates a new [IpfsGatewayClient] with the specified server address. /// Does not verify that the server is responding to IPFS gateway requests. - pub fn new_unchecked(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + pub fn new_unchecked( + server_address: impl AsRef, + metrics: IpfsMetrics, + logger: &Logger, + ) -> IpfsResult { Ok(Self { server_address: ServerAddress::new(server_address)?, - http_client: reqwest::Client::new(), + http_client: reqwest::Client::builder() + // IPFS gateways allow requests to directory CIDs. + // However, they sometimes redirect before displaying the directory listing. + // This policy permits that behavior. 
+ .redirect(RedirectPolicy::limited(1)) + .build()?, + metrics, logger: logger.to_owned(), }) } @@ -113,8 +125,8 @@ impl IpfsGatewayClient { #[async_trait] impl IpfsClient for IpfsGatewayClient { - fn logger(&self) -> &Logger { - &self.logger + fn metrics(&self) -> &IpfsMetrics { + &self.metrics } async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { @@ -158,7 +170,8 @@ mod tests { use wiremock::ResponseTemplate; use super::*; - use crate::ipfs::ContentPath; + use crate::data::subgraph::DeploymentHash; + use crate::ipfs::{ContentPath, IpfsContext, IpfsMetrics}; use crate::log::discard; const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; @@ -189,7 +202,9 @@ mod tests { async fn make_client() -> (MockServer, Arc) { let server = mock_server().await; - let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + let client = + IpfsGatewayClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()) + .unwrap(); (server, Arc::new(client)) } @@ -206,7 +221,7 @@ mod tests { async fn new_fails_to_create_the_client_if_gateway_is_not_accessible() { let server = mock_server().await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), IpfsMetrics::test(), &discard()) .await .unwrap_err(); } @@ -222,7 +237,7 @@ mod tests { .mount(&server) .await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), IpfsMetrics::test(), &discard()) .await .unwrap(); @@ -232,7 +247,7 @@ mod tests { .mount(&server) .await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), IpfsMetrics::test(), &discard()) .await .unwrap(); } @@ -252,7 +267,7 @@ mod tests { .mount(&server) .await; - IpfsGatewayClient::new(server.uri(), &discard()) + IpfsGatewayClient::new(server.uri(), IpfsMetrics::test(), &discard()) .await .unwrap(); } @@ -261,7 +276,7 @@ mod tests { async fn new_unchecked_creates_the_client_without_checking_the_gateway() { let server = mock_server().await; - IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + IpfsGatewayClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()).unwrap(); } #[tokio::test] @@ -275,7 +290,7 @@ mod tests { .await; let bytes = client - .cat_stream(&make_path(), None, RetryPolicy::None) + .cat_stream(&IpfsContext::test(), &make_path(), None, RetryPolicy::None) .await .unwrap() .try_fold(BytesMut::new(), |mut acc, chunk| async { @@ -300,7 +315,12 @@ mod tests { .await; let result = client - .cat_stream(&make_path(), Some(ms(300)), RetryPolicy::None) + .cat_stream( + &IpfsContext::test(), + &make_path(), + Some(ms(300)), + RetryPolicy::None, + ) .await; assert!(matches!(result, Err(_))); @@ -324,7 +344,12 @@ mod tests { .await; let _stream = client - .cat_stream(&make_path(), None, RetryPolicy::NonDeterministic) + .cat_stream( + &IpfsContext::test(), + &make_path(), + None, + RetryPolicy::NonDeterministic, + ) .await .unwrap(); } @@ -340,7 +365,13 @@ mod tests { .await; let bytes = client - .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + usize::MAX, + None, + RetryPolicy::None, + ) .await .unwrap(); @@ -360,7 +391,13 @@ mod tests { .await; let bytes = client - .cat(&make_path(), data.len(), None, RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + data.len(), + None, + RetryPolicy::None, + ) .await .unwrap(); @@ -380,7 +417,13 @@ mod tests { .await; client - .cat(&make_path(), data.len() - 1, None, RetryPolicy::None) + 
.cat( + &IpfsContext::test(), + &make_path(), + data.len() - 1, + None, + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -396,7 +439,13 @@ mod tests { .await; client - .cat(&make_path(), usize::MAX, Some(ms(300)), RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + usize::MAX, + Some(ms(300)), + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -420,6 +469,7 @@ mod tests { let bytes = client .cat( + &IpfsContext::test(), &make_path(), usize::MAX, None, @@ -442,7 +492,7 @@ mod tests { .await; let bytes = client - .get_block(&make_path(), None, RetryPolicy::None) + .get_block(&IpfsContext::test(), &make_path(), None, RetryPolicy::None) .await .unwrap(); @@ -460,7 +510,12 @@ mod tests { .await; client - .get_block(&make_path(), Some(ms(300)), RetryPolicy::None) + .get_block( + &IpfsContext::test(), + &make_path(), + Some(ms(300)), + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -483,7 +538,12 @@ mod tests { .await; let bytes = client - .get_block(&make_path(), None, RetryPolicy::NonDeterministic) + .get_block( + &IpfsContext::test(), + &make_path(), + None, + RetryPolicy::NonDeterministic, + ) .await .unwrap(); @@ -507,10 +567,31 @@ mod tests { fn log( &self, record: &Record, - _: &slog::OwnedKVList, + values: &slog::OwnedKVList, ) -> std::result::Result { - let message = format!("{}", record.msg()); + use slog::KV; + + let mut serialized_values = String::new(); + let mut serializer = StringSerializer(&mut serialized_values); + values.serialize(record, &mut serializer).unwrap(); + + let message = format!("{}; {serialized_values}", record.msg()); self.messages.lock().unwrap().push(message); + + Ok(()) + } + } + + struct StringSerializer<'a>(&'a mut String); + + impl<'a> slog::Serializer for StringSerializer<'a> { + fn emit_arguments( + &mut self, + key: slog::Key, + val: &std::fmt::Arguments, + ) -> slog::Result { + use std::fmt::Write; + write!(self.0, "{}: {}, ", key, val).unwrap(); Ok(()) } } @@ -522,7 +603,9 @@ mod tests { let logger = Logger::root(drain.fuse(), o!()); let server = mock_server().await; - let client = Arc::new(IpfsGatewayClient::new_unchecked(server.uri(), &logger).unwrap()); + let client = Arc::new( + IpfsGatewayClient::new_unchecked(server.uri(), IpfsMetrics::test(), &logger).unwrap(), + ); // Set up mock to fail twice then succeed to trigger retry with warning logs mock_get() @@ -542,7 +625,13 @@ mod tests { // This should trigger retry logs because we set up failures first let _result = client - .cat(&path, usize::MAX, None, RetryPolicy::NonDeterministic) + .cat( + &IpfsContext::new(&DeploymentHash::default(), &logger), + &path, + usize::MAX, + None, + RetryPolicy::NonDeterministic, + ) .await .unwrap(); @@ -563,7 +652,7 @@ mod tests { let expected_cid = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; let has_cid_in_operation = retry_messages .iter() - .any(|msg| msg.contains(&format!("IPFS.cat[{}]", expected_cid))); + .any(|msg| msg.contains(&format!("path: {expected_cid}"))); assert!( has_cid_in_operation, diff --git a/graph/src/ipfs/metrics.rs b/graph/src/ipfs/metrics.rs new file mode 100644 index 00000000000..48d6e3c7893 --- /dev/null +++ b/graph/src/ipfs/metrics.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use prometheus::{HistogramVec, IntCounterVec}; + +use crate::{components::metrics::MetricsRegistry, derive::CheapClone}; + +#[derive(Debug, Clone, CheapClone)] +pub struct IpfsMetrics { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + request_count: Box, + error_count: Box, + not_found_count: Box, + request_duration: Box, +} + 
+impl IpfsMetrics { + pub fn new(registry: &MetricsRegistry) -> Self { + let request_count = registry + .new_int_counter_vec( + "ipfs_request_count", + "The total number of IPFS requests.", + &["deployment"], + ) + .unwrap(); + + let error_count = registry + .new_int_counter_vec( + "ipfs_error_count", + "The total number of failed IPFS requests.", + &["deployment"], + ) + .unwrap(); + + let not_found_count = registry + .new_int_counter_vec( + "ipfs_not_found_count", + "The total number of IPFS requests that timed out.", + &["deployment"], + ) + .unwrap(); + + let request_duration = registry + .new_histogram_vec( + "ipfs_request_duration", + "The duration of successful IPFS requests.\n\ + The time it takes to download the response body is not included.", + vec!["deployment".to_owned()], + vec![ + 0.2, 0.5, 1.0, 5.0, 10.0, 20.0, 30.0, 60.0, 90.0, 120.0, 180.0, 240.0, + ], + ) + .unwrap(); + + Self { + inner: Arc::new(Inner { + request_count, + error_count, + not_found_count, + request_duration, + }), + } + } + + pub(super) fn add_request(&self, deployment_hash: &str) { + self.inner + .request_count + .with_label_values(&[deployment_hash]) + .inc() + } + + pub(super) fn add_error(&self, deployment_hash: &str) { + self.inner + .error_count + .with_label_values(&[deployment_hash]) + .inc() + } + + pub(super) fn add_not_found(&self, deployment_hash: &str) { + self.inner + .not_found_count + .with_label_values(&[deployment_hash]) + .inc() + } + + pub(super) fn observe_request_duration(&self, deployment_hash: &str, duration_secs: f64) { + self.inner + .request_duration + .with_label_values(&[deployment_hash]) + .observe(duration_secs.clamp(0.2, 240.0)); + } + + #[cfg(debug_assertions)] + pub fn test() -> Self { + Self::new(&MetricsRegistry::mock()) + } +} diff --git a/graph/src/ipfs/mod.rs b/graph/src/ipfs/mod.rs index 3a5fe211d26..403cbf614cd 100644 --- a/graph/src/ipfs/mod.rs +++ b/graph/src/ipfs/mod.rs @@ -8,6 +8,7 @@ use futures03::stream::StreamExt; use slog::info; use slog::Logger; +use crate::components::metrics::MetricsRegistry; use crate::util::security::SafeDisplay; mod cache; @@ -15,6 +16,7 @@ mod client; mod content_path; mod error; mod gateway_client; +mod metrics; mod pool; mod retry_policy; mod rpc_client; @@ -22,13 +24,12 @@ mod server_address; pub mod test_utils; -pub use self::client::IpfsClient; -pub use self::client::IpfsRequest; -pub use self::client::IpfsResponse; +pub use self::client::{IpfsClient, IpfsContext, IpfsRequest, IpfsResponse}; pub use self::content_path::ContentPath; pub use self::error::IpfsError; pub use self::error::RequestError; pub use self::gateway_client::IpfsGatewayClient; +pub use self::metrics::IpfsMetrics; pub use self::pool::IpfsClientPool; pub use self::retry_policy::RetryPolicy; pub use self::rpc_client::IpfsRpcClient; @@ -45,12 +46,14 @@ pub type IpfsResult = Result; /// All clients are set up to cache results pub async fn new_ipfs_client( server_addresses: I, + registry: &MetricsRegistry, logger: &Logger, ) -> IpfsResult> where I: IntoIterator, S: AsRef, { + let metrics = IpfsMetrics::new(registry); let mut clients: Vec> = Vec::new(); for server_address in server_addresses { @@ -62,8 +65,8 @@ where SafeDisplay(server_address) ); - let client = use_first_valid_api(server_address, logger).await?; - let client = Arc::new(CachingClient::new(client).await?); + let client = use_first_valid_api(server_address, metrics.clone(), logger).await?; + let client = Arc::new(CachingClient::new(client, logger).await?); clients.push(client); } @@ -76,8 +79,7 @@ 
where n => { info!(logger, "Creating a pool of {} IPFS clients", n); - let pool = IpfsClientPool::new(clients, logger); - + let pool = IpfsClientPool::new(clients); Ok(Arc::new(pool)) } } @@ -85,11 +87,12 @@ where async fn use_first_valid_api( server_address: &str, + metrics: IpfsMetrics, logger: &Logger, ) -> IpfsResult> { let supported_apis: Vec>>> = vec![ Box::pin(async { - IpfsGatewayClient::new(server_address, logger) + IpfsGatewayClient::new(server_address, metrics.clone(), logger) .await .map(|client| { info!( @@ -102,7 +105,7 @@ async fn use_first_valid_api( }) }), Box::pin(async { - IpfsRpcClient::new(server_address, logger) + IpfsRpcClient::new(server_address, metrics.clone(), logger) .await .map(|client| { info!( diff --git a/graph/src/ipfs/pool.rs b/graph/src/ipfs/pool.rs index 80abd7ca3e8..dab1191ccce 100644 --- a/graph/src/ipfs/pool.rs +++ b/graph/src/ipfs/pool.rs @@ -4,13 +4,8 @@ use anyhow::anyhow; use async_trait::async_trait; use futures03::stream::FuturesUnordered; use futures03::stream::StreamExt; -use slog::Logger; -use crate::ipfs::IpfsClient; -use crate::ipfs::IpfsError; -use crate::ipfs::IpfsRequest; -use crate::ipfs::IpfsResponse; -use crate::ipfs::IpfsResult; +use crate::ipfs::{IpfsClient, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, IpfsResult}; /// Contains a list of IPFS clients and, for each read request, selects the fastest IPFS client /// that can provide the content and streams the response from that client. @@ -19,23 +14,21 @@ use crate::ipfs::IpfsResult; /// as some of them may already have the content cached. pub struct IpfsClientPool { clients: Vec>, - logger: Logger, } impl IpfsClientPool { /// Creates a new IPFS client pool from the specified clients. - pub fn new(clients: Vec>, logger: &Logger) -> Self { - Self { - clients, - logger: logger.to_owned(), - } + pub fn new(clients: Vec>) -> Self { + assert!(!clients.is_empty()); + Self { clients } } } #[async_trait] impl IpfsClient for IpfsClientPool { - fn logger(&self) -> &Logger { - &self.logger + fn metrics(&self) -> &IpfsMetrics { + // All clients are expected to share the same metrics. 
+ self.clients[0].metrics() } async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { @@ -82,9 +75,7 @@ mod tests { use wiremock::ResponseTemplate; use super::*; - use crate::ipfs::ContentPath; - use crate::ipfs::IpfsGatewayClient; - use crate::ipfs::RetryPolicy; + use crate::ipfs::{ContentPath, IpfsContext, IpfsGatewayClient, IpfsMetrics, RetryPolicy}; use crate::log::discard; const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; @@ -95,7 +86,9 @@ mod tests { async fn make_client() -> (MockServer, Arc) { let server = MockServer::start().await; - let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + let client = + IpfsGatewayClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()) + .unwrap(); (server, Arc::new(client)) } @@ -145,10 +138,10 @@ mod tests { .await; let clients: Vec> = vec![client_1, client_2, client_3]; - let pool = Arc::new(IpfsClientPool::new(clients, &discard())); + let pool = Arc::new(IpfsClientPool::new(clients)); let bytes = pool - .cat_stream(&make_path(), None, RetryPolicy::None) + .cat_stream(&IpfsContext::test(), &make_path(), None, RetryPolicy::None) .await .unwrap() .try_fold(BytesMut::new(), |mut acc, chunk| async { @@ -198,10 +191,16 @@ mod tests { .await; let clients: Vec> = vec![client_1, client_2, client_3]; - let pool = Arc::new(IpfsClientPool::new(clients, &discard())); + let pool = Arc::new(IpfsClientPool::new(clients)); let bytes = pool - .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + usize::MAX, + None, + RetryPolicy::None, + ) .await .unwrap(); @@ -245,10 +244,10 @@ mod tests { .await; let clients: Vec> = vec![client_1, client_2, client_3]; - let pool = Arc::new(IpfsClientPool::new(clients, &discard())); + let pool = Arc::new(IpfsClientPool::new(clients)); let bytes = pool - .get_block(&make_path(), None, RetryPolicy::None) + .get_block(&IpfsContext::test(), &make_path(), None, RetryPolicy::None) .await .unwrap(); diff --git a/graph/src/ipfs/rpc_client.rs b/graph/src/ipfs/rpc_client.rs index 16976537044..8d5d6fe643d 100644 --- a/graph/src/ipfs/rpc_client.rs +++ b/graph/src/ipfs/rpc_client.rs @@ -10,13 +10,10 @@ use reqwest::StatusCode; use slog::Logger; use crate::env::ENV_VARS; -use crate::ipfs::IpfsClient; -use crate::ipfs::IpfsError; -use crate::ipfs::IpfsRequest; -use crate::ipfs::IpfsResponse; -use crate::ipfs::IpfsResult; -use crate::ipfs::RetryPolicy; -use crate::ipfs::ServerAddress; +use crate::ipfs::{ + IpfsClient, IpfsError, IpfsMetrics, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy, + ServerAddress, +}; /// A client that connects to an IPFS RPC API. /// @@ -29,6 +26,7 @@ pub struct IpfsRpcClient { #[derivative(Debug = "ignore")] http_client: reqwest::Client, + metrics: IpfsMetrics, logger: Logger, test_request_timeout: Duration, } @@ -36,8 +34,12 @@ pub struct IpfsRpcClient { impl IpfsRpcClient { /// Creates a new [IpfsRpcClient] with the specified server address. /// Verifies that the server is responding to IPFS RPC API requests. - pub async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { - let client = Self::new_unchecked(server_address, logger)?; + pub async fn new( + server_address: impl AsRef, + metrics: IpfsMetrics, + logger: &Logger, + ) -> IpfsResult { + let client = Self::new_unchecked(server_address, metrics, logger)?; client .send_test_request() @@ -52,10 +54,15 @@ impl IpfsRpcClient { /// Creates a new [IpfsRpcClient] with the specified server address. 
/// Does not verify that the server is responding to IPFS RPC API requests. - pub fn new_unchecked(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + pub fn new_unchecked( + server_address: impl AsRef, + metrics: IpfsMetrics, + logger: &Logger, + ) -> IpfsResult { Ok(Self { server_address: ServerAddress::new(server_address)?, http_client: reqwest::Client::new(), + metrics, logger: logger.to_owned(), test_request_timeout: ENV_VARS.ipfs_request_timeout, }) @@ -113,8 +120,8 @@ impl IpfsRpcClient { #[async_trait] impl IpfsClient for IpfsRpcClient { - fn logger(&self) -> &Logger { - &self.logger + fn metrics(&self) -> &IpfsMetrics { + &self.metrics } async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { @@ -142,7 +149,7 @@ mod tests { use wiremock::ResponseTemplate; use super::*; - use crate::ipfs::ContentPath; + use crate::ipfs::{ContentPath, IpfsContext, IpfsMetrics}; use crate::log::discard; const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; @@ -165,7 +172,8 @@ mod tests { async fn make_client() -> (MockServer, Arc) { let server = mock_server().await; - let client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); + let client = + IpfsRpcClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()).unwrap(); (server, Arc::new(client)) } @@ -182,7 +190,7 @@ mod tests { async fn new_fails_to_create_the_client_if_rpc_api_is_not_accessible() { let server = mock_server().await; - IpfsRpcClient::new(server.uri(), &discard()) + IpfsRpcClient::new(server.uri(), IpfsMetrics::test(), &discard()) .await .unwrap_err(); } @@ -197,7 +205,9 @@ mod tests { .mount(&server) .await; - IpfsRpcClient::new(server.uri(), &discard()).await.unwrap(); + IpfsRpcClient::new(server.uri(), IpfsMetrics::test(), &discard()) + .await + .unwrap(); } #[tokio::test] @@ -217,14 +227,16 @@ mod tests { .mount(&server) .await; - IpfsRpcClient::new(server.uri(), &discard()).await.unwrap(); + IpfsRpcClient::new(server.uri(), IpfsMetrics::test(), &discard()) + .await + .unwrap(); } #[tokio::test] async fn new_unchecked_creates_the_client_without_checking_the_rpc_api() { let server = mock_server().await; - IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); + IpfsRpcClient::new_unchecked(server.uri(), IpfsMetrics::test(), &discard()).unwrap(); } #[tokio::test] @@ -238,7 +250,7 @@ mod tests { .await; let bytes = client - .cat_stream(&make_path(), None, RetryPolicy::None) + .cat_stream(&IpfsContext::test(), &make_path(), None, RetryPolicy::None) .await .unwrap() .try_fold(BytesMut::new(), |mut acc, chunk| async { @@ -263,7 +275,12 @@ mod tests { .await; let result = client - .cat_stream(&make_path(), Some(ms(300)), RetryPolicy::None) + .cat_stream( + &IpfsContext::test(), + &make_path(), + Some(ms(300)), + RetryPolicy::None, + ) .await; assert!(matches!(result, Err(_))); @@ -287,7 +304,12 @@ mod tests { .await; let _stream = client - .cat_stream(&make_path(), None, RetryPolicy::NonDeterministic) + .cat_stream( + &IpfsContext::test(), + &make_path(), + None, + RetryPolicy::NonDeterministic, + ) .await .unwrap(); } @@ -303,7 +325,13 @@ mod tests { .await; let bytes = client - .cat(&make_path(), usize::MAX, None, RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + usize::MAX, + None, + RetryPolicy::None, + ) .await .unwrap(); @@ -323,7 +351,13 @@ mod tests { .await; let bytes = client - .cat(&make_path(), data.len(), None, RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + data.len(), + None, + RetryPolicy::None, + ) .await .unwrap(); 
@@ -343,7 +377,13 @@ mod tests { .await; client - .cat(&make_path(), data.len() - 1, None, RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + data.len() - 1, + None, + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -359,7 +399,13 @@ mod tests { .await; client - .cat(&make_path(), usize::MAX, Some(ms(300)), RetryPolicy::None) + .cat( + &IpfsContext::test(), + &make_path(), + usize::MAX, + Some(ms(300)), + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -383,6 +429,7 @@ mod tests { let bytes = client .cat( + &IpfsContext::test(), &make_path(), usize::MAX, None, @@ -405,7 +452,7 @@ mod tests { .await; let bytes = client - .get_block(&make_path(), None, RetryPolicy::None) + .get_block(&IpfsContext::test(), &make_path(), None, RetryPolicy::None) .await .unwrap(); @@ -423,7 +470,12 @@ mod tests { .await; client - .get_block(&make_path(), Some(ms(300)), RetryPolicy::None) + .get_block( + &IpfsContext::test(), + &make_path(), + Some(ms(300)), + RetryPolicy::None, + ) .await .unwrap_err(); } @@ -446,7 +498,12 @@ mod tests { .await; let bytes = client - .get_block(&make_path(), None, RetryPolicy::NonDeterministic) + .get_block( + &IpfsContext::test(), + &make_path(), + None, + RetryPolicy::NonDeterministic, + ) .await .unwrap(); diff --git a/justfile b/justfile index ed12ad84356..32ae928faa3 100644 --- a/justfile +++ b/justfile @@ -14,6 +14,10 @@ lint: check *EXTRA_FLAGS: cargo check {{EXTRA_FLAGS}} +# Check all workspace members, all their targets and all their features +check-all: + cargo check --workspace --all-features --all-targets + # Build graph-node (cargo build --bin graph-node) build *EXTRA_FLAGS: cargo build --bin graph-node {{EXTRA_FLAGS}} diff --git a/node/src/launcher.rs b/node/src/launcher.rs index ba16b20b1b4..1776e0feba3 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -43,7 +43,7 @@ use tokio::sync::mpsc; git_testament!(TESTAMENT); /// Sets up metrics and monitoring -fn setup_metrics(logger: &Logger) -> (Arc, Arc) { +pub fn setup_metrics(logger: &Logger) -> (Arc, Arc) { // Set up Prometheus registry let prometheus_registry = Arc::new(Registry::new()); let metrics_registry = Arc::new(MetricsRegistry::new( @@ -359,6 +359,8 @@ pub async fn run( ipfs_service: IpfsService, link_resolver: Arc, dev_updates: Option>, + prometheus_registry: Arc, + metrics_registry: Arc, ) { // Log version information info!( @@ -397,9 +399,6 @@ pub async fn run( info!(logger, "Starting up"; "node_id" => &node_id); - // Set up metrics - let (prometheus_registry, metrics_registry) = setup_metrics(&logger); - // Optionally, identify the Elasticsearch logging configuration let elastic_config = opt .elasticsearch_url diff --git a/node/src/main.rs b/node/src/main.rs index aa140ac8f4e..795b28e05aa 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -36,7 +36,10 @@ async fn main_inner() { logger, "Runtime configured with {} max blocking threads", *MAX_BLOCKING_THREADS ); - let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) + + let (prometheus_registry, metrics_registry) = launcher::setup_metrics(&logger); + + let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &metrics_registry, &logger) .await .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); @@ -49,5 +52,15 @@ async fn main_inner() { let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())); - launcher::run(logger, opt, env_vars, ipfs_service, link_resolver, None).await; + launcher::run( + logger, + opt, + env_vars, + ipfs_service, + link_resolver, + 
None, + prometheus_registry, + metrics_registry, + ) + .await; } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 003fac69ae9..38048c55ba3 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -58,7 +58,7 @@ pub async fn run( let logger_factory = LoggerFactory::new(logger.clone(), None, metrics_ctx.registry.clone()); // FIXME: Hard-coded IPFS config, take it from config file instead? - let ipfs_client = graph::ipfs::new_ipfs_client(&ipfs_url, &logger).await?; + let ipfs_client = graph::ipfs::new_ipfs_client(&ipfs_url, &metrics_registry, &logger).await?; let ipfs_service = ipfs_service( ipfs_client.cheap_clone(), diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 461d4a08256..b0ec8018db2 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -6,8 +6,7 @@ use graph::data::subgraph::*; use graph::data_source; use graph::data_source::common::MappingABI; use graph::env::EnvVars; -use graph::ipfs::IpfsRpcClient; -use graph::ipfs::ServerAddress; +use graph::ipfs::{IpfsMetrics, IpfsRpcClient, ServerAddress}; use graph::log; use graph::prelude::*; use graph_chain_ethereum::{Chain, DataSource, DataSourceTemplate, Mapping, TemplateSource}; @@ -65,7 +64,9 @@ fn mock_host_exports( Arc::new(templates.iter().map(|t| t.into()).collect()), ); - let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &LOGGER).unwrap(); + let client = + IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), IpfsMetrics::test(), &LOGGER) + .unwrap(); HostExports::new( subgraph_id, diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 12099c55b7e..f57c9d3b528 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -13,6 +13,7 @@ use web3::types::H160; use graph::blockchain::BlockTime; use graph::blockchain::Blockchain; +use graph::components::link_resolver::LinkResolverContext; use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest}; use graph::components::subgraph::{ InstanceDSTemplate, PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing, @@ -479,7 +480,10 @@ impl HostExports { // Does not consume gas because this is not a part of the deterministic feature set. // Ideally this would first consume gas for fetching the file stats, and then again // for the bytes of the file. - graph::block_on(self.link_resolver.cat(logger, &Link { link })) + graph::block_on(self.link_resolver.cat( + &LinkResolverContext::new(&self.subgraph_id, logger), + &Link { link }, + )) } pub(crate) fn ipfs_get_block( @@ -490,7 +494,10 @@ impl HostExports { // Does not consume gas because this is not a part of the deterministic feature set. // Ideally this would first consume gas for fetching the file stats, and then again // for the bytes of the file. 
- graph::block_on(self.link_resolver.get_block(logger, &Link { link })) + graph::block_on(self.link_resolver.get_block( + &LinkResolverContext::new(&self.subgraph_id, logger), + &Link { link }, + )) } // Read the IPFS file `link`, split it into JSON objects, and invoke the @@ -501,7 +508,7 @@ impl HostExports { // of the callback must be `callback(JSONValue, Value)`, and the `userData` // parameter is passed to the callback without any changes pub(crate) fn ipfs_map( - link_resolver: &Arc, + &self, wasm_ctx: &WasmInstanceData, link: String, callback: &str, @@ -533,8 +540,10 @@ impl HostExports { let logger = ctx.logger.new(o!("ipfs_map" => link.clone())); let result = { - let mut stream: JsonValueStream = - graph::block_on(link_resolver.json_stream(&logger, &Link { link }))?; + let mut stream: JsonValueStream = graph::block_on(self.link_resolver.json_stream( + &LinkResolverContext::new(&self.subgraph_id, &logger), + &Link { link }, + ))?; let mut v = Vec::new(); while let Some(sv) = graph::block_on(stream.next()) { let sv = sv?; diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 03cbf244c23..15f765e2030 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -609,14 +609,9 @@ impl WasmInstanceContext<'_> { // Pause the timeout while running ipfs_map, and resume it when done. self.suspend_timeout(); let start_time = Instant::now(); - let output_states = HostExports::ipfs_map( - &self.as_ref().ctx.host_exports.link_resolver.cheap_clone(), - self.as_ref(), - link.clone(), - &callback, - user_data, - flags, - )?; + let host_exports = self.as_ref().ctx.host_exports.cheap_clone(); + let output_states = + host_exports.ipfs_map(self.as_ref(), link.clone(), &callback, user_data, flags)?; self.start_timeout(); debug!( diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index f7737e80fad..f1b5b4ecab6 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -8,6 +8,7 @@ use web3::types::Address; use git_testament::{git_testament, CommitKind}; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; +use graph::components::link_resolver::LinkResolverContext; use graph::components::store::{BlockPtrForNumber, BlockStore, QueryPermit, Store}; use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; @@ -495,7 +496,10 @@ impl IndexNodeResolver { let raw_yaml: serde_yaml::Mapping = { let file_bytes = self .link_resolver - .cat(&self.logger, &deployment_hash.to_ipfs_link()) + .cat( + &LinkResolverContext::new(deployment_hash, &self.logger), + &deployment_hash.to_ipfs_link(), + ) .await .map_err(SubgraphManifestResolveError::ResolveError)?; diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 084398502bb..b72f70dcd78 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -19,13 +19,13 @@ use graph::entity; use graph::env::ENV_VARS; use graph::prelude::web3::types::H256; use graph::prelude::{ - anyhow, async_trait, serde_yaml, tokio, BigDecimal, BigInt, DeploymentHash, Link, Logger, + anyhow, async_trait, serde_yaml, tokio, BigDecimal, BigInt, DeploymentHash, Link, SubgraphManifest, SubgraphManifestResolveError, SubgraphManifestValidationError, SubgraphStore, UnvalidatedSubgraphManifest, }; use graph::{ blockchain::NodeCapabilities as _, - 
components::link_resolver::{JsonValueStream, LinkResolver as LinkResolverTrait}, + components::link_resolver::{JsonValueStream, LinkResolver, LinkResolverContext}, data::subgraph::SubgraphFeature, }; @@ -82,36 +82,37 @@ impl TextResolver { } #[async_trait] -impl LinkResolverTrait for TextResolver { - fn with_timeout(&self, _timeout: Duration) -> Box { +impl LinkResolver for TextResolver { + fn with_timeout(&self, _timeout: Duration) -> Box { Box::new(self.clone()) } - fn with_retries(&self) -> Box { + fn with_retries(&self) -> Box { Box::new(self.clone()) } - fn for_manifest( - &self, - _manifest_path: &str, - ) -> Result, anyhow::Error> { + fn for_manifest(&self, _manifest_path: &str) -> Result, anyhow::Error> { Ok(Box::new(self.clone())) } - async fn cat(&self, _logger: &Logger, link: &Link) -> Result, anyhow::Error> { + async fn cat(&self, _ctx: &LinkResolverContext, link: &Link) -> Result, anyhow::Error> { self.texts .get(&link.link) .ok_or(anyhow!("No text for {}", &link.link)) .map(Clone::clone) } - async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, anyhow::Error> { + async fn get_block( + &self, + _ctx: &LinkResolverContext, + _link: &Link, + ) -> Result, anyhow::Error> { unimplemented!() } async fn json_stream( &self, - _logger: &Logger, + _ctx: &LinkResolverContext, _link: &Link, ) -> Result { unimplemented!() @@ -134,7 +135,7 @@ async fn try_resolve_manifest( resolver.add("/ipfs/QmSourceSchema", &SOURCE_SUBGRAPH_SCHEMA); resolver.add(FILE_CID, &FILE); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(text)?; Ok(SubgraphManifest::resolve_from_raw(id, raw, &resolver, &LOGGER, max_spec_version).await?) @@ -156,7 +157,7 @@ async fn resolve_unvalidated(text: &str) -> UnvalidatedSubgraphManifest { resolver.add(id.as_str(), &text); resolver.add("/ipfs/Qmschema", &GQL_SCHEMA); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(text).unwrap(); UnvalidatedSubgraphManifest::resolve(id, raw, &resolver, &LOGGER, SPEC_VERSION_0_0_4.clone()) @@ -228,7 +229,7 @@ dataSources: entities: - Gravatar network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: @@ -271,7 +272,7 @@ dataSources: entities: - Gravatar network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: @@ -307,7 +308,7 @@ dataSources: entities: - Gravatar network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: @@ -1305,7 +1306,7 @@ schema: resolver.add("/ipfs/Qmabi", &ABI); resolver.add("/ipfs/Qmschema", &GQL_SCHEMA_FULLTEXT); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(YAML).unwrap(); UnvalidatedSubgraphManifest::resolve( @@ -1357,7 +1358,7 @@ schema: resolver.add("/ipfs/Qmabi", &ABI); resolver.add("/ipfs/Qmschema", &GQL_SCHEMA_FULLTEXT); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(YAML).unwrap(); UnvalidatedSubgraphManifest::resolve( @@ -1433,7 +1434,7 @@ dataSources: resolver.add("/ipfs/Qmschema", &GQL_SCHEMA); resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(YAML).unwrap(); UnvalidatedSubgraphManifest::resolve( @@ -1511,7 +1512,7 @@ dataSources: resolver.add("/ipfs/Qmschema", &GQL_SCHEMA); resolver.add("/ipfs/Qmmapping", 
&MAPPING_WITH_IPFS_FUNC_WASM); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(YAML).unwrap(); UnvalidatedSubgraphManifest::resolve( @@ -1620,7 +1621,7 @@ dataSources: resolver.add("/ipfs/Qmschema", &GQL_SCHEMA); resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(YAML).unwrap(); UnvalidatedSubgraphManifest::resolve( @@ -1658,7 +1659,7 @@ dataSources: entities: - Gravatar network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: @@ -1693,7 +1694,7 @@ dataSources: resolver.add("/ipfs/QmSource", &SOURCE_SUBGRAPH_MANIFEST); resolver.add("/ipfs/QmSourceSchema", &SOURCE_SUBGRAPH_SCHEMA); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(YAML).unwrap(); UnvalidatedSubgraphManifest::resolve( @@ -1728,7 +1729,7 @@ dataSources: entities: - User network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: @@ -1787,7 +1788,7 @@ dataSources: entities: - User network: mainnet - source: + source: address: 'QmNestedSource' startBlock: 9562480 mapping: @@ -1841,7 +1842,7 @@ specVersion: 1.3.0 resolver.add("/ipfs/QmSource", &SOURCE_SUBGRAPH_MANIFEST); resolver.add("/ipfs/QmSourceSchema", &SOURCE_SUBGRAPH_SCHEMA); - let resolver: Arc = Arc::new(resolver); + let resolver: Arc = Arc::new(resolver); let raw = serde_yaml::from_str(yaml).unwrap(); test_store::run_test_sequentially(|_| async move { @@ -1880,7 +1881,7 @@ dataSources: entities: - Gravatar network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: @@ -1916,7 +1917,7 @@ dataSources: entities: - Gravatar network: mainnet - source: + source: address: 'QmSource' startBlock: 9562480 mapping: diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index e7de6a59460..c969edb2a78 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -19,7 +19,7 @@ use graph::blockchain::{ }; use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ - ArweaveClient, ArweaveResolver, FileLinkResolver, FileSizeLimit, + ArweaveClient, ArweaveResolver, FileLinkResolver, FileSizeLimit, LinkResolverContext, }; use graph::components::metrics::MetricsRegistry; use graph::components::network_provider::ChainName; @@ -36,7 +36,7 @@ use graph::futures03::{Stream, StreamExt}; use graph::http_body_util::Full; use graph::hyper::body::Bytes; use graph::hyper::Request; -use graph::ipfs::IpfsClient; +use graph::ipfs::{IpfsClient, IpfsMetrics}; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::serde_json::{self, json}; use graph::prelude::{ @@ -268,7 +268,10 @@ impl TestContext { // Stolen from the IPFS provider, there's prolly a nicer way to re-use it let file_bytes = self .link_resolver - .cat(&logger, &deployment.hash.to_ipfs_link()) + .cat( + &LinkResolverContext::new(&deployment.hash, &logger), + &deployment.hash.to_ipfs_link(), + ) .await .unwrap(); @@ -510,6 +513,7 @@ pub async fn setup_inner( let ipfs_client: Arc = Arc::new( graph::ipfs::IpfsRpcClient::new_unchecked( graph::ipfs::ServerAddress::local_rpc_api(), + IpfsMetrics::new(&mock_registry), &logger, ) .unwrap(), diff --git a/tests/src/recipe.rs b/tests/src/recipe.rs index 6540e90ad4a..0fde590f546 100644 --- a/tests/src/recipe.rs +++ b/tests/src/recipe.rs @@ -2,8 +2,8 @@ use crate::{ fixture::{stores, Stores, TestInfo}, 
helpers::run_cmd, }; -use graph::ipfs; use graph::prelude::{DeploymentHash, SubgraphName}; +use graph::{ipfs, prelude::MetricsRegistry}; use std::process::Command; pub struct RunnerTestRecipe { pub stores: Stores, @@ -91,9 +91,13 @@ pub async fn build_subgraph_with_pnpm_cmd_and_arg( arg: Option<&str>, ) -> DeploymentHash { // Test that IPFS is up. - ipfs::IpfsRpcClient::new(ipfs::ServerAddress::local_rpc_api(), &graph::log::discard()) - .await - .expect("Could not connect to IPFS, make sure it's running at port 5001"); + ipfs::IpfsRpcClient::new( + ipfs::ServerAddress::local_rpc_api(), + ipfs::IpfsMetrics::new(&MetricsRegistry::mock()), + &graph::log::discard(), + ) + .await + .expect("Could not connect to IPFS, make sure it's running at port 5001"); // Run codegen. run_cmd(Command::new("pnpm").arg("codegen").current_dir(dir));
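Taken together, the startup wiring condenses to the order shown in the node/src/main.rs hunk above: the registries must exist before the IPFS client is built so its counters can be registered. A trimmed restatement of that hunk (option parsing and the remaining launcher::run arguments elided):

    let (prometheus_registry, metrics_registry) = launcher::setup_metrics(&logger);

    let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &metrics_registry, &logger)
        .await
        .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}"));

    // Both registries are then handed on to `launcher::run`, which no longer
    // creates its own.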