From cf6dd3db5a9633aa9fa3060395266925c09e9a62 Mon Sep 17 00:00:00 2001 From: allada Date: Mon, 18 Apr 2022 13:03:19 -0500 Subject: [PATCH] Various minor updates * Worker config is now a struct instead of string * Moved ac_utils into store folder instead of scheduler * Changed execution priority from i64 to i32 * Add PlatformPropertyValue::Unknown --- cas/grpc_service/BUILD | 16 ++-------------- cas/grpc_service/ac_server.rs | 2 +- cas/grpc_service/execution_server.rs | 8 ++++---- cas/scheduler/BUILD | 1 + cas/scheduler/action_messages.rs | 2 +- cas/scheduler/platform_property_manager.rs | 14 ++++++++++++++ cas/store/BUILD | 13 +++++++++++++ cas/{grpc_service => store}/ac_utils.rs | 14 ++++++++++++-- config/cas_server.rs | 17 ++++++++++++++--- config/examples/basic_cas.json | 4 +++- 10 files changed, 65 insertions(+), 26 deletions(-) rename cas/{grpc_service => store}/ac_utils.rs (73%) diff --git a/cas/grpc_service/BUILD b/cas/grpc_service/BUILD index ee4d2170e..7b00d7806 100644 --- a/cas/grpc_service/BUILD +++ b/cas/grpc_service/BUILD @@ -25,6 +25,7 @@ rust_library( srcs = ["ac_server.rs"], deps = [ "//cas/store", + "//cas/store:ac_utils", "//config", "//proto", "//third_party:bytes", @@ -32,7 +33,6 @@ rust_library( "//third_party:tonic", "//util:common", "//util:error", - ":ac_utils", ], visibility = ["//cas:__pkg__"] ) @@ -77,6 +77,7 @@ rust_library( "//cas/scheduler:platform_property_manager", "//cas/scheduler:scheduler", "//cas/store", + "//cas/store:ac_utils", "//config", "//proto", "//third_party:futures", @@ -87,7 +88,6 @@ rust_library( "//third_party:tonic", "//util:common", "//util:error", - ":ac_utils", ], visibility = ["//cas:__pkg__"] ) @@ -112,18 +112,6 @@ rust_library( visibility = ["//cas:__pkg__"] ) -rust_library( - name = "ac_utils", - srcs = ["ac_utils.rs"], - deps = [ - "//cas/store", - "//third_party:prost", - "//util:common", - "//util:error", - ], - visibility = ["//visibility:public"], -) - rust_test( name = "worker_api_server_test", srcs = ["tests/worker_api_server_test.rs"], diff --git a/cas/grpc_service/ac_server.rs b/cas/grpc_service/ac_server.rs index f81716fa9..365459d9f 100644 --- a/cas/grpc_service/ac_server.rs +++ b/cas/grpc_service/ac_server.rs @@ -60,7 +60,7 @@ impl AcServer { .as_ref(), ); Ok(Response::new( - get_and_decode_digest::(&store, &digest).await?, + get_and_decode_digest::(store, &digest).await?, )) } diff --git a/cas/grpc_service/execution_server.rs b/cas/grpc_service/execution_server.rs index a2de6da30..2b9b6e3f6 100644 --- a/cas/grpc_service/execution_server.rs +++ b/cas/grpc_service/execution_server.rs @@ -26,7 +26,7 @@ use scheduler::Scheduler; use store::{Store, StoreManager}; /// Default priority remote execution jobs will get when not provided. -const DEFAULT_EXECUTION_PRIORITY: i64 = 0; +const DEFAULT_EXECUTION_PRIORITY: i32 = 0; struct InstanceInfo { scheduler: Arc, @@ -43,7 +43,7 @@ impl InstanceInfo { instance_name: String, action_digest: DigestInfo, action: &Action, - priority: i64, + priority: i32, ) -> Result { let command_digest = DigestInfo::try_from( action @@ -173,9 +173,9 @@ impl ExecutionServer { let priority = execute_req .execution_policy - .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority as i64); + .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority); - let action = get_and_decode_digest::(&instance_info.cas_pin(), &digest).await?; + let action = get_and_decode_digest::(instance_info.cas_pin(), &digest).await?; let action_info = instance_info .build_action_info(instance_name, digest, &action, priority) .await?; diff --git a/cas/scheduler/BUILD b/cas/scheduler/BUILD index cbad9ca96..d773aceee 100644 --- a/cas/scheduler/BUILD +++ b/cas/scheduler/BUILD @@ -7,6 +7,7 @@ rust_library( srcs = ["platform_property_manager.rs"], deps = [ "//config", + "//proto", "//util:error", ], visibility = ["//cas:__pkg__", "//cas:__subpackages__"] diff --git a/cas/scheduler/action_messages.rs b/cas/scheduler/action_messages.rs index 077246d1b..616c5ed34 100644 --- a/cas/scheduler/action_messages.rs +++ b/cas/scheduler/action_messages.rs @@ -50,7 +50,7 @@ pub struct ActionInfo { /// The properties rules that must be applied when finding a worker that can run this action. pub platform_properties: PlatformProperties, /// The priority of the action. Higher value means it should execute faster. - pub priority: i64, + pub priority: i32, /// When this action was created. pub insert_timestamp: SystemTime, diff --git a/cas/scheduler/platform_property_manager.rs b/cas/scheduler/platform_property_manager.rs index 8f0f848e9..97a5d9593 100644 --- a/cas/scheduler/platform_property_manager.rs +++ b/cas/scheduler/platform_property_manager.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use config::cas_server::PropertyType; use error::{make_input_err, Code, Error, ResultExt}; +use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform; /// `PlatformProperties` helps manage the configuration of platform properties to /// keys and types. The scheduler uses these properties to decide what jobs @@ -37,6 +38,16 @@ impl PlatformProperties { } } +impl From for PlatformProperties { + fn from(platform: ProtoPlatform) -> Self { + let mut properties = HashMap::with_capacity(platform.properties.len()); + for property in platform.properties.into_iter() { + properties.insert(property.name, PlatformPropertyValue::Unknown(property.value)); + } + Self { properties } + } +} + /// Holds the associated value of the key and type. /// /// Exact - Means the worker must have this exact value. @@ -54,6 +65,7 @@ pub enum PlatformPropertyValue { Exact(String), Minimum(u64), Priority(String), + Unknown(String), } impl PlatformPropertyValue { @@ -75,6 +87,8 @@ impl PlatformPropertyValue { PlatformPropertyValue::Priority(_) => true, // Success exact case is handled above. PlatformPropertyValue::Exact(_) => false, + // Used mostly for transporting data. This should not be relied upon when this value. + PlatformPropertyValue::Unknown(_) => false, } } } diff --git a/cas/store/BUILD b/cas/store/BUILD index a410ef78a..ef7840ce7 100644 --- a/cas/store/BUILD +++ b/cas/store/BUILD @@ -50,6 +50,19 @@ rust_library( visibility = ["//cas:__pkg__"] ) +rust_library( + name = "ac_utils", + srcs = ["ac_utils.rs"], + deps = [ + "//third_party:prost", + "//util:common", + "//util:error", + ":store", + ], + visibility = ["//visibility:public"], +) + + rust_library( name = "size_partitioning_store", srcs = ["size_partitioning_store.rs"], diff --git a/cas/grpc_service/ac_utils.rs b/cas/store/ac_utils.rs similarity index 73% rename from cas/grpc_service/ac_utils.rs rename to cas/store/ac_utils.rs index 0697e73b0..fc745c54e 100644 --- a/cas/grpc_service/ac_utils.rs +++ b/cas/store/ac_utils.rs @@ -12,13 +12,23 @@ use store::Store; // 1.2k. Giving a bit more just in case to reduce allocs. pub const ESTIMATED_DIGEST_SIZE: usize = 2048; +/// This is more of a safety check. We are going to collect this entire message +/// into memory. If we don't bound the max size of the object we enable users +/// to use up all the memory on this machine. +const MAX_ACTION_MSG_SIZE: usize = 10 << 20; // 10mb. + /// Attempts to fetch the digest contents from a store into the associated proto. pub async fn get_and_decode_digest( - store: &Pin<&dyn Store>, + store: Pin<&dyn Store>, digest: &DigestInfo, ) -> Result { let mut store_data_resp = store - .get_part_unchunked(digest.clone(), 0, None, Some(ESTIMATED_DIGEST_SIZE)) + .get_part_unchunked( + digest.clone(), + 0, + Some(MAX_ACTION_MSG_SIZE), + Some(ESTIMATED_DIGEST_SIZE), + ) .await; if let Err(err) = &mut store_data_resp { if err.code == Code::NotFound { diff --git a/config/cas_server.rs b/config/cas_server.rs index b46b32032..94bbdaa82 100644 --- a/config/cas_server.rs +++ b/config/cas_server.rs @@ -189,10 +189,21 @@ pub enum WrokerProperty { query_cmd(String), } +/// Generic config for an endpoint and associated configs. #[derive(Deserialize, Debug)] -pub struct LocalWorker { +pub struct EndpointConfig { + /// URI of the endpoint. + pub uri: String, + + /// Timeout in seconds that a request should take. + /// Default: 5 (seconds) + pub timeout: Option, +} + +#[derive(Deserialize, Debug)] +pub struct LocalWorkerConfig { /// Endpoint which the worker will connect to the scheduler's WorkerApiService. - pub worker_api_endpoint: String, + pub worker_api_endpoint: EndpointConfig, /// The command to execute on every execution request. This will be parsed as /// a command + arguments (not shell). @@ -237,7 +248,7 @@ pub struct LocalWorker { #[derive(Deserialize, Debug)] pub enum WorkerConfig { /// A worker type that executes jobs locally on this machine. - local(LocalWorker), + local(LocalWorkerConfig), } #[derive(Deserialize, Debug)] diff --git a/config/examples/basic_cas.json b/config/examples/basic_cas.json index 14923a134..65969f567 100644 --- a/config/examples/basic_cas.json +++ b/config/examples/basic_cas.json @@ -50,7 +50,9 @@ }, "workers": [{ "local": { - "worker_api_endpoint": "127.0.0.1:50061", + "worker_api_endpoint": { + "uri": "127.0.0.1:50061", + }, "entrypoint_cmd": "./examples/worker/local_entrypoint.sh $@", "local_filesystem_store_ref": "WORKER_FILESYSTEM_STORE", "cas_store": "CAS_MAIN_STORE",