Skip to content

Commit

Permalink
Configuration is now loaded from json
Browse files Browse the repository at this point in the history
We now use a json5 configuration file to host the configuration
for the services.
  • Loading branch information
allada committed Jan 18, 2021
1 parent b09153b commit 99d170c
Show file tree
Hide file tree
Showing 23 changed files with 860 additions and 251 deletions.
97 changes: 74 additions & 23 deletions cas/cas_main.rs
@@ -1,39 +1,90 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use futures::future::{select_all, BoxFuture};
use json5;
use runfiles::Runfiles;
use tonic::transport::Server;

use ac_server::AcServer;
use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use config::cas_server::CasConfig;
use error::ResultExt;
use execution_server::ExecutionServer;
use store::{StoreConfig, StoreType};
use store::StoreManager;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format_timestamp_millis()
.init();

let addr = "0.0.0.0:50051".parse()?;

let ac_store = store::create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: false,
});
let cas_store = store::create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});

Server::builder()
.add_service(AcServer::new(ac_store, cas_store.clone()).into_service())
.add_service(CasServer::new(cas_store.clone()).into_service())
.add_service(CapabilitiesServer::default().into_service())
.add_service(ExecutionServer::default().into_service())
.add_service(ByteStreamServer::new(cas_store).into_service())
.serve(addr)
.await?;

Ok(())
let r = Runfiles::create().err_tip(|| "Failed to create runfiles lookup object")?;
let contents = String::from_utf8(tokio::fs::read(r.rlocation("rust_cas/config/examples/basic_cas.json")).await?)?;

let cfg: CasConfig = json5::from_str(&contents)?;

let mut store_manager = StoreManager::new();
for (name, store_cfg) in cfg.stores {
store_manager
.make_store(&name, &store_cfg)
.err_tip(|| format!("Failed to create store '{}'", name))?;
}

