Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 177 additions & 25 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
use reth_optimism_forks::OpHardforks;
use reth_optimism_node::{OpBuiltPayload, OpPayloadBuilderAttributes};
use reth_optimism_primitives::{OpPrimitives, OpReceipt, OpTransactionSigned};
use reth_payload_primitives::BuiltPayloadExecutedBlock;
use reth_payload_primitives::{BuiltPayload, BuiltPayloadExecutedBlock};
use reth_payload_util::BestPayloadTransactions;
use reth_primitives_traits::RecoveredBlock;
use reth_provider::{
ExecutionOutcome, HashedPostStateProvider, ProviderError, StateRootProvider,
StorageRootProvider,
};
use reth_revm::{
State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention,
State,
database::StateProviderDatabase,
db::{BundleState, states::bundle_state::BundleRetention},
};
use reth_transaction_pool::TransactionPool;
use reth_trie::{HashedPostState, updates::TrieUpdates};
Expand Down Expand Up @@ -145,9 +147,12 @@ pub(super) struct OpPayloadBuilder<Pool, Client, BuilderTx> {
pub pool: Pool,
/// Node client
pub client: Client,
/// Sender for sending built payloads to [`PayloadHandler`],
/// which broadcasts outgoing payloads via p2p.
pub payload_tx: mpsc::Sender<OpBuiltPayload>,
/// Sender for sending built flashblock payloads to [`PayloadHandler`],
/// which broadcasts outgoing flashblock payloads via p2p.
pub built_fb_payload_tx: mpsc::Sender<OpBuiltPayload>,
/// Sender for sending built full block payloads to [`PayloadHandler`],
/// which updates the engine tree state.
pub built_payload_tx: mpsc::Sender<OpBuiltPayload>,
/// WebSocket publisher for broadcasting flashblocks
/// to all connected subscribers.
pub ws_pub: Arc<WebSocketPublisher>,
Expand All @@ -170,7 +175,8 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
client: Client,
config: BuilderConfig<FlashblocksConfig>,
builder_tx: BuilderTx,
payload_tx: mpsc::Sender<OpBuiltPayload>,
built_fb_payload_tx: mpsc::Sender<OpBuiltPayload>,
built_payload_tx: mpsc::Sender<OpBuiltPayload>,
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
) -> Self {
Expand All @@ -179,7 +185,8 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
evm_config,
pool,
client,
payload_tx,
built_fb_payload_tx,
built_payload_tx,
ws_pub,
config,
metrics,
Expand Down Expand Up @@ -297,7 +304,7 @@ where
async fn build_payload(
&self,
args: BuildArguments<OpPayloadBuilderAttributes<OpTransactionSigned>, OpBuiltPayload>,
best_payload: BlockCell<OpBuiltPayload>,
resolve_payload: BlockCell<OpBuiltPayload>,
) -> Result<(), PayloadBuilderError> {
let block_build_start_time = Instant::now();
let BuildArguments {
Expand Down Expand Up @@ -366,18 +373,14 @@ where
);
};

let (payload, fb_payload) = build_block(
&mut state,
&ctx,
&mut info,
!disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync
)?;

self.payload_tx
.send(payload.clone())
// We should always calculate state root for fallback payload
let (fallback_payload, fb_payload, bundle_state) =
build_block(&mut state, &ctx, &mut info, true)?;
self.built_fb_payload_tx
.send(fallback_payload.clone())
.await
.map_err(PayloadBuilderError::other)?;
best_payload.set(payload);
let mut best_payload = (fallback_payload.clone(), bundle_state);

info!(
target: "payload_builder",
Expand Down Expand Up @@ -529,6 +532,14 @@ where
let _entered = fb_span.enter();

if ctx.flashblock_index() > ctx.target_flashblock_count() {
self.resolve_best_payload(
&mut state,
&ctx,
best_payload,
fallback_payload,
&resolve_payload,
)
.await;
self.record_flashblocks_metrics(
&ctx,
&info,
Expand All @@ -548,13 +559,21 @@ where
&state_provider,
&mut best_txs,
&block_cancel,
&best_payload,
&mut best_payload,
&fb_span,
)
.await
{
Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx,
Ok(None) => {
self.resolve_best_payload(
&mut state,
&ctx,
best_payload,
fallback_payload,
&resolve_payload,
)
.await;
self.record_flashblocks_metrics(
&ctx,
&info,
Expand All @@ -581,6 +600,14 @@ where
ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx);
},
_ = block_cancel.cancelled() => {
self.resolve_best_payload(
&mut state,
&ctx,
best_payload,
fallback_payload,
&resolve_payload,
)
.await;
self.record_flashblocks_metrics(
&ctx,
&info,
Expand All @@ -606,7 +633,7 @@ where
state_provider: impl reth::providers::StateProvider + Clone,
best_txs: &mut NextBestFlashblocksTxs<Pool>,
block_cancel: &CancellationToken,
best_payload: &BlockCell<OpBuiltPayload>,
best_payload: &mut (OpBuiltPayload, BundleState),
span: &tracing::Span,
) -> eyre::Result<Option<FlashblocksExtraCtx>> {
let flashblock_index = ctx.flashblock_index();
Expand Down Expand Up @@ -746,7 +773,7 @@ where
ctx.metrics.invalid_built_blocks_count.increment(1);
Err(err).wrap_err("failed to build payload")
}
Ok((new_payload, mut fb_payload)) => {
Ok((new_payload, mut fb_payload, bundle_state)) => {
fb_payload.index = flashblock_index;
fb_payload.base = None;

Expand All @@ -766,11 +793,11 @@ where
.ws_pub
.publish(&fb_payload)
.wrap_err("failed to publish flashblock via websocket")?;
self.payload_tx
self.built_fb_payload_tx
.send(new_payload.clone())
.await
.wrap_err("failed to send built payload to handler")?;
best_payload.set(new_payload);
*best_payload = (new_payload, bundle_state);

// Record flashblock build duration
ctx.metrics
Expand Down Expand Up @@ -824,6 +851,93 @@ where
}
}

async fn resolve_best_payload<
DB: Database<Error = ProviderError> + std::fmt::Debug + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
>(
&self,
state: &mut State<DB>,
ctx: &OpPayloadBuilderCtx<FlashblocksExtraCtx>,
best_payload: (OpBuiltPayload, BundleState),
fallback_payload: OpBuiltPayload,
resolve_payload: &BlockCell<OpBuiltPayload>,
) {
if resolve_payload.get().is_some() {
return;
}

let payload = match best_payload.0.block().header().state_root {
B256::ZERO => {
info!(target: "payload_builder", "Resolving payload with zero state root");
self.resolve_zero_state_root(state, ctx, best_payload)
.await
.unwrap_or_else(|err| {
warn!(
target: "payload_builder",
error = %err,
"Failed to calculate state root, falling back to fallback payload"
);
fallback_payload
})
}
_ => best_payload.0,
};
resolve_payload.set(payload);
}

async fn resolve_zero_state_root<
DB: Database<Error = ProviderError> + std::fmt::Debug + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
>(
&self,
state: &mut State<DB>,
ctx: &OpPayloadBuilderCtx<FlashblocksExtraCtx>,
best_payload: (OpBuiltPayload, BundleState),
) -> Result<OpBuiltPayload, PayloadBuilderError> {
let (state_root, trie_updates, hashed_state) =
calculate_state_root_on_resolve(state, ctx, best_payload.1)?;

let payload_id = best_payload.0.id();
let fees = best_payload.0.fees();
let executed_block = best_payload.0.executed_block().ok_or_else(|| {
PayloadBuilderError::Other(
eyre::eyre!("No executed block available in best payload for payload resolution")
.into(),
)
})?;
let block = best_payload.0.into_sealed_block().into_block();
let (mut header, body) = block.split();
header.state_root = state_root;
let updated_block = alloy_consensus::Block::<OpTransactionSigned>::new(header, body);
let recovered_block = RecoveredBlock::new_unhashed(
updated_block.clone(),
executed_block.recovered_block().senders().to_vec(),
);
let sealed_block = Arc::new(updated_block.seal_slow());

let executed = BuiltPayloadExecutedBlock {
recovered_block: Arc::new(recovered_block),
execution_output: executed_block.execution_output.clone(),
hashed_state: Either::Left(Arc::new(hashed_state)),
trie_updates: Either::Left(Arc::new(trie_updates)),
};
let updated_payload = OpBuiltPayload::new(payload_id, sealed_block, fees, Some(executed));
if let Err(e) = self.built_payload_tx.send(updated_payload.clone()).await {
warn!(
target: "payload_builder",
error = %e,
"Failed to send updated payload"
);
}
debug!(
target: "payload_builder",
state_root = %state_root,
"Updated payload with calculated state root"
);

Ok(updated_payload)
}

/// Do some logging and metric recording when we stop build flashblocks
fn record_flashblocks_metrics(
&self,
Expand Down Expand Up @@ -972,7 +1086,7 @@ pub(super) fn build_block<DB, P, ExtraCtx>(
ctx: &OpPayloadBuilderCtx<ExtraCtx>,
info: &mut ExecutionInfo<FlashblocksExecutionInfo>,
calculate_state_root: bool,
) -> Result<(OpBuiltPayload, FlashblocksPayloadV1), PayloadBuilderError>
) -> Result<(OpBuiltPayload, FlashblocksPayloadV1, BundleState), PayloadBuilderError>
where
DB: Database<Error = ProviderError> + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
Expand Down Expand Up @@ -1190,7 +1304,7 @@ where
};

// We clean bundle and place initial state transaction back
state.take_bundle();
let bundle_state = state.take_bundle();
state.transition_state = untouched_transition_state;

Ok((
Expand All @@ -1201,5 +1315,43 @@ where
Some(executed),
),
fb_payload,
bundle_state,
))
}

/// Calculates only the state root for an existing payload
fn calculate_state_root_on_resolve<DB, P, ExtraCtx>(
state: &mut State<DB>,
ctx: &OpPayloadBuilderCtx<ExtraCtx>,
bundle_state: BundleState,
) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError>
where
DB: Database<Error = ProviderError> + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
ExtraCtx: std::fmt::Debug + Default,
{
let state_root_start_time = Instant::now();
let state_provider = state.database.as_ref();
let hashed_state = state_provider.hashed_post_state(&bundle_state);
let state_root_updates = state
.database
.as_ref()
.state_root_with_updates(hashed_state.clone())
.inspect_err(|err| {
warn!(target: "payload_builder",
parent_header=%ctx.parent().hash(),
%err,
"failed to calculate state root for payload"
);
})?;

let state_root_calculation_time = state_root_start_time.elapsed();
ctx.metrics
.state_root_calculation_duration
.record(state_root_calculation_time);
ctx.metrics
.state_root_calculation_gauge
.set(state_root_calculation_time);

Ok((state_root_updates.0, state_root_updates.1, hashed_state))
}
Loading
Loading