Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/common/src/pbs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub enum PbsError {

#[error("URL parsing error: {0}")]
UrlParsing(#[from] url::ParseError),

#[error("tokio join error: {0}")]
TokioJoinError(#[from] tokio::task::JoinError),
}

impl PbsError {
Expand Down
6 changes: 4 additions & 2 deletions crates/pbs/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use axum::{Router, http::HeaderMap};
use cb_common::pbs::{
Expand Down Expand Up @@ -34,10 +36,10 @@ pub trait BuilderApi<S: BuilderApiState>: 'static {
/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock and
/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2
async fn submit_block(
signed_blinded_block: SignedBlindedBeaconBlock,
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
req_headers: HeaderMap,
state: PbsState<S>,
api_version: &BuilderApiVersion,
api_version: BuilderApiVersion,
) -> eyre::Result<Option<SubmitBlindedBlockResponse>> {
mev_boost::submit_block(signed_blinded_block, req_headers, state, api_version).await
}
Expand Down
37 changes: 23 additions & 14 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use cb_common::{
utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
};
use eyre::bail;
use futures::future::{join_all, select_ok};
use futures::{
FutureExt,
future::{join_all, select_ok},
};
use reqwest::header::{CONTENT_TYPE, USER_AGENT};
use tracing::{Instrument, debug, error};
use url::Url;
Expand Down Expand Up @@ -49,32 +52,38 @@ pub async fn register_validator<S: BuilderApiState>(

for (n_regs, body) in bodies {
for relay in state.all_relays().iter().cloned() {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
n_regs,
body.clone(),
relay,
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
handles.push(
tokio::spawn(
send_register_validator_with_timeout(
n_regs,
body.clone(),
relay,
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
)
.in_current_span(),
)
.in_current_span(),
));
.map(|join_result| match join_result {
Ok(res) => res,
Err(err) => Err(PbsError::TokioJoinError(err)),
}),
);
}
}

if state.pbs_config().wait_all_registrations {
// wait for all relays registrations to complete
let results = join_all(handles).await;
if results.into_iter().any(|res| res.is_ok_and(|res| res.is_ok())) {
if results.into_iter().any(|res| res.is_ok()) {
Ok(())
} else {
bail!("No relay passed register_validator successfully")
}
} else {
// return once first completes, others proceed in background
let result = select_ok(handles).await?;
match result.0 {
let result = select_ok(handles).await;
match result {
Ok(_) => Ok(()),
Err(_) => bail!("No relay passed register_validator successfully"),
}
Expand Down
48 changes: 27 additions & 21 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -14,7 +15,7 @@ use cb_common::{
},
utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
};
use futures::future::select_ok;
use futures::{FutureExt, future::select_ok};
use reqwest::header::USER_AGENT;
use tracing::{debug, warn};
use url::Url;
Expand All @@ -31,10 +32,10 @@ use crate::{
/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2. Use `api_version` to
/// distinguish between the two.
pub async fn submit_block<S: BuilderApiState>(
signed_blinded_block: SignedBlindedBeaconBlock,
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
req_headers: HeaderMap,
state: PbsState<S>,
api_version: &BuilderApiVersion,
api_version: BuilderApiVersion,
) -> eyre::Result<Option<SubmitBlindedBlockResponse>> {
debug!(?req_headers, "received headers");

Expand All @@ -58,17 +59,22 @@ pub async fn submit_block<S: BuilderApiState>(
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);
send_headers.insert(HEADER_CONSENSUS_VERSION, consensus_version);

let relays = state.all_relays();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays.iter() {
handles.push(Box::pin(submit_block_with_timeout(
&signed_blinded_block,
relay,
send_headers.clone(),
state.pbs_config().timeout_get_payload_ms,
api_version,
fork_name,
)));
let mut handles = Vec::with_capacity(state.all_relays().len());
for relay in state.all_relays().iter().cloned() {
handles.push(
tokio::spawn(submit_block_with_timeout(
signed_blinded_block.clone(),
relay,
send_headers.clone(),
state.pbs_config().timeout_get_payload_ms,
api_version,
fork_name,
))
.map(|join_result| match join_result {
Ok(res) => res,
Err(err) => Err(PbsError::TokioJoinError(err)),
}),
);
}

let results = select_ok(handles).await;
Expand All @@ -81,14 +87,14 @@ pub async fn submit_block<S: BuilderApiState>(
/// Submit blinded block to relay, retry connection errors until the
/// given timeout has passed
async fn submit_block_with_timeout(
signed_blinded_block: &SignedBlindedBeaconBlock,
relay: &RelayClient,
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
relay: RelayClient,
headers: HeaderMap,
timeout_ms: u64,
api_version: &BuilderApiVersion,
api_version: BuilderApiVersion,
fork_name: ForkName,
) -> Result<Option<SubmitBlindedBlockResponse>, PbsError> {
let mut url = relay.submit_block_url(*api_version)?;
let mut url = relay.submit_block_url(api_version)?;
let mut remaining_timeout_ms = timeout_ms;
let mut retry = 0;
let mut backoff = Duration::from_millis(250);
Expand All @@ -97,12 +103,12 @@ async fn submit_block_with_timeout(
let start_request = Instant::now();
match send_submit_block(
url.clone(),
signed_blinded_block,
relay,
&signed_blinded_block,
&relay,
headers.clone(),
remaining_timeout_ms,
retry,
api_version,
&api_version,
fork_name,
)
.await
Expand Down
10 changes: 6 additions & 4 deletions crates/pbs/src/routes/submit_block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse};
use cb_common::{
pbs::{BuilderApiVersion, GetPayloadInfo, SignedBlindedBeaconBlock},
Expand All @@ -17,7 +19,7 @@ use crate::{
pub async fn handle_submit_block_v1<S: BuilderApiState, A: BuilderApi<S>>(
state: State<PbsStateGuard<S>>,
req_headers: HeaderMap,
signed_blinded_block: Json<SignedBlindedBeaconBlock>,
Json(signed_blinded_block): Json<Arc<SignedBlindedBeaconBlock>>,
) -> Result<impl IntoResponse, PbsClientError> {
handle_submit_block_impl::<S, A>(
state,
Expand All @@ -31,7 +33,7 @@ pub async fn handle_submit_block_v1<S: BuilderApiState, A: BuilderApi<S>>(
pub async fn handle_submit_block_v2<S: BuilderApiState, A: BuilderApi<S>>(
state: State<PbsStateGuard<S>>,
req_headers: HeaderMap,
signed_blinded_block: Json<SignedBlindedBeaconBlock>,
Json(signed_blinded_block): Json<Arc<SignedBlindedBeaconBlock>>,
) -> Result<impl IntoResponse, PbsClientError> {
handle_submit_block_impl::<S, A>(
state,
Expand All @@ -45,7 +47,7 @@ pub async fn handle_submit_block_v2<S: BuilderApiState, A: BuilderApi<S>>(
async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
State(state): State<PbsStateGuard<S>>,
req_headers: HeaderMap,
Json(signed_blinded_block): Json<SignedBlindedBeaconBlock>,
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
api_version: BuilderApiVersion,
) -> Result<impl IntoResponse, PbsClientError> {
tracing::Span::current().record("slot", signed_blinded_block.slot().as_u64() as i64);
Expand All @@ -65,7 +67,7 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(

info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request");

match A::submit_block(signed_blinded_block, req_headers, state, &api_version).await {
match A::submit_block(signed_blinded_block, req_headers, state, api_version).await {
Ok(res) => match res {
Some(block_response) => {
trace!(?block_response);
Expand Down
Loading