diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index bd860f155..dce85c189 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -157,6 +157,14 @@ pub struct FlashblocksArgs { )] pub flashblocks_calculate_state_root: bool, + /// Should we calculate state root for each flashblock + #[arg( + long = "flashblocks.enable-continuous-building", + default_value = "true", + env = "FLASHBLOCKS_ENABLE_CONTINUOUS_BUILDING" + )] + pub flashblocks_enable_continuous_building: bool, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 032e97e69..646419ee0 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -1,3 +1,4 @@ +use alloy_consensus::transaction::TxHashRef; use alloy_consensus::{Eip658Value, Transaction, conditional::BlockConditionalAttributes}; use alloy_eips::Typed2718; use alloy_evm::Database; @@ -326,10 +327,11 @@ impl OpPayloadBuilderCtx { Ok(info) } - /// Executes the given best transactions and updates the execution info. + /// Simulates the given best transactions. + /// The simulation updates the execution info and commit changes to the db /// /// Returns `Ok(Some(())` if the job was cancelled. - pub(super) fn execute_best_transactions( + pub(super) fn simulate_best_transactions( &self, info: &mut ExecutionInfo, db: &mut State, diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index f2dca7759..3065fa3f8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -34,6 +34,9 @@ pub struct FlashblocksConfig { /// Should we calculate state root for each flashblock pub calculate_state_root: bool, + /// Whether to enable continuous flashblock building or not + pub enable_continuous_building: bool, + /// The address of the flashblocks number contract. /// /// If set a builder tx will be added to the start of every flashblock instead of the regular builder tx. @@ -48,6 +51,7 @@ impl Default for FlashblocksConfig { leeway_time: Duration::from_millis(50), fixed: false, calculate_state_root: true, + enable_continuous_building: true, flashblocks_number_contract_address: None, } } @@ -70,6 +74,8 @@ impl TryFrom for FlashblocksConfig { let calculate_state_root = args.flashblocks.flashblocks_calculate_state_root; + let enable_continuous_building = args.flashblocks.flashblocks_enable_continuous_building; + let flashblocks_number_contract_address = args.flashblocks.flashblocks_number_contract_address; @@ -79,6 +85,7 @@ impl TryFrom for FlashblocksConfig { leeway_time, fixed, calculate_state_root, + enable_continuous_building, flashblocks_number_contract_address, }) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index e350260bd..fa9d5e844 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -36,10 +36,11 @@ use reth_provider::{ ExecutionOutcome, HashedPostStateProvider, ProviderError, StateRootProvider, StorageRootProvider, }; +use reth_revm::db::CacheState; use reth_revm::{ State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, }; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{BestTransactions, TransactionPool}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; use rollup_boost::{ @@ -210,7 +211,7 @@ where impl OpPayloadBuilder where Pool: PoolBounds, - Client: ClientBounds, + Client: ClientBounds + 'static, BuilderTx: BuilderTransactions + Send + Sync, { fn get_op_payload_builder_ctx( @@ -302,6 +303,9 @@ where let timestamp = config.attributes.timestamp(); let calculate_state_root = self.config.specific.calculate_state_root; + //let enable_continuous_building = self.config.specific.enable_continuous_building; + let enable_continuous_building = true; + let ctx = self .get_op_payload_builder_ctx( config.clone(), @@ -334,19 +338,16 @@ where let builder_txs = if ctx.attributes().no_tx_pool { vec![] } else { - match self.builder_tx.add_builder_txs( + self.builder_tx.add_builder_txs( &state_provider, &mut info, &ctx, &mut state, false, - ) { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error adding builder txs to fallback block: {}", e); - vec![] - } - } + ).unwrap_or_else(|e| { + error!(target: "payload_builder", "Error adding builder txs to fallback block: {}", e); + vec![] + }) }; // We subtract gas limit and da limit for builder transaction from the whole limit @@ -370,16 +371,6 @@ where ); // not emitting flashblock if no_tx_pool in FCU, it's just syncing - if !ctx.attributes().no_tx_pool { - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .map_err(PayloadBuilderError::other)?; - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - } - if ctx.attributes().no_tx_pool { info!( target: "payload_builder", @@ -402,7 +393,17 @@ where // return early since we don't need to build a block with transactions from the pool return Ok(()); + } else { + // Publish base flashblock + let flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .map_err(PayloadBuilderError::other)?; + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); } + // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable let (flashblocks_per_block, first_flashblock_offset) = self.calculate_flashblocks(timestamp); @@ -465,6 +466,7 @@ where let interval = self.config.specific.interval; let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + // Spawn a task to orchestrate flashblock building jobs per interval tokio::spawn({ let block_cancel = block_cancel.clone(); @@ -478,12 +480,13 @@ where loop { tokio::select! { + // end of interval reached, conclude current flashblock building and start next one _ = timer.tick() => { - // cancel current payload building job + // cancel current flashblock building job fb_cancel.cancel(); fb_cancel = block_cancel.child_token(); - // this will tick at first_flashblock_offset, - // starting the second flashblock + // this will tick at first_flashblock_offset + k * interval, + // starting the next flashblock (k+1) if tx.send(fb_cancel.clone()).await.is_err() { // receiver channel was dropped, return. // this will only happen if the `build_payload` function returns, @@ -524,40 +527,76 @@ where return Ok(()); } - // build first flashblock immediately - let next_flashblocks_ctx = match self.build_next_flashblock( - &mut ctx, - &mut info, - &mut state, - &state_provider, - &mut best_txs, - &block_cancel, - &best_payload, - &fb_span, - ) { - Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, - Ok(None) => { - self.record_flashblocks_metrics( - &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, job cancelled or target flashblock count reached", - ); - return Ok(()); + // build next flashblock immediately + let next_flashblocks_ctx = if enable_continuous_building { + match self.build_next_flashblock_continuous( + &mut ctx, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } - Err(err) => { - error!( - target: "payload_builder", - "Failed to build flashblock {} for block number {}: {}", - ctx.flashblock_index(), - ctx.block_number(), - err - ); - return Err(PayloadBuilderError::Other(err.into())); + } else { + match self.build_next_flashblock( + &mut ctx, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } }; + // Wait for next flashblock building job to start, or end of main block building job tokio::select! { Some(fb_cancel) = rx.recv() => { ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx); @@ -608,17 +647,13 @@ where ); let flashblock_build_start_time = Instant::now(); - let builder_txs = - match self - .builder_tx - .add_builder_txs(&state_provider, info, ctx, state, true) - { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error simulating builder txs: {}", e); - vec![] - } - }; + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); let builder_tx_gas = builder_txs.iter().fold(0, |acc, tx| acc + tx.gas_used); let builder_tx_da_size: u64 = builder_txs.iter().fold(0, |acc, tx| acc + tx.da_size); @@ -646,7 +681,7 @@ where .set(transaction_pool_fetch_time); let tx_execution_start_time = Instant::now(); - ctx.execute_best_transactions( + ctx.simulate_best_transactions( info, state, best_txs, @@ -714,7 +749,7 @@ where fb_payload.index = flashblock_index; fb_payload.base = None; - // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { self.record_flashblocks_metrics( @@ -776,6 +811,221 @@ where } } + /// Takes the current best flashblock candidate, execute new transaction and build a new candidate only if it's better + /// No state is mutated, but it can be applied later by replacing `state.cache` with the returned `CacheState`. + #[expect(clippy::too_many_arguments)] + fn refresh_best_flashblock_candidate< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + best: Option<( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + CacheState, + )>, + ctx: &OpPayloadBuilderCtx, + info: &ExecutionInfo, + state: &State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + target_gas_for_batch: u64, + target_da_for_batch: Option, + ) -> eyre::Result<( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + CacheState, + )> { + info!("[continuous] building new candidate"); + // Clone execution info + let mut batch_info = info.clone(); + + // create simulation state + let mut simulation_state = State::builder() + .with_database(StateProviderDatabase::new(&state_provider)) + .with_cached_prestate(state.cache.clone()) + .with_bundle_update() + .build(); + + // Refresh pool txs + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()) + .without_updates(), + ), + ctx.flashblock_index(), + ); + + ctx.simulate_best_transactions( + &mut batch_info, + &mut simulation_state, + best_txs, + target_gas_for_batch.min(ctx.block_gas_limit()), + target_da_for_batch, + ) + .wrap_err("failed to execute best transactions")?; + + // Add bottom of block builder txs + if let Err(e) = self.builder_tx.add_builder_txs( + &state_provider, + &mut batch_info, + ctx, + &mut simulation_state, + false, + ) { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + }; + + // Check if we can build a better block by comparing execution results + let is_better_candidate = |prev: &ExecutionInfo<_>, new: &ExecutionInfo<_>| { + new.cumulative_gas_used > prev.cumulative_gas_used + }; + if best + .as_ref() + .is_some_and(|(_, _, prev, _)| !is_better_candidate(prev, &batch_info)) + { + // Not better, nothing to refresh so we can return early + return Ok(best.expect("safe: best matched Some")); + } + + info!("[continuous] new best candidate found!"); + + // build block and return new best + build_block( + &mut simulation_state, + ctx, + &mut batch_info, + ctx.extra_ctx.calculate_state_root || ctx.attributes().no_tx_pool, + ) + .map(|(payload, mut fb)| { + fb.index = ctx.flashblock_index(); + fb.base = None; + + (payload, fb, batch_info, simulation_state.cache) + }) + .wrap_err("failed to build payload") + } + + #[expect(clippy::too_many_arguments)] + fn build_next_flashblock_continuous< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &mut OpPayloadBuilderCtx, + info: &mut ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + best_payload: &BlockCell, + _span: &tracing::Span, + ) -> eyre::Result> { + // 1. --- Prepare shared context --- + let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; + let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; + + // top of block builder txs is shared across all candidates + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); + + let builder_tx_gas = builder_txs.iter().map(|t| t.gas_used).sum(); + let builder_tx_da_size = builder_txs.iter().map(|t| t.da_size).sum(); + target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); + // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size + if let Some(da_limit) = target_da_for_batch.as_mut() { + *da_limit = da_limit.saturating_sub(builder_tx_da_size); + } + + // 2. --- Build candidates and update best --- + let mut best: Option<( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + CacheState, + )> = None; + + loop { + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload + // To ensure that we will return same blocks as rollup-boost (to leverage caches) + if block_cancel.is_cancelled() { + info!("[continuous] block cancelled"); + return Ok(None); + } + // interval end: abort worker and publish current best immediately (below) + if ctx.cancel.is_cancelled() { + break; + } + + // Build one candidate (blocking here) + best = Some(self.refresh_best_flashblock_candidate( + best, + &*ctx, + &*info, + state, + &state_provider, + best_txs, + target_gas_for_batch, + target_da_for_batch, + )?); + } + + info!("[continuous] interval finished, sending best"); + + // 3. --- Cancellation token received, send best --- + let (payload, fb_payload, execution_info, cache_state) = best.unwrap(); + + // Apply state mutations from best + state.cache = cache_state; + + // Mark selected transactions as commited + let batch_new_transactions = execution_info.executed_transactions + [execution_info.extra.last_flashblock_index..] + .to_vec() + .iter() + .map(|tx| tx.tx_hash()) + .collect::>(); + // warn: it also marks the top of blocks builder_txs + best_txs.mark_commited(batch_new_transactions); + + // update execution info + *info = execution_info; + + // Send payloads + let _flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + self.send_payload_to_engine(payload.clone()); + best_payload.set(payload); + + // Update bundle_state for next iteration + if let Some(da_limit) = ctx.extra_ctx.da_per_batch { + if let Some(da) = target_da_for_batch.as_mut() { + *da += da_limit; + } else { + error!( + "Builder end up in faulty invariant, if da_per_batch is set then total_da_per_batch must be set" + ); + } + } + + let target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; + let next_extra_ctx = ctx + .extra_ctx + .clone() + .next(target_gas_for_batch, target_da_for_batch); + Ok(Some(next_extra_ctx)) + } + /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, @@ -894,7 +1144,7 @@ where impl PayloadBuilder for OpPayloadBuilder where Pool: PoolBounds, - Client: ClientBounds, + Client: ClientBounds + 'static, BuilderTx: BuilderTransactions + Clone + Send + Sync, { diff --git a/crates/op-rbuilder/src/builders/standard/payload.rs b/crates/op-rbuilder/src/builders/standard/payload.rs index 66e0adda4..2a344acef 100644 --- a/crates/op-rbuilder/src/builders/standard/payload.rs +++ b/crates/op-rbuilder/src/builders/standard/payload.rs @@ -388,7 +388,7 @@ impl OpBuilder<'_, Txs> { .set(transaction_pool_fetch_time); if ctx - .execute_best_transactions( + .simulate_best_transactions( &mut info, db, &mut best_txs, diff --git a/crates/op-rbuilder/src/primitives/reth/execution.rs b/crates/op-rbuilder/src/primitives/reth/execution.rs index b2591c212..eedaab670 100644 --- a/crates/op-rbuilder/src/primitives/reth/execution.rs +++ b/crates/op-rbuilder/src/primitives/reth/execution.rs @@ -24,7 +24,7 @@ pub enum TxnExecutionResult { MaxGasUsageExceeded, } -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct ExecutionInfo { /// All executed transactions (unrecovered). pub executed_transactions: Vec,