diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 07f1a008..1e15e7f6 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -30,7 +30,7 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_forks::OpHardforks; use reth_optimism_node::{OpBuiltPayload, OpPayloadBuilderAttributes}; use reth_optimism_primitives::{OpPrimitives, OpReceipt, OpTransactionSigned}; -use reth_payload_primitives::BuiltPayloadExecutedBlock; +use reth_payload_primitives::{BuiltPayload, BuiltPayloadExecutedBlock}; use reth_payload_util::BestPayloadTransactions; use reth_primitives_traits::RecoveredBlock; use reth_provider::{ @@ -38,7 +38,9 @@ use reth_provider::{ StorageRootProvider, }; use reth_revm::{ - State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, + State, + database::StateProviderDatabase, + db::{BundleState, states::bundle_state::BundleRetention}, }; use reth_transaction_pool::TransactionPool; use reth_trie::{HashedPostState, updates::TrieUpdates}; @@ -145,9 +147,12 @@ pub(super) struct OpPayloadBuilder { pub pool: Pool, /// Node client pub client: Client, - /// Sender for sending built payloads to [`PayloadHandler`], - /// which broadcasts outgoing payloads via p2p. - pub payload_tx: mpsc::Sender, + /// Sender for sending built flashblock payloads to [`PayloadHandler`], + /// which broadcasts outgoing flashblock payloads via p2p. + pub built_fb_payload_tx: mpsc::Sender, + /// Sender for sending built full block payloads to [`PayloadHandler`], + /// which updates the engine tree state. + pub built_payload_tx: mpsc::Sender, /// WebSocket publisher for broadcasting flashblocks /// to all connected subscribers. pub ws_pub: Arc, @@ -170,7 +175,8 @@ impl OpPayloadBuilder { client: Client, config: BuilderConfig, builder_tx: BuilderTx, - payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, + built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, ) -> Self { @@ -179,7 +185,8 @@ impl OpPayloadBuilder { evm_config, pool, client, - payload_tx, + built_fb_payload_tx, + built_payload_tx, ws_pub, config, metrics, @@ -297,7 +304,7 @@ where async fn build_payload( &self, args: BuildArguments, OpBuiltPayload>, - best_payload: BlockCell, + resolve_payload: BlockCell, ) -> Result<(), PayloadBuilderError> { let block_build_start_time = Instant::now(); let BuildArguments { @@ -366,18 +373,14 @@ where ); }; - let (payload, fb_payload) = build_block( - &mut state, - &ctx, - &mut info, - !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync - )?; - - self.payload_tx - .send(payload.clone()) + // We should always calculate state root for fallback payload + let (fallback_payload, fb_payload, bundle_state) = + build_block(&mut state, &ctx, &mut info, true)?; + self.built_fb_payload_tx + .send(fallback_payload.clone()) .await .map_err(PayloadBuilderError::other)?; - best_payload.set(payload); + let mut best_payload = (fallback_payload.clone(), bundle_state); info!( target: "payload_builder", @@ -529,6 +532,14 @@ where let _entered = fb_span.enter(); if ctx.flashblock_index() > ctx.target_flashblock_count() { + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -548,13 +559,21 @@ where &state_provider, &mut best_txs, &block_cancel, - &best_payload, + &mut best_payload, &fb_span, ) .await { Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, Ok(None) => { + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -581,6 +600,14 @@ where ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx); }, _ = block_cancel.cancelled() => { + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -606,7 +633,7 @@ where state_provider: impl reth::providers::StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, - best_payload: &BlockCell, + best_payload: &mut (OpBuiltPayload, BundleState), span: &tracing::Span, ) -> eyre::Result> { let flashblock_index = ctx.flashblock_index(); @@ -746,7 +773,7 @@ where ctx.metrics.invalid_built_blocks_count.increment(1); Err(err).wrap_err("failed to build payload") } - Ok((new_payload, mut fb_payload)) => { + Ok((new_payload, mut fb_payload, bundle_state)) => { fb_payload.index = flashblock_index; fb_payload.base = None; @@ -766,11 +793,11 @@ where .ws_pub .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; - self.payload_tx + self.built_fb_payload_tx .send(new_payload.clone()) .await .wrap_err("failed to send built payload to handler")?; - best_payload.set(new_payload); + *best_payload = (new_payload, bundle_state); // Record flashblock build duration ctx.metrics @@ -824,6 +851,93 @@ where } } + async fn resolve_best_payload< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + state: &mut State, + ctx: &OpPayloadBuilderCtx, + best_payload: (OpBuiltPayload, BundleState), + fallback_payload: OpBuiltPayload, + resolve_payload: &BlockCell, + ) { + if resolve_payload.get().is_some() { + return; + } + + let payload = match best_payload.0.block().header().state_root { + B256::ZERO => { + info!(target: "payload_builder", "Resolving payload with zero state root"); + self.resolve_zero_state_root(state, ctx, best_payload) + .await + .unwrap_or_else(|err| { + warn!( + target: "payload_builder", + error = %err, + "Failed to calculate state root, falling back to fallback payload" + ); + fallback_payload + }) + } + _ => best_payload.0, + }; + resolve_payload.set(payload); + } + + async fn resolve_zero_state_root< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + state: &mut State, + ctx: &OpPayloadBuilderCtx, + best_payload: (OpBuiltPayload, BundleState), + ) -> Result { + let (state_root, trie_updates, hashed_state) = + calculate_state_root_on_resolve(state, ctx, best_payload.1)?; + + let payload_id = best_payload.0.id(); + let fees = best_payload.0.fees(); + let executed_block = best_payload.0.executed_block().ok_or_else(|| { + PayloadBuilderError::Other( + eyre::eyre!("No executed block available in best payload for payload resolution") + .into(), + ) + })?; + let block = best_payload.0.into_sealed_block().into_block(); + let (mut header, body) = block.split(); + header.state_root = state_root; + let updated_block = alloy_consensus::Block::::new(header, body); + let recovered_block = RecoveredBlock::new_unhashed( + updated_block.clone(), + executed_block.recovered_block().senders().to_vec(), + ); + let sealed_block = Arc::new(updated_block.seal_slow()); + + let executed = BuiltPayloadExecutedBlock { + recovered_block: Arc::new(recovered_block), + execution_output: executed_block.execution_output.clone(), + hashed_state: Either::Left(Arc::new(hashed_state)), + trie_updates: Either::Left(Arc::new(trie_updates)), + }; + let updated_payload = OpBuiltPayload::new(payload_id, sealed_block, fees, Some(executed)); + if let Err(e) = self.built_payload_tx.send(updated_payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } + debug!( + target: "payload_builder", + state_root = %state_root, + "Updated payload with calculated state root" + ); + + Ok(updated_payload) + } + /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, @@ -972,7 +1086,7 @@ pub(super) fn build_block( ctx: &OpPayloadBuilderCtx, info: &mut ExecutionInfo, calculate_state_root: bool, -) -> Result<(OpBuiltPayload, FlashblocksPayloadV1), PayloadBuilderError> +) -> Result<(OpBuiltPayload, FlashblocksPayloadV1, BundleState), PayloadBuilderError> where DB: Database + AsRef

, P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, @@ -1190,7 +1304,7 @@ where }; // We clean bundle and place initial state transaction back - state.take_bundle(); + let bundle_state = state.take_bundle(); state.transition_state = untouched_transition_state; Ok(( @@ -1201,5 +1315,43 @@ where Some(executed), ), fb_payload, + bundle_state, )) } + +/// Calculates only the state root for an existing payload +fn calculate_state_root_on_resolve( + state: &mut State, + ctx: &OpPayloadBuilderCtx, + bundle_state: BundleState, +) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> +where + DB: Database + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + ExtraCtx: std::fmt::Debug + Default, +{ + let state_root_start_time = Instant::now(); + let state_provider = state.database.as_ref(); + let hashed_state = state_provider.hashed_post_state(&bundle_state); + let state_root_updates = state + .database + .as_ref() + .state_root_with_updates(hashed_state.clone()) + .inspect_err(|err| { + warn!(target: "payload_builder", + parent_header=%ctx.parent().hash(), + %err, + "failed to calculate state root for payload" + ); + })?; + + let state_root_calculation_time = state_root_start_time.elapsed(); + ctx.metrics + .state_root_calculation_duration + .record(state_root_calculation_time); + ctx.metrics + .state_root_calculation_gauge + .set(state_root_calculation_time); + + Ok((state_root_updates.0, state_root_updates.1, hashed_state)) +} diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 96b6f683..ed127778 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -29,8 +29,10 @@ use tracing::warn; /// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. pub(crate) struct PayloadHandler { - // receives new payloads built by this builder. - built_rx: mpsc::Receiver, + // receives new flashblock payloads built by this builder. + built_fb_payload_rx: mpsc::Receiver, + // receives new full block payloads built by this builder. + built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. p2p_rx: mpsc::Receiver, // outgoing p2p channel to broadcast new payloads to peers. @@ -50,7 +52,8 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - built_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, + built_payload_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, @@ -59,7 +62,8 @@ where cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { - built_rx, + built_fb_payload_rx, + built_payload_rx, p2p_rx, p2p_tx, payload_events_handle, @@ -71,7 +75,8 @@ where pub(crate) async fn run(self) { let Self { - mut built_rx, + mut built_fb_payload_rx, + mut built_payload_rx, mut p2p_rx, p2p_tx, payload_events_handle, @@ -84,12 +89,15 @@ where loop { tokio::select! { - Some(payload) = built_rx.recv() => { + Some(payload) = built_fb_payload_rx.recv() => { + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(payload.into()).await; + } + Some(payload) = built_payload_rx.recv() => { + // Update engine tree state with locally built block payloads if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(e = ?e, "failed to send BuiltPayload event"); } - // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message { @@ -241,7 +249,7 @@ where cancel, ); - let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( + let (built_payload, fb_payload, _) = crate::builders::flashblocks::payload::build_block( &mut state, &builder_ctx, &mut info, diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 2c1e684b..568a10b0 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -106,6 +106,9 @@ impl FlashblocksServiceBuilder { }; let metrics = Arc::new(OpRBuilderMetrics::default()); + // Channels for built flashblock payloads + let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16); + // Channels for built full block payloads let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); let ws_pub: Arc = @@ -118,6 +121,7 @@ impl FlashblocksServiceBuilder { ctx.provider().clone(), self.0.clone(), builder_tx, + built_fb_payload_tx, built_payload_tx, ws_pub.clone(), metrics.clone(), @@ -145,6 +149,7 @@ impl FlashblocksServiceBuilder { .wrap_err("failed to create flashblocks payload builder context")?; let payload_handler = PayloadHandler::new( + built_fb_payload_rx, built_payload_rx, incoming_message_rx, outgoing_message_tx,