Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/flashblocks-rpc/src/pending_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloy_consensus::{Header, Sealed};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{
map::foldhash::{HashMap, HashMapExt},
Address, BlockNumber, TxHash, B256, U256,
Expand Down Expand Up @@ -151,6 +152,10 @@ impl PendingBlocks {
self.headers.last().unwrap().number
}

pub fn canonical_block_number(&self) -> BlockNumberOrTag {
BlockNumberOrTag::Number(self.headers.first().unwrap().number - 1)
}

pub fn latest_flashblock_index(&self) -> u64 {
self.flashblocks.last().unwrap().index
}
Expand Down
76 changes: 49 additions & 27 deletions crates/flashblocks-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::Metrics;
use crate::pending_blocks::PendingBlocks;
use crate::subscription::Flashblock;
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_primitives::{Address, TxHash, U256};
use alloy_rpc_types::simulate::{SimBlock, SimulatePayload, SimulatedBlock};
use alloy_rpc_types::state::{EvmOverrides, StateOverride, StateOverridesBuilder};
use alloy_rpc_types::BlockOverrides;
use arc_swap::Guard;
use jsonrpsee::{
core::{async_trait, RpcResult},
proc_macros::rpc,
Expand Down Expand Up @@ -35,24 +37,32 @@ pub const MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS: u64 = 6_000;

/// Core API for accessing flashblock state and data.
pub trait FlashblocksAPI {
/// Retrieves the pending blocks.
fn get_pending_blocks(&self) -> Guard<Option<Arc<PendingBlocks>>>;

fn subscribe_to_flashblocks(&self) -> broadcast::Receiver<Flashblock>;
}

pub trait PendingBlocksAPI {
/// Get the canonical block number on top of which all pending state is built
fn get_canonical_block_number(&self) -> BlockNumberOrTag;

/// Get the pending transactions count for an address
fn get_transaction_count(&self, address: Address) -> U256;

/// Retrieves the current block. If `full` is true, includes full transaction details.
fn get_block(&self, full: bool) -> Option<RpcBlock<Optimism>>;

/// Gets transaction receipt by hash.
fn get_transaction_receipt(&self, tx_hash: TxHash) -> Option<RpcReceipt<Optimism>>;

/// Gets transaction count (nonce) for an address.
fn get_transaction_count(&self, address: Address) -> U256;

/// Gets transaction details by hash.
fn get_transaction_by_hash(&self, tx_hash: TxHash) -> Option<RpcTransaction<Optimism>>;

/// Gets balance for an address. Returns None if address not updated in flashblocks.
fn get_balance(&self, address: Address) -> Option<U256>;

/// Creates a subscription to receive flashblock updates.
fn subscribe_to_flashblocks(&self) -> broadcast::Receiver<Flashblock>;

/// Gets the state overrides for the pending blocks
fn get_state_overrides(&self) -> Option<StateOverride>;
}

Expand Down Expand Up @@ -157,7 +167,8 @@ where

if number.is_pending() {
self.metrics.get_block_by_number.increment(1);
Ok(self.flashblocks_state.get_block(full))
let pending_blocks = self.flashblocks_state.get_pending_blocks();
Ok(pending_blocks.get_block(full))
} else {
EthBlocks::rpc_block(&self.eth_api, number.into(), full)
.await
Expand All @@ -174,7 +185,8 @@ where
tx_hash = %tx_hash
);

if let Some(fb_receipt) = self.flashblocks_state.get_transaction_receipt(tx_hash) {
let pending_blocks = self.flashblocks_state.get_pending_blocks();
if let Some(fb_receipt) = pending_blocks.get_transaction_receipt(tx_hash) {
self.metrics.get_transaction_receipt.increment(1);
return Ok(Some(fb_receipt));
}
Expand All @@ -196,7 +208,8 @@ where
let block_id = block_number.unwrap_or_default();
if block_id.is_pending() {
self.metrics.get_balance.increment(1);
if let Some(balance) = self.flashblocks_state.get_balance(address) {
let pending_blocks = self.flashblocks_state.get_pending_blocks();
if let Some(balance) = pending_blocks.get_balance(address) {
return Ok(balance);
}
}
Expand All @@ -219,16 +232,16 @@ where
let block_id = block_number.unwrap_or_default();
if block_id.is_pending() {
self.metrics.get_transaction_count.increment(1);
let latest_count = EthState::transaction_count(
&self.eth_api,
address,
Some(BlockId::Number(BlockNumberOrTag::Latest)),
)
.await
.map_err(Into::into)?;
let pending_blocks = self.flashblocks_state.get_pending_blocks();
let canon_block = pending_blocks.get_canonical_block_number();
let fb_count = pending_blocks.get_transaction_count(address);

let canon_count =
EthState::transaction_count(&self.eth_api, address, Some(canon_block.into()))
.await
.map_err(Into::into)?;

let fb_count = self.flashblocks_state.get_transaction_count(address);
return Ok(latest_count + fb_count);
return Ok(canon_count + fb_count);
}

EthState::transaction_count(&self.eth_api, address, block_number)
Expand All @@ -245,7 +258,9 @@ where
tx_hash = %tx_hash
);

if let Some(fb_transaction) = self.flashblocks_state.get_transaction_by_hash(tx_hash) {
let pending_blocks = self.flashblocks_state.get_pending_blocks();

if let Some(fb_transaction) = pending_blocks.get_transaction_by_hash(tx_hash) {
self.metrics.get_transaction_receipt.increment(1);
return Ok(Some(fb_transaction));
}
Expand Down Expand Up @@ -329,12 +344,14 @@ where
block_overrides = ?block_overrides,
);

let block_id = block_number.unwrap_or_default();
let mut block_id = block_number.unwrap_or_default();
let mut pending_overrides = EvmOverrides::default();
// If the call is to pending block use cached override (if they exist)
if block_id.is_pending() {
self.metrics.call.increment(1);
pending_overrides.state = self.flashblocks_state.get_state_overrides();
let pending_blocks = self.flashblocks_state.get_pending_blocks();
block_id = pending_blocks.get_canonical_block_number().into();
pending_overrides.state = pending_blocks.get_state_overrides();
}

// Apply user's overrides on top
Expand All @@ -348,7 +365,7 @@ where
EthCall::call(
&self.eth_api,
transaction,
block_number,
Some(block_id),
EvmOverrides::new(Some(final_overrides), block_overrides),
)
.await
Expand All @@ -368,12 +385,14 @@ where
overrides = ?overrides,
);

let block_id = block_number.unwrap_or_default();
let mut block_id = block_number.unwrap_or_default();
let mut pending_overrides = EvmOverrides::default();
// If the call is to pending block use cached override (if they exist)
if block_id.is_pending() {
self.metrics.estimate_gas.increment(1);
pending_overrides.state = self.flashblocks_state.get_state_overrides();
let pending_blocks = self.flashblocks_state.get_pending_blocks();
block_id = pending_blocks.get_canonical_block_number().into();
pending_overrides.state = pending_blocks.get_state_overrides();
}

let mut state_overrides_builder =
Expand All @@ -396,13 +415,15 @@ where
block_number = ?block_number,
);

let block_id = block_number.unwrap_or_default();
let mut block_id = block_number.unwrap_or_default();
let mut pending_overrides = EvmOverrides::default();

// If the call is to pending block use cached override (if they exist)
if block_id.is_pending() {
self.metrics.simulate_v1.increment(1);
pending_overrides.state = self.flashblocks_state.get_state_overrides();
let pending_blocks = self.flashblocks_state.get_pending_blocks();
block_id = pending_blocks.get_canonical_block_number().into();
pending_overrides.state = pending_blocks.get_state_overrides();
}

// Prepend flashblocks pending overrides to the block state calls
Expand Down Expand Up @@ -444,7 +465,8 @@ where
match receiver.recv().await {
Ok(flashblock) if flashblock.metadata.receipts.contains_key(&tx_hash) => {
debug!(message = "found receipt in flashblock", tx_hash = %tx_hash);
return self.flashblocks_state.get_transaction_receipt(tx_hash);
let pending_blocks = self.flashblocks_state.get_pending_blocks();
return pending_blocks.get_transaction_receipt(tx_hash);
}
Ok(_) => {
trace!(message = "flashblock does not contain receipt", tx_hash = %tx_hash);
Expand Down
78 changes: 40 additions & 38 deletions crates/flashblocks-rpc/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use crate::metrics::Metrics;
use crate::pending_blocks::{PendingBlocks, PendingBlocksBuilder};
use crate::rpc::FlashblocksAPI;
use crate::rpc::{FlashblocksAPI, PendingBlocksAPI};
use crate::subscription::{Flashblock, FlashblocksReceiver};
use alloy_consensus::transaction::{Recovered, SignerRecoverable, TransactionMeta};
use alloy_consensus::{Header, TxReceipt};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::map::foldhash::HashMap;
use alloy_primitives::map::B256HashMap;
use alloy_primitives::{Address, BlockNumber, Bytes, Sealable, TxHash, B256, U256};
use alloy_rpc_types::{TransactionTrait, Withdrawal};
use alloy_primitives::{Address, BlockNumber, Bytes, Sealable, B256, U256};
use alloy_rpc_types::{state::StateOverride, TransactionTrait, Withdrawal};
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_rpc_types_eth::state::{AccountOverride, StateOverride, StateOverridesBuilder};
use arc_swap::ArcSwapOption;
use alloy_rpc_types_eth::state::{AccountOverride, StateOverridesBuilder};
use arc_swap::{ArcSwapOption, Guard};
use eyre::eyre;
use op_alloy_consensus::OpTxEnvelope;
use op_alloy_network::Optimism;
use op_alloy_network::TransactionResponse;
use op_alloy_network::{Optimism, TransactionResponse};
use op_alloy_rpc_types::Transaction;
use reth::chainspec::{ChainSpecProvider, EthChainSpec};
use reth::providers::{BlockReaderIdExt, StateProviderFactory};
Expand All @@ -29,8 +28,7 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
use reth_optimism_primitives::{DepositReceipt, OpBlock, OpPrimitives};
use reth_optimism_rpc::OpReceiptBuilder;
use reth_primitives::RecoveredBlock;
use reth_rpc_convert::transaction::ConvertReceiptInput;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcTransaction};
use reth_rpc_eth_api::{RpcBlock, RpcReceipt};
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -120,51 +118,55 @@ impl<Client> FlashblocksReceiver for FlashblocksState<Client> {
}

impl<Client> FlashblocksAPI for FlashblocksState<Client> {
fn get_block(&self, full: bool) -> Option<RpcBlock<Optimism>> {
self.pending_blocks
.load()
.as_ref()
.map(|pb| pb.get_latest_block(full))
fn get_pending_blocks(&self) -> Guard<Option<Arc<PendingBlocks>>> {
self.pending_blocks.load()
}

fn get_transaction_receipt(&self, tx_hash: TxHash) -> Option<RpcReceipt<Optimism>> {
self.pending_blocks
.load()
.as_ref()
.and_then(|pb| pb.get_receipt(tx_hash))
fn subscribe_to_flashblocks(&self) -> tokio::sync::broadcast::Receiver<Flashblock> {
self.flashblock_sender.subscribe()
}
}

impl PendingBlocksAPI for Guard<Option<Arc<PendingBlocks>>> {
fn get_canonical_block_number(&self) -> BlockNumberOrTag {
self.as_ref()
.map(|pb| pb.canonical_block_number())
.unwrap_or(BlockNumberOrTag::Latest)
}

fn get_transaction_count(&self, address: Address) -> U256 {
self.pending_blocks
.load()
.as_ref()
self.as_ref()
.map(|pb| pb.get_transaction_count(address))
.unwrap_or_else(|| U256::from(0))
}

fn get_transaction_by_hash(&self, tx_hash: TxHash) -> Option<RpcTransaction<Optimism>> {
self.pending_blocks
.load()
.as_ref()
.and_then(|pb| pb.get_transaction_by_hash(tx_hash))
fn get_block(&self, full: bool) -> Option<RpcBlock<Optimism>> {
self.as_ref().map(|pb| pb.get_latest_block(full))
}

fn get_balance(&self, address: Address) -> Option<U256> {
self.pending_blocks
.load()
.as_ref()
.and_then(|pb| pb.get_balance(address))
fn get_transaction_receipt(
&self,
tx_hash: alloy_primitives::TxHash,
) -> Option<RpcReceipt<Optimism>> {
self.as_ref().and_then(|pb| pb.get_receipt(tx_hash))
}

fn subscribe_to_flashblocks(&self) -> tokio::sync::broadcast::Receiver<Flashblock> {
self.flashblock_sender.subscribe()
fn get_transaction_by_hash(
&self,
tx_hash: alloy_primitives::TxHash,
) -> Option<RpcTransaction<Optimism>> {
self.as_ref()
.and_then(|pb| pb.get_transaction_by_hash(tx_hash))
}

fn get_balance(&self, address: Address) -> Option<U256> {
self.as_ref().and_then(|pb| pb.get_balance(address))
}

fn get_state_overrides(&self) -> Option<StateOverride> {
self.pending_blocks
.load()
.as_ref()
.and_then(|pb| pb.get_state_overrides())
self.as_ref()
.map(|pb| pb.get_state_overrides())
.unwrap_or_default()
}
}

Expand Down
Loading