diff --git a/Cargo.lock b/Cargo.lock index 2b7062607b6..76e0bc7fc58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,10 +182,10 @@ dependencies = [ "serde", "sync_wrapper", "tokio", - "tower", + "tower 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "tower-http", - "tower-layer", - "tower-service", + "tower-layer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -217,6 +217,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base-x" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" + [[package]] name = "base64" version = "0.13.0" @@ -271,6 +277,28 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2b_simd" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72936ee4afc7f8f736d1c38383b56480b5497b4617b4a77bdbf1d2ababc76127" +dependencies = [ + "arrayref", + "arrayvec 0.7.2", + "constant_time_eq", +] + +[[package]] +name = "blake2s_simd" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db539cc2b5f6003621f1cd9ef92d7ded8ea5232c7de0f9faa2de251cd98730d4" +dependencies = [ + "arrayref", + "arrayvec 0.7.2", + "constant_time_eq", +] + [[package]] name = "blake3" version = "0.3.8" @@ -434,6 +462,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "cid" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc949bff6704880faf064c42a4854032ab07bfcf3a4fcb82a57470acededb69c" +dependencies = [ + "core2", + "multibase", + "multihash", + "serde", + "unsigned-varint", +] + [[package]] name = "clap" version = "2.34.0" @@ -557,6 +598,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpp_demangle" version = "0.3.3" @@ -845,6 +895,32 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + +[[package]] +name = "data-encoding-macro" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86927b7cd2fe88fa698b87404b287ab98d1a0063a34071d92e575b72d3029aca" +dependencies = [ + "data-encoding", + "data-encoding-macro-internal", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5bbed42daaa95e780b60a50546aa345b8413a1e46f9a40a12907d3598f038db" +dependencies = [ + "data-encoding", + "syn", +] + [[package]] name = "defer" version = "0.1.0" @@ -1631,6 +1707,7 @@ dependencies = [ "async-trait", "atomic_refcell", "bytes", + "cid", "futures 0.1.31", "futures 0.3.16", "graph", @@ -1650,6 +1727,8 @@ dependencies = [ "serde_json", "serde_yaml", "test-store", + "tower 0.4.12 (git+https://github.com/tower-rs/tower.git)", + "tower-test", "walkdir", ] @@ -2094,7 +2173,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower-service", + "tower-service 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", "want", ] @@ -2567,6 +2646,48 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0debeb9fcf88823ea64d64e4a815ab1643f33127d995978e099942ce38f25238" +[[package]] +name = "multibase" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" +dependencies = [ + "base-x", + "data-encoding", + "data-encoding-macro", +] + +[[package]] +name = "multihash" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3db354f401db558759dfc1e568d010a5d4146f4d3f637be1275ec4a3cf09689" +dependencies = [ + "blake2b_simd", + "blake2s_simd", + "blake3 1.3.1", + "core2", + "digest 0.10.3", + "multihash-derive", + "sha2 0.10.2", + "sha3", + "unsigned-varint", +] + +[[package]] +name = "multihash-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc076939022111618a5026d3be019fd8b366e76314538ff9a1b59ffbcbf98bcd" +dependencies = [ + "proc-macro-crate", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "multimap" version = "0.8.3" @@ -4377,6 +4498,19 @@ dependencies = [ "tokio-util 0.7.1", ] +[[package]] +name = "tokio-test" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.14.0" @@ -4456,9 +4590,9 @@ dependencies = [ "tokio-rustls", "tokio-stream", "tokio-util 0.7.1", - "tower", - "tower-layer", - "tower-service", + "tower 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-layer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", "tracing-futures", ] @@ -4491,8 +4625,23 @@ dependencies = [ "slab", "tokio", "tokio-util 0.7.1", - "tower-layer", - "tower-service", + "tower-layer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.12" +source = "git+https://github.com/tower-rs/tower.git#ee826286fd1f994eabf14229e1e579ae29237386" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "tokio", + "tokio-util 0.7.1", + "tower-layer 0.3.1 (git+https://github.com/tower-rs/tower.git)", + "tower-service 0.3.1 (git+https://github.com/tower-rs/tower.git)", "tracing", ] @@ -4510,9 +4659,9 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite", - "tower", - "tower-layer", - "tower-service", + "tower 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-layer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4521,12 +4670,35 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "git+https://github.com/tower-rs/tower.git#ee826286fd1f994eabf14229e1e579ae29237386" + [[package]] name = "tower-service" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" +[[package]] +name = "tower-service" +version = "0.3.1" +source = "git+https://github.com/tower-rs/tower.git#ee826286fd1f994eabf14229e1e579ae29237386" + +[[package]] +name = "tower-test" +version = "0.4.0" +source = "git+https://github.com/tower-rs/tower.git#ee826286fd1f994eabf14229e1e579ae29237386" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-test", + "tower-layer 0.3.1 (git+https://github.com/tower-rs/tower.git)", + "tower-service 0.3.1 (git+https://github.com/tower-rs/tower.git)", +] + [[package]] name = "tracing" version = "0.1.26" @@ -4680,6 +4852,12 @@ dependencies = [ "void", ] +[[package]] +name = "unsigned-varint" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86a8dc7f45e4c1b0d30e43038c38f274e77af056aa5f74b93c2cf9eb3c1c836" + [[package]] name = "untrusted" version = "0.7.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 3011291d506..d656fb0d433 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,10 +23,14 @@ semver = "1.0.12" serde = "1.0" serde_json = "1.0" serde_yaml = "0.8" - +# Switch to crates.io once tower 0.5 is released +tower = { git = "https://github.com/tower-rs/tower.git", features = ["util", "limit"] } graph-runtime-wasm = { path = "../runtime/wasm" } +cid = "0.8.3" +anyhow = "1.0" [dev-dependencies] +tower-test = { git = "https://github.com/tower-rs/tower.git" } graph-mock = { path = "../mock" } walkdir = "2.3.2" test-store = { path = "../store/test-store" } diff --git a/core/src/lib.rs b/core/src/lib.rs index 796a4d5f94e..08ac3fc3345 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,3 +1,5 @@ +pub mod polling_monitor; + mod link_resolver; mod metrics; mod subgraph; diff --git a/core/src/link_resolver.rs b/core/src/link_resolver.rs index 65d311c005b..b9d9ed5b8fa 100644 --- a/core/src/link_resolver.rs +++ b/core/src/link_resolver.rs @@ -192,7 +192,7 @@ impl LinkResolverTrait for LinkResolver { .run(move || { let path = req_path.clone(); let client = client.clone(); - async move { Ok(client.cat_all(path.clone(), timeout).await?.to_vec()) } + async move { Ok(client.cat_all(&path, timeout).await?.to_vec()) } }) .await?; @@ -262,7 +262,7 @@ impl LinkResolverTrait for LinkResolver { let max_file_size = Some(self.env_vars.mappings.max_ipfs_map_file_size as u64); restrict_file_size(path, size, &max_file_size)?; - let mut stream = client.cat(path.to_string()).await?.fuse().boxed().compat(); + let mut stream = client.cat(path, None).await?.fuse().boxed().compat(); let mut buf = BytesMut::with_capacity(1024); diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs new file mode 100644 index 00000000000..5c03e7a0325 --- /dev/null +++ b/core/src/polling_monitor/ipfs_service.rs @@ -0,0 +1,105 @@ +use anyhow::{anyhow, Error}; +use bytes::Bytes; +use cid::Cid; +use futures::{Future, FutureExt}; +use graph::{ + cheap_clone::CheapClone, + ipfs_client::{IpfsClient, StatApi}, + tokio::sync::Semaphore, +}; +use std::{pin::Pin, sync::Arc, task::Poll, time::Duration}; +use tower::Service; + +const CLOUDFLARE_TIMEOUT: u16 = 524; +const GATEWAY_TIMEOUT: u16 = 504; + +#[derive(Clone)] +pub struct IpfsService { + client: IpfsClient, + max_file_size: u64, + timeout: Duration, + concurrency_limiter: Arc, +} + +impl CheapClone for IpfsService { + fn cheap_clone(&self) -> Self { + Self { + client: self.client.cheap_clone(), + max_file_size: self.max_file_size, + timeout: self.timeout, + concurrency_limiter: self.concurrency_limiter.cheap_clone(), + } + } +} + +impl IpfsService { + #[allow(dead_code)] + pub fn new( + client: IpfsClient, + max_file_size: u64, + timeout: Duration, + concurrency_limit: u16, + ) -> Self { + Self { + client, + max_file_size, + timeout, + concurrency_limiter: Arc::new(Semaphore::new(concurrency_limit as usize)), + } + } + + async fn call(&self, cid: Cid) -> Result, Error> { + let cid_str = cid.to_string(); + let size = match self + .client + .stat_size(StatApi::Files, cid_str, self.timeout) + .await + { + Ok(size) => size, + Err(e) => match e.status().map(|e| e.as_u16()) { + Some(GATEWAY_TIMEOUT) | Some(CLOUDFLARE_TIMEOUT) => return Ok(None), + _ if e.is_timeout() => return Ok(None), + _ => return Err(e.into()), + }, + }; + + if size > self.max_file_size { + return Err(anyhow!( + "IPFS file {} is too large. It can be at most {} bytes but is {} bytes", + cid.to_string(), + self.max_file_size, + size + )); + } + + Ok(self + .client + .cat_all(&cid.to_string(), self.timeout) + .await + .map(Some)?) + } +} + +impl Service for IpfsService { + type Response = (Cid, Option); + type Error = (Cid, Error); + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + // The permit is acquired and immediately dropped, as tower does not yet allow returning it. + // So this is only indicative of capacity being available. + Pin::new(&mut self.concurrency_limiter.acquire().boxed()) + .poll(cx) + .map_ok(|_| ()) + .map_err(|_| unreachable!("semaphore is never closed")) + } + + fn call(&mut self, cid: Cid) -> Self::Future { + let this = self.cheap_clone(); + async move { + let _permit = this.concurrency_limiter.acquire().await; + this.call(cid).await.map(|x| (cid, x)).map_err(|e| (cid, e)) + } + .boxed() + } +} diff --git a/core/src/polling_monitor/metrics.rs b/core/src/polling_monitor/metrics.rs new file mode 100644 index 00000000000..c7cfa89bc90 --- /dev/null +++ b/core/src/polling_monitor/metrics.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use graph::{ + prelude::{DeploymentHash, MetricsRegistry}, + prometheus::{Counter, Gauge}, +}; + +pub struct PollingMonitorMetrics { + pub requests: Counter, + pub errors: Counter, + pub not_found: Counter, + pub queue_depth: Gauge, +} + +impl PollingMonitorMetrics { + pub fn new(registry: Arc, subgraph_hash: &DeploymentHash) -> Self { + let requests = registry + .new_deployment_counter( + "polling_monitor_requests", + "counts the total requests made to the service being polled", + subgraph_hash.as_str(), + ) + .unwrap(); + let not_found = registry + .new_deployment_counter( + "polling_monitor_not_found", + "counts 'not found' responses returned from the service being polled", + subgraph_hash.as_str(), + ) + .unwrap(); + let errors = registry + .new_deployment_counter( + "polling_monitor_errors", + "counts errors returned from the service being polled", + subgraph_hash.as_str(), + ) + .unwrap(); + let queue_depth = registry + .new_deployment_gauge( + "polling_monitor_queue_depth", + "size of the queue of polling requests", + subgraph_hash.as_str(), + ) + .unwrap(); + Self { + requests, + errors, + not_found, + queue_depth: queue_depth.into(), + } + } + + #[cfg(test)] + pub(crate) fn mock() -> Self { + Self { + requests: Counter::new("x", " ").unwrap(), + errors: Counter::new("y", " ").unwrap(), + not_found: Counter::new("z", " ").unwrap(), + queue_depth: Gauge::new("w", " ").unwrap(), + } + } +} diff --git a/core/src/polling_monitor/mod.rs b/core/src/polling_monitor/mod.rs new file mode 100644 index 00000000000..1ffe87c9482 --- /dev/null +++ b/core/src/polling_monitor/mod.rs @@ -0,0 +1,263 @@ +pub mod ipfs_service; +mod metrics; + +use std::fmt::Display; +use std::sync::Arc; + +use futures::stream; +use futures::stream::StreamExt; +use graph::cheap_clone::CheapClone; +use graph::parking_lot::Mutex; +use graph::prelude::tokio; +use graph::slog::{debug, Logger}; +use graph::util::monitored::MonitoredVecDeque as VecDeque; +use tokio::sync::{mpsc, watch}; +use tower::{Service, ServiceExt}; + +use self::metrics::PollingMonitorMetrics; + +/// Spawn a monitor that actively polls a service. Whenever the service has capacity, the monitor +/// pulls object ids from the queue and polls the service. If the object is not present or in case +/// of error, the object id is pushed to the back of the queue to be polled again. +/// +/// The service returns the request ID along with errors or responses. The response is an +/// `Option`, to represent the object not being found. +pub fn spawn_monitor( + service: S, + response_sender: mpsc::Sender<(ID, Response)>, + logger: Logger, + metrics: PollingMonitorMetrics, +) -> PollingMonitor +where + ID: Display + Send + 'static, + S: Service), Error = (ID, E)> + Send + 'static, + E: Display + Send + 'static, + S::Future: Send, +{ + let queue = Arc::new(Mutex::new(VecDeque::new( + metrics.queue_depth.clone(), + metrics.requests.clone(), + ))); + let (wake_up_queue, queue_woken) = watch::channel(()); + + let queue_to_stream = { + let queue = queue.cheap_clone(); + stream::unfold((), move |()| { + let queue = queue.cheap_clone(); + let mut queue_woken = queue_woken.clone(); + async move { + loop { + let id = queue.lock().pop_front(); + match id { + Some(id) => break Some((id, ())), + None => match queue_woken.changed().await { + // Queue woken, check it. + Ok(()) => {} + + // The `PollingMonitor` has been dropped, cancel this task. + Err(_) => break None, + }, + }; + } + } + }) + }; + + { + let queue = queue.cheap_clone(); + graph::spawn(async move { + let mut responses = service.call_all(queue_to_stream).unordered().boxed(); + while let Some(response) = responses.next().await { + match response { + Ok((id, Some(response))) => { + let send_result = response_sender.send((id, response)).await; + if send_result.is_err() { + // The receiver has been dropped, cancel this task. + break; + } + } + + // Object not found, push the id to the back of the queue. + Ok((id, None)) => { + metrics.not_found.inc(); + queue.lock().push_back(id); + } + + // Error polling, log it and push the id to the back of the queue. + Err((id, e)) => { + debug!(logger, "error polling"; + "error" => format!("{:#}", e), + "object_id" => id.to_string()); + metrics.errors.inc(); + queue.lock().push_back(id); + } + } + } + }); + } + + PollingMonitor { + queue, + wake_up_queue, + } +} + +/// Handle for adding objects to be monitored. +pub struct PollingMonitor { + queue: Arc>>, + + // This serves two purposes, to wake up the monitor when an item arrives on an empty queue, and + // to stop the montior task when this handle is dropped. + wake_up_queue: watch::Sender<()>, +} + +impl PollingMonitor { + /// Add an object id to the polling queue. New requests have priority and are pushed to the + /// front of the queue. + pub fn monitor(&self, id: ID) { + let mut queue = self.queue.lock(); + if queue.is_empty() { + // If the send fails, the response receiver has been dropped, so this handle is useless. + let _ = self.wake_up_queue.send(()); + } + queue.push_front(id); + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + use futures::{Future, FutureExt, TryFutureExt}; + use graph::log; + use std::{pin::Pin, task::Poll}; + use tower_test::mock; + + use super::*; + + struct MockService(mock::Mock<&'static str, Option<&'static str>>); + + impl Service<&'static str> for MockService { + type Response = (&'static str, Option<&'static str>); + + type Error = (&'static str, anyhow::Error); + + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.0.poll_ready(cx).map_err(|_| unreachable!()) + } + + fn call(&mut self, req: &'static str) -> Self::Future { + self.0 + .call(req) + .map_ok(move |x| (req, x)) + .map_err(move |e| (req, anyhow!(e.to_string()))) + .boxed() + } + } + + async fn send_response(handle: &mut mock::Handle, res: U) { + handle.next_request().await.unwrap().1.send_response(res) + } + + #[tokio::test] + async fn polling_monitor_simple() { + let (svc, mut handle) = mock::pair(); + let (tx, mut rx) = mpsc::channel(10); + let monitor = spawn_monitor( + MockService(svc), + tx, + log::discard(), + PollingMonitorMetrics::mock(), + ); + + // Basic test, single file is immediately available. + monitor.monitor("req-0"); + send_response(&mut handle, Some("res-0")).await; + assert_eq!(rx.recv().await, Some(("req-0", "res-0"))); + } + + #[tokio::test] + async fn polling_monitor_unordered() { + let (svc, mut handle) = mock::pair(); + let (tx, mut rx) = mpsc::channel(10); + let monitor = spawn_monitor( + MockService(svc), + tx, + log::discard(), + PollingMonitorMetrics::mock(), + ); + + // Test unorderedness of the response stream, and the LIFO semantics of `monitor`. + // + // `req-1` has priority since it is the last request, but `req-0` is responded first. + monitor.monitor("req-0"); + monitor.monitor("req-1"); + let req_1 = handle.next_request().await.unwrap().1; + let req_0 = handle.next_request().await.unwrap().1; + req_0.send_response(Some("res-0")); + assert_eq!(rx.recv().await, Some(("req-0", "res-0"))); + req_1.send_response(Some("res-1")); + assert_eq!(rx.recv().await, Some(("req-1", "res-1"))); + } + + #[tokio::test] + async fn polling_monitor_failed_push_to_back() { + let (svc, mut handle) = mock::pair(); + let (tx, mut rx) = mpsc::channel(10); + + // Limit service to one request at a time. + let svc = tower::limit::ConcurrencyLimit::new(MockService(svc), 1); + let monitor = spawn_monitor(svc, tx, log::discard(), PollingMonitorMetrics::mock()); + + // Test that objects not found go on the back of the queue. + monitor.monitor("req-0"); + monitor.monitor("req-1"); + send_response(&mut handle, None).await; + send_response(&mut handle, Some("res-0")).await; + assert_eq!(rx.recv().await, Some(("req-0", "res-0"))); + send_response(&mut handle, Some("res-1")).await; + assert_eq!(rx.recv().await, Some(("req-1", "res-1"))); + + // Test that failed requests go on the back of the queue. + monitor.monitor("req-0"); + monitor.monitor("req-1"); + let req = handle.next_request().await.unwrap().1; + req.send_error(anyhow!("e")); + send_response(&mut handle, Some("res-0")).await; + assert_eq!(rx.recv().await, Some(("req-0", "res-0"))); + send_response(&mut handle, Some("res-1")).await; + assert_eq!(rx.recv().await, Some(("req-1", "res-1"))); + } + + #[tokio::test] + async fn polling_monitor_cancelation() { + let (svc, _handle) = mock::pair(); + let (tx, mut rx) = mpsc::channel(10); + let monitor = spawn_monitor( + MockService(svc), + tx, + log::discard(), + PollingMonitorMetrics::mock(), + ); + + // Cancelation on monitor drop. + drop(monitor); + assert_eq!(rx.recv().await, None); + + let (svc, mut handle) = mock::pair(); + let (tx, rx) = mpsc::channel(10); + let monitor = spawn_monitor( + MockService(svc), + tx, + log::discard(), + PollingMonitorMetrics::mock(), + ); + + // Cancelation on receiver drop. + monitor.monitor("req-0"); + drop(rx); + send_response(&mut handle, Some("res-0")).await; + assert!(handle.next_request().await.is_none()); + } +} diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 849336f5b37..b4a4ba54767 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -309,7 +309,7 @@ where pub struct BlockStreamMetrics { pub deployment_head: Box, pub deployment_failed: Box, - pub reverted_blocks: Box, + pub reverted_blocks: Gauge, pub stopwatch: StopwatchMetrics, } diff --git a/graph/src/components/metrics/aggregate.rs b/graph/src/components/metrics/aggregate.rs index cad400d4539..a8f0822e82e 100644 --- a/graph/src/components/metrics/aggregate.rs +++ b/graph/src/components/metrics/aggregate.rs @@ -4,16 +4,16 @@ use crate::prelude::*; pub struct Aggregate { /// Number of values. - count: Box, + count: Gauge, /// Sum over all values. - sum: Box, + sum: Gauge, /// Moving average over the values. - avg: Box, + avg: Gauge, /// Latest value. - cur: Box, + cur: Gauge, } impl Aggregate { diff --git a/graph/src/components/metrics/mod.rs b/graph/src/components/metrics/mod.rs index 7e473c9c41f..1a50bcb2962 100644 --- a/graph/src/components/metrics/mod.rs +++ b/graph/src/components/metrics/mod.rs @@ -102,10 +102,10 @@ pub trait MetricsRegistry: Send + Sync + 'static { name: &str, help: &str, subgraph: &str, - ) -> Result, PrometheusError> { + ) -> Result { let opts = Opts::new(name, help).const_labels(deployment_labels(subgraph)); - let gauge = Box::new(Gauge::with_opts(opts)?); - self.register(name, gauge.clone()); + let gauge = Gauge::with_opts(opts)?; + self.register(name, Box::new(gauge.clone())); Ok(gauge) } @@ -171,13 +171,9 @@ pub trait MetricsRegistry: Send + Sync + 'static { name: &str, help: &str, subgraph: &str, - ) -> Result, PrometheusError> { - let counter = Box::new(counter_with_labels( - name, - help, - deployment_labels(subgraph), - )?); - self.register(name, counter.clone()); + ) -> Result { + let counter = counter_with_labels(name, help, deployment_labels(subgraph))?; + self.register(name, Box::new(counter.clone())); Ok(counter) } diff --git a/graph/src/ipfs_client.rs b/graph/src/ipfs_client.rs index 0f6ac4befdf..c99d83b1cb7 100644 --- a/graph/src/ipfs_client.rs +++ b/graph/src/ipfs_client.rs @@ -44,6 +44,7 @@ pub struct AddResponse { pub size: String, } +/// Reference type, clones will share the connection pool. #[derive(Clone)] pub struct IpfsClient { base: Arc, @@ -86,7 +87,7 @@ impl IpfsClient { // files/stat requires a leading `/ipfs/`. cid = format!("/ipfs/{}", cid); } - let url = self.url(&route, cid); + let url = self.url(&route, &cid); let res = self.call(url, None, Some(timeout)).await?; match api { StatApi::Files => Ok(res.json::().await?.cumulative_size), @@ -95,7 +96,7 @@ impl IpfsClient { } /// Download the entire contents. - pub async fn cat_all(&self, cid: String, timeout: Duration) -> Result { + pub async fn cat_all(&self, cid: &str, timeout: Duration) -> Result { self.call(self.url("cat", cid), None, Some(timeout)) .await? .bytes() @@ -104,10 +105,11 @@ impl IpfsClient { pub async fn cat( &self, - cid: String, + cid: &str, + timeout: Option, ) -> Result>, reqwest::Error> { Ok(self - .call(self.url("cat", cid), None, None) + .call(self.url("cat", cid), None, timeout) .await? .bytes_stream()) } @@ -135,7 +137,7 @@ impl IpfsClient { .await } - fn url(&self, route: &str, arg: String) -> String { + fn url(&self, route: &str, arg: &str) -> String { // URL security: We control the base and the route, user-supplied input goes only into the // query parameters. format!("{}api/v0/{}?arg={}", self.base, route, arg) diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 84847e52110..b4435dfc9c5 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -43,6 +43,7 @@ pub use prometheus; pub use semver; pub use slog; pub use stable_hash_legacy; +pub use tokio; pub use tokio_stream; pub use url; diff --git a/graph/src/log/mod.rs b/graph/src/log/mod.rs index cf93c54e073..6b2841332ce 100644 --- a/graph/src/log/mod.rs +++ b/graph/src/log/mod.rs @@ -53,6 +53,10 @@ pub fn logger(show_debug: bool) -> Logger { Logger::root(drain, o!()) } +pub fn discard() -> Logger { + Logger::root(slog::Discard, o!()) +} + pub struct CustomFormat where D: Decorator, diff --git a/graph/src/util/mod.rs b/graph/src/util/mod.rs index a7bc8ccb034..8af2540f401 100644 --- a/graph/src/util/mod.rs +++ b/graph/src/util/mod.rs @@ -26,3 +26,6 @@ pub mod bounded_queue; pub mod stable_hash_glue; pub mod mem; + +/// Data structures instrumented with Prometheus metrics. +pub mod monitored; diff --git a/graph/src/util/monitored.rs b/graph/src/util/monitored.rs new file mode 100644 index 00000000000..c0f7147b416 --- /dev/null +++ b/graph/src/util/monitored.rs @@ -0,0 +1,39 @@ +use prometheus::{Counter, Gauge}; +use std::collections::VecDeque; + +pub struct MonitoredVecDeque { + vec_deque: VecDeque, + depth: Gauge, + popped: Counter, +} + +impl MonitoredVecDeque { + pub fn new(depth: Gauge, popped: Counter) -> Self { + Self { + vec_deque: VecDeque::new(), + depth, + popped, + } + } + + pub fn push_back(&mut self, item: T) { + self.vec_deque.push_back(item); + self.depth.set(self.vec_deque.len() as f64); + } + + pub fn push_front(&mut self, item: T) { + self.vec_deque.push_front(item); + self.depth.set(self.vec_deque.len() as f64); + } + + pub fn pop_front(&mut self) -> Option { + let item = self.vec_deque.pop_front(); + self.depth.set(self.vec_deque.len() as f64); + self.popped.inc(); + item + } + + pub fn is_empty(&self) -> bool { + self.vec_deque.is_empty() + } +}