From d17f071eaf17ced0458a51aea04470660d008cc9 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Fri, 7 Nov 2025 16:47:30 -0500 Subject: [PATCH 01/17] wip: limit max depth of pending state --- crates/flashblocks-rpc/src/pending_blocks.rs | 4 ++++ crates/flashblocks-rpc/src/state.rs | 19 ++++++++++++++++-- crates/flashblocks-rpc/src/tests/rpc.rs | 2 +- crates/flashblocks-rpc/src/tests/state.rs | 2 +- crates/node/src/main.rs | 21 ++++++++++++++++++-- 5 files changed, 42 insertions(+), 6 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 17277650..79464c6d 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -153,6 +153,10 @@ impl PendingBlocks { BlockNumberOrTag::Number(self.headers.first().unwrap().number - 1) } + pub fn earliest_block_number(&self) -> BlockNumber { + self.headers.first().unwrap().number + } + pub fn latest_flashblock_index(&self) -> u64 { self.flashblocks.last().unwrap().index } diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 08d2a774..9909590b 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -72,13 +72,14 @@ where + Clone + 'static, { - pub fn new(client: Client) -> Self { + pub fn new(client: Client, max_pending_blocks_depth: u64) -> Self { let (tx, rx) = mpsc::unbounded_channel::(); let pending_blocks: Arc> = Arc::new(ArcSwapOption::new(None)); let (flashblock_sender, _) = broadcast::channel(BUFFER_SIZE); let state_processor = StateProcessor::new( client, pending_blocks.clone(), + max_pending_blocks_depth, Arc::new(Mutex::new(rx)), flashblock_sender.clone(), ); @@ -179,6 +180,7 @@ impl PendingBlocksAPI for Guard>> { struct StateProcessor { rx: Arc>>, pending_blocks: Arc>, + max_depth: u64, metrics: Metrics, client: Client, sender: Sender>, @@ -195,10 +197,11 @@ where fn new( client: Client, pending_blocks: Arc>, + max_depth: u64, rx: Arc>>, sender: Sender>, ) -> Self { - Self { metrics: Metrics::default(), pending_blocks, client, rx, sender } + Self { metrics: Metrics::default(), pending_blocks, client, max_depth, rx, sender } } async fn start(&self) { @@ -295,6 +298,18 @@ where return self.build_pending_state(None, &flashblocks); } + let pending_blocks_depth = + block.number - pending_blocks.earliest_block_number() - 1; + if pending_blocks_depth > self.max_depth { + debug!( + message = + "pending blocks depth exceeds max depth, resetting pending blocks", + pending_blocks_depth = pending_blocks_depth, + max_depth = self.max_depth, + ); + return self.build_pending_state(None, &flashblocks); + } + // If no reorg, we can continue building on top of the existing pending state self.build_pending_state(prev_pending_blocks, &flashblocks) } diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index b8bbdfe0..9c5cdaa4 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -126,7 +126,7 @@ mod tests { .extend_rpc_modules(move |ctx| { // We are not going to use the websocket connection to send payloads so we use // a dummy url. - let flashblocks_state = Arc::new(FlashblocksState::new(ctx.provider().clone())); + let flashblocks_state = Arc::new(FlashblocksState::new(ctx.provider().clone(), 5)); flashblocks_state.start(); let api_ext = EthApiExt::new( diff --git a/crates/flashblocks-rpc/src/tests/state.rs b/crates/flashblocks-rpc/src/tests/state.rs index 4244e2db..20005507 100644 --- a/crates/flashblocks-rpc/src/tests/state.rs +++ b/crates/flashblocks-rpc/src/tests/state.rs @@ -256,7 +256,7 @@ mod tests { .try_into_recovered() .expect("able to recover block"); - let flashblocks = FlashblocksState::new(provider.clone()); + let flashblocks = FlashblocksState::new(provider.clone(), 4); flashblocks.start(); flashblocks.on_canonical_block_received(&block); diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index c598ae3f..61f5170f 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -35,6 +35,13 @@ struct Args { #[arg(long = "websocket-url", value_name = "WEBSOCKET_URL")] pub websocket_url: Option, + #[arg( + long = "max-pending-blocks-depth", + value_name = "MAX_PENDING_BLOCKS_DEPTH", + default_value = "5" + )] + pub max_pending_blocks_depth: u64, + /// Enable transaction tracing ExEx for mempool-to-block timing analysis #[arg(long = "enable-transaction-tracing", value_name = "ENABLE_TRANSACTION_TRACING")] pub enable_transaction_tracing: bool, @@ -105,7 +112,12 @@ fn main() { let fb_cell = fb_cell.clone(); move |mut ctx| async move { let fb = fb_cell - .get_or_init(|| Arc::new(FlashblocksState::new(ctx.provider().clone()))) + .get_or_init(|| { + Arc::new(FlashblocksState::new( + ctx.provider().clone(), + args.max_pending_blocks_depth, + )) + }) .clone(); Ok(async move { while let Some(note) = ctx.notifications.try_next().await? { @@ -139,7 +151,12 @@ fn main() { )?; let fb = fb_cell - .get_or_init(|| Arc::new(FlashblocksState::new(ctx.provider().clone()))) + .get_or_init(|| { + Arc::new(FlashblocksState::new( + ctx.provider().clone(), + args.max_pending_blocks_depth, + )) + }) .clone(); fb.start(); From 1593145f5ba66c821e8801bc41758fb5e44afccc Mon Sep 17 00:00:00 2001 From: Haardik H Date: Fri, 7 Nov 2025 17:03:36 -0500 Subject: [PATCH 02/17] track pending snapshot height accurately --- crates/flashblocks-rpc/src/state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 9909590b..202bbfa3 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -258,12 +258,12 @@ where .filter(|fb| fb.metadata.block_number == block.number) .count(); self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64); + self.metrics + .pending_snapshot_height + .set(pending_blocks.latest_block_number() as f64); if pending_blocks.latest_block_number() <= block.number { self.metrics.pending_clear_catchup.increment(1); - self.metrics - .pending_snapshot_height - .set(pending_blocks.latest_block_number() as f64); self.metrics .pending_snapshot_fb_index .set(pending_blocks.latest_flashblock_index() as f64); From 18ba14a1d84bdb552932385a7c2be350a15300f3 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 11:21:31 -0500 Subject: [PATCH 03/17] add additional debug logs --- crates/flashblocks-rpc/src/state.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 202bbfa3..781af27e 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -263,6 +263,11 @@ where .set(pending_blocks.latest_block_number() as f64); if pending_blocks.latest_block_number() <= block.number { + debug!( + message = "pending snapshot cleared because canonical caught up", + latest_pending_block = pending_blocks.latest_block_number(), + canonical_block = block.number, + ); self.metrics.pending_clear_catchup.increment(1); self.metrics .pending_snapshot_fb_index @@ -276,6 +281,18 @@ where tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); let block_txn_hashes: HashSet<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect(); + let pending_blocks_depth = + block.number - pending_blocks.earliest_block_number() - 1; + + debug!( + message = "canonical block behind latest pending block, checking for reorg and max depth", + latest_pending_block = pending_blocks.latest_block_number(), + canonical_block = block.number, + pending_txns_for_block = ?tracked_txn_hashes.len(), + canonical_txns_for_block = ?block_txn_hashes.len(), + pending_blocks_depth = pending_blocks_depth, + max_depth = self.max_depth, + ); flashblocks .retain(|flashblock| flashblock.metadata.block_number > block.number); @@ -298,8 +315,6 @@ where return self.build_pending_state(None, &flashblocks); } - let pending_blocks_depth = - block.number - pending_blocks.earliest_block_number() - 1; if pending_blocks_depth > self.max_depth { debug!( message = From f5630214e81aa754e1cbcc0d1b99af6eb5b6d36e Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 11:42:45 -0500 Subject: [PATCH 04/17] wi --- crates/flashblocks-rpc/src/state.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 781af27e..e21506e9 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -287,6 +287,7 @@ where debug!( message = "canonical block behind latest pending block, checking for reorg and max depth", latest_pending_block = pending_blocks.latest_block_number(), + earliest_pending_block = pending_blocks.earliest_block_number(), canonical_block = block.number, pending_txns_for_block = ?tracked_txn_hashes.len(), canonical_txns_for_block = ?block_txn_hashes.len(), From 4b368eab5dfbb2bddca453fdfaff049ff56e3adf Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 12:07:33 -0500 Subject: [PATCH 05/17] remove unnecessary logs --- crates/flashblocks-rpc/src/state.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index e21506e9..95f89044 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -282,7 +282,7 @@ where let block_txn_hashes: HashSet<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect(); let pending_blocks_depth = - block.number - pending_blocks.earliest_block_number() - 1; + block.number - pending_blocks.earliest_block_number(); debug!( message = "canonical block behind latest pending block, checking for reorg and max depth", @@ -303,10 +303,6 @@ where { debug!( message = "reorg detected, recomputing pending flashblocks going ahead of reorg", - latest_pending_block = pending_blocks.latest_block_number(), - canonical_block = block.number, - tracked_txn_hashes_len = tracked_txn_hashes.len(), - block_txn_hashes_len = block_txn_hashes.len(), tracked_txn_hashes = ?tracked_txn_hashes, block_txn_hashes = ?block_txn_hashes, ); From ca667b7ec4d81ce3e2d81255e2c801266bac6793 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 13:12:37 -0500 Subject: [PATCH 06/17] add bunch of timing logs --- crates/flashblocks-rpc/src/state.rs | 215 +++++++++++++++++++++++++++- 1 file changed, 214 insertions(+), 1 deletion(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 95f89044..646baeaf 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -393,7 +393,33 @@ where prev_pending_blocks: Option>, flashblocks: &Vec, ) -> eyre::Result>> { + let fn_start = Instant::now(); + + // Timing tracking + let mut time_per_block_loop = std::time::Duration::ZERO; + let mut time_base_collection = std::time::Duration::ZERO; + let mut time_transactions_collection = std::time::Duration::ZERO; + let mut time_receipts_collection = std::time::Duration::ZERO; + let mut time_balances_collection = std::time::Duration::ZERO; + let mut time_execution_payload_build = std::time::Duration::ZERO; + let mut time_block_conversion = std::time::Duration::ZERO; + let mut time_evm_env_setup = std::time::Duration::ZERO; + let mut time_transaction_loop = std::time::Duration::ZERO; + let mut time_tx_sender_recovery = std::time::Duration::ZERO; + let mut time_tx_receipt_lookup = std::time::Duration::ZERO; + let mut time_tx_rpc_build = std::time::Duration::ZERO; + let mut time_tx_receipt_build = std::time::Duration::ZERO; + let mut time_tx_state_check = std::time::Duration::ZERO; + let mut time_tx_execution = std::time::Duration::ZERO; + let mut time_tx_state_commit = std::time::Duration::ZERO; + let mut time_balance_updates = std::time::Duration::ZERO; + + let mut total_transactions = 0; + let mut executed_transactions = 0; + let mut skipped_transactions = 0; + // BTreeMap guarantees ascending order of keys while iterating + let step_start = Instant::now(); let mut flashblocks_per_block = BTreeMap::>::new(); for flashblock in flashblocks { flashblocks_per_block @@ -401,32 +427,47 @@ where .or_default() .push(flashblock); } + let time_btree_construction = step_start.elapsed(); + let step_start = Instant::now(); let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( "Failed to extract header for canonical block number {}. This is okay if your node is not fully synced to tip yet.", canonical_block ))?; + let time_header_fetch = step_start.elapsed(); + let step_start = Instant::now(); let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); + let time_evm_config = step_start.elapsed(); + let step_start = Instant::now(); let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); let mut pending_blocks_builder = PendingBlocksBuilder::new(); + let time_state_provider_setup = step_start.elapsed(); + let step_start = Instant::now(); let mut db = match &prev_pending_blocks { Some(pending_blocks) => CacheDB { cache: pending_blocks.get_db_cache(), db: state }, None => CacheDB::new(state), }; + let time_db_cache_setup = step_start.elapsed(); + + let step_start = Instant::now(); let mut state_overrides = match &prev_pending_blocks { Some(pending_blocks) => pending_blocks.get_state_overrides().unwrap_or_default(), None => StateOverride::default(), }; + let time_state_overrides_setup = step_start.elapsed(); for (_block_number, flashblocks) in flashblocks_per_block { + let block_start = Instant::now(); + + let step_start = Instant::now(); let base = flashblocks .first() .ok_or(eyre!("cannot build a pending block from no flashblocks"))? @@ -438,7 +479,9 @@ where .last() .cloned() .ok_or(eyre!("cannot build a pending block from no flashblocks"))?; + time_base_collection += step_start.elapsed(); + let step_start = Instant::now(); let transactions: Vec = flashblocks .iter() .flat_map(|flashblock| flashblock.diff.transactions.clone()) @@ -448,7 +491,9 @@ where .iter() .flat_map(|flashblock| flashblock.diff.withdrawals.clone()) .collect(); + time_transactions_collection += step_start.elapsed(); + let step_start = Instant::now(); let receipt_by_hash = flashblocks .iter() .map(|flashblock| flashblock.metadata.receipts.clone()) @@ -456,7 +501,9 @@ where acc.extend(receipts); acc }); + time_receipts_collection += step_start.elapsed(); + let step_start = Instant::now(); let updated_balances = flashblocks .iter() .map(|flashblock| flashblock.metadata.new_account_balances.clone()) @@ -464,7 +511,9 @@ where acc.extend(balances); acc }); + time_balances_collection += step_start.elapsed(); + let step_start = Instant::now(); pending_blocks_builder.with_flashblocks( flashblocks.iter().map(|&x| x.clone()).collect::>(), ); @@ -492,7 +541,9 @@ where }, }, }; + time_execution_payload_build += step_start.elapsed(); + let step_start = Instant::now(); let block: OpBlock = execution_payload.try_into_block()?; let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; let header = block.header.clone().seal_slow(); @@ -506,25 +557,36 @@ where parent_beacon_block_root: Some(base.parent_beacon_block_root), extra_data: base.extra_data.clone(), }; + time_block_conversion += step_start.elapsed(); + let step_start = Instant::now(); let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; let mut evm = evm_config.evm_with_env(db, evm_env); + time_evm_env_setup += step_start.elapsed(); let mut gas_used = 0; let mut next_log_index = 0; + let tx_loop_start = Instant::now(); for (idx, transaction) in block.body.transactions.iter().enumerate() { + total_transactions += 1; + + let step_start = Instant::now(); let sender = match transaction.recover_signer() { Ok(signer) => signer, Err(err) => return Err(err.into()), }; pending_blocks_builder.increment_nonce(sender); + time_tx_sender_recovery += step_start.elapsed(); + let step_start = Instant::now(); let receipt = receipt_by_hash .get(&transaction.tx_hash()) .cloned() .ok_or(eyre!("missing receipt for {:?}", transaction.tx_hash()))?; + time_tx_receipt_lookup += step_start.elapsed(); + let step_start = Instant::now(); let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); let envelope = recovered_transaction.clone().convert::(); @@ -564,7 +626,9 @@ where }; pending_blocks_builder.with_transaction(rpc_txn); + time_tx_rpc_build += step_start.elapsed(); + let step_start = Instant::now(); // Receipt Generation let meta = TransactionMeta { tx_hash: transaction.tx_hash(), @@ -594,7 +658,9 @@ where pending_blocks_builder.with_receipt(transaction.tx_hash(), op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); + time_tx_receipt_build += step_start.elapsed(); + let step_start = Instant::now(); let mut should_execute_transaction = false; match &prev_pending_blocks { Some(pending_blocks) => { @@ -612,10 +678,17 @@ where should_execute_transaction = true; } } + time_tx_state_check += step_start.elapsed(); if should_execute_transaction { + executed_transactions += 1; + + let step_start = Instant::now(); match evm.transact(recovered_transaction) { Ok(ResultAndState { state, .. }) => { + time_tx_execution += step_start.elapsed(); + + let step_start = Instant::now(); for (addr, acc) in &state { let existing_override = state_overrides.entry(*addr).or_insert(Default::default()); @@ -635,6 +708,7 @@ where pending_blocks_builder .with_transaction_state(transaction.tx_hash(), state.clone()); evm.db_mut().commit(state); + time_tx_state_commit += step_start.elapsed(); } Err(e) => { return Err(eyre!( @@ -645,20 +719,151 @@ where )); } } + } else { + skipped_transactions += 1; } } + time_transaction_loop += tx_loop_start.elapsed(); + let step_start = Instant::now(); for (address, balance) in updated_balances { pending_blocks_builder.with_account_balance(address, balance); } + time_balance_updates += step_start.elapsed(); db = evm.into_db(); last_block_header = block.header.clone(); + + time_per_block_loop += block_start.elapsed(); } + let step_start = Instant::now(); pending_blocks_builder.with_db_cache(db.cache); pending_blocks_builder.with_state_overrides(state_overrides); - Ok(Some(Arc::new(pending_blocks_builder.build()?))) + let result = Some(Arc::new(pending_blocks_builder.build()?)); + let time_final_builder = step_start.elapsed(); + + let total_time = fn_start.elapsed(); + + // Pretty print timing breakdown + debug!( + "\n╔══════════════════════════════════════════════════════════════════════════════════════════════╗\n\ + ║ BUILD_PENDING_STATE TIMING BREAKDOWN ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ TOTAL TIME: {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ INITIALIZATION ║\n\ + ║ • BTree Construction : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Header Fetch : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • EVM Config : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • State Provider Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • DB Cache Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • State Overrides Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ PER-BLOCK PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Base Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Transactions Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Receipts Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Balances Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Execution Payload Build : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Block Conversion : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • EVM Environment Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Balance Updates : {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ TRANSACTION PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Sender Recovery : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Receipt Lookup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • RPC Transaction Build : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Receipt Build : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • State Check : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Transaction Execution : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ + ║ • State Commit : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ FINALIZATION ║\n\ + ║ • Final Builder Operations : {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ TRANSACTION STATS ║\n\ + ║ • Total Transactions : {:>6} ║\n\ + ║ • Executed Transactions : {:>6} ({:>6.2}%) ║\n\ + ║ • Skipped Transactions : {:>6} ({:>6.2}%) ║\n\ + ║ • Avg Time per TX (executed) : {:>8.2?} ║\n\ + ╚══════════════════════════════════════════════════════════════════════════════════════════════╝", + total_time, + 100.0, + // Initialization + time_btree_construction, + percentage(time_btree_construction, total_time), + time_header_fetch, + percentage(time_header_fetch, total_time), + time_evm_config, + percentage(time_evm_config, total_time), + time_state_provider_setup, + percentage(time_state_provider_setup, total_time), + time_db_cache_setup, + percentage(time_db_cache_setup, total_time), + time_state_overrides_setup, + percentage(time_state_overrides_setup, total_time), + // Per-block processing + time_per_block_loop, + percentage(time_per_block_loop, total_time), + time_base_collection, + percentage(time_base_collection, total_time), + time_transactions_collection, + percentage(time_transactions_collection, total_time), + time_receipts_collection, + percentage(time_receipts_collection, total_time), + time_balances_collection, + percentage(time_balances_collection, total_time), + time_execution_payload_build, + percentage(time_execution_payload_build, total_time), + time_block_conversion, + percentage(time_block_conversion, total_time), + time_evm_env_setup, + percentage(time_evm_env_setup, total_time), + time_balance_updates, + percentage(time_balance_updates, total_time), + // Transaction processing + time_transaction_loop, + percentage(time_transaction_loop, total_time), + time_tx_sender_recovery, + percentage(time_tx_sender_recovery, total_time), + time_tx_receipt_lookup, + percentage(time_tx_receipt_lookup, total_time), + time_tx_rpc_build, + percentage(time_tx_rpc_build, total_time), + time_tx_receipt_build, + percentage(time_tx_receipt_build, total_time), + time_tx_state_check, + percentage(time_tx_state_check, total_time), + time_tx_execution, + percentage(time_tx_execution, total_time), + time_tx_state_commit, + percentage(time_tx_state_commit, total_time), + // Finalization + time_final_builder, + percentage(time_final_builder, total_time), + // Transaction stats + total_transactions, + executed_transactions, + if total_transactions > 0 { + (executed_transactions as f64 / total_transactions as f64) * 100.0 + } else { + 0.0 + }, + skipped_transactions, + if total_transactions > 0 { + (skipped_transactions as f64 / total_transactions as f64) * 100.0 + } else { + 0.0 + }, + if executed_transactions > 0 { + time_tx_execution / executed_transactions as u32 + } else { + std::time::Duration::ZERO + }, + ); + + Ok(result) } fn is_next_flashblock( @@ -676,3 +881,11 @@ where is_next_of_block || is_first_of_next_block } } + +fn percentage(duration: std::time::Duration, total: std::time::Duration) -> f64 { + if total.as_nanos() > 0 { + (duration.as_nanos() as f64 / total.as_nanos() as f64) * 100.0 + } else { + 0.0 + } +} From 99329e636bd60dd953c62a6c19ba317b6522427c Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 14:30:19 -0500 Subject: [PATCH 07/17] . --- crates/flashblocks-rpc/src/state.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 646baeaf..5c945a79 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -295,9 +295,6 @@ where max_depth = self.max_depth, ); - flashblocks - .retain(|flashblock| flashblock.metadata.block_number > block.number); - if tracked_txn_hashes.len() != block_txn_hashes.len() || tracked_txn_hashes != block_txn_hashes { @@ -309,6 +306,8 @@ where self.metrics.pending_clear_reorg.increment(1); // If there is a reorg, we re-process all future flashblocks without reusing the existing pending state + flashblocks + .retain(|flashblock| flashblock.metadata.block_number > block.number); return self.build_pending_state(None, &flashblocks); } @@ -319,10 +318,14 @@ where pending_blocks_depth = pending_blocks_depth, max_depth = self.max_depth, ); + + flashblocks + .retain(|flashblock| flashblock.metadata.block_number > block.number); return self.build_pending_state(None, &flashblocks); } // If no reorg, we can continue building on top of the existing pending state + // NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number self.build_pending_state(prev_pending_blocks, &flashblocks) } } From 3c54c9c523d262fc9932fbcc9ff6b3ef6c1331ea Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 15:09:01 -0500 Subject: [PATCH 08/17] . --- crates/flashblocks-rpc/src/pending_blocks.rs | 25 ++++++++++++++++++-- crates/flashblocks-rpc/src/state.rs | 7 +++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 79464c6d..4defb69e 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -1,3 +1,5 @@ +use crate::subscription::Flashblock; +use alloy_consensus::transaction::SignerRecoverable; use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{ @@ -8,13 +10,12 @@ use alloy_provider::network::TransactionResponse; use alloy_rpc_types::{state::StateOverride, BlockTransactions}; use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; use eyre::eyre; +use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use reth::revm::{db::Cache, state::EvmState}; use reth_rpc_eth_api::RpcBlock; -use crate::subscription::Flashblock; - pub struct PendingBlocksBuilder { flashblocks: Vec, headers: Vec>, @@ -25,6 +26,7 @@ pub struct PendingBlocksBuilder { transaction_receipts: HashMap, transactions_by_hash: HashMap, transaction_state: HashMap, + transaction_senders: HashMap, state_overrides: Option, db_cache: Cache, @@ -41,6 +43,7 @@ impl PendingBlocksBuilder { transaction_receipts: HashMap::new(), transactions_by_hash: HashMap::new(), transaction_state: HashMap::new(), + transaction_senders: HashMap::new(), state_overrides: None, db_cache: Cache::default(), } @@ -77,6 +80,12 @@ impl PendingBlocksBuilder { self } + #[inline] + pub(crate) fn with_transaction_sender(&mut self, hash: B256, sender: Address) -> &Self { + self.transaction_senders.insert(hash, sender); + self + } + #[inline] pub(crate) fn increment_nonce(&mut self, sender: Address) -> &Self { let zero = U256::from(0); @@ -122,6 +131,7 @@ impl PendingBlocksBuilder { transaction_receipts: self.transaction_receipts, transactions_by_hash: self.transactions_by_hash, transaction_state: self.transaction_state, + transaction_senders: self.transaction_senders, state_overrides: self.state_overrides, db_cache: self.db_cache, }) @@ -139,6 +149,7 @@ pub struct PendingBlocks { transaction_receipts: HashMap, transactions_by_hash: HashMap, transaction_state: HashMap, + transaction_senders: HashMap, state_overrides: Option, db_cache: Cache, @@ -173,6 +184,16 @@ impl PendingBlocks { self.transaction_state.get(&hash).cloned() } + pub fn get_transaction_sender(&self, tx: &OpTxEnvelope) -> eyre::Result
{ + let hash = tx.tx_hash(); + + Ok(self + .transaction_senders + .get(&hash) + .cloned() + .unwrap_or(tx.recover_signer()?)) + } + pub fn get_db_cache(&self) -> Cache { self.db_cache.clone() } diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 5c945a79..d8a6ab7e 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -575,10 +575,11 @@ where total_transactions += 1; let step_start = Instant::now(); - let sender = match transaction.recover_signer() { - Ok(signer) => signer, - Err(err) => return Err(err.into()), + let sender = match &prev_pending_blocks { + Some(pending_blocks) => pending_blocks.get_transaction_sender(transaction)?, + None => transaction.recover_signer()?, }; + pending_blocks_builder.with_transaction_sender(transaction.tx_hash(), sender); pending_blocks_builder.increment_nonce(sender); time_tx_sender_recovery += step_start.elapsed(); From 59aee9b7bcd9fec54e49bbaad5a2702f24a459cd Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 15:56:02 -0500 Subject: [PATCH 09/17] . --- crates/flashblocks-rpc/src/pending_blocks.rs | 16 ++------ crates/flashblocks-rpc/src/state.rs | 40 ++++++++++---------- crates/node/src/main.rs | 2 +- 3 files changed, 24 insertions(+), 34 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 4defb69e..23aac9e3 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -1,5 +1,4 @@ use crate::subscription::Flashblock; -use alloy_consensus::transaction::SignerRecoverable; use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{ @@ -10,7 +9,6 @@ use alloy_provider::network::TransactionResponse; use alloy_rpc_types::{state::StateOverride, BlockTransactions}; use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; use eyre::eyre; -use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use reth::revm::{db::Cache, state::EvmState}; @@ -180,18 +178,12 @@ impl PendingBlocks { self.flashblocks.clone() } - pub fn get_transaction_state(&self, hash: B256) -> Option { - self.transaction_state.get(&hash).cloned() + pub fn get_transaction_state(&self, hash: &B256) -> Option { + self.transaction_state.get(hash).cloned() } - pub fn get_transaction_sender(&self, tx: &OpTxEnvelope) -> eyre::Result
{ - let hash = tx.tx_hash(); - - Ok(self - .transaction_senders - .get(&hash) - .cloned() - .unwrap_or(tx.recover_signer()?)) + pub fn get_transaction_sender(&self, tx_hash: &B256) -> Option
{ + self.transaction_senders.get(tx_hash).cloned() } pub fn get_db_cache(&self) -> Cache { diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index d8a6ab7e..299c76e0 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -573,19 +573,21 @@ where let tx_loop_start = Instant::now(); for (idx, transaction) in block.body.transactions.iter().enumerate() { total_transactions += 1; + let tx_hash = transaction.tx_hash(); let step_start = Instant::now(); - let sender = match &prev_pending_blocks { - Some(pending_blocks) => pending_blocks.get_transaction_sender(transaction)?, - None => transaction.recover_signer()?, - }; + let sender = prev_pending_blocks + .as_ref() + .and_then(|pb| pb.get_transaction_sender(&tx_hash)) + .unwrap_or(transaction.recover_signer()?); + pending_blocks_builder.with_transaction_sender(transaction.tx_hash(), sender); pending_blocks_builder.increment_nonce(sender); time_tx_sender_recovery += step_start.elapsed(); let step_start = Instant::now(); let receipt = receipt_by_hash - .get(&transaction.tx_hash()) + .get(&tx_hash) .cloned() .ok_or(eyre!("missing receipt for {:?}", transaction.tx_hash()))?; time_tx_receipt_lookup += step_start.elapsed(); @@ -634,8 +636,8 @@ where let step_start = Instant::now(); // Receipt Generation - let meta = TransactionMeta { - tx_hash: transaction.tx_hash(), + let meta: TransactionMeta = TransactionMeta { + tx_hash, index: idx as u64, block_hash: header.hash(), block_number: block.number, @@ -659,7 +661,7 @@ where )? .build(); - pending_blocks_builder.with_receipt(transaction.tx_hash(), op_receipt); + pending_blocks_builder.with_receipt(tx_hash, op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); time_tx_receipt_build += step_start.elapsed(); @@ -667,17 +669,14 @@ where let step_start = Instant::now(); let mut should_execute_transaction = false; match &prev_pending_blocks { - Some(pending_blocks) => { - match pending_blocks.get_transaction_state(transaction.tx_hash()) { - Some(state) => { - pending_blocks_builder - .with_transaction_state(transaction.tx_hash(), state); - } - None => { - should_execute_transaction = true; - } + Some(pending_blocks) => match pending_blocks.get_transaction_state(&tx_hash) { + Some(state) => { + pending_blocks_builder.with_transaction_state(tx_hash, state); } - } + None => { + should_execute_transaction = true; + } + }, None => { should_execute_transaction = true; } @@ -709,8 +708,7 @@ where existing.extend(changed_slots); } - pending_blocks_builder - .with_transaction_state(transaction.tx_hash(), state.clone()); + pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); evm.db_mut().commit(state); time_tx_state_commit += step_start.elapsed(); } @@ -718,7 +716,7 @@ where return Err(eyre!( "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", e, - transaction.tx_hash(), + tx_hash, sender )); } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 61f5170f..740f912c 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -38,7 +38,7 @@ struct Args { #[arg( long = "max-pending-blocks-depth", value_name = "MAX_PENDING_BLOCKS_DEPTH", - default_value = "5" + default_value = "3" )] pub max_pending_blocks_depth: u64, From 2b488007bb94d547131e0043bb7f3733d6013df5 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Mon, 10 Nov 2025 16:40:09 -0500 Subject: [PATCH 10/17] remove debug logs --- crates/flashblocks-rpc/src/state.rs | 217 +--------------------------- 1 file changed, 3 insertions(+), 214 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 299c76e0..32d51fd3 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -396,33 +396,7 @@ where prev_pending_blocks: Option>, flashblocks: &Vec, ) -> eyre::Result>> { - let fn_start = Instant::now(); - - // Timing tracking - let mut time_per_block_loop = std::time::Duration::ZERO; - let mut time_base_collection = std::time::Duration::ZERO; - let mut time_transactions_collection = std::time::Duration::ZERO; - let mut time_receipts_collection = std::time::Duration::ZERO; - let mut time_balances_collection = std::time::Duration::ZERO; - let mut time_execution_payload_build = std::time::Duration::ZERO; - let mut time_block_conversion = std::time::Duration::ZERO; - let mut time_evm_env_setup = std::time::Duration::ZERO; - let mut time_transaction_loop = std::time::Duration::ZERO; - let mut time_tx_sender_recovery = std::time::Duration::ZERO; - let mut time_tx_receipt_lookup = std::time::Duration::ZERO; - let mut time_tx_rpc_build = std::time::Duration::ZERO; - let mut time_tx_receipt_build = std::time::Duration::ZERO; - let mut time_tx_state_check = std::time::Duration::ZERO; - let mut time_tx_execution = std::time::Duration::ZERO; - let mut time_tx_state_commit = std::time::Duration::ZERO; - let mut time_balance_updates = std::time::Duration::ZERO; - - let mut total_transactions = 0; - let mut executed_transactions = 0; - let mut skipped_transactions = 0; - // BTreeMap guarantees ascending order of keys while iterating - let step_start = Instant::now(); let mut flashblocks_per_block = BTreeMap::>::new(); for flashblock in flashblocks { flashblocks_per_block @@ -430,47 +404,31 @@ where .or_default() .push(flashblock); } - let time_btree_construction = step_start.elapsed(); - let step_start = Instant::now(); let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( - "Failed to extract header for canonical block number {}. This is okay if your node is not fully synced to tip yet.", + "Failed to extract header for canonical block number {}. This is okay if your node is not fully synced to tip yet or was recently restarted.", canonical_block ))?; - let time_header_fetch = step_start.elapsed(); - let step_start = Instant::now(); let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); - let time_evm_config = step_start.elapsed(); - - let step_start = Instant::now(); let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); let mut pending_blocks_builder = PendingBlocksBuilder::new(); - let time_state_provider_setup = step_start.elapsed(); - let step_start = Instant::now(); let mut db = match &prev_pending_blocks { Some(pending_blocks) => CacheDB { cache: pending_blocks.get_db_cache(), db: state }, None => CacheDB::new(state), }; - let time_db_cache_setup = step_start.elapsed(); - - let step_start = Instant::now(); let mut state_overrides = match &prev_pending_blocks { Some(pending_blocks) => pending_blocks.get_state_overrides().unwrap_or_default(), None => StateOverride::default(), }; - let time_state_overrides_setup = step_start.elapsed(); for (_block_number, flashblocks) in flashblocks_per_block { - let block_start = Instant::now(); - - let step_start = Instant::now(); let base = flashblocks .first() .ok_or(eyre!("cannot build a pending block from no flashblocks"))? @@ -482,9 +440,7 @@ where .last() .cloned() .ok_or(eyre!("cannot build a pending block from no flashblocks"))?; - time_base_collection += step_start.elapsed(); - let step_start = Instant::now(); let transactions: Vec = flashblocks .iter() .flat_map(|flashblock| flashblock.diff.transactions.clone()) @@ -494,9 +450,7 @@ where .iter() .flat_map(|flashblock| flashblock.diff.withdrawals.clone()) .collect(); - time_transactions_collection += step_start.elapsed(); - let step_start = Instant::now(); let receipt_by_hash = flashblocks .iter() .map(|flashblock| flashblock.metadata.receipts.clone()) @@ -504,9 +458,7 @@ where acc.extend(receipts); acc }); - time_receipts_collection += step_start.elapsed(); - let step_start = Instant::now(); let updated_balances = flashblocks .iter() .map(|flashblock| flashblock.metadata.new_account_balances.clone()) @@ -514,9 +466,7 @@ where acc.extend(balances); acc }); - time_balances_collection += step_start.elapsed(); - let step_start = Instant::now(); pending_blocks_builder.with_flashblocks( flashblocks.iter().map(|&x| x.clone()).collect::>(), ); @@ -544,9 +494,7 @@ where }, }, }; - time_execution_payload_build += step_start.elapsed(); - let step_start = Instant::now(); let block: OpBlock = execution_payload.try_into_block()?; let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; let header = block.header.clone().seal_slow(); @@ -560,39 +508,29 @@ where parent_beacon_block_root: Some(base.parent_beacon_block_root), extra_data: base.extra_data.clone(), }; - time_block_conversion += step_start.elapsed(); - let step_start = Instant::now(); let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; let mut evm = evm_config.evm_with_env(db, evm_env); - time_evm_env_setup += step_start.elapsed(); let mut gas_used = 0; let mut next_log_index = 0; - let tx_loop_start = Instant::now(); for (idx, transaction) in block.body.transactions.iter().enumerate() { - total_transactions += 1; let tx_hash = transaction.tx_hash(); - let step_start = Instant::now(); let sender = prev_pending_blocks .as_ref() .and_then(|pb| pb.get_transaction_sender(&tx_hash)) .unwrap_or(transaction.recover_signer()?); - pending_blocks_builder.with_transaction_sender(transaction.tx_hash(), sender); + pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); - time_tx_sender_recovery += step_start.elapsed(); - let step_start = Instant::now(); let receipt = receipt_by_hash .get(&tx_hash) .cloned() - .ok_or(eyre!("missing receipt for {:?}", transaction.tx_hash()))?; - time_tx_receipt_lookup += step_start.elapsed(); + .ok_or(eyre!("missing receipt for {:?}", tx_hash))?; - let step_start = Instant::now(); let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); let envelope = recovered_transaction.clone().convert::(); @@ -632,9 +570,7 @@ where }; pending_blocks_builder.with_transaction(rpc_txn); - time_tx_rpc_build += step_start.elapsed(); - let step_start = Instant::now(); // Receipt Generation let meta: TransactionMeta = TransactionMeta { tx_hash, @@ -664,9 +600,7 @@ where pending_blocks_builder.with_receipt(tx_hash, op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); - time_tx_receipt_build += step_start.elapsed(); - let step_start = Instant::now(); let mut should_execute_transaction = false; match &prev_pending_blocks { Some(pending_blocks) => match pending_blocks.get_transaction_state(&tx_hash) { @@ -681,17 +615,10 @@ where should_execute_transaction = true; } } - time_tx_state_check += step_start.elapsed(); if should_execute_transaction { - executed_transactions += 1; - - let step_start = Instant::now(); match evm.transact(recovered_transaction) { Ok(ResultAndState { state, .. }) => { - time_tx_execution += step_start.elapsed(); - - let step_start = Instant::now(); for (addr, acc) in &state { let existing_override = state_overrides.entry(*addr).or_insert(Default::default()); @@ -710,7 +637,6 @@ where } pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); evm.db_mut().commit(state); - time_tx_state_commit += step_start.elapsed(); } Err(e) => { return Err(eyre!( @@ -721,149 +647,20 @@ where )); } } - } else { - skipped_transactions += 1; } } - time_transaction_loop += tx_loop_start.elapsed(); - let step_start = Instant::now(); for (address, balance) in updated_balances { pending_blocks_builder.with_account_balance(address, balance); } - time_balance_updates += step_start.elapsed(); db = evm.into_db(); last_block_header = block.header.clone(); - - time_per_block_loop += block_start.elapsed(); } - let step_start = Instant::now(); pending_blocks_builder.with_db_cache(db.cache); pending_blocks_builder.with_state_overrides(state_overrides); let result = Some(Arc::new(pending_blocks_builder.build()?)); - let time_final_builder = step_start.elapsed(); - - let total_time = fn_start.elapsed(); - - // Pretty print timing breakdown - debug!( - "\n╔══════════════════════════════════════════════════════════════════════════════════════════════╗\n\ - ║ BUILD_PENDING_STATE TIMING BREAKDOWN ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ TOTAL TIME: {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ INITIALIZATION ║\n\ - ║ • BTree Construction : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Header Fetch : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • EVM Config : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • State Provider Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • DB Cache Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • State Overrides Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ PER-BLOCK PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Base Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Transactions Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Receipts Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Balances Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Execution Payload Build : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Block Conversion : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • EVM Environment Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Balance Updates : {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ TRANSACTION PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Sender Recovery : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Receipt Lookup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • RPC Transaction Build : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Receipt Build : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • State Check : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Transaction Execution : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ - ║ • State Commit : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ FINALIZATION ║\n\ - ║ • Final Builder Operations : {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ TRANSACTION STATS ║\n\ - ║ • Total Transactions : {:>6} ║\n\ - ║ • Executed Transactions : {:>6} ({:>6.2}%) ║\n\ - ║ • Skipped Transactions : {:>6} ({:>6.2}%) ║\n\ - ║ • Avg Time per TX (executed) : {:>8.2?} ║\n\ - ╚══════════════════════════════════════════════════════════════════════════════════════════════╝", - total_time, - 100.0, - // Initialization - time_btree_construction, - percentage(time_btree_construction, total_time), - time_header_fetch, - percentage(time_header_fetch, total_time), - time_evm_config, - percentage(time_evm_config, total_time), - time_state_provider_setup, - percentage(time_state_provider_setup, total_time), - time_db_cache_setup, - percentage(time_db_cache_setup, total_time), - time_state_overrides_setup, - percentage(time_state_overrides_setup, total_time), - // Per-block processing - time_per_block_loop, - percentage(time_per_block_loop, total_time), - time_base_collection, - percentage(time_base_collection, total_time), - time_transactions_collection, - percentage(time_transactions_collection, total_time), - time_receipts_collection, - percentage(time_receipts_collection, total_time), - time_balances_collection, - percentage(time_balances_collection, total_time), - time_execution_payload_build, - percentage(time_execution_payload_build, total_time), - time_block_conversion, - percentage(time_block_conversion, total_time), - time_evm_env_setup, - percentage(time_evm_env_setup, total_time), - time_balance_updates, - percentage(time_balance_updates, total_time), - // Transaction processing - time_transaction_loop, - percentage(time_transaction_loop, total_time), - time_tx_sender_recovery, - percentage(time_tx_sender_recovery, total_time), - time_tx_receipt_lookup, - percentage(time_tx_receipt_lookup, total_time), - time_tx_rpc_build, - percentage(time_tx_rpc_build, total_time), - time_tx_receipt_build, - percentage(time_tx_receipt_build, total_time), - time_tx_state_check, - percentage(time_tx_state_check, total_time), - time_tx_execution, - percentage(time_tx_execution, total_time), - time_tx_state_commit, - percentage(time_tx_state_commit, total_time), - // Finalization - time_final_builder, - percentage(time_final_builder, total_time), - // Transaction stats - total_transactions, - executed_transactions, - if total_transactions > 0 { - (executed_transactions as f64 / total_transactions as f64) * 100.0 - } else { - 0.0 - }, - skipped_transactions, - if total_transactions > 0 { - (skipped_transactions as f64 / total_transactions as f64) * 100.0 - } else { - 0.0 - }, - if executed_transactions > 0 { - time_tx_execution / executed_transactions as u32 - } else { - std::time::Duration::ZERO - }, - ); Ok(result) } @@ -883,11 +680,3 @@ where is_next_of_block || is_first_of_next_block } } - -fn percentage(duration: std::time::Duration, total: std::time::Duration) -> f64 { - if total.as_nanos() > 0 { - (duration.as_nanos() as f64 / total.as_nanos() as f64) * 100.0 - } else { - 0.0 - } -} From d4c1334af5f963244fae84af22fa3e332b15b1db Mon Sep 17 00:00:00 2001 From: Haardik H Date: Wed, 12 Nov 2025 09:20:50 -0500 Subject: [PATCH 11/17] update error log --- crates/flashblocks-rpc/src/state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 32d51fd3..8b8614f4 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -408,7 +408,7 @@ where let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( - "Failed to extract header for canonical block number {}. This is okay if your node is not fully synced to tip yet or was recently restarted.", + "Failed to extract header for canonical block number {}. Allow 10 to 15 minutes after a restart/redeploy or after your node has synced to tip.", canonical_block ))?; From f90ca19078e13857517710f3940c8894166961ac Mon Sep 17 00:00:00 2001 From: Haardik H Date: Wed, 12 Nov 2025 11:13:44 -0500 Subject: [PATCH 12/17] re-add logs --- crates/flashblocks-rpc/src/state.rs | 211 ++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 8b8614f4..64377b0b 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -396,7 +396,33 @@ where prev_pending_blocks: Option>, flashblocks: &Vec, ) -> eyre::Result>> { + let fn_start = Instant::now(); + + // Timing tracking + let mut time_per_block_loop = std::time::Duration::ZERO; + let mut time_base_collection = std::time::Duration::ZERO; + let mut time_transactions_collection = std::time::Duration::ZERO; + let mut time_receipts_collection = std::time::Duration::ZERO; + let mut time_balances_collection = std::time::Duration::ZERO; + let mut time_execution_payload_build = std::time::Duration::ZERO; + let mut time_block_conversion = std::time::Duration::ZERO; + let mut time_evm_env_setup = std::time::Duration::ZERO; + let mut time_transaction_loop = std::time::Duration::ZERO; + let mut time_tx_sender_recovery = std::time::Duration::ZERO; + let mut time_tx_receipt_lookup = std::time::Duration::ZERO; + let mut time_tx_rpc_build = std::time::Duration::ZERO; + let mut time_tx_receipt_build = std::time::Duration::ZERO; + let mut time_tx_state_check = std::time::Duration::ZERO; + let mut time_tx_execution = std::time::Duration::ZERO; + let mut time_tx_state_commit = std::time::Duration::ZERO; + let mut time_balance_updates = std::time::Duration::ZERO; + + let mut total_transactions = 0; + let mut executed_transactions = 0; + let mut skipped_transactions = 0; + // BTreeMap guarantees ascending order of keys while iterating + let step_start = Instant::now(); let mut flashblocks_per_block = BTreeMap::>::new(); for flashblock in flashblocks { flashblocks_per_block @@ -404,31 +430,47 @@ where .or_default() .push(flashblock); } + let time_btree_construction = step_start.elapsed(); + let step_start = Instant::now(); let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( "Failed to extract header for canonical block number {}. Allow 10 to 15 minutes after a restart/redeploy or after your node has synced to tip.", canonical_block ))?; + let time_header_fetch = step_start.elapsed(); + let step_start = Instant::now(); let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); + let time_evm_config = step_start.elapsed(); + + let step_start = Instant::now(); let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); let mut pending_blocks_builder = PendingBlocksBuilder::new(); + let time_state_provider_setup = step_start.elapsed(); + let step_start = Instant::now(); let mut db = match &prev_pending_blocks { Some(pending_blocks) => CacheDB { cache: pending_blocks.get_db_cache(), db: state }, None => CacheDB::new(state), }; + let time_db_cache_setup = step_start.elapsed(); + + let step_start = Instant::now(); let mut state_overrides = match &prev_pending_blocks { Some(pending_blocks) => pending_blocks.get_state_overrides().unwrap_or_default(), None => StateOverride::default(), }; + let time_state_overrides_setup = step_start.elapsed(); for (_block_number, flashblocks) in flashblocks_per_block { + let block_start = Instant::now(); + + let step_start = Instant::now(); let base = flashblocks .first() .ok_or(eyre!("cannot build a pending block from no flashblocks"))? @@ -440,7 +482,9 @@ where .last() .cloned() .ok_or(eyre!("cannot build a pending block from no flashblocks"))?; + time_base_collection += step_start.elapsed(); + let step_start = Instant::now(); let transactions: Vec = flashblocks .iter() .flat_map(|flashblock| flashblock.diff.transactions.clone()) @@ -450,7 +494,9 @@ where .iter() .flat_map(|flashblock| flashblock.diff.withdrawals.clone()) .collect(); + time_transactions_collection += step_start.elapsed(); + let step_start = Instant::now(); let receipt_by_hash = flashblocks .iter() .map(|flashblock| flashblock.metadata.receipts.clone()) @@ -458,7 +504,9 @@ where acc.extend(receipts); acc }); + time_receipts_collection += step_start.elapsed(); + let step_start = Instant::now(); let updated_balances = flashblocks .iter() .map(|flashblock| flashblock.metadata.new_account_balances.clone()) @@ -466,7 +514,9 @@ where acc.extend(balances); acc }); + time_balances_collection += step_start.elapsed(); + let step_start = Instant::now(); pending_blocks_builder.with_flashblocks( flashblocks.iter().map(|&x| x.clone()).collect::>(), ); @@ -494,7 +544,9 @@ where }, }, }; + time_execution_payload_build += step_start.elapsed(); + let step_start = Instant::now(); let block: OpBlock = execution_payload.try_into_block()?; let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; let header = block.header.clone().seal_slow(); @@ -508,16 +560,22 @@ where parent_beacon_block_root: Some(base.parent_beacon_block_root), extra_data: base.extra_data.clone(), }; + time_block_conversion += step_start.elapsed(); + let step_start = Instant::now(); let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; let mut evm = evm_config.evm_with_env(db, evm_env); + time_evm_env_setup += step_start.elapsed(); let mut gas_used = 0; let mut next_log_index = 0; + let tx_loop_start = Instant::now(); for (idx, transaction) in block.body.transactions.iter().enumerate() { + total_transactions += 1; let tx_hash = transaction.tx_hash(); + let step_start = Instant::now(); let sender = prev_pending_blocks .as_ref() .and_then(|pb| pb.get_transaction_sender(&tx_hash)) @@ -525,12 +583,16 @@ where pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); + time_tx_sender_recovery += step_start.elapsed(); + let step_start = Instant::now(); let receipt = receipt_by_hash .get(&tx_hash) .cloned() .ok_or(eyre!("missing receipt for {:?}", tx_hash))?; + time_tx_receipt_lookup += step_start.elapsed(); + let step_start = Instant::now(); let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); let envelope = recovered_transaction.clone().convert::(); @@ -570,7 +632,9 @@ where }; pending_blocks_builder.with_transaction(rpc_txn); + time_tx_rpc_build += step_start.elapsed(); + let step_start = Instant::now(); // Receipt Generation let meta: TransactionMeta = TransactionMeta { tx_hash, @@ -600,7 +664,9 @@ where pending_blocks_builder.with_receipt(tx_hash, op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); + time_tx_receipt_build += step_start.elapsed(); + let step_start = Instant::now(); let mut should_execute_transaction = false; match &prev_pending_blocks { Some(pending_blocks) => match pending_blocks.get_transaction_state(&tx_hash) { @@ -615,10 +681,17 @@ where should_execute_transaction = true; } } + time_tx_state_check += step_start.elapsed(); if should_execute_transaction { + executed_transactions += 1; + + let step_start = Instant::now(); match evm.transact(recovered_transaction) { Ok(ResultAndState { state, .. }) => { + time_tx_execution += step_start.elapsed(); + + let step_start = Instant::now(); for (addr, acc) in &state { let existing_override = state_overrides.entry(*addr).or_insert(Default::default()); @@ -637,6 +710,7 @@ where } pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); evm.db_mut().commit(state); + time_tx_state_commit += step_start.elapsed(); } Err(e) => { return Err(eyre!( @@ -647,20 +721,149 @@ where )); } } + } else { + skipped_transactions += 1; } } + time_transaction_loop += tx_loop_start.elapsed(); + let step_start = Instant::now(); for (address, balance) in updated_balances { pending_blocks_builder.with_account_balance(address, balance); } + time_balance_updates += step_start.elapsed(); db = evm.into_db(); last_block_header = block.header.clone(); + + time_per_block_loop += block_start.elapsed(); } + let step_start = Instant::now(); pending_blocks_builder.with_db_cache(db.cache); pending_blocks_builder.with_state_overrides(state_overrides); let result = Some(Arc::new(pending_blocks_builder.build()?)); + let time_final_builder = step_start.elapsed(); + + let total_time = fn_start.elapsed(); + + // Pretty print timing breakdown + debug!( + "\n╔══════════════════════════════════════════════════════════════════════════════════════════════╗\n\ + ║ BUILD_PENDING_STATE TIMING BREAKDOWN ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ TOTAL TIME: {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ INITIALIZATION ║\n\ + ║ • BTree Construction : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Header Fetch : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • EVM Config : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • State Provider Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • DB Cache Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • State Overrides Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ PER-BLOCK PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Base Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Transactions Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Receipts Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Balances Collection : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Execution Payload Build : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Block Conversion : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • EVM Environment Setup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Balance Updates : {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ TRANSACTION PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Sender Recovery : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Receipt Lookup : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • RPC Transaction Build : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Receipt Build : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • State Check : {:>8.2?} ({:>6.2}%) ║\n\ + ║ • Transaction Execution : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ + ║ • State Commit : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ FINALIZATION ║\n\ + ║ • Final Builder Operations : {:>8.2?} ({:>6.2}%) ║\n\ + ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ + ║ TRANSACTION STATS ║\n\ + ║ • Total Transactions : {:>6} ║\n\ + ║ • Executed Transactions : {:>6} ({:>6.2}%) ║\n\ + ║ • Skipped Transactions : {:>6} ({:>6.2}%) ║\n\ + ║ • Avg Time per TX (executed) : {:>8.2?} ║\n\ + ╚══════════════════════════════════════════════════════════════════════════════════════════════╝", + total_time, + 100.0, + // Initialization + time_btree_construction, + percentage(time_btree_construction, total_time), + time_header_fetch, + percentage(time_header_fetch, total_time), + time_evm_config, + percentage(time_evm_config, total_time), + time_state_provider_setup, + percentage(time_state_provider_setup, total_time), + time_db_cache_setup, + percentage(time_db_cache_setup, total_time), + time_state_overrides_setup, + percentage(time_state_overrides_setup, total_time), + // Per-block processing + time_per_block_loop, + percentage(time_per_block_loop, total_time), + time_base_collection, + percentage(time_base_collection, total_time), + time_transactions_collection, + percentage(time_transactions_collection, total_time), + time_receipts_collection, + percentage(time_receipts_collection, total_time), + time_balances_collection, + percentage(time_balances_collection, total_time), + time_execution_payload_build, + percentage(time_execution_payload_build, total_time), + time_block_conversion, + percentage(time_block_conversion, total_time), + time_evm_env_setup, + percentage(time_evm_env_setup, total_time), + time_balance_updates, + percentage(time_balance_updates, total_time), + // Transaction processing + time_transaction_loop, + percentage(time_transaction_loop, total_time), + time_tx_sender_recovery, + percentage(time_tx_sender_recovery, total_time), + time_tx_receipt_lookup, + percentage(time_tx_receipt_lookup, total_time), + time_tx_rpc_build, + percentage(time_tx_rpc_build, total_time), + time_tx_receipt_build, + percentage(time_tx_receipt_build, total_time), + time_tx_state_check, + percentage(time_tx_state_check, total_time), + time_tx_execution, + percentage(time_tx_execution, total_time), + time_tx_state_commit, + percentage(time_tx_state_commit, total_time), + // Finalization + time_final_builder, + percentage(time_final_builder, total_time), + // Transaction stats + total_transactions, + executed_transactions, + if total_transactions > 0 { + (executed_transactions as f64 / total_transactions as f64) * 100.0 + } else { + 0.0 + }, + skipped_transactions, + if total_transactions > 0 { + (skipped_transactions as f64 / total_transactions as f64) * 100.0 + } else { + 0.0 + }, + if executed_transactions > 0 { + time_tx_execution / executed_transactions as u32 + } else { + std::time::Duration::ZERO + }, + ); Ok(result) } @@ -680,3 +883,11 @@ where is_next_of_block || is_first_of_next_block } } + +fn percentage(duration: std::time::Duration, total: std::time::Duration) -> f64 { + if total.as_nanos() > 0 { + (duration.as_nanos() as f64 / total.as_nanos() as f64) * 100.0 + } else { + 0.0 + } +} From df6f5bd4ad79dbfef04675cad7df109b7791dc27 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Wed, 12 Nov 2025 11:53:06 -0500 Subject: [PATCH 13/17] . --- crates/flashblocks-rpc/src/state.rs | 96 +++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 27 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 64377b0b..fa1aa6fa 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -576,10 +576,13 @@ where let tx_hash = transaction.tx_hash(); let step_start = Instant::now(); - let sender = prev_pending_blocks - .as_ref() - .and_then(|pb| pb.get_transaction_sender(&tx_hash)) - .unwrap_or(transaction.recover_signer()?); + let sender = match &prev_pending_blocks { + Some(pending_blocks) => match pending_blocks.get_transaction_sender(&tx_hash) { + Some(sender) => sender, + None => transaction.recover_signer()?, + }, + None => transaction.recover_signer()?, + }; pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); @@ -635,32 +638,71 @@ where time_tx_rpc_build += step_start.elapsed(); let step_start = Instant::now(); - // Receipt Generation - let meta: TransactionMeta = TransactionMeta { - tx_hash, - index: idx as u64, - block_hash: header.hash(), - block_number: block.number, - base_fee: block.base_fee_per_gas, - excess_blob_gas: block.excess_blob_gas, - timestamp: block.timestamp, - }; - let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { - receipt: receipt.clone(), - tx: Recovered::new_unchecked(transaction, sender), - gas_used: receipt.cumulative_gas_used() - gas_used, - next_log_index, - meta, + // Receipt Generation + let op_receipt = match &prev_pending_blocks { + Some(pending_blocks) => match pending_blocks.get_receipt(tx_hash) { + Some(receipt) => receipt, + None => { + let meta: TransactionMeta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: header.hash(), + block_number: block.number, + base_fee: block.base_fee_per_gas, + excess_blob_gas: block.excess_blob_gas, + timestamp: block.timestamp, + }; + + let input: ConvertReceiptInput<'_, OpPrimitives> = + ConvertReceiptInput { + receipt: receipt.clone(), + tx: Recovered::new_unchecked(transaction, sender), + gas_used: receipt.cumulative_gas_used() - gas_used, + next_log_index, + meta, + }; + + let op_receipt = OpReceiptBuilder::new( + self.client.chain_spec().as_ref(), + input, + &mut l1_block_info, + )? + .build(); + + op_receipt + } + }, + None => { + let meta: TransactionMeta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: header.hash(), + block_number: block.number, + base_fee: block.base_fee_per_gas, + excess_blob_gas: block.excess_blob_gas, + timestamp: block.timestamp, + }; + + let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { + receipt: receipt.clone(), + tx: Recovered::new_unchecked(transaction, sender), + gas_used: receipt.cumulative_gas_used() - gas_used, + next_log_index, + meta, + }; + + let op_receipt = OpReceiptBuilder::new( + self.client.chain_spec().as_ref(), + input, + &mut l1_block_info, + )? + .build(); + + op_receipt + } }; - let op_receipt = OpReceiptBuilder::new( - self.client.chain_spec().as_ref(), - input, - &mut l1_block_info, - )? - .build(); - pending_blocks_builder.with_receipt(tx_hash, op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); From f71445f27ead96cd3597ce8cba13e2b688446eeb Mon Sep 17 00:00:00 2001 From: Haardik H Date: Wed, 12 Nov 2025 12:46:42 -0500 Subject: [PATCH 14/17] clean up unused imports --- crates/flashblocks-rpc/src/state.rs | 276 ++-------------------------- 1 file changed, 16 insertions(+), 260 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index fa1aa6fa..6017a49e 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -396,33 +396,7 @@ where prev_pending_blocks: Option>, flashblocks: &Vec, ) -> eyre::Result>> { - let fn_start = Instant::now(); - - // Timing tracking - let mut time_per_block_loop = std::time::Duration::ZERO; - let mut time_base_collection = std::time::Duration::ZERO; - let mut time_transactions_collection = std::time::Duration::ZERO; - let mut time_receipts_collection = std::time::Duration::ZERO; - let mut time_balances_collection = std::time::Duration::ZERO; - let mut time_execution_payload_build = std::time::Duration::ZERO; - let mut time_block_conversion = std::time::Duration::ZERO; - let mut time_evm_env_setup = std::time::Duration::ZERO; - let mut time_transaction_loop = std::time::Duration::ZERO; - let mut time_tx_sender_recovery = std::time::Duration::ZERO; - let mut time_tx_receipt_lookup = std::time::Duration::ZERO; - let mut time_tx_rpc_build = std::time::Duration::ZERO; - let mut time_tx_receipt_build = std::time::Duration::ZERO; - let mut time_tx_state_check = std::time::Duration::ZERO; - let mut time_tx_execution = std::time::Duration::ZERO; - let mut time_tx_state_commit = std::time::Duration::ZERO; - let mut time_balance_updates = std::time::Duration::ZERO; - - let mut total_transactions = 0; - let mut executed_transactions = 0; - let mut skipped_transactions = 0; - // BTreeMap guarantees ascending order of keys while iterating - let step_start = Instant::now(); let mut flashblocks_per_block = BTreeMap::>::new(); for flashblock in flashblocks { flashblocks_per_block @@ -430,47 +404,33 @@ where .or_default() .push(flashblock); } - let time_btree_construction = step_start.elapsed(); - let step_start = Instant::now(); let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( "Failed to extract header for canonical block number {}. Allow 10 to 15 minutes after a restart/redeploy or after your node has synced to tip.", canonical_block ))?; - let time_header_fetch = step_start.elapsed(); - let step_start = Instant::now(); let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); - let time_evm_config = step_start.elapsed(); - let step_start = Instant::now(); let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); let mut pending_blocks_builder = PendingBlocksBuilder::new(); - let time_state_provider_setup = step_start.elapsed(); - let step_start = Instant::now(); let mut db = match &prev_pending_blocks { Some(pending_blocks) => CacheDB { cache: pending_blocks.get_db_cache(), db: state }, None => CacheDB::new(state), }; - let time_db_cache_setup = step_start.elapsed(); - let step_start = Instant::now(); let mut state_overrides = match &prev_pending_blocks { Some(pending_blocks) => pending_blocks.get_state_overrides().unwrap_or_default(), None => StateOverride::default(), }; - let time_state_overrides_setup = step_start.elapsed(); for (_block_number, flashblocks) in flashblocks_per_block { - let block_start = Instant::now(); - - let step_start = Instant::now(); let base = flashblocks .first() .ok_or(eyre!("cannot build a pending block from no flashblocks"))? @@ -482,9 +442,7 @@ where .last() .cloned() .ok_or(eyre!("cannot build a pending block from no flashblocks"))?; - time_base_collection += step_start.elapsed(); - let step_start = Instant::now(); let transactions: Vec = flashblocks .iter() .flat_map(|flashblock| flashblock.diff.transactions.clone()) @@ -494,9 +452,7 @@ where .iter() .flat_map(|flashblock| flashblock.diff.withdrawals.clone()) .collect(); - time_transactions_collection += step_start.elapsed(); - let step_start = Instant::now(); let receipt_by_hash = flashblocks .iter() .map(|flashblock| flashblock.metadata.receipts.clone()) @@ -504,9 +460,7 @@ where acc.extend(receipts); acc }); - time_receipts_collection += step_start.elapsed(); - let step_start = Instant::now(); let updated_balances = flashblocks .iter() .map(|flashblock| flashblock.metadata.new_account_balances.clone()) @@ -514,9 +468,7 @@ where acc.extend(balances); acc }); - time_balances_collection += step_start.elapsed(); - let step_start = Instant::now(); pending_blocks_builder.with_flashblocks( flashblocks.iter().map(|&x| x.clone()).collect::>(), ); @@ -544,9 +496,7 @@ where }, }, }; - time_execution_payload_build += step_start.elapsed(); - let step_start = Instant::now(); let block: OpBlock = execution_payload.try_into_block()?; let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; let header = block.header.clone().seal_slow(); @@ -560,42 +510,32 @@ where parent_beacon_block_root: Some(base.parent_beacon_block_root), extra_data: base.extra_data.clone(), }; - time_block_conversion += step_start.elapsed(); - let step_start = Instant::now(); let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; let mut evm = evm_config.evm_with_env(db, evm_env); - time_evm_env_setup += step_start.elapsed(); let mut gas_used = 0; let mut next_log_index = 0; - let tx_loop_start = Instant::now(); for (idx, transaction) in block.body.transactions.iter().enumerate() { - total_transactions += 1; let tx_hash = transaction.tx_hash(); - let step_start = Instant::now(); - let sender = match &prev_pending_blocks { - Some(pending_blocks) => match pending_blocks.get_transaction_sender(&tx_hash) { - Some(sender) => sender, - None => transaction.recover_signer()?, - }, + let sender = match prev_pending_blocks + .as_ref() + .and_then(|p| p.get_transaction_sender(&tx_hash)) + { + Some(sender) => sender, None => transaction.recover_signer()?, }; pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); - time_tx_sender_recovery += step_start.elapsed(); - let step_start = Instant::now(); let receipt = receipt_by_hash .get(&tx_hash) .cloned() .ok_or(eyre!("missing receipt for {:?}", tx_hash))?; - time_tx_receipt_lookup += step_start.elapsed(); - let step_start = Instant::now(); let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); let envelope = recovered_transaction.clone().convert::(); @@ -635,46 +575,13 @@ where }; pending_blocks_builder.with_transaction(rpc_txn); - time_tx_rpc_build += step_start.elapsed(); - - let step_start = Instant::now(); // Receipt Generation - let op_receipt = match &prev_pending_blocks { - Some(pending_blocks) => match pending_blocks.get_receipt(tx_hash) { - Some(receipt) => receipt, - None => { - let meta: TransactionMeta = TransactionMeta { - tx_hash, - index: idx as u64, - block_hash: header.hash(), - block_number: block.number, - base_fee: block.base_fee_per_gas, - excess_blob_gas: block.excess_blob_gas, - timestamp: block.timestamp, - }; - - let input: ConvertReceiptInput<'_, OpPrimitives> = - ConvertReceiptInput { - receipt: receipt.clone(), - tx: Recovered::new_unchecked(transaction, sender), - gas_used: receipt.cumulative_gas_used() - gas_used, - next_log_index, - meta, - }; - - let op_receipt = OpReceiptBuilder::new( - self.client.chain_spec().as_ref(), - input, - &mut l1_block_info, - )? - .build(); - - op_receipt - } - }, - None => { - let meta: TransactionMeta = TransactionMeta { + let op_receipt = prev_pending_blocks + .as_ref() + .and_then(|pending_blocks| pending_blocks.get_receipt(tx_hash)) + .unwrap_or_else(|| { + let meta = TransactionMeta { tx_hash, index: idx as u64, block_hash: header.hash(), @@ -692,23 +599,19 @@ where meta, }; - let op_receipt = OpReceiptBuilder::new( + OpReceiptBuilder::new( self.client.chain_spec().as_ref(), input, &mut l1_block_info, - )? - .build(); - - op_receipt - } - }; + ) + .unwrap() + .build() + }); pending_blocks_builder.with_receipt(tx_hash, op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); - time_tx_receipt_build += step_start.elapsed(); - let step_start = Instant::now(); let mut should_execute_transaction = false; match &prev_pending_blocks { Some(pending_blocks) => match pending_blocks.get_transaction_state(&tx_hash) { @@ -723,17 +626,10 @@ where should_execute_transaction = true; } } - time_tx_state_check += step_start.elapsed(); if should_execute_transaction { - executed_transactions += 1; - - let step_start = Instant::now(); match evm.transact(recovered_transaction) { Ok(ResultAndState { state, .. }) => { - time_tx_execution += step_start.elapsed(); - - let step_start = Instant::now(); for (addr, acc) in &state { let existing_override = state_overrides.entry(*addr).or_insert(Default::default()); @@ -752,7 +648,6 @@ where } pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); evm.db_mut().commit(state); - time_tx_state_commit += step_start.elapsed(); } Err(e) => { return Err(eyre!( @@ -763,151 +658,20 @@ where )); } } - } else { - skipped_transactions += 1; } } - time_transaction_loop += tx_loop_start.elapsed(); - let step_start = Instant::now(); for (address, balance) in updated_balances { pending_blocks_builder.with_account_balance(address, balance); } - time_balance_updates += step_start.elapsed(); db = evm.into_db(); last_block_header = block.header.clone(); - - time_per_block_loop += block_start.elapsed(); } - let step_start = Instant::now(); pending_blocks_builder.with_db_cache(db.cache); pending_blocks_builder.with_state_overrides(state_overrides); - let result = Some(Arc::new(pending_blocks_builder.build()?)); - let time_final_builder = step_start.elapsed(); - - let total_time = fn_start.elapsed(); - - // Pretty print timing breakdown - debug!( - "\n╔══════════════════════════════════════════════════════════════════════════════════════════════╗\n\ - ║ BUILD_PENDING_STATE TIMING BREAKDOWN ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ TOTAL TIME: {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ INITIALIZATION ║\n\ - ║ • BTree Construction : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Header Fetch : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • EVM Config : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • State Provider Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • DB Cache Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • State Overrides Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ PER-BLOCK PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Base Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Transactions Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Receipts Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Balances Collection : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Execution Payload Build : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Block Conversion : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • EVM Environment Setup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Balance Updates : {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ TRANSACTION PROCESSING (Total) : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Sender Recovery : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Receipt Lookup : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • RPC Transaction Build : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Receipt Build : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • State Check : {:>8.2?} ({:>6.2}%) ║\n\ - ║ • Transaction Execution : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ - ║ • State Commit : {:>8.2?} ({:>6.2}%) ⭐ HOT PATH ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ FINALIZATION ║\n\ - ║ • Final Builder Operations : {:>8.2?} ({:>6.2}%) ║\n\ - ╠══════════════════════════════════════════════════════════════════════════════════════════════╣\n\ - ║ TRANSACTION STATS ║\n\ - ║ • Total Transactions : {:>6} ║\n\ - ║ • Executed Transactions : {:>6} ({:>6.2}%) ║\n\ - ║ • Skipped Transactions : {:>6} ({:>6.2}%) ║\n\ - ║ • Avg Time per TX (executed) : {:>8.2?} ║\n\ - ╚══════════════════════════════════════════════════════════════════════════════════════════════╝", - total_time, - 100.0, - // Initialization - time_btree_construction, - percentage(time_btree_construction, total_time), - time_header_fetch, - percentage(time_header_fetch, total_time), - time_evm_config, - percentage(time_evm_config, total_time), - time_state_provider_setup, - percentage(time_state_provider_setup, total_time), - time_db_cache_setup, - percentage(time_db_cache_setup, total_time), - time_state_overrides_setup, - percentage(time_state_overrides_setup, total_time), - // Per-block processing - time_per_block_loop, - percentage(time_per_block_loop, total_time), - time_base_collection, - percentage(time_base_collection, total_time), - time_transactions_collection, - percentage(time_transactions_collection, total_time), - time_receipts_collection, - percentage(time_receipts_collection, total_time), - time_balances_collection, - percentage(time_balances_collection, total_time), - time_execution_payload_build, - percentage(time_execution_payload_build, total_time), - time_block_conversion, - percentage(time_block_conversion, total_time), - time_evm_env_setup, - percentage(time_evm_env_setup, total_time), - time_balance_updates, - percentage(time_balance_updates, total_time), - // Transaction processing - time_transaction_loop, - percentage(time_transaction_loop, total_time), - time_tx_sender_recovery, - percentage(time_tx_sender_recovery, total_time), - time_tx_receipt_lookup, - percentage(time_tx_receipt_lookup, total_time), - time_tx_rpc_build, - percentage(time_tx_rpc_build, total_time), - time_tx_receipt_build, - percentage(time_tx_receipt_build, total_time), - time_tx_state_check, - percentage(time_tx_state_check, total_time), - time_tx_execution, - percentage(time_tx_execution, total_time), - time_tx_state_commit, - percentage(time_tx_state_commit, total_time), - // Finalization - time_final_builder, - percentage(time_final_builder, total_time), - // Transaction stats - total_transactions, - executed_transactions, - if total_transactions > 0 { - (executed_transactions as f64 / total_transactions as f64) * 100.0 - } else { - 0.0 - }, - skipped_transactions, - if total_transactions > 0 { - (skipped_transactions as f64 / total_transactions as f64) * 100.0 - } else { - 0.0 - }, - if executed_transactions > 0 { - time_tx_execution / executed_transactions as u32 - } else { - std::time::Duration::ZERO - }, - ); - - Ok(result) + Ok(Some(Arc::new(pending_blocks_builder.build()?))) } fn is_next_flashblock( @@ -925,11 +689,3 @@ where is_next_of_block || is_first_of_next_block } } - -fn percentage(duration: std::time::Duration, total: std::time::Duration) -> f64 { - if total.as_nanos() > 0 { - (duration.as_nanos() as f64 / total.as_nanos() as f64) * 100.0 - } else { - 0.0 - } -} From cfbecc281f856634e25ff9b6c64bcc68cc1b22e3 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Wed, 12 Nov 2025 14:31:34 -0500 Subject: [PATCH 15/17] move PendingBlocksAPI impl to pending_blocks.rs --- crates/flashblocks-rpc/src/pending_blocks.rs | 57 +++++++++++- crates/flashblocks-rpc/src/state.rs | 97 +++++--------------- 2 files changed, 78 insertions(+), 76 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 23aac9e3..bcc96449 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -1,4 +1,6 @@ -use crate::subscription::Flashblock; +use std::sync::Arc; + +use crate::{rpc::PendingBlocksAPI, subscription::Flashblock}; use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{ @@ -8,11 +10,13 @@ use alloy_primitives::{ use alloy_provider::network::TransactionResponse; use alloy_rpc_types::{state::StateOverride, BlockTransactions}; use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; +use arc_swap::Guard; use eyre::eyre; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use reth::revm::{db::Cache, state::EvmState}; -use reth_rpc_eth_api::RpcBlock; +use reth_rpc_convert::RpcTransaction; +use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; pub struct PendingBlocksBuilder { flashblocks: Vec, @@ -253,3 +257,52 @@ impl PendingBlocks { logs } } + +impl PendingBlocksAPI for Guard>> { + fn get_canonical_block_number(&self) -> BlockNumberOrTag { + self.as_ref() + .map(|pb| pb.canonical_block_number()) + .unwrap_or(BlockNumberOrTag::Latest) + } + + fn get_transaction_count(&self, address: Address) -> U256 { + self.as_ref() + .map(|pb| pb.get_transaction_count(address)) + .unwrap_or_else(|| U256::from(0)) + } + + fn get_block(&self, full: bool) -> Option> { + self.as_ref().map(|pb| pb.get_latest_block(full)) + } + + fn get_transaction_receipt( + &self, + tx_hash: alloy_primitives::TxHash, + ) -> Option> { + self.as_ref().and_then(|pb| pb.get_receipt(tx_hash)) + } + + fn get_transaction_by_hash( + &self, + tx_hash: alloy_primitives::TxHash, + ) -> Option> { + self.as_ref() + .and_then(|pb| pb.get_transaction_by_hash(tx_hash)) + } + + fn get_balance(&self, address: Address) -> Option { + self.as_ref().and_then(|pb| pb.get_balance(address)) + } + + fn get_state_overrides(&self) -> Option { + self.as_ref() + .map(|pb| pb.get_state_overrides()) + .unwrap_or_default() + } + + fn get_pending_logs(&self, filter: &Filter) -> Vec { + self.as_ref() + .map(|pb| pb.get_pending_logs(filter)) + .unwrap_or_default() + } +} diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 6017a49e..9bfcd6e8 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -1,22 +1,19 @@ -use std::{ - collections::{BTreeMap, HashSet}, - sync::Arc, - time::Instant, -}; - -use alloy_consensus::{ - transaction::{Recovered, SignerRecoverable, TransactionMeta}, - Header, TxReceipt, -}; +use crate::metrics::Metrics; +use crate::pending_blocks::{PendingBlocks, PendingBlocksBuilder}; +use crate::rpc::FlashblocksAPI; +use crate::subscription::{Flashblock, FlashblocksReceiver}; +use alloy_consensus::transaction::{Recovered, SignerRecoverable, TransactionMeta}; +use alloy_consensus::{Header, TxReceipt}; use alloy_eips::BlockNumberOrTag; -use alloy_primitives::{map::foldhash::HashMap, Address, BlockNumber, Bytes, Sealable, B256, U256}; +use alloy_primitives::map::foldhash::HashMap; +use alloy_primitives::{BlockNumber, Bytes, Sealable, B256}; use alloy_rpc_types::{TransactionTrait, Withdrawal}; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; -use alloy_rpc_types_eth::{state::StateOverride, Filter, Log}; +use alloy_rpc_types_eth::state::StateOverride; use arc_swap::{ArcSwapOption, Guard}; use eyre::eyre; use op_alloy_consensus::OpTxEnvelope; -use op_alloy_network::{Optimism, TransactionResponse}; +use op_alloy_network::TransactionResponse; use op_alloy_rpc_types::Transaction; use reth::{ chainspec::{ChainSpecProvider, EthChainSpec}, @@ -32,13 +29,13 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_primitives::{DepositReceipt, OpBlock, OpPrimitives}; use reth_optimism_rpc::OpReceiptBuilder; use reth_primitives::RecoveredBlock; -use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcTransaction}; -use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; -use tokio::sync::{ - broadcast::{self, Sender}, - mpsc::{self, UnboundedReceiver}, - Mutex, -}; +use reth_rpc_convert::transaction::ConvertReceiptInput; +use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::broadcast::{self, Sender}; +use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tokio::sync::Mutex; use tracing::{debug, error, info, warn}; use crate::{ @@ -136,46 +133,6 @@ impl FlashblocksAPI for FlashblocksState { } } -impl PendingBlocksAPI for Guard>> { - fn get_canonical_block_number(&self) -> BlockNumberOrTag { - self.as_ref().map(|pb| pb.canonical_block_number()).unwrap_or(BlockNumberOrTag::Latest) - } - - fn get_transaction_count(&self, address: Address) -> U256 { - self.as_ref().map(|pb| pb.get_transaction_count(address)).unwrap_or_else(|| U256::from(0)) - } - - fn get_block(&self, full: bool) -> Option> { - self.as_ref().map(|pb| pb.get_latest_block(full)) - } - - fn get_transaction_receipt( - &self, - tx_hash: alloy_primitives::TxHash, - ) -> Option> { - self.as_ref().and_then(|pb| pb.get_receipt(tx_hash)) - } - - fn get_transaction_by_hash( - &self, - tx_hash: alloy_primitives::TxHash, - ) -> Option> { - self.as_ref().and_then(|pb| pb.get_transaction_by_hash(tx_hash)) - } - - fn get_balance(&self, address: Address) -> Option { - self.as_ref().and_then(|pb| pb.get_balance(address)) - } - - fn get_state_overrides(&self) -> Option { - self.as_ref().map(|pb| pb.get_state_overrides()).unwrap_or_default() - } - - fn get_pending_logs(&self, filter: &Filter) -> Vec { - self.as_ref().map(|pb| pb.get_pending_logs(filter)).unwrap_or_default() - } -} - #[derive(Debug, Clone)] struct StateProcessor { rx: Arc>>, @@ -413,7 +370,6 @@ where ))?; let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); - let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); @@ -612,19 +568,12 @@ where gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); - let mut should_execute_transaction = false; - match &prev_pending_blocks { - Some(pending_blocks) => match pending_blocks.get_transaction_state(&tx_hash) { - Some(state) => { - pending_blocks_builder.with_transaction_state(tx_hash, state); - } - None => { - should_execute_transaction = true; - } - }, - None => { - should_execute_transaction = true; - } + let mut should_execute_transaction = true; + if let Some(state) = + prev_pending_blocks.as_ref().and_then(|p| p.get_transaction_state(&tx_hash)) + { + pending_blocks_builder.with_transaction_state(tx_hash, state); + should_execute_transaction = false; } if should_execute_transaction { From de319986a2c3099fcc1302e0111bec1caeaa7ad4 Mon Sep 17 00:00:00 2001 From: Haardik H Date: Wed, 12 Nov 2025 14:41:18 -0500 Subject: [PATCH 16/17] rustfmt --- crates/flashblocks-rpc/src/pending_blocks.rs | 26 +++---- crates/flashblocks-rpc/src/rpc.rs | 14 ++-- crates/flashblocks-rpc/src/state.rs | 36 +++++----- crates/flashblocks-rpc/src/subscription.rs | 2 +- crates/flashblocks-rpc/src/tests/mod.rs | 2 +- crates/flashblocks-rpc/src/tests/rpc.rs | 40 +++++------ crates/flashblocks-rpc/src/tests/state.rs | 73 +++++++++++--------- crates/flashblocks-rpc/src/tests/utils.rs | 9 ++- crates/metering/src/meter.rs | 6 +- crates/metering/src/rpc.rs | 2 +- crates/metering/src/tests/meter.rs | 10 +-- crates/metering/src/tests/rpc.rs | 4 +- crates/metering/src/tests/utils.rs | 9 ++- crates/node/src/main.rs | 6 +- crates/transaction-tracing/src/tracing.rs | 2 +- justfile | 5 +- rustfmt.toml | 2 +- 17 files changed, 124 insertions(+), 124 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index bcc96449..a0d9ffbc 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -1,14 +1,13 @@ use std::sync::Arc; -use crate::{rpc::PendingBlocksAPI, subscription::Flashblock}; use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{ + Address, B256, BlockNumber, TxHash, U256, map::foldhash::{HashMap, HashMapExt}, - Address, BlockNumber, TxHash, B256, U256, }; use alloy_provider::network::TransactionResponse; -use alloy_rpc_types::{state::StateOverride, BlockTransactions}; +use alloy_rpc_types::{BlockTransactions, state::StateOverride}; use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; use arc_swap::Guard; use eyre::eyre; @@ -18,6 +17,8 @@ use reth::revm::{db::Cache, state::EvmState}; use reth_rpc_convert::RpcTransaction; use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; +use crate::{rpc::PendingBlocksAPI, subscription::Flashblock}; + pub struct PendingBlocksBuilder { flashblocks: Vec, headers: Vec>, @@ -260,15 +261,11 @@ impl PendingBlocks { impl PendingBlocksAPI for Guard>> { fn get_canonical_block_number(&self) -> BlockNumberOrTag { - self.as_ref() - .map(|pb| pb.canonical_block_number()) - .unwrap_or(BlockNumberOrTag::Latest) + self.as_ref().map(|pb| pb.canonical_block_number()).unwrap_or(BlockNumberOrTag::Latest) } fn get_transaction_count(&self, address: Address) -> U256 { - self.as_ref() - .map(|pb| pb.get_transaction_count(address)) - .unwrap_or_else(|| U256::from(0)) + self.as_ref().map(|pb| pb.get_transaction_count(address)).unwrap_or_else(|| U256::from(0)) } fn get_block(&self, full: bool) -> Option> { @@ -286,8 +283,7 @@ impl PendingBlocksAPI for Guard>> { &self, tx_hash: alloy_primitives::TxHash, ) -> Option> { - self.as_ref() - .and_then(|pb| pb.get_transaction_by_hash(tx_hash)) + self.as_ref().and_then(|pb| pb.get_transaction_by_hash(tx_hash)) } fn get_balance(&self, address: Address) -> Option { @@ -295,14 +291,10 @@ impl PendingBlocksAPI for Guard>> { } fn get_state_overrides(&self) -> Option { - self.as_ref() - .map(|pb| pb.get_state_overrides()) - .unwrap_or_default() + self.as_ref().map(|pb| pb.get_state_overrides()).unwrap_or_default() } fn get_pending_logs(&self, filter: &Filter) -> Vec { - self.as_ref() - .map(|pb| pb.get_pending_logs(filter)) - .unwrap_or_default() + self.as_ref().map(|pb| pb.get_pending_logs(filter)).unwrap_or_default() } } diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index cfd228f7..575d9e22 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -2,21 +2,21 @@ use std::{sync::Arc, time::Duration}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{ - map::foldhash::{HashSet, HashSetExt}, Address, TxHash, U256, + map::foldhash::{HashSet, HashSetExt}, }; use alloy_rpc_types::{ + BlockOverrides, simulate::{SimBlock, SimulatePayload, SimulatedBlock}, state::{EvmOverrides, StateOverride, StateOverridesBuilder}, - BlockOverrides, }; use alloy_rpc_types_eth::{Filter, Log}; use arc_swap::Guard; use jsonrpsee::{ - core::{async_trait, RpcResult}, + core::{RpcResult, async_trait}, proc_macros::rpc, }; -use jsonrpsee_types::{error::INVALID_PARAMS_CODE, ErrorObjectOwned}; +use jsonrpsee_types::{ErrorObjectOwned, error::INVALID_PARAMS_CODE}; use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionRequest; use reth::{ @@ -24,14 +24,14 @@ use reth::{ rpc::{eth::EthFilter, server_types::eth::EthApiError}, }; use reth_rpc_eth_api::{ - helpers::{EthBlocks, EthCall, EthState, EthTransactions, FullEthApi}, EthApiTypes, EthFilterApiServer, RpcBlock, RpcReceipt, RpcTransaction, + helpers::{EthBlocks, EthCall, EthState, EthTransactions, FullEthApi}, }; use tokio::{ sync::{broadcast, broadcast::error::RecvError}, time, }; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; +use tokio_stream::{StreamExt, wrappers::BroadcastStream}; use tracing::{debug, trace, warn}; use crate::{metrics::Metrics, pending_blocks::PendingBlocks}; @@ -91,7 +91,7 @@ pub trait EthApiOverride { #[method(name = "getBalance")] async fn get_balance(&self, address: Address, block_number: Option) - -> RpcResult; + -> RpcResult; #[method(name = "getTransactionCount")] async fn get_transaction_count( diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 9bfcd6e8..e1471937 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -1,12 +1,15 @@ -use crate::metrics::Metrics; -use crate::pending_blocks::{PendingBlocks, PendingBlocksBuilder}; -use crate::rpc::FlashblocksAPI; -use crate::subscription::{Flashblock, FlashblocksReceiver}; -use alloy_consensus::transaction::{Recovered, SignerRecoverable, TransactionMeta}; -use alloy_consensus::{Header, TxReceipt}; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, + time::Instant, +}; + +use alloy_consensus::{ + Header, TxReceipt, + transaction::{Recovered, SignerRecoverable, TransactionMeta}, +}; use alloy_eips::BlockNumberOrTag; -use alloy_primitives::map::foldhash::HashMap; -use alloy_primitives::{BlockNumber, Bytes, Sealable, B256}; +use alloy_primitives::{B256, BlockNumber, Bytes, Sealable, map::foldhash::HashMap}; use alloy_rpc_types::{TransactionTrait, Withdrawal}; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; use alloy_rpc_types_eth::state::StateOverride; @@ -19,8 +22,8 @@ use reth::{ chainspec::{ChainSpecProvider, EthChainSpec}, providers::{BlockReaderIdExt, StateProviderFactory}, revm::{ - context::result::ResultAndState, database::StateProviderDatabase, db::CacheDB, - DatabaseCommit, State, + DatabaseCommit, State, context::result::ResultAndState, database::StateProviderDatabase, + db::CacheDB, }, }; use reth_evm::{ConfigureEvm, Evm}; @@ -30,18 +33,17 @@ use reth_optimism_primitives::{DepositReceipt, OpBlock, OpPrimitives}; use reth_optimism_rpc::OpReceiptBuilder; use reth_primitives::RecoveredBlock; use reth_rpc_convert::transaction::ConvertReceiptInput; -use std::collections::{BTreeMap, HashSet}; -use std::sync::Arc; -use std::time::Instant; -use tokio::sync::broadcast::{self, Sender}; -use tokio::sync::mpsc::{self, UnboundedReceiver}; -use tokio::sync::Mutex; +use tokio::sync::{ + Mutex, + broadcast::{self, Sender}, + mpsc::{self, UnboundedReceiver}, +}; use tracing::{debug, error, info, warn}; use crate::{ metrics::Metrics, pending_blocks::{PendingBlocks, PendingBlocksBuilder}, - rpc::{FlashblocksAPI, PendingBlocksAPI}, + rpc::FlashblocksAPI, subscription::{Flashblock, FlashblocksReceiver}, }; diff --git a/crates/flashblocks-rpc/src/subscription.rs b/crates/flashblocks-rpc/src/subscription.rs index 22befb7d..24e72fcc 100644 --- a/crates/flashblocks-rpc/src/subscription.rs +++ b/crates/flashblocks-rpc/src/subscription.rs @@ -1,6 +1,6 @@ use std::{io::Read, sync::Arc, time::Duration}; -use alloy_primitives::{map::foldhash::HashMap, Address, B256, U256}; +use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; use alloy_rpc_types_engine::PayloadId; use futures_util::{SinkExt as _, StreamExt}; use reth_optimism_primitives::OpReceipt; diff --git a/crates/flashblocks-rpc/src/tests/mod.rs b/crates/flashblocks-rpc/src/tests/mod.rs index e1477cdc..59995edb 100644 --- a/crates/flashblocks-rpc/src/tests/mod.rs +++ b/crates/flashblocks-rpc/src/tests/mod.rs @@ -1,4 +1,4 @@ -use alloy_primitives::{b256, bytes, Bytes, B256}; +use alloy_primitives::{B256, Bytes, b256, bytes}; mod rpc; mod state; diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 9c5cdaa4..69d2022a 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -6,13 +6,13 @@ mod tests { use alloy_eips::BlockNumberOrTag; use alloy_genesis::Genesis; use alloy_primitives::{ - address, b256, bytes, map::HashMap, Address, Bytes, LogData, TxHash, B256, U256, + Address, B256, Bytes, LogData, TxHash, U256, address, b256, bytes, map::HashMap, }; use alloy_provider::{Provider, RootProvider}; use alloy_rpc_client::RpcClient; use alloy_rpc_types::simulate::{SimBlock, SimulatePayload}; use alloy_rpc_types_engine::PayloadId; - use alloy_rpc_types_eth::{error::EthRpcErrorCode, TransactionInput}; + use alloy_rpc_types_eth::{TransactionInput, error::EthRpcErrorCode}; use op_alloy_consensus::OpDepositReceipt; use op_alloy_network::{Optimism, ReceiptResponse, TransactionResponse}; use op_alloy_rpc_types::OpTransactionRequest; @@ -24,7 +24,7 @@ mod tests { tasks::TaskManager, }; use reth_optimism_chainspec::OpChainSpecBuilder; - use reth_optimism_node::{args::RollupArgs, OpNode}; + use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_optimism_primitives::OpReceipt; use reth_provider::providers::BlockchainProvider; use reth_rpc_eth_api::RpcReceipt; @@ -527,12 +527,13 @@ mod tests { provider.call(send_eth_call.nonce(4)).block(BlockNumberOrTag::Pending.into()).await; assert!(res.is_err()); - assert!(res - .unwrap_err() - .as_error_resp() - .unwrap() - .message - .contains("insufficient funds for gas")); + assert!( + res.unwrap_err() + .as_error_resp() + .unwrap() + .message + .contains("insufficient funds for gas") + ); // read count1 from counter contract let eth_call_count1 = OpTransactionRequest::default() @@ -603,12 +604,13 @@ mod tests { .await; assert!(res.is_err()); - assert!(res - .unwrap_err() - .as_error_resp() - .unwrap() - .message - .contains("insufficient funds for gas")); + assert!( + res.unwrap_err() + .as_error_resp() + .unwrap() + .message + .contains("insufficient funds for gas") + ); Ok(()) } @@ -706,11 +708,9 @@ mod tests { let receipt_result = node.send_raw_transaction_sync(TRANSFER_ETH_TX, Some(0)).await; let error_code = EthRpcErrorCode::TransactionConfirmationTimeout.code(); - assert!(receipt_result - .err() - .unwrap() - .to_string() - .contains(format!("{}", error_code).as_str())); + assert!( + receipt_result.err().unwrap().to_string().contains(format!("{}", error_code).as_str()) + ); } #[tokio::test] diff --git a/crates/flashblocks-rpc/src/tests/state.rs b/crates/flashblocks-rpc/src/tests/state.rs index 20005507..3354bf84 100644 --- a/crates/flashblocks-rpc/src/tests/state.rs +++ b/crates/flashblocks-rpc/src/tests/state.rs @@ -3,11 +3,11 @@ mod tests { use std::{sync::Arc, time::Duration}; use alloy_consensus::{ - crypto::secp256k1::public_key_to_address, BlockHeader, Header, Receipt, Transaction, + BlockHeader, Header, Receipt, Transaction, crypto::secp256k1::public_key_to_address, }; use alloy_eips::{BlockHashOrNumber, Decodable2718, Encodable2718}; use alloy_genesis::GenesisAccount; - use alloy_primitives::{map::foldhash::HashMap, Address, BlockNumber, Bytes, B256, U256}; + use alloy_primitives::{Address, B256, BlockNumber, Bytes, U256, map::foldhash::HashMap}; use alloy_provider::network::BlockResponse; use alloy_rpc_types_engine::PayloadId; use op_alloy_consensus::OpDepositReceipt; @@ -18,16 +18,16 @@ mod tests { revm::database::StateProviderDatabase, transaction_pool::test_utils::TransactionBuilder, }; - use reth_db::{test_utils::TempDatabase, DatabaseEnv}; - use reth_evm::{execute::Executor, ConfigureEvm}; - use reth_optimism_chainspec::{OpChainSpecBuilder, BASE_MAINNET}; + use reth_db::{DatabaseEnv, test_utils::TempDatabase}; + use reth_evm::{ConfigureEvm, execute::Executor}; + use reth_optimism_chainspec::{BASE_MAINNET, OpChainSpecBuilder}; use reth_optimism_evm::OpEvmConfig; use reth_optimism_node::OpNode; use reth_optimism_primitives::{OpBlock, OpBlockBody, OpReceipt, OpTransactionSigned}; use reth_primitives_traits::{Account, Block, RecoveredBlock, SealedHeader}; use reth_provider::{ - providers::BlockchainProvider, BlockWriter, ChainSpecProvider, ExecutionOutcome, - LatestStateProviderRef, ProviderFactory, StateProviderFactory, + BlockWriter, ChainSpecProvider, ExecutionOutcome, LatestStateProviderRef, ProviderFactory, + StateProviderFactory, providers::BlockchainProvider, }; use rollup_boost::{ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1}; use tokio::time::sleep; @@ -36,7 +36,7 @@ mod tests { rpc::{FlashblocksAPI, PendingBlocksAPI}, state::FlashblocksState, subscription::{Flashblock, FlashblocksReceiver, Metadata}, - tests::{utils::create_test_provider_factory, BLOCK_INFO_TXN, BLOCK_INFO_TXN_HASH}, + tests::{BLOCK_INFO_TXN, BLOCK_INFO_TXN_HASH, utils::create_test_provider_factory}, }; // The amount of time to wait (in milliseconds) after sending a new flashblock or canonical block // so it can be processed by the state processor @@ -417,12 +417,14 @@ mod tests { ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(!test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + !test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -494,12 +496,14 @@ mod tests { ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(!test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + !test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -560,12 +564,13 @@ mod tests { ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + test.flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -612,12 +617,14 @@ mod tests { 1 ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(!test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + !test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -748,7 +755,7 @@ mod tests { assert_eq!(pending_nonce, 1); test.new_canonical_block_without_processing(vec![ - test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Bob, 100, 0) + test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Bob, 100, 0), ]) .await; diff --git a/crates/flashblocks-rpc/src/tests/utils.rs b/crates/flashblocks-rpc/src/tests/utils.rs index cca64278..a84815b0 100644 --- a/crates/flashblocks-rpc/src/tests/utils.rs +++ b/crates/flashblocks-rpc/src/tests/utils.rs @@ -2,12 +2,11 @@ use std::sync::Arc; use reth::api::{NodeTypes, NodeTypesWithDBAdapter}; use reth_db::{ - init_db, - mdbx::{DatabaseArguments, MaxReadTransactionDuration, KILOBYTE, MEGABYTE}, - test_utils::{create_test_static_files_dir, tempdir_path, TempDatabase, ERROR_DB_CREATION}, - ClientVersion, DatabaseEnv, + ClientVersion, DatabaseEnv, init_db, + mdbx::{DatabaseArguments, KILOBYTE, MEGABYTE, MaxReadTransactionDuration}, + test_utils::{ERROR_DB_CREATION, TempDatabase, create_test_static_files_dir, tempdir_path}, }; -use reth_provider::{providers::StaticFileProvider, ProviderFactory}; +use reth_provider::{ProviderFactory, providers::StaticFileProvider}; pub fn create_test_provider_factory( chain_spec: Arc, diff --git a/crates/metering/src/meter.rs b/crates/metering/src/meter.rs index c9236201..da319f33 100644 --- a/crates/metering/src/meter.rs +++ b/crates/metering/src/meter.rs @@ -1,10 +1,10 @@ use std::{sync::Arc, time::Instant}; -use alloy_consensus::{transaction::SignerRecoverable, BlockHeader, Transaction as _}; +use alloy_consensus::{BlockHeader, Transaction as _, transaction::SignerRecoverable}; use alloy_primitives::{B256, U256}; -use eyre::{eyre, Result as EyreResult}; +use eyre::{Result as EyreResult, eyre}; use reth::revm::db::State; -use reth_evm::{execute::BlockBuilder, ConfigureEvm}; +use reth_evm::{ConfigureEvm, execute::BlockBuilder}; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_primitives_traits::SealedHeader; diff --git a/crates/metering/src/rpc.rs b/crates/metering/src/rpc.rs index 31bd2616..737bbd1e 100644 --- a/crates/metering/src/rpc.rs +++ b/crates/metering/src/rpc.rs @@ -2,7 +2,7 @@ use alloy_consensus::Header; use alloy_eips::BlockNumberOrTag; use alloy_primitives::U256; use jsonrpsee::{ - core::{async_trait, RpcResult}, + core::{RpcResult, async_trait}, proc_macros::rpc, }; use reth::providers::BlockReaderIdExt; diff --git a/crates/metering/src/tests/meter.rs b/crates/metering/src/tests/meter.rs index d861eb40..a83947df 100644 --- a/crates/metering/src/tests/meter.rs +++ b/crates/metering/src/tests/meter.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use alloy_consensus::crypto::secp256k1::public_key_to_address; use alloy_eips::Encodable2718; use alloy_genesis::GenesisAccount; -use alloy_primitives::{keccak256, Address, Bytes, B256, U256}; +use alloy_primitives::{Address, B256, Bytes, U256, keccak256}; use eyre::Context; use op_alloy_consensus::OpTxEnvelope; -use rand::{rngs::StdRng, SeedableRng}; +use rand::{SeedableRng, rngs::StdRng}; use reth::{api::NodeTypesWithDBAdapter, chainspec::EthChainSpec}; -use reth_db::{test_utils::TempDatabase, DatabaseEnv}; -use reth_optimism_chainspec::{OpChainSpec, OpChainSpecBuilder, BASE_MAINNET}; +use reth_db::{DatabaseEnv, test_utils::TempDatabase}; +use reth_optimism_chainspec::{BASE_MAINNET, OpChainSpec, OpChainSpecBuilder}; use reth_optimism_node::OpNode; use reth_optimism_primitives::OpTransactionSigned; use reth_primitives_traits::SealedHeader; -use reth_provider::{providers::BlockchainProvider, HeaderProvider, StateProviderFactory}; +use reth_provider::{HeaderProvider, StateProviderFactory, providers::BlockchainProvider}; use reth_testing_utils::generators::generate_keys; use reth_transaction_pool::test_utils::TransactionBuilder; use tips_core::types::{Bundle, ParsedBundle}; diff --git a/crates/metering/src/tests/rpc.rs b/crates/metering/src/tests/rpc.rs index 5d630628..d41d005d 100644 --- a/crates/metering/src/tests/rpc.rs +++ b/crates/metering/src/tests/rpc.rs @@ -4,7 +4,7 @@ mod tests { use alloy_eips::Encodable2718; use alloy_genesis::Genesis; - use alloy_primitives::{address, b256, bytes, Bytes, U256}; + use alloy_primitives::{Bytes, U256, address, b256, bytes}; use alloy_rpc_client::RpcClient; use op_alloy_consensus::OpTxEnvelope; use reth::{ @@ -15,7 +15,7 @@ mod tests { tasks::TaskManager, }; use reth_optimism_chainspec::OpChainSpecBuilder; - use reth_optimism_node::{args::RollupArgs, OpNode}; + use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_optimism_primitives::OpTransactionSigned; use reth_provider::providers::BlockchainProvider; use reth_transaction_pool::test_utils::TransactionBuilder; diff --git a/crates/metering/src/tests/utils.rs b/crates/metering/src/tests/utils.rs index d8181df4..7bd29fef 100644 --- a/crates/metering/src/tests/utils.rs +++ b/crates/metering/src/tests/utils.rs @@ -2,12 +2,11 @@ use std::sync::Arc; use reth::api::{NodeTypes, NodeTypesWithDBAdapter}; use reth_db::{ - init_db, - mdbx::{DatabaseArguments, MaxReadTransactionDuration, KILOBYTE, MEGABYTE}, - test_utils::{create_test_static_files_dir, tempdir_path, TempDatabase, ERROR_DB_CREATION}, - ClientVersion, DatabaseEnv, + ClientVersion, DatabaseEnv, init_db, + mdbx::{DatabaseArguments, KILOBYTE, MEGABYTE, MaxReadTransactionDuration}, + test_utils::{ERROR_DB_CREATION, TempDatabase, create_test_static_files_dir, tempdir_path}, }; -use reth_provider::{providers::StaticFileProvider, ProviderFactory}; +use reth_provider::{ProviderFactory, providers::StaticFileProvider}; pub fn create_provider_factory( chain_spec: Arc, diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 740f912c..9162c746 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -13,11 +13,11 @@ use once_cell::sync::OnceCell; use reth::{ builder::{EngineNodeLauncher, Node, NodeHandle, TreeConfig}, providers::providers::BlockchainProvider, - version::{default_reth_version_metadata, try_init_version_metadata, RethCliVersionConsts}, + version::{RethCliVersionConsts, default_reth_version_metadata, try_init_version_metadata}, }; use reth_exex::ExExEvent; -use reth_optimism_cli::{chainspec::OpChainSpecParser, Cli}; -use reth_optimism_node::{args::RollupArgs, OpNode}; +use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; +use reth_optimism_node::{OpNode, args::RollupArgs}; use tracing::info; use url::Url; diff --git a/crates/transaction-tracing/src/tracing.rs b/crates/transaction-tracing/src/tracing.rs index 3d06c57d..5c299326 100644 --- a/crates/transaction-tracing/src/tracing.rs +++ b/crates/transaction-tracing/src/tracing.rs @@ -10,7 +10,7 @@ use futures::StreamExt; use lru::LruCache; use reth::{ api::{BlockBody, FullNodeComponents}, - core::primitives::{transaction::TxHashRef, AlloyBlockHeader}, + core::primitives::{AlloyBlockHeader, transaction::TxHashRef}, transaction_pool::{FullTransactionEvent, TransactionPool}, }; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; diff --git a/justfile b/justfile index 4717655e..c0861665 100644 --- a/justfile +++ b/justfile @@ -9,10 +9,11 @@ test: cargo test --workspace --all-features check-format: - cargo fmt --all -- --check + cargo +nightly fmt --all -- --check fix-format: - cargo fmt --all + cargo fix --allow-dirty --allow-staged + cargo +nightly fmt --all check-clippy: cargo clippy --all-targets -- -D warnings diff --git a/rustfmt.toml b/rustfmt.toml index 7b74d4d6..6e800413 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,4 @@ -style_edition = "2021" +style_edition = "2024" imports_granularity = "Crate" group_imports = "StdExternalCrate" use_small_heuristics = "Max" From 12cadc6e6636037102c7ee0905e79804b57442db Mon Sep 17 00:00:00 2001 From: Haardik H Date: Fri, 14 Nov 2025 09:43:19 -0500 Subject: [PATCH 17/17] update err msg --- crates/flashblocks-rpc/src/state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index e1471937..b3335062 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -367,7 +367,7 @@ where let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( - "Failed to extract header for canonical block number {}. Allow 10 to 15 minutes after a restart/redeploy or after your node has synced to tip.", + "Failed to extract header for canonical block number {}. This can be ignored if the node has recently restarted, restored from a snapshot or is still syncing.", canonical_block ))?;