Skip to content

Commit

Permalink
Add RefStore and restructure StoreManager
Browse files Browse the repository at this point in the history
Adds new RefStore. This store is used so you can reference another
store in the json / StoreManager by name. This will allow users
to reference the same store from multiple places.

Also restructures StoreManager to allow for this new functionality.
  • Loading branch information
allada committed Apr 8, 2022
1 parent 5cc9a09 commit 6795bb0
Show file tree
Hide file tree
Showing 17 changed files with 448 additions and 133 deletions.
1 change: 1 addition & 0 deletions cas/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_binary(
"//cas/grpc_service:cas_server",
"//cas/grpc_service:execution_server",
"//cas/store",
"//cas/store:default_store_factory",
"//config",
"//third_party:clap",
"//third_party:env_logger",
Expand Down
14 changes: 9 additions & 5 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::sync::Arc;

use clap::Parser;
use futures::future::{select_all, BoxFuture};
Expand All @@ -13,6 +14,7 @@ use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use config::cas_server::CasConfig;
use default_store_factory::store_factory;
use error::ResultExt;
use execution_server::ExecutionServer;
use store::StoreManager;
Expand Down Expand Up @@ -54,12 +56,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let json_contents = String::from_utf8(tokio::fs::read(config_file).await?)?;
let cfg: CasConfig = json5::from_str(&json_contents)?;

let mut store_manager = StoreManager::new();
let store_manager = Arc::new(StoreManager::new());
for (name, store_cfg) in cfg.stores {
store_manager
.make_store(&name, &store_cfg)
.await
.err_tip(|| format!("Failed to create store '{}'", name))?;
store_manager.add_store(
&name,
store_factory(&store_cfg, &store_manager)
.await
.err_tip(|| format!("Failed to create store '{}'", name))?,
);
}

let mut servers: Vec<BoxFuture<Result<(), tonic::transport::Error>>> = Vec::new();
Expand Down
3 changes: 3 additions & 0 deletions cas/grpc_service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ rust_test(
srcs = ["tests/cas_server_test.rs"],
deps = [
"//cas/store",
"//cas/store:default_store_factory",
"//config",
"//proto",
"//third_party:maplit",
Expand All @@ -124,6 +125,7 @@ rust_test(
srcs = ["tests/ac_server_test.rs"],
deps = [
"//cas/store",
"//cas/store:default_store_factory",
"//config",
"//proto",
"//third_party:bytes",
Expand All @@ -143,6 +145,7 @@ rust_test(
srcs = ["tests/bytestream_server_test.rs"],
deps = [
"//cas/store",
"//cas/store:default_store_factory",
"//config",
"//proto",
"//third_party:futures",
Expand Down
3 changes: 1 addition & 2 deletions cas/grpc_service/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ impl AcServer {
for (instance_name, ac_cfg) in config {
let store = store_manager
.get_store(&ac_cfg.ac_store)
.ok_or_else(|| make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store))?
.clone();
.ok_or_else(|| make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store))?;
stores.insert(instance_name.to_string(), store);
}
Ok(AcServer { stores: stores.clone() })
Expand Down
3 changes: 1 addition & 2 deletions cas/grpc_service/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl ByteStreamServer {
for (instance_name, store_name) in &config.cas_stores {
let store = store_manager
.get_store(&store_name)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", store_name))?
.clone();
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", store_name))?;
stores.insert(instance_name.to_string(), store);
}
Ok(ByteStreamServer {
Expand Down
3 changes: 1 addition & 2 deletions cas/grpc_service/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ impl CasServer {
for (instance_name, cas_cfg) in config {
let store = store_manager
.get_store(&cas_cfg.cas_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store))?
.clone();
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store))?;
stores.insert(instance_name.to_string(), store);
}
Ok(CasServer { stores: stores })
Expand Down
3 changes: 1 addition & 2 deletions cas/grpc_service/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ impl ExecutionServer {
.err_tip(|| "Capabilities needs config for '{}' because it exists in execution")?;
let cas_store = store_manager
.get_store(&exec_cfg.cas_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store))?
.clone();
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", exec_cfg.cas_store))?;
let platform_property_manager = PlatformPropertyManager::new(
capabilities_cfg
.supported_platform_properties
Expand Down
44 changes: 25 additions & 19 deletions cas/grpc_service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::pin::Pin;
use std::sync::Arc;

use bytes::BytesMut;
use maplit::hashmap;
Expand All @@ -12,6 +13,7 @@ use proto::build::bazel::remote::execution::v2::{action_cache_server::ActionCach
use ac_server::AcServer;
use common::DigestInfo;
use config;
use default_store_factory::store_factory;
use error::Error;
use store::{Store, StoreManager};

Expand All @@ -31,24 +33,28 @@ async fn insert_into_store<T: Message>(
Ok(digest.size_bytes)
}

async fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager
.make_store(
"main_cas",
async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
"main_cas",
store_factory(
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
&store_manager,
)
.await?;
store_manager
.make_store(
"main_ac",
.await?,
);
store_manager.add_store(
"main_ac",
store_factory(
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
&store_manager,
)
.await?;
.await?,
);
Ok(store_manager)
}

fn make_ac_server(store_manager: &mut StoreManager) -> Result<AcServer, Error> {
fn make_ac_server(store_manager: &StoreManager) -> Result<AcServer, Error> {
AcServer::new(
&hashmap! {
"foo_instance_name".to_string() => config::cas_server::AcStoreConfig{
Expand Down Expand Up @@ -83,8 +89,8 @@ mod get_action_result {

#[tokio::test]
async fn empty_store() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&store_manager)?;

let raw_response = get_action_result(&ac_server, HASH1, 0).await;

Expand All @@ -99,8 +105,8 @@ mod get_action_result {

#[tokio::test]
async fn has_single_item() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&store_manager)?;
let ac_store_owned = store_manager.get_store("main_ac").unwrap();

let mut action_result = ActionResult::default();
Expand All @@ -117,8 +123,8 @@ mod get_action_result {

#[tokio::test]
async fn single_item_wrong_digest_size() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&store_manager)?;
let ac_store_owned = store_manager.get_store("main_ac").unwrap();

let mut action_result = ActionResult::default();
Expand Down Expand Up @@ -169,8 +175,8 @@ mod update_action_result {

#[tokio::test]
async fn one_item_update_test() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&store_manager)?;
let ac_store_owned = store_manager.get_store("main_ac").unwrap();

let mut action_result = ActionResult::default();
Expand Down
39 changes: 21 additions & 18 deletions cas/grpc_service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,35 @@ use tonic::Request;

use common::DigestInfo;
use config;
use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use store::StoreManager;

const INSTANCE_NAME: &str = "foo_instance_name";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";

async fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager
.make_store(
"main_cas",
async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
"main_cas",
store_factory(
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
&store_manager,
)
.await?;
.await?,
);
Ok(store_manager)
}

fn make_bytestream_server(store_manager: &mut StoreManager) -> Result<ByteStreamServer, Error> {
fn make_bytestream_server(store_manager: &StoreManager) -> Result<ByteStreamServer, Error> {
ByteStreamServer::new(
&config::cas_server::ByteStreamConfig {
cas_stores: hashmap! {
"foo_instance_name".to_string() => "main_cas".to_string(),
},
max_bytes_per_stream: 1024,
},
&store_manager,
store_manager,
)
}

Expand Down Expand Up @@ -89,8 +92,8 @@ pub mod write_tests {

#[tokio::test]
pub async fn chunked_stream_receives_all_data() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(store_manager.as_ref())?;
let store_owned = store_manager.get_store("main_cas").unwrap();

let store = Pin::new(store_owned.as_ref());
Expand Down Expand Up @@ -183,8 +186,8 @@ pub mod read_tests {

#[tokio::test]
pub async fn chunked_stream_reads_small_set_of_data() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(store_manager.as_ref())?;
let store_owned = store_manager.get_store("main_cas").unwrap();

let store = Pin::new(store_owned.as_ref());
Expand Down Expand Up @@ -223,8 +226,8 @@ pub mod read_tests {

#[tokio::test]
pub async fn chunked_stream_reads_10mb_of_data() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(&mut store_manager)?;
let store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(store_manager.as_ref())?;
let store_owned = store_manager.get_store("main_cas").unwrap();

let store = Pin::new(store_owned.as_ref());
Expand Down Expand Up @@ -269,9 +272,9 @@ pub mod read_tests {
/// stream was never shutdown.
#[tokio::test]
pub async fn read_with_not_found_does_not_deadlock() -> Result<(), Error> {
let mut store_manager = make_store_manager().await.err_tip(|| "Couldn't get store manager")?;
let store_manager = make_store_manager().await.err_tip(|| "Couldn't get store manager")?;
let mut read_stream = {
let bs_server = make_bytestream_server(&mut store_manager).err_tip(|| "Couldn't make store")?;
let bs_server = make_bytestream_server(store_manager.as_ref()).err_tip(|| "Couldn't make store")?;
let read_request = ReadRequest {
resource_name: format!(
"{}/uploads/{}/blobs/{}/{}",
Expand Down Expand Up @@ -329,8 +332,8 @@ pub mod query_tests {

#[tokio::test]
pub async fn test_query_write_status_smoke_test() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager().await?;
let bs_server = Arc::new(make_bytestream_server(&mut store_manager)?);
let store_manager = make_store_manager().await?;
let bs_server = Arc::new(make_bytestream_server(store_manager.as_ref())?);

let raw_data = "12456789abcdefghijk".as_bytes();
let resource_name = format!(
Expand Down
Loading

0 comments on commit 6795bb0

Please sign in to comment.