diff --git a/crates/rbuilder/src/live_builder/cli.rs b/crates/rbuilder/src/live_builder/cli.rs
index a9364c3f3..2b78ff78c 100644
--- a/crates/rbuilder/src/live_builder/cli.rs
+++ b/crates/rbuilder/src/live_builder/cli.rs
@@ -16,7 +16,10 @@ use crate::{
builders::{BacktestSimulateBlockInput, Block},
PartialBlockExecutionTracer,
},
- live_builder::process_killer::{ProcessKiller, MAX_WAIT_TIME},
+ live_builder::{
+ process_killer::{ProcessKiller, MAX_WAIT_TIME},
+ watchdog::spawn_watchdog_thread,
+ },
provider::StateProviderFactory,
telemetry,
utils::{bls::generate_random_bls_address, build_info::Version},
@@ -114,6 +117,9 @@ where
let config: ConfigType = load_toml_config(cli.config)?;
config.base_config().setup_tracing_subscriber()?;
+ let cancel = CancellationToken::new();
+ let start_slot_watchdog_sender =
+ create_start_slot_watchdog(config.base_config(), cancel.clone())?;
let ready_to_build = Arc::new(AtomicBool::new(false));
// Spawn redacted server that is safe for tdx builders to expose
@@ -131,10 +137,26 @@ where
.await?;
let res = if config.base_config().ipc_provider.is_some() {
let provider = config.base_config().create_ipc_provider_factory()?;
- run_builder(provider, config, on_run, ready_to_build).await
+ run_builder(
+ provider,
+ config,
+ on_run,
+ ready_to_build,
+ cancel,
+ start_slot_watchdog_sender,
+ )
+ .await
} else {
let provider = config.base_config().create_reth_provider_factory(false)?;
- run_builder(provider, config, on_run, ready_to_build).await
+ run_builder(
+ provider,
+ config,
+ on_run,
+ ready_to_build,
+ cancel,
+ start_slot_watchdog_sender,
+ )
+ .await
};
res
}
@@ -144,12 +166,14 @@ async fn run_builder
(
config: ConfigType,
on_run: Option,
ready_to_build: Arc,
+ cancel: CancellationToken,
+ // If Some, we should send a message for every slot we start building.
+ start_slot_watchdog_sender: Option>,
) -> eyre::Result<()>
where
ConfigType: LiveBuilderConfig,
P: StateProviderFactory + Clone + 'static,
{
- let cancel = CancellationToken::new();
let builder = config.new_builder(provider, cancel.clone()).await?;
let terminate = async {
@@ -173,7 +197,9 @@ where
if let Some(on_run) = on_run {
on_run();
}
- builder.run(ready_to_build).await?;
+ builder
+ .run(ready_to_build, start_slot_watchdog_sender)
+ .await?;
info!(
wait_time_secs = MAX_WAIT_TIME.as_secs(),
"Main thread waiting to die..."
@@ -182,3 +208,18 @@ where
info!("Main thread exiting");
Ok(())
}
+
+/// If it's configured, creates a watchdog thread and Sender to where we MUST send a message for every slot we start building.
+pub fn create_start_slot_watchdog(
+ config: &BaseConfig,
+ cancel: CancellationToken,
+) -> std::io::Result