diff --git a/Cargo.lock b/Cargo.lock index 0850f369..85f155cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,10 +831,6 @@ checksum = "c6332f6d470e465bf00f9306743ff172f54b83e7e31edfe28f1444c085ccb0e4" dependencies = [ "alloy-json-rpc", "alloy-transport", - "http-body-util", - "hyper", - "hyper-tls", - "hyper-util", "reqwest", "serde_json", "tower 0.5.2", @@ -3299,7 +3295,7 @@ dependencies = [ "clap", "eyre", "futures-util", - "jsonrpsee", + "jsonrpsee 0.26.0", "metrics", "metrics-derive", "op-alloy-consensus", @@ -4497,6 +4493,20 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpsee" +version = "0.25.1" +source = "git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff#f04afa740e55db60dce20d9839758792f035ffff" +dependencies = [ + "jsonrpsee-core 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "jsonrpsee-http-client 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "jsonrpsee-proc-macros 0.25.1", + "jsonrpsee-server 0.25.1", + "jsonrpsee-types 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "tokio", + "tracing", +] + [[package]] name = "jsonrpsee" version = "0.26.0" @@ -4506,8 +4516,8 @@ dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core 0.26.0", "jsonrpsee-http-client 0.26.0", - "jsonrpsee-proc-macros", - "jsonrpsee-server", + "jsonrpsee-proc-macros 0.26.0", + "jsonrpsee-server 0.26.0", "jsonrpsee-types 0.26.0", "jsonrpsee-wasm-client", "jsonrpsee-ws-client", @@ -4552,7 +4562,7 @@ dependencies = [ "http", "http-body", "http-body-util", - "jsonrpsee-types 0.25.1", + "jsonrpsee-types 0.25.1 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project", "serde", "serde_json", @@ -4562,6 +4572,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "jsonrpsee-core" +version = "0.25.1" +source = "git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff#f04afa740e55db60dce20d9839758792f035ffff" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "jsonrpsee-types 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "parking_lot", + "pin-project", + "rand 0.9.1", + "rustc-hash 2.1.1", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tower 0.5.2", + "tracing", +] + [[package]] name = "jsonrpsee-core" version = "0.26.0" @@ -4601,8 +4635,30 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", - "jsonrpsee-core 0.25.1", - "jsonrpsee-types 0.25.1", + "jsonrpsee-core 0.25.1 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpsee-types 0.25.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rustls", + "rustls-platform-verifier", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tower 0.5.2", + "url", +] + +[[package]] +name = "jsonrpsee-http-client" +version = "0.25.1" +source = "git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff#f04afa740e55db60dce20d9839758792f035ffff" +dependencies = [ + "base64 0.22.1", + "http-body", + "hyper", + "hyper-rustls", + "hyper-util", + "jsonrpsee-core 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "jsonrpsee-types 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", "rustls", "rustls-platform-verifier", "serde", @@ -4636,6 +4692,18 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.25.1" +source = "git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff#f04afa740e55db60dce20d9839758792f035ffff" +dependencies = [ + "heck", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "jsonrpsee-proc-macros" version = "0.26.0" @@ -4649,6 +4717,32 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "jsonrpsee-server" +version = "0.25.1" +source = "git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff#f04afa740e55db60dce20d9839758792f035ffff" +dependencies = [ + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "jsonrpsee-types 0.25.1 (git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff)", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-util", + "tower 0.5.2", + "tracing", +] + [[package]] name = "jsonrpsee-server" version = "0.26.0" @@ -4688,6 +4782,17 @@ dependencies = [ "thiserror 2.0.12", ] +[[package]] +name = "jsonrpsee-types" +version = "0.25.1" +source = "git+https://github.com/paritytech/jsonrpsee?rev=f04afa740e55db60dce20d9839758792f035ffff#f04afa740e55db60dce20d9839758792f035ffff" +dependencies = [ + "http", + "serde", + "serde_json", + "thiserror 2.0.12", +] + [[package]] name = "jsonrpsee-types" version = "0.26.0" @@ -5667,7 +5772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8eb878fc5ea95adb5abe55fb97475b3eb0dcc77dfcd6f61bd626a68ae0bdba1" dependencies = [ "alloy-primitives", - "jsonrpsee", + "jsonrpsee 0.26.0", ] [[package]] @@ -7310,7 +7415,7 @@ dependencies = [ "derive_more", "eyre", "futures-util", - "jsonrpsee", + "jsonrpsee 0.26.0", "reth-chainspec", "reth-cli-commands", "reth-config", @@ -7900,7 +8005,7 @@ dependencies = [ "alloy-rpc-types-debug", "eyre", "futures", - "jsonrpsee", + "jsonrpsee 0.26.0", "pretty_assertions", "reth-engine-primitives", "reth-evm", @@ -7925,7 +8030,7 @@ dependencies = [ "futures", "futures-util", "interprocess", - "jsonrpsee", + "jsonrpsee 0.26.0", "pin-project", "serde_json", "thiserror 2.0.12", @@ -8183,7 +8288,7 @@ dependencies = [ "eyre", "fdlimit", "futures", - "jsonrpsee", + "jsonrpsee 0.26.0", "rayon", "reth-basic-payload-builder", "reth-chain-state", @@ -8381,7 +8486,7 @@ source = "git+https://github.com/paradigmxyz/reth?tag=v1.8.1#e6608be51ea34424b8e dependencies = [ "eyre", "http", - "jsonrpsee-server", + "jsonrpsee-server 0.26.0", "metrics", "metrics-exporter-prometheus 0.16.2", "metrics-process", @@ -8705,7 +8810,7 @@ dependencies = [ "derive_more", "eyre", "futures", - "jsonrpsee", + "jsonrpsee 0.26.0", "jsonrpsee-core 0.26.0", "jsonrpsee-types 0.26.0", "metrics", @@ -9048,7 +9153,7 @@ dependencies = [ "http-body", "hyper", "itertools 0.14.0", - "jsonrpsee", + "jsonrpsee 0.26.0", "jsonrpsee-types 0.26.0", "jsonwebtoken", "parking_lot", @@ -9113,7 +9218,7 @@ dependencies = [ "alloy-rpc-types-trace", "alloy-rpc-types-txpool", "alloy-serde", - "jsonrpsee", + "jsonrpsee 0.26.0", "reth-chain-state", "reth-engine-primitives", "reth-network-peers", @@ -9130,7 +9235,7 @@ dependencies = [ "alloy-provider", "dyn-clone", "http", - "jsonrpsee", + "jsonrpsee 0.26.0", "metrics", "pin-project", "reth-chain-state", @@ -9237,7 +9342,7 @@ dependencies = [ "auto_impl", "dyn-clone", "futures", - "jsonrpsee", + "jsonrpsee 0.26.0", "jsonrpsee-types 0.26.0", "parking_lot", "reth-chain-state", @@ -9315,7 +9420,7 @@ source = "git+https://github.com/paradigmxyz/reth.git?tag=v1.4.7#dc7cb6e6670b0da dependencies = [ "alloy-rpc-types-engine", "http", - "jsonrpsee-http-client 0.25.1", + "jsonrpsee-http-client 0.25.1 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project", "tower 0.5.2", "tower-http", @@ -10059,14 +10164,10 @@ version = "0.1.0" dependencies = [ "alloy-consensus", "alloy-eips", - "alloy-json-rpc", "alloy-primitives", - "alloy-rpc-client", "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-serde", - "alloy-transport", - "alloy-transport-http", "anyhow", "assert_cmd", "bytes", @@ -10081,11 +10182,7 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", - "jsonrpsee", - "jsonrpsee-core 0.26.0", - "jsonrpsee-http-client 0.26.0", - "jsonrpsee-server", - "jsonrpsee-types 0.26.0", + "jsonrpsee 0.25.1", "metrics", "metrics-derive", "metrics-exporter-prometheus 0.16.2", @@ -10102,7 +10199,6 @@ dependencies = [ "rand 0.9.1", "reqwest", "reth-optimism-payload-builder", - "reth-rpc-eth-types", "reth-rpc-layer 1.4.7", "rustls", "serde", diff --git a/Cargo.toml b/Cargo.toml index 1ce52f95..1a6e5737 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ sha2 = { version = "0.10", default-features = false } # Reth deps reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } -reth-rpc-eth-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } # Alloy libraries alloy-rpc-types-engine = "1.0.35" @@ -40,8 +39,6 @@ alloy-rpc-types = "1.0.35" alloy-genesis = "1.0.35" alloy-rpc-client = "1.0.35" alloy-provider = "1.0.35" -alloy-transport = "1.0.35" -alloy-transport-http = "1.0.35" op-alloy-network = "0.20.0" op-alloy-rpc-types-engine = "0.20.0" op-alloy-consensus = "0.20.0" diff --git a/crates/rollup-boost/Cargo.toml b/crates/rollup-boost/Cargo.toml index 194d8fd8..ef5785f4 100644 --- a/crates/rollup-boost/Cargo.toml +++ b/crates/rollup-boost/Cargo.toml @@ -19,27 +19,23 @@ url.workspace = true sha2.workspace = true reth-optimism-payload-builder.workspace = true -reth-rpc-eth-types.workspace = true op-alloy-rpc-types-engine.workspace = true alloy-rpc-types-engine.workspace = true alloy-rpc-types-eth.workspace = true -alloy-json-rpc.workspace = true alloy-primitives.workspace = true alloy-serde.workspace = true -alloy-rpc-client = { workspace = true, features = ["hyper"]} -alloy-transport = {workspace = true} -alloy-transport-http = {workspace = true, features = ["hyper", "hyper-tls"]} tokio-tungstenite.workspace = true metrics-derive.workspace = true testcontainers.workspace = true -# rpc -jsonrpsee = { version = "0.26.0", features = ["server", "http-client", "macros"] } -jsonrpsee-core = "0.26.0" -jsonrpsee-server = "0.26.0" -jsonrpsee-http-client = "0.26.0" -jsonrpsee-types = "0.26.0" +# TODO: update to latest release when it is published +# jsonrpsee = { version = "0.25.1", features = ["server", "http-client", "macros"] } +jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee", rev = "f04afa740e55db60dce20d9839758792f035ffff", features = [ + "server", + "http-client", + "macros", +] } moka = { version = "0.12.10", features = ["future"] } http = "1.1.0" dotenvy = "0.15.7" diff --git a/crates/rollup-boost/src/cli.rs b/crates/rollup-boost/src/cli.rs index 7358e8de..914a770a 100644 --- a/crates/rollup-boost/src/cli.rs +++ b/crates/rollup-boost/src/cli.rs @@ -2,7 +2,6 @@ use alloy_rpc_types_engine::JwtSecret; use clap::Parser; use eyre::bail; use jsonrpsee::{RpcModule, server::Server}; -use jsonrpsee_core::middleware::RpcServiceBuilder; use parking_lot::Mutex; use std::{ net::{IpAddr, SocketAddr}, @@ -15,7 +14,6 @@ use tracing::{Level, info}; use crate::{ BlockSelectionPolicy, Flashblocks, FlashblocksArgs, ProxyLayer, RollupBoostServer, RpcClient, - RpcProxyClient, client::rpc::{BuilderArgs, L2ClientArgs}, debug_api::ExecutionMode, get_version, init_metrics, @@ -203,23 +201,20 @@ impl RollupBoostArgs { // Build and start the server info!("Starting server on :{}", self.rpc_port); - let http_middleware = tower::ServiceBuilder::new().layer(probe_layer); - let l2_client = RpcProxyClient::new_l2( - l2_client_args.l2_url.clone(), - l2_auth_jwt, - l2_client_args.l2_timeout, - ); - let builder_client = RpcProxyClient::new_builder( - builder_args.builder_url.clone(), - builder_auth_jwt, - builder_args.builder_timeout, - ); - let rpc_middleware = - RpcServiceBuilder::new().layer(ProxyLayer::new(l2_client, builder_client)); + let http_middleware = + tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(ProxyLayer::new( + l2_client_args.l2_url, + l2_auth_jwt, + l2_client_args.l2_timeout, + builder_args.builder_url, + builder_auth_jwt, + builder_args.builder_timeout, + )); let server = Server::builder() .set_http_middleware(http_middleware) - .set_rpc_middleware(rpc_middleware) .build(format!("{}:{}", self.rpc_host, self.rpc_port).parse::()?) .await?; let handle = server.start(rpc_module); diff --git a/crates/rollup-boost/src/client/error.rs b/crates/rollup-boost/src/client/error.rs deleted file mode 100644 index fd1e029c..00000000 --- a/crates/rollup-boost/src/client/error.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::{error, fmt}; -use tower::BoxError; - -/// This error is substitution for tower Elapsed error, because it's pub(super) and cannot be used -#[derive(Debug, Default)] -pub struct Elapsed; - -impl fmt::Display for Elapsed { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("request timed out") - } -} - -impl error::Error for Elapsed {} - -#[derive(Debug)] -pub enum HyperError { - Hyper(Box), - Timeout, -} - -impl fmt::Display for HyperError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Hyper(e) => e.fmt(f), - Self::Timeout => Elapsed.fmt(f), - } - } -} - -impl error::Error for HyperError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - Self::Hyper(e) => e.source(), - Self::Timeout => Elapsed.source(), - } - } -} - -impl From for HyperError { - fn from(value: BoxError) -> Self { - value - .downcast::() - .map(HyperError::Hyper) - .unwrap_or_else(|_| HyperError::Timeout) - } -} diff --git a/crates/rollup-boost/src/client/http.rs b/crates/rollup-boost/src/client/http.rs new file mode 100644 index 00000000..5e18edd7 --- /dev/null +++ b/crates/rollup-boost/src/client/http.rs @@ -0,0 +1,116 @@ +use std::time::Duration; + +use crate::client::auth::AuthLayer; +use crate::payload::PayloadSource; +use alloy_primitives::bytes::Bytes; +use alloy_rpc_types_engine::JwtSecret; +use http::Uri; +use http_body_util::{BodyExt, Full}; +use hyper::body::Body; +use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::Client; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::rt::TokioExecutor; +use jsonrpsee::core::BoxError; +use jsonrpsee::server::HttpBody; +use opentelemetry::trace::SpanKind; +use tower::{ + Service as _, ServiceBuilder, ServiceExt, + timeout::{Timeout, TimeoutLayer}, +}; +use tower_http::decompression::{Decompression, DecompressionLayer}; +use tracing::{debug, error, instrument}; + +use super::auth::Auth; + +pub type HttpClientService = + Timeout, HttpBody>>>>; + +#[derive(Clone, Debug)] +pub struct HttpClient { + client: HttpClientService, + url: Uri, + target: PayloadSource, +} + +impl HttpClient { + pub fn new(url: Uri, secret: JwtSecret, target: PayloadSource, timeout: u64) -> Self { + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .expect("no native root CA certificates found") + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let client = ServiceBuilder::new() + .layer(TimeoutLayer::new(Duration::from_millis(timeout))) + .layer(DecompressionLayer::new()) + .layer(AuthLayer::new(secret)) + .service(client); + + Self { + client, + url, + target, + } + } + + /// Forwards an HTTP request to the `authrpc`, attaching the provided JWT authorization. + #[instrument( + skip(self, req), + fields( + otel.kind = ?SpanKind::Client, + url = %self.url, + method, + code, + ), + err(Debug) + )] + pub async fn forward( + &mut self, + mut req: http::Request, + method: String, + ) -> Result>, BoxError> + where + B: Body>> + + Send + + 'static, + { + debug!("forwarding {} to {}", method, self.target); + tracing::Span::current().record("method", method); + *req.uri_mut() = self.url.clone(); + + let req = req.map(HttpBody::new); + + let res = self.client.ready().await?.call(req).await?; + + let (parts, body) = res.into_parts(); + let body_bytes = body.collect().await?.to_bytes(); + + if let Some(code) = parse_response_code(&body_bytes)? { + error!(%code, "error in forwarded response"); + tracing::Span::current().record("code", code); + } + + Ok(http::Response::from_parts(parts, Full::from(body_bytes))) + } +} + +fn parse_response_code(body_bytes: &[u8]) -> eyre::Result> { + #[derive(serde::Deserialize, Debug)] + struct RpcResponse { + error: Option, + } + + #[derive(serde::Deserialize, Debug)] + struct JsonRpcError { + code: i32, + } + + let res = serde_json::from_slice::(body_bytes)?; + + Ok(res.error.map(|e| e.code)) +} diff --git a/crates/rollup-boost/src/client/mod.rs b/crates/rollup-boost/src/client/mod.rs index 2f629832..364eb9b9 100644 --- a/crates/rollup-boost/src/client/mod.rs +++ b/crates/rollup-boost/src/client/mod.rs @@ -1,4 +1,3 @@ pub mod auth; -pub mod error; -pub mod proxy_rpc; +pub mod http; pub mod rpc; diff --git a/crates/rollup-boost/src/client/proxy_rpc.rs b/crates/rollup-boost/src/client/proxy_rpc.rs deleted file mode 100644 index a9004aa0..00000000 --- a/crates/rollup-boost/src/client/proxy_rpc.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::time::Duration; - -use crate::AuthLayer; -use crate::client::error::HyperError; -use crate::payload::PayloadSource; -use alloy_json_rpc::{RpcError, RpcRecv, RpcSend}; -use alloy_rpc_client::RpcClient; -use alloy_rpc_types_engine::JwtSecret; -use alloy_transport::TransportResult; -use alloy_transport_http::{Http, HyperClient}; -use http::Uri; -use http_body_util::Full; -use hyper_util::client::legacy::Client; -use hyper_util::rt::TokioExecutor; -use opentelemetry::trace::SpanKind; -use tower::ServiceBuilder; -use tower::util::MapErrLayer; -use tracing::{debug, instrument}; - -#[derive(Clone, Debug)] -pub struct RpcProxyClient { - client: RpcClient, - url: Uri, - target: PayloadSource, -} - -impl RpcProxyClient { - pub fn new(url: Uri, secret: JwtSecret, target: PayloadSource, timeout: u64) -> Self { - let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() - .expect("can find native root CA certificates") - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - let client = - Client::builder(TokioExecutor::new()).build::<_, Full>(connector); - - let service = ServiceBuilder::new() - // This layer formats error, because timeout layer erases error types and we need it for alloy - .layer(MapErrLayer::new(HyperError::from)) - // TODO: Add DecompressionLayer - .layer(AuthLayer::new(secret)) - .timeout(Duration::from_secs(timeout)) - .service(client); - - let layer_transport = HyperClient::with_service(service); - let http = Http::with_client(layer_transport, url.to_string().parse().unwrap()); - let local = http.guess_local(); - let client = RpcClient::new(http, local); - - Self { - client, - url, - target, - } - } - - pub fn new_l2(url: Uri, secret: JwtSecret, timeout: u64) -> Self { - Self::new(url, secret, PayloadSource::L2, timeout) - } - - pub fn new_builder(url: Uri, secret: JwtSecret, timeout: u64) -> Self { - Self::new(url, secret, PayloadSource::Builder, timeout) - } - - /// Forwards a JSON-RPC request to the endpoint - #[instrument( - skip(self), - fields( - otel.kind = ?SpanKind::Client, - url = %self.url, - method, - code, - ), - err(Display) - )] - pub async fn request( - &self, - method: &str, - params: Params, - ) -> TransportResult { - debug!("forwarding {} to {}", method, self.target); - tracing::Span::current().record("method", method); - let resp = self - .client - .request::(method.to_string(), params) - .await - .inspect_err(|err| { - if let RpcError::ErrorResp(err) = err { - tracing::Span::current().record("code", err.code); - } - })?; - Ok(resp) - } -} diff --git a/crates/rollup-boost/src/lib.rs b/crates/rollup-boost/src/lib.rs index 7ef9e6f1..5d79271e 100644 --- a/crates/rollup-boost/src/lib.rs +++ b/crates/rollup-boost/src/lib.rs @@ -1,7 +1,7 @@ #![allow(clippy::complexity)] mod client; -pub use client::{auth::*, proxy_rpc::*, rpc::*}; +pub use client::{auth::*, http::*, rpc::*}; mod cli; pub use cli::*; diff --git a/crates/rollup-boost/src/proxy.rs b/crates/rollup-boost/src/proxy.rs index 566e4128..4853907f 100644 --- a/crates/rollup-boost/src/proxy.rs +++ b/crates/rollup-boost/src/proxy.rs @@ -1,12 +1,15 @@ -use crate::RpcProxyClient; -use futures::FutureExt; -use jsonrpsee::MethodResponse; -use jsonrpsee::core::middleware::{Batch, Notification, RpcServiceT}; -use jsonrpsee_types::{ErrorCode, ErrorObject}; -use reth_rpc_eth_types::error::ToRpcError; -use std::future::Future; -use tower::Layer; -use tracing::{error, info}; +use crate::client::http::HttpClient; +use crate::payload::PayloadSource; +use crate::{Request, Response, from_buffered_request, into_buffered_request}; +use alloy_rpc_types_engine::JwtSecret; +use http::Uri; +use http_body_util::BodyExt as _; +use jsonrpsee::core::BoxError; +use jsonrpsee::server::HttpBody; +use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin}; +use tower::{Layer, Service}; +use tracing::info; const ENGINE_METHOD: &str = "engine_"; @@ -22,15 +25,30 @@ const FORWARD_REQUESTS: [&str; 6] = [ #[derive(Debug, Clone)] pub struct ProxyLayer { - l2_client: RpcProxyClient, - builder_client: RpcProxyClient, + l2_auth_rpc: Uri, + l2_auth_secret: JwtSecret, + l2_timeout: u64, + builder_auth_rpc: Uri, + builder_auth_secret: JwtSecret, + builder_timeout: u64, } impl ProxyLayer { - pub fn new(l2_client: RpcProxyClient, builder_client: RpcProxyClient) -> Self { + pub fn new( + l2_auth_rpc: Uri, + l2_auth_secret: JwtSecret, + l2_timeout: u64, + builder_auth_rpc: Uri, + builder_auth_secret: JwtSecret, + builder_timeout: u64, + ) -> Self { ProxyLayer { - l2_client, - builder_client, + l2_auth_rpc, + l2_auth_secret, + l2_timeout, + builder_auth_rpc, + builder_auth_secret, + builder_timeout, } } } @@ -39,10 +57,24 @@ impl Layer for ProxyLayer { type Service = ProxyService; fn layer(&self, inner: S) -> Self::Service { + let l2_client = HttpClient::new( + self.l2_auth_rpc.clone(), + self.l2_auth_secret, + PayloadSource::L2, + self.l2_timeout, + ); + + let builder_client = HttpClient::new( + self.builder_auth_rpc.clone(), + self.builder_auth_secret, + PayloadSource::Builder, + self.builder_timeout, + ); + ProxyService { inner, - l2_client: self.l2_client.clone(), - builder_client: self.builder_client.clone(), + l2_client, + builder_client, } } } @@ -50,103 +82,80 @@ impl Layer for ProxyLayer { #[derive(Clone)] pub struct ProxyService { inner: S, - l2_client: RpcProxyClient, - builder_client: RpcProxyClient, + l2_client: HttpClient, + builder_client: HttpClient, } -impl RpcServiceT for ProxyService +// Consider using `RpcServiceT` when https://github.com/paritytech/jsonrpsee/pull/1521 is merged +impl Service for ProxyService where - S: RpcServiceT + Send + Sync + Clone + 'static, + S: Service + Send + Sync + Clone + 'static, + S::Response: 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, { - type MethodResponse = S::MethodResponse; - type NotificationResponse = S::NotificationResponse; - type BatchResponse = S::BatchResponse; - - fn call<'a>( - &self, - request: jsonrpsee::types::Request<'a>, - ) -> impl Future + Send + 'a { - if request.method_name().starts_with(ENGINE_METHOD) { - info!(target: "proxy::call", message = "proxying request to rollup-boost server", method = request.method_name()); - return self.inner.call(request).boxed(); - } - let l2_client = self.l2_client.clone(); - let builder_client = self.builder_client.clone(); - async move { - // Proxy request to builder if needed - maybe_proxy_to_builder(builder_client, &request); - proxy_to_el(l2_client, request).await - } - .boxed() - } + type Response = Response; + type Error = BoxError; + type Future = + Pin> + Send + 'static>>; - fn batch<'a>( - &self, - requests: Batch<'a>, - ) -> impl Future + Send + 'a { - self.inner.batch(requests) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) } - fn notification<'a>( - &self, - n: Notification<'a>, - ) -> impl Future + Send + 'a { - self.inner.notification(n) - } -} - -pub async fn proxy_to_el( - l2_client: RpcProxyClient, - request: jsonrpsee::types::Request<'_>, -) -> MethodResponse { - let params = request.params(); - let params_str = params.as_str().unwrap_or("[]"); - - let params = serde_json::from_str::(params_str); - let params = match params { - Ok(params) => params, - Err(_e) => { - return MethodResponse::error( - request.id.clone(), - ErrorObject::from(ErrorCode::ParseError), - ); - } - }; - let raw = l2_client - .request::<_, serde_json::Value>(request.method_name(), params) - .await; - match raw { - Ok(raw) => { - let payload = jsonrpsee_types::ResponsePayload::success(raw).into(); - MethodResponse::response(request.id.clone(), payload, usize::MAX) + fn call(&mut self, req: Request) -> Self::Future { + #[derive(serde::Deserialize, Debug)] + struct RpcRequest<'a> { + #[serde(borrow)] + method: &'a str, } - Err(e) => MethodResponse::error(request.id.clone(), ErrorObject::from(e.to_rpc_error())), - } -} -pub fn maybe_proxy_to_builder( - builder_client: RpcProxyClient, - request: &jsonrpsee::types::Request<'_>, -) { - if FORWARD_REQUESTS.contains(&request.method_name()) { - let method = request.method_name().to_string(); - let params = request.params.clone().map(|p| p.to_string()); - - tokio::spawn(async move { - let params_str = params.unwrap_or(String::from("[]")); - let params = serde_json::from_str::(params_str.as_str()); - let params = match params { - Ok(params) => params, - Err(err) => { - error!("Failed to parse params from request: {}", err); - return; - } - }; - // We handle the error inside request - let _ = builder_client - .request::<_, serde_json::Value>(method.as_str(), params) - .await; - }); + // See https://github.com/tower-rs/tower/blob/abb375d08cf0ba34c1fe76f66f1aba3dc4341013/tower-service/src/lib.rs#L276 + // for an explanation of this pattern + let mut service = self.clone(); + service.inner = std::mem::replace(&mut self.inner, service.inner); + + let fut = async move { + let buffered = into_buffered_request(req).await?; + let body_bytes = buffered.clone().collect().await?.to_bytes(); + + // Deserialize the bytes to find the method + let method = serde_json::from_slice::(&body_bytes)? + .method + .to_string(); + + // If the request is an Engine API method, call the inner RollupBoostServer + if method.starts_with(ENGINE_METHOD) { + info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); + return service + .inner + .call(from_buffered_request(buffered)) + .await + .map_err(|e| e.into()); + } + + if FORWARD_REQUESTS.contains(&method.as_str()) { + // If the request should be forwarded, send to both the + // default execution client and the builder + let method_clone = method.clone(); + let buffered_clone = buffered.clone(); + let mut builder_client = service.builder_client.clone(); + + // Fire and forget the builder request + tokio::spawn(async move { + let _ = builder_client.forward(buffered_clone, method_clone).await; + }); + } + + // Return the response from the L2 client + service + .l2_client + .forward(buffered, method) + .await + .map(|res| res.map(HttpBody::new)) + }; + + Box::pin(fut) } } @@ -158,7 +167,7 @@ mod tests { use alloy_primitives::{B256, Bytes, U64, U128, hex}; use alloy_rpc_types_engine::JwtSecret; use alloy_rpc_types_eth::erc4337::TransactionConditional; - use http::{StatusCode, Uri}; + use http::StatusCode; use http_body_util::{BodyExt, Full}; use hyper::service::service_fn; use hyper_util::client::legacy::Client; @@ -173,7 +182,6 @@ mod tests { rpc_params, server::{ServerBuilder, ServerHandle}, }; - use jsonrpsee_core::middleware::RpcServiceBuilder; use serde_json::json; use std::{ net::{IpAddr, SocketAddr}, @@ -204,25 +212,21 @@ mod tests { async fn new() -> eyre::Result { let builder = MockHttpServer::serve().await?; let l2 = MockHttpServer::serve().await?; - let l2_client = RpcProxyClient::new_l2( + let middleware = tower::ServiceBuilder::new().layer(ProxyLayer::new( format!("http://{}:{}", l2.addr.ip(), l2.addr.port()).parse::()?, JwtSecret::random(), 1, - ); - let builder_client = RpcProxyClient::new_builder( format!("http://{}:{}", builder.addr.ip(), builder.addr.port()).parse::()?, JwtSecret::random(), 1, - ); - let middleware = - RpcServiceBuilder::new().layer(ProxyLayer::new(l2_client, builder_client)); + )); let temp_listener = TcpListener::bind("127.0.0.1:0").await?; let server_addr = temp_listener.local_addr()?; drop(temp_listener); let server = Server::builder() - .set_rpc_middleware(middleware.clone()) + .set_http_middleware(middleware.clone()) .build(server_addr) .await?; @@ -476,15 +480,15 @@ mod tests { .unwrap(); let (probe_layer, _) = ProbeLayer::new(); - let l2_client = RpcProxyClient::new_l2(l2_auth_uri.clone(), jwt, 1); - let builder_client = RpcProxyClient::new_builder(l2_auth_uri, jwt, 1); - - let proxy_layer = ProxyLayer::new(l2_client, builder_client); + let proxy_layer = ProxyLayer::new(l2_auth_uri.clone(), jwt, 1, l2_auth_uri, jwt, 1); // Create a layered server let server = ServerBuilder::default() - .set_http_middleware(tower::ServiceBuilder::new().layer(probe_layer)) - .set_rpc_middleware(RpcServiceBuilder::new().layer(proxy_layer)) + .set_http_middleware( + tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(proxy_layer), + ) .build(addr.parse::().unwrap()) .await .unwrap(); diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index cc478c62..aabb1ee1 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -682,7 +682,6 @@ pub fn from_buffered_request(req: BufferedRequest) -> HttpRequest { #[allow(clippy::complexity)] pub mod tests { use super::*; - use crate::RpcProxyClient; use crate::probe::ProbeLayer; use crate::proxy::ProxyLayer; use alloy_primitives::hex; @@ -695,7 +694,6 @@ pub mod tests { use jsonrpsee::RpcModule; use jsonrpsee::http_client::HttpClient; use jsonrpsee::server::{Server, ServerBuilder, ServerHandle}; - use jsonrpsee_core::middleware::RpcServiceBuilder; use parking_lot::Mutex; use std::net::SocketAddr; use std::str::FromStr; @@ -824,14 +822,20 @@ pub mod tests { let module: RpcModule<()> = rollup_boost.try_into().unwrap(); - let http_middleware = tower::ServiceBuilder::new().layer(probe_layer); - let l2_client = RpcProxyClient::new_l2(l2_auth_rpc, jwt_secret, 1); - let builder_client = RpcProxyClient::new_builder(builder_auth_rpc, jwt_secret, 1); - let rpc_middleware = - RpcServiceBuilder::new().layer(ProxyLayer::new(l2_client, builder_client)); + let http_middleware = + tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(ProxyLayer::new( + l2_auth_rpc, + jwt_secret, + 1, + builder_auth_rpc, + jwt_secret, + 1, + )); + let server = Server::builder() .set_http_middleware(http_middleware) - .set_rpc_middleware(rpc_middleware) .build("127.0.0.1:0".parse::().unwrap()) .await .unwrap();