diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 90dea5546..6f0c32a67 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -360,6 +360,7 @@ impl L1Config { chain_spec: Arc, relay_sets: Vec, bid_observer: Box, + cancellation_token: CancellationToken, ) -> eyre::Result<( RelaySubmitSinkFactory, Vec, @@ -395,6 +396,7 @@ impl L1Config { signing_domain, self.optimistic_v3_relay_pubkeys.clone(), BroadcastStream::from(optimistic_v3_block_rx), + cancellation_token, )?; optimistic_v3_config = Some(OptimisticV3Config { @@ -1068,6 +1070,7 @@ where base_config.chain_spec()?, relay_sets.clone(), bid_observer, + cancellation_token.clone(), )?; if !l1_config.relay_bid_scrapers.is_empty() { diff --git a/crates/rbuilder/src/mev_boost/optimistic_v3.rs b/crates/rbuilder/src/mev_boost/optimistic_v3.rs index 06607e52c..c9cf2ca1d 100644 --- a/crates/rbuilder/src/mev_boost/optimistic_v3.rs +++ b/crates/rbuilder/src/mev_boost/optimistic_v3.rs @@ -21,6 +21,7 @@ use std::{ time::{Duration, Instant, SystemTime}, }; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; +use tokio_util::sync::CancellationToken; use tracing::*; use warp::{ http::{header::CONTENT_TYPE, HeaderValue, StatusCode}, @@ -65,6 +66,7 @@ pub fn spawn_server( domain: B256, relay_pubkeys: HashSet, bid_stream: BroadcastStream>, + cancellation: CancellationToken, ) -> eyre::Result<()> { let blocks = Arc::new(Mutex::new(LruMap::new(ByLength::new( OPTIMISTIC_V3_CACHE_SIZE_DEFAULT, @@ -92,7 +94,16 @@ pub fn spawn_server( )) .and(warp::body::bytes()) .map(Handler::get_payload_v3_metered); - tokio::spawn(warp::serve(path).run(address)); + + let (_, server) = warp::serve(path).bind_with_graceful_shutdown(address, async move { + cancellation.cancelled().await; + let now = std::time::Instant::now(); + info!(target: "relay_server", "Received cancellation, initiating graceful shutdown"); + // Sleep for 12 seconds to avoid being demoted for the current slot. + tokio::time::sleep(Duration::from_secs(12)).await; + info!(target: "relay_server", elapsed = ?now.elapsed(), "Graceful shutdown complete"); + }); + tokio::spawn(server); info!(target: "relay_server", %address, "Relay server listening"); Ok(())