let mut servers: Vec<BoxFuture<Result<(), tonic::transport::Error>>> = Vec::new();
for server_cfg in cfg.servers {
let mut server = Server::builder();
let services = server_cfg.services.ok_or_else(|| "'services' must be configured")?;

let server = server
.add_optional_service(
services
.ac
.map_or(Ok(None), |cfg| {
AcServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create AC service")?,
)
.add_optional_service(
services
.cas
.map_or(Ok(None), |cfg| {
CasServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create CAS service")?,
)
.add_optional_service(
services
.execution
.map_or(Ok(None), |cfg| {
ExecutionServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create Execution service")?,
)
.add_optional_service(
services
.bytestream
.map_or(Ok(None), |cfg| {
ByteStreamServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create ByteStream service")?,
)
.add_optional_service(
services
.capabilities
.map_or(Ok(None), |cfg| {
CapabilitiesServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
})
.err_tip(|| "Could not create Capabilities service")?,
);

let addr = server_cfg.listen_address.parse()?;
servers.push(Box::pin(server.serve(addr)));
}

if let Err(e) = select_all(servers).await.0 {
panic!(e);
}
panic!("No servers should ever resolve their future");
}
80 changes: 49 additions & 31 deletions cas/grpc_service/BUILD
@@ -1,15 +1,18 @@
load("@io_bazel_rules_rust//rust:rust.bzl", "rust_library", "rust_test", "rust_binary")
# Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

load("@io_bazel_rules_rust//rust:rust.bzl", "rust_library", "rust_test")

rust_library(
name = "cas_server",
srcs = ["cas_server.rs"],
deps = [
"//cas/store",
"//config",
"//proto",
"//third_party:tonic",
"//third_party:tokio",
"//third_party:futures",
"//third_party:stdext",
"//cas/store",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
],
Expand All @@ -20,14 +23,15 @@ rust_library(
name = "ac_server",
srcs = ["ac_server.rs"],
deps = [
"//proto",
"//cas/store",
"//util:error",
"//util:common",
"//third_party:stdext",
"//config",
"//proto",
"//third_party:prost",
"//third_party:tonic",
"//third_party:stdext",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -36,8 +40,11 @@ rust_library(
name = "capabilities_server",
srcs = ["capabilities_server.rs"],
deps = [
"//cas/store",
"//config",
"//proto",
"//third_party:tonic",
"//util:error",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -46,15 +53,16 @@ rust_library(
name = "bytestream_server",
srcs = ["bytestream_server.rs"],
deps = [
"//cas/store",
"//config",
"//proto",
"//third_party:drop_guard",
"//third_party:futures",
"//third_party:tokio",
"//third_party:tonic",
"//util:async_fixed_buffer",
"//util:common",
"//util:error",
"//cas/store",
"//third_party:drop_guard",
"//third_party:tokio",
"//third_party:futures",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -63,10 +71,13 @@ rust_library(
name = "execution_server",
srcs = ["execution_server.rs"],
deps = [
"//cas/store",
"//config",
"//proto",
"//third_party:tonic",
"//third_party:stdext",
"//third_party:futures",
"//third_party:stdext",
"//third_party:tonic",
"//util:error",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -75,45 +86,52 @@ rust_test(
name = "cas_server_test",
srcs = ["tests/cas_server_test.rs"],
deps = [
":cas_server",
"//cas/store",
"//util:error",
"//util:common",
"//config",
"//proto",
"//third_party:tonic",
"//third_party:tokio",
"//third_party:maplit",
"//third_party:pretty_assertions",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
":cas_server",
],
)

rust_test(
name = "ac_server_test",
srcs = ["tests/ac_server_test.rs"],
deps = [
":ac_server",
"//cas/store",
"//config",
"//proto",
"//util:common",
"//third_party:tonic",
"//third_party:tokio",
"//third_party:prost",
"//third_party:maplit",
"//third_party:pretty_assertions",
"//third_party:prost",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
":ac_server",
],
)

rust_test(
name = "bytestream_server_test",
srcs = ["tests/bytestream_server_test.rs"],
deps = [
":bytestream_server",
"//cas/store",
"//config",
"//proto",
"//util:error",
"//util:common",
"//third_party:tonic",
"//third_party:bytes",
"//third_party:tokio",
"//third_party:prost",
"//third_party:maplit",
"//third_party:pretty_assertions",
"//third_party:prost",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
":bytestream_server",
],
)
26 changes: 19 additions & 7 deletions cas/grpc_service/ac_server.rs
@@ -1,5 +1,6 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::convert::TryInto;
use std::io::Cursor;
use std::pin::Pin;
Expand All @@ -15,20 +16,31 @@ use proto::build::bazel::remote::execution::v2::{
};

use common::{log, DigestInfo};
use error::{Code, Error, ResultExt};
use store::Store;
use config::cas_server::{AcStoreConfig, InstanceName};
use error::{make_input_err, Code, Error, ResultExt};
use store::{Store, StoreManager};

pub struct AcServer {
ac_store: Arc<dyn Store>,
_cas_store: Arc<dyn Store>,
}

impl AcServer {
pub fn new(ac_store: Arc<dyn Store>, cas_store: Arc<dyn Store>) -> Self {
AcServer {
ac_store: ac_store,
_cas_store: cas_store,
pub fn new(config: &HashMap<InstanceName, AcStoreConfig>, store_manager: &StoreManager) -> Result<Self, Error> {
for (_instance_name, ac_cfg) in config {
let ac_store = store_manager
.get_store(&ac_cfg.ac_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", ac_cfg.cas_store))?;
let cas_store = store_manager
.get_store(&ac_cfg.cas_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", ac_cfg.cas_store))?;
// TODO(allada) We don't yet support instance_name.
return Ok(AcServer {
ac_store: ac_store.clone(),
_cas_store: cas_store.clone(),
});
}
Err(make_input_err!("No configuration configured for 'ac' service"))
}

pub fn into_service(self) -> Server<AcServer> {
Expand Down
34 changes: 21 additions & 13 deletions cas/grpc_service/bytestream_server.rs
@@ -1,5 +1,6 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -17,8 +18,9 @@ use proto::google::bytestream::{
};

use common::{log, DigestInfo};
use error::{error_if, Error, ResultExt};
use store::Store;
use config::cas_server::{ByteStreamConfig, InstanceName};
use error::{error_if, make_input_err, Error, ResultExt};
use store::{Store, StoreManager};

pub struct ByteStreamServer {
store: Arc<dyn Store>,
Expand All @@ -40,7 +42,6 @@ struct ReaderState {

impl ReaderState {
async fn shutdown(&mut self) -> Result<(), Error> {
// let mut me = Pin::new(&mut self);
self.was_shutdown = true;
// Close stream then wait for reader stream to finish.
(self.stream_closer)();
Expand All @@ -60,16 +61,23 @@ impl ReaderState {
type ReadStream = Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + Sync + 'static>>;

impl ByteStreamServer {
pub fn new(store: Arc<dyn Store>) -> Self {
ByteStreamServer {
store: store,
// TODO(allada) Make this configurable.
// This value was choosen only because it is a common mem page size.
write_buffer_stream_size: 2 << 20, // 2Mb.
read_buffer_stream_size: 2 << 20, // 2Mb.
// According to https://github.com/grpc/grpc.github.io/issues/371 16KiB - 64KiB is optimal.
max_bytes_per_stream: 2 << 15, // 64Kb.
pub fn new(config: &HashMap<InstanceName, ByteStreamConfig>, store_manager: &StoreManager) -> Result<Self, Error> {
for (_instance_name, bytestream_cfg) in config {
let store = store_manager
.get_store(&bytestream_cfg.cas_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", bytestream_cfg.cas_store))?;
// TODO(allada) We don't yet support instance_name.
return Ok(ByteStreamServer {
store: store.clone(),
// TODO(allada) Make this configurable.
// This value was choosen only because it is a common mem page size.
write_buffer_stream_size: 2 << 20, // 2Mb.
read_buffer_stream_size: 2 << 20, // 2Mb.
// According to https://github.com/grpc/grpc.github.io/issues/371 16KiB - 64KiB is optimal.
max_bytes_per_stream: 2 << 15, // 64Kb.
});
}
Err(make_input_err!("No configuration configured for 'cas' service"))
}

pub fn into_service(self) -> Server<ByteStreamServer> {
Expand Down

0 comments on commit 99d170c

Please sign in to comment.