Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move action_messages & part of platform_properties to utils crate #417

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions native-link-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ load(
rust_library(
name = "native-link-scheduler",
srcs = [
"src/action_messages.rs",
"src/action_scheduler.rs",
"src/cache_lookup_scheduler.rs",
"src/default_scheduler_factory.rs",
Expand Down Expand Up @@ -38,7 +37,6 @@ rust_library(
"@crate_index//:lru",
"@crate_index//:parking_lot",
"@crate_index//:prost",
"@crate_index//:prost-types",
"@crate_index//:rand",
"@crate_index//:scopeguard",
"@crate_index//:tokio",
Expand Down
1 change: 0 additions & 1 deletion native-link-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ futures = "0.3.28"
hashbrown = "0.14"
lru = "0.10.1"
parking_lot = "0.12.1"
prost-types = "0.11.9"
rand = "0.8.5"
scopeguard = "1.2.0"
tokio = { version = "1.29.1", features = ["sync", "rt", "parking_lot"] }
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use error::Error;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use native_link_util::metrics_utils::Registry;
use tokio::sync::watch;

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use crate::platform_property_manager::PlatformPropertyManager;

/// ActionScheduler interface is responsible for interactions between the scheduler
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use error::Error;
use futures::stream::StreamExt;
use native_link_store::ac_utils::get_and_decode_digest;
use native_link_store::grpc_store::GrpcStore;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use native_link_util::common::DigestInfo;
use native_link_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
Expand All @@ -33,7 +34,6 @@ use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tonic::Request;

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use error::{make_err, Code, Error, ResultExt};
use futures::stream::unfold;
use futures::TryFutureExt;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use native_link_util::common::log;
use native_link_util::retry::{ExponentialBackoff, Retrier, RetryResult};
use parking_lot::Mutex;
Expand All @@ -37,7 +38,6 @@ use tokio::sync::watch;
use tokio::time::sleep;
use tonic::{transport, Request, Streaming};

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

Expand Down
1 change: 0 additions & 1 deletion native-link-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod action_messages;
pub mod action_scheduler;
pub mod cache_lookup_scheduler;
pub mod default_scheduler_factory;
Expand Down
100 changes: 1 addition & 99 deletions native-link-scheduler/src/platform_property_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,109 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::HashMap;

use error::{make_input_err, Code, Error, ResultExt};
use native_link_config::schedulers::PropertyType;
use proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform;

/// `PlatformProperties` helps manage the configuration of platform properties to
/// keys and types. The scheduler uses these properties to decide what jobs
/// can be assigned to different workers. For example, if a job states it needs
/// a specific key, it will never be run on a worker that does not have at least
/// all the platform property keys configured on the worker.
///
/// Additional rules may be applied based on `PlatfromPropertyValue`.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct PlatformProperties {
pub properties: HashMap<String, PlatformPropertyValue>,
}

impl PlatformProperties {
#[must_use]
pub const fn new(map: HashMap<String, PlatformPropertyValue>) -> Self {
Self { properties: map }
}

/// Determines if the worker's `PlatformProperties` is satisfied by this struct.
#[must_use]
pub fn is_satisfied_by(&self, worker_properties: &Self) -> bool {
for (property, check_value) in &self.properties {
if let Some(worker_value) = worker_properties.properties.get(property) {
if !check_value.is_satisfied_by(worker_value) {
return false;
}
} else {
return false;
}
}
true
}
}

impl From<ProtoPlatform> for PlatformProperties {
fn from(platform: ProtoPlatform) -> Self {
let mut properties = HashMap::with_capacity(platform.properties.len());
for property in platform.properties {
properties.insert(property.name, PlatformPropertyValue::Unknown(property.value));
}
Self { properties }
}
}

/// Holds the associated value of the key and type.
///
/// Exact - Means the worker must have this exact value.
/// Minimum - Means that workers must have at least this number available. When
/// a worker executes a task that has this value, the worker will have
/// this value subtracted from the available resources of the worker.
/// Priority - Means the worker is given this information, but does not restrict
/// what workers can take this value. However, the worker must have the
/// associated key present to be matched.
/// TODO(allada) In the future this will be used by the scheduler and
/// worker to cause the scheduler to prefer certain workers over others,
/// but not restrict them based on these values.
#[derive(Eq, PartialEq, Hash, Clone, Ord, PartialOrd, Debug)]
pub enum PlatformPropertyValue {
Exact(String),
Minimum(u64),
Priority(String),
Unknown(String),
}

impl PlatformPropertyValue {
/// Same as `PlatformProperties::is_satisfied_by`, but on an individual value.
#[must_use]
pub fn is_satisfied_by(&self, worker_value: &Self) -> bool {
if self == worker_value {
return true;
}
match self {
Self::Minimum(v) => {
if let Self::Minimum(worker_v) = worker_value {
return worker_v >= v;
}
false
}
// Priority is used to pass info to the worker and not restrict which
// workers can be selected, but might be used to prefer certain workers
// over others.
Self::Priority(_) => true,
// Success exact case is handled above.
Self::Exact(_) | Self::Unknown(_) => false,
}
}

pub fn as_str(&self) -> Cow<str> {
match self {
Self::Exact(value) => Cow::Borrowed(value),
Self::Minimum(value) => Cow::Owned(value.to_string()),
Self::Priority(value) => Cow::Borrowed(value),
Self::Unknown(value) => Cow::Borrowed(value),
}
}
}
use native_link_util::platform_properties::PlatformPropertyValue;

/// Helps manage known properties and conversion into `PlatformPropertyValue`.
pub struct PlatformPropertyManager {
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use error::{Error, ResultExt};
use native_link_config::schedulers::{PropertyModification, PropertyType};
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use parking_lot::Mutex;
use tokio::sync::watch;

use crate::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

Expand Down
9 changes: 5 additions & 4 deletions native-link-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@ use futures::Future;
use hashbrown::{HashMap, HashSet};
use lru::LruCache;
use native_link_config::schedulers::WorkerAllocationStrategy;
use native_link_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
};
use native_link_util::common::log;
use native_link_util::metrics_utils::{
AsyncCounterWrapper, Collector, CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent, Registry,
};
use native_link_util::platform_properties::PlatformPropertyValue;
use parking_lot::{Mutex, MutexGuard};
use tokio::sync::{watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::Duration;

use crate::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
};
use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue};
use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{Worker, WorkerId, WorkerTimestamp, WorkerUpdate};
use crate::worker_scheduler::WorkerScheduler;

Expand Down
5 changes: 2 additions & 3 deletions native-link-scheduler/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use error::{make_err, make_input_err, Code, Error, ResultExt};
use native_link_util::action_messages::ActionInfo;
use native_link_util::metrics_utils::{CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent};
use native_link_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
use proto::com::github::trace_machina::native_link::remote_execution::{
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
};
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

use crate::action_messages::ActionInfo;
use crate::platform_property_manager::{PlatformProperties, PlatformPropertyValue};

pub type WorkerTimestamp = u64;

/// Unique id of worker.
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/src/worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::sync::Arc;

use async_trait::async_trait;
use error::Error;
use native_link_util::action_messages::{ActionInfoHashKey, ActionStage};
use native_link_util::metrics_utils::Registry;

use crate::action_messages::{ActionInfoHashKey, ActionStage};
use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{Worker, WorkerId, WorkerTimestamp};

Expand Down
4 changes: 2 additions & 2 deletions native-link-scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use error::Error;
use native_link_scheduler::action_messages::{
use native_link_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
};
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_util::common::DigestInfo;
use native_link_util::digest_hasher::DigestHasherFunc;
use native_link_util::platform_properties::PlatformProperties;
use proto::build::bazel::remote::execution::v2::ExecuteResponse;
use proto::google::longrunning::{operation, Operation};
use proto::google::rpc::Status;
Expand Down
4 changes: 1 addition & 3 deletions native-link-scheduler/tests/cache_lookup_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ mod utils {

use error::{Error, ResultExt};
use futures::join;
use native_link_scheduler::action_messages::{
ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo,
};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::cache_lookup_scheduler::CacheLookupScheduler;
use native_link_scheduler::platform_property_manager::PlatformPropertyManager;
use native_link_store::memory_store::MemoryStore;
use native_link_util::action_messages::{ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo};
use native_link_util::common::DigestInfo;
use native_link_util::store_trait::Store;
use prost::Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ mod utils {
use error::Error;
use futures::join;
use native_link_config::schedulers::{PlatformPropertyAddition, PropertyModification, PropertyType};
use native_link_scheduler::action_messages::{ActionInfoHashKey, ActionStage, ActionState};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::{PlatformPropertyManager, PlatformPropertyValue};
use native_link_scheduler::platform_property_manager::PlatformPropertyManager;
use native_link_scheduler::property_modifier_scheduler::PropertyModifierScheduler;
use native_link_util::action_messages::{ActionInfoHashKey, ActionStage, ActionState};
use native_link_util::common::DigestInfo;
use native_link_util::platform_properties::PlatformPropertyValue;
use tokio::sync::watch;
use utils::mock_scheduler::MockActionScheduler;
use utils::scheduler_utils::{make_base_action_info, INSTANCE_NAME};
Expand Down
6 changes: 3 additions & 3 deletions native-link-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use error::{make_err, Code, Error, ResultExt};
use native_link_scheduler::action_messages::{
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_util::action_messages::{
ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath,
SymlinkInfo, INTERNAL_ERROR_EXIT_CODE,
};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::{PlatformProperties, PlatformPropertyValue};
use native_link_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
mod utils {
pub(crate) mod scheduler_utils;
}
Expand Down
2 changes: 1 addition & 1 deletion native-link-scheduler/tests/utils/mock_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::sync::Arc;

use async_trait::async_trait;
use error::{make_input_err, Error};
use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::PlatformPropertyManager;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use tokio::sync::{mpsc, watch, Mutex};

#[allow(clippy::large_enum_variant)]
Expand Down
4 changes: 2 additions & 2 deletions native-link-scheduler/tests/utils/scheduler_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey};
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey};
use native_link_util::common::DigestInfo;
use native_link_util::digest_hasher::DigestHasherFunc;
use native_link_util::platform_properties::PlatformProperties;

pub const INSTANCE_NAME: &str = "foobar_instance_name";

Expand Down
4 changes: 2 additions & 2 deletions native-link-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use error::{make_input_err, Error, ResultExt};
use futures::{Stream, StreamExt};
use native_link_config::cas_server::{ExecutionConfig, InstanceName};
use native_link_scheduler::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use native_link_scheduler::action_scheduler::ActionScheduler;
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_store::ac_utils::get_and_decode_digest;
use native_link_store::store_manager::StoreManager;
use native_link_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use native_link_util::common::{log, DigestInfo};
use native_link_util::digest_hasher::DigestHasherFunc;
use native_link_util::platform_properties::PlatformProperties;
use native_link_util::store_trait::Store;
use proto::build::bazel::remote::execution::v2::execution_server::{Execution, ExecutionServer as Server};
use proto::build::bazel::remote::execution::v2::{Action, Command, ExecuteRequest, WaitExecutionRequest};
Expand Down
4 changes: 2 additions & 2 deletions native-link-service/src/worker_api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use error::{make_err, Code, Error, ResultExt};
use futures::stream::unfold;
use futures::Stream;
use native_link_config::cas_server::WorkerApiConfig;
use native_link_scheduler::action_messages::ActionInfoHashKey;
use native_link_scheduler::platform_property_manager::PlatformProperties;
use native_link_scheduler::worker::{Worker, WorkerId};
use native_link_scheduler::worker_scheduler::WorkerScheduler;
use native_link_util::action_messages::ActionInfoHashKey;
use native_link_util::common::{log, DigestInfo};
use native_link_util::platform_properties::PlatformProperties;
use proto::com::github::trace_machina::native_link::remote_execution::worker_api_server::{
WorkerApi, WorkerApiServer as Server,
};
Expand Down
Loading
Loading