Skip to content

Commit

Permalink
Scheduler: throttle thread queueing by CUs (#1101)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed May 7, 2024
1 parent 4ae2ca1 commit ec54b01
Showing 1 changed file with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use {
crossbeam_channel::{Receiver, Sender, TryRecvError},
itertools::izip,
prio_graph::{AccessKind, PrioGraph},
solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS,
solana_measure::measure_us,
solana_sdk::{
pubkey::Pubkey, saturating_add_assign, slot_history::Slot,
Expand Down Expand Up @@ -68,6 +69,23 @@ impl PrioGraphScheduler {
pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let max_cu_per_thread = MAX_BLOCK_UNITS / num_threads as u64;

let mut schedulable_threads = ThreadSet::any(num_threads);
for thread_id in 0..num_threads {
if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id] >= max_cu_per_thread {
schedulable_threads.remove(thread_id);
}
}
if schedulable_threads.is_empty() {
return Ok(SchedulingSummary {
num_scheduled: 0,
num_unschedulable: 0,
num_filtered_out: 0,
filter_time_us: 0,
});
}

let mut batches = Batches::new(num_threads);
// Some transactions may be unschedulable due to multi-thread conflicts.
// These transactions cannot be scheduled until some conflicting work is completed.
Expand Down Expand Up @@ -173,7 +191,7 @@ impl PrioGraphScheduler {
let Some(thread_id) = self.account_locks.try_lock_accounts(
transaction_locks.writable.into_iter(),
transaction_locks.readonly.into_iter(),
ThreadSet::any(num_threads),
schedulable_threads,
|thread_set| {
Self::select_thread(
thread_set,
Expand Down Expand Up @@ -207,7 +225,17 @@ impl PrioGraphScheduler {
if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH {
saturating_add_assign!(num_sent, self.send_batch(&mut batches, thread_id)?);
}

// if the thread is at max_cu_per_thread, remove it from the schedulable threads
// if there are no more schedulable threads, stop scheduling.
if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id]
+ batches.total_cus[thread_id]
>= max_cu_per_thread
{
schedulable_threads.remove(thread_id);
if schedulable_threads.is_empty() {
break;
}
}
if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
break;
}
Expand Down

0 comments on commit ec54b01

Please sign in to comment.