From 74b2f049eceebd947ed04180f073ad3bb42e7b4f Mon Sep 17 00:00:00 2001
From: allada
Date: Wed, 5 Jul 2023 17:21:45 -0500
Subject: [PATCH] Move scheduler to use blocking mutexes

This will improve the performance of the scheduler by quite a bit,
since the scheduler will not need to context switch very much.

The big change here is that `do_try_match()` now runs in a spawn and is
notified when it needs to run. This adds another layer of improvement:
because this function is so expensive to run, many task<->worker changes
can be queued up and matched all at once instead of each time any change
happens.
---
 cas/scheduler/BUILD                   |   3 +-
 cas/scheduler/scheduler.rs            | 134 +++++++++++++++++---------
 cas/scheduler/tests/scheduler_test.rs |  94 +++++++++++++-----
 3 files changed, 161 insertions(+), 70 deletions(-)

diff --git a/cas/scheduler/BUILD b/cas/scheduler/BUILD
index 572ffd7a6..e16afa27d 100644
--- a/cas/scheduler/BUILD
+++ b/cas/scheduler/BUILD
@@ -28,7 +28,8 @@ rust_library(
         "//config",
         "//util:common",
         "//util:error",
-        "@crate_index//:async-lock",
+        "@crate_index//:parking_lot",
+        "@crate_index//:futures",
         "@crate_index//:lru",
         "@crate_index//:rand",
         "@crate_index//:tokio",
diff --git a/cas/scheduler/scheduler.rs b/cas/scheduler/scheduler.rs
index 26c6fe1ff..85c143bbc 100644
--- a/cas/scheduler/scheduler.rs
+++ b/cas/scheduler/scheduler.rs
@@ -17,10 +17,13 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 use std::time::SystemTime;
 
-use async_lock::Mutex;
+use futures::Future;
 use lru::LruCache;
+use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
-use tokio::sync::watch;
+use tokio::sync::{watch, Notify};
+use tokio::task::JoinHandle;
+use tokio::time::Duration;
 
 use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
 use common::{log, DigestInfo};
@@ -132,9 +135,6 @@ impl Workers {
     }
 }
 
-/// Simple helper type to help with self-documentation.
-type ShouldRunAgain = bool;
-
 struct SchedulerImpl {
     // BTreeMap uses `cmp` to do it's comparisons, this is a problem because we want to sort our
     // actions based on priority and insert timestamp but also want to find and join new actions
@@ -154,6 +154,9 @@ struct SchedulerImpl {
     worker_timeout_s: u64,
     /// Default times a job can retry before failing.
     max_job_retries: usize,
+
+    /// Notify task<->worker matching engine that work needs to be done.
+    tasks_or_workers_change_notify: Arc<Notify>,
 }
 
 impl SchedulerImpl {
@@ -229,7 +232,7 @@ impl SchedulerImpl {
             },
         );
 
-        self.do_try_match();
+        self.tasks_or_workers_change_notify.notify_one();
         Ok(rx)
     }
 
@@ -296,7 +299,6 @@ impl SchedulerImpl {
     }
 
     /// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
-    /// Note: This will not call .do_try_match().
     fn immediate_evict_worker(&mut self, worker_id: &WorkerId) {
         if let Some(mut worker) = self.workers.remove_worker(&worker_id) {
             // We don't care if we fail to send message to worker, this is only a best attempt.
             let _ = worker.notify_update(WorkerUpdate::Disconnect);
             for action_info in worker.running_action_infos.drain() {
                 self.retry_action(&action_info, &worker_id);
             }
         }
-    }
-
-    /// Wrapper to keep running in the event we could not complete all scheduling in one iteration.
-    fn do_try_match(&mut self) {
-        // Run do_try_match until it doesn't need to run again.
-        while self.inner_do_try_match() {}
+        // Note: Calling this many times is very cheap, it'll only trigger `do_try_match` once.
+        self.tasks_or_workers_change_notify.notify_one();
     }
 
     // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we can create a map
     // of capabilities of each worker and then try and match the actions to the worker using
     // the map lookup (ie. map reduce).
-    fn inner_do_try_match(&mut self) -> ShouldRunAgain {
+    fn do_try_match(&mut self) {
         // TODO(blaise.bruer) This is a bit difficult because of how rust's borrow checker gets in
         // the way. We need to conditionally remove items from the `queued_action`. Rust is working
         // to add `drain_filter`, which would in theory solve this problem, but because we need
@@ -352,9 +350,7 @@ impl SchedulerImpl {
                 // Remove worker, as it is no longer receiving messages and let it try to find another worker.
                 log::warn!("Worker command failed, removing worker {}", worker_id);
                 self.immediate_evict_worker(&worker_id);
-                // After evicting the worker, all running actions have been
-                // placed back in the queue, so start again now.
-                return true;
+                return;
             }
 
             // At this point everything looks good, so remove it from the queue and add it to active actions.
@@ -383,7 +379,6 @@ impl SchedulerImpl {
                 },
             );
         }
-        false
     }
 
     fn update_action_with_internal_error(
@@ -426,7 +421,7 @@ impl SchedulerImpl {
         self.retry_action(&action_info, &worker_id);
     }
 
-    async fn update_action(
+    fn update_action(
         &mut self,
         worker_id: &WorkerId,
         action_info_hash_key: &ActionInfoHashKey,
@@ -490,7 +485,7 @@ impl SchedulerImpl {
             .get_mut(worker_id)
             .ok_or_else(|| make_input_err!("WorkerId '{}' does not exist in workers map", worker_id))?;
         worker.complete_action(&action_info);
-        self.do_try_match();
+        self.tasks_or_workers_change_notify.notify_one();
     }
 
     // TODO(allada) We should probably hold a small queue of recent actions for debugging.
@@ -503,12 +498,30 @@
 /// the worker nodes. All state on how the workers and actions are interacting
 /// should be held in this struct.
 pub struct Scheduler {
-    inner: Mutex<SchedulerImpl>,
+    inner: Arc<Mutex<SchedulerImpl>>,
     platform_property_manager: PlatformPropertyManager,
+    task_worker_matching_future: JoinHandle<()>,
 }
 
 impl Scheduler {
+    #[inline]
     pub fn new(scheduler_cfg: &SimpleScheduler) -> Self {
+        Self::new_with_callback(scheduler_cfg, || {
+            // The cost of running `do_try_match()` is very high, but constant
+            // in relation to the number of changes that have happened. This means
+            // that grabbing this lock to process `do_try_match()` should always
+            // yield to any other tasks that might want the lock. The easiest and
+            // most fair way to do this is to sleep for a small amount of time.
+            // Using something like tokio::task::yield_now() does not yield as
+            // aggressively as we'd like if new futures are scheduled within a future.
+            tokio::time::sleep(Duration::from_millis(1))
+        })
+    }
+
+    pub fn new_with_callback<Fut: Future<Output = ()> + Send, F: Fn() -> Fut + Send + Sync + 'static>(
+        scheduler_cfg: &SimpleScheduler,
+        on_matching_engine_run: F,
+    ) -> Self {
         let platform_property_manager = PlatformPropertyManager::new(
             scheduler_cfg
                 .supported_platform_properties
@@ -526,16 +539,42 @@ impl Scheduler {
             max_job_retries = DEFAULT_MAX_JOB_RETRIES;
         }
 
+        let tasks_or_workers_change_notify = Arc::new(Notify::new());
+
+        let inner = Arc::new(Mutex::new(SchedulerImpl {
+            queued_actions_set: HashSet::new(),
+            queued_actions: BTreeMap::new(),
+            workers: Workers::new(),
+            active_actions: HashMap::new(),
+            worker_timeout_s,
+            max_job_retries,
+            tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(),
+        }));
+        let weak_inner = Arc::downgrade(&inner);
         Self {
-            inner: Mutex::new(SchedulerImpl {
-                queued_actions_set: HashSet::new(),
-                queued_actions: BTreeMap::new(),
-                workers: Workers::new(),
-                active_actions: HashMap::new(),
-                worker_timeout_s,
-                max_job_retries,
-            }),
+            inner: inner.clone(),
             platform_property_manager,
+            task_worker_matching_future: tokio::spawn(async move {
+                // Break out of the loop only when the inner is dropped.
+                loop {
+                    tasks_or_workers_change_notify.notified().await;
+                    match weak_inner.upgrade() {
+                        // Note: According to `parking_lot` documentation, the default
+                        // `Mutex` implementation uses eventual fairness, so we don't
+                        // really need to worry about this thread taking the lock and
+                        // starving other threads too much.
+                        Some(inner_mux) => {
+                            let mut inner = inner_mux.lock();
+                            inner.do_try_match();
+                        }
+                        // If the inner went away it means the scheduler is shutting
+                        // down, so we need to resolve our future.
+                        None => return,
+                    };
+                    on_matching_engine_run().await;
+                }
+                // Unreachable.
+            }),
         }
     }
 
@@ -546,22 +585,21 @@ impl Scheduler {
     /// Adds a worker to the scheduler and begin using it to execute actions (when able).
     pub async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
         let worker_id = worker.id.clone();
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         let res = inner
             .workers
             .add_worker(worker)
             .err_tip(|| "Error while adding worker, removing from pool");
         if res.is_err() {
             inner.immediate_evict_worker(&worker_id);
-            return res;
         }
-        inner.do_try_match();
-        Ok(())
+        inner.tasks_or_workers_change_notify.notify_one();
+        res
     }
 
     /// Adds an action to the scheduler for remote execution.
     pub async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         inner.add_action(action_info)
     }
 
@@ -571,7 +609,7 @@ impl Scheduler {
         action_info_hash_key: &ActionInfoHashKey,
         err: Error,
     ) {
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         inner.update_action_with_internal_error(worker_id, action_info_hash_key, err)
     }
 
@@ -582,8 +620,8 @@ impl Scheduler {
         action_info_hash_key: &ActionInfoHashKey,
         action_stage: ActionStage,
     ) -> Result<(), Error> {
-        let mut inner = self.inner.lock().await;
-        inner.update_action(worker_id, action_info_hash_key, action_stage).await
+        let mut inner = self.inner.lock();
+        inner.update_action(worker_id, action_info_hash_key, action_stage)
     }
 
     /// Event for when the keep alive message was received from the worker.
@@ -592,7 +630,7 @@ impl Scheduler {
         worker_id: &WorkerId,
         timestamp: WorkerTimestamp,
     ) -> Result<(), Error> {
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         inner
             .workers
             .refresh_lifetime(worker_id, timestamp)
@@ -601,13 +639,12 @@ impl Scheduler {
 
     /// Removes worker from pool and reschedule any tasks that might be running on it.
     pub async fn remove_worker(&self, worker_id: WorkerId) {
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         inner.immediate_evict_worker(&worker_id);
-        inner.do_try_match();
     }
 
     pub async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         // Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
         // map most of the time.
         let worker_ids_to_remove: Vec<WorkerId> = inner
@@ -623,25 +660,22 @@ impl Scheduler {
                 }
             })
             .collect();
-        for worker_id in worker_ids_to_remove.as_slice() {
+        for worker_id in worker_ids_to_remove.iter() {
             log::warn!("Worker {} timed out, removing from pool", worker_id);
             inner.immediate_evict_worker(&worker_id);
         }
-        if !worker_ids_to_remove.is_empty() {
-            inner.do_try_match();
-        }
         Ok(())
     }
 
     /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
     pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
-        let inner = self.inner.lock().await;
+        let inner = self.inner.lock();
         inner.workers.workers.contains(worker_id)
     }
 
     /// A unit test function used to send the keep alive message to the worker from the server.
     pub async fn send_keep_alive_to_worker_for_test(&self, worker_id: &WorkerId) -> Result<(), Error> {
-        let mut inner = self.inner.lock().await;
+        let mut inner = self.inner.lock();
         let worker = inner
             .workers
             .workers
@@ -650,3 +684,9 @@ impl Scheduler {
         worker.keep_alive()
     }
 }
+
+impl Drop for Scheduler {
+    fn drop(&mut self) {
+        self.task_worker_matching_future.abort();
+    }
+}
diff --git a/cas/scheduler/tests/scheduler_test.rs b/cas/scheduler/tests/scheduler_test.rs
index a19d74a00..a7c9c2517 100644
--- a/cas/scheduler/tests/scheduler_test.rs
+++ b/cas/scheduler/tests/scheduler_test.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
@@ -82,6 +83,7 @@ async fn setup_new_worker(
     let (tx, mut rx) = mpsc::unbounded_channel();
     let worker = Worker::new(worker_id, props, tx, NOW_TIME);
     scheduler.add_worker(worker).await.err_tip(|| "Failed to add worker")?;
+    tokio::task::yield_now().await; // Allow task<->worker matcher to run.
     verify_initial_connection_message(worker_id, &mut rx).await;
     Ok(rx)
 }
@@ -95,7 +97,9 @@ async fn setup_action(
     let mut action_info = make_base_action_info(insert_timestamp);
     action_info.platform_properties = platform_properties;
     action_info.unique_qualifier.digest = action_digest;
-    scheduler.add_action(action_info).await
+    let result = scheduler.add_action(action_info).await;
+    tokio::task::yield_now().await; // Allow task<->worker matcher to run.
+    result
 }
 
 #[cfg(test)]
 mod scheduler_tests {
     #[tokio::test]
     async fn basic_add_action_with_one_worker_test() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x123456789111);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -153,10 +157,13 @@ mod scheduler_tests {
     async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> {
         const WORKER_ID1: WorkerId = WorkerId(0x111111);
         const WORKER_ID2: WorkerId = WorkerId(0x222222);
-        let scheduler = Scheduler::new(&SimpleScheduler {
-            worker_timeout_s: WORKER_TIMEOUT_S,
-            ..Default::default()
-        });
+        let scheduler = Scheduler::new_with_callback(
+            &SimpleScheduler {
+                worker_timeout_s: WORKER_TIMEOUT_S,
+                ..Default::default()
+            },
+            || async move {},
+        );
         let action_digest1 = DigestInfo::new([99u8; 32], 512);
         let action_digest2 = DigestInfo::new([88u8; 32], 512);
 
@@ -246,6 +253,7 @@ mod scheduler_tests {
 
         // Now remove worker.
         scheduler.remove_worker(WORKER_ID1).await;
+        tokio::task::yield_now().await; // Allow task<->worker matcher to run.
 
         {
             // Worker1 should have received a disconnect message.
@@ -285,7 +293,7 @@ mod scheduler_tests {
 
     #[tokio::test]
     async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), Error> {
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
         let mut platform_properties = PlatformProperties::default();
         platform_properties
@@ -360,7 +368,7 @@ mod scheduler_tests {
     async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x100009);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut expected_action_state = ActionState {
@@ -429,7 +437,7 @@ mod scheduler_tests {
     #[tokio::test]
     async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x100010);
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -459,10 +467,13 @@ mod scheduler_tests {
     async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> {
         const WORKER_ID1: WorkerId = WorkerId(0x111111);
         const WORKER_ID2: WorkerId = WorkerId(0x222222);
-        let scheduler = Scheduler::new(&SimpleScheduler {
-            worker_timeout_s: WORKER_TIMEOUT_S,
-            ..Default::default()
-        });
+        let scheduler = Scheduler::new_with_callback(
+            &SimpleScheduler {
+                worker_timeout_s: WORKER_TIMEOUT_S,
+                ..Default::default()
+            },
+            || async move {},
+        );
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         // Note: This needs to stay in scope or a disconnect will trigger.
@@ -514,6 +525,7 @@ mod scheduler_tests {
             .await?;
 
         // This should remove worker 1 (the one executing our job).
         scheduler.remove_timedout_workers(NOW_TIME + WORKER_TIMEOUT_S).await?;
+        tokio::task::yield_now().await; // Allow task<->worker matcher to run.
 
         {
             // Worker1 should have received a disconnect message.
@@ -544,7 +556,7 @@ mod scheduler_tests {
     async fn update_action_sends_completed_result_to_client_test() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x123456789111);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -634,7 +646,7 @@ mod scheduler_tests {
         const GOOD_WORKER_ID: WorkerId = WorkerId(0x123456789111);
         const ROGUE_WORKER_ID: WorkerId = WorkerId(0x987654321);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut rx_from_worker = setup_new_worker(&scheduler, GOOD_WORKER_ID, Default::default()).await?;
@@ -718,7 +730,7 @@ mod scheduler_tests {
     async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x10000f);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut expected_action_state = ActionState {
@@ -822,7 +834,7 @@ mod scheduler_tests {
     async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x123456789111);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest1 = DigestInfo::new([11u8; 32], 512);
         let action_digest2 = DigestInfo::new([99u8; 32], 512);
 
@@ -960,7 +972,7 @@ mod scheduler_tests {
     async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x123456789111);
 
-        let scheduler = Scheduler::new(&SimpleScheduler::default());
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), || async move {});
         let action_digest1 = DigestInfo::new([11u8; 32], 512);
         let action_digest2 = DigestInfo::new([99u8; 32], 512);
 
@@ -1008,10 +1020,13 @@ mod scheduler_tests {
     async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> {
         const WORKER_ID: WorkerId = WorkerId(0x123456789111);
 
-        let scheduler = Scheduler::new(&SimpleScheduler {
-            max_job_retries: 2,
-            ..Default::default()
-        });
+        let scheduler = Scheduler::new_with_callback(
+            &SimpleScheduler {
+                max_job_retries: 2,
+                ..Default::default()
+            },
+            || async move {},
+        );
         let action_digest = DigestInfo::new([99u8; 32], 512);
 
         let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -1112,4 +1127,39 @@ mod scheduler_tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> {
+        struct DropChecker {
+            dropped: Arc<AtomicBool>,
+        }
+        impl Drop for DropChecker {
+            fn drop(&mut self) {
+                self.dropped.store(true, Ordering::Relaxed);
+            }
+        }
+
+        let dropped = Arc::new(AtomicBool::new(false));
+        let drop_checker = Arc::new(DropChecker {
+            dropped: dropped.clone(),
+        });
+
+        // Since the inner spawn owns this callback, we can use the callback to know if the
+        // inner spawn was dropped because our callback would be dropped, which drops our
+        // DropChecker.
+        let scheduler = Scheduler::new_with_callback(&SimpleScheduler::default(), move || {
+            // Holding a clone ensures the DropChecker is dropped when this closure is dropped.
+            let _drop_checker = drop_checker.clone();
+            async move {}
+        });
+        assert_eq!(dropped.load(Ordering::Relaxed), false);
+
+        drop(scheduler);
+        tokio::task::yield_now().await; // The drop may happen in a different task.
+
+        // Ensure our callback was dropped.
+        assert_eq!(dropped.load(Ordering::Relaxed), true);
+
+        Ok(())
+    }
 }
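The overall shape of this change generalizes: a blocking mutex guards the state, a
`Notify` coalesces wake-ups into batched matching passes, the spawned task holds only
a `Weak` reference so it cannot keep the state alive, and `Drop` aborts the task. A
minimal self-contained sketch of that pattern follows; the names (`Engine`, `State`,
`run_matching`, `record_change`) are hypothetical stand-ins, not types from this
codebase, and the code assumes it runs inside a tokio runtime:

    use std::sync::Arc;
    use std::time::Duration;

    use parking_lot::Mutex;
    use tokio::sync::Notify;
    use tokio::task::JoinHandle;

    #[derive(Default)]
    struct State {
        pending_changes: u64,
    }

    // Stand-in for the expensive O(n*m) task<->worker matching pass.
    fn run_matching(state: &mut State) {
        state.pending_changes = 0;
    }

    struct Engine {
        state: Arc<Mutex<State>>,
        change_notify: Arc<Notify>,
        matcher: JoinHandle<()>,
    }

    impl Engine {
        fn new() -> Self {
            let state = Arc::new(Mutex::new(State::default()));
            let change_notify = Arc::new(Notify::new());
            // The spawned task holds only a `Weak`, so it cannot keep the
            // state alive after the owning `Engine` is dropped.
            let weak_state = Arc::downgrade(&state);
            let task_notify = change_notify.clone();
            let matcher = tokio::spawn(async move {
                loop {
                    task_notify.notified().await;
                    match weak_state.upgrade() {
                        Some(state) => run_matching(&mut state.lock()),
                        None => return, // Owner is gone; shut down.
                    }
                    // Sleep briefly so tasks waiting on the mutex get a fair
                    // turn before the next (possibly batched) matching pass.
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            });
            Self {
                state,
                change_notify,
                matcher,
            }
        }

        fn record_change(&self) {
            self.state.lock().pending_changes += 1;
            // Cheap to call many times; wake-ups coalesce into one pass.
            self.change_notify.notify_one();
        }
    }

    impl Drop for Engine {
        fn drop(&mut self) {
            // Stop the matcher task; the `Weak` upgrade above is a second
            // line of defense if the task is mid-iteration.
            self.matcher.abort();
        }
    }

The `Weak` upgrade and the `abort()` in `Drop` are belt and braces: either is enough
to stop the loop, which is also what the new `ensure_scheduler_drops_inner_spawn`
test exercises.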
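The "Calling this many times is very cheap" comment leans on a documented property of
`tokio::sync::Notify`: `notify_one()` stores at most one permit when no task is
waiting, so a burst of notifications collapses into a single wake-up and therefore a
single `do_try_match()` pass. A tiny hypothetical standalone program showing the
coalescing:

    use tokio::sync::Notify;

    #[tokio::main]
    async fn main() {
        let notify = Notify::new();

        // A burst of changes while the matcher is busy elsewhere: at most
        // one permit is stored, no matter how many calls are made.
        for _ in 0..1000 {
            notify.notify_one();
        }

        // Consumes the single stored permit and returns immediately; a
        // second `notified().await` would block until the next notify_one().
        notify.notified().await;
        println!("one matching pass covered 1000 notifications");
    }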