diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index b0b90f84..90025b11 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -157,6 +157,14 @@ pub struct FlashblocksArgs { )] pub flashblocks_calculate_state_root: bool, + /// Should we calculate state root for each flashblock + #[arg( + long = "flashblocks.enable-continuous-building", + default_value = "true", + env = "FLASHBLOCKS_ENABLE_CONTINUOUS_BUILDING" + )] + pub flashblocks_enable_continuous_building: bool, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 54894d28..d7757809 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -1,4 +1,6 @@ -use alloy_consensus::{Eip658Value, Transaction, conditional::BlockConditionalAttributes}; +use alloy_consensus::{ + Eip658Value, Transaction, conditional::BlockConditionalAttributes, transaction::TxHashRef, +}; use alloy_eips::Typed2718; use alloy_evm::Database; use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; @@ -326,10 +328,11 @@ impl OpPayloadBuilderCtx { Ok(info) } - /// Executes the given best transactions and updates the execution info. + /// Simulates the given best transactions. + /// The simulation updates the execution info and commit changes to the db /// /// Returns `Ok(Some(())` if the job was cancelled. - pub(super) fn execute_best_transactions( + pub(super) fn simulate_best_transactions( &self, info: &mut ExecutionInfo, db: &mut State, @@ -375,8 +378,7 @@ impl OpPayloadBuilderCtx { // Note that we need to use the Option to signal whether the transaction comes from a bundle, // otherwise, we would exclude all transactions that are not in the reverted hashes. let is_bundle_tx = reverted_hashes.is_some(); - let exclude_reverting_txs = - is_bundle_tx && !reverted_hashes.unwrap().contains(&tx_hash); + let exclude_reverting_txs = is_bundle_tx && !reverted_hashes.unwrap().contains(tx_hash); let log_txn = |result: TxnExecutionResult| { debug!( diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index a3345edb..c2c0aab1 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -34,6 +34,9 @@ pub struct FlashblocksConfig { /// Should we calculate state root for each flashblock pub calculate_state_root: bool, + /// Whether to enable continuous flashblock building or not + pub enable_continuous_building: bool, + /// The address of the flashblocks number contract. /// /// If set a builder tx will be added to the start of every flashblock instead of the regular builder tx. @@ -63,6 +66,7 @@ impl Default for FlashblocksConfig { leeway_time: Duration::from_millis(50), fixed: false, calculate_state_root: true, + enable_continuous_building: true, flashblocks_number_contract_address: None, p2p_enabled: false, p2p_port: 9009, @@ -90,6 +94,8 @@ impl TryFrom for FlashblocksConfig { let calculate_state_root = args.flashblocks.flashblocks_calculate_state_root; + let enable_continuous_building = args.flashblocks.flashblocks_enable_continuous_building; + let flashblocks_number_contract_address = args.flashblocks.flashblocks_number_contract_address; @@ -99,6 +105,7 @@ impl TryFrom for FlashblocksConfig { leeway_time, fixed, calculate_state_root, + enable_continuous_building, flashblocks_number_contract_address, p2p_enabled: args.flashblocks.p2p.p2p_enabled, p2p_port: args.flashblocks.p2p.p2p_port, diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 89f19210..3946c4a3 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -36,9 +36,11 @@ use reth_provider::{ StorageRootProvider, }; use reth_revm::{ - State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, + State, + database::StateProviderDatabase, + db::{CacheState, TransitionState, states::bundle_state::BundleRetention}, }; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{BestTransactions, TransactionPool}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; use rollup_boost::{ @@ -102,6 +104,14 @@ impl FlashblocksExtraCtx { } } +type FlashblockCandidate = ( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + CacheState, + Option, +); + impl OpPayloadBuilderCtx { /// Returns the current flashblock index pub(crate) fn flashblock_index(&self) -> u64 { @@ -299,6 +309,8 @@ where let timestamp = config.attributes.timestamp(); let calculate_state_root = self.config.specific.calculate_state_root; + let enable_continuous_building = self.config.specific.enable_continuous_building; + let ctx = self .get_op_payload_builder_ctx( config.clone(), @@ -331,19 +343,16 @@ where let builder_txs = if ctx.attributes().no_tx_pool { vec![] } else { - match self.builder_tx.add_builder_txs( + self.builder_tx.add_builder_txs( &state_provider, &mut info, &ctx, &mut state, false, - ) { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error adding builder txs to fallback block: {}", e); - vec![] - } - } + ).unwrap_or_else(|e| { + error!(target: "payload_builder", "Error adding builder txs to fallback block: {}", e); + vec![] + }) }; // We subtract gas limit and da limit for builder transaction from the whole limit @@ -370,16 +379,6 @@ where ); // not emitting flashblock if no_tx_pool in FCU, it's just syncing - if !ctx.attributes().no_tx_pool { - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .map_err(PayloadBuilderError::other)?; - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - } - if ctx.attributes().no_tx_pool { info!( target: "payload_builder", @@ -402,7 +401,17 @@ where // return early since we don't need to build a block with transactions from the pool return Ok(()); + } else { + // Publish base flashblock + let flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .map_err(PayloadBuilderError::other)?; + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); } + // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable let (flashblocks_per_block, first_flashblock_offset) = self.calculate_flashblocks(timestamp); @@ -465,6 +474,7 @@ where let interval = self.config.specific.interval; let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + // Spawn a task to orchestrate flashblock building jobs per interval tokio::spawn({ let block_cancel = block_cancel.clone(); @@ -478,12 +488,13 @@ where loop { tokio::select! { + // end of interval reached, conclude current flashblock building and start next one _ = timer.tick() => { - // cancel current payload building job + // cancel current flashblock building job fb_cancel.cancel(); fb_cancel = block_cancel.child_token(); - // this will tick at first_flashblock_offset, - // starting the second flashblock + // this will tick at first_flashblock_offset + k * interval, + // starting the next flashblock (k+1) if tx.send(fb_cancel.clone()).await.is_err() { // receiver channel was dropped, return. // this will only happen if the `build_payload` function returns, @@ -524,43 +535,82 @@ where return Ok(()); } - // build first flashblock immediately - let next_flashblocks_ctx = match self - .build_next_flashblock( - &ctx, - &mut info, - &mut state, - &state_provider, - &mut best_txs, - &block_cancel, - &best_payload, - &fb_span, - ) - .await - { - Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, - Ok(None) => { - self.record_flashblocks_metrics( + // build next flashblock immediately + let next_flashblocks_ctx = if enable_continuous_building { + match self + .build_next_flashblock_continuous( &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, job cancelled or target flashblock count reached", - ); - return Ok(()); + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) + .await + { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } - Err(err) => { - error!( - target: "payload_builder", - "Failed to build flashblock {} for block number {}: {}", - ctx.flashblock_index(), - ctx.block_number(), - err - ); - return Err(PayloadBuilderError::Other(err.into())); + } else { + match self + .build_next_flashblock( + &ctx, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) + .await + { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } }; + // Wait for next flashblock building job to start, or end of main block building job tokio::select! { Some(fb_cancel) = rx.recv() => { ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx); @@ -611,17 +661,13 @@ where ); let flashblock_build_start_time = Instant::now(); - let builder_txs = - match self - .builder_tx - .add_builder_txs(&state_provider, info, ctx, state, true) - { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error simulating builder txs: {}", e); - vec![] - } - }; + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); let builder_tx_gas = builder_txs.iter().fold(0, |acc, tx| acc + tx.gas_used); let builder_tx_da_size: u64 = builder_txs.iter().fold(0, |acc, tx| acc + tx.da_size); @@ -649,7 +695,7 @@ where .set(transaction_pool_fetch_time); let tx_execution_start_time = Instant::now(); - ctx.execute_best_transactions( + ctx.simulate_best_transactions( info, state, best_txs, @@ -717,7 +763,7 @@ where fb_payload.index = flashblock_index; fb_payload.base = None; - // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { self.record_flashblocks_metrics( @@ -782,6 +828,296 @@ where } } + /// Takes the current best flashblock candidate, execute new transaction and build a new candidate only if it's better. + /// No state is mutated, but it can be applied later by replacing `state.cache` and `state.transition_state` with the returned `CacheState` and `Option`. + /// The `best_txns` iterator is updated with no updates, it needs to be refreshed again to take into account new mempool transactions. + /// + /// When a new best is found: return `Ok(Some(best))` + /// Else: return the `current` with `Ok(current)` + #[expect(clippy::too_many_arguments)] + fn refresh_best_flashblock_candidate< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + current: Option, + ctx: &OpPayloadBuilderCtx, + info: &ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + target_gas_for_batch: u64, + target_da_for_batch: Option, + ) -> eyre::Result> { + // create simulation info and simulation state with same cache and transition state as current state + let mut simulation_info = info.clone(); + let simulation_cache = state.cache.clone(); + let simulation_transition_state = state.transition_state.clone(); + let mut simulation_state = State::builder() + .with_database(StateProviderDatabase::new(&state_provider)) + .with_cached_prestate(simulation_cache) + .with_bundle_update() + .build(); + simulation_state.transition_state = simulation_transition_state; + + // Refresh pool txs + let best_txs_start_time = Instant::now(); + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()) + .without_updates(), + ), + ctx.flashblock_index(), + ); + let transaction_pool_fetch_time = best_txs_start_time.elapsed(); + ctx.metrics + .transaction_pool_fetch_duration + .record(transaction_pool_fetch_time); + ctx.metrics + .transaction_pool_fetch_gauge + .set(transaction_pool_fetch_time); + + let tx_execution_start_time = Instant::now(); + ctx.simulate_best_transactions( + &mut simulation_info, + &mut simulation_state, + best_txs, + target_gas_for_batch.min(ctx.block_gas_limit()), + target_da_for_batch, + ) + .wrap_err("failed to execute best transactions")?; + let payload_transaction_simulation_time = tx_execution_start_time.elapsed(); + ctx.metrics + .payload_transaction_simulation_duration + .record(payload_transaction_simulation_time); + ctx.metrics + .payload_transaction_simulation_gauge + .set(payload_transaction_simulation_time); + + // Try early return condition + if block_cancel.is_cancelled() { + return Ok(current); + } + + // Add bottom of block builder txs + if let Err(e) = self.builder_tx.add_builder_txs( + &state_provider, + &mut simulation_info, + ctx, + &mut simulation_state, + false, + ) { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + }; + + // Check if we can build a better block by comparing execution results + let is_better_candidate = |prev: &ExecutionInfo<_>, new: &ExecutionInfo<_>| { + new.cumulative_gas_used > prev.cumulative_gas_used + }; + if current + .as_ref() + .is_some_and(|(_, _, cur, _, _)| !is_better_candidate(cur, &simulation_info)) + { + // Not better, nothing to refresh so we can return early + return Ok(current); + } + + // build block and return new best + let total_block_built_duration = Instant::now(); + let build_result = build_block( + &mut simulation_state, + ctx, + &mut simulation_info, + ctx.extra_ctx.calculate_state_root || ctx.attributes().no_tx_pool, + ); + let total_block_built_duration = total_block_built_duration.elapsed(); + ctx.metrics + .total_block_built_duration + .record(total_block_built_duration); + ctx.metrics + .total_block_built_gauge + .set(total_block_built_duration); + + match build_result { + Err(err) => { + ctx.metrics.invalid_built_blocks_count.increment(1); + Err(err).wrap_err("failed to build payload") + } + Ok((payload, mut fb)) => { + fb.index = ctx.flashblock_index(); + fb.base = None; + + Ok(Some(( + payload, + fb, + simulation_info, + simulation_state.cache, + simulation_state.transition_state, + ))) + } + } + } + + #[expect(clippy::too_many_arguments)] + async fn build_next_flashblock_continuous< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &OpPayloadBuilderCtx, + info: &mut ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + best_payload: &BlockCell, + span: &tracing::Span, + ) -> eyre::Result> { + // 1. --- Prepare shared context --- + + let flashblock_index = ctx.flashblock_index(); + let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; + let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; + info!( + target: "payload_builder", + block_number = ctx.block_number(), + flashblock_index, + target_gas = target_gas_for_batch, + gas_used = info.cumulative_gas_used, + target_da = target_da_for_batch, + da_used = info.cumulative_da_bytes_used, + block_gas_used = ctx.block_gas_limit(), + "Building flashblock", + ); + let flashblock_build_start_time = Instant::now(); + + // Add top of block builder txns + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); + + let builder_tx_gas = builder_txs.iter().map(|t| t.gas_used).sum(); + let builder_tx_da_size = builder_txs.iter().map(|t| t.da_size).sum(); + target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); + // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size + if let Some(da_limit) = target_da_for_batch.as_mut() { + *da_limit = da_limit.saturating_sub(builder_tx_da_size); + } + + let mut best: Option = None; + + // 2. --- Build candidates and update best --- + loop { + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload + // To ensure that we will return same blocks as rollup-boost (to leverage caches) + if block_cancel.is_cancelled() { + self.record_flashblocks_metrics( + ctx, + info, + ctx.target_flashblock_count(), + span, + "Payload building complete, channel closed or job cancelled", + ); + return Ok(None); + } + // interval end: abort worker and publish current best immediately (below) + if ctx.cancel.is_cancelled() { + break; + } + + // Build one candidate + best = self.refresh_best_flashblock_candidate( + best, + ctx, + &*info, + state, + &state_provider, + best_txs, + block_cancel, + target_gas_for_batch, + target_da_for_batch, + )?; + } + + // if we weren't able to build a single best payload before this point + // then we should drop everything + if best.is_none() { + warn!("Didn't build any best candidate"); + return Ok(None); + } + + // 3. --- Cancellation token received, send best --- + let (payload, fb_payload, execution_info, cache_state, transition_state) = + best.expect("we checked best.is_none and returned early"); + + // Apply state mutations from best + state.cache = cache_state; + state.transition_state = transition_state; + + // Send payloads + let flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + self.payload_tx + .send(payload.clone()) + .await + .wrap_err("failed to send built payload to handler")?; + best_payload.set(payload); + + // update execution info + *info = execution_info; + + // Mark selected transactions as commited + let batch_new_transactions = info.executed_transactions[info.extra.last_flashblock_index..] + .to_vec() + .iter() + .map(|tx| tx.tx_hash()) + .collect::>(); + best_txs.mark_commited(batch_new_transactions); + + // Record flashblock build duration + ctx.metrics + .flashblock_build_duration + .record(flashblock_build_start_time.elapsed()); + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); + ctx.metrics + .flashblock_num_tx_histogram + .record(info.executed_transactions.len() as f64); + + // Update context for next iteration + let target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; + let target_da_for_batch = ctx + .extra_ctx + .da_per_batch + .zip(ctx.extra_ctx.target_da_for_batch) + .map(|(da_limit, da)| da + da_limit); + + let next_extra_ctx = ctx + .extra_ctx + .clone() + .next(target_gas_for_batch, target_da_for_batch); + + info!( + target: "payload_builder", + message = "Flashblock built", + flashblock_index, + current_gas = info.cumulative_gas_used, + current_da = info.cumulative_da_bytes_used, + target_flashblocks = ctx.target_flashblock_count(), + ); + + Ok(Some(next_extra_ctx)) + } + /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, diff --git a/crates/op-rbuilder/src/builders/standard/payload.rs b/crates/op-rbuilder/src/builders/standard/payload.rs index 66e0adda..2a344ace 100644 --- a/crates/op-rbuilder/src/builders/standard/payload.rs +++ b/crates/op-rbuilder/src/builders/standard/payload.rs @@ -388,7 +388,7 @@ impl OpBuilder<'_, Txs> { .set(transaction_pool_fetch_time); if ctx - .execute_best_transactions( + .simulate_best_transactions( &mut info, db, &mut best_txs, diff --git a/crates/op-rbuilder/src/primitives/reth/execution.rs b/crates/op-rbuilder/src/primitives/reth/execution.rs index b2591c21..eedaab67 100644 --- a/crates/op-rbuilder/src/primitives/reth/execution.rs +++ b/crates/op-rbuilder/src/primitives/reth/execution.rs @@ -24,7 +24,7 @@ pub enum TxnExecutionResult { MaxGasUsageExceeded, } -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct ExecutionInfo { /// All executed transactions (unrecovered). pub executed_transactions: Vec,