59 changes: 34 additions & 25 deletions crates/rbuilder-operator/src/clickhouse.rs
@@ -13,7 +13,6 @@ use rbuilder::{
relay_submit::{AlwaysSubmitPolicy, RelaySubmissionPolicy},
},
payload_events::MevBoostSlotData,
process_killer::RUN_SUBMIT_TO_RELAYS_JOB_CANCEL_TIME,
},
};
use rbuilder_primitives::{Order, OrderId};
@@ -28,6 +27,7 @@ use rbuilder_utils::clickhouse::{
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::error;

@@ -164,13 +164,20 @@ impl RelaySubmissionPolicy for BackupNotTooBigRelaySubmissionPolicy {
}

impl BuiltBlocksWriter {
/// Returns the writer and the submission policy what asks to stop submitting blocks if the disk backup is too big.
/// It's a little ugly/coupled that we generate this here but it was easier than injecting a metric observer.
/// Returns the writer, the submission policy, and a JoinHandle for the clickhouse tasks.
/// The JoinHandle can be awaited to ensure clickhouse tasks have completed during shutdown.
/// It's a little ugly/coupled that we generate the submission policy here but it was easier than injecting a metric observer.
/// abort_token is a last-resort cancellation token used to cancel the clickhouse tasks if the BuiltBlocksWriter is not
/// released. When fired, it tells clickhouse to stop listening for new blocks and to flush the backup process.
pub fn new(
config: BuiltBlocksClickhouseConfig,
rbuilder_commit: String,
cancellation_token: CancellationToken,
) -> eyre::Result<(Self, Box<dyn RelaySubmissionPolicy + Send + Sync>)> {
abort_token: CancellationToken,
) -> eyre::Result<(
Self,
Box<dyn RelaySubmissionPolicy + Send + Sync>,
JoinHandle<()>,
)> {
let backup_max_size_bytes =
config.disk_max_size_mb.unwrap_or(DEFAULT_MAX_DISK_SIZE_MB) * MEGA;
let submission_policy: Box<dyn RelaySubmissionPolicy + Send + Sync> =
@@ -222,35 +229,37 @@ impl BuiltBlocksWriter {
let end_timeout =
Duration::from_millis(config.end_timeout_ms.unwrap_or(DEFAULT_END_TIMEOUT_MS));

spawn_clickhouse_inserter_and_backup::<BlockRow, BlockRow, ClickhouseMetrics>(
&client,
block_rx,
&task_executor,
BLOCKS_TABLE_NAME.to_string(),
"".to_string(), // No buildername used in blocks table.
disk_backup,
config
.memory_max_size_mb
.unwrap_or(DEFAULT_MAX_MEMORY_SIZE_MB)
* MEGA,
send_timeout,
end_timeout,
BLOCKS_TABLE_NAME,
);
// Task to forward the cancellation to the task_manager.
let clickhouse_shutdown_handle =
spawn_clickhouse_inserter_and_backup::<BlockRow, BlockRow, ClickhouseMetrics>(
&client,
block_rx,
&task_executor,
BLOCKS_TABLE_NAME.to_string(),
"".to_string(), // No buildername used in blocks table.
disk_backup,
config
.memory_max_size_mb
.unwrap_or(DEFAULT_MAX_MEMORY_SIZE_MB)
* MEGA,
send_timeout,
end_timeout,
BLOCKS_TABLE_NAME,
);

// Task to forward the abort to the task_manager.
tokio::spawn(async move {
cancellation_token.cancelled().await;
// @Pending: Needed to avoid losing blocks but we should try to avoid this.
tokio::time::sleep(RUN_SUBMIT_TO_RELAYS_JOB_CANCEL_TIME).await;
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
abort_token.cancelled().await;
task_manager.graceful_shutdown();
[Contributor review comment] Good design pattern: This task forwards the abort signal to the task_manager, which is cleaner than the previous approach that used a hardcoded sleep. However, ensure that task_manager.graceful_shutdown() properly signals all child tasks to complete their cleanup work before the clickhouse_shutdown_handle completes.
});

Ok((
Self {
blocks_tx: block_tx,
rbuilder_commit,
builder_name: config.builder_name,
},
submission_policy,
clickhouse_shutdown_handle,
))
}
}
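With the new signature, call sites of BuiltBlocksWriter::new must thread the extra JoinHandle through to shutdown. A minimal sketch of a caller, assuming the crate's types are in scope; the function name and the exact shutdown ordering here are illustrative, not taken from the repo:

```rust
use tokio_util::sync::CancellationToken;

// Hypothetical call site (`setup_writer` is not a real function in the repo).
async fn setup_writer(
    config: BuiltBlocksClickhouseConfig,
    commit: String,
) -> eyre::Result<()> {
    let abort_token = CancellationToken::new();

    let (writer, submission_policy, clickhouse_handle) =
        BuiltBlocksWriter::new(config, commit, abort_token.clone())?;

    // ... run the builder with `writer` and `submission_policy` ...

    // On shutdown: fire the last-resort token, then await the handle so the
    // clickhouse tasks can flush their backup before the process exits.
    abort_token.cancel();
    clickhouse_handle.await?;
    Ok(())
}
```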
75 changes: 52 additions & 23 deletions crates/rbuilder-operator/src/flashbots_config.rs
@@ -48,6 +48,7 @@ use crate::{

use clickhouse::Client;
use std::{path::PathBuf, sync::Arc};
use tokio::task::JoinHandle;

#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)]
pub struct ClickhouseConfig {
@@ -155,6 +156,7 @@ impl LiveBuilderConfig for FlashbotsConfig {
where
P: StateProviderFactory + Clone + 'static,
{
let abort_token = CancellationToken::new();
if self.l1_config.relay_bid_scrapers.is_empty() {
eyre::bail!("relay_bid_scrapers is not set");
}
@@ -171,34 +173,45 @@
)
.await?;

let (bid_observer, submission_policy) = self
.create_bid_observer_and_submission_policy(&cancellation_token)
let (bid_observer, submission_policy, clickhouse_shutdown_handle) = self
.create_bid_observer_and_submission_policy(&cancellation_token, &abort_token)
.await?;

let (sink_factory, slot_info_provider, adjustment_fee_payers) =
create_sink_factory_and_relays(
&self.base_config,
&self.l1_config,
bidding_service.relay_sets().to_vec(),
wallet_balance_watcher,
bid_observer,
submission_policy,
bidding_service.clone(),
cancellation_token.clone(),
)
.await?;
let (
sink_factory,
slot_info_provider,
adjustment_fee_payers,
optimistic_v3_server_join_handle,
) = create_sink_factory_and_relays(
&self.base_config,
&self.l1_config,
bidding_service.relay_sets().to_vec(),
wallet_balance_watcher,
bid_observer,
submission_policy,
bidding_service.clone(),
cancellation_token.clone(),
)
.await?;

let live_builder = create_builder_from_sink(
let mut live_builder = create_builder_from_sink(
&self.base_config,
&self.l1_config,
provider,
sink_factory,
slot_info_provider,
adjustment_fee_payers,
cancellation_token,
abort_token,
)
.await?;

if let Some(handle) = clickhouse_shutdown_handle {
live_builder.add_critical_task(handle);
}
if let Some(optimistic_v3_server_join_handle) = optimistic_v3_server_join_handle {
live_builder.add_critical_task(optimistic_v3_server_join_handle);
}
let mut module = RpcModule::new(());
module.register_async_method("bid_subsidiseBlock", move |params, _| {
handle_subsidise_block(bidding_service.clone(), params)
@@ -318,28 +331,35 @@ impl FlashbotsConfig {
/// Depending on the cfg may create:
/// - Dummy sink (no built_blocks_clickhouse_config)
/// - BuiltBlocksWriter that writes to clickhouse
///
/// Returns (BidObserver, RelaySubmissionPolicy, Option<JoinHandle> for clickhouse shutdown)
#[allow(clippy::type_complexity)]
fn create_clickhouse_writer_and_submission_policy(
&self,
cancellation_token: &CancellationToken,
clickhouse_abort_token: &CancellationToken,
block_processor_key: Option<PrivateKeySigner>,
) -> eyre::Result<(
Option<Box<dyn BidObserver + Send + Sync>>,
Box<dyn RelaySubmissionPolicy + Send + Sync>,
Option<JoinHandle<()>>,
)> {
if let Some(built_blocks_clickhouse_config) = &self.built_blocks_clickhouse_config {
let rbuilder_version = rbuilder_version();
let (writer, submission_policy) = BuiltBlocksWriter::new(
let (writer, submission_policy, shutdown_handle) = BuiltBlocksWriter::new(
built_blocks_clickhouse_config.clone(),
rbuilder_version.git_commit,
cancellation_token.clone(),
clickhouse_abort_token.clone(),
)?;
Ok((Some(Box::new(writer)), submission_policy))
Ok((
Some(Box::new(writer)),
submission_policy,
Some(shutdown_handle),
))
} else {
if block_processor_key.is_some() {
return Self::bail_blocks_processor_url_not_set();
}
Ok((None, Box::new(AlwaysSubmitPolicy {})))
Ok((None, Box::new(AlwaysSubmitPolicy {}), None))
}
}

@@ -348,12 +368,17 @@ impl FlashbotsConfig {
}

/// Depending on the cfg, adds a BlocksProcessorClientBidObserver and/or a true value pusher.
/// Returns (BidObserver, RelaySubmissionPolicy, Option<JoinHandle> for clickhouse shutdown)
/// cancellation_token: used to cancel tbv_pusher
/// clickhouse_abort_token: used to cancel the clickhouse tasks if the source hangs.
async fn create_bid_observer_and_submission_policy(
&self,
cancellation_token: &CancellationToken,
clickhouse_abort_token: &CancellationToken,
) -> eyre::Result<(
Box<dyn BidObserver + Send + Sync>,
Box<dyn RelaySubmissionPolicy + Send + Sync>,
Option<JoinHandle<()>>,
)> {
let block_processor_key = if let Some(key_registration_url) = &self.key_registration_url {
if self.blocks_processor_url.is_none() {
Expand All @@ -364,16 +389,20 @@ impl FlashbotsConfig {
None
};

let (clickhouse_writer, submission_policy) = self
let (clickhouse_writer, submission_policy, clickhouse_shutdown_handle) = self
.create_clickhouse_writer_and_submission_policy(
cancellation_token,
clickhouse_abort_token,
block_processor_key.clone(),
)?;
let bid_observer = RbuilderOperatorBidObserver {
clickhouse_writer,
tbv_pusher: self.create_tbv_pusher(block_processor_key, cancellation_token)?,
};
Ok((Box::new(bid_observer), submission_policy))
Ok((
Box::new(bid_observer),
submission_policy,
clickhouse_shutdown_handle,
))
}

fn create_tbv_pusher(
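add_critical_task itself is not shown in this diff; presumably it pushes onto the new critical_tasks_join_handles vector so the builder can await those tasks before exiting. A sketch of what that pairing could look like — the struct shape and method bodies are assumptions, not the repo's actual code:

```rust
use tokio::task::JoinHandle;

struct LiveBuilder {
    critical_tasks_join_handles: Vec<JoinHandle<()>>,
    // ... other fields elided ...
}

impl LiveBuilder {
    /// Register a task that must finish before the process exits.
    fn add_critical_task(&mut self, handle: JoinHandle<()>) {
        self.critical_tasks_join_handles.push(handle);
    }

    /// Await every critical task during shutdown, logging panics.
    async fn wait_critical_tasks(self) {
        for handle in self.critical_tasks_join_handles {
            if let Err(err) = handle.await {
                tracing::error!(?err, "critical task failed");
            }
        }
    }
}
```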
12 changes: 10 additions & 2 deletions crates/rbuilder-utils/src/clickhouse/backup/mod.rs
@@ -13,6 +13,7 @@ use derive_more::{Deref, DerefMut};
use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata};
use strum::AsRefStr;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::{
backoff::BackoffInterval,
@@ -761,7 +762,14 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> Backup<T, MetricsType> {
}

/// Spawns the inserter runner on the given task executor.
pub fn spawn(mut self, task_executor: &TaskExecutor, name: String, target: &'static str)
/// Returns a JoinHandle that resolves when the task completes.
/// On shutdown it stops processing new data and flushes the backup; new data might be lost.
pub fn spawn(
mut self,
task_executor: &TaskExecutor,
name: String,
target: &'static str,
) -> JoinHandle<()>
where
MetricsType: Send + Sync + 'static,
for<'a> <T as clickhouse::Row>::Value<'a>: Sync,
@@ -784,7 +792,7 @@
"Clickhouse backup cleanup complete"
);
drop(shutdown_guard);
});
})
}
}

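Returning the JoinHandle from spawn lets callers await the flush-and-cleanup path instead of guessing with a sleep. A usage sketch, assuming a `backup` value and `task_executor` are already in scope; the table name and tracing target are placeholders:

```rust
// Hypothetical call site: keep the handle when spawning the backup task.
let handle = backup.spawn(&task_executor, "blocks".to_string(), "clickhouse");

// Later, during shutdown, wait for the backup's cleanup to complete.
if let Err(err) = handle.await {
    tracing::error!(?err, "clickhouse backup task panicked");
}
```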
13 changes: 10 additions & 3 deletions crates/rbuilder-utils/src/clickhouse/indexer.rs
@@ -13,6 +13,7 @@ use clickhouse::{
};
use reth_tasks::TaskExecutor;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::{
clickhouse::{
@@ -215,7 +216,14 @@ impl<T: ClickhouseIndexableData, MetricsType: Metrics> InserterRunner<T, Metrics
}

/// Spawns the inserter runner on the given task executor.
pub fn spawn(mut self, task_executor: &TaskExecutor, name: String, target: &'static str)
/// Returns a JoinHandle that resolves when the task completes.
/// On shutdown it stops processing new data and flushes the inserter; new data might be lost.
pub fn spawn(
mut self,
task_executor: &TaskExecutor,
name: String,
target: &'static str,
) -> JoinHandle<()>
where
T: Send + Sync + 'static,
MetricsType: Send + Sync + 'static,
@@ -242,8 +250,7 @@
}
}
drop(shutdown_guard);

});
})
}
}

16 changes: 13 additions & 3 deletions crates/rbuilder-utils/src/clickhouse/mod.rs
@@ -7,6 +7,7 @@ use ::serde::{Deserialize, Serialize};
use clickhouse::Client;
use reth_tasks::TaskExecutor;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::clickhouse::{
backup::{
@@ -59,6 +60,7 @@ impl From<Quantities> for clickhouse::inserter::Quantities {
const BACKUP_INPUT_CHANNEL_BUFFER_SIZE: usize = 128;

/// Main func to spawn the clickhouse inserter and backup tasks.
/// Returns a JoinHandle that resolves when both tasks have completed.
#[allow(clippy::too_many_arguments)]
pub fn spawn_clickhouse_inserter_and_backup<
DataType: ClickhouseIndexableData + Send + Sync + 'static,
@@ -75,7 +77,8 @@
send_timeout: Duration,
end_timeout: Duration,
tracing_target: &'static str,
) where
) -> JoinHandle<()>
where
for<'a> <DataType::ClickhouseRowType as clickhouse::Row>::Value<'a>: Sync,
{
let backup_table_name = RowType::TABLE_NAME.to_string();
@@ -93,6 +96,13 @@
disk_backup_db,
)
.with_memory_backup_config(MemoryBackupConfig::new(memory_max_size_bytes));
inserter_runner.spawn(task_executor, backup_table_name.clone(), tracing_target);
backup.spawn(task_executor, backup_table_name, tracing_target);

let inserter_handle =
inserter_runner.spawn(task_executor, backup_table_name.clone(), tracing_target);
let backup_handle = backup.spawn(task_executor, backup_table_name, tracing_target);

// Spawn a wrapper task that waits for both to complete
tokio::spawn(async move {
[Contributor review comment] Good pattern: Wrapping both handles in a single task that waits for both to complete is a clean solution. This ensures the returned JoinHandle only completes when both the inserter and backup tasks have finished their cleanup.
let _ = tokio::join!(inserter_handle, backup_handle);
})
}
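The wrapper task is the standard way to collapse two handles into one that completes only when both children do. The same pattern, as a standalone sketch:

```rust
use tokio::task::JoinHandle;

/// Merge two task handles into one that resolves when both have finished.
fn join_handles(a: JoinHandle<()>, b: JoinHandle<()>) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Ignore JoinErrors here; panics in the children surface elsewhere.
        let _ = tokio::join!(a, b);
    })
}
```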
3 changes: 3 additions & 0 deletions crates/rbuilder/src/live_builder/base_config.rs
@@ -219,6 +219,7 @@ impl BaseConfig {
pub async fn create_builder_with_provider_factory<P>(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
global_abort: tokio_util::sync::CancellationToken,
unfinished_built_blocks_input_factory: UnfinishedBuiltBlocksInputFactory<P>,
slot_source: MevBoostSlotDataGenerator,
provider: P,
@@ -259,6 +260,8 @@
blocklist_provider,

global_cancellation: cancellation_token.clone(),
global_abort,
critical_tasks_join_handles: Vec::new(),
process_killer: ProcessKiller::new(cancellation_token),
extra_rpc: RpcModule::new(()),
unfinished_built_blocks_input_factory,
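Taken together, the PR gives shutdown two stages: a graceful cancellation first, then the last-resort abort token if critical tasks hang. A sketch of how an orchestrator might sequence the two tokens and the critical handles — the 30-second window and the function shape are illustrative assumptions, not the repo's code:

```rust
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

async fn shutdown(
    cancellation_token: CancellationToken, // graceful: stop accepting work
    abort_token: CancellationToken,        // last resort: force flush and exit
    critical_tasks: Vec<JoinHandle<()>>,
) {
    // Stage 1: ask everything to wind down normally.
    cancellation_token.cancel();

    // Stage 2: give critical tasks a bounded window to flush.
    let all_done = async {
        for handle in critical_tasks {
            let _ = handle.await;
        }
    };
    if tokio::time::timeout(Duration::from_secs(30), all_done)
        .await
        .is_err()
    {
        // Stage 3: something is hung; fire the abort so the clickhouse
        // inserter and backup stop listening and flush what they can.
        abort_token.cancel();
    }
}
```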