diff --git a/crates/rollup-boost/src/cli.rs b/crates/rollup-boost/src/cli.rs index 914a770a..6da62fb9 100644 --- a/crates/rollup-boost/src/cli.rs +++ b/crates/rollup-boost/src/cli.rs @@ -1,25 +1,18 @@ -use alloy_rpc_types_engine::JwtSecret; use clap::Parser; -use eyre::bail; use jsonrpsee::{RpcModule, server::Server}; -use parking_lot::Mutex; -use std::{ - net::{IpAddr, SocketAddr}, - path::PathBuf, - str::FromStr, - sync::Arc, -}; +use std::{net::SocketAddr, path::PathBuf}; use tokio::signal::unix::{SignalKind, signal as unix_signal}; use tracing::{Level, info}; use crate::{ - BlockSelectionPolicy, Flashblocks, FlashblocksArgs, ProxyLayer, RollupBoostServer, RpcClient, + BlockSelectionPolicy, ClientArgs, DebugServer, FlashblocksArgs, PayloadSource, ProxyLayer, + RollupBoostServer, client::rpc::{BuilderArgs, L2ClientArgs}, debug_api::ExecutionMode, get_version, init_metrics, - payload::PayloadSource, probe::ProbeLayer, }; +use crate::{FlashblocksService, RpcClient}; #[derive(Clone, Parser, Debug)] #[clap(author, version = get_version(), about)] @@ -112,90 +105,34 @@ impl RollupBoostArgs { init_metrics(&self)?; let debug_addr = format!("{}:{}", self.debug_host, self.debug_server_port); - let l2_client_args = self.l2_client; - - let l2_auth_jwt = if let Some(secret) = l2_client_args.l2_jwt_token { - secret - } else if let Some(path) = l2_client_args.l2_jwt_path.as_ref() { - JwtSecret::from_file(path)? - } else { - bail!("Missing L2 Client JWT secret"); - }; + let l2_client_args: ClientArgs = self.l2_client.clone().into(); + let l2_http_client = l2_client_args.new_http_client(PayloadSource::L2)?; - let l2_client = RpcClient::new( - l2_client_args.l2_url.clone(), - l2_auth_jwt, - l2_client_args.l2_timeout, - PayloadSource::L2, - )?; - - let builder_args = self.builder; - let builder_auth_jwt = if let Some(secret) = builder_args.builder_jwt_token { - secret - } else if let Some(path) = builder_args.builder_jwt_path.as_ref() { - JwtSecret::from_file(path)? - } else { - bail!("Missing Builder JWT secret"); - }; - - let builder_client = RpcClient::new( - builder_args.builder_url.clone(), - builder_auth_jwt, - builder_args.builder_timeout, - PayloadSource::Builder, - )?; + let builder_client_args: ClientArgs = self.builder.clone().into(); + let builder_http_client = builder_client_args.new_http_client(PayloadSource::Builder)?; let (probe_layer, probes) = ProbeLayer::new(); - let execution_mode = Arc::new(Mutex::new(self.execution_mode)); - - let (rpc_module, health_handle): (RpcModule<()>, _) = if self.flashblocks.flashblocks { - let flashblocks_args = self.flashblocks; - let inbound_url = flashblocks_args.flashblocks_builder_url; - let outbound_addr = SocketAddr::new( - IpAddr::from_str(&flashblocks_args.flashblocks_host)?, - flashblocks_args.flashblocks_port, - ); - - let builder_client = Arc::new(Flashblocks::run( - builder_client.clone(), - inbound_url, - outbound_addr, - flashblocks_args.flashblock_builder_ws_reconnect_ms, - )?); - - let rollup_boost = RollupBoostServer::new( - l2_client, - builder_client, - execution_mode.clone(), - self.block_selection_policy, - probes.clone(), - self.external_state_root, - self.ignore_unhealthy_builders, - ); + let (health_handle, rpc_module) = if self.flashblocks.flashblocks { + let rollup_boost = RollupBoostServer::::new_from_args( + self.clone(), + probes.clone(), + )?; let health_handle = rollup_boost .spawn_health_check(self.health_check_interval, self.max_unsafe_interval); - - // Spawn the debug server - rollup_boost.start_debug_server(debug_addr.as_str()).await?; - (rollup_boost.try_into()?, health_handle) + let debug_server = DebugServer::new(rollup_boost.execution_mode.clone()); + debug_server.run(&debug_addr).await?; + let rpc_module: RpcModule<()> = rollup_boost.try_into()?; + (health_handle, rpc_module) } else { - let rollup_boost = RollupBoostServer::new( - l2_client, - Arc::new(builder_client), - execution_mode.clone(), - self.block_selection_policy, - probes.clone(), - self.external_state_root, - self.ignore_unhealthy_builders, - ); - + let rollup_boost = + RollupBoostServer::::new_from_args(self.clone(), probes.clone())?; let health_handle = rollup_boost .spawn_health_check(self.health_check_interval, self.max_unsafe_interval); - - // Spawn the debug server - rollup_boost.start_debug_server(debug_addr.as_str()).await?; - (rollup_boost.try_into()?, health_handle) + let debug_server = DebugServer::new(rollup_boost.execution_mode.clone()); + debug_server.run(&debug_addr).await?; + let rpc_module: RpcModule<()> = rollup_boost.try_into()?; + (health_handle, rpc_module) }; // Build and start the server @@ -205,12 +142,8 @@ impl RollupBoostArgs { tower::ServiceBuilder::new() .layer(probe_layer) .layer(ProxyLayer::new( - l2_client_args.l2_url, - l2_auth_jwt, - l2_client_args.l2_timeout, - builder_args.builder_url, - builder_auth_jwt, - builder_args.builder_timeout, + l2_http_client.clone(), + builder_http_client.clone(), )); let server = Server::builder() @@ -247,6 +180,12 @@ impl RollupBoostArgs { } } +impl Default for RollupBoostArgs { + fn default() -> Self { + Self::parse_from::<_, &str>(std::iter::empty()) + } +} + #[derive(Clone, Debug)] pub enum LogFormat { Json, diff --git a/crates/rollup-boost/src/client/rpc.rs b/crates/rollup-boost/src/client/rpc.rs index 6633e643..8560496a 100644 --- a/crates/rollup-boost/src/client/rpc.rs +++ b/crates/rollup-boost/src/client/rpc.rs @@ -1,5 +1,6 @@ use crate::EngineApiExt; use crate::client::auth::AuthLayer; +use crate::client::http::HttpClient as RollupBoostHttpClient; use crate::payload::{NewPayload, OpExecutionPayloadEnvelope, PayloadSource, PayloadVersion}; use crate::server::EngineApiClient; use crate::version::{CARGO_PKG_VERSION, VERGEN_GIT_SHA}; @@ -10,6 +11,7 @@ use alloy_rpc_types_engine::{ }; use alloy_rpc_types_eth::{Block, BlockNumberOrTag}; use clap::{Parser, arg}; +use eyre::bail; use http::{HeaderMap, Uri}; use jsonrpsee::core::async_trait; use jsonrpsee::core::middleware::layer::RpcLogger; @@ -101,7 +103,7 @@ impl From for ErrorObjectOwned { /// /// - **Engine API** calls are faciliated via the `auth_client` (requires JWT authentication). /// -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RpcClient { /// Handles requests to the authenticated Engine API (requires JWT authentication) auth_client: RpcClientService, @@ -376,8 +378,57 @@ impl EngineApiExt for RpcClient { } } +#[derive(Debug, Clone)] +pub struct ClientArgs { + /// Auth server address + pub url: Uri, + + /// Hex encoded JWT secret to use for the authenticated engine-API RPC server. + pub jwt_token: Option, + + /// Path to a JWT secret to use for the authenticated engine-API RPC server. + pub jwt_path: Option, + + /// Timeout for http calls in milliseconds + pub timeout: u64, +} + +impl ClientArgs { + fn get_auth_jwt(&self) -> eyre::Result { + if let Some(secret) = self.jwt_token { + Ok(secret) + } else if let Some(path) = self.jwt_path.as_ref() { + Ok(JwtSecret::from_file(path)?) + } else { + bail!("Missing Client JWT secret"); + } + } + + pub fn new_rpc_client(&self, payload_source: PayloadSource) -> eyre::Result { + RpcClient::new( + self.url.clone(), + self.get_auth_jwt()?, + self.timeout, + payload_source, + ) + .map_err(eyre::Report::from) + } + + pub fn new_http_client( + &self, + payload_source: PayloadSource, + ) -> eyre::Result { + Ok(RollupBoostHttpClient::new( + self.url.clone(), + self.get_auth_jwt()?, + payload_source, + self.timeout, + )) + } +} + /// Generates Clap argument structs with a prefix to create a unique namespace when specifying RPC client config via the CLI. -macro_rules! define_rpc_args { +macro_rules! define_client_args { ($(($name:ident, $prefix:ident)),*) => { $( paste! { @@ -399,12 +450,24 @@ macro_rules! define_rpc_args { #[arg(long, env, default_value_t = 1000)] pub [<$prefix _timeout>]: u64, } + + + impl From<$name> for ClientArgs { + fn from(args: $name) -> Self { + ClientArgs { + url: args.[<$prefix _url>].clone(), + jwt_token: args.[<$prefix _jwt_token>].clone(), + jwt_path: args.[<$prefix _jwt_path>], + timeout: args.[<$prefix _timeout>], + } + } + } } )* }; } -define_rpc_args!((BuilderArgs, builder), (L2ClientArgs, l2)); +define_client_args!((BuilderArgs, builder), (L2ClientArgs, l2)); #[cfg(test)] pub mod tests { diff --git a/crates/rollup-boost/src/flashblocks/mod.rs b/crates/rollup-boost/src/flashblocks/mod.rs index 3cdd3e7e..2e219c61 100644 --- a/crates/rollup-boost/src/flashblocks/mod.rs +++ b/crates/rollup-boost/src/flashblocks/mod.rs @@ -4,9 +4,9 @@ pub use launcher::*; mod primitives; mod service; +pub use service::*; pub use primitives::*; -pub use service::*; mod inbound; mod outbound; diff --git a/crates/rollup-boost/src/flashblocks/service.rs b/crates/rollup-boost/src/flashblocks/service.rs index 9850d45c..209a3efc 100644 --- a/crates/rollup-boost/src/flashblocks/service.rs +++ b/crates/rollup-boost/src/flashblocks/service.rs @@ -164,7 +164,7 @@ impl FlashblockBuilder { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct FlashblocksService { client: RpcClient, diff --git a/crates/rollup-boost/src/payload.rs b/crates/rollup-boost/src/payload.rs index 818c59d1..8efad848 100644 --- a/crates/rollup-boost/src/payload.rs +++ b/crates/rollup-boost/src/payload.rs @@ -239,6 +239,7 @@ pub struct PayloadTrace { pub trace_id: Option, } +#[derive(Debug)] pub struct PayloadTraceContext { block_hash_to_payload_ids: Cache>, payload_id: Cache, diff --git a/crates/rollup-boost/src/probe.rs b/crates/rollup-boost/src/probe.rs index 18b64c6d..f297e4d8 100644 --- a/crates/rollup-boost/src/probe.rs +++ b/crates/rollup-boost/src/probe.rs @@ -62,7 +62,7 @@ pub struct ProbeLayer { } impl ProbeLayer { - pub(crate) fn new() -> (Self, Arc) { + pub fn new() -> (Self, Arc) { let probes = Arc::new(Probes::default()); ( Self { diff --git a/crates/rollup-boost/src/proxy.rs b/crates/rollup-boost/src/proxy.rs index 4853907f..64709ccb 100644 --- a/crates/rollup-boost/src/proxy.rs +++ b/crates/rollup-boost/src/proxy.rs @@ -1,8 +1,5 @@ use crate::client::http::HttpClient; -use crate::payload::PayloadSource; use crate::{Request, Response, from_buffered_request, into_buffered_request}; -use alloy_rpc_types_engine::JwtSecret; -use http::Uri; use http_body_util::BodyExt as _; use jsonrpsee::core::BoxError; use jsonrpsee::server::HttpBody; @@ -25,30 +22,15 @@ const FORWARD_REQUESTS: [&str; 6] = [ #[derive(Debug, Clone)] pub struct ProxyLayer { - l2_auth_rpc: Uri, - l2_auth_secret: JwtSecret, - l2_timeout: u64, - builder_auth_rpc: Uri, - builder_auth_secret: JwtSecret, - builder_timeout: u64, + l2_client: HttpClient, + builder_client: HttpClient, } impl ProxyLayer { - pub fn new( - l2_auth_rpc: Uri, - l2_auth_secret: JwtSecret, - l2_timeout: u64, - builder_auth_rpc: Uri, - builder_auth_secret: JwtSecret, - builder_timeout: u64, - ) -> Self { + pub fn new(l2_client: HttpClient, builder_client: HttpClient) -> Self { ProxyLayer { - l2_auth_rpc, - l2_auth_secret, - l2_timeout, - builder_auth_rpc, - builder_auth_secret, - builder_timeout, + l2_client, + builder_client, } } } @@ -57,24 +39,10 @@ impl Layer for ProxyLayer { type Service = ProxyService; fn layer(&self, inner: S) -> Self::Service { - let l2_client = HttpClient::new( - self.l2_auth_rpc.clone(), - self.l2_auth_secret, - PayloadSource::L2, - self.l2_timeout, - ); - - let builder_client = HttpClient::new( - self.builder_auth_rpc.clone(), - self.builder_auth_secret, - PayloadSource::Builder, - self.builder_timeout, - ); - ProxyService { inner, - l2_client, - builder_client, + l2_client: self.l2_client.clone(), + builder_client: self.builder_client.clone(), } } } @@ -162,12 +130,13 @@ where #[cfg(test)] mod tests { use crate::probe::ProbeLayer; + use crate::{ClientArgs, PayloadSource}; use super::*; use alloy_primitives::{B256, Bytes, U64, U128, hex}; use alloy_rpc_types_engine::JwtSecret; use alloy_rpc_types_eth::erc4337::TransactionConditional; - use http::StatusCode; + use http::{StatusCode, Uri}; use http_body_util::{BodyExt, Full}; use hyper::service::service_fn; use hyper_util::client::legacy::Client; @@ -183,17 +152,14 @@ mod tests { server::{ServerBuilder, ServerHandle}, }; use serde_json::json; - use std::{ - net::{IpAddr, SocketAddr}, - str::FromStr, - sync::Arc, - }; + use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpListener; use tokio::task::JoinHandle; - const PORT: u32 = 8552; - const ADDR: &str = "127.0.0.1"; - const PROXY_PORT: u32 = 8553; + // A JSON-RPC error is retriable if error.code ∉ (-32700, -32600] + fn is_retriable_code(code: i32) -> bool { + code < -32700 || code > -32600 + } struct TestHarness { builder: MockHttpServer, @@ -213,12 +179,22 @@ mod tests { let builder = MockHttpServer::serve().await?; let l2 = MockHttpServer::serve().await?; let middleware = tower::ServiceBuilder::new().layer(ProxyLayer::new( - format!("http://{}:{}", l2.addr.ip(), l2.addr.port()).parse::()?, - JwtSecret::random(), - 1, - format!("http://{}:{}", builder.addr.ip(), builder.addr.port()).parse::()?, - JwtSecret::random(), - 1, + ClientArgs { + url: format!("http://{}:{}", l2.addr.ip(), l2.addr.port()).parse::()?, + jwt_token: Some(JwtSecret::random()), + jwt_path: None, + timeout: 1, + } + .new_http_client(PayloadSource::L2) + .unwrap(), + ClientArgs { + url: format!("http://{}:{}", builder.addr.ip(), builder.addr.port()) + .parse::()?, + jwt_token: Some(JwtSecret::random()), + jwt_path: None, + timeout: 1, + } + .new_http_client(PayloadSource::Builder)?, )); let temp_listener = TcpListener::bind("127.0.0.1:0").await?; @@ -252,10 +228,22 @@ mod tests { addr: SocketAddr, requests: Arc>>, join_handle: JoinHandle<()>, + shutdown_tx: Option>, + connections: Arc>>>, } impl Drop for MockHttpServer { fn drop(&mut self) { + // Send shutdown signal if available + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + // Abort active connections to simulate a crash closing open sockets + if let Ok(mut conns) = self.connections.try_lock() { + for handle in conns.drain(..) { + handle.abort(); + } + } self.join_handle.abort(); } } @@ -264,31 +252,58 @@ mod tests { async fn serve() -> eyre::Result { let listener = TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; + Self::serve_with_listener(listener, addr).await + } + + async fn serve_on_addr(addr: SocketAddr) -> eyre::Result { + let listener = TcpListener::bind(addr).await?; + let actual_addr = listener.local_addr()?; + Self::serve_with_listener(listener, actual_addr).await + } + + async fn serve_with_listener( + listener: TcpListener, + addr: SocketAddr, + ) -> eyre::Result { let requests = Arc::new(tokio::sync::Mutex::new(vec![])); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + let connections: Arc>>> = + Arc::new(tokio::sync::Mutex::new(Vec::new())); let requests_clone = requests.clone(); + let connections_clone = connections.clone(); let handle = tokio::spawn(async move { loop { - match listener.accept().await { - Ok((stream, _)) => { - let io = TokioIo::new(stream); - let requests = requests_clone.clone(); - - tokio::spawn(async move { - if let Err(err) = hyper::server::conn::http1::Builder::new() - .serve_connection( - io, - service_fn(move |req| { - Self::handle_request(req, requests.clone()) - }), - ) - .await - { - eprintln!("Error serving connection: {}", err); + tokio::select! { + _ = &mut shutdown_rx => { + // Shutdown signal received + break; + } + result = listener.accept() => { + match result { + Ok((stream, _)) => { + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + let conn_task = tokio::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection( + io, + service_fn(move |req| { + Self::handle_request(req, requests.clone()) + }), + ) + .await + { + eprintln!("Error serving connection: {}", err); + } + }); + // Track the connection task so we can abort on crash + connections_clone.lock().await.push(conn_task); } - }); + Err(e) => eprintln!("Error accepting connection: {}", e), + } } - Err(e) => eprintln!("Error accepting connection: {}", e), } } }); @@ -297,6 +312,8 @@ mod tests { addr, requests, join_handle: handle, + shutdown_tx: Some(shutdown_tx), + connections, }) } @@ -413,13 +430,15 @@ mod tests { } async fn health_check() { - let proxy_server = spawn_proxy_server().await; + // Spawn a backend for the proxy to point to, and a proxy with dynamic port + let (backend_server, backend_addr) = spawn_server().await; + let (proxy_server, proxy_addr) = spawn_proxy_server_with_l2(backend_addr).await; // Create a new HTTP client let client: Client> = Client::builder(TokioExecutor::new()).build_http(); // Test the health check endpoint - let health_check_url = format!("http://{ADDR}:{PORT}/healthz"); + let health_check_url = format!("http://{proxy_addr}/healthz"); let health_response = client.get(health_check_url.parse::().unwrap()).await; assert!(health_response.is_ok()); let status = health_response.unwrap().status(); @@ -427,36 +446,34 @@ mod tests { proxy_server.stop().unwrap(); proxy_server.stopped().await; + backend_server.stop().unwrap(); + backend_server.stopped().await; } async fn send_request(method: &str) -> Result { - let server = spawn_server().await; - let proxy_server = spawn_proxy_server().await; + let (backend_server, backend_addr) = spawn_server().await; + let (proxy_server, proxy_addr) = spawn_proxy_server_with_l2(backend_addr).await; let proxy_client = HttpClient::builder() - .build(format!("http://{ADDR}:{PORT}")) + .build(format!("http://{}", proxy_addr)) .unwrap(); let response = proxy_client .request::(method, rpc_params![]) .await; - server.stop().unwrap(); - server.stopped().await; + backend_server.stop().unwrap(); + backend_server.stopped().await; proxy_server.stop().unwrap(); proxy_server.stopped().await; response } - async fn spawn_server() -> ServerHandle { - let server = ServerBuilder::default() - .build( - format!("{ADDR}:{PROXY_PORT}") - .parse::() - .unwrap(), - ) - .await - .unwrap(); + async fn spawn_server() -> (ServerHandle, SocketAddr) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + drop(listener); + let server = ServerBuilder::default().build(addr).await.unwrap(); // Create a mock rpc module let mut module = RpcModule::new(()); @@ -464,23 +481,37 @@ mod tests { .register_method("greet_melkor", |_, _, _| "You are the dark lord") .unwrap(); - server.start(module) + (server.start(module), addr) } - /// Spawn a new RPC server with a proxy layer. - async fn spawn_proxy_server() -> ServerHandle { - let addr = format!("{ADDR}:{PORT}"); + /// Spawn a new RPC server with a proxy layer pointing to a provided L2 address. + async fn spawn_proxy_server_with_l2(l2_addr: SocketAddr) -> (ServerHandle, SocketAddr) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let proxy_addr = listener.local_addr().unwrap(); + drop(listener); let jwt = JwtSecret::random(); - let l2_auth_uri = format!( - "http://{}", - SocketAddr::new(IpAddr::from_str(ADDR).unwrap(), PROXY_PORT as u16) - ) - .parse::() - .unwrap(); + let l2_auth_uri = format!("http://{}", l2_addr).parse::().unwrap(); let (probe_layer, _) = ProbeLayer::new(); - let proxy_layer = ProxyLayer::new(l2_auth_uri.clone(), jwt, 1, l2_auth_uri, jwt, 1); + let proxy_layer = ProxyLayer::new( + ClientArgs { + url: l2_auth_uri.clone(), + jwt_token: Some(jwt), + jwt_path: None, + timeout: 1, + } + .new_http_client(PayloadSource::L2) + .unwrap(), + ClientArgs { + url: l2_auth_uri.clone(), + jwt_token: Some(jwt), + jwt_path: None, + timeout: 1, + } + .new_http_client(PayloadSource::Builder) + .unwrap(), + ); // Create a layered server let server = ServerBuilder::default() @@ -489,7 +520,7 @@ mod tests { .layer(probe_layer) .layer(proxy_layer), ) - .build(addr.parse::().unwrap()) + .build(proxy_addr) .await .unwrap(); @@ -508,7 +539,7 @@ mod tests { .register_method("non_existent_method", |_, _, _| "no proxy response") .unwrap(); - server.start(module) + (server.start(module), proxy_addr) } #[tokio::test] @@ -757,4 +788,212 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_l2_server_recovery() -> eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // Step 1: Reserve a port for L2 by binding and then releasing it + let temp_listener = TcpListener::bind("127.0.0.1:0").await?; + let l2_addr = temp_listener.local_addr()?; + drop(temp_listener); + + // Wait for port to be fully released + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Step 2: Create builder and proxy WITHOUT an L2 server running yet + let builder = MockHttpServer::serve().await?; + let builder_addr = builder.addr; + let jwt = JwtSecret::random(); + + // Create proxy layer with L2 client pointing to a non-existent server + // Use a short timeout to fail quickly + let proxy_layer = ProxyLayer::new( + ClientArgs { + url: format!("http://{}:{}", l2_addr.ip(), l2_addr.port()).parse::()?, + jwt_token: Some(jwt), + jwt_path: None, + timeout: 200, // Short timeout for faster failure + } + .new_http_client(PayloadSource::L2)?, + ClientArgs { + url: format!("http://{}:{}", builder_addr.ip(), builder_addr.port()) + .parse::()?, + jwt_token: Some(jwt), + jwt_path: None, + timeout: 200, + } + .new_http_client(PayloadSource::Builder)?, + ); + + // Start proxy server + let listener = TcpListener::bind("127.0.0.1:0").await?; + let proxy_addr = listener.local_addr()?; + drop(listener); + + let server = Server::builder() + .set_http_middleware(tower::ServiceBuilder::new().layer(proxy_layer)) + .build(proxy_addr) + .await?; + + let proxy_addr = server.local_addr()?; + let proxy_client: HttpClient = HttpClient::builder().build(format!( + "http://{}:{}", + proxy_addr.ip(), + proxy_addr.port() + ))?; + + let server_handle = server.start(RpcModule::new(())); + + // Step 3: Request should fail (connection refused) because L2 server doesn't exist + let mock_data = U128::from(42); + let result = proxy_client + .request::("mock_forwardedMethod", (mock_data,)) + .await; + assert!( + result.is_err(), + "Request should fail when L2 server is not running" + ); + println!("Request failed as expected (no server): {:?}", result); + + // Step 4: Start the L2 server + let l2 = MockHttpServer::serve_on_addr(l2_addr).await?; + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Step 5: Request should now succeed (demonstrating auto-recovery) + let result = proxy_client + .request::("mock_forwardedMethod", (mock_data,)) + .await; + assert!( + result.is_ok(), + "Request should succeed after L2 server starts (auto-recovery): {:?}", + result + ); + println!("Request succeeded after server started: {:?}", result); + + // Step 6: Verify multiple subsequent requests work consistently + for i in 0..3 { + let result = proxy_client + .request::("mock_forwardedMethod", (mock_data,)) + .await; + assert!( + result.is_ok(), + "Request {} should continue to succeed: {:?}", + i, + result + ); + } + + // Verify the server received requests + { + let l2_requests = l2.requests.lock().await; + assert!( + l2_requests.len() >= 1, + "L2 server should have received requests" + ); + assert_eq!(l2_requests[0]["method"], "mock_forwardedMethod"); + } + + // Cleanup + server_handle.stop()?; + drop(builder); + drop(l2); + + Ok(()) + } + + #[tokio::test] + async fn test_success_then_failure_then_success() -> eyre::Result<()> { + // Dynamically bind L2 and Proxy servers + let l2 = MockHttpServer::serve().await?; + let l2_addr = l2.addr; + + // Build proxy with short timeouts pointing to current L2 + let jwt = JwtSecret::random(); + let proxy_layer = ProxyLayer::new( + ClientArgs { + url: format!("http://{}:{}", l2_addr.ip(), l2_addr.port()).parse::()?, + jwt_token: Some(jwt), + jwt_path: None, + timeout: 200, + } + .new_http_client(PayloadSource::L2)?, + ClientArgs { + url: format!("http://{}:{}", l2_addr.ip(), l2_addr.port()).parse::()?, + jwt_token: Some(jwt), + jwt_path: None, + timeout: 200, + } + .new_http_client(PayloadSource::Builder)?, + ); + + // Start proxy on dynamic port + let listener = TcpListener::bind("127.0.0.1:0").await?; + let proxy_addr = listener.local_addr()?; + drop(listener); + + let server = Server::builder() + .set_http_middleware(tower::ServiceBuilder::new().layer(proxy_layer)) + .build(proxy_addr) + .await?; + let proxy_addr = server.local_addr()?; + let proxy_client: HttpClient = HttpClient::builder().build(format!( + "http://{}:{}", + proxy_addr.ip(), + proxy_addr.port() + ))?; + let server_handle = server.start(RpcModule::new(())); + + let mock_data = U128::from(7); + + // 1) Initial success + let res = proxy_client + .request::("mock_forwardedMethod", (mock_data,)) + .await; + assert!(res.is_ok(), "initial request should succeed: {:?}", res); + + // 2) Stop L2 -> subsequent failure + drop(l2); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + // Expect a JSON-RPC error object with code -32000 (retriable) + let res = proxy_client + .request::("mock_forwardedMethod", (mock_data,)) + .await; + match res { + Ok(v) => unreachable!("expected error when L2 down, got: {v:?}"), + Err(ClientError::Call(e)) => { + let code = e.code(); + assert!( + is_retriable_code(code), + "expected retriable code (not parse/invalid), got {}", + code + ); + } + Err(_other) => { + // Transport or other non-Call errors are considered retriable + assert!( + matches!(_other, ClientError::Transport(_)), + "expected transport error" + ); + } + } + + // 3) Restart L2 -> subsequent success + let l2_restarted = MockHttpServer::serve_on_addr(l2_addr).await?; + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let res = proxy_client + .request::("mock_forwardedMethod", (mock_data,)) + .await; + assert!( + res.is_ok(), + "request should succeed after L2 restart: {:?}", + res + ); + + // Cleanup + server_handle.stop()?; + drop(l2_restarted); + + Ok(()) + } } diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index aabb1ee1..9749f66f 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -1,8 +1,10 @@ use crate::debug_api::ExecutionMode; -use crate::{BlockSelectionPolicy, EngineApiExt}; +use crate::{ + BlockSelectionPolicy, ClientArgs, EngineApiExt, Flashblocks, FlashblocksService, + RollupBoostArgs, update_execution_mode_gauge, +}; use crate::{ client::rpc::RpcClient, - debug_api::DebugServer, health::HealthHandle, payload::{ NewPayload, NewPayloadV3, NewPayloadV4, OpExecutionPayloadEnvelope, PayloadSource, @@ -34,6 +36,8 @@ use op_alloy_rpc_types_engine::{ }; use opentelemetry::trace::SpanKind; use parking_lot::Mutex; +use std::net::{IpAddr, SocketAddr}; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; @@ -52,38 +56,106 @@ pub struct BuilderPayloadResult { pub type BuilderResult = Result>; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RollupBoostServer { pub l2_client: Arc, pub builder_client: Arc, pub payload_trace_context: Arc, + pub execution_mode: Arc>, block_selection_policy: Option, external_state_root: bool, ignore_unhealthy_builders: bool, - execution_mode: Arc>, probes: Arc, payload_to_fcu_request: DashMap)>, } -impl RollupBoostServer -where - T: EngineApiExt, -{ +impl RollupBoostServer { + pub fn new_from_args( + rollup_boost_args: RollupBoostArgs, + probes: Arc, + ) -> eyre::Result { + if !rollup_boost_args.flashblocks.flashblocks { + eyre::bail!( + "FlashblocksService requires flashblocks to be enabled, first check rollup_boost_args.flashblocks.flashblocks == true before calling this constructor" + ); + } + let l2_client_args: ClientArgs = rollup_boost_args.l2_client.into(); + let builder_client_args: ClientArgs = rollup_boost_args.builder.into(); + + let l2_client = l2_client_args.new_rpc_client(PayloadSource::L2)?; + let builder_client = builder_client_args.new_rpc_client(PayloadSource::Builder)?; + + let flashblocks_args = rollup_boost_args.flashblocks; + let inbound_url = flashblocks_args.flashblocks_builder_url; + let outbound_addr = SocketAddr::new( + IpAddr::from_str(&flashblocks_args.flashblocks_host)?, + flashblocks_args.flashblocks_port, + ); + + let builder_client = Arc::new(Flashblocks::run( + builder_client.clone(), + inbound_url, + outbound_addr, + flashblocks_args.flashblock_builder_ws_reconnect_ms, + )?); + + Ok(RollupBoostServer::new( + l2_client, + builder_client, + rollup_boost_args.execution_mode, + rollup_boost_args.block_selection_policy, + probes.clone(), + rollup_boost_args.external_state_root, + rollup_boost_args.ignore_unhealthy_builders, + )) + } +} + +impl RollupBoostServer { + pub fn new_from_args( + rollup_boost_args: RollupBoostArgs, + probes: Arc, + ) -> eyre::Result { + if rollup_boost_args.flashblocks.flashblocks { + eyre::bail!( + "RpcClient requires flashblocks to be disabled, first check rollup_boost_args.flashblocks.flashblocks == false before calling this constructor" + ); + } + let l2_client_args: ClientArgs = rollup_boost_args.l2_client.into(); + let builder_client_args: ClientArgs = rollup_boost_args.builder.into(); + + let l2_client = l2_client_args.new_rpc_client(PayloadSource::L2)?; + let builder_client = builder_client_args.new_rpc_client(PayloadSource::Builder)?; + + Ok(RollupBoostServer::new( + l2_client, + Arc::new(builder_client), + rollup_boost_args.execution_mode, + rollup_boost_args.block_selection_policy, + probes.clone(), + rollup_boost_args.external_state_root, + rollup_boost_args.ignore_unhealthy_builders, + )) + } +} + +impl RollupBoostServer { pub fn new( l2_client: RpcClient, builder_client: Arc, - initial_execution_mode: Arc>, + initial_execution_mode: ExecutionMode, block_selection_policy: Option, probes: Arc, external_state_root: bool, ignore_unhealthy_builders: bool, ) -> Self { + update_execution_mode_gauge(initial_execution_mode); Self { l2_client: Arc::new(l2_client), builder_client, block_selection_policy, payload_trace_context: Arc::new(PayloadTraceContext::new()), - execution_mode: initial_execution_mode, + execution_mode: Arc::new(Mutex::new(initial_execution_mode)), probes, external_state_root, ignore_unhealthy_builders, @@ -107,16 +179,6 @@ where handle.spawn() } - pub async fn start_debug_server(&self, debug_addr: &str) -> eyre::Result<()> { - let server = DebugServer::new(self.execution_mode.clone()); - server.run(debug_addr).await?; - Ok(()) - } - - pub fn execution_mode(&self) -> ExecutionMode { - *self.execution_mode.lock() - } - async fn new_payload(&self, new_payload: NewPayload) -> RpcResult { let execution_payload = ExecutionPayload::from(new_payload.clone()); let block_hash = execution_payload.block_hash(); @@ -138,7 +200,7 @@ where .await; // async call to builder to sync the builder node - if !self.execution_mode().is_disabled() && !self.should_skip_unhealthy_builder() { + if !self.execution_mode.lock().is_disabled() && !self.should_skip_unhealthy_builder() { let builder = self.builder_client.clone(); let new_payload_clone = new_payload.clone(); tokio::spawn(async move { builder.new_payload(new_payload_clone).await }); @@ -155,7 +217,7 @@ where // If execution mode is disabled, return the l2 payload without sending // the request to the builder - if self.execution_mode().is_disabled() { + if self.execution_mode.lock().is_disabled() { return match l2_fut.await { Ok(payload) => { self.probes.set_health(Health::Healthy); @@ -271,7 +333,7 @@ where // If execution mode is set to DryRun, fallback to the l2_payload, // otherwise prefer the builder payload - if self.execution_mode().is_dry_run() { + if self.execution_mode.lock().is_dry_run() { (l2_payload, PayloadSource::L2) } else if let Some(selection_policy) = &self.block_selection_policy { selection_policy.select_block(builder_payload, l2_payload) @@ -281,7 +343,7 @@ where } else { // Only update the health status if the builder payload fails // and execution mode is enabled - if self.execution_mode().is_enabled() && builder_api_failed { + if self.execution_mode.lock().is_enabled() && builder_api_failed { self.probes.set_health(Health::PartialContent); } (l2_payload, PayloadSource::L2) @@ -437,10 +499,7 @@ pub trait EngineApi { } #[async_trait] -impl EngineApiServer for RollupBoostServer -where - T: EngineApiExt, -{ +impl EngineApiServer for RollupBoostServer { #[instrument( skip_all, err, @@ -462,7 +521,7 @@ where .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()); // If execution mode is disabled, return the l2 client response immediately - if self.execution_mode().is_disabled() { + if self.execution_mode.lock().is_disabled() { return Ok(l2_fut.await?); } @@ -789,31 +848,43 @@ pub mod tests { let (l2_server, l2_server_addr) = spawn_server(l2_mock.clone()).await; let (builder_server, builder_server_addr) = spawn_server(builder_mock.clone()).await; - let l2_auth_rpc = Uri::from_str(&format!("http://{l2_server_addr}")).unwrap(); - let l2_client = - RpcClient::new(l2_auth_rpc.clone(), jwt_secret, 2000, PayloadSource::L2).unwrap(); - - let builder_auth_rpc = Uri::from_str(&format!("http://{builder_server_addr}")).unwrap(); - let builder_client = Arc::new( - RpcClient::new( - builder_auth_rpc.clone(), - jwt_secret, - 2000, - PayloadSource::Builder, - ) - .unwrap(), + // Build l2 clients + let l2_client_args = ClientArgs { + url: format!("http://{l2_server_addr}") + .parse() + .expect("l2 server address is valid url"), + jwt_token: Some(jwt_secret), + jwt_path: None, + timeout: 2000, + }; + let l2_rpc_client = l2_client_args.new_rpc_client(PayloadSource::L2).unwrap(); + let l2_http_client = l2_client_args.new_http_client(PayloadSource::L2).unwrap(); + + // Build builder clients + let builder_client_args = ClientArgs { + url: Uri::from_str(&format!("http://{builder_server_addr}")).unwrap(), + jwt_token: Some(jwt_secret), + jwt_path: None, + timeout: 2000, + }; + let builder_rpc_client = Arc::new( + builder_client_args + .new_rpc_client(PayloadSource::Builder) + .unwrap(), ); + let builder_http_client = builder_client_args + .new_http_client(PayloadSource::Builder) + .unwrap(); let (probe_layer, probes) = ProbeLayer::new(); - let execution_mode = Arc::new(Mutex::new(ExecutionMode::Enabled)); // For tests, set initial health to Healthy since we don't run health checks probes.set_health(Health::Healthy); let rollup_boost = RollupBoostServer::new( - l2_client, - builder_client, - execution_mode.clone(), + l2_rpc_client, + builder_rpc_client, + ExecutionMode::Enabled, None, probes.clone(), external_state_root, @@ -822,17 +893,9 @@ pub mod tests { let module: RpcModule<()> = rollup_boost.try_into().unwrap(); - let http_middleware = - tower::ServiceBuilder::new() - .layer(probe_layer) - .layer(ProxyLayer::new( - l2_auth_rpc, - jwt_secret, - 1, - builder_auth_rpc, - jwt_secret, - 1, - )); + let http_middleware = tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(ProxyLayer::new(l2_http_client, builder_http_client)); let server = Server::builder() .set_http_middleware(http_middleware)