diff --git a/Cargo.lock b/Cargo.lock index 28c8a78be..3acc72895 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11627,6 +11627,7 @@ dependencies = [ "reth-transaction-pool", "tikv-jemallocator", "tokio", + "tokio-util", "tracing", ] diff --git a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs index 4f00b4ea0..e4bd12f73 100644 --- a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs +++ b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs @@ -4,8 +4,8 @@ use crate::{ block_output::bidding_service_interface::RelaySet, payload_events::MevBoostSlotData, }, mev_boost::{ - sign_block_for_relay, BLSBlockSigner, MevBoostRelayBidSubmitter, RelayError, RelaySlotData, - SubmitBlockErr, + optimistic_v3::OptimisticV3BlockCache, sign_block_for_relay, BLSBlockSigner, + MevBoostRelayBidSubmitter, RelayError, RelaySlotData, SubmitBlockErr, }, telemetry::{ add_relay_submit_time, add_subsidy_value, inc_conn_relay_errors, @@ -34,10 +34,7 @@ use rbuilder_primitives::{ use reth_chainspec::ChainSpec; use std::sync::Arc; use time::OffsetDateTime; -use tokio::{ - sync::{broadcast, Notify}, - time::Instant, -}; +use tokio::{sync::Notify, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{error, info, info_span, trace, warn, Instrument, Span}; @@ -116,8 +113,8 @@ pub struct SubmissionConfig { pub struct OptimisticV3Config { /// The URL where the relay can call to retrieve the block. pub builder_url: Vec, - /// Sender for Optimistic V3 blocks. - pub block_sender: broadcast::Sender>, + /// Cache for blocks submitted via optimistic v3. + pub cache: OptimisticV3BlockCache, } /// Values from [`BuiltBlockTrace`] @@ -478,10 +475,10 @@ async fn submit_bid_to_the_relay( }; let request_fut = if let Some((config, request)) = optimistic_v3_request { - // Send the block to be saved in cache - let _ = config - .block_sender - .send(submit_block_request.submission.request.clone()); + // Save block to cache + config + .cache + .insert(submit_block_request.submission.request.clone()); relay .submit_optimistic_v3(request, registration) .left_future() diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 6f0c32a67..cd55f4b17 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -41,7 +41,7 @@ use crate::{ }, mev_boost::{ bloxroute_grpc, - optimistic_v3::{self, OPTIMISTIC_V3_CHANNEL_SIZE}, + optimistic_v3::{self, OptimisticV3BlockCache}, BLSBlockSigner, MevBoostRelayBidSubmitter, MevBoostRelaySlotInfoProvider, RelayClient, RelayConfig, RelaySubmitConfig, }, @@ -78,8 +78,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::sync::{broadcast, Mutex as TokioMutex}; -use tokio_stream::wrappers::BroadcastStream; +use tokio::sync::Mutex as TokioMutex; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use url::Url; @@ -389,19 +388,18 @@ impl L1Config { warn!("Optimistic V3 is enabled, but no relay pubkeys have been configured"); } - let (optimistic_v3_block_tx, optimistic_v3_block_rx) = - broadcast::channel(OPTIMISTIC_V3_CHANNEL_SIZE); + let optimistic_v3_cache = OptimisticV3BlockCache::default(); optimistic_v3::spawn_server( address, signing_domain, self.optimistic_v3_relay_pubkeys.clone(), - BroadcastStream::from(optimistic_v3_block_rx), + optimistic_v3_cache.clone(), cancellation_token, )?; optimistic_v3_config = Some(OptimisticV3Config { builder_url: builder_url.into_bytes(), - block_sender: optimistic_v3_block_tx, + cache: optimistic_v3_cache, }) } diff --git a/crates/rbuilder/src/mev_boost/optimistic_v3.rs b/crates/rbuilder/src/mev_boost/optimistic_v3.rs index c9cf2ca1d..4f25d2d4e 100644 --- a/crates/rbuilder/src/mev_boost/optimistic_v3.rs +++ b/crates/rbuilder/src/mev_boost/optimistic_v3.rs @@ -6,7 +6,6 @@ use alloy_primitives::{bytes::Bytes, B256}; use alloy_rpc_types_beacon::relay::SubmitBlockRequest as AlloySubmitBlockRequest; use alloy_rpc_types_beacon::BlsPublicKey; use ctor::ctor; -use futures::StreamExt as _; use lazy_static::lazy_static; use metrics_macros::register_metrics; use parking_lot::Mutex; @@ -20,7 +19,6 @@ use std::{ sync::Arc, time::{Duration, Instant, SystemTime}, }; -use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tokio_util::sync::CancellationToken; use tracing::*; use warp::{ @@ -46,9 +44,6 @@ register_metrics! { pub static BLOCK_NOT_FOUND_TOTAL: IntCounter = IntCounter::new("relay_server_block_not_found", "The total number of block not found errors on the optimistic V3 relay server").unwrap(); } -/// The channel buffer size for optimistic V3 broadcast channel -pub const OPTIMISTIC_V3_CHANNEL_SIZE: usize = 100; - /// The default number of blocks to keep in cache. With bid update latency of 1ms this would preserve the last second worth of submitted blocks. pub const OPTIMISTIC_V3_CACHE_SIZE_DEFAULT: u32 = 1_000; @@ -60,24 +55,40 @@ pub const OPTIMISTIC_V3_SERVER_CONTENT_LENGTH_LIMIT: u64 = 1_024; /// Reference: pub const GET_PAYLOAD_V3: &str = "get_payload_v3"; +/// The cache containing blocks submitted via optimistic V3. +#[derive(Clone, Debug)] +pub struct OptimisticV3BlockCache(Arc>>>); + +impl Default for OptimisticV3BlockCache { + fn default() -> Self { + Self::new(OPTIMISTIC_V3_CACHE_SIZE_DEFAULT) + } +} + +impl OptimisticV3BlockCache { + /// Create new cache with provided size. + pub fn new(size: u32) -> Self { + Self(Arc::new(Mutex::new(LruMap::new(ByLength::new(size))))) + } + + pub fn insert(&self, block: Arc) { + let block_hash = block.bid_trace().block_hash; + self.0.lock().insert(block_hash, block); + } + + pub fn get(&self, hash: &B256) -> Option> { + self.0.lock().get(hash).cloned() + } +} + /// Initialize the HTTP server. pub fn spawn_server( address: impl Into, domain: B256, relay_pubkeys: HashSet, - bid_stream: BroadcastStream>, + blocks: OptimisticV3BlockCache, cancellation: CancellationToken, ) -> eyre::Result<()> { - let blocks = Arc::new(Mutex::new(LruMap::new(ByLength::new( - OPTIMISTIC_V3_CACHE_SIZE_DEFAULT, - )))); - - // Spawn block cache maintenance task. - tokio::spawn(Box::pin({ - let blocks = blocks.clone(); - async move { maintain_block_cache(bid_stream, blocks).await } - })); - // Spawn relay server. let handler = Handler { domain, @@ -113,7 +124,7 @@ pub fn spawn_server( struct Handler { domain: B256, relay_pubkeys: HashSet, - blocks: Arc>>>, + blocks: OptimisticV3BlockCache, } impl Handler { @@ -180,14 +191,11 @@ impl Handler { return Err(StatusCode::UNAUTHORIZED); } - let block = { - let mut blocks = self.blocks.lock(); - blocks.get(&block_hash).cloned().ok_or_else(|| { - debug!(target: "relay_server", %relay_pubkey, %block_hash, "block not found"); - BLOCK_NOT_FOUND_TOTAL.inc(); - StatusCode::NOT_FOUND - })? - }; + let block = self.blocks.get(&block_hash).ok_or_else(|| { + debug!(target: "relay_server", %relay_pubkey, %block_hash, "block not found"); + BLOCK_NOT_FOUND_TOTAL.inc(); + StatusCode::NOT_FOUND + })?; let (body, content_ty) = if is_json { let json = serde_json::to_vec(&block).map_err(|error| { @@ -207,24 +215,3 @@ impl Handler { Ok(res) } } - -async fn maintain_block_cache( - mut bid_stream: BroadcastStream>, - blocks: Arc>>>, -) { - loop { - match bid_stream.next().await { - Some(Ok(block)) => { - let block_hash = block.bid_trace().block_hash; - blocks.lock().insert(block_hash, block); - trace!(target: "relay_server", %block_hash, "Block added to the relay server cache") - } - Some(Err(BroadcastStreamRecvError::Lagged(lag))) => { - error!(target: "relay_server", lag, "Block stream lagging behind"); - } - None => { - error!(target: "relay_server", "Block stream closed"); - } - } - } -}