Skip to content

Commit

Permalink
Move scheduler to use blocking mutexes
Browse files Browse the repository at this point in the history
This will improve the performance of the scheduler by quite a bit
since the scheduler will not need to context switch very much.

The big change here is that `do_try_match()` will now run in a spawn
and is notified when it needs to be run. This will add another layer
of improvements, since this function is so expensive to run, it
allows many changes to tasks<->workers to be queued up and run the
matching all at once instead of each time any change happens.
  • Loading branch information
allada committed Jul 9, 2023
1 parent b62338b commit 74b2f04
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 70 deletions.
3 changes: 2 additions & 1 deletion cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ rust_library(
"//config",
"//util:common",
"//util:error",
"@crate_index//:async-lock",
"@crate_index//:parking_lot",
"@crate_index//:futures",
"@crate_index//:lru",
"@crate_index//:rand",
"@crate_index//:tokio",
Expand Down
134 changes: 87 additions & 47 deletions cas/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;

use async_lock::Mutex;
use futures::Future;
use lru::LruCache;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use tokio::sync::watch;
use tokio::sync::{watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::Duration;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::{log, DigestInfo};
Expand Down Expand Up @@ -132,9 +135,6 @@ impl Workers {
}
}

/// Simple helper type to help with self-documentation.
type ShouldRunAgain = bool;

struct SchedulerImpl {
// BTreeMap uses `cmp` to do it's comparisons, this is a problem because we want to sort our
// actions based on priority and insert timestamp but also want to find and join new actions
Expand All @@ -154,6 +154,9 @@ struct SchedulerImpl {
worker_timeout_s: u64,
/// Default times a job can retry before failing.
max_job_retries: usize,

/// Notify task<->worker matching engine that work needs to be done.
tasks_or_workers_change_notify: Arc<Notify>,
}

impl SchedulerImpl {
Expand Down Expand Up @@ -229,7 +232,7 @@ impl SchedulerImpl {
},
);

self.do_try_match();
self.tasks_or_workers_change_notify.notify_one();
Ok(rx)
}

Expand Down Expand Up @@ -296,7 +299,6 @@ impl SchedulerImpl {
}

/// Evicts the worker from the pool and puts items back into the queue if anything was being executed on it.
/// Note: This will not call .do_try_match().
fn immediate_evict_worker(&mut self, worker_id: &WorkerId) {
if let Some(mut worker) = self.workers.remove_worker(&worker_id) {
// We don't care if we fail to send message to worker, this is only a best attempt.
Expand All @@ -307,18 +309,14 @@ impl SchedulerImpl {
self.retry_action(&action_info, &worker_id);
}
}
}

/// Wrapper to keep running in the event we could not complete all scheduling in one iteration.
fn do_try_match(&mut self) {
// Run do_try_match until it doesn't need to run again.
while self.inner_do_try_match() {}
// Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
self.tasks_or_workers_change_notify.notify_one();
}

// TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we can create a map
// of capabilities of each worker and then try and match the actions to the worker using
// the map lookup (ie. map reduce).
fn inner_do_try_match(&mut self) -> ShouldRunAgain {
fn do_try_match(&mut self) {
// TODO(blaise.bruer) This is a bit difficult because of how rust's borrow checker gets in
// the way. We need to conditionally remove items from the `queued_action`. Rust is working
// to add `drain_filter`, which would in theory solve this problem, but because we need
Expand Down Expand Up @@ -352,9 +350,7 @@ impl SchedulerImpl {
// Remove worker, as it is no longer receiving messages and let it try to find another worker.
log::warn!("Worker command failed, removing worker {}", worker_id);
self.immediate_evict_worker(&worker_id);
// After evicting the worker, all running actions have been
// placed back in the queue, so start again now.
return true;
return;
}

// At this point everything looks good, so remove it from the queue and add it to active actions.
Expand Down Expand Up @@ -383,7 +379,6 @@ impl SchedulerImpl {
},
);
}
false
}

