Skip to content

Commit

Permalink
Add worker config definitions and rename Metadata to Priority
Browse files Browse the repository at this point in the history
* Adds the initial config for workers to use use
* Rename PropertyType::Metadata to PropertyType::Priority
  • Loading branch information
allada committed Apr 16, 2022
1 parent e26549a commit 98c4e08
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 24 deletions.
16 changes: 10 additions & 6 deletions cas/scheduler/platform_property_manager.rs
Expand Up @@ -43,14 +43,17 @@ impl PlatformProperties {
/// Minimum - Means that workers must have at least this number available. When
/// a worker executes a task that has this value, the worker will have
/// this value subtracted from the available resources of the worker.
/// Metadata - Means the worker is given this information, but does not restrict
/// Priority - Means the worker is given this information, but does not restrict
/// what workers can take this value. However, the worker must have the
/// associated key present to be matched.
/// TODO(allada) In the future this will be used by the scheduler and
/// worker to cause the scheduler to prefer certain workers over others,
/// but not restrict them based on these values.
#[derive(Eq, PartialEq, Hash, Clone, Ord, PartialOrd, Debug)]
pub enum PlatformPropertyValue {
Exact(String),
Minimum(u64),
Metadata(String),
Priority(String),
}

impl PlatformPropertyValue {
Expand All @@ -66,9 +69,10 @@ impl PlatformPropertyValue {
}
false
}
// Metadata is used to pass info to the worker and not restrict which
// workers can be selected.
PlatformPropertyValue::Metadata(_) => true,
// Priority is used to pass info to the worker and not restrict which
// workers can be selected, but might be used to prefer certain workers
// over others.
PlatformPropertyValue::Priority(_) => true,
// Success exact case is handled above.
PlatformPropertyValue::Exact(_) => false,
}
Expand Down Expand Up @@ -105,7 +109,7 @@ impl PlatformPropertyManager {
})?,
)),
PropertyType::Exact => Ok(PlatformPropertyValue::Exact(value.to_string())),
PropertyType::Metadata => Ok(PlatformPropertyValue::Metadata(value.to_string())),
PropertyType::Priority => Ok(PlatformPropertyValue::Priority(value.to_string())),
};
}
Err(make_input_err!("Unknown platform property '{}'", key))
Expand Down
74 changes: 72 additions & 2 deletions config/cas_server.rs
Expand Up @@ -82,7 +82,7 @@ pub struct CapabilitiesConfig {

/// When the scheduler matches tasks to workers that are capable of running
/// the task, this value will be used to determine how the property is treated.
#[derive(Deserialize, Debug, Clone, Copy)]
#[derive(Deserialize, Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum PropertyType {
/// Requires the platform property to be a u64 and when the scheduler looks
/// for appropriate worker nodes that are capable of executing the task,
Expand All @@ -97,7 +97,10 @@ pub enum PropertyType {

/// Does not restrict on this value and instead will be passed to the worker
/// as an informational piece.
Metadata,
/// TODO(allada) In the future this will be used by the scheduler and worker
/// to cause the scheduler to prefer certain workers over others, but not
/// restrict them based on these values.
Priority,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -173,12 +176,79 @@ pub struct ServerConfig {
pub services: Option<ServicesConfig>,
}

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
pub enum WrokerProperty {
/// List of static values.
/// Note: Generally there should only ever be 1 value, but if the platform
/// property key is PropertyType::Priority it may have more than one value.
values(Vec<String>),

/// A dynamic configuration. The string will be executed as a command
/// (not sell) and will be split by "\n" (new line character).
query_cmd(String),
}

#[derive(Deserialize, Debug)]
pub struct LocalWorker {
/// Endpoint which the worker will connect to the scheduler's WorkerApiService.
pub worker_api_endpoint: String,

/// The command to execute on every execution request. This will be parsed as
/// a command + arguments (not shell).
/// '$@' has a special meaning in that all the arguments will expand into this
/// location.
/// Example: "run.sh $@" and a job with command: "sleep 5" will result in a
/// command like: "run.sh sleep 5".
pub entrypoint_cmd: String,

/// Reference to a filesystem store (runtime enforced). This store will be used
/// to store a local cache of files for optimization purposes.
/// Must be a reference to a store implementing backends::FilesystemStore.
pub local_filesystem_store_ref: StoreRefName,

/// Underlying CAS store that the worker will use to download CAS artifacts.
/// This store must have the same objects that the scheduler/client-cas uses.
/// The scheduler will send job requests that will reference objects stored
/// in this store. If the objects referenced in the job request don't exist
/// in this store an error may be returned.
pub cas_store: StoreRefName,

/// Underlying AC store that the worker will use to publish execution results
/// into. Objects placed in this store should be reachable from the
/// scheduler/client-cas after they have finished updating.
pub ac_store: StoreRefName,

/// The directory work jobs will be executed from. This directory will be fully
/// managed by the worker service and will be purged on startup.
/// This directory and the directory referenced in local_filesystem_store_ref's
/// backends::FilesystemStore::content_path must be on the same filesystem.
/// Hardlinks will be used when placing files that are accessible to the jobs
/// that are sourced from local_filesystem_store_ref's content_path.
pub work_directory: String,

/// Properties of this worker. This configuration will be sent to the scheduler
/// and used to tell the scheduler to restrict what should be executed on this
/// worker.
pub platform_properties: HashMap<String, WrokerProperty>,
}

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
pub enum WorkerConfig {
/// A worker type that executes jobs locally on this machine.
local(LocalWorker),
}

#[derive(Deserialize, Debug)]
pub struct CasConfig {
/// List of stores available to use in this config.
/// The keys can be used in other configs when needing to reference a store.
pub stores: HashMap<StoreRefName, backends::StoreConfig>,

/// Worker configurations used to execute jobs.
pub workers: Option<Vec<WorkerConfig>>,

/// List of schedulers available to use in this config.
/// The keys can be used in other configs when needing to reference a
/// scheduler.
Expand Down
39 changes: 38 additions & 1 deletion config/examples/basic_cas.json
Expand Up @@ -15,6 +15,16 @@
"max_bytes": 10000000,
}
}
},
"WORKER_FILESYSTEM_STORE": {
"filesystem": {
"content_path": "/tmp/turbo_cache/data/content_path-cas",
"temp_path": "/tmp/turbo_cache/data/tmp_path-cas",
"eviction_policy": {
// 2gb.
"max_bytes": 2000000000,
}
}
}
},
"schedulers": {
Expand All @@ -34,10 +44,37 @@
"cpu_arch": "Exact",
"cpu_model": "Exact",
"kernel_version": "Exact",
"docker_image": "Metadata",
"docker_image": "Priority",
}
}
},
"workers": [{
"local": {
"worker_api_endpoint": "127.0.0.1:50061",
"entrypoint_cmd": "./examples/worker/local_entrypoint.sh $@",
"local_filesystem_store_ref": "WORKER_FILESYSTEM_STORE",
"cas_store": "CAS_MAIN_STORE",
"ac_store": "AC_MAIN_STORE",
"work_directory": "/tmp/turbo_cache/work",
"platform_properties": {
"cpu_count": {
"values": ["1"],
},
"memory_kb": {
"values": ["500000"],
},
"network_kbps": {
"values": ["100000"],
},
"cpu_arch": {
"values": ["x86_64"],
},
"docker_image": {
"query_cmd": "docker images --format {{.Repository}}:{{.Tag}}",
}
}
}
}],
"servers": [{
"listen_address": "0.0.0.0:50051",
"services": {
Expand Down
16 changes: 8 additions & 8 deletions config/examples/filesystem_cas.json
@@ -1,5 +1,5 @@
// This configuration will place objects in various folders in
// `/tmp/turbo_cache_data`. It will store all data on disk and
// `/tmp/turbo_cache/data`. It will store all data on disk and
// allows for restarts of the underlying service. It is optimized
// so objects are compressed, deduplicated and uses some in-memory
// optimizations for certain hot paths.
Expand All @@ -12,8 +12,8 @@
},
"backend": {
"filesystem": {
"content_path": "/tmp/turbo_cache_data/content_path-cas",
"temp_path": "/tmp/turbo_cache_data/tmp_path-cas",
"content_path": "/tmp/turbo_cache/data/content_path-cas",
"temp_path": "/tmp/turbo_cache/data/tmp_path-cas",
"eviction_policy": {
// 2gb.
"max_bytes": 2000000000,
Expand Down Expand Up @@ -56,8 +56,8 @@
},
"slow": {
"filesystem": {
"content_path": "/tmp/turbo_cache_data/content_path-index",
"temp_path": "/tmp/turbo_cache_data/tmp_path-index",
"content_path": "/tmp/turbo_cache/data/content_path-index",
"temp_path": "/tmp/turbo_cache/data/tmp_path-index",
"eviction_policy": {
// 500mb.
"max_bytes": 500000000,
Expand All @@ -81,8 +81,8 @@
},
"AC_MAIN_STORE": {
"filesystem": {
"content_path": "/tmp/turbo_cache_data/content_path-ac",
"temp_path": "/tmp/turbo_cache_data/tmp_path-ac",
"content_path": "/tmp/turbo_cache/data/content_path-ac",
"temp_path": "/tmp/turbo_cache/data/tmp_path-ac",
"eviction_policy": {
// 500mb.
"max_bytes": 500000000,
Expand All @@ -107,7 +107,7 @@
"cpu_arch": "Exact",
"cpu_model": "Exact",
"kernel_version": "Exact",
"docker_image": "Metadata",
"docker_image": "Priority",
}
}
},
Expand Down
14 changes: 7 additions & 7 deletions config/examples/s3_backend_with_local_fast_cas.json
Expand Up @@ -8,8 +8,8 @@
"fast_slow": {
"fast": {
"filesystem": {
"content_path": "/tmp/turbo_cache_data/content_path-index",
"temp_path": "/tmp/turbo_cache_data/tmp_path-index",
"content_path": "/tmp/turbo_cache/data/content_path-index",
"temp_path": "/tmp/turbo_cache/data/tmp_path-index",
"eviction_policy": {
// 500mb.
"max_bytes": 500000000,
Expand Down Expand Up @@ -40,8 +40,8 @@
"fast_slow": {
"fast": {
"filesystem": {
"content_path": "/tmp/turbo_cache_data/content_path-content",
"temp_path": "/tmp/turbo_cache_data/tmp_path-content",
"content_path": "/tmp/turbo_cache/data/content_path-content",
"temp_path": "/tmp/turbo_cache/data/tmp_path-content",
"eviction_policy": {
// 2gb.
"max_bytes": 2000000000,
Expand Down Expand Up @@ -81,8 +81,8 @@
}
},
"filesystem": {
"content_path": "/tmp/turbo_cache_data/content_path-ac",
"temp_path": "/tmp/turbo_cache_data/tmp_path-ac",
"content_path": "/tmp/turbo_cache/data/content_path-ac",
"temp_path": "/tmp/turbo_cache/data/tmp_path-ac",
"eviction_policy": {
// 500mb.
"max_bytes": 500000000,
Expand Down Expand Up @@ -122,7 +122,7 @@
"cpu_arch": "Exact",
"cpu_model": "Exact",
"kernel_version": "Exact",
"docker_image": "Metadata",
"docker_image": "Priority",
}
}
},
Expand Down

0 comments on commit 98c4e08

Please sign in to comment.