From 7de44a308d91f143f35261a5befc3dbfb87306df Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Fri, 24 Oct 2025 17:04:36 +0800 Subject: [PATCH 1/8] feat(wip): continuous block building --- crates/op-rbuilder/src/args/op.rs | 8 + .../src/builders/flashblocks/best_txs.rs | 5 + .../src/builders/flashblocks/config.rs | 7 + .../src/builders/flashblocks/payload.rs | 394 +++++++++++++++--- 4 files changed, 345 insertions(+), 69 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index bd860f155..dce85c189 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -157,6 +157,14 @@ pub struct FlashblocksArgs { )] pub flashblocks_calculate_state_root: bool, + /// Should we calculate state root for each flashblock + #[arg( + long = "flashblocks.enable-continuous-building", + default_value = "true", + env = "FLASHBLOCKS_ENABLE_CONTINUOUS_BUILDING" + )] + pub flashblocks_enable_continuous_building: bool, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs index 2f36c96dc..4e9cb39cd 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs @@ -46,6 +46,11 @@ where pub(super) fn mark_commited(&mut self, txs: Vec) { self.commited_transactions.extend(txs); } + + /// Check if a given transaction is commited + pub(super) fn is_commited(&self, tx: &TxHash) -> bool { + self.commited_transactions.contains(tx) + } } impl PayloadTransactions for BestFlashblocksTxs diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index f2dca7759..3065fa3f8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -34,6 +34,9 @@ pub struct FlashblocksConfig { /// Should we calculate state root for each flashblock pub calculate_state_root: bool, + /// Whether to enable continuous flashblock building or not + pub enable_continuous_building: bool, + /// The address of the flashblocks number contract. /// /// If set a builder tx will be added to the start of every flashblock instead of the regular builder tx. @@ -48,6 +51,7 @@ impl Default for FlashblocksConfig { leeway_time: Duration::from_millis(50), fixed: false, calculate_state_root: true, + enable_continuous_building: true, flashblocks_number_contract_address: None, } } @@ -70,6 +74,8 @@ impl TryFrom for FlashblocksConfig { let calculate_state_root = args.flashblocks.flashblocks_calculate_state_root; + let enable_continuous_building = args.flashblocks.flashblocks_enable_continuous_building; + let flashblocks_number_contract_address = args.flashblocks.flashblocks_number_contract_address; @@ -79,6 +85,7 @@ impl TryFrom for FlashblocksConfig { leeway_time, fixed, calculate_state_root, + enable_continuous_building, flashblocks_number_contract_address, }) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index e350260bd..faaa8d4c8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -18,7 +18,7 @@ use alloy_consensus::{ use alloy_eips::{Encodable2718, eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE}; use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; use core::time::Duration; -use eyre::WrapErr as _; +use eyre::{ContextCompat, WrapErr as _}; use reth::payload::PayloadBuilderAttributes; use reth_basic_payload_builder::BuildOutcome; use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates}; @@ -39,7 +39,7 @@ use reth_provider::{ use reth_revm::{ State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, }; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{BestTransactions, TransactionPool, ValidPoolTransaction}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; use rollup_boost::{ @@ -207,11 +207,28 @@ where } } +/// The arena in which multiple [`FlashblockCandidate`] are competing for a specific flashblock interval batch. +/// It contains context of the building job, and shared precomputed elements that are required in each candidate. +struct BatchArena { + target_gas_for_batch: u64, + target_da_for_batch: Option, +} + +/// A candidate flashblock payload within a flashblock interval batch building job +struct FlashblockCandidate { + payload: OpBuiltPayload, + fb_payload: FlashblocksPayloadV1, + execution_info: ExecutionInfo, + gas_used: u64, + // todo metrics +} + impl OpPayloadBuilder where Pool: PoolBounds, - Client: ClientBounds, - BuilderTx: BuilderTransactions + Send + Sync, + Client: ClientBounds + 'static, + BuilderTx: + BuilderTransactions + Send + Sync + 'static, { fn get_op_payload_builder_ctx( &self, @@ -302,6 +319,8 @@ where let timestamp = config.attributes.timestamp(); let calculate_state_root = self.config.specific.calculate_state_root; + let enable_continuous_building = self.config.specific.enable_continuous_building; + let ctx = self .get_op_payload_builder_ctx( config.clone(), @@ -334,19 +353,16 @@ where let builder_txs = if ctx.attributes().no_tx_pool { vec![] } else { - match self.builder_tx.add_builder_txs( + self.builder_tx.add_builder_txs( &state_provider, &mut info, &ctx, &mut state, false, - ) { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error adding builder txs to fallback block: {}", e); - vec![] - } - } + ).unwrap_or_else(|e| { + error!(target: "payload_builder", "Error adding builder txs to fallback block: {}", e); + vec![] + }) }; // We subtract gas limit and da limit for builder transaction from the whole limit @@ -370,16 +386,6 @@ where ); // not emitting flashblock if no_tx_pool in FCU, it's just syncing - if !ctx.attributes().no_tx_pool { - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .map_err(PayloadBuilderError::other)?; - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - } - if ctx.attributes().no_tx_pool { info!( target: "payload_builder", @@ -402,7 +408,17 @@ where // return early since we don't need to build a block with transactions from the pool return Ok(()); + } else { + // Publish base flashblock + let flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .map_err(PayloadBuilderError::other)?; + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); } + // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable let (flashblocks_per_block, first_flashblock_offset) = self.calculate_flashblocks(timestamp); @@ -465,6 +481,7 @@ where let interval = self.config.specific.interval; let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + // Spawn a task to orchestrate flashblock building jobs per interval tokio::spawn({ let block_cancel = block_cancel.clone(); @@ -478,12 +495,13 @@ where loop { tokio::select! { + // end of interval reached, conclude current flashblock building and start next one _ = timer.tick() => { - // cancel current payload building job + // cancel current flashblock building job fb_cancel.cancel(); fb_cancel = block_cancel.child_token(); - // this will tick at first_flashblock_offset, - // starting the second flashblock + // this will tick at first_flashblock_offset + k * interval, + // starting the next flashblock (k+1) if tx.send(fb_cancel.clone()).await.is_err() { // receiver channel was dropped, return. // this will only happen if the `build_payload` function returns, @@ -524,40 +542,76 @@ where return Ok(()); } - // build first flashblock immediately - let next_flashblocks_ctx = match self.build_next_flashblock( - &mut ctx, - &mut info, - &mut state, - &state_provider, - &mut best_txs, - &block_cancel, - &best_payload, - &fb_span, - ) { - Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, - Ok(None) => { - self.record_flashblocks_metrics( - &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, job cancelled or target flashblock count reached", - ); - return Ok(()); + // build next flashblock immediately + let next_flashblocks_ctx = if enable_continuous_building { + match self.build_next_flashblock_continuous( + &mut ctx, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } - Err(err) => { - error!( - target: "payload_builder", - "Failed to build flashblock {} for block number {}: {}", - ctx.flashblock_index(), - ctx.block_number(), - err - ); - return Err(PayloadBuilderError::Other(err.into())); + } else { + match self.build_next_flashblock( + &mut ctx, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } }; + // Wait for next flashblock building job to start, or end of main block building job tokio::select! { Some(fb_cancel) = rx.recv() => { ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx); @@ -608,17 +662,13 @@ where ); let flashblock_build_start_time = Instant::now(); - let builder_txs = - match self - .builder_tx - .add_builder_txs(&state_provider, info, ctx, state, true) - { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error simulating builder txs: {}", e); - vec![] - } - }; + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); let builder_tx_gas = builder_txs.iter().fold(0, |acc, tx| acc + tx.gas_used); let builder_tx_da_size: u64 = builder_txs.iter().fold(0, |acc, tx| acc + tx.da_size); @@ -776,6 +826,209 @@ where } } + fn build_candidate< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &OpPayloadBuilderCtx, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &NextBestFlashblocksTxs, + batch_targets: &BatchArena, + ) -> eyre::Result { + // Collect candidate best txs without updates + // This is not using best_txn to be able to bound number of txs to execute and collect them, so we need to remove commited txs + let candidate_best_txs: Vec>> = self + .pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()) + .without_updates() + .filter(|tx| best_txs.is_commited(tx.hash())) + .collect(); + + // Initialize empty execution info + let mut batch_info: ExecutionInfo = + ExecutionInfo::with_capacity(candidate_best_txs.len()); + + let mut candidate_best_txs = BestPayloadTransactions::new(candidate_best_txs.into_iter()); + + ctx.execute_best_transactions( + &mut batch_info, + state, //todo + &mut candidate_best_txs, + batch_targets.target_gas_for_batch, + batch_targets.target_da_for_batch, + ) + .wrap_err("failed to execute best transactions")?; + + // Add bottom of block builder txs + if let Err(e) = self.builder_tx.add_builder_txs( + &state_provider, + &mut batch_info, + ctx, + state, // todo + false, + ) { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + }; + + // build block + let build_result = build_block( + state, // todo + ctx, + &mut batch_info, + ctx.extra_ctx.calculate_state_root || ctx.attributes().no_tx_pool, + ); + build_result + .map(|(payload, mut fb)| { + fb.index = ctx.flashblock_index(); + fb.base = None; + let gas_used = fb.diff.gas_used; + FlashblockCandidate { + payload, + fb_payload: fb, + execution_info: batch_info, + gas_used, + } + }) + .wrap_err("failed to build payload") + } + + #[allow(clippy::too_many_arguments)] + fn build_next_flashblock_continuous< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &mut OpPayloadBuilderCtx, + info: &mut ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + best_payload: &BlockCell, + _span: &tracing::Span, + ) -> eyre::Result> { + // 1. --- Prepare batch arena with shared context --- + let flashblock_index = ctx.flashblock_index(); + let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; + let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; + + // top of block builder txs is shared across all candidates + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); + + let builder_tx_gas = builder_txs.iter().map(|t| t.gas_used).sum(); + let builder_tx_da_size = builder_txs.iter().map(|t| t.da_size).sum(); + target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); + // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size + if let Some(da_limit) = target_da_for_batch.as_mut() { + *da_limit = da_limit.saturating_sub(builder_tx_da_size); + } + + // update the batch best txs with a dynamic iterator + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()), + ), + flashblock_index, + ); + + let batch_targets = BatchArena { + target_gas_for_batch: target_gas_for_batch.min(ctx.block_gas_limit()), + target_da_for_batch, + }; + + let mut best: Option = None; + loop { + // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // To ensure that we will return same blocks as rollup-boost (to leverage caches) + if block_cancel.is_cancelled() { + return Ok(None); + } + // interval end: abort worker and publish current best immediately (below) + if ctx.cancel.is_cancelled() { + break; + } + + // Build one candidate (blocking here) + // todo: would be best to build async and select on candidate_build/block_cancel/fb_cancel + match self.build_candidate(&*ctx, state, &state_provider, &*best_txs, &batch_targets) { + Ok(candidate) => { + if best + .as_ref() + .is_none_or(|b| candidate.gas_used > b.gas_used) + { + best = Some(candidate); + } + } + Err(_) => { + ctx.metrics.invalid_blocks_count.increment(1); + } + } + } + + let mut best = best.wrap_err("No best flashblock payload")?; + + // Directly send payloads + let _flashblock_byte_size = self + .ws_pub + .publish(&best.fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + self.send_payload_to_engine(best.payload.clone()); + best_payload.set(best.payload); + + // Apply state mutations from best + // todo: update best_txs to take into account candidate_best_txs? + let batch_new_transactions = best + .execution_info + .executed_transactions + .to_vec() + .iter() + .map(|tx| tx.tx_hash()) + .collect::>(); + // update batch best txs + best_txs.mark_commited(batch_new_transactions); + + // update batch execution info + info.executed_transactions + .append(&mut best.execution_info.executed_transactions); + info.executed_senders + .append(&mut best.execution_info.executed_senders); + info.receipts.append(&mut best.execution_info.receipts); + info.cumulative_gas_used += best.execution_info.cumulative_gas_used; + info.cumulative_da_bytes_used += best.execution_info.cumulative_da_bytes_used; + info.total_fees += best.execution_info.total_fees; + info.extra.last_flashblock_index = best.execution_info.extra.last_flashblock_index; + + // todo state + + // Update bundle_state for next iteration + if let Some(da_limit) = ctx.extra_ctx.da_per_batch { + if let Some(da) = target_da_for_batch.as_mut() { + *da += da_limit; + } else { + error!( + "Builder end up in faulty invariant, if da_per_batch is set then total_da_per_batch must be set" + ); + } + } + + let target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; + let next_extra_ctx = ctx + .extra_ctx + .clone() + .next(target_gas_for_batch, target_da_for_batch); + + Ok(Some(next_extra_ctx)) + } + /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, @@ -894,9 +1147,12 @@ where impl PayloadBuilder for OpPayloadBuilder where Pool: PoolBounds, - Client: ClientBounds, - BuilderTx: - BuilderTransactions + Clone + Send + Sync, + Client: ClientBounds + 'static, + BuilderTx: BuilderTransactions + + Clone + + Send + + Sync + + 'static, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; From f669215e463ac47a56b53482b4cb0d39fba3ad31 Mon Sep 17 00:00:00 2001 From: Julio <30329843+julio4@users.noreply.github.com> Date: Mon, 27 Oct 2025 09:21:21 +0800 Subject: [PATCH 2/8] Apply suggestion from @SozinM Co-authored-by: Solar Mithril --- crates/op-rbuilder/src/builders/flashblocks/payload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index faaa8d4c8..b19ccec02 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -947,7 +947,7 @@ where let mut best: Option = None; loop { - // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // If main token got canceled in here that means we received get_payload and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { return Ok(None); From 017718a27069cc373253cc7f675264883517a927 Mon Sep 17 00:00:00 2001 From: Julio <30329843+julio4@users.noreply.github.com> Date: Mon, 27 Oct 2025 09:21:34 +0800 Subject: [PATCH 3/8] Apply suggestion from @SozinM Co-authored-by: Solar Mithril --- crates/op-rbuilder/src/builders/flashblocks/payload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index b19ccec02..8ba5f5f74 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -894,7 +894,7 @@ where .wrap_err("failed to build payload") } - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] fn build_next_flashblock_continuous< DB: Database + std::fmt::Debug + AsRef

