diff --git a/cas/cas_main.rs b/cas/cas_main.rs index b323ec2d2..e3ee3e222 100644 --- a/cas/cas_main.rs +++ b/cas/cas_main.rs @@ -7,14 +7,20 @@ use bytestream_server::ByteStreamServer; use capabilities_server::CapabilitiesServer; use cas_server::CasServer; use execution_server::ExecutionServer; -use store; +use store::{StoreConfig, StoreType}; #[tokio::main] async fn main() -> Result<(), Box> { let addr = "0.0.0.0:50051".parse()?; - let ac_store = store::create_store(&store::StoreType::Memory); - let cas_store = store::create_store(&store::StoreType::Memory); + let ac_store = store::create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); + let cas_store = store::create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); Server::builder() .add_service(AcServer::new(ac_store, cas_store.clone()).into_service()) diff --git a/cas/grpc_service/ac_server.rs b/cas/grpc_service/ac_server.rs index 1a4f67cb4..36d21debf 100644 --- a/cas/grpc_service/ac_server.rs +++ b/cas/grpc_service/ac_server.rs @@ -1,6 +1,5 @@ // Copyright 2020 Nathan (Blaise) Bruer. All rights reserved. -use std::convert::TryFrom; use std::convert::TryInto; use std::io::Cursor; use std::sync::Arc; @@ -14,7 +13,7 @@ use proto::build::bazel::remote::execution::v2::{ }; use common::DigestInfo; -use error::{error_if, make_err, Code, Error, ResultExt}; +use error::{make_err, Code, Error, ResultExt}; use store::Store; pub struct AcServer { @@ -82,24 +81,16 @@ impl AcServer { .err_tip(|| "Action digest was not set in message")? .try_into()?; - let size_bytes = usize::try_from(digest.size_bytes) - .err_tip(|| "Digest size_bytes was not convertable to usize")?; - let action_result = update_action_request .action_result .err_tip(|| "Action result was not set in message")?; // TODO(allada) There is a security risk here of someone taking all the memory on the instance. 
- let mut store_data = Vec::with_capacity(size_bytes); + let mut store_data = Vec::new(); action_result .encode(&mut store_data) .err_tip(|| "Provided ActionResult could not be serialized")?; - error_if!( - store_data.len() != size_bytes, - "Digest size does not match. Actual: {} Expected: {} ", - store_data.len(), - size_bytes - ); + self.ac_store .update(&digest, Box::new(Cursor::new(store_data))) .await?; @@ -113,9 +104,9 @@ impl ActionCache for AcServer { &self, grpc_request: Request, ) -> Result, Status> { - println!("get_action_result Req: {:?}", grpc_request); + println!("\x1b[0;31mget_action_result Req\x1b[0m: {:?}", grpc_request); let resp = self.inner_get_action_result(grpc_request).await; - println!("get_action_result Resp: {:?}", resp); + println!("\x1b[0;31mget_action_result Resp\x1b[0m: {:?}", resp); return resp.map_err(|e| e.into()); } @@ -123,9 +114,12 @@ impl ActionCache for AcServer { &self, grpc_request: Request, ) -> Result, Status> { - println!("update_action_result Req: {:?}", grpc_request); + println!( + "\x1b[0;31mupdate_action_result Req\x1b[0m: {:?}", + grpc_request + ); let resp = self.inner_update_action_result(grpc_request).await; - println!("update_action_result Resp: {:?}", resp); + println!("\x1b[0;31mupdate_action_result Resp\x1b[0m: {:?}", resp); return resp.map_err(|e| e.into()); } } diff --git a/cas/grpc_service/bytestream_server.rs b/cas/grpc_service/bytestream_server.rs index f641e5abb..15ff27498 100644 --- a/cas/grpc_service/bytestream_server.rs +++ b/cas/grpc_service/bytestream_server.rs @@ -126,7 +126,6 @@ impl<'a> ResourceInfo<'a> { struct WriteRequestStreamWrapper { stream: Streaming, current_msg: WriteRequest, - original_resource_name: String, hash: String, expected_size: usize, is_first: bool, @@ -141,15 +140,13 @@ impl WriteRequestStreamWrapper { .err_tip(|| "Error receiving first message in stream")? 
.err_tip(|| "Expected WriteRequest struct in stream")?; - let original_resource_name = current_msg.resource_name.clone(); - let resource_info = ResourceInfo::new(&original_resource_name) + let resource_info = ResourceInfo::new(&current_msg.resource_name) .err_tip(|| "Could not extract resource info from first message of stream")?; let hash = resource_info.hash.to_string(); let expected_size = resource_info.expected_size; Ok(WriteRequestStreamWrapper { stream, current_msg, - original_resource_name, hash, expected_size, is_first: true, @@ -186,13 +183,6 @@ impl WriteRequestStreamWrapper { .err_tip(|| "Expected WriteRequest struct in stream")?; self.bytes_received += self.current_msg.data.len(); - error_if!( - self.original_resource_name != self.current_msg.resource_name, - "Resource name missmatch, expected {} got {}", - self.original_resource_name, - self.current_msg.resource_name - ); - Ok(Some(&self.current_msg)) } } @@ -205,7 +195,7 @@ impl ByteStream for ByteStreamServer { &self, _grpc_request: Request, ) -> Result, Status> { - println!("read {:?}", _grpc_request); + println!("\x1b[0;31mread\x1b[0m {:?}", _grpc_request); Err(Status::unimplemented("")) } @@ -219,7 +209,7 @@ impl ByteStream for ByteStreamServer { .await .err_tip(|| format!("Failed on write() command")) .map_err(|e| e.into()); - println!("Write Resp: {:?}", resp); + println!("\x1b[0;31mWrite Resp\x1b[0m: {:?}", resp); resp } diff --git a/cas/grpc_service/tests/ac_server_test.rs b/cas/grpc_service/tests/ac_server_test.rs index dda361a56..1d0993009 100644 --- a/cas/grpc_service/tests/ac_server_test.rs +++ b/cas/grpc_service/tests/ac_server_test.rs @@ -11,7 +11,7 @@ use proto::build::bazel::remote::execution::v2::{ use ac_server::AcServer; use common::DigestInfo; -use store::{create_store, Store, StoreType}; +use store::{create_store, Store, StoreConfig, StoreType}; const INSTANCE_NAME: &str = "foo"; const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; @@ -58,8 +58,17 @@ 
mod get_action_results { #[tokio::test] async fn empty_store() -> Result<(), Box> { - let ac_store = create_store(&StoreType::Memory); - let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory)); + let ac_store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); + let ac_server = AcServer::new( + ac_store.clone(), + create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }), + ); let raw_response = get_action_result(&ac_server, HASH1, 0).await; @@ -74,8 +83,17 @@ mod get_action_results { #[tokio::test] async fn has_single_item() -> Result<(), Box> { - let ac_store = create_store(&StoreType::Memory); - let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory)); + let ac_store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); + let ac_server = AcServer::new( + ac_store.clone(), + create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }), + ); let mut action_result = ActionResult::default(); action_result.exit_code = 45; @@ -94,8 +112,17 @@ mod get_action_results { #[tokio::test] async fn single_item_wrong_digest_size() -> Result<(), Box> { - let ac_store = create_store(&StoreType::Memory); - let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory)); + let ac_store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); + let ac_server = AcServer::new( + ac_store.clone(), + create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }), + ); let mut action_result = ActionResult::default(); action_result.exit_code = 45; @@ -141,8 +168,17 @@ mod update_action_result { #[tokio::test] async fn one_item_update_test() -> Result<(), Box> { - let ac_store = create_store(&StoreType::Memory); - let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory)); + let ac_store = 
create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); + let ac_server = AcServer::new( + ac_store.clone(), + create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }), + ); let mut action_result = ActionResult::default(); action_result.exit_code = 45; diff --git a/cas/grpc_service/tests/bytestream_server_test.rs b/cas/grpc_service/tests/bytestream_server_test.rs index b4ac8517b..e00f1bc37 100644 --- a/cas/grpc_service/tests/bytestream_server_test.rs +++ b/cas/grpc_service/tests/bytestream_server_test.rs @@ -7,7 +7,7 @@ use bytestream_server::ByteStreamServer; use tonic::Request; use common::DigestInfo; -use store::{create_store, StoreType}; +use store::{create_store, StoreConfig, StoreType}; const INSTANCE_NAME: &str = "foo"; const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; @@ -60,7 +60,10 @@ pub mod write_tests { #[tokio::test] pub async fn chunked_stream_receives_all_data() -> Result<(), Box> { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let bs_server = ByteStreamServer::new(store.clone()); // Setup stream. 
diff --git a/cas/grpc_service/tests/cas_server_test.rs b/cas/grpc_service/tests/cas_server_test.rs index 455d2a396..ae9003233 100644 --- a/cas/grpc_service/tests/cas_server_test.rs +++ b/cas/grpc_service/tests/cas_server_test.rs @@ -9,7 +9,7 @@ use proto::google::rpc::Status as GrpcStatus; use cas_server::CasServer; use common::DigestInfo; -use store::{create_store, StoreType}; +use store::{create_store, StoreConfig, StoreType}; const INSTANCE_NAME: &str = "foo"; const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; @@ -28,7 +28,10 @@ mod find_missing_blobs { #[tokio::test] async fn empty_store() { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let cas_server = CasServer::new(store.clone()); let raw_response = cas_server @@ -47,7 +50,10 @@ mod find_missing_blobs { #[tokio::test] async fn store_one_item_existence() -> Result<(), Box> { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let cas_server = CasServer::new(store.clone()); const VALUE: &str = "1"; @@ -75,7 +81,10 @@ mod find_missing_blobs { #[tokio::test] async fn has_three_requests_one_bad_hash() -> Result<(), Box> { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let cas_server = CasServer::new(store.clone()); const VALUE: &str = "1"; @@ -129,7 +138,10 @@ mod batch_update_blobs { #[tokio::test] async fn update_existing_item() -> Result<(), Box> { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let cas_server = CasServer::new(store.clone()); const VALUE1: &str = "1"; @@ -203,7 +215,10 @@ mod batch_read_blobs { #[tokio::test] async fn 
batch_read_blobs_read_two_blobs_success_one_fail( ) -> Result<(), Box> { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let cas_server = CasServer::new(store.clone()); const VALUE1: &str = "1"; @@ -299,7 +314,10 @@ mod end_to_end { #[tokio::test] async fn batch_update_blobs_two_items_existence_with_third_missing( ) -> Result<(), Box> { - let store = create_store(&StoreType::Memory); + let store = create_store(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); let cas_server = CasServer::new(store.clone()); const VALUE1: &str = "1"; diff --git a/cas/store/lib.rs b/cas/store/lib.rs index fc16c2c06..d73fbbfbb 100644 --- a/cas/store/lib.rs +++ b/cas/store/lib.rs @@ -2,16 +2,12 @@ use std::sync::Arc; -pub use traits::StoreTrait as Store; +pub use traits::{StoreConfig, StoreTrait as Store, StoreType}; use memory_store::MemoryStore; -pub enum StoreType { - Memory, -} - -pub fn create_store(store_type: &StoreType) -> Arc { - match store_type { - StoreType::Memory => Arc::new(MemoryStore::new()), +pub fn create_store(config: &StoreConfig) -> Arc { + match config.store_type { + StoreType::Memory => Arc::new(MemoryStore::new(&config)), } } diff --git a/cas/store/memory_store.rs b/cas/store/memory_store.rs index 8fa3117a1..ab1d9a554 100644 --- a/cas/store/memory_store.rs +++ b/cas/store/memory_store.rs @@ -9,17 +9,19 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use common::DigestInfo; use error::{error_if, Code, Error, ResultExt}; -use traits::StoreTrait; +use traits::{StoreConfig, StoreTrait}; #[derive(Debug)] pub struct MemoryStore { map: Mutex>>>, + verify_size: bool, } impl MemoryStore { - pub fn new() -> Self { + pub fn new(config: &StoreConfig) -> Self { MemoryStore { map: Mutex::new(HashMap::new()), + verify_size: config.verify_size, } } } @@ -39,7 +41,7 @@ impl StoreTrait for MemoryStore { let mut buffer = Vec::new(); 
let read_size = reader.read_to_end(&mut buffer).await? as i64; error_if!( - read_size != digest.size_bytes, + self.verify_size && read_size != digest.size_bytes, "Expected size {} but got size {} for hash {} CAS insert", digest.size_bytes, read_size, diff --git a/cas/store/store_trait.rs b/cas/store/store_trait.rs index ac39c1373..55d1421ed 100644 --- a/cas/store/store_trait.rs +++ b/cas/store/store_trait.rs @@ -6,6 +6,17 @@ use tokio::io::{AsyncRead, AsyncWrite}; use common::DigestInfo; use error::Error; +pub enum StoreType { + Memory, +} + +pub struct StoreConfig { + pub store_type: StoreType, + + // If we need to verify the digest size of what is being uploaded. + pub verify_size: bool, +} + #[async_trait] pub trait StoreTrait: Sync + Send { async fn has(&self, digest: &DigestInfo) -> Result; diff --git a/cas/store/tests/memory_store_test.rs b/cas/store/tests/memory_store_test.rs index 2b99dc3cc..e884f2464 100644 --- a/cas/store/tests/memory_store_test.rs +++ b/cas/store/tests/memory_store_test.rs @@ -9,13 +9,16 @@ mod memory_store_tests { use common::DigestInfo; use memory_store::MemoryStore; - use traits::StoreTrait; + use traits::{StoreConfig, StoreTrait, StoreType}; const VALID_HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; #[tokio::test] async fn insert_one_item_then_update() -> Result<(), Error> { - let store = MemoryStore::new(); + let store = MemoryStore::new(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); { // Insert dummy value into store. 
@@ -69,9 +72,30 @@ mod memory_store_tests { const TOO_SHORT_HASH: &str = "100000000000000000000000000000000000000000000000000000000000001"; const INVALID_HASH: &str = "g111111111111111111111111111111111111111111111111111111111111111"; + #[tokio::test] + async fn verify_size_false_passes_on_update() -> Result<(), Error> { + let store = MemoryStore::new(&StoreConfig { + store_type: StoreType::Memory, + verify_size: false, + }); + const VALUE1: &str = "123"; + let digest = DigestInfo::try_new(&VALID_HASH1, 100).unwrap(); + let result = store.update(&digest, Box::new(Cursor::new(VALUE1))).await; + assert_eq!( + result, + Ok(()), + "Should have succeeded when verify_size = false, got: {:?}", + result + ); + Ok(()) + } + #[tokio::test] async fn errors_with_invalid_inputs() -> Result<(), Error> { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(&StoreConfig { + store_type: StoreType::Memory, + verify_size: true, + }); const VALUE1: &str = "123"; { // .has() tests.