Skip to content

Commit

Permalink
Refactor code to use streams instead of AsyncRead/AsyncWrite
Browse files Browse the repository at this point in the history
This will greatly reduce the amount of copies needed when paired
with using `bytes` package. This is because `bytes` package will
not make copies of the data much of the time. Instead we will ship
these Bytes structs around and only when needed make a copy. These
structs also don't make copies most of the time when passed from
module to module and instead just point to the same memory.
  • Loading branch information
allada committed Nov 16, 2021
1 parent 7e111c1 commit 697a11c
Show file tree
Hide file tree
Showing 29 changed files with 713 additions and 901 deletions.
6 changes: 4 additions & 2 deletions cas/grpc_service/BUILD
Expand Up @@ -9,6 +9,7 @@ rust_library(
"//cas/store",
"//config",
"//proto",
"//third_party:bytes",
"//third_party:futures",
"//third_party:stdext",
"//third_party:tokio",
Expand All @@ -26,8 +27,8 @@ rust_library(
"//cas/store",
"//config",
"//proto",
"//third_party:bytes",
"//third_party:prost",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
Expand Down Expand Up @@ -58,7 +59,7 @@ rust_library(
"//third_party:futures",
"//third_party:tokio",
"//third_party:tonic",
"//util:async_fixed_buffer",
"//util:buf_channel",
"//util:common",
"//util:error",
],
Expand Down Expand Up @@ -104,6 +105,7 @@ rust_test(
"//cas/store",
"//config",
"//proto",
"//third_party:bytes",
"//third_party:maplit",
"//third_party:pretty_assertions",
"//third_party:prost",
Expand Down
29 changes: 19 additions & 10 deletions cas/grpc_service/ac_server.rs
Expand Up @@ -2,11 +2,11 @@

use std::collections::HashMap;
use std::convert::TryInto;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use bytes::BytesMut;
use prost::Message;
use tonic::{Request, Response, Status};

Expand All @@ -18,7 +18,7 @@ use proto::build::bazel::remote::execution::v2::{
use common::{log, DigestInfo};
use config::cas_server::{AcStoreConfig, InstanceName};
use error::{make_input_err, Code, Error, ResultExt};
use store::{Store, StoreManager, UploadSizeInfo};
use store::{Store, StoreManager};

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
// 1.2k. Giving a bit more just in case to reduce allocs.
Expand Down Expand Up @@ -57,18 +57,27 @@ impl AcServer {
.err_tip(|| "Action digest was not set in message")?
.try_into()?;

let mut store_data = Vec::with_capacity(ESTIMATED_DIGEST_SIZE);
let mut cursor = Cursor::new(&mut store_data);
let instance_name = get_action_request.instance_name;
let store = Pin::new(
self.stores
.get(&instance_name)
.err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))?
.as_ref(),
);
store.get(digest.clone(), &mut cursor).await?;
let mut store_data_resp = store
.get_part_unchunked(digest.clone(), 0, None, Some(ESTIMATED_DIGEST_SIZE))
.await;
if let Err(err) = &mut store_data_resp {
if err.code == Code::NotFound {
// Trim the error code. Not Found is quite common and we don't want to send a large
// error (debug) message for something that is common. We resize to just the last
// message as it will be the most relevant.
err.messages.resize_with(1, || "".to_string());
}
}
let store_data = store_data_resp?;

let action_result = ActionResult::decode(Cursor::new(&store_data))
let action_result = ActionResult::decode(store_data)
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))?;

Ok(Response::new(action_result))
Expand All @@ -91,7 +100,7 @@ impl AcServer {
.action_result
.err_tip(|| "Action result was not set in message")?;

let mut store_data = Vec::with_capacity(ESTIMATED_DIGEST_SIZE);
let mut store_data = BytesMut::with_capacity(ESTIMATED_DIGEST_SIZE);
action_result
.encode(&mut store_data)
.err_tip(|| "Provided ActionResult could not be serialized")?;
Expand All @@ -103,10 +112,10 @@ impl AcServer {
.err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))?
.as_ref(),
);
let size_info = UploadSizeInfo::ExactSize(store_data.len());
store
.update(digest, Box::new(Cursor::new(store_data)), size_info)
.await?;
.update_oneshot(digest, store_data.freeze())
.await
.err_tip(|| "Failed to update in action cache")?;
Ok(Response::new(action_result))
}
}
Expand Down

0 comments on commit 697a11c

Please sign in to comment.