Skip to content

Commit

Permalink
Add WorkerApiServer to services being served
Browse files Browse the repository at this point in the history
Adds WorkerApiServer so it can now be served over gRPC
connections based on the config file.
  • Loading branch information
allada committed Apr 13, 2022
1 parent e8ce3a8 commit af0ccc3
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 28 deletions.
1 change: 1 addition & 0 deletions cas/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust_binary(
"//cas/grpc_service:capabilities_server",
"//cas/grpc_service:cas_server",
"//cas/grpc_service:execution_server",
"//cas/grpc_service:worker_api_server",
"//cas/scheduler",
"//cas/store",
"//cas/store:default_store_factory",
Expand Down
20 changes: 16 additions & 4 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use error::ResultExt;
use execution_server::ExecutionServer;
use scheduler::Scheduler;
use store::StoreManager;
use worker_api_server::WorkerApiServer;

const DEFAULT_CONFIG_FILE: &str = "<built-in example in config/examples/basic_cas.json>";

Expand All @@ -31,7 +32,7 @@ const DEFAULT_CONFIG_FILE: &str = "<built-in example in config/examples/basic_ca
long_about = None
)]
struct Args {
/// Config file to use
/// Config file to use.
#[clap(default_value = DEFAULT_CONFIG_FILE)]
config_file: String,
}
Expand Down Expand Up @@ -79,8 +80,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut server = Server::builder();
let services = server_cfg.services.ok_or_else(|| "'services' must be configured")?;

let capabilities_config = services.capabilities.unwrap_or(HashMap::new());

