Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rbuilder::{
live_builder::{
base_config::{
default_ip, DEFAULT_EL_NODE_IPC_PATH, DEFAULT_INCOMING_BUNDLES_PORT,
DEFAULT_RETH_DB_PATH,
DEFAULT_RETH_DB_PATH, DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS,
},
block_list_provider::NullBlockListProvider,
config::create_provider_factory,
Expand Down Expand Up @@ -97,6 +97,7 @@ async fn main() -> eyre::Result<()> {
DEFAULT_SERVE_MAX_CONNECTIONS,
DEFAULT_RESULTS_CHANNEL_TIMEOUT,
DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
Duration::from_secs(DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS),
);
let (orderpool_sender, orderpool_receiver) =
mpsc::channel(order_input_config.input_channel_buffer_size);
Expand Down
5 changes: 5 additions & 0 deletions crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ pub struct BaseConfig {
/// Use experimental code for faster finalize
pub faster_finalize: bool,

/// See [OrderPool::time_to_keep_mempool_txs]
pub time_to_keep_mempool_txs_secs: u64,

// backtest config
backtest_fetch_mempool_data_dir: EnvOrValue<String>,
pub backtest_fetch_eth_rpc_url: String,
Expand Down Expand Up @@ -542,6 +545,7 @@ pub const DEFAULT_RETH_DB_PATH: &str = "/mnt/data/reth";
/// This will update every 2.4 hours, super reasonable.
pub const DEFAULT_BLOCKLIST_URL_MAX_AGE_HOURS: u64 = 24;
pub const DEFAULT_REQUIRE_NON_EMPTY_BLOCKLIST: bool = false;
pub const DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS: u64 = 60;

impl Default for BaseConfig {
fn default() -> Self {
Expand Down Expand Up @@ -591,6 +595,7 @@ impl Default for BaseConfig {
ipc_provider: None,
evm_caching_enable: false,
faster_finalize: false,
time_to_keep_mempool_txs_secs: DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS,
}
}
}
Expand Down
17 changes: 13 additions & 4 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ use self::{
orderpool::{OrderPool, OrderPoolSubscriptionId},
replaceable_order_sink::ReplaceableOrderSink,
};
use crate::primitives::{serialize::CancelShareBundle, BundleReplacementData, Order};
use crate::provider::StateProviderFactory;
use crate::telemetry::{set_current_block, set_ordepool_count};
use crate::telemetry::{set_current_block, set_ordepool_stats};
use crate::{
live_builder::base_config::DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS,
primitives::{serialize::CancelShareBundle, BundleReplacementData, Order},
};
use alloy_consensus::Header;
use jsonrpsee::RpcModule;
use parking_lot::Mutex;
Expand Down Expand Up @@ -101,6 +104,8 @@ pub struct OrderInputConfig {
results_channel_timeout: Duration,
/// Size of the bounded channel.
pub input_channel_buffer_size: usize,
/// See [OrderPool::time_to_keep_mempool_txs]
time_to_keep_mempool_txs: Duration,
}
pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096;
pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50);
Expand All @@ -116,6 +121,7 @@ impl OrderInputConfig {
serve_max_connections: u32,
results_channel_timeout: Duration,
input_channel_buffer_size: usize,
time_to_keep_mempool_txs: Duration,
) -> Self {
Self {
ignore_cancellable_orders,
Expand All @@ -126,6 +132,7 @@ impl OrderInputConfig {
serve_max_connections,
results_channel_timeout,
input_channel_buffer_size,
time_to_keep_mempool_txs,
}
}

Expand All @@ -148,6 +155,7 @@ impl OrderInputConfig {
serve_max_connections: 4096,
results_channel_timeout: Duration::from_millis(50),
input_channel_buffer_size: 10_000,
time_to_keep_mempool_txs: Duration::from_secs(config.time_to_keep_mempool_txs_secs),
})
}

Expand All @@ -161,6 +169,7 @@ impl OrderInputConfig {
serve_max_connections: 4096,
server_ip: Ipv4Addr::new(127, 0, 0, 1),
server_port: 0,
time_to_keep_mempool_txs: Duration::from_secs(DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS),
}
}
}
Expand Down Expand Up @@ -211,7 +220,7 @@ where
warn!("ignore_blobs is set to true, some order input is ignored");
}

let orderpool = Arc::new(Mutex::new(OrderPool::new()));
let orderpool = Arc::new(Mutex::new(OrderPool::new(config.time_to_keep_mempool_txs)));
let subscriber = OrderPoolSubscriber {
orderpool: orderpool.clone(),
};
Expand Down Expand Up @@ -361,7 +370,7 @@ where

