Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
egress-policy: audit

- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: dtolnay/rust-toolchain@4305c38b25d97ef35a8ad1f985ccf2d2242004f2 # stable
- uses: dtolnay/rust-toolchain@nightly
with:
components: rustfmt
- run: cargo fmt --all -- --check
Expand Down
16 changes: 5 additions & 11 deletions crates/flashblocks-rpc/src/pending_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use alloy_consensus::{Header, Sealed};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{
Address, B256, BlockNumber, TxHash, U256,
map::foldhash::{HashMap, HashMapExt},
Address, BlockNumber, TxHash, B256, U256,
};
use alloy_provider::network::TransactionResponse;
use alloy_rpc_types::{BlockTransactions, state::StateOverride};
use alloy_rpc_types::{state::StateOverride, BlockTransactions};
use alloy_rpc_types_eth::{Filter, Header as RPCHeader, Log};
use eyre::eyre;
use op_alloy_network::Optimism;
Expand Down Expand Up @@ -60,8 +60,7 @@ impl PendingBlocksBuilder {

#[inline]
pub(crate) fn with_transaction(&mut self, transaction: Transaction) -> &Self {
self.transactions_by_hash
.insert(transaction.tx_hash(), transaction.clone());
self.transactions_by_hash.insert(transaction.tx_hash(), transaction.clone());
self.transactions.push(transaction);
self
}
Expand All @@ -83,9 +82,7 @@ impl PendingBlocksBuilder {
let zero = U256::from(0);
let current_count = self.transaction_count.get(&sender).unwrap_or(&zero);

_ = self
.transaction_count
.insert(sender, *current_count + U256::from(1));
_ = self.transaction_count.insert(sender, *current_count + U256::from(1));
self
}

Expand Down Expand Up @@ -213,10 +210,7 @@ impl PendingBlocks {
}

pub fn get_transaction_count(&self, address: Address) -> U256 {
self.transaction_count
.get(&address)
.cloned()
.unwrap_or(U256::from(0))
self.transaction_count.get(&address).cloned().unwrap_or(U256::from(0))
}

pub fn get_balance(&self, address: Address) -> Option<U256> {
Expand Down
98 changes: 39 additions & 59 deletions crates/flashblocks-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use crate::metrics::Metrics;
use crate::pending_blocks::PendingBlocks;
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_primitives::map::foldhash::{HashSet, HashSetExt};
use alloy_primitives::{Address, TxHash, U256};
use alloy_rpc_types::BlockOverrides;
use alloy_rpc_types::simulate::{SimBlock, SimulatePayload, SimulatedBlock};
use alloy_rpc_types::state::{EvmOverrides, StateOverride, StateOverridesBuilder};
use alloy_primitives::{
map::foldhash::{HashSet, HashSetExt},
Address, TxHash, U256,
};
use alloy_rpc_types::{
simulate::{SimBlock, SimulatePayload, SimulatedBlock},
state::{EvmOverrides, StateOverride, StateOverridesBuilder},
BlockOverrides,
};
use alloy_rpc_types_eth::{Filter, Log};
use arc_swap::Guard;
use jsonrpsee::{
core::{RpcResult, async_trait},
core::{async_trait, RpcResult},
proc_macros::rpc,
};
use jsonrpsee_types::ErrorObjectOwned;
use jsonrpsee_types::error::INVALID_PARAMS_CODE;
use jsonrpsee_types::{error::INVALID_PARAMS_CODE, ErrorObjectOwned};
use op_alloy_network::Optimism;
use op_alloy_rpc_types::OpTransactionRequest;
use reth::providers::CanonStateSubscriptions;
use reth::rpc::eth::EthFilter;
use reth::rpc::server_types::eth::EthApiError;
use reth_rpc_eth_api::helpers::EthState;
use reth_rpc_eth_api::helpers::EthTransactions;
use reth_rpc_eth_api::helpers::{EthBlocks, EthCall};
use reth_rpc_eth_api::{EthApiTypes, EthFilterApiServer, RpcBlock, helpers::FullEthApi};
use reth_rpc_eth_api::{RpcReceipt, RpcTransaction};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use reth::{
providers::CanonStateSubscriptions,
rpc::{eth::EthFilter, server_types::eth::EthApiError},
};
use reth_rpc_eth_api::{
helpers::{EthBlocks, EthCall, EthState, EthTransactions, FullEthApi},
EthApiTypes, EthFilterApiServer, RpcBlock, RpcReceipt, RpcTransaction,
};
use tokio::{
sync::{broadcast, broadcast::error::RecvError},
time,
};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tracing::{debug, trace, warn};

use crate::{metrics::Metrics, pending_blocks::PendingBlocks};

/// Max configured timeout for `eth_sendRawTransactionSync` in milliseconds.
pub const MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS: u64 = 6_000;

Expand Down Expand Up @@ -89,7 +91,7 @@ pub trait EthApiOverride {

#[method(name = "getBalance")]
async fn get_balance(&self, address: Address, block_number: Option<BlockId>)
-> RpcResult<U256>;
-> RpcResult<U256>;

#[method(name = "getTransactionCount")]
async fn get_transaction_count(
Expand Down Expand Up @@ -149,12 +151,7 @@ pub struct EthApiExt<Eth: EthApiTypes, FB> {

impl<Eth: EthApiTypes, FB> EthApiExt<Eth, FB> {
pub fn new(eth_api: Eth, eth_filter: EthFilter<Eth>, flashblocks_state: Arc<FB>) -> Self {
Self {
eth_api,
eth_filter,
flashblocks_state,
metrics: Metrics::default(),
}
Self { eth_api, eth_filter, flashblocks_state, metrics: Metrics::default() }
}
}

Expand All @@ -180,9 +177,7 @@ where
let pending_blocks = self.flashblocks_state.get_pending_blocks();
Ok(pending_blocks.get_block(full))
} else {
EthBlocks::rpc_block(&self.eth_api, number.into(), full)
.await
.map_err(Into::into)
EthBlocks::rpc_block(&self.eth_api, number.into(), full).await.map_err(Into::into)
}
}

Expand All @@ -201,9 +196,7 @@ where
return Ok(Some(fb_receipt));
}

EthTransactions::transaction_receipt(&self.eth_api, tx_hash)
.await
.map_err(Into::into)
EthTransactions::transaction_receipt(&self.eth_api, tx_hash).await.map_err(Into::into)
}

async fn get_balance(
Expand All @@ -224,9 +217,7 @@ where
}
}

