diff --git a/Cargo.lock b/Cargo.lock index 4d00a3417..2e846d7da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9200,6 +9200,7 @@ dependencies = [ "reth-trie-parallel", "revm", "revm-inspectors", + "schnellru", "secp256k1 0.30.0", "serde", "serde_json", @@ -9207,7 +9208,6 @@ dependencies = [ "sha2 0.10.9", "shellexpand", "sqlx", - "ssz_types 0.8.0", "sysperf", "tempfile", "test_utils", @@ -9222,9 +9222,6 @@ dependencies = [ "tonic-build", "tracing", "tracing-subscriber 0.3.19", - "tree_hash 0.8.0", - "tree_hash_derive 0.8.0", - "typenum", "url", "uuid", "warp", @@ -9327,10 +9324,14 @@ dependencies = [ "serde_json", "serde_with", "sha2 0.10.9", + "ssz_types 0.8.0", "thiserror 1.0.69", "time", "toml 0.8.20", "tracing", + "tree_hash 0.8.0", + "tree_hash_derive 0.8.0", + "typenum", "uuid", ] diff --git a/crates/rbuilder-operator/src/blocks_processor.rs b/crates/rbuilder-operator/src/blocks_processor.rs index d57e56c84..39a685ba6 100644 --- a/crates/rbuilder-operator/src/blocks_processor.rs +++ b/crates/rbuilder-operator/src/blocks_processor.rs @@ -13,7 +13,6 @@ use rbuilder_primitives::{ serialize::{RawBundle, RawShareBundle}, Bundle, Order, OrderId, }; -use reth_primitives::SealedBlock; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use serde_with::{serde_as, DisplayFromStr}; @@ -134,20 +133,20 @@ impl BlocksProcessorClient { } pub async fn submit_built_block( &self, - sealed_block: &SealedBlock, submit_block_request: &SubmitBlockRequest, built_block_trace: &BuiltBlockTrace, builder_name: String, best_bid_value: U256, ) -> eyre::Result<()> { + let execution_payload_v1 = submit_block_request.execution_payload_v1(); let header = BlocksProcessorHeader { - hash: sealed_block.hash(), - gas_limit: U256::from(sealed_block.gas_limit), - gas_used: U256::from(sealed_block.gas_used), - base_fee_per_gas: sealed_block.base_fee_per_gas.map(U256::from), - parent_hash: sealed_block.parent_hash, - timestamp: U256::from(sealed_block.timestamp), - number: Some(U256::from(sealed_block.number)), + hash: execution_payload_v1.block_hash, + gas_limit: U256::from(execution_payload_v1.gas_limit), + gas_used: U256::from(execution_payload_v1.gas_used), + base_fee_per_gas: Some(execution_payload_v1.base_fee_per_gas), + parent_hash: execution_payload_v1.parent_hash, + timestamp: U256::from(execution_payload_v1.timestamp), + number: Some(U256::from(execution_payload_v1.block_number)), }; let closed_at = built_block_trace .orders_closed_at @@ -343,7 +342,6 @@ impl fn block_submitted( &self, _slot_data: &MevBoostSlotData, - sealed_block: &SealedBlock, submit_block_request: &SubmitBlockRequest, built_block_trace: &BuiltBlockTrace, builder_name: String, @@ -351,13 +349,11 @@ impl ) { let client = self.client.clone(); let parent_span = Span::current(); - let sealed_block = sealed_block.clone(); let submit_block_request = submit_block_request.clone(); let built_block_trace = built_block_trace.clone(); tokio::spawn(async move { let block_processor_result = client .submit_built_block( - &sealed_block, &submit_block_request, &built_block_trace, builder_name, diff --git a/crates/rbuilder-operator/src/flashbots_config.rs b/crates/rbuilder-operator/src/flashbots_config.rs index 6d37b7378..212e6814b 100644 --- a/crates/rbuilder-operator/src/flashbots_config.rs +++ b/crates/rbuilder-operator/src/flashbots_config.rs @@ -30,7 +30,6 @@ use rbuilder::{ }; use rbuilder_config::EnvOrValue; use rbuilder_primitives::mev_boost::SubmitBlockRequest; -use reth_primitives::SealedBlock; use serde::Deserialize; use serde_with::serde_as; use tokio_util::sync::CancellationToken; @@ -427,7 +426,6 @@ impl BidObserver for RbuilderOperatorBidObserver { fn block_submitted( &self, slot_data: &MevBoostSlotData, - sealed_block: &SealedBlock, submit_block_request: &SubmitBlockRequest, built_block_trace: &BuiltBlockTrace, builder_name: String, @@ -436,7 +434,6 @@ impl BidObserver for RbuilderOperatorBidObserver { if let Some(p) = self.block_processor.as_ref() { p.block_submitted( slot_data, - sealed_block, submit_block_request, built_block_trace, builder_name.clone(), @@ -446,7 +443,6 @@ impl BidObserver for RbuilderOperatorBidObserver { if let Some(p) = self.tbv_pusher.as_ref() { p.block_submitted( slot_data, - sealed_block, submit_block_request, built_block_trace, builder_name, diff --git a/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs b/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs index b789295a3..a7de3dd53 100644 --- a/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs +++ b/crates/rbuilder-operator/src/true_block_value_push/best_true_value_observer.rs @@ -7,7 +7,6 @@ use rbuilder::{ }; use rbuilder_primitives::mev_boost::SubmitBlockRequest; use redis::RedisError; -use reth_primitives::SealedBlock; use tokio_util::sync::CancellationToken; use super::{ @@ -74,7 +73,6 @@ impl BidObserver for BestTrueValueObserver { fn block_submitted( &self, slot_data: &MevBoostSlotData, - _sealed_block: &SealedBlock, _submit_block_request: &SubmitBlockRequest, built_block_trace: &BuiltBlockTrace, builder_name: String, diff --git a/crates/rbuilder-primitives/Cargo.toml b/crates/rbuilder-primitives/Cargo.toml index f8d51811b..092d84003 100644 --- a/crates/rbuilder-primitives/Cargo.toml +++ b/crates/rbuilder-primitives/Cargo.toml @@ -22,7 +22,20 @@ alloy-rpc-types-eth.workspace = true revm.workspace = true revm-inspectors.workspace = true +# reth +reth-chainspec.workspace = true +reth-primitives-traits.workspace = true +reth-primitives.workspace = true +reth-ethereum-primitives.workspace = true +reth-transaction-pool.workspace = true + ethereum-consensus.workspace = true +ethereum_ssz.workspace = true +ethereum_ssz_derive.workspace = true +ssz_types = "0.8.0" +tree_hash = "0.8.0" +tree_hash_derive = "0.8.0" +typenum = "1.17.0" # misc derivative.workspace = true @@ -38,18 +51,9 @@ tracing.workspace = true time.workspace = true thiserror.workspace = true eyre.workspace = true -ethereum_ssz_derive.workspace = true -ethereum_ssz.workspace = true serde.workspace = true derive_more.workspace = true serde_json.workspace = true -# reth -reth-chainspec.workspace = true -reth-primitives-traits.workspace = true -reth-primitives.workspace = true -reth-ethereum-primitives.workspace = true -reth-transaction-pool.workspace = true - [dev-dependencies] rand.workspace = true diff --git a/crates/rbuilder-primitives/src/built_block.rs b/crates/rbuilder-primitives/src/built_block.rs index feb3df508..eff29f8b9 100644 --- a/crates/rbuilder-primitives/src/built_block.rs +++ b/crates/rbuilder-primitives/src/built_block.rs @@ -1,5 +1,5 @@ use crate::mev_boost::{ - BidAdjustmentData, CapellaSubmitBlockRequest, DenebSubmitBlockRequest, + BidAdjustmentDataV1, CapellaSubmitBlockRequest, DenebSubmitBlockRequest, ElectraSubmitBlockRequest, FuluSubmitBlockRequest, SubmitBlockRequest, }; use alloy_eips::{ @@ -37,10 +37,27 @@ pub struct SignedBuiltBlock { } impl SignedBuiltBlock { - pub fn into_request( + /// Convert the signed block into [`SubmitBlockRequest`]. + /// NOTE: This does not set bid adjustment data. Use [`Self::into_request_with_adjustment_data`] instead. + pub fn into_request(self, chain_spec: &ChainSpec) -> eyre::Result { + self.into_request_inner(chain_spec, None) + } + + /// Convert the signed block into [`SubmitBlockRequest`] with ad + pub fn into_request_with_adjustment_data( + self, + chain_spec: &ChainSpec, + adjustment_data: Option, + ) -> eyre::Result { + self.into_request_inner(chain_spec, adjustment_data) + } + + /// Convert the signed block into [`SubmitBlockRequest`]. + /// NOTE: + fn into_request_inner( self, chain_spec: &ChainSpec, - adjustment_data: Option, + adjustment_data: Option, ) -> eyre::Result { match self.execution_payload { ExecutionPayload::V1(_v1) => { @@ -183,11 +200,7 @@ pub fn block_to_execution_payload( .body() .transactions .iter() - .map(|tx| { - let mut buf = Vec::new(); - tx.encode_2718(&mut buf); - buf.into() - }) + .map(|tx| tx.encoded_2718().into()) .collect(); let payload_v1 = ExecutionPayloadV1 { parent_hash: sealed_block.parent_hash, diff --git a/crates/rbuilder-primitives/src/mev_boost/adjustment.rs b/crates/rbuilder-primitives/src/mev_boost/adjustment.rs index c124178cb..c24c98baa 100644 --- a/crates/rbuilder-primitives/src/mev_boost/adjustment.rs +++ b/crates/rbuilder-primitives/src/mev_boost/adjustment.rs @@ -11,7 +11,7 @@ use alloy_primitives::{Address, Bloom, Bytes, B256}; ssz_derive::Encode, ssz_derive::Decode, )] -pub struct BidAdjustmentData { +pub struct BidAdjustmentDataV1 { /// State root of the payload. pub state_root: B256, /// Transactions root of the payload. @@ -84,3 +84,85 @@ pub struct BidAdjustmentDataV2 { /// New in V2: Logs bloom accrued until but not including the last (payment) transaction. pub pre_payment_logs_bloom: Bloom, } + +/// Common bid adjustment information that can be used for creating bid adjustment data. +#[derive(Clone, Debug)] +pub struct BidAdjustmentData { + /// State root of the payload. + pub state_root: B256, + /// Transactions root of the payload. + pub el_transactions_root: B256, + /// Withdrawals root of the payload. + pub el_withdrawals_root: B256, + /// Receipts root of the payload. + pub receipts_root: B256, + /// The merkle proof for the last transaction in the block, which will be overwritten with a + /// payment from `fee_payer` to `fee_recipient` if we adjust the bid. + pub el_placeholder_transaction_proof: Vec, + /// New in V2: SSZ merkle proof for last transaction + pub cl_placeholder_transaction_proof: Vec, + /// The merkle proof for the receipt of the placeholder transaction. It's required for + /// adjusting payments to contract addresses. + pub placeholder_receipt_proof: Vec, + /// New in V2: Logs bloom accrued until but not including the last (payment) transaction. + pub pre_payment_logs_bloom: Bloom, + /// State proofs. + pub state_proofs: BidAdjustmentStateProofs, +} + +impl BidAdjustmentData { + /// Convert bid adjustment data into [`BidAdjustmentDataV1`]. + pub fn into_v1(self) -> BidAdjustmentDataV1 { + BidAdjustmentDataV1 { + state_root: self.state_root, + transactions_root: self.el_transactions_root, + receipts_root: self.receipts_root, + builder_address: self.state_proofs.builder_address, + builder_proof: self.state_proofs.builder_proof, + fee_recipient_address: self.state_proofs.fee_recipient_address, + fee_recipient_proof: self.state_proofs.fee_recipient_proof, + fee_payer_address: self.state_proofs.fee_payer_address, + fee_payer_proof: self.state_proofs.fee_payer_proof, + placeholder_transaction_proof: self.el_placeholder_transaction_proof, + placeholder_receipt_proof: self.placeholder_receipt_proof, + } + } + + /// Convert bid adjustment data into [`BidAdjustmentDataV2`]. + pub fn into_v2(self) -> BidAdjustmentDataV2 { + BidAdjustmentDataV2 { + el_transactions_root: self.el_transactions_root, + el_withdrawals_root: self.el_withdrawals_root, + builder_address: self.state_proofs.builder_address, + builder_proof: self.state_proofs.builder_proof, + fee_recipient_address: self.state_proofs.fee_recipient_address, + fee_recipient_proof: self.state_proofs.fee_recipient_proof, + fee_payer_address: self.state_proofs.fee_payer_address, + fee_payer_proof: self.state_proofs.fee_payer_proof, + el_placeholder_transaction_proof: self.el_placeholder_transaction_proof, + cl_placeholder_transaction_proof: self.cl_placeholder_transaction_proof, + placeholder_receipt_proof: self.placeholder_receipt_proof, + pre_payment_logs_bloom: self.pre_payment_logs_bloom, + } + } +} + +/// Bid adjustment state proofs. +#[derive(Clone, Debug)] +pub struct BidAdjustmentStateProofs { + /// The usual builder address that pays the proposer in the last transaction of the block. + /// When we adjust a bid, this transaction is overwritten by a transaction from the collateral + /// account `fee_payer_address`. If we don't adjust the bid, `builder_address` pays the + /// proposer as per usual. + pub builder_address: Address, + /// The state proof for the builder account. + pub builder_proof: Vec, + /// The proposer's fee recipient. + pub fee_recipient_address: Address, + /// The state proof for the fee recipient account. + pub fee_recipient_proof: Vec, + /// The fee payer address that is custodied by the relay. + pub fee_payer_address: Address, + /// The state proof for the fee payer account. + pub fee_payer_proof: Vec, +} diff --git a/crates/rbuilder-primitives/src/mev_boost/mod.rs b/crates/rbuilder-primitives/src/mev_boost/mod.rs index 78afbcd10..01514c20f 100644 --- a/crates/rbuilder-primitives/src/mev_boost/mod.rs +++ b/crates/rbuilder-primitives/src/mev_boost/mod.rs @@ -7,7 +7,7 @@ use alloy_rpc_types_engine::{BlobsBundleV1, BlobsBundleV2, ExecutionPayloadV3}; use reqwest::Url; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; mod submit_block; pub use submit_block::*; @@ -15,6 +15,8 @@ pub use submit_block::*; mod submit_header; pub use submit_header::*; +pub mod ssz_roots; + mod optimistic_v3; pub use optimistic_v3::*; @@ -208,7 +210,7 @@ pub struct BidValueMetadata { #[derive(Clone, Debug)] pub struct SubmitBlockRequestWithMetadata { - pub submission: SubmitBlockRequest, + pub submission: Arc, pub metadata: BidMetadata, } @@ -232,7 +234,7 @@ impl serde::Serialize for SubmitBlockRequestNoBlobs<'_> { blobs_bundle: &'a BlobsBundleV1, signature: &'a BlsSignature, #[serde(skip_serializing_if = "Option::is_none")] - adjustment_data: &'a Option, + adjustment_data: &'a Option, } SignedBidSubmissionV3Ref { @@ -254,7 +256,7 @@ impl serde::Serialize for SubmitBlockRequestNoBlobs<'_> { execution_requests: &'a ExecutionRequestsV4, signature: &'a BlsSignature, #[serde(skip_serializing_if = "Option::is_none")] - adjustment_data: &'a Option, + adjustment_data: &'a Option, } SignedBidSubmissionV4Ref { diff --git a/crates/rbuilder/src/mev_boost/ssz_roots.rs b/crates/rbuilder-primitives/src/mev_boost/ssz_roots.rs similarity index 100% rename from crates/rbuilder/src/mev_boost/ssz_roots.rs rename to crates/rbuilder-primitives/src/mev_boost/ssz_roots.rs diff --git a/crates/rbuilder-primitives/src/mev_boost/submit_block.rs b/crates/rbuilder-primitives/src/mev_boost/submit_block.rs index cf9b2aa7b..423372023 100644 --- a/crates/rbuilder-primitives/src/mev_boost/submit_block.rs +++ b/crates/rbuilder-primitives/src/mev_boost/submit_block.rs @@ -1,4 +1,4 @@ -use crate::mev_boost::BidAdjustmentData; +use crate::mev_boost::BidAdjustmentDataV1; use alloy_rpc_types_beacon::{ relay::{ BidTrace, SignedBidSubmissionV2, SignedBidSubmissionV3, SignedBidSubmissionV4, @@ -8,7 +8,7 @@ use alloy_rpc_types_beacon::{ BlsSignature, }; use alloy_rpc_types_engine::{ - BlobsBundleV1, BlobsBundleV2, ExecutionPayloadV2, ExecutionPayloadV3, + BlobsBundleV1, BlobsBundleV2, ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, }; use derive_more::Deref; use serde::{Deserialize, Serialize}; @@ -46,13 +46,50 @@ impl SubmitBlockRequest { pub fn bid_trace(&self) -> &BidTrace { match self { - SubmitBlockRequest::Fulu(req) => &req.message, SubmitBlockRequest::Capella(req) => &req.message, SubmitBlockRequest::Deneb(req) => &req.message, SubmitBlockRequest::Electra(req) => &req.message, + SubmitBlockRequest::Fulu(req) => &req.message, + } + } + + pub fn signature(&self) -> BlsSignature { + match self { + SubmitBlockRequest::Capella(req) => req.signature, + SubmitBlockRequest::Deneb(req) => req.signature, + SubmitBlockRequest::Electra(req) => req.signature, + SubmitBlockRequest::Fulu(req) => req.signature, + } + } + + pub fn execution_payload_v1(&self) -> &ExecutionPayloadV1 { + match self { + SubmitBlockRequest::Capella(req) => &req.execution_payload.payload_inner, + SubmitBlockRequest::Deneb(req) => &req.execution_payload.payload_inner.payload_inner, + SubmitBlockRequest::Electra(req) => &req.execution_payload.payload_inner.payload_inner, + SubmitBlockRequest::Fulu(req) => &req.execution_payload.payload_inner.payload_inner, + } + } + + pub fn execution_payload_v2(&self) -> &ExecutionPayloadV2 { + match self { + SubmitBlockRequest::Capella(req) => &req.execution_payload, + SubmitBlockRequest::Deneb(req) => &req.execution_payload.payload_inner, + SubmitBlockRequest::Electra(req) => &req.execution_payload.payload_inner, + SubmitBlockRequest::Fulu(req) => &req.execution_payload.payload_inner, } } + pub fn execution_payload_v3(&self) -> Option<&ExecutionPayloadV3> { + match self { + SubmitBlockRequest::Capella(_) => None, + SubmitBlockRequest::Deneb(req) => Some(&req.execution_payload), + SubmitBlockRequest::Electra(req) => Some(&req.execution_payload), + SubmitBlockRequest::Fulu(req) => Some(&req.execution_payload), + } + } + + /// Returns `true` if block has adjustment data. pub fn has_adjustment_data(&self) -> bool { let maybe_adjustment_data = match self { SubmitBlockRequest::Capella(req) => &req.adjustment_data, @@ -62,6 +99,40 @@ impl SubmitBlockRequest { }; maybe_adjustment_data.is_some() } + + /// Return mutable reference to bid adjustment data. + fn adjustment_data_mut(&mut self) -> &mut Option { + match self { + Self::Capella(CapellaSubmitBlockRequest { + adjustment_data, .. + }) + | Self::Deneb(DenebSubmitBlockRequest { + adjustment_data, .. + }) + | Self::Electra(ElectraSubmitBlockRequest { + adjustment_data, .. + }) + | Self::Fulu(FuluSubmitBlockRequest { + adjustment_data, .. + }) => adjustment_data, + } + } + + /// Set the bid adjustment data on the request. + pub fn set_adjustment_data(&mut self, data: BidAdjustmentDataV1) { + *self.adjustment_data_mut() = Some(data); + } + + /// Remove adjustment data from the bid. + pub fn remove_adjustment_data(&mut self) { + self.adjustment_data_mut().take(); + } + + /// Remove adjustment data from the bid and return it. + pub fn without_adjustment_data(mut self) -> Self { + self.remove_adjustment_data(); + self + } } impl ssz::Decode for SubmitBlockRequest { @@ -91,13 +162,13 @@ pub struct FuluSubmitBlockRequest { #[serde(flatten)] pub submission: SignedBidSubmissionV5, #[serde(skip_serializing_if = "Option::is_none")] - pub adjustment_data: Option, + pub adjustment_data: Option, } impl FuluSubmitBlockRequest { pub fn new( submission: SignedBidSubmissionV5, - adjustment_data: Option, + adjustment_data: Option, ) -> Self { Self { submission, @@ -114,14 +185,14 @@ pub struct ElectraSubmitBlockRequest { pub submission: SignedBidSubmissionV4, /// Bid adjustment data if present. #[serde(skip_serializing_if = "Option::is_none")] - pub adjustment_data: Option, + pub adjustment_data: Option, } impl ElectraSubmitBlockRequest { /// Create new Electra submit block request. pub fn new( submission: SignedBidSubmissionV4, - adjustment_data: Option, + adjustment_data: Option, ) -> Self { Self { submission, @@ -142,7 +213,7 @@ impl ssz::Encode for FuluSubmitBlockRequest { + ::ssz_fixed_len() + ::ssz_fixed_len(); if self.adjustment_data.is_some() { - offset += ::ssz_fixed_len(); + offset += ::ssz_fixed_len(); } let mut encoder = ssz::SszEncoder::container(buf, offset); @@ -166,7 +237,7 @@ impl ssz::Encode for FuluSubmitBlockRequest { + ::ssz_bytes_len(&self.execution_requests) + ::ssz_bytes_len(&self.signature); if let Some(adjustment) = &self.adjustment_data { - len += ::ssz_bytes_len(adjustment); + len += ::ssz_bytes_len(adjustment); } len } @@ -185,7 +256,7 @@ impl ssz::Decode for FuluSubmitBlockRequest { blobs_bundle: BlobsBundleV2, execution_requests: ExecutionRequestsV4, signature: BlsSignature, - adjustment_data: BidAdjustmentData, + adjustment_data: BidAdjustmentDataV1, } if let Ok(request) = FuluSubmitBlockRequestSszHelper::from_ssz_bytes(bytes) { @@ -224,7 +295,7 @@ impl ssz::Encode for ElectraSubmitBlockRequest { + ::ssz_fixed_len() + ::ssz_fixed_len(); if self.adjustment_data.is_some() { - offset += ::ssz_fixed_len(); + offset += ::ssz_fixed_len(); } let mut encoder = ssz::SszEncoder::container(buf, offset); @@ -248,7 +319,7 @@ impl ssz::Encode for ElectraSubmitBlockRequest { + ::ssz_bytes_len(&self.execution_requests) + ::ssz_bytes_len(&self.signature); if let Some(adjustment) = &self.adjustment_data { - len += ::ssz_bytes_len(adjustment); + len += ::ssz_bytes_len(adjustment); } len } @@ -267,7 +338,7 @@ impl ssz::Decode for ElectraSubmitBlockRequest { blobs_bundle: BlobsBundleV1, execution_requests: ExecutionRequestsV4, signature: BlsSignature, - adjustment_data: BidAdjustmentData, + adjustment_data: BidAdjustmentDataV1, } if let Ok(request) = ElectraSubmitBlockRequestSszHelper::from_ssz_bytes(bytes) { @@ -302,14 +373,14 @@ pub struct DenebSubmitBlockRequest { pub submission: SignedBidSubmissionV3, /// Bid adjustment data if present. #[serde(skip_serializing_if = "Option::is_none")] - pub adjustment_data: Option, + pub adjustment_data: Option, } impl DenebSubmitBlockRequest { /// Create new Deneb submit block request. pub fn new( submission: SignedBidSubmissionV3, - adjustment_data: Option, + adjustment_data: Option, ) -> Self { Self { submission, @@ -329,7 +400,7 @@ impl ssz::Encode for DenebSubmitBlockRequest { + ::ssz_fixed_len() + ::ssz_fixed_len(); if self.adjustment_data.is_some() { - offset += ::ssz_fixed_len(); + offset += ::ssz_fixed_len(); } let mut encoder = ssz::SszEncoder::container(buf, offset); @@ -351,7 +422,7 @@ impl ssz::Encode for DenebSubmitBlockRequest { + ::ssz_bytes_len(&self.blobs_bundle) + ::ssz_bytes_len(&self.signature); if let Some(adjustment) = &self.adjustment_data { - len += ::ssz_bytes_len(adjustment); + len += ::ssz_bytes_len(adjustment); } len } @@ -369,7 +440,7 @@ impl ssz::Decode for DenebSubmitBlockRequest { execution_payload: ExecutionPayloadV3, blobs_bundle: BlobsBundleV1, signature: BlsSignature, - adjustment_data: BidAdjustmentData, + adjustment_data: BidAdjustmentDataV1, } if let Ok(request) = DenebSubmitBlockRequestSszHelper::from_ssz_bytes(bytes) { @@ -402,14 +473,14 @@ pub struct CapellaSubmitBlockRequest { pub submission: SignedBidSubmissionV2, /// Bid adjustment data if present. #[serde(skip_serializing_if = "Option::is_none")] - pub adjustment_data: Option, + pub adjustment_data: Option, } impl CapellaSubmitBlockRequest { /// Create new Capella submit block request. pub fn new( submission: SignedBidSubmissionV2, - adjustment_data: Option, + adjustment_data: Option, ) -> Self { Self { submission, @@ -428,7 +499,7 @@ impl ssz::Encode for CapellaSubmitBlockRequest { + ::ssz_fixed_len() + ::ssz_fixed_len(); if self.adjustment_data.is_some() { - offset += ::ssz_fixed_len(); + offset += ::ssz_fixed_len(); } let mut encoder = ssz::SszEncoder::container(buf, offset); @@ -447,7 +518,7 @@ impl ssz::Encode for CapellaSubmitBlockRequest { + ::ssz_bytes_len(&self.execution_payload) + ::ssz_bytes_len(&self.signature); if let Some(adjustment) = &self.adjustment_data { - len += ::ssz_bytes_len(adjustment); + len += ::ssz_bytes_len(adjustment); } len } @@ -464,7 +535,7 @@ impl ssz::Decode for CapellaSubmitBlockRequest { message: BidTrace, execution_payload: ExecutionPayloadV2, signature: BlsSignature, - adjustment_data: BidAdjustmentData, + adjustment_data: BidAdjustmentDataV1, } if let Ok(request) = CapellaSubmitBlockRequestSszHelper::from_ssz_bytes(bytes) { diff --git a/crates/rbuilder-primitives/src/mev_boost/submit_header.rs b/crates/rbuilder-primitives/src/mev_boost/submit_header.rs index 8cc79075b..2839a4a92 100644 --- a/crates/rbuilder-primitives/src/mev_boost/submit_header.rs +++ b/crates/rbuilder-primitives/src/mev_boost/submit_header.rs @@ -1,6 +1,10 @@ -use crate::mev_boost::adjustment::BidAdjustmentDataV2; +use crate::mev_boost::{ + adjustment::BidAdjustmentDataV2, + ssz_roots::{calculate_transactions_root_ssz, calculate_withdrawals_root_ssz}, +}; use alloy_primitives::{Address, Bloom, Bytes, B256, U256}; use alloy_rpc_types_beacon::{relay::BidTrace, requests::ExecutionRequestsV4, BlsSignature}; +use alloy_rpc_types_engine::ExecutionPayloadV3; use serde_with::{serde_as, DisplayFromStr}; /// Optimistic V3 bid submission. @@ -14,7 +18,7 @@ use serde_with::{serde_as, DisplayFromStr}; ssz_derive::Encode, ssz_derive::Decode, )] -pub struct HeaderSubmissionV3 { +pub struct HeaderSubmissionOptimisticV3 { /// URL pointing to the builder's server endpoint for retrieving /// the full block payload if this header is selected. pub url: Vec, @@ -38,11 +42,26 @@ pub struct HeaderSubmissionV3 { )] pub struct SignedHeaderSubmission { /// Electra header submission. - pub message: HeaderSubmissionElectra, + pub message: HeaderSubmission, /// Builder signature. pub signature: BlsSignature, } +#[derive( + PartialEq, + Eq, + Clone, + Debug, + serde::Serialize, + serde::Deserialize, + ssz_derive::Encode, + ssz_derive::Decode, +)] +#[ssz(enum_behaviour = "transparent")] +pub enum HeaderSubmission { + Electra(HeaderSubmissionElectra), +} + /// Electra header submission. #[derive( PartialEq, @@ -125,3 +144,29 @@ pub struct ExecutionPayloadHeaderElectra { #[serde_as(as = "DisplayFromStr")] pub excess_blob_gas: u64, } + +impl From<&ExecutionPayloadV3> for ExecutionPayloadHeaderElectra { + fn from(v3: &ExecutionPayloadV3) -> Self { + let v2 = &v3.payload_inner; + let v1 = &v2.payload_inner; + ExecutionPayloadHeaderElectra { + parent_hash: v1.parent_hash, + fee_recipient: v1.fee_recipient, + state_root: v1.state_root, + receipts_root: v1.receipts_root, + logs_bloom: v1.logs_bloom, + prev_randao: v1.prev_randao, + block_number: v1.block_number, + gas_limit: v1.gas_limit, + gas_used: v1.gas_used, + timestamp: v1.timestamp, + extra_data: v1.extra_data.clone(), + base_fee_per_gas: v1.base_fee_per_gas, + block_hash: v1.block_hash, + transactions_root: calculate_transactions_root_ssz(&v1.transactions), + withdrawals_root: calculate_withdrawals_root_ssz(&v2.withdrawals), + blob_gas_used: v3.blob_gas_used, + excess_blob_gas: v3.excess_blob_gas, + } + } +} diff --git a/crates/rbuilder/Cargo.toml b/crates/rbuilder/Cargo.toml index 6e36293bb..443eb4ecb 100644 --- a/crates/rbuilder/Cargo.toml +++ b/crates/rbuilder/Cargo.toml @@ -60,10 +60,6 @@ alloy-trie.workspace = true ethereum_ssz_derive.workspace = true ethereum_ssz.workspace = true -ssz_types = "0.8.0" -tree_hash = "0.8.0" -tree_hash_derive = "0.8.0" -typenum = "1.17.0" test_utils = { path = "src/test_utils" } metrics_macros = { path = "src/telemetry/metrics_macros" } @@ -130,6 +126,7 @@ dashmap = "6.1.0" tonic.workspace = true prost.workspace = true prost-types.workspace = true +schnellru = "0.2.4" # IPC state provider deps reipc = { git = "https://github.com/nethermindeth/reipc.git", rev = "3837f3539201f948dd1c2c96a85a60d589feb4c6" } diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index 513e9d1aa..6cfec1915 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -25,6 +25,7 @@ use alloy_eips::{ eip7685::Requests, eip7840::BlobParams, merge::BEACON_NONCE, + Encodable2718, }; use alloy_evm::{block::system_calls::SystemCaller, env::EvmEnv, eth::eip6110}; use alloy_primitives::{Address, BlockNumber, Bytes, B256, I256, U256}; @@ -35,8 +36,10 @@ use eth_sparse_mpt::SparseTrieLocalCache; use evm::EthCachedEvmFactory; use jsonrpsee::core::Serialize; use rbuilder_primitives::{ - mev_boost::BidAdjustmentData, BlockSpace, Order, OrderId, SimValue, SimulatedOrder, - TransactionSignedEcRecoveredWithBlobs, + mev_boost::{ + ssz_roots::generate_transaction_proof_ssz, BidAdjustmentData, BidAdjustmentStateProofs, + }, + BlockSpace, Order, OrderId, SimValue, SimulatedOrder, TransactionSignedEcRecoveredWithBlobs, }; use reth::{ payload::PayloadId, @@ -60,6 +63,7 @@ use revm::{ }; use serde::Deserialize; use std::{ + cell::LazyCell, collections::{hash_map, HashMap, HashSet}, hash::Hash, str::FromStr, @@ -1015,8 +1019,9 @@ impl>(); + generate_transaction_proof_ssz(&encoded_txs, target) + }); + let bid_adjustment_state_proofs = + Self::generate_bid_adjustment_state_proofs(state, ctx, local_ctx) + .inspect_err(|error| { + error!( + block_number = block.number, + ?error, + "Error generating bid adjustment data" + ); + }) + .unwrap_or_default(); + let bid_adjustments = bid_adjustment_state_proofs + .into_iter() + .map(|(fee_payer, state_proofs)| { + ( + fee_payer, + BidAdjustmentData { + state_root: block.header.state_root, + el_transactions_root: block.header.transactions_root, + el_withdrawals_root: block.header.withdrawals_root.unwrap_or_default(), + receipts_root: block.header.receipts_root, + el_placeholder_transaction_proof: el_placeholder_transaction_proof.clone(), + cl_placeholder_transaction_proof: cl_placeholder_transaction_proof.clone(), + placeholder_receipt_proof: placeholder_receipt_proof.clone(), + pre_payment_logs_bloom, + state_proofs, + }, + ) + }) + .collect(); let result = FinalizeResult { sealed_block: block.seal_slow(), @@ -1187,14 +1215,11 @@ impl, - placeholder_receipt_proof: Vec, - ) -> Result, FinalizeError> { + ) -> Result, FinalizeError> { if ctx.adjustment_fee_payers.is_empty() { return Ok(Default::default()); } @@ -1261,18 +1286,13 @@ impl, + pub optimistic_v3_config: Option, pub bid_observer: Box, } @@ -100,6 +108,15 @@ pub struct OptimisticConfig { pub max_bid_value: U256, } +/// Configuration for optimistic V3. +#[derive(Debug, Clone)] +pub struct OptimisticV3Config { + /// The URL where the relay can call to retrieve the block. + pub builder_url: Vec, + /// Sender for Optimistic V3 blocks. + pub block_sender: broadcast::Sender>, +} + /// Values from [`BuiltBlockTrace`] struct BuiltBlockInfo { pub bid_value: U256, @@ -123,31 +140,17 @@ async fn run_submit_to_relays_job( ) -> Option { let mut res = None; - let (normal_relays, optimistic_relays) = { - let mut normal_relays = Vec::new(); - let mut optimistic_relays = Vec::new(); - for relay in relays { - if relay.optimistic() { - optimistic_relays.push(relay); - } else { - normal_relays.push(relay); - } - } - (normal_relays, optimistic_relays) - }; + let (regular_relays, optimistic_relays) = + relays.into_iter().partition(|relay| !relay.optimistic()); let mut last_bid_hash = None; 'submit: loop { tokio::select! { _ = cancel.cancelled() => { - info!( - block = slot_data.block(), - "run_submit_to_relays_job cancelled" - ); + info!( block = slot_data.block(), "run_submit_to_relays_job cancelled"); break 'submit res; }, - _ = pending_bid.wait_for_change() => { - } + _ = pending_bid.wait_for_change() => {} }; let block = if let Some(new_block) = pending_bid.take_pending_block() { @@ -239,240 +242,273 @@ async fn run_submit_to_relays_job( "Submitting bid", ); inc_initiated_submissions(optimistic_config.is_some()); - let relay_filter = get_relay_filter(&block); let execution_payload = block_to_execution_payload( &config.chain_spec, &slot_data.payload_attributes_event.data, &block.sealed_block, ); - let (normal_signed_block, optimistic_signed_block) = { - let normal_signed_block = match sign_block_for_relay( + let (regular_request, optimistic_request) = { + let regular = create_submit_block_request( &config.signer, - &block.sealed_block, - &slot_data.payload_attributes_event.data, - slot_data.slot_data.pubkey, - block.trace.bid_value, - ) { - Ok((message, signature)) => SignedBuiltBlock { - message, - signature, - execution_payload: execution_payload.clone(), - blob_sidecars: block.txs_blobs_sidecars.clone(), - execution_requests: block.execution_requests.clone(), - }, - Err(err) => { - error!(parent: &submission_span, err = ?err, "Error signing block for relay"); - continue 'submit; - } - }; - - let optimistic_signed_block = if let Some(optimistic_config) = optimistic_config { - match sign_block_for_relay( + &config.chain_spec, + &slot_data, + &block, + &execution_payload, + ) + .inspect_err(|error| { + error!(parent: &submission_span, ?error, "Error creating regular submit block request"); + }) + .ok(); + + let mut optimistic = None; + if let Some(optimistic_config) = optimistic_config { + optimistic = create_submit_block_request( &optimistic_config.signer, - &block.sealed_block, - &slot_data.payload_attributes_event.data, - slot_data.slot_data.pubkey, - block.trace.bid_value, - ) { - Ok((message, signature)) => Some(( - SignedBuiltBlock { - message, - signature, - execution_payload, - blob_sidecars: block.txs_blobs_sidecars.clone(), - execution_requests: block.execution_requests.clone(), - }, - optimistic_config, - )), - Err(err) => { - error!(parent: &submission_span, err = ?err, "Error signing block for relay"); - continue 'submit; - } - } - } else { - None - }; + &config.chain_spec, + &slot_data, + &block, + &execution_payload, + ).inspect_err(|error| { + error!(parent: &submission_span, ?error, "Error creating optimistic submit block request"); + }) + .ok(); + } - (normal_signed_block, optimistic_signed_block) + (regular, optimistic) }; - mark_submission_start_time(block.trace.orders_sealed_at); - submit_block_to_relays( - &config.chain_spec, - &normal_signed_block, - &bid_metadata, - &block.bid_adjustments, - &normal_relays, - &slot_data.relay_registrations, - &relay_filter, - false, - &submission_span, - &cancel, - ); + if regular_request.is_none() && optimistic_request.is_none() { + error!(parent: &submission_span, "Unable to construct request from the built block"); + continue 'submit; + } - let signed_block = if let Some((optimistic_signed_block, _)) = optimistic_signed_block { + mark_submission_start_time(block.trace.orders_sealed_at); + if let Some(request) = ®ular_request { submit_block_to_relays( - &config.chain_spec, - &optimistic_signed_block, + request, &bid_metadata, &block.bid_adjustments, - &optimistic_relays, + ®ular_relays, &slot_data.relay_registrations, - &relay_filter, - true, + false, + &config.optimistic_v3_config, &submission_span, &cancel, - ); - optimistic_signed_block - } else { + ) + } + + let optimistic_request = optimistic_request + .map(|req| (req, true)) // non-optimistic submission to optimistic relays + .or(regular_request.map(|req| (req, false))); + if let Some((request, optimistic)) = optimistic_request { submit_block_to_relays( - &config.chain_spec, - &normal_signed_block, + &request, &bid_metadata, &block.bid_adjustments, &optimistic_relays, &slot_data.relay_registrations, - &relay_filter, - false, + optimistic, + &config.optimistic_v3_config, &submission_span, &cancel, ); - normal_signed_block - }; - match signed_block.into_request(&config.chain_spec, None) { - Ok(request) => { - submission_span.in_scope(|| { - // NOTE: we only notify normal submission here because they have the same contents but different pubkeys - config.bid_observer.block_submitted( - &slot_data, - &block.sealed_block, - &request, - &block.trace, - builder_name, - bid_metadata.value.top_competitor_bid.unwrap_or_default(), - ); - }) - } - Err(err) => { - error!(parent: &submission_span, err = ?err, "Error converting request for bid observer"); - } - }; + submission_span.in_scope(|| { + // NOTE: we only notify normal submission here because they have the same contents but different pubkeys + config.bid_observer.block_submitted( + &slot_data, + &request, + &block.trace, + builder_name, + bid_metadata.value.top_competitor_bid.unwrap_or_default(), + ); + }) + } } } +/// Create submit block request _without_ bid adjustments. +fn create_submit_block_request( + signer: &BLSBlockSigner, + chain_spec: &ChainSpec, + slot_data: &MevBoostSlotData, + block: &Block, + execution_payload: &ExecutionPayload, +) -> eyre::Result { + let (message, signature) = sign_block_for_relay( + signer, + &block.sealed_block, + &slot_data.payload_attributes_event.data, + slot_data.slot_data.pubkey, + block.trace.bid_value, + )?; + SignedBuiltBlock { + message, + signature, + execution_payload: execution_payload.clone(), + blob_sidecars: block.txs_blobs_sidecars.clone(), + execution_requests: block.execution_requests.clone(), + } + .into_request(chain_spec) +} + +// TODO: support Fulu +fn create_optimistic_v3_request( + builder_url: &[u8], + request: &SubmitBlockRequest, + maybe_adjustment_data: Option<&BidAdjustmentData>, +) -> eyre::Result { + let SubmitBlockRequest::Electra(request) = request else { + eyre::bail!("only electra requests are supported") + }; + + let Some(adjustment_data) = maybe_adjustment_data else { + eyre::bail!("adjustment data must exist") + }; + + let execution_payload_header = ExecutionPayloadHeaderElectra::from(&request.execution_payload); + let header_submission = HeaderSubmission::Electra(HeaderSubmissionElectra { + bid_trace: request.message.clone(), + execution_payload_header, + execution_requests: request.execution_requests.clone(), + commitments: request.blobs_bundle.commitments.clone(), + adjustment_data: adjustment_data.clone().into_v2(), + }); + + let tx_count = request + .execution_payload + .payload_inner + .payload_inner + .transactions + .len(); + Ok(HeaderSubmissionOptimisticV3 { + url: builder_url.to_vec(), + tx_count: tx_count as u32, + submission: SignedHeaderSubmission { + message: header_submission, + signature: request.signature, + }, + }) +} + #[allow(clippy::too_many_arguments)] fn submit_block_to_relays( - chain_spec: &ChainSpec, - signed_block: &SignedBuiltBlock, + request: &SubmitBlockRequest, bid_metadata: &BidMetadata, bid_adjustments: &std::collections::HashMap, relays: &Vec, registrations: &HashMap, - relay_filter: &impl Fn(&MevBoostRelayBidSubmitter) -> bool, optimistic: bool, + optimistic_v3_config: &Option, submission_span: &Span, cancel: &CancellationToken, ) { for relay in relays { - if relay_filter(relay) { - let registration = match registrations.get(relay.id()) { - Some(registration) => registration.clone(), - None => { - // Use any registrations for submitting to test relays. - debug_assert!(relay.test_relay()); - registrations.values().next().unwrap().clone() - } - }; - - let adjustment_data = registration - .adjustment_fee_payer - .and_then(|fee_payer| bid_adjustments.get(&fee_payer).cloned()); - let submission = match signed_block - .clone() - .into_request(chain_spec, adjustment_data) - { - Ok(submission) => SubmitBlockRequestWithMetadata { - submission, - metadata: bid_metadata.clone(), - }, - Err(error) => { - error_span!(parent: submission_span, "request_convert_error", relay = &relay.id(), ?error); - continue; - } - }; - - let span = info_span!(parent: submission_span, "relay_submit", relay = &relay.id(), optimistic); - let relay = relay.clone(); - let cancel = cancel.clone(); - tokio::spawn( - async move { - submit_bid_to_the_relay( - &relay, - cancel.clone(), - submission, - registration.registration, - optimistic, - ) - .await; - } - .instrument(span), - ); + // Blocks go only to relays that have a max bid > bid_value (or no max bid). + let bid_value = request.bid_trace().value; + if relay.max_bid().is_some_and(|max| bid_value > max) { + continue; } - } -} -/// Creates a Fn to decide if the block should go to a relay. -/// It's a Fn because the code changes a lot (used to be more complex). -/// Blocks go only to relays that have a max bid >= bid_value (or no max bid). -fn get_relay_filter(block: &Block) -> impl Fn(&MevBoostRelayBidSubmitter) -> bool { - let bid_value = block.trace.bid_value; - move |relay: &MevBoostRelayBidSubmitter| { - relay.max_bid().is_none_or(|max_bid| bid_value <= max_bid) - } -} + let registration = match registrations.get(relay.id()) { + Some(registration) => registration.clone(), + None => { + // Use any registrations for submitting to test relays. + debug_assert!(relay.test_relay()); + registrations.values().next().unwrap().clone() + } + }; -pub async fn run_submit_to_relays_job_and_metrics( - pending_bid: Arc, - slot_data: MevBoostSlotData, - relays: Vec, - config: Arc, - cancel: CancellationToken, -) { - let last_build_block_info = - run_submit_to_relays_job(pending_bid, slot_data, relays, config, cancel).await; - if let Some(last_build_block_info) = last_build_block_info { - if last_build_block_info.bid_value > last_build_block_info.true_bid_value { - inc_subsidized_blocks(false); - add_subsidy_value( - last_build_block_info.bid_value - last_build_block_info.true_bid_value, - false, - ); + let maybe_adjustment_data = registration + .adjustment_fee_payer + .and_then(|fee_payer| bid_adjustments.get(&fee_payer)); + + let mut optimistic_v3 = None; + if relay.optimistic_v3() { + if let Some(config) = optimistic_v3_config { + optimistic_v3 = create_optimistic_v3_request( + &config.builder_url, + request, + maybe_adjustment_data, + ) + .map(|request| (config.clone(), request)) + .inspect_err(|error| { + error!(parent: submission_span, ?error, "Unable to create optimistic V3 request"); + }) + .ok(); + } + } + + let mut request = request.clone(); + + // We only set adjustment data on non optimistic v3 submissions. + // For optimistic v3, it is already included in the header submission. + if let Some(adjustment_data) = maybe_adjustment_data.filter(|_| optimistic_v3.is_none()) { + request.set_adjustment_data(adjustment_data.clone().into_v1()); } + + let submission = SubmitBlockRequestWithMetadata { + submission: Arc::new(request), + metadata: bid_metadata.clone(), + }; + + let span = + info_span!(parent: submission_span, "relay_submit", relay = &relay.id(), optimistic); + let relay = relay.clone(); + let cancel = cancel.clone(); + tokio::spawn( + async move { + submit_bid_to_the_relay( + &relay, + submission, + optimistic_v3, + registration.registration, + optimistic, + cancel, + ) + .await; + } + .instrument(span), + ); } } async fn submit_bid_to_the_relay( relay: &MevBoostRelayBidSubmitter, - cancel: CancellationToken, - signed_submit_request: SubmitBlockRequestWithMetadata, + submit_block_request: SubmitBlockRequestWithMetadata, + optimistic_v3_request: Option<(OptimisticV3Config, HeaderSubmissionOptimisticV3)>, registration: ValidatorSlotData, optimistic: bool, + cancel: CancellationToken, ) { let submit_start = Instant::now(); if !relay.can_submit_bid() { trace!("Relay submission is skipped due to rate limit"); return; - } + }; + + let request_fut = if let Some((config, request)) = optimistic_v3_request { + // Send the block to be saved in cache + let _ = config + .block_sender + .send(submit_block_request.submission.clone()); + relay + .submit_optimistic_v3(request, registration) + .left_future() + } else { + relay + .submit_block(submit_block_request.clone(), registration) + .right_future() + }; let relay_result = tokio::select! { _ = cancel.cancelled() => { return; }, - res = relay.submit_block(&signed_submit_request, ®istration) => res + res = request_fut => res }; let submit_time = submit_start.elapsed(); match relay_result { @@ -496,7 +532,7 @@ async fn submit_bid_to_the_relay( store_error_event( SIM_ERROR_CATEGORY, relay_result.as_ref().unwrap_err().to_string().as_str(), - &signed_submit_request.submission, + &submit_block_request.submission, ); error!( err = ?relay_result.unwrap_err(), @@ -552,12 +588,7 @@ async fn submit_bid_to_the_relay( #[derive(Debug)] pub struct RelaySubmitSinkFactory { submission_config: Arc, - /// Real relays (!MevBoostRelayBidSubmitter::test_relay()) - /// We submit to these only if the MevBoostRelayID is included on the MevBoostSlotData of the slot. - relays: HashMap, - /// Test relays (MevBoostRelayBidSubmitter::test_relay()) - /// Always included on submissions. - test_relays: Vec, + relays: Vec, } impl RelaySubmitSinkFactory { @@ -565,21 +596,12 @@ impl RelaySubmitSinkFactory { submission_config: SubmissionConfig, relays: Vec, ) -> Self { - let test_relays = relays.iter().filter(|r| r.test_relay()).cloned().collect(); - let relays = relays - .into_iter() - .filter(|r| !r.test_relay()) - .map(|relay| (relay.id().clone(), relay)) - .collect(); Self { submission_config: Arc::new(submission_config), relays, - test_relays, } } -} -impl RelaySubmitSinkFactory { pub fn create_builder_sink( &self, slot_data: MevBoostSlotData, @@ -587,20 +609,34 @@ impl RelaySubmitSinkFactory { ) -> Box { let pending_block_cell = Arc::new(PendingBlockCell::default()); - let relays = slot_data - .relay_registrations - .iter() - .flat_map(|(id, _)| self.relays.get(id)) - .chain(self.test_relays.iter()) - .cloned() - .collect(); - tokio::spawn(run_submit_to_relays_job_and_metrics( - pending_block_cell.clone(), - slot_data, - relays, - self.submission_config.clone(), - cancel, - )); + // Collect all relays to submit to. + let mut relays = Vec::new(); + for relay in &self.relays { + // Only submit to the relays with validator registrations in the slot... + if slot_data.relay_registrations.contains_key(relay.id()) + // ...and all test relays. + || relay.test_relay() + { + relays.push(relay.clone()); + } + } + + // Spawn the task to submit to selected relays and keep track of subsidized blocks. + tokio::spawn({ + let pending = pending_block_cell.clone(); + let config = self.submission_config.clone(); + async move { + let last_info = + run_submit_to_relays_job(pending, slot_data, relays, config, cancel).await; + if let Some(info) = last_info { + if info.bid_value > info.true_bid_value { + inc_subsidized_blocks(false); + add_subsidy_value(info.bid_value - info.true_bid_value, false); + } + } + } + }); + Box::new(PendingBlockCellToBlockBuildingSink { pending_block_cell }) } } diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index d0fe40781..83e8ae925 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -31,12 +31,18 @@ use crate::{ PartialBlockExecutionTracer, Sorting, }, live_builder::{ - block_output::bidding_service_interface::BiddingService2BidSender, cli::LiveBuilderConfig, + base_config::default_ip, + block_output::{ + bidding_service_interface::BiddingService2BidSender, relay_submit::OptimisticV3Config, + }, + cli::LiveBuilderConfig, payload_events::MevBoostSlotDataGenerator, }, mev_boost::{ - bloxroute_grpc, BLSBlockSigner, MevBoostRelayBidSubmitter, MevBoostRelaySlotInfoProvider, - RelayClient, RelayConfig, RelaySubmitConfig, + bloxroute_grpc, + optimistic_v3::{self, OPTIMISTIC_V3_CHANNEL_SIZE}, + BLSBlockSigner, MevBoostRelayBidSubmitter, MevBoostRelaySlotInfoProvider, RelayClient, + RelayConfig, RelaySubmitConfig, }, provider::StateProviderFactory, roothash::RootHashContext, @@ -47,6 +53,7 @@ use alloy_primitives::{ utils::{format_ether, parse_ether}, Address, FixedBytes, B256, U256, }; +use alloy_rpc_types_beacon::BlsPublicKey; use bid_scraper::config::NamedPublisherConfig; use ethereum_consensus::{ builder::compute_builder_domain, crypto::SecretKey, primitives::Version, @@ -65,14 +72,16 @@ use reth_provider::StaticFileProviderFactory; use serde::Deserialize; use serde_with::{serde_as, OneOrMany}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fmt::Debug, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::{Path, PathBuf}, str::FromStr, sync::Arc, time::Duration, }; -use tokio::sync::Mutex as TokioMutex; +use tokio::sync::{broadcast, Mutex as TokioMutex}; +use tokio_stream::wrappers::BroadcastStream; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use url::Url; @@ -154,6 +163,16 @@ pub struct L1Config { pub genesis_fork_version: Option, /// A bid scraper will be spawned for each NamedPublisherConfig. pub relay_bid_scrapers: Vec, + + /// Optimistic V3 server IP. + #[serde(default = "default_ip")] + pub optimistic_v3_server_ip: Ipv4Addr, + /// Optimistic V3 server port. + pub optimistic_v3_server_port: u16, + /// Optimistic V3 public URL. + pub optimistic_v3_public_url: String, + /// The relay pubkey. + pub optimistic_v3_relay_pubkeys: HashSet, } impl Default for L1Config { @@ -169,6 +188,10 @@ impl Default for L1Config { genesis_fork_version: None, relay_bid_scrapers: Default::default(), registration_update_interval_ms: None, + optimistic_v3_server_ip: default_ip(), + optimistic_v3_server_port: 6071, + optimistic_v3_public_url: String::new(), + optimistic_v3_relay_pubkeys: HashSet::default(), } } } @@ -208,6 +231,7 @@ impl L1Config { client.clone(), relay_config.name.clone(), submit_config, + relay_config.optimistic_v3, relay_config.mode == RelayMode::Test, )?); } else { @@ -300,14 +324,10 @@ impl L1Config { fn submission_config( &self, chain_spec: Arc, + signing_domain: B256, bid_observer: Box, + optimistic_v3_config: Option, ) -> eyre::Result { - let signing_domain = get_signing_domain( - chain_spec.chain, - self.beacon_clients()?, - self.genesis_fork_version.clone(), - )?; - let relay_secret_key = if let Some(secret_key) = &self.relay_secret_key { let resolved_key = secret_key.value()?; SecretKey::try_from(resolved_key)? @@ -341,6 +361,7 @@ impl L1Config { chain_spec, signer, optimistic_config, + optimistic_v3_config, bid_observer, }) } @@ -356,7 +377,46 @@ impl L1Config { Vec, ahash::HashMap, )> { - let submission_config = self.submission_config(chain_spec, bid_observer)?; + let signing_domain = get_signing_domain( + chain_spec.chain, + self.beacon_clients()?, + self.genesis_fork_version.clone(), + )?; + + let mut optimistic_v3_config = None; + if self.relays.iter().any(|r| r.optimistic_v3) { + let address = SocketAddr::V4(SocketAddrV4::new( + self.optimistic_v3_server_ip, + self.optimistic_v3_server_port, + )); + let builder_url = self.optimistic_v3_public_url.clone(); + + info!(local = %address, %builder_url, "Optimistic V3 is enabled for at least one relay, spawning server"); + if self.optimistic_v3_relay_pubkeys.is_empty() { + warn!("Optimistic V3 is enabled, but no relay pubkeys have been configured"); + } + + let (optimistic_v3_block_tx, optimistic_v3_block_rx) = + broadcast::channel(OPTIMISTIC_V3_CHANNEL_SIZE); + optimistic_v3::spawn_server( + address, + signing_domain, + self.optimistic_v3_relay_pubkeys.clone(), + BroadcastStream::from(optimistic_v3_block_rx), + )?; + + optimistic_v3_config = Some(OptimisticV3Config { + builder_url: builder_url.into_bytes(), + block_sender: optimistic_v3_block_tx, + }) + } + + let submission_config = self.submission_config( + chain_spec, + signing_domain, + bid_observer, + optimistic_v3_config, + )?; info!( "Builder mev boost normal relay pubkey: {:?}", submission_config.signer.pub_key() @@ -816,6 +876,7 @@ lazy_static! { bloxroute_rproxy_regions: Vec::new(), ask_for_filtering_validators: None, can_ignore_gas_limit: None, + optimistic_v3: false, }, ); map.insert( @@ -841,6 +902,7 @@ lazy_static! { bloxroute_rproxy_regions: Vec::new(), ask_for_filtering_validators: None, can_ignore_gas_limit: None, + optimistic_v3: false, }, ); map.insert( @@ -866,6 +928,7 @@ lazy_static! { bloxroute_rproxy_regions: Vec::new(), ask_for_filtering_validators: None, can_ignore_gas_limit: None, + optimistic_v3: false, }, ); map.insert( @@ -890,6 +953,7 @@ lazy_static! { bloxroute_rproxy_regions: Vec::new(), ask_for_filtering_validators: None, can_ignore_gas_limit: None, + optimistic_v3: false }, ); map.insert( @@ -915,6 +979,7 @@ lazy_static! { bloxroute_rproxy_regions: Vec::new(), ask_for_filtering_validators: None, can_ignore_gas_limit: None, + optimistic_v3: false }, ); map diff --git a/crates/rbuilder/src/mev_boost/mod.rs b/crates/rbuilder/src/mev_boost/mod.rs index 2e95dfa43..a7cd5d867 100644 --- a/crates/rbuilder/src/mev_boost/mod.rs +++ b/crates/rbuilder/src/mev_boost/mod.rs @@ -1,24 +1,14 @@ -pub mod bloxroute_grpc; -mod error; -pub mod fake_mev_boost_relay; -pub mod rpc; -pub mod sign_payload; -pub mod ssz_roots; - -use crate::mev_boost::bloxroute_grpc::GrpcRelayClient; -use rbuilder_primitives::mev_boost::{ - KnownRelay, MevBoostRelayID, RelayMode, SubmitBlockRequestNoBlobs, - SubmitBlockRequestWithMetadata, ValidatorRegistration, ValidatorSlotData, - MEV_BOOST_SLOT_INFO_REQUEST_TIMEOUT, -}; - use super::utils::u256decimal_serde_helper; - use alloy_primitives::{utils::parse_ether, Address, BlockHash, U256}; use alloy_rpc_types_beacon::BlsPublicKey; use flate2::{write::GzEncoder, Compression}; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use itertools::Itertools; +use rbuilder_primitives::mev_boost::{ + HeaderSubmissionOptimisticV3, KnownRelay, MevBoostRelayID, RelayMode, + SubmitBlockRequestNoBlobs, SubmitBlockRequestWithMetadata, ValidatorRegistration, + ValidatorSlotData, MEV_BOOST_SLOT_INFO_REQUEST_TIMEOUT, +}; use reqwest::{ header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE}, Body, Response, StatusCode, @@ -27,8 +17,17 @@ use serde::{Deserialize, Deserializer, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use ssz::Encode; use std::{io::Write, sync::Arc, time::Duration}; +use tracing::*; use url::Url; +pub mod bloxroute_grpc; +use bloxroute_grpc::GrpcRelayClient; + +mod error; +pub mod fake_mev_boost_relay; +pub mod optimistic_v3; +pub mod rpc; +pub mod sign_payload; pub use error::*; pub use sign_payload::*; @@ -107,6 +106,9 @@ pub struct RelayConfig { /// If we submit a block with a different gas than the one the validator registered with in this relay the relay does not mind. /// None -> false pub can_ignore_gas_limit: Option, + /// Flag indicating whether optimistic V3 submissions should be used. + #[serde(default)] + pub optimistic_v3: bool, } #[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)] @@ -233,13 +235,15 @@ pub struct MevBoostRelayBidSubmitter { /// Relay accepts optimistic submissions. optimistic: bool, submission_rate_limiter: Option>, - /// This is not a real relay so we can send blocks to it even if it does not have any validator registered. - test_relay: bool, /// Parameter for the relay cancellations: bool, + /// Flag indicating whether optimistic v3 submissions should be used. + optimistic_v3: bool, /// Max bid we can submit to this relay. Any bid above this will be skipped. /// None -> No limit. max_bid: Option, + /// This is not a real relay so we can send blocks to it even if it does not have any validator registered. + test_relay: bool, } impl MevBoostRelayBidSubmitter { @@ -247,6 +251,7 @@ impl MevBoostRelayBidSubmitter { client: RelayClient, id: String, config: &RelaySubmitConfig, + optimistic_v3: bool, test_relay: bool, ) -> eyre::Result { let max_bid = config @@ -267,24 +272,29 @@ impl MevBoostRelayBidSubmitter { use_gzip_for_submit: config.use_gzip_for_submit, optimistic: config.optimistic, submission_rate_limiter, - test_relay, cancellations: true, + optimistic_v3, max_bid, + test_relay, }) } - pub fn test_relay(&self) -> bool { - self.test_relay - } - pub fn id(&self) -> &MevBoostRelayID { &self.id } + pub fn test_relay(&self) -> bool { + self.test_relay + } + pub fn optimistic(&self) -> bool { self.optimistic } + pub fn optimistic_v3(&self) -> bool { + self.optimistic_v3 + } + pub fn max_bid(&self) -> Option { self.max_bid } @@ -300,13 +310,13 @@ impl MevBoostRelayBidSubmitter { pub async fn submit_block( &self, - data: &SubmitBlockRequestWithMetadata, - registration: &ValidatorSlotData, + data: SubmitBlockRequestWithMetadata, + registration: ValidatorSlotData, ) -> Result<(), SubmitBlockErr> { self.client .submit_block( - data, - registration, + &data, + ®istration, self.use_ssz_for_submit, self.use_gzip_for_submit, self.test_relay, @@ -314,6 +324,14 @@ impl MevBoostRelayBidSubmitter { ) .await } + + pub async fn submit_optimistic_v3( + &self, + data: HeaderSubmissionOptimisticV3, + registration: ValidatorSlotData, + ) -> Result<(), SubmitBlockErr> { + self.client.submit_optimistic_v3(&data, ®istration).await + } } /// Wrapper over RelayClient that allows to ask for slot validators info. @@ -496,7 +514,7 @@ pub enum SubmitBlockErr { #[error("Block known")] BlockKnown, #[error("gRPC error")] - Grpc(#[from] tonic::Status), + Grpc(#[from] Box), } impl std::fmt::Debug for SubmitBlockErr { @@ -658,54 +676,33 @@ impl RelayClient { fake_relay: bool, cancellations: bool, ) -> Result { - let mut bloxroute_region = None; - let url = { - let maybe_regional_endpoint = self.bloxroute_rproxy_regions.iter().find_map(|region| { - registration - .regional_endpoints - .iter() - .find(|r| r.region.ends_with(region.as_str())) - }); - let mut url = if let Some(regional) = maybe_regional_endpoint { - Url::parse(®ional.http_endpoint).map_err(|error| { - tracing::error!(?error, url = %regional.http_endpoint, "Error parsing rproxy URL"); - SubmitBlockErr::InvalidUrl(error) - })? - } else { - if self.is_bloxroute { - // It's a bloxroute endpoint and we are not using rProxy, restrict to main relay region. - bloxroute_region = Some(HeaderValue::from_static("na")); - } - self.url.clone() - }; - - url.set_path("/relay/v1/builder/blocks"); - url.query_pairs_mut() - .append_pair("cancellations", if cancellations { "1" } else { "0" }); + let SubmitBlockRequestWithMetadata { + submission, + metadata, + } = submission_with_metadata; - if submission_with_metadata.submission.has_adjustment_data() { - url.query_pairs_mut().append_pair("adjustments", "1"); - } + let mut headers = HeaderMap::new(); + self.add_auth_headers(&mut headers) + .map_err(|_| SubmitBlockErr::InvalidHeader)?; - url - }; + let mut url = self.get_base_submit_block_url(registration, &mut headers)?; + url.set_path("/relay/v1/builder/blocks"); + url.query_pairs_mut() + .append_pair("cancellations", if cancellations { "1" } else { "0" }); + if submission.has_adjustment_data() { + url.query_pairs_mut().append_pair("adjustments", "1"); + } let mut builder = self.client.post(url.clone()); - let mut headers = HeaderMap::new(); // SSZ vs JSON let (mut body_data, content_type) = if ssz { - ( - submission_with_metadata.submission.as_ssz_bytes(), - SSZ_CONTENT_TYPE, - ) + (submission.as_ssz_bytes(), SSZ_CONTENT_TYPE) } else { let json_result = if fake_relay { // For the fake relay we remove the blobs - serde_json::to_vec(&SubmitBlockRequestNoBlobs( - &submission_with_metadata.submission, - )) + serde_json::to_vec(&SubmitBlockRequestNoBlobs(submission)) } else { - serde_json::to_vec(&submission_with_metadata.submission) + serde_json::to_vec(submission) }; ( @@ -714,8 +711,6 @@ impl RelayClient { ) }; headers.insert(CONTENT_TYPE, HeaderValue::from_static(content_type)); - self.add_auth_headers(&mut headers) - .map_err(|_| SubmitBlockErr::InvalidHeader)?; // GZIP if gzip { @@ -734,13 +729,9 @@ impl RelayClient { // Set bloxroute specific headers. if self.is_bloxroute { - if let Some(region) = bloxroute_region { - headers.insert(BLOXROUTE_SHARE_HEADER, region); - } headers.insert( BLOXROUTE_BUILDER_VALUE_HEADER, - submission_with_metadata - .metadata + metadata .value .coinbase_reward .to_string() @@ -753,24 +744,17 @@ impl RelayClient { if fake_relay { builder = builder.header( TOTAL_PAYMENT_HEADER, - submission_with_metadata - .metadata - .value - .coinbase_reward - .to_string(), + metadata.value.coinbase_reward.to_string(), ); - if let Some(top_competitor_bid) = - submission_with_metadata.metadata.value.top_competitor_bid - { + if let Some(top_competitor_bid) = metadata.value.top_competitor_bid { builder = builder.header(TOP_BID_HEADER, top_competitor_bid.to_string()); } - if !submission_with_metadata.metadata.order_ids.is_empty() { + if !metadata.order_ids.is_empty() { const MAX_BUNDLE_IDS: usize = 150; - let bundle_ids: Vec<_> = submission_with_metadata - .metadata + let bundle_ids: Vec<_> = metadata .order_ids .iter() - .filter_map(|or| match or { + .filter_map(|order| match order { rbuilder_primitives::OrderId::Tx(_fixed_bytes) => None, rbuilder_primitives::OrderId::Bundle(uuid) => Some(uuid), rbuilder_primitives::OrderId::ShareBundle(_fixed_bytes) => None, @@ -807,10 +791,10 @@ impl RelayClient { async fn call_bloxroute_grpc_submit_block( &self, client: &GrpcRelayClient, - submission_with_metadata: &SubmitBlockRequestWithMetadata, + submission: &SubmitBlockRequestWithMetadata, ) -> Result { let mut request = tonic::Request::new(bloxroute_grpc::types::SubmitBlockRequest::from( - &submission_with_metadata.submission, + submission.submission.as_ref(), )); request.set_timeout(Duration::from_secs(2)); request.metadata_mut().insert( @@ -823,7 +807,7 @@ impl RelayClient { ); request.metadata_mut().insert( BLOXROUTE_BUILDER_VALUE_HEADER, - submission_with_metadata + submission .metadata .value .coinbase_reward @@ -848,7 +832,12 @@ impl RelayClient { .map_err(|_| SubmitBlockErr::InvalidHeader)?, ); - let response = client.lock().await.submit_block(request).await?; + let response = client + .lock() + .await + .submit_block(request) + .await + .map_err(Box::new)?; Ok(response.into_inner()) } @@ -891,43 +880,63 @@ impl RelayClient { .call_relay_submit_block(data, registration, ssz, gzip, fake_relay, cancellations) .await?; - let status = response.status(); - if status == StatusCode::TOO_MANY_REQUESTS { - return Err(RelayError::TooManyRequests.into()); - } - if status == StatusCode::GATEWAY_TIMEOUT { - return Err(RelayError::ConnectionError.into()); - } + map_response(response).await + } - let data = response - .bytes() + pub async fn submit_optimistic_v3( + &self, + request: &HeaderSubmissionOptimisticV3, + registration: &ValidatorSlotData, + ) -> Result<(), SubmitBlockErr> { + let mut headers = HeaderMap::new(); + self.add_auth_headers(&mut headers) + .map_err(|_| SubmitBlockErr::InvalidHeader)?; + + let mut url = self.get_base_submit_block_url(registration, &mut headers)?; + url.set_path("/relay/v3/builder/headers"); + + let body = request.as_ssz_bytes(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static(SSZ_CONTENT_TYPE)); + + let response = self + .client + .post(url.clone()) + .headers(headers) + .body(body) + .send() .await - .map_err(|err| RelayError::RequestError(err.into()))?; + .map_err(|e| RelayError::RequestError(e.into()))?; - if status == StatusCode::OK && data.is_empty() { - return Ok(()); - } + map_response(response).await + } - match serde_json::from_slice::>(&data) { - Ok(RelayResponse::Ok(_)) => Ok(()), - Ok(RelayResponse::Error(error)) => { - Err(map_relay_error_message(&error.message, error.code)) - } - Err(_) => { - // bloxroute returns empty response in this format which we handle here because its not valid - // jsonrpc response - let data = String::from_utf8_lossy(&data).to_string(); - if data.trim() == "{}" { - return Ok(()); - } + /// Constructs the URL for block submission. + /// For the bloxroute relay, if there is no rproxy endpoint to submit to, + /// sets the sharing header restricting bids to the location of the main relay. + fn get_base_submit_block_url( + &self, + registration: &ValidatorSlotData, + headers: &mut HeaderMap, + ) -> Result { + let maybe_regional_endpoint = self.bloxroute_rproxy_regions.iter().find_map(|region| { + registration + .regional_endpoints + .iter() + .find(|r| r.region.ends_with(region.as_str())) + }); - if is_ignorable_relay_error(status, &data) { - Ok(()) - } else { - Err(RelayError::UnknownRelayError(status, data).into()) - } - } + if let Some(regional) = maybe_regional_endpoint { + return Url::parse(®ional.http_endpoint).map_err(|error| { + error!(?error, url = %regional.http_endpoint, "Error parsing rproxy URL"); + SubmitBlockErr::InvalidUrl(error) + }); } + + if self.is_bloxroute { + // It's a bloxroute endpoint and we are not using rProxy, restrict to main relay region. + headers.insert(BLOXROUTE_SHARE_HEADER, HeaderValue::from_static("na")); + } + Ok(self.url.clone()) } fn add_auth_headers(&self, headers: &mut HeaderMap) -> eyre::Result<()> { @@ -950,6 +959,44 @@ impl RelayClient { } } +async fn map_response(response: Response) -> Result<(), SubmitBlockErr> { + let status = response.status(); + if status == StatusCode::TOO_MANY_REQUESTS { + return Err(RelayError::TooManyRequests.into()); + } + if status == StatusCode::GATEWAY_TIMEOUT { + return Err(RelayError::ConnectionError.into()); + } + + let bytes = response + .bytes() + .await + .map_err(|err| RelayError::RequestError(err.into()))?; + + if status == StatusCode::OK && bytes.is_empty() { + return Ok(()); + } + + match serde_json::from_slice::>(&bytes) { + Ok(RelayResponse::Ok(_)) => Ok(()), + Ok(RelayResponse::Error(error)) => Err(map_relay_error_message(&error.message, error.code)), + Err(_) => { + // bloxroute returns empty response in this format which we handle here because its not valid + // jsonrpc response + let data = String::from_utf8_lossy(&bytes).to_string(); + if data.trim() == "{}" { + return Ok(()); + } + + if is_ignorable_relay_error(status, &data) { + Ok(()) + } else { + Err(RelayError::UnknownRelayError(status, data).into()) + } + } + } +} + fn map_relay_error_message(msg: &str, code: Option) -> SubmitBlockErr { match msg { "payload attributes not (yet) known" => SubmitBlockErr::PayloadAttributesNotKnown, @@ -1202,7 +1249,9 @@ mod tests { let relay_url = Url::from_str(&srv.endpoint()).unwrap(); let relay = RelayClient::from_url(relay_url, None, None, None, false, Vec::new(), false, false); - let submission = SubmitBlockRequest::Deneb(generator.create_deneb_submit_block_request()); + let submission = Arc::new(SubmitBlockRequest::Deneb( + generator.create_deneb_submit_block_request(), + )); let sub_relay = SubmitBlockRequestWithMetadata { submission, metadata: BidMetadata { diff --git a/crates/rbuilder/src/mev_boost/optimistic_v3.rs b/crates/rbuilder/src/mev_boost/optimistic_v3.rs new file mode 100644 index 000000000..d9e8a71bb --- /dev/null +++ b/crates/rbuilder/src/mev_boost/optimistic_v3.rs @@ -0,0 +1,179 @@ +use crate::telemetry::REGISTRY; +use alloy_primitives::{bytes::Bytes, B256}; +use alloy_rpc_types_beacon::BlsPublicKey; +use ctor::ctor; +use futures::StreamExt as _; +use lazy_static::lazy_static; +use metrics_macros::register_metrics; +use parking_lot::Mutex; +use prometheus::IntCounter; +use rbuilder_primitives::mev_boost::{ + verify_signed_relay_request, SignedGetPayloadV3, SubmitBlockRequest, +}; +use schnellru::{ByLength, LruMap}; +use ssz::{Decode as _, Encode}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; +use tracing::*; +use warp::{ + http::{header::CONTENT_TYPE, HeaderValue, StatusCode}, + Filter, +}; + +register_metrics! { + pub static REQUESTS_TOTAL: IntCounter = IntCounter::new("relay_server_requests", "The total number of requests on the optimistic V3 relay server").unwrap(); + pub static BAD_REQUESTS_TOTAL: IntCounter = IntCounter::new("relay_server_bad_requests", "The total number of bad requests on the optimistic V3 relay server").unwrap(); + pub static UNKNOWN_PUBKEY_TOTAL: IntCounter = IntCounter::new("relay_server_unknown_pubkey", "The total number of unknown pubkey errors on the optimistic V3 relay server").unwrap(); + pub static INVALID_SIGNATURE_TOTAL: IntCounter = IntCounter::new("relay_server_invalid_signature", "The total number of invalid signature errors on the optimistic V3 relay server").unwrap(); + pub static BLOCK_NOT_FOUND_TOTAL: IntCounter = IntCounter::new("relay_server_block_not_found", "The total number of block not found errors on the optimistic V3 relay server").unwrap(); +} + +/// The channel buffer size for optimistic V3 broadcast channel +pub const OPTIMISTIC_V3_CHANNEL_SIZE: usize = 100; + +/// The default number of blocks to keep in cache. With bid update latency of 1ms this would preserve the last second worth of submitted blocks. +pub const OPTIMISTIC_V3_CACHE_SIZE_DEFAULT: u32 = 1_000; + +/// The content length limit for the incoming relay requests. The actual raw data should fit in roughly +/// 96 (signature) + 32 (block hash) + 8 (timestamp) + 48 (pubkey) bytes plus the encoding overhead. +pub const OPTIMISTIC_V3_SERVER_CONTENT_LENGTH_LIMIT: u64 = 1_024; + +/// Endpoint for returning for block payloads to the relays. +/// Reference: +pub const GET_PAYLOAD_V3: &str = "get_payload_v3"; + +/// Initialize the HTTP server. +pub fn spawn_server( + address: impl Into, + domain: B256, + relay_pubkeys: HashSet, + bid_stream: BroadcastStream>, +) -> eyre::Result<()> { + let blocks = Arc::new(Mutex::new(LruMap::new(ByLength::new( + OPTIMISTIC_V3_CACHE_SIZE_DEFAULT, + )))); + + // Spawn block cache maintenance task. + tokio::spawn(Box::pin({ + let blocks = blocks.clone(); + async move { maintain_block_cache(bid_stream, blocks).await } + })); + + // Spawn relay server. + let handler = Handler { + domain, + relay_pubkeys, + blocks, + }; + let address = address.into(); + let path = warp::path(GET_PAYLOAD_V3) + .and(warp::post()) + .and(warp::any().map(move || handler.clone())) + .and(warp::header::("content-type")) + .and(warp::body::content_length_limit( + OPTIMISTIC_V3_SERVER_CONTENT_LENGTH_LIMIT, + )) + .and(warp::body::bytes()) + .map(Handler::get_payload_v3); + tokio::spawn(warp::serve(path).run(address)); + info!(target: "relay_server", %address, "Relay server listening"); + + Ok(()) +} + +#[derive(Clone, Debug)] +struct Handler { + domain: B256, + relay_pubkeys: HashSet, + blocks: Arc>>>, +} + +impl Handler { + fn get_payload_v3( + self, + content_type: String, + bytes: Bytes, + ) -> Result { + REQUESTS_TOTAL.inc(); + + let mut is_json = false; + let request: SignedGetPayloadV3 = if content_type == "application/json" { + is_json = true; + serde_json::from_slice(&bytes).map_err(|error| { + error!(target: "relay_server", ?error, "error parsing json request"); + BAD_REQUESTS_TOTAL.inc(); + StatusCode::BAD_REQUEST + })? + } else if content_type == "application/octet-stream" { + SignedGetPayloadV3::from_ssz_bytes(&bytes).map_err(|error| { + error!(target: "relay_server", ?error, "error parsing ssz request"); + BAD_REQUESTS_TOTAL.inc(); + StatusCode::BAD_REQUEST + })? + } else { + error!(target: "relay_server", %content_type, "invalid content type"); + BAD_REQUESTS_TOTAL.inc(); + return Err(StatusCode::BAD_REQUEST); + }; + + let relay_pubkey = request.message.relay_public_key; + if !self.relay_pubkeys.contains(&relay_pubkey) { + UNKNOWN_PUBKEY_TOTAL.inc(); + debug!(target: "relay_server", %relay_pubkey, "unknown relay pubkey"); + return Err(StatusCode::UNAUTHORIZED); + } + + if let Err(error) = verify_signed_relay_request(&request, self.domain) { + INVALID_SIGNATURE_TOTAL.inc(); + debug!(target: "relay_server", %relay_pubkey, ?error, "error verifying request signature"); + return Err(StatusCode::UNAUTHORIZED); + } + + let block_hash = request.message.block_hash; + let block = { + let mut blocks = self.blocks.lock(); + blocks.get(&block_hash).cloned().ok_or_else(|| { + debug!(target: "relay_server", %relay_pubkey, %block_hash, "block not found"); + BLOCK_NOT_FOUND_TOTAL.inc(); + StatusCode::NOT_FOUND + })? + }; + + let (body, content_ty) = if is_json { + let json = serde_json::to_vec(&block).map_err(|error| { + error!(target: "relay_server", %relay_pubkey, %block_hash, ?error, "error serializing the block"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + (json, "application/json") + } else { + let ssz = block.as_ssz_bytes(); + (ssz, "application/octet-stream") + }; + + let mut res = warp::http::Response::new(body.into()); + res.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static(content_ty)); + Ok(res) + } +} + +async fn maintain_block_cache( + mut bid_stream: BroadcastStream>, + blocks: Arc>>>, +) { + loop { + match bid_stream.next().await { + Some(Ok(block)) => { + let block_hash = block.bid_trace().block_hash; + blocks.lock().insert(block_hash, block); + trace!(target: "relay_server", %block_hash, "Block added to the relay server cache") + } + Some(Err(BroadcastStreamRecvError::Lagged(lag))) => { + error!(target: "relay_server", lag, "Block stream lagging behind"); + } + None => { + error!(target: "relay_server", "Block stream closed"); + } + } + } +} diff --git a/crates/rbuilder/src/mev_boost/sign_payload.rs b/crates/rbuilder/src/mev_boost/sign_payload.rs index de1c82e06..fcdff6346 100644 --- a/crates/rbuilder/src/mev_boost/sign_payload.rs +++ b/crates/rbuilder/src/mev_boost/sign_payload.rs @@ -33,7 +33,6 @@ impl BLSBlockSigner { pub fn sign_payload(&self, bid_trace: &BidTrace) -> eyre::Result> { // We use RPCBidTrace not because of it's RPC nature but because it's also Merkleized let bid_trace = marshal_bid_trace(bid_trace); - let signature = sign_with_domain(&bid_trace, &self.sec, *self.domain)?; Ok(signature.to_vec()) } diff --git a/crates/rbuilder/src/utils/receipts.rs b/crates/rbuilder/src/utils/receipts.rs index 5704f57dd..6ec58b3bb 100644 --- a/crates/rbuilder/src/utils/receipts.rs +++ b/crates/rbuilder/src/utils/receipts.rs @@ -26,6 +26,8 @@ pub struct ReceiptsData { pub receipts_root: B256, /// Logs bloom for the block. pub logs_bloom: Bloom, + /// Logs bloom for the block before the last payment transaction. + pub pre_payment_logs_bloom: Bloom, /// Merkle proof of the last receipt. /// Used for bid adjustments. pub placeholder_receipt_proof: Vec, @@ -67,6 +69,7 @@ pub fn calculate_receipts_data( cache.receipts_bloom_without_last_tx = receipts_blooms.clone(); } + let pre_payment_logs_bloom = block_logs_bloom; { let executed_tx_info = executed_tx_infos.last().unwrap(); let receipt = &executed_tx_info.receipt; @@ -107,6 +110,7 @@ pub fn calculate_receipts_data( ReceiptsData { logs_bloom: block_logs_bloom, + pre_payment_logs_bloom, receipts_root, placeholder_receipt_proof, }