Skip to content
Merged
3 changes: 3 additions & 0 deletions crates/flashblocks-rpc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Metrics {
#[metric(describe = "Count of times flashblocks simulate_v1 is called")]
pub simulate_v1: Counter,

#[metric(describe = "Count of times flashblocks get_logs is called")]
pub get_logs: Counter,

#[metric(
describe = "Number of times pending snapshot was cleared because canonical caught up"
)]
Expand Down
17 changes: 16 additions & 1 deletion crates/flashblocks-rpc/src/pending_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use alloy_primitives::{
};
use alloy_provider::network::TransactionResponse;
use alloy_rpc_types::{state::StateOverride, BlockTransactions};
use alloy_rpc_types_eth::Header as RPCHeader;
use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log};
use eyre::eyre;
use op_alloy_network::Optimism;
use op_alloy_rpc_types::{OpTransactionReceipt, Transaction};
Expand Down Expand Up @@ -230,4 +230,19 @@ impl PendingBlocks {
pub fn get_state_overrides(&self) -> Option<StateOverride> {
self.state_overrides.clone()
}

pub fn get_pending_logs(&self, filter: &Filter) -> Vec<Log> {
let mut logs = Vec::new();

// Iterate through all transaction receipts in pending state
for receipt in self.transaction_receipts.values() {
for log in receipt.inner.logs() {
if filter.matches(&log.inner) {
logs.push(log.clone());
}
}
}

logs
}
}
70 changes: 66 additions & 4 deletions crates/flashblocks-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use alloy_primitives::{Address, TxHash, U256};
use alloy_rpc_types::simulate::{SimBlock, SimulatePayload, SimulatedBlock};
use alloy_rpc_types::state::{EvmOverrides, StateOverride, StateOverridesBuilder};
use alloy_rpc_types::BlockOverrides;
use alloy_rpc_types_eth::{Filter, Log};
use arc_swap::Guard;
use jsonrpsee::{
core::{async_trait, RpcResult},
Expand All @@ -19,11 +20,12 @@ use jsonrpsee_types::ErrorObjectOwned;
use op_alloy_network::Optimism;
use op_alloy_rpc_types::OpTransactionRequest;
use reth::providers::CanonStateSubscriptions;
use reth::rpc::eth::EthFilter;
use reth::rpc::server_types::eth::EthApiError;
use reth_rpc_eth_api::helpers::EthState;
use reth_rpc_eth_api::helpers::EthTransactions;
use reth_rpc_eth_api::helpers::{EthBlocks, EthCall};
use reth_rpc_eth_api::{helpers::FullEthApi, RpcBlock};
use reth_rpc_eth_api::{helpers::FullEthApi, EthApiTypes, EthFilterApiServer, RpcBlock};
use reth_rpc_eth_api::{RpcReceipt, RpcTransaction};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
Expand Down Expand Up @@ -64,6 +66,9 @@ pub trait PendingBlocksAPI {

/// Gets the state overrides for the pending blocks
fn get_state_overrides(&self) -> Option<StateOverride>;

/// Gets logs from pending state matching the provided filter.
fn get_pending_logs(&self, filter: &Filter) -> Vec<Log>;
}

#[cfg_attr(not(test), rpc(server, namespace = "eth"))]
Expand Down Expand Up @@ -129,19 +134,24 @@ pub trait EthApiOverride {
opts: SimulatePayload<OpTransactionRequest>,
block_number: Option<BlockId>,
) -> RpcResult<Vec<SimulatedBlock<RpcBlock<Optimism>>>>;

#[method(name = "getLogs")]
async fn get_logs(&self, filter: Filter) -> RpcResult<Vec<Log>>;
}

#[derive(Debug)]
pub struct EthApiExt<Eth, FB> {
pub struct EthApiExt<Eth: EthApiTypes, FB> {
eth_api: Eth,
eth_filter: EthFilter<Eth>,
flashblocks_state: Arc<FB>,
metrics: Metrics,
}

impl<Eth, FB> EthApiExt<Eth, FB> {
pub fn new(eth_api: Eth, flashblocks_state: Arc<FB>) -> Self {
impl<Eth: EthApiTypes, FB> EthApiExt<Eth, FB> {
pub fn new(eth_api: Eth, eth_filter: EthFilter<Eth>, flashblocks_state: Arc<FB>) -> Self {
Self {
eth_api,
eth_filter,
flashblocks_state,
metrics: Metrics::default(),
}
Expand Down Expand Up @@ -451,6 +461,58 @@ where
.await
.map_err(Into::into)
}

async fn get_logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
debug!(
message = "rpc::get_logs",
address = ?filter.address
);

// Check if this is a mixed query (toBlock is pending)
let (from_block, to_block) = match &filter.block_option {
alloy_rpc_types_eth::FilterBlockOption::Range {
from_block,
to_block,
} => (*from_block, *to_block),
_ => {
// Block hash queries or other formats - delegate to eth API
return self.eth_filter.logs(filter).await;
}
};

// If toBlock is not pending, delegate to eth API
if !matches!(to_block, Some(BlockNumberOrTag::Pending)) {
return self.eth_filter.logs(filter).await;
}

// Mixed query: toBlock is pending, so we need to combine historical + pending logs
self.metrics.get_logs.increment(1);
let mut all_logs = Vec::new();

let pending_blocks = self.flashblocks_state.get_pending_blocks();

// Get historical logs if fromBlock is not pending
if !matches!(from_block, Some(BlockNumberOrTag::Pending)) {
// Use the canonical block number from pending blocks to ensure consistency
let canonical_block = pending_blocks.get_canonical_block_number();

// Create a filter for historical data (fromBlock to canonical block)
let mut historical_filter = filter.clone();
historical_filter.block_option = alloy_rpc_types_eth::FilterBlockOption::Range {
from_block,
to_block: Some(canonical_block),
};

let historical_logs = self.eth_filter.logs(historical_filter).await?;
all_logs.extend(historical_logs);
}

// Always get pending logs when toBlock is pending
let pending_logs = pending_blocks.get_pending_logs(&filter);
all_logs.extend(pending_logs);

Ok(all_logs)
}
}

impl<Eth, FB> EthApiExt<Eth, FB>
Expand Down
11 changes: 9 additions & 2 deletions crates/flashblocks-rpc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use alloy_eips::BlockNumberOrTag;
use alloy_primitives::map::foldhash::HashMap;
use alloy_primitives::map::B256HashMap;
use alloy_primitives::{Address, BlockNumber, Bytes, Sealable, B256, U256};
use alloy_rpc_types::{state::StateOverride, TransactionTrait, Withdrawal};
use alloy_rpc_types::{TransactionTrait, Withdrawal};
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_rpc_types_eth::state::{AccountOverride, StateOverridesBuilder};
use alloy_rpc_types_eth::state::{AccountOverride, StateOverride, StateOverridesBuilder};
use alloy_rpc_types_eth::{Filter, Log};
use arc_swap::{ArcSwapOption, Guard};
use eyre::eyre;
use op_alloy_consensus::OpTxEnvelope;
Expand Down Expand Up @@ -168,6 +169,12 @@ impl PendingBlocksAPI for Guard<Option<Arc<PendingBlocks>>> {
.map(|pb| pb.get_state_overrides())
.unwrap_or_default()
}

fn get_pending_logs(&self, filter: &Filter) -> Vec<Log> {
self.as_ref()
.map(|pb| pb.get_pending_logs(filter))
.unwrap_or_default()
}
}

#[derive(Debug, Clone)]
Expand Down
Loading