diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 784aeb7c7..6fa740a43 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -118,14 +118,15 @@ pub struct FlashblocksArgs { )] pub flashblocks_block_time: u64, - /// Enabled dynamic flashblocks adjustment. This will allow account for late FCUs and produce - /// less flashblocks, while each flashblock would be bigger. + /// Builder would always thry to produce fixed number of flashblocks without regard to time of + /// FCU arrival. + /// In cases of late FCU it could lead to partially filled blocks. #[arg( - long = "flashblocks.dynamic", + long = "flashblocks.fixed", default_value = "false", - env = "FLASHBLOCK_DYNAMIC" + env = "FLASHBLOCK_FIXED" )] - pub flashblocks_dynamic: bool, + pub flashblocks_fixed: bool, /// Time by which blocks would be completed earlier in milliseconds. /// diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 0229a2b32..702566beb 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -17,11 +17,17 @@ pub struct FlashblocksConfig { pub interval: Duration, /// How much time would be deducted from block build time to account for latencies in - /// milliseconds + /// milliseconds. + /// + /// If dynamic_adjustment is false this value would be deducted from first flashblock and + /// it shouldn't be more than interval + /// + /// If dynamic_adjustment is true this value would be deducted from first flashblock and + /// it shouldn't be more than interval pub leeway_time: Duration, - /// Enables dynamic flashblocks number based on FCU arrival time - pub dynamic_adjustment: bool, + /// Disables dynamic flashblocks number adjustment based on FCU arrival time + pub fixed: bool, } impl Default for FlashblocksConfig { @@ -30,7 +36,7 @@ impl Default for FlashblocksConfig { ws_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 1111), interval: Duration::from_millis(250), leeway_time: Duration::from_millis(50), - dynamic_adjustment: false, + fixed: false, } } } @@ -48,13 +54,13 @@ impl TryFrom for FlashblocksConfig { let leeway_time = Duration::from_millis(args.flashblocks.flashblocks_leeway_time); - let dynamic_adjustment = args.flashblocks.flashblocks_dynamic; + let fixed = args.flashblocks.flashblocks_fixed; Ok(Self { ws_addr, interval, leeway_time, - dynamic_adjustment, + fixed, }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 157b89862..46056ae61 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -45,7 +45,11 @@ use std::{ sync::Arc, time::Instant, }; -use tokio::sync::mpsc; +use tokio::sync::{ + mpsc, + mpsc::{error::SendError, Sender}, +}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; #[derive(Debug, Default)] @@ -146,7 +150,11 @@ where best_payload: BlockCell, ) -> Result<(), PayloadBuilderError> { let block_build_start_time = Instant::now(); - let BuildArguments { config, cancel, .. } = args; + let BuildArguments { + config, + cancel: block_cancel, + .. + } = args; // We log only every 100th block to reduce usage let span = if cfg!(feature = "telemetry") @@ -164,34 +172,6 @@ where let chain_spec = self.client.chain_spec(); let timestamp = config.attributes.timestamp(); - // We use this system time to determine remining time to build a block - // Things to consider: - // FCU(a) - FCU with attributes - // FCU(a) could arrive with `block_time - fb_time < delay`. In this case we could only produce 1 flashblock - // FCU(a) could arrive with `delay < fb_time` - in this case we will shrink first flashblock - // FCU(a) could arrive with `fb_time < delay < block_time - fb_time` - in this case we will issue less flashblocks - let time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp) - - self.config.specific.leeway_time; - let time_drift = time.duration_since(std::time::SystemTime::now()).ok(); - match time_drift { - None => error!( - target: "payload_builder", - message = "FCU arrived too late or system clock are unsynced", - ?time, - ), - Some(time_drift) => { - self.metrics - .flashblock_time_drift - .record(time_drift.as_millis() as f64); - debug!( - target: "payload_builder", - message = "Time drift for building round", - ?time, - ?time_drift, - ?timestamp - ); - } - } let block_env_attributes = OpNextBlockEnvAttributes { timestamp, suggested_fee_recipient: config.attributes.suggested_fee_recipient(), @@ -219,13 +199,14 @@ where .next_evm_env(&config.parent_header, &block_env_attributes) .map_err(PayloadBuilderError::other)?; - let ctx = OpPayloadBuilderCtx { + let mut ctx = OpPayloadBuilderCtx { evm_config: self.evm_config.clone(), chain_spec: self.client.chain_spec(), config, evm_env, block_env_attributes, - cancel, + // Here we use parent token because child token handing is only for proper flashblocks + cancel: block_cancel.clone(), da_config: self.config.da_config.clone(), builder_signer: self.config.builder_signer, metrics: Default::default(), @@ -292,7 +273,7 @@ where } // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable let (flashblocks_per_block, first_flashblock_offset) = - self.calculate_flashblocks(time_drift); + self.calculate_flashblocks(timestamp); info!( target: "payload_builder", message = "Performed flashblocks timing derivation", @@ -331,56 +312,14 @@ where let last_flashblock = flashblocks_per_block.saturating_sub(1); let mut flashblock_count = 0; - // Create a channel to coordinate flashblock building - let (build_tx, mut build_rx) = mpsc::channel(1); - - // Spawn the timer task that signals when to build a new flashblock - let cancel_clone = ctx.cancel.clone(); - let interval = self.config.specific.interval; - tokio::spawn(async move { - // We handle first flashblock separately, because it could be shrunk to fit everything - let mut first_interval = tokio::time::interval(first_flashblock_offset); - // If first_flashblock_offset == 0 that means all flashblock are proper, and we just - // skip this cusom logic - if first_flashblock_offset.as_millis() != 0 { - let cancelled = cancel_clone - .run_until_cancelled(async { - // We send first signal straight away and then wait first_interval duration - // to send second signal and start building normal blocks - for _ in 0..2 { - first_interval.tick().await; - // TODO: maybe return None if cancelled - if let Err(err) = build_tx.send(()).await { - error!(target: "payload_builder", "Error sending build signal: {}", err); - } - } - }) - .await; - if cancelled.is_none() { - info!(target: "payload_builder", "Building job cancelled, stopping payload building"); - drop(build_tx); - return; - } - } - // Handle rest of fbs in steady rate - let mut interval = tokio::time::interval(interval); - let cancelled = cancel_clone.run_until_cancelled(async { - // First tick completes immediately - interval.tick().await; - loop { - interval.tick().await; - if let Err(err) = build_tx.send(()).await { - error!(target: "payload_builder", "Error sending build signal: {}", err); - break; - } - } - }).await; - if cancelled.is_none() { - info!(target: "payload_builder", "Building job cancelled, stopping payload building"); - drop(build_tx); - } - }); - + // This channel coordinates flashblock building + let (fb_cancel_token_rx, mut fb_cancel_token_tx) = + mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + self.spawn_timer_task( + block_cancel.clone(), + fb_cancel_token_rx, + first_flashblock_offset, + ); // Process flashblocks in a blocking loop loop { let fb_span = if span.is_none() { @@ -393,30 +332,19 @@ where ) }; let _entered = fb_span.enter(); - // Block on receiving a message, break on cancellation or closed channel - let received = tokio::task::block_in_place(|| { - // Get runtime handle - let rt = tokio::runtime::Handle::current(); - - // Run the async operation to completion, blocking the current thread - rt.block_on(async { - // Check for cancellation first - if ctx.cancel.is_cancelled() { - info!( - target: "payload_builder", - "Job cancelled, stopping payload building", - ); - return None; - } - - // Wait for next message - build_rx.recv().await - }) - }); - // Exit loop if channel closed or cancelled - match received { - Some(()) => { + // We get token from time loop. Token from this channel means that we need to start build flashblock + // Cancellation of this token means that we need to stop building flashblock. + // If channel return None it means that we built all flashblock or parent_token got cancelled + let fb_cancel_token = + tokio::task::block_in_place(|| fb_cancel_token_tx.blocking_recv()).flatten(); + + match fb_cancel_token { + Some(cancel_token) => { + // We use fb_cancel_token inside context so we could exit from + // execute_best_transaction without cancelling parent token + ctx.cancel = cancel_token; + // TODO: remove this if flashblock_count >= flashblocks_per_block { info!( target: "payload_builder", @@ -427,7 +355,6 @@ where ); continue; } - // Continue with flashblock building info!( target: "payload_builder", @@ -464,27 +391,17 @@ where .record(best_txs_start_time.elapsed()); let tx_execution_start_time = Instant::now(); - if ctx - .execute_best_transactions( - &mut info, - &mut db, - best_txs, - total_gas_per_batch.min(ctx.block_gas_limit()), - total_da_per_batch, - )? - .is_some() - { - // Handles job cancellation - info!( - target: "payload_builder", - "Job cancelled, stopping payload building", - ); - ctx.metrics - .payload_tx_simulation_duration - .record(tx_execution_start_time.elapsed()); - // if the job was cancelled, stop - return Ok(()); - } + ctx.execute_best_transactions( + &mut info, + &mut db, + best_txs, + total_gas_per_batch.min(ctx.block_gas_limit()), + total_da_per_batch, + )?; + ctx.metrics + .payload_tx_simulation_duration + .record(tx_execution_start_time.elapsed()); + ctx.metrics .payload_tx_simulation_duration .record(tx_execution_start_time.elapsed()); @@ -513,6 +430,12 @@ where fb_payload.index = flashblock_count + 1; // we do this because the fallback block is index 0 fb_payload.base = None; + // We check that child_job got cancelled before sending flashblock. + // This will ensure consistent timing between flashblocks. + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(async { ctx.cancel.cancelled().await }); + }); self.ws_pub .publish(&fb_payload) .map_err(PayloadBuilderError::other)?; @@ -566,32 +489,107 @@ where } } + /// Spawn task that will send new flashblock level cancel token in steady intervals (first interval + /// may vary if --flashblocks.dynamic enabled) + pub fn spawn_timer_task( + &self, + block_cancel: CancellationToken, + flashblock_cancel_token_rx: Sender>, + first_flashblock_offset: Duration, + ) { + let interval = self.config.specific.interval; + tokio::spawn(async move { + let cancelled: Option>>> = block_cancel + .run_until_cancelled(async { + // Create first fb interval already started + let mut timer = tokio::time::interval(first_flashblock_offset); + timer.tick().await; + let child_token = block_cancel.child_token(); + flashblock_cancel_token_rx + .send(Some(child_token.clone())) + .await?; + timer.tick().await; + // Time to build flashblock has ended so we cancel the token + child_token.cancel(); + // We would start using regular intervals from here on + let mut timer = tokio::time::interval(interval); + timer.tick().await; + loop { + // Initiate fb job + let child_token = block_cancel.child_token(); + flashblock_cancel_token_rx + .send(Some(child_token.clone())) + .await?; + timer.tick().await; + // Cancel job once time is up + child_token.cancel(); + } + }) + .await; + if let Some(Err(err)) = cancelled { + error!(target: "payload_builder", "Timer task encountered error: {err}"); + } else { + info!(target: "payload_builder", "Building job cancelled, stopping payload building"); + } + }); + } + /// Calculate number of flashblocks. /// If dynamic is enabled this function will take time drift into the account. - pub fn calculate_flashblocks(&self, time_drift: Option) -> (u64, Duration) { - if !self.config.specific.dynamic_adjustment || time_drift.is_none() { + pub fn calculate_flashblocks(&self, timestamp: u64) -> (u64, Duration) { + if self.config.specific.fixed { return ( self.config.flashblocks_per_block(), - self.config.specific.interval, + // We adjust first FB to ensure that we have at least some time to make all FB in time + self.config.specific.interval - self.config.specific.leeway_time, ); } - let time_drift = time_drift.unwrap(); - let interval = self.config.specific.interval.as_millis(); - let time_drift = time_drift.as_millis(); + // We use this system time to determine remining time to build a block + // Things to consider: + // FCU(a) - FCU with attributes + // FCU(a) could arrive with `block_time - fb_time < delay`. In this case we could only produce 1 flashblock + // FCU(a) could arrive with `delay < fb_time` - in this case we will shrink first flashblock + // FCU(a) could arrive with `fb_time < delay < block_time - fb_time` - in this case we will issue less flashblocks + let time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp) + - self.config.specific.leeway_time; + let now = std::time::SystemTime::now(); + let Ok(time_drift) = time.duration_since(now) else { + error!( + target: "payload_builder", + message = "FCU arrived too late or system clock are unsynced", + ?time, + ?now, + ); + return ( + self.config.flashblocks_per_block(), + self.config.specific.interval, + ); + }; + self.metrics + .flashblock_time_drift + .record(time_drift.as_millis() as f64); + debug!( + target: "payload_builder", + message = "Time drift for building round", + ?time, + ?time_drift, + ?timestamp + ); + // This is extra check to ensure that we would account at least for block time in case we have any timer discrepancies. + let time_drift = time_drift.min(self.config.block_time); + let interval = self.config.specific.interval.as_millis() as u64; + let time_drift = time_drift.as_millis() as u64; let first_flashblock_offset = time_drift.rem(interval); - let flashblocks_per_block = if first_flashblock_offset == 0 { - // In this case all flashblock are full and they all in division - time_drift.div(interval) + if first_flashblock_offset == 0 { + // We have perfect division, so we use interval as first fb offset + (time_drift.div(interval), Duration::from_millis(interval)) } else { - // If we have any reminder that mean the first flashblock won't be in division - // so we add it manually. - time_drift.div(interval) + 1 - }; - // We won't have any problems because of casting - ( - flashblocks_per_block as u64, - Duration::from_millis(first_flashblock_offset as u64), - ) + // Non-perfect division, so we account for it. + ( + time_drift.div(interval) + 1, + Duration::from_millis(first_flashblock_offset), + ) + } } } diff --git a/crates/op-rbuilder/src/tests/data_availability.rs b/crates/op-rbuilder/src/tests/data_availability.rs index ec37ed10e..ffee176fe 100644 --- a/crates/op-rbuilder/src/tests/data_availability.rs +++ b/crates/op-rbuilder/src/tests/data_availability.rs @@ -84,7 +84,7 @@ async fn block_fill(rbuilder: LocalInstance) -> eyre::Result<()> { .await?; let unfit_tx_3 = driver.create_transaction().send().await?; - let block = driver.build_new_block().await?; + let block = driver.build_new_block_with_current_timestamp(None).await?; if_standard! { // Now the first 2 txs will fit into the block diff --git a/crates/op-rbuilder/src/tests/flashblocks.rs b/crates/op-rbuilder/src/tests/flashblocks.rs index 44d8df21b..f285c709b 100644 --- a/crates/op-rbuilder/src/tests/flashblocks.rs +++ b/crates/op-rbuilder/src/tests/flashblocks.rs @@ -1,8 +1,7 @@ -use std::sync::Arc; - use futures::StreamExt; use macros::rb_test; use parking_lot::Mutex; +use std::{sync::Arc, time::Duration}; use tokio::task::JoinHandle; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; @@ -19,12 +18,81 @@ use crate::{ flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, - flashblocks_leeway_time: 0, - flashblocks_dynamic: false, + flashblocks_leeway_time: 100, + flashblocks_fixed: false, + }, + ..Default::default() +})] +async fn smoke_dynamic_base(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + driver.fund_default_accounts().await?; + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + let cancellation_token = CancellationToken::new(); + let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); + + // Spawn WebSocket listener task + let cancellation_token_clone = cancellation_token.clone(); + let ws_handle: JoinHandle> = tokio::spawn(async move { + let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + messages_clone.lock().push(text); + } + } + } + }); + + // We align out block timestamps with current unix timestamp + for _ in 0..10 { + for _ in 0..5 { + // send a valid transaction + let _ = driver + .create_transaction() + .random_valid_transfer() + .send() + .await?; + } + let block = driver.build_new_block_with_current_timestamp(None).await?; + assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + + cancellation_token.cancel(); + assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + + assert!( + !received_messages + .lock() + .iter() + .any(|msg| msg.contains("Building flashblock")), + "No messages received from WebSocket" + ); + + Ok(()) +} + +#[rb_test(flashblocks, args = OpRbuilderArgs { + chain_block_time: 1000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_leeway_time: 100, + flashblocks_fixed: false, }, ..Default::default() })] -async fn smoke(rbuilder: LocalInstance) -> eyre::Result<()> { +async fn smoke_dynamic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; driver.fund_default_accounts().await?; @@ -52,6 +120,7 @@ async fn smoke(rbuilder: LocalInstance) -> eyre::Result<()> { } }); + // We align out block timestamps with current unix timestamp for _ in 0..10 { for _ in 0..5 { // send a valid transaction @@ -61,10 +130,217 @@ async fn smoke(rbuilder: LocalInstance) -> eyre::Result<()> { .send() .await?; } + let block = driver.build_new_block_with_current_timestamp(None).await?; + assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + cancellation_token.cancel(); + assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + + assert!( + !received_messages + .lock() + .iter() + .any(|msg| msg.contains("Building flashblock")), + "No messages received from WebSocket" + ); + + Ok(()) +} + +#[rb_test(flashblocks, args = OpRbuilderArgs { + chain_block_time: 1000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_leeway_time: 50, + flashblocks_fixed: true, + }, + ..Default::default() +})] +async fn smoke_classic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + driver.fund_default_accounts().await?; + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + let cancellation_token = CancellationToken::new(); + let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); + + // Spawn WebSocket listener task + let cancellation_token_clone = cancellation_token.clone(); + let ws_handle: JoinHandle> = tokio::spawn(async move { + let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + messages_clone.lock().push(text); + } + } + } + }); + + // We align out block timestamps with current unix timestamp + for _ in 0..10 { + for _ in 0..5 { + // send a valid transaction + let _ = driver + .create_transaction() + .random_valid_transfer() + .send() + .await?; + } let block = driver.build_new_block().await?; - assert_eq!(block.transactions.len(), 8); // 5 normal txn + deposit + 2 builder txn + assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + cancellation_token.cancel(); + assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + + assert!( + !received_messages + .lock() + .iter() + .any(|msg| msg.contains("Building flashblock")), + "No messages received from WebSocket" + ); + + Ok(()) +} + +#[rb_test(flashblocks, args = OpRbuilderArgs { + chain_block_time: 2000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_leeway_time: 50, + flashblocks_fixed: true, + }, + ..Default::default() +})] +async fn smoke_classic_base(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + driver.fund_default_accounts().await?; + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + let cancellation_token = CancellationToken::new(); + let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); + + // Spawn WebSocket listener task + let cancellation_token_clone = cancellation_token.clone(); + let ws_handle: JoinHandle> = tokio::spawn(async move { + let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + messages_clone.lock().push(text); + } + } + } + }); + + // We align out block timestamps with current unix timestamp + for _ in 0..10 { + for _ in 0..5 { + // send a valid transaction + let _ = driver + .create_transaction() + .random_valid_transfer() + .send() + .await?; + } + let block = driver.build_new_block().await?; + assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + + cancellation_token.cancel(); + assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + + assert!( + !received_messages + .lock() + .iter() + .any(|msg| msg.contains("Building flashblock")), + "No messages received from WebSocket" + ); + + Ok(()) +} + +#[rb_test(flashblocks, args = OpRbuilderArgs { + chain_block_time: 1000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_leeway_time: 100, + flashblocks_fixed: false, + }, + ..Default::default() +})] +async fn unichain_dynamic_with_lag(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + driver.fund_default_accounts().await?; + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + let cancellation_token = CancellationToken::new(); + let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); + + // Spawn WebSocket listener task + let cancellation_token_clone = cancellation_token.clone(); + let ws_handle: JoinHandle> = tokio::spawn(async move { + let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + messages_clone.lock().push(text); + } + } + } + }); + + // We align out block timestamps with current unix timestamp + for i in 0..9 { + for _ in 0..5 { + // send a valid transaction + let _ = driver + .create_transaction() + .random_valid_transfer() + .send() + .await?; + } + let block = driver + .build_new_block_with_current_timestamp(Some(Duration::from_millis(i * 100))) + .await?; + assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn tokio::time::sleep(std::time::Duration::from_secs(1)).await; } @@ -81,3 +357,70 @@ async fn smoke(rbuilder: LocalInstance) -> eyre::Result<()> { Ok(()) } + +#[rb_test(flashblocks, args = OpRbuilderArgs { + chain_block_time: 1000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_leeway_time: 0, + flashblocks_fixed: false, + }, + ..Default::default() +})] +async fn dynamic_with_full_block_lag(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + driver.fund_default_accounts().await?; + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + let cancellation_token = CancellationToken::new(); + let flashblocks_ws_url = rbuilder.flashblocks_ws_url(); + + // Spawn WebSocket listener task + let cancellation_token_clone = cancellation_token.clone(); + let ws_handle: JoinHandle> = tokio::spawn(async move { + let (ws_stream, _) = connect_async(flashblocks_ws_url).await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + messages_clone.lock().push(text); + } + } + } + }); + + for _ in 0..5 { + // send a valid transaction + let _ = driver + .create_transaction() + .random_valid_transfer() + .send() + .await?; + } + let block = driver + .build_new_block_with_current_timestamp(Some(Duration::from_millis(999))) + .await?; + // We could only produce block with deposits + builder tx because of short time frame + assert_eq!(block.transactions.len(), 2); + cancellation_token.cancel(); + assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + + assert!( + !received_messages + .lock() + .iter() + .any(|msg| msg.contains("Building flashblock")), + "No messages received from WebSocket" + ); + + Ok(()) +} diff --git a/crates/op-rbuilder/src/tests/framework/driver.rs b/crates/op-rbuilder/src/tests/framework/driver.rs index 5ee806da1..4d3393fb6 100644 --- a/crates/op-rbuilder/src/tests/framework/driver.rs +++ b/crates/op-rbuilder/src/tests/framework/driver.rs @@ -99,15 +99,24 @@ impl ChainDriver { self.build_new_block_with_txs(vec![]).await } - /// Builds a new block using the current state of the chain and the transactions in the pool with a list - /// of mandatory builder transactions. Those are usually deposit transactions. - pub async fn build_new_block_with_txs( + /// Builds a new block with block_timestamp calculated as block time right before sending FCU + pub async fn build_new_block_with_current_timestamp( + &self, + timestamp_jitter: Option, + ) -> eyre::Result> { + self.build_new_block_with_txs_timestamp(vec![], None, timestamp_jitter) + .await + } + + /// Builds a new block with provided txs and timestamp + pub async fn build_new_block_with_txs_timestamp( &self, txs: Vec, + block_timestamp: Option, + // Amount of time to lag before sending FCU. This tests late FCU scenarios + timestamp_jitter: Option, ) -> eyre::Result> { let latest = self.latest().await?; - let latest_timestamp = Duration::from_secs(latest.header.timestamp); - let block_timestamp = latest_timestamp + Self::MIN_BLOCK_TIME; // Add L1 block info as the first transaction in every L2 block // This deposit transaction contains L1 block metadata required by the L2 chain @@ -134,10 +143,35 @@ impl ChainDriver { signed_tx.encoded_2718().into() }; + let mut wait_until = None; + // If block_timestamp we need to produce new timestamp according to current clocks + let block_timestamp = if let Some(block_timestamp) = block_timestamp { + block_timestamp.as_secs() + } else { + // We take the following second, until which we will need to wait before issuing FCU + let latest_timestamp = (chrono::Utc::now().timestamp() + 1) as u64; + wait_until = Some(latest_timestamp); + latest_timestamp + + Duration::from_millis(self.args.chain_block_time) + .as_secs() + .max(Self::MIN_BLOCK_TIME.as_secs()) + }; + + // This step will alight time at which we send FCU. ideally we must send FCU and the beginning of the second. + if let Some(wait_until) = wait_until { + let sleep_time = Duration::from_secs(wait_until).saturating_sub(Duration::from_millis( + chrono::Utc::now().timestamp_millis() as u64, + )); + if let Some(timestamp_jitter) = timestamp_jitter { + tokio::time::sleep(sleep_time + timestamp_jitter).await; + } else { + tokio::time::sleep(sleep_time).await; + } + } let fcu_result = self .fcu(OpPayloadAttributes { payload_attributes: PayloadAttributes { - timestamp: block_timestamp.as_secs(), + timestamp: block_timestamp, parent_beacon_block_root: Some(B256::ZERO), withdrawals: Some(vec![]), ..Default::default() @@ -157,10 +191,19 @@ impl ChainDriver { .ok_or_else(|| eyre::eyre!("Forkchoice update did not return a payload ID"))?; // wait for the block to be built for the specified chain block time - tokio::time::sleep( - Duration::from_millis(self.args.chain_block_time).max(Self::MIN_BLOCK_TIME), - ) - .await; + if let Some(timestamp_jitter) = timestamp_jitter { + tokio::time::sleep( + Duration::from_millis(self.args.chain_block_time) + .max(Self::MIN_BLOCK_TIME) + .saturating_sub(timestamp_jitter), + ) + .await; + } else { + tokio::time::sleep( + Duration::from_millis(self.args.chain_block_time).max(Self::MIN_BLOCK_TIME), + ) + .await; + } let payload = OpExecutionPayloadEnvelope::V4(self.engine_api.get_payload(payload_id).await?); @@ -206,6 +249,20 @@ impl ChainDriver { Ok(block) } + /// Builds a new block using the current state of the chain and the transactions in the pool with a list + /// of mandatory builder transactions. Those are usually deposit transactions. + pub async fn build_new_block_with_txs( + &self, + txs: Vec, + ) -> eyre::Result> { + let latest = self.latest().await?; + let latest_timestamp = Duration::from_secs(latest.header.timestamp); + let block_timestamp = latest_timestamp + Self::MIN_BLOCK_TIME; + + self.build_new_block_with_txs_timestamp(txs, Some(block_timestamp), None) + .await + } + /// Retreives the latest built block and returns only a list of transaction /// hashes from its body. pub async fn latest(&self) -> eyre::Result> { diff --git a/crates/op-rbuilder/src/tests/framework/instance.rs b/crates/op-rbuilder/src/tests/framework/instance.rs index 08c04c910..0a9b68363 100644 --- a/crates/op-rbuilder/src/tests/framework/instance.rs +++ b/crates/op-rbuilder/src/tests/framework/instance.rs @@ -4,6 +4,7 @@ use crate::{ primitives::reth::engine_api_builder::OpEngineApiBuilder, revert_protection::{EthApiExtServer, EthApiOverrideServer, RevertProtectionExt}, tests::{ + create_test_db, framework::{driver::ChainDriver, BUILDER_PRIVATE_KEY}, ChainDriverExt, EngineApi, Ipc, TransactionPoolObserver, }, @@ -108,7 +109,8 @@ impl LocalInstance { .build(); let node_builder = NodeBuilder::<_, OpChainSpec>::new(config.clone()) - .testing_node(task_manager.executor()) + .with_database(create_test_db(config.clone())) + .with_launch_context(task_manager.executor()) .with_types::() .with_components( op_node diff --git a/crates/op-rbuilder/src/tests/framework/utils.rs b/crates/op-rbuilder/src/tests/framework/utils.rs index ceba8434f..e734f4d88 100644 --- a/crates/op-rbuilder/src/tests/framework/utils.rs +++ b/crates/op-rbuilder/src/tests/framework/utils.rs @@ -1,14 +1,22 @@ +use crate::{ + tests::{framework::driver::ChainDriver, Protocol, ONE_ETH}, + tx_signer::Signer, +}; use alloy_eips::Encodable2718; use alloy_primitives::{hex, Address, BlockHash, TxHash, TxKind, B256, U256}; use alloy_rpc_types_eth::{Block, BlockTransactionHashes}; use core::future::Future; use op_alloy_consensus::{OpTypedTransaction, TxDeposit}; use op_alloy_rpc_types::Transaction; - -use crate::{ - tests::{framework::driver::ChainDriver, Protocol, ONE_ETH}, - tx_signer::Signer, +use reth_db::{ + init_db, + mdbx::{DatabaseArguments, MaxReadTransactionDuration, KILOBYTE, MEGABYTE}, + test_utils::{TempDatabase, ERROR_DB_CREATION}, + ClientVersion, DatabaseEnv, }; +use reth_node_core::{args::DatadirArgs, dirs::DataDirPath, node_config::NodeConfig}; +use reth_optimism_chainspec::OpChainSpec; +use std::sync::Arc; use super::{TransactionBuilder, FUNDED_PRIVATE_KEYS}; @@ -191,3 +199,24 @@ impl AsTxs for Vec { self.clone() } } + +pub fn create_test_db(config: NodeConfig) -> Arc> { + let path = reth_node_core::dirs::MaybePlatformPath::::from( + reth_db::test_utils::tempdir_path(), + ); + let db_config = config.with_datadir_args(DatadirArgs { + datadir: path.clone(), + ..Default::default() + }); + let data_dir = path.unwrap_or_chain_default(db_config.chain.chain(), db_config.datadir.clone()); + let path = data_dir.db(); + let db = init_db( + path.as_path(), + DatabaseArguments::new(ClientVersion::default()) + .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)) + .with_geometry_max_size(Some(4 * MEGABYTE)) + .with_growth_step(Some(4 * KILOBYTE)), + ) + .expect(ERROR_DB_CREATION); + Arc::new(TempDatabase::new(db, path)) +} diff --git a/crates/op-rbuilder/src/tests/revert.rs b/crates/op-rbuilder/src/tests/revert.rs index 404c1790f..cbc315f90 100644 --- a/crates/op-rbuilder/src/tests/revert.rs +++ b/crates/op-rbuilder/src/tests/revert.rs @@ -42,7 +42,7 @@ async fn monitor_transaction_gc(rbuilder: LocalInstance) -> eyre::Result<()> { // generate 10 blocks for i in 0..10 { - let generated_block = driver.build_new_block().await?; + let generated_block = driver.build_new_block_with_current_timestamp(None).await?; if_standard! { // standard builder blocks should only include two transactions (deposit + builder) @@ -50,7 +50,7 @@ async fn monitor_transaction_gc(rbuilder: LocalInstance) -> eyre::Result<()> { } if_flashblocks! { - // flashblocks should include three transactions (deposit + builder + first flashblock) + // flashblocks should include three transactions (deposit + 2 builder txs) assert_eq!(generated_block.transactions.len(), 3); } diff --git a/crates/op-rbuilder/src/tests/smoke.rs b/crates/op-rbuilder/src/tests/smoke.rs index d92a7c659..6439bd887 100644 --- a/crates/op-rbuilder/src/tests/smoke.rs +++ b/crates/op-rbuilder/src/tests/smoke.rs @@ -29,7 +29,7 @@ async fn chain_produces_blocks(rbuilder: LocalInstance) -> eyre::Result<()> { // no user transactions are sent. // the deposit transaction and the block generator's transaction for _ in 0..SAMPLE_SIZE { - let block = driver.build_new_block().await?; + let block = driver.build_new_block_with_current_timestamp(None).await?; let transactions = block.transactions; if_standard! { @@ -67,7 +67,7 @@ async fn chain_produces_blocks(rbuilder: LocalInstance) -> eyre::Result<()> { tx_hashes.insert(*tx.tx_hash()); } - let block = driver.build_new_block().await?; + let block = driver.build_new_block_with_current_timestamp(None).await?; let txs = block.transactions; @@ -149,7 +149,7 @@ async fn produces_blocks_under_load_within_deadline(rbuilder: LocalInstance) -> let block = tokio::time::timeout( Duration::from_secs(rbuilder.args().chain_block_time) + Duration::from_millis(500), - driver.build_new_block(), + driver.build_new_block_with_current_timestamp(None), ) .await .expect("Timeout while waiting for block production")