diff --git a/Cargo.lock b/Cargo.lock index f868a993..ff8d2e34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,6 +1307,7 @@ dependencies = [ "base64 0.22.1", "bimap", "blst", + "bytes", "cipher 0.4.4", "ctr 0.9.2", "derive_more", @@ -1316,6 +1317,7 @@ dependencies = [ "ethereum_ssz_derive", "eyre", "k256", + "mediatype", "pbkdf2 0.12.2", "rand", "reqwest", @@ -1360,6 +1362,7 @@ dependencies = [ "cb-common", "cb-metrics", "dashmap 5.5.3", + "ethereum_ssz 0.8.0", "eyre", "futures", "lazy_static", @@ -1408,6 +1411,7 @@ dependencies = [ "axum", "cb-common", "cb-pbs", + "ethereum_ssz 0.8.0", "eyre", "reqwest", "serde_json", @@ -2838,6 +2842,12 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "mediatype" +version = "0.19.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd" + [[package]] name = "memchr" version = "2.7.2" diff --git a/Cargo.toml b/Cargo.toml index 029088a3..3e2b2ee7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,3 +103,5 @@ derive_more = { version = "1.0.0", features = [ "deref", "display", ] } +mediatype = "0.19.13" +bytes = "1.6" diff --git a/crates/cli/src/docker_init.rs b/crates/cli/src/docker_init.rs index 5d257365..f4bb82a3 100644 --- a/crates/cli/src/docker_init.rs +++ b/crates/cli/src/docker_init.rs @@ -156,9 +156,10 @@ pub async fn handle_docker_init(config_path: String, output_dir: String) -> Resu // depends_on let mut module_dependencies = IndexMap::new(); - module_dependencies.insert("cb_signer".into(), DependsCondition { - condition: "service_healthy".into(), - }); + module_dependencies.insert( + "cb_signer".into(), + DependsCondition { condition: "service_healthy".into() }, + ); Service { container_name: Some(module_cid.clone()), diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2c037c47..e8b13ebd 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -51,6 +51,8 @@ url.workspace = true rand.workspace = true bimap.workspace = true derive_more.workspace = true +mediatype.workspace = true +bytes.workspace = true unicode-normalization.workspace = true base64.workspace = true diff --git a/crates/common/src/pbs/types/mod.rs b/crates/common/src/pbs/types/mod.rs index c90b8edf..a8e194d6 100644 --- a/crates/common/src/pbs/types/mod.rs +++ b/crates/common/src/pbs/types/mod.rs @@ -18,8 +18,8 @@ pub use execution_payload::{ EMPTY_TX_ROOT_HASH, }; pub use get_header::{ - ExecutionPayloadHeaderMessageDeneb, GetHeaderParams, GetHeaderResponse, - SignedExecutionPayloadHeader, + ExecutionPayloadHeaderMessageDeneb, ExecutionPayloadHeaderMessageElectra, GetHeaderParams, + GetHeaderResponse, SignedExecutionPayloadHeader, }; pub use kzg::{ KzgCommitment, KzgCommitments, KzgProof, KzgProofs, BYTES_PER_COMMITMENT, BYTES_PER_PROOF, diff --git a/crates/common/src/signer/store.rs b/crates/common/src/signer/store.rs index 3127d254..ee52335b 100644 --- a/crates/common/src/signer/store.rs +++ b/crates/common/src/signer/store.rs @@ -279,8 +279,8 @@ impl ProxyStore { let entry = entry?; let path = entry.path(); - if !path.is_file() || - path.extension().is_none_or(|ext| ext != "json") + if !path.is_file() + || path.extension().is_none_or(|ext| ext != "json") { continue; } @@ -335,8 +335,8 @@ impl ProxyStore { let entry = entry?; let path = entry.path(); - if !path.is_file() || - path.extension().is_none_or(|ext| ext != "json") + if !path.is_file() + || path.extension().is_none_or(|ext| ext != "json") { continue; } diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index 71656178..cb3268f8 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -172,10 +172,10 @@ impl KnownChain { pub fn slot_time_sec(&self) -> u64 { match self { - KnownChain::Mainnet | - KnownChain::Holesky | - KnownChain::Sepolia | - KnownChain::Helder => 12, + KnownChain::Mainnet + | KnownChain::Holesky + | KnownChain::Sepolia + | KnownChain::Helder => 12, } } } @@ -332,11 +332,14 @@ mod tests { fn test_load_custom() { let s = r#"chain = { genesis_time_secs = 1, slot_time_secs = 2, genesis_fork_version = "0x01000000" }"#; let decoded: MockConfig = toml::from_str(s).unwrap(); - assert_eq!(decoded.chain, Chain::Custom { - genesis_time_secs: 1, - slot_time_secs: 2, - genesis_fork_version: [1, 0, 0, 0] - }) + assert_eq!( + decoded.chain, + Chain::Custom { + genesis_time_secs: 1, + slot_time_secs: 2, + genesis_fork_version: [1, 0, 0, 0] + } + ) } #[test] @@ -372,11 +375,14 @@ mod tests { let s = format!("chain = {{ genesis_time_secs = 1, path = {path:?}}}"); let decoded: MockConfig = toml::from_str(&s).unwrap(); - assert_eq!(decoded.chain, Chain::Custom { - genesis_time_secs: 1, - slot_time_secs: KnownChain::Holesky.slot_time_sec(), - genesis_fork_version: KnownChain::Holesky.genesis_fork_version() - }) + assert_eq!( + decoded.chain, + Chain::Custom { + genesis_time_secs: 1, + slot_time_secs: KnownChain::Holesky.slot_time_sec(), + genesis_fork_version: KnownChain::Holesky.genesis_fork_version() + } + ) } #[test] @@ -391,11 +397,14 @@ mod tests { let s = format!("chain = {{ genesis_time_secs = 1, path = {path:?}}}"); let decoded: MockConfig = toml::from_str(&s).unwrap(); - assert_eq!(decoded.chain, Chain::Custom { - genesis_time_secs: 1, - slot_time_secs: KnownChain::Sepolia.slot_time_sec(), - genesis_fork_version: KnownChain::Sepolia.genesis_fork_version() - }) + assert_eq!( + decoded.chain, + Chain::Custom { + genesis_time_secs: 1, + slot_time_secs: KnownChain::Sepolia.slot_time_sec(), + genesis_fork_version: KnownChain::Sepolia.genesis_fork_version() + } + ) } #[test] @@ -410,10 +419,13 @@ mod tests { let s = format!("chain = {{ genesis_time_secs = 1, path = {path:?}}}"); let decoded: MockConfig = toml::from_str(&s).unwrap(); - assert_eq!(decoded.chain, Chain::Custom { - genesis_time_secs: 1, - slot_time_secs: KnownChain::Helder.slot_time_sec(), - genesis_fork_version: KnownChain::Helder.genesis_fork_version() - }) + assert_eq!( + decoded.chain, + Chain::Custom { + genesis_time_secs: 1, + slot_time_secs: KnownChain::Helder.slot_time_sec(), + genesis_fork_version: KnownChain::Helder.genesis_fork_version() + } + ) } } diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 918b29d5..8791ef7e 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -1,5 +1,7 @@ use std::{ + fmt, net::Ipv4Addr, + str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; @@ -7,10 +9,19 @@ use alloy::{ primitives::U256, rpc::types::beacon::{BlsPublicKey, BlsSignature}, }; -use axum::http::HeaderValue; +use axum::{ + extract::{FromRequest, Request}, + http::HeaderValue, + response::{IntoResponse, Response}, +}; use blst::min_pk::{PublicKey, Signature}; +use bytes::Bytes; +use mediatype::{names, MediaType, MediaTypeList}; use rand::{distributions::Alphanumeric, Rng}; -use reqwest::header::HeaderMap; +use reqwest::{ + header::{HeaderMap, ACCEPT, CONTENT_TYPE}, + StatusCode, +}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use ssz::{Decode, Encode}; @@ -25,6 +36,7 @@ use crate::{ }; const MILLIS_PER_SECOND: u64 = 1_000; +pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version"; pub fn timestamp_of_slot_start_sec(slot: u64, chain: Chain) -> u64 { chain.genesis_time_sec() + slot * chain.slot_time_sec() @@ -288,6 +300,192 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result Accept { + Accept::from_str( + req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or("application/json"), + ) + .unwrap_or(Accept::Json) +} + +/// Parse CONTENT TYPE header, default to JSON if missing or mal-formatted +pub fn get_content_type_header(req_headers: &HeaderMap) -> ContentType { + ContentType::from_str( + req_headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .unwrap_or("application/json"), + ) + .unwrap_or(ContentType::Json) +} + +/// Parse CONSENSUS_VERSION header +pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option { + ForkName::from_str( + req_headers + .get(CONSENSUS_VERSION_HEADER) + .and_then(|value| value.to_str().ok()) + .unwrap_or(""), + ) + .ok() +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ForkName { + Deneb, + Electra, +} + +impl std::fmt::Display for ForkName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ForkName::Deneb => write!(f, "deneb"), + ForkName::Electra => write!(f, "electra"), + } + } +} + +impl FromStr for ForkName { + type Err = String; + fn from_str(value: &str) -> Result { + match value { + "deneb" => Ok(ForkName::Deneb), + "electra" => Ok(ForkName::Electra), + _ => Err(format!("Invalid fork name {}", value)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ContentType { + Json, + Ssz, +} + +impl std::fmt::Display for ContentType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ContentType::Json => write!(f, "application/json"), + ContentType::Ssz => write!(f, "application/octet-stream"), + } + } +} + +impl FromStr for ContentType { + type Err = String; + fn from_str(value: &str) -> Result { + match value { + "application/json" => Ok(ContentType::Json), + "application/octet-stream" => Ok(ContentType::Ssz), + _ => Ok(ContentType::Json), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum Accept { + Json, + Ssz, + Any, +} + +impl fmt::Display for Accept { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Accept::Ssz => write!(f, "application/octet-stream"), + Accept::Json => write!(f, "application/json"), + Accept::Any => write!(f, "*/*"), + } + } +} + +impl FromStr for Accept { + type Err = String; + + fn from_str(s: &str) -> Result { + let media_type_list = MediaTypeList::new(s); + + // [q-factor weighting]: https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2 + // find the highest q-factor supported accept type + let mut highest_q = 0_u16; + let mut accept_type = None; + + const APPLICATION: &str = names::APPLICATION.as_str(); + const OCTET_STREAM: &str = names::OCTET_STREAM.as_str(); + const JSON: &str = names::JSON.as_str(); + const STAR: &str = names::_STAR.as_str(); + const Q: &str = names::Q.as_str(); + + media_type_list.into_iter().for_each(|item| { + if let Ok(MediaType { ty, subty, suffix: _, params }) = item { + let q_accept = match (ty.as_str(), subty.as_str()) { + (APPLICATION, OCTET_STREAM) => Some(Accept::Ssz), + (APPLICATION, JSON) => Some(Accept::Json), + (STAR, STAR) => Some(Accept::Any), + _ => None, + } + .map(|item_accept_type| { + let q_val = params + .iter() + .find_map(|(n, v)| match n.as_str() { + Q => { + Some((v.as_str().parse::().unwrap_or(0_f32) * 1000_f32) as u16) + } + _ => None, + }) + .or(Some(1000_u16)); + + (q_val.unwrap(), item_accept_type) + }); + + match q_accept { + Some((q, accept)) if q > highest_q => { + highest_q = q; + accept_type = Some(accept); + } + _ => (), + } + } + }); + accept_type.ok_or_else(|| "accept header is not supported".to_string()) + } +} + +#[must_use] +#[derive(Debug, Clone, Copy, Default)] +pub struct JsonOrSsz(pub T); + +impl FromRequest for JsonOrSsz +where + T: serde::de::DeserializeOwned + ssz::Decode + 'static, + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(req: Request, _state: &S) -> Result { + let headers = req.headers().clone(); + let content_type = headers.get(CONTENT_TYPE).and_then(|value| value.to_str().ok()); + + let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?; + + if let Some(content_type) = content_type { + if content_type.starts_with(&ContentType::Json.to_string()) { + let payload: T = serde_json::from_slice(&bytes) + .map_err(|_| StatusCode::BAD_REQUEST.into_response())?; + return Ok(Self(payload)); + } + + if content_type.starts_with(&ContentType::Ssz.to_string()) { + let payload = T::from_ssz_bytes(&bytes) + .map_err(|_| StatusCode::BAD_REQUEST.into_response())?; + return Ok(Self(payload)); + } + } + + Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response()) + } +} + #[cfg(unix)] pub async fn wait_for_signal() -> eyre::Result<()> { use tokio::signal::unix::{signal, SignalKind}; diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml index 6630f6e6..f278e172 100644 --- a/crates/pbs/Cargo.toml +++ b/crates/pbs/Cargo.toml @@ -11,6 +11,7 @@ cb-metrics.workspace = true # ethereum alloy.workspace = true +ethereum_ssz.workspace = true # networking axum.workspace = true diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index b96945b5..9b5f41f7 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -215,9 +215,9 @@ fn validate_unblinded_block_deneb( let blobs = &block_response.blobs_bundle; let expected_commitments = &signed_blinded_block.body.blob_kzg_commitments; - if expected_commitments.len() != blobs.blobs.len() || - expected_commitments.len() != blobs.commitments.len() || - expected_commitments.len() != blobs.proofs.len() + if expected_commitments.len() != blobs.blobs.len() + || expected_commitments.len() != blobs.commitments.len() + || expected_commitments.len() != blobs.proofs.len() { return Err(PbsError::Validation(ValidationError::KzgCommitments { expected_blobs: expected_commitments.len(), @@ -248,9 +248,9 @@ fn validate_unblinded_block_electra( let blobs = &block_response.blobs_bundle; let expected_commitments = &signed_blinded_block.body.blob_kzg_commitments; - if expected_commitments.len() != blobs.blobs.len() || - expected_commitments.len() != blobs.commitments.len() || - expected_commitments.len() != blobs.proofs.len() + if expected_commitments.len() != blobs.blobs.len() + || expected_commitments.len() != blobs.commitments.len() + || expected_commitments.len() != blobs.proofs.len() { return Err(PbsError::Validation(ValidationError::KzgCommitments { expected_blobs: expected_commitments.len(), diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 919bad11..ea96299d 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -1,14 +1,15 @@ use alloy::primitives::utils::format_ether; use axum::{ extract::{Path, State}, - http::HeaderMap, + http::{HeaderMap, HeaderValue}, response::IntoResponse, }; use cb_common::{ - pbs::{BuilderEvent, GetHeaderParams}, - utils::{get_user_agent, ms_into_slot}, + pbs::{BuilderEvent, GetHeaderParams, VersionedResponse}, + utils::{get_accept_header, get_user_agent, ms_into_slot, Accept, CONSENSUS_VERSION_HEADER}, }; -use reqwest::StatusCode; +use reqwest::{header::CONTENT_TYPE, StatusCode}; +use ssz::Encode; use tracing::{error, info}; use uuid::Uuid; @@ -33,17 +34,48 @@ pub async fn handle_get_header>( let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); + let accept_header = get_accept_header(&req_headers); info!(ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); match A::get_header(params, req_headers, state.clone()).await { Ok(res) => { state.publish_event(BuilderEvent::GetHeaderResponse(Box::new(res.clone()))); - if let Some(max_bid) = res { info!(value_eth = format_ether(max_bid.value()), block_hash =% max_bid.block_hash(), "received header"); - BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); - Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + let response = match accept_header { + Accept::Ssz => { + let mut res = match &max_bid { + VersionedResponse::Deneb(max_bid) => { + (StatusCode::OK, max_bid.as_ssz_bytes()).into_response() + } + VersionedResponse::Electra(max_bid) => { + (StatusCode::OK, max_bid.as_ssz_bytes()).into_response() + } + }; + let Ok(consensus_version_header) = + HeaderValue::from_str(&format!("{}", max_bid.version())) + else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()); + }; + let Ok(content_type_header) = + HeaderValue::from_str(&format!("{}", Accept::Ssz)) + else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(max_bid)).into_response()); + }; + res.headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + res.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + res + } + Accept::Json | Accept::Any => { + (StatusCode::OK, axum::Json(max_bid)).into_response() + } + }; + Ok(response) } else { // spec: return 204 if request is valid but no bid available info!("no header available for slot"); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 760f17ff..2c1caa79 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,9 +1,18 @@ -use axum::{extract::State, http::HeaderMap, response::IntoResponse, Json}; +use axum::{ + extract::State, + http::{HeaderMap, HeaderValue}, + response::IntoResponse, + Json, +}; use cb_common::{ - pbs::{BuilderEvent, SignedBlindedBeaconBlock}, - utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}, + pbs::{BuilderEvent, SignedBlindedBeaconBlock, VersionedResponse}, + utils::{ + get_accept_header, get_user_agent, timestamp_of_slot_start_millis, utcnow_ms, ContentType, + JsonOrSsz, CONSENSUS_VERSION_HEADER, + }, }; -use reqwest::StatusCode; +use reqwest::{header::CONTENT_TYPE, StatusCode}; +use ssz::Encode; use tracing::{error, info, trace}; use uuid::Uuid; @@ -19,7 +28,7 @@ use crate::{ pub async fn handle_submit_block>( State(state): State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json, + JsonOrSsz(signed_blinded_block): JsonOrSsz, ) -> Result { let state = state.read().clone(); @@ -31,17 +40,55 @@ pub async fn handle_submit_block>( let block_hash = signed_blinded_block.block_hash(); let slot_start_ms = timestamp_of_slot_start_millis(slot, state.config.chain); let ua = get_user_agent(&req_headers); + let accept_header = get_accept_header(&req_headers); info!(ua, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash); match A::submit_block(signed_blinded_block, req_headers, state.clone()).await { - Ok(res) => { - trace!(?res); - state.publish_event(BuilderEvent::SubmitBlockResponse(Box::new(res.clone()))); + Ok(payload_and_blobs) => { + trace!(?payload_and_blobs); + state.publish_event(BuilderEvent::SubmitBlockResponse(Box::new( + payload_and_blobs.clone(), + ))); info!("received unblinded block"); - BEACON_NODE_STATUS.with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]).inc(); - Ok((StatusCode::OK, Json(res).into_response())) + + let response = match accept_header { + cb_common::utils::Accept::Json | cb_common::utils::Accept::Any => { + info!("sending response as JSON"); + (StatusCode::OK, Json(payload_and_blobs)).into_response() + } + cb_common::utils::Accept::Ssz => { + let mut response = match &payload_and_blobs { + VersionedResponse::Deneb(payload_and_blobs) => { + (StatusCode::OK, payload_and_blobs.as_ssz_bytes()).into_response() + } + VersionedResponse::Electra(payload_and_blobs) => { + (StatusCode::OK, payload_and_blobs.as_ssz_bytes()).into_response() + } + }; + let Ok(consensus_version_header) = + HeaderValue::from_str(&format!("{}", payload_and_blobs.version())) + else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(payload_and_blobs)).into_response()); + }; + let Ok(content_type_header) = + HeaderValue::from_str(&ContentType::Ssz.to_string()) + else { + info!("sending response as JSON"); + return Ok((StatusCode::OK, axum::Json(payload_and_blobs)).into_response()); + }; + response + .headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + response + } + }; + + Ok(response) } Err(err) => { diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 19175ebf..c5155a7a 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -24,4 +24,5 @@ tracing-subscriber.workspace = true tree_hash.workspace = true eyre.workspace = true -url.workspace = true \ No newline at end of file +ethereum_ssz.workspace = true +url.workspace = true diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index 9974bae3..8b14f93c 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -9,23 +9,29 @@ use std::{ use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration}; use axum::{ extract::{Path, State}, - http::StatusCode, + http::{HeaderMap, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, Json, Router, }; use cb_common::{ pbs::{ - ExecutionPayloadHeaderMessageDeneb, GetHeaderParams, GetHeaderResponse, - SignedExecutionPayloadHeader, SubmitBlindedBlockResponse, BUILDER_API_PATH, - GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, + ExecutionPayloadHeaderMessageDeneb, ExecutionPayloadHeaderMessageElectra, GetHeaderParams, + PayloadAndBlobsDeneb, PayloadAndBlobsElectra, SignedExecutionPayloadHeader, + SubmitBlindedBlockResponse, VersionedResponse, BUILDER_API_PATH, GET_HEADER_PATH, + GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, }, signature::sign_builder_root, signer::BlsSecretKey, types::Chain, - utils::{blst_pubkey_to_alloy, timestamp_of_slot_start_sec}, + utils::{ + blst_pubkey_to_alloy, get_accept_header, get_consensus_version_header, + timestamp_of_slot_start_sec, Accept, ForkName, CONSENSUS_VERSION_HEADER, + }, }; use cb_pbs::MAX_SIZE_SUBMIT_BLOCK; +use reqwest::header::CONTENT_TYPE; +use ssz::Encode; use tokio::net::TcpListener; use tracing::debug; use tree_hash::TreeHash; @@ -100,23 +106,67 @@ pub fn mock_relay_app_router(state: Arc) -> Router { async fn handle_get_header( State(state): State>, Path(GetHeaderParams { parent_hash, .. }): Path, + headers: HeaderMap, ) -> Response { state.received_get_header.fetch_add(1, Ordering::Relaxed); - - let mut response: SignedExecutionPayloadHeader = - SignedExecutionPayloadHeader::default(); - - response.message.header.parent_hash = parent_hash; - response.message.header.block_hash.0[0] = 1; - response.message.value = U256::from(10); - response.message.pubkey = blst_pubkey_to_alloy(&state.signer.sk_to_pk()); - response.message.header.timestamp = timestamp_of_slot_start_sec(0, state.chain); - - let object_root = response.message.tree_hash_root().0; - response.signature = sign_builder_root(state.chain, &state.signer, object_root); - - let response = GetHeaderResponse::Deneb(response); - (StatusCode::OK, Json(response)).into_response() + let accept_header = get_accept_header(&headers); + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); + + let data = match consensus_version_header { + ForkName::Deneb => { + let mut response: SignedExecutionPayloadHeader = + SignedExecutionPayloadHeader::default(); + response.message.header.parent_hash = parent_hash; + response.message.header.block_hash.0[0] = 1; + response.message.value = U256::from(10); + response.message.pubkey = blst_pubkey_to_alloy(&state.signer.sk_to_pk()); + response.message.header.timestamp = timestamp_of_slot_start_sec(0, state.chain); + + let object_root = response.message.tree_hash_root().0; + response.signature = sign_builder_root(state.chain, &state.signer, object_root); + match accept_header { + Accept::Json | Accept::Any => { + let versioned_response: VersionedResponse< + SignedExecutionPayloadHeader, + SignedExecutionPayloadHeader, + > = VersionedResponse::Deneb(response); + serde_json::to_vec(&versioned_response).unwrap() + } + Accept::Ssz => response.as_ssz_bytes(), + } + } + ForkName::Electra => { + let mut response: SignedExecutionPayloadHeader = + SignedExecutionPayloadHeader::default(); + response.message.header.parent_hash = parent_hash; + response.message.header.block_hash.0[0] = 1; + response.message.value = U256::from(10); + response.message.pubkey = blst_pubkey_to_alloy(&state.signer.sk_to_pk()); + response.message.header.timestamp = timestamp_of_slot_start_sec(0, state.chain); + + let object_root = response.message.tree_hash_root().0; + response.signature = sign_builder_root(state.chain, &state.signer, object_root); + match accept_header { + Accept::Json | Accept::Any => { + let versioned_response: VersionedResponse< + SignedExecutionPayloadHeader, + SignedExecutionPayloadHeader, + > = VersionedResponse::Electra(response); + serde_json::to_vec(&versioned_response).unwrap() + } + Accept::Ssz => response.as_ssz_bytes(), + } + } + }; + + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } async fn handle_get_status(State(state): State>) -> impl IntoResponse { @@ -133,12 +183,36 @@ async fn handle_register_validator( StatusCode::OK } -async fn handle_submit_block(State(state): State>) -> Response { +async fn handle_submit_block( + State(state): State>, + headers: HeaderMap, +) -> Response { state.received_submit_block.fetch_add(1, Ordering::Relaxed); - if state.large_body() { - (StatusCode::OK, Json(vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK])).into_response() + let accept_header = get_accept_header(&headers); + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); + + let data = if state.large_body() { + vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK] } else { - let response = SubmitBlindedBlockResponse::default(); - (StatusCode::OK, Json(response)).into_response() - } + match accept_header { + cb_common::utils::Accept::Json | cb_common::utils::Accept::Any => { + serde_json::to_vec(&SubmitBlindedBlockResponse::default()).unwrap() + } + cb_common::utils::Accept::Ssz => match consensus_version_header { + cb_common::utils::ForkName::Deneb => PayloadAndBlobsDeneb::default().as_ssz_bytes(), + cb_common::utils::ForkName::Electra => { + PayloadAndBlobsElectra::default().as_ssz_bytes() + } + }, + } + }; + + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index 7d31f770..6a83b212 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -2,8 +2,15 @@ use alloy::{ primitives::B256, rpc::types::beacon::{relay::ValidatorRegistration, BlsPublicKey}, }; -use cb_common::pbs::{RelayClient, SignedBlindedBeaconBlock}; -use reqwest::Response; +use cb_common::{ + pbs::{RelayClient, SignedBlindedBeaconBlock}, + utils::{Accept, ContentType, ForkName, CONSENSUS_VERSION_HEADER}, +}; +use reqwest::{ + header::{ACCEPT, CONTENT_TYPE}, + Response, +}; +use ssz::Encode; use crate::utils::generate_mock_relay; @@ -16,9 +23,25 @@ impl MockValidator { Ok(Self { comm_boost: generate_mock_relay(port, BlsPublicKey::default())? }) } - pub async fn do_get_header(&self, pubkey: Option) -> eyre::Result { - let url = self.comm_boost.get_header_url(0, B256::ZERO, pubkey.unwrap_or_default())?; - Ok(self.comm_boost.client.get(url).send().await?) + pub async fn do_get_header( + &self, + pubkey: Option, + accept: Option, + fork_name: ForkName, + ) -> eyre::Result { + let url = self + .comm_boost + .get_header_url(0, B256::ZERO, pubkey.unwrap_or(BlsPublicKey::ZERO)) + .unwrap(); + let res = self + .comm_boost + .client + .get(url) + .header(ACCEPT, &accept.unwrap_or(Accept::Any).to_string()) + .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string()) + .send() + .await?; + Ok(res) } pub async fn do_get_status(&self) -> eyre::Result { @@ -41,15 +64,28 @@ impl MockValidator { pub async fn do_submit_block( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, + accept: Accept, + content_type: ContentType, + fork_name: ForkName, ) -> eyre::Result { let url = self.comm_boost.submit_block_url().unwrap(); + let signed_blinded_block = signed_blinded_block_opt.unwrap_or_default(); + + let body = match content_type { + ContentType::Json => serde_json::to_vec(&signed_blinded_block).unwrap(), + ContentType::Ssz => signed_blinded_block.as_ssz_bytes(), + }; + Ok(self .comm_boost .client .post(url) - .json(&signed_blinded_block.unwrap_or_default()) + .body(body) + .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string()) + .header(CONTENT_TYPE, &content_type.to_string()) + .header(ACCEPT, &accept.to_string()) .send() .await?) } diff --git a/tests/tests/pbs_get_header.rs b/tests/tests/pbs_get_header.rs index 17803021..bc46cdf5 100644 --- a/tests/tests/pbs_get_header.rs +++ b/tests/tests/pbs_get_header.rs @@ -2,11 +2,11 @@ use std::{sync::Arc, time::Duration}; use alloy::primitives::{B256, U256}; use cb_common::{ - pbs::GetHeaderResponse, + pbs::{ExecutionPayloadHeaderMessageElectra, GetHeaderResponse, SignedExecutionPayloadHeader}, signature::sign_builder_root, signer::{random_secret, BlsPublicKey}, types::Chain, - utils::{blst_pubkey_to_alloy, timestamp_of_slot_start_sec}, + utils::{blst_pubkey_to_alloy, timestamp_of_slot_start_sec, Accept, ForkName}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -16,6 +16,7 @@ use cb_tests::{ }; use eyre::Result; use reqwest::StatusCode; +use ssz::Decode; use tracing::info; use tree_hash::TreeHash; @@ -44,7 +45,7 @@ async fn test_get_header() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); let res = serde_json::from_slice::(&res.bytes().await?)?; @@ -69,6 +70,50 @@ async fn test_get_header() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_get_header_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let pbs_port = 3200; + let relay_port = pbs_port + 1; + + // Run a mock relay + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + let mock_relay = generate_mock_relay(relay_port, *pubkey)?; + tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port)); + + // Run the PBS service + let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay.clone()]); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(None, Some(Accept::Ssz), ForkName::Electra).await?; + assert_eq!(res.status(), StatusCode::OK); + + let res: SignedExecutionPayloadHeader = + SignedExecutionPayloadHeader::from_ssz_bytes(&res.bytes().await?).unwrap(); + + assert_eq!(mock_state.received_get_header(), 1); + assert_eq!(res.message.header.block_hash.0[0], 1); + assert_eq!(res.message.header.parent_hash, B256::ZERO); + assert_eq!(res.message.value, U256::from(10)); + assert_eq!(res.message.pubkey, blst_pubkey_to_alloy(&mock_state.signer.sk_to_pk())); + assert_eq!(res.message.header.timestamp, timestamp_of_slot_start_sec(0, chain)); + assert_eq!( + res.signature, + sign_builder_root(chain, &mock_state.signer, res.message.tree_hash_root().0) + ); + Ok(()) +} + #[tokio::test] async fn test_get_header_returns_204_if_relay_down() -> Result<()> { setup_test_env(); @@ -96,7 +141,7 @@ async fn test_get_header_returns_204_if_relay_down() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, None, ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::NO_CONTENT); // 204 error assert_eq!(mock_state.received_get_header(), 0); // no header received diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 111fe27e..ee0bc7f0 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -4,7 +4,7 @@ use cb_common::{ config::RuntimeMuxConfig, signer::{random_secret, BlsPublicKey}, types::Chain, - utils::blst_pubkey_to_alloy, + utils::{blst_pubkey_to_alloy, Accept, ContentType, ForkName}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -61,13 +61,19 @@ async fn test_mux() -> Result<()> { // Send default request without specifying a validator key let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header with default"); - assert_eq!(mock_validator.do_get_header(None).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator.do_get_header(None, None, ForkName::Electra).await?.status(), + StatusCode::OK + ); assert_eq!(mock_state.received_get_header(), 1); // only default relay was used // Send request specifying a validator key to use mux info!("Sending get header with mux"); assert_eq!( - mock_validator.do_get_header(Some(validator_pubkey)).await?.status(), + mock_validator + .do_get_header(Some(validator_pubkey), None, ForkName::Electra) + .await? + .status(), StatusCode::OK ); assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used @@ -84,7 +90,13 @@ async fn test_mux() -> Result<()> { // Submit block requests should go to all relays info!("Sending submit block"); - assert_eq!(mock_validator.do_submit_block(None).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator + .do_submit_block(None, Accept::Json, ContentType::Json, ForkName::Electra) + .await? + .status(), + StatusCode::OK + ); assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used Ok(()) diff --git a/tests/tests/pbs_post_blinded_blocks.rs b/tests/tests/pbs_post_blinded_blocks.rs index 64255195..a468ca1e 100644 --- a/tests/tests/pbs_post_blinded_blocks.rs +++ b/tests/tests/pbs_post_blinded_blocks.rs @@ -1,10 +1,10 @@ use std::{sync::Arc, time::Duration}; use cb_common::{ - pbs::{SignedBlindedBeaconBlock, SubmitBlindedBlockResponse}, + pbs::{PayloadAndBlobsElectra, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse}, signer::{random_secret, BlsPublicKey}, types::Chain, - utils::blst_pubkey_to_alloy, + utils::{blst_pubkey_to_alloy, Accept, ContentType, ForkName}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -14,6 +14,7 @@ use cb_tests::{ }; use eyre::Result; use reqwest::StatusCode; +use ssz::Decode; use tracing::info; #[tokio::test] @@ -40,7 +41,14 @@ async fn test_submit_block() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending submit block"); - let res = mock_validator.do_submit_block(Some(SignedBlindedBeaconBlock::default())).await?; + let res = mock_validator + .do_submit_block( + Some(SignedBlindedBeaconBlock::default()), + Accept::Json, + ContentType::Json, + ForkName::Electra, + ) + .await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(mock_state.received_submit_block(), 1); @@ -50,6 +58,47 @@ async fn test_submit_block() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_submit_block_ssz() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let pbs_port = 3800; + + // Run a mock relay + let relays = vec![generate_mock_relay(pbs_port + 1, *pubkey)?]; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); + + // Run the PBS service + let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending submit block"); + let res = mock_validator + .do_submit_block( + Some(SignedBlindedBeaconBlock::default()), + Accept::Ssz, + ContentType::Ssz, + ForkName::Electra, + ) + .await?; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(mock_state.received_submit_block(), 1); + + let response_body = PayloadAndBlobsElectra::from_ssz_bytes(&res.bytes().await?).unwrap(); + assert_eq!(response_body.block_hash(), SubmitBlindedBlockResponse::default().block_hash()); + Ok(()) +} + #[tokio::test] async fn test_submit_block_too_large() -> Result<()> { setup_test_env(); @@ -72,7 +121,9 @@ async fn test_submit_block_too_large() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending submit block"); - let res = mock_validator.do_submit_block(None).await; + let res = mock_validator + .do_submit_block(None, Accept::Json, ContentType::Json, ForkName::Electra) + .await; // response size exceeds max size: max: 20971520 assert_eq!(res.unwrap().status(), StatusCode::BAD_GATEWAY);