Skip to content

Commit

Permalink
Store now has flag of when to verify the size of the digest
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jan 3, 2021
1 parent 25bef4a commit 6c70370
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 64 deletions.
12 changes: 9 additions & 3 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use execution_server::ExecutionServer;
use store;
use store::{StoreConfig, StoreType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50051".parse()?;

let ac_store = store::create_store(&store::StoreType::Memory);
let cas_store = store::create_store(&store::StoreType::Memory);
let ac_store = store::create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: false,
});
let cas_store = store::create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});

Server::builder()
.add_service(AcServer::new(ac_store, cas_store.clone()).into_service())
Expand Down
26 changes: 10 additions & 16 deletions cas/grpc_service/ac_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.

use std::convert::TryFrom;
use std::convert::TryInto;
use std::io::Cursor;
use std::sync::Arc;
Expand All @@ -14,7 +13,7 @@ use proto::build::bazel::remote::execution::v2::{
};

use common::DigestInfo;
use error::{error_if, make_err, Code, Error, ResultExt};
use error::{make_err, Code, Error, ResultExt};
use store::Store;

pub struct AcServer {
Expand Down Expand Up @@ -82,24 +81,16 @@ impl AcServer {
.err_tip(|| "Action digest was not set in message")?
.try_into()?;

let size_bytes = usize::try_from(digest.size_bytes)
.err_tip(|| "Digest size_bytes was not convertable to usize")?;

let action_result = update_action_request
.action_result
.err_tip(|| "Action result was not set in message")?;

// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
let mut store_data = Vec::with_capacity(size_bytes);
let mut store_data = Vec::new();
action_result
.encode(&mut store_data)
.err_tip(|| "Provided ActionResult could not be serialized")?;
error_if!(
store_data.len() != size_bytes,
"Digest size does not match. Actual: {} Expected: {} ",
store_data.len(),
size_bytes
);

self.ac_store
.update(&digest, Box::new(Cursor::new(store_data)))
.await?;
Expand All @@ -113,19 +104,22 @@ impl ActionCache for AcServer {
&self,
grpc_request: Request<GetActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
println!("get_action_result Req: {:?}", grpc_request);
println!("\x1b[0;31mget_action_result Req\x1b[0m: {:?}", grpc_request);
let resp = self.inner_get_action_result(grpc_request).await;
println!("get_action_result Resp: {:?}", resp);
println!("\x1b[0;31mget_action_result Resp\x1b[0m: {:?}", resp);
return resp.map_err(|e| e.into());
}

async fn update_action_result(
&self,
grpc_request: Request<UpdateActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
println!("update_action_result Req: {:?}", grpc_request);
println!(
"\x1b[0;31mupdate_action_result Req\x1b[0m: {:?}",
grpc_request
);
let resp = self.inner_update_action_result(grpc_request).await;
println!("update_action_result Resp: {:?}", resp);
println!("\x1b[0;31mupdate_action_result Resp\x1b[0m: {:?}", resp);
return resp.map_err(|e| e.into());
}
}
16 changes: 3 additions & 13 deletions cas/grpc_service/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl<'a> ResourceInfo<'a> {
struct WriteRequestStreamWrapper {
stream: Streaming<WriteRequest>,
current_msg: WriteRequest,
original_resource_name: String,
hash: String,
expected_size: usize,
is_first: bool,
Expand All @@ -141,15 +140,13 @@ impl WriteRequestStreamWrapper {
.err_tip(|| "Error receiving first message in stream")?
.err_tip(|| "Expected WriteRequest struct in stream")?;

let original_resource_name = current_msg.resource_name.clone();
let resource_info = ResourceInfo::new(&original_resource_name)
let resource_info = ResourceInfo::new(&current_msg.resource_name)
.err_tip(|| "Could not extract resource info from first message of stream")?;
let hash = resource_info.hash.to_string();
let expected_size = resource_info.expected_size;
Ok(WriteRequestStreamWrapper {
stream,
current_msg,
original_resource_name,
hash,
expected_size,
is_first: true,
Expand Down Expand Up @@ -186,13 +183,6 @@ impl WriteRequestStreamWrapper {
.err_tip(|| "Expected WriteRequest struct in stream")?;
self.bytes_received += self.current_msg.data.len();

error_if!(
self.original_resource_name != self.current_msg.resource_name,
"Resource name missmatch, expected {} got {}",
self.original_resource_name,
self.current_msg.resource_name
);

Ok(Some(&self.current_msg))
}
}
Expand All @@ -205,7 +195,7 @@ impl ByteStream for ByteStreamServer {
&self,
_grpc_request: Request<ReadRequest>,
) -> Result<Response<Self::ReadStream>, Status> {
println!("read {:?}", _grpc_request);
println!("\x1b[0;31mread\x1b[0m {:?}", _grpc_request);
Err(Status::unimplemented(""))
}

Expand All @@ -219,7 +209,7 @@ impl ByteStream for ByteStreamServer {
.await
.err_tip(|| format!("Failed on write() command"))
.map_err(|e| e.into());
println!("Write Resp: {:?}", resp);
println!("\x1b[0;31mWrite Resp\x1b[0m: {:?}", resp);
resp
}

Expand Down
54 changes: 45 additions & 9 deletions cas/grpc_service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use proto::build::bazel::remote::execution::v2::{

use ac_server::AcServer;
use common::DigestInfo;
use store::{create_store, Store, StoreType};
use store::{create_store, Store, StoreConfig, StoreType};

const INSTANCE_NAME: &str = "foo";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
Expand Down Expand Up @@ -58,8 +58,17 @@ mod get_action_results {

#[tokio::test]
async fn empty_store() -> Result<(), Box<dyn std::error::Error>> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));
let ac_store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: false,
});
let ac_server = AcServer::new(
ac_store.clone(),
create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
}),
);

let raw_response = get_action_result(&ac_server, HASH1, 0).await;

Expand All @@ -74,8 +83,17 @@ mod get_action_results {

#[tokio::test]
async fn has_single_item() -> Result<(), Box<dyn std::error::Error>> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));
let ac_store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: false,
});
let ac_server = AcServer::new(
ac_store.clone(),
create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
}),
);

