diff --git a/Cargo.toml b/Cargo.toml index 5ea3804..29ebf01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,6 +111,8 @@ serde = "1.0" jsonrpsee = "0.26.0" parking_lot = "0.12" metrics = "0.24.0" +priority-queue = "2.0.0" + # Alloy dependencies alloy-origin = { version = "1.0.37", package = "alloy", features = [ diff --git a/src/lib.rs b/src/lib.rs index a6e4a0a..c5343df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,9 @@ pub mod pool; /// Common steps library pub mod steps; +/// Orderpool utils +pub mod orderpool2; + /// Externally available test utilities #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/src/orderpool2/mod.rs b/src/orderpool2/mod.rs new file mode 100644 index 0000000..f3ba615 --- /dev/null +++ b/src/orderpool2/mod.rs @@ -0,0 +1,38 @@ +use {crate::alloy::primitives::Address, std::hash::Hash}; + +pub mod prioritized_pool; +pub mod sim_tree; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AccountNonce { + pub account: Address, + pub nonce: u64, +} + +impl AccountNonce { + #[must_use] + pub fn with_nonce(self, nonce: u64) -> Self { + Self { + account: self.account, + nonce, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BundleNonce { + pub address: Address, + pub nonce: u64, + pub optional: bool, +} + +pub trait OrderpoolOrder { + type ID: Hash + Eq; + fn id(&self) -> Self::ID; + fn nonces(&self) -> Vec; +} + +pub trait OrderpoolNonceSource { + type NonceError; + fn nonce(&self, address: &Address) -> Result; +} diff --git a/src/orderpool2/prioritized_pool/mod.rs b/src/orderpool2/prioritized_pool/mod.rs new file mode 100644 index 0000000..c679fd5 --- /dev/null +++ b/src/orderpool2/prioritized_pool/mod.rs @@ -0,0 +1,235 @@ +/// Prioritized orderpool is used by block building thread to iterate +/// over all bundles using some ordering and include them while keeping +/// track of onchain nonces. +/// +/// Usage: +/// 1. Create `PrioritizedOrderpool` and incrementally fill it with bundles +/// using `insert_order` +/// 2. Clone it before starting block building run +/// 3. Pop bundles using `pop_order` until it returns None and try to +/// include them +/// 4. Update onchain nonces after each successful commit using +/// `update_onchain_nonces` +use { + crate::alloy::primitives::Address, + priority_queue::PriorityQueue, + std::collections::{HashMap, HashSet, hash_map::Entry}, +}; + +use super::{AccountNonce, BundleNonce, OrderpoolNonceSource, OrderpoolOrder}; + +pub mod step; +#[cfg(test)] +mod tests; + +pub trait PrioritizedOrderpoolPriority: Ord + Clone + Send + Sync { + type Order; + fn new(order: &Self::Order) -> Self; +} + +#[derive(Debug, Clone)] +pub struct PrioritizedOrderpool { + /// Ready (all nonce matching (or not matched but optional)) to execute + /// orders sorted + main_queue: PriorityQueue, + /// For each account we store all the orders from `main_queue` which contain + /// a tx from this account. Since the orders belong to `main_queue` these + /// are orders ready to execute. As soon as we execute an order from + /// `main_queue` all orders for all the accounts the order used + /// (`order.nonces()`) could get invalidated (if tx is not optional). + main_queue_nonces: HashMap>, + + /// Up to date "onchain" nonces for the current block we are building. + /// Special care must be taken to keep this in sync. + onchain_nonces: HashMap, + + /// Orders waiting for an account to reach a particular nonce. + pending_orders: HashMap>, + /// Id -> order for all orders we manage. Carefully maintained by + /// remove/insert + orders: HashMap, +} + +impl Default + for PrioritizedOrderpool +{ + fn default() -> Self { + Self { + main_queue: PriorityQueue::new(), + main_queue_nonces: HashMap::default(), + onchain_nonces: HashMap::default(), + pending_orders: HashMap::default(), + orders: HashMap::default(), + } + } +} + +impl PrioritizedOrderpool +where + Priority: PrioritizedOrderpoolPriority, + Order: OrderpoolOrder, +{ + /// Removes order from the pool + /// # Panics + /// Panics if implementation has a bug + pub fn pop_order(&mut self) -> Option { + let (id, _) = self.main_queue.pop()?; + + let order = self + .remove_popped_order(&id) + .expect("order from prio queue not found in block orders"); + Some(order) + } + + /// Clean up after some order was removed from `main_queue` + fn remove_popped_order(&mut self, id: &Order::ID) -> Option { + let order = self.remove_from_orders(id)?; + for BundleNonce { address, .. } in order.nonces() { + match self.main_queue_nonces.entry(address) { + Entry::Occupied(mut entry) => { + entry.get_mut().retain(|id| *id != order.id()); + } + Entry::Vacant(_) => {} + } + } + Some(order) + } + + /// Updates orderpool with changed nonces + /// if order updates onchain nonce from n -> n + 2, we get n + 2 as an + /// arguments here + /// # Panics + /// Panics if implementation has a bug + pub fn update_onchain_nonces( + &mut self, + new_nonces: &[AccountNonce], + nonce_source: &NonceSource, + ) -> Result<(), NonceSource::NonceError> { + let mut invalidated_orders: HashSet = HashSet::default(); + for new_nonce in new_nonces { + self + .onchain_nonces + .insert(new_nonce.account, new_nonce.nonce); + + if let Some(orders) = self.main_queue_nonces.remove(&new_nonce.account) { + invalidated_orders.extend(orders.into_iter()); + } + } + + for order_id in invalidated_orders { + // check if order can still be valid because of optional nonces + self.main_queue.remove(&order_id); + let order = self + .remove_popped_order(&order_id) + .expect("order from prio queue not found in block orders"); + let mut valid = true; + let mut valid_nonces = 0; + for BundleNonce { + nonce, + address, + optional, + } in order.nonces() + { + let onchain_nonce = self.nonce(&address, nonce_source)?; + if onchain_nonce > nonce && !optional { + valid = false; + break; + } else if onchain_nonce == nonce { + valid_nonces += 1; + } + } + let retain_order = valid && valid_nonces > 0; + if retain_order { + self.insert_order(order, nonce_source)?; + } + } + + for new_nonce in new_nonces { + if let Some(pending) = self.pending_orders.remove(new_nonce) { + let orders = pending + .iter() + .filter_map(|id| self.remove_from_orders(id)) + .collect::>(); + for order in orders { + self.insert_order(order, nonce_source)?; + } + } + } + Ok(()) + } + + fn remove_from_orders(&mut self, id: &Order::ID) -> Option { + self.orders.remove(id) + } + + fn nonce( + &mut self, + address: &Address, + nonce_source: &NonceSource, + ) -> Result { + match self.onchain_nonces.entry(*address) { + Entry::Occupied(entry) => Ok(*entry.get()), + Entry::Vacant(entry) => { + let nonce = nonce_source.nonce(address)?; + entry.insert(nonce); + Ok(nonce) + } + } + } + + pub fn insert_order( + &mut self, + order: Order, + nonce_source: &NonceSource, + ) -> Result<(), NonceSource::NonceError> { + if self.orders.contains_key(&order.id()) { + return Ok(()); + } + let mut pending_nonces = Vec::new(); + for BundleNonce { + nonce, + address, + optional, + } in order.nonces() + { + let onchain_nonce = self.nonce(&address, nonce_source)?; + if onchain_nonce > nonce && !optional { + // order can't be included because of nonce + return Ok(()); + } + if onchain_nonce < nonce && !optional { + pending_nonces.push(AccountNonce { + account: address, + nonce, + }); + } + } + if pending_nonces.is_empty() { + self.main_queue.push(order.id(), Priority::new(&order)); + for nonce in order.nonces() { + self + .main_queue_nonces + .entry(nonce.address) + .or_default() + .push(order.id()); + } + } else { + for pending_nonce in pending_nonces { + let pending = self.pending_orders.entry(pending_nonce).or_default(); + if !pending.contains(&order.id()) { + pending.push(order.id()); + } + } + } + self.orders.insert(order.id(), order); + Ok(()) + } + + pub fn remove_order(&mut self, id: &Order::ID) -> Option { + // we don't remove from pending because pending will clean itself + if self.main_queue.remove(id).is_some() { + self.remove_popped_order(id); + } + self.remove_from_orders(id) + } +} diff --git a/src/orderpool2/prioritized_pool/step.rs b/src/orderpool2/prioritized_pool/step.rs new file mode 100644 index 0000000..9c48d0d --- /dev/null +++ b/src/orderpool2/prioritized_pool/step.rs @@ -0,0 +1,216 @@ +/// Example usage of `prioritized_pool` using pipelines +use { + super::{ + super::{OrderpoolNonceSource, OrderpoolOrder}, + PrioritizedOrderpool, + PrioritizedOrderpoolPriority, + }, + crate::{ + alloy::{ + consensus::Transaction, + primitives::{Address, B256}, + }, + orderpool2::{AccountNonce, BundleNonce}, + payload::CheckpointExt, + prelude::{Bundle, Checkpoint, ControlFlow, Platform, Step, StepContext}, + reth, + }, + parking_lot::Mutex, + reth_ethereum::primitives::transaction::TxHashRef, + reth_evm::revm::DatabaseRef, + std::{ + marker::{PhantomData, Send, Sync}, + sync::Arc, + }, +}; + +#[derive(Clone)] +pub struct BundleWithNonces { + bundle: B, + nonces: Vec, + phantom: std::marker::PhantomData

