From 914ff0564589a2bf4a208321b133618b7064b515 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Tue, 18 Nov 2025 14:50:31 -0300 Subject: [PATCH] watchdog initialization as early as possible --- crates/rbuilder/src/live_builder/cli.rs | 51 ++++++++++++++++++++++--- crates/rbuilder/src/live_builder/mod.rs | 29 +++++++------- crates/reth-rbuilder/Cargo.toml | 1 + crates/reth-rbuilder/src/main.rs | 15 ++++++-- 4 files changed, 72 insertions(+), 24 deletions(-) diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs index a9364c3f3..2b78ff78c 100644 --- a/crates/rbuilder/src/live_builder/cli.rs +++ b/crates/rbuilder/src/live_builder/cli.rs @@ -16,7 +16,10 @@ use crate::{ builders::{BacktestSimulateBlockInput, Block}, PartialBlockExecutionTracer, }, - live_builder::process_killer::{ProcessKiller, MAX_WAIT_TIME}, + live_builder::{ + process_killer::{ProcessKiller, MAX_WAIT_TIME}, + watchdog::spawn_watchdog_thread, + }, provider::StateProviderFactory, telemetry, utils::{bls::generate_random_bls_address, build_info::Version}, @@ -114,6 +117,9 @@ where let config: ConfigType = load_toml_config(cli.config)?; config.base_config().setup_tracing_subscriber()?; + let cancel = CancellationToken::new(); + let start_slot_watchdog_sender = + create_start_slot_watchdog(config.base_config(), cancel.clone())?; let ready_to_build = Arc::new(AtomicBool::new(false)); // Spawn redacted server that is safe for tdx builders to expose @@ -131,10 +137,26 @@ where .await?; let res = if config.base_config().ipc_provider.is_some() { let provider = config.base_config().create_ipc_provider_factory()?; - run_builder(provider, config, on_run, ready_to_build).await + run_builder( + provider, + config, + on_run, + ready_to_build, + cancel, + start_slot_watchdog_sender, + ) + .await } else { let provider = config.base_config().create_reth_provider_factory(false)?; - run_builder(provider, config, on_run, ready_to_build).await + run_builder( + provider, + config, + on_run, + ready_to_build, + cancel, + start_slot_watchdog_sender, + ) + .await }; res } @@ -144,12 +166,14 @@ async fn run_builder( config: ConfigType, on_run: Option, ready_to_build: Arc, + cancel: CancellationToken, + // If Some, we should send a message for every slot we start building. + start_slot_watchdog_sender: Option>, ) -> eyre::Result<()> where ConfigType: LiveBuilderConfig, P: StateProviderFactory + Clone + 'static, { - let cancel = CancellationToken::new(); let builder = config.new_builder(provider, cancel.clone()).await?; let terminate = async { @@ -173,7 +197,9 @@ where if let Some(on_run) = on_run { on_run(); } - builder.run(ready_to_build).await?; + builder + .run(ready_to_build, start_slot_watchdog_sender) + .await?; info!( wait_time_secs = MAX_WAIT_TIME.as_secs(), "Main thread waiting to die..." @@ -182,3 +208,18 @@ where info!("Main thread exiting"); Ok(()) } + +/// If it's configured, creates a watchdog thread and Sender to where we MUST send a message for every slot we start building. +pub fn create_start_slot_watchdog( + config: &BaseConfig, + cancel: CancellationToken, +) -> std::io::Result>> { + match config.watchdog_timeout() { + Some(duration) => Ok(Some(spawn_watchdog_thread( + duration, + "block build started".to_string(), + ProcessKiller::new(cancel.clone()), + )?)), + None => Ok(None), + } +} diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index dad3d0d21..48555cafc 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -19,7 +19,6 @@ use crate::{ order_input::{start_orderpool_jobs, OrderInputConfig}, process_killer::ProcessKiller, simulation::OrderSimulationPool, - watchdog::spawn_watchdog_thread, }, provider::StateProviderFactory, telemetry::{inc_active_slots, mark_building_started}, @@ -150,11 +149,19 @@ where Self { builders, ..self } } - pub async fn run(self, ready_to_build: Arc) -> eyre::Result<()> { + pub async fn run( + self, + ready_to_build: Arc, // If Some, we should send a message for every slot we start building. + start_slot_watchdog_sender: Option>, + ) -> eyre::Result<()> { let global_cancellation = self.global_cancellation.clone(); let mut inner_jobs_handles = Vec::new(); let res = self - .run_no_cleanup(ready_to_build, &mut inner_jobs_handles) + .run_no_cleanup( + ready_to_build, + &mut inner_jobs_handles, + start_slot_watchdog_sender, + ) .await; info!("Builder shutting down"); global_cancellation.cancel(); @@ -172,6 +179,8 @@ where self, ready_to_build: Arc, inner_jobs_handles: &mut Vec>, + // If Some, we should send a message for every slot we start building. + start_slot_watchdog_sender: Option>, ) -> eyre::Result<()> { info!( "Builder initial block list size: {}", @@ -226,18 +235,6 @@ where self.order_flow_tracer_manager, ); - let watchdog_sender = match self.watchdog_timeout { - Some(duration) => Some(spawn_watchdog_thread( - duration, - "block build started".to_string(), - self.process_killer.clone(), - )?), - None => { - info!("Watchdog not enabled"); - None - } - }; - ready_to_build.store(true, Ordering::Relaxed); while let Some(payload) = payload_events_channel.recv().await { let blocklist = self.blocklist_provider.get_blocklist()?; @@ -350,7 +347,7 @@ where self.global_cancellation.clone(), time_until_slot_end.try_into().unwrap_or_default(), ); - if let Some(watchdog_sender) = watchdog_sender.as_ref() { + if let Some(watchdog_sender) = start_slot_watchdog_sender.as_ref() { watchdog_sender.try_send(()).unwrap_or_default(); }; } diff --git a/crates/reth-rbuilder/Cargo.toml b/crates/reth-rbuilder/Cargo.toml index 3723f1606..17575b860 100644 --- a/crates/reth-rbuilder/Cargo.toml +++ b/crates/reth-rbuilder/Cargo.toml @@ -19,6 +19,7 @@ reth-cli-util.workspace = true alloy-rlp.workspace = true tokio.workspace = true +tokio-util.workspace = true clap.workspace = true eyre.workspace = true tracing.workspace = true diff --git a/crates/reth-rbuilder/src/main.rs b/crates/reth-rbuilder/src/main.rs index d2ff0d6ad..cbc66826e 100644 --- a/crates/reth-rbuilder/src/main.rs +++ b/crates/reth-rbuilder/src/main.rs @@ -7,7 +7,10 @@ use clap::{Args, Parser}; use rbuilder::{ - live_builder::{cli::LiveBuilderConfig, config::Config}, + live_builder::{ + cli::{create_start_slot_watchdog, LiveBuilderConfig}, + config::Config, + }, provider::reth_prov::StateProviderFactoryFromRethProvider, telemetry, }; @@ -29,6 +32,7 @@ use std::{ sync::{atomic::AtomicBool, Arc}, }; use tokio::task; +use tokio_util::sync::CancellationToken; use tracing::error; // Prefer jemalloc for performance reasons. @@ -94,6 +98,9 @@ fn spawn_rbuilder

( let _handle = task::spawn(async move { let result = async { let config: Config = load_toml_config(config_path)?; + let cancel = CancellationToken::new(); + let start_slot_watchdog_sender = + create_start_slot_watchdog(config.base_config(), cancel.clone())?; let ready_to_build = Arc::new(AtomicBool::new(false)); // Spawn redacted server that is safe for tdx builders to expose @@ -116,11 +123,13 @@ fn spawn_rbuilder

( provider, config.base_config().live_root_hash_config()?, ), - Default::default(), + cancel, ) .await?; builder.connect_to_transaction_pool(pool).await?; - builder.run(ready_to_build).await?; + builder + .run(ready_to_build, start_slot_watchdog_sender) + .await?; Ok::<(), eyre::Error>(()) }