From 98449c7e7941c7ba682a3e5bee779985eecc4de8 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 14 Oct 2025 17:24:43 +0400 Subject: [PATCH 01/13] v1 impl --- Cargo.lock | 1 + Cargo.toml | 1 + crates/rollup-boost/Cargo.toml | 2 + crates/rollup-boost/src/flashblocks/args.rs | 66 ++++++++++++- .../rollup-boost/src/flashblocks/inbound.rs | 95 ++++++++++++------- .../rollup-boost/src/flashblocks/launcher.rs | 6 +- crates/websocket-proxy/Cargo.toml | 2 +- 7 files changed, 134 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f46b0895..595b3d1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10192,6 +10192,7 @@ dependencies = [ "alloy-serde", "anyhow", "assert_cmd", + "backoff", "bytes", "clap", "ctor", diff --git a/Cargo.toml b/Cargo.toml index 55226aec..0695601f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tokio = { version = "1", features = ["full"] } eyre = "0.6.12" url = "2.2.0" sha2 = { version = "0.10", default-features = false } +backoff = "0.4.0" # Reth deps reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.3" } diff --git a/crates/rollup-boost/Cargo.toml b/crates/rollup-boost/Cargo.toml index 1db81c8e..a93f870b 100644 --- a/crates/rollup-boost/Cargo.toml +++ b/crates/rollup-boost/Cargo.toml @@ -64,6 +64,8 @@ paste = "1.0.15" parking_lot = "0.12.3" tokio-util = { version = "0.7.13" } dashmap = "6.1.0" +backoff.workspace = true + [dev-dependencies] rand = "0.9.0" diff --git a/crates/rollup-boost/src/flashblocks/args.rs b/crates/rollup-boost/src/flashblocks/args.rs index 1b5a7780..210a4f6f 100644 --- a/crates/rollup-boost/src/flashblocks/args.rs +++ b/crates/rollup-boost/src/flashblocks/args.rs @@ -1,4 +1,6 @@ +use std::time::Duration; use clap::Parser; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use url::Url; #[derive(Parser, Clone, Debug)] @@ -19,7 +21,67 @@ pub struct FlashblocksArgs { #[arg(long, env, default_value = "1112")] pub flashblocks_port: u16, - /// Time used for timeout if builder disconnected + /// Websocket connection configuration + #[command(flatten)] + pub flashblocks_ws_config: FlashblocksWebsocketConfig, +} + + +#[derive(Parser, Debug, Clone, Copy)] +pub struct FlashblocksWebsocketConfig { + /// Minimum time for exponential backoff for timeout if builder disconnected + #[arg(long, env, default_value = "10")] + pub flashblock_builder_ws_initial_reconnect_ms: u64, + + /// Maximum time for exponential backoff for timeout if builder disconnected #[arg(long, env, default_value = "5000")] - pub flashblock_builder_ws_reconnect_ms: u64, + pub flashblock_builder_ws_max_reconnect_ms: u64, + + /// Interval in milliseconds between ping messages sent to upstream servers to detect unresponsive connections + #[arg(long, env, default_value = "500")] + pub flashblock_builder_ws_ping_interval_ms: u64, + + /// Timeout in milliseconds to wait for pong responses from upstream servers before considering the connection dead + #[arg(long, env, default_value = "2000")] + pub flashblock_builder_ws_pong_timeout_ms: u64, + + /// Timeout in milliseconds for reading data from upstream servers before considering the connection dead + #[arg(long, env, default_value = "1500")] + pub flashblock_builder_ws_read_timeout_ms: u64, } + +impl FlashblocksWebsocketConfig { + /// Creates `ExponentialBackoff` use to control builder websocket reconnection time + pub fn backoff(&self) -> ExponentialBackoff { + ExponentialBackoffBuilder::default() + .with_initial_interval(self.initial_interval()) + .with_max_interval(self.max_interval()) + .with_max_elapsed_time(None) + .build() + } + + /// Returns initial time for exponential backoff + pub fn initial_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_initial_reconnect_ms) + } + + /// Returns maximal time for exponential backoff + pub fn max_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_max_reconnect_ms) + } + + /// Returns ping interval + pub fn ping_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_ping_interval_ms) + } + + /// Returns pong interval + pub fn pong_interval(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_pong_timeout_ms) + } + + /// Returns read timeout + pub fn read_timeout(&self) -> Duration { + Duration::from_millis(self.flashblock_builder_ws_read_timeout_ms) + } +} \ No newline at end of file diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index a5f6e053..14698be3 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -1,5 +1,7 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::time::Duration; - +use backoff::backoff::Backoff; use super::{metrics::FlashblocksWsInboundMetrics, primitives::FlashblocksPayloadV1}; use futures::{SinkExt, StreamExt}; use tokio::{sync::mpsc, time::interval}; @@ -7,6 +9,7 @@ use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use url::Url; +use crate::{FlashblocksArgs, FlashblocksWebsocketConfig}; #[derive(Debug, thiserror::Error)] enum FlashblocksReceiverError { @@ -16,8 +19,14 @@ enum FlashblocksReceiverError { #[error("Ping failed")] PingFailed, - #[error("Read timeout")] - ReadTimeout, + #[error("Pong timeout")] + PongTimeout, + + #[error("Flashblock timeout")] + FlashblockTimeout, + + #[error("Websocket haven't return the message")] + MessageMissing, #[error("Connection error: {0}")] ConnectionError(String), @@ -35,28 +44,31 @@ enum FlashblocksReceiverError { pub struct FlashblocksReceiverService { url: Url, sender: mpsc::Sender, - reconnect_ms: u64, + websocket_config: FlashblocksWebsocketConfig, metrics: FlashblocksWsInboundMetrics, } impl FlashblocksReceiverService { - pub fn new(url: Url, sender: mpsc::Sender, reconnect_ms: u64) -> Self { + pub fn new(url: Url, sender: mpsc::Sender, websocket_config: FlashblocksWebsocketConfig) -> Self { Self { url, sender, - reconnect_ms, + websocket_config, metrics: Default::default(), } } pub async fn run(self) { + let mut backoff = self.websocket_config.backoff(); loop { if let Err(e) = self.connect_and_handle().await { - error!("Flashblocks receiver connection error, retrying in 5 seconds: {e}"); + let interval = backoff.next_backoff().expect("max_elapsed_time not set, never None"); + error!("Flashblocks receiver connection error, retrying in {}ms: {}", interval.as_millis(), e); self.metrics.reconnect_attempts.increment(1); self.metrics.connection_status.set(0); - tokio::time::sleep(std::time::Duration::from_millis(self.reconnect_ms)).await; + tokio::time::sleep(interval).await; } else { + backoff.reset(); break; } } @@ -72,9 +84,8 @@ impl FlashblocksReceiverService { let cancel_token = CancellationToken::new(); let cancel_for_ping = cancel_token.clone(); + let mut ping_interval = interval(self.websocket_config.ping_interval()); let ping_task = tokio::spawn(async move { - let mut ping_interval = interval(Duration::from_millis(500)); - loop { tokio::select! { _ = ping_interval.tick() => { @@ -93,35 +104,53 @@ impl FlashblocksReceiverService { let sender = self.sender.clone(); let metrics = self.metrics.clone(); - let read_timeout = Duration::from_millis(1500); + let read_timeout = self.websocket_config.read_timeout(); + let pong_timeout = self.websocket_config.pong_interval(); let message_handle = tokio::spawn(async move { + let mut flashblock_interval = interval(read_timeout); + let mut pong_interval = interval(pong_timeout); + // We await here because first tick executes immediately + flashblock_interval.tick().await; + pong_interval.tick().await; loop { - let result = tokio::time::timeout(read_timeout, read.next()) - .await - .map_err(|_| FlashblocksReceiverError::ReadTimeout)?; - - match result { - Some(Ok(msg)) => match msg { - Message::Text(text) => { - metrics.messages_received.increment(1); - if let Ok(flashblocks_msg) = - serde_json::from_str::(&text) - { - sender.send(flashblocks_msg).await.map_err(|e| { - FlashblocksReceiverError::SendError(Box::new(e)) - })?; + tokio::select! { + result = read.next() => { + match result { + Some(Ok(msg)) => match msg { + Message::Text(text) => { + metrics.messages_received.increment(1); + // Refresh flashblock interval + flashblock_interval.reset(); + if let Ok(flashblocks_msg) = + serde_json::from_str::(&text) + { + sender.send(flashblocks_msg).await.map_err(|e| { + FlashblocksReceiverError::SendError(Box::new(e)) + })?; + } + } + Message::Close(_) => { + return Err(FlashblocksReceiverError::ConnectionClosed); + } + Message::Pong(_) => { + // Refresh pong interval + pong_interval.reset(); + } + _ => {} + }, + Some(Err(e)) => { + return Err(FlashblocksReceiverError::ConnectionError(e.to_string())); + } + None => { + return Err(FlashblocksReceiverError::MessageMissing); } } - Message::Close(_) => { - return Err(FlashblocksReceiverError::ConnectionClosed); - } - _ => {} }, - Some(Err(e)) => { - return Err(FlashblocksReceiverError::ConnectionError(e.to_string())); + _ = flashblock_interval.tick() => { + return Err(FlashblocksReceiverError::FlashblockTimeout); } - None => { - return Err(FlashblocksReceiverError::ReadTimeout); + _ = pong_interval.tick() => { + return Err(FlashblocksReceiverError::PongTimeout); } }; } diff --git a/crates/rollup-boost/src/flashblocks/launcher.rs b/crates/rollup-boost/src/flashblocks/launcher.rs index fd8f0fb9..8b1e1ed5 100644 --- a/crates/rollup-boost/src/flashblocks/launcher.rs +++ b/crates/rollup-boost/src/flashblocks/launcher.rs @@ -1,5 +1,5 @@ use crate::flashblocks::inbound::FlashblocksReceiverService; -use crate::{FlashblocksService, RpcClient}; +use crate::{FlashblocksService, FlashblocksWebsocketConfig, RpcClient}; use core::net::SocketAddr; use tokio::sync::mpsc; use url::Url; @@ -11,11 +11,11 @@ impl Flashblocks { builder_url: RpcClient, flashblocks_url: Url, outbound_addr: SocketAddr, - reconnect_ms: u64, + websocket_config: FlashblocksWebsocketConfig, ) -> eyre::Result { let (tx, rx) = mpsc::channel(100); - let receiver = FlashblocksReceiverService::new(flashblocks_url, tx, reconnect_ms); + let receiver = FlashblocksReceiverService::new(flashblocks_url, tx, websocket_config); tokio::spawn(async move { let _ = receiver.run().await; }); diff --git a/crates/websocket-proxy/Cargo.toml b/crates/websocket-proxy/Cargo.toml index 14785f79..1cf24afe 100644 --- a/crates/websocket-proxy/Cargo.toml +++ b/crates/websocket-proxy/Cargo.toml @@ -24,7 +24,7 @@ metrics-exporter-prometheus = { version = "0.17.0", features = [ http = "1.2.0" axum = { version = "0.8.1", features = ["ws"] } dotenvy = "0.15.7" -backoff = "0.4.0" +backoff.workspace = true reqwest = { version = "0.12.15", default-features = false, features = [ "native-tls", ] } From 9c023ce28d65c840d09512b8ab9d9458bfc41b21 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 14 Oct 2025 19:06:28 +0400 Subject: [PATCH 02/13] v1 impl --- Cargo.lock | 1 + crates/rollup-boost/Cargo.toml | 3 +- crates/rollup-boost/src/flashblocks/args.rs | 18 +--- .../rollup-boost/src/flashblocks/inbound.rs | 82 +++++++++++++------ 4 files changed, 63 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 595b3d1d..6642b5ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10239,6 +10239,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber 0.3.20", "url", + "uuid", "vergen", "vergen-git2", ] diff --git a/crates/rollup-boost/Cargo.toml b/crates/rollup-boost/Cargo.toml index a93f870b..c7d7d638 100644 --- a/crates/rollup-boost/Cargo.toml +++ b/crates/rollup-boost/Cargo.toml @@ -65,7 +65,8 @@ parking_lot = "0.12.3" tokio-util = { version = "0.7.13" } dashmap = "6.1.0" backoff.workspace = true - +uuid = { version = "1.17.0", features = ["v4", "v7"] } +bytes = "1.10.1" [dev-dependencies] rand = "0.9.0" diff --git a/crates/rollup-boost/src/flashblocks/args.rs b/crates/rollup-boost/src/flashblocks/args.rs index 210a4f6f..b25c5974 100644 --- a/crates/rollup-boost/src/flashblocks/args.rs +++ b/crates/rollup-boost/src/flashblocks/args.rs @@ -1,6 +1,6 @@ -use std::time::Duration; -use clap::Parser; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; +use clap::Parser; +use std::time::Duration; use url::Url; #[derive(Parser, Clone, Debug)] @@ -26,7 +26,6 @@ pub struct FlashblocksArgs { pub flashblocks_ws_config: FlashblocksWebsocketConfig, } - #[derive(Parser, Debug, Clone, Copy)] pub struct FlashblocksWebsocketConfig { /// Minimum time for exponential backoff for timeout if builder disconnected @@ -42,12 +41,8 @@ pub struct FlashblocksWebsocketConfig { pub flashblock_builder_ws_ping_interval_ms: u64, /// Timeout in milliseconds to wait for pong responses from upstream servers before considering the connection dead - #[arg(long, env, default_value = "2000")] - pub flashblock_builder_ws_pong_timeout_ms: u64, - - /// Timeout in milliseconds for reading data from upstream servers before considering the connection dead #[arg(long, env, default_value = "1500")] - pub flashblock_builder_ws_read_timeout_ms: u64, + pub flashblock_builder_ws_pong_timeout_ms: u64, } impl FlashblocksWebsocketConfig { @@ -79,9 +74,4 @@ impl FlashblocksWebsocketConfig { pub fn pong_interval(&self) -> Duration { Duration::from_millis(self.flashblock_builder_ws_pong_timeout_ms) } - - /// Returns read timeout - pub fn read_timeout(&self) -> Duration { - Duration::from_millis(self.flashblock_builder_ws_read_timeout_ms) - } -} \ No newline at end of file +} diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 14698be3..b3962302 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -1,15 +1,15 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use std::time::Duration; -use backoff::backoff::Backoff; use super::{metrics::FlashblocksWsInboundMetrics, primitives::FlashblocksPayloadV1}; +use crate::FlashblocksWebsocketConfig; +use backoff::backoff::Backoff; +use bytes::Bytes; +use dashmap::DashSet; use futures::{SinkExt, StreamExt}; +use std::sync::Arc; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use url::Url; -use crate::{FlashblocksArgs, FlashblocksWebsocketConfig}; #[derive(Debug, thiserror::Error)] enum FlashblocksReceiverError { @@ -22,9 +22,6 @@ enum FlashblocksReceiverError { #[error("Pong timeout")] PongTimeout, - #[error("Flashblock timeout")] - FlashblockTimeout, - #[error("Websocket haven't return the message")] MessageMissing, @@ -49,7 +46,11 @@ pub struct FlashblocksReceiverService { } impl FlashblocksReceiverService { - pub fn new(url: Url, sender: mpsc::Sender, websocket_config: FlashblocksWebsocketConfig) -> Self { + pub fn new( + url: Url, + sender: mpsc::Sender, + websocket_config: FlashblocksWebsocketConfig, + ) -> Self { Self { url, sender, @@ -62,8 +63,14 @@ impl FlashblocksReceiverService { let mut backoff = self.websocket_config.backoff(); loop { if let Err(e) = self.connect_and_handle().await { - let interval = backoff.next_backoff().expect("max_elapsed_time not set, never None"); - error!("Flashblocks receiver connection error, retrying in {}ms: {}", interval.as_millis(), e); + let interval = backoff + .next_backoff() + .expect("max_elapsed_time not set, never None"); + error!( + "Flashblocks receiver connection error, retrying in {}ms: {}", + interval.as_millis(), + e + ); self.metrics.reconnect_attempts.increment(1); self.metrics.connection_status.set(0); tokio::time::sleep(interval).await; @@ -84,14 +91,19 @@ impl FlashblocksReceiverService { let cancel_token = CancellationToken::new(); let cancel_for_ping = cancel_token.clone(); + let ping_map = Arc::new(DashSet::with_capacity(10)); + let pong_map = ping_map.clone(); + let mut ping_interval = interval(self.websocket_config.ping_interval()); let ping_task = tokio::spawn(async move { loop { tokio::select! { _ = ping_interval.tick() => { - if write.send(Message::Ping(Default::default())).await.is_err() { + let uuid = uuid::Uuid::now_v7(); + if write.send(Message::Ping(Bytes::copy_from_slice(uuid.as_bytes().as_slice()))).await.is_err() { return Err(FlashblocksReceiverError::PingFailed); } + ping_map.insert(uuid); } _ = cancel_for_ping.cancelled() => { tracing::debug!("Ping task cancelled"); @@ -104,13 +116,10 @@ impl FlashblocksReceiverService { let sender = self.sender.clone(); let metrics = self.metrics.clone(); - let read_timeout = self.websocket_config.read_timeout(); let pong_timeout = self.websocket_config.pong_interval(); let message_handle = tokio::spawn(async move { - let mut flashblock_interval = interval(read_timeout); let mut pong_interval = interval(pong_timeout); // We await here because first tick executes immediately - flashblock_interval.tick().await; pong_interval.tick().await; loop { tokio::select! { @@ -119,8 +128,6 @@ impl FlashblocksReceiverService { Some(Ok(msg)) => match msg { Message::Text(text) => { metrics.messages_received.increment(1); - // Refresh flashblock interval - flashblock_interval.reset(); if let Ok(flashblocks_msg) = serde_json::from_str::(&text) { @@ -132,11 +139,25 @@ impl FlashblocksReceiverService { Message::Close(_) => { return Err(FlashblocksReceiverError::ConnectionClosed); } - Message::Pong(_) => { - // Refresh pong interval - pong_interval.reset(); + Message::Pong(data) => { + match uuid::Uuid::from_slice(data.as_ref()) { + Ok(uuid) => { + if pong_map.remove(&uuid).is_some() { + pong_interval.reset(); + } else { + tracing::warn!("Received pong with unknown data {}", uuid); + } + + } + Err(e) => { + tracing::warn!("Failed to parse pong: {e}"); + } + } + + } + msg => { + tracing::warn!("Received unexpected message: {:?}", msg); } - _ => {} }, Some(Err(e)) => { return Err(FlashblocksReceiverError::ConnectionError(e.to_string())); @@ -146,9 +167,6 @@ impl FlashblocksReceiverService { } } }, - _ = flashblock_interval.tick() => { - return Err(FlashblocksReceiverError::FlashblockTimeout); - } _ = pong_interval.tick() => { return Err(FlashblocksReceiverError::PongTimeout); } @@ -269,7 +287,13 @@ mod tests { let (tx, mut rx) = mpsc::channel(100); - let service = FlashblocksReceiverService::new(url, tx, 100); + let config = FlashblocksWebsocketConfig { + flashblock_builder_ws_initial_reconnect_ms: 100, + flashblock_builder_ws_max_reconnect_ms: 100, + flashblock_builder_ws_ping_interval_ms: 500, + flashblock_builder_ws_pong_timeout_ms: 2000, + }; + let service = FlashblocksReceiverService::new(url, tx, config); let _ = tokio::spawn(async move { service.run().await; }); @@ -311,9 +335,15 @@ mod tests { let addr = "127.0.0.1:8081".parse::().unwrap(); let (_term, _send_msg, mut ping_rx, url) = start(addr).await?; + let config = FlashblocksWebsocketConfig { + flashblock_builder_ws_initial_reconnect_ms: 100, + flashblock_builder_ws_max_reconnect_ms: 100, + flashblock_builder_ws_ping_interval_ms: 500, + flashblock_builder_ws_pong_timeout_ms: 2000, + }; let (tx, _rx) = mpsc::channel(100); - let service = FlashblocksReceiverService::new(url, tx, 100); + let service = FlashblocksReceiverService::new(url, tx, config); let _ = tokio::spawn(async move { service.run().await; }); From cf13897ed4f5a88e2931270f11b522037692e47b Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 14 Oct 2025 20:03:00 +0400 Subject: [PATCH 03/13] Fix backoff reset --- crates/rollup-boost/src/flashblocks/args.rs | 1 + crates/rollup-boost/src/flashblocks/inbound.rs | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/args.rs b/crates/rollup-boost/src/flashblocks/args.rs index b25c5974..38a62cf1 100644 --- a/crates/rollup-boost/src/flashblocks/args.rs +++ b/crates/rollup-boost/src/flashblocks/args.rs @@ -51,6 +51,7 @@ impl FlashblocksWebsocketConfig { ExponentialBackoffBuilder::default() .with_initial_interval(self.initial_interval()) .with_max_interval(self.max_interval()) + .with_randomization_factor(0 as f64) .with_max_elapsed_time(None) .build() } diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index b3962302..69bfb2f8 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -5,6 +5,7 @@ use bytes::Bytes; use dashmap::DashSet; use futures::{SinkExt, StreamExt}; use std::sync::Arc; +use backoff::ExponentialBackoff; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; @@ -62,7 +63,7 @@ impl FlashblocksReceiverService { pub async fn run(self) { let mut backoff = self.websocket_config.backoff(); loop { - if let Err(e) = self.connect_and_handle().await { + if let Err(e) = self.connect_and_handle(&mut backoff).await { let interval = backoff .next_backoff() .expect("max_elapsed_time not set, never None"); @@ -75,16 +76,18 @@ impl FlashblocksReceiverService { self.metrics.connection_status.set(0); tokio::time::sleep(interval).await; } else { - backoff.reset(); break; } } } - async fn connect_and_handle(&self) -> Result<(), FlashblocksReceiverError> { + async fn connect_and_handle(&self, backoff: &mut ExponentialBackoff) -> Result<(), FlashblocksReceiverError> { let (ws_stream, _) = connect_async(self.url.as_str()).await?; let (mut write, mut read) = ws_stream.split(); + // if we have successfully connected - reset backoff + backoff.reset(); + info!("Connected to Flashblocks receiver at {}", self.url); self.metrics.connection_status.set(1); From ada5ecc50daa2afd6a4c475d0281da63ead9cacb Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Tue, 14 Oct 2025 21:20:15 +0400 Subject: [PATCH 04/13] fmt --- crates/rollup-boost/src/flashblocks/inbound.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 69bfb2f8..7829762c 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -1,11 +1,11 @@ use super::{metrics::FlashblocksWsInboundMetrics, primitives::FlashblocksPayloadV1}; use crate::FlashblocksWebsocketConfig; +use backoff::ExponentialBackoff; use backoff::backoff::Backoff; use bytes::Bytes; use dashmap::DashSet; use futures::{SinkExt, StreamExt}; use std::sync::Arc; -use backoff::ExponentialBackoff; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; @@ -81,7 +81,10 @@ impl FlashblocksReceiverService { } } - async fn connect_and_handle(&self, backoff: &mut ExponentialBackoff) -> Result<(), FlashblocksReceiverError> { + async fn connect_and_handle( + &self, + backoff: &mut ExponentialBackoff, + ) -> Result<(), FlashblocksReceiverError> { let (ws_stream, _) = connect_async(self.url.as_str()).await?; let (mut write, mut read) = ws_stream.split(); From 21ccd15af4778b3d7a6c911ab7edd4bfb0327c5b Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 15 Oct 2025 17:26:45 +0400 Subject: [PATCH 05/13] Add cleaning up --- .../rollup-boost/src/flashblocks/inbound.rs | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 7829762c..f5712ff6 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -11,6 +11,7 @@ use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use url::Url; +use uuid::Timestamp; #[derive(Debug, thiserror::Error)] enum FlashblocksReceiverError { @@ -97,9 +98,25 @@ impl FlashblocksReceiverService { let cancel_token = CancellationToken::new(); let cancel_for_ping = cancel_token.clone(); - let ping_map = Arc::new(DashSet::with_capacity(10)); - let pong_map = ping_map.clone(); + let ping_set = Arc::new(DashSet::with_capacity(10)); + let pong_set = ping_set.clone(); + let cleaning_set = ping_set.clone(); + tokio::spawn(async move { + // clean up pings without pongs older than 60 seconds + loop { + // To simplify logic we just create uuid time timestamp now() - 60s, because they + // support comparison + let unix_now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() - 60; + let ts = Timestamp::from_unix(uuid::timestamp::context::NoContext, unix_now, 0); + let clean_before = uuid::Uuid::new_v7(ts); + cleaning_set.retain(|uuid| uuid > &clean_before); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + } + }); let mut ping_interval = interval(self.websocket_config.ping_interval()); let ping_task = tokio::spawn(async move { loop { @@ -109,7 +126,7 @@ impl FlashblocksReceiverService { if write.send(Message::Ping(Bytes::copy_from_slice(uuid.as_bytes().as_slice()))).await.is_err() { return Err(FlashblocksReceiverError::PingFailed); } - ping_map.insert(uuid); + ping_set.insert(uuid); } _ = cancel_for_ping.cancelled() => { tracing::debug!("Ping task cancelled"); @@ -148,7 +165,7 @@ impl FlashblocksReceiverService { Message::Pong(data) => { match uuid::Uuid::from_slice(data.as_ref()) { Ok(uuid) => { - if pong_map.remove(&uuid).is_some() { + if pong_set.remove(&uuid).is_some() { pong_interval.reset(); } else { tracing::warn!("Received pong with unknown data {}", uuid); From cd9dedbb6caf43e8edaf91563c89246fb8277531 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 15 Oct 2025 20:11:10 +0400 Subject: [PATCH 06/13] add unit test --- .../rollup-boost/src/flashblocks/inbound.rs | 92 ++++++++++++++++++- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index f5712ff6..7b6fee8e 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -168,9 +168,8 @@ impl FlashblocksReceiverService { if pong_set.remove(&uuid).is_some() { pong_interval.reset(); } else { - tracing::warn!("Received pong with unknown data {}", uuid); + tracing::warn!("Received pong with unknown data:{}", uuid); } - } Err(e) => { tracing::warn!("Failed to parse pong: {e}"); @@ -219,6 +218,7 @@ mod tests { use super::*; use std::net::{SocketAddr, TcpListener}; + use std::sync::atomic::{AtomicBool, Ordering}; async fn start( addr: SocketAddr, @@ -303,6 +303,74 @@ mod tests { Ok((term_tx, send_tx, send_ping_rx, url)) } + async fn start_ping_server( + addr: SocketAddr, + send_pongs: Arc + ) -> eyre::Result<( + watch::Receiver, + mpsc::Receiver, + url::Url, + )> { + let (term_tx, term_rx) = watch::channel(false); + let (send_ping_tx, send_ping_rx) = mpsc::channel(100); + + let listener = TcpListener::bind(addr)?; + let url = Url::parse(&format!("ws://{addr}"))?; + + listener + .set_nonblocking(true) + .expect("Failed to set TcpListener socket to non-blocking"); + + let listener = tokio::net::TcpListener::from_std(listener) + .expect("Failed to convert TcpListener to tokio TcpListener"); + + tokio::spawn(async move { + loop { + tokio::select! { + result = listener.accept() => { + match result { + Ok((connection, _addr)) => { + match accept_async(connection).await { + Ok(ws_stream) => { + let (_, mut read) = ws_stream.split(); + loop { + if send_pongs.load(Ordering::Relaxed) { + let msg = read.next().await; + match msg { + // we need to read for the library to handle pong messages + Some(Ok(Message::Ping(data))) => { + send_ping_tx.send(data).await.unwrap(); + }, + _ => {} + } + + } else { + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + } + } + } + Err(e) => { + eprintln!("Failed to accept WebSocket connection: {}", e); + } + } + } + Err(e) => { + // Optionally break or continue based on error type + if e.kind() == std::io::ErrorKind::Interrupted { + break; + } + } + } + } + } + // If we have broken from the look it means reconnection occured + term_tx.send(true).expect("channel is up"); + } + }); + + Ok((term_rx, send_ping_rx, url)) + } + #[tokio::test] async fn test_flashblocks_receiver_service() -> eyre::Result<()> { let addr = "127.0.0.1:8080".parse::().unwrap(); @@ -357,10 +425,11 @@ mod tests { // ping messages to test the connection periodically let addr = "127.0.0.1:8081".parse::().unwrap(); - let (_term, _send_msg, mut ping_rx, url) = start(addr).await?; + let send_pongs = Arc::new(AtomicBool::new(true)); + let (term, mut ping_rx, url) = start_ping_server(addr, send_pongs.clone()).await?; let config = FlashblocksWebsocketConfig { flashblock_builder_ws_initial_reconnect_ms: 100, - flashblock_builder_ws_max_reconnect_ms: 100, + flashblock_builder_ws_max_reconnect_ms: 1000, flashblock_builder_ws_ping_interval_ms: 500, flashblock_builder_ws_pong_timeout_ms: 2000, }; @@ -372,10 +441,23 @@ mod tests { }); // even if we do not send any messages, we should receive pings to keep the connection alive - for _ in 0..10 { + for _ in 0..5 { ping_rx.recv().await.expect("Failed to receive ping"); } + // Check that server hasn't reconnected because we have answered to pongs + let reconnected = term.has_changed().expect("channel not closed"); + assert!(!reconnected, "reconnected when we answered to pings"); + send_pongs.store(false, Ordering::Relaxed); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // After this we should be reconnected + let reconnected = term.has_changed().expect("channel not closed"); + assert!(!reconnected, "haven't reconnected before deadline is reached"); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + // After this we should be reconnected + let reconnected = term.has_changed().expect("channel not closed"); + assert!(reconnected, "have reconnected after deadline is reached"); Ok(()) } } From c99cbacb000f5def0ab3364f328bd8d0bc69ba67 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 15 Oct 2025 22:37:32 +0400 Subject: [PATCH 07/13] add unit test --- .../rollup-boost/src/flashblocks/inbound.rs | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 7b6fee8e..393c6868 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -130,6 +130,9 @@ impl FlashblocksReceiverService { } _ = cancel_for_ping.cancelled() => { tracing::debug!("Ping task cancelled"); + if let Err(e) = write.close().await { + tracing::warn!("Failed to close builder ws connection: {}", e); + } return Ok(()); } } @@ -341,6 +344,7 @@ mod tests { Some(Ok(Message::Ping(data))) => { send_ping_tx.send(data).await.unwrap(); }, + Some(Err(_)) => {break;} _ => {} } @@ -363,7 +367,7 @@ mod tests { } } } - // If we have broken from the look it means reconnection occured + // If we have broken from the look it means reconnection occurred term_tx.send(true).expect("channel is up"); } }); @@ -446,18 +450,26 @@ mod tests { } // Check that server hasn't reconnected because we have answered to pongs let reconnected = term.has_changed().expect("channel not closed"); - assert!(!reconnected, "reconnected when we answered to pings"); + assert!(!reconnected, "not reconnected when we answered to pings"); send_pongs.store(false, Ordering::Relaxed); tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // After this we should be reconnected + send_pongs.store(true, Ordering::Relaxed); + // This sleep is to ensure that we will try to read socket and realise it closed + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // One second is not enough to break the connection let reconnected = term.has_changed().expect("channel not closed"); - assert!(!reconnected, "haven't reconnected before deadline is reached"); + assert!(!reconnected, "have reconnected before deadline is reached"); + + send_pongs.store(false, Ordering::Relaxed); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + send_pongs.store(true, Ordering::Relaxed); + // This sleep is to ensure that we will try to read socket and realise it closed + tokio::time::sleep(std::time::Duration::from_millis(100)).await; - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - // After this we should be reconnected + // 3 seconds will cause reconnect let reconnected = term.has_changed().expect("channel not closed"); - assert!(reconnected, "have reconnected after deadline is reached"); + assert!(reconnected, "haven't reconnected after deadline is reached"); Ok(()) } } From d6b545cb3373a5dd383b304ae024e6191d786b86 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Wed, 15 Oct 2025 22:37:48 +0400 Subject: [PATCH 08/13] add unit test --- crates/rollup-boost/src/flashblocks/inbound.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 393c6868..dfe1d841 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -110,7 +110,8 @@ impl FlashblocksReceiverService { let unix_now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() - .as_secs() - 60; + .as_secs() + - 60; let ts = Timestamp::from_unix(uuid::timestamp::context::NoContext, unix_now, 0); let clean_before = uuid::Uuid::new_v7(ts); cleaning_set.retain(|uuid| uuid > &clean_before); @@ -308,12 +309,8 @@ mod tests { async fn start_ping_server( addr: SocketAddr, - send_pongs: Arc - ) -> eyre::Result<( - watch::Receiver, - mpsc::Receiver, - url::Url, - )> { + send_pongs: Arc, + ) -> eyre::Result<(watch::Receiver, mpsc::Receiver, url::Url)> { let (term_tx, term_rx) = watch::channel(false); let (send_ping_tx, send_ping_rx) = mpsc::channel(100); From 294410edf865c6a937b5a45781f581c3c1946ab6 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Thu, 16 Oct 2025 13:34:16 +0400 Subject: [PATCH 09/13] Use LRU cache --- Cargo.lock | 31 ++++- crates/rollup-boost/Cargo.toml | 1 + .../rollup-boost/src/flashblocks/inbound.rs | 120 +++++++++--------- 3 files changed, 91 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6642b5ff..54dfe68e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,7 +394,7 @@ dependencies = [ "cfg-if", "const-hex", "derive_more", - "foldhash", + "foldhash 0.1.5", "getrandom 0.3.3", "hashbrown 0.15.4", "indexmap 2.10.0", @@ -3412,6 +3412,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -3771,10 +3777,21 @@ checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", "serde", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] + [[package]] name = "hashlink" version = "0.9.1" @@ -5149,6 +5166,15 @@ dependencies = [ "hashbrown 0.15.4", ] +[[package]] +name = "lru" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96051b46fc183dc9cd4a223960ef37b9af631b55191852a8274bfef064cda20f" +dependencies = [ + "hashbrown 0.16.0", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -10206,6 +10232,7 @@ dependencies = [ "hyper-rustls", "hyper-util", "jsonrpsee 0.25.1", + "lru 0.16.2", "metrics", "metrics-derive", "metrics-exporter-prometheus 0.16.2", diff --git a/crates/rollup-boost/Cargo.toml b/crates/rollup-boost/Cargo.toml index c7d7d638..d7a00817 100644 --- a/crates/rollup-boost/Cargo.toml +++ b/crates/rollup-boost/Cargo.toml @@ -67,6 +67,7 @@ dashmap = "6.1.0" backoff.workspace = true uuid = { version = "1.17.0", features = ["v4", "v7"] } bytes = "1.10.1" +lru = "0.16" [dev-dependencies] rand = "0.9.0" diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index dfe1d841..17ba0f6f 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -3,15 +3,18 @@ use crate::FlashblocksWebsocketConfig; use backoff::ExponentialBackoff; use backoff::backoff::Backoff; use bytes::Bytes; -use dashmap::DashSet; use futures::{SinkExt, StreamExt}; +use lru::LruCache; +use std::num::NonZeroUsize; use std::sync::Arc; +use std::sync::Mutex; use tokio::{sync::mpsc, time::interval}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use url::Url; -use uuid::Timestamp; + +const MAXIMUM_PINGS: NonZeroUsize = NonZeroUsize::new(60).expect("positive number always non zero"); #[derive(Debug, thiserror::Error)] enum FlashblocksReceiverError { @@ -38,6 +41,9 @@ enum FlashblocksReceiverError { #[error("Failed to send message to sender: {0}")] SendError(#[from] Box>), + + #[error("Ping mutex poisoned")] + MutexPoisoned, } pub struct FlashblocksReceiverService { @@ -98,26 +104,11 @@ impl FlashblocksReceiverService { let cancel_token = CancellationToken::new(); let cancel_for_ping = cancel_token.clone(); - let ping_set = Arc::new(DashSet::with_capacity(10)); - let pong_set = ping_set.clone(); - let cleaning_set = ping_set.clone(); - - tokio::spawn(async move { - // clean up pings without pongs older than 60 seconds - loop { - // To simplify logic we just create uuid time timestamp now() - 60s, because they - // support comparison - let unix_now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs() - - 60; - let ts = Timestamp::from_unix(uuid::timestamp::context::NoContext, unix_now, 0); - let clean_before = uuid::Uuid::new_v7(ts); - cleaning_set.retain(|uuid| uuid > &clean_before); - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - } - }); + // LRU cache with capacity of 60 pings - automatically evicts oldest entries + let ping_cache = Arc::new(Mutex::new( + LruCache::new(MAXIMUM_PINGS), + )); + let pong_cache = ping_cache.clone(); let mut ping_interval = interval(self.websocket_config.ping_interval()); let ping_task = tokio::spawn(async move { loop { @@ -127,7 +118,14 @@ impl FlashblocksReceiverService { if write.send(Message::Ping(Bytes::copy_from_slice(uuid.as_bytes().as_slice()))).await.is_err() { return Err(FlashblocksReceiverError::PingFailed); } - ping_set.insert(uuid); + match ping_cache.lock() { + Ok(mut cache) => { + cache.put(uuid, ()); + } + Err(_) => { + return Err(FlashblocksReceiverError::MutexPoisoned); + } + } } _ = cancel_for_ping.cancelled() => { tracing::debug!("Ping task cancelled"); @@ -169,10 +167,17 @@ impl FlashblocksReceiverService { Message::Pong(data) => { match uuid::Uuid::from_slice(data.as_ref()) { Ok(uuid) => { - if pong_set.remove(&uuid).is_some() { - pong_interval.reset(); - } else { - tracing::warn!("Received pong with unknown data:{}", uuid); + match pong_cache.lock() { + Ok(mut cache) => { + if cache.pop(&uuid).is_some() { + pong_interval.reset(); + } else { + tracing::warn!("Received pong with unknown data:{}", uuid); + } + } + Err(_) => { + return Err(FlashblocksReceiverError::MutexPoisoned); + } } } Err(e) => { @@ -319,52 +324,49 @@ mod tests { listener .set_nonblocking(true) - .expect("Failed to set TcpListener socket to non-blocking"); + .expect("can set TcpListener socket to non-blocking"); let listener = tokio::net::TcpListener::from_std(listener) - .expect("Failed to convert TcpListener to tokio TcpListener"); + .expect("can convert TcpListener to tokio TcpListener"); tokio::spawn(async move { loop { - tokio::select! { - result = listener.accept() => { - match result { - Ok((connection, _addr)) => { - match accept_async(connection).await { - Ok(ws_stream) => { - let (_, mut read) = ws_stream.split(); - loop { - if send_pongs.load(Ordering::Relaxed) { - let msg = read.next().await; - match msg { - // we need to read for the library to handle pong messages - Some(Ok(Message::Ping(data))) => { - send_ping_tx.send(data).await.unwrap(); - }, - Some(Err(_)) => {break;} - _ => {} - } - - } else { - tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - } + let result = listener.accept().await; + match result { + Ok((connection, _addr)) => { + match accept_async(connection).await { + Ok(ws_stream) => { + let (_, mut read) = ws_stream.split(); + loop { + if send_pongs.load(Ordering::Relaxed) { + let msg = read.next().await; + match msg { + // we need to read for the library to handle pong messages + Some(Ok(Message::Ping(data))) => { + send_ping_tx.send(data).await.unwrap(); + }, + Some(Err(_)) => {break;} + _ => {} } - } - Err(e) => { - eprintln!("Failed to accept WebSocket connection: {}", e); + + } else { + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; } } } Err(e) => { - // Optionally break or continue based on error type - if e.kind() == std::io::ErrorKind::Interrupted { - break; - } + eprintln!("Failed to accept WebSocket connection: {}", e); } } } + Err(e) => { + // Optionally break or continue based on error type + if e.kind() == std::io::ErrorKind::Interrupted { + break; + } + } } - // If we have broken from the look it means reconnection occurred + // If we have broken from the loop it means reconnection occurred term_tx.send(true).expect("channel is up"); } }); From 1da7ec728b5c8cdb2a34c2cf0c814df4f80d69d2 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Thu, 16 Oct 2025 13:45:11 +0400 Subject: [PATCH 10/13] Use LRU cache --- crates/rollup-boost/src/flashblocks/inbound.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index 17ba0f6f..ea1482a4 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -105,9 +105,7 @@ impl FlashblocksReceiverService { let cancel_for_ping = cancel_token.clone(); // LRU cache with capacity of 60 pings - automatically evicts oldest entries - let ping_cache = Arc::new(Mutex::new( - LruCache::new(MAXIMUM_PINGS), - )); + let ping_cache = Arc::new(Mutex::new(LruCache::new(MAXIMUM_PINGS))); let pong_cache = ping_cache.clone(); let mut ping_interval = interval(self.websocket_config.ping_interval()); let ping_task = tokio::spawn(async move { @@ -344,13 +342,15 @@ mod tests { // we need to read for the library to handle pong messages Some(Ok(Message::Ping(data))) => { send_ping_tx.send(data).await.unwrap(); - }, - Some(Err(_)) => {break;} + } + Some(Err(_)) => { + break; + } _ => {} } - } else { - tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(1)) + .await; } } } From 769bea4feab82be1ed4859544ab187ba07e4c3a3 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Thu, 16 Oct 2025 14:11:19 +0400 Subject: [PATCH 11/13] unwraps -> expect --- .../rollup-boost/src/flashblocks/inbound.rs | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index ea1482a4..e79e2cad 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -151,12 +151,11 @@ impl FlashblocksReceiverService { Some(Ok(msg)) => match msg { Message::Text(text) => { metrics.messages_received.increment(1); - if let Ok(flashblocks_msg) = - serde_json::from_str::(&text) - { - sender.send(flashblocks_msg).await.map_err(|e| { - FlashblocksReceiverError::SendError(Box::new(e)) - })?; + match serde_json::from_str::(&text) { + Ok(flashblocks_msg) => sender.send(flashblocks_msg).await.map_err(|e| { + FlashblocksReceiverError::SendError(Box::new(e)) + })?, + Err(e) => error!("Failed to process flashblock, error: {e}") } } Message::Close(_) => { @@ -268,16 +267,16 @@ mod tests { loop { tokio::select! { Some(msg) = send_rx.recv() => { - let serialized = serde_json::to_string(&msg).unwrap(); + let serialized = serde_json::to_string(&msg).expect("message serialized"); let utf8_bytes = Utf8Bytes::from(serialized); - write.send(Message::Text(utf8_bytes)).await.unwrap(); + write.send(Message::Text(utf8_bytes)).await.expect("message sent"); }, msg = read.next() => { match msg { // we need to read for the library to handle pong messages Some(Ok(Message::Ping(_))) => { - send_ping_tx.send(()).await.unwrap(); + send_ping_tx.send(()).await.expect("ping notification sent"); }, _ => {} } @@ -341,7 +340,10 @@ mod tests { match msg { // we need to read for the library to handle pong messages Some(Ok(Message::Ping(data))) => { - send_ping_tx.send(data).await.unwrap(); + send_ping_tx + .send(data) + .await + .expect("ping data sent"); } Some(Err(_)) => { break; @@ -376,7 +378,9 @@ mod tests { #[tokio::test] async fn test_flashblocks_receiver_service() -> eyre::Result<()> { - let addr = "127.0.0.1:8080".parse::().unwrap(); + let addr = "127.0.0.1:8080" + .parse::() + .expect("valid socket address"); let (term, send_msg, _, url) = start(addr).await?; let (tx, mut rx) = mpsc::channel(100); @@ -396,14 +400,14 @@ mod tests { send_msg .send(FlashblocksPayloadV1::default()) .await - .expect("Failed to send message"); + .expect("message sent to websocket server"); - let msg = rx.recv().await.expect("Failed to receive message"); + let msg = rx.recv().await.expect("message received from websocket"); assert_eq!(msg, FlashblocksPayloadV1::default()); // Drop the websocket server and start another one with the same address // The FlashblocksReceiverService should reconnect to the new server - term.send(true).unwrap(); + term.send(true).expect("termination signal sent"); // sleep for 1 second to ensure the server is dropped tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -413,11 +417,11 @@ mod tests { send_msg .send(FlashblocksPayloadV1::default()) .await - .expect("Failed to send message"); + .expect("message sent to websocket server"); - let msg = rx.recv().await.expect("Failed to receive message"); + let msg = rx.recv().await.expect("message received from websocket"); assert_eq!(msg, FlashblocksPayloadV1::default()); - term.send(true).unwrap(); + term.send(true).expect("termination signal sent"); Ok(()) } @@ -427,7 +431,9 @@ mod tests { // test that if the builder is not sending any messages back, the service will send // ping messages to test the connection periodically - let addr = "127.0.0.1:8081".parse::().unwrap(); + let addr = "127.0.0.1:8081" + .parse::() + .expect("valid socket address"); let send_pongs = Arc::new(AtomicBool::new(true)); let (term, mut ping_rx, url) = start_ping_server(addr, send_pongs.clone()).await?; let config = FlashblocksWebsocketConfig { @@ -445,7 +451,7 @@ mod tests { // even if we do not send any messages, we should receive pings to keep the connection alive for _ in 0..5 { - ping_rx.recv().await.expect("Failed to receive ping"); + ping_rx.recv().await.expect("ping received"); } // Check that server hasn't reconnected because we have answered to pongs let reconnected = term.has_changed().expect("channel not closed"); From 7586241bb08526bdfbde8fd3e454c50e23b4d55e Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Mon, 3 Nov 2025 21:25:40 +0400 Subject: [PATCH 12/13] Rebase --- crates/rollup-boost/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index 9749f66f..71a5a6ec 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -96,7 +96,7 @@ impl RollupBoostServer { builder_client.clone(), inbound_url, outbound_addr, - flashblocks_args.flashblock_builder_ws_reconnect_ms, + flashblocks_args.flashblocks_ws_config, )?); Ok(RollupBoostServer::new( From ab8f32d257d319651cb2bf1701e05a88b92d36e3 Mon Sep 17 00:00:00 2001 From: Solar Mithril Date: Mon, 3 Nov 2025 23:02:44 +0400 Subject: [PATCH 13/13] Bump miltiplicator Cover cornercase of rapid reconnections --- crates/rollup-boost/src/flashblocks/args.rs | 1 + crates/rollup-boost/src/flashblocks/inbound.rs | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/rollup-boost/src/flashblocks/args.rs b/crates/rollup-boost/src/flashblocks/args.rs index 38a62cf1..47b7ee1e 100644 --- a/crates/rollup-boost/src/flashblocks/args.rs +++ b/crates/rollup-boost/src/flashblocks/args.rs @@ -53,6 +53,7 @@ impl FlashblocksWebsocketConfig { .with_max_interval(self.max_interval()) .with_randomization_factor(0 as f64) .with_max_elapsed_time(None) + .with_multiplier(2.0) .build() } diff --git a/crates/rollup-boost/src/flashblocks/inbound.rs b/crates/rollup-boost/src/flashblocks/inbound.rs index e79e2cad..4737c66c 100644 --- a/crates/rollup-boost/src/flashblocks/inbound.rs +++ b/crates/rollup-boost/src/flashblocks/inbound.rs @@ -95,9 +95,6 @@ impl FlashblocksReceiverService { let (ws_stream, _) = connect_async(self.url.as_str()).await?; let (mut write, mut read) = ws_stream.split(); - // if we have successfully connected - reset backoff - backoff.reset(); - info!("Connected to Flashblocks receiver at {}", self.url); self.metrics.connection_status.set(1); @@ -202,6 +199,8 @@ impl FlashblocksReceiverService { } }); + let connection_start = std::time::Instant::now(); + let result = tokio::select! { result = message_handle => { result.map_err(|e| FlashblocksReceiverError::TaskPanic(e.to_string()))? @@ -212,6 +211,12 @@ impl FlashblocksReceiverService { }; cancel_token.cancel(); + + // Only reset backoff if connection was stable for the max_interval set + // This prevents rapid reconnection loops when a proxy accepts and immediately drops connections + if connection_start.elapsed() >= backoff.max_interval { + backoff.reset(); + } result } }