Skip to content

Commit

Permalink
Various minor updates
Browse files Browse the repository at this point in the history
* Worker config is now a struct instead of string
* Moved ac_utils into store folder instead of scheduler
* Changed execution priority from i64 to i32
* Add PlatformPropertyValue::Unknown
  • Loading branch information
allada committed Apr 18, 2022
1 parent 576aff4 commit cf6dd3d
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 26 deletions.
16 changes: 2 additions & 14 deletions cas/grpc_service/BUILD
Expand Up @@ -25,14 +25,14 @@ rust_library(
srcs = ["ac_server.rs"],
deps = [
"//cas/store",
"//cas/store:ac_utils",
"//config",
"//proto",
"//third_party:bytes",
"//third_party:prost",
"//third_party:tonic",
"//util:common",
"//util:error",
":ac_utils",
],
visibility = ["//cas:__pkg__"]
)
Expand Down Expand Up @@ -77,6 +77,7 @@ rust_library(
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:scheduler",
"//cas/store",
"//cas/store:ac_utils",
"//config",
"//proto",
"//third_party:futures",
Expand All @@ -87,7 +88,6 @@ rust_library(
"//third_party:tonic",
"//util:common",
"//util:error",
":ac_utils",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -112,18 +112,6 @@ rust_library(
visibility = ["//cas:__pkg__"]
)

rust_library(
name = "ac_utils",
srcs = ["ac_utils.rs"],
deps = [
"//cas/store",
"//third_party:prost",
"//util:common",
"//util:error",
],
visibility = ["//visibility:public"],
)

rust_test(
name = "worker_api_server_test",
srcs = ["tests/worker_api_server_test.rs"],
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/ac_server.rs
Expand Up @@ -60,7 +60,7 @@ impl AcServer {
.as_ref(),
);
Ok(Response::new(
get_and_decode_digest::<ActionResult>(&store, &digest).await?,
get_and_decode_digest::<ActionResult>(store, &digest).await?,
))
}

Expand Down
8 changes: 4 additions & 4 deletions cas/grpc_service/execution_server.rs
Expand Up @@ -26,7 +26,7 @@ use scheduler::Scheduler;
use store::{Store, StoreManager};

/// Default priority remote execution jobs will get when not provided.
const DEFAULT_EXECUTION_PRIORITY: i64 = 0;
const DEFAULT_EXECUTION_PRIORITY: i32 = 0;

struct InstanceInfo {
scheduler: Arc<Scheduler>,
Expand All @@ -43,7 +43,7 @@ impl InstanceInfo {
instance_name: String,
action_digest: DigestInfo,
action: &Action,
priority: i64,
priority: i32,
) -> Result<ActionInfo, Error> {
let command_digest = DigestInfo::try_from(
action
Expand Down Expand Up @@ -173,9 +173,9 @@ impl ExecutionServer {

let priority = execute_req
.execution_policy
.map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority as i64);
.map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);

let action = get_and_decode_digest::<Action>(&instance_info.cas_pin(), &digest).await?;
let action = get_and_decode_digest::<Action>(instance_info.cas_pin(), &digest).await?;
let action_info = instance_info
.build_action_info(instance_name, digest, &action, priority)
.await?;
Expand Down
1 change: 1 addition & 0 deletions cas/scheduler/BUILD
Expand Up @@ -7,6 +7,7 @@ rust_library(
srcs = ["platform_property_manager.rs"],
deps = [
"//config",
"//proto",
"//util:error",
],
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
Expand Down
2 changes: 1 addition & 1 deletion cas/scheduler/action_messages.rs
Expand Up @@ -50,7 +50,7 @@ pub struct ActionInfo {
/// The properties rules that must be applied when finding a worker that can run this action.
pub platform_properties: PlatformProperties,
/// The priority of the action. Higher value means it should execute faster.
pub priority: i64,
pub priority: i32,
/// When this action was created.
pub insert_timestamp: SystemTime,

Expand Down
14 changes: 14 additions & 0 deletions cas/scheduler/platform_property_manager.rs
Expand Up @@ -4,6 +4,7 @@ use std::collections::HashMap;

use config::cas_server::PropertyType;
use error::{make_input_err, Code, Error, ResultExt};
use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform;

/// `PlatformProperties` helps manage the configuration of platform properties to
/// keys and types. The scheduler uses these properties to decide what jobs
Expand Down Expand Up @@ -37,6 +38,16 @@ impl PlatformProperties {
}
}

impl From<ProtoPlatform> for PlatformProperties {
fn from(platform: ProtoPlatform) -> Self {
let mut properties = HashMap::with_capacity(platform.properties.len());
for property in platform.properties.into_iter() {
properties.insert(property.name, PlatformPropertyValue::Unknown(property.value));
}
Self { properties }
}
}

/// Holds the associated value of the key and type.
///
/// Exact - Means the worker must have this exact value.
Expand All @@ -54,6 +65,7 @@ pub enum PlatformPropertyValue {
Exact(String),
Minimum(u64),
Priority(String),
Unknown(String),
}

impl PlatformPropertyValue {
Expand All @@ -75,6 +87,8 @@ impl PlatformPropertyValue {
PlatformPropertyValue::Priority(_) => true,
// Success exact case is handled above.
PlatformPropertyValue::Exact(_) => false,
// Used mostly for transporting data. This should not be relied upon when this value.
PlatformPropertyValue::Unknown(_) => false,
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions cas/store/BUILD
Expand Up @@ -50,6 +50,19 @@ rust_library(
visibility = ["//cas:__pkg__"]
)

rust_library(
name = "ac_utils",
srcs = ["ac_utils.rs"],
deps = [
"//third_party:prost",
"//util:common",
"//util:error",
":store",
],
visibility = ["//visibility:public"],
)


rust_library(
name = "size_partitioning_store",
srcs = ["size_partitioning_store.rs"],
Expand Down
14 changes: 12 additions & 2 deletions cas/grpc_service/ac_utils.rs → cas/store/ac_utils.rs
Expand Up @@ -12,13 +12,23 @@ use store::Store;
// 1.2k. Giving a bit more just in case to reduce allocs.
pub const ESTIMATED_DIGEST_SIZE: usize = 2048;

/// This is more of a safety check. We are going to collect this entire message
/// into memory. If we don't bound the max size of the object we enable users
/// to use up all the memory on this machine.
const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb.

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_and_decode_digest<T: Message + Default>(
store: &Pin<&dyn Store>,
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<T, Error> {
let mut store_data_resp = store
.get_part_unchunked(digest.clone(), 0, None, Some(ESTIMATED_DIGEST_SIZE))
.get_part_unchunked(
digest.clone(),
0,
Some(MAX_ACTION_MSG_SIZE),
Some(ESTIMATED_DIGEST_SIZE),
)
.await;
if let Err(err) = &mut store_data_resp {
if err.code == Code::NotFound {
Expand Down
17 changes: 14 additions & 3 deletions config/cas_server.rs
Expand Up @@ -189,10 +189,21 @@ pub enum WrokerProperty {
query_cmd(String),
}

/// Generic config for an endpoint and associated configs.
#[derive(Deserialize, Debug)]
pub struct LocalWorker {
pub struct EndpointConfig {
/// URI of the endpoint.
pub uri: String,

/// Timeout in seconds that a request should take.
/// Default: 5 (seconds)
pub timeout: Option<f32>,
}

#[derive(Deserialize, Debug)]
pub struct LocalWorkerConfig {
/// Endpoint which the worker will connect to the scheduler's WorkerApiService.
pub worker_api_endpoint: String,
pub worker_api_endpoint: EndpointConfig,

/// The command to execute on every execution request. This will be parsed as
/// a command + arguments (not shell).
Expand Down Expand Up @@ -237,7 +248,7 @@ pub struct LocalWorker {
#[derive(Deserialize, Debug)]
pub enum WorkerConfig {
/// A worker type that executes jobs locally on this machine.
local(LocalWorker),
local(LocalWorkerConfig),
}

#[derive(Deserialize, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion config/examples/basic_cas.json
Expand Up @@ -50,7 +50,9 @@
},
"workers": [{
"local": {
"worker_api_endpoint": "127.0.0.1:50061",
"worker_api_endpoint": {
"uri": "127.0.0.1:50061",
},
"entrypoint_cmd": "./examples/worker/local_entrypoint.sh $@",
"local_filesystem_store_ref": "WORKER_FILESYSTEM_STORE",
"cas_store": "CAS_MAIN_STORE",
Expand Down

0 comments on commit cf6dd3d

Please sign in to comment.