Skip to content

Commit

Permalink
Move action_messages & part of platform_properties to utils crate (#417)
Browse files Browse the repository at this point in the history
Moves action_messages helper and part of the platform properties
to the utils crate so it can be used in stores.
  • Loading branch information
allada committed Nov 27, 2023
1 parent 08b2954 commit 6bc991c
Show file tree
Hide file tree
Showing 31 changed files with 231 additions and 152 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions native-link-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ load(
rust_library(
name = "native-link-scheduler",
srcs = [
"src/action_messages.rs",
"src/action_scheduler.rs",
"src/cache_lookup_scheduler.rs",
"src/default_scheduler_factory.rs",
Expand Down Expand Up @@ -38,7 +37,6 @@ rust_library(
"@crate_index//:lru",
"@crate_index//:parking_lot",
"@crate_index//:prost",
"@crate_index//:prost-types",
"@crate_index//:rand",
"@crate_index//:scopeguard",
"@crate_index//:tokio",
Expand Down
1 change: 0 additions & 1 deletion native-link-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ futures = "0.3.28"
hashbrown = "0.14"
lru = "0.10.1"
parking_lot = "0.12.1"
prost-types = "0.11.9"
rand = "0.8.5"
scopeguard = "1.2.0"
tokio = { version = "1.29.1", features = ["sync", "rt", "parking_lot"] }
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use error::Error;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use native_link_util::metrics_utils::Registry;
use tokio::sync::watch;

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use crate::platform_property_manager::PlatformPropertyManager;

/// ActionScheduler interface is responsible for interactions between the scheduler
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use error::Error;
use futures::stream::StreamExt;
use native_link_store::ac_utils::get_and_decode_digest;
use native_link_store::grpc_store::GrpcStore;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use native_link_util::common::DigestInfo;
use native_link_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
Expand All @@ -33,7 +34,6 @@ use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tonic::Request;

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use error::{make_err, Code, Error, ResultExt};
use futures::stream::unfold;
use futures::TryFutureExt;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use native_link_util::common::log;
use native_link_util::retry::{ExponentialBackoff, Retrier, RetryResult};
use parking_lot::Mutex;
Expand All @@ -37,7 +38,6 @@ use tokio::sync::watch;
use tokio::time::sleep;
use tonic::{transport, Request, Streaming};

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

Expand Down
1 change: 0 additions & 1 deletion native-link-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod action_messages;
pub mod action_scheduler;
pub mod cache_lookup_scheduler;
pub mod default_scheduler_factory;
Expand Down
100 changes: 1 addition & 99 deletions native-link-scheduler/src/platform_property_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,109 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::HashMap;

use error::{make_input_err, Code, Error, ResultExt};
use native_link_config::schedulers::PropertyType;
use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform;

/// `PlatformProperties` helps manage the configuration of platform properties to
/// keys and types. The scheduler uses these properties to decide what jobs
/// can be assigned to different workers. For example, if a job states it needs
/// a specific key, it will never be run on a worker that does not have at least
/// all the platform property keys configured on the worker.
///
/// Additional rules may be applied based on `PlatfromPropertyValue`.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct PlatformProperties {
pub properties: HashMap<String, PlatformPropertyValue>,
}

impl PlatformProperties {
#[must_use]
pub const fn new(map: HashMap<String, PlatformPropertyValue>) -> Self {
Self { properties: map }
}

/// Determines if the worker's `PlatformProperties` is satisfied by this struct.
#[must_use]
pub fn is_satisfied_by(&self, worker_properties: &Self) -> bool {
for (property, check_value) in &self.properties {
if let Some(worker_value) = worker_properties.properties.get(property) {
if !check_value.is_satisfied_by(worker_value) {
return false;
}
} else {
return false;
}
}
true
}
}

impl From<ProtoPlatform> for PlatformProperties {
fn from(platform: ProtoPlatform) -> Self {
let mut properties = HashMap::with_capacity(platform.properties.len());
for property in platform.properties {
properties.insert(property.name, PlatformPropertyValue::Unknown(property.value));
}
Self { properties }
}
}

/// Holds the associated value of the key and type.
///
/// Exact - Means the worker must have this exact value.
/// Minimum - Means that workers must have at least this number available. When
/// a worker executes a task that has this value, the worker will have
/// this value subtracted from the available resources of the worker.
/// Priority - Means the worker is given this information, but does not restrict
/// what workers can take this value. However, the worker must have the
/// associated key present to be matched.
/// TODO(allada) In the future this will be used by the scheduler and
/// worker to cause the scheduler to prefer certain workers over others,
/// but not restrict them based on these values.
#[derive(Eq, PartialEq, Hash, Clone, Ord, PartialOrd, Debug)]
pub enum PlatformPropertyValue {
Exact(String),
Minimum(u64),
Priority(String),
Unknown(String),
}

impl PlatformPropertyValue {
/// Same as `PlatformProperties::is_satisfied_by`, but on an individual value.
#[must_use]
pub fn is_satisfied_by(&self, worker_value: &Self) -> bool {
if self == worker_value {
return true;
}
match self {
Self::Minimum(v) => {
if let Self::Minimum(worker_v) = worker_value {
return worker_v >= v;
}
false
}
// Priority is used to pass info to the worker and not restrict which
// workers can be selected, but might be used to prefer certain workers
// over others.
Self::Priority(_) => true,
// Success exact case is handled above.
Self::Exact(_) | Self::Unknown(_) => false,
}
}

pub fn as_str(&self) -> Cow<str> {
match self {
Self::Exact(value) => Cow::Borrowed(value),
Self::Minimum(value) => Cow::Owned(value.to_string()),
Self::Priority(value) => Cow::Borrowed(value),
Self::Unknown(value) => Cow::Borrowed(value),
}
}
}
use native_link_util::platform_properties::PlatformPropertyValue;

/// Helps manage known properties and conversion into `PlatformPropertyValue`.
pub struct PlatformPropertyManager {
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use error::{Error, ResultExt};
use native_link_config::schedulers::{PropertyModification, PropertyType};
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use parking_lot::Mutex;
use tokio::sync::watch;

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

Expand Down
9 changes: 5 additions & 4 deletions native-link-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@ use futures::Future;
use hashbrown::{HashMap, HashSet};
use lru::LruCache;
use native_link_config::schedulers::WorkerAllocationStrategy;
use native_link_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
};
use native_link_util::common::log;
use native_link_util::metrics_utils::{
AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent, Registry,
};
use native_link_util::platform_properties::PlatformPropertyValue;
use parking_lot::{Mutex, MutexGuard};
use tokio::sync::{watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::Duration;

use crate::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue};
use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{Worker, WorkerId, WorkerTimestamp, WorkerUpdate};
use crate::worker_scheduler::WorkerScheduler;

Expand Down
5 changes: 2 additions & 3 deletions native-link-scheduler/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use error::{make_err, make_input_err, Code, Error, ResultExt};
use native_link_util::action_messages::ActionInfo;
use native_link_util::metrics_utils::{CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent};
use native_link_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
use proto::com::github::trace_machina::native_link::remote_execution::{
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
};
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

use crate::action_messages::ActionInfo;
use crate::platform_property_manager::{PlatformProperties, PlatformPropertyValue};

pub type WorkerTimestamp = u64;

/// Unique id of worker.
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::sync::Arc;

use async_trait::async_trait;
use error::Error;
use native_link_util::action_messages::{ActionInfoHashKey, ActionStage};
use native_link_util::metrics_utils::Registry;

use crate::action_messages::{ActionInfoHashKey, ActionStage};
use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{Worker, WorkerId, WorkerTimestamp};

Expand Down
4 changes: 2 additions & 2 deletions native-link-scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use error::Error;
use native_link_scheduler::action_messages::{
use native_link_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
};
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_util::common::DigestInfo;
use native_link_util::digest_hasher::DigestHasherFunc;
use native_link_util::platform_properties::PlatformProperties;
use proto::build::bazel::remote::execution::v2::ExecuteResponse;
use proto::google::longrunning::{operation, Operation};
use proto::google::rpc::Status;
Expand Down
4 changes: 1 addition & 3 deletions native-link-scheduler/tests/cache_lookup_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ mod utils {

use error::{Error, ResultExt};
use futures::join;
use native_link_scheduler::action_messages::{
ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo,
};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::cache_lookup_scheduler::CacheLookupScheduler;
use native_link_scheduler::platform_property_manager::PlatformPropertyManager;
use native_link_store::memory_store::MemoryStore;
use native_link_util::action_messages::{ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo};
use native_link_util::common::DigestInfo;
use native_link_util::store_trait::Store;
use prost::Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ mod utils {
use error::Error;
use futures::join;
use native_link_config::schedulers::{PlatformPropertyAddition, PropertyModification, PropertyType};
use native_link_scheduler::action_messages::{ActionInfoHashKey, ActionStage, ActionState};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue};
use native_link_scheduler::platform_property_manager::PlatformPropertyManager;
use native_link_scheduler::property_modifier_scheduler::PropertyModifierScheduler;
use native_link_util::action_messages::{ActionInfoHashKey, ActionStage, ActionState};
use native_link_util::common::DigestInfo;
use native_link_util::platform_properties::PlatformPropertyValue;
use tokio::sync::watch;
use utils::mock_scheduler::MockActionScheduler;
use utils::scheduler_utils::{make_base_action_info, INSTANCE_NAME};
Expand Down
6 changes: 3 additions & 3 deletions native-link-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use error::{make_err, Code, Error, ResultExt};
use native_link_scheduler::action_messages::{
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_util::action_messages::{
ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath,
SymlinkInfo, INTERNAL_ERROR_EXIT_CODE,
};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::{PlatformProperties, PlatformPropertyValue};
use native_link_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
mod utils {
pub(crate) mod scheduler_utils;
}
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/tests/utils/mock_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::sync::Arc;

use async_trait::async_trait;
use error::{make_input_err, Error};
use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::PlatformPropertyManager;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use tokio::sync::{mpsc, watch, Mutex};

#[allow(clippy::large_enum_variant)]
Expand Down
4 changes: 2 additions & 2 deletions native-link-scheduler/tests/utils/scheduler_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey};
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey};
use native_link_util::common::DigestInfo;
use native_link_util::digest_hasher::DigestHasherFunc;
use native_link_util::platform_properties::PlatformProperties;

pub const INSTANCE_NAME: &str = "foobar_instance_name";

Expand Down
4 changes: 2 additions & 2 deletions native-link-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use error::{make_input_err, Error, ResultExt};
use futures::{Stream, StreamExt};
use native_link_config::cas_server::{ExecutionConfig, InstanceName};
use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_store::ac_utils::get_and_decode_digest;
use native_link_store::store_manager::StoreManager;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use native_link_util::common::{log, DigestInfo};
use native_link_util::digest_hasher::DigestHasherFunc;
use native_link_util::platform_properties::PlatformProperties;
use native_link_util::store_trait::Store;
use proto::build::bazel::remote::execution::v2::execution_server::{Execution, ExecutionServer as Server};
use proto::build::bazel::remote::execution::v2::{Action, Command, ExecuteRequest, WaitExecutionRequest};
Expand Down
4 changes: 2 additions & 2 deletions native-link-service/src/worker_api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use error::{make_err, Code, Error, ResultExt};
use futures::stream::unfold;
use futures::Stream;
use native_link_config::cas_server::WorkerApiConfig;
use native_link_scheduler::action_messages::ActionInfoHashKey;
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_scheduler::worker::{Worker, WorkerId};
use native_link_scheduler::worker_scheduler::WorkerScheduler;
use native_link_util::action_messages::ActionInfoHashKey;
use native_link_util::common::{log, DigestInfo};
use native_link_util::platform_properties::PlatformProperties;
use proto::com::github::trace_machina::native_link::remote_execution::worker_api_server::{
WorkerApi, WorkerApiServer as Server,
};
Expand Down

0 comments on commit 6bc991c

Please sign in to comment.