let update_time = start.elapsed();
let (tx_count, bundle_count) = orderpool.content_count();
set_ordepool_count(tx_count, bundle_count);
set_ordepool_stats(tx_count, bundle_count, orderpool.mempool_txs_size());
debug!(
current_block,
tx_count,
Expand Down
85 changes: 58 additions & 27 deletions crates/rbuilder/src/live_builder/order_input/orderpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::primitives::{
serialize::CancelShareBundle, BundleReplacementData, Order, OrderId, ShareBundleReplacementKey,
};
use ahash::HashMap;
use alloy_eips::merge::SLOT_DURATION;
use lru::LruCache;
use reth::providers::StateProviderBox;
use reth_primitives_traits::InMemorySize;
use std::{
collections::VecDeque,
num::NonZeroUsize,
Expand All @@ -19,9 +19,6 @@ use super::{
ReplaceableOrderPoolCommand,
};

const BLOCKS_TO_KEEP_TXS: u32 = 5;
const TIME_TO_KEEP_TXS: Duration = SLOT_DURATION.saturating_mul(BLOCKS_TO_KEEP_TXS);

const TIME_TO_KEEP_BUNDLE_CANCELLATIONS: Duration = Duration::from_secs(60);
/// Push to pull for OrderSink. Just poll de UnboundedReceiver to get the orders.
#[derive(Debug)]
Expand Down Expand Up @@ -69,6 +66,8 @@ pub struct OrderPoolSubscriptionId(u64);
#[derive(Debug)]
pub struct OrderPool {
mempool_txs: Vec<(Order, Instant)>,
/// Sum of measure_tx(order) for all mempool_txs
mempool_txs_size: usize,
/// cancelled bundle, cancellation arrival time
bundle_cancellations: VecDeque<(BundleReplacementData, Instant)>,
bundles_by_target_block: HashMap<u64, BundleBlockStore>,
Expand All @@ -77,16 +76,12 @@ pub struct OrderPool {
known_orders: LruCache<(OrderId, u64), ()>,
sinks: HashMap<OrderPoolSubscriptionId, SinkSubscription>,
next_sink_id: u64,
}

impl Default for OrderPool {
fn default() -> Self {
Self::new()
}
/// After this time a mempool tx is dropped.
time_to_keep_mempool_txs: Duration,
}

impl OrderPool {
pub fn new() -> Self {
pub fn new(time_to_keep_mempool_txs: Duration) -> Self {
OrderPool {
mempool_txs: Vec::new(),
bundles_by_target_block: HashMap::default(),
Expand All @@ -95,6 +90,8 @@ impl OrderPool {
sinks: Default::default(),
next_sink_id: 0,
bundle_cancellations: Default::default(),
time_to_keep_mempool_txs,
mempool_txs_size: 0,
}
}

Expand All @@ -117,6 +114,7 @@ impl OrderPool {
let (order, target_block) = match &order {
Order::Tx(..) => {
self.mempool_txs.push((order.clone(), Instant::now()));
self.mempool_txs_size += Self::measure_tx(order);
(order, None)
}
Order::Bundle(bundle) => {
Expand Down Expand Up @@ -232,6 +230,32 @@ impl OrderPool {
self.sinks.remove(id).map(|s| s.sink)
}

/// Retains if order is young and nonces are valid.
pub fn must_retain_order(
inserted_time: &Instant,
order: &Order,
new_state: &StateProviderBox,
time_to_keep_mempool_txs: &Duration,
) -> bool {
if inserted_time.elapsed() > *time_to_keep_mempool_txs {
return false;
}
for nonce in order.nonces() {
if nonce.optional {
continue;
}
let onchain_nonce = new_state
.account_nonce(&nonce.address)
.map_err(|e: reth_errors::ProviderError| error!("Failed to get a nonce: {}", e))
.unwrap_or_default()
.unwrap_or_default();
if onchain_nonce > nonce.nonce {
return false;
}
}
true
}

/// Should be called when last block is updated.
/// It's slow but since it only happens at the start of the block it does now matter.
/// It clears old txs from the mempool and old bundle_cancellations.
Expand All @@ -242,23 +266,12 @@ impl OrderPool {
self.bundles_for_current_block.clear();
// remove mempool txs by nonce, time
self.mempool_txs.retain(|(order, time)| {
if time.elapsed() > TIME_TO_KEEP_TXS {
return false;
let retain =
Self::must_retain_order(time, order, new_state, &self.time_to_keep_mempool_txs);
if !retain {
self.mempool_txs_size -= Self::measure_tx(order);
}
for nonce in order.nonces() {
if nonce.optional {
continue;
}
let onchain_nonce = new_state
.account_nonce(&nonce.address)
.map_err(|e| error!("Failed to get a nonce: {}", e))
.unwrap_or_default()
.unwrap_or_default();
if onchain_nonce > nonce.nonce {
return false;
}
}
true
retain
});
//remove old bundle cancellations
while let Some((_, oldest_time)) = self.bundle_cancellations.front() {
Expand All @@ -279,4 +292,22 @@ impl OrderPool {
.sum();
(tx_count, bundle_count)
}

pub fn mempool_txs_size(&self) -> usize {
self.mempool_txs_size
}

pub fn measure_tx(order: &Order) -> usize {
match order {
Order::Tx(tx) => tx.size(),
Order::Bundle(_) => {
error!("measure_tx called on a bundle");
0
}
Order::ShareBundle(_) => {
error!("measure_tx called on an sbundle");
0
}
}
}
}
21 changes: 19 additions & 2 deletions crates/rbuilder/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ use reth_primitives::{
kzg::{BYTES_PER_BLOB, BYTES_PER_COMMITMENT, BYTES_PER_PROOF},
PooledTransaction, Recovered, Transaction, TransactionSigned,
};
use reth_primitives_traits::SignerRecoverable;
use reth_primitives_traits::{InMemorySize, SignerRecoverable};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::{cmp::Ordering, collections::HashMap, fmt::Display, hash::Hash, str::FromStr, sync::Arc};
use std::{
cmp::Ordering, collections::HashMap, fmt::Display, hash::Hash, mem, str::FromStr, sync::Arc,
};
pub use test_data_generator::TestDataGenerator;
use thiserror::Error;
use uuid::Uuid;
Expand All @@ -50,6 +52,13 @@ impl Metadata {
}
}

impl InMemorySize for Metadata {
fn size(&self) -> usize {
mem::size_of::<time::OffsetDateTime>() + // received_at_timestamp
mem::size_of::<Option<Address>>() // refund_identity
}
}

impl Default for Metadata {
fn default() -> Self {
Self::with_current_received_at()
Expand Down Expand Up @@ -874,6 +883,14 @@ impl MempoolTx {
}
}

impl InMemorySize for MempoolTx {
fn size(&self) -> usize {
self.tx_with_blobs.tx.inner().size()
+ self.tx_with_blobs.blobs_sidecar.size()
+ self.tx_with_blobs.metadata.size()
}
}

/// Main type used for block building, we build blocks as sequences of Orders
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Order {
Expand Down
6 changes: 5 additions & 1 deletion crates/rbuilder/src/telemetry/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ register_metrics! {
IntGauge::new("current_block", "Current Block").unwrap();
pub static ORDERPOOL_TXS: IntGauge =
IntGauge::new("orderpool_txs", "Transactions In The Orderpool").unwrap();

pub static ORDERPOOL_TXS_SIZE: IntGauge =
IntGauge::new("orderpool_txs_size", "Aprox in memory size of transactions in the Orderpool (bytes)").unwrap();
pub static ORDERPOOL_BUNDLES: IntGauge =
IntGauge::new("orderpool_bundles", "Bundles In The Orderpool").unwrap();

Expand Down Expand Up @@ -437,9 +440,10 @@ pub fn inc_simulation_gas_used(gas: u64) {
SIMULATION_GAS_USED.inc_by(gas);
}

pub fn set_ordepool_count(txs: usize, bundles: usize) {
pub fn set_ordepool_stats(txs: usize, bundles: usize, txs_size: usize) {
ORDERPOOL_TXS.set(txs as i64);
ORDERPOOL_BUNDLES.set(bundles as i64);
ORDERPOOL_TXS_SIZE.set(txs_size as i64);
}

pub fn inc_order_input_rpc_errors(method: &str) {
Expand Down
14 changes: 9 additions & 5 deletions docs/CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Every field has a default if omitted.
|root_hash_threads| int|Threads used when using reth's native root hash calculation. If 0 global rayon pool is used| 0
| watchdog_timeout_sec| optional int| If now block building is started in this period rbuilder exits.|None|
|live_builders|vec[string]| List of `builders` to be used for live building.<br>Notice that you can define on **builders** some builders and select only a few here.|["mgp-ordering","mp-ordering"]|
|evm_caching_enable|bool|Experimental. If enabled per block EVM execution will be enabled|false|
|evm_caching_enable|bool|If enabled per block EVM execution will be enabled|false|
|faster_finalize|bool| If enabled improves block finalization by catching proofs|false|
|time_to_keep_mempool_txs_secs|u64| /// After this time a mempool tx is dropped.|1|
|backtest_fetch_mempool_data_dir|env/string|Dir used to store mempool data used in backtesting|"/mnt/data/mempool"|
|backtest_fetch_eth_rpc_url|string|url to EL node RPC used in backtesting|"http://127.0.0.1:8545"|
|backtest_fetch_eth_rpc_parallel| int|Number of parallel connections allowed on backtest_fetch_eth_rpc_url|1|
Expand All @@ -55,6 +57,7 @@ Every field has a default if omitted.
|relays|vec[RelayConfig]| List of relays used to get validator registration info and/or submitting. Below are the details for RelayConfig fields. Example: <br>[[relays]]<br>name = "relay1"<br>optimistic = true<br>priority = 1<br>url = "https://relay1"<br>use_gzip_for_submit = true<br>use_ssz_for_submit = true<br>mode:full<br><br>[[relays]]<br>name = "relay2"<br>...more params...|[]|
|RelayConfig.name|mandatory string| Human readable name for the relay||
|RelayConfig.url|mandatory string| Url to relay's endpoint||
|RelayConfig.grpc_url|optional string| Url to relay's gRPC endpoint (only bloxroute at 2025/08/20).|None|
|RelayConfig.authorization_header|optional env/string|If set "authorization" header will be added to RPC calls|None|
|RelayConfig.builder_id_header|optional env/string|If set "X-Builder-Id" header will be added to RPC calls|None|
|RelayConfig.api_token_header|optional env/string|If set "X-Api-Token" header will be added to RPC calls|None|
Expand All @@ -63,17 +66,18 @@ Every field has a default if omitted.
|RelayConfig.use_gzip_for_submit|optional bool||false|
|RelayConfig.optimistic|optional bool||false|
|RelayConfig.interval_between_submissions_ms|optional int| Caps the submission rate to the relay|None|
|RelayConfig.is_fast|optional bool| If the block bid > ignore_fast_bid_threshold_eth, critical blocks (the ones containing orders with replacement id) will go only to fast relays.|true|
|RelayConfig.is_independent|optional bool| Big blocks (bid value > independent_bid_threshold_eth) will go only to independent relays.|true|
|RelayConfig.max_bid_eth|optional string| Max bid we can submit to this relay. Any bid above this will be skipped.<br>None -> No limit.|None|
|RelayConfig.is_bloxroute|bool|Set to `true` for bloxroute relays to add extra headers.|false|
|RelayConfig.ask_for_filtering_validators|optional bool| Adds "filtering=true" as query to the call relay/v1/builder/validators to get all validators (including those filtering OFAC).<br>On 2025/06/24 only supported by ultrasound.|false|
|RelayConfig.can_ignore_gas_limit|optional bool| If we submit a block with a different gas than the one the validator registered with in this relay the relay does not mind. Useful for gas limit conflicts. On 2025/08/20 only ultrasound confirmed that is ok with this. (we didn't asked the rest yet)|false|
|enabled_relays| vec["string"]| Extra hardcoded relays to add (see DEFAULT_RELAYS in [config.rs](../crates/rbuilder/src/live_builder/config.rs))|[]|
|relay_secret_key|optional env/string|Secret key that will be used to sign normal submissions to the relay.|None|
|optimistic_relay_secret_key|optional env/string|Secret key that will be used to sign optimistic submissions to the relay.|None|
|optimistic_enabled|bool|When enabled builder will make optimistic submissions to optimistic relays|false|
|optimistic_max_bid_value_eth|string| Bids above this value will always be submitted in non-optimistic mode.|"0.0"|
|cl_node_url|vec[env/stirng]| Array if urls to CL clients to get the new payload events|["http://127.0.0.1:3500"]
|genesis_fork_version|optional string|Genesis fork version for the chain. If not provided it will be fetched from the beacon client.|None|
|independent_bid_threshold_eth|optional string|Bids above this value will only go to independent relays.| "0"|
|ignore_fast_bid_threshold_eth|optional string|For bids below this value we ignore RelayConfig::is_fast (it's like is_fast is true for all relays)| "1000"|
|scraped_bids_publisher_url|string| Url to connect to the bid scraper service| "tcp://0.0.0.0:5555"|
## Building algorithms
rbuilder can multiple building algorithms and each algorithm can be instantiated multiple times with it's own set of parameters each time.
Each instantiated algorithm starts with:
Expand Down
Loading