diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index 9bf331725..a75b18916 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -26,7 +26,7 @@ use rbuilder::{ live_builder::{ base_config::{ default_ip, DEFAULT_EL_NODE_IPC_PATH, DEFAULT_INCOMING_BUNDLES_PORT, - DEFAULT_RETH_DB_PATH, + DEFAULT_RETH_DB_PATH, DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS, }, block_list_provider::NullBlockListProvider, config::create_provider_factory, @@ -97,6 +97,7 @@ async fn main() -> eyre::Result<()> { DEFAULT_SERVE_MAX_CONNECTIONS, DEFAULT_RESULTS_CHANNEL_TIMEOUT, DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, + Duration::from_secs(DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS), ); let (orderpool_sender, orderpool_receiver) = mpsc::channel(order_input_config.input_channel_buffer_size); diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 83d8b1860..68abc4b46 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -143,6 +143,9 @@ pub struct BaseConfig { /// Use experimental code for faster finalize pub faster_finalize: bool, + /// See [OrderPool::time_to_keep_mempool_txs] + pub time_to_keep_mempool_txs_secs: u64, + // backtest config backtest_fetch_mempool_data_dir: EnvOrValue, pub backtest_fetch_eth_rpc_url: String, @@ -542,6 +545,7 @@ pub const DEFAULT_RETH_DB_PATH: &str = "/mnt/data/reth"; /// This will update every 2.4 hours, super reasonable. pub const DEFAULT_BLOCKLIST_URL_MAX_AGE_HOURS: u64 = 24; pub const DEFAULT_REQUIRE_NON_EMPTY_BLOCKLIST: bool = false; +pub const DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS: u64 = 60; impl Default for BaseConfig { fn default() -> Self { @@ -591,6 +595,7 @@ impl Default for BaseConfig { ipc_provider: None, evm_caching_enable: false, faster_finalize: false, + time_to_keep_mempool_txs_secs: DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS, } } } diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index e5ebf9b59..30f35954e 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -12,9 +12,12 @@ use self::{ orderpool::{OrderPool, OrderPoolSubscriptionId}, replaceable_order_sink::ReplaceableOrderSink, }; -use crate::primitives::{serialize::CancelShareBundle, BundleReplacementData, Order}; use crate::provider::StateProviderFactory; -use crate::telemetry::{set_current_block, set_ordepool_count}; +use crate::telemetry::{set_current_block, set_ordepool_stats}; +use crate::{ + live_builder::base_config::DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS, + primitives::{serialize::CancelShareBundle, BundleReplacementData, Order}, +}; use alloy_consensus::Header; use jsonrpsee::RpcModule; use parking_lot::Mutex; @@ -101,6 +104,8 @@ pub struct OrderInputConfig { results_channel_timeout: Duration, /// Size of the bounded channel. pub input_channel_buffer_size: usize, + /// See [OrderPool::time_to_keep_mempool_txs] + time_to_keep_mempool_txs: Duration, } pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096; pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50); @@ -116,6 +121,7 @@ impl OrderInputConfig { serve_max_connections: u32, results_channel_timeout: Duration, input_channel_buffer_size: usize, + time_to_keep_mempool_txs: Duration, ) -> Self { Self { ignore_cancellable_orders, @@ -126,6 +132,7 @@ impl OrderInputConfig { serve_max_connections, results_channel_timeout, input_channel_buffer_size, + time_to_keep_mempool_txs, } } @@ -148,6 +155,7 @@ impl OrderInputConfig { serve_max_connections: 4096, results_channel_timeout: Duration::from_millis(50), input_channel_buffer_size: 10_000, + time_to_keep_mempool_txs: Duration::from_secs(config.time_to_keep_mempool_txs_secs), }) } @@ -161,6 +169,7 @@ impl OrderInputConfig { serve_max_connections: 4096, server_ip: Ipv4Addr::new(127, 0, 0, 1), server_port: 0, + time_to_keep_mempool_txs: Duration::from_secs(DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS), } } } @@ -211,7 +220,7 @@ where warn!("ignore_blobs is set to true, some order input is ignored"); } - let orderpool = Arc::new(Mutex::new(OrderPool::new())); + let orderpool = Arc::new(Mutex::new(OrderPool::new(config.time_to_keep_mempool_txs))); let subscriber = OrderPoolSubscriber { orderpool: orderpool.clone(), }; @@ -361,7 +370,7 @@ where let update_time = start.elapsed(); let (tx_count, bundle_count) = orderpool.content_count(); - set_ordepool_count(tx_count, bundle_count); + set_ordepool_stats(tx_count, bundle_count, orderpool.mempool_txs_size()); debug!( current_block, tx_count, diff --git a/crates/rbuilder/src/live_builder/order_input/orderpool.rs b/crates/rbuilder/src/live_builder/order_input/orderpool.rs index 0a3768eb7..0f7f1e8c5 100644 --- a/crates/rbuilder/src/live_builder/order_input/orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/orderpool.rs @@ -2,9 +2,9 @@ use crate::primitives::{ serialize::CancelShareBundle, BundleReplacementData, Order, OrderId, ShareBundleReplacementKey, }; use ahash::HashMap; -use alloy_eips::merge::SLOT_DURATION; use lru::LruCache; use reth::providers::StateProviderBox; +use reth_primitives_traits::InMemorySize; use std::{ collections::VecDeque, num::NonZeroUsize, @@ -19,9 +19,6 @@ use super::{ ReplaceableOrderPoolCommand, }; -const BLOCKS_TO_KEEP_TXS: u32 = 5; -const TIME_TO_KEEP_TXS: Duration = SLOT_DURATION.saturating_mul(BLOCKS_TO_KEEP_TXS); - const TIME_TO_KEEP_BUNDLE_CANCELLATIONS: Duration = Duration::from_secs(60); /// Push to pull for OrderSink. Just poll de UnboundedReceiver to get the orders. #[derive(Debug)] @@ -69,6 +66,8 @@ pub struct OrderPoolSubscriptionId(u64); #[derive(Debug)] pub struct OrderPool { mempool_txs: Vec<(Order, Instant)>, + /// Sum of measure_tx(order) for all mempool_txs + mempool_txs_size: usize, /// cancelled bundle, cancellation arrival time bundle_cancellations: VecDeque<(BundleReplacementData, Instant)>, bundles_by_target_block: HashMap, @@ -77,16 +76,12 @@ pub struct OrderPool { known_orders: LruCache<(OrderId, u64), ()>, sinks: HashMap, next_sink_id: u64, -} - -impl Default for OrderPool { - fn default() -> Self { - Self::new() - } + /// After this time a mempool tx is dropped. + time_to_keep_mempool_txs: Duration, } impl OrderPool { - pub fn new() -> Self { + pub fn new(time_to_keep_mempool_txs: Duration) -> Self { OrderPool { mempool_txs: Vec::new(), bundles_by_target_block: HashMap::default(), @@ -95,6 +90,8 @@ impl OrderPool { sinks: Default::default(), next_sink_id: 0, bundle_cancellations: Default::default(), + time_to_keep_mempool_txs, + mempool_txs_size: 0, } } @@ -117,6 +114,7 @@ impl OrderPool { let (order, target_block) = match &order { Order::Tx(..) => { self.mempool_txs.push((order.clone(), Instant::now())); + self.mempool_txs_size += Self::measure_tx(order); (order, None) } Order::Bundle(bundle) => { @@ -232,6 +230,32 @@ impl OrderPool { self.sinks.remove(id).map(|s| s.sink) } + /// Retains if order is young and nonces are valid. + pub fn must_retain_order( + inserted_time: &Instant, + order: &Order, + new_state: &StateProviderBox, + time_to_keep_mempool_txs: &Duration, + ) -> bool { + if inserted_time.elapsed() > *time_to_keep_mempool_txs { + return false; + } + for nonce in order.nonces() { + if nonce.optional { + continue; + } + let onchain_nonce = new_state + .account_nonce(&nonce.address) + .map_err(|e: reth_errors::ProviderError| error!("Failed to get a nonce: {}", e)) + .unwrap_or_default() + .unwrap_or_default(); + if onchain_nonce > nonce.nonce { + return false; + } + } + true + } + /// Should be called when last block is updated. /// It's slow but since it only happens at the start of the block it does now matter. /// It clears old txs from the mempool and old bundle_cancellations. @@ -242,23 +266,12 @@ impl OrderPool { self.bundles_for_current_block.clear(); // remove mempool txs by nonce, time self.mempool_txs.retain(|(order, time)| { - if time.elapsed() > TIME_TO_KEEP_TXS { - return false; + let retain = + Self::must_retain_order(time, order, new_state, &self.time_to_keep_mempool_txs); + if !retain { + self.mempool_txs_size -= Self::measure_tx(order); } - for nonce in order.nonces() { - if nonce.optional { - continue; - } - let onchain_nonce = new_state - .account_nonce(&nonce.address) - .map_err(|e| error!("Failed to get a nonce: {}", e)) - .unwrap_or_default() - .unwrap_or_default(); - if onchain_nonce > nonce.nonce { - return false; - } - } - true + retain }); //remove old bundle cancellations while let Some((_, oldest_time)) = self.bundle_cancellations.front() { @@ -279,4 +292,22 @@ impl OrderPool { .sum(); (tx_count, bundle_count) } + + pub fn mempool_txs_size(&self) -> usize { + self.mempool_txs_size + } + + pub fn measure_tx(order: &Order) -> usize { + match order { + Order::Tx(tx) => tx.size(), + Order::Bundle(_) => { + error!("measure_tx called on a bundle"); + 0 + } + Order::ShareBundle(_) => { + error!("measure_tx called on an sbundle"); + 0 + } + } + } } diff --git a/crates/rbuilder/src/primitives/mod.rs b/crates/rbuilder/src/primitives/mod.rs index 27e61acff..306ac5f3e 100644 --- a/crates/rbuilder/src/primitives/mod.rs +++ b/crates/rbuilder/src/primitives/mod.rs @@ -26,10 +26,12 @@ use reth_primitives::{ kzg::{BYTES_PER_BLOB, BYTES_PER_COMMITMENT, BYTES_PER_PROOF}, PooledTransaction, Recovered, Transaction, TransactionSigned, }; -use reth_primitives_traits::SignerRecoverable; +use reth_primitives_traits::{InMemorySize, SignerRecoverable}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use std::{cmp::Ordering, collections::HashMap, fmt::Display, hash::Hash, str::FromStr, sync::Arc}; +use std::{ + cmp::Ordering, collections::HashMap, fmt::Display, hash::Hash, mem, str::FromStr, sync::Arc, +}; pub use test_data_generator::TestDataGenerator; use thiserror::Error; use uuid::Uuid; @@ -50,6 +52,13 @@ impl Metadata { } } +impl InMemorySize for Metadata { + fn size(&self) -> usize { + mem::size_of::() + // received_at_timestamp + mem::size_of::>() // refund_identity + } +} + impl Default for Metadata { fn default() -> Self { Self::with_current_received_at() @@ -874,6 +883,14 @@ impl MempoolTx { } } +impl InMemorySize for MempoolTx { + fn size(&self) -> usize { + self.tx_with_blobs.tx.inner().size() + + self.tx_with_blobs.blobs_sidecar.size() + + self.tx_with_blobs.metadata.size() + } +} + /// Main type used for block building, we build blocks as sequences of Orders #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Order { diff --git a/crates/rbuilder/src/telemetry/metrics/mod.rs b/crates/rbuilder/src/telemetry/metrics/mod.rs index b9ba64c32..02b212ba8 100644 --- a/crates/rbuilder/src/telemetry/metrics/mod.rs +++ b/crates/rbuilder/src/telemetry/metrics/mod.rs @@ -126,6 +126,9 @@ register_metrics! { IntGauge::new("current_block", "Current Block").unwrap(); pub static ORDERPOOL_TXS: IntGauge = IntGauge::new("orderpool_txs", "Transactions In The Orderpool").unwrap(); + + pub static ORDERPOOL_TXS_SIZE: IntGauge = + IntGauge::new("orderpool_txs_size", "Aprox in memory size of transactions in the Orderpool (bytes)").unwrap(); pub static ORDERPOOL_BUNDLES: IntGauge = IntGauge::new("orderpool_bundles", "Bundles In The Orderpool").unwrap(); @@ -437,9 +440,10 @@ pub fn inc_simulation_gas_used(gas: u64) { SIMULATION_GAS_USED.inc_by(gas); } -pub fn set_ordepool_count(txs: usize, bundles: usize) { +pub fn set_ordepool_stats(txs: usize, bundles: usize, txs_size: usize) { ORDERPOOL_TXS.set(txs as i64); ORDERPOOL_BUNDLES.set(bundles as i64); + ORDERPOOL_TXS_SIZE.set(txs_size as i64); } pub fn inc_order_input_rpc_errors(method: &str) { diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 263385b0f..2656ba954 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -39,7 +39,9 @@ Every field has a default if omitted. |root_hash_threads| int|Threads used when using reth's native root hash calculation. If 0 global rayon pool is used| 0 | watchdog_timeout_sec| optional int| If now block building is started in this period rbuilder exits.|None| |live_builders|vec[string]| List of `builders` to be used for live building.
Notice that you can define on **builders** some builders and select only a few here.|["mgp-ordering","mp-ordering"]| -|evm_caching_enable|bool|Experimental. If enabled per block EVM execution will be enabled|false| +|evm_caching_enable|bool|If enabled per block EVM execution will be enabled|false| +|faster_finalize|bool| If enabled improves block finalization by catching proofs|false| +|time_to_keep_mempool_txs_secs|u64| /// After this time a mempool tx is dropped.|1| |backtest_fetch_mempool_data_dir|env/string|Dir used to store mempool data used in backtesting|"/mnt/data/mempool"| |backtest_fetch_eth_rpc_url|string|url to EL node RPC used in backtesting|"http://127.0.0.1:8545"| |backtest_fetch_eth_rpc_parallel| int|Number of parallel connections allowed on backtest_fetch_eth_rpc_url|1| @@ -55,6 +57,7 @@ Every field has a default if omitted. |relays|vec[RelayConfig]| List of relays used to get validator registration info and/or submitting. Below are the details for RelayConfig fields. Example:
[[relays]]
name = "relay1"
optimistic = true
priority = 1
url = "https://relay1"
use_gzip_for_submit = true
use_ssz_for_submit = true
mode:full

[[relays]]
name = "relay2"
...more params...|[]| |RelayConfig.name|mandatory string| Human readable name for the relay|| |RelayConfig.url|mandatory string| Url to relay's endpoint|| +|RelayConfig.grpc_url|optional string| Url to relay's gRPC endpoint (only bloxroute at 2025/08/20).|None| |RelayConfig.authorization_header|optional env/string|If set "authorization" header will be added to RPC calls|None| |RelayConfig.builder_id_header|optional env/string|If set "X-Builder-Id" header will be added to RPC calls|None| |RelayConfig.api_token_header|optional env/string|If set "X-Api-Token" header will be added to RPC calls|None| @@ -63,8 +66,10 @@ Every field has a default if omitted. |RelayConfig.use_gzip_for_submit|optional bool||false| |RelayConfig.optimistic|optional bool||false| |RelayConfig.interval_between_submissions_ms|optional int| Caps the submission rate to the relay|None| -|RelayConfig.is_fast|optional bool| If the block bid > ignore_fast_bid_threshold_eth, critical blocks (the ones containing orders with replacement id) will go only to fast relays.|true| -|RelayConfig.is_independent|optional bool| Big blocks (bid value > independent_bid_threshold_eth) will go only to independent relays.|true| +|RelayConfig.max_bid_eth|optional string| Max bid we can submit to this relay. Any bid above this will be skipped.
None -> No limit.|None| +|RelayConfig.is_bloxroute|bool|Set to `true` for bloxroute relays to add extra headers.|false| +|RelayConfig.ask_for_filtering_validators|optional bool| Adds "filtering=true" as query to the call relay/v1/builder/validators to get all validators (including those filtering OFAC).
On 2025/06/24 only supported by ultrasound.|false| +|RelayConfig.can_ignore_gas_limit|optional bool| If we submit a block with a different gas than the one the validator registered with in this relay the relay does not mind. Useful for gas limit conflicts. On 2025/08/20 only ultrasound confirmed that is ok with this. (we didn't asked the rest yet)|false| |enabled_relays| vec["string"]| Extra hardcoded relays to add (see DEFAULT_RELAYS in [config.rs](../crates/rbuilder/src/live_builder/config.rs))|[]| |relay_secret_key|optional env/string|Secret key that will be used to sign normal submissions to the relay.|None| |optimistic_relay_secret_key|optional env/string|Secret key that will be used to sign optimistic submissions to the relay.|None| @@ -72,8 +77,7 @@ Every field has a default if omitted. |optimistic_max_bid_value_eth|string| Bids above this value will always be submitted in non-optimistic mode.|"0.0"| |cl_node_url|vec[env/stirng]| Array if urls to CL clients to get the new payload events|["http://127.0.0.1:3500"] |genesis_fork_version|optional string|Genesis fork version for the chain. If not provided it will be fetched from the beacon client.|None| -|independent_bid_threshold_eth|optional string|Bids above this value will only go to independent relays.| "0"| -|ignore_fast_bid_threshold_eth|optional string|For bids below this value we ignore RelayConfig::is_fast (it's like is_fast is true for all relays)| "1000"| +|scraped_bids_publisher_url|string| Url to connect to the bid scraper service| "tcp://0.0.0.0:5555"| ## Building algorithms rbuilder can multiple building algorithms and each algorithm can be instantiated multiple times with it's own set of parameters each time. Each instantiated algorithm starts with: