From 2c89a30d7a4796a66bc2b96b58fb839f847cf284 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:31:53 -0400 Subject: [PATCH 1/6] refactor BundleOpts to use with_ methods to set --- crates/op-rbuilder/src/tests/flashblocks.rs | 15 +- crates/op-rbuilder/src/tests/framework/txs.rs | 44 +++++- crates/op-rbuilder/src/tests/revert.rs | 144 ++++++++++-------- 3 files changed, 125 insertions(+), 78 deletions(-) diff --git a/crates/op-rbuilder/src/tests/flashblocks.rs b/crates/op-rbuilder/src/tests/flashblocks.rs index cd17f8aeb..990c75f66 100644 --- a/crates/op-rbuilder/src/tests/flashblocks.rs +++ b/crates/op-rbuilder/src/tests/flashblocks.rs @@ -466,10 +466,7 @@ async fn test_flashblock_min_filtering(rbuilder: LocalInstance) -> eyre::Result< let tx1 = driver .create_transaction() .random_valid_transfer() - .with_bundle(BundleOpts { - flashblock_number_min: Some(0), - ..Default::default() - }) + .with_bundle(BundleOpts::default().with_flashblock_number_min(0)) .with_max_priority_fee_per_gas(0) .send() .await?; @@ -477,10 +474,7 @@ async fn test_flashblock_min_filtering(rbuilder: LocalInstance) -> eyre::Result< let tx2 = driver .create_transaction() .random_valid_transfer() - .with_bundle(BundleOpts { - flashblock_number_min: Some(3), - ..Default::default() - }) + .with_bundle(BundleOpts::default().with_flashblock_number_min(3)) .with_max_priority_fee_per_gas(10) .send() .await?; @@ -575,10 +569,7 @@ async fn test_flashblock_max_filtering(rbuilder: LocalInstance) -> eyre::Result< let tx1 = driver .create_transaction() .random_valid_transfer() - .with_bundle(BundleOpts { - flashblock_number_max: Some(1), - ..Default::default() - }) + .with_bundle(BundleOpts::default().with_flashblock_number_max(1)) .send() .await?; diff --git a/crates/op-rbuilder/src/tests/framework/txs.rs b/crates/op-rbuilder/src/tests/framework/txs.rs index 24a1748d6..0e78d0eb9 100644 --- a/crates/op-rbuilder/src/tests/framework/txs.rs +++ b/crates/op-rbuilder/src/tests/framework/txs.rs @@ -25,12 +25,44 @@ use super::FUNDED_PRIVATE_KEYS; #[derive(Clone, Copy, Default)] pub struct BundleOpts { - pub block_number_min: Option, - pub block_number_max: Option, - pub flashblock_number_min: Option, - pub flashblock_number_max: Option, - pub min_timestamp: Option, - pub max_timestamp: Option, + block_number_min: Option, + block_number_max: Option, + flashblock_number_min: Option, + flashblock_number_max: Option, + min_timestamp: Option, + max_timestamp: Option, +} + +impl BundleOpts { + pub fn with_block_number_min(mut self, block_number_min: u64) -> Self { + self.block_number_min = Some(block_number_min); + self + } + + pub fn with_block_number_max(mut self, block_number_max: u64) -> Self { + self.block_number_max = Some(block_number_max); + self + } + + pub fn with_flashblock_number_min(mut self, flashblock_number_min: u64) -> Self { + self.flashblock_number_min = Some(flashblock_number_min); + self + } + + pub fn with_flashblock_number_max(mut self, flashblock_number_max: u64) -> Self { + self.flashblock_number_max = Some(flashblock_number_max); + self + } + + pub fn with_min_timestamp(mut self, min_timestamp: u64) -> Self { + self.min_timestamp = Some(min_timestamp); + self + } + + pub fn with_max_timestamp(mut self, max_timestamp: u64) -> Self { + self.max_timestamp = Some(max_timestamp); + self + } } #[derive(Clone)] diff --git a/crates/op-rbuilder/src/tests/revert.rs b/crates/op-rbuilder/src/tests/revert.rs index 8037f996c..76e1c5b9b 100644 --- a/crates/op-rbuilder/src/tests/revert.rs +++ b/crates/op-rbuilder/src/tests/revert.rs @@ -31,10 +31,9 @@ async fn monitor_transaction_gc(rbuilder: LocalInstance) -> eyre::Result<()> { .create_transaction() .random_reverting_transaction() .with_signer(accounts[i].clone()) - .with_bundle(BundleOpts { - block_number_max: Some(latest_block_number + i as u64 + 1), - ..Default::default() - }) + .with_bundle( + BundleOpts::default().with_block_number_max(latest_block_number + i as u64 + 1), + ) .send() .await?; pending_txn.push(txn); @@ -141,10 +140,7 @@ async fn bundle(rbuilder: LocalInstance) -> eyre::Result<()> { .includes(valid_bundle.tx_hash()) ); - let bundle_opts = BundleOpts { - block_number_max: Some(4), - ..Default::default() - }; + let bundle_opts = BundleOpts::default().with_block_number_max(4); let reverted_bundle = driver .create_transaction() @@ -183,10 +179,7 @@ async fn bundle_min_block_number(rbuilder: LocalInstance) -> eyre::Result<()> { .create_transaction() .with_revert() // the transaction reverts but it is included in the block .with_reverted_hash() - .with_bundle(BundleOpts { - block_number_min: Some(2), - ..Default::default() - }) + .with_bundle(BundleOpts::default().with_block_number_min(2)) .send() .await?; @@ -201,11 +194,11 @@ async fn bundle_min_block_number(rbuilder: LocalInstance) -> eyre::Result<()> { .create_transaction() .with_revert() .with_reverted_hash() - .with_bundle(BundleOpts { - block_number_max: Some(4), - block_number_min: Some(4), - ..Default::default() - }) + .with_bundle( + BundleOpts::default() + .with_block_number_max(4) + .with_block_number_min(4), + ) .send() .await?; @@ -232,10 +225,7 @@ async fn bundle_min_timestamp(rbuilder: LocalInstance) -> eyre::Result<()> { .create_transaction() .with_revert() // the transaction reverts but it is included in the block .with_reverted_hash() - .with_bundle(BundleOpts { - min_timestamp: Some(initial_timestamp + 2), - ..Default::default() - }) + .with_bundle(BundleOpts::default().with_min_timestamp(initial_timestamp + 2)) .send() .await?; @@ -261,84 +251,121 @@ async fn bundle_range_limits(rbuilder: LocalInstance) -> eyre::Result<()> { async fn send_bundle( driver: &ChainDriver, - block_number_max: Option, - block_number_min: Option, + bundle: BundleOpts, ) -> eyre::Result> { - driver - .create_transaction() - .with_bundle(BundleOpts { - block_number_max, - block_number_min, - ..Default::default() - }) - .send() - .await + driver.create_transaction().with_bundle(bundle).send().await } // Max block cannot be a past block - assert!(send_bundle(&driver, Some(1), None).await.is_err()); + assert!( + send_bundle(&driver, BundleOpts::default().with_block_number_max(1)) + .await + .is_err() + ); // Bundles are valid if their max block in in between the current block and the max block range let current_block = 2; let next_valid_block = current_block + 1; for i in next_valid_block..next_valid_block + MAX_BLOCK_RANGE_BLOCKS { - assert!(send_bundle(&driver, Some(i), None).await.is_ok()); + assert!( + send_bundle(&driver, BundleOpts::default().with_block_number_max(i)) + .await + .is_ok() + ); } // A bundle with a block out of range is invalid assert!( send_bundle( &driver, - Some(next_valid_block + MAX_BLOCK_RANGE_BLOCKS + 1), - None + BundleOpts::default() + .with_block_number_max(next_valid_block + MAX_BLOCK_RANGE_BLOCKS + 1) ) .await .is_err() ); // A bundle with a min block number higher than the max block is invalid - assert!(send_bundle(&driver, Some(1), Some(2)).await.is_err()); + assert!( + send_bundle( + &driver, + BundleOpts::default() + .with_block_number_max(1) + .with_block_number_min(2) + ) + .await + .is_err() + ); // A bundle with a min block number lower or equal to the current block is valid assert!( - send_bundle(&driver, Some(next_valid_block), Some(current_block)) - .await - .is_ok() + send_bundle( + &driver, + BundleOpts::default() + .with_block_number_max(next_valid_block) + .with_block_number_min(current_block) + ) + .await + .is_ok() ); assert!( - send_bundle(&driver, Some(next_valid_block), Some(0)) - .await - .is_ok() + send_bundle( + &driver, + BundleOpts::default().with_block_number_max(next_valid_block) + ) + .await + .is_ok() ); // A bundle with a min block equal to max block is valid assert!( - send_bundle(&driver, Some(next_valid_block), Some(next_valid_block)) - .await - .is_ok() + send_bundle( + &driver, + BundleOpts::default() + .with_block_number_max(next_valid_block) + .with_block_number_min(next_valid_block) + ) + .await + .is_ok() ); // Test min-only cases (no max specified) // A bundle with only min block that's within the default range is valid let default_max = current_block + MAX_BLOCK_RANGE_BLOCKS; assert!( - send_bundle(&driver, None, Some(current_block)) - .await - .is_ok() + send_bundle( + &driver, + BundleOpts::default().with_block_number_min(current_block) + ) + .await + .is_ok() ); assert!( - send_bundle(&driver, None, Some(default_max - 1)) - .await - .is_ok() + send_bundle( + &driver, + BundleOpts::default().with_block_number_min(default_max - 1) + ) + .await + .is_ok() + ); + assert!( + send_bundle( + &driver, + BundleOpts::default().with_block_number_min(default_max) + ) + .await + .is_ok() ); - assert!(send_bundle(&driver, None, Some(default_max)).await.is_ok()); // A bundle with only min block that exceeds the default max range is invalid assert!( - send_bundle(&driver, None, Some(default_max + 1)) - .await - .is_err() + send_bundle( + &driver, + BundleOpts::default().with_block_number_min(default_max + 1) + ) + .await + .is_err() ); Ok(()) @@ -387,10 +414,7 @@ async fn check_transaction_receipt_status_message(rbuilder: LocalInstance) -> ey let reverting_tx = driver .create_transaction() .random_reverting_transaction() - .with_bundle(BundleOpts { - block_number_max: Some(3), - ..Default::default() - }) + .with_bundle(BundleOpts::default().with_block_number_max(3)) .send() .await?; let tx_hash = reverting_tx.tx_hash(); From e88365e2b4a907cbd7e705efa8d86110c0fa085c Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:38:41 -0400 Subject: [PATCH 2/6] print logs during tests --- .../op-rbuilder/src/tests/framework/macros/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs b/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs index 64cb940ab..296b7d3fa 100644 --- a/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs +++ b/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs @@ -231,6 +231,16 @@ pub fn rb_test(args: TokenStream, input: TokenStream) -> TokenStream { generated_functions.push(quote! { #test_attribute async fn #test_name() -> eyre::Result<()> { + let subscriber = tracing_subscriber::fmt() + .with_env_filter(std::env::var("RUST_LOG") + .unwrap_or_else(|_| "info".to_string())) + .with_file(true) + .with_line_number(true) + .with_test_writer() + .finish(); + let _guard = tracing::subscriber::set_global_default(subscriber); + tracing::info!("{} start", stringify!(#test_name)); + let instance = #instance_init; #original_name(instance).await } From d70704fb0ecf5cccf7eb7093fe73ccc1f7c307ac Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:38:41 -0400 Subject: [PATCH 3/6] fix connecting to flashblocks ws url --- .../src/tests/framework/macros/src/lib.rs | 4 ++-- crates/op-rbuilder/src/tests/framework/utils.rs | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs b/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs index 296b7d3fa..daa88c7e7 100644 --- a/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs +++ b/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs @@ -28,7 +28,7 @@ const BUILDER_VARIANTS: &[VariantInfo] = &[ { let mut args = #args; args.flashblocks.enabled = true; - args.flashblocks.flashblocks_port = 0; + args.flashblocks.flashblocks_port = crate::tests::get_available_port(); args } } @@ -38,7 +38,7 @@ const BUILDER_VARIANTS: &[VariantInfo] = &[ { let mut args = crate::args::OpRbuilderArgs::default(); args.flashblocks.enabled = true; - args.flashblocks.flashblocks_port = 0; + args.flashblocks.flashblocks_port = crate::tests::get_available_port(); args } } diff --git a/crates/op-rbuilder/src/tests/framework/utils.rs b/crates/op-rbuilder/src/tests/framework/utils.rs index 2cb5f51b3..a71361d2e 100644 --- a/crates/op-rbuilder/src/tests/framework/utils.rs +++ b/crates/op-rbuilder/src/tests/framework/utils.rs @@ -15,7 +15,7 @@ use reth_db::{ }; use reth_node_core::{args::DatadirArgs, dirs::DataDirPath, node_config::NodeConfig}; use reth_optimism_chainspec::OpChainSpec; -use std::sync::Arc; +use std::{net::TcpListener, sync::Arc}; use super::{FUNDED_PRIVATE_KEYS, TransactionBuilder}; @@ -228,3 +228,14 @@ pub fn create_test_db(config: NodeConfig) -> Arc u16 { + TcpListener::bind("127.0.0.1:0") + .expect("Failed to bind to random port") + .local_addr() + .expect("Failed to get local address") + .port() +} From 51fe8035417206a15469c317a77bb6a4af431025 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:38:41 -0400 Subject: [PATCH 4/6] fix propagating bundle params for flashblocks check --- crates/op-rbuilder/src/tests/framework/txs.rs | 4 ++-- crates/op-rbuilder/src/tx.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/op-rbuilder/src/tests/framework/txs.rs b/crates/op-rbuilder/src/tests/framework/txs.rs index 0e78d0eb9..7c91892ad 100644 --- a/crates/op-rbuilder/src/tests/framework/txs.rs +++ b/crates/op-rbuilder/src/tests/framework/txs.rs @@ -229,8 +229,8 @@ impl TransactionBuilder { }, block_number_min: bundle_opts.block_number_min, block_number_max: bundle_opts.block_number_max, - flashblock_number_min: bundle_opts.block_number_min, - flashblock_number_max: bundle_opts.block_number_max, + flashblock_number_min: bundle_opts.flashblock_number_min, + flashblock_number_max: bundle_opts.flashblock_number_max, min_timestamp: bundle_opts.min_timestamp, max_timestamp: bundle_opts.max_timestamp, }; diff --git a/crates/op-rbuilder/src/tx.rs b/crates/op-rbuilder/src/tx.rs index 7626fff1d..64f7e43f5 100644 --- a/crates/op-rbuilder/src/tx.rs +++ b/crates/op-rbuilder/src/tx.rs @@ -293,8 +293,8 @@ impl MaybeConditionalTransaction for FBPooledTransaction { FBPooledTransaction { inner: self.inner.with_conditional(conditional), reverted_hashes: self.reverted_hashes, - flashblock_number_min: None, - flashblock_number_max: None, + flashblock_number_min: self.flashblock_number_min, + flashblock_number_max: self.flashblock_number_max, } } } From 71243bad0b1c4e27803123930f61bd509111e674 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:38:41 -0400 Subject: [PATCH 5/6] refactor flashblock ws listener in tests --- .../src/builders/flashblocks/best_txs.rs | 1 + crates/op-rbuilder/src/tests/flashblocks.rs | 330 +++--------------- .../src/tests/framework/instance.rs | 109 +++++- 3 files changed, 154 insertions(+), 286 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs index e8d73bad5..b2108aa79 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/best_txs.rs @@ -74,6 +74,7 @@ where if self.current_flashblock_number > max { debug!( target: "payload_builder", + tx_hash = ?tx.hash(), sender = ?tx.sender(), nonce = tx.nonce(), current_flashblock = self.current_flashblock_number, diff --git a/crates/op-rbuilder/src/tests/flashblocks.rs b/crates/op-rbuilder/src/tests/flashblocks.rs index 990c75f66..4b5b61df1 100644 --- a/crates/op-rbuilder/src/tests/flashblocks.rs +++ b/crates/op-rbuilder/src/tests/flashblocks.rs @@ -1,11 +1,6 @@ use alloy_provider::Provider; -use futures::StreamExt; use macros::rb_test; -use parking_lot::Mutex; -use std::{sync::Arc, time::Duration}; -use tokio::task::JoinHandle; -use tokio_tungstenite::{connect_async, tungstenite::Message}; -use tokio_util::sync::CancellationToken; +use std::time::Duration; use crate::{ args::{FlashblocksArgs, OpRbuilderArgs}, @@ -26,30 +21,7 @@ use crate::{ })] async fn smoke_dynamic_base(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // We align out block timestamps with current unix timestamp for _ in 0..10 { @@ -66,18 +38,10 @@ async fn smoke_dynamic_base(rbuilder: LocalInstance) -> eyre::Result<()> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); - - assert!( - !received_messages - .lock() - .iter() - .any(|msg| msg.contains("Building flashblock")), - "No messages received from WebSocket" - ); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(110, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -94,30 +58,7 @@ async fn smoke_dynamic_base(rbuilder: LocalInstance) -> eyre::Result<()> { })] async fn smoke_dynamic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // We align out block timestamps with current unix timestamp for _ in 0..10 { @@ -134,18 +75,10 @@ async fn smoke_dynamic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); - - assert!( - !received_messages - .lock() - .iter() - .any(|msg| msg.contains("Building flashblock")), - "No messages received from WebSocket" - ); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(60, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -162,30 +95,7 @@ async fn smoke_dynamic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { })] async fn smoke_classic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // We align out block timestamps with current unix timestamp for _ in 0..10 { @@ -202,18 +112,10 @@ async fn smoke_classic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); - - assert!( - !received_messages - .lock() - .iter() - .any(|msg| msg.contains("Building flashblock")), - "No messages received from WebSocket" - ); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(60, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -230,30 +132,7 @@ async fn smoke_classic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { })] async fn smoke_classic_base(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // We align out block timestamps with current unix timestamp for _ in 0..10 { @@ -270,18 +149,10 @@ async fn smoke_classic_base(rbuilder: LocalInstance) -> eyre::Result<()> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); - - assert!( - !received_messages - .lock() - .iter() - .any(|msg| msg.contains("Building flashblock")), - "No messages received from WebSocket" - ); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(110, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -298,30 +169,7 @@ async fn smoke_classic_base(rbuilder: LocalInstance) -> eyre::Result<()> { })] async fn unichain_dynamic_with_lag(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // We align out block timestamps with current unix timestamp for i in 0..9 { @@ -340,18 +188,10 @@ async fn unichain_dynamic_with_lag(rbuilder: LocalInstance) -> eyre::Result<()> tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); - - assert!( - !received_messages - .lock() - .iter() - .any(|msg| msg.contains("Building flashblock")), - "No messages received from WebSocket" - ); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(34, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -368,30 +208,7 @@ async fn unichain_dynamic_with_lag(rbuilder: LocalInstance) -> eyre::Result<()> })] async fn dynamic_with_full_block_lag(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); for _ in 0..5 { // send a valid transaction @@ -406,18 +223,11 @@ async fn dynamic_with_full_block_lag(rbuilder: LocalInstance) -> eyre::Result<() .await?; // We could only produce block with deposits + builder tx because of short time frame assert_eq!(block.transactions.len(), 2); - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); - assert!( - !received_messages - .lock() - .iter() - .any(|msg| msg.contains("Building flashblock")), - "No messages received from WebSocket" - ); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(1, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -435,30 +245,7 @@ async fn dynamic_with_full_block_lag(rbuilder: LocalInstance) -> eyre::Result<() })] async fn test_flashblock_min_filtering(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // Create two transactions and set their tips so that while ordinarily // tx2 would come before tx1 because its tip is bigger, now tx1 comes @@ -479,34 +266,27 @@ async fn test_flashblock_min_filtering(rbuilder: LocalInstance) -> eyre::Result< .send() .await?; - let block1 = driver.build_new_block_with_current_timestamp(None).await?; + let _block1 = driver.build_new_block_with_current_timestamp(None).await?; // Check that tx1 comes before tx2 let tx1_hash = *tx1.tx_hash(); let tx2_hash = *tx2.tx_hash(); - let mut tx1_pos = None; - let mut tx2_pos = None; - - for (i, item) in block1.transactions.hashes().into_iter().enumerate() { - if item == tx1_hash { - tx1_pos = Some(i); - } - if item == tx2_hash { - tx2_pos = Some(i); - } - } + let tx1_pos = flashblocks_listener + .find_transaction_flashblock(&tx1_hash) + .unwrap(); + let tx2_pos = flashblocks_listener + .find_transaction_flashblock(&tx2_hash) + .unwrap(); - assert!(tx1_pos.is_some(), "tx {tx1_hash:?} not found"); - assert!(tx2_pos.is_some(), "tx {tx2_hash:?} not found"); assert!( - tx1_pos.unwrap() < tx2_pos.unwrap(), + tx1_pos < tx2_pos, "tx {tx1_hash:?} does not come before {tx2_hash:?}" ); - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(6, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } #[rb_test(flashblocks, args = OpRbuilderArgs { @@ -524,30 +304,7 @@ async fn test_flashblock_min_filtering(rbuilder: LocalInstance) -> eyre::Result< })] async fn test_flashblock_max_filtering(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - let cancellation_token = CancellationToken::new(); - let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); - - // Spawn WebSocket listener task - let cancellation_token_clone = cancellation_token.clone(); - let ws_handle: JoinHandle> = tokio::spawn(async move { - let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; - let (_, mut read) = ws_stream.split(); - - loop { - tokio::select! { - _ = cancellation_token_clone.cancelled() => { - break Ok(()); - } - Some(Ok(Message::Text(text))) = read.next() => { - messages_clone.lock().push(text); - } - } - } - }); + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); // Since we cannot directly trigger flashblock creation in tests, we // instead fill up the gas of flashblocks so that our tx with the @@ -575,9 +332,14 @@ async fn test_flashblock_max_filtering(rbuilder: LocalInstance) -> eyre::Result< let block = driver.build_new_block_with_current_timestamp(None).await?; assert!(!block.includes(tx1.tx_hash())); + assert!( + flashblocks_listener + .find_transaction_flashblock(tx1.tx_hash()) + .is_none() + ); - cancellation_token.cancel(); - assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(6, flashblocks.len()); - Ok(()) + flashblocks_listener.stop().await } diff --git a/crates/op-rbuilder/src/tests/framework/instance.rs b/crates/op-rbuilder/src/tests/framework/instance.rs index 45bf5e62e..34f5f506f 100644 --- a/crates/op-rbuilder/src/tests/framework/instance.rs +++ b/crates/op-rbuilder/src/tests/framework/instance.rs @@ -10,6 +10,7 @@ use crate::{ tx::FBPooledTransaction, tx_signer::Signer, }; +use alloy_primitives::B256; use alloy_provider::{Identity, ProviderBuilder, RootProvider}; use clap::Parser; use core::{ @@ -20,10 +21,11 @@ use core::{ task::{Context, Poll}, time::Duration, }; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use moka::future::Cache; use nanoid::nanoid; use op_alloy_network::Optimism; +use parking_lot::Mutex; use reth::{ args::{DatadirArgs, NetworkArgs, RpcServerArgs}, core::exit::NodeExitFuture, @@ -38,8 +40,12 @@ use reth_optimism_node::{ }; use reth_optimism_rpc::OpEthApiBuilder; use reth_transaction_pool::{AllTransactionsEvents, TransactionPool}; +use rollup_boost::FlashblocksPayloadV1; use std::sync::{Arc, LazyLock}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinHandle}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tokio_util::sync::CancellationToken; +use tracing::warn; /// Represents a type that emulates a local in-process instance of the OP builder node. /// This node uses IPC as the communication channel for the RPC server Engine API. @@ -225,6 +231,10 @@ impl LocalInstance { format!("ws://{ipaddr}:{port}/") } + pub fn spawn_flashblocks_listener(&self) -> FlashblocksListener { + FlashblocksListener::new(self.flashblocks_ws_url()) + } + pub fn rpc_ipc(&self) -> &str { &self.config.rpc.ipcpath } @@ -345,3 +355,98 @@ fn pool_component(args: &OpRbuilderArgs) -> OpPoolBuilder { rollup_args.supervisor_safety_level, ) } + +/// A utility for listening to flashblocks WebSocket messages during tests. +/// +/// This provides a reusable way to capture and inspect flashblocks that are produced +/// during test execution, eliminating the need for duplicate WebSocket listening code. +pub struct FlashblocksListener { + pub flashblocks: Arc>>, + pub cancellation_token: CancellationToken, + pub handle: JoinHandle>, +} + +impl FlashblocksListener { + /// Create a new flashblocks listener that connects to the given WebSocket URL. + /// + /// The listener will automatically parse incoming messages as FlashblocksPayloadV1. + fn new(flashblocks_ws_url: String) -> Self { + let flashblocks = Arc::new(Mutex::new(Vec::new())); + let cancellation_token = CancellationToken::new(); + + let flashblocks_clone = flashblocks.clone(); + let cancellation_token_clone = cancellation_token.clone(); + + let handle = tokio::spawn(async move { + let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + let fb = serde_json::from_str(&text).unwrap(); + warn!("GOT FB: {fb:#?}"); + flashblocks_clone.lock().push(fb); + } + } + } + }); + + Self { + flashblocks, + cancellation_token, + handle, + } + } + + /// Get a snapshot of all received flashblocks + pub fn get_flashblocks(&self) -> Vec { + self.flashblocks.lock().clone() + } + + /// Find a flashblock by index + pub fn find_flashblock(&self, index: u64) -> Option { + self.flashblocks + .lock() + .iter() + .find(|fb| fb.index == index) + .cloned() + } + + /// Check if any flashblock contains the given transaction hash + pub fn contains_transaction(&self, tx_hash: &B256) -> bool { + let tx_hash_str = format!("{:#x}", tx_hash); + self.flashblocks.lock().iter().any(|fb| { + if let Some(receipts) = fb.metadata.get("receipts") { + if let Some(receipts_obj) = receipts.as_object() { + return receipts_obj.contains_key(&tx_hash_str); + } + } + false + }) + } + + /// Find which flashblock index contains the given transaction hash + pub fn find_transaction_flashblock(&self, tx_hash: &B256) -> Option { + let tx_hash_str = format!("{:#x}", tx_hash); + self.flashblocks.lock().iter().find_map(|fb| { + if let Some(receipts) = fb.metadata.get("receipts") { + if let Some(receipts_obj) = receipts.as_object() { + if receipts_obj.contains_key(&tx_hash_str) { + return Some(fb.index); + } + } + } + None + }) + } + + /// Stop the listener and wait for it to complete + pub async fn stop(self) -> eyre::Result<()> { + self.cancellation_token.cancel(); + self.handle.await? + } +} From 552cb2de224eb54348100515c83637799af0d926 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Thu, 11 Sep 2025 11:54:42 -0400 Subject: [PATCH 6/6] test setting min=max flashblock param --- crates/op-rbuilder/src/tests/flashblocks.rs | 45 +++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/crates/op-rbuilder/src/tests/flashblocks.rs b/crates/op-rbuilder/src/tests/flashblocks.rs index 4b5b61df1..d492d5c11 100644 --- a/crates/op-rbuilder/src/tests/flashblocks.rs +++ b/crates/op-rbuilder/src/tests/flashblocks.rs @@ -343,3 +343,48 @@ async fn test_flashblock_max_filtering(rbuilder: LocalInstance) -> eyre::Result< flashblocks_listener.stop().await } + +#[rb_test(flashblocks, args = OpRbuilderArgs { + chain_block_time: 1000, + enable_revert_protection: true, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_leeway_time: 100, + flashblocks_fixed: false, + }, + ..Default::default() +})] +async fn test_flashblock_min_max_filtering(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); + + let tx1 = driver + .create_transaction() + .random_valid_transfer() + .with_bundle( + BundleOpts::default() + .with_flashblock_number_max(2) + .with_flashblock_number_min(2), + ) + .send() + .await?; + + let _block = driver.build_new_block_with_current_timestamp(None).await?; + + // It ends up in the flashblock with index 3. Flashblock number and index + // are different. + assert_eq!( + 2 + 1, + flashblocks_listener + .find_transaction_flashblock(tx1.tx_hash()) + .unwrap() + ); + + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!(6, flashblocks.len()); + + flashblocks_listener.stop().await +}