, P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, From b43522f5bf8480a5b971e3baadce39fb44cd9fa3 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 27 Oct 2025 09:44:27 +0800 Subject: [PATCH 4/8] fix: simplify build_candidate --- .../src/builders/flashblocks/best_txs.rs | 5 - .../src/builders/flashblocks/payload.rs | 129 ++++++++---------- 2 files changed, 55 insertions(+), 79 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs index 4e9cb39cd..2f36c96dc 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs @@ -46,11 +46,6 @@ where pub(super) fn mark_commited(&mut self, txs: Vec) { self.commited_transactions.extend(txs); } - - /// Check if a given transaction is commited - pub(super) fn is_commited(&self, tx: &TxHash) -> bool { - self.commited_transactions.contains(tx) - } } impl PayloadTransactions for BestFlashblocksTxs diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 8ba5f5f74..15110246e 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -39,7 +39,7 @@ use reth_provider::{ use reth_revm::{ State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, }; -use reth_transaction_pool::{BestTransactions, TransactionPool, ValidPoolTransaction}; +use reth_transaction_pool::{BestTransactions, TransactionPool}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; use rollup_boost::{ @@ -207,22 +207,6 @@ where } } -/// The arena in which multiple [`FlashblockCandidate`] are competing for a specific flashblock interval batch. -/// It contains context of the building job, and shared precomputed elements that are required in each candidate. -struct BatchArena { - target_gas_for_batch: u64, - target_da_for_batch: Option, -} - -/// A candidate flashblock payload within a flashblock interval batch building job -struct FlashblockCandidate { - payload: OpBuiltPayload, - fb_payload: FlashblocksPayloadV1, - execution_info: ExecutionInfo, - gas_used: u64, - // todo metrics -} - impl OpPayloadBuilder where Pool: PoolBounds, @@ -834,30 +818,33 @@ where ctx: &OpPayloadBuilderCtx, state: &mut State, state_provider: impl reth::providers::StateProvider + Clone, - best_txs: &NextBestFlashblocksTxs, - batch_targets: &BatchArena, - ) -> eyre::Result { - // Collect candidate best txs without updates - // This is not using best_txn to be able to bound number of txs to execute and collect them, so we need to remove commited txs - let candidate_best_txs: Vec>> = self - .pool - .best_transactions_with_attributes(ctx.best_transaction_attributes()) - .without_updates() - .filter(|tx| best_txs.is_commited(tx.hash())) - .collect(); + best_txs: &mut NextBestFlashblocksTxs, + target_gas_for_batch: u64, + target_da_for_batch: Option, + ) -> eyre::Result<( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + )> { + // Update iterator + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()) + .without_updates(), + ), + ctx.flashblock_index(), + ); // Initialize empty execution info - let mut batch_info: ExecutionInfo = - ExecutionInfo::with_capacity(candidate_best_txs.len()); - - let mut candidate_best_txs = BestPayloadTransactions::new(candidate_best_txs.into_iter()); + let mut batch_info: ExecutionInfo = ExecutionInfo::default(); ctx.execute_best_transactions( &mut batch_info, state, //todo - &mut candidate_best_txs, - batch_targets.target_gas_for_batch, - batch_targets.target_da_for_batch, + best_txs, + target_gas_for_batch, + target_da_for_batch, ) .wrap_err("failed to execute best transactions")?; @@ -883,13 +870,7 @@ where .map(|(payload, mut fb)| { fb.index = ctx.flashblock_index(); fb.base = None; - let gas_used = fb.diff.gas_used; - FlashblockCandidate { - payload, - fb_payload: fb, - execution_info: batch_info, - gas_used, - } + (payload, fb, batch_info) }) .wrap_err("failed to build payload") } @@ -909,8 +890,7 @@ where best_payload: &BlockCell, _span: &tracing::Span, ) -> eyre::Result> { - // 1. --- Prepare batch arena with shared context --- - let flashblock_index = ctx.flashblock_index(); + // 1. --- Prepare shared context --- let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; @@ -931,21 +911,15 @@ where *da_limit = da_limit.saturating_sub(builder_tx_da_size); } - // update the batch best txs with a dynamic iterator - best_txs.refresh_iterator( - BestPayloadTransactions::new( - self.pool - .best_transactions_with_attributes(ctx.best_transaction_attributes()), - ), - flashblock_index, - ); + let target_gas_for_batch = target_gas_for_batch.min(ctx.block_gas_limit()); - let batch_targets = BatchArena { - target_gas_for_batch: target_gas_for_batch.min(ctx.block_gas_limit()), - target_da_for_batch, - }; + let mut best: Option<( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + )> = None; - let mut best: Option = None; + // 2. --- Build candidates and update best --- loop { // If main token got canceled in here that means we received get_payload and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) @@ -959,11 +933,18 @@ where // Build one candidate (blocking here) // todo: would be best to build async and select on candidate_build/block_cancel/fb_cancel - match self.build_candidate(&*ctx, state, &state_provider, &*best_txs, &batch_targets) { + match self.build_candidate( + &*ctx, + state, + &state_provider, + best_txs, + target_gas_for_batch, + target_da_for_batch, + ) { Ok(candidate) => { if best .as_ref() - .is_none_or(|b| candidate.gas_used > b.gas_used) + .is_none_or(|b| candidate.1.diff.gas_used > b.1.diff.gas_used) { best = Some(candidate); } @@ -974,38 +955,38 @@ where } } - let mut best = best.wrap_err("No best flashblock payload")?; + // 3. --- Cancellation token received, send best --- + let (payload, fb_payload, mut execution_info) = + best.wrap_err("No best flashblock payload")?; // Directly send payloads let _flashblock_byte_size = self .ws_pub - .publish(&best.fb_payload) + .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; - self.send_payload_to_engine(best.payload.clone()); - best_payload.set(best.payload); + self.send_payload_to_engine(payload.clone()); + best_payload.set(payload); // Apply state mutations from best - // todo: update best_txs to take into account candidate_best_txs? - let batch_new_transactions = best - .execution_info + let batch_new_transactions = execution_info .executed_transactions .to_vec() .iter() .map(|tx| tx.tx_hash()) .collect::>(); - // update batch best txs + // update best txns best_txs.mark_commited(batch_new_transactions); // update batch execution info info.executed_transactions - .append(&mut best.execution_info.executed_transactions); + .append(&mut execution_info.executed_transactions); info.executed_senders - .append(&mut best.execution_info.executed_senders); - info.receipts.append(&mut best.execution_info.receipts); - info.cumulative_gas_used += best.execution_info.cumulative_gas_used; - info.cumulative_da_bytes_used += best.execution_info.cumulative_da_bytes_used; - info.total_fees += best.execution_info.total_fees; - info.extra.last_flashblock_index = best.execution_info.extra.last_flashblock_index; + .append(&mut execution_info.executed_senders); + info.receipts.append(&mut execution_info.receipts); + info.cumulative_gas_used += execution_info.cumulative_gas_used; + info.cumulative_da_bytes_used += execution_info.cumulative_da_bytes_used; + info.total_fees += execution_info.total_fees; + info.extra.last_flashblock_index = execution_info.extra.last_flashblock_index; // todo state From 5449d3076fdf06b7c8a300288a675fadce0b92f5 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 27 Oct 2025 10:30:21 +0800 Subject: [PATCH 5/8] feat: refresh fb candidate and only build block when better --- .../src/builders/flashblocks/payload.rs | 76 ++++++++++--------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 15110246e..a620e2273 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -211,8 +211,7 @@ impl OpPayloadBuilder where Pool: PoolBounds, Client: ClientBounds + 'static, - BuilderTx: - BuilderTransactions + Send + Sync + 'static, + BuilderTx: BuilderTransactions + Send + Sync, { fn get_op_payload_builder_ctx( &self, @@ -748,7 +747,7 @@ where fb_payload.index = flashblock_index; fb_payload.base = None; - // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { self.record_flashblocks_metrics( @@ -810,11 +809,19 @@ where } } - fn build_candidate< + /// Takes the current best flashblock candidate, execute new transaction and build a new candidate only if it's better + #[expect(clippy::too_many_arguments)] + fn refresh_best_flashblock_candidate< DB: Database + std::fmt::Debug + AsRef

