From 50d3ee4b355de87c3c95db83bff3e314e20b4b7a Mon Sep 17 00:00:00 2001 From: Francis Li Date: Wed, 19 Nov 2025 20:52:32 -0800 Subject: [PATCH 1/2] Update to use op-alloy flashblock types --- Cargo.lock | 33 +++--- Cargo.toml | 8 +- Justfile | 9 ++ crates/flashblocks-rpc/Cargo.toml | 1 + crates/flashblocks-rpc/src/cache.rs | 59 ++++++----- crates/flashblocks-rpc/src/flashblocks.rs | 10 +- crates/flashblocks-rpc/src/tests/mod.rs | 100 ++++++++---------- .../rollup-boost/src/flashblocks/inbound.rs | 23 ++-- crates/rollup-boost/src/flashblocks/mod.rs | 3 - .../rollup-boost/src/flashblocks/outbound.rs | 10 +- .../src/flashblocks/primitives.rs | 82 -------------- .../rollup-boost/src/flashblocks/service.rs | 34 +++--- 12 files changed, 146 insertions(+), 226 deletions(-) delete mode 100644 crates/rollup-boost/src/flashblocks/primitives.rs diff --git a/Cargo.lock b/Cargo.lock index a6b79548..6bbe400f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1669,7 +1669,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -1687,7 +1687,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -2842,7 +2842,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3120,7 +3120,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3328,6 +3328,7 @@ dependencies = [ "op-alloy-consensus", "op-alloy-network", "op-alloy-rpc-types", + "op-alloy-rpc-types-engine", "reth-db", "reth-e2e-test-utils", "reth-node-api", @@ -5546,7 +5547,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5703,9 +5704,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "op-alloy-consensus" -version = "0.22.1" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0d7ec388eb83a3e6c71774131dbbb2ba9c199b6acac7dce172ed8de2f819e91" +checksum = "e82f4f768ba39e52a4efe1b8f3425c04ab0d0e6f90c003fe97e5444cd963405e" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5729,9 +5730,9 @@ checksum = "a79f352fc3893dcd670172e615afef993a41798a1d3fc0db88a3e60ef2e70ecc" [[package]] name = "op-alloy-network" -version = "0.22.1" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "979fe768bbb571d1d0bd7f84bc35124243b4db17f944b94698872a4701e743a0" +checksum = "f2607d0d985f848f98fa79068d11c612f8476dba7deb7498881794bf51b3cfb5" dependencies = [ "alloy-consensus", "alloy-network", @@ -5755,9 +5756,9 @@ dependencies = [ [[package]] name = "op-alloy-rpc-types" -version = "0.22.1" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc252b5fa74dbd33aa2f9a40e5ff9cfe34ed2af9b9b235781bc7cc8ec7d6aca8" +checksum = "890b51c3a619c263d52ee5a945dce173a4052d017f93bf5698613b21cbe0d237" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5774,9 +5775,9 @@ dependencies = [ [[package]] name = "op-alloy-rpc-types-engine" -version = "0.22.1" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1abe694cd6718b8932da3f824f46778be0f43289e4103c88abc505c63533a04" +checksum = "c92f9dd709b3a769b7604d4d2257846b6de3d3f60e5163982cc4e90c0d0b6f95" dependencies = [ "alloy-consensus", "alloy-eips", @@ -10477,7 +10478,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -11396,7 +11397,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -12606,7 +12607,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 66b5e5fd..7240367b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,10 +40,10 @@ alloy-rpc-types = "1.0.41" alloy-genesis = "1.0.41" alloy-rpc-client = "1.0.41" alloy-provider = "1.0.41" -op-alloy-network = "0.22.0" -op-alloy-rpc-types-engine = "0.22.0" -op-alloy-consensus = "0.22.0" -op-alloy-rpc-types = "0.22.0" +op-alloy-network = "0.22.3" +op-alloy-rpc-types-engine = "0.22.3" +op-alloy-consensus = "0.22.3" +op-alloy-rpc-types = "0.22.3" tokio-tungstenite = { version = "0.26.2", features = ["native-tls"] } testcontainers = "0.23" testcontainers-modules = { version = "0.11", features = ["redis"] } diff --git a/Justfile b/Justfile index d592e7e7..e5dc6f58 100644 --- a/Justfile +++ b/Justfile @@ -18,3 +18,12 @@ build-debug: kurtosis-spawn: kurtosis run github.com/ethpandaops/optimism-package@452133367b693e3ba22214a6615c86c60a1efd5e --args-file ./scripts/ci/kurtosis-params.yaml --enclave op-rollup-boost + +clippy: + cargo clippy --workspace -- -D warnings + +fmt: + cargo fmt --all + +test: + cargo nextest run --workspace \ No newline at end of file diff --git a/crates/flashblocks-rpc/Cargo.toml b/crates/flashblocks-rpc/Cargo.toml index 3d230055..a4540cd8 100644 --- a/crates/flashblocks-rpc/Cargo.toml +++ b/crates/flashblocks-rpc/Cargo.toml @@ -42,6 +42,7 @@ alloy-provider.workspace = true op-alloy-network.workspace = true op-alloy-consensus.workspace = true op-alloy-rpc-types.workspace = true +op-alloy-rpc-types-engine.workspace = true tokio.workspace = true tokio-tungstenite.workspace = true diff --git a/crates/flashblocks-rpc/src/cache.rs b/crates/flashblocks-rpc/src/cache.rs index 792a83a7..4dddd96a 100644 --- a/crates/flashblocks-rpc/src/cache.rs +++ b/crates/flashblocks-rpc/src/cache.rs @@ -14,24 +14,36 @@ use op_alloy_rpc_types::Transaction; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_evm::extract_l1_info; use reth_optimism_primitives::OpPrimitives; -use reth_optimism_primitives::{OpBlock, OpReceipt, OpTransactionSigned}; +use reth_optimism_primitives::{OpBlock, OpTransactionSigned}; use reth_optimism_rpc::OpReceiptBuilder; use reth_primitives::Recovered; use reth_primitives_traits::block::body::BlockBody; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use reth_rpc_eth_api::transaction::ConvertReceiptInput; use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; -use rollup_boost::{ - FlashblockBuilder, FlashblocksPayloadV1, OpExecutionPayloadEnvelope, PayloadVersion, -}; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, str::FromStr, sync::Arc}; - -#[derive(Debug, Deserialize, Serialize, Clone, Default)] -pub struct Metadata { - pub receipts: HashMap, - pub new_account_balances: HashMap, // Address -> Balance (hex) - pub block_number: u64, +use rollup_boost::{FlashblockBuilder, OpExecutionPayloadEnvelope, PayloadVersion}; +use std::{collections::HashMap, sync::Arc}; + +/// Convert op_alloy_consensus::OpReceipt to reth_optimism_primitives::OpReceipt +fn convert_receipt(receipt: &op_alloy_consensus::OpReceipt) -> reth_optimism_primitives::OpReceipt { + match receipt { + op_alloy_consensus::OpReceipt::Legacy(r) => { + reth_optimism_primitives::OpReceipt::Legacy(r.clone()) + } + op_alloy_consensus::OpReceipt::Eip2930(r) => { + reth_optimism_primitives::OpReceipt::Eip2930(r.clone()) + } + op_alloy_consensus::OpReceipt::Eip1559(r) => { + reth_optimism_primitives::OpReceipt::Eip1559(r.clone()) + } + op_alloy_consensus::OpReceipt::Eip7702(r) => { + reth_optimism_primitives::OpReceipt::Eip7702(r.clone()) + } + op_alloy_consensus::OpReceipt::Deposit(r) => { + reth_optimism_primitives::OpReceipt::Deposit(r.clone()) + } + } } #[derive(Clone)] @@ -65,7 +77,7 @@ impl FlashblocksCache { ArcSwap::load(&self.inner).get_receipt(tx_hash) } - pub fn process_payload(&self, payload: FlashblocksPayloadV1) -> eyre::Result<()> { + pub fn process_payload(&self, payload: OpFlashblockPayload) -> eyre::Result<()> { let mut new_state = FlashblocksCacheInner::clone(&self.inner.load_full()); new_state.process_payload(payload)?; self.inner.store(Arc::new(new_state)); @@ -147,14 +159,8 @@ impl FlashblocksCacheInner { self.receipts_cache.clear(); } - pub fn process_payload(&mut self, payload: FlashblocksPayloadV1) -> eyre::Result<()> { - // Convert metadata with error handling - let metadata: Metadata = match serde_json::from_value(payload.metadata.clone()) { - Ok(m) => m, - Err(e) => { - return Err(eyre::eyre!("Failed to deserialize metadata: {}", e)); - } - }; + pub fn process_payload(&mut self, payload: OpFlashblockPayload) -> eyre::Result<()> { + let metadata = payload.metadata.clone(); if payload.index == 0 { self.reset(); @@ -190,7 +196,7 @@ impl FlashblocksCacheInner { // update the receipts let receipt = metadata .receipts - .get(&tx.tx_hash().to_string()) + .get(&tx.tx_hash()) .expect("Receipt should exist"); all_receipts.push(receipt.clone()); @@ -226,7 +232,7 @@ impl FlashblocksCacheInner { timestamp, }; let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { - receipt: receipt.clone(), + receipt: convert_receipt(receipt), tx: tx.try_to_recovered_ref()?, gas_used: receipt.cumulative_gas_used() - gas_used, next_log_index, @@ -250,12 +256,7 @@ impl FlashblocksCacheInner { // Store account balances for (address, balance) in metadata.new_account_balances.iter() { - let address = Address::from_str(address) - .map_err(|e| eyre::eyre!("Failed to parse address: {}", e))?; - let balance = U256::from_str(balance) - .map_err(|e| eyre::eyre!("Failed to parse balance: {}", e))?; - - self.balance_cache.insert(address, balance); + self.balance_cache.insert(*address, *balance); } Ok(()) diff --git a/crates/flashblocks-rpc/src/flashblocks.rs b/crates/flashblocks-rpc/src/flashblocks.rs index fe3e3c42..e0aea59e 100644 --- a/crates/flashblocks-rpc/src/flashblocks.rs +++ b/crates/flashblocks-rpc/src/flashblocks.rs @@ -3,9 +3,9 @@ use alloy_primitives::{Address, TxHash, U256}; use futures_util::StreamExt; use jsonrpsee::core::async_trait; use op_alloy_network::Optimism; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use reth_optimism_chainspec::OpChainSpec; use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; -use rollup_boost::FlashblocksPayloadV1; use std::{io::Read, sync::Arc}; use tokio::sync::mpsc; use tokio_tungstenite::{connect_async, tungstenite::Message}; @@ -101,19 +101,19 @@ impl FlashblocksOverlay { Ok(()) } - pub fn process_payload(&self, payload: FlashblocksPayloadV1) -> eyre::Result<()> { + pub fn process_payload(&self, payload: OpFlashblockPayload) -> eyre::Result<()> { self.cache.process_payload(payload) } } enum InternalMessage { - NewPayload(FlashblocksPayloadV1), + NewPayload(OpFlashblockPayload), } -fn try_decode_message(bytes: &[u8]) -> eyre::Result { +fn try_decode_message(bytes: &[u8]) -> eyre::Result { let text = try_parse_message(bytes)?; - let payload: FlashblocksPayloadV1 = match serde_json::from_str(&text) { + let payload: OpFlashblockPayload = match serde_json::from_str(&text) { Ok(m) => m, Err(e) => { return Err(eyre::eyre!("failed to parse message: {}", e)); diff --git a/crates/flashblocks-rpc/src/tests/mod.rs b/crates/flashblocks-rpc/src/tests/mod.rs index 27597e7b..2b7eb2c5 100644 --- a/crates/flashblocks-rpc/src/tests/mod.rs +++ b/crates/flashblocks-rpc/src/tests/mod.rs @@ -1,12 +1,13 @@ #[cfg(test)] mod tests { - use crate::{EthApiOverrideServer, FlashblocksApiExt, FlashblocksOverlay, cache::Metadata}; + use crate::{EthApiOverrideServer, FlashblocksApiExt, FlashblocksOverlay}; use alloy_consensus::Receipt; use alloy_genesis::Genesis; use alloy_primitives::{Address, B256, Bytes, TxHash, U256, address, b256}; use alloy_provider::{Provider, RootProvider}; use alloy_rpc_client::RpcClient; use alloy_rpc_types_engine::PayloadId; + use op_alloy_consensus::OpReceipt; use reth_node_builder::{Node, NodeBuilder, NodeConfig, NodeHandle}; use reth_node_core::{ args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}, @@ -14,18 +15,18 @@ mod tests { }; use reth_optimism_chainspec::OpChainSpecBuilder; use reth_optimism_node::{OpNode, args::RollupArgs}; - use reth_optimism_primitives::OpReceipt; use reth_provider::providers::BlockchainProvider; use reth_tasks::TaskManager; - use rollup_boost::{ - ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, + use op_alloy_rpc_types_engine::{ + OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, + OpFlashblockPayloadMetadata, }; - use std::{any::Any, collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc}; + use std::{any::Any, collections::BTreeMap, net::SocketAddr, str::FromStr, sync::Arc}; use tokio::sync::{mpsc, oneshot}; use url::Url; pub struct NodeContext { - sender: mpsc::Sender<(FlashblocksPayloadV1, oneshot::Sender<()>)>, + sender: mpsc::Sender<(OpFlashblockPayload, oneshot::Sender<()>)>, http_api_addr: SocketAddr, _node_exit_future: NodeExitFuture, _node: Box, @@ -33,7 +34,7 @@ mod tests { } impl NodeContext { - pub async fn send_payload(&self, payload: FlashblocksPayloadV1) -> eyre::Result<()> { + pub async fn send_payload(&self, payload: OpFlashblockPayload) -> eyre::Result<()> { let (tx, rx) = oneshot::channel(); self.sender.send((payload, tx)).await?; rx.await?; @@ -88,7 +89,7 @@ mod tests { // Start websocket server to simulate the builder and send payloads back to the node let (sender, mut receiver) = - mpsc::channel::<(FlashblocksPayloadV1, oneshot::Sender<()>)>(100); + mpsc::channel::<(OpFlashblockPayload, oneshot::Sender<()>)>(100); let NodeHandle { node, @@ -135,11 +136,11 @@ mod tests { }) } - fn create_first_payload() -> FlashblocksPayloadV1 { - FlashblocksPayloadV1 { + fn create_first_payload() -> OpFlashblockPayload { + OpFlashblockPayload { payload_id: PayloadId::new([0; 8]), index: 0, - base: Some(ExecutionPayloadBaseV1 { + base: Some(OpFlashblockPayloadBase { parent_beacon_block_root: B256::default(), parent_hash: B256::default(), fee_recipient: Address::ZERO, @@ -150,13 +151,12 @@ mod tests { extra_data: Bytes::new(), base_fee_per_gas: U256::ZERO, }), - diff: ExecutionPayloadFlashblockDeltaV1::default(), - metadata: serde_json::to_value(Metadata { + diff: OpFlashblockPayloadDelta::default(), + metadata: OpFlashblockPayloadMetadata { block_number: 1, - receipts: HashMap::default(), - new_account_balances: HashMap::default(), - }) - .unwrap(), + receipts: BTreeMap::new(), + new_account_balances: BTreeMap::new(), + }, } } @@ -168,7 +168,7 @@ mod tests { const TX2_HASH: TxHash = b256!("0xa6155b295085d3b87a3c86e342fe11c3b22f9952d0d85d9d34d223b7d6a17cd8"); - fn create_second_payload() -> FlashblocksPayloadV1 { + fn create_second_payload() -> OpFlashblockPayload { // Create second payload (index 1) with transactions // tx1 hash: 0x2be2e6f8b01b03b87ae9f0ebca8bbd420f174bef0fbcc18c7802c5378b78f548 (deposit transaction) // tx2 hash: 0xa6155b295085d3b87a3c86e342fe11c3b22f9952d0d85d9d34d223b7d6a17cd8 @@ -176,11 +176,32 @@ mod tests { let tx2 = Bytes::from_str("0xf8cd82016d8316e5708302c01c94f39635f2adf40608255779ff742afe13de31f57780b8646e530e9700000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000156ddc81eed2a36d68302948ba0a608703e79b22164f74523d188a11f81c25a65dd59535bab1cd1d8b30d115f3ea07f4cfbbad77a139c9209d3bded89091867ff6b548dd714109c61d1f8e7a84d14").unwrap(); // Send another test flashblock payload - let payload = FlashblocksPayloadV1 { + let mut receipts = BTreeMap::new(); + receipts.insert( + TX1_HASH, + OpReceipt::Legacy(Receipt { + status: true.into(), + cumulative_gas_used: 21000, + logs: vec![], + }), + ); + receipts.insert( + TX2_HASH, + OpReceipt::Legacy(Receipt { + status: true.into(), + cumulative_gas_used: 45000, + logs: vec![], + }), + ); + + let mut balances = BTreeMap::new(); + balances.insert(TEST_ADDRESS, U256::from(PENDING_BALANCE)); + + OpFlashblockPayload { payload_id: PayloadId::new([0; 8]), index: 1, base: None, - diff: ExecutionPayloadFlashblockDeltaV1 { + diff: OpFlashblockPayloadDelta { state_root: B256::default(), receipts_root: B256::default(), gas_used: 0, @@ -191,41 +212,12 @@ mod tests { withdrawals_root: Default::default(), blob_gas_used: Default::default(), }, - metadata: serde_json::to_value(Metadata { + metadata: OpFlashblockPayloadMetadata { block_number: 1, - receipts: { - let mut receipts = HashMap::default(); - receipts.insert( - TX1_HASH.to_string(), // transaction hash as string - OpReceipt::Legacy(Receipt { - status: true.into(), - cumulative_gas_used: 21000, - logs: vec![], - }), - ); - receipts.insert( - TX2_HASH.to_string(), // transaction hash as string - OpReceipt::Legacy(Receipt { - status: true.into(), - cumulative_gas_used: 45000, - logs: vec![], - }), - ); - receipts - }, - new_account_balances: { - let mut map = HashMap::default(); - map.insert( - TEST_ADDRESS.to_string(), - format!("0x{:x}", U256::from(PENDING_BALANCE)), - ); - map - }, - }) - .unwrap(), - }; - - payload + receipts, + new_account_balances: balances, + }, + } } #[tokio::test] diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 4737c66c..c27200df 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -1,10 +1,11 @@ -use super::{metrics::FlashblocksWsInboundMetrics, primitives::FlashblocksPayloadV1}; +use super::metrics::FlashblocksWsInboundMetrics; use crate::FlashblocksWebsocketConfig; use backoff::ExponentialBackoff; use backoff::backoff::Backoff; use bytes::Bytes; use futures::{SinkExt, StreamExt}; use lru::LruCache; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use std::num::NonZeroUsize; use std::sync::Arc; use std::sync::Mutex; @@ -40,7 +41,7 @@ enum FlashblocksReceiverError { TaskPanic(String), #[error("Failed to send message to sender: {0}")] - SendError(#[from] Box>), + SendError(#[from] Box>), #[error("Ping mutex poisoned")] MutexPoisoned, @@ -48,7 +49,7 @@ enum FlashblocksReceiverError { pub struct FlashblocksReceiverService { url: Url, - sender: mpsc::Sender, + sender: mpsc::Sender, websocket_config: FlashblocksWebsocketConfig, metrics: FlashblocksWsInboundMetrics, } @@ -56,7 +57,7 @@ pub struct FlashblocksReceiverService { impl FlashblocksReceiverService { pub fn new( url: Url, - sender: mpsc::Sender, + sender: mpsc::Sender, websocket_config: FlashblocksWebsocketConfig, ) -> Self { Self { @@ -148,7 +149,7 @@ impl FlashblocksReceiverService { Some(Ok(msg)) => match msg { Message::Text(text) => { metrics.messages_received.increment(1); - match serde_json::from_str::(&text) { + match serde_json::from_str::(&text) { Ok(flashblocks_msg) => sender.send(flashblocks_msg).await.map_err(|e| { FlashblocksReceiverError::SendError(Box::new(e)) })?, @@ -235,12 +236,12 @@ mod tests { addr: SocketAddr, ) -> eyre::Result<( watch::Sender, - mpsc::Sender, + mpsc::Sender, mpsc::Receiver<()>, url::Url, )> { let (term_tx, mut term_rx) = watch::channel(false); - let (send_tx, mut send_rx) = mpsc::channel::(100); + let (send_tx, mut send_rx) = mpsc::channel::(100); let (send_ping_tx, send_ping_rx) = mpsc::channel::<()>(100); let listener = TcpListener::bind(addr)?; @@ -403,12 +404,12 @@ mod tests { // Send a message to the websocket server send_msg - .send(FlashblocksPayloadV1::default()) + .send(OpFlashblockPayload::default()) .await .expect("message sent to websocket server"); let msg = rx.recv().await.expect("message received from websocket"); - assert_eq!(msg, FlashblocksPayloadV1::default()); + assert_eq!(msg, OpFlashblockPayload::default()); // Drop the websocket server and start another one with the same address // The FlashblocksReceiverService should reconnect to the new server @@ -420,12 +421,12 @@ mod tests { // start a new server with the same address let (term, send_msg, _, _url) = start(addr).await?; send_msg - .send(FlashblocksPayloadV1::default()) + .send(OpFlashblockPayload::default()) .await .expect("message sent to websocket server"); let msg = rx.recv().await.expect("message received from websocket"); - assert_eq!(msg, FlashblocksPayloadV1::default()); + assert_eq!(msg, OpFlashblockPayload::default()); term.send(true).expect("termination signal sent"); Ok(()) diff --git a/crates/rollup-boost/src/flashblocks/mod.rs b/crates/rollup-boost/src/flashblocks/mod.rs index 2e219c61..1499a26d 100644 --- a/crates/rollup-boost/src/flashblocks/mod.rs +++ b/crates/rollup-boost/src/flashblocks/mod.rs @@ -2,12 +2,9 @@ mod launcher; pub use launcher::*; -mod primitives; mod service; pub use service::*; -pub use primitives::*; - mod inbound; mod outbound; diff --git a/crates/rollup-boost/src/flashblocks/outbound.rs b/crates/rollup-boost/src/flashblocks/outbound.rs index 86f1eb2a..fcccc5b9 100644 --- a/crates/rollup-boost/src/flashblocks/outbound.rs +++ b/crates/rollup-boost/src/flashblocks/outbound.rs @@ -1,4 +1,3 @@ -use super::primitives::FlashblocksPayloadV1; use core::{ fmt::{Debug, Formatter}, net::SocketAddr, @@ -7,6 +6,7 @@ use core::{ task::{Context, Poll}, }; use futures::{Sink, SinkExt, StreamExt}; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use std::{io, net::TcpListener, sync::Arc}; use tokio::{ net::TcpStream, @@ -22,7 +22,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message}; /// A WebSockets publisher that accepts connections from client websockets and broadcasts to them /// updates about new flashblocks. It maintains a count of sent messages and active subscriptions. /// -/// This is modelled as a `futures::Sink` that can be used to send `FlashblocksPayloadV1` messages. +/// This is modelled as a `futures::Sink` that can be used to send `OpFlashblockPayload` messages. pub struct WebSocketPublisher { sent: Arc, subs: Arc, @@ -55,7 +55,7 @@ impl WebSocketPublisher { }) } - pub fn publish(&self, payload: &FlashblocksPayloadV1) -> io::Result<()> { + pub fn publish(&self, payload: &OpFlashblockPayload) -> io::Result<()> { // Serialize the payload to a UTF-8 string // serialize only once, then just copy around only a pointer // to the serialized data for each subscription. @@ -223,14 +223,14 @@ impl Debug for WebSocketPublisher { } } -impl Sink<&FlashblocksPayloadV1> for WebSocketPublisher { +impl Sink<&OpFlashblockPayload> for WebSocketPublisher { type Error = eyre::Report; fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: &FlashblocksPayloadV1) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: &OpFlashblockPayload) -> Result<(), Self::Error> { self.publish(item)?; Ok(()) } diff --git a/crates/rollup-boost/src/flashblocks/primitives.rs b/crates/rollup-boost/src/flashblocks/primitives.rs deleted file mode 100644 index cec0533c..00000000 --- a/crates/rollup-boost/src/flashblocks/primitives.rs +++ /dev/null @@ -1,82 +0,0 @@ -use alloy_primitives::{Address, B256, Bloom, Bytes, U256}; -use alloy_rpc_types_engine::PayloadId; -use alloy_rpc_types_eth::Withdrawal; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -/// Represents the modified portions of an execution payload within a flashblock. -/// This structure contains only the fields that can be updated during block construction, -/// such as state root, receipts, logs, and new transactions. Other immutable block fields -/// like parent hash and block number are excluded since they remain constant throughout -/// the block's construction. -#[derive(Clone, Debug, PartialEq, Default, Deserialize, Serialize)] -pub struct ExecutionPayloadFlashblockDeltaV1 { - /// The state root of the block. - pub state_root: B256, - /// The receipts root of the block. - pub receipts_root: B256, - /// The logs bloom of the block. - pub logs_bloom: Bloom, - /// The gas used of the block. - #[serde(with = "alloy_serde::quantity")] - pub gas_used: u64, - /// The block hash of the block. - pub block_hash: B256, - /// The transactions of the block. - pub transactions: Vec, - /// Array of [`Withdrawal`] enabled with V2 - pub withdrawals: Vec, - /// The withdrawals root of the block. - pub withdrawals_root: B256, - /// The blob gas used - #[serde( - default, - skip_serializing_if = "Option::is_none", - with = "alloy_serde::quantity::opt" - )] - pub blob_gas_used: Option, -} - -/// Represents the base configuration of an execution payload that remains constant -/// throughout block construction. This includes fundamental block properties like -/// parent hash, block number, and other header fields that are determined at -/// block creation and cannot be modified. -#[derive(Clone, Debug, PartialEq, Default, Deserialize, Serialize)] -pub struct ExecutionPayloadBaseV1 { - /// Ecotone parent beacon block root - pub parent_beacon_block_root: B256, - /// The parent hash of the block. - pub parent_hash: B256, - /// The fee recipient of the block. - pub fee_recipient: Address, - /// The previous randao of the block. - pub prev_randao: B256, - /// The block number. - #[serde(with = "alloy_serde::quantity")] - pub block_number: u64, - /// The gas limit of the block. - #[serde(with = "alloy_serde::quantity")] - pub gas_limit: u64, - /// The timestamp of the block. - #[serde(with = "alloy_serde::quantity")] - pub timestamp: u64, - /// The extra data of the block. - pub extra_data: Bytes, - /// The base fee per gas of the block. - pub base_fee_per_gas: U256, -} - -#[derive(Clone, Debug, PartialEq, Default, Deserialize, Serialize)] -pub struct FlashblocksPayloadV1 { - /// The payload id of the flashblock - pub payload_id: PayloadId, - /// The index of the flashblock in the block - pub index: u64, - /// The base execution payload configuration - #[serde(skip_serializing_if = "Option::is_none")] - pub base: Option, - /// The delta/diff containing modified portions of the execution payload - pub diff: ExecutionPayloadFlashblockDeltaV1, - /// Additional metadata associated with the flashblock - pub metadata: Value, -} diff --git a/crates/rollup-boost/src/flashblocks/service.rs b/crates/rollup-boost/src/flashblocks/service.rs index 9da05933..1c62e8c0 100644 --- a/crates/rollup-boost/src/flashblocks/service.rs +++ b/crates/rollup-boost/src/flashblocks/service.rs @@ -1,7 +1,4 @@ use super::outbound::WebSocketPublisher; -use super::primitives::{ - ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, -}; use crate::flashblocks::metrics::FlashblocksServiceMetrics; use crate::{ ClientResult, EngineApiExt, NewPayload, OpExecutionPayloadEnvelope, PayloadVersion, RpcClient, @@ -18,6 +15,9 @@ use op_alloy_rpc_types_engine::{ OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4, OpPayloadAttributes, }; +use op_alloy_rpc_types_engine::{ + OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, +}; use reth_optimism_payload_builder::payload_id_optimism; use std::io; use std::sync::Arc; @@ -44,13 +44,13 @@ pub enum FlashblocksError { // Simplify actor messages to just handle shutdown #[derive(Debug)] enum FlashblocksEngineMessage { - FlashblocksPayloadV1(FlashblocksPayloadV1), + OpFlashblockPayload(OpFlashblockPayload), } #[derive(Clone, Debug, Default)] pub struct FlashblockBuilder { - base: Option, - flashblocks: Vec, + base: Option, + flashblocks: Vec, } impl FlashblockBuilder { @@ -58,7 +58,7 @@ impl FlashblockBuilder { Self::default() } - pub fn extend(&mut self, payload: FlashblocksPayloadV1) -> Result<(), FlashblocksError> { + pub fn extend(&mut self, payload: OpFlashblockPayload) -> Result<(), FlashblocksError> { tracing::debug!(message = "Extending payload", payload_id = %payload.payload_id, index = payload.index, has_base=payload.base.is_some()); // Validate the index is contiguous @@ -241,7 +241,7 @@ impl FlashblocksService { async fn on_event(&mut self, event: FlashblocksEngineMessage) { match event { - FlashblocksEngineMessage::FlashblocksPayloadV1(payload) => { + FlashblocksEngineMessage::OpFlashblockPayload(payload) => { self.metrics.messages_processed.increment(1); tracing::debug!( @@ -299,9 +299,9 @@ impl FlashblocksService { } } - pub async fn run(&mut self, mut stream: mpsc::Receiver) { + pub async fn run(&mut self, mut stream: mpsc::Receiver) { while let Some(event) = stream.recv().await { - self.on_event(FlashblocksEngineMessage::FlashblocksPayloadV1(event)) + self.on_event(FlashblocksEngineMessage::OpFlashblockPayload(event)) .await; } } @@ -465,7 +465,7 @@ mod tests { let mut builder = FlashblockBuilder::new(); // Error: First payload must have a base - let result = builder.extend(FlashblocksPayloadV1 { + let result = builder.extend(OpFlashblockPayload { payload_id: PayloadId::default(), index: 0, ..Default::default() @@ -474,10 +474,10 @@ mod tests { assert_eq!(result.unwrap_err(), FlashblocksError::MissingBasePayload); // Ok: First payload is correct if it has base and index 0 - let result = builder.extend(FlashblocksPayloadV1 { + let result = builder.extend(OpFlashblockPayload { payload_id: PayloadId::default(), index: 0, - base: Some(ExecutionPayloadBaseV1 { + base: Some(OpFlashblockPayloadBase { ..Default::default() }), ..Default::default() @@ -485,10 +485,10 @@ mod tests { assert!(result.is_ok()); // Error: First payload must have index 0 - let result = builder.extend(FlashblocksPayloadV1 { + let result = builder.extend(OpFlashblockPayload { payload_id: PayloadId::default(), index: 1, - base: Some(ExecutionPayloadBaseV1 { + base: Some(OpFlashblockPayloadBase { ..Default::default() }), ..Default::default() @@ -497,7 +497,7 @@ mod tests { assert_eq!(result.unwrap_err(), FlashblocksError::UnexpectedBasePayload); // Error: Second payload must have a follow-up index - let result = builder.extend(FlashblocksPayloadV1 { + let result = builder.extend(OpFlashblockPayload { payload_id: PayloadId::default(), index: 2, base: None, @@ -507,7 +507,7 @@ mod tests { assert_eq!(result.unwrap_err(), FlashblocksError::InvalidIndex); // Ok: Second payload has the correct index - let result = builder.extend(FlashblocksPayloadV1 { + let result = builder.extend(OpFlashblockPayload { payload_id: PayloadId::default(), index: 1, base: None, From e4e284ac28159a1423cd81afda45d753a33ae136 Mon Sep 17 00:00:00 2001 From: Francis Li Date: Wed, 19 Nov 2025 20:56:17 -0800 Subject: [PATCH 2/2] fmt --- crates/flashblocks-rpc/src/tests/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/flashblocks-rpc/src/tests/mod.rs b/crates/flashblocks-rpc/src/tests/mod.rs index 2b7eb2c5..10ba1864 100644 --- a/crates/flashblocks-rpc/src/tests/mod.rs +++ b/crates/flashblocks-rpc/src/tests/mod.rs @@ -8,6 +8,10 @@ mod tests { use alloy_rpc_client::RpcClient; use alloy_rpc_types_engine::PayloadId; use op_alloy_consensus::OpReceipt; + use op_alloy_rpc_types_engine::{ + OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, + OpFlashblockPayloadMetadata, + }; use reth_node_builder::{Node, NodeBuilder, NodeConfig, NodeHandle}; use reth_node_core::{ args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}, @@ -17,10 +21,6 @@ mod tests { use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_provider::providers::BlockchainProvider; use reth_tasks::TaskManager; - use op_alloy_rpc_types_engine::{ - OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, - OpFlashblockPayloadMetadata, - }; use std::{any::Any, collections::BTreeMap, net::SocketAddr, str::FromStr, sync::Arc}; use tokio::sync::{mpsc, oneshot}; use url::Url;