Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/rbuilder-operator/src/flashbots_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ impl LiveBuilderConfig for FlashbotsConfig {
handle_subsidise_block(bidding_service.clone(), params)
})?;
let live_builder = live_builder.with_extra_rpc(module);
let builders = create_builders(self.live_builders()?);
let builders = create_builders(
self.live_builders()?,
self.base_config.max_order_execution_duration_warning(),
);
Ok(live_builder.with_builders(builders))
}

Expand Down
1 change: 1 addition & 0 deletions crates/rbuilder/src/bin/run-bundle-on-prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl LandedBlockInfo {
false,
order_statistics,
CancellationToken::new(),
None,
)?)
}
}
Expand Down
70 changes: 67 additions & 3 deletions crates/rbuilder/src/building/builders/block_building_helper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloy_primitives::{utils::format_ether, U256};
use alloy_primitives::{utils::format_ether, Address, TxHash, U256};
use reth_provider::StateProvider;
use std::{
cmp::max,
Expand All @@ -7,7 +7,7 @@ use std::{
};
use time::OffsetDateTime;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, trace};
use tracing::{debug, error, trace, warn};

use crate::{
building::{
Expand All @@ -21,7 +21,10 @@ use crate::{
telemetry::{self, add_block_fill_time, add_order_simulation_time},
utils::{check_block_hash_reader_health, elapsed_ms, HistoricalBlockError},
};
use rbuilder_primitives::{order_statistics::OrderStatistics, SimValue, SimulatedOrder};
use rbuilder_primitives::{
order_statistics::OrderStatistics, SimValue, SimulatedOrder,
TransactionSignedEcRecoveredWithBlobs,
};

use super::Block;

Expand Down Expand Up @@ -157,6 +160,10 @@ pub struct BlockBuildingHelperFromProvider<
cancel_on_fatal_error: CancellationToken,

finalize_adjustment_state: Option<FinalizeAdjustmentState>,

/// If an order execution duration (commit_order) is greater than this, we will log a warning with some info about the order.
/// This probably should not be implemented here and should be a wrapper but this is simpler.
max_order_execution_duration_warning: Option<Duration>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -209,6 +216,7 @@ impl BlockBuildingHelperFromProvider<NullPartialBlockExecutionTracer> {
discard_txs: bool,
available_orders_statistics: OrderStatistics,
cancel_on_fatal_error: CancellationToken,
max_order_execution_duration_warning: Option<Duration>,
) -> Result<Self, BlockBuildingHelperError> {
BlockBuildingHelperFromProvider::new_with_execution_tracer(
built_block_id,
Expand All @@ -220,6 +228,7 @@ impl BlockBuildingHelperFromProvider<NullPartialBlockExecutionTracer> {
available_orders_statistics,
cancel_on_fatal_error,
NullPartialBlockExecutionTracer {},
max_order_execution_duration_warning,
)
}
}
Expand All @@ -244,6 +253,7 @@ impl<
available_orders_statistics: OrderStatistics,
cancel_on_fatal_error: CancellationToken,
partial_block_execution_tracer: PartialBlockExecutionTracerType,
max_order_execution_duration_warning: Option<Duration>,
) -> Result<Self, BlockBuildingHelperError> {
let last_committed_block = building_ctx.block() - 1;
check_block_hash_reader_health(last_committed_block, &state_provider)?;
Expand Down Expand Up @@ -280,6 +290,7 @@ impl<
built_block_trace,
cancel_on_fatal_error,
finalize_adjustment_state: None,
max_order_execution_duration_warning,
})
}

Expand Down Expand Up @@ -464,6 +475,52 @@ impl<
};
Ok(FinalizeBlockResult { block })
}

fn trace_slow_order_execution(
&self,
order: &SimulatedOrder,
sim_time: Duration,
result: &Result<Result<ExecutionResult, ExecutionError>, CriticalCommitOrderError>,
) {
#[derive(Debug)]
#[allow(dead_code)]
struct TxInfo {
pub hash: TxHash,
pub signer: Address,
pub to: Option<Address>,
}
impl From<&TransactionSignedEcRecoveredWithBlobs> for TxInfo {
fn from(tx: &TransactionSignedEcRecoveredWithBlobs) -> Self {
Self {
hash: tx.hash(),
signer: tx.signer(),
to: tx.to(),
}
}
}
impl TxInfo {
fn parse_order(order: &SimulatedOrder) -> Vec<Self> {
order
.order
.list_txs()
.iter()
.map(|(tx, _)| (*tx).into())
.collect::<Vec<_>>()
}
}
match result {
Ok(Ok(result)) => {
warn!(?sim_time,builder_name=self.builder_name,id = ?order.id(),tob_sim_value = ?order.sim_value,txs = ?TxInfo::parse_order(order),
space_used = ?result.space_used,coinbase_profit = ?result.coinbase_profit,inplace_sim = ?result.inplace_sim, "Slow order ok execution");
}
Ok(Err(err)) => {
warn!(?err,?sim_time,builder_name=self.builder_name,id = ?order.id(),tob_sim_value = ?order.sim_value,txs = ?TxInfo::parse_order(order), "Slow order failed execution.");
}
Err(err) => {
warn!(?err,?sim_time,builder_name=self.builder_name,id = ?order.id(),tob_sim_value = ?order.sim_value,txs = ?TxInfo::parse_order(order), "Slow order critical execution error.");
}
}
}
}