, P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, >( &self, + best: Option<( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + )>, + info: &ExecutionInfo, ctx: &OpPayloadBuilderCtx, state: &mut State, state_provider: impl reth::providers::StateProvider + Clone, @@ -837,7 +844,7 @@ where ); // Initialize empty execution info - let mut batch_info: ExecutionInfo = ExecutionInfo::default(); + let mut batch_info = *info.clone(); ctx.execute_best_transactions( &mut batch_info, @@ -859,20 +866,32 @@ where error!(target: "payload_builder", "Error simulating builder txs: {}", e); }; - // build block - let build_result = build_block( + // Check if we can build a better block by comparing execution results + let is_better_candidate = |prev: &ExecutionInfo<_>, new: &ExecutionInfo<_>| { + new.cumulative_gas_used > prev.cumulative_gas_used + }; + if best + .as_ref() + .is_some_and(|(_, _, prev)| !is_better_candidate(prev, &batch_info)) + { + // Not better, nothing to refresh so we can return early + return Ok(best.expect("safe: best matched Some")); + } + + // build block and return new best + build_block( state, // todo ctx, &mut batch_info, ctx.extra_ctx.calculate_state_root || ctx.attributes().no_tx_pool, - ); - build_result - .map(|(payload, mut fb)| { - fb.index = ctx.flashblock_index(); - fb.base = None; - (payload, fb, batch_info) - }) - .wrap_err("failed to build payload") + ) + .map(|(payload, mut fb)| { + fb.index = ctx.flashblock_index(); + fb.base = None; + + (payload, fb, batch_info) + }) + .wrap_err("failed to build payload") } #[expect(clippy::too_many_arguments)] @@ -921,7 +940,7 @@ where // 2. --- Build candidates and update best --- loop { - // If main token got canceled in here that means we received get_payload and we should drop everything and not update best_payload + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { return Ok(None); @@ -933,26 +952,16 @@ where // Build one candidate (blocking here) // todo: would be best to build async and select on candidate_build/block_cancel/fb_cancel - match self.build_candidate( + best = Some(self.refresh_best_flashblock_candidate( + best, + &*info, &*ctx, state, &state_provider, best_txs, target_gas_for_batch, target_da_for_batch, - ) { - Ok(candidate) => { - if best - .as_ref() - .is_none_or(|b| candidate.1.diff.gas_used > b.1.diff.gas_used) - { - best = Some(candidate); - } - } - Err(_) => { - ctx.metrics.invalid_blocks_count.increment(1); - } - } + )?); } // 3. --- Cancellation token received, send best --- @@ -1129,11 +1138,8 @@ impl PayloadBuilder for OpPayloadBuilder - + Clone - + Send - + Sync - + 'static, + BuilderTx: + BuilderTransactions + Clone + Send + Sync, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; From c0371dc1016244721d308140da27fba20803022b Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:07:28 +0800 Subject: [PATCH 6/8] feat: use simulated state in fb continuous building --- crates/op-rbuilder/src/builders/context.rs | 6 ++- .../src/builders/flashblocks/payload.rs | 43 +++++++++++++------ .../src/builders/standard/payload.rs | 2 +- .../src/primitives/reth/execution.rs | 2 +- 4 files changed, 36 insertions(+), 17 deletions(-) diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 032e97e69..646419ee0 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -1,3 +1,4 @@ +use alloy_consensus::transaction::TxHashRef; use alloy_consensus::{Eip658Value, Transaction, conditional::BlockConditionalAttributes}; use alloy_eips::Typed2718; use alloy_evm::Database; @@ -326,10 +327,11 @@ impl OpPayloadBuilderCtx { Ok(info) } - /// Executes the given best transactions and updates the execution info. + /// Simulates the given best transactions. + /// The simulation updates the execution info and commit changes to the db /// /// Returns `Ok(Some(())` if the job was cancelled. - pub(super) fn execute_best_transactions( + pub(super) fn simulate_best_transactions( &self, info: &mut ExecutionInfo, db: &mut State, diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index a620e2273..73c8fe770 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -42,6 +42,7 @@ use reth_revm::{ use reth_transaction_pool::{BestTransactions, TransactionPool}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; +use revm::database::BundleState; use rollup_boost::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, }; @@ -679,7 +680,7 @@ where .set(transaction_pool_fetch_time); let tx_execution_start_time = Instant::now(); - ctx.execute_best_transactions( + ctx.simulate_best_transactions( info, state, best_txs, @@ -820,10 +821,11 @@ where OpBuiltPayload, FlashblocksPayloadV1, ExecutionInfo, + BundleState, )>, info: &ExecutionInfo, ctx: &OpPayloadBuilderCtx, - state: &mut State, + state: &State, state_provider: impl reth::providers::StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, target_gas_for_batch: u64, @@ -832,6 +834,7 @@ where OpBuiltPayload, FlashblocksPayloadV1, ExecutionInfo, + BundleState, )> { // Update iterator best_txs.refresh_iterator( @@ -844,11 +847,19 @@ where ); // Initialize empty execution info - let mut batch_info = *info.clone(); + let mut batch_info = info.clone(); + + // create simulation state + // todo/check: is this also taking into account current state only from state cache? + let mut simulation_state = State::builder() + .with_database(StateProviderDatabase::new(state_provider.clone())) + .with_cached_prestate(state.cache.clone()) + .with_bundle_update() + .build(); - ctx.execute_best_transactions( + ctx.simulate_best_transactions( &mut batch_info, - state, //todo + &mut simulation_state, best_txs, target_gas_for_batch, target_da_for_batch, @@ -860,7 +871,7 @@ where &state_provider, &mut batch_info, ctx, - state, // todo + &mut simulation_state, false, ) { error!(target: "payload_builder", "Error simulating builder txs: {}", e); @@ -872,7 +883,7 @@ where }; if best .as_ref() - .is_some_and(|(_, _, prev)| !is_better_candidate(prev, &batch_info)) + .is_some_and(|(_, _, prev, _)| !is_better_candidate(prev, &batch_info)) { // Not better, nothing to refresh so we can return early return Ok(best.expect("safe: best matched Some")); @@ -880,7 +891,7 @@ where // build block and return new best build_block( - state, // todo + &mut simulation_state, ctx, &mut batch_info, ctx.extra_ctx.calculate_state_root || ctx.attributes().no_tx_pool, @@ -889,7 +900,8 @@ where fb.index = ctx.flashblock_index(); fb.base = None; - (payload, fb, batch_info) + simulation_state.merge_transitions(BundleRetention::Reverts); + (payload, fb, batch_info, simulation_state.take_bundle()) }) .wrap_err("failed to build payload") } @@ -936,6 +948,7 @@ where OpBuiltPayload, FlashblocksPayloadV1, ExecutionInfo, + BundleState, )> = None; // 2. --- Build candidates and update best --- @@ -965,9 +978,16 @@ where } // 3. --- Cancellation token received, send best --- - let (payload, fb_payload, mut execution_info) = + let (payload, fb_payload, mut execution_info, _bundle_state) = best.wrap_err("No best flashblock payload")?; + // Apply state mutations from best + // todo, something like: + // state = StateBuilder::new() + // .with_database(move state.database) + // .with_bundle_prestate(bundle_state) + // .build(); + // Directly send payloads let _flashblock_byte_size = self .ws_pub @@ -976,7 +996,6 @@ where self.send_payload_to_engine(payload.clone()); best_payload.set(payload); - // Apply state mutations from best let batch_new_transactions = execution_info .executed_transactions .to_vec() @@ -997,8 +1016,6 @@ where info.total_fees += execution_info.total_fees; info.extra.last_flashblock_index = execution_info.extra.last_flashblock_index; - // todo state - // Update bundle_state for next iteration if let Some(da_limit) = ctx.extra_ctx.da_per_batch { if let Some(da) = target_da_for_batch.as_mut() { diff --git a/crates/op-rbuilder/src/builders/standard/payload.rs b/crates/op-rbuilder/src/builders/standard/payload.rs index 66e0adda4..2a344acef 100644 --- a/crates/op-rbuilder/src/builders/standard/payload.rs +++ b/crates/op-rbuilder/src/builders/standard/payload.rs @@ -388,7 +388,7 @@ impl OpBuilder<'_, Txs> { .set(transaction_pool_fetch_time); if ctx - .execute_best_transactions( + .simulate_best_transactions( &mut info, db, &mut best_txs, diff --git a/crates/op-rbuilder/src/primitives/reth/execution.rs b/crates/op-rbuilder/src/primitives/reth/execution.rs index b2591c212..eedaab670 100644 --- a/crates/op-rbuilder/src/primitives/reth/execution.rs +++ b/crates/op-rbuilder/src/primitives/reth/execution.rs @@ -24,7 +24,7 @@ pub enum TxnExecutionResult { MaxGasUsageExceeded, } -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct ExecutionInfo { /// All executed transactions (unrecovered). pub executed_transactions: Vec, From 6aac3581e2ea4ebb916c38f3a36d37308c39beb5 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 27 Oct 2025 23:48:01 +0800 Subject: [PATCH 7/8] fix: execution info mutations --- .../src/builders/flashblocks/payload.rs | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 73c8fe770..0705ca04a 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -988,16 +988,8 @@ where // .with_bundle_prestate(bundle_state) // .build(); - // Directly send payloads - let _flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .wrap_err("failed to publish flashblock via websocket")?; - self.send_payload_to_engine(payload.clone()); - best_payload.set(payload); - - let batch_new_transactions = execution_info - .executed_transactions + let batch_new_transactions = execution_info.executed_transactions + [execution_info.extra.last_flashblock_index..] .to_vec() .iter() .map(|tx| tx.tx_hash()) @@ -1005,16 +997,8 @@ where // update best txns best_txs.mark_commited(batch_new_transactions); - // update batch execution info - info.executed_transactions - .append(&mut execution_info.executed_transactions); - info.executed_senders - .append(&mut execution_info.executed_senders); - info.receipts.append(&mut execution_info.receipts); - info.cumulative_gas_used += execution_info.cumulative_gas_used; - info.cumulative_da_bytes_used += execution_info.cumulative_da_bytes_used; - info.total_fees += execution_info.total_fees; - info.extra.last_flashblock_index = execution_info.extra.last_flashblock_index; + // update execution info + *info = execution_info; // Update bundle_state for next iteration if let Some(da_limit) = ctx.extra_ctx.da_per_batch { @@ -1033,6 +1017,14 @@ where .clone() .next(target_gas_for_batch, target_da_for_batch); + // Send payloads + let _flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + self.send_payload_to_engine(payload.clone()); + best_payload.set(payload); + Ok(Some(next_extra_ctx)) } From 2c5020199a2ec2ac9ce5e07baa1a0e0000ee5921 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Fri, 31 Oct 2025 17:52:53 +0800 Subject: [PATCH 8/8] fix: continuous candidate simulation state --- .../src/builders/flashblocks/payload.rs | 86 +++++++++---------- 1 file changed, 42 insertions(+), 44 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 0705ca04a..fa9d5e844 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -18,7 +18,7 @@ use alloy_consensus::{ use alloy_eips::{Encodable2718, eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE}; use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; use core::time::Duration; -use eyre::{ContextCompat, WrapErr as _}; +use eyre::WrapErr as _; use reth::payload::PayloadBuilderAttributes; use reth_basic_payload_builder::BuildOutcome; use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates}; @@ -36,13 +36,13 @@ use reth_provider::{ ExecutionOutcome, HashedPostStateProvider, ProviderError, StateRootProvider, StorageRootProvider, }; +use reth_revm::db::CacheState; use reth_revm::{ State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, }; use reth_transaction_pool::{BestTransactions, TransactionPool}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; -use revm::database::BundleState; use rollup_boost::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, }; @@ -303,7 +303,8 @@ where let timestamp = config.attributes.timestamp(); let calculate_state_root = self.config.specific.calculate_state_root; - let enable_continuous_building = self.config.specific.enable_continuous_building; + //let enable_continuous_building = self.config.specific.enable_continuous_building; + let enable_continuous_building = true; let ctx = self .get_op_payload_builder_ctx( @@ -811,6 +812,7 @@ where } /// Takes the current best flashblock candidate, execute new transaction and build a new candidate only if it's better + /// No state is mutated, but it can be applied later by replacing `state.cache` with the returned `CacheState`. #[expect(clippy::too_many_arguments)] fn refresh_best_flashblock_candidate< DB: Database + std::fmt::Debug + AsRef