EthState::balance(&self.eth_api, address, block_number)
.await
.map_err(Into::into)
EthState::balance(&self.eth_api, address, block_number).await.map_err(Into::into)
}

async fn get_transaction_count(
Expand Down Expand Up @@ -254,9 +245,7 @@ where
return Ok(canon_count + fb_count);
}

EthState::transaction_count(&self.eth_api, address, block_number)
.await
.map_err(Into::into)
EthState::transaction_count(&self.eth_api, address, block_number).await.map_err(Into::into)
}

async fn transaction_by_hash(
Expand Down Expand Up @@ -447,21 +436,13 @@ where
state_overrides_builder.extend(sim_block.state_overrides.unwrap_or_default());
let final_overrides = state_overrides_builder.build();

let block_state_call = SimBlock {
state_overrides: Some(final_overrides),
..sim_block
};
let block_state_call = SimBlock { state_overrides: Some(final_overrides), ..sim_block };
block_state_calls.push(block_state_call);
}

let payload = SimulatePayload {
block_state_calls,
..opts
};
let payload = SimulatePayload { block_state_calls, ..opts };

EthCall::simulate_v1(&self.eth_api, payload, Some(block_id))
.await
.map_err(Into::into)
EthCall::simulate_v1(&self.eth_api, payload, Some(block_id)).await.map_err(Into::into)
}

async fn get_logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
Expand All @@ -472,10 +453,9 @@ where

// Check if this is a mixed query (toBlock is pending)
let (from_block, to_block) = match &filter.block_option {
alloy_rpc_types_eth::FilterBlockOption::Range {
from_block,
to_block,
} => (*from_block, *to_block),
alloy_rpc_types_eth::FilterBlockOption::Range { from_block, to_block } => {
(*from_block, *to_block)
}
_ => {
// Block hash queries or other formats - delegate to eth API
return self.eth_filter.logs(filter).await;
Expand Down
Loading