Skip to content

Commit

Permalink
Promote chosen worker to distribute jobs.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisstaite-menlo committed Jul 11, 2023
1 parent 0801e26 commit 2fd1702
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
31 changes: 22 additions & 9 deletions cas/scheduler/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ struct RunningAction {

/// The set of known workers together with the policy used to choose
/// between them when more than one could run a job.
struct Workers {
/// Workers keyed by `WorkerId`. They are held in an `LruCache` so that the
/// cache's recency ordering is available when selecting a worker.
workers: LruCache<WorkerId, Worker>,
/// The allocation strategy for workers: whether the least or the most
/// recently used worker is preferred among those that can run a job.
allocation_strategy: config::schedulers::WorkerAllocationStrategy,
}

impl Workers {
fn new() -> Self {
fn new(allocation_strategy: config::schedulers::WorkerAllocationStrategy) -> Self {
Self {
workers: LruCache::unbounded(),
allocation_strategy,
}
}

Expand Down Expand Up @@ -127,13 +130,24 @@ impl Workers {
fn find_worker_for_action_mut<'a>(&'a mut self, awaited_action: &AwaitedAction) -> Option<&'a mut Worker> {
assert!(matches!(awaited_action.current_state.stage, ActionStage::Queued));
let action_properties = &awaited_action.action_info.platform_properties;
return self.workers.iter_mut().find_map(|(_, w)| {
if action_properties.is_satisfied_by(&w.platform_properties) {
Some(w)
} else {
None
let mut workers_iter = self.workers.iter_mut();
let workers_iter = match self.allocation_strategy {
// Use rfind to get the least recently used that satisfies the properties.
config::schedulers::WorkerAllocationStrategy::LeastRecentlyUsed => {
workers_iter.rfind(|(_, w)| action_properties.is_satisfied_by(&w.platform_properties))
}
});
// Use find to get the most recently used that satisfies the properties.
config::schedulers::WorkerAllocationStrategy::MostRecentlyUsed => {
workers_iter.find(|(_, w)| action_properties.is_satisfied_by(&w.platform_properties))
}
};
let worker_id = workers_iter.map(|(_, w)| &w.id);
// We need to "touch" the worker to ensure it gets re-ordered in the LRUCache, since it was selected.
if let Some(&worker_id) = worker_id {
self.workers.get_mut(&worker_id)
} else {
None
}
}
}

Expand All @@ -156,7 +170,6 @@ struct SimpleSchedulerImpl {
worker_timeout_s: u64,
/// Default times a job can retry before failing.
max_job_retries: usize,

/// Notify task<->worker matching engine that work needs to be done.
tasks_or_workers_change_notify: Arc<Notify>,
}
Expand Down Expand Up @@ -546,7 +559,7 @@ impl SimpleScheduler {
let inner = Arc::new(Mutex::new(SimpleSchedulerImpl {
queued_actions_set: HashSet::new(),
queued_actions: BTreeMap::new(),
workers: Workers::new(),
workers: Workers::new(scheduler_cfg.allocation_strategy.clone()),
active_actions: HashMap::new(),
worker_timeout_s,
max_job_retries,
Expand Down
17 changes: 17 additions & 0 deletions config/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ pub enum PropertyType {
Priority,
}

/// Strategy used to choose a worker for a job when more than one worker
/// is able to run it.
#[derive(Copy, Clone, Deserialize, Debug, Default)]
pub enum WorkerAllocationStrategy {
    /// Prefer workers that have been least recently used to run a job.
    /// This is the variant produced by `Default`.
    #[default]
    LeastRecentlyUsed,
    /// Prefer workers that have been most recently used to run a job.
    MostRecentlyUsed,
}

#[derive(Deserialize, Debug, Default)]
pub struct SimpleScheduler {
/// A list of supported platform properties mapped to how these properties
Expand Down Expand Up @@ -90,4 +103,8 @@ pub struct SimpleScheduler {
/// Default: 3
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_job_retries: usize,

/// The strategy used to assign workers jobs.
#[serde(default)]
pub allocation_strategy: WorkerAllocationStrategy,
}

0 comments on commit 2fd1702

Please sign in to comment.