, @@ -821,10 +823,10 @@ where OpBuiltPayload, FlashblocksPayloadV1, ExecutionInfo, - BundleState, + CacheState, )>, - info: &ExecutionInfo, ctx: &OpPayloadBuilderCtx, + info: &ExecutionInfo, state: &State, state_provider: impl reth::providers::StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, @@ -834,9 +836,20 @@ where OpBuiltPayload, FlashblocksPayloadV1, ExecutionInfo, - BundleState, + CacheState, )> { - // Update iterator + info!("[continuous] building new candidate"); + // Clone execution info + let mut batch_info = info.clone(); + + // create simulation state + let mut simulation_state = State::builder() + .with_database(StateProviderDatabase::new(&state_provider)) + .with_cached_prestate(state.cache.clone()) + .with_bundle_update() + .build(); + + // Refresh pool txs best_txs.refresh_iterator( BestPayloadTransactions::new( self.pool @@ -846,22 +859,11 @@ where ctx.flashblock_index(), ); - // Initialize empty execution info - let mut batch_info = info.clone(); - - // create simulation state - // todo/check: is this also taking into account current state only from state cache? - let mut simulation_state = State::builder() - .with_database(StateProviderDatabase::new(state_provider.clone())) - .with_cached_prestate(state.cache.clone()) - .with_bundle_update() - .build(); - ctx.simulate_best_transactions( &mut batch_info, &mut simulation_state, best_txs, - target_gas_for_batch, + target_gas_for_batch.min(ctx.block_gas_limit()), target_da_for_batch, ) .wrap_err("failed to execute best transactions")?; @@ -889,6 +891,8 @@ where return Ok(best.expect("safe: best matched Some")); } + info!("[continuous] new best candidate found!"); + // build block and return new best build_block( &mut simulation_state, @@ -900,8 +904,7 @@ where fb.index = ctx.flashblock_index(); fb.base = None; - simulation_state.merge_transitions(BundleRetention::Reverts); - (payload, fb, batch_info, simulation_state.take_bundle()) + (payload, fb, batch_info, simulation_state.cache) }) .wrap_err("failed to build payload") } @@ -942,20 +945,19 @@ where *da_limit = da_limit.saturating_sub(builder_tx_da_size); } - let target_gas_for_batch = target_gas_for_batch.min(ctx.block_gas_limit()); - + // 2. --- Build candidates and update best --- let mut best: Option<( OpBuiltPayload, FlashblocksPayloadV1, ExecutionInfo, - BundleState, + CacheState, )> = None; - // 2. --- Build candidates and update best --- loop { // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { + info!("[continuous] block cancelled"); return Ok(None); } // interval end: abort worker and publish current best immediately (below) @@ -964,11 +966,10 @@ where } // Build one candidate (blocking here) - // todo: would be best to build async and select on candidate_build/block_cancel/fb_cancel best = Some(self.refresh_best_flashblock_candidate( best, - &*info, &*ctx, + &*info, state, &state_provider, best_txs, @@ -977,29 +978,35 @@ where )?); } + info!("[continuous] interval finished, sending best"); + // 3. --- Cancellation token received, send best --- - let (payload, fb_payload, mut execution_info, _bundle_state) = - best.wrap_err("No best flashblock payload")?; + let (payload, fb_payload, execution_info, cache_state) = best.unwrap(); // Apply state mutations from best - // todo, something like: - // state = StateBuilder::new() - // .with_database(move state.database) - // .with_bundle_prestate(bundle_state) - // .build(); + state.cache = cache_state; + // Mark selected transactions as commited let batch_new_transactions = execution_info.executed_transactions [execution_info.extra.last_flashblock_index..] .to_vec() .iter() .map(|tx| tx.tx_hash()) .collect::>(); - // update best txns + // warn: it also marks the top of blocks builder_txs best_txs.mark_commited(batch_new_transactions); // update execution info *info = execution_info; + // Send payloads + let _flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + self.send_payload_to_engine(payload.clone()); + best_payload.set(payload); + // Update bundle_state for next iteration if let Some(da_limit) = ctx.extra_ctx.da_per_batch { if let Some(da) = target_da_for_batch.as_mut() { @@ -1016,15 +1023,6 @@ where .extra_ctx .clone() .next(target_gas_for_batch, target_da_for_batch); - - // Send payloads - let _flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .wrap_err("failed to publish flashblock via websocket")?; - self.send_payload_to_engine(payload.clone()); - best_payload.set(payload); - Ok(Some(next_extra_ctx)) }