From 5a37e32804b8dcf9ae42165bd60977faeb082ed4 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Fri, 7 Jun 2024 19:39:31 +0530 Subject: [PATCH 01/16] feat(transport): port foundry retry layer --- crates/transport/src/layers/mod.rs | 6 + crates/transport/src/layers/retry.rs | 340 +++++++++++++++++++++++++++ crates/transport/src/lib.rs | 2 + 3 files changed, 348 insertions(+) create mode 100644 crates/transport/src/layers/mod.rs create mode 100644 crates/transport/src/layers/retry.rs diff --git a/crates/transport/src/layers/mod.rs b/crates/transport/src/layers/mod.rs new file mode 100644 index 00000000000..49b47fc85a6 --- /dev/null +++ b/crates/transport/src/layers/mod.rs @@ -0,0 +1,6 @@ +//! Module for housing transport layers. + +mod retry; + +/// RetryBackoffLayer +pub use retry::RetryBackoffLayer; diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs new file mode 100644 index 00000000000..94d9ffe824a --- /dev/null +++ b/crates/transport/src/layers/retry.rs @@ -0,0 +1,340 @@ +use crate::{ + error::{TransportError, TransportErrorKind}, + TransportFut, +}; +use alloy_json_rpc::{ErrorPayload, RequestPacket, ResponsePacket}; +use serde::Deserialize; +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; +use tower::{Layer, Service}; + +/// A Transport Layer that is responsible for retrying requests based on the +/// error type. See [`TransportError`]. +/// +/// TransportError: crate::error::TransportError +#[derive(Debug, Clone)] +pub struct RetryBackoffLayer { + /// The maximum number of retries for rate limit errors + max_rate_limit_retries: u32, + /// The maximum number of retries for timeout errors + max_timeout_retries: u32, + /// The initial backoff in milliseconds + initial_backoff: u64, + /// The number of compute units per second for this provider + compute_units_per_second: u64, +} + +impl RetryBackoffLayer { + /// Creates a new retry layer with the given parameters. + pub fn new( + max_rate_limit_retries: u32, + max_timeout_retries: u32, + initial_backoff: u64, + compute_units_per_second: u64, + ) -> Self { + Self { + max_rate_limit_retries, + max_timeout_retries, + initial_backoff, + compute_units_per_second, + } + } +} + +#[derive(Debug, Clone)] +pub struct RateLimitRetryPolicy; + +/// [RetryPolicy] defines logic for which [TransportError] instances should +/// the client retry the request and try to recover from. +pub trait RetryPolicy: Send + Sync + std::fmt::Debug { + /// Whether to retry the request based on the given `error` + fn should_retry(&self, error: &TransportError) -> bool; + + /// Providers may include the `backoff` in the error response directly + fn backoff_hint(&self, error: &TransportError) -> Option; +} + +impl RetryPolicy for RateLimitRetryPolicy { + fn should_retry(&self, error: &TransportError) -> bool { + match error { + // There was a transport-level error. This is either a non-retryable error, + // or a server error that should be retried. + TransportError::Transport(err) => should_retry_transport_level_error(err), + // The transport could not serialize the error itself. The request was malformed from + // the start. + TransportError::SerError(_) => false, + TransportError::DeserError { text, .. 
} => {
+                if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
+                    return should_retry_json_rpc_error(&resp);
+                }
+
+                // some providers send invalid JSON RPC in the error case (no `id:u64`), but the
+                // text should be a `JsonRpcError`
+                #[derive(Deserialize)]
+                struct Resp {
+                    error: ErrorPayload,
+                }
+
+                if let Ok(resp) = serde_json::from_str::<Resp>(text) {
+                    return should_retry_json_rpc_error(&resp.error);
+                }
+
+                false
+            }
+            TransportError::ErrorResp(err) => should_retry_json_rpc_error(err),
+            TransportError::NullResp => true,
+            TransportError::UnsupportedFeature(_) => false,
+            TransportError::LocalUsageError(_) => false,
+        }
+    }
+
+    /// Provides a backoff hint if the error response contains it
+    fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration> {
+        if let TransportError::ErrorResp(resp) = error {
+            let data = resp.try_data_as::<serde_json::Value>();
+            if let Some(Ok(data)) = data {
+                // if daily rate limit exceeded, infura returns the requested backoff in the error
+                // response
+                let backoff_seconds = &data["rate"]["backoff_seconds"];
+                // infura rate limit error
+                if let Some(seconds) = backoff_seconds.as_u64() {
+                    return Some(std::time::Duration::from_secs(seconds));
+                }
+                if let Some(seconds) = backoff_seconds.as_f64() {
+                    return Some(std::time::Duration::from_secs(seconds as u64 + 1));
+                }
+            }
+        }
+        None
+    }
+}
+
+impl<S> Layer<S> for RetryBackoffLayer {
+    type Service = RetryBackoffService<S>;
+
+    fn layer(&self, inner: S) -> Self::Service {
+        RetryBackoffService {
+            inner,
+            policy: RateLimitRetryPolicy,
+            max_rate_limit_retries: self.max_rate_limit_retries,
+            max_timeout_retries: self.max_timeout_retries,
+            initial_backoff: self.initial_backoff,
+            compute_units_per_second: self.compute_units_per_second,
+            requests_enqueued: Arc::new(AtomicU32::new(0)),
+        }
+    }
+}
+
+/// An Tower Service used by the RetryBackoffLayer that is responsible for retrying requests based
+/// on the error type. See [TransportError] and [RateLimitRetryPolicy].
+#[derive(Debug, Clone)]
+pub struct RetryBackoffService<S> {
+    /// The inner service
+    inner: S,
+    /// The retry policy
+    policy: RateLimitRetryPolicy,
+    /// The maximum number of retries for rate limit errors
+    max_rate_limit_retries: u32,
+    /// The maximum number of retries for timeout errors
+    max_timeout_retries: u32,
+    /// The initial backoff in milliseconds
+    initial_backoff: u64,
+    /// The number of compute units per second for this service
+    compute_units_per_second: u64,
+    /// The number of requests currently enqueued
+    requests_enqueued: Arc<AtomicU32>,
+}
+
+// impl tower service
+impl<S> Service<RequestPacket> for RetryBackoffService<S>
+where
+    S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+        + Send
+        + 'static
+        + Clone,
+    S::Future: Send + 'static,
+{
+    type Response = ResponsePacket;
+    type Error = TransportError;
+    type Future = TransportFut<'static>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        // Our middleware doesn't care about backpressure, so it's ready as long
+        // as the inner service is ready.
+ self.inner.poll_ready(cx) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let inner = self.inner.clone(); + let this = self.clone(); + let policy = self.policy.clone(); + let mut inner = std::mem::replace(&mut self.inner, inner); + Box::pin(async move { + let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; + let mut rate_limit_retry_number: u32 = 0; + let mut timeout_retries: u32 = 0; + loop { + let err; + let fut = inner.call(request.clone()).await; + + match fut { + Ok(res) => { + if let Some(e) = res.as_error() { + err = TransportError::ErrorResp(e.clone()) + } else { + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Ok(res); + } + } + Err(e) => err = e, + } + + let should_retry = this.policy.should_retry(&err); + if should_retry { + rate_limit_retry_number += 1; + if rate_limit_retry_number > this.max_rate_limit_retries { + return Err(TransportErrorKind::custom_str("Max retries exceeded")); + } + // tracing!("retrying request due to {:?}", err); + + let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; + + // try to extract the requested backoff from the error or compute the next + // backoff based on retry count + let backoff_hint = this.policy.backoff_hint(&err); + let next_backoff = backoff_hint + .unwrap_or_else(|| std::time::Duration::from_millis(this.initial_backoff)); + + // requests are usually weighted and can vary from 10 CU to several 100 CU, + // cheaper requests are more common some example alchemy + // weights: + // - `eth_getStorageAt`: 17 + // - `eth_getBlockByNumber`: 16 + // - `eth_newFilter`: 20 + // + // (coming from forking mode) assuming here that storage request will be the + // driver for Rate limits we choose `17` as the average cost + // of any request + const AVG_COST: u64 = 17u64; + let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs( + AVG_COST, + this.compute_units_per_second, + current_queued_reqs, + ahead_in_queue, + ); + let total_backoff = next_backoff + + std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); + + // trace!(?total_backoff, budget_backoff = ?seconds_to_wait_for_compute_budget, + // default_backoff = ?next_backoff, ?backoff_hint, "backing off due to rate + // limit"); + + tokio::time::sleep(total_backoff).await; + } else { + if timeout_retries < this.max_timeout_retries { + timeout_retries += 1; + continue; + } + + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Err(err); + } + } + }) + } +} + +/// Calculates an offset in seconds by taking into account the number of currently queued requests, +/// number of requests that were ahead in the queue when the request was first issued, the average +/// cost a weighted request (heuristic), and the number of available compute units per seconds. +/// +/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request +/// is supposed to wait to not get rate limited. The budget per second is +/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory) +/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited. 
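+///
+/// For example (illustrative numbers, not taken from this patch): with a budget of 330
+/// compute units per second and the assumed average cost of 17 CU, roughly 330 / 17 = 19
+/// requests fit into one second, so a request that saw 40 requests ahead of it in a queue
+/// of 50 would be offset by `min(50, 40) / 19 = 2` seconds.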
+/// By taking into account the number of concurrent request and the position in queue when the +/// request was first issued and determine the number of seconds a request is supposed to wait, if +/// at all +fn compute_unit_offset_in_secs( + avg_cost: u64, + compute_units_per_second: u64, + current_queued_requests: u64, + ahead_in_queue: u64, +) -> u64 { + let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost); + if current_queued_requests > request_capacity_per_second { + current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second) + } else { + 0 + } +} + +/// Analyzes the [TransportErrorKind] and decides if the request should be retried based on the +/// variant. +fn should_retry_transport_level_error(error: &TransportErrorKind) -> bool { + match error { + // Missing batch response errors can be retried. + TransportErrorKind::MissingBatchResponse(_) => true, + TransportErrorKind::Custom(err) => { + // currently http error responses are not standard in alloy + let msg = err.to_string(); + msg.contains("429 Too Many Requests") + } + + // If the backend is gone, or there's a completely custom error, we should assume it's not + // retryable. + _ => false, + } +} + +/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the +/// error code or the message. +fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool { + let ErrorPayload { code, message, .. } = error; + // alchemy throws it this way + if *code == 429 { + return true; + } + + // This is an infura error code for `exceeded project rate limit` + if *code == -32005 { + return true; + } + + // alternative alchemy error for specific IPs + if *code == -32016 && message.contains("rate limit") { + return true; + } + + // quick node error `"credits limited to 6000/sec"` + // + if *code == -32012 && message.contains("credits") { + return true; + } + + // quick node rate limit error: `100/second request limit reached - reduce calls per second or + // upgrade your account at quicknode.com` + if *code == -32007 && message.contains("request limit reached") { + return true; + } + + match message.as_str() { + // this is commonly thrown by infura and is apparently a load balancer issue, see also + "header not found" => true, + // also thrown by infura if out of budget for the day and ratelimited + "daily request count exceeded, request rate limited" => true, + msg => { + msg.contains("rate limit") + || msg.contains("rate exceeded") + || msg.contains("too many requests") + || msg.contains("credits limited") + || msg.contains("request limit") + } + } +} diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 8506fa61a2d..c253944fbe5 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -26,6 +26,8 @@ pub use r#trait::Transport; pub use alloy_json_rpc::{RpcError, RpcResult}; pub use futures_utils_wasm::{impl_future, BoxFuture}; +mod layers; + /// Misc. utilities for building transports. 
pub mod utils; From 44c6a78ddd6bf0804ac3eeb79b9aacabf0386ed6 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 10 Jun 2024 14:11:48 +0530 Subject: [PATCH 02/16] feat(transport): HTTPError struct in TransportErrorKind --- crates/transport-http/src/hyper_transport.rs | 8 +-- .../transport-http/src/reqwest_transport.rs | 8 +-- crates/transport/Cargo.toml | 1 + crates/transport/src/error.rs | 71 ++++++++++++++++++- crates/transport/src/layers/mod.rs | 2 +- crates/transport/src/layers/retry.rs | 62 +++------------- crates/transport/src/lib.rs | 3 +- 7 files changed, 92 insertions(+), 63 deletions(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index d052c098c21..10168dca641 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -87,10 +87,10 @@ where trace!(body = %String::from_utf8_lossy(&body), "response body"); if status != hyper::StatusCode::OK { - return Err(TransportErrorKind::custom_str(&format!( - "HTTP error {status} with body: {}", - String::from_utf8_lossy(&body) - ))); + return Err(TransportErrorKind::http_error( + status.as_u16() as i64, + String::from_utf8_lossy(&body).to_string(), + )); } // Deser a Box from the body. If deser fails, return diff --git a/crates/transport-http/src/reqwest_transport.rs b/crates/transport-http/src/reqwest_transport.rs index 786d1d0b617..366101b026e 100644 --- a/crates/transport-http/src/reqwest_transport.rs +++ b/crates/transport-http/src/reqwest_transport.rs @@ -63,10 +63,10 @@ impl Http { trace!(body = %String::from_utf8_lossy(&body), "response body"); if status != reqwest::StatusCode::OK { - return Err(TransportErrorKind::custom_str(&format!( - "HTTP error {status} with body: {}", - String::from_utf8_lossy(&body) - ))); + return Err(TransportErrorKind::http_error( + status.as_u16() as i64, + String::from_utf8_lossy(&body).to_string(), + )); } // Deser a Box from the body. If deser fails, return diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 02b0d715bb1..6511e8eaf5e 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -25,6 +25,7 @@ serde.workspace = true thiserror.workspace = true tower.workspace = true url.workspace = true +tracing = { workspace = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 9ed5f982edc..1e1c549039d 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -1,4 +1,4 @@ -use alloy_json_rpc::{Id, RpcError, RpcResult}; +use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult}; use serde_json::value::RawValue; use std::{error::Error as StdError, fmt::Debug}; use thiserror::Error; @@ -31,6 +31,10 @@ pub enum TransportErrorKind { #[error("subscriptions are not available on this provider")] PubsubUnavailable, + /// HTTP Error with code and body + #[error("{0}")] + HttpError(#[from] HTTPError), + /// Custom error. #[error("{0}")] Custom(#[source] Box), @@ -67,4 +71,69 @@ impl TransportErrorKind { pub const fn pubsub_unavailable() -> TransportError { RpcError::Transport(Self::PubsubUnavailable) } + + /// Instantiate a new `TrasnportError::HttpError`. 
+ pub const fn http_error(status: i64, body: String) -> TransportError { + RpcError::Transport(Self::HttpError(HTTPError { status, body })) + } +} + +/// Type for holding HTTP errors such as 429 or -32005 by the RPC provider. +#[derive(Debug, thiserror::Error)] +#[error("HTTP error {status} with body: {body}")] +pub struct HTTPError { + pub status: i64, + pub body: String, +} + +impl HTTPError { + /// Analyzes the `status` and `body` to determine whether the request should be retried. + pub fn is_retry_err(&self) -> bool { + // alchemy throws it this way + if self.status == 429 { + return true; + } + + // This is an infura error code for `exceeded project rate limit` + if self.status == -32005 { + return true; + } + + // alternative alchemy error for specific IPs + if self.status == -32016 && self.body.contains("rate limit") { + return true; + } + + // quick node error `"credits limited to 6000/sec"` + // + if self.status == -32012 && self.body.contains("credits") { + return true; + } + + // quick node rate limit error: `100/second request limit reached - reduce calls per second + // or upgrade your account at quicknode.com` + if self.status == -32007 && self.body.contains("request limit reached") { + return true; + } + + match self.body.as_str() { + // this is commonly thrown by infura and is apparently a load balancer issue, see also + "header not found" => true, + // also thrown by infura if out of budget for the day and ratelimited + "daily request count exceeded, request rate limited" => true, + msg => { + msg.contains("rate limit") + || msg.contains("rate exceeded") + || msg.contains("too many requests") + || msg.contains("credits limited") + || msg.contains("request limit") + } + } + } +} + +impl From<&ErrorPayload> for HTTPError { + fn from(value: &ErrorPayload) -> Self { + Self { status: value.code, body: value.message.clone() } + } } diff --git a/crates/transport/src/layers/mod.rs b/crates/transport/src/layers/mod.rs index 49b47fc85a6..c5d471715a2 100644 --- a/crates/transport/src/layers/mod.rs +++ b/crates/transport/src/layers/mod.rs @@ -3,4 +3,4 @@ mod retry; /// RetryBackoffLayer -pub use retry::RetryBackoffLayer; +pub use retry::{RetryBackoffLayer, RetryBackoffService, RetryPolicy}; diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index 94d9ffe824a..3072ac57228 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -1,5 +1,5 @@ use crate::{ - error::{TransportError, TransportErrorKind}, + error::{HTTPError, TransportError, TransportErrorKind}, TransportFut, }; use alloy_json_rpc::{ErrorPayload, RequestPacket, ResponsePacket}; @@ -10,9 +10,9 @@ use std::{ Arc, }, task::{Context, Poll}, - time::Duration, }; use tower::{Layer, Service}; +use tracing::trace; /// A Transport Layer that is responsible for retrying requests based on the /// error type. See [`TransportError`]. @@ -48,7 +48,7 @@ impl RetryBackoffLayer { } #[derive(Debug, Clone)] -pub struct RateLimitRetryPolicy; +pub(crate) struct RateLimitRetryPolicy; /// [RetryPolicy] defines logic for which [TransportError] instances should /// the client retry the request and try to recover from. @@ -131,7 +131,7 @@ impl Layer for RetryBackoffLayer { } } -/// An Tower Service used by the RetryBackoffLayer that is responsible for retrying requests based +/// A Tower Service used by the RetryBackoffLayer that is responsible for retrying requests based /// on the error type. See [TransportError] and [RateLimitRetryPolicy]. 
#[derive(Debug, Clone)] pub struct RetryBackoffService { @@ -151,7 +151,6 @@ pub struct RetryBackoffService { requests_enqueued: Arc, } -// impl tower service impl Service for RetryBackoffService where S: Service @@ -173,7 +172,6 @@ where fn call(&mut self, request: RequestPacket) -> Self::Future { let inner = self.inner.clone(); let this = self.clone(); - let policy = self.policy.clone(); let mut inner = std::mem::replace(&mut self.inner, inner); Box::pin(async move { let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; @@ -201,7 +199,7 @@ where if rate_limit_retry_number > this.max_rate_limit_retries { return Err(TransportErrorKind::custom_str("Max retries exceeded")); } - // tracing!("retrying request due to {:?}", err); + trace!("retrying request due to {:?}", err); let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; @@ -231,9 +229,9 @@ where let total_backoff = next_backoff + std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); - // trace!(?total_backoff, budget_backoff = ?seconds_to_wait_for_compute_budget, - // default_backoff = ?next_backoff, ?backoff_hint, "backing off due to rate - // limit"); + trace!(?total_backoff, budget_backoff = ?seconds_to_wait_for_compute_budget, + default_backoff = ?next_backoff, ?backoff_hint, "backing off due to rate + limit"); tokio::time::sleep(total_backoff).await; } else { @@ -286,6 +284,7 @@ fn should_retry_transport_level_error(error: &TransportErrorKind) -> bool { let msg = err.to_string(); msg.contains("429 Too Many Requests") } + TransportErrorKind::HttpError(http_err) => http_err.is_retry_err(), // If the backend is gone, or there's a completely custom error, we should assume it's not // retryable. @@ -296,45 +295,6 @@ fn should_retry_transport_level_error(error: &TransportErrorKind) -> bool { /// Analyzes the [ErrorPayload] and decides if the request should be retried based on the /// error code or the message. fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool { - let ErrorPayload { code, message, .. 
} = error; - // alchemy throws it this way - if *code == 429 { - return true; - } - - // This is an infura error code for `exceeded project rate limit` - if *code == -32005 { - return true; - } - - // alternative alchemy error for specific IPs - if *code == -32016 && message.contains("rate limit") { - return true; - } - - // quick node error `"credits limited to 6000/sec"` - // - if *code == -32012 && message.contains("credits") { - return true; - } - - // quick node rate limit error: `100/second request limit reached - reduce calls per second or - // upgrade your account at quicknode.com` - if *code == -32007 && message.contains("request limit reached") { - return true; - } - - match message.as_str() { - // this is commonly thrown by infura and is apparently a load balancer issue, see also - "header not found" => true, - // also thrown by infura if out of budget for the day and ratelimited - "daily request count exceeded, request rate limited" => true, - msg => { - msg.contains("rate limit") - || msg.contains("rate exceeded") - || msg.contains("too many requests") - || msg.contains("credits limited") - || msg.contains("request limit") - } - } + let http_err: HTTPError = error.into(); + http_err.is_retry_err() } diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index c253944fbe5..d85e2b8376f 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -26,8 +26,7 @@ pub use r#trait::Transport; pub use alloy_json_rpc::{RpcError, RpcResult}; pub use futures_utils_wasm::{impl_future, BoxFuture}; -mod layers; - +pub mod layers; /// Misc. utilities for building transports. pub mod utils; From a9bde9f30b0dfabc2c5a57fcfaf9d494956ca692 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 10 Jun 2024 14:12:31 +0530 Subject: [PATCH 03/16] nit --- crates/transport/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 6511e8eaf5e..aca1a15ce84 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -25,7 +25,7 @@ serde.workspace = true thiserror.workspace = true tower.workspace = true url.workspace = true -tracing = { workspace = true } +tracing.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" From 242e4fb610a9d190d648dc6d857b70586eea11b8 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 10 Jun 2024 14:26:34 +0530 Subject: [PATCH 04/16] nit: use std::thread::sleep --- crates/transport/src/layers/mod.rs | 2 +- crates/transport/src/layers/retry.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/transport/src/layers/mod.rs b/crates/transport/src/layers/mod.rs index c5d471715a2..a6a1ccbf7b4 100644 --- a/crates/transport/src/layers/mod.rs +++ b/crates/transport/src/layers/mod.rs @@ -3,4 +3,4 @@ mod retry; /// RetryBackoffLayer -pub use retry::{RetryBackoffLayer, RetryBackoffService, RetryPolicy}; +pub use retry::{RateLimitRetryPolicy, RetryBackoffLayer, RetryBackoffService, RetryPolicy}; diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index 3072ac57228..c77cbc7957b 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -32,7 +32,7 @@ pub struct RetryBackoffLayer { impl RetryBackoffLayer { /// Creates a new retry layer with the given parameters. 
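    ///
    /// A minimal usage sketch (the parameter values and the `transport` service are
    /// illustrative placeholders, not part of this patch):
    ///
    /// ```ignore
    /// use tower::ServiceBuilder;
    /// // 10 rate-limit retries, 3 timeout retries, 1000 ms initial backoff, 330 CU/s budget.
    /// let retry = RetryBackoffLayer::new(10, 3, 1000, 330);
    /// let service = ServiceBuilder::new().layer(retry).service(transport);
    /// ```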
- pub fn new( + pub const fn new( max_rate_limit_retries: u32, max_timeout_retries: u32, initial_backoff: u64, @@ -48,7 +48,9 @@ impl RetryBackoffLayer { } #[derive(Debug, Clone)] -pub(crate) struct RateLimitRetryPolicy; +/// [RateLimitRetryPolicy] implements [RetryPolicy] to determine whether to retry depending on the +/// err. +pub struct RateLimitRetryPolicy; /// [RetryPolicy] defines logic for which [TransportError] instances should /// the client retry the request and try to recover from. @@ -233,7 +235,7 @@ where default_backoff = ?next_backoff, ?backoff_hint, "backing off due to rate limit"); - tokio::time::sleep(total_backoff).await; + std::thread::sleep(total_backoff); } else { if timeout_retries < this.max_timeout_retries { timeout_retries += 1; From 06d3e49d1a73b45282cf10441569e33f4ae75cd2 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 10 Jun 2024 23:37:08 +0530 Subject: [PATCH 05/16] use tokio::sleep --- crates/contract/src/call.rs | 2 +- crates/transport/src/layers/retry.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/contract/src/call.rs b/crates/contract/src/call.rs index 0cec8645fa3..fdedf43792c 100644 --- a/crates/contract/src/call.rs +++ b/crates/contract/src/call.rs @@ -718,7 +718,7 @@ mod tests { let my_state_builder = my_contract.myState(); assert_eq!(my_state_builder.calldata()[..], MyContract::myStateCall {}.abi_encode(),); let result: MyContract::myStateReturn = my_state_builder.call().await.unwrap(); - assert!(result._0); + assert!(result.myState); let do_stuff_builder = my_contract.doStuff(U256::from(0x69), true); assert_eq!( diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index c77cbc7957b..338efe306e3 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -235,7 +235,7 @@ where default_backoff = ?next_backoff, ?backoff_hint, "backing off due to rate limit"); - std::thread::sleep(total_backoff); + tokio::time::sleep(total_backoff).await; } else { if timeout_retries < this.max_timeout_retries { timeout_retries += 1; From 3eeec66ec81e8502985ef146458205eb6e6ad64c Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 02:09:44 +0530 Subject: [PATCH 06/16] fix: HttpError and retry err parsing --- crates/json-rpc/src/response/error.rs | 47 ++++++++++++++++ crates/transport-http/src/hyper_transport.rs | 2 +- .../transport-http/src/reqwest_transport.rs | 2 +- crates/transport/src/error.rs | 54 +++++++++++-------- crates/transport/src/layers/retry.rs | 38 +++---------- 5 files changed, 86 insertions(+), 57 deletions(-) diff --git a/crates/json-rpc/src/response/error.rs b/crates/json-rpc/src/response/error.rs index 371fe4e97c9..3c6033d5a51 100644 --- a/crates/json-rpc/src/response/error.rs +++ b/crates/json-rpc/src/response/error.rs @@ -20,6 +20,53 @@ pub struct ErrorPayload> { pub data: Option, } +impl ErrorPayload { + /// Analyzes the [ErrorPayload] and decides if the request should be retried based on the + /// error code or the message. 
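+    ///
+    /// For example (illustrative, mirroring the checks below): a payload with `code == 429`
+    /// is treated as retryable, while a plain `execution reverted` error payload is not.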
+ pub fn is_retry_err(&self) -> bool { + // alchemy throws it this way + if self.code == 429 { + return true; + } + + // This is an infura error code for `exceeded project rate limit` + if self.code == -32005 { + return true; + } + + // alternative alchemy error for specific IPs + if self.code == -32016 && self.message.contains("rate limit") { + return true; + } + + // quick node error `"credits limited to 6000/sec"` + // + if self.code == -32012 && self.message.contains("credits") { + return true; + } + + // quick node rate limit error: `100/second request limit reached - reduce calls per second + // or upgrade your account at quicknode.com` + if self.code == -32007 && self.message.contains("request limit reached") { + return true; + } + + match self.message.as_str() { + // this is commonly thrown by infura and is apparently a load balancer issue, see also + "header not found" => true, + // also thrown by infura if out of budget for the day and ratelimited + "daily request count exceeded, request rate limited" => true, + msg => { + msg.contains("rate limit") + || msg.contains("rate exceeded") + || msg.contains("too many requests") + || msg.contains("credits limited") + || msg.contains("request limit") + } + } + } +} + impl fmt::Display for ErrorPayload { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "error code {}: {}", self.code, self.message) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index 10168dca641..babbb0d256e 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -88,7 +88,7 @@ where if status != hyper::StatusCode::OK { return Err(TransportErrorKind::http_error( - status.as_u16() as i64, + status.as_u16(), String::from_utf8_lossy(&body).to_string(), )); } diff --git a/crates/transport-http/src/reqwest_transport.rs b/crates/transport-http/src/reqwest_transport.rs index 366101b026e..302d588016d 100644 --- a/crates/transport-http/src/reqwest_transport.rs +++ b/crates/transport-http/src/reqwest_transport.rs @@ -64,7 +64,7 @@ impl Http { if status != reqwest::StatusCode::OK { return Err(TransportErrorKind::http_error( - status.as_u16() as i64, + status.as_u16(), String::from_utf8_lossy(&body).to_string(), )); } diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 1e1c549039d..fc6d69ba334 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -1,4 +1,4 @@ -use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult}; +use alloy_json_rpc::{Id, RpcError, RpcResult}; use serde_json::value::RawValue; use std::{error::Error as StdError, fmt::Debug}; use thiserror::Error; @@ -33,7 +33,7 @@ pub enum TransportErrorKind { /// HTTP Error with code and body #[error("{0}")] - HttpError(#[from] HTTPError), + HttpError(#[from] HttpError), /// Custom error. #[error("{0}")] @@ -73,46 +73,60 @@ impl TransportErrorKind { } /// Instantiate a new `TrasnportError::HttpError`. - pub const fn http_error(status: i64, body: String) -> TransportError { - RpcError::Transport(Self::HttpError(HTTPError { status, body })) + pub const fn http_error(status: u16, body: String) -> TransportError { + RpcError::Transport(Self::HttpError(HttpError { status, body })) + } + + /// Analyzes the [TransportErrorKind] and decides if the request should be retried based on the + /// variant. + pub fn is_retry_err(&self) -> bool { + match self { + // Missing batch response errors can be retried. 
+ Self::MissingBatchResponse(_) => true, + Self::Custom(err) => { + // currently http error responses are not standard in alloy + let msg = err.to_string(); + msg.contains("429 Too Many Requests") + } + Self::HttpError(http_err) => http_err.is_rate_limit_err(), + + // If the backend is gone, or there's a completely custom error, we should assume it's + // not retryable. + _ => false, + } } } -/// Type for holding HTTP errors such as 429 or -32005 by the RPC provider. +/// Type for holding HTTP errors such as 429 rate limit error. #[derive(Debug, thiserror::Error)] #[error("HTTP error {status} with body: {body}")] -pub struct HTTPError { - pub status: i64, +pub struct HttpError { + pub status: u16, pub body: String, } -impl HTTPError { +impl HttpError { /// Analyzes the `status` and `body` to determine whether the request should be retried. - pub fn is_retry_err(&self) -> bool { + pub fn is_rate_limit_err(&self) -> bool { // alchemy throws it this way if self.status == 429 { return true; } - // This is an infura error code for `exceeded project rate limit` - if self.status == -32005 { - return true; - } - // alternative alchemy error for specific IPs - if self.status == -32016 && self.body.contains("rate limit") { + if self.body.contains("rate limit") { return true; } // quick node error `"credits limited to 6000/sec"` // - if self.status == -32012 && self.body.contains("credits") { + if self.body.contains("credits") { return true; } // quick node rate limit error: `100/second request limit reached - reduce calls per second // or upgrade your account at quicknode.com` - if self.status == -32007 && self.body.contains("request limit reached") { + if self.body.contains("request limit reached") { return true; } @@ -131,9 +145,3 @@ impl HTTPError { } } } - -impl From<&ErrorPayload> for HTTPError { - fn from(value: &ErrorPayload) -> Self { - Self { status: value.code, body: value.message.clone() } - } -} diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index 338efe306e3..a33e28701f7 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -1,5 +1,5 @@ use crate::{ - error::{HTTPError, TransportError, TransportErrorKind}, + error::{TransportError, TransportErrorKind}, TransportFut, }; use alloy_json_rpc::{ErrorPayload, RequestPacket, ResponsePacket}; @@ -47,9 +47,9 @@ impl RetryBackoffLayer { } } -#[derive(Debug, Clone)] /// [RateLimitRetryPolicy] implements [RetryPolicy] to determine whether to retry depending on the /// err. +#[derive(Debug, Clone)] pub struct RateLimitRetryPolicy; /// [RetryPolicy] defines logic for which [TransportError] instances should @@ -67,13 +67,13 @@ impl RetryPolicy for RateLimitRetryPolicy { match error { // There was a transport-level error. This is either a non-retryable error, // or a server error that should be retried. - TransportError::Transport(err) => should_retry_transport_level_error(err), + TransportError::Transport(err) => err.is_retry_err(), // The transport could not serialize the error itself. The request was malformed from // the start. TransportError::SerError(_) => false, TransportError::DeserError { text, .. 
} => { if let Ok(resp) = serde_json::from_str::(text) { - return should_retry_json_rpc_error(&resp); + return resp.is_retry_err(); } // some providers send invalid JSON RPC in the error case (no `id:u64`), but the @@ -84,12 +84,12 @@ impl RetryPolicy for RateLimitRetryPolicy { } if let Ok(resp) = serde_json::from_str::(text) { - return should_retry_json_rpc_error(&resp.error); + return resp.error.is_retry_err(); } false } - TransportError::ErrorResp(err) => should_retry_json_rpc_error(err), + TransportError::ErrorResp(err) => err.is_retry_err(), TransportError::NullResp => true, TransportError::UnsupportedFeature(_) => false, TransportError::LocalUsageError(_) => false, @@ -274,29 +274,3 @@ fn compute_unit_offset_in_secs( 0 } } - -/// Analyzes the [TransportErrorKind] and decides if the request should be retried based on the -/// variant. -fn should_retry_transport_level_error(error: &TransportErrorKind) -> bool { - match error { - // Missing batch response errors can be retried. - TransportErrorKind::MissingBatchResponse(_) => true, - TransportErrorKind::Custom(err) => { - // currently http error responses are not standard in alloy - let msg = err.to_string(); - msg.contains("429 Too Many Requests") - } - TransportErrorKind::HttpError(http_err) => http_err.is_retry_err(), - - // If the backend is gone, or there's a completely custom error, we should assume it's not - // retryable. - _ => false, - } -} - -/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the -/// error code or the message. -fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool { - let http_err: HTTPError = error.into(); - http_err.is_retry_err() -} From 9a690f6c4c8eb76971fad31f65ad1602a0a247e0 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 12:40:37 +0530 Subject: [PATCH 07/16] impl RpcErrorExt --- crates/json-rpc/src/response/error.rs | 2 +- crates/transport/Cargo.toml | 2 +- crates/transport/src/error.rs | 74 +++++++++++++++++++++++++-- crates/transport/src/layers/retry.rs | 53 ++----------------- crates/transport/src/lib.rs | 2 +- 5 files changed, 76 insertions(+), 57 deletions(-) diff --git a/crates/json-rpc/src/response/error.rs b/crates/json-rpc/src/response/error.rs index 3c6033d5a51..c6de4dba544 100644 --- a/crates/json-rpc/src/response/error.rs +++ b/crates/json-rpc/src/response/error.rs @@ -20,7 +20,7 @@ pub struct ErrorPayload> { pub data: Option, } -impl ErrorPayload { +impl ErrorPayload { /// Analyzes the [ErrorPayload] and decides if the request should be retried based on the /// error code or the message. 
pub fn is_retry_err(&self) -> bool { diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index aca1a15ce84..4189d298c1e 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -31,4 +31,4 @@ tracing.workspace = true wasm-bindgen-futures = "0.4" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -tokio = { workspace = true, features = ["rt"] } +tokio = { workspace = true, features = ["rt", "time"] } diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index fc6d69ba334..06c9fca17d1 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -1,6 +1,7 @@ -use alloy_json_rpc::{Id, RpcError, RpcResult}; +use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult}; +use serde::Deserialize; use serde_json::value::RawValue; -use std::{error::Error as StdError, fmt::Debug}; +use std::{borrow::Borrow, error::Error as StdError, fmt::Debug}; use thiserror::Error; /// A transport error is an [`RpcError`] containing a [`TransportErrorKind`]. @@ -9,6 +10,71 @@ pub type TransportError> = RpcError> = RpcResult; +/// Extension trait to implement methods for [`RpcError`]. +pub trait RpcErrorExt { + /// Analyzes whether to retry the request depending on the error. + fn is_retryable(&self) -> bool; + + /// Fetches the backoff hint from the error message if present + fn backoff_hint(&self) -> Option; +} + +impl RpcErrorExt for RpcError +where + E: Borrow, +{ + fn is_retryable(&self) -> bool { + match self { + // There was a transport-level error. This is either a non-retryable error, + // or a server error that should be retried. + TransportError::Transport(err) => err.is_retry_err(), + // The transport could not serialize the error itself. The request was malformed from + // the start. + TransportError::SerError(_) => false, + TransportError::DeserError { text, .. } => { + if let Ok(resp) = serde_json::from_str::(text) { + return resp.is_retry_err(); + } + + // some providers send invalid JSON RPC in the error case (no `id:u64`), but the + // text should be a `JsonRpcError` + #[derive(Deserialize)] + struct Resp { + error: ErrorPayload, + } + + if let Ok(resp) = serde_json::from_str::(text) { + return resp.error.is_retry_err(); + } + + false + } + TransportError::ErrorResp(err) => err.is_retry_err(), + TransportError::NullResp => true, + _ => false, + } + } + + fn backoff_hint(&self) -> Option { + if let TransportError::ErrorResp(resp) = self { + let data = resp.try_data_as::(); + if let Some(Ok(data)) = data { + // if daily rate limit exceeded, infura returns the requested backoff in the error + // response + let backoff_seconds = &data["rate"]["backoff_seconds"]; + // infura rate limit error + if let Some(seconds) = backoff_seconds.as_u64() { + return Some(std::time::Duration::from_secs(seconds)); + } + if let Some(seconds) = backoff_seconds.as_f64() { + return Some(std::time::Duration::from_secs(seconds as u64 + 1)); + } + } + } + None + } +} + /// Transport error. /// /// All transport errors are wrapped in this enum. @@ -83,13 +149,11 @@ impl TransportErrorKind { match self { // Missing batch response errors can be retried. 
Self::MissingBatchResponse(_) => true, + Self::HttpError(http_err) => http_err.is_rate_limit_err(), Self::Custom(err) => { - // currently http error responses are not standard in alloy let msg = err.to_string(); msg.contains("429 Too Many Requests") } - Self::HttpError(http_err) => http_err.is_rate_limit_err(), - // If the backend is gone, or there's a completely custom error, we should assume it's // not retryable. _ => false, diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index a33e28701f7..41280d9b19c 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -1,9 +1,8 @@ use crate::{ error::{TransportError, TransportErrorKind}, - TransportFut, + RpcErrorExt, TransportFut, }; -use alloy_json_rpc::{ErrorPayload, RequestPacket, ResponsePacket}; -use serde::Deserialize; +use alloy_json_rpc::{RequestPacket, ResponsePacket}; use std::{ sync::{ atomic::{AtomicU32, Ordering}, @@ -64,56 +63,12 @@ pub trait RetryPolicy: Send + Sync + std::fmt::Debug { impl RetryPolicy for RateLimitRetryPolicy { fn should_retry(&self, error: &TransportError) -> bool { - match error { - // There was a transport-level error. This is either a non-retryable error, - // or a server error that should be retried. - TransportError::Transport(err) => err.is_retry_err(), - // The transport could not serialize the error itself. The request was malformed from - // the start. - TransportError::SerError(_) => false, - TransportError::DeserError { text, .. } => { - if let Ok(resp) = serde_json::from_str::(text) { - return resp.is_retry_err(); - } - - // some providers send invalid JSON RPC in the error case (no `id:u64`), but the - // text should be a `JsonRpcError` - #[derive(Deserialize)] - struct Resp { - error: ErrorPayload, - } - - if let Ok(resp) = serde_json::from_str::(text) { - return resp.error.is_retry_err(); - } - - false - } - TransportError::ErrorResp(err) => err.is_retry_err(), - TransportError::NullResp => true, - TransportError::UnsupportedFeature(_) => false, - TransportError::LocalUsageError(_) => false, - } + error.is_retryable() } /// Provides a backoff hint if the error response contains it fn backoff_hint(&self, error: &TransportError) -> Option { - if let TransportError::ErrorResp(resp) = error { - let data = resp.try_data_as::(); - if let Some(Ok(data)) = data { - // if daily rate limit exceeded, infura returns the requested backoff in the error - // response - let backoff_seconds = &data["rate"]["backoff_seconds"]; - // infura rate limit error - if let Some(seconds) = backoff_seconds.as_u64() { - return Some(std::time::Duration::from_secs(seconds)); - } - if let Some(seconds) = backoff_seconds.as_f64() { - return Some(std::time::Duration::from_secs(seconds as u64 + 1)); - } - } - } - None + error.backoff_hint() } } diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index d85e2b8376f..023ad23df90 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -18,7 +18,7 @@ pub use common::Authorization; mod error; #[doc(hidden)] pub use error::TransportErrorKind; -pub use error::{TransportError, TransportResult}; +pub use error::{RpcErrorExt, TransportError, TransportResult}; mod r#trait; pub use r#trait::Transport; From 153bb3514dbf2223e53869d821e69fc9cfa623db Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 12:55:49 +0530 Subject: [PATCH 08/16] dedup --- crates/transport/src/error.rs | 33 +-------------------------------- 1 
file changed, 1 insertion(+), 32 deletions(-) diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 9a55e23cf14..beac040fd68 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -172,40 +172,9 @@ pub struct HttpError { impl HttpError { /// Analyzes the `status` and `body` to determine whether the request should be retried. pub fn is_rate_limit_err(&self) -> bool { - // alchemy throws it this way if self.status == 429 { return true; } - - // alternative alchemy error for specific IPs - if self.body.contains("rate limit") { - return true; - } - - // quick node error `"credits limited to 6000/sec"` - // - if self.body.contains("credits") { - return true; - } - - // quick node rate limit error: `100/second request limit reached - reduce calls per second - // or upgrade your account at quicknode.com` - if self.body.contains("request limit reached") { - return true; - } - - match self.body.as_str() { - // this is commonly thrown by infura and is apparently a load balancer issue, see also - "header not found" => true, - // also thrown by infura if out of budget for the day and ratelimited - "daily request count exceeded, request rate limited" => true, - msg => { - msg.contains("rate limit") - || msg.contains("rate exceeded") - || msg.contains("too many requests") - || msg.contains("credits limited") - || msg.contains("request limit") - } - } + false } } From 26820a166308d6511813ae36760e3e16c0c7d204 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 13:01:28 +0530 Subject: [PATCH 09/16] clippy --- crates/transport/Cargo.toml | 2 +- crates/transport/src/error.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 71c011567a4..16a09a63350 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -29,7 +29,7 @@ tracing.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = { version = "0.4", optional = true } - +tokio = { workspace = true, features = ["rt", "time"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { workspace = true, features = ["rt", "time"] } diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index beac040fd68..249de0261ec 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -170,8 +170,8 @@ pub struct HttpError { } impl HttpError { - /// Analyzes the `status` and `body` to determine whether the request should be retried. - pub fn is_rate_limit_err(&self) -> bool { + /// Checks the `status` to determine whether the request should be retried. 
+ pub const fn is_rate_limit_err(&self) -> bool { if self.status == 429 { return true; } From be349d8a3775af9da5d32a5bbec848ea9402348f Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 13:09:23 +0530 Subject: [PATCH 10/16] nit --- crates/transport/Cargo.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 16a09a63350..9382b3b91b6 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -26,12 +26,10 @@ thiserror.workspace = true tower.workspace = true url.workspace = true tracing.workspace = true +tokio = { workspace = true, features = ["rt", "time"] } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = { version = "0.4", optional = true } -tokio = { workspace = true, features = ["rt", "time"] } -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -tokio = { workspace = true, features = ["rt", "time"] } [features] From e13d393a810febb94131345bb0cb2b69d9dbc3dd Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 07:13:26 -0400 Subject: [PATCH 11/16] suggested nits Co-authored-by: Matthias Seitz --- crates/transport-http/src/hyper_transport.rs | 2 +- crates/transport-http/src/reqwest_transport.rs | 2 +- crates/transport/src/layers/retry.rs | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index 7262b84a1f3..a92a941ee07 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -89,7 +89,7 @@ where if status != hyper::StatusCode::OK { return Err(TransportErrorKind::http_error( status.as_u16(), - String::from_utf8_lossy(&body).to_string(), + String::from_utf8_lossy(&body).into_owned(), )); } diff --git a/crates/transport-http/src/reqwest_transport.rs b/crates/transport-http/src/reqwest_transport.rs index 818cc0765a8..0911a8345de 100644 --- a/crates/transport-http/src/reqwest_transport.rs +++ b/crates/transport-http/src/reqwest_transport.rs @@ -65,7 +65,7 @@ impl Http { if status != reqwest::StatusCode::OK { return Err(TransportErrorKind::http_error( status.as_u16(), - String::from_utf8_lossy(&body).to_string(), + String::from_utf8_lossy(&body).into_owned(), )); } diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index 41280d9b19c..07a82c481de 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -48,7 +48,8 @@ impl RetryBackoffLayer { /// [RateLimitRetryPolicy] implements [RetryPolicy] to determine whether to retry depending on the /// err. 
-#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, Default)] +#[non_exhaustive] pub struct RateLimitRetryPolicy; /// [RetryPolicy] defines logic for which [TransportError] instances should From fa2f1f5b5001c0e9d6751c7801f35fa00b861de5 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 16:45:12 +0530 Subject: [PATCH 12/16] nit --- crates/transport/src/error.rs | 130 +++++++++++++++++----------------- 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 249de0261ec..81af525f372 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -10,71 +10,6 @@ pub type TransportError> = RpcError> = RpcResult; -/// Extension trait to implement methods for [`RpcError`]. -pub trait RpcErrorExt { - /// Analyzes whether to retry the request depending on the error. - fn is_retryable(&self) -> bool; - - /// Fetches the backoff hint from the error message if present - fn backoff_hint(&self) -> Option; -} - -impl RpcErrorExt for RpcError -where - E: Borrow, -{ - fn is_retryable(&self) -> bool { - match self { - // There was a transport-level error. This is either a non-retryable error, - // or a server error that should be retried. - Self::Transport(err) => err.is_retry_err(), - // The transport could not serialize the error itself. The request was malformed from - // the start. - Self::SerError(_) => false, - Self::DeserError { text, .. } => { - if let Ok(resp) = serde_json::from_str::(text) { - return resp.is_retry_err(); - } - - // some providers send invalid JSON RPC in the error case (no `id:u64`), but the - // text should be a `JsonRpcError` - #[derive(Deserialize)] - struct Resp { - error: ErrorPayload, - } - - if let Ok(resp) = serde_json::from_str::(text) { - return resp.error.is_retry_err(); - } - - false - } - Self::ErrorResp(err) => err.is_retry_err(), - Self::NullResp => true, - _ => false, - } - } - - fn backoff_hint(&self) -> Option { - if let Self::ErrorResp(resp) = self { - let data = resp.try_data_as::(); - if let Some(Ok(data)) = data { - // if daily rate limit exceeded, infura returns the requested backoff in the error - // response - let backoff_seconds = &data["rate"]["backoff_seconds"]; - // infura rate limit error - if let Some(seconds) = backoff_seconds.as_u64() { - return Some(std::time::Duration::from_secs(seconds)); - } - if let Some(seconds) = backoff_seconds.as_f64() { - return Some(std::time::Duration::from_secs(seconds as u64 + 1)); - } - } - } - None - } -} - /// Transport error. /// /// All transport errors are wrapped in this enum. @@ -178,3 +113,68 @@ impl HttpError { false } } + +/// Extension trait to implement methods for [`RpcError`]. +pub trait RpcErrorExt { + /// Analyzes whether to retry the request depending on the error. + fn is_retryable(&self) -> bool; + + /// Fetches the backoff hint from the error message if present + fn backoff_hint(&self) -> Option; +} + +impl RpcErrorExt for RpcError +where + E: Borrow, +{ + fn is_retryable(&self) -> bool { + match self { + // There was a transport-level error. This is either a non-retryable error, + // or a server error that should be retried. + Self::Transport(err) => err.is_retry_err(), + // The transport could not serialize the error itself. The request was malformed from + // the start. + Self::SerError(_) => false, + Self::DeserError { text, .. 
} => { + if let Ok(resp) = serde_json::from_str::(text) { + return resp.is_retry_err(); + } + + // some providers send invalid JSON RPC in the error case (no `id:u64`), but the + // text should be a `JsonRpcError` + #[derive(Deserialize)] + struct Resp { + error: ErrorPayload, + } + + if let Ok(resp) = serde_json::from_str::(text) { + return resp.error.is_retry_err(); + } + + false + } + Self::ErrorResp(err) => err.is_retry_err(), + Self::NullResp => true, + _ => false, + } + } + + fn backoff_hint(&self) -> Option { + if let Self::ErrorResp(resp) = self { + let data = resp.try_data_as::(); + if let Some(Ok(data)) = data { + // if daily rate limit exceeded, infura returns the requested backoff in the error + // response + let backoff_seconds = &data["rate"]["backoff_seconds"]; + // infura rate limit error + if let Some(seconds) = backoff_seconds.as_u64() { + return Some(std::time::Duration::from_secs(seconds)); + } + if let Some(seconds) = backoff_seconds.as_f64() { + return Some(std::time::Duration::from_secs(seconds as u64 + 1)); + } + } + } + None + } +} From a1c21c16cd3d89b1a5dec2172d55b9d98a33b583 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 11 Jun 2024 22:24:37 +0530 Subject: [PATCH 13/16] nits --- crates/transport/src/error.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 81af525f372..46c0b68ebb8 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -1,7 +1,7 @@ use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult}; use serde::Deserialize; use serde_json::value::RawValue; -use std::{borrow::Borrow, error::Error as StdError, fmt::Debug}; +use std::{error::Error as StdError, fmt::Debug}; use thiserror::Error; /// A transport error is an [`RpcError`] containing a [`TransportErrorKind`]. @@ -115,7 +115,7 @@ impl HttpError { } /// Extension trait to implement methods for [`RpcError`]. -pub trait RpcErrorExt { +pub trait RpcErrorExt { /// Analyzes whether to retry the request depending on the error. fn is_retryable(&self) -> bool; @@ -123,10 +123,7 @@ pub trait RpcErrorExt { fn backoff_hint(&self) -> Option; } -impl RpcErrorExt for RpcError -where - E: Borrow, -{ +impl RpcErrorExt for RpcError { fn is_retryable(&self) -> bool { match self { // There was a transport-level error. 
This is either a non-retryable error, From 3ebccf43b875e300f9d266e27310949b2f0d809b Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 12 Jun 2024 10:59:14 +0530 Subject: [PATCH 14/16] nits --- crates/transport/Cargo.toml | 5 ++++- crates/transport/src/layers/retry.rs | 31 ++++++++++++++++++++-------- crates/transport/src/lib.rs | 1 + 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 9382b3b91b6..461db863eaf 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -26,7 +26,10 @@ thiserror.workspace = true tower.workspace = true url.workspace = true tracing.workspace = true -tokio = { workspace = true, features = ["rt", "time"] } +tokio = { workspace = true, features = ["time"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt"] } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = { version = "0.4", optional = true } diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs index 07a82c481de..df933e35507 100644 --- a/crates/transport/src/layers/retry.rs +++ b/crates/transport/src/layers/retry.rs @@ -9,6 +9,7 @@ use std::{ Arc, }, task::{Context, Poll}, + time::Duration, }; use tower::{Layer, Service}; use tracing::trace; @@ -109,6 +110,12 @@ pub struct RetryBackoffService { requests_enqueued: Arc, } +impl RetryBackoffService { + const fn initial_backoff(&self) -> Duration { + Duration::from_millis(self.initial_backoff) + } +} + impl Service for RetryBackoffService where S: Service @@ -137,9 +144,9 @@ where let mut timeout_retries: u32 = 0; loop { let err; - let fut = inner.call(request.clone()).await; + let res = inner.call(request.clone()).await; - match fut { + match res { Ok(res) => { if let Some(e) = res.as_error() { err = TransportError::ErrorResp(e.clone()) @@ -155,17 +162,19 @@ where if should_retry { rate_limit_retry_number += 1; if rate_limit_retry_number > this.max_rate_limit_retries { - return Err(TransportErrorKind::custom_str("Max retries exceeded")); + return Err(TransportErrorKind::custom_str(&format!( + "Max retries exceeded {}", + err + ))); } - trace!("retrying request due to {:?}", err); + trace!(%err, "retrying request"); let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; // try to extract the requested backoff from the error or compute the next // backoff based on retry count let backoff_hint = this.policy.backoff_hint(&err); - let next_backoff = backoff_hint - .unwrap_or_else(|| std::time::Duration::from_millis(this.initial_backoff)); + let next_backoff = backoff_hint.unwrap_or_else(|| this.initial_backoff()); // requests are usually weighted and can vary from 10 CU to several 100 CU, // cheaper requests are more common some example alchemy @@ -187,9 +196,13 @@ where let total_backoff = next_backoff + std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); - trace!(?total_backoff, budget_backoff = ?seconds_to_wait_for_compute_budget, - default_backoff = ?next_backoff, ?backoff_hint, "backing off due to rate - limit"); + trace!( + total_backoff_millis = total_backoff.as_millis(), + budget_backoff_millis = seconds_to_wait_for_compute_budget * 1000, + default_backoff_millis = next_backoff.as_millis(), + backoff_hint_millis = backoff_hint.map(|d| d.as_millis()), + "(all in ms) backing off due to rate limit" + ); tokio::time::sleep(total_backoff).await; } else { diff --git a/crates/transport/src/lib.rs 
From cb2eed92e758ae1f0195cd2ef8942cc0720bf490 Mon Sep 17 00:00:00 2001
From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com>
Date: Tue, 2 Jul 2024 19:28:34 +0530
Subject: [PATCH 15/16] nits

---
 crates/transport/Cargo.toml          | 5 +----
 crates/transport/src/error.rs        | 2 +-
 crates/transport/src/layers/retry.rs | 4 ++--
 crates/transport/src/lib.rs          | 2 +-
 4 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml
index 912ef752487..89c3cca99d3 100644
--- a/crates/transport/Cargo.toml
+++ b/crates/transport/Cargo.toml
@@ -30,10 +30,7 @@ thiserror.workspace = true
 tower.workspace = true
 url.workspace = true
 tracing.workspace = true
-tokio = { workspace = true, features = ["time"] }
-
-[dev-dependencies]
-tokio = { workspace = true, features = ["rt"] }
+tokio = { workspace = true, features = ["rt", "time"] }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 wasm-bindgen-futures = { version = "0.4", optional = true }
diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs
index 410a12e2900..55b55f1bc1c 100644
--- a/crates/transport/src/error.rs
+++ b/crates/transport/src/error.rs
@@ -113,7 +113,7 @@ impl HttpError {
 }
 
 /// Extension trait to implement methods for [`RpcError`].
-pub trait RpcErrorExt {
+pub(crate) trait RpcErrorExt {
     /// Analyzes whether to retry the request depending on the error.
     fn is_retryable(&self) -> bool;
 
diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs
index df933e35507..77180a39d2f 100644
--- a/crates/transport/src/layers/retry.rs
+++ b/crates/transport/src/layers/retry.rs
@@ -1,6 +1,6 @@
 use crate::{
-    error::{TransportError, TransportErrorKind},
-    RpcErrorExt, TransportFut,
+    error::{RpcErrorExt, TransportError, TransportErrorKind},
+    TransportFut,
 };
 use alloy_json_rpc::{RequestPacket, ResponsePacket};
 use std::{
diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs
index 8082ef697a1..eed8a694221 100644
--- a/crates/transport/src/lib.rs
+++ b/crates/transport/src/lib.rs
@@ -18,7 +18,7 @@ pub use common::Authorization;
 mod error;
 #[doc(hidden)]
 pub use error::TransportErrorKind;
-pub use error::{RpcErrorExt, TransportError, TransportResult};
+pub use error::{TransportError, TransportResult};
 
 mod r#trait;
 pub use r#trait::Transport;

From 60a7f9fc4872b1d1d77036ce2a67418bf5430206 Mon Sep 17 00:00:00 2001
From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com>
Date: Tue, 2 Jul 2024 19:31:20 +0530
Subject: [PATCH 16/16] rm timeout retries

---
 crates/transport/src/layers/retry.rs | 19 +------------------
 1 file changed, 1 insertion(+), 18 deletions(-)

diff --git a/crates/transport/src/layers/retry.rs b/crates/transport/src/layers/retry.rs
index 77180a39d2f..ce66cc0fc6c 100644
--- a/crates/transport/src/layers/retry.rs
+++ b/crates/transport/src/layers/retry.rs
@@ -22,8 +22,6 @@ use tracing::trace;
 pub struct RetryBackoffLayer {
     /// The maximum number of retries for rate limit errors
     max_rate_limit_retries: u32,
-    /// The maximum number of retries for timeout errors
-    max_timeout_retries: u32,
     /// The initial backoff in milliseconds
     initial_backoff: u64,
     /// The number of compute units per second for this provider
@@ -34,16 +32,10 @@ impl RetryBackoffLayer {
     /// Creates a new retry layer with the given parameters.
     pub const fn new(
         max_rate_limit_retries: u32,
-        max_timeout_retries: u32,
         initial_backoff: u64,
         compute_units_per_second: u64,
     ) -> Self {
-        Self {
-            max_rate_limit_retries,
-            max_timeout_retries,
-            initial_backoff,
-            compute_units_per_second,
-        }
+        Self { max_rate_limit_retries, initial_backoff, compute_units_per_second }
     }
 }
 
@@ -82,7 +74,6 @@ impl<S> Layer<S> for RetryBackoffLayer {
             inner,
             policy: RateLimitRetryPolicy,
             max_rate_limit_retries: self.max_rate_limit_retries,
-            max_timeout_retries: self.max_timeout_retries,
            initial_backoff: self.initial_backoff,
             compute_units_per_second: self.compute_units_per_second,
             requests_enqueued: Arc::new(AtomicU32::new(0)),
@@ -100,8 +91,6 @@ pub struct RetryBackoffService<S> {
     policy: RateLimitRetryPolicy,
     /// The maximum number of retries for rate limit errors
     max_rate_limit_retries: u32,
-    /// The maximum number of retries for timeout errors
-    max_timeout_retries: u32,
     /// The initial backoff in milliseconds
     initial_backoff: u64,
     /// The number of compute units per second for this service
@@ -141,7 +130,6 @@ where
         Box::pin(async move {
            let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
             let mut rate_limit_retry_number: u32 = 0;
-            let mut timeout_retries: u32 = 0;
             loop {
                 let err;
                 let res = inner.call(request.clone()).await;
@@ -206,11 +194,6 @@ where
                     tokio::time::sleep(total_backoff).await;
                 } else {
-                    if timeout_retries < this.max_timeout_retries {
-                        timeout_retries += 1;
-                        continue;
-                    }
-
                     this.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
                     return Err(err);
                 }
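With this final patch applied, the layer is built from three values: the maximum number of rate-limit retries, the initial backoff in milliseconds, and the provider's compute units per second. A usage sketch follows, assuming the companion `alloy-rpc-client` crate with its default HTTP transport; the concrete numbers and URL are placeholders, not defaults from this PR.

```rust
use alloy_rpc_client::ClientBuilder;
use alloy_transport::layers::RetryBackoffLayer;
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 10 rate-limit retries, 400 ms initial backoff, 330 CU/s budget
    // (illustrative values only).
    let retry = RetryBackoffLayer::new(10, 400, 330);

    let url: Url = "http://localhost:8545".parse()?;
    // The layer wraps every request the client sends through the transport.
    let client = ClientBuilder::default().layer(retry).http(url);
    let _ = client;
    Ok(())
}
```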