From 3628208402b3fb5590e10956df7edde26a63464c Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Thu, 5 Jun 2025 16:53:02 +0500 Subject: [PATCH 1/8] quick bugfix Account for dynamic lag in the beginning of building process --- .../src/builders/flashblocks/payload.rs | 141 +++++++++++++----- crates/op-rbuilder/src/builders/mod.rs | 2 +- crates/op-rbuilder/src/metrics.rs | 6 + 3 files changed, 114 insertions(+), 35 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index a7b0f6c21..2c41ae0f2 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -1,6 +1,3 @@ -use core::time::Duration; -use std::{sync::Arc, time::Instant}; - use super::{config::FlashblocksConfig, wspub::WebSocketPublisher}; use crate::{ builders::{ @@ -18,6 +15,7 @@ use alloy_consensus::{ }; use alloy_eips::{eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE, Encodable2718}; use alloy_primitives::{map::foldhash::HashMap, Address, B256, U256}; +use core::time::Duration; use reth::payload::PayloadBuilderAttributes; use reth_basic_payload_builder::BuildOutcome; use reth_evm::{execute::BlockBuilder, ConfigureEvm}; @@ -42,6 +40,11 @@ use rollup_boost::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, }; use serde::{Deserialize, Serialize}; +use std::{ + ops::{Div, Rem}, + sync::Arc, + time::Instant, +}; use tokio::sync::mpsc; use tracing::{debug, error, metadata::Level, span, warn}; @@ -152,6 +155,22 @@ where let chain_spec = self.client.chain_spec(); let timestamp = config.attributes.timestamp(); + // We use this system time to determine remining time to build a block + // Things to consider: + // FCU(a) - FCU with attributes, block time 2 seconds + // FCU(a) could arrive with `block_time - fb_time < delay`. In this case we could only produce 1 flashblock + // FCU(a) could arrive with `delay < fb_time` - in this case we will shrink first flashblock + // FCU(a) could arrive with `fb_time < delay < block_time - fb_time` - in this case we will issue less flashblocks + let time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp) + - Duration::from_millis(50); + let time_drift = time.duration_since(std::time::SystemTime::now()).ok(); + match time_drift { + None => error!("FCU arrived too late or system clock are unsynced"), + Some(time_drift) => self + .metrics + .flashblock_time_drift + .record(time_drift.as_millis() as f64), + } let block_env_attributes = OpNextBlockEnvAttributes { timestamp, suggested_fee_recipient: config.attributes.suggested_fee_recipient(), @@ -202,7 +221,6 @@ where .build(); // We subtract gas limit and da limit for builder transaction from the whole limit - // TODO: we could optimise this and subtract this only for the last flashblocks let message = format!("Block Number: {}", ctx.block_number()).into_bytes(); let builder_tx_gas = ctx .builder_signer() @@ -239,19 +257,51 @@ where "No transaction pool, skipping transaction pool processing", ); - self.metrics + ctx.metrics .total_block_built_duration .record(block_build_start_time.elapsed()); // return early since we don't need to build a block with transactions from the pool return Ok(()); } - let gas_per_batch = ctx.block_gas_limit() / self.config.flashblocks_per_block(); + // We adjust our flashblocks timings based on time_drift + let (flashblocks_per_block, first_flashblock_offset) = match time_drift { + // Nothing we could do, assuming we have system time problem and praying + None => ( + self.config.flashblocks_per_block(), + self.config.specific.interval, + ), + Some(time_drift) => { + let interval = self.config.specific.interval.as_millis(); + let time_drift = time_drift.as_millis(); + let first_flashblock_offset = time_drift.rem(interval); + let flashblocks_per_block = if first_flashblock_offset == 0 { + // In this case all flashblock are full and they all in division + time_drift.div(interval) + } else { + // If we have any reminder that mean the first flashblock won't be in division + // so we add it manually. + time_drift.div(interval) + 1 + }; + // We won't have any problems because of casting + ( + flashblocks_per_block as u64, + Duration::from_millis(first_flashblock_offset as u64), + ) + } + }; + ctx.metrics + .target_flashblock + .record(flashblocks_per_block as f64); + ctx.metrics + .first_flashblock_time_offset + .record(first_flashblock_offset.as_millis() as f64); + let gas_per_batch = ctx.block_gas_limit() / flashblocks_per_block; let mut total_gas_per_batch = gas_per_batch; let da_per_batch = ctx .da_config .max_da_block_size() - .map(|da_limit| da_limit / self.config.flashblocks_per_block()); + .map(|da_limit| da_limit / flashblocks_per_block); // Check that builder tx won't affect fb limit too much if let Some(da_limit) = da_per_batch { // We error if we can't insert any tx aside from builder tx in flashblock @@ -261,7 +311,9 @@ where } let mut total_da_per_batch = da_per_batch; - let last_flashblock = self.config.flashblocks_per_block().saturating_sub(1); + // TODO: we should account for a case when we will issue only 1 flashblock + let last_flashblock = flashblocks_per_block.saturating_sub(1); + let mut flashblock_count = 0; // Create a channel to coordinate flashblock building let (build_tx, mut build_rx) = mpsc::channel(1); @@ -270,25 +322,47 @@ where let cancel_clone = ctx.cancel.clone(); let interval = self.config.specific.interval; tokio::spawn(async move { - let mut interval = tokio::time::interval(interval); - loop { - tokio::select! { - // Add a cancellation check that only runs every 10ms to avoid tight polling - _ = tokio::time::sleep(Duration::from_millis(10)) => { - if cancel_clone.is_cancelled() { - tracing::info!(target: "payload_builder", "Job cancelled during sleep, stopping payload building"); - drop(build_tx); - break; - } - } - _ = interval.tick() => { + // We handle first flashblock separately, because it could be shrunk to fit everything + let mut first_interval = tokio::time::interval(first_flashblock_offset); + // If first_flashblock_offset == 0 that means all flashblock are proper, and we just + // skip this cusom logic + if first_flashblock_offset.as_millis() != 0 { + let cancelled = cancel_clone + .run_until_cancelled(async { + // We send first signal straight away and then wait first_interval duration + // to send second signal and start building normal blocks + for _ in 0..2 { + first_interval.tick().await; + // TODO: maybe return None if cancelled if let Err(err) = build_tx.send(()).await { error!(target: "payload_builder", "Error sending build signal: {}", err); - break; } } + }) + .await; + if cancelled.is_none() { + tracing::info!(target: "payload_builder", "Building job cancelled, stopping payload building"); + drop(build_tx); + return; } } + // Handle rest of fbs in steady rate + let mut interval = tokio::time::interval(interval); + let cancelled = cancel_clone.run_until_cancelled(async { + // First tick completes immediately + interval.tick().await; + loop { + interval.tick().await; + if let Err(err) = build_tx.send(()).await { + error!(target: "payload_builder", "Error sending build signal: {}", err); + break; + } + } + }).await; + if cancelled.is_none() { + tracing::info!(target: "payload_builder", "Building job cancelled, stopping payload building"); + drop(build_tx); + } }); // Process flashblocks in a blocking loop @@ -327,10 +401,10 @@ where // Exit loop if channel closed or cancelled match received { Some(()) => { - if flashblock_count >= self.config.flashblocks_per_block() { + if flashblock_count >= flashblocks_per_block { tracing::info!( target: "payload_builder", - target = self.config.flashblocks_per_block(), + target = flashblocks_per_block, flashblock_count = flashblock_count, block_number = ctx.block_number(), "Skipping flashblock reached target", @@ -381,18 +455,14 @@ where .record(best_txs_start_time.elapsed()); let tx_execution_start_time = Instant::now(); - ctx.execute_best_transactions( + if let Some(()) = ctx.execute_best_transactions( &mut info, &mut db, best_txs, total_gas_per_batch.min(ctx.block_gas_limit()), total_da_per_batch, - )?; - ctx.metrics - .payload_tx_simulation_duration - .record(tx_execution_start_time.elapsed()); - - if ctx.cancel.is_cancelled() { + )? { + // Handles job cancellation tracing::info!( target: "payload_builder", "Job cancelled, stopping payload building", @@ -400,6 +470,9 @@ where // if the job was cancelled, stop return Ok(()); } + ctx.metrics + .payload_tx_simulation_duration + .record(tx_execution_start_time.elapsed()); // TODO: temporary we add builder tx to the first flashblock too invoke_on_first_flashblock(flashblock_count, || { @@ -421,7 +494,7 @@ where match build_result { Err(err) => { // Track invalid/bad block - self.metrics.invalid_blocks_count.increment(1); + ctx.metrics.invalid_blocks_count.increment(1); error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), flashblock_count, err); // Return the error return Err(err); @@ -435,7 +508,7 @@ where .map_err(PayloadBuilderError::other)?; // Record flashblock build duration - self.metrics + ctx.metrics .flashblock_build_duration .record(flashblock_build_start_time.elapsed()); ctx.metrics @@ -469,8 +542,8 @@ where } None => { // Exit loop if channel closed or cancelled - self.metrics.block_built_success.increment(1); - self.metrics + ctx.metrics.block_built_success.increment(1); + ctx.metrics .flashblock_count .record(flashblock_count as f64); debug!( diff --git a/crates/op-rbuilder/src/builders/mod.rs b/crates/op-rbuilder/src/builders/mod.rs index ce0974e33..de9291ced 100644 --- a/crates/op-rbuilder/src/builders/mod.rs +++ b/crates/op-rbuilder/src/builders/mod.rs @@ -74,7 +74,7 @@ pub struct BuilderConfig { pub revert_protection: bool, /// The interval at which blocks are added to the chain. - /// This is also the frequency at which the builder will be receiving FCU rquests from the + /// This is also the frequency at which the builder will be receiving FCU requests from the /// sequencer. pub block_time: Duration, diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 32dbd3b52..d3ed291ed 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -76,6 +76,12 @@ pub struct OpRBuilderMetrics { pub da_block_size_limit: Gauge, /// Da tx size limit pub da_tx_size_limit: Gauge, + /// Desired number of flashblocks + pub target_flashblock: Histogram, + /// Time drift that we account for in the beginning of block building + pub flashblock_time_drift: Histogram, + /// Time offset we used for first flashblock + pub first_flashblock_time_offset: Histogram, /// Number of valid bundles received at the eth_sendBundle endpoint pub bundles_received: Counter, /// Number of reverted bundles From 08484f053821fdccfbb111e05a02583dfef7d7dd Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 11 Jun 2025 17:39:25 +0500 Subject: [PATCH 2/8] More improvements --- .../src/builders/flashblocks/payload.rs | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 2c41ae0f2..cf2341c63 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -46,7 +46,7 @@ use std::{ time::Instant, }; use tokio::sync::mpsc; -use tracing::{debug, error, metadata::Level, span, warn}; +use tracing::{debug, error, info, metadata::Level, span, warn}; #[derive(Debug, Default)] struct ExtraExecutionInfo { @@ -147,7 +147,7 @@ where } else { tracing::Span::none() }; - let _enter = span.enter(); + let _entered = span.enter(); span.record( "payload_id", config.attributes.payload_attributes.id.to_string(), @@ -165,11 +165,24 @@ where - Duration::from_millis(50); let time_drift = time.duration_since(std::time::SystemTime::now()).ok(); match time_drift { - None => error!("FCU arrived too late or system clock are unsynced"), - Some(time_drift) => self - .metrics - .flashblock_time_drift - .record(time_drift.as_millis() as f64), + None => error!( + target: "payload_builder" + message = "FCU arrived too late or system clock are unsynced", + ?time, + ), + Some(time_drift) => { + self + .metrics + .flashblock_time_drift + .record(time_drift.as_millis() as f64); + debug!( + target: "payload_builder", + message = "Time drift for building round", + ?time, + ?time_drift, + ?timestamp + ); + } } let block_env_attributes = OpNextBlockEnvAttributes { timestamp, @@ -241,7 +254,7 @@ where .publish(&fb_payload) .map_err(PayloadBuilderError::other)?; - tracing::info!( + info!( target: "payload_builder", message = "Fallback block built", payload_id = fb_payload.payload_id.to_string(), @@ -252,7 +265,7 @@ where .record(info.executed_transactions.len() as f64); if ctx.attributes().no_tx_pool { - tracing::info!( + info!( target: "payload_builder", "No transaction pool, skipping transaction pool processing", ); @@ -290,6 +303,13 @@ where ) } }; + info!( + target: "payload_builder", + message = "Performed flashblocks timing derivation", + flashblocks_per_block, + first_flashblock_offset = first_flashblock_offset.as_millis(), + flashblocks_interval = self.config.specific.interval, + ); ctx.metrics .target_flashblock .record(flashblocks_per_block as f64); @@ -341,7 +361,7 @@ where }) .await; if cancelled.is_none() { - tracing::info!(target: "payload_builder", "Building job cancelled, stopping payload building"); + info!(target: "payload_builder", "Building job cancelled, stopping payload building"); drop(build_tx); return; } @@ -360,7 +380,7 @@ where } }).await; if cancelled.is_none() { - tracing::info!(target: "payload_builder", "Building job cancelled, stopping payload building"); + info!(target: "payload_builder", "Building job cancelled, stopping payload building"); drop(build_tx); } }); @@ -386,7 +406,7 @@ where rt.block_on(async { // Check for cancellation first if ctx.cancel.is_cancelled() { - tracing::info!( + info!( target: "payload_builder", "Job cancelled, stopping payload building", ); @@ -402,7 +422,7 @@ where match received { Some(()) => { if flashblock_count >= flashblocks_per_block { - tracing::info!( + info!( target: "payload_builder", target = flashblocks_per_block, flashblock_count = flashblock_count, @@ -413,7 +433,7 @@ where } // Continue with flashblock building - tracing::info!( + info!( target: "payload_builder", block_number = ctx.block_number(), flashblock_count = flashblock_count, @@ -463,7 +483,7 @@ where total_da_per_batch, )? { // Handles job cancellation - tracing::info!( + info!( target: "payload_builder", "Job cancelled, stopping payload building", ); @@ -530,12 +550,13 @@ where } } flashblock_count += 1; - tracing::info!( + info!( target: "payload_builder", message = "Flashblock built", ?flashblock_count, current_gas = info.cumulative_gas_used, current_da = info.cumulative_da_bytes_used, + target_flashblocks = flashblocks_per_block, ); } } From 051a7bded8dea3ac90e1eeb395b3ee4c170758bc Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 11 Jun 2025 18:41:07 +0500 Subject: [PATCH 3/8] More improvements --- crates/op-rbuilder/src/args/op.rs | 15 +++++++++++++++ .../src/builders/flashblocks/config.rs | 9 ++++++++- .../src/builders/flashblocks/payload.rs | 11 ++++++----- crates/op-rbuilder/src/builders/mod.rs | 5 +++++ 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 4cd9095a1..2c917e6f0 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -115,6 +115,17 @@ pub struct FlashblocksArgs { env = "FLASHBLOCK_BLOCK_TIME" )] pub flashblocks_block_time: u64, + + /// Time by which blocks would be completed earlier in milliseconds. + /// + /// This time used to account for latencies, this time would be deducted from total block + /// building time before calculating number of fbs. + #[arg( + long = "flashblocks.leeway-time", + default_value = "50", + env = "FLASHBLOCK_LEEWAY_TIME" + )] + pub flashblocks_leeway_time: u64, } impl Default for FlashblocksArgs { @@ -137,4 +148,8 @@ pub struct TelemetryArgs { /// OpenTelemetry headers for authentication #[arg(long = "telemetry.otlp-headers", env = "OTEL_EXPORTER_OTLP_HEADERS")] pub otlp_headers: Option, + + /// Inverted sampling frequency in blocks. 1 - each block, 100 - every 100th block. + #[arg(long = "telemetry.sampling-ratio", env = "SAMPLING_RATIO", default_value = "100")] + pub sampling_ratio: u64 } diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 1c2af41b2..a459aaad3 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -15,6 +15,10 @@ pub struct FlashblocksConfig { /// Each block will contain one or more flashblocks. On average, the number of flashblocks /// per block is equal to the block time divided by the flashblock interval. pub interval: Duration, + + /// How much time would be deducted from block build time to account for latencies in + /// milliseconds + pub leeway_time: Duration, } impl Default for FlashblocksConfig { @@ -22,6 +26,7 @@ impl Default for FlashblocksConfig { Self { ws_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 1111), interval: Duration::from_millis(250), + leeway_time: Duration::from_millis(50), } } } @@ -36,7 +41,9 @@ impl TryFrom for FlashblocksConfig { args.flashblocks.flashblocks_addr.parse()?, args.flashblocks.flashblocks_port, ); - Ok(Self { ws_addr, interval }) + + let leeway_time = Duration::from_millis(args.flashblocks.flashblocks_leeway_time); + Ok(Self { ws_addr, interval, leeway_time }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index cf2341c63..b6db6ccd4 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -142,7 +142,8 @@ where let BuildArguments { config, cancel, .. } = args; // We log only every 100th block to reduce usage - let span = if cfg!(feature = "telemetry") && config.parent_header.number % 100 == 0 { + let span = if cfg!(feature = "telemetry") + && config.parent_header.number % self.config.sampling_ratio == 0 { span!(Level::INFO, "build_payload") } else { tracing::Span::none() @@ -157,16 +158,16 @@ where let timestamp = config.attributes.timestamp(); // We use this system time to determine remining time to build a block // Things to consider: - // FCU(a) - FCU with attributes, block time 2 seconds + // FCU(a) - FCU with attributes // FCU(a) could arrive with `block_time - fb_time < delay`. In this case we could only produce 1 flashblock // FCU(a) could arrive with `delay < fb_time` - in this case we will shrink first flashblock // FCU(a) could arrive with `fb_time < delay < block_time - fb_time` - in this case we will issue less flashblocks let time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp) - - Duration::from_millis(50); + - self.config.specific.leeway_time; let time_drift = time.duration_since(std::time::SystemTime::now()).ok(); match time_drift { None => error!( - target: "payload_builder" + target: "payload_builder", message = "FCU arrived too late or system clock are unsynced", ?time, ), @@ -308,7 +309,7 @@ where message = "Performed flashblocks timing derivation", flashblocks_per_block, first_flashblock_offset = first_flashblock_offset.as_millis(), - flashblocks_interval = self.config.specific.interval, + flashblocks_interval = self.config.specific.interval.as_millis(), ); ctx.metrics .target_flashblock diff --git a/crates/op-rbuilder/src/builders/mod.rs b/crates/op-rbuilder/src/builders/mod.rs index de9291ced..926fd8421 100644 --- a/crates/op-rbuilder/src/builders/mod.rs +++ b/crates/op-rbuilder/src/builders/mod.rs @@ -100,6 +100,9 @@ pub struct BuilderConfig { // (not just 0.5s) because of that. pub block_time_leeway: Duration, + /// Inverted sampling frequency in blocks. 1 - each block, 100 - every 100th block. + pub sampling_ratio: u64, + /// Configuration values that are specific to the block builder implementation used. pub specific: Specific, } @@ -132,6 +135,7 @@ impl Default for BuilderConfig { block_time_leeway: Duration::from_millis(500), da_config: OpDAConfig::default(), specific: S::default(), + sampling_ratio: 100, } } } @@ -149,6 +153,7 @@ where block_time: Duration::from_millis(args.chain_block_time), block_time_leeway: Duration::from_secs(args.extra_block_deadline_secs), da_config: Default::default(), + sampling_ratio: args.telemetry.sampling_ratio, specific: S::try_from(args)?, }) } From 827fe1338cc1cd0c930dc61fa75a8bb2ad77dc97 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 11 Jun 2025 18:42:04 +0500 Subject: [PATCH 4/8] More improvements --- crates/op-rbuilder/src/args/op.rs | 8 +++-- .../src/builders/flashblocks/config.rs | 6 +++- .../src/builders/flashblocks/payload.rs | 30 +++++++++---------- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 2c917e6f0..d84101f23 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -150,6 +150,10 @@ pub struct TelemetryArgs { pub otlp_headers: Option, /// Inverted sampling frequency in blocks. 1 - each block, 100 - every 100th block. - #[arg(long = "telemetry.sampling-ratio", env = "SAMPLING_RATIO", default_value = "100")] - pub sampling_ratio: u64 + #[arg( + long = "telemetry.sampling-ratio", + env = "SAMPLING_RATIO", + default_value = "100" + )] + pub sampling_ratio: u64, } diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index a459aaad3..bbed8c7ac 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -43,7 +43,11 @@ impl TryFrom for FlashblocksConfig { ); let leeway_time = Duration::from_millis(args.flashblocks.flashblocks_leeway_time); - Ok(Self { ws_addr, interval, leeway_time }) + Ok(Self { + ws_addr, + interval, + leeway_time, + }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index b6db6ccd4..aa68fa88e 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -143,7 +143,8 @@ where // We log only every 100th block to reduce usage let span = if cfg!(feature = "telemetry") - && config.parent_header.number % self.config.sampling_ratio == 0 { + && config.parent_header.number % self.config.sampling_ratio == 0 + { span!(Level::INFO, "build_payload") } else { tracing::Span::none() @@ -172,18 +173,17 @@ where ?time, ), Some(time_drift) => { - self - .metrics - .flashblock_time_drift - .record(time_drift.as_millis() as f64); - debug!( - target: "payload_builder", - message = "Time drift for building round", - ?time, - ?time_drift, - ?timestamp - ); - } + self.metrics + .flashblock_time_drift + .record(time_drift.as_millis() as f64); + debug!( + target: "payload_builder", + message = "Time drift for building round", + ?time, + ?time_drift, + ?timestamp + ); + } } let block_env_attributes = OpNextBlockEnvAttributes { timestamp, @@ -565,9 +565,7 @@ where None => { // Exit loop if channel closed or cancelled ctx.metrics.block_built_success.increment(1); - ctx.metrics - .flashblock_count - .record(flashblock_count as f64); + ctx.metrics.flashblock_count.record(flashblock_count as f64); debug!( target: "payload_builder", message = "Payload building complete, channel closed or job cancelled" From ee3ae8468cff397142ed9c4ff456fcbcc66cfff4 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 11 Jun 2025 18:50:05 +0500 Subject: [PATCH 5/8] More improvements --- crates/op-rbuilder/src/tests/flashblocks/smoke.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs index 5a98b5d0d..932d95420 100644 --- a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs +++ b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs @@ -21,6 +21,7 @@ async fn chain_produces_blocks() -> eyre::Result<()> { flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, + flashblocks_leeway_time: 0, }, ..Default::default() }) From 896046feaca6f8c68d7b18ebd3d8f729486b0796 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 17 Jun 2025 18:11:37 +0500 Subject: [PATCH 6/8] Wrap everything into the config --- crates/op-rbuilder/src/args/op.rs | 9 +++ .../src/builders/flashblocks/config.rs | 8 +++ .../src/builders/flashblocks/payload.rs | 57 ++++++++++--------- .../src/tests/flashblocks/smoke.rs | 1 + 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index d84101f23..fa7911892 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -116,6 +116,15 @@ pub struct FlashblocksArgs { )] pub flashblocks_block_time: u64, + /// Enabled dynamic flashblocks adjustment. This will allow account for late FCUs and produce + /// less flashblocks, while each flashblock would be bigger. + #[arg( + long = "flashblocks.dynamic", + default_value = "false", + env = "FLASHBLOCK_DYNAMIC" + )] + pub flashblocks_dynamic: bool, + /// Time by which blocks would be completed earlier in milliseconds. /// /// This time used to account for latencies, this time would be deducted from total block diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index bbed8c7ac..0229a2b32 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -19,6 +19,9 @@ pub struct FlashblocksConfig { /// How much time would be deducted from block build time to account for latencies in /// milliseconds pub leeway_time: Duration, + + /// Enables dynamic flashblocks number based on FCU arrival time + pub dynamic_adjustment: bool, } impl Default for FlashblocksConfig { @@ -27,6 +30,7 @@ impl Default for FlashblocksConfig { ws_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 1111), interval: Duration::from_millis(250), leeway_time: Duration::from_millis(50), + dynamic_adjustment: false, } } } @@ -43,10 +47,14 @@ impl TryFrom for FlashblocksConfig { ); let leeway_time = Duration::from_millis(args.flashblocks.flashblocks_leeway_time); + + let dynamic_adjustment = args.flashblocks.flashblocks_dynamic; + Ok(Self { ws_addr, interval, leeway_time, + dynamic_adjustment, }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index aa68fa88e..c09610a19 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -278,32 +278,9 @@ where // return early since we don't need to build a block with transactions from the pool return Ok(()); } - // We adjust our flashblocks timings based on time_drift - let (flashblocks_per_block, first_flashblock_offset) = match time_drift { - // Nothing we could do, assuming we have system time problem and praying - None => ( - self.config.flashblocks_per_block(), - self.config.specific.interval, - ), - Some(time_drift) => { - let interval = self.config.specific.interval.as_millis(); - let time_drift = time_drift.as_millis(); - let first_flashblock_offset = time_drift.rem(interval); - let flashblocks_per_block = if first_flashblock_offset == 0 { - // In this case all flashblock are full and they all in division - time_drift.div(interval) - } else { - // If we have any reminder that mean the first flashblock won't be in division - // so we add it manually. - time_drift.div(interval) + 1 - }; - // We won't have any problems because of casting - ( - flashblocks_per_block as u64, - Duration::from_millis(first_flashblock_offset as u64), - ) - } - }; + // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable + let (flashblocks_per_block, first_flashblock_offset) = + self.calculate_flashblocks(time_drift); info!( target: "payload_builder", message = "Performed flashblocks timing derivation", @@ -576,6 +553,34 @@ where } } } + + /// Calculate number of flashblocks. + /// If dynamic is enabled this function will take time drift into the account. + pub fn calculate_flashblocks(&self, time_drift: Option) -> (u64, Duration) { + if !self.config.specific.dynamic_adjustment || time_drift.is_none() { + return ( + self.config.flashblocks_per_block(), + self.config.specific.interval, + ); + } + let time_drift = time_drift.unwrap(); + let interval = self.config.specific.interval.as_millis(); + let time_drift = time_drift.as_millis(); + let first_flashblock_offset = time_drift.rem(interval); + let flashblocks_per_block = if first_flashblock_offset == 0 { + // In this case all flashblock are full and they all in division + time_drift.div(interval) + } else { + // If we have any reminder that mean the first flashblock won't be in division + // so we add it manually. + time_drift.div(interval) + 1 + }; + // We won't have any problems because of casting + ( + flashblocks_per_block as u64, + Duration::from_millis(first_flashblock_offset as u64), + ) + } } impl crate::builders::generator::PayloadBuilder for OpPayloadBuilder diff --git a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs index 932d95420..339d332bd 100644 --- a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs +++ b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs @@ -22,6 +22,7 @@ async fn chain_produces_blocks() -> eyre::Result<()> { flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, flashblocks_leeway_time: 0, + flashblocks_dynamic: false, }, ..Default::default() }) From 8445e0850778d3745128801a74d1e672ddd73981 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 17 Jun 2025 18:57:52 +0500 Subject: [PATCH 7/8] Review --- crates/op-rbuilder/src/builders/flashblocks/payload.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index c09610a19..0cbdba7b2 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -453,13 +453,13 @@ where .record(best_txs_start_time.elapsed()); let tx_execution_start_time = Instant::now(); - if let Some(()) = ctx.execute_best_transactions( + if ctx.execute_best_transactions( &mut info, &mut db, best_txs, total_gas_per_batch.min(ctx.block_gas_limit()), total_da_per_batch, - )? { + )?.is_some() { // Handles job cancellation info!( target: "payload_builder", From 27b1adb2c36244f989501984c597e5015d003edb Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 17 Jun 2025 18:59:07 +0500 Subject: [PATCH 8/8] Review --- .../src/builders/flashblocks/payload.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 0cbdba7b2..256d2756c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -453,18 +453,24 @@ where .record(best_txs_start_time.elapsed()); let tx_execution_start_time = Instant::now(); - if ctx.execute_best_transactions( - &mut info, - &mut db, - best_txs, - total_gas_per_batch.min(ctx.block_gas_limit()), - total_da_per_batch, - )?.is_some() { + if ctx + .execute_best_transactions( + &mut info, + &mut db, + best_txs, + total_gas_per_batch.min(ctx.block_gas_limit()), + total_da_per_batch, + )? + .is_some() + { // Handles job cancellation info!( target: "payload_builder", "Job cancelled, stopping payload building", ); + ctx.metrics + .payload_tx_simulation_duration + .record(tx_execution_start_time.elapsed()); // if the job was cancelled, stop return Ok(()); }