Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 9 additions & 12 deletions crates/rbuilder/src/live_builder/block_output/relay_submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::{
block_output::bidding_service_interface::RelaySet, payload_events::MevBoostSlotData,
},
mev_boost::{
sign_block_for_relay, BLSBlockSigner, MevBoostRelayBidSubmitter, RelayError, RelaySlotData,
SubmitBlockErr,
optimistic_v3::OptimisticV3BlockCache, sign_block_for_relay, BLSBlockSigner,
MevBoostRelayBidSubmitter, RelayError, RelaySlotData, SubmitBlockErr,
},
telemetry::{
add_relay_submit_time, add_subsidy_value, inc_conn_relay_errors,
Expand Down Expand Up @@ -34,10 +34,7 @@ use rbuilder_primitives::{
use reth_chainspec::ChainSpec;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::{
sync::{broadcast, Notify},
time::Instant,
};
use tokio::{sync::Notify, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, trace, warn, Instrument, Span};

Expand Down Expand Up @@ -116,8 +113,8 @@ pub struct SubmissionConfig {
pub struct OptimisticV3Config {
/// The URL where the relay can call to retrieve the block.
pub builder_url: Vec<u8>,
/// Sender for Optimistic V3 blocks.
pub block_sender: broadcast::Sender<Arc<AlloySubmitBlockRequest>>,
/// Cache for blocks submitted via optimistic v3.
pub cache: OptimisticV3BlockCache,
}

/// Values from [`BuiltBlockTrace`]
Expand Down Expand Up @@ -478,10 +475,10 @@ async fn submit_bid_to_the_relay(
};

let request_fut = if let Some((config, request)) = optimistic_v3_request {
// Send the block to be saved in cache
let _ = config
.block_sender
.send(submit_block_request.submission.request.clone());
// Save block to cache
config
.cache
.insert(submit_block_request.submission.request.clone());
relay
.submit_optimistic_v3(request, registration)
.left_future()
Expand Down
12 changes: 5 additions & 7 deletions crates/rbuilder/src/live_builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
mev_boost::{
bloxroute_grpc,
optimistic_v3::{self, OPTIMISTIC_V3_CHANNEL_SIZE},
optimistic_v3::{self, OptimisticV3BlockCache},
BLSBlockSigner, MevBoostRelayBidSubmitter, MevBoostRelaySlotInfoProvider, RelayClient,
RelayConfig, RelaySubmitConfig,
},
Expand Down Expand Up @@ -78,8 +78,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::{broadcast, Mutex as TokioMutex};
use tokio_stream::wrappers::BroadcastStream;
use tokio::sync::Mutex as TokioMutex;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use url::Url;
Expand Down Expand Up @@ -389,19 +388,18 @@ impl L1Config {
warn!("Optimistic V3 is enabled, but no relay pubkeys have been configured");
}

let (optimistic_v3_block_tx, optimistic_v3_block_rx) =
broadcast::channel(OPTIMISTIC_V3_CHANNEL_SIZE);
let optimistic_v3_cache = OptimisticV3BlockCache::default();
optimistic_v3::spawn_server(
address,
signing_domain,
self.optimistic_v3_relay_pubkeys.clone(),
BroadcastStream::from(optimistic_v3_block_rx),
optimistic_v3_cache.clone(),
cancellation_token,
)?;

optimistic_v3_config = Some(OptimisticV3Config {
builder_url: builder_url.into_bytes(),
block_sender: optimistic_v3_block_tx,
cache: optimistic_v3_cache,
})
}

Expand Down
79 changes: 33 additions & 46 deletions crates/rbuilder/src/mev_boost/optimistic_v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use alloy_primitives::{bytes::Bytes, B256};
use alloy_rpc_types_beacon::relay::SubmitBlockRequest as AlloySubmitBlockRequest;
use alloy_rpc_types_beacon::BlsPublicKey;
use ctor::ctor;
use futures::StreamExt as _;
use lazy_static::lazy_static;
use metrics_macros::register_metrics;
use parking_lot::Mutex;
Expand All @@ -20,7 +19,6 @@ use std::{
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tokio_util::sync::CancellationToken;
use tracing::*;
use warp::{
Expand All @@ -46,9 +44,6 @@ register_metrics! {
pub static BLOCK_NOT_FOUND_TOTAL: IntCounter = IntCounter::new("relay_server_block_not_found", "The total number of block not found errors on the optimistic V3 relay server").unwrap();
}

/// The channel buffer size for optimistic V3 broadcast channel
pub const OPTIMISTIC_V3_CHANNEL_SIZE: usize = 100;

/// The default number of blocks to keep in cache. With bid update latency of 1ms this would preserve the last second worth of submitted blocks.
pub const OPTIMISTIC_V3_CACHE_SIZE_DEFAULT: u32 = 1_000;

Expand All @@ -60,24 +55,40 @@ pub const OPTIMISTIC_V3_SERVER_CONTENT_LENGTH_LIMIT: u64 = 1_024;
/// Reference: <https://ethresear.ch/t/introduction-to-optimistic-v3-relays/22066#p-53641-technical-specification-8>
pub const GET_PAYLOAD_V3: &str = "get_payload_v3";

/// The cache containing blocks submitted via optimistic V3.
#[derive(Clone, Debug)]
pub struct OptimisticV3BlockCache(Arc<Mutex<LruMap<B256, Arc<AlloySubmitBlockRequest>>>>);

impl Default for OptimisticV3BlockCache {
fn default() -> Self {
Self::new(OPTIMISTIC_V3_CACHE_SIZE_DEFAULT)
}
}

impl OptimisticV3BlockCache {
/// Create new cache with provided size.
pub fn new(size: u32) -> Self {
Self(Arc::new(Mutex::new(LruMap::new(ByLength::new(size)))))
}

pub fn insert(&self, block: Arc<AlloySubmitBlockRequest>) {
let block_hash = block.bid_trace().block_hash;
self.0.lock().insert(block_hash, block);
}

pub fn get(&self, hash: &B256) -> Option<Arc<AlloySubmitBlockRequest>> {
self.0.lock().get(hash).cloned()
}
}

/// Initialize the HTTP server.
pub fn spawn_server(
address: impl Into<SocketAddr>,
domain: B256,
relay_pubkeys: HashSet<BlsPublicKey>,
bid_stream: BroadcastStream<Arc<AlloySubmitBlockRequest>>,
blocks: OptimisticV3BlockCache,
cancellation: CancellationToken,
) -> eyre::Result<()> {
let blocks = Arc::new(Mutex::new(LruMap::new(ByLength::new(
OPTIMISTIC_V3_CACHE_SIZE_DEFAULT,
))));

// Spawn block cache maintenance task.
tokio::spawn(Box::pin({
let blocks = blocks.clone();
async move { maintain_block_cache(bid_stream, blocks).await }
}));

// Spawn relay server.
let handler = Handler {
domain,
Expand Down Expand Up @@ -113,7 +124,7 @@ pub fn spawn_server(
struct Handler {
domain: B256,
relay_pubkeys: HashSet<BlsPublicKey>,
blocks: Arc<Mutex<LruMap<B256, Arc<AlloySubmitBlockRequest>>>>,
blocks: OptimisticV3BlockCache,
}

impl Handler {
Expand Down Expand Up @@ -180,14 +191,11 @@ impl Handler {
return Err(StatusCode::UNAUTHORIZED);
}

let block = {
let mut blocks = self.blocks.lock();
blocks.get(&block_hash).cloned().ok_or_else(|| {
debug!(target: "relay_server", %relay_pubkey, %block_hash, "block not found");
BLOCK_NOT_FOUND_TOTAL.inc();
StatusCode::NOT_FOUND
})?
};
let block = self.blocks.get(&block_hash).ok_or_else(|| {
debug!(target: "relay_server", %relay_pubkey, %block_hash, "block not found");
BLOCK_NOT_FOUND_TOTAL.inc();
StatusCode::NOT_FOUND
})?;

let (body, content_ty) = if is_json {
let json = serde_json::to_vec(&block).map_err(|error| {
Expand All @@ -207,24 +215,3 @@ impl Handler {
Ok(res)
}
}

async fn maintain_block_cache(
mut bid_stream: BroadcastStream<Arc<AlloySubmitBlockRequest>>,
blocks: Arc<Mutex<LruMap<B256, Arc<AlloySubmitBlockRequest>>>>,
) {
loop {
match bid_stream.next().await {
Some(Ok(block)) => {
let block_hash = block.bid_trace().block_hash;
blocks.lock().insert(block_hash, block);
trace!(target: "relay_server", %block_hash, "Block added to the relay server cache")
}
Some(Err(BroadcastStreamRecvError::Lagged(lag))) => {
error!(target: "relay_server", lag, "Block stream lagging behind");
}
None => {
error!(target: "relay_server", "Block stream closed");
}
}
}
}
Loading