diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 4cd9095a1..fa7911892 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -115,6 +115,26 @@ pub struct FlashblocksArgs { env = "FLASHBLOCK_BLOCK_TIME" )] pub flashblocks_block_time: u64, + + /// Enabled dynamic flashblocks adjustment. This will allow account for late FCUs and produce + /// less flashblocks, while each flashblock would be bigger. + #[arg( + long = "flashblocks.dynamic", + default_value = "false", + env = "FLASHBLOCK_DYNAMIC" + )] + pub flashblocks_dynamic: bool, + + /// Time by which blocks would be completed earlier in milliseconds. + /// + /// This time used to account for latencies, this time would be deducted from total block + /// building time before calculating number of fbs. + #[arg( + long = "flashblocks.leeway-time", + default_value = "50", + env = "FLASHBLOCK_LEEWAY_TIME" + )] + pub flashblocks_leeway_time: u64, } impl Default for FlashblocksArgs { @@ -137,4 +157,12 @@ pub struct TelemetryArgs { /// OpenTelemetry headers for authentication #[arg(long = "telemetry.otlp-headers", env = "OTEL_EXPORTER_OTLP_HEADERS")] pub otlp_headers: Option, + + /// Inverted sampling frequency in blocks. 1 - each block, 100 - every 100th block. + #[arg( + long = "telemetry.sampling-ratio", + env = "SAMPLING_RATIO", + default_value = "100" + )] + pub sampling_ratio: u64, } diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 1c2af41b2..0229a2b32 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -15,6 +15,13 @@ pub struct FlashblocksConfig { /// Each block will contain one or more flashblocks. On average, the number of flashblocks /// per block is equal to the block time divided by the flashblock interval. 
pub interval: Duration, + + /// How much time would be deducted from block build time to account for latencies in + /// milliseconds + pub leeway_time: Duration, + + /// Enables dynamic flashblocks number based on FCU arrival time + pub dynamic_adjustment: bool, } impl Default for FlashblocksConfig { @@ -22,6 +29,8 @@ impl Default for FlashblocksConfig { Self { ws_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 1111), interval: Duration::from_millis(250), + leeway_time: Duration::from_millis(50), + dynamic_adjustment: false, } } } @@ -36,7 +45,17 @@ impl TryFrom for FlashblocksConfig { args.flashblocks.flashblocks_addr.parse()?, args.flashblocks.flashblocks_port, ); - Ok(Self { ws_addr, interval }) + + let leeway_time = Duration::from_millis(args.flashblocks.flashblocks_leeway_time); + + let dynamic_adjustment = args.flashblocks.flashblocks_dynamic; + + Ok(Self { + ws_addr, + interval, + leeway_time, + dynamic_adjustment, + }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index a7b0f6c21..256d2756c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -1,6 +1,3 @@ -use core::time::Duration; -use std::{sync::Arc, time::Instant}; - use super::{config::FlashblocksConfig, wspub::WebSocketPublisher}; use crate::{ builders::{ @@ -18,6 +15,7 @@ use alloy_consensus::{ }; use alloy_eips::{eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE, Encodable2718}; use alloy_primitives::{map::foldhash::HashMap, Address, B256, U256}; +use core::time::Duration; use reth::payload::PayloadBuilderAttributes; use reth_basic_payload_builder::BuildOutcome; use reth_evm::{execute::BlockBuilder, ConfigureEvm}; @@ -42,8 +40,13 @@ use rollup_boost::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1, }; use serde::{Deserialize, Serialize}; +use std::{ + ops::{Div, Rem}, + sync::Arc, + time::Instant, +}; 
use tokio::sync::mpsc;
-use tracing::{debug, error, metadata::Level, span, warn};
+use tracing::{debug, error, info, metadata::Level, span, warn};

 #[derive(Debug, Default)]
 struct ExtraExecutionInfo {
@@ -139,12 +142,14 @@ where
         let BuildArguments { config, cancel, .. } = args;

-        // We log only every 100th block to reduce usage
-        let span = if cfg!(feature = "telemetry") && config.parent_header.number % 100 == 0 {
+        // Sample every `sampling_ratio`-th block; `max(1)` keeps a ratio of 0 from panicking
+        let span = if cfg!(feature = "telemetry")
+            && config.parent_header.number % self.config.sampling_ratio.max(1) == 0
+        {
             span!(Level::INFO, "build_payload")
         } else {
             tracing::Span::none()
         };
-        let _enter = span.enter();
+        let _entered = span.enter();
         span.record(
             "payload_id",
             config.attributes.payload_attributes.id.to_string(),
@@ -152,6 +157,34 @@ where

         let chain_spec = self.client.chain_spec();
         let timestamp = config.attributes.timestamp();
+        // We use this system time to determine remaining time to build a block
+        // Things to consider:
+        // FCU(a) - FCU with attributes
+        // FCU(a) could arrive with `block_time - fb_time < delay`.
In this case we could only produce 1 flashblock + // FCU(a) could arrive with `delay < fb_time` - in this case we will shrink first flashblock + // FCU(a) could arrive with `fb_time < delay < block_time - fb_time` - in this case we will issue less flashblocks + let time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp) + - self.config.specific.leeway_time; + let time_drift = time.duration_since(std::time::SystemTime::now()).ok(); + match time_drift { + None => error!( + target: "payload_builder", + message = "FCU arrived too late or system clock are unsynced", + ?time, + ), + Some(time_drift) => { + self.metrics + .flashblock_time_drift + .record(time_drift.as_millis() as f64); + debug!( + target: "payload_builder", + message = "Time drift for building round", + ?time, + ?time_drift, + ?timestamp + ); + } + } let block_env_attributes = OpNextBlockEnvAttributes { timestamp, suggested_fee_recipient: config.attributes.suggested_fee_recipient(), @@ -202,7 +235,6 @@ where .build(); // We subtract gas limit and da limit for builder transaction from the whole limit - // TODO: we could optimise this and subtract this only for the last flashblocks let message = format!("Block Number: {}", ctx.block_number()).into_bytes(); let builder_tx_gas = ctx .builder_signer() @@ -223,7 +255,7 @@ where .publish(&fb_payload) .map_err(PayloadBuilderError::other)?; - tracing::info!( + info!( target: "payload_builder", message = "Fallback block built", payload_id = fb_payload.payload_id.to_string(), @@ -234,24 +266,40 @@ where .record(info.executed_transactions.len() as f64); if ctx.attributes().no_tx_pool { - tracing::info!( + info!( target: "payload_builder", "No transaction pool, skipping transaction pool processing", ); - self.metrics + ctx.metrics .total_block_built_duration .record(block_build_start_time.elapsed()); // return early since we don't need to build a block with transactions from the pool return Ok(()); } - let gas_per_batch = ctx.block_gas_limit() / 
self.config.flashblocks_per_block(); + // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable + let (flashblocks_per_block, first_flashblock_offset) = + self.calculate_flashblocks(time_drift); + info!( + target: "payload_builder", + message = "Performed flashblocks timing derivation", + flashblocks_per_block, + first_flashblock_offset = first_flashblock_offset.as_millis(), + flashblocks_interval = self.config.specific.interval.as_millis(), + ); + ctx.metrics + .target_flashblock + .record(flashblocks_per_block as f64); + ctx.metrics + .first_flashblock_time_offset + .record(first_flashblock_offset.as_millis() as f64); + let gas_per_batch = ctx.block_gas_limit() / flashblocks_per_block; let mut total_gas_per_batch = gas_per_batch; let da_per_batch = ctx .da_config .max_da_block_size() - .map(|da_limit| da_limit / self.config.flashblocks_per_block()); + .map(|da_limit| da_limit / flashblocks_per_block); // Check that builder tx won't affect fb limit too much if let Some(da_limit) = da_per_batch { // We error if we can't insert any tx aside from builder tx in flashblock @@ -261,7 +309,9 @@ where } let mut total_da_per_batch = da_per_batch; - let last_flashblock = self.config.flashblocks_per_block().saturating_sub(1); + // TODO: we should account for a case when we will issue only 1 flashblock + let last_flashblock = flashblocks_per_block.saturating_sub(1); + let mut flashblock_count = 0; // Create a channel to coordinate flashblock building let (build_tx, mut build_rx) = mpsc::channel(1); @@ -270,24 +320,46 @@ where let cancel_clone = ctx.cancel.clone(); let interval = self.config.specific.interval; tokio::spawn(async move { - let mut interval = tokio::time::interval(interval); - loop { - tokio::select! 
{
-                    // Add a cancellation check that only runs every 10ms to avoid tight polling
-                    _ = tokio::time::sleep(Duration::from_millis(10)) => {
-                        if cancel_clone.is_cancelled() {
-                            tracing::info!(target: "payload_builder", "Job cancelled during sleep, stopping payload building");
-                            drop(build_tx);
-                            break;
-                        }
-                    }
-                    _ = interval.tick() => {
+            // The first flashblock may be shortened so the rest fit the remaining time; a zero
+            // offset means every flashblock is full-length and this custom logic is skipped.
+            if first_flashblock_offset.as_millis() != 0 {
+                // Built inside the guard because `tokio::time::interval` panics on a zero period
+                let mut first_interval = tokio::time::interval(first_flashblock_offset);
+                let cancelled = cancel_clone
+                    .run_until_cancelled(async {
+                        // We send first signal straight away and then wait first_interval duration
+                        // to send second signal and start building normal blocks
+                        for _ in 0..2 {
+                            first_interval.tick().await;
+                            // TODO: maybe return None if cancelled
                             if let Err(err) = build_tx.send(()).await {
                                 error!(target: "payload_builder", "Error sending build signal: {}", err);
-                                break;
                             }
                         }
+                    })
+                    .await;
+                if cancelled.is_none() {
+                    info!(target: "payload_builder", "Building job cancelled, stopping payload building");
+                    drop(build_tx);
+                    return;
+                }
+            }
+            // Handle rest of fbs in steady rate
+            let mut interval = tokio::time::interval(interval);
+            let cancelled = cancel_clone.run_until_cancelled(async {
+                // First tick completes immediately
+                interval.tick().await;
+                loop {
+                    interval.tick().await;
+                    if let Err(err) = build_tx.send(()).await {
+                        error!(target: "payload_builder", "Error sending build signal: {}", err);
+                        break;
+                    }
                 }
+            }).await;
+            if cancelled.is_none() {
+                info!(target: "payload_builder", "Building job cancelled, stopping payload building");
+                drop(build_tx);
             }
         });

@@ -312,7 +384,7 @@ where
             rt.block_on(async {
                 // Check for cancellation first
                 if ctx.cancel.is_cancelled() {
-                    tracing::info!(
+                    info!(
                         target: "payload_builder",
                         "Job cancelled, stopping payload building",
                     );
@@ -327,10 +399,10 @@ where // Exit loop if channel closed or cancelled match received { Some(()) => { - if flashblock_count >= self.config.flashblocks_per_block() { - tracing::info!( + if flashblock_count >= flashblocks_per_block { + info!( target: "payload_builder", - target = self.config.flashblocks_per_block(), + target = flashblocks_per_block, flashblock_count = flashblock_count, block_number = ctx.block_number(), "Skipping flashblock reached target", @@ -339,7 +411,7 @@ where } // Continue with flashblock building - tracing::info!( + info!( target: "payload_builder", block_number = ctx.block_number(), flashblock_count = flashblock_count, @@ -381,25 +453,30 @@ where .record(best_txs_start_time.elapsed()); let tx_execution_start_time = Instant::now(); - ctx.execute_best_transactions( - &mut info, - &mut db, - best_txs, - total_gas_per_batch.min(ctx.block_gas_limit()), - total_da_per_batch, - )?; - ctx.metrics - .payload_tx_simulation_duration - .record(tx_execution_start_time.elapsed()); - - if ctx.cancel.is_cancelled() { - tracing::info!( + if ctx + .execute_best_transactions( + &mut info, + &mut db, + best_txs, + total_gas_per_batch.min(ctx.block_gas_limit()), + total_da_per_batch, + )? 
+ .is_some() + { + // Handles job cancellation + info!( target: "payload_builder", "Job cancelled, stopping payload building", ); + ctx.metrics + .payload_tx_simulation_duration + .record(tx_execution_start_time.elapsed()); // if the job was cancelled, stop return Ok(()); } + ctx.metrics + .payload_tx_simulation_duration + .record(tx_execution_start_time.elapsed()); // TODO: temporary we add builder tx to the first flashblock too invoke_on_first_flashblock(flashblock_count, || { @@ -421,7 +498,7 @@ where match build_result { Err(err) => { // Track invalid/bad block - self.metrics.invalid_blocks_count.increment(1); + ctx.metrics.invalid_blocks_count.increment(1); error!(target: "payload_builder", "Failed to build block {}, flashblock {}: {}", ctx.block_number(), flashblock_count, err); // Return the error return Err(err); @@ -435,7 +512,7 @@ where .map_err(PayloadBuilderError::other)?; // Record flashblock build duration - self.metrics + ctx.metrics .flashblock_build_duration .record(flashblock_build_start_time.elapsed()); ctx.metrics @@ -457,22 +534,21 @@ where } } flashblock_count += 1; - tracing::info!( + info!( target: "payload_builder", message = "Flashblock built", ?flashblock_count, current_gas = info.cumulative_gas_used, current_da = info.cumulative_da_bytes_used, + target_flashblocks = flashblocks_per_block, ); } } } None => { // Exit loop if channel closed or cancelled - self.metrics.block_built_success.increment(1); - self.metrics - .flashblock_count - .record(flashblock_count as f64); + ctx.metrics.block_built_success.increment(1); + ctx.metrics.flashblock_count.record(flashblock_count as f64); debug!( target: "payload_builder", message = "Payload building complete, channel closed or job cancelled" @@ -483,6 +559,34 @@ where } } } + + /// Calculate number of flashblocks. + /// If dynamic is enabled this function will take time drift into the account. 
+    pub fn calculate_flashblocks(&self, time_drift: Option<Duration>) -> (u64, Duration) {
+        // Returns `(flashblocks_per_block, first_flashblock_offset)`; an offset of zero tells
+        // the caller that the first flashblock needs no special timing.
+        let interval = self.config.specific.interval;
+        let interval_ms = interval.as_millis();
+        // `None` means the FCU arrived too late; treat it like no time being left.
+        let drift_ms = time_drift.map_or(0, |drift| drift.as_millis());
+        // Keep the static schedule when dynamic adjustment is off, when the interval is zero
+        // (`rem`/`div` would panic) or when under 1ms is left: that would yield zero
+        // flashblocks, and the caller divides the gas and DA limits by this count.
+        if !self.config.specific.dynamic_adjustment || interval_ms == 0 || drift_ms == 0 {
+            return (self.config.flashblocks_per_block(), interval);
+        }
+        // A non-zero remainder is a shortened first flashblock, built in addition to the
+        // full-length ones; a zero remainder means every flashblock is full-length.
+        let first_flashblock_offset = drift_ms.rem(interval_ms);
+        let has_short_first = first_flashblock_offset != 0;
+        let flashblocks_per_block = drift_ms.div(interval_ms) + u128::from(has_short_first);
+        // Both values fit in `u64` for any realistic timestamp; saturate rather than let an
+        // `as` cast silently truncate.
+        (
+            u64::try_from(flashblocks_per_block).unwrap_or(u64::MAX),
+            Duration::from_millis(u64::try_from(first_flashblock_offset).unwrap_or(u64::MAX)),
+        )
+    }
 }

 impl crate::builders::generator::PayloadBuilder for OpPayloadBuilder
diff --git a/crates/op-rbuilder/src/builders/mod.rs b/crates/op-rbuilder/src/builders/mod.rs
index ce0974e33..926fd8421 100644
--- a/crates/op-rbuilder/src/builders/mod.rs
+++ b/crates/op-rbuilder/src/builders/mod.rs
@@ -74,7 +74,7 @@ pub struct BuilderConfig {
     pub revert_protection: bool,

     /// The interval at which blocks are added to the chain.
-    /// This is also the frequency at which the builder will be receiving FCU rquests from the
+    /// This is also the frequency at which the builder will be receiving FCU requests from the
     /// sequencer.
     pub block_time: Duration,

@@ -100,6 +100,9 @@ pub struct BuilderConfig {
     // (not just 0.5s) because of that.
     pub block_time_leeway: Duration,

+    /// Inverted sampling frequency in blocks. 1 - each block, 100 - every 100th block.
+    pub sampling_ratio: u64,
+
     /// Configuration values that are specific to the block builder implementation used.
pub specific: Specific, } @@ -132,6 +135,7 @@ impl Default for BuilderConfig { block_time_leeway: Duration::from_millis(500), da_config: OpDAConfig::default(), specific: S::default(), + sampling_ratio: 100, } } } @@ -149,6 +153,7 @@ where block_time: Duration::from_millis(args.chain_block_time), block_time_leeway: Duration::from_secs(args.extra_block_deadline_secs), da_config: Default::default(), + sampling_ratio: args.telemetry.sampling_ratio, specific: S::try_from(args)?, }) } diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 32dbd3b52..d3ed291ed 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -76,6 +76,12 @@ pub struct OpRBuilderMetrics { pub da_block_size_limit: Gauge, /// Da tx size limit pub da_tx_size_limit: Gauge, + /// Desired number of flashblocks + pub target_flashblock: Histogram, + /// Time drift that we account for in the beginning of block building + pub flashblock_time_drift: Histogram, + /// Time offset we used for first flashblock + pub first_flashblock_time_offset: Histogram, /// Number of valid bundles received at the eth_sendBundle endpoint pub bundles_received: Counter, /// Number of reverted bundles diff --git a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs index 5a98b5d0d..339d332bd 100644 --- a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs +++ b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs @@ -21,6 +21,8 @@ async fn chain_produces_blocks() -> eyre::Result<()> { flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, + flashblocks_leeway_time: 0, + flashblocks_dynamic: false, }, ..Default::default() })