Skip to content

Commit

Permalink
Add more basic scheduler support
Browse files Browse the repository at this point in the history
Introduces most of the basic primitives for the scheduler and
some of the logic with matching actions to workers along with
a few tests.
  • Loading branch information
allada committed Nov 26, 2021
1 parent c91f61e commit 2edf514
Show file tree
Hide file tree
Showing 18 changed files with 1,685 additions and 102 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.51"
fixed-buffer = "0.2.3"
futures = "0.3.17"
tokio = { version = "1.13.0", features = ["macros", "io-util", "fs", "rt-multi-thread"] }
tokio-stream = { version = "0.1.8", features = ["fs"] }
tokio-stream = { version = "0.1.8", features = ["fs", "sync"] }
tokio-util = { version = "0.6.9", features = ["io", "io-util"] }
tonic = "0.6.1"
lazy-init = "0.5.0"
Expand Down
3 changes: 3 additions & 0 deletions cas/grpc_service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,16 @@ rust_library(
name = "execution_server",
srcs = ["execution_server.rs"],
deps = [
"//cas/scheduler:action_messages",
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:scheduler",
"//cas/store",
"//config",
"//proto",
"//third_party:futures",
"//third_party:rand",
"//third_party:stdext",
"//third_party:tokio_stream",
"//third_party:tonic",
"//util:common",
"//util:error",
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/capabilities_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Capabilities for CapabilitiesServer {
}),
high_api_version: Some(SemVer {
major: 2,
minor: 1,
minor: 2,
patch: 0,
prerelease: "".to_string(),
}),
Expand Down
122 changes: 102 additions & 20 deletions cas/grpc_service/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,99 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant, SystemTime};

use futures::Stream;
use futures::{Stream, StreamExt};
use rand::{thread_rng, Rng};
use tokio_stream::wrappers::WatchStream;
use tonic::{Request, Response, Status};

use ac_utils::get_and_decode_digest;
use action_messages::ActionInfo;
use common::{log, DigestInfo};
use config::cas_server::{CapabilitiesConfig, ExecutionConfig, InstanceName};
use error::{make_input_err, Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use platform_property_manager::{PlatformProperties, PlatformPropertyManager};
use proto::build::bazel::remote::execution::v2::{
execution_server::Execution, execution_server::ExecutionServer as Server, Action, ExecuteRequest,
WaitExecutionRequest,
};
use proto::google::longrunning::Operation;
use scheduler::Scheduler;
use store::StoreManager;
use store::{Store, StoreManager};

/// Default priority remote execution jobs will get when not provided.
const DEFAULT_EXECUTION_PRIORITY: i64 = 0;

struct InstanceInfo {
scheduler: Scheduler,
cas_store: Arc<dyn Store>,
platform_property_manager: PlatformPropertyManager,
}

impl InstanceInfo {
fn cas_pin<'a>(&'a self) -> Pin<&'a dyn Store> {
Pin::new(self.cas_store.as_ref())
}

async fn build_action_info(
&self,
instance_name: String,
action_digest: DigestInfo,
action: &Action,
priority: i64,
) -> Result<ActionInfo, Error> {
let command_digest = DigestInfo::try_from(
action
.command_digest
.clone()
.err_tip(|| "Expected command_digest to exist")?,
)
.err_tip(|| "Could not decode command digest")?;

let input_root_digest = DigestInfo::try_from(
action
.clone()
.input_root_digest
.err_tip(|| "Expected input_digest_root")?,
)?;
let timeout = action
.timeout
.clone()
.map(|v| Duration::new(v.seconds as u64, v.nanos as u32))
.unwrap_or(Duration::MAX);

let mut platform_properties = HashMap::new();
if let Some(platform) = &action.platform {
for property in &platform.properties {
let platform_property = self
.platform_property_manager
.make_prop_value(&property.name, &property.value)
.err_tip(|| "Failed to convert platform property in queue_action")?;
platform_properties.insert(property.name.clone(), platform_property);
}
}

Ok(ActionInfo {
instance_name,
digest: action_digest,
command_digest,
input_root_digest,
timeout,
platform_properties: PlatformProperties::new(platform_properties),
priority,
insert_timestamp: SystemTime::now(),
salt: if action.do_not_cache {
thread_rng().gen::<u64>()
} else {
0
},
})
}
}

