From ec54b01b18e6d089fc03183cc90f8f6223352d2e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 7 May 2024 08:53:36 -0500 Subject: [PATCH] Scheduler: throttle thread queueing by CUs (#1101) --- .../prio_graph_scheduler.rs | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 806a573b90218d..1a29e910dc8bbe 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -15,6 +15,7 @@ use { crossbeam_channel::{Receiver, Sender, TryRecvError}, itertools::izip, prio_graph::{AccessKind, PrioGraph}, + solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, solana_measure::measure_us, solana_sdk::{ pubkey::Pubkey, saturating_add_assign, slot_history::Slot, @@ -68,6 +69,23 @@ impl PrioGraphScheduler { pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, ) -> Result { let num_threads = self.consume_work_senders.len(); + let max_cu_per_thread = MAX_BLOCK_UNITS / num_threads as u64; + + let mut schedulable_threads = ThreadSet::any(num_threads); + for thread_id in 0..num_threads { + if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id] >= max_cu_per_thread { + schedulable_threads.remove(thread_id); + } + } + if schedulable_threads.is_empty() { + return Ok(SchedulingSummary { + num_scheduled: 0, + num_unschedulable: 0, + num_filtered_out: 0, + filter_time_us: 0, + }); + } + let mut batches = Batches::new(num_threads); // Some transactions may be unschedulable due to multi-thread conflicts. // These transactions cannot be scheduled until some conflicting work is completed. @@ -173,7 +191,7 @@ impl PrioGraphScheduler { let Some(thread_id) = self.account_locks.try_lock_accounts( transaction_locks.writable.into_iter(), transaction_locks.readonly.into_iter(), - ThreadSet::any(num_threads), + schedulable_threads, |thread_set| { Self::select_thread( thread_set, @@ -207,7 +225,17 @@ impl PrioGraphScheduler { if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH { saturating_add_assign!(num_sent, self.send_batch(&mut batches, thread_id)?); } - + // if the thread is at max_cu_per_thread, remove it from the schedulable threads + // if there are no more schedulable threads, stop scheduling. + if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id] + + batches.total_cus[thread_id] + >= max_cu_per_thread + { + schedulable_threads.remove(thread_id); + if schedulable_threads.is_empty() { + break; + } + } if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { break; }