, +} + +impl BundleWithNonces +where + B: Bundle

, + P: Platform, +{ + fn new(bundle: B) -> Self { + let txs = bundle.transactions(); + let mut nonces = Vec::with_capacity(txs.len()); + for tx in txs { + nonces.push(BundleNonce { + address: tx.signer(), + nonce: tx.nonce(), + optional: bundle.is_optional(tx.tx_hash()), + }); + } + // for each address we keep lowest nonce + nonces + .sort_by(|a, b| a.address.cmp(&b.address).then(a.nonce.cmp(&b.nonce))); + nonces.dedup_by_key(|n| n.address); + Self { + bundle, + nonces, + phantom: PhantomData, + } + } +} + +impl OrderpoolOrder for BundleWithNonces +where + B: Bundle

, + P: Platform, +{ + type ID = B256; + + fn id(&self) -> Self::ID { + self.bundle.hash() + } + + fn nonces(&self) -> Vec { + self.nonces.clone() + } +} + +impl OrderpoolNonceSource for Checkpoint

{ + type NonceError = reth::errors::ProviderError; + + fn nonce(&self, address: &Address) -> Result { + Ok( + self + .basic_ref(*address)? + .map(|acc| acc.nonce) + .unwrap_or_default(), + ) + } +} + +pub struct EffectiveGasPriceOrdering { + effective_gas_price: u128, + marker: PhantomData<(B, P)>, +} + +impl Clone for EffectiveGasPriceOrdering { + fn clone(&self) -> Self { + Self { + effective_gas_price: self.effective_gas_price, + marker: PhantomData, + } + } +} + +impl PartialOrd for EffectiveGasPriceOrdering { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for EffectiveGasPriceOrdering { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.effective_gas_price.cmp(&other.effective_gas_price) + } +} + +impl PartialEq for EffectiveGasPriceOrdering { + fn eq(&self, other: &Self) -> bool { + self.effective_gas_price == other.effective_gas_price + } +} +impl Eq for EffectiveGasPriceOrdering {} + +impl, P: Send + Sync + Platform> + PrioritizedOrderpoolPriority for EffectiveGasPriceOrdering +{ + type Order = BundleWithNonces; + + fn new(order: &Self::Order) -> Self { + let mut total_gas_limit = 0u128; + let mut accumulated_weighted_fee = 0u128; + for tx in order.bundle.transactions() { + if order.bundle.is_optional(tx.tx_hash()) { + continue; + } + let max_fee = tx.max_fee_per_gas(); + let gas_limit = u128::from(tx.gas_limit()); + accumulated_weighted_fee += max_fee * gas_limit; + total_gas_limit += gas_limit; + } + let effective_gas_price = if total_gas_limit != 0 { + accumulated_weighted_fee / total_gas_limit + } else { + 0 + }; + Self { + effective_gas_price, + marker: PhantomData, + } + } +} + +type PrioritizedOrderpoolForPlatform = + PrioritizedOrderpool>; + +pub struct AppendOrdersByPriority +where + P: Platform, + Pending: PendingOrdersSource

, + Priority: + PrioritizedOrderpoolPriority>, +{ + pending: Pending, + prioritized_orderpool: + Arc>>, + marker: PhantomData

, +} + +/// `PendingOrdersSource` is a source from which we can pull new bundles +/// One example of this would be a `best_transactions` iterator from mempool +pub trait PendingOrdersSource: Send + Sync + 'static { + /// If None is returned, we don't have new pending bundles yet + fn try_get_next_pending(&self) -> Option; +} + +impl Step

+ for AppendOrdersByPriority +where + P: Platform, + Pending: PendingOrdersSource

, + Priority: PrioritizedOrderpoolPriority> + + 'static, +{ + fn step( + self: Arc, + payload: Checkpoint

, + _ctx: StepContext

, + ) -> impl Future> + Send { + let mut orderpool = self.prioritized_orderpool.lock(); + // pull all new pending orders + while let Some(order) = self.pending.try_get_next_pending() { + orderpool + .insert_order(BundleWithNonces::new(order), &payload) + .unwrap_or_default(); + } + // we clone because orderpool is modified during the execution + let mut orderpool = orderpool.clone(); + let mut payload = payload; + + // try to include all orders + while let Some(order) = orderpool.pop_order() { + if let Ok(new) = payload.apply(order.bundle) { + let changed_nonces: Vec<_> = new + .changed_nonces() + .into_iter() + .map(|(account, nonce)| AccountNonce { account, nonce }) + .collect(); + + orderpool + .update_onchain_nonces(&changed_nonces, &new) + .unwrap_or_default(); + payload = new; + } + } + async { ControlFlow::Ok(payload) } + } +} diff --git a/src/orderpool2/prioritized_pool/tests.rs b/src/orderpool2/prioritized_pool/tests.rs new file mode 100644 index 0000000..7ea601f --- /dev/null +++ b/src/orderpool2/prioritized_pool/tests.rs @@ -0,0 +1,281 @@ +use { + super::*, + crate::alloy::primitives::Address, + std::{collections::HashMap, convert::Infallible}, +}; + +struct PrioritizedOrderpoolHashMapNonces(HashMap); + +impl From> for PrioritizedOrderpoolHashMapNonces { + fn from(map: HashMap) -> Self { + Self(map) + } +} + +impl OrderpoolNonceSource for PrioritizedOrderpoolHashMapNonces { + type NonceError = Infallible; + + fn nonce(&self, address: &Address) -> Result { + Ok(self.0.get(address).copied().unwrap_or_default()) + } +} + +#[derive(Clone)] +pub struct PrioritizedOrderpoolTestBundle { + id: u64, + nonces: Vec, + profit: u64, +} + +impl OrderpoolOrder for PrioritizedOrderpoolTestBundle { + type ID = u64; + + fn id(&self) -> Self::ID { + self.id + } + + fn nonces(&self) -> Vec { + self.nonces.clone() + } +} + +type PrioritizedOrderpoolTestPriority = u64; + +impl PrioritizedOrderpoolPriority for PrioritizedOrderpoolTestPriority { + type Order = PrioritizedOrderpoolTestBundle; + + fn new(order: &Self::Order) -> Self { + order.profit + } +} + +/// Helper struct for common `PrioritizedOrderStore` test operations +/// Works hardcoded on max profit ordering since it changes nothing on internal +/// logic +struct TestContext { + generated_id_count: u64, + nonces: PrioritizedOrderpoolHashMapNonces, + order_pool: PrioritizedOrderpool< + PrioritizedOrderpoolTestPriority, + PrioritizedOrderpoolTestBundle, + >, +} + +impl TestContext { + const ADDRESS1: Address = crate::alloy::primitives::address!( + "0000000000000000000000000000000000000001" + ); + const ADDRESS2: Address = crate::alloy::primitives::address!( + "0000000000000000000000000000000000000002" + ); + + /// Context with 1 account to send txs from + fn new_1_account(nonce: u64) -> (AccountNonce, Self) { + let nonces = PrioritizedOrderpoolHashMapNonces( + [(TestContext::ADDRESS1, nonce)].into_iter().collect(), + ); + let account_nonce = AccountNonce { + account: TestContext::ADDRESS1, + nonce, + }; + (account_nonce.clone(), TestContext { + generated_id_count: 0, + nonces, + order_pool: PrioritizedOrderpool::default(), + }) + } + + /// Context with 2 accounts to send txs from + fn new_2_accounts( + nonce_1: u64, + nonce_2: u64, + ) -> (AccountNonce, AccountNonce, TestContext) { + let nonces = PrioritizedOrderpoolHashMapNonces( + [ + (TestContext::ADDRESS1, nonce_1), + (TestContext::ADDRESS2, nonce_2), + ] + .into_iter() + .collect(), + ); + let account_nonce_1 = AccountNonce { + account: TestContext::ADDRESS1, + nonce: nonce_1, + }; + let account_nonce_2 = AccountNonce { + account: TestContext::ADDRESS2, + nonce: nonce_2, + }; + ( + account_nonce_1.clone(), + account_nonce_2.clone(), + TestContext { + generated_id_count: 0, + nonces, + order_pool: PrioritizedOrderpool::default(), + }, + ) + } + + fn new_id(&mut self) -> u64 { + let id = self.generated_id_count; + self.generated_id_count += 1; + id + } + + fn create_add_tx_order( + &mut self, + tx_nonce: &AccountNonce, + tx_profit: u64, + ) -> PrioritizedOrderpoolTestBundle { + let order = PrioritizedOrderpoolTestBundle { + id: self.new_id(), + nonces: vec![BundleNonce { + address: tx_nonce.account, + nonce: tx_nonce.nonce, + optional: false, + }], + profit: tx_profit, + }; + self + .order_pool + .insert_order(order.clone(), &self.nonces) + .unwrap(); + order + } + + fn create_add_bundle_order_2_txs( + &mut self, + tx1_nonce: &AccountNonce, + tx1_optional: bool, + tx2_nonce: &AccountNonce, + tx2_optional: bool, + bundle_profit: u64, + ) -> PrioritizedOrderpoolTestBundle { + let order = PrioritizedOrderpoolTestBundle { + id: self.new_id(), + nonces: vec![ + BundleNonce { + address: tx1_nonce.account, + nonce: tx1_nonce.nonce, + optional: tx1_optional, + }, + BundleNonce { + address: tx2_nonce.account, + nonce: tx2_nonce.nonce, + optional: tx2_optional, + }, + ], + profit: bundle_profit, + }; + self + .order_pool + .insert_order(order.clone(), &self.nonces) + .unwrap(); + order + } + + fn update_nonce(&mut self, tx_nonce: &AccountNonce, new_nonce: u64) { + self + .order_pool + .update_onchain_nonces( + &[AccountNonce { + account: tx_nonce.account, + nonce: new_nonce, + }], + &self.nonces, + ) + .unwrap(); + } + + fn assert_pop_order(&mut self, order: &PrioritizedOrderpoolTestBundle) { + assert_eq!( + self.order_pool.pop_order().map(|o| o.id()), + Some(order.id()) + ); + } + + fn assert_pop_none(&mut self) { + assert_eq!(self.order_pool.pop_order().map(|o| o.id()), None); + } +} + +#[test] +/// Tests 2 tx from different accounts, can execute both +fn test_block_orders_simple() { + let (nonce_worst_order, nonce_best_order, mut context) = + TestContext::new_2_accounts(0, 1); + let worst_order = context.create_add_tx_order(&nonce_worst_order, 0); + let best_order = context.create_add_tx_order(&nonce_best_order, 5); + // we must see first the most profitable order + context.assert_pop_order(&best_order); + // we must see second the least profitable order + context.assert_pop_order(&worst_order); + // out of orders + context.assert_pop_none(); +} + +#[test] +/// Tests 3 tx from the same account, only 1 can succeed +fn test_block_orders_competing_orders() { + let (nonce, mut context) = TestContext::new_1_account(0); + let middle_order = context.create_add_tx_order(&nonce, 3); + let best_order = context.create_add_tx_order(&nonce, 5); + let _worst_order = context.create_add_tx_order(&nonce, 1); + // we must see first the most profitable order + context.assert_pop_order(&best_order); + // we simulate that best_order failed to execute so we don't call + // update_onchain_nonces + context.assert_pop_order(&middle_order); + // we simulate that middle_order executed + context.update_nonce(&nonce, 1); + // we must see none and NOT _worst_order (invalid nonce) + context.assert_pop_none(); +} + +#[test] +/// Tests 4 tx from the same account with different nonces. +fn test_block_orders_pending_orders() { + let (nonce, mut context) = TestContext::new_1_account(0); + let first_nonce_order = context.create_add_tx_order(&nonce, 3); + let second_nonce_order_worst = + context.create_add_tx_order(&nonce.clone().with_nonce(1), 5); + let second_nonce_order_best = + context.create_add_tx_order(&nonce.clone().with_nonce(1), 6); + let _third_nonce_order = + context.create_add_tx_order(&nonce.clone().with_nonce(2), 7); + + context.assert_pop_order(&first_nonce_order); + // Until we update the execution we must see none + context.assert_pop_none(); + // executed + context.update_nonce(&nonce, 1); + context.assert_pop_order(&second_nonce_order_best); + // second_nonce_order_best failed so we don't update_nonce + context.assert_pop_order(&second_nonce_order_worst); + // No more orders for second nonce -> we must see none + context.assert_pop_none(); + // sim that last tx increased nonce twice so we skipped third_nonce_order_best + context.update_nonce(&nonce, 3); + // _third_nonce_order_best was skipped -> none + context.assert_pop_none(); +} + +#[test] +// Execute a bundle with an optional tx that fails for invalid nonce +fn test_block_orders_optional_nonce() { + let (nonce_1, nonce_2, mut context) = TestContext::new_2_accounts(0, 0); + let bundle_order = + context.create_add_bundle_order_2_txs(&nonce_1, true, &nonce_2, false, 1); + let tx_order = context.create_add_tx_order(&nonce_1, 2); + + // tx_order gives more profit + context.assert_pop_order(&tx_order); + // tx_order executed, now tx_order nonce_1 updates + context.update_nonce(&nonce_1, 1); + // Even with the first tx failing because of nonce_1 the bundle should be + // valid + context.assert_pop_order(&bundle_order); + // No more orders + context.assert_pop_none(); +} diff --git a/src/orderpool2/sim_tree.rs b/src/orderpool2/sim_tree.rs new file mode 100644 index 0000000..ca3f15f --- /dev/null +++ b/src/orderpool2/sim_tree.rs @@ -0,0 +1,274 @@ +use { + super::{AccountNonce, OrderpoolNonceSource, OrderpoolOrder}, + crate::alloy::primitives::Address, + std::{ + cmp::{Ordering, min}, + collections::{HashMap, HashSet, hash_map::Entry}, + hash::Hash, + }, + tracing::error, +}; + +pub type SimulationId = u64; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SimulationRequest { + pub id: SimulationId, + pub order: Order, + pub parents: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SimulatedResult { + pub id: SimulationId, + pub simulated_order: SimResult, + pub order: Order, + pub previous_orders: Vec, + pub nonces_after: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct PendingOrder { + order: Order, + unsatisfied_nonces: usize, +} + +#[derive(Debug)] +enum OrderNonceState { + Invalid, + PendingNonces(Vec), + Ready(Vec), +} + +pub trait SimTreeResult: Clone { + type ScoreType: Ord; + fn score(&self) -> Self::ScoreType; +} + +#[derive(Debug)] +pub struct SimTree { + // fields for nonce management + nonces: NonceSource, + + sims: HashMap>, + sims_that_update_one_nonce: HashMap, + + pending_orders: HashMap>, + pending_nonces: HashMap>, + + ready_orders: Vec>, +} + +impl SimTree +where + Order: OrderpoolOrder + Clone, + NonceSource: OrderpoolNonceSource, + SimResult: SimTreeResult, +{ + pub fn new(nonces: NonceSource) -> Self { + Self { + nonces, + sims: HashMap::default(), + sims_that_update_one_nonce: HashMap::default(), + pending_orders: HashMap::default(), + pending_nonces: HashMap::default(), + ready_orders: Vec::default(), + } + } + + pub fn push_order( + &mut self, + order: Order, + ) -> Result<(), NonceSource::NonceError> { + if self.pending_orders.contains_key(&order.id()) { + return Ok(()); + } + + let order_nonce_state = self.get_order_nonce_state(&order)?; + + match order_nonce_state { + OrderNonceState::Invalid => { + return Ok(()); + } + OrderNonceState::PendingNonces(pending_nonces) => { + let unsatisfied_nonces = pending_nonces.len(); + for nonce in pending_nonces { + self + .pending_nonces + .entry(nonce) + .or_default() + .push(order.id()); + } + self.pending_orders.insert(order.id(), PendingOrder { + order, + unsatisfied_nonces, + }); + } + OrderNonceState::Ready(parents) => { + self.ready_orders.push(SimulationRequest { + id: rand::random(), + order, + parents, + }); + } + } + Ok(()) + } + + fn get_order_nonce_state( + &mut self, + order: &Order, + ) -> Result, NonceSource::NonceError> { + let mut onchain_nonces_incremented: HashSet

= HashSet::default(); + let mut pending_nonces = Vec::new(); + let mut parent_orders = Vec::new(); + + for nonce in order.nonces() { + let onchain_nonce = self.nonces.nonce(&nonce.address)?; + + match onchain_nonce.cmp(&nonce.nonce) { + Ordering::Equal => { + // nonce, valid + onchain_nonces_incremented.insert(nonce.address); + } + Ordering::Greater => { + // nonce invalid, maybe its optional + if !nonce.optional { + // this order will never be valid + return Ok(OrderNonceState::Invalid); + } + } + Ordering::Less => { + if onchain_nonces_incremented.contains(&nonce.address) { + // we already considered this account nonce + continue; + } + // mark this nonce as considered + onchain_nonces_incremented.insert(nonce.address); + + let nonce_key = AccountNonce { + account: nonce.address, + nonce: nonce.nonce, + }; + + if let Some(sim_id) = self.sims_that_update_one_nonce.get(&nonce_key) + { + // we have something that fills this nonce + let sim = self.sims.get(sim_id).expect("we never delete sims"); + parent_orders.extend_from_slice(&sim.previous_orders); + parent_orders.push(sim.order.clone()); + continue; + } + + pending_nonces.push(nonce_key); + } + } + } + + if pending_nonces.is_empty() { + Ok(OrderNonceState::Ready(parent_orders)) + } else { + Ok(OrderNonceState::PendingNonces(pending_nonces)) + } + } + + pub fn push_orders( + &mut self, + orders: Vec, + ) -> Result<(), NonceSource::NonceError> { + for order in orders { + self.push_order(order)?; + } + Ok(()) + } + + pub fn pop_simulation_tasks( + &mut self, + limit: usize, + ) -> Vec> { + let limit = min(limit, self.ready_orders.len()); + self.ready_orders.drain(..limit).collect() + } + + // we don't really need state here because nonces are cached but its smaller + // if we reuse pending state fn + fn process_simulation_task_result( + &mut self, + result: &SimulatedResult, + ) -> Result<(), NonceSource::NonceError> { + self.sims.insert(result.id, result.clone()); + let mut orders_ready = Vec::new(); + if result.nonces_after.len() == 1 { + let updated_nonce = result.nonces_after.first().unwrap().clone(); + + match self.sims_that_update_one_nonce.entry(updated_nonce.clone()) { + Entry::Occupied(mut entry) => { + let current_score = { + let sim_id = entry.get_mut(); + self + .sims + .get(sim_id) + .expect("we never delete sims") + .simulated_order + .score() + }; + if result.simulated_order.score() > current_score { + entry.insert(result.id); + } + } + Entry::Vacant(entry) => { + entry.insert(result.id); + + if let Some(pending_orders) = + self.pending_nonces.remove(&updated_nonce) + { + for order in pending_orders { + match self.pending_orders.entry(order) { + Entry::Occupied(mut entry) => { + let pending_order = entry.get_mut(); + pending_order.unsatisfied_nonces -= 1; + if pending_order.unsatisfied_nonces == 0 { + orders_ready.push(entry.remove().order); + } + } + Entry::Vacant(_) => { + error!("SimTree bug order not found"); + } + } + } + } + } + } + } + + for ready_order in orders_ready { + let pending_state = self.get_order_nonce_state(&ready_order)?; + match pending_state { + OrderNonceState::Ready(parents) => { + self.ready_orders.push(SimulationRequest { + id: rand::random(), + order: ready_order, + parents, + }); + } + OrderNonceState::Invalid => { + error!("SimTree bug order became invalid"); + } + OrderNonceState::PendingNonces(_) => { + error!("SimTree bug order became pending again"); + } + } + } + Ok(()) + } + + pub fn submit_simulation_tasks_results( + &mut self, + results: &[SimulatedResult], + ) -> Result<(), NonceSource::NonceError> { + for result in results { + self.process_simulation_task_result(result)?; + } + Ok(()) + } +} diff --git a/src/payload/ext/checkpoint.rs b/src/payload/ext/checkpoint.rs index c661e63..69e451c 100644 --- a/src/payload/ext/checkpoint.rs +++ b/src/payload/ext/checkpoint.rs @@ -5,7 +5,11 @@ use { primitives::{Address, B256, TxHash, U256}, }, prelude::*, - reth::{errors::ProviderError, primitives::Recovered, revm::DatabaseRef}, + reth::{ + errors::ProviderError, + primitives::Recovered, + revm::{DatabaseRef, db::BundleState}, + }, }, itertools::Itertools, std::time::Instant, @@ -151,6 +155,10 @@ pub trait CheckpointExt: super::sealed::Sealed { let start = history.iter().position(|cp| cp.is_tagged(tag))?; Some(history.skip(start)) } + + /// Account nonces changed after transactions execution. + /// If transactions changes nonces nonces from N to N+1 this would return N+1. + fn changed_nonces(&self) -> Vec<(Address, u64)>; } impl CheckpointExt

for Checkpoint

{ @@ -326,6 +334,34 @@ impl CheckpointExt

for Checkpoint

{ fn building_since(&self) -> Instant { self.root().created_at() } + + fn changed_nonces(&self) -> Vec<(Address, u64)> { + let Some(res) = self.result() else { + return Vec::new(); + }; + extract_changed_nonces_for_executable(res.state()) + } +} + +/// Get changed nonces from bundle state created as a result of execution of one +/// executable +fn extract_changed_nonces_for_executable( + bundle_state: &BundleState, +) -> Vec<(Address, u64)> { + let mut result = Vec::new(); + for (address, data) in bundle_state.state() { + let old_nonce = data + .original_info + .as_ref() + .map(|a| a.nonce) + .unwrap_or_default(); + let new_nonce = data.info.as_ref().map(|a| a.nonce).unwrap_or_default(); + if old_nonce == new_nonce { + continue; + } + result.push((*address, new_nonce)); + } + result } #[cfg(test)]