From 829d96dd4b1f38d0d5740a02d767358b85058b01 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 23 Sep 2025 20:14:42 -0400 Subject: [PATCH 01/13] feat: implement get_logs RPC method for pending state - Add get_logs counter metric - Implement pending logs filtering with address/topic support - Add get_logs RPC method with pending/historical logic - Include comprehensive tests for log retrieval functionality --- crates/flashblocks-rpc/src/metrics.rs | 3 + crates/flashblocks-rpc/src/pending_blocks.rs | 57 +++++++++++- crates/flashblocks-rpc/src/rpc.rs | 58 ++++++++++++ crates/flashblocks-rpc/src/state.rs | 13 ++- crates/flashblocks-rpc/src/tests/rpc.rs | 93 ++++++++++++++++++++ 5 files changed, 222 insertions(+), 2 deletions(-) diff --git a/crates/flashblocks-rpc/src/metrics.rs b/crates/flashblocks-rpc/src/metrics.rs index c816b51..de9c231 100644 --- a/crates/flashblocks-rpc/src/metrics.rs +++ b/crates/flashblocks-rpc/src/metrics.rs @@ -42,6 +42,9 @@ pub struct Metrics { #[metric(describe = "Count of times flashblocks simulate_v1 is called")] pub simulate_v1: Counter, + #[metric(describe = "Count of times flashblocks get_logs is called")] + pub get_logs: Counter, + #[metric( describe = "Number of times pending snapshot was cleared because canonical caught up" )] diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index c6c1422..8f47a59 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -5,7 +5,7 @@ use alloy_primitives::{ }; use alloy_provider::network::TransactionResponse; use alloy_rpc_types::{state::StateOverride, BlockTransactions}; -use alloy_rpc_types_eth::Header as RPCHeader; +use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log}; use eyre::eyre; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; @@ -225,4 +225,59 @@ impl PendingBlocks { pub fn get_state_overrides(&self) -> Option { self.state_overrides.clone() } + + pub fn get_pending_logs(&self, filter: &Filter) -> Vec { + let mut logs = Vec::new(); + let mut log_index = 0u64; + + // Get latest block context for pending logs + let latest_header = self.latest_header(); + let block_number = latest_header.number; + let block_hash = latest_header.hash(); + + // Iterate through all transaction receipts in pending state + for (tx_hash, receipt) in &self.transaction_receipts { + // Extract logs from OpTransactionReceipt using the inner receipt's logs + let receipt_logs = receipt.inner.logs(); + + // Apply filter to each log and add block context + for log in receipt_logs { + if self.matches_filter(log, filter) { + let mut pending_log = log.clone(); + + // Add block and transaction context + pending_log.log_index = Some(log_index); + pending_log.transaction_hash = Some(*tx_hash); + pending_log.block_number = Some(block_number); + pending_log.block_hash = Some(block_hash); + pending_log.removed = false; // Pending logs are never removed + + logs.push(pending_log); + } + log_index += 1; + } + } + + logs + } + + fn matches_filter(&self, log: &Log, filter: &Filter) -> bool { + // Address filtering - check if filter has address and if log matches + if !filter.address.matches(&log.address()) { + return false; + } + + // Topic filtering - check each topic position + for (i, topic_filter) in filter.topics.iter().enumerate() { + if let Some(log_topic) = log.topics().get(i) { + if !topic_filter.matches(log_topic) { + return false; + } + } else if !topic_filter.matches(&Default::default()) { + return false; + } + } + + true + } } diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 76be39f..55d8951 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -8,6 +8,7 @@ use alloy_primitives::{Address, TxHash, U256}; use alloy_rpc_types::simulate::{SimBlock, SimulatePayload, SimulatedBlock}; use alloy_rpc_types::state::{EvmOverrides, StateOverride, StateOverridesBuilder}; use alloy_rpc_types::BlockOverrides; +use alloy_rpc_types_eth::{Filter, Log}; use jsonrpsee::{ core::{async_trait, RpcResult}, proc_macros::rpc, @@ -54,6 +55,9 @@ pub trait FlashblocksAPI { fn subscribe_to_flashblocks(&self) -> broadcast::Receiver; fn get_state_overrides(&self) -> Option; + + /// Gets logs from pending state matching the provided filter. + fn get_pending_logs(&self, filter: &Filter) -> Vec; } #[cfg_attr(not(test), rpc(server, namespace = "eth"))] @@ -119,6 +123,9 @@ pub trait EthApiOverride { opts: SimulatePayload, block_number: Option, ) -> RpcResult>>>; + + #[method(name = "getLogs")] + async fn get_logs(&self, filter: Filter) -> RpcResult>; } #[derive(Debug)] @@ -410,6 +417,31 @@ where .await .map_err(Into::into) } + + async fn get_logs(&self, filter: Filter) -> RpcResult> { + debug!( + message = "rpc::get_logs", + address = ?filter.address + ); + + // Check if filter range includes pending/latest blocks + let should_include_pending = self.should_include_pending_logs(&filter); + + if should_include_pending { + self.metrics.get_logs.increment(1); + + // For now, just return pending logs + // TODO: Implement historical log delegation once the correct reth API is identified + // Historical logs would be retrieved and combined here + let pending_logs = self.flashblocks_state.get_pending_logs(&filter); + Ok(pending_logs) + } else { + // Pure historical query - would delegate to reth here + // TODO: Implement historical log delegation to underlying reth node + // This should use the same pattern as other methods like get_balance + Ok(Vec::new()) + } + } } impl EthApiExt @@ -417,6 +449,32 @@ where Eth: FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, { + fn should_include_pending_logs(&self, filter: &Filter) -> bool { + // Check the block_option in the filter + match &filter.block_option { + alloy_rpc_types_eth::FilterBlockOption::Range { + from_block: _, + to_block, + } => { + match to_block { + Some(BlockNumberOrTag::Pending) => true, + Some(BlockNumberOrTag::Latest) => true, + None => true, // Default toBlock is "latest" + Some(BlockNumberOrTag::Number(to_block)) => { + // Case 3: toBlock is specific number >= current latest + if let Some(latest_block) = self.flashblocks_state.get_block(false) { + *to_block >= latest_block.header.number + } else { + false + } + } + _ => false, // Other cases like "earliest" + } + } + alloy_rpc_types_eth::FilterBlockOption::AtBlockHash(_) => false, // Specific block hash + } + } + async fn wait_for_flashblocks_receipt(&self, tx_hash: TxHash) -> Option> { let mut receiver = self.flashblocks_state.subscribe_to_flashblocks(); diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index a494fc7..fb8ba9b 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -11,6 +11,7 @@ use alloy_primitives::{Address, BlockNumber, Bytes, Sealable, TxHash, B256, U256 use alloy_rpc_types::{TransactionTrait, Withdrawal}; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; use alloy_rpc_types_eth::state::{AccountOverride, StateOverride, StateOverridesBuilder}; +use alloy_rpc_types_eth::{Filter, Log}; use arc_swap::ArcSwapOption; use eyre::eyre; use op_alloy_consensus::OpTxEnvelope; @@ -167,6 +168,14 @@ impl FlashblocksAPI for FlashblocksState { .as_ref() .and_then(|pb| pb.get_state_overrides()) } + + fn get_pending_logs(&self, filter: &Filter) -> Vec { + self.pending_blocks + .load() + .as_ref() + .map(|pb| pb.get_pending_logs(filter)) + .unwrap_or_default() + } } #[derive(Debug, Clone)] @@ -281,7 +290,9 @@ where warn!( message = "reorg detected, clearing pending blocks", latest_pending_block = pending_blocks.latest_block_number(), - canonical_block = block.number + canonical_block = block.number, + tracked_txn_hashes_len = tracked_txn_hashes.len(), + block_txn_hashes_len = block_txn_hashes.len(), ); self.metrics.pending_clear_reorg.increment(1); diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index d81ed8d..3fddbbf 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -671,4 +671,97 @@ mod tests { .to_string() .contains(format!("{}", error_code).as_str())); } + + #[tokio::test] + async fn test_get_logs_pending() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + // Test no logs when no flashblocks sent + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + assert_eq!(logs.len(), 0); + + // Send payloads with transactions + node.send_test_payloads().await?; + + // Test getting pending logs - the current payloads don't have logs + // but this tests the API is working correctly + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Since our test transactions don't emit logs, we expect 0 logs + // but this confirms the API works and processes pending state + assert_eq!(logs.len(), 0); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_filter_by_address() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test filtering by a specific address + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .address(COUNTER_ADDRESS) + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Since our test contract doesn't emit logs in our test transactions, + // we expect 0 logs, but this tests the address filtering works + assert_eq!(logs.len(), 0); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_historical_vs_pending() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + // Test historical logs (should work but return empty for our test setup) + let historical_logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(0) + .to_block(0), + ) + .await?; + + // Historical query should return empty in our current implementation + assert_eq!(historical_logs.len(), 0); + + node.send_test_payloads().await?; + + // Test pending logs after sending payloads + let pending_logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should still be 0 since our test transactions don't emit logs + // but confirms the pending logic is working + assert_eq!(pending_logs.len(), 0); + + Ok(()) + } } From 5ed0c7759c47b4e6da4fbb6e6622c388ba7d412f Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 23 Sep 2025 21:06:53 -0400 Subject: [PATCH 02/13] fix test to properly test logs --- crates/flashblocks-rpc/src/state.rs | 4 +- crates/flashblocks-rpc/src/tests/rpc.rs | 119 +++++++++++++++++++++--- 2 files changed, 106 insertions(+), 17 deletions(-) diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index fb8ba9b..53c8fae 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -290,9 +290,7 @@ where warn!( message = "reorg detected, clearing pending blocks", latest_pending_block = pending_blocks.latest_block_number(), - canonical_block = block.number, - tracked_txn_hashes_len = tracked_txn_hashes.len(), - block_txn_hashes_len = block_txn_hashes.len(), + canonical_block = block.number ); self.metrics.pending_clear_reorg.increment(1); diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 3fddbbf..9798ca7 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -8,7 +8,7 @@ mod tests { use alloy_eips::BlockNumberOrTag; use alloy_genesis::Genesis; use alloy_primitives::map::HashMap; - use alloy_primitives::{address, b256, bytes, Address, Bytes, TxHash, B256, U256}; + use alloy_primitives::{address, b256, bytes, Address, Bytes, LogData, TxHash, B256, U256}; use alloy_provider::Provider; use alloy_provider::RootProvider; use alloy_rpc_client::RpcClient; @@ -226,6 +226,37 @@ mod tests { const COUNTER_ADDRESS: Address = address!("0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512"); + // Test log topics - these represent common events + const TEST_LOG_TOPIC_0: B256 = + b256!("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"); // Transfer event + const TEST_LOG_TOPIC_1: B256 = + b256!("0x000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266"); // From address + const TEST_LOG_TOPIC_2: B256 = + b256!("0x0000000000000000000000001234567890123456789012345678901234567890"); // To address + + fn create_test_logs() -> Vec { + vec![ + alloy_primitives::Log { + address: COUNTER_ADDRESS, + data: LogData::new( + vec![TEST_LOG_TOPIC_0, TEST_LOG_TOPIC_1, TEST_LOG_TOPIC_2], + bytes!("0x0000000000000000000000000000000000000000000000000de0b6b3a7640000") + .into(), // 1 ETH in wei + ) + .unwrap(), + }, + alloy_primitives::Log { + address: TEST_ADDRESS, + data: LogData::new( + vec![TEST_LOG_TOPIC_0], + bytes!("0x0000000000000000000000000000000000000000000000000000000000000001") + .into(), // Value: 1 + ) + .unwrap(), + }, + ] + } + // NOTE: // To create tx use cast mktx/ // Example: `cast mktx --private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 --nonce 1 --gas-limit 100000 --gas-price 1499576 --chain 84532 --value 0 --priority-gas-price 0 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 0x` @@ -296,7 +327,7 @@ mod tests { OpReceipt::Legacy(Receipt { status: true.into(), cumulative_gas_used: 172279 + 44000, - logs: vec![], + logs: create_test_logs(), }), ); receipts @@ -690,8 +721,7 @@ mod tests { // Send payloads with transactions node.send_test_payloads().await?; - // Test getting pending logs - the current payloads don't have logs - // but this tests the API is working correctly + // Test getting pending logs - the INCREMENT_TX should have logs let logs = provider .get_logs( &alloy_rpc_types_eth::Filter::default() @@ -699,9 +729,18 @@ mod tests { ) .await?; - // Since our test transactions don't emit logs, we expect 0 logs - // but this confirms the API works and processes pending state - assert_eq!(logs.len(), 0); + // We should now have 2 logs from the INCREMENT_TX transaction + assert_eq!(logs.len(), 2); + + // Verify the first log is from COUNTER_ADDRESS + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].topics()[0], TEST_LOG_TOPIC_0); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); + + // Verify the second log is from TEST_ADDRESS + assert_eq!(logs[1].address(), TEST_ADDRESS); + assert_eq!(logs[1].topics()[0], TEST_LOG_TOPIC_0); + assert_eq!(logs[1].transaction_hash, Some(INCREMENT_HASH)); Ok(()) } @@ -714,7 +753,7 @@ mod tests { node.send_test_payloads().await?; - // Test filtering by a specific address + // Test filtering by a specific address (COUNTER_ADDRESS) let logs = provider .get_logs( &alloy_rpc_types_eth::Filter::default() @@ -723,9 +762,24 @@ mod tests { ) .await?; - // Since our test contract doesn't emit logs in our test transactions, - // we expect 0 logs, but this tests the address filtering works - assert_eq!(logs.len(), 0); + // Should get only 1 log from COUNTER_ADDRESS + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); + + // Test filtering by TEST_ADDRESS + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .address(TEST_ADDRESS) + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should get only 1 log from TEST_ADDRESS + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), TEST_ADDRESS); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); Ok(()) } @@ -758,9 +812,46 @@ mod tests { ) .await?; - // Should still be 0 since our test transactions don't emit logs - // but confirms the pending logic is working - assert_eq!(pending_logs.len(), 0); + // Should have 2 logs from the INCREMENT_TX transaction + assert_eq!(pending_logs.len(), 2); + + // Verify both logs have the correct transaction hash + assert_eq!(pending_logs[0].transaction_hash, Some(INCREMENT_HASH)); + assert_eq!(pending_logs[1].transaction_hash, Some(INCREMENT_HASH)); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_topic_filtering() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test filtering by topic - should match both logs + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .event_signature(TEST_LOG_TOPIC_0) + .select(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.topics()[0] == TEST_LOG_TOPIC_0)); + + // Test filtering by specific topic combination - should match only the first log + let filter = alloy_rpc_types_eth::Filter::default() + .topic1(TEST_LOG_TOPIC_1) + .select(alloy_eips::BlockNumberOrTag::Pending); + + let logs = provider.get_logs(&filter).await?; + + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].topics()[1], TEST_LOG_TOPIC_1); Ok(()) } From 7d69c71cd7196166185a22d7cd785b6b18e66c34 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 13:56:14 -0400 Subject: [PATCH 03/13] wip --- crates/flashblocks-rpc/src/rpc.rs | 21 +++++++++++---------- crates/node/src/main.rs | 4 +++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 55d8951..1617ec4 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -22,7 +22,7 @@ use reth::rpc::server_types::eth::EthApiError; use reth_rpc_eth_api::helpers::EthState; use reth_rpc_eth_api::helpers::EthTransactions; use reth_rpc_eth_api::helpers::{EthBlocks, EthCall}; -use reth_rpc_eth_api::{helpers::FullEthApi, RpcBlock}; +use reth_rpc_eth_api::{helpers::FullEthApi, EthFilterApiServer, RpcBlock}; use reth_rpc_eth_api::{RpcReceipt, RpcTransaction}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -129,16 +129,18 @@ pub trait EthApiOverride { } #[derive(Debug)] -pub struct EthApiExt { +pub struct EthApiExt { eth_api: Eth, + eth_filter: EthFilter, flashblocks_state: Arc, metrics: Metrics, } -impl EthApiExt { - pub fn new(eth_api: Eth, flashblocks_state: Arc) -> Self { +impl EthApiExt { + pub fn new(eth_api: Eth, eth_filter: EthFilter, flashblocks_state: Arc) -> Self { Self { eth_api, + eth_filter, flashblocks_state, metrics: Metrics::default(), } @@ -146,9 +148,10 @@ impl EthApiExt { } #[async_trait] -impl EthApiOverrideServer for EthApiExt +impl EthApiOverrideServer for EthApiExt where Eth: FullEthApi + Send + Sync + 'static, + EthFilter: EthFilterApiServer, FB: FlashblocksAPI + Send + Sync + 'static, jsonrpsee_types::error::ErrorObject<'static>: From, { @@ -436,17 +439,15 @@ where let pending_logs = self.flashblocks_state.get_pending_logs(&filter); Ok(pending_logs) } else { - // Pure historical query - would delegate to reth here - // TODO: Implement historical log delegation to underlying reth node - // This should use the same pattern as other methods like get_balance - Ok(Vec::new()) + self.eth_filter.logs(filter).await.map_err(Into::into) } } } -impl EthApiExt +impl EthApiExt where Eth: FullEthApi + Send + Sync + 'static, + EthFilter: EthFilterApiServer, FB: FlashblocksAPI + Send + Sync + 'static, { fn should_include_pending_logs(&self, filter: &Filter) -> bool { diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 7a66e5b..75b7591 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -1,6 +1,7 @@ use base_reth_flashblocks_rpc::rpc::EthApiExt; use futures_util::TryStreamExt; use once_cell::sync::OnceCell; +use reth::rpc::eth::EthFilter; use reth_exex::ExExEvent; use std::sync::Arc; @@ -106,7 +107,8 @@ fn main() { let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url); flashblocks_client.start(); - let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), fb); + let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), ctx.registry.eth_handlers().filter.clone(), fb); + ctx.modules.replace_configured(api_ext.into_rpc())?; } else { info!(message = "flashblocks integration is disabled"); From a9cc05cd0d1ed74ee10d42e640b3e5f5cbad1558 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 14:23:07 -0400 Subject: [PATCH 04/13] fix test --- crates/flashblocks-rpc/src/rpc.rs | 21 ++++++++++----------- crates/flashblocks-rpc/src/tests/rpc.rs | 7 +++++-- crates/node/src/main.rs | 9 ++++++--- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 1617ec4..e2a4c04 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -18,11 +18,12 @@ use jsonrpsee_types::ErrorObjectOwned; use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionRequest; use reth::providers::CanonStateSubscriptions; +use reth::rpc::eth::EthFilter; use reth::rpc::server_types::eth::EthApiError; use reth_rpc_eth_api::helpers::EthState; use reth_rpc_eth_api::helpers::EthTransactions; use reth_rpc_eth_api::helpers::{EthBlocks, EthCall}; -use reth_rpc_eth_api::{helpers::FullEthApi, EthFilterApiServer, RpcBlock}; +use reth_rpc_eth_api::{helpers::FullEthApi, EthApiTypes, EthFilterApiServer, RpcBlock}; use reth_rpc_eth_api::{RpcReceipt, RpcTransaction}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -129,15 +130,15 @@ pub trait EthApiOverride { } #[derive(Debug)] -pub struct EthApiExt { +pub struct EthApiExt { eth_api: Eth, - eth_filter: EthFilter, + eth_filter: EthFilter, flashblocks_state: Arc, metrics: Metrics, } -impl EthApiExt { - pub fn new(eth_api: Eth, eth_filter: EthFilter, flashblocks_state: Arc) -> Self { +impl EthApiExt { + pub fn new(eth_api: Eth, eth_filter: EthFilter, flashblocks_state: Arc) -> Self { Self { eth_api, eth_filter, @@ -148,10 +149,9 @@ impl EthApiExt { } #[async_trait] -impl EthApiOverrideServer for EthApiExt +impl EthApiOverrideServer for EthApiExt where - Eth: FullEthApi + Send + Sync + 'static, - EthFilter: EthFilterApiServer, + Eth: EthApiTypes + FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, jsonrpsee_types::error::ErrorObject<'static>: From, { @@ -444,10 +444,9 @@ where } } -impl EthApiExt +impl EthApiExt where - Eth: FullEthApi + Send + Sync + 'static, - EthFilter: EthFilterApiServer, + Eth: EthApiTypes + FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, { fn should_include_pending_logs(&self, filter: &Filter) -> bool { diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 9798ca7..b62a92d 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -134,8 +134,11 @@ mod tests { let flashblocks_state = Arc::new(FlashblocksState::new(ctx.provider().clone())); flashblocks_state.start(); - let api_ext = - EthApiExt::new(ctx.registry.eth_api().clone(), flashblocks_state.clone()); + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + flashblocks_state.clone(), + ); ctx.modules.replace_configured(api_ext.into_rpc())?; diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 75b7591..d94dc71 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -1,7 +1,6 @@ use base_reth_flashblocks_rpc::rpc::EthApiExt; use futures_util::TryStreamExt; use once_cell::sync::OnceCell; -use reth::rpc::eth::EthFilter; use reth_exex::ExExEvent; use std::sync::Arc; @@ -107,8 +106,12 @@ fn main() { let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url); flashblocks_client.start(); - let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), ctx.registry.eth_handlers().filter.clone(), fb); - + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + fb, + ); + ctx.modules.replace_configured(api_ext.into_rpc())?; } else { info!(message = "flashblocks integration is disabled"); From f9ddb4b69d1d7ce37cabdf4f878c46aee8f4bae9 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 14:37:03 -0400 Subject: [PATCH 05/13] only pending --- crates/flashblocks-rpc/src/rpc.rs | 38 ++++++----------- crates/flashblocks-rpc/src/tests/rpc.rs | 56 +++++++++++++++++++++---- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index e2a4c04..1a2632a 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -427,18 +427,14 @@ where address = ?filter.address ); - // Check if filter range includes pending/latest blocks - let should_include_pending = self.should_include_pending_logs(&filter); - - if should_include_pending { + // Only handle pure pending queries: fromBlock="pending" and toBlock="pending" + // Everything else goes to regular eth API + if self.is_pure_pending_query(&filter) { self.metrics.get_logs.increment(1); - - // For now, just return pending logs - // TODO: Implement historical log delegation once the correct reth API is identified - // Historical logs would be retrieved and combined here let pending_logs = self.flashblocks_state.get_pending_logs(&filter); Ok(pending_logs) } else { + // All other queries - delegate to underlying reth node self.eth_filter.logs(filter).await.map_err(Into::into) } } @@ -449,29 +445,19 @@ where Eth: EthApiTypes + FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, { - fn should_include_pending_logs(&self, filter: &Filter) -> bool { - // Check the block_option in the filter + fn is_pure_pending_query(&self, filter: &Filter) -> bool { + // Only return true for pure pending queries: both fromBlock and toBlock must be "pending" match &filter.block_option { alloy_rpc_types_eth::FilterBlockOption::Range { - from_block: _, + from_block, to_block, } => { - match to_block { - Some(BlockNumberOrTag::Pending) => true, - Some(BlockNumberOrTag::Latest) => true, - None => true, // Default toBlock is "latest" - Some(BlockNumberOrTag::Number(to_block)) => { - // Case 3: toBlock is specific number >= current latest - if let Some(latest_block) = self.flashblocks_state.get_block(false) { - *to_block >= latest_block.header.number - } else { - false - } - } - _ => false, // Other cases like "earliest" - } + matches!( + (from_block, to_block), + (Some(BlockNumberOrTag::Pending), Some(BlockNumberOrTag::Pending)) + ) } - alloy_rpc_types_eth::FilterBlockOption::AtBlockHash(_) => false, // Specific block hash + _ => false, // Block hash queries or other formats are not pure pending } } diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index b62a92d..7e153dd 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -724,11 +724,12 @@ mod tests { // Send payloads with transactions node.send_test_payloads().await?; - // Test getting pending logs - the INCREMENT_TX should have logs + // Test getting pending logs - must use both fromBlock and toBlock as "pending" let logs = provider .get_logs( &alloy_rpc_types_eth::Filter::default() - .select(alloy_eips::BlockNumberOrTag::Pending), + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), ) .await?; @@ -761,7 +762,8 @@ mod tests { .get_logs( &alloy_rpc_types_eth::Filter::default() .address(COUNTER_ADDRESS) - .select(alloy_eips::BlockNumberOrTag::Pending), + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), ) .await?; @@ -775,7 +777,8 @@ mod tests { .get_logs( &alloy_rpc_types_eth::Filter::default() .address(TEST_ADDRESS) - .select(alloy_eips::BlockNumberOrTag::Pending), + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), ) .await?; @@ -811,7 +814,8 @@ mod tests { let pending_logs = provider .get_logs( &alloy_rpc_types_eth::Filter::default() - .select(alloy_eips::BlockNumberOrTag::Pending), + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), ) .await?; @@ -838,7 +842,8 @@ mod tests { .get_logs( &alloy_rpc_types_eth::Filter::default() .event_signature(TEST_LOG_TOPIC_0) - .select(alloy_eips::BlockNumberOrTag::Pending), + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), ) .await?; @@ -848,7 +853,8 @@ mod tests { // Test filtering by specific topic combination - should match only the first log let filter = alloy_rpc_types_eth::Filter::default() .topic1(TEST_LOG_TOPIC_1) - .select(alloy_eips::BlockNumberOrTag::Pending); + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending); let logs = provider.get_logs(&filter).await?; @@ -858,4 +864,40 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_get_logs_non_pending_queries_go_to_eth_api() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test that non-pure-pending queries go to the regular eth API + // This filter from block 0 to pending should go to eth API (not pure pending) + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(0) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should be handled by underlying eth API (empty in our test setup) + assert_eq!(logs.len(), 0); + + // Test that latest queries also go to eth API + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Latest) + .to_block(alloy_eips::BlockNumberOrTag::Latest), + ) + .await?; + + // Should be handled by underlying eth API (empty in our test setup) + assert_eq!(logs.len(), 0); + + Ok(()) + } } From efdf4523820cd7e2396be2d8c0b5d760571dc856 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 17:50:35 -0400 Subject: [PATCH 06/13] optimize --- crates/flashblocks-rpc/src/pending_blocks.rs | 40 ++++---------------- crates/flashblocks-rpc/src/rpc.rs | 5 ++- 2 files changed, 12 insertions(+), 33 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 8f47a59..a557549 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -232,24 +232,20 @@ impl PendingBlocks { // Get latest block context for pending logs let latest_header = self.latest_header(); - let block_number = latest_header.number; - let block_hash = latest_header.hash(); // Iterate through all transaction receipts in pending state for (tx_hash, receipt) in &self.transaction_receipts { - // Extract logs from OpTransactionReceipt using the inner receipt's logs - let receipt_logs = receipt.inner.logs(); - - // Apply filter to each log and add block context - for log in receipt_logs { - if self.matches_filter(log, filter) { + // Apply filter and set proper context following reth's pattern + for log in receipt.inner.logs() { + if filter.matches(&log.inner) { let mut pending_log = log.clone(); - // Add block and transaction context - pending_log.log_index = Some(log_index); + // Set context following reth's logs_utils pattern + pending_log.block_hash = Some(latest_header.hash()); + pending_log.block_number = Some(latest_header.number); pending_log.transaction_hash = Some(*tx_hash); - pending_log.block_number = Some(block_number); - pending_log.block_hash = Some(block_hash); + pending_log.transaction_index = receipt.inner.transaction_index; + pending_log.log_index = Some(log_index); pending_log.removed = false; // Pending logs are never removed logs.push(pending_log); @@ -260,24 +256,4 @@ impl PendingBlocks { logs } - - fn matches_filter(&self, log: &Log, filter: &Filter) -> bool { - // Address filtering - check if filter has address and if log matches - if !filter.address.matches(&log.address()) { - return false; - } - - // Topic filtering - check each topic position - for (i, topic_filter) in filter.topics.iter().enumerate() { - if let Some(log_topic) = log.topics().get(i) { - if !topic_filter.matches(log_topic) { - return false; - } - } else if !topic_filter.matches(&Default::default()) { - return false; - } - } - - true - } } diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 1a2632a..95d9139 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -454,7 +454,10 @@ where } => { matches!( (from_block, to_block), - (Some(BlockNumberOrTag::Pending), Some(BlockNumberOrTag::Pending)) + ( + Some(BlockNumberOrTag::Pending), + Some(BlockNumberOrTag::Pending) + ) ) } _ => false, // Block hash queries or other formats are not pure pending From 0187d7efeb542f0b10c3d6fbaa61e541a3853032 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 17:57:46 -0400 Subject: [PATCH 07/13] optimize --- crates/flashblocks-rpc/src/pending_blocks.rs | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index a557549..0311882 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -228,29 +228,14 @@ impl PendingBlocks { pub fn get_pending_logs(&self, filter: &Filter) -> Vec { let mut logs = Vec::new(); - let mut log_index = 0u64; - - // Get latest block context for pending logs - let latest_header = self.latest_header(); // Iterate through all transaction receipts in pending state - for (tx_hash, receipt) in &self.transaction_receipts { + for (_idx, receipt) in &self.transaction_receipts { // Apply filter and set proper context following reth's pattern for log in receipt.inner.logs() { if filter.matches(&log.inner) { - let mut pending_log = log.clone(); - - // Set context following reth's logs_utils pattern - pending_log.block_hash = Some(latest_header.hash()); - pending_log.block_number = Some(latest_header.number); - pending_log.transaction_hash = Some(*tx_hash); - pending_log.transaction_index = receipt.inner.transaction_index; - pending_log.log_index = Some(log_index); - pending_log.removed = false; // Pending logs are never removed - - logs.push(pending_log); + logs.push(log.clone()); } - log_index += 1; } } From e6706dd78bc2d5db1677708a2a9305c47549430c Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 18:09:06 -0400 Subject: [PATCH 08/13] feat: implement eth_getLogs RPC method for flashblocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add get_logs method to EthApiOverride trait and implementation - Support pure pending queries (fromBlock:"pending" and toBlock:"pending") - Delegate all other queries to underlying eth API for historical data - Add comprehensive test coverage with realistic log data including Transfer events - Implement log filtering by address, topics, and block ranges - Add get_pending_logs method to FlashblocksAPI trait - Use alloy's built-in filter.matches() for efficient log filtering 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- crates/flashblocks-rpc/src/pending_blocks.rs | 1 - crates/flashblocks-rpc/src/rpc.rs | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 0311882..97d7103 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -231,7 +231,6 @@ impl PendingBlocks { // Iterate through all transaction receipts in pending state for (_idx, receipt) in &self.transaction_receipts { - // Apply filter and set proper context following reth's pattern for log in receipt.inner.logs() { if filter.matches(&log.inner) { logs.push(log.clone()); diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 95d9139..85b27c5 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -151,7 +151,7 @@ impl EthApiExt { #[async_trait] impl EthApiOverrideServer for EthApiExt where - Eth: EthApiTypes + FullEthApi + Send + Sync + 'static, + Eth: FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, jsonrpsee_types::error::ErrorObject<'static>: From, { @@ -429,7 +429,7 @@ where // Only handle pure pending queries: fromBlock="pending" and toBlock="pending" // Everything else goes to regular eth API - if self.is_pure_pending_query(&filter) { + if self.is_pending_query(&filter) { self.metrics.get_logs.increment(1); let pending_logs = self.flashblocks_state.get_pending_logs(&filter); Ok(pending_logs) @@ -442,10 +442,10 @@ where impl EthApiExt where - Eth: EthApiTypes + FullEthApi + Send + Sync + 'static, + Eth: FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, { - fn is_pure_pending_query(&self, filter: &Filter) -> bool { + fn is_pending_query(&self, filter: &Filter) -> bool { // Only return true for pure pending queries: both fromBlock and toBlock must be "pending" match &filter.block_option { alloy_rpc_types_eth::FilterBlockOption::Range { From 6d4c6312a53e04b13034434b1e05cfb54da08931 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Thu, 25 Sep 2025 18:22:14 -0400 Subject: [PATCH 09/13] lint --- crates/flashblocks-rpc/src/pending_blocks.rs | 2 +- crates/flashblocks-rpc/src/rpc.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index 97d7103..48ce920 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -230,7 +230,7 @@ impl PendingBlocks { let mut logs = Vec::new(); // Iterate through all transaction receipts in pending state - for (_idx, receipt) in &self.transaction_receipts { + for receipt in self.transaction_receipts.values() { for log in receipt.inner.logs() { if filter.matches(&log.inner) { logs.push(log.clone()); diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 85b27c5..1b1dc65 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -435,7 +435,7 @@ where Ok(pending_logs) } else { // All other queries - delegate to underlying reth node - self.eth_filter.logs(filter).await.map_err(Into::into) + self.eth_filter.logs(filter).await } } } From 26c115ed3ec454406bb27962a07caa335ef4c900 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Mon, 29 Sep 2025 23:09:58 -0400 Subject: [PATCH 10/13] wip --- crates/flashblocks-rpc/src/rpc.rs | 72 ++++++++++---- crates/flashblocks-rpc/src/tests/rpc.rs | 126 ++++++++++++++++++++++-- 2 files changed, 175 insertions(+), 23 deletions(-) diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 1b1dc65..98d563b 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -427,14 +427,12 @@ where address = ?filter.address ); - // Only handle pure pending queries: fromBlock="pending" and toBlock="pending" - // Everything else goes to regular eth API - if self.is_pending_query(&filter) { + // Check if we need to include pending logs in the response + if self.needs_pending_logs(&filter) { self.metrics.get_logs.increment(1); - let pending_logs = self.flashblocks_state.get_pending_logs(&filter); - Ok(pending_logs) + self.get_logs_hybrid(filter).await } else { - // All other queries - delegate to underlying reth node + // Pure historical queries - delegate to underlying reth node self.eth_filter.logs(filter).await } } @@ -445,25 +443,65 @@ where Eth: FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, { - fn is_pending_query(&self, filter: &Filter) -> bool { - // Only return true for pure pending queries: both fromBlock and toBlock must be "pending" + fn needs_pending_logs(&self, filter: &Filter) -> bool { + // Return true if toBlock is pending, indicating we need pending logs match &filter.block_option { alloy_rpc_types_eth::FilterBlockOption::Range { - from_block, + from_block: _, to_block, } => { - matches!( - (from_block, to_block), - ( - Some(BlockNumberOrTag::Pending), - Some(BlockNumberOrTag::Pending) - ) - ) + matches!(to_block, Some(BlockNumberOrTag::Pending)) + } + _ => false, // Block hash queries don't include pending + } + } + + fn is_from_block_pending(&self, filter: &Filter) -> bool { + // Check if fromBlock is pending + match &filter.block_option { + alloy_rpc_types_eth::FilterBlockOption::Range { + from_block, + to_block: _, + } => { + matches!(from_block, Some(BlockNumberOrTag::Pending)) } - _ => false, // Block hash queries or other formats are not pure pending + _ => false, } } + async fn get_logs_hybrid(&self, filter: Filter) -> RpcResult> { + let mut all_logs = Vec::new(); + + // Get historical logs if fromBlock is not pending + if !self.is_from_block_pending(&filter) { + // Create a filter for historical data (fromBlock to latest) + let historical_filter = match &filter.block_option { + alloy_rpc_types_eth::FilterBlockOption::Range { + from_block, + to_block: _, + } => { + // Create new filter with toBlock set to Latest + let mut historical_filter = filter.clone(); + historical_filter.block_option = alloy_rpc_types_eth::FilterBlockOption::Range { + from_block: *from_block, + to_block: Some(BlockNumberOrTag::Latest), + }; + historical_filter + } + _ => return Err(jsonrpsee_types::ErrorObjectOwned::from(EthApiError::InvalidBlockRange)), + }; + + let historical_logs = self.eth_filter.logs(historical_filter).await?; + all_logs.extend(historical_logs); + } + + // Always get pending logs when toBlock is pending + let pending_logs = self.flashblocks_state.get_pending_logs(&filter); + all_logs.extend(pending_logs); + + Ok(all_logs) + } + async fn wait_for_flashblocks_receipt(&self, tx_hash: TxHash) -> Option> { let mut receiver = self.flashblocks_state.subscribe_to_flashblocks(); diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 7e153dd..55e25b3 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -866,15 +866,14 @@ mod tests { } #[tokio::test] - async fn test_get_logs_non_pending_queries_go_to_eth_api() -> eyre::Result<()> { + async fn test_get_logs_mixed_block_ranges() -> eyre::Result<()> { reth_tracing::init_test_tracing(); let node = setup_node().await?; let provider = node.provider().await?; node.send_test_payloads().await?; - // Test that non-pure-pending queries go to the regular eth API - // This filter from block 0 to pending should go to eth API (not pure pending) + // Test fromBlock: 0, toBlock: pending (should include both historical and pending) let logs = provider .get_logs( &alloy_rpc_types_eth::Filter::default() @@ -883,10 +882,48 @@ mod tests { ) .await?; - // Should be handled by underlying eth API (empty in our test setup) - assert_eq!(logs.len(), 0); + // Should now include pending logs (2 logs from our test setup) + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.transaction_hash == Some(INCREMENT_HASH))); - // Test that latest queries also go to eth API + // Test fromBlock: latest, toBlock: pending + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Latest) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should include pending logs (historical part is empty in our test setup) + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + + // Test fromBlock: earliest, toBlock: pending + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Earliest) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should include pending logs (historical part is empty in our test setup) + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_pure_historical_queries_go_to_eth_api() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test that pure historical queries still go to eth API let logs = provider .get_logs( &alloy_rpc_types_eth::Filter::default() @@ -898,6 +935,83 @@ mod tests { // Should be handled by underlying eth API (empty in our test setup) assert_eq!(logs.len(), 0); + // Test specific block number range + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(0) + .to_block(100), + ) + .await?; + + // Should be handled by underlying eth API (empty in our test setup) + assert_eq!(logs.len(), 0); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_mixed_range_with_filtering() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test mixed range with address filtering + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .address(COUNTER_ADDRESS) + .from_block(0) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should get only 1 log from COUNTER_ADDRESS in pending state + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].address(), COUNTER_ADDRESS); + assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); + + // Test mixed range with topic filtering + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .event_signature(TEST_LOG_TOPIC_0) + .from_block(alloy_eips::BlockNumberOrTag::Latest) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should get both logs that match the topic + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.topics()[0] == TEST_LOG_TOPIC_0)); + assert!(logs.iter().all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + + Ok(()) + } + + #[tokio::test] + async fn test_get_logs_pending_to_pending_still_works() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let provider = node.provider().await?; + + node.send_test_payloads().await?; + + // Test that pure pending to pending queries still work (legacy behavior) + let logs = provider + .get_logs( + &alloy_rpc_types_eth::Filter::default() + .from_block(alloy_eips::BlockNumberOrTag::Pending) + .to_block(alloy_eips::BlockNumberOrTag::Pending), + ) + .await?; + + // Should get 2 logs from pending state + assert_eq!(logs.len(), 2); + assert!(logs.iter().all(|log| log.transaction_hash == Some(INCREMENT_HASH))); + Ok(()) } } From ef575fd8a9f09e67831e82a3483c00316691a96c Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 30 Sep 2025 16:45:29 -0400 Subject: [PATCH 11/13] use canonical block --- crates/flashblocks-rpc/src/rpc.rs | 69 +++++++++++++------------------ 1 file changed, 28 insertions(+), 41 deletions(-) diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index bf3d692..890ebae 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -484,65 +484,52 @@ where Eth: FullEthApi + Send + Sync + 'static, FB: FlashblocksAPI + Send + Sync + 'static, { - fn needs_pending_logs(&self, filter: &Filter) -> bool { - // Return true if toBlock is pending, indicating we need pending logs + fn parse_block_range( + &self, + filter: &Filter, + ) -> Option<(Option, Option)> { match &filter.block_option { alloy_rpc_types_eth::FilterBlockOption::Range { - from_block: _, + from_block, to_block, - } => { - matches!(to_block, Some(BlockNumberOrTag::Pending)) - } - _ => false, // Block hash queries don't include pending + } => Some((*from_block, *to_block)), + _ => None, // Block hash queries or other formats not supported } } - fn is_from_block_pending(&self, filter: &Filter) -> bool { - // Check if fromBlock is pending - match &filter.block_option { - alloy_rpc_types_eth::FilterBlockOption::Range { - from_block, - to_block: _, - } => { - matches!(from_block, Some(BlockNumberOrTag::Pending)) - } - _ => false, - } + fn needs_pending_logs(&self, filter: &Filter) -> bool { + self.parse_block_range(filter) + .map(|(_, to_block)| matches!(to_block, Some(BlockNumberOrTag::Pending))) + .unwrap_or(false) } async fn get_logs_hybrid(&self, filter: Filter) -> RpcResult> { let mut all_logs = Vec::new(); + // Always get pending logs when toBlock is pending + let pending_blocks = self.flashblocks_state.get_pending_blocks(); + + // Parse the block range once + let (from_block, _to_block) = self.parse_block_range(&filter).ok_or_else(|| { + jsonrpsee_types::ErrorObjectOwned::from(EthApiError::InvalidBlockRange) + })?; + // Get historical logs if fromBlock is not pending - if !self.is_from_block_pending(&filter) { - // Create a filter for historical data (fromBlock to latest) - let historical_filter = match &filter.block_option { - alloy_rpc_types_eth::FilterBlockOption::Range { - from_block, - to_block: _, - } => { - // Create new filter with toBlock set to Latest - let mut historical_filter = filter.clone(); - historical_filter.block_option = - alloy_rpc_types_eth::FilterBlockOption::Range { - from_block: *from_block, - to_block: Some(BlockNumberOrTag::Latest), - }; - historical_filter - } - _ => { - return Err(jsonrpsee_types::ErrorObjectOwned::from( - EthApiError::InvalidBlockRange, - )) - } + if !matches!(from_block, Some(BlockNumberOrTag::Pending)) { + // Use the canonical block number from pending blocks to ensure consistency + let canonical_block = pending_blocks.get_canonical_block_number(); + + // Create a filter for historical data (fromBlock to canonical block) + let mut historical_filter = filter.clone(); + historical_filter.block_option = alloy_rpc_types_eth::FilterBlockOption::Range { + from_block, + to_block: Some(canonical_block), }; let historical_logs = self.eth_filter.logs(historical_filter).await?; all_logs.extend(historical_logs); } - // Always get pending logs when toBlock is pending - let pending_blocks = self.flashblocks_state.get_pending_blocks(); let pending_logs = pending_blocks.get_pending_logs(&filter); all_logs.extend(pending_logs); From 214617c8a99b8ec47a00470123349f8915a58aec Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 30 Sep 2025 16:57:24 -0400 Subject: [PATCH 12/13] refactor --- crates/flashblocks-rpc/src/rpc.rs | 58 +++++++++++-------------------- 1 file changed, 21 insertions(+), 37 deletions(-) diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 890ebae..e0a9c61 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -468,52 +468,29 @@ where address = ?filter.address ); - // Check if we need to include pending logs in the response - if self.needs_pending_logs(&filter) { - self.metrics.get_logs.increment(1); - self.get_logs_hybrid(filter).await - } else { - // Pure historical queries - delegate to underlying reth node - self.eth_filter.logs(filter).await - } - } -} - -impl EthApiExt -where - Eth: FullEthApi + Send + Sync + 'static, - FB: FlashblocksAPI + Send + Sync + 'static, -{ - fn parse_block_range( - &self, - filter: &Filter, - ) -> Option<(Option, Option)> { - match &filter.block_option { + // Check if this is a mixed query (toBlock is pending) + let (from_block, to_block) = match &filter.block_option { alloy_rpc_types_eth::FilterBlockOption::Range { from_block, to_block, - } => Some((*from_block, *to_block)), - _ => None, // Block hash queries or other formats not supported - } - } + } => (*from_block, *to_block), + _ => { + // Block hash queries or other formats - delegate to eth API + return self.eth_filter.logs(filter).await; + } + }; - fn needs_pending_logs(&self, filter: &Filter) -> bool { - self.parse_block_range(filter) - .map(|(_, to_block)| matches!(to_block, Some(BlockNumberOrTag::Pending))) - .unwrap_or(false) - } + // If toBlock is not pending, delegate to eth API + if !matches!(to_block, Some(BlockNumberOrTag::Pending)) { + return self.eth_filter.logs(filter).await; + } - async fn get_logs_hybrid(&self, filter: Filter) -> RpcResult> { + // Mixed query: toBlock is pending, so we need to combine historical + pending logs + self.metrics.get_logs.increment(1); let mut all_logs = Vec::new(); - // Always get pending logs when toBlock is pending let pending_blocks = self.flashblocks_state.get_pending_blocks(); - // Parse the block range once - let (from_block, _to_block) = self.parse_block_range(&filter).ok_or_else(|| { - jsonrpsee_types::ErrorObjectOwned::from(EthApiError::InvalidBlockRange) - })?; - // Get historical logs if fromBlock is not pending if !matches!(from_block, Some(BlockNumberOrTag::Pending)) { // Use the canonical block number from pending blocks to ensure consistency @@ -530,12 +507,19 @@ where all_logs.extend(historical_logs); } + // Always get pending logs when toBlock is pending let pending_logs = pending_blocks.get_pending_logs(&filter); all_logs.extend(pending_logs); Ok(all_logs) } +} +impl EthApiExt +where + Eth: FullEthApi + Send + Sync + 'static, + FB: FlashblocksAPI + Send + Sync + 'static, +{ async fn wait_for_flashblocks_receipt(&self, tx_hash: TxHash) -> Option> { let mut receiver = self.flashblocks_state.subscribe_to_flashblocks(); From 6e3895f7951ef5b259e1ab13627b4e14cd1cea9e Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 30 Sep 2025 19:30:32 -0400 Subject: [PATCH 13/13] remove unnecessary tests --- crates/flashblocks-rpc/src/tests/rpc.rs | 143 ------------------------ 1 file changed, 143 deletions(-) diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 4c1cb0e..0ed37d4 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -790,45 +790,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_get_logs_historical_vs_pending() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - let node = setup_node().await?; - let provider = node.provider().await?; - - // Test historical logs (should work but return empty for our test setup) - let historical_logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .from_block(0) - .to_block(0), - ) - .await?; - - // Historical query should return empty in our current implementation - assert_eq!(historical_logs.len(), 0); - - node.send_test_payloads().await?; - - // Test pending logs after sending payloads - let pending_logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .from_block(alloy_eips::BlockNumberOrTag::Pending) - .to_block(alloy_eips::BlockNumberOrTag::Pending), - ) - .await?; - - // Should have 2 logs from the INCREMENT_TX transaction - assert_eq!(pending_logs.len(), 2); - - // Verify both logs have the correct transaction hash - assert_eq!(pending_logs[0].transaction_hash, Some(INCREMENT_HASH)); - assert_eq!(pending_logs[1].transaction_hash, Some(INCREMENT_HASH)); - - Ok(()) - } - #[tokio::test] async fn test_get_logs_topic_filtering() -> eyre::Result<()> { reth_tracing::init_test_tracing(); @@ -920,108 +881,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_get_logs_pure_historical_queries_go_to_eth_api() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - let node = setup_node().await?; - let provider = node.provider().await?; - - node.send_test_payloads().await?; - - // Test that pure historical queries still go to eth API - let logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .from_block(alloy_eips::BlockNumberOrTag::Latest) - .to_block(alloy_eips::BlockNumberOrTag::Latest), - ) - .await?; - - // Should be handled by underlying eth API (empty in our test setup) - assert_eq!(logs.len(), 0); - - // Test specific block number range - let logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .from_block(0) - .to_block(100), - ) - .await?; - - // Should be handled by underlying eth API (empty in our test setup) - assert_eq!(logs.len(), 0); - - Ok(()) - } - - #[tokio::test] - async fn test_get_logs_mixed_range_with_filtering() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - let node = setup_node().await?; - let provider = node.provider().await?; - - node.send_test_payloads().await?; - - // Test mixed range with address filtering - let logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .address(COUNTER_ADDRESS) - .from_block(0) - .to_block(alloy_eips::BlockNumberOrTag::Pending), - ) - .await?; - - // Should get only 1 log from COUNTER_ADDRESS in pending state - assert_eq!(logs.len(), 1); - assert_eq!(logs[0].address(), COUNTER_ADDRESS); - assert_eq!(logs[0].transaction_hash, Some(INCREMENT_HASH)); - - // Test mixed range with topic filtering - let logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .event_signature(TEST_LOG_TOPIC_0) - .from_block(alloy_eips::BlockNumberOrTag::Latest) - .to_block(alloy_eips::BlockNumberOrTag::Pending), - ) - .await?; - - // Should get both logs that match the topic - assert_eq!(logs.len(), 2); - assert!(logs.iter().all(|log| log.topics()[0] == TEST_LOG_TOPIC_0)); - assert!(logs - .iter() - .all(|log| log.transaction_hash == Some(INCREMENT_HASH))); - - Ok(()) - } - - #[tokio::test] - async fn test_get_logs_pending_to_pending_still_works() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - let node = setup_node().await?; - let provider = node.provider().await?; - - node.send_test_payloads().await?; - - // Test that pure pending to pending queries still work (legacy behavior) - let logs = provider - .get_logs( - &alloy_rpc_types_eth::Filter::default() - .from_block(alloy_eips::BlockNumberOrTag::Pending) - .to_block(alloy_eips::BlockNumberOrTag::Pending), - ) - .await?; - - // Should get 2 logs from pending state - assert_eq!(logs.len(), 2); - assert!(logs - .iter() - .all(|log| log.transaction_hash == Some(INCREMENT_HASH))); - - Ok(()) - } }