Skip to content

Commit

Permalink
Add much better way to do error logging with .err_tip()
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jan 2, 2021
1 parent 92912e6 commit 9ae49b6
Show file tree
Hide file tree
Showing 18 changed files with 584 additions and 350 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Expand Up @@ -28,7 +28,6 @@ bytes = "0.5.6"
tonic-build = "0.3.1"
pretty_assertions = "0.6.1"
rustfmt-nightly = "1.4.21"
futures-core = "0.3.8"

[package.metadata.raze.crates.prost-build.'*']
gen_buildrs = true
Expand Down
18 changes: 8 additions & 10 deletions cas/grpc_service/BUILD
Expand Up @@ -5,13 +5,12 @@ rust_library(
srcs = ["cas_server.rs"],
deps = [
"//proto",
"//util:macros",
"//third_party:tonic",
"//third_party:tokio",
"//third_party:futures_core",
"//third_party:futures",
"//third_party:stdext",
"//cas/store",
"//util:common",
"//util:error",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -22,8 +21,7 @@ rust_library(
deps = [
"//proto",
"//cas/store",
"//util:common",
"//util:macros",
"//util:error",
"//third_party:stdext",
"//third_party:prost",
"//third_party:tonic",
Expand All @@ -47,12 +45,12 @@ rust_library(
srcs = ["bytestream_server.rs"],
deps = [
"//proto",
"//third_party:futures_core",
"//third_party:tonic",
"//util:async_fixed_buffer",
"//cas/store",
"//third_party:tokio",
"//util:macros",
"//third_party:futures",
"//util:error",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -64,7 +62,7 @@ rust_library(
"//proto",
"//third_party:tonic",
"//third_party:stdext",
"//third_party:futures_core",
"//third_party:futures",
],
visibility = ["//cas:__pkg__"]
)
Expand All @@ -75,6 +73,7 @@ rust_test(
deps = [
":cas_server",
"//cas/store",
"//util:error",
"//proto",
"//third_party:tonic",
"//third_party:tokio",
Expand All @@ -88,7 +87,6 @@ rust_test(
deps = [
":ac_server",
"//cas/store",
"//util:macros",
"//proto",
"//third_party:tonic",
"//third_party:tokio",
Expand All @@ -104,10 +102,10 @@ rust_test(
":bytestream_server",
"//cas/store",
"//proto",
"//util:error",
"//third_party:tonic",
"//third_party:bytes",
"//third_party:tokio",
"//third_party:futures_core",
"//third_party:prost",
"//third_party:pretty_assertions",
],
Expand Down
73 changes: 35 additions & 38 deletions cas/grpc_service/ac_server.rs
Expand Up @@ -12,20 +12,19 @@ use proto::build::bazel::remote::execution::v2::{
ActionResult, GetActionResultRequest, UpdateActionResultRequest,
};

use macros::error_if;
use error::{error_if, make_err, Code, Error, ResultExt};
use store::Store;

#[derive(Debug)]
pub struct AcServer {
ac_store: Arc<dyn Store>,
cas_store: Arc<dyn Store>,
_cas_store: Arc<dyn Store>,
}

impl AcServer {
pub fn new(ac_store: Arc<dyn Store>, cas_store: Arc<dyn Store>) -> Self {
AcServer {
ac_store: ac_store,
cas_store: cas_store,
_cas_store: cas_store,
}
}

Expand All @@ -36,72 +35,70 @@ impl AcServer {
async fn inner_get_action_result(
&self,
grpc_request: Request<GetActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
) -> Result<Response<ActionResult>, Error> {
let get_action_request = grpc_request.into_inner();

// TODO(blaise.bruer) This needs to be fixed. It is using wrong macro.
// We also should write a test for these errors.
let digest = get_action_request
.action_digest
.ok_or(Status::invalid_argument(
"Action digest was not set in message",
))?;
let size_bytes = usize::try_from(digest.size_bytes).or(Err(Status::invalid_argument(
"Digest size_bytes was not convertable to usize",
)))?;
.err_tip(|| "Action digest was not set in message")?;
let size_bytes = usize::try_from(digest.size_bytes)
.err_tip(|| "Digest size_bytes was not convertable to usize")?;

// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
let mut store_data = Vec::with_capacity(size_bytes);
let mut cursor = Cursor::new(&mut store_data);
self.ac_store
.get(&digest.hash, size_bytes, &mut Cursor::new(&mut store_data))
.await
.or(Err(Status::not_found("")))?;

let action_result = ActionResult::decode(Cursor::new(&store_data)).or(Err(
Status::not_found("Stored value appears to be corrupt."),
))?;
.get(&digest.hash, size_bytes, &mut cursor)
.await?;

error_if!(
store_data.len() != size_bytes,
Status::not_found("Found item, but size does not match")
);
let action_result =
ActionResult::decode(Cursor::new(&store_data)).err_tip_with_code(|e| {
(
Code::NotFound,
format!("Stored value appears to be corrupt: {}", e),
)
})?;

if store_data.len() != size_bytes {
return Err(make_err!(
Code::NotFound,
"Found item, but size does not match"
));
}
Ok(Response::new(action_result))
}

async fn inner_update_action_result(
&self,
grpc_request: Request<UpdateActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
) -> Result<Response<ActionResult>, Error> {
let update_action_request = grpc_request.into_inner();

// TODO(blaise.bruer) This needs to be fixed. It is using wrong macro.
// We also should write a test for these errors.
let digest = update_action_request
.action_digest
.ok_or(Status::invalid_argument(
"Action digest was not set in message",
))?;
.err_tip(|| "Action digest was not set in message")?;

let size_bytes = usize::try_from(digest.size_bytes).or(Err(Status::invalid_argument(
"Digest size_bytes was not convertable to usize",
)))?;
let size_bytes = usize::try_from(digest.size_bytes)
.err_tip(|| "Digest size_bytes was not convertable to usize")?;

let action_result = update_action_request
.action_result
.ok_or(Status::invalid_argument(
"Action result was not set in message",
))?;
.err_tip(|| "Action result was not set in message")?;

// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
let mut store_data = Vec::with_capacity(size_bytes);
action_result
.encode(&mut store_data)
.or(Err(Status::invalid_argument(
"Provided ActionResult could not be serialized",
)))?;
.err_tip(|| "Provided ActionResult could not be serialized")?;
error_if!(
store_data.len() != size_bytes,
Status::invalid_argument("Provided digest size does not match serialized size")
"Digest size does not match. Actual: {} Expected: {} ",
store_data.len(),
size_bytes
);
self.ac_store
.update(
Expand All @@ -123,7 +120,7 @@ impl ActionCache for AcServer {
println!("get_action_result Req: {:?}", grpc_request);
let resp = self.inner_get_action_result(grpc_request).await;
println!("get_action_result Resp: {:?}", resp);
return resp;
return resp.map_err(|e| e.into());
}

async fn update_action_result(
Expand All @@ -133,6 +130,6 @@ impl ActionCache for AcServer {
println!("update_action_result Req: {:?}", grpc_request);
let resp = self.inner_update_action_result(grpc_request).await;
println!("update_action_result Resp: {:?}", resp);
return resp;
return resp.map_err(|e| e.into());
}
}

0 comments on commit 9ae49b6

Please sign in to comment.