Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): retry layer #849

Merged
merged 19 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crates/json-rpc/src/response/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,53 @@ pub struct ErrorPayload<ErrData = Box<RawValue>> {
pub data: Option<ErrData>,
}

impl<E> ErrorPayload<E> {
/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the
/// error code or the message.
pub fn is_retry_err(&self) -> bool {
// alchemy throws it this way
if self.code == 429 {
return true;
}

// This is an infura error code for `exceeded project rate limit`
if self.code == -32005 {
return true;
}

// alternative alchemy error for specific IPs
if self.code == -32016 && self.message.contains("rate limit") {
return true;
}

// quick node error `"credits limited to 6000/sec"`
// <https://github.com/foundry-rs/foundry/pull/6712#issuecomment-1951441240>
if self.code == -32012 && self.message.contains("credits") {
return true;
}

// quick node rate limit error: `100/second request limit reached - reduce calls per second
// or upgrade your account at quicknode.com` <https://github.com/foundry-rs/foundry/issues/4894>
if self.code == -32007 && self.message.contains("request limit reached") {
return true;
}

match self.message.as_str() {
// this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
"header not found" => true,
// also thrown by infura if out of budget for the day and ratelimited
"daily request count exceeded, request rate limited" => true,
msg => {
msg.contains("rate limit")
|| msg.contains("rate exceeded")
|| msg.contains("too many requests")
|| msg.contains("credits limited")
|| msg.contains("request limit")
}
}
}
}

impl<ErrData> fmt::Display for ErrorPayload<ErrData> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "error code {}: {}", self.code, self.message)
Expand Down
8 changes: 4 additions & 4 deletions crates/transport-http/src/hyper_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ where
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != hyper::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
return Err(TransportErrorKind::http_error(
status.as_u16(),
String::from_utf8_lossy(&body).into_owned(),
));
}

// Deserialize a Box<RawValue> from the body. If deserialization fails, return
Expand Down
8 changes: 4 additions & 4 deletions crates/transport-http/src/reqwest_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ impl Http<Client> {
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != reqwest::StatusCode::OK {
return Err(TransportErrorKind::custom_str(&format!(
"HTTP error {status} with body: {}",
String::from_utf8_lossy(&body)
)));
return Err(TransportErrorKind::http_error(
status.as_u16(),
String::from_utf8_lossy(&body).into_owned(),
));
}

// Deserialize a Box<RawValue> from the body. If deserialization fails, return
Expand Down
6 changes: 3 additions & 3 deletions crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ serde.workspace = true
thiserror.workspace = true
tower.workspace = true
url.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["rt"] }

[features]

wasm-bindgen = ["dep:wasm-bindgen-futures"]
109 changes: 108 additions & 1 deletion crates/transport/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloy_json_rpc::{Id, RpcError, RpcResult};
use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult};
use serde::Deserialize;
use serde_json::value::RawValue;
use std::{error::Error as StdError, fmt::Debug};
use thiserror::Error;
Expand Down Expand Up @@ -31,6 +32,10 @@ pub enum TransportErrorKind {
#[error("subscriptions are not available on this provider")]
PubsubUnavailable,

/// HTTP Error with code and body
#[error("{0}")]
HttpError(#[from] HttpError),

/// Custom error.
#[error("{0}")]
Custom(#[source] Box<dyn StdError + Send + Sync + 'static>),
Expand Down Expand Up @@ -67,4 +72,106 @@ impl TransportErrorKind {
pub const fn pubsub_unavailable() -> TransportError {
RpcError::Transport(Self::PubsubUnavailable)
}

/// Instantiate a new `TrasnportError::HttpError`.
pub const fn http_error(status: u16, body: String) -> TransportError {
RpcError::Transport(Self::HttpError(HttpError { status, body }))
}

/// Analyzes the [TransportErrorKind] and decides if the request should be retried based on the
/// variant.
pub fn is_retry_err(&self) -> bool {
match self {
// Missing batch response errors can be retried.
Self::MissingBatchResponse(_) => true,
Self::HttpError(http_err) => http_err.is_rate_limit_err(),
Self::Custom(err) => {
let msg = err.to_string();
msg.contains("429 Too Many Requests")
mattsse marked this conversation as resolved.
Show resolved Hide resolved
}
// If the backend is gone, or there's a completely custom error, we should assume it's
// not retryable.
_ => false,
}
}
}

/// Type for holding HTTP errors such as 429 rate limit error.
#[derive(Debug, thiserror::Error)]
#[error("HTTP error {status} with body: {body}")]
pub struct HttpError {
pub status: u16,
pub body: String,
}

impl HttpError {
/// Checks the `status` to determine whether the request should be retried.
pub const fn is_rate_limit_err(&self) -> bool {
if self.status == 429 {
return true;
}
false
}
}

/// Extension trait to implement methods for [`RpcError<TransportErrorKind, E>`].
pub trait RpcErrorExt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created this to implement is_retryable_err and backoff_hint methods specifically on RpcError<TransportErrorKind> and not the RpcError<E, ErrResp> generic type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but this is only implemented for RpcError?

could we at least make this pub(crate)?

/// Analyzes whether to retry the request depending on the error.
fn is_retryable(&self) -> bool;

/// Fetches the backoff hint from the error message if present
fn backoff_hint(&self) -> Option<std::time::Duration>;
}

impl RpcErrorExt for RpcError<TransportErrorKind> {
fn is_retryable(&self) -> bool {
match self {
// There was a transport-level error. This is either a non-retryable error,
// or a server error that should be retried.
Self::Transport(err) => err.is_retry_err(),
// The transport could not serialize the error itself. The request was malformed from
// the start.
Self::SerError(_) => false,
Self::DeserError { text, .. } => {
if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
return resp.is_retry_err();
}

// some providers send invalid JSON RPC in the error case (no `id:u64`), but the
// text should be a `JsonRpcError`
#[derive(Deserialize)]
struct Resp {
error: ErrorPayload,
}

if let Ok(resp) = serde_json::from_str::<Resp>(text) {
return resp.error.is_retry_err();
}

false
}
Self::ErrorResp(err) => err.is_retry_err(),
Self::NullResp => true,
_ => false,
}
}

fn backoff_hint(&self) -> Option<std::time::Duration> {
if let Self::ErrorResp(resp) = self {
let data = resp.try_data_as::<serde_json::Value>();
if let Some(Ok(data)) = data {
// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let backoff_seconds = &data["rate"]["backoff_seconds"];
// infura rate limit error
if let Some(seconds) = backoff_seconds.as_u64() {
return Some(std::time::Duration::from_secs(seconds));
}
if let Some(seconds) = backoff_seconds.as_f64() {
return Some(std::time::Duration::from_secs(seconds as u64 + 1));
}
}
}
None
}
}
6 changes: 6 additions & 0 deletions crates/transport/src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! Module for housing transport layers.

mod retry;

/// RetryBackoffLayer
pub use retry::{RateLimitRetryPolicy, RetryBackoffLayer, RetryBackoffService, RetryPolicy};
Loading