Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
SellTokenSource as DbSellTokenSource,
SigningScheme as DbSigningScheme,
},
solver_competition_v2::{Order, Solution},
solver_competition_v2::{self, Order, Solution},
},
domain::auction::order::{
BuyTokenDestination as DomainBuyTokenDestination,
Expand Down Expand Up @@ -1024,6 +1024,27 @@ impl Persistence {
.context("solver_competition::fetch_solver_winning_solutions")?,
)
}

/// Fetches orders which are currently inflight. Those orders should
/// be omitted from the current auction to avoid onchain reverts.
pub async fn fetch_in_flight_orders(
&self,
current_block: u64,
) -> anyhow::Result<HashSet<crate::domain::OrderUid>> {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["inflight_orders"])
.start_timer();

let mut ex = self.postgres.pool.acquire().await.context("acquire")?;
let orders =
solver_competition_v2::fetch_in_flight_orders(&mut ex, current_block.cast_signed())
Comment thread
MartinquaXD marked this conversation as resolved.
.await?;
Ok(orders
.into_iter()
.map(|o| crate::domain::OrderUid(o.0))
.collect())
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
Expand Down
30 changes: 9 additions & 21 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use {
database::competition::Competition,
domain::{
self,
OrderUid,
auction::Id,
competition::{
self,
Expand Down Expand Up @@ -53,7 +52,6 @@ use {
},
time::{Duration, Instant},
},
tokio::sync::Mutex,
tracing::{Instrument, instrument},
};

Expand Down Expand Up @@ -83,7 +81,6 @@ pub struct RunLoop {
solver_participation_guard: SolverParticipationGuard,
solvable_orders_cache: Arc<SolvableOrdersCache>,
trusted_tokens: AutoUpdatingTokenList,
in_flight_orders: Arc<Mutex<HashSet<OrderUid>>>,
probes: Probes,
/// Maintenance tasks that should run before every runloop to have
/// the most recent data available.
Expand Down Expand Up @@ -126,7 +123,6 @@ impl RunLoop {
solver_participation_guard,
solvable_orders_cache,
trusted_tokens,
in_flight_orders: Default::default(),
probes,
maintenance,
competition_updates_sender,
Expand Down Expand Up @@ -403,11 +399,6 @@ impl RunLoop {
block_deadline: u64,
) {
let solved_order_uids: HashSet<_> = solution.orders().keys().cloned().collect();
self.in_flight_orders
.lock()
.await
.extend(solved_order_uids.clone());
Comment thread
squadgazzz marked this conversation as resolved.

let solution_id = solution.id();
let solver = solution.solver();
let self_ = self.clone();
Expand All @@ -420,7 +411,6 @@ impl RunLoop {
match self_
.settle(
&driver_,
solved_order_uids.clone(),
solver,
auction_id,
solution_id,
Expand Down Expand Up @@ -734,11 +724,9 @@ impl RunLoop {

/// Execute the solver's solution. Returns Ok when the corresponding
/// transaction has been mined.
#[expect(clippy::too_many_arguments)]
async fn settle(
&self,
driver: &infra::Driver,
solved_order_uids: HashSet<OrderUid>,
solver: eth::Address,
auction_id: i64,
solution_id: u64,
Expand Down Expand Up @@ -794,12 +782,6 @@ impl RunLoop {

self.store_execution_ended(solver, auction_id, solution_uid, &result);

// Clean up the in-flight orders regardless the result.
self.in_flight_orders
.lock()
.await
.retain(|order| !solved_order_uids.contains(order));

result
}

Expand Down Expand Up @@ -913,13 +895,19 @@ impl RunLoop {
Err(SettleError::Timeout)
}

/// Removes orders that are currently being settled to avoid solvers trying
/// to fill an order a second time.
/// Removes orders that are currently being settled to avoid solver
/// solutions conflicting with each other.
async fn remove_in_flight_orders(
&self,
mut auction: domain::RawAuctionData,
) -> domain::RawAuctionData {
let in_flight = &*self.in_flight_orders.lock().await;
let in_flight = self
.persistence
.fetch_in_flight_orders(auction.block)
Comment thread
jmg-duarte marked this conversation as resolved.
.await
.inspect_err(|err| tracing::warn!(?err, "failed to fetch in-flight orders"))
.unwrap_or_default();
Comment thread
MartinquaXD marked this conversation as resolved.

if in_flight.is_empty() {
return auction;
};
Expand Down
148 changes: 148 additions & 0 deletions crates/database/src/solver_competition_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,35 @@ fn map_rows_to_solutions(rows: Vec<SolutionRow>) -> Result<Vec<Solution>, sqlx::
Ok(solutions)
}

/// Fetches all orders for which we must assume that there are
/// still onchain transactions being mined or submitted.
///
/// Those are all orders (JIT or regular) that belong to winning
/// solutions with a deadline greater than the current block
/// where the execution actually has not been observed onchain yet.
pub async fn fetch_in_flight_orders(
ex: &mut PgConnection,
current_block: i64,
) -> Result<Vec<OrderUid>, sqlx::Error> {
const QUERY: &str = r#"
SELECT DISTINCT order_uid
FROM competition_auctions ca
JOIN proposed_solutions ps ON ps.auction_id = ca.id
JOIN proposed_trade_executions pte ON pte.auction_id = ca.id AND pte.solution_uid = ps.uid
WHERE ca.deadline > $1
AND ps.is_winner = true
AND NOT EXISTS (
SELECT 1 FROM settlements s
WHERE s.auction_id = ca.id AND s.solution_uid = ps.uid
);
"#;

sqlx::query_as(QUERY)
.bind(current_block)
.fetch_all(ex)
.await
}

#[cfg(test)]
mod tests {
use {
Expand Down Expand Up @@ -1242,4 +1271,123 @@ mod tests {
assert_eq!(auction_participants.len(), 1);
assert_eq!(auction_participants[0].participant, solutions[0].solver);
}

#[tokio::test]
#[ignore]
async fn postgres_fetch_inflight_orders() {
let mut db = PgConnection::connect("postgresql://").await.unwrap();
let mut db = db.begin().await.unwrap();
crate::clear_DANGER_(&mut db).await.unwrap();

let order_uid = |i| ByteArray([i; 56]);
let order = |i| Order {
uid: order_uid(i),
..Default::default()
};
let solutions = vec![
Solution {
uid: 0,
id: 0.into(),
orders: vec![order(0)],
is_winner: true,
..Default::default()
},
Solution {
uid: 1,
id: 0.into(),
orders: vec![order(1)],
is_winner: true,
..Default::default()
},
];
crate::auction::save(
&mut db,
crate::auction::Auction {
id: 0,
block: 0,
deadline: 5,
order_uids: Default::default(),
price_tokens: Default::default(),
price_values: Default::default(),
surplus_capturing_jit_order_owners: Default::default(),
},
)
.await
.unwrap();
save(&mut db, 0, &solutions).await.unwrap();

let solutions = vec![
Solution {
uid: 2,
id: 1.into(),
orders: vec![order(2)],
is_winner: true,
..Default::default()
},
Solution {
uid: 3,
id: 1.into(),
orders: vec![order(3)],
is_winner: true,
..Default::default()
},
];
crate::auction::save(
&mut db,
crate::auction::Auction {
id: 1,
block: 5,
deadline: 10,
order_uids: Default::default(),
price_tokens: Default::default(),
price_values: Default::default(),
surplus_capturing_jit_order_owners: Default::default(),
},
)
.await
.unwrap();
save(&mut db, 1, &solutions).await.unwrap();

// all orders in flight at block 4
let early_block = fetch_in_flight_orders(&mut db, 4).await.unwrap();
assert_eq!(early_block.len(), 4);
assert!(
[0, 1, 2, 3]
.into_iter()
.all(|id| early_block.contains(&order_uid(id)))
);

// only orders from the later auction in flight at block 5
let later_block = fetch_in_flight_orders(&mut db, 5).await.unwrap();
assert_eq!(later_block.len(), 2);
assert!(
[2, 3]
.into_iter()
.all(|id| later_block.contains(&order_uid(id)))
);

// observe settlement event
crate::events::insert_settlement(
&mut db,
&EventIndex {
block_number: 5,
log_index: 0,
},
&Default::default(),
)
.await
.unwrap();
// associate with auction 1
settlements::update_settlement_auction(&mut db, 5, 0, 1)
.await
.unwrap();
// associate with solution 3
settlements::update_settlement_solver(&mut db, 5, 0, Default::default(), 3)
.await
.unwrap();

// when an order gets marked as settled we dont consider it inflight anymore
let later_block_with_settlement = fetch_in_flight_orders(&mut db, 5).await.unwrap();
assert_eq!(later_block_with_settlement, vec![order_uid(2)]);
Comment thread
MartinquaXD marked this conversation as resolved.
}
}
1 change: 1 addition & 0 deletions database/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Contains all auctions for which a valid solver competition exists.

Indexes:
- PRIMARY KEY: btree(`id`)
- competition_auction_deadline: btree(`deadline`)

### ethflow\_orders

Expand Down
2 changes: 2 additions & 0 deletions database/sql/V100__add_auction_deadline.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- adds index on the deadline of an auction to quickly look up inflight orders from the db
CREATE INDEX CONCURRENTLY competition_auction_deadline ON competition_auctions USING BTREE(deadline);
Comment thread
MartinquaXD marked this conversation as resolved.
Loading