let mut action_result = ActionResult::default();
action_result.exit_code = 45;
Expand All @@ -94,8 +112,17 @@ mod get_action_results {

#[tokio::test]
async fn single_item_wrong_digest_size() -> Result<(), Box<dyn std::error::Error>> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));
let ac_store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: false,
});
let ac_server = AcServer::new(
ac_store.clone(),
create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
}),
);

let mut action_result = ActionResult::default();
action_result.exit_code = 45;
Expand Down Expand Up @@ -141,8 +168,17 @@ mod update_action_result {

#[tokio::test]
async fn one_item_update_test() -> Result<(), Box<dyn std::error::Error>> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));
let ac_store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: false,
});
let ac_server = AcServer::new(
ac_store.clone(),
create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
}),
);

let mut action_result = ActionResult::default();
action_result.exit_code = 45;
Expand Down
7 changes: 5 additions & 2 deletions cas/grpc_service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use bytestream_server::ByteStreamServer;
use tonic::Request;

use common::DigestInfo;
use store::{create_store, StoreType};
use store::{create_store, StoreConfig, StoreType};

const INSTANCE_NAME: &str = "foo";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
Expand Down Expand Up @@ -60,7 +60,10 @@ pub mod write_tests {

#[tokio::test]
pub async fn chunked_stream_receives_all_data() -> Result<(), Box<dyn std::error::Error>> {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let bs_server = ByteStreamServer::new(store.clone());

// Setup stream.
Expand Down
32 changes: 25 additions & 7 deletions cas/grpc_service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use proto::google::rpc::Status as GrpcStatus;

use cas_server::CasServer;
use common::DigestInfo;
use store::{create_store, StoreType};
use store::{create_store, StoreConfig, StoreType};

const INSTANCE_NAME: &str = "foo";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
Expand All @@ -28,7 +28,10 @@ mod find_missing_blobs {

#[tokio::test]
async fn empty_store() {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let cas_server = CasServer::new(store.clone());

let raw_response = cas_server
Expand All @@ -47,7 +50,10 @@ mod find_missing_blobs {

#[tokio::test]
async fn store_one_item_existence() -> Result<(), Box<dyn std::error::Error>> {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let cas_server = CasServer::new(store.clone());

const VALUE: &str = "1";
Expand Down Expand Up @@ -75,7 +81,10 @@ mod find_missing_blobs {

#[tokio::test]
async fn has_three_requests_one_bad_hash() -> Result<(), Box<dyn std::error::Error>> {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let cas_server = CasServer::new(store.clone());

const VALUE: &str = "1";
Expand Down Expand Up @@ -129,7 +138,10 @@ mod batch_update_blobs {

#[tokio::test]
async fn update_existing_item() -> Result<(), Box<dyn std::error::Error>> {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let cas_server = CasServer::new(store.clone());

const VALUE1: &str = "1";
Expand Down Expand Up @@ -203,7 +215,10 @@ mod batch_read_blobs {
#[tokio::test]
async fn batch_read_blobs_read_two_blobs_success_one_fail(
) -> Result<(), Box<dyn std::error::Error>> {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let cas_server = CasServer::new(store.clone());

const VALUE1: &str = "1";
Expand Down Expand Up @@ -299,7 +314,10 @@ mod end_to_end {
#[tokio::test]
async fn batch_update_blobs_two_items_existence_with_third_missing(
) -> Result<(), Box<dyn std::error::Error>> {
let store = create_store(&StoreType::Memory);
let store = create_store(&StoreConfig {
store_type: StoreType::Memory,
verify_size: true,
});
let cas_server = CasServer::new(store.clone());

const VALUE1: &str = "1";
Expand Down
12 changes: 4 additions & 8 deletions cas/store/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

use std::sync::Arc;

pub use traits::StoreTrait as Store;
pub use traits::{StoreConfig, StoreTrait as Store, StoreType};

use memory_store::MemoryStore;

pub enum StoreType {
Memory,
}

pub fn create_store(store_type: &StoreType) -> Arc<dyn Store> {
match store_type {
StoreType::Memory => Arc::new(MemoryStore::new()),
pub fn create_store(config: &StoreConfig) -> Arc<dyn Store> {
match config.store_type {
StoreType::Memory => Arc::new(MemoryStore::new(&config)),
}
}
8 changes: 5 additions & 3 deletions cas/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

use common::DigestInfo;
use error::{error_if, Code, Error, ResultExt};
use traits::StoreTrait;
use traits::{StoreConfig, StoreTrait};

#[derive(Debug)]
pub struct MemoryStore {
map: Mutex<HashMap<[u8; 32], Arc<Vec<u8>>>>,
verify_size: bool,
}

impl MemoryStore {
pub fn new() -> Self {
pub fn new(config: &StoreConfig) -> Self {
MemoryStore {
map: Mutex::new(HashMap::new()),
verify_size: config.verify_size,
}
}
}
Expand All @@ -39,7 +41,7 @@ impl StoreTrait for MemoryStore {
let mut buffer = Vec::new();
let read_size = reader.read_to_end(&mut buffer).await? as i64;
error_if!(
read_size != digest.size_bytes,
self.verify_size && read_size != digest.size_bytes,
"Expected size {} but got size {} for hash {} CAS insert",
digest.size_bytes,
read_size,
Expand Down

0 comments on commit 6c70370

Please sign in to comment.