pub struct ExecutionServer {
schedulers: HashMap<InstanceName, Arc<Scheduler>>,
instance_infos: HashMap<InstanceName, InstanceInfo>,
}

type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
Expand All @@ -33,7 +106,7 @@ impl ExecutionServer {
capabilities_config: &HashMap<InstanceName, CapabilitiesConfig>,
store_manager: &StoreManager,
) -> Result<Self, Error> {
let mut schedulers = HashMap::with_capacity(config.len());
let mut instance_infos = HashMap::with_capacity(config.len());
for (instance_name, exec_cfg) in config {
let capabilities_cfg = capabilities_config
.get(instance_name)
Expand All @@ -42,22 +115,22 @@ impl ExecutionServer {
.get_store(&exec_cfg.cas_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store))?
.clone();
let ac_store = store_manager
.get_store(&exec_cfg.ac_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", exec_cfg.ac_store))?
.clone();
let platform_property_manager = PlatformPropertyManager::new(
capabilities_cfg
.supported_platform_properties
.clone()
.unwrap_or(HashMap::new()),
);
schedulers.insert(
instance_infos.insert(
instance_name.to_string(),
Arc::new(Scheduler::new(exec_cfg, platform_property_manager, ac_store, cas_store)),
InstanceInfo {
scheduler: Scheduler::new(),
cas_store,
platform_property_manager,
},
);
}
Ok(Self { schedulers })
Ok(Self { instance_infos })
}

pub fn into_service(self) -> Server<ExecutionServer> {
Expand All @@ -68,8 +141,8 @@ impl ExecutionServer {
let execute_req = request.into_inner();
let instance_name = execute_req.instance_name;

let scheduler = self
.schedulers
let instance_info = self
.instance_infos
.get(&instance_name)
.err_tip(|| "Instance name '{}' not configured")?;

Expand All @@ -80,14 +153,23 @@ impl ExecutionServer {
)
.err_tip(|| "Failed to unwrap action cache")?;

let action = get_and_decode_digest::<Action>(&scheduler.cas_pin(), &digest).await?;
let priority = execute_req
.execution_policy
.map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority as i64);

let action = get_and_decode_digest::<Action>(&instance_info.cas_pin(), &digest).await?;
let action_info = instance_info
.build_action_info(instance_name, digest, &action, priority)
.await?;

scheduler
.queue_action(&action)
let rx = instance_info
.scheduler
.add_action(action_info)
.await
.err_tip(|| "Failed to queue operation")?;
.err_tip(|| "Failed to schedule task")?;

Err(make_input_err!(""))
let receiver_stream = Box::pin(WatchStream::new(rx).map(|action_update| Ok(action_update.as_ref().into())));
Ok(tonic::Response::new(receiver_stream))
}
}

Expand Down
50 changes: 47 additions & 3 deletions cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,57 @@ rust_library(
name = "scheduler",
srcs = ["scheduler.rs"],
deps = [
"//cas/grpc_service:ac_utils",
"//config",
"//third_party:fast_async_mutex",
"//third_party:rand",
"//third_party:tokio",
"//util:common",
"//util:error",
":action_messages",
":worker",
],
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)

rust_library(
name = "worker",
srcs = ["worker.rs"],
deps = [
"//proto",
"//third_party:tokio",
"//util:error",
":action_messages",
":platform_property_manager",
],
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)

rust_library(
name = "action_messages",
srcs = ["action_messages.rs"],
deps = [
"//proto",
"//third_party:prost",
"//third_party:prost_types",
"//util:common",
"//util:error",
":platform_property_manager",
],
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)

rust_test(
name = "scheduler_test",
srcs = ["tests/scheduler_test.rs"],
deps = [
"//proto",
"//cas/store",
"//third_party:pretty_assertions",
"//third_party:tokio",
"//util:common",
"//util:error",
":action_messages",
":platform_property_manager",
":scheduler",
":worker",
],
visibility = ["//cas:__pkg__", "//cas:__subpackages__"]
)
Loading

0 comments on commit 2edf514

Please sign in to comment.