[BreakingChange] Scheduler config now supports multiple impls
This is a step toward supporting different scheduler impls.

To upgrade, you'll need to add a "local" property to the "scheduler"
configs. See the example files that were changed in this commit.

towards: #140
allada committed Jul 7, 2023
1 parent 58b5021 commit 384f14e
Show file tree
Hide file tree
Showing 18 changed files with 252 additions and 147 deletions.
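
The new config/schedulers.rs is among the files not expanded in this view, but its likely shape can be inferred from the hunks below: default_scheduler_factory.rs matches on config::schedulers::SchedulerConfig, scheduler.rs now takes a config::schedulers::SimpleScheduler, and PropertyType moves into the same module. The following is a hedged reconstruction under those assumptions, not the committed file; derives, serde attributes, and doc wording not visible in the diff are guesses.

// Hypothetical reconstruction of config/schedulers.rs (collapsed in this view).
// Type and field names come from the visible hunks; everything else is assumed.

use std::collections::HashMap;

use serde::Deserialize;

use serde_utils::convert_numeric_with_shellexpand;

/// Selects the scheduler implementation for an entry in the "schedulers" map.
/// With serde's default external tagging, each entry nests the old flat
/// scheduler body under a variant key instead of carrying the fields directly.
#[derive(Deserialize, Debug)]
#[allow(non_camel_case_types)]
pub enum SchedulerConfig {
    simple(SimpleScheduler),
}

/// The fields previously on cas_server::SchedulerConfig (see the removed block
/// in config/cas_server.rs below), now scoped to the simple scheduler impl.
#[derive(Deserialize, Debug, Default)]
pub struct SimpleScheduler {
    /// Supported platform properties and how each one is matched against the
    /// properties that workers publish when they join the pool.
    pub supported_platform_properties: Option<HashMap<String, PropertyType>>,

    /// Remove a worker from the pool once it has not responded within this
    /// many seconds. Default: 5.
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub worker_timeout_s: u64,

    /// How many internal errors or timeouts a job may hit on workers before
    /// the last error is returned to the client. Default: 3.
    #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
    pub max_job_retries: usize,
}

/// How a platform property is treated when matching tasks to workers
/// (moved unchanged from config/cas_server.rs).
#[derive(Deserialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum PropertyType {
    Minimum,
    Exact,
    Priority,
}

If that reconstruction is accurate, the breaking change amounts to nesting each existing scheduler body under its implementation key, e.g. a hypothetical entry "MAIN_SCHEDULER": { ... } becomes "MAIN_SCHEDULER": { "simple": { ... } }. Note the commit message calls the new property "local" while the enum variant visible in the factory is named simple, so the exact key may depend on details not shown here.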
1 change: 1 addition & 0 deletions cas/BUILD
@@ -12,6 +12,7 @@ rust_binary(
"//cas/grpc_service:execution_server",
"//cas/grpc_service:worker_api_server",
"//cas/scheduler",
"//cas/scheduler:default_scheduler_factory",
"//cas/store",
"//cas/store:default_store_factory",
"//cas/worker:local_worker",
11 changes: 8 additions & 3 deletions cas/cas_main.rs
@@ -28,11 +28,11 @@ use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::set_open_file_limit;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, WorkerConfig};
use default_scheduler_factory::scheduler_factory;
use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
use scheduler::Scheduler;
use store::StoreManager;
use worker_api_server::WorkerApiServer;

@@ -104,8 +104,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut schedulers = HashMap::new();
if let Some(schedulers_cfg) = cfg.schedulers {
for (scheduler_name, scheduler_cfg) in schedulers_cfg {
schedulers.insert(scheduler_name, Arc::new(Scheduler::new(&scheduler_cfg)));
for (name, scheduler_cfg) in schedulers_cfg {
schedulers.insert(
name.clone(),
scheduler_factory(&scheduler_cfg)
.await
.err_tip(|| format!("Failed to create scheduler '{}'", name))?,
);
}
}

5 changes: 3 additions & 2 deletions cas/grpc_service/tests/worker_api_server_test.rs
@@ -21,7 +21,8 @@ use tonic::Request;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage};
use common::DigestInfo;
use config::cas_server::{SchedulerConfig, WorkerApiConfig};
use config::cas_server::WorkerApiConfig;
use config::schedulers::SimpleScheduler;
use error::{Error, ResultExt};
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::{
@@ -54,7 +55,7 @@ fn static_now_fn() -> Result<Duration, Error> {
async fn setup_api_server(worker_timeout: u64, now_fn: NowFn) -> Result<TestContext, Error> {
const SCHEDULER_NAME: &str = "DUMMY_SCHEDULE_NAME";

let scheduler = Arc::new(Scheduler::new(&SchedulerConfig {
let scheduler = Arc::new(Scheduler::new(&SimpleScheduler {
worker_timeout_s: worker_timeout,
..Default::default()
}));
15 changes: 15 additions & 0 deletions cas/scheduler/BUILD
@@ -35,6 +35,21 @@ rust_library(
],
)

rust_library(
name = "default_scheduler_factory",
srcs = ["default_scheduler_factory.rs"],
visibility = [
"//cas:__pkg__",
"//cas:__subpackages__",
],
deps = [
":scheduler",
"//config",
"//util:error",
"@crate_index//:futures",
],
)

rust_library(
name = "worker",
srcs = ["worker.rs"],
33 changes: 33 additions & 0 deletions cas/scheduler/default_scheduler_factory.rs
@@ -0,0 +1,33 @@
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

use futures::Future;

use config::schedulers::SchedulerConfig;
use error::Error;
use scheduler::Scheduler;

pub fn scheduler_factory<'a>(
scheduler_type_cfg: &'a SchedulerConfig,
) -> Pin<Box<dyn Future<Output = Result<Arc<Scheduler>, Error>> + 'a>> {
Box::pin(async move {
let scheduler = match scheduler_type_cfg {
SchedulerConfig::simple(config) => Arc::new(Scheduler::new(&config)),
};
Ok(scheduler)
})
}
2 changes: 1 addition & 1 deletion cas/scheduler/platform_property_manager.rs
@@ -14,7 +14,7 @@

use std::collections::HashMap;

use config::cas_server::PropertyType;
use config::schedulers::PropertyType;
use error::{make_input_err, Code, Error, ResultExt};
use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform;

4 changes: 2 additions & 2 deletions cas/scheduler/scheduler.rs
@@ -24,7 +24,7 @@ use tokio::sync::watch;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata};
use common::{log, DigestInfo};
use config::cas_server::SchedulerConfig;
use config::schedulers::SimpleScheduler;
use error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use worker::{Worker, WorkerId, WorkerTimestamp, WorkerUpdate};
@@ -508,7 +508,7 @@ pub struct Scheduler {
}

impl Scheduler {
pub fn new(scheduler_cfg: &SchedulerConfig) -> Self {
pub fn new(scheduler_cfg: &SimpleScheduler) -> Self {
let platform_property_manager = PlatformPropertyManager::new(
scheduler_cfg
.supported_platform_properties
26 changes: 13 additions & 13 deletions cas/scheduler/tests/scheduler_test.rs
@@ -23,7 +23,7 @@ use action_messages::{
NameOrPath, SymlinkInfo,
};
use common::DigestInfo;
use config::cas_server::SchedulerConfig;
use config::schedulers::SimpleScheduler;
use error::{make_err, Code, Error, ResultExt};
use platform_property_manager::{PlatformProperties, PlatformPropertyValue};
use proto::build::bazel::remote::execution::v2::ExecuteRequest;
@@ -109,7 +109,7 @@ mod scheduler_tests {
async fn basic_add_action_with_one_worker_test() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -153,7 +153,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> {
async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> {
const WORKER_ID1: WorkerId = WorkerId(0x111111);
const WORKER_ID2: WorkerId = WorkerId(0x222222);
let scheduler = Scheduler::new(&SchedulerConfig {
let scheduler = Scheduler::new(&SimpleScheduler {
worker_timeout_s: WORKER_TIMEOUT_S,
..Default::default()
});
@@ -285,7 +285,7 @@

#[tokio::test]
async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), Error> {
let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);
let mut platform_properties = PlatformProperties::default();
platform_properties
@@ -360,7 +360,7 @@
async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x100009);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut expected_action_state = ActionState {
@@ -429,7 +429,7 @@
#[tokio::test]
async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x100010);
let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);

let rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -459,7 +459,7 @@
async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> {
const WORKER_ID1: WorkerId = WorkerId(0x111111);
const WORKER_ID2: WorkerId = WorkerId(0x222222);
let scheduler = Scheduler::new(&SchedulerConfig {
let scheduler = Scheduler::new(&SimpleScheduler {
worker_timeout_s: WORKER_TIMEOUT_S,
..Default::default()
});
@@ -544,7 +544,7 @@
async fn update_action_sends_completed_result_to_client_test() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
@@ -634,7 +634,7 @@
const GOOD_WORKER_ID: WorkerId = WorkerId(0x123456789111);
const ROGUE_WORKER_ID: WorkerId = WorkerId(0x987654321);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, GOOD_WORKER_ID, Default::default()).await?;
@@ -718,7 +718,7 @@
async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x10000f);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut expected_action_state = ActionState {
@@ -822,7 +822,7 @@
async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest1 = DigestInfo::new([11u8; 32], 512);
let action_digest2 = DigestInfo::new([99u8; 32], 512);

@@ -960,7 +960,7 @@
async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let scheduler = Scheduler::new(&SimpleScheduler::default());
let action_digest1 = DigestInfo::new([11u8; 32], 512);
let action_digest2 = DigestInfo::new([99u8; 32], 512);

@@ -1008,7 +1008,7 @@
async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig {
let scheduler = Scheduler::new(&SimpleScheduler {
max_job_retries: 2,
..Default::default()
});
11 changes: 11 additions & 0 deletions config/BUILD
@@ -11,6 +11,7 @@ rust_library(
srcs = ["lib.rs"],
visibility = ["//visibility:public"],
deps = [
":schedulers",
":stores",
":cas_server",
"@crate_index//:serde",
@@ -21,6 +22,7 @@ rust_library(
name = "cas_server",
srcs = ["cas_server.rs"],
deps = [
":schedulers",
":stores",
"//util:serde_utils",
"@crate_index//:serde",
@@ -35,3 +37,12 @@
"@crate_index//:serde",
],
)

rust_library(
name = "schedulers",
srcs = ["schedulers.rs"],
deps = [
"//util:serde_utils",
"@crate_index//:serde",
],
)
71 changes: 2 additions & 69 deletions config/cas_server.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;

use serde::Deserialize;

use schedulers;
use serde_utils::{convert_numeric_with_shellexpand, convert_string_with_shellexpand};
use stores;

@@ -86,51 +87,6 @@ pub struct CasStoreConfig {
pub cas_store: StoreRefName,
}

#[derive(Deserialize, Debug, Default)]
pub struct SchedulerConfig {
/// A list of supported platform properties mapped to how these properties
/// are used when the scheduler looks for worker nodes capable of running
/// the task.
///
/// For example, a value of:
/// ```
/// { "cpu_count": "Minimum", "cpu_arch": "Exact" }
/// ```
/// With a job that contains:
/// ```
/// { "cpu_count": "8", "cpu_arch": "arm" }
/// ```
/// Will result in the scheduler filtering out any workers that do not have
/// "cpu_arch" = "arm" and filter out any workers that have less than 8 cpu
/// cores available.
///
/// The property names here must match the property keys provided by the
/// worker nodes when they join the pool. In other words, the workers will
/// publish their capabilities to the scheduler when they join the worker
/// pool. If the worker fails to notify the scheduler of it's (for example)
/// "cpu_arch", the scheduler will never send any jobs to it, if all jobs
/// have the "cpu_arch" label. There is no special treatment of any platform
/// property labels other and entirely driven by worker configs and this
/// config.
pub supported_platform_properties: Option<HashMap<String, PropertyType>>,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
/// Default: 5 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub worker_timeout_s: u64,

/// If a job returns an internal error or times out this many times when
/// attempting to run on a worker the scheduler will return the last error
/// to the client. Jobs will be retried and this configuration is to help
/// prevent one rogue job from infinitely retrying and taking up a lot of
/// resources when the task itself is the one causing the server to go
/// into a bad state.
/// Default: 3
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_job_retries: usize,
}

#[derive(Deserialize, Debug, Default)]
pub struct CapabilitiesRemoteExecutionConfig {
/// Scheduler used to configure the capabilities of remote execution.
@@ -146,29 +102,6 @@ pub struct CapabilitiesConfig {
pub remote_execution: Option<CapabilitiesRemoteExecutionConfig>,
}

/// When the scheduler matches tasks to workers that are capable of running
/// the task, this value will be used to determine how the property is treated.
#[derive(Deserialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum PropertyType {
/// Requires the platform property to be a u64 and when the scheduler looks
/// for appropriate worker nodes that are capable of executing the task,
/// the task will not run on a node that has less than this value.
Minimum,

/// Requires the platform property to be a string and when the scheduler
/// looks for appropriate worker nodes that are capable of executing the
/// task, the task will not run on a node that does not have this property
/// set to the value with exact string match.
Exact,

/// Does not restrict on this value and instead will be passed to the worker
/// as an informational piece.
/// TODO(allada) In the future this will be used by the scheduler and worker
/// to cause the scheduler to prefer certain workers over others, but not
/// restrict them based on these values.
Priority,
}

#[derive(Deserialize, Debug)]
pub struct ExecutionConfig {
/// The store name referenced in the `stores` map in the main config.
@@ -354,7 +287,7 @@ pub struct CasConfig {
/// List of schedulers available to use in this config.
/// The keys can be used in other configs when needing to reference a
/// scheduler.
pub schedulers: Option<HashMap<SchedulerRefName, SchedulerConfig>>,
pub schedulers: Option<HashMap<SchedulerRefName, schedulers::SchedulerConfig>>,

/// Servers to setup for this process.
pub servers: Vec<ServerConfig>,