Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions crates/rbuilder/src/live_builder/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use crate::{
builders::{BacktestSimulateBlockInput, Block},
PartialBlockExecutionTracer,
},
live_builder::process_killer::{ProcessKiller, MAX_WAIT_TIME},
live_builder::{
process_killer::{ProcessKiller, MAX_WAIT_TIME},
watchdog::spawn_watchdog_thread,
},
provider::StateProviderFactory,
telemetry,
utils::{bls::generate_random_bls_address, build_info::Version},
Expand Down Expand Up @@ -114,6 +117,9 @@ where

let config: ConfigType = load_toml_config(cli.config)?;
config.base_config().setup_tracing_subscriber()?;
let cancel = CancellationToken::new();
let start_slot_watchdog_sender =
create_start_slot_watchdog(config.base_config(), cancel.clone())?;

let ready_to_build = Arc::new(AtomicBool::new(false));
// Spawn redacted server that is safe for tdx builders to expose
Expand All @@ -131,10 +137,26 @@ where
.await?;
let res = if config.base_config().ipc_provider.is_some() {
let provider = config.base_config().create_ipc_provider_factory()?;
run_builder(provider, config, on_run, ready_to_build).await
run_builder(
provider,
config,
on_run,
ready_to_build,
cancel,
start_slot_watchdog_sender,
)
.await
} else {
let provider = config.base_config().create_reth_provider_factory(false)?;
run_builder(provider, config, on_run, ready_to_build).await
run_builder(
provider,
config,
on_run,
ready_to_build,
cancel,
start_slot_watchdog_sender,
)
.await
};
res
}
Expand All @@ -144,12 +166,14 @@ async fn run_builder<P, ConfigType>(
config: ConfigType,
on_run: Option<fn()>,
ready_to_build: Arc<AtomicBool>,
cancel: CancellationToken,
// If Some, we should send a message for every slot we start building.
start_slot_watchdog_sender: Option<flume::Sender<()>>,
) -> eyre::Result<()>
where
ConfigType: LiveBuilderConfig,
P: StateProviderFactory + Clone + 'static,
{
let cancel = CancellationToken::new();
let builder = config.new_builder(provider, cancel.clone()).await?;

let terminate = async {
Expand All @@ -173,7 +197,9 @@ where
if let Some(on_run) = on_run {
on_run();
}
builder.run(ready_to_build).await?;
builder
.run(ready_to_build, start_slot_watchdog_sender)
.await?;
info!(
wait_time_secs = MAX_WAIT_TIME.as_secs(),
"Main thread waiting to die..."
Expand All @@ -182,3 +208,18 @@ where
info!("Main thread exiting");
Ok(())
}

/// Creates the start-of-slot watchdog if the configuration enables it.
///
/// When `config.watchdog_timeout()` returns `Some(duration)`, this calls
/// [`spawn_watchdog_thread`] with that duration, the label `"block build started"` and a
/// [`ProcessKiller`] built from `cancel`, and returns the resulting sender. The caller MUST send
/// a message on that sender for every slot it starts building.
///
/// When no timeout is configured, nothing is spawned and `Ok(None)` is returned.
///
/// # Errors
///
/// Returns the error produced by [`spawn_watchdog_thread`], if any.
pub fn create_start_slot_watchdog(
    config: &BaseConfig,
    cancel: CancellationToken,
) -> std::io::Result<Option<flume::Sender<()>>> {
    let duration = match config.watchdog_timeout() {
        Some(duration) => duration,
        // Watchdog disabled: there is no sender to hand out.
        None => return Ok(None),
    };
    let sender = spawn_watchdog_thread(
        duration,
        "block build started".to_string(),
        // `cancel` is owned by this function, so it is moved instead of cloned.
        ProcessKiller::new(cancel),
    )?;
    Ok(Some(sender))
}
29 changes: 13 additions & 16 deletions crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
order_input::{start_orderpool_jobs, OrderInputConfig},
process_killer::ProcessKiller,
simulation::OrderSimulationPool,
watchdog::spawn_watchdog_thread,
},
provider::StateProviderFactory,
telemetry::{inc_active_slots, mark_building_started},
Expand Down Expand Up @@ -150,11 +149,19 @@ where
Self { builders, ..self }
}

