diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs index fdfc45cd..77d942cd 100644 --- a/crates/common/src/pbs/error.rs +++ b/crates/common/src/pbs/error.rs @@ -25,6 +25,9 @@ pub enum PbsError { #[error("URL parsing error: {0}")] UrlParsing(#[from] url::ParseError), + + #[error("tokio join error: {0}")] + TokioJoinError(#[from] tokio::task::JoinError), } impl PbsError { diff --git a/crates/pbs/src/api.rs b/crates/pbs/src/api.rs index a3786f0b..594b7d36 100644 --- a/crates/pbs/src/api.rs +++ b/crates/pbs/src/api.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use async_trait::async_trait; use axum::{Router, http::HeaderMap}; use cb_common::pbs::{ @@ -34,10 +36,10 @@ pub trait BuilderApi: 'static { /// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock and /// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2 async fn submit_block( - signed_blinded_block: SignedBlindedBeaconBlock, + signed_blinded_block: Arc, req_headers: HeaderMap, state: PbsState, - api_version: &BuilderApiVersion, + api_version: BuilderApiVersion, ) -> eyre::Result> { mev_boost::submit_block(signed_blinded_block, req_headers, state, api_version).await } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 91df8996..15f68416 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -7,7 +7,10 @@ use cb_common::{ utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms}, }; use eyre::bail; -use futures::future::{join_all, select_ok}; +use futures::{ + FutureExt, + future::{join_all, select_ok}, +}; use reqwest::header::{CONTENT_TYPE, USER_AGENT}; use tracing::{Instrument, debug, error}; use url::Url; @@ -49,32 +52,38 @@ pub async fn register_validator( for (n_regs, body) in bodies { for relay in state.all_relays().iter().cloned() { - handles.push(tokio::spawn( - send_register_validator_with_timeout( - n_regs, - body.clone(), - relay, - send_headers.clone(), - state.pbs_config().timeout_register_validator_ms, - state.pbs_config().register_validator_retry_limit, + handles.push( + tokio::spawn( + send_register_validator_with_timeout( + n_regs, + body.clone(), + relay, + send_headers.clone(), + state.pbs_config().timeout_register_validator_ms, + state.pbs_config().register_validator_retry_limit, + ) + .in_current_span(), ) - .in_current_span(), - )); + .map(|join_result| match join_result { + Ok(res) => res, + Err(err) => Err(PbsError::TokioJoinError(err)), + }), + ); } } if state.pbs_config().wait_all_registrations { // wait for all relays registrations to complete let results = join_all(handles).await; - if results.into_iter().any(|res| res.is_ok_and(|res| res.is_ok())) { + if results.into_iter().any(|res| res.is_ok()) { Ok(()) } else { bail!("No relay passed register_validator successfully") } } else { // return once first completes, others proceed in background - let result = select_ok(handles).await?; - match result.0 { + let result = select_ok(handles).await; + match result { Ok(_) => Ok(()), Err(_) => bail!("No relay passed register_validator successfully"), } diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index e8d8ea6b..2b10dcaa 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -1,5 +1,6 @@ use std::{ str::FromStr, + sync::Arc, time::{Duration, Instant}, }; @@ -14,7 +15,7 @@ use cb_common::{ }, utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms}, }; -use futures::future::select_ok; +use futures::{FutureExt, future::select_ok}; use reqwest::header::USER_AGENT; use tracing::{debug, warn}; use url::Url; @@ -31,10 +32,10 @@ use crate::{ /// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2. Use `api_version` to /// distinguish between the two. pub async fn submit_block( - signed_blinded_block: SignedBlindedBeaconBlock, + signed_blinded_block: Arc, req_headers: HeaderMap, state: PbsState, - api_version: &BuilderApiVersion, + api_version: BuilderApiVersion, ) -> eyre::Result> { debug!(?req_headers, "received headers"); @@ -58,17 +59,22 @@ pub async fn submit_block( send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); send_headers.insert(HEADER_CONSENSUS_VERSION, consensus_version); - let relays = state.all_relays(); - let mut handles = Vec::with_capacity(relays.len()); - for relay in relays.iter() { - handles.push(Box::pin(submit_block_with_timeout( - &signed_blinded_block, - relay, - send_headers.clone(), - state.pbs_config().timeout_get_payload_ms, - api_version, - fork_name, - ))); + let mut handles = Vec::with_capacity(state.all_relays().len()); + for relay in state.all_relays().iter().cloned() { + handles.push( + tokio::spawn(submit_block_with_timeout( + signed_blinded_block.clone(), + relay, + send_headers.clone(), + state.pbs_config().timeout_get_payload_ms, + api_version, + fork_name, + )) + .map(|join_result| match join_result { + Ok(res) => res, + Err(err) => Err(PbsError::TokioJoinError(err)), + }), + ); } let results = select_ok(handles).await; @@ -81,14 +87,14 @@ pub async fn submit_block( /// Submit blinded block to relay, retry connection errors until the /// given timeout has passed async fn submit_block_with_timeout( - signed_blinded_block: &SignedBlindedBeaconBlock, - relay: &RelayClient, + signed_blinded_block: Arc, + relay: RelayClient, headers: HeaderMap, timeout_ms: u64, - api_version: &BuilderApiVersion, + api_version: BuilderApiVersion, fork_name: ForkName, ) -> Result, PbsError> { - let mut url = relay.submit_block_url(*api_version)?; + let mut url = relay.submit_block_url(api_version)?; let mut remaining_timeout_ms = timeout_ms; let mut retry = 0; let mut backoff = Duration::from_millis(250); @@ -97,12 +103,12 @@ async fn submit_block_with_timeout( let start_request = Instant::now(); match send_submit_block( url.clone(), - signed_blinded_block, - relay, + &signed_blinded_block, + &relay, headers.clone(), remaining_timeout_ms, retry, - api_version, + &api_version, fork_name, ) .await diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 4784e6b1..004b601e 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse}; use cb_common::{ pbs::{BuilderApiVersion, GetPayloadInfo, SignedBlindedBeaconBlock}, @@ -17,7 +19,7 @@ use crate::{ pub async fn handle_submit_block_v1>( state: State>, req_headers: HeaderMap, - signed_blinded_block: Json, + Json(signed_blinded_block): Json>, ) -> Result { handle_submit_block_impl::( state, @@ -31,7 +33,7 @@ pub async fn handle_submit_block_v1>( pub async fn handle_submit_block_v2>( state: State>, req_headers: HeaderMap, - signed_blinded_block: Json, + Json(signed_blinded_block): Json>, ) -> Result { handle_submit_block_impl::( state, @@ -45,7 +47,7 @@ pub async fn handle_submit_block_v2>( async fn handle_submit_block_impl>( State(state): State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json, + signed_blinded_block: Arc, api_version: BuilderApiVersion, ) -> Result { tracing::Span::current().record("slot", signed_blinded_block.slot().as_u64() as i64); @@ -65,7 +67,7 @@ async fn handle_submit_block_impl>( info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request"); - match A::submit_block(signed_blinded_block, req_headers, state, &api_version).await { + match A::submit_block(signed_blinded_block, req_headers, state, api_version).await { Ok(res) => match res { Some(block_response) => { trace!(?block_response);