Skip to content

Commit

Permalink
get_action_result done with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Dec 27, 2020
1 parent 221ed5f commit fcc8a31
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 71 deletions.
5 changes: 5 additions & 0 deletions cas/grpc_service/BUILD
Expand Up @@ -22,8 +22,12 @@ rust_library(
deps = [
"//proto",
"//cas/store",
"//util:common",
"//util:macros",
"//third_party:stdext",
"//third_party:prost",
"//third_party:tonic",
"//third_party:tokio",
],
visibility = ["//cas:__pkg__"]
)
Expand Down Expand Up @@ -72,5 +76,6 @@ rust_test(
"//proto",
"//third_party:tonic",
"//third_party:tokio",
"//third_party:prost",
],
)
43 changes: 37 additions & 6 deletions cas/grpc_service/ac_server.rs
@@ -1,12 +1,18 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.

use std::convert::TryFrom;
use std::io::Cursor;
use std::sync::Arc;

use prost::Message;
use tonic::{Request, Response, Status};

use proto::build::bazel::remote::execution::v2::{
action_cache_server::ActionCache, action_cache_server::ActionCacheServer as Server,
ActionResult, GetActionResultRequest, UpdateActionResultRequest,
};
use std::sync::Arc;

use macros::error_if;
use store::Store;

