diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 17277650..a0d9ffbc 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -1,19 +1,23 @@ +use std::sync::Arc; + use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{ + Address, B256, BlockNumber, TxHash, U256, map::foldhash::{HashMap, HashMapExt}, - Address, BlockNumber, TxHash, B256, U256, }; use alloy_provider::network::TransactionResponse; -use alloy_rpc_types::{state::StateOverride, BlockTransactions}; +use alloy_rpc_types::{BlockTransactions, state::StateOverride}; use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; +use arc_swap::Guard; use eyre::eyre; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; use reth::revm::{db::Cache, state::EvmState}; -use reth_rpc_eth_api::RpcBlock; +use reth_rpc_convert::RpcTransaction; +use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; -use crate::subscription::Flashblock; +use crate::{rpc::PendingBlocksAPI, subscription::Flashblock}; pub struct PendingBlocksBuilder { flashblocks: Vec, @@ -25,6 +29,7 @@ pub struct PendingBlocksBuilder { transaction_receipts: HashMap, transactions_by_hash: HashMap, transaction_state: HashMap, + transaction_senders: HashMap, state_overrides: Option, db_cache: Cache, @@ -41,6 +46,7 @@ impl PendingBlocksBuilder { transaction_receipts: HashMap::new(), transactions_by_hash: HashMap::new(), transaction_state: HashMap::new(), + transaction_senders: HashMap::new(), state_overrides: None, db_cache: Cache::default(), } @@ -77,6 +83,12 @@ impl PendingBlocksBuilder { self } + #[inline] + pub(crate) fn with_transaction_sender(&mut self, hash: B256, sender: Address) -> &Self { + self.transaction_senders.insert(hash, sender); + self + } + #[inline] pub(crate) fn increment_nonce(&mut self, sender: Address) -> &Self { let zero = U256::from(0); @@ -122,6 +134,7 @@ impl PendingBlocksBuilder { transaction_receipts: self.transaction_receipts, transactions_by_hash: self.transactions_by_hash, transaction_state: self.transaction_state, + transaction_senders: self.transaction_senders, state_overrides: self.state_overrides, db_cache: self.db_cache, }) @@ -139,6 +152,7 @@ pub struct PendingBlocks { transaction_receipts: HashMap, transactions_by_hash: HashMap, transaction_state: HashMap, + transaction_senders: HashMap, state_overrides: Option, db_cache: Cache, @@ -153,6 +167,10 @@ impl PendingBlocks { BlockNumberOrTag::Number(self.headers.first().unwrap().number - 1) } + pub fn earliest_block_number(&self) -> BlockNumber { + self.headers.first().unwrap().number + } + pub fn latest_flashblock_index(&self) -> u64 { self.flashblocks.last().unwrap().index } @@ -165,8 +183,12 @@ impl PendingBlocks { self.flashblocks.clone() } - pub fn get_transaction_state(&self, hash: B256) -> Option { - self.transaction_state.get(&hash).cloned() + pub fn get_transaction_state(&self, hash: &B256) -> Option { + self.transaction_state.get(hash).cloned() + } + + pub fn get_transaction_sender(&self, tx_hash: &B256) -> Option
{ + self.transaction_senders.get(tx_hash).cloned() } pub fn get_db_cache(&self) -> Cache { @@ -236,3 +258,43 @@ impl PendingBlocks { logs } } + +impl PendingBlocksAPI for Guard>> { + fn get_canonical_block_number(&self) -> BlockNumberOrTag { + self.as_ref().map(|pb| pb.canonical_block_number()).unwrap_or(BlockNumberOrTag::Latest) + } + + fn get_transaction_count(&self, address: Address) -> U256 { + self.as_ref().map(|pb| pb.get_transaction_count(address)).unwrap_or_else(|| U256::from(0)) + } + + fn get_block(&self, full: bool) -> Option> { + self.as_ref().map(|pb| pb.get_latest_block(full)) + } + + fn get_transaction_receipt( + &self, + tx_hash: alloy_primitives::TxHash, + ) -> Option> { + self.as_ref().and_then(|pb| pb.get_receipt(tx_hash)) + } + + fn get_transaction_by_hash( + &self, + tx_hash: alloy_primitives::TxHash, + ) -> Option> { + self.as_ref().and_then(|pb| pb.get_transaction_by_hash(tx_hash)) + } + + fn get_balance(&self, address: Address) -> Option { + self.as_ref().and_then(|pb| pb.get_balance(address)) + } + + fn get_state_overrides(&self) -> Option { + self.as_ref().map(|pb| pb.get_state_overrides()).unwrap_or_default() + } + + fn get_pending_logs(&self, filter: &Filter) -> Vec { + self.as_ref().map(|pb| pb.get_pending_logs(filter)).unwrap_or_default() + } +} diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index cfd228f7..575d9e22 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -2,21 +2,21 @@ use std::{sync::Arc, time::Duration}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{ - map::foldhash::{HashSet, HashSetExt}, Address, TxHash, U256, + map::foldhash::{HashSet, HashSetExt}, }; use alloy_rpc_types::{ + BlockOverrides, simulate::{SimBlock, SimulatePayload, SimulatedBlock}, state::{EvmOverrides, StateOverride, StateOverridesBuilder}, - BlockOverrides, }; use alloy_rpc_types_eth::{Filter, Log}; use arc_swap::Guard; use jsonrpsee::{ - core::{async_trait, RpcResult}, + core::{RpcResult, async_trait}, proc_macros::rpc, }; -use jsonrpsee_types::{error::INVALID_PARAMS_CODE, ErrorObjectOwned}; +use jsonrpsee_types::{ErrorObjectOwned, error::INVALID_PARAMS_CODE}; use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionRequest; use reth::{ @@ -24,14 +24,14 @@ use reth::{ rpc::{eth::EthFilter, server_types::eth::EthApiError}, }; use reth_rpc_eth_api::{ - helpers::{EthBlocks, EthCall, EthState, EthTransactions, FullEthApi}, EthApiTypes, EthFilterApiServer, RpcBlock, RpcReceipt, RpcTransaction, + helpers::{EthBlocks, EthCall, EthState, EthTransactions, FullEthApi}, }; use tokio::{ sync::{broadcast, broadcast::error::RecvError}, time, }; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; +use tokio_stream::{StreamExt, wrappers::BroadcastStream}; use tracing::{debug, trace, warn}; use crate::{metrics::Metrics, pending_blocks::PendingBlocks}; @@ -91,7 +91,7 @@ pub trait EthApiOverride { #[method(name = "getBalance")] async fn get_balance(&self, address: Address, block_number: Option) - -> RpcResult; + -> RpcResult; #[method(name = "getTransactionCount")] async fn get_transaction_count( diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index 08d2a774..b3335062 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -5,25 +5,25 @@ use std::{ }; use alloy_consensus::{ - transaction::{Recovered, SignerRecoverable, TransactionMeta}, Header, TxReceipt, + transaction::{Recovered, SignerRecoverable, TransactionMeta}, }; use alloy_eips::BlockNumberOrTag; -use alloy_primitives::{map::foldhash::HashMap, Address, BlockNumber, Bytes, Sealable, B256, U256}; +use alloy_primitives::{B256, BlockNumber, Bytes, Sealable, map::foldhash::HashMap}; use alloy_rpc_types::{TransactionTrait, Withdrawal}; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; -use alloy_rpc_types_eth::{state::StateOverride, Filter, Log}; +use alloy_rpc_types_eth::state::StateOverride; use arc_swap::{ArcSwapOption, Guard}; use eyre::eyre; use op_alloy_consensus::OpTxEnvelope; -use op_alloy_network::{Optimism, TransactionResponse}; +use op_alloy_network::TransactionResponse; use op_alloy_rpc_types::Transaction; use reth::{ chainspec::{ChainSpecProvider, EthChainSpec}, providers::{BlockReaderIdExt, StateProviderFactory}, revm::{ - context::result::ResultAndState, database::StateProviderDatabase, db::CacheDB, - DatabaseCommit, State, + DatabaseCommit, State, context::result::ResultAndState, database::StateProviderDatabase, + db::CacheDB, }, }; use reth_evm::{ConfigureEvm, Evm}; @@ -32,19 +32,18 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_primitives::{DepositReceipt, OpBlock, OpPrimitives}; use reth_optimism_rpc::OpReceiptBuilder; use reth_primitives::RecoveredBlock; -use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcTransaction}; -use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; +use reth_rpc_convert::transaction::ConvertReceiptInput; use tokio::sync::{ + Mutex, broadcast::{self, Sender}, mpsc::{self, UnboundedReceiver}, - Mutex, }; use tracing::{debug, error, info, warn}; use crate::{ metrics::Metrics, pending_blocks::{PendingBlocks, PendingBlocksBuilder}, - rpc::{FlashblocksAPI, PendingBlocksAPI}, + rpc::FlashblocksAPI, subscription::{Flashblock, FlashblocksReceiver}, }; @@ -72,13 +71,14 @@ where + Clone + 'static, { - pub fn new(client: Client) -> Self { + pub fn new(client: Client, max_pending_blocks_depth: u64) -> Self { let (tx, rx) = mpsc::unbounded_channel::(); let pending_blocks: Arc> = Arc::new(ArcSwapOption::new(None)); let (flashblock_sender, _) = broadcast::channel(BUFFER_SIZE); let state_processor = StateProcessor::new( client, pending_blocks.clone(), + max_pending_blocks_depth, Arc::new(Mutex::new(rx)), flashblock_sender.clone(), ); @@ -135,50 +135,11 @@ impl FlashblocksAPI for FlashblocksState { } } -impl PendingBlocksAPI for Guard>> { - fn get_canonical_block_number(&self) -> BlockNumberOrTag { - self.as_ref().map(|pb| pb.canonical_block_number()).unwrap_or(BlockNumberOrTag::Latest) - } - - fn get_transaction_count(&self, address: Address) -> U256 { - self.as_ref().map(|pb| pb.get_transaction_count(address)).unwrap_or_else(|| U256::from(0)) - } - - fn get_block(&self, full: bool) -> Option> { - self.as_ref().map(|pb| pb.get_latest_block(full)) - } - - fn get_transaction_receipt( - &self, - tx_hash: alloy_primitives::TxHash, - ) -> Option> { - self.as_ref().and_then(|pb| pb.get_receipt(tx_hash)) - } - - fn get_transaction_by_hash( - &self, - tx_hash: alloy_primitives::TxHash, - ) -> Option> { - self.as_ref().and_then(|pb| pb.get_transaction_by_hash(tx_hash)) - } - - fn get_balance(&self, address: Address) -> Option { - self.as_ref().and_then(|pb| pb.get_balance(address)) - } - - fn get_state_overrides(&self) -> Option { - self.as_ref().map(|pb| pb.get_state_overrides()).unwrap_or_default() - } - - fn get_pending_logs(&self, filter: &Filter) -> Vec { - self.as_ref().map(|pb| pb.get_pending_logs(filter)).unwrap_or_default() - } -} - #[derive(Debug, Clone)] struct StateProcessor { rx: Arc>>, pending_blocks: Arc>, + max_depth: u64, metrics: Metrics, client: Client, sender: Sender>, @@ -195,10 +156,11 @@ where fn new( client: Client, pending_blocks: Arc>, + max_depth: u64, rx: Arc>>, sender: Sender>, ) -> Self { - Self { metrics: Metrics::default(), pending_blocks, client, rx, sender } + Self { metrics: Metrics::default(), pending_blocks, client, max_depth, rx, sender } } async fn start(&self) { @@ -255,12 +217,17 @@ where .filter(|fb| fb.metadata.block_number == block.number) .count(); self.metrics.flashblocks_in_block.record(num_flashblocks_for_canon as f64); + self.metrics + .pending_snapshot_height + .set(pending_blocks.latest_block_number() as f64); if pending_blocks.latest_block_number() <= block.number { + debug!( + message = "pending snapshot cleared because canonical caught up", + latest_pending_block = pending_blocks.latest_block_number(), + canonical_block = block.number, + ); self.metrics.pending_clear_catchup.increment(1); - self.metrics - .pending_snapshot_height - .set(pending_blocks.latest_block_number() as f64); self.metrics .pending_snapshot_fb_index .set(pending_blocks.latest_flashblock_index() as f64); @@ -273,29 +240,51 @@ where tracked_txns.iter().map(|tx| tx.tx_hash()).collect(); let block_txn_hashes: HashSet<_> = block.body().transactions().map(|tx| tx.tx_hash()).collect(); + let pending_blocks_depth = + block.number - pending_blocks.earliest_block_number(); - flashblocks - .retain(|flashblock| flashblock.metadata.block_number > block.number); + debug!( + message = "canonical block behind latest pending block, checking for reorg and max depth", + latest_pending_block = pending_blocks.latest_block_number(), + earliest_pending_block = pending_blocks.earliest_block_number(), + canonical_block = block.number, + pending_txns_for_block = ?tracked_txn_hashes.len(), + canonical_txns_for_block = ?block_txn_hashes.len(), + pending_blocks_depth = pending_blocks_depth, + max_depth = self.max_depth, + ); if tracked_txn_hashes.len() != block_txn_hashes.len() || tracked_txn_hashes != block_txn_hashes { debug!( message = "reorg detected, recomputing pending flashblocks going ahead of reorg", - latest_pending_block = pending_blocks.latest_block_number(), - canonical_block = block.number, - tracked_txn_hashes_len = tracked_txn_hashes.len(), - block_txn_hashes_len = block_txn_hashes.len(), tracked_txn_hashes = ?tracked_txn_hashes, block_txn_hashes = ?block_txn_hashes, ); self.metrics.pending_clear_reorg.increment(1); // If there is a reorg, we re-process all future flashblocks without reusing the existing pending state + flashblocks + .retain(|flashblock| flashblock.metadata.block_number > block.number); + return self.build_pending_state(None, &flashblocks); + } + + if pending_blocks_depth > self.max_depth { + debug!( + message = + "pending blocks depth exceeds max depth, resetting pending blocks", + pending_blocks_depth = pending_blocks_depth, + max_depth = self.max_depth, + ); + + flashblocks + .retain(|flashblock| flashblock.metadata.block_number > block.number); return self.build_pending_state(None, &flashblocks); } // If no reorg, we can continue building on top of the existing pending state + // NOTE: We do not retain specific flashblocks here to avoid losing track of our "earliest" pending block number self.build_pending_state(prev_pending_blocks, &flashblocks) } } @@ -378,12 +367,11 @@ where let earliest_block_number = flashblocks_per_block.keys().min().unwrap(); let canonical_block = earliest_block_number - 1; let mut last_block_header = self.client.header_by_number(canonical_block)?.ok_or(eyre!( - "Failed to extract header for canonical block number {}. This is okay if your node is not fully synced to tip yet.", + "Failed to extract header for canonical block number {}. This can be ignored if the node has recently restarted, restored from a snapshot or is still syncing.", canonical_block ))?; let evm_config = OpEvmConfig::optimism(self.client.chain_spec()); - let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); @@ -394,6 +382,7 @@ where Some(pending_blocks) => CacheDB { cache: pending_blocks.get_db_cache(), db: state }, None => CacheDB::new(state), }; + let mut state_overrides = match &prev_pending_blocks { Some(pending_blocks) => pending_blocks.get_state_overrides().unwrap_or_default(), None => StateOverride::default(), @@ -487,16 +476,23 @@ where let mut next_log_index = 0; for (idx, transaction) in block.body.transactions.iter().enumerate() { - let sender = match transaction.recover_signer() { - Ok(signer) => signer, - Err(err) => return Err(err.into()), + let tx_hash = transaction.tx_hash(); + + let sender = match prev_pending_blocks + .as_ref() + .and_then(|p| p.get_transaction_sender(&tx_hash)) + { + Some(sender) => sender, + None => transaction.recover_signer()?, }; + + pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); let receipt = receipt_by_hash - .get(&transaction.tx_hash()) + .get(&tx_hash) .cloned() - .ok_or(eyre!("missing receipt for {:?}", transaction.tx_hash()))?; + .ok_or(eyre!("missing receipt for {:?}", tx_hash))?; let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); let envelope = recovered_transaction.clone().convert::(); @@ -539,51 +535,47 @@ where pending_blocks_builder.with_transaction(rpc_txn); // Receipt Generation - let meta = TransactionMeta { - tx_hash: transaction.tx_hash(), - index: idx as u64, - block_hash: header.hash(), - block_number: block.number, - base_fee: block.base_fee_per_gas, - excess_blob_gas: block.excess_blob_gas, - timestamp: block.timestamp, - }; - - let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { - receipt: receipt.clone(), - tx: Recovered::new_unchecked(transaction, sender), - gas_used: receipt.cumulative_gas_used() - gas_used, - next_log_index, - meta, - }; - - let op_receipt = OpReceiptBuilder::new( - self.client.chain_spec().as_ref(), - input, - &mut l1_block_info, - )? - .build(); - - pending_blocks_builder.with_receipt(transaction.tx_hash(), op_receipt); + let op_receipt = prev_pending_blocks + .as_ref() + .and_then(|pending_blocks| pending_blocks.get_receipt(tx_hash)) + .unwrap_or_else(|| { + let meta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: header.hash(), + block_number: block.number, + base_fee: block.base_fee_per_gas, + excess_blob_gas: block.excess_blob_gas, + timestamp: block.timestamp, + }; + + let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { + receipt: receipt.clone(), + tx: Recovered::new_unchecked(transaction, sender), + gas_used: receipt.cumulative_gas_used() - gas_used, + next_log_index, + meta, + }; + + OpReceiptBuilder::new( + self.client.chain_spec().as_ref(), + input, + &mut l1_block_info, + ) + .unwrap() + .build() + }); + + pending_blocks_builder.with_receipt(tx_hash, op_receipt); gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); - let mut should_execute_transaction = false; - match &prev_pending_blocks { - Some(pending_blocks) => { - match pending_blocks.get_transaction_state(transaction.tx_hash()) { - Some(state) => { - pending_blocks_builder - .with_transaction_state(transaction.tx_hash(), state); - } - None => { - should_execute_transaction = true; - } - } - } - None => { - should_execute_transaction = true; - } + let mut should_execute_transaction = true; + if let Some(state) = + prev_pending_blocks.as_ref().and_then(|p| p.get_transaction_state(&tx_hash)) + { + pending_blocks_builder.with_transaction_state(tx_hash, state); + should_execute_transaction = false; } if should_execute_transaction { @@ -605,15 +597,14 @@ where existing.extend(changed_slots); } - pending_blocks_builder - .with_transaction_state(transaction.tx_hash(), state.clone()); + pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); evm.db_mut().commit(state); } Err(e) => { return Err(eyre!( "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", e, - transaction.tx_hash(), + tx_hash, sender )); } diff --git a/crates/flashblocks-rpc/src/subscription.rs b/crates/flashblocks-rpc/src/subscription.rs index 22befb7d..24e72fcc 100644 --- a/crates/flashblocks-rpc/src/subscription.rs +++ b/crates/flashblocks-rpc/src/subscription.rs @@ -1,6 +1,6 @@ use std::{io::Read, sync::Arc, time::Duration}; -use alloy_primitives::{map::foldhash::HashMap, Address, B256, U256}; +use alloy_primitives::{Address, B256, U256, map::foldhash::HashMap}; use alloy_rpc_types_engine::PayloadId; use futures_util::{SinkExt as _, StreamExt}; use reth_optimism_primitives::OpReceipt; diff --git a/crates/flashblocks-rpc/src/tests/mod.rs b/crates/flashblocks-rpc/src/tests/mod.rs index e1477cdc..59995edb 100644 --- a/crates/flashblocks-rpc/src/tests/mod.rs +++ b/crates/flashblocks-rpc/src/tests/mod.rs @@ -1,4 +1,4 @@ -use alloy_primitives::{b256, bytes, Bytes, B256}; +use alloy_primitives::{B256, Bytes, b256, bytes}; mod rpc; mod state; diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index b8bbdfe0..69d2022a 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -6,13 +6,13 @@ mod tests { use alloy_eips::BlockNumberOrTag; use alloy_genesis::Genesis; use alloy_primitives::{ - address, b256, bytes, map::HashMap, Address, Bytes, LogData, TxHash, B256, U256, + Address, B256, Bytes, LogData, TxHash, U256, address, b256, bytes, map::HashMap, }; use alloy_provider::{Provider, RootProvider}; use alloy_rpc_client::RpcClient; use alloy_rpc_types::simulate::{SimBlock, SimulatePayload}; use alloy_rpc_types_engine::PayloadId; - use alloy_rpc_types_eth::{error::EthRpcErrorCode, TransactionInput}; + use alloy_rpc_types_eth::{TransactionInput, error::EthRpcErrorCode}; use op_alloy_consensus::OpDepositReceipt; use op_alloy_network::{Optimism, ReceiptResponse, TransactionResponse}; use op_alloy_rpc_types::OpTransactionRequest; @@ -24,7 +24,7 @@ mod tests { tasks::TaskManager, }; use reth_optimism_chainspec::OpChainSpecBuilder; - use reth_optimism_node::{args::RollupArgs, OpNode}; + use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_optimism_primitives::OpReceipt; use reth_provider::providers::BlockchainProvider; use reth_rpc_eth_api::RpcReceipt; @@ -126,7 +126,7 @@ mod tests { .extend_rpc_modules(move |ctx| { // We are not going to use the websocket connection to send payloads so we use // a dummy url. - let flashblocks_state = Arc::new(FlashblocksState::new(ctx.provider().clone())); + let flashblocks_state = Arc::new(FlashblocksState::new(ctx.provider().clone(), 5)); flashblocks_state.start(); let api_ext = EthApiExt::new( @@ -527,12 +527,13 @@ mod tests { provider.call(send_eth_call.nonce(4)).block(BlockNumberOrTag::Pending.into()).await; assert!(res.is_err()); - assert!(res - .unwrap_err() - .as_error_resp() - .unwrap() - .message - .contains("insufficient funds for gas")); + assert!( + res.unwrap_err() + .as_error_resp() + .unwrap() + .message + .contains("insufficient funds for gas") + ); // read count1 from counter contract let eth_call_count1 = OpTransactionRequest::default() @@ -603,12 +604,13 @@ mod tests { .await; assert!(res.is_err()); - assert!(res - .unwrap_err() - .as_error_resp() - .unwrap() - .message - .contains("insufficient funds for gas")); + assert!( + res.unwrap_err() + .as_error_resp() + .unwrap() + .message + .contains("insufficient funds for gas") + ); Ok(()) } @@ -706,11 +708,9 @@ mod tests { let receipt_result = node.send_raw_transaction_sync(TRANSFER_ETH_TX, Some(0)).await; let error_code = EthRpcErrorCode::TransactionConfirmationTimeout.code(); - assert!(receipt_result - .err() - .unwrap() - .to_string() - .contains(format!("{}", error_code).as_str())); + assert!( + receipt_result.err().unwrap().to_string().contains(format!("{}", error_code).as_str()) + ); } #[tokio::test] diff --git a/crates/flashblocks-rpc/src/tests/state.rs b/crates/flashblocks-rpc/src/tests/state.rs index 4244e2db..3354bf84 100644 --- a/crates/flashblocks-rpc/src/tests/state.rs +++ b/crates/flashblocks-rpc/src/tests/state.rs @@ -3,11 +3,11 @@ mod tests { use std::{sync::Arc, time::Duration}; use alloy_consensus::{ - crypto::secp256k1::public_key_to_address, BlockHeader, Header, Receipt, Transaction, + BlockHeader, Header, Receipt, Transaction, crypto::secp256k1::public_key_to_address, }; use alloy_eips::{BlockHashOrNumber, Decodable2718, Encodable2718}; use alloy_genesis::GenesisAccount; - use alloy_primitives::{map::foldhash::HashMap, Address, BlockNumber, Bytes, B256, U256}; + use alloy_primitives::{Address, B256, BlockNumber, Bytes, U256, map::foldhash::HashMap}; use alloy_provider::network::BlockResponse; use alloy_rpc_types_engine::PayloadId; use op_alloy_consensus::OpDepositReceipt; @@ -18,16 +18,16 @@ mod tests { revm::database::StateProviderDatabase, transaction_pool::test_utils::TransactionBuilder, }; - use reth_db::{test_utils::TempDatabase, DatabaseEnv}; - use reth_evm::{execute::Executor, ConfigureEvm}; - use reth_optimism_chainspec::{OpChainSpecBuilder, BASE_MAINNET}; + use reth_db::{DatabaseEnv, test_utils::TempDatabase}; + use reth_evm::{ConfigureEvm, execute::Executor}; + use reth_optimism_chainspec::{BASE_MAINNET, OpChainSpecBuilder}; use reth_optimism_evm::OpEvmConfig; use reth_optimism_node::OpNode; use reth_optimism_primitives::{OpBlock, OpBlockBody, OpReceipt, OpTransactionSigned}; use reth_primitives_traits::{Account, Block, RecoveredBlock, SealedHeader}; use reth_provider::{ - providers::BlockchainProvider, BlockWriter, ChainSpecProvider, ExecutionOutcome, - LatestStateProviderRef, ProviderFactory, StateProviderFactory, + BlockWriter, ChainSpecProvider, ExecutionOutcome, LatestStateProviderRef, ProviderFactory, + StateProviderFactory, providers::BlockchainProvider, }; use rollup_boost::{ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1}; use tokio::time::sleep; @@ -36,7 +36,7 @@ mod tests { rpc::{FlashblocksAPI, PendingBlocksAPI}, state::FlashblocksState, subscription::{Flashblock, FlashblocksReceiver, Metadata}, - tests::{utils::create_test_provider_factory, BLOCK_INFO_TXN, BLOCK_INFO_TXN_HASH}, + tests::{BLOCK_INFO_TXN, BLOCK_INFO_TXN_HASH, utils::create_test_provider_factory}, }; // The amount of time to wait (in milliseconds) after sending a new flashblock or canonical block // so it can be processed by the state processor @@ -256,7 +256,7 @@ mod tests { .try_into_recovered() .expect("able to recover block"); - let flashblocks = FlashblocksState::new(provider.clone()); + let flashblocks = FlashblocksState::new(provider.clone(), 4); flashblocks.start(); flashblocks.on_canonical_block_received(&block); @@ -417,12 +417,14 @@ mod tests { ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(!test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + !test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -494,12 +496,14 @@ mod tests { ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(!test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + !test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -560,12 +564,13 @@ mod tests { ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + test.flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -612,12 +617,14 @@ mod tests { 1 ); assert!(test.flashblocks.get_pending_blocks().get_state_overrides().is_some()); - assert!(!test - .flashblocks - .get_pending_blocks() - .get_state_overrides() - .unwrap() - .contains_key(&test.address(User::Alice))); + assert!( + !test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .unwrap() + .contains_key(&test.address(User::Alice)) + ); test.send_flashblock( FlashblockBuilder::new(&test, 1) @@ -748,7 +755,7 @@ mod tests { assert_eq!(pending_nonce, 1); test.new_canonical_block_without_processing(vec![ - test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Bob, 100, 0) + test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Bob, 100, 0), ]) .await; diff --git a/crates/flashblocks-rpc/src/tests/utils.rs b/crates/flashblocks-rpc/src/tests/utils.rs index cca64278..a84815b0 100644 --- a/crates/flashblocks-rpc/src/tests/utils.rs +++ b/crates/flashblocks-rpc/src/tests/utils.rs @@ -2,12 +2,11 @@ use std::sync::Arc; use reth::api::{NodeTypes, NodeTypesWithDBAdapter}; use reth_db::{ - init_db, - mdbx::{DatabaseArguments, MaxReadTransactionDuration, KILOBYTE, MEGABYTE}, - test_utils::{create_test_static_files_dir, tempdir_path, TempDatabase, ERROR_DB_CREATION}, - ClientVersion, DatabaseEnv, + ClientVersion, DatabaseEnv, init_db, + mdbx::{DatabaseArguments, KILOBYTE, MEGABYTE, MaxReadTransactionDuration}, + test_utils::{ERROR_DB_CREATION, TempDatabase, create_test_static_files_dir, tempdir_path}, }; -use reth_provider::{providers::StaticFileProvider, ProviderFactory}; +use reth_provider::{ProviderFactory, providers::StaticFileProvider}; pub fn create_test_provider_factory( chain_spec: Arc, diff --git a/crates/metering/src/meter.rs b/crates/metering/src/meter.rs index c9236201..da319f33 100644 --- a/crates/metering/src/meter.rs +++ b/crates/metering/src/meter.rs @@ -1,10 +1,10 @@ use std::{sync::Arc, time::Instant}; -use alloy_consensus::{transaction::SignerRecoverable, BlockHeader, Transaction as _}; +use alloy_consensus::{BlockHeader, Transaction as _, transaction::SignerRecoverable}; use alloy_primitives::{B256, U256}; -use eyre::{eyre, Result as EyreResult}; +use eyre::{Result as EyreResult, eyre}; use reth::revm::db::State; -use reth_evm::{execute::BlockBuilder, ConfigureEvm}; +use reth_evm::{ConfigureEvm, execute::BlockBuilder}; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_primitives_traits::SealedHeader; diff --git a/crates/metering/src/rpc.rs b/crates/metering/src/rpc.rs index 31bd2616..737bbd1e 100644 --- a/crates/metering/src/rpc.rs +++ b/crates/metering/src/rpc.rs @@ -2,7 +2,7 @@ use alloy_consensus::Header; use alloy_eips::BlockNumberOrTag; use alloy_primitives::U256; use jsonrpsee::{ - core::{async_trait, RpcResult}, + core::{RpcResult, async_trait}, proc_macros::rpc, }; use reth::providers::BlockReaderIdExt; diff --git a/crates/metering/src/tests/meter.rs b/crates/metering/src/tests/meter.rs index d861eb40..a83947df 100644 --- a/crates/metering/src/tests/meter.rs +++ b/crates/metering/src/tests/meter.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use alloy_consensus::crypto::secp256k1::public_key_to_address; use alloy_eips::Encodable2718; use alloy_genesis::GenesisAccount; -use alloy_primitives::{keccak256, Address, Bytes, B256, U256}; +use alloy_primitives::{Address, B256, Bytes, U256, keccak256}; use eyre::Context; use op_alloy_consensus::OpTxEnvelope; -use rand::{rngs::StdRng, SeedableRng}; +use rand::{SeedableRng, rngs::StdRng}; use reth::{api::NodeTypesWithDBAdapter, chainspec::EthChainSpec}; -use reth_db::{test_utils::TempDatabase, DatabaseEnv}; -use reth_optimism_chainspec::{OpChainSpec, OpChainSpecBuilder, BASE_MAINNET}; +use reth_db::{DatabaseEnv, test_utils::TempDatabase}; +use reth_optimism_chainspec::{BASE_MAINNET, OpChainSpec, OpChainSpecBuilder}; use reth_optimism_node::OpNode; use reth_optimism_primitives::OpTransactionSigned; use reth_primitives_traits::SealedHeader; -use reth_provider::{providers::BlockchainProvider, HeaderProvider, StateProviderFactory}; +use reth_provider::{HeaderProvider, StateProviderFactory, providers::BlockchainProvider}; use reth_testing_utils::generators::generate_keys; use reth_transaction_pool::test_utils::TransactionBuilder; use tips_core::types::{Bundle, ParsedBundle}; diff --git a/crates/metering/src/tests/rpc.rs b/crates/metering/src/tests/rpc.rs index 5d630628..d41d005d 100644 --- a/crates/metering/src/tests/rpc.rs +++ b/crates/metering/src/tests/rpc.rs @@ -4,7 +4,7 @@ mod tests { use alloy_eips::Encodable2718; use alloy_genesis::Genesis; - use alloy_primitives::{address, b256, bytes, Bytes, U256}; + use alloy_primitives::{Bytes, U256, address, b256, bytes}; use alloy_rpc_client::RpcClient; use op_alloy_consensus::OpTxEnvelope; use reth::{ @@ -15,7 +15,7 @@ mod tests { tasks::TaskManager, }; use reth_optimism_chainspec::OpChainSpecBuilder; - use reth_optimism_node::{args::RollupArgs, OpNode}; + use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_optimism_primitives::OpTransactionSigned; use reth_provider::providers::BlockchainProvider; use reth_transaction_pool::test_utils::TransactionBuilder; diff --git a/crates/metering/src/tests/utils.rs b/crates/metering/src/tests/utils.rs index d8181df4..7bd29fef 100644 --- a/crates/metering/src/tests/utils.rs +++ b/crates/metering/src/tests/utils.rs @@ -2,12 +2,11 @@ use std::sync::Arc; use reth::api::{NodeTypes, NodeTypesWithDBAdapter}; use reth_db::{ - init_db, - mdbx::{DatabaseArguments, MaxReadTransactionDuration, KILOBYTE, MEGABYTE}, - test_utils::{create_test_static_files_dir, tempdir_path, TempDatabase, ERROR_DB_CREATION}, - ClientVersion, DatabaseEnv, + ClientVersion, DatabaseEnv, init_db, + mdbx::{DatabaseArguments, KILOBYTE, MEGABYTE, MaxReadTransactionDuration}, + test_utils::{ERROR_DB_CREATION, TempDatabase, create_test_static_files_dir, tempdir_path}, }; -use reth_provider::{providers::StaticFileProvider, ProviderFactory}; +use reth_provider::{ProviderFactory, providers::StaticFileProvider}; pub fn create_provider_factory( chain_spec: Arc, diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index c598ae3f..9162c746 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -13,11 +13,11 @@ use once_cell::sync::OnceCell; use reth::{ builder::{EngineNodeLauncher, Node, NodeHandle, TreeConfig}, providers::providers::BlockchainProvider, - version::{default_reth_version_metadata, try_init_version_metadata, RethCliVersionConsts}, + version::{RethCliVersionConsts, default_reth_version_metadata, try_init_version_metadata}, }; use reth_exex::ExExEvent; -use reth_optimism_cli::{chainspec::OpChainSpecParser, Cli}; -use reth_optimism_node::{args::RollupArgs, OpNode}; +use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; +use reth_optimism_node::{OpNode, args::RollupArgs}; use tracing::info; use url::Url; @@ -35,6 +35,13 @@ struct Args { #[arg(long = "websocket-url", value_name = "WEBSOCKET_URL")] pub websocket_url: Option, + #[arg( + long = "max-pending-blocks-depth", + value_name = "MAX_PENDING_BLOCKS_DEPTH", + default_value = "3" + )] + pub max_pending_blocks_depth: u64, + /// Enable transaction tracing ExEx for mempool-to-block timing analysis #[arg(long = "enable-transaction-tracing", value_name = "ENABLE_TRANSACTION_TRACING")] pub enable_transaction_tracing: bool, @@ -105,7 +112,12 @@ fn main() { let fb_cell = fb_cell.clone(); move |mut ctx| async move { let fb = fb_cell - .get_or_init(|| Arc::new(FlashblocksState::new(ctx.provider().clone()))) + .get_or_init(|| { + Arc::new(FlashblocksState::new( + ctx.provider().clone(), + args.max_pending_blocks_depth, + )) + }) .clone(); Ok(async move { while let Some(note) = ctx.notifications.try_next().await? { @@ -139,7 +151,12 @@ fn main() { )?; let fb = fb_cell - .get_or_init(|| Arc::new(FlashblocksState::new(ctx.provider().clone()))) + .get_or_init(|| { + Arc::new(FlashblocksState::new( + ctx.provider().clone(), + args.max_pending_blocks_depth, + )) + }) .clone(); fb.start(); diff --git a/crates/transaction-tracing/src/tracing.rs b/crates/transaction-tracing/src/tracing.rs index 3d06c57d..5c299326 100644 --- a/crates/transaction-tracing/src/tracing.rs +++ b/crates/transaction-tracing/src/tracing.rs @@ -10,7 +10,7 @@ use futures::StreamExt; use lru::LruCache; use reth::{ api::{BlockBody, FullNodeComponents}, - core::primitives::{transaction::TxHashRef, AlloyBlockHeader}, + core::primitives::{AlloyBlockHeader, transaction::TxHashRef}, transaction_pool::{FullTransactionEvent, TransactionPool}, }; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; diff --git a/justfile b/justfile index 4717655e..c0861665 100644 --- a/justfile +++ b/justfile @@ -9,10 +9,11 @@ test: cargo test --workspace --all-features check-format: - cargo fmt --all -- --check + cargo +nightly fmt --all -- --check fix-format: - cargo fmt --all + cargo fix --allow-dirty --allow-staged + cargo +nightly fmt --all check-clippy: cargo clippy --all-targets -- -D warnings diff --git a/rustfmt.toml b/rustfmt.toml index 7b74d4d6..6e800413 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,4 @@ -style_edition = "2021" +style_edition = "2024" imports_granularity = "Crate" group_imports = "StdExternalCrate" use_small_heuristics = "Max"