impl<
Expand All @@ -487,6 +544,13 @@ impl<
result_filter,
);
let sim_time = start.elapsed();
if self
.max_order_execution_duration_warning
.is_some_and(|max_dur| sim_time > max_dur)
{
self.trace_slow_order_execution(order, sim_time, &result);
}

let (result, sim_ok) = match result {
Ok(ok_result) => match ok_result {
Ok(res) => {
Expand Down
2 changes: 2 additions & 0 deletions crates/rbuilder/src/building/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::{
broadcast,
Expand Down Expand Up @@ -91,6 +92,7 @@ pub struct LiveBuilderInput<P> {
pub cancel: CancellationToken,
pub built_block_cache: Arc<BuiltBlockCache>,
pub built_block_id_source: Arc<BuiltBlockIdSource>,
pub max_order_execution_duration_warning: Option<Duration>,
}

/// Struct that helps reading new orders/cancellations
Expand Down
16 changes: 15 additions & 1 deletion crates/rbuilder/src/building/builders/ordering_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub fn run_ordering_builder<P, OrderPriorityType>(
input.builder_name,
input.ctx,
config.clone(),
input.max_order_execution_duration_warning,
input.built_block_cache,
);

Expand Down Expand Up @@ -185,6 +186,7 @@ where
input.builder_name,
input.ctx.clone(),
ordering_config,
None,
Arc::new(BuiltBlockCache::new()),
);
let mut block_builder = builder.build_block_with_execution_tracer(
Expand All @@ -208,6 +210,8 @@ pub struct OrderingBuilderContext {
builder_name: String,
ctx: BlockBuildingContext,
config: OrderingBuilderConfig,
/// See [BlockBuildingHelperFromProvider::max_order_execution_duration_warning]
max_order_execution_duration_warning: Option<Duration>,

// caches
local_ctx: ThreadBlockBuildingContext,
Expand All @@ -224,6 +228,7 @@ impl OrderingBuilderContext {
builder_name: String,
ctx: BlockBuildingContext,
config: OrderingBuilderConfig,
max_order_execution_duration_warning: Option<Duration>,
built_block_cache: Arc<BuiltBlockCache>,
) -> Self {
Self {
Expand All @@ -235,6 +240,7 @@ impl OrderingBuilderContext {
failed_orders: HashSet::default(),
order_attempts: HashMap::default(),
built_block_cache,
max_order_execution_duration_warning,
}
}

Expand Down Expand Up @@ -285,6 +291,7 @@ impl OrderingBuilderContext {
block_orders.orders_statistics(),
cancel_block,
partial_block_execution_tracer,
self.max_order_execution_duration_warning,
)?;
self.fill_orders(
&mut block_building_helper,
Expand Down Expand Up @@ -449,15 +456,21 @@ impl OrderingBuilderContext {
pub struct OrderingBuildingAlgorithm<OrderPriorityType> {
config: OrderingBuilderConfig,
name: String,
max_order_execution_duration_warning: Option<Duration>,
/// The ordering priority type used to sort simulated orders.
order_priority: PhantomData<OrderPriorityType>,
}

impl<OrderPriorityType> OrderingBuildingAlgorithm<OrderPriorityType> {
pub fn new(config: OrderingBuilderConfig, name: String) -> Self {
pub fn new(
config: OrderingBuilderConfig,
max_order_execution_duration_warning: Option<Duration>,
name: String,
) -> Self {
Self {
config,
name,
max_order_execution_duration_warning,
order_priority: PhantomData,
}
}
Expand All @@ -483,6 +496,7 @@ where
cancel: input.cancel,
built_block_cache: input.built_block_cache,
built_block_id_source: input.built_block_id_source,
max_order_execution_duration_warning: self.max_order_execution_duration_warning,
};
run_ordering_builder::<P, OrderPriorityType>(live_input, &self.config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use super::{
use ahash::HashMap;
use alloy_primitives::utils::format_ether;
use reth_provider::StateProvider;
use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use time::OffsetDateTime;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, trace};
Expand Down Expand Up @@ -39,6 +42,7 @@ pub struct BlockBuildingResultAssembler {
run_id: u64,
last_version: Option<u64>,
built_block_id_source: Arc<BuiltBlockIdSource>,
max_order_execution_duration_warning: Option<Duration>,
}

impl BlockBuildingResultAssembler {
Expand All @@ -60,6 +64,7 @@ impl BlockBuildingResultAssembler {
builder_name: String,
sink: Option<UnfinishedBuiltBlocksInput>,
built_block_id_source: Arc<BuiltBlockIdSource>,
max_order_execution_duration_warning: Option<Duration>,
) -> Self {
Self {
state,
Expand All @@ -73,6 +78,7 @@ impl BlockBuildingResultAssembler {
run_id: 0,
last_version: None,
built_block_id_source,
max_order_execution_duration_warning,
}
}

Expand Down Expand Up @@ -189,6 +195,7 @@ impl BlockBuildingResultAssembler {
self.discard_txs,
OrderStatistics::default(),
self.cancellation_token.clone(),
self.max_order_execution_duration_warning,
)?;
block_building_helper.set_trace_orders_closed_at(orders_closed_at);

Expand Down Expand Up @@ -263,6 +270,7 @@ impl BlockBuildingResultAssembler {
self.discard_txs,
OrderStatistics::default(),
CancellationToken::new(),
self.max_order_execution_duration_warning,
)?;

block_building_helper.set_trace_orders_closed_at(orders_closed_at);
Expand Down
18 changes: 15 additions & 3 deletions crates/rbuilder/src/building/builders/parallel_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use simulation_cache::SharedSimulationCache;
use std::{
sync::{mpsc as std_mpsc, Arc},
thread,
time::Instant,
time::{Duration, Instant},
};
use task::*;
use time::OffsetDateTime;
Expand Down Expand Up @@ -134,6 +134,7 @@ where
input.builder_name.clone(),
Some(input.sink.clone()),
input.built_block_id_source.clone(),
input.max_order_execution_duration_warning,
);

let order_intake_consumer = OrderIntakeStore::new(input.input);
Expand Down Expand Up @@ -352,6 +353,7 @@ where
String::from("backtest_builder"),
None,
Arc::new(BuiltBlockIdSource::new()),
None,
);
let assembler_duration = assembler_start.elapsed();

Expand Down Expand Up @@ -394,12 +396,21 @@ where
#[derive(Debug)]
pub struct ParallelBuildingAlgorithm {
config: ParallelBuilderConfig,
max_order_execution_duration_warning: Option<Duration>,
name: String,
}

impl ParallelBuildingAlgorithm {
pub fn new(config: ParallelBuilderConfig, name: String) -> Self {
Self { config, name }
pub fn new(
config: ParallelBuilderConfig,
max_order_execution_duration_warning: Option<Duration>,
name: String,
) -> Self {
Self {
config,
max_order_execution_duration_warning,
name,
}
}
}

Expand All @@ -421,6 +432,7 @@ where
cancel: input.cancel,
built_block_cache: input.built_block_cache,
built_block_id_source: input.built_block_id_source,
max_order_execution_duration_warning: self.max_order_execution_duration_warning,
};
run_parallel_builder(live_input, &self.config);
}
Expand Down
8 changes: 8 additions & 0 deletions crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ pub struct BaseConfig {
/// List of `builders` to be used for live building
pub live_builders: Vec<String>,

/// See [BlockBuildingHelperFromProvider::max_order_execution_duration_warning]
pub max_order_execution_duration_warning_us: Option<u64>,

/// Config for IPC state provider
pub ipc_provider: Option<IpcProviderConfig>,

Expand Down Expand Up @@ -466,6 +469,10 @@ impl BaseConfig {

Ok(path_expanded.parse()?)
}
pub fn max_order_execution_duration_warning(&self) -> Option<Duration> {
self.max_order_execution_duration_warning_us
.map(Duration::from_micros)
}
}

pub const DEFAULT_CL_NODE_URL: &str = "http://127.0.0.1:3500";
Expand Down Expand Up @@ -530,6 +537,7 @@ impl Default for BaseConfig {
orderflow_tracing_store_path: None,
orderflow_tracing_max_blocks: 0,
system_recipient_allowlist: Vec::new(),
max_order_execution_duration_warning_us: None,
}
}
}
Expand Down
Loading
Loading