diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index e9b8dec947..b816358fda 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -21,7 +21,7 @@ use { SellTokenSource as DbSellTokenSource, SigningScheme as DbSigningScheme, }, - solver_competition_v2::{Order, Solution}, + solver_competition_v2::{self, Order, Solution}, }, domain::auction::order::{ BuyTokenDestination as DomainBuyTokenDestination, @@ -1024,6 +1024,27 @@ impl Persistence { .context("solver_competition::fetch_solver_winning_solutions")?, ) } + + /// Fetches orders which are currently inflight. Those orders should + /// be omitted from the current auction to avoid onchain reverts. + pub async fn fetch_in_flight_orders( + &self, + current_block: u64, + ) -> anyhow::Result> { + let _timer = Metrics::get() + .database_queries + .with_label_values(&["inflight_orders"]) + .start_timer(); + + let mut ex = self.postgres.pool.acquire().await.context("acquire")?; + let orders = + solver_competition_v2::fetch_in_flight_orders(&mut ex, current_block.cast_signed()) + .await?; + Ok(orders + .into_iter() + .map(|o| crate::domain::OrderUid(o.0)) + .collect()) + } } #[derive(prometheus_metric_storage::MetricStorage)] diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 6fce69cbc2..937f50000f 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -3,7 +3,6 @@ use { database::competition::Competition, domain::{ self, - OrderUid, auction::Id, competition::{ self, @@ -53,7 +52,6 @@ use { }, time::{Duration, Instant}, }, - tokio::sync::Mutex, tracing::{Instrument, instrument}, }; @@ -83,7 +81,6 @@ pub struct RunLoop { solver_participation_guard: SolverParticipationGuard, solvable_orders_cache: Arc, trusted_tokens: AutoUpdatingTokenList, - in_flight_orders: Arc>>, probes: Probes, /// Maintenance tasks that should run before every runloop to have /// the most recent data available. @@ -126,7 +123,6 @@ impl RunLoop { solver_participation_guard, solvable_orders_cache, trusted_tokens, - in_flight_orders: Default::default(), probes, maintenance, competition_updates_sender, @@ -403,11 +399,6 @@ impl RunLoop { block_deadline: u64, ) { let solved_order_uids: HashSet<_> = solution.orders().keys().cloned().collect(); - self.in_flight_orders - .lock() - .await - .extend(solved_order_uids.clone()); - let solution_id = solution.id(); let solver = solution.solver(); let self_ = self.clone(); @@ -420,7 +411,6 @@ impl RunLoop { match self_ .settle( &driver_, - solved_order_uids.clone(), solver, auction_id, solution_id, @@ -734,11 +724,9 @@ impl RunLoop { /// Execute the solver's solution. Returns Ok when the corresponding /// transaction has been mined. - #[expect(clippy::too_many_arguments)] async fn settle( &self, driver: &infra::Driver, - solved_order_uids: HashSet, solver: eth::Address, auction_id: i64, solution_id: u64, @@ -794,12 +782,6 @@ impl RunLoop { self.store_execution_ended(solver, auction_id, solution_uid, &result); - // Clean up the in-flight orders regardless the result. - self.in_flight_orders - .lock() - .await - .retain(|order| !solved_order_uids.contains(order)); - result } @@ -913,13 +895,19 @@ impl RunLoop { Err(SettleError::Timeout) } - /// Removes orders that are currently being settled to avoid solvers trying - /// to fill an order a second time. + /// Removes orders that are currently being settled to avoid solver + /// solutions conflicting with each other. async fn remove_in_flight_orders( &self, mut auction: domain::RawAuctionData, ) -> domain::RawAuctionData { - let in_flight = &*self.in_flight_orders.lock().await; + let in_flight = self + .persistence + .fetch_in_flight_orders(auction.block) + .await + .inspect_err(|err| tracing::warn!(?err, "failed to fetch in-flight orders")) + .unwrap_or_default(); + if in_flight.is_empty() { return auction; }; diff --git a/crates/database/src/solver_competition_v2.rs b/crates/database/src/solver_competition_v2.rs index 0a61cd8382..11aefbcdc9 100644 --- a/crates/database/src/solver_competition_v2.rs +++ b/crates/database/src/solver_competition_v2.rs @@ -563,6 +563,35 @@ fn map_rows_to_solutions(rows: Vec) -> Result, sqlx:: Ok(solutions) } +/// Fetches all orders for which we must assume that there are +/// still onchain transactions being mined or submitted. +/// +/// Those are all orders (JIT or regular) that belong to winning +/// solutions with a deadline greater than the current block +/// where the execution actually has not been observed onchain yet. +pub async fn fetch_in_flight_orders( + ex: &mut PgConnection, + current_block: i64, +) -> Result, sqlx::Error> { + const QUERY: &str = r#" + SELECT DISTINCT order_uid + FROM competition_auctions ca + JOIN proposed_solutions ps ON ps.auction_id = ca.id + JOIN proposed_trade_executions pte ON pte.auction_id = ca.id AND pte.solution_uid = ps.uid + WHERE ca.deadline > $1 + AND ps.is_winner = true + AND NOT EXISTS ( + SELECT 1 FROM settlements s + WHERE s.auction_id = ca.id AND s.solution_uid = ps.uid + ); + "#; + + sqlx::query_as(QUERY) + .bind(current_block) + .fetch_all(ex) + .await +} + #[cfg(test)] mod tests { use { @@ -1242,4 +1271,123 @@ mod tests { assert_eq!(auction_participants.len(), 1); assert_eq!(auction_participants[0].participant, solutions[0].solver); } + + #[tokio::test] + #[ignore] + async fn postgres_fetch_inflight_orders() { + let mut db = PgConnection::connect("postgresql://").await.unwrap(); + let mut db = db.begin().await.unwrap(); + crate::clear_DANGER_(&mut db).await.unwrap(); + + let order_uid = |i| ByteArray([i; 56]); + let order = |i| Order { + uid: order_uid(i), + ..Default::default() + }; + let solutions = vec![ + Solution { + uid: 0, + id: 0.into(), + orders: vec![order(0)], + is_winner: true, + ..Default::default() + }, + Solution { + uid: 1, + id: 0.into(), + orders: vec![order(1)], + is_winner: true, + ..Default::default() + }, + ]; + crate::auction::save( + &mut db, + crate::auction::Auction { + id: 0, + block: 0, + deadline: 5, + order_uids: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + surplus_capturing_jit_order_owners: Default::default(), + }, + ) + .await + .unwrap(); + save(&mut db, 0, &solutions).await.unwrap(); + + let solutions = vec![ + Solution { + uid: 2, + id: 1.into(), + orders: vec![order(2)], + is_winner: true, + ..Default::default() + }, + Solution { + uid: 3, + id: 1.into(), + orders: vec![order(3)], + is_winner: true, + ..Default::default() + }, + ]; + crate::auction::save( + &mut db, + crate::auction::Auction { + id: 1, + block: 5, + deadline: 10, + order_uids: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + surplus_capturing_jit_order_owners: Default::default(), + }, + ) + .await + .unwrap(); + save(&mut db, 1, &solutions).await.unwrap(); + + // all orders in flight at block 4 + let early_block = fetch_in_flight_orders(&mut db, 4).await.unwrap(); + assert_eq!(early_block.len(), 4); + assert!( + [0, 1, 2, 3] + .into_iter() + .all(|id| early_block.contains(&order_uid(id))) + ); + + // only orders from the later auction in flight at block 5 + let later_block = fetch_in_flight_orders(&mut db, 5).await.unwrap(); + assert_eq!(later_block.len(), 2); + assert!( + [2, 3] + .into_iter() + .all(|id| later_block.contains(&order_uid(id))) + ); + + // observe settlement event + crate::events::insert_settlement( + &mut db, + &EventIndex { + block_number: 5, + log_index: 0, + }, + &Default::default(), + ) + .await + .unwrap(); + // associate with auction 1 + settlements::update_settlement_auction(&mut db, 5, 0, 1) + .await + .unwrap(); + // associate with solution 3 + settlements::update_settlement_solver(&mut db, 5, 0, Default::default(), 3) + .await + .unwrap(); + + // when an order gets marked as settled we dont consider it inflight anymore + let later_block_with_settlement = fetch_in_flight_orders(&mut db, 5).await.unwrap(); + assert_eq!(later_block_with_settlement, vec![order_uid(2)]); + } } diff --git a/database/README.md b/database/README.md index 04d5407171..8176269204 100644 --- a/database/README.md +++ b/database/README.md @@ -64,6 +64,7 @@ Contains all auctions for which a valid solver competition exists. Indexes: - PRIMARY KEY: btree(`id`) +- competition_auction_deadline: btree(`deadline`) ### ethflow\_orders diff --git a/database/sql/V100__add_auction_deadline.sql b/database/sql/V100__add_auction_deadline.sql new file mode 100644 index 0000000000..46e29b8ae3 --- /dev/null +++ b/database/sql/V100__add_auction_deadline.sql @@ -0,0 +1,2 @@ +-- adds index on the deadline of an auction to quickly look up inflight orders from the db +CREATE INDEX CONCURRENTLY competition_auction_deadline ON competition_auctions USING BTREE(deadline);