let server = server
.add_optional_service(
services
Expand Down Expand Up @@ -115,7 +114,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.err_tip(|| "Could not create ByteStream service")?,
)
.add_optional_service(
CapabilitiesServer::new(&capabilities_config, &schedulers).and_then(|v| Ok(Some(v.into_service())))?,
services
.capabilities
.map_or(Ok(None), |cfg| {
CapabilitiesServer::new(&cfg, &schedulers).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create Capabilities service")?,
)
.add_optional_service(
services
.worker_api
.map_or(Ok(None), |cfg| {
WorkerApiServer::new(&cfg, &schedulers).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create WorkerApi service")?,
);

let addr = server_cfg.listen_address.parse()?;
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ rust_library(
"//cas/scheduler",
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:worker",
"//config",
"//proto",
"//third_party:futures",
"//third_party:tokio",
Expand Down Expand Up @@ -127,7 +128,6 @@ rust_test(
srcs = ["tests/worker_api_server_test.rs"],
deps = [
"//cas/scheduler",
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:worker",
"//config",
"//proto",
Expand Down
18 changes: 12 additions & 6 deletions cas/grpc_service/tests/worker_api_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::Request;

use config::cas_server::SchedulerConfig;
use config::cas_server::{SchedulerConfig, WorkerApiConfig};
use error::{Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use proto::com::github::allada::turbo_cache::remote_execution::{
update_for_worker, worker_api_server::WorkerApi, KeepAliveRequest, SupportedProperties,
};
Expand All @@ -32,16 +31,23 @@ fn static_now_fn() -> Result<Duration, Error> {
}

async fn setup_api_server(worker_timeout: u64, now_fn: NowFn) -> Result<TestContext, Error> {
let platform_properties = HashMap::new();
const SCHEDULER_NAME: &str = "DUMMY_SCHEDULE_NAME";

let scheduler = Arc::new(Scheduler::new(&SchedulerConfig {
worker_timeout_s: worker_timeout,
..Default::default()
}));

let mut schedulers = HashMap::new();
schedulers.insert(SCHEDULER_NAME.to_string(), scheduler.clone());
let worker_api_server = WorkerApiServer::new_with_now_fn(
Arc::new(PlatformPropertyManager::new(platform_properties)),
scheduler.clone(),
&WorkerApiConfig {
scheduler: SCHEDULER_NAME.to_string(),
},
&schedulers,
now_fn,
);
)
.err_tip(|| "Error creating WorkerApiServer")?;

let supported_properties = SupportedProperties::default();
let mut connection_worker_stream = worker_api_server
Expand Down
43 changes: 27 additions & 16 deletions cas/grpc_service/worker_api_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2022 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand All @@ -10,11 +11,12 @@ use tonic::{Request, Response, Status};
use uuid::Uuid;

use common::log;
use config::cas_server::WorkerApiConfig;
use error::{make_err, Code, Error, ResultExt};
use platform_property_manager::{PlatformProperties, PlatformPropertyManager};
use platform_property_manager::PlatformProperties;
use proto::com::github::allada::turbo_cache::remote_execution::{
worker_api_server::WorkerApi, ExecuteResult, GoingAwayRequest, KeepAliveRequest, SupportedProperties,
UpdateForWorker,
worker_api_server::WorkerApi, worker_api_server::WorkerApiServer as Server, ExecuteResult, GoingAwayRequest,
KeepAliveRequest, SupportedProperties, UpdateForWorker,
};
use scheduler::Scheduler;
use worker::{Worker, WorkerId};
Expand All @@ -24,16 +26,15 @@ pub type ConnectWorkerStream = Pin<Box<dyn Stream<Item = Result<UpdateForWorker,
pub type NowFn = Box<dyn Fn() -> Result<Duration, Error> + Send + Sync>;

pub struct WorkerApiServer {
platform_property_manager: Arc<PlatformPropertyManager>,
scheduler: Arc<Scheduler>,
now_fn: NowFn,
}

impl WorkerApiServer {
pub fn new(platform_property_manager: Arc<PlatformPropertyManager>, scheduler: Arc<Scheduler>) -> Self {
pub fn new(config: &WorkerApiConfig, schedulers: &HashMap<String, Arc<Scheduler>>) -> Result<Self, Error> {
Self::new_with_now_fn(
platform_property_manager,
scheduler,
config,
schedulers,
Box::new(move || {
SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -45,15 +46,24 @@ impl WorkerApiServer {
/// Same as new(), but you can pass a custom `now_fn`, that returns a Duration since UNIX_EPOCH
/// representing the current time. Used mostly in unit tests.
pub fn new_with_now_fn(
platform_property_manager: Arc<PlatformPropertyManager>,
scheduler: Arc<Scheduler>,
config: &WorkerApiConfig,
schedulers: &HashMap<String, Arc<Scheduler>>,
now_fn: NowFn,
) -> Self {
Self {
platform_property_manager,
scheduler,
now_fn,
}
) -> Result<Self, Error> {
let scheduler = schedulers
.get(&config.scheduler)
.err_tip(|| {
format!(
"Scheduler needs config for '{}' because it exists in worker_api",
config.scheduler
)
})?
.clone();
Ok(Self { scheduler, now_fn })
}

pub fn into_service(self) -> Server<WorkerApiServer> {
Server::new(self)
}

async fn inner_connect_worker(
Expand All @@ -67,7 +77,8 @@ impl WorkerApiServer {
let mut platform_properties = PlatformProperties::default();
for property in supported_properties.properties {
let platform_property_value = self
.platform_property_manager
.scheduler
.get_platform_property_manager()
.make_prop_value(&property.name, &property.value)
.err_tip(|| "Bad Property during connect_worker()")?;
platform_properties
Expand Down
15 changes: 15 additions & 0 deletions config/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ pub struct ByteStreamConfig {
pub max_bytes_per_stream: usize,
}

#[derive(Deserialize, Debug)]
pub struct WorkerApiConfig {
/// The scheduler name referenced in the `schedulers` map in the main config.
pub scheduler: SchedulerRefName,
}

#[derive(Deserialize, Debug)]
pub struct ServicesConfig {
/// The Content Addressable Storage (CAS) backend config.
Expand All @@ -146,6 +152,15 @@ pub struct ServicesConfig {
/// Bazel's protocol strongly encourages users to use this streaming
/// interface to interact with the CAS when the data is large.
pub bytestream: Option<ByteStreamConfig>,

/// The service through which workers connect and communicate with the
/// scheduler.
/// NOTE: This service should be served on a separate, non-public port.
/// In other words, the server entry that configures `worker_api` should
/// not serve any other services on the same port. Doing so is a security
/// risk, as workers have a different permission set than the clients
/// that make remote execution/cache requests.
pub worker_api: Option<WorkerApiConfig>,
}

#[derive(Deserialize, Debug)]
Expand Down
11 changes: 11 additions & 0 deletions config/examples/basic_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,16 @@
"max_bytes_per_stream": 64000, // 64kb.
}
}
}, {
"listen_address": "0.0.0.0:50061",
"services": {
// Note: This should be served on a different port because it has
// a different permission set than the other services.
// In other words, this service is a backend API; the services above
// are a frontend API.
"worker_api": {
"scheduler": "MAIN_SCHEDULER",
}
}
}]
}
44 changes: 43 additions & 1 deletion config/examples/filesystem_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@
}
}
},
"schedulers": {
"MAIN_SCHEDULER": {
"supported_platform_properties": {
"cpu_count": "Minimum",
"memory_kb": "Minimum",
"network_kbps": "Minimum",
"disk_read_iops": "Minimum",
"disk_read_bps": "Minimum",
"disk_write_iops": "Minimum",
"disk_write_bps": "Minimum",
"shm_size": "Minimum",
"gpu_count": "Minimum",
"gpu_model": "Exact",
"cpu_vendor": "Exact",
"cpu_arch": "Exact",
"cpu_model": "Exact",
"kernel_version": "Exact",
"docker_image": "Metadata",
}
}
},
"servers": [{
"listen_address": "0.0.0.0:50051",
"services": {
Expand All @@ -103,8 +124,18 @@
"ac_store": "AC_MAIN_STORE"
}
},
"execution": {
"main": {
"cas_store": "CAS_MAIN_STORE",
"scheduler": "MAIN_SCHEDULER",
}
},
"capabilities": {
"main": {},
"main": {
"remote_execution": {
"scheduler": "MAIN_SCHEDULER",
}
}
},
"bytestream": {
"cas_stores": {
Expand All @@ -114,5 +145,16 @@
"max_bytes_per_stream": 64000, // 64kb.
}
}
}, {
"listen_address": "0.0.0.0:50061",
"services": {
// Note: This should be served on a different port because it has
// a different permission set than the other services.
// In other words, this service is a backend API; the services above
// are a frontend API.
"worker_api": {
"scheduler": "MAIN_SCHEDULER",
}
}
}]
}

0 comments on commit af0ccc3

Please sign in to comment.