diff --git a/crates/flashblocks-rpc/src/metrics.rs b/crates/flashblocks-rpc/src/metrics.rs index 5c37585..bd73816 100644 --- a/crates/flashblocks-rpc/src/metrics.rs +++ b/crates/flashblocks-rpc/src/metrics.rs @@ -47,6 +47,9 @@ pub struct Metrics { #[metric(describe = "Count of times flashblocks simulate_v1 is called")] pub simulate_v1: Counter, + #[metric(describe = "Count of times flashblocks get_logs is called")] + pub get_logs: Counter, + #[metric( describe = "Number of times pending snapshot was cleared because canonical caught up" )] diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index dba4a1b..14e5d71 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -6,7 +6,7 @@ use alloy_primitives::{ }; use alloy_provider::network::TransactionResponse; use alloy_rpc_types::{state::StateOverride, BlockTransactions}; -use alloy_rpc_types_eth::Header as RPCHeader; +use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; use eyre::eyre; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; @@ -230,4 +230,19 @@ impl PendingBlocks { pub fn get_state_overrides(&self) -> Option { self.state_overrides.clone() } + + pub fn get_pending_logs(&self, filter: &Filter) -> Vec { + let mut logs = Vec::new(); + + // Iterate through all transaction receipts in pending state + for receipt in self.transaction_receipts.values() { + for log in receipt.inner.logs() { + if filter.matches(&log.inner) { + logs.push(log.clone()); + } + } + } + + logs + } } diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 85be61a..e0a9c61 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -9,6 +9,7 @@ use alloy_primitives::{Address, TxHash, U256}; use alloy_rpc_types::simulate::{SimBlock, SimulatePayload, SimulatedBlock}; use alloy_rpc_types::state::{EvmOverrides, StateOverride, StateOverridesBuilder}; use alloy_rpc_types::BlockOverrides; +use alloy_rpc_types_eth::{Filter, Log}; use arc_swap::Guard; use jsonrpsee::{ core::{async_trait, RpcResult}, @@ -19,11 +20,12 @@ use jsonrpsee_types::ErrorObjectOwned; use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionRequest; use reth::providers::CanonStateSubscriptions; +use reth::rpc::eth::EthFilter; use reth::rpc::server_types::eth::EthApiError; use reth_rpc_eth_api::helpers::EthState; use reth_rpc_eth_api::helpers::EthTransactions; use reth_rpc_eth_api::helpers::{EthBlocks, EthCall}; -use reth_rpc_eth_api::{helpers::FullEthApi, RpcBlock}; +use reth_rpc_eth_api::{helpers::FullEthApi, EthApiTypes, EthFilterApiServer, RpcBlock}; use reth_rpc_eth_api::{RpcReceipt, RpcTransaction}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -64,6 +66,9 @@ pub trait PendingBlocksAPI { /// Gets the state overrides for the pending blocks fn get_state_overrides(&self) -> Option; + + /// Gets logs from pending state matching the provided filter. + fn get_pending_logs(&self, filter: &Filter) -> Vec; } #[cfg_attr(not(test), rpc(server, namespace = "eth"))] @@ -129,19 +134,24 @@ pub trait EthApiOverride { opts: SimulatePayload, block_number: Option, ) -> RpcResult>>>; + + #[method(name = "getLogs")] + async fn get_logs(&self, filter: Filter) -> RpcResult>; } #[derive(Debug)] -pub struct EthApiExt { +pub struct EthApiExt { eth_api: Eth, + eth_filter: EthFilter, flashblocks_state: Arc, metrics: Metrics, } -impl EthApiExt { - pub fn new(eth_api: Eth, flashblocks_state: Arc) -> Self { +impl EthApiExt { + pub fn new(eth_api: Eth, eth_filter: EthFilter, flashblocks_state: Arc) -> Self { Self { eth_api, + eth_filter, flashblocks_state, metrics: Metrics::default(), } @@ -451,6 +461,58 @@ where .await .map_err(Into::into) } + + async fn get_logs(&self, filter: Filter) -> RpcResult> { + debug!( + message = "rpc::get_logs", + address = ?filter.address + ); + + // Check if this is a mixed query (toBlock is pending) + let (from_block, to_block) = match &filter.block_option { + alloy_rpc_types_eth::FilterBlockOption::Range { + from_block, + to_block, + } => (*from_block, *to_block), + _ => { + // Block hash queries or other formats - delegate to eth API + return self.eth_filter.logs(filter).await; + } + }; + + // If toBlock is not pending, delegate to eth API + if !matches!(to_block, Some(BlockNumberOrTag::Pending)) { + return self.eth_filter.logs(filter).await; + } + + // Mixed query: toBlock is pending, so we need to combine historical + pending logs + self.metrics.get_logs.increment(1); + let mut all_logs = Vec::new(); + + let pending_blocks = self.flashblocks_state.get_pending_blocks(); + + // Get historical logs if fromBlock is not pending + if !matches!(from_block, Some(BlockNumberOrTag::Pending)) { + // Use the canonical block number from pending blocks to ensure consistency + let canonical_block = pending_blocks.get_canonical_block_number(); + + // Create a filter for historical data (fromBlock to canonical block) + let mut historical_filter = filter.clone(); + historical_filter.block_option = alloy_rpc_types_eth::FilterBlockOption::Range { + from_block, + to_block: Some(canonical_block), + }; + + let historical_logs = self.eth_filter.logs(historical_filter).await?; + all_logs.extend(historical_logs); + } + + // Always get pending logs when toBlock is pending + let pending_logs = pending_blocks.get_pending_logs(&filter); + all_logs.extend(pending_logs); + + Ok(all_logs) + } } impl EthApiExt diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index b00d5e0..bbbf898 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -8,9 +8,10 @@ use alloy_eips::BlockNumberOrTag; use alloy_primitives::map::foldhash::HashMap; use alloy_primitives::map::B256HashMap; use alloy_primitives::{Address, BlockNumber, Bytes, Sealable, B256, U256}; -use alloy_rpc_types::{state::StateOverride, TransactionTrait, Withdrawal}; +use alloy_rpc_types::{TransactionTrait, Withdrawal}; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; -use alloy_rpc_types_eth::state::{AccountOverride, StateOverridesBuilder}; +use alloy_rpc_types_eth::state::{AccountOverride, StateOverride, StateOverridesBuilder}; +use alloy_rpc_types_eth::{Filter, Log}; use arc_swap::{ArcSwapOption, Guard}; use eyre::eyre; use op_alloy_consensus::OpTxEnvelope; @@ -168,6 +169,12 @@ impl PendingBlocksAPI for Guard>> { .map(|pb| pb.get_state_overrides()) .unwrap_or_default() } + + fn get_pending_logs(&self, filter: &Filter) -> Vec { + self.as_ref() + .map(|pb| pb.get_pending_logs(filter)) + .unwrap_or_default() + } } #[derive(Debug, Clone)] diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 2854f21..0ed37d4 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -8,7 +8,7 @@ mod tests { use alloy_eips::BlockNumberOrTag; use alloy_genesis::Genesis; use alloy_primitives::map::HashMap; - use alloy_primitives::{address, b256, bytes, Address, Bytes, TxHash, B256, U256}; + use alloy_primitives::{address, b256, bytes, Address, Bytes, LogData, TxHash, B256, U256}; use alloy_provider::Provider; use alloy_provider::RootProvider; use alloy_rpc_client::RpcClient; @@ -134,8 +134,11 @@ mod tests { let flashblocks_state = Arc::new(FlashblocksState::new(ctx.provider().clone())); flashblocks_state.start(); - let api_ext = - EthApiExt::new(ctx.registry.eth_api().clone(), flashblocks_state.clone()); + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + flashblocks_state.clone(), + ); ctx.modules.replace_configured(api_ext.into_rpc())?; @@ -226,6 +229,37 @@ mod tests { const COUNTER_ADDRESS: Address = address!("0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512"); + // Test log topics - these represent common events + const TEST_LOG_TOPIC_0: B256 = + b256!("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"); // Transfer event + const TEST_LOG_TOPIC_1: B256 = + b256!("0x000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266"); // From address + const TEST_LOG_TOPIC_2: B256 = + b256!("0x0000000000000000000000001234567890123456789012345678901234567890"); // To address + + fn create_test_logs() -> Vec { + vec![ + alloy_primitives::Log { + address: COUNTER_ADDRESS, + data: LogData::new( + vec![TEST_LOG_TOPIC_0, TEST_LOG_TOPIC_1, TEST_LOG_TOPIC_2], + bytes!("0x0000000000000000000000000000000000000000000000000de0b6b3a7640000") + .into(), // 1 ETH in wei + ) + .unwrap(), + }, + alloy_primitives::Log { + address: TEST_ADDRESS, + data: LogData::new( + vec![TEST_LOG_TOPIC_0], + bytes!("0x0000000000000000000000000000000000000000000000000000000000000001") + .into(), // Value: 1 + ) + .unwrap(), + }, + ] + } + // NOTE: // To create tx use cast mktx/ // Example: `cast mktx --private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 --nonce 1 --gas-limit 100000 --gas-price 1499576 --chain 84532 --value 0 --priority-gas-price 0 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 0x` @@ -296,7 +330,7 @@ mod tests { OpReceipt::Legacy(Receipt { status: true.into(), cumulative_gas_used: 172279 + 44000, - logs: vec![], + logs: create_test_logs(), }), ); receipts @@ -671,4 +705,180 @@ mod tests { .to_string() .contains(format!("{}", error_code).as_str())); } + + #[tokio::test] + async fn test_get_logs_pending() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + // Test no logs when no flashblocks sent + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + assert_eq!(logs.len(), 0); + + // Send payloads with transactions + node.send_test_payloads().await?; + + // Test getting pending logs - must use both fromBlock and toBlock as "pending" + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // We should now have 2 logs from the INCREMENT_TX transaction + assert_eq!(logs.len(), 2); + + // Verify the first log is from COUNTER_ADDRESS + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].topics()[0], TEST_LOG_TOPIC_0); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); + + // Verify the second log is from TEST_ADDRESS + assert_eq!(logs[1].address(), TEST_ADDRESS); + assert_eq!(logs[1].topics()[0], TEST_LOG_TOPIC_0); + assert_eq!(logs[1].transaction_hash, Some(INCREMENT_HASH)); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_filter_by_address() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test filtering by a specific address (COUNTER_ADDRESS) + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .address(COUNTER_ADDRESS) + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should get only 1 log from COUNTER_ADDRESS + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); + + // Test filtering by TEST_ADDRESS + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .address(TEST_ADDRESS) + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should get only 1 log from TEST_ADDRESS + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), TEST_ADDRESS); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_topic_filtering() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test filtering by topic - should match both logs + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .event_signature(TEST_LOG_TOPIC_0) + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.topics()[0] == TEST_LOG_TOPIC_0)); + + // Test filtering by specific topic combination - should match only the first log + let filter = alloy_rpc_types_eth::Filter::default() + .topic1(TEST_LOG_TOPIC_1) + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending); + + let logs = provider.get_logs(&filter).await?; + + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].topics()[1], TEST_LOG_TOPIC_1); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_mixed_block_ranges() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test fromBlock: 0, toBlock: pending (should include both historical and pending) + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(0) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should now include pending logs (2 logs from our test setup) + assert_eq!(logs.len(), 2); + assert!(logs + .iter() + .all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + + // Test fromBlock: latest, toBlock: pending + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Latest) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should include pending logs (historical part is empty in our test setup) + assert_eq!(logs.len(), 2); + assert!(logs + .iter() + .all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + + // Test fromBlock: earliest, toBlock: pending + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Earliest) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should include pending logs (historical part is empty in our test setup) + assert_eq!(logs.len(), 2); + assert!(logs + .iter() + .all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + + Ok(()) + } } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 0527577..d1e7cd7 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -118,7 +118,12 @@ fn main() { let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url); flashblocks_client.start(); - let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), fb); + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + fb, + ); + ctx.modules.replace_configured(api_ext.into_rpc())?; } else { info!(message = "flashblocks integration is disabled");