From 6bc991c30d8dd9abffb38d178ed05cfdf090da79 Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Mon, 27 Nov 2023 07:13:17 -0600 Subject: [PATCH] Move action_messages & part of platform_properties to utils crate (#417) Moves action_messages helper and part of the platform properties to the utils crate so it can be used in stores. --- Cargo.lock | 2 +- native-link-scheduler/BUILD.bazel | 2 - native-link-scheduler/Cargo.toml | 1 - native-link-scheduler/src/action_scheduler.rs | 2 +- .../src/cache_lookup_scheduler.rs | 2 +- native-link-scheduler/src/grpc_scheduler.rs | 2 +- native-link-scheduler/src/lib.rs | 1 - .../src/platform_property_manager.rs | 100 +-------------- .../src/property_modifier_scheduler.rs | 2 +- native-link-scheduler/src/simple_scheduler.rs | 9 +- native-link-scheduler/src/worker.rs | 5 +- native-link-scheduler/src/worker_scheduler.rs | 2 +- .../tests/action_messages_test.rs | 4 +- .../tests/cache_lookup_scheduler_test.rs | 4 +- .../tests/property_modifier_scheduler_test.rs | 5 +- .../tests/simple_scheduler_test.rs | 6 +- .../tests/utils/mock_scheduler.rs | 2 +- .../tests/utils/scheduler_utils.rs | 4 +- native-link-service/src/execution_server.rs | 4 +- native-link-service/src/worker_api_server.rs | 4 +- .../tests/worker_api_server_test.rs | 4 +- native-link-util/BUILD.bazel | 3 + native-link-util/Cargo.toml | 1 + .../src/action_messages.rs | 71 ++++++++++- native-link-util/src/lib.rs | 2 + native-link-util/src/platform_properties.rs | 115 ++++++++++++++++++ native-link-worker/src/local_worker.rs | 2 +- .../src/running_actions_manager.rs | 6 +- native-link-worker/tests/local_worker_test.rs | 6 +- .../tests/running_actions_manager_test.rs | 8 +- .../utils/mock_running_actions_manager.rs | 2 +- 31 files changed, 231 insertions(+), 152 deletions(-) rename {native-link-scheduler => native-link-util}/src/action_messages.rs (93%) create mode 100644 native-link-util/src/platform_properties.rs diff --git a/Cargo.lock b/Cargo.lock index 
e8f44db3a..c3ae73a99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1638,7 +1638,6 @@ dependencies = [ "parking_lot", "pretty_assertions", "prost", - "prost-types", "proto", "rand", "scopeguard", @@ -1737,6 +1736,7 @@ dependencies = [ "pretty_assertions", "prometheus-client", "prost", + "prost-types", "proto", "rand", "serde", diff --git a/native-link-scheduler/BUILD.bazel b/native-link-scheduler/BUILD.bazel index 2059c99a4..3dda4ae10 100644 --- a/native-link-scheduler/BUILD.bazel +++ b/native-link-scheduler/BUILD.bazel @@ -9,7 +9,6 @@ load( rust_library( name = "native-link-scheduler", srcs = [ - "src/action_messages.rs", "src/action_scheduler.rs", "src/cache_lookup_scheduler.rs", "src/default_scheduler_factory.rs", @@ -38,7 +37,6 @@ rust_library( "@crate_index//:lru", "@crate_index//:parking_lot", "@crate_index//:prost", - "@crate_index//:prost-types", "@crate_index//:rand", "@crate_index//:scopeguard", "@crate_index//:tokio", diff --git a/native-link-scheduler/Cargo.toml b/native-link-scheduler/Cargo.toml index 3796091a6..1d04eb6e0 100644 --- a/native-link-scheduler/Cargo.toml +++ b/native-link-scheduler/Cargo.toml @@ -21,7 +21,6 @@ futures = "0.3.28" hashbrown = "0.14" lru = "0.10.1" parking_lot = "0.12.1" -prost-types = "0.11.9" rand = "0.8.5" scopeguard = "1.2.0" tokio = { version = "1.29.1", features = ["sync", "rt", "parking_lot"] } diff --git a/native-link-scheduler/src/action_scheduler.rs b/native-link-scheduler/src/action_scheduler.rs index 15cf5cd05..a4075ab7b 100644 --- a/native-link-scheduler/src/action_scheduler.rs +++ b/native-link-scheduler/src/action_scheduler.rs @@ -16,10 +16,10 @@ use std::sync::Arc; use async_trait::async_trait; use error::Error; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; use native_link_util::metrics_utils::Registry; use tokio::sync::watch; -use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; use crate::platform_property_manager::PlatformPropertyManager; /// 
ActionScheduler interface is responsible for interactions between the scheduler diff --git a/native-link-scheduler/src/cache_lookup_scheduler.rs b/native-link-scheduler/src/cache_lookup_scheduler.rs index abaebdd1a..4da98c97e 100644 --- a/native-link-scheduler/src/cache_lookup_scheduler.rs +++ b/native-link-scheduler/src/cache_lookup_scheduler.rs @@ -21,6 +21,7 @@ use error::Error; use futures::stream::StreamExt; use native_link_store::ac_utils::get_and_decode_digest; use native_link_store::grpc_store::GrpcStore; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState}; use native_link_util::common::DigestInfo; use native_link_util::store_trait::Store; use parking_lot::{Mutex, MutexGuard}; @@ -33,7 +34,6 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tonic::Request; -use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; diff --git a/native-link-scheduler/src/grpc_scheduler.rs b/native-link-scheduler/src/grpc_scheduler.rs index dcd7c4f34..de0a09d30 100644 --- a/native-link-scheduler/src/grpc_scheduler.rs +++ b/native-link-scheduler/src/grpc_scheduler.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use error::{make_err, Code, Error, ResultExt}; use futures::stream::unfold; use futures::TryFutureExt; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY}; use native_link_util::common::log; use native_link_util::retry::{ExponentialBackoff, Retrier, RetryResult}; use parking_lot::Mutex; @@ -37,7 +38,6 @@ use tokio::sync::watch; use tokio::time::sleep; use tonic::{transport, Request, Streaming}; -use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY}; use crate::action_scheduler::ActionScheduler; use 
crate::platform_property_manager::PlatformPropertyManager; diff --git a/native-link-scheduler/src/lib.rs b/native-link-scheduler/src/lib.rs index d056492ee..b728d5158 100644 --- a/native-link-scheduler/src/lib.rs +++ b/native-link-scheduler/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod action_messages; pub mod action_scheduler; pub mod cache_lookup_scheduler; pub mod default_scheduler_factory; diff --git a/native-link-scheduler/src/platform_property_manager.rs b/native-link-scheduler/src/platform_property_manager.rs index 26daa2168..967eb03f8 100644 --- a/native-link-scheduler/src/platform_property_manager.rs +++ b/native-link-scheduler/src/platform_property_manager.rs @@ -12,109 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; use std::collections::HashMap; use error::{make_input_err, Code, Error, ResultExt}; use native_link_config::schedulers::PropertyType; -use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform; - -/// `PlatformProperties` helps manage the configuration of platform properties to -/// keys and types. The scheduler uses these properties to decide what jobs -/// can be assigned to different workers. For example, if a job states it needs -/// a specific key, it will never be run on a worker that does not have at least -/// all the platform property keys configured on the worker. -/// -/// Additional rules may be applied based on `PlatfromPropertyValue`. -#[derive(Eq, PartialEq, Clone, Debug, Default)] -pub struct PlatformProperties { - pub properties: HashMap<String, PlatformPropertyValue>, -} - -impl PlatformProperties { - #[must_use] - pub const fn new(map: HashMap<String, PlatformPropertyValue>) -> Self { - Self { properties: map } - } - - /// Determines if the worker's `PlatformProperties` is satisfied by this struct. 
- #[must_use] - pub fn is_satisfied_by(&self, worker_properties: &Self) -> bool { - for (property, check_value) in &self.properties { - if let Some(worker_value) = worker_properties.properties.get(property) { - if !check_value.is_satisfied_by(worker_value) { - return false; - } - } else { - return false; - } - } - true - } -} - -impl From<ProtoPlatform> for PlatformProperties { - fn from(platform: ProtoPlatform) -> Self { - let mut properties = HashMap::with_capacity(platform.properties.len()); - for property in platform.properties { - properties.insert(property.name, PlatformPropertyValue::Unknown(property.value)); - } - Self { properties } - } -} - -/// Holds the associated value of the key and type. -/// -/// Exact - Means the worker must have this exact value. -/// Minimum - Means that workers must have at least this number available. When -/// a worker executes a task that has this value, the worker will have -/// this value subtracted from the available resources of the worker. -/// Priority - Means the worker is given this information, but does not restrict -/// what workers can take this value. However, the worker must have the -/// associated key present to be matched. -/// TODO(allada) In the future this will be used by the scheduler and -/// worker to cause the scheduler to prefer certain workers over others, -/// but not restrict them based on these values. -#[derive(Eq, PartialEq, Hash, Clone, Ord, PartialOrd, Debug)] -pub enum PlatformPropertyValue { - Exact(String), - Minimum(u64), - Priority(String), - Unknown(String), -} - -impl PlatformPropertyValue { - /// Same as `PlatformProperties::is_satisfied_by`, but on an individual value. 
- #[must_use] - pub fn is_satisfied_by(&self, worker_value: &Self) -> bool { - if self == worker_value { - return true; - } - match self { - Self::Minimum(v) => { - if let Self::Minimum(worker_v) = worker_value { - return worker_v >= v; - } - false - } - // Priority is used to pass info to the worker and not restrict which - // workers can be selected, but might be used to prefer certain workers - // over others. - Self::Priority(_) => true, - // Success exact case is handled above. - Self::Exact(_) | Self::Unknown(_) => false, - } - } - - pub fn as_str(&self) -> Cow<str> { - match self { - Self::Exact(value) => Cow::Borrowed(value), - Self::Minimum(value) => Cow::Owned(value.to_string()), - Self::Priority(value) => Cow::Borrowed(value), - Self::Unknown(value) => Cow::Borrowed(value), - } - } -} +use native_link_util::platform_properties::PlatformPropertyValue; /// Helps manage known properties and conversion into `PlatformPropertyValue`. pub struct PlatformPropertyManager { diff --git a/native-link-scheduler/src/property_modifier_scheduler.rs b/native-link-scheduler/src/property_modifier_scheduler.rs index 7dfc40387..4a09114a6 100644 --- a/native-link-scheduler/src/property_modifier_scheduler.rs +++ b/native-link-scheduler/src/property_modifier_scheduler.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use async_trait::async_trait; use error::{Error, ResultExt}; use native_link_config::schedulers::{PropertyModification, PropertyType}; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; use parking_lot::Mutex; use tokio::sync::watch; -use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; use crate::action_scheduler::ActionScheduler; use crate::platform_property_manager::PlatformPropertyManager; diff --git a/native-link-scheduler/src/simple_scheduler.rs b/native-link-scheduler/src/simple_scheduler.rs index cc8bfae8a..eab0517a6 100644 --- a/native-link-scheduler/src/simple_scheduler.rs +++ 
b/native-link-scheduler/src/simple_scheduler.rs @@ -26,20 +26,21 @@ use futures::Future; use hashbrown::{HashMap, HashSet}; use lru::LruCache; use native_link_config::schedulers::WorkerAllocationStrategy; +use native_link_util::action_messages::{ + ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata, +}; use native_link_util::common::log; use native_link_util::metrics_utils::{ AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent, Registry, }; +use native_link_util::platform_properties::PlatformPropertyValue; use parking_lot::{Mutex, MutexGuard}; use tokio::sync::{watch, Notify}; use tokio::task::JoinHandle; use tokio::time::Duration; -use crate::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata, -}; use crate::action_scheduler::ActionScheduler; -use crate::platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue}; +use crate::platform_property_manager::PlatformPropertyManager; use crate::worker::{Worker, WorkerId, WorkerTimestamp, WorkerUpdate}; use crate::worker_scheduler::WorkerScheduler; diff --git a/native-link-scheduler/src/worker.rs b/native-link-scheduler/src/worker.rs index fd285d75a..b2d6d90ef 100644 --- a/native-link-scheduler/src/worker.rs +++ b/native-link-scheduler/src/worker.rs @@ -18,16 +18,15 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use error::{make_err, make_input_err, Code, Error, ResultExt}; +use native_link_util::action_messages::ActionInfo; use native_link_util::metrics_utils::{CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent}; +use native_link_util::platform_properties::{PlatformProperties, PlatformPropertyValue}; use proto::com::github::trace_machina::native_link::remote_execution::{ update_for_worker, ConnectionResult, StartExecute, UpdateForWorker, }; use tokio::sync::mpsc::UnboundedSender; use uuid::Uuid; -use 
crate::action_messages::ActionInfo; -use crate::platform_property_manager::{PlatformProperties, PlatformPropertyValue}; - pub type WorkerTimestamp = u64; /// Unique id of worker. diff --git a/native-link-scheduler/src/worker_scheduler.rs b/native-link-scheduler/src/worker_scheduler.rs index fa8cf02f4..fb4c1189a 100644 --- a/native-link-scheduler/src/worker_scheduler.rs +++ b/native-link-scheduler/src/worker_scheduler.rs @@ -16,9 +16,9 @@ use std::sync::Arc; use async_trait::async_trait; use error::Error; +use native_link_util::action_messages::{ActionInfoHashKey, ActionStage}; use native_link_util::metrics_utils::Registry; -use crate::action_messages::{ActionInfoHashKey, ActionStage}; use crate::platform_property_manager::PlatformPropertyManager; use crate::worker::{Worker, WorkerId, WorkerTimestamp}; diff --git a/native-link-scheduler/tests/action_messages_test.rs b/native-link-scheduler/tests/action_messages_test.rs index 68ae2b2c2..5434fc521 100644 --- a/native-link-scheduler/tests/action_messages_test.rs +++ b/native-link-scheduler/tests/action_messages_test.rs @@ -17,12 +17,12 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use error::Error; -use native_link_scheduler::action_messages::{ +use native_link_util::action_messages::{ ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata, }; -use native_link_scheduler::platform_property_manager::PlatformProperties; use native_link_util::common::DigestInfo; use native_link_util::digest_hasher::DigestHasherFunc; +use native_link_util::platform_properties::PlatformProperties; use proto::build::bazel::remote::execution::v2::ExecuteResponse; use proto::google::longrunning::{operation, Operation}; use proto::google::rpc::Status; diff --git a/native-link-scheduler/tests/cache_lookup_scheduler_test.rs b/native-link-scheduler/tests/cache_lookup_scheduler_test.rs index 5f7cf7dea..7a721b2d6 100644 --- a/native-link-scheduler/tests/cache_lookup_scheduler_test.rs +++ 
b/native-link-scheduler/tests/cache_lookup_scheduler_test.rs @@ -24,13 +24,11 @@ mod utils { use error::{Error, ResultExt}; use futures::join; -use native_link_scheduler::action_messages::{ - ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo, -}; use native_link_scheduler::action_scheduler::ActionScheduler; use native_link_scheduler::cache_lookup_scheduler::CacheLookupScheduler; use native_link_scheduler::platform_property_manager::PlatformPropertyManager; use native_link_store::memory_store::MemoryStore; +use native_link_util::action_messages::{ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo}; use native_link_util::common::DigestInfo; use native_link_util::store_trait::Store; use prost::Message; diff --git a/native-link-scheduler/tests/property_modifier_scheduler_test.rs b/native-link-scheduler/tests/property_modifier_scheduler_test.rs index 0a8fb61a7..1572361a7 100644 --- a/native-link-scheduler/tests/property_modifier_scheduler_test.rs +++ b/native-link-scheduler/tests/property_modifier_scheduler_test.rs @@ -25,11 +25,12 @@ mod utils { use error::Error; use futures::join; use native_link_config::schedulers::{PlatformPropertyAddition, PropertyModification, PropertyType}; -use native_link_scheduler::action_messages::{ActionInfoHashKey, ActionStage, ActionState}; use native_link_scheduler::action_scheduler::ActionScheduler; -use native_link_scheduler::platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue}; +use native_link_scheduler::platform_property_manager::PlatformPropertyManager; use native_link_scheduler::property_modifier_scheduler::PropertyModifierScheduler; +use native_link_util::action_messages::{ActionInfoHashKey, ActionStage, ActionState}; use native_link_util::common::DigestInfo; +use native_link_util::platform_properties::PlatformPropertyValue; use tokio::sync::watch; use utils::mock_scheduler::MockActionScheduler; use utils::scheduler_utils::{make_base_action_info, INSTANCE_NAME}; 
diff --git a/native-link-scheduler/tests/simple_scheduler_test.rs b/native-link-scheduler/tests/simple_scheduler_test.rs index 4ad77c361..ab7fe3941 100644 --- a/native-link-scheduler/tests/simple_scheduler_test.rs +++ b/native-link-scheduler/tests/simple_scheduler_test.rs @@ -18,12 +18,12 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use error::{make_err, Code, Error, ResultExt}; -use native_link_scheduler::action_messages::{ +use native_link_scheduler::action_scheduler::ActionScheduler; +use native_link_util::action_messages::{ ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo, INTERNAL_ERROR_EXIT_CODE, }; -use native_link_scheduler::action_scheduler::ActionScheduler; -use native_link_scheduler::platform_property_manager::{PlatformProperties, PlatformPropertyValue}; +use native_link_util::platform_properties::{PlatformProperties, PlatformPropertyValue}; mod utils { pub(crate) mod scheduler_utils; } diff --git a/native-link-scheduler/tests/utils/mock_scheduler.rs b/native-link-scheduler/tests/utils/mock_scheduler.rs index 6bbb079e2..13b89fa8f 100644 --- a/native-link-scheduler/tests/utils/mock_scheduler.rs +++ b/native-link-scheduler/tests/utils/mock_scheduler.rs @@ -16,9 +16,9 @@ use std::sync::Arc; use async_trait::async_trait; use error::{make_input_err, Error}; -use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; use native_link_scheduler::action_scheduler::ActionScheduler; use native_link_scheduler::platform_property_manager::PlatformPropertyManager; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState}; use tokio::sync::{mpsc, watch, Mutex}; #[allow(clippy::large_enum_variant)] diff --git a/native-link-scheduler/tests/utils/scheduler_utils.rs b/native-link-scheduler/tests/utils/scheduler_utils.rs index 61b3101da..eb7081904 100644 --- a/native-link-scheduler/tests/utils/scheduler_utils.rs 
+++ b/native-link-scheduler/tests/utils/scheduler_utils.rs @@ -15,10 +15,10 @@ use std::collections::HashMap; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey}; -use native_link_scheduler::platform_property_manager::PlatformProperties; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey}; use native_link_util::common::DigestInfo; use native_link_util::digest_hasher::DigestHasherFunc; +use native_link_util::platform_properties::PlatformProperties; pub const INSTANCE_NAME: &str = "foobar_instance_name"; diff --git a/native-link-service/src/execution_server.rs b/native-link-service/src/execution_server.rs index 44de3d254..5623383e5 100644 --- a/native-link-service/src/execution_server.rs +++ b/native-link-service/src/execution_server.rs @@ -20,13 +20,13 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use error::{make_input_err, Error, ResultExt}; use futures::{Stream, StreamExt}; use native_link_config::cas_server::{ExecutionConfig, InstanceName}; -use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY}; use native_link_scheduler::action_scheduler::ActionScheduler; -use native_link_scheduler::platform_property_manager::PlatformProperties; use native_link_store::ac_utils::get_and_decode_digest; use native_link_store::store_manager::StoreManager; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY}; use native_link_util::common::{log, DigestInfo}; use native_link_util::digest_hasher::DigestHasherFunc; +use native_link_util::platform_properties::PlatformProperties; use native_link_util::store_trait::Store; use proto::build::bazel::remote::execution::v2::execution_server::{Execution, ExecutionServer as Server}; use proto::build::bazel::remote::execution::v2::{Action, Command, ExecuteRequest, WaitExecutionRequest}; diff --git 
a/native-link-service/src/worker_api_server.rs b/native-link-service/src/worker_api_server.rs index 8c2d628b7..ecfd53bce 100644 --- a/native-link-service/src/worker_api_server.rs +++ b/native-link-service/src/worker_api_server.rs @@ -21,11 +21,11 @@ use error::{make_err, Code, Error, ResultExt}; use futures::stream::unfold; use futures::Stream; use native_link_config::cas_server::WorkerApiConfig; -use native_link_scheduler::action_messages::ActionInfoHashKey; -use native_link_scheduler::platform_property_manager::PlatformProperties; use native_link_scheduler::worker::{Worker, WorkerId}; use native_link_scheduler::worker_scheduler::WorkerScheduler; +use native_link_util::action_messages::ActionInfoHashKey; use native_link_util::common::{log, DigestInfo}; +use native_link_util::platform_properties::PlatformProperties; use proto::com::github::trace_machina::native_link::remote_execution::worker_api_server::{ WorkerApi, WorkerApiServer as Server, }; diff --git a/native-link-service/tests/worker_api_server_test.rs b/native-link-service/tests/worker_api_server_test.rs index d1116e028..e73214253 100644 --- a/native-link-service/tests/worker_api_server_test.rs +++ b/native-link-service/tests/worker_api_server_test.rs @@ -18,15 +18,15 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use error::{Error, ResultExt}; use native_link_config::cas_server::WorkerApiConfig; -use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionStage}; use native_link_scheduler::action_scheduler::ActionScheduler; -use native_link_scheduler::platform_property_manager::PlatformProperties; use native_link_scheduler::simple_scheduler::SimpleScheduler; use native_link_scheduler::worker::WorkerId; use native_link_scheduler::worker_scheduler::WorkerScheduler; use native_link_service::worker_api_server::{ConnectWorkerStream, NowFn, WorkerApiServer}; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionStage}; use native_link_util::common::DigestInfo; 
use native_link_util::digest_hasher::DigestHasherFunc; +use native_link_util::platform_properties::PlatformProperties; use proto::build::bazel::remote::execution::v2::{ ActionResult as ProtoActionResult, ExecuteResponse, ExecutedActionMetadata, LogFile, OutputDirectory, OutputFile, OutputSymlink, diff --git a/native-link-util/BUILD.bazel b/native-link-util/BUILD.bazel index f6d736461..7e05ced76 100644 --- a/native-link-util/BUILD.bazel +++ b/native-link-util/BUILD.bazel @@ -9,6 +9,7 @@ load( rust_library( name = "native-link-util", srcs = [ + "src/action_messages.rs", "src/async_fixed_buffer.rs", "src/buf_channel.rs", "src/common.rs", @@ -18,6 +19,7 @@ rust_library( "src/fs.rs", "src/lib.rs", "src/metrics_utils.rs", + "src/platform_properties.rs", "src/resource_info.rs", "src/retry.rs", "src/store_trait.rs", @@ -44,6 +46,7 @@ rust_library( "@crate_index//:pin-project-lite", "@crate_index//:prometheus-client", "@crate_index//:prost", + "@crate_index//:prost-types", "@crate_index//:serde", "@crate_index//:sha2", "@crate_index//:tokio", diff --git a/native-link-util/Cargo.toml b/native-link-util/Cargo.toml index d64dd10dc..520b58fe8 100644 --- a/native-link-util/Cargo.toml +++ b/native-link-util/Cargo.toml @@ -21,6 +21,7 @@ parking_lot = "0.12.1" pin-project-lite = "0.2.10" prometheus-client = "0.21.2" prost = "0.11.9" +prost-types = "0.11.9" serde = { version = "1.0.167", features = ["derive"] } sha2 = "0.10.7" tokio = { version = "1.29.1", features = [ "sync", "fs", "rt", "time", "io-util" ] } diff --git a/native-link-scheduler/src/action_messages.rs b/native-link-util/src/action_messages.rs similarity index 93% rename from native-link-scheduler/src/action_messages.rs rename to native-link-util/src/action_messages.rs index b7c60d5f4..9da4029b0 100644 --- a/native-link-scheduler/src/action_messages.rs +++ b/native-link-util/src/action_messages.rs @@ -22,9 +22,6 @@ use std::time::{Duration, SystemTime}; use blake3::Hasher as Blake3Hasher; use error::{error_if, 
make_input_err, Error, ResultExt}; -use native_link_util::common::{DigestInfo, HashMapExt, VecExt}; -use native_link_util::digest_hasher::DigestHasherFunc; -use native_link_util::metrics_utils::{CollectorState, MetricsComponent}; use prost::bytes::Bytes; use prost::Message; use prost_types::Any; @@ -37,7 +34,10 @@ use proto::google::longrunning::operation::Result as LongRunningResult; use proto::google::longrunning::Operation; use proto::google::rpc::Status; -use crate::platform_property_manager::PlatformProperties; +use crate::common::{DigestInfo, HashMapExt, VecExt}; +use crate::digest_hasher::DigestHasherFunc; +use crate::metrics_utils::{CollectorState, MetricsComponent}; +use crate::platform_properties::PlatformProperties; /// Default priority remote execution jobs will get when not provided. pub const DEFAULT_EXECUTION_PRIORITY: i32 = 0; @@ -755,6 +755,69 @@ impl From for ProtoActionResult { } } +impl TryFrom<ProtoActionResult> for ActionResult { + type Error = Error; + + fn try_from(val: ProtoActionResult) -> Result<Self, Error> { + let output_file_symlinks = val + .output_file_symlinks + .into_iter() + .map(|output_symlink| { + SymlinkInfo::try_from(output_symlink) + .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo") + }) + .collect::<Result<Vec<_>, _>>()?; + + let output_directory_symlinks = val + .output_directory_symlinks + .into_iter() + .map(|output_symlink| { + SymlinkInfo::try_from(output_symlink) + .err_tip(|| "Output File Symlinks could not be converted to SymlinkInfo") + }) + .collect::<Result<Vec<_>, _>>()?; + + let output_files = val + .output_files + .into_iter() + .map(|output_file| output_file.try_into().err_tip(|| "Output File could not be converted")) + .collect::<Result<Vec<_>, _>>()?; + + let output_folders = val + .output_directories + .into_iter() + .map(|output_directory| { + output_directory + .try_into() + .err_tip(|| "Output File could not be converted") + }) + .collect::<Result<Vec<_>, _>>()?; + + Ok(Self { + output_files, + output_folders, + output_file_symlinks, + output_directory_symlinks, 
exit_code: val.exit_code, + stdout_digest: val + .stdout_digest + .err_tip(|| "Expected stdout_digest to be set on ExecuteResponse msg")? + .try_into()?, + stderr_digest: val + .stderr_digest + .err_tip(|| "Expected stderr_digest to be set on ExecuteResponse msg")? + .try_into()?, + execution_metadata: val + .execution_metadata + .err_tip(|| "Expected execution_metadata to be set on ExecuteResponse msg")? + .try_into()?, + server_logs: Default::default(), + error: None, + message: String::new(), + }) + } +} + impl TryFrom for ActionStage { type Error = Error; diff --git a/native-link-util/src/lib.rs b/native-link-util/src/lib.rs index 24974d574..7ec984375 100644 --- a/native-link-util/src/lib.rs +++ b/native-link-util/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod action_messages; pub mod async_fixed_buffer; pub mod buf_channel; pub mod common; @@ -20,6 +21,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod metrics_utils; +pub mod platform_properties; pub mod resource_info; pub mod retry; pub mod store_trait; diff --git a/native-link-util/src/platform_properties.rs b/native-link-util/src/platform_properties.rs new file mode 100644 index 000000000..f83fae6f7 --- /dev/null +++ b/native-link-util/src/platform_properties.rs @@ -0,0 +1,115 @@ +// Copyright 2023 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::borrow::Cow; +use std::collections::HashMap; + +use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform; + +/// `PlatformProperties` helps manage the configuration of platform properties to +/// keys and types. The scheduler uses these properties to decide what jobs +/// can be assigned to different workers. For example, if a job states it needs +/// a specific key, it will never be run on a worker that does not have at least +/// all the platform property keys configured on the worker. +/// +/// Additional rules may be applied based on `PlatformPropertyValue`. +#[derive(Eq, PartialEq, Clone, Debug, Default)] +pub struct PlatformProperties { + pub properties: HashMap<String, PlatformPropertyValue>, +} + +impl PlatformProperties { + #[must_use] + pub const fn new(map: HashMap<String, PlatformPropertyValue>) -> Self { + Self { properties: map } + } + + /// Determines if the worker's `PlatformProperties` is satisfied by this struct. + #[must_use] + pub fn is_satisfied_by(&self, worker_properties: &Self) -> bool { + for (property, check_value) in &self.properties { + if let Some(worker_value) = worker_properties.properties.get(property) { + if !check_value.is_satisfied_by(worker_value) { + return false; + } + } else { + return false; + } + } + true + } +} + +impl From<ProtoPlatform> for PlatformProperties { + fn from(platform: ProtoPlatform) -> Self { + let mut properties = HashMap::with_capacity(platform.properties.len()); + for property in platform.properties { + properties.insert(property.name, PlatformPropertyValue::Unknown(property.value)); + } + Self { properties } + } +} + +/// Holds the associated value of the key and type. +/// +/// Exact - Means the worker must have this exact value. +/// Minimum - Means that workers must have at least this number available. When +/// a worker executes a task that has this value, the worker will have +/// this value subtracted from the available resources of the worker. 
+/// Priority - Means the worker is given this information, but does not restrict +/// what workers can take this value. However, the worker must have the +/// associated key present to be matched. +/// TODO(allada) In the future this will be used by the scheduler and +/// worker to cause the scheduler to prefer certain workers over others, +/// but not restrict them based on these values. +#[derive(Eq, PartialEq, Hash, Clone, Ord, PartialOrd, Debug)] +pub enum PlatformPropertyValue { + Exact(String), + Minimum(u64), + Priority(String), + Unknown(String), +} + +impl PlatformPropertyValue { + /// Same as `PlatformProperties::is_satisfied_by`, but on an individual value. + #[must_use] + pub fn is_satisfied_by(&self, worker_value: &Self) -> bool { + if self == worker_value { + return true; + } + match self { + Self::Minimum(v) => { + if let Self::Minimum(worker_v) = worker_value { + return worker_v >= v; + } + false + } + // Priority is used to pass info to the worker and not restrict which + // workers can be selected, but might be used to prefer certain workers + // over others. + Self::Priority(_) => true, + // Success exact case is handled above. 
+ Self::Exact(_) | Self::Unknown(_) => false, + } + } + + pub fn as_str(&self) -> Cow<str> { + match self { + Self::Exact(value) => Cow::Borrowed(value), + Self::Minimum(value) => Cow::Owned(value.to_string()), + Self::Priority(value) => Cow::Borrowed(value), + Self::Unknown(value) => Cow::Borrowed(value), + } + } +} diff --git a/native-link-worker/src/local_worker.rs b/native-link-worker/src/local_worker.rs index 6fc6a117d..e497d10d9 100644 --- a/native-link-worker/src/local_worker.rs +++ b/native-link-worker/src/local_worker.rs @@ -24,8 +24,8 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{select, Future, FutureExt, StreamExt, TryFutureExt}; use native_link_config::cas_server::LocalWorkerConfig; -use native_link_scheduler::action_messages::{ActionResult, ActionStage}; use native_link_store::fast_slow_store::FastSlowStore; +use native_link_util::action_messages::{ActionResult, ActionStage}; use native_link_util::common::{fs, log}; use native_link_util::digest_hasher::DigestHasherFunc; use native_link_util::metrics_utils::{ diff --git a/native-link-worker/src/running_actions_manager.rs b/native-link-worker/src/running_actions_manager.rs index 7b3b10832..27c9baf10 100644 --- a/native-link-worker/src/running_actions_manager.rs +++ b/native-link-worker/src/running_actions_manager.rs @@ -36,9 +36,6 @@ use formatx::Template; use futures::future::{try_join, try_join3, try_join_all, BoxFuture, Future, FutureExt, TryFutureExt}; use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt}; use native_link_config::cas_server::{EnvironmentSource, UploadActionResultConfig, UploadCacheResultsStrategy}; -use native_link_scheduler::action_messages::{ - to_execute_response, ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo, -}; use native_link_store::ac_utils::{ compute_buf_digest, compute_digest, get_and_decode_digest, serialize_and_upload_message, upload_buf_to_store, upload_file_to_store,
ESTIMATED_DIGEST_SIZE, @@ -46,6 +43,9 @@ use native_link_store::ac_utils::{ use native_link_store::fast_slow_store::FastSlowStore; use native_link_store::filesystem_store::{FileEntry, FilesystemStore}; use native_link_store::grpc_store::GrpcStore; +use native_link_util::action_messages::{ + to_execute_response, ActionInfo, ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo, +}; use native_link_util::common::{fs, log, DigestInfo, JoinHandleDropGuard}; use native_link_util::digest_hasher::DigestHasherFunc; use native_link_util::metrics_utils::{AsyncCounterWrapper, CollectorState, CounterWithTime, MetricsComponent}; diff --git a/native-link-worker/tests/local_worker_test.rs b/native-link-worker/tests/local_worker_test.rs index d62eb1761..98498a144 100644 --- a/native-link-worker/tests/local_worker_test.rs +++ b/native-link-worker/tests/local_worker_test.rs @@ -31,15 +31,13 @@ mod utils { use error::{make_err, make_input_err, Code, Error}; use native_link_config::cas_server::{LocalWorkerConfig, WorkerProperty}; -use native_link_scheduler::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata, -}; -use native_link_scheduler::platform_property_manager::PlatformProperties; use native_link_store::fast_slow_store::FastSlowStore; use native_link_store::filesystem_store::FilesystemStore; use native_link_store::memory_store::MemoryStore; +use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata}; use native_link_util::common::{encode_stream_proto, fs, DigestInfo}; use native_link_util::digest_hasher::DigestHasherFunc; +use native_link_util::platform_properties::PlatformProperties; use native_link_worker::local_worker::new_local_worker; use prost::Message; use proto::build::bazel::remote::execution::v2::platform::Property; diff --git a/native-link-worker/tests/running_actions_manager_test.rs 
b/native-link-worker/tests/running_actions_manager_test.rs index c1f283145..45488b02b 100644 --- a/native-link-worker/tests/running_actions_manager_test.rs +++ b/native-link-worker/tests/running_actions_manager_test.rs @@ -29,14 +29,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use error::{make_input_err, Code, Error, ResultExt}; use futures::{FutureExt, TryFutureExt}; use native_link_config::cas_server::EnvironmentSource; -#[cfg_attr(target_family = "windows", allow(unused_imports))] -use native_link_scheduler::action_messages::{ - ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo, -}; use native_link_store::ac_utils::{compute_digest, get_and_decode_digest, serialize_and_upload_message}; use native_link_store::fast_slow_store::FastSlowStore; use native_link_store::filesystem_store::FilesystemStore; use native_link_store::memory_store::MemoryStore; +#[cfg_attr(target_family = "windows", allow(unused_imports))] +use native_link_util::action_messages::{ + ActionResult, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, SymlinkInfo, +}; use native_link_util::common::{fs, DigestInfo}; use native_link_util::digest_hasher::DigestHasherFunc; use native_link_util::store_trait::Store; diff --git a/native-link-worker/tests/utils/mock_running_actions_manager.rs b/native-link-worker/tests/utils/mock_running_actions_manager.rs index e7b5b2437..17cb61fb4 100644 --- a/native-link-worker/tests/utils/mock_running_actions_manager.rs +++ b/native-link-worker/tests/utils/mock_running_actions_manager.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use async_lock::Mutex; use async_trait::async_trait; use error::{make_input_err, Error}; -use native_link_scheduler::action_messages::ActionResult; +use native_link_util::action_messages::ActionResult; use native_link_util::common::DigestInfo; use native_link_util::digest_hasher::DigestHasherFunc; use native_link_worker::running_actions_manager::{Metrics, RunningAction, RunningActionsManager};