From ac12785772926be2a9bb7c13e08d1b5f054548c4 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 11 Sep 2025 19:41:34 -0400 Subject: [PATCH 1/9] move flashblock building to its own fn, remove unused tokens/channels --- .../src/builders/flashblocks/best_txs.rs | 20 +- .../src/builders/flashblocks/payload.rs | 488 +++++++++--------- crates/op-rbuilder/src/builders/generator.rs | 8 +- 3 files changed, 259 insertions(+), 257 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs index e8d73bad5..9bae7d2c8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs @@ -1,7 +1,7 @@ use alloy_primitives::{Address, TxHash}; use reth_payload_util::PayloadTransactions; -use reth_transaction_pool::PoolTransaction; -use std::collections::HashSet; +use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction}; +use std::{collections::HashSet, sync::Arc}; use tracing::debug; use crate::tx::MaybeFlashblockFilter; @@ -9,9 +9,9 @@ use crate::tx::MaybeFlashblockFilter; pub struct BestFlashblocksTxs where T: PoolTransaction, - I: PayloadTransactions, + I: Iterator>>, { - inner: I, + inner: reth_payload_util::BestPayloadTransactions, current_flashblock_number: u64, // Transactions that were already commited to the state. Using them again would cause NonceTooLow // so we skip them @@ -21,9 +21,9 @@ where impl BestFlashblocksTxs where T: PoolTransaction, - I: PayloadTransactions, + I: Iterator>>, { - pub fn new(inner: I) -> Self { + pub fn new(inner: reth_payload_util::BestPayloadTransactions) -> Self { Self { inner, current_flashblock_number: 0, @@ -33,7 +33,11 @@ where /// Replaces current iterator with new one. We use it on new flashblock building, to refresh /// priority boundaries - pub fn refresh_iterator(&mut self, inner: I, current_flashblock_number: u64) { + pub fn refresh_iterator( + &mut self, + inner: reth_payload_util::BestPayloadTransactions, + current_flashblock_number: u64, + ) { self.inner = inner; self.current_flashblock_number = current_flashblock_number; } @@ -47,7 +51,7 @@ where impl PayloadTransactions for BestFlashblocksTxs where T: PoolTransaction + MaybeFlashblockFilter, - I: PayloadTransactions, + I: Iterator>>, { type Transaction = T; diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 3110e1c6d..03283bf7c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -37,6 +37,7 @@ use reth_provider::{ use reth_revm::{ State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, }; +use reth_transaction_pool::TransactionPool; use revm::Database; use rollup_boost::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, @@ -47,13 +48,22 @@ use std::{ sync::{Arc, OnceLock}, time::Instant, }; -use tokio::sync::{ - mpsc, - mpsc::{Sender, error::SendError}, -}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; +type NextBestFlashblocksTxs = BestFlashblocksTxs< + ::Transaction, + Box< + dyn reth_transaction_pool::BestTransactions< + Item = Arc< + reth_transaction_pool::ValidPoolTransaction< + ::Transaction, + >, + >, + >, + >, +>; + #[derive(Debug, Default)] struct ExtraExecutionInfo { /// Index of the last consumed flashblock @@ -194,7 +204,7 @@ where /// Given build arguments including an Optimism client, transaction pool, /// and configuration, this function creates a transaction payload. Returns /// a result indicating success with the payload or an error in case of failure. - fn build_payload( + async fn build_payload( &self, args: BuildArguments, OpBuiltPayload>, best_payload: BlockCell, @@ -388,14 +398,24 @@ where self.pool .best_transactions_with_attributes(ctx.best_transaction_attributes()), )); - // This channel coordinates flashblock building - let (fb_cancel_token_rx, mut fb_cancel_token_tx) = - mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); - self.spawn_timer_task( - block_cancel.clone(), - fb_cancel_token_rx, - first_flashblock_offset, - ); + + let mut timer = tokio::time::interval(first_flashblock_offset); + tokio::select! { + _ = timer.tick() => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, channel closed or job cancelled", + ); + return Ok(()); + } + _ = block_cancel.cancelled() => { + + } + }; + // Process flashblocks in a blocking loop loop { let fb_span = if span.is_none() { @@ -409,193 +429,176 @@ where }; let _entered = fb_span.enter(); - // We get token from time loop. Token from this channel means that we need to start build flashblock - // Cancellation of this token means that we need to stop building flashblock. - // If channel return None it means that we built all flashblock or parent_token got cancelled - let fb_cancel_token = - tokio::task::block_in_place(|| fb_cancel_token_tx.blocking_recv()).flatten(); - - match fb_cancel_token { - Some(cancel_token) => { - // We use fb_cancel_token inside context so we could exit from - // execute_best_transaction without cancelling parent token - ctx.cancel = cancel_token; - // TODO: remove this - if ctx.flashblock_index() >= ctx.target_flashblock_count() { - info!( - target: "payload_builder", - target = ctx.target_flashblock_count(), - flashblock_count = ctx.flashblock_index(), - block_number = ctx.block_number(), - "Skipping flashblock reached target", - ); - continue; - } - // Continue with flashblock building - info!( + (best_txs, total_gas_per_batch) = match self.build_next_flashblock( + &mut ctx, + &mut info, + total_gas_per_batch, + total_da_per_batch, + builder_tx_da_size, + builder_tx_gas, + &mut state, + best_txs, + &block_cancel, + flashblocks_per_block, + message.clone(), + &best_payload, + gas_per_batch, + da_per_batch, + &fb_span, + ) { + Ok((best_txs, total_gas_per_batch)) => (best_txs, total_gas_per_batch), + Err(err) => { + error!( target: "payload_builder", - block_number = ctx.block_number(), - flashblock_count = ctx.flashblock_index(), - target_gas = total_gas_per_batch, - gas_used = info.cumulative_gas_used, - target_da = total_da_per_batch.unwrap_or(0), - da_used = info.cumulative_da_bytes_used, - "Building flashblock", - ); - let flashblock_build_start_time = Instant::now(); - // If it is the last flashblock, we need to account for the builder tx - if ctx.is_last_flashblock() { - total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); - // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size - if let Some(da_limit) = total_da_per_batch.as_mut() { - *da_limit = da_limit.saturating_sub(builder_tx_da_size); - } - } - - let best_txs_start_time = Instant::now(); - best_txs.refresh_iterator( - BestPayloadTransactions::new( - self.pool.best_transactions_with_attributes( - ctx.best_transaction_attributes(), - ), - ), + "Failed to build flashblock {}, flashblock {}: {}", + ctx.block_number(), ctx.flashblock_index(), + err ); - let transaction_pool_fetch_time = best_txs_start_time.elapsed(); - ctx.metrics - .transaction_pool_fetch_duration - .record(transaction_pool_fetch_time); - ctx.metrics - .transaction_pool_fetch_gauge - .set(transaction_pool_fetch_time); - - let tx_execution_start_time = Instant::now(); - ctx.execute_best_transactions( - &mut info, - &mut state, - &mut best_txs, - total_gas_per_batch.min(ctx.block_gas_limit()), - total_da_per_batch, - )?; - // Extract last transactions - let new_transactions = info.executed_transactions - [info.extra.last_flashblock_index..] - .to_vec() - .iter() - .map(|tx| tx.tx_hash()) - .collect::>(); - best_txs.mark_commited(new_transactions); - - // We got block cancelled, we won't need anything from the block at this point - // Caution: this assume that block cancel token only cancelled when new FCU is received - if block_cancel.is_cancelled() { - self.record_flashblocks_metrics( - &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, channel closed or job cancelled", - ); - return Ok(()); - } - - let payload_tx_simulation_time = tx_execution_start_time.elapsed(); - ctx.metrics - .payload_tx_simulation_duration - .record(payload_tx_simulation_time); - ctx.metrics - .payload_tx_simulation_gauge - .set(payload_tx_simulation_time); - - // If it is the last flashblocks, add the builder txn to the block if enabled - if ctx.is_last_flashblock() { - ctx.add_builder_tx(&mut info, &mut state, builder_tx_gas, message.clone()); - }; - - let total_block_built_duration = Instant::now(); - let build_result = build_block(&mut state, &ctx, &mut info); - let total_block_built_duration = total_block_built_duration.elapsed(); - ctx.metrics - .total_block_built_duration - .record(total_block_built_duration); - ctx.metrics - .total_block_built_gauge - .set(total_block_built_duration); - - // Handle build errors with match pattern - match build_result { - Err(err) => { - // Track invalid/bad block - ctx.metrics.invalid_blocks_count.increment(1); - error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), ctx.flashblock_index(), err); - // Return the error - return Err(err); - } - Ok((new_payload, mut fb_payload)) => { - fb_payload.index = ctx.increment_flashblock_index(); // fallback block is index 0, so we need to increment here - fb_payload.base = None; - - // We check that child_job got cancelled before sending flashblock. - // This will ensure consistent timing between flashblocks. - tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(async { ctx.cancel.cancelled().await }); - }); - - // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload - // To ensure that we will return same blocks as rollup-boost (to leverage caches) - if block_cancel.is_cancelled() { - self.record_flashblocks_metrics( - &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, channel closed or job cancelled", - ); - return Ok(()); - } - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .map_err(PayloadBuilderError::other)?; - - // Record flashblock build duration - ctx.metrics - .flashblock_build_duration - .record(flashblock_build_start_time.elapsed()); - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - ctx.metrics - .flashblock_num_tx_histogram - .record(info.executed_transactions.len() as f64); - - best_payload.set(new_payload.clone()); - self.send_payload_to_engine(new_payload); - // Update bundle_state for next iteration - total_gas_per_batch += gas_per_batch; - if let Some(da_limit) = da_per_batch { - if let Some(da) = total_da_per_batch.as_mut() { - *da += da_limit; - } else { - error!( - "Builder end up in faulty invariant, if da_per_batch is set then total_da_per_batch must be set" - ); - } - } - - info!( - target: "payload_builder", - message = "Flashblock built", - flashblock_count = ctx.flashblock_index(), - current_gas = info.cumulative_gas_used, - current_da = info.cumulative_da_bytes_used, - target_flashblocks = flashblocks_per_block, - ); - } - } + return Err(err); } - None => { + } + } + } + + fn build_next_flashblock< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &mut OpPayloadBuilderCtx, + mut info: &mut ExecutionInfo, + mut total_gas_per_batch: u64, + mut total_da_per_batch: Option, + builder_tx_da_size: u64, + builder_tx_gas: u64, + mut state: &mut State, + mut best_txs: NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + flashblocks_per_block: u64, + message: Vec, + best_payload: &BlockCell, + gas_per_batch: u64, + da_per_batch: Option, + span: &tracing::Span, + ) -> Result<(NextBestFlashblocksTxs, u64), PayloadBuilderError> { + // TODO: remove this + if ctx.flashblock_index() >= ctx.target_flashblock_count() { + info!( + target: "payload_builder", + target = ctx.target_flashblock_count(), + flashblock_count = ctx.flashblock_index(), + block_number = ctx.block_number(), + "Skipping flashblock reached target", + ); + return Ok((best_txs, total_gas_per_batch)); + }; + + // Continue with flashblock building + info!( + target: "payload_builder", + block_number = ctx.block_number(), + flashblock_count = ctx.flashblock_index(), + target_gas = total_gas_per_batch, + gas_used = info.cumulative_gas_used, + target_da = total_da_per_batch.unwrap_or(0), + da_used = info.cumulative_da_bytes_used, + "Building flashblock", + ); + let flashblock_build_start_time = Instant::now(); + // If it is the last flashblock, we need to account for the builder tx + if ctx.is_last_flashblock() { + total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); + // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size + if let Some(da_limit) = total_da_per_batch.as_mut() { + *da_limit = da_limit.saturating_sub(builder_tx_da_size); + } + } + + let best_txs_start_time = Instant::now(); + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()), + ), + ctx.flashblock_index(), + ); + let transaction_pool_fetch_time = best_txs_start_time.elapsed(); + ctx.metrics + .transaction_pool_fetch_duration + .record(transaction_pool_fetch_time); + ctx.metrics + .transaction_pool_fetch_gauge + .set(transaction_pool_fetch_time); + + let tx_execution_start_time = Instant::now(); + ctx.execute_best_transactions( + &mut info, + &mut state, + &mut best_txs, + total_gas_per_batch.min(ctx.block_gas_limit()), + total_da_per_batch, + )?; + // Extract last transactions + let new_transactions = info.executed_transactions[info.extra.last_flashblock_index..] + .to_vec() + .iter() + .map(|tx| tx.tx_hash()) + .collect::>(); + best_txs.mark_commited(new_transactions); + + // We got block cancelled, we won't need anything from the block at this point + // Caution: this assume that block cancel token only cancelled when new FCU is received + if block_cancel.is_cancelled() { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, channel closed or job cancelled", + ); + return Ok((best_txs, total_gas_per_batch)); + } + + let payload_tx_simulation_time = tx_execution_start_time.elapsed(); + ctx.metrics + .payload_tx_simulation_duration + .record(payload_tx_simulation_time); + ctx.metrics + .payload_tx_simulation_gauge + .set(payload_tx_simulation_time); + + // If it is the last flashblocks, add the builder txn to the block if enabled + if ctx.is_last_flashblock() { + ctx.add_builder_tx(&mut info, &mut state, builder_tx_gas, message.clone()); + }; + + let total_block_built_duration = Instant::now(); + let build_result = build_block(&mut state, &ctx, &mut info); + let total_block_built_duration = total_block_built_duration.elapsed(); + ctx.metrics + .total_block_built_duration + .record(total_block_built_duration); + ctx.metrics + .total_block_built_gauge + .set(total_block_built_duration); + + // Handle build errors with match pattern + match build_result { + Err(err) => { + // Track invalid/bad block + ctx.metrics.invalid_blocks_count.increment(1); + error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), ctx.flashblock_index(), err); + // Return the error + return Err(err); + } + Ok((new_payload, mut fb_payload)) => { + fb_payload.index = ctx.increment_flashblock_index(); // fallback block is index 0, so we need to increment here + fb_payload.base = None; + + // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // To ensure that we will return same blocks as rollup-boost (to leverage caches) + if block_cancel.is_cancelled() { self.record_flashblocks_metrics( &ctx, &info, @@ -603,10 +606,49 @@ where &span, "Payload building complete, channel closed or job cancelled", ); - return Ok(()); + return Ok((best_txs, total_gas_per_batch)); + } + let flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .map_err(PayloadBuilderError::other)?; + + // Record flashblock build duration + ctx.metrics + .flashblock_build_duration + .record(flashblock_build_start_time.elapsed()); + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); + ctx.metrics + .flashblock_num_tx_histogram + .record(info.executed_transactions.len() as f64); + + best_payload.set(new_payload.clone()); + self.send_payload_to_engine(new_payload); + // Update bundle_state for next iteration + total_gas_per_batch += gas_per_batch; + if let Some(da_limit) = da_per_batch { + if let Some(da) = total_da_per_batch.as_mut() { + *da += da_limit; + } else { + error!( + "Builder end up in faulty invariant, if da_per_batch is set then total_da_per_batch must be set" + ); + } } + + info!( + target: "payload_builder", + message = "Flashblock built", + flashblock_count = ctx.flashblock_index(), + current_gas = info.cumulative_gas_used, + current_da = info.cumulative_da_bytes_used, + target_flashblocks = flashblocks_per_block, + ); } } + Ok((best_txs, total_gas_per_batch)) } /// Do some logging and metric recording when we stop build flashblocks @@ -662,53 +704,6 @@ where } } - /// Spawn task that will send new flashblock level cancel token in steady intervals (first interval - /// may vary if --flashblocks.dynamic enabled) - pub fn spawn_timer_task( - &self, - block_cancel: CancellationToken, - flashblock_cancel_token_rx: Sender>, - first_flashblock_offset: Duration, - ) { - let interval = self.config.specific.interval; - tokio::spawn(async move { - let cancelled: Option>>> = block_cancel - .run_until_cancelled(async { - // Create first fb interval already started - let mut timer = tokio::time::interval(first_flashblock_offset); - timer.tick().await; - let child_token = block_cancel.child_token(); - flashblock_cancel_token_rx - .send(Some(child_token.clone())) - .await?; - timer.tick().await; - // Time to build flashblock has ended so we cancel the token - child_token.cancel(); - // We would start using regular intervals from here on - let mut timer = tokio::time::interval(interval); - timer.tick().await; - loop { - // Initiate fb job - let child_token = block_cancel.child_token(); - debug!(target: "payload_builder", "Sending child cancel token to execution loop"); - flashblock_cancel_token_rx - .send(Some(child_token.clone())) - .await?; - timer.tick().await; - debug!(target: "payload_builder", "Cancelling child token to complete flashblock"); - // Cancel job once time is up - child_token.cancel(); - } - }) - .await; - if let Some(Err(err)) = cancelled { - error!(target: "payload_builder", "Timer task encountered error: {err}"); - } else { - info!(target: "payload_builder", "Building job cancelled, stopping payload building"); - } - }); - } - /// Calculate number of flashblocks. /// If dynamic is enabled this function will take time drift into the account. pub fn calculate_flashblocks(&self, timestamp: u64) -> (u64, Duration) { @@ -771,6 +766,7 @@ where } } +#[async_trait::async_trait] impl crate::builders::generator::PayloadBuilder for OpPayloadBuilder where @@ -781,12 +777,12 @@ where type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; - fn try_build( + async fn try_build( &self, args: BuildArguments, best_payload: BlockCell, ) -> Result<(), PayloadBuilderError> { - self.build_payload(args, best_payload) + self.build_payload(args, best_payload).await } } diff --git a/crates/op-rbuilder/src/builders/generator.rs b/crates/op-rbuilder/src/builders/generator.rs index c77930835..4413c7475 100644 --- a/crates/op-rbuilder/src/builders/generator.rs +++ b/crates/op-rbuilder/src/builders/generator.rs @@ -34,6 +34,7 @@ use tracing::info; /// /// Generic parameters `Pool` and `Client` represent the transaction pool and /// Ethereum client types. +#[async_trait::async_trait] pub trait PayloadBuilder: Send + Sync + Clone { /// The payload attributes type to accept for building. type Attributes: PayloadBuilderAttributes; @@ -52,7 +53,7 @@ pub trait PayloadBuilder: Send + Sync + Clone { /// # Returns /// /// A `Result` indicating the build outcome or an error. - fn try_build( + async fn try_build( &self, args: BuildArguments, best_payload: BlockCell, @@ -329,7 +330,7 @@ where cancel, }; - let result = builder.try_build(args, cell); + let result = builder.try_build(args, cell).await; let _ = tx.send(result); })); } @@ -605,6 +606,7 @@ mod tests { Cancelled, } + #[async_trait::async_trait] impl PayloadBuilder for MockBuilder where N: OpPayloadPrimitives, @@ -612,7 +614,7 @@ mod tests { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = MockPayload; - fn try_build( + async fn try_build( &self, args: BuildArguments, _best_payload: BlockCell, From 03f41764803c474a03582b6f284a7eb2ca4a6a0d Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 11 Sep 2025 20:22:29 -0400 Subject: [PATCH 2/9] fix ticker timing --- .../src/builders/flashblocks/payload.rs | 82 +++++++++++++++---- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 03283bf7c..4215e6fc5 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -399,22 +399,33 @@ where .best_transactions_with_attributes(ctx.best_transaction_attributes()), )); - let mut timer = tokio::time::interval(first_flashblock_offset); + // // This channel coordinates flashblock building + // let (fb_cancel_token_tx, mut fb_cancel_token_rx) = + // tokio::sync::mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + // self.spawn_timer_task( + // block_cancel.clone(), + // fb_cancel_token_tx, + // first_flashblock_offset, + // ); + + let mut timer = tokio::time::interval(self.config.specific.interval); + let first_flashblock_offset_signal = tokio::time::sleep(first_flashblock_offset); + tokio::select! { - _ = timer.tick() => { + _ = first_flashblock_offset_signal => { + // wait for the first offset to pass + } + _ = block_cancel.cancelled() => { self.record_flashblocks_metrics( &ctx, &info, flashblocks_per_block, &span, - "Payload building complete, channel closed or job cancelled", + "Payload building cancelled before starting", ); return Ok(()); } - _ = block_cancel.cancelled() => { - - } - }; + } // Process flashblocks in a blocking loop loop { @@ -429,7 +440,40 @@ where }; let _entered = fb_span.enter(); - (best_txs, total_gas_per_batch) = match self.build_next_flashblock( + // // We get token from time loop. Token from this channel means that we need to start build flashblock + // // Cancellation of this token means that we need to stop building flashblock. + // // If channel return None it means that we built all flashblock or parent_token got cancelled + // let fb_cancel_token = + // tokio::task::block_in_place(|| fb_cancel_token_rx.blocking_recv()).flatten(); + + // if fb_cancel_token.is_none() { + // self.record_flashblocks_metrics( + // &ctx, + // &info, + // flashblocks_per_block, + // &span, + // "Payload building complete, channel closed or job cancelled", + // ); + // return Ok(()); + // } + + tokio::select! { + _ = timer.tick() => { + // time to build next flashblock + } + _ = block_cancel.cancelled() => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, channel closed or job cancelled", + ); + return Ok(()); + } + } + + (best_txs, total_gas_per_batch, total_da_per_batch) = match self.build_next_flashblock( &mut ctx, &mut info, total_gas_per_batch, @@ -446,7 +490,9 @@ where da_per_batch, &fb_span, ) { - Ok((best_txs, total_gas_per_batch)) => (best_txs, total_gas_per_batch), + Ok((best_txs, total_gas_per_batch, total_da_per_batch)) => { + (best_txs, total_gas_per_batch, total_da_per_batch) + } Err(err) => { error!( target: "payload_builder", @@ -467,7 +513,7 @@ where >( &self, ctx: &mut OpPayloadBuilderCtx, - mut info: &mut ExecutionInfo, + info: &mut ExecutionInfo, mut total_gas_per_batch: u64, mut total_da_per_batch: Option, builder_tx_da_size: u64, @@ -481,7 +527,7 @@ where gas_per_batch: u64, da_per_batch: Option, span: &tracing::Span, - ) -> Result<(NextBestFlashblocksTxs, u64), PayloadBuilderError> { + ) -> Result<(NextBestFlashblocksTxs, u64, Option), PayloadBuilderError> { // TODO: remove this if ctx.flashblock_index() >= ctx.target_flashblock_count() { info!( @@ -491,7 +537,7 @@ where block_number = ctx.block_number(), "Skipping flashblock reached target", ); - return Ok((best_txs, total_gas_per_batch)); + return Ok((best_txs, total_gas_per_batch, total_da_per_batch)); }; // Continue with flashblock building @@ -533,7 +579,7 @@ where let tx_execution_start_time = Instant::now(); ctx.execute_best_transactions( - &mut info, + info, &mut state, &mut best_txs, total_gas_per_batch.min(ctx.block_gas_limit()), @@ -557,7 +603,7 @@ where &span, "Payload building complete, channel closed or job cancelled", ); - return Ok((best_txs, total_gas_per_batch)); + return Ok((best_txs, total_gas_per_batch, total_da_per_batch)); } let payload_tx_simulation_time = tx_execution_start_time.elapsed(); @@ -570,11 +616,11 @@ where // If it is the last flashblocks, add the builder txn to the block if enabled if ctx.is_last_flashblock() { - ctx.add_builder_tx(&mut info, &mut state, builder_tx_gas, message.clone()); + ctx.add_builder_tx(info, &mut state, builder_tx_gas, message.clone()); }; let total_block_built_duration = Instant::now(); - let build_result = build_block(&mut state, &ctx, &mut info); + let build_result = build_block(&mut state, &ctx, info); let total_block_built_duration = total_block_built_duration.elapsed(); ctx.metrics .total_block_built_duration @@ -606,7 +652,7 @@ where &span, "Payload building complete, channel closed or job cancelled", ); - return Ok((best_txs, total_gas_per_batch)); + return Ok((best_txs, total_gas_per_batch, total_da_per_batch)); } let flashblock_byte_size = self .ws_pub @@ -648,7 +694,7 @@ where ); } } - Ok((best_txs, total_gas_per_batch)) + Ok((best_txs, total_gas_per_batch, total_da_per_batch)) } /// Do some logging and metric recording when we stop build flashblocks From de140219ea8e00f9dad8114a9e86c791c4b96fe2 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 11 Sep 2025 20:23:53 -0400 Subject: [PATCH 3/9] remove unused code --- .../src/builders/flashblocks/payload.rs | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 4215e6fc5..11675a13e 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -399,15 +399,6 @@ where .best_transactions_with_attributes(ctx.best_transaction_attributes()), )); - // // This channel coordinates flashblock building - // let (fb_cancel_token_tx, mut fb_cancel_token_rx) = - // tokio::sync::mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); - // self.spawn_timer_task( - // block_cancel.clone(), - // fb_cancel_token_tx, - // first_flashblock_offset, - // ); - let mut timer = tokio::time::interval(self.config.specific.interval); let first_flashblock_offset_signal = tokio::time::sleep(first_flashblock_offset); @@ -440,23 +431,6 @@ where }; let _entered = fb_span.enter(); - // // We get token from time loop. Token from this channel means that we need to start build flashblock - // // Cancellation of this token means that we need to stop building flashblock. - // // If channel return None it means that we built all flashblock or parent_token got cancelled - // let fb_cancel_token = - // tokio::task::block_in_place(|| fb_cancel_token_rx.blocking_recv()).flatten(); - - // if fb_cancel_token.is_none() { - // self.record_flashblocks_metrics( - // &ctx, - // &info, - // flashblocks_per_block, - // &span, - // "Payload building complete, channel closed or job cancelled", - // ); - // return Ok(()); - // } - tokio::select! { _ = timer.tick() => { // time to build next flashblock From a05f1b7d0ff18d697a4add98d367911d4297932d Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 11 Sep 2025 20:33:13 -0400 Subject: [PATCH 4/9] remove return params in favour of mut ref --- .../src/builders/flashblocks/payload.rs | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 11675a13e..0565c0267 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -447,15 +447,15 @@ where } } - (best_txs, total_gas_per_batch, total_da_per_batch) = match self.build_next_flashblock( + match self.build_next_flashblock( &mut ctx, &mut info, - total_gas_per_batch, - total_da_per_batch, + &mut total_gas_per_batch, + &mut total_da_per_batch, builder_tx_da_size, builder_tx_gas, &mut state, - best_txs, + &mut best_txs, &block_cancel, flashblocks_per_block, message.clone(), @@ -464,9 +464,7 @@ where da_per_batch, &fb_span, ) { - Ok((best_txs, total_gas_per_batch, total_da_per_batch)) => { - (best_txs, total_gas_per_batch, total_da_per_batch) - } + Ok(()) => {} Err(err) => { error!( target: "payload_builder", @@ -488,12 +486,12 @@ where &self, ctx: &mut OpPayloadBuilderCtx, info: &mut ExecutionInfo, - mut total_gas_per_batch: u64, - mut total_da_per_batch: Option, + total_gas_per_batch: &mut u64, + total_da_per_batch: &mut Option, builder_tx_da_size: u64, builder_tx_gas: u64, mut state: &mut State, - mut best_txs: NextBestFlashblocksTxs, + best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, flashblocks_per_block: u64, message: Vec, @@ -501,7 +499,7 @@ where gas_per_batch: u64, da_per_batch: Option, span: &tracing::Span, - ) -> Result<(NextBestFlashblocksTxs, u64, Option), PayloadBuilderError> { + ) -> Result<(), PayloadBuilderError> { // TODO: remove this if ctx.flashblock_index() >= ctx.target_flashblock_count() { info!( @@ -511,7 +509,7 @@ where block_number = ctx.block_number(), "Skipping flashblock reached target", ); - return Ok((best_txs, total_gas_per_batch, total_da_per_batch)); + return Ok(()); }; // Continue with flashblock building @@ -528,7 +526,7 @@ where let flashblock_build_start_time = Instant::now(); // If it is the last flashblock, we need to account for the builder tx if ctx.is_last_flashblock() { - total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); + *total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size if let Some(da_limit) = total_da_per_batch.as_mut() { *da_limit = da_limit.saturating_sub(builder_tx_da_size); @@ -555,9 +553,9 @@ where ctx.execute_best_transactions( info, &mut state, - &mut best_txs, - total_gas_per_batch.min(ctx.block_gas_limit()), - total_da_per_batch, + best_txs, + (*total_gas_per_batch).min(ctx.block_gas_limit()), + *total_da_per_batch, )?; // Extract last transactions let new_transactions = info.executed_transactions[info.extra.last_flashblock_index..] @@ -577,7 +575,7 @@ where &span, "Payload building complete, channel closed or job cancelled", ); - return Ok((best_txs, total_gas_per_batch, total_da_per_batch)); + return Ok(()); } let payload_tx_simulation_time = tx_execution_start_time.elapsed(); @@ -626,7 +624,7 @@ where &span, "Payload building complete, channel closed or job cancelled", ); - return Ok((best_txs, total_gas_per_batch, total_da_per_batch)); + return Ok(()); } let flashblock_byte_size = self .ws_pub @@ -647,7 +645,7 @@ where best_payload.set(new_payload.clone()); self.send_payload_to_engine(new_payload); // Update bundle_state for next iteration - total_gas_per_batch += gas_per_batch; + *total_gas_per_batch += gas_per_batch; if let Some(da_limit) = da_per_batch { if let Some(da) = total_da_per_batch.as_mut() { *da += da_limit; @@ -668,7 +666,7 @@ where ); } } - Ok((best_txs, total_gas_per_batch, total_da_per_batch)) + Ok(()) } /// Do some logging and metric recording when we stop build flashblocks From 07888d282fa803c4dad70c3d3217a21b52ba9e89 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 11 Sep 2025 21:28:27 -0400 Subject: [PATCH 5/9] clippy --- .../src/builders/flashblocks/payload.rs | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 0565c0267..8d101156b 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -479,6 +479,7 @@ where } } + #[allow(clippy::too_many_arguments)] fn build_next_flashblock< DB: Database + std::fmt::Debug + AsRef

, P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, @@ -490,7 +491,7 @@ where total_da_per_batch: &mut Option, builder_tx_da_size: u64, builder_tx_gas: u64, - mut state: &mut State, + state: &mut State, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, flashblocks_per_block: u64, @@ -552,7 +553,7 @@ where let tx_execution_start_time = Instant::now(); ctx.execute_best_transactions( info, - &mut state, + state, best_txs, (*total_gas_per_batch).min(ctx.block_gas_limit()), *total_da_per_batch, @@ -569,10 +570,10 @@ where // Caution: this assume that block cancel token only cancelled when new FCU is received if block_cancel.is_cancelled() { self.record_flashblocks_metrics( - &ctx, - &info, + ctx, + info, flashblocks_per_block, - &span, + span, "Payload building complete, channel closed or job cancelled", ); return Ok(()); @@ -588,11 +589,11 @@ where // If it is the last flashblocks, add the builder txn to the block if enabled if ctx.is_last_flashblock() { - ctx.add_builder_tx(info, &mut state, builder_tx_gas, message.clone()); + ctx.add_builder_tx(info, state, builder_tx_gas, message.clone()); }; let total_block_built_duration = Instant::now(); - let build_result = build_block(&mut state, &ctx, info); + let build_result = build_block(state, ctx, info); let total_block_built_duration = total_block_built_duration.elapsed(); ctx.metrics .total_block_built_duration @@ -618,10 +619,10 @@ where // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { self.record_flashblocks_metrics( - &ctx, - &info, + ctx, + info, flashblocks_per_block, - &span, + span, "Payload building complete, channel closed or job cancelled", ); return Ok(()); From fa1f41a1e59ad0db4ffeb5b7986913a1059f5013 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Fri, 12 Sep 2025 09:59:06 -0400 Subject: [PATCH 6/9] update flashblock timing to be correct, re-add child cancel --- .../src/builders/flashblocks/payload.rs | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 8d101156b..5f6d85939 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -399,24 +399,14 @@ where .best_transactions_with_attributes(ctx.best_transaction_attributes()), )); - let mut timer = tokio::time::interval(self.config.specific.interval); - let first_flashblock_offset_signal = tokio::time::sleep(first_flashblock_offset); - - tokio::select! { - _ = first_flashblock_offset_signal => { - // wait for the first offset to pass - } - _ = block_cancel.cancelled() => { - self.record_flashblocks_metrics( - &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building cancelled before starting", - ); - return Ok(()); - } - } + let mut timer = tokio::time::interval_at( + tokio::time::Instant::now() + .checked_add(first_flashblock_offset) + .expect("can add flashblock offset to current time"), + self.config.specific.interval, + ); + let sleep = tokio::time::sleep(Duration::from_secs(0)); + tokio::pin!(sleep); // Process flashblocks in a blocking loop loop { @@ -431,22 +421,8 @@ where }; let _entered = fb_span.enter(); - tokio::select! { - _ = timer.tick() => { - // time to build next flashblock - } - _ = block_cancel.cancelled() => { - self.record_flashblocks_metrics( - &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, channel closed or job cancelled", - ); - return Ok(()); - } - } - + // build base flashblock immediately + ctx.cancel = block_cancel.child_token(); match self.build_next_flashblock( &mut ctx, &mut info, @@ -476,6 +452,28 @@ where return Err(err); } } + + tokio::select! { + _ = &mut sleep => { + // immediately after the base flashblock, we want to start the first non-base flashblock + ctx.cancel.cancel(); + } + _ = timer.tick() => { + // this will tick at first_flashblock_offset, + // starting the second (non-base) flashblock + ctx.cancel.cancel(); + } + _ = block_cancel.cancelled() => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, channel closed or job cancelled", + ); + return Ok(()); + } + } } } From 47bb2df68508da3db9e7b5c97dc56a536728b562 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Fri, 12 Sep 2025 12:29:16 -0400 Subject: [PATCH 7/9] improve timing by spawning timer --- .../src/builders/flashblocks/payload.rs | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 5f6d85939..8f77a0f02 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -48,6 +48,7 @@ use std::{ sync::{Arc, OnceLock}, time::Instant, }; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; @@ -194,7 +195,7 @@ impl OpPayloadBuilder where Pool: PoolBounds, Client: ClientBounds, - BT: BuilderTx, + BT: BuilderTx + Send + Sync, { /// Constructs an Optimism payload from the transactions sent via the /// Payload attributes by the sequencer. If the `no_tx_pool` argument is passed in @@ -398,15 +399,36 @@ where self.pool .best_transactions_with_attributes(ctx.best_transaction_attributes()), )); + let interval = self.config.specific.interval; + let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + + tokio::spawn({ + let block_cancel = block_cancel.clone(); + async move { + let mut timer = tokio::time::interval_at( + tokio::time::Instant::now() + .checked_add(first_flashblock_offset) + .expect("can add flashblock offset to current time"), + interval, + ); - let mut timer = tokio::time::interval_at( - tokio::time::Instant::now() - .checked_add(first_flashblock_offset) - .expect("can add flashblock offset to current time"), - self.config.specific.interval, - ); - let sleep = tokio::time::sleep(Duration::from_secs(0)); - tokio::pin!(sleep); + loop { + tokio::select! { + _ = timer.tick() => { + // this will tick at first_flashblock_offset, + // starting the second flashblock + if let Err(_) = tx.send(()).await { + // receiver channel was dropped, return + return; + } + } + _ = block_cancel.cancelled() => { + return; + } + } + } + } + }); // Process flashblocks in a blocking loop loop { @@ -421,7 +443,7 @@ where }; let _entered = fb_span.enter(); - // build base flashblock immediately + // build first flashblock immediately ctx.cancel = block_cancel.child_token(); match self.build_next_flashblock( &mut ctx, @@ -454,15 +476,10 @@ where } tokio::select! { - _ = &mut sleep => { - // immediately after the base flashblock, we want to start the first non-base flashblock - ctx.cancel.cancel(); - } - _ = timer.tick() => { - // this will tick at first_flashblock_offset, - // starting the second (non-base) flashblock + Some(()) = rx.recv() => { + // cancel current flashblock building, it's time to move to the next one ctx.cancel.cancel(); - } + }, _ = block_cancel.cancelled() => { self.record_flashblocks_metrics( &ctx, From fe780cfbc8628534dda1c8f6f5f2cee3a47686b3 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Fri, 12 Sep 2025 12:57:54 -0400 Subject: [PATCH 8/9] fix child cancellation token --- .../src/builders/flashblocks/payload.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 8f77a0f02..f099237a8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -401,9 +401,12 @@ where )); let interval = self.config.specific.interval; let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + let mut fb_cancel = block_cancel.child_token(); + ctx.cancel = fb_cancel.clone(); tokio::spawn({ let block_cancel = block_cancel.clone(); + async move { let mut timer = tokio::time::interval_at( tokio::time::Instant::now() @@ -415,10 +418,16 @@ where loop { tokio::select! { _ = timer.tick() => { + // cancel current payload building job + fb_cancel.cancel(); + fb_cancel = block_cancel.child_token(); // this will tick at first_flashblock_offset, // starting the second flashblock - if let Err(_) = tx.send(()).await { - // receiver channel was dropped, return + if tx.send(fb_cancel.clone()).await.is_err() { + // receiver channel was dropped, return. + // this will only happen if the `build_payload` function returns, + // due to payload building error or the main cancellation token being + // cancelled. return; } } @@ -444,7 +453,6 @@ where let _entered = fb_span.enter(); // build first flashblock immediately - ctx.cancel = block_cancel.child_token(); match self.build_next_flashblock( &mut ctx, &mut info, @@ -476,9 +484,8 @@ where } tokio::select! { - Some(()) = rx.recv() => { - // cancel current flashblock building, it's time to move to the next one - ctx.cancel.cancel(); + Some(fb_cancel) = rx.recv() => { + ctx.cancel = fb_cancel; }, _ = block_cancel.cancelled() => { self.record_flashblocks_metrics( From 4c8017f6b2aac1e5ac1691e008a570376575cca6 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 15 Sep 2025 13:53:13 -0400 Subject: [PATCH 9/9] move parameters to ctx, recalculate builder parameters in build_next_flashblock --- .../src/builders/flashblocks/payload.rs | 74 +++++++++++-------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index f099237a8..bab5c04c6 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -74,36 +74,40 @@ struct ExtraExecutionInfo { #[derive(Debug, Default)] struct FlashblocksExtraCtx { /// Current flashblock index - pub flashblock_index: u64, - /// Target flashblock count - pub target_flashblock_count: u64, + flashblock_index: u64, + /// Target flashblock count per block + target_flashblock_count: u64, + total_gas_per_batch: u64, + total_da_per_batch: Option, + gas_per_batch: u64, + da_per_batch: Option, } impl OpPayloadBuilderCtx { /// Returns the current flashblock index - pub fn flashblock_index(&self) -> u64 { + pub(crate) fn flashblock_index(&self) -> u64 { self.extra_ctx.flashblock_index } /// Returns the target flashblock count - pub fn target_flashblock_count(&self) -> u64 { + pub(crate) fn target_flashblock_count(&self) -> u64 { self.extra_ctx.target_flashblock_count } /// Increments the flashblock index - pub fn increment_flashblock_index(&mut self) -> u64 { + pub(crate) fn increment_flashblock_index(&mut self) -> u64 { self.extra_ctx.flashblock_index += 1; self.extra_ctx.flashblock_index } /// Sets the target flashblock count - pub fn set_target_flashblock_count(&mut self, target_flashblock_count: u64) -> u64 { + pub(crate) fn set_target_flashblock_count(&mut self, target_flashblock_count: u64) -> u64 { self.extra_ctx.target_flashblock_count = target_flashblock_count; self.extra_ctx.target_flashblock_count } /// Returns if the flashblock is the last one - pub fn is_last_flashblock(&self) -> bool { + pub(crate) fn is_last_flashblock(&self) -> bool { self.flashblock_index() == self.target_flashblock_count() - 1 } } @@ -274,6 +278,10 @@ where extra_ctx: FlashblocksExtraCtx { flashblock_index: 0, target_flashblock_count: self.config.flashblocks_per_block(), + total_gas_per_batch: 0, + total_da_per_batch: None, + gas_per_batch: 0, + da_per_batch: None, }, max_gas_per_txn: self.config.max_gas_per_txn, address_gas_limiter: self.address_gas_limiter.clone(), @@ -372,7 +380,7 @@ where .first_flashblock_time_offset .record(first_flashblock_offset.as_millis() as f64); let gas_per_batch = ctx.block_gas_limit() / ctx.target_flashblock_count(); - let mut total_gas_per_batch = gas_per_batch; + let total_gas_per_batch = gas_per_batch; let da_per_batch = ctx .da_config .max_da_block_size() @@ -389,10 +397,13 @@ where let mut total_da_per_batch = da_per_batch; // Account for already included builder tx - total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); + ctx.extra_ctx.total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); if let Some(da_limit) = total_da_per_batch.as_mut() { *da_limit = da_limit.saturating_sub(builder_tx_da_size); } + ctx.extra_ctx.total_da_per_batch = total_da_per_batch; + ctx.extra_ctx.gas_per_batch = gas_per_batch; + ctx.extra_ctx.da_per_batch = da_per_batch; // Create best_transaction iterator let mut best_txs = BestFlashblocksTxs::new(BestPayloadTransactions::new( @@ -456,18 +467,11 @@ where match self.build_next_flashblock( &mut ctx, &mut info, - &mut total_gas_per_batch, - &mut total_da_per_batch, - builder_tx_da_size, - builder_tx_gas, &mut state, &mut best_txs, &block_cancel, - flashblocks_per_block, message.clone(), &best_payload, - gas_per_batch, - da_per_batch, &fb_span, ) { Ok(()) => {} @@ -509,18 +513,11 @@ where &self, ctx: &mut OpPayloadBuilderCtx, info: &mut ExecutionInfo, - total_gas_per_batch: &mut u64, - total_da_per_batch: &mut Option, - builder_tx_da_size: u64, - builder_tx_gas: u64, state: &mut State, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, - flashblocks_per_block: u64, message: Vec, best_payload: &BlockCell, - gas_per_batch: u64, - da_per_batch: Option, span: &tracing::Span, ) -> Result<(), PayloadBuilderError> { // TODO: remove this @@ -535,7 +532,17 @@ where return Ok(()); }; + let builder_tx_gas = ctx + .builder_signer() + .map_or(0, |_| estimate_gas_for_builder_tx(message.clone())); + let builder_tx_da_size = ctx + .estimate_builder_tx_da_size(state, builder_tx_gas, message.clone()) + .unwrap_or(0); + // Continue with flashblock building + let mut total_gas_per_batch = ctx.extra_ctx.total_gas_per_batch; + let mut total_da_per_batch = ctx.extra_ctx.total_da_per_batch; + info!( target: "payload_builder", block_number = ctx.block_number(), @@ -549,7 +556,7 @@ where let flashblock_build_start_time = Instant::now(); // If it is the last flashblock, we need to account for the builder tx if ctx.is_last_flashblock() { - *total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); + total_gas_per_batch = total_gas_per_batch.saturating_sub(builder_tx_gas); // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size if let Some(da_limit) = total_da_per_batch.as_mut() { *da_limit = da_limit.saturating_sub(builder_tx_da_size); @@ -577,8 +584,8 @@ where info, state, best_txs, - (*total_gas_per_batch).min(ctx.block_gas_limit()), - *total_da_per_batch, + total_gas_per_batch.min(ctx.block_gas_limit()), + total_da_per_batch, )?; // Extract last transactions let new_transactions = info.executed_transactions[info.extra.last_flashblock_index..] @@ -594,7 +601,7 @@ where self.record_flashblocks_metrics( ctx, info, - flashblocks_per_block, + ctx.target_flashblock_count(), span, "Payload building complete, channel closed or job cancelled", ); @@ -643,7 +650,7 @@ where self.record_flashblocks_metrics( ctx, info, - flashblocks_per_block, + ctx.target_flashblock_count(), span, "Payload building complete, channel closed or job cancelled", ); @@ -668,8 +675,8 @@ where best_payload.set(new_payload.clone()); self.send_payload_to_engine(new_payload); // Update bundle_state for next iteration - *total_gas_per_batch += gas_per_batch; - if let Some(da_limit) = da_per_batch { + total_gas_per_batch += ctx.extra_ctx.gas_per_batch; + if let Some(da_limit) = ctx.extra_ctx.da_per_batch { if let Some(da) = total_da_per_batch.as_mut() { *da += da_limit; } else { @@ -679,13 +686,16 @@ where } } + ctx.extra_ctx.total_gas_per_batch = total_gas_per_batch; + ctx.extra_ctx.total_da_per_batch = total_da_per_batch; + info!( target: "payload_builder", message = "Flashblock built", flashblock_count = ctx.flashblock_index(), current_gas = info.cumulative_gas_used, current_da = info.cumulative_da_bytes_used, - target_flashblocks = flashblocks_per_block, + target_flashblocks = ctx.target_flashblock_count(), ); } }