pub async fn run(self, ready_to_build: Arc<AtomicBool>) -> eyre::Result<()> {
pub async fn run(
self,
ready_to_build: Arc<AtomicBool>, // If Some, we should send a message for every slot we start building.
start_slot_watchdog_sender: Option<flume::Sender<()>>,
) -> eyre::Result<()> {
let global_cancellation = self.global_cancellation.clone();
let mut inner_jobs_handles = Vec::new();
let res = self
.run_no_cleanup(ready_to_build, &mut inner_jobs_handles)
.run_no_cleanup(
ready_to_build,
&mut inner_jobs_handles,
start_slot_watchdog_sender,
)
.await;
info!("Builder shutting down");
global_cancellation.cancel();
Expand All @@ -172,6 +179,8 @@ where
self,
ready_to_build: Arc<AtomicBool>,
inner_jobs_handles: &mut Vec<JoinHandle<()>>,
// If Some, we should send a message for every slot we start building.
start_slot_watchdog_sender: Option<flume::Sender<()>>,
) -> eyre::Result<()> {
info!(
"Builder initial block list size: {}",
Expand Down Expand Up @@ -226,18 +235,6 @@ where
self.order_flow_tracer_manager,
);

let watchdog_sender = match self.watchdog_timeout {
Some(duration) => Some(spawn_watchdog_thread(
duration,
"block build started".to_string(),
self.process_killer.clone(),
)?),
None => {
info!("Watchdog not enabled");
None
}
};

ready_to_build.store(true, Ordering::Relaxed);
while let Some(payload) = payload_events_channel.recv().await {
let blocklist = self.blocklist_provider.get_blocklist()?;
Expand Down Expand Up @@ -350,7 +347,7 @@ where
self.global_cancellation.clone(),
time_until_slot_end.try_into().unwrap_or_default(),
);
if let Some(watchdog_sender) = watchdog_sender.as_ref() {
if let Some(watchdog_sender) = start_slot_watchdog_sender.as_ref() {
watchdog_sender.try_send(()).unwrap_or_default();
};
}
Expand Down
1 change: 1 addition & 0 deletions crates/reth-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reth-cli-util.workspace = true
alloy-rlp.workspace = true

tokio.workspace = true
tokio-util.workspace = true
clap.workspace = true
eyre.workspace = true
tracing.workspace = true
Expand Down
15 changes: 12 additions & 3 deletions crates/reth-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

use clap::{Args, Parser};
use rbuilder::{
live_builder::{cli::LiveBuilderConfig, config::Config},
live_builder::{
cli::{create_start_slot_watchdog, LiveBuilderConfig},
config::Config,
},
provider::reth_prov::StateProviderFactoryFromRethProvider,
telemetry,
};
Expand All @@ -29,6 +32,7 @@ use std::{
sync::{atomic::AtomicBool, Arc},
};
use tokio::task;
use tokio_util::sync::CancellationToken;
use tracing::error;

// Prefer jemalloc for performance reasons.
Expand Down Expand Up @@ -94,6 +98,9 @@ fn spawn_rbuilder<P>(
let _handle = task::spawn(async move {
let result = async {
let config: Config = load_toml_config(config_path)?;
let cancel = CancellationToken::new();
let start_slot_watchdog_sender =
create_start_slot_watchdog(config.base_config(), cancel.clone())?;

let ready_to_build = Arc::new(AtomicBool::new(false));
// Spawn redacted server that is safe for tdx builders to expose
Expand All @@ -116,11 +123,13 @@ fn spawn_rbuilder<P>(
provider,
config.base_config().live_root_hash_config()?,
),
Default::default(),
cancel,
)
.await?;
builder.connect_to_transaction_pool(pool).await?;
builder.run(ready_to_build).await?;
builder
.run(ready_to_build, start_slot_watchdog_sender)
.await?;

Ok::<(), eyre::Error>(())
}
Expand Down
Loading