#[derive(Debug)]
Expand All @@ -32,12 +38,37 @@ impl AcServer {
impl ActionCache for AcServer {
async fn get_action_result(
&self,
_grpc_request: Request<GetActionResultRequest>,
grpc_request: Request<GetActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
use stdext::function_name;
let output = format!("{} not yet implemented", function_name!());
println!("{}", output);
Err(Status::unimplemented(output))
let get_action_request = grpc_request.into_inner();

// TODO(blaise.bruer) This needs to be fixed. It is using wrong macro.
// We also should write a test for these errors.
let digest = get_action_request
.action_digest
.ok_or(Status::invalid_argument(
"Action digest was not set in message",
))?;
let size_bytes = usize::try_from(digest.size_bytes).or(Err(Status::invalid_argument(
"Digest size_bytes was not convertable to usize",
)))?;

// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
let mut store_data = Vec::with_capacity(size_bytes);
self.ac_store
.get(&digest.hash, size_bytes, &mut Cursor::new(&mut store_data))
.await
.or(Err(Status::not_found("")))?;

let action_result = ActionResult::decode(Cursor::new(&store_data)).or(Err(
Status::not_found("Stored value appears to be corrupt."),
))?;

error_if!(
store_data.len() != size_bytes,
Status::not_found("Found item, but size does not match")
);
Ok(Response::new(action_result))
}

async fn update_action_result(
Expand Down
16 changes: 9 additions & 7 deletions cas/grpc_service/cas_server.rs
Expand Up @@ -12,7 +12,7 @@ use tokio::io::Error;
use tonic::{Request, Response, Status};

use common;
use macros::{error_if, make_err};
use macros::{error_if, make_input_err};
use proto::build::bazel::remote::execution::v2::{
batch_read_blobs_response, batch_update_blobs_response,
content_addressable_storage_server::ContentAddressableStorage,
Expand Down Expand Up @@ -71,15 +71,17 @@ impl ContentAddressableStorage for CasServer {
let result_status: Result<(), Error> = try {
let digest = request
.digest
.ok_or(make_err!("Digest not found in request"))?;
let size_bytes = usize::try_from(digest.size_bytes).or(Err(make_err!(
.ok_or(make_input_err!("Digest not found in request"))?;
let size_bytes = usize::try_from(digest.size_bytes).or(Err(make_input_err!(
"Digest size_bytes was not convertable to usize"
)))?;
error_if!(
size_bytes != request.data.len(),
"Digest for upload had mismatching sizes, digest said {} data said {}",
size_bytes,
request.data.len()
make_input_err!(
"Digest for upload had mismatching sizes, digest said {} data said {}",
size_bytes,
request.data.len()
)
);
self.store
.update(
Expand Down Expand Up @@ -107,7 +109,7 @@ impl ContentAddressableStorage for CasServer {
responses: Vec::with_capacity(batch_read_request.digests.len()),
};
for digest in batch_read_request.digests {
let size_bytes = usize::try_from(digest.size_bytes).or(Err(make_err!(
let size_bytes = usize::try_from(digest.size_bytes).or(Err(make_input_err!(
"Digest size_bytes was not convertable to usize"
)))?;
// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
Expand Down
92 changes: 77 additions & 15 deletions cas/grpc_service/tests/ac_server_test.rs
@@ -1,12 +1,16 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.

use tonic::{Code, Request};
use std::io::Cursor;

use tokio::io::Error;
use tonic::{Code, Request, Response, Status};

use prost::Message;
use proto::build::bazel::remote::execution::v2::Digest;

use ac_server::AcServer;
use proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache;
use store::{create_store, StoreType};
use proto::build::bazel::remote::execution::v2::{action_cache_server::ActionCache, ActionResult};
use store::{create_store, Store, StoreType};

const INSTANCE_NAME: &str = "foo";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
Expand All @@ -17,29 +21,87 @@ mod get_action_results {

use proto::build::bazel::remote::execution::v2::GetActionResultRequest;

#[tokio::test]
#[ignore]
async fn empty_store() {
let ac_store = create_store(&StoreType::Memory);
let cas_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), cas_store.clone());
async fn insert_into_store<T: Message>(
store: &dyn Store,
hash: &str,
action_result: &T,
) -> Result<i64, Error> {
let mut store_data = Vec::new();
action_result.encode(&mut store_data)?;
let digest_size = store_data.len() as i64;
store
.update(&hash, store_data.len(), Box::new(Cursor::new(store_data)))
.await?;
Ok(digest_size)
}

let raw_response = ac_server
async fn get_action_result(
ac_server: &AcServer,
hash: &str,
size: i64,
) -> Result<Response<ActionResult>, Status> {
ac_server
.get_action_result(Request::new(GetActionResultRequest {
instance_name: INSTANCE_NAME.to_string(),
action_digest: Some(Digest {
hash: HASH1.to_string(),
size_bytes: 0,
hash: hash.to_string(),
size_bytes: size,
}),
inline_stdout: false,
inline_stderr: false,
inline_output_files: vec![],
}))
.await;
.await
}

#[tokio::test]
async fn empty_store() -> Result<(), Error> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));

let raw_response = get_action_result(&ac_server, HASH1, 0).await;

let err = raw_response.unwrap_err();
assert_eq!(err.code(), Code::NotFound);
assert_eq!(err.message(), "");
Ok(())
}

#[tokio::test]
async fn has_single_item() -> Result<(), Error> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));

let mut action_result = ActionResult::default();
action_result.exit_code = 45;

let digest_size = insert_into_store(ac_store.as_ref(), &HASH1, &action_result).await?;
let raw_response = get_action_result(&ac_server, HASH1, digest_size).await;

assert!(
raw_response.is_ok(),
"Expected value, got error {:?}",
raw_response
);
assert_eq!(raw_response.unwrap().into_inner(), action_result);
Ok(())
}

#[tokio::test]
async fn single_item_wrong_digest_size() -> Result<(), Error> {
let ac_store = create_store(&StoreType::Memory);
let ac_server = AcServer::new(ac_store.clone(), create_store(&StoreType::Memory));

let mut action_result = ActionResult::default();
action_result.exit_code = 45;

let digest_size = insert_into_store(ac_store.as_ref(), &HASH1, &action_result).await?;
assert!(digest_size > 1, "Digest was too small");
let raw_response = get_action_result(&ac_server, HASH1, digest_size - 1).await;

assert!(raw_response.is_err());
let err = raw_response.unwrap_err();
assert_eq!(err.code(), Code::NotFound);
assert_eq!(err.message(), format!("Hash {} not found", HASH1));
assert_eq!(err.message(), "Found item, but size does not match");
Ok(())
}
}
20 changes: 11 additions & 9 deletions cas/store/memory_store.rs
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use hex::FromHex;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error, ErrorKind};

use macros::{error_if, make_err, make_err_with_code};
use macros::{error_if, make_err, make_input_err};
use traits::StoreTrait;

use async_mutex::Mutex;
Expand All @@ -30,7 +30,7 @@ impl StoreTrait for MemoryStore {
async fn has(&self, hash: &str, _expected_size: usize) -> Result<bool, Error> {
let raw_key = <[u8; 32]>::from_hex(&hash).or_else(|_| {
println!("Foobar");
Err(make_err!("Hex length is not 64 hex characters"))
Err(make_input_err!("Hex length is not 64 hex characters"))
})?;
let map = self.map.lock().await;
Ok(map.contains_key(&raw_key))
Expand All @@ -43,15 +43,17 @@ impl StoreTrait for MemoryStore {
mut reader: Box<dyn AsyncRead + Send + Unpin + 'b>,
) -> Result<(), Error> {
let raw_key = <[u8; 32]>::from_hex(&hash)
.or(Err(make_err!("Hex length is not 64 hex characters")))?;
.or(Err(make_input_err!("Hex length is not 64 hex characters")))?;
let mut buffer = Vec::new();
let read_size = reader.read_to_end(&mut buffer).await?;
error_if!(
read_size != expected_size,
"Expected size {} but got size {} for hash {} CAS insert",
expected_size,
read_size,
hash
make_input_err!(
"Expected size {} but got size {} for hash {} CAS insert",
expected_size,
read_size,
hash
)
);
let mut map = self.map.lock().await;
map.insert(raw_key, Arc::new(buffer));
Expand All @@ -65,9 +67,9 @@ impl StoreTrait for MemoryStore {
writer: &mut (dyn AsyncWrite + Send + Unpin),
) -> Result<(), Error> {
let raw_key = <[u8; 32]>::from_hex(&hash)
.or(Err(make_err!("Hex length is not 64 hex characters")))?;
.or(Err(make_input_err!("Hex length is not 64 hex characters")))?;
let map = self.map.lock().await;
let value = map.get(&raw_key).ok_or(make_err_with_code!(
let value = map.get(&raw_key).ok_or(make_err!(
ErrorKind::NotFound,
"Trying to get object but could not find hash: {}",
hash
Expand Down
1 change: 1 addition & 0 deletions util/BUILD
Expand Up @@ -7,6 +7,7 @@ rust_library(
srcs = ["macros.rs"],
deps = [
"//third_party:tokio",
"//third_party:tonic",
],
visibility = ["//visibility:public"],
)
Expand Down
59 changes: 35 additions & 24 deletions util/common.rs
Expand Up @@ -5,32 +5,43 @@ use std::result::Result;
use tokio::io::{Error, ErrorKind};

use proto::google::rpc::Status as GrpcStatus;
use tonic::Code;
use tonic::{Code, Status as TonicStatus};

pub fn result_to_grpc_status(result: Result<(), Error>) -> GrpcStatus {
fn kind_to_grpc_code(kind: &ErrorKind) -> Code {
match kind {
ErrorKind::NotFound => Code::NotFound,
ErrorKind::PermissionDenied => Code::PermissionDenied,
ErrorKind::ConnectionRefused => Code::Unavailable,
ErrorKind::ConnectionReset => Code::Unavailable,
ErrorKind::ConnectionAborted => Code::Unavailable,
ErrorKind::NotConnected => Code::Internal,
ErrorKind::AddrInUse => Code::Internal,
ErrorKind::AddrNotAvailable => Code::Internal,
ErrorKind::BrokenPipe => Code::Internal,
ErrorKind::AlreadyExists => Code::AlreadyExists,
ErrorKind::WouldBlock => Code::Internal,
ErrorKind::InvalidInput => Code::InvalidArgument,
ErrorKind::InvalidData => Code::InvalidArgument,
ErrorKind::TimedOut => Code::DeadlineExceeded,
ErrorKind::WriteZero => Code::Internal,
ErrorKind::Interrupted => Code::Aborted,
ErrorKind::Other => Code::Internal,
ErrorKind::UnexpectedEof => Code::Internal,
_ => Code::Internal,
}
fn kind_to_grpc_code(kind: &ErrorKind) -> Code {
match kind {
ErrorKind::NotFound => Code::NotFound,
ErrorKind::PermissionDenied => Code::PermissionDenied,
ErrorKind::ConnectionRefused => Code::Unavailable,
ErrorKind::ConnectionReset => Code::Unavailable,
ErrorKind::ConnectionAborted => Code::Unavailable,
ErrorKind::NotConnected => Code::Internal,
ErrorKind::AddrInUse => Code::Internal,
ErrorKind::AddrNotAvailable => Code::Internal,
ErrorKind::BrokenPipe => Code::Internal,
ErrorKind::AlreadyExists => Code::AlreadyExists,
ErrorKind::WouldBlock => Code::Internal,
ErrorKind::InvalidInput => Code::InvalidArgument,
ErrorKind::InvalidData => Code::InvalidArgument,
ErrorKind::TimedOut => Code::DeadlineExceeded,
ErrorKind::WriteZero => Code::Internal,
ErrorKind::Interrupted => Code::Aborted,
ErrorKind::Other => Code::Internal,
ErrorKind::UnexpectedEof => Code::Internal,
_ => Code::Internal,
}
}

pub fn result_to_tonic_status(result: Result<(), Error>) -> TonicStatus {
match result {
Ok(()) => TonicStatus::ok(""),
Err(error) => TonicStatus::new(
kind_to_grpc_code(&error.kind()),
format!("Error: {:?}", error),
),
}
}

pub fn result_to_grpc_status(result: Result<(), Error>) -> GrpcStatus {
match result {
Ok(()) => GrpcStatus {
code: Code::Ok as i32,
Expand Down

0 comments on commit fcc8a31

Please sign in to comment.