fn update_action_with_internal_error(
Expand Down Expand Up @@ -426,7 +421,7 @@ impl SchedulerImpl {
self.retry_action(&action_info, &worker_id);
}

async fn update_action(
fn update_action(
&mut self,
worker_id: &WorkerId,
action_info_hash_key: &ActionInfoHashKey,
Expand Down Expand Up @@ -490,7 +485,7 @@ impl SchedulerImpl {
.get_mut(worker_id)
.ok_or_else(|| make_input_err!("WorkerId '{}' does not exist in workers map", worker_id))?;
worker.complete_action(&action_info);
self.do_try_match();
self.tasks_or_workers_change_notify.notify_one();
}

// TODO(allada) We should probably hold a small queue of recent actions for debugging.
Expand All @@ -503,12 +498,30 @@ impl SchedulerImpl {
/// the worker nodes. All state on how the workers and actions are interacting
/// should be held in this struct.
pub struct Scheduler {
inner: Mutex<SchedulerImpl>,
inner: Arc<Mutex<SchedulerImpl>>,
platform_property_manager: PlatformPropertyManager,
task_worker_matching_future: JoinHandle<()>,
}

impl Scheduler {
#[inline]
pub fn new(scheduler_cfg: &SimpleScheduler) -> Self {
Self::new_with_callback(scheduler_cfg, || {
// The cost of running `do_try_match()` is very high, but constant
// in relation to the number of changes that have happened. This means
// that grabbing this lock to process `do_try_match()` should always
// yield to any other tasks that might want the lock. The easiest and
// most fair way to do this is to sleep for a small amount of time.
// Using something like tokio::task::yield_now() does not yield as
// aggresively as we'd like if new futures are scheduled within a future.
tokio::time::sleep(Duration::from_millis(1))
})
}

pub fn new_with_callback<Fut: Future<Output = ()> + Send, F: Fn() -> Fut + Send + Sync + 'static>(
scheduler_cfg: &SimpleScheduler,
on_matching_engine_run: F,
) -> Self {
let platform_property_manager = PlatformPropertyManager::new(
scheduler_cfg
.supported_platform_properties
Expand All @@ -526,16 +539,42 @@ impl Scheduler {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
}

let tasks_or_workers_change_notify = Arc::new(Notify::new());

let inner = Arc::new(Mutex::new(SchedulerImpl {
queued_actions_set: HashSet::new(),
queued_actions: BTreeMap::new(),
workers: Workers::new(),
active_actions: HashMap::new(),
worker_timeout_s,
max_job_retries,
tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(),
}));
let weak_inner = Arc::downgrade(&inner);
Self {
inner: Mutex::new(SchedulerImpl {
queued_actions_set: HashSet::new(),
queued_actions: BTreeMap::new(),
workers: Workers::new(),
active_actions: HashMap::new(),
worker_timeout_s,
max_job_retries,
}),
inner: inner.clone(),
platform_property_manager,
task_worker_matching_future: tokio::spawn(async move {
// Break out of the loop only when the inner is dropped.
loop {
tasks_or_workers_change_notify.notified().await;
match weak_inner.upgrade() {
// Note: According to `parking_lot` documentation, the default
// `Mutex` implementation is eventual fairness, so we don't
// really need to worry about this thread taking the lock
// starving other threads too much.
Some(inner_mux) => {
let mut inner = inner_mux.lock();
inner.do_try_match();
}
// If the inner went away it means the scheduler is shutting
// down, so we need to resolve our future.
None => return,
};
on_matching_engine_run().await;
}
// Unreachable.
}),
}
}

Expand All @@ -546,22 +585,21 @@ impl Scheduler {
/// Adds a worker to the scheduler and begin using it to execute actions (when able).
pub async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
let worker_id = worker.id.clone();
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
let res = inner
.workers
.add_worker(worker)
.err_tip(|| "Error while adding worker, removing from pool");
if res.is_err() {
inner.immediate_evict_worker(&worker_id);
return res;
}
inner.do_try_match();
Ok(())
inner.tasks_or_workers_change_notify.notify_one();
res
}

/// Adds an action to the scheduler for remote execution.
pub async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner.add_action(action_info)
}

Expand All @@ -571,7 +609,7 @@ impl Scheduler {
action_info_hash_key: &ActionInfoHashKey,
err: Error,
) {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner.update_action_with_internal_error(worker_id, action_info_hash_key, err)
}

Expand All @@ -582,8 +620,8 @@ impl Scheduler {
action_info_hash_key: &ActionInfoHashKey,
action_stage: ActionStage,
) -> Result<(), Error> {
let mut inner = self.inner.lock().await;
inner.update_action(worker_id, action_info_hash_key, action_stage).await
let mut inner = self.inner.lock();
inner.update_action(worker_id, action_info_hash_key, action_stage)
}

/// Event for when the keep alive message was received from the worker.
Expand All @@ -592,7 +630,7 @@ impl Scheduler {
worker_id: &WorkerId,
timestamp: WorkerTimestamp,
) -> Result<(), Error> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner
.workers
.refresh_lifetime(worker_id, timestamp)
Expand All @@ -601,13 +639,12 @@ impl Scheduler {

/// Removes worker from pool and reschedule any tasks that might be running on it.
pub async fn remove_worker(&self, worker_id: WorkerId) {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
inner.immediate_evict_worker(&worker_id);
inner.do_try_match();
}

pub async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
// Items should be sorted based on last_update_timestamp, so we don't need to iterate the entire
// map most of the time.
let worker_ids_to_remove: Vec<WorkerId> = inner
Expand All @@ -623,25 +660,22 @@ impl Scheduler {
}
})
.collect();
for worker_id in worker_ids_to_remove.as_slice() {
for worker_id in worker_ids_to_remove.iter() {
log::warn!("Worker {} timed out, removing from pool", worker_id);
inner.immediate_evict_worker(&worker_id);
}
if !worker_ids_to_remove.is_empty() {
inner.do_try_match();
}
Ok(())
}

/// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
pub async fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
let inner = self.inner.lock().await;
let inner = self.inner.lock();
inner.workers.workers.contains(worker_id)
}

/// A unit test function used to send the keep alive message to the worker from the server.
pub async fn send_keep_alive_to_worker_for_test(&self, worker_id: &WorkerId) -> Result<(), Error> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock();
let worker = inner
.workers
.workers
Expand All @@ -650,3 +684,9 @@ impl Scheduler {
worker.keep_alive()
}
}

impl Drop for Scheduler {
fn drop(&mut self) {
self.task_worker_matching_future.abort();
}
}
Loading

0 comments on commit 74b2f04

Please sign in to comment.