Stores now support non-exact upload sizes & S3 store optimizations
The S3 store is now optimized and can upload multiple parts concurrently.
Non-exact upload sizes are supported in preparation for a follow-up change
that will allow compression to and from backend stores.
allada committed Nov 5, 2021
1 parent a4634eb commit d093fdd
Showing 35 changed files with 993 additions and 283 deletions.
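
The diff below centers on a new UploadSizeInfo argument to Store::update. Its definition lives in cas/store/traits.rs, which is not rendered on this page; the following is a minimal sketch reconstructed from the call sites in the diff (e.g. UploadSizeInfo::ExactSize(data_len) and the match in memory_store.rs), so the real derives, docs, and variant ordering may differ:

    // Sketch only: reconstructed from call sites in this diff; the real
    // definition lives in cas/store/traits.rs.
    pub enum UploadSizeInfo {
        /// The exact number of bytes that will be uploaded is known up front.
        ExactSize(usize),
        /// Only an upper bound is known (e.g. a payload that will be
        /// compressed, per the follow-up change the commit message mentions).
        MaxSize(usize),
    }

Callers that previously passed a bare usize now wrap it in UploadSizeInfo::ExactSize(size).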
86 changes: 84 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -31,6 +31,7 @@ rusoto_core = "0.46.0"
 http = "^0.2"
 pin-project-lite = "0.2.7"
 fast-async-mutex = "0.6.7"
+lease = "0.2.3"
 
 [dev-dependencies]
 clap = "2.33.3"
6 changes: 3 additions & 3 deletions cas/grpc_service/ac_server.rs
@@ -18,7 +18,7 @@ use proto::build::bazel::remote::execution::v2::{
 use common::{log, DigestInfo};
 use config::cas_server::{AcStoreConfig, InstanceName};
 use error::{make_input_err, Code, Error, ResultExt};
-use store::{Store, StoreManager};
+use store::{Store, StoreManager, UploadSizeInfo};
 
 // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
 // 1.2k. Giving a bit more just in case to reduce allocs.
@@ -103,9 +103,9 @@ impl AcServer {
                 .err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))?
                 .as_ref(),
         );
-        let expected_size = store_data.len();
+        let size_info = UploadSizeInfo::ExactSize(store_data.len());
         store
-            .update(digest, Box::new(Cursor::new(store_data)), expected_size)
+            .update(digest, Box::new(Cursor::new(store_data)), size_info)
             .await?;
         Ok(Response::new(action_result))
     }
8 changes: 6 additions & 2 deletions cas/grpc_service/bytestream_server.rs
@@ -19,7 +19,7 @@ use proto::google::bytestream::{
 use common::{log, DigestInfo};
 use config::cas_server::ByteStreamConfig;
 use error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
-use store::{Store, StoreManager};
+use store::{Store, StoreManager, UploadSizeInfo};
 
 pub struct ByteStreamServer {
     stores: HashMap<String, Arc<dyn Store>>,
@@ -182,7 +182,11 @@ impl ByteStreamServer {
             let rx = Box::new(rx.take(expected_size as u64));
             let store = Pin::new(store_clone.as_ref());
             store
-                .update(DigestInfo::try_new(&hash, expected_size)?, rx, expected_size)
+                .update(
+                    DigestInfo::try_new(&hash, expected_size)?,
+                    rx,
+                    UploadSizeInfo::ExactSize(expected_size),
+                )
                 .await
         })
     };
4 changes: 2 additions & 2 deletions cas/grpc_service/cas_server.rs
@@ -23,7 +23,7 @@ use proto::google::rpc::Status as GrpcStatus;
 use common::{log, DigestInfo};
 use config::cas_server::{CasStoreConfig, InstanceName};
 use error::{error_if, make_input_err, Error, ResultExt};
-use store::{Store, StoreManager};
+use store::{Store, StoreManager, UploadSizeInfo};
 
 pub struct CasServer {
     stores: HashMap<String, Arc<dyn Store>>,
@@ -109,7 +109,7 @@ impl CasServer {
             let cursor = Box::new(Cursor::new(request_data));
             let store = Pin::new(store_owned.as_ref());
             store
-                .update(digest_copy, cursor, size_bytes)
+                .update(digest_copy, cursor, UploadSizeInfo::ExactSize(size_bytes))
                 .await
                 .err_tip(|| "Error writing to store")
         }
8 changes: 6 additions & 2 deletions cas/grpc_service/tests/ac_server_test.rs
@@ -13,7 +13,7 @@ use ac_server::AcServer;
 use common::DigestInfo;
 use config;
 use error::Error;
-use store::{Store, StoreManager};
+use store::{Store, StoreManager, UploadSizeInfo};
 
 const INSTANCE_NAME: &str = "foo_instance_name";
 const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
@@ -28,7 +28,11 @@ async fn insert_into_store<T: Message>(
     let data_len = store_data.len();
     let digest = DigestInfo::try_new(&hash, data_len as i64)?;
     store
-        .update(digest.clone(), Box::new(Cursor::new(store_data)), data_len)
+        .update(
+            digest.clone(),
+            Box::new(Cursor::new(store_data)),
+            UploadSizeInfo::ExactSize(data_len),
+        )
         .await?;
     Ok(digest.size_bytes)
 }
14 changes: 11 additions & 3 deletions cas/grpc_service/tests/bytestream_server_test.rs
@@ -13,7 +13,7 @@ use tonic::Request;
 use common::DigestInfo;
 use config;
 use error::{make_err, Code, Error, ResultExt};
-use store::StoreManager;
+use store::{StoreManager, UploadSizeInfo};
 
 const INSTANCE_NAME: &str = "foo_instance_name";
 const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
@@ -197,7 +197,11 @@ pub mod read_tests {
 
     let digest = DigestInfo::try_new(&HASH1, VALUE1.len())?;
     store
-        .update(digest, Box::new(Cursor::new(VALUE1)), VALUE1.len())
+        .update(
+            digest,
+            Box::new(Cursor::new(VALUE1)),
+            UploadSizeInfo::ExactSize(VALUE1.len()),
+        )
         .await?;
 
     let read_request = ReadRequest {
@@ -245,7 +249,11 @@
     let data_len = raw_data.len();
     let digest = DigestInfo::try_new(&HASH1, data_len)?;
     store
-        .update(digest, Box::new(Cursor::new(raw_data.clone())), data_len)
+        .update(
+            digest,
+            Box::new(Cursor::new(raw_data.clone())),
+            UploadSizeInfo::ExactSize(data_len),
+        )
         .await?;
 
     let read_request = ReadRequest {
12 changes: 6 additions & 6 deletions cas/grpc_service/tests/cas_server_test.rs
@@ -14,7 +14,7 @@ use cas_server::CasServer;
 use common::DigestInfo;
 use config;
 use error::Error;
-use store::StoreManager;
+use store::{StoreManager, UploadSizeInfo};
 
 const INSTANCE_NAME: &str = "foo_instance_name";
 const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
@@ -84,7 +84,7 @@ mod find_missing_blobs {
         .update(
             DigestInfo::try_new(HASH1, VALUE.len())?,
             Box::new(Cursor::new(VALUE)),
-            VALUE.len(),
+            UploadSizeInfo::ExactSize(VALUE.len()),
         )
         .await?;
     let raw_response = cas_server
@@ -115,7 +115,7 @@ mod find_missing_blobs {
         .update(
             DigestInfo::try_new(HASH1, VALUE.len())?,
             Box::new(Cursor::new(VALUE)),
-            VALUE.len(),
+            UploadSizeInfo::ExactSize(VALUE.len()),
        )
         .await?;
     let raw_response = cas_server
@@ -177,7 +177,7 @@ mod batch_update_blobs {
         .update(
             DigestInfo::try_new(&HASH1, VALUE1.len())?,
             Box::new(Cursor::new(VALUE1)),
-            VALUE1.len(),
+            UploadSizeInfo::ExactSize(VALUE1.len()),
         )
         .await
         .expect("Update should have succeeded");
@@ -258,15 +258,15 @@ mod batch_read_blobs {
         .update(
             DigestInfo::try_new(&HASH1, VALUE1.len())?,
             Box::new(Cursor::new(VALUE1)),
-            VALUE1.len(),
+            UploadSizeInfo::ExactSize(VALUE1.len()),
         )
         .await
         .expect("Update should have succeeded");
     store
         .update(
             DigestInfo::try_new(&HASH2, VALUE2.len())?,
             Box::new(Cursor::new(VALUE2)),
-            VALUE2.len(),
+            UploadSizeInfo::ExactSize(VALUE2.len()),
         )
         .await
         .expect("Update should have succeeded");
3 changes: 1 addition & 2 deletions cas/store/BUILD
@@ -70,14 +70,13 @@ rust_library(
         "//config",
         "//third_party:futures",
         "//third_party:http",
+        "//third_party:lease",
         "//third_party:rand",
         "//third_party:rusoto_core",
         "//third_party:rusoto_s3",
         "//third_party:tokio",
         "//third_party:tokio_util",
-        "//third_party:fast_async_mutex",
         "//util:common",
-        "//util:async_read_taker",
        "//util:error",
         "//util:retry",
         ":traits",
2 changes: 1 addition & 1 deletion cas/store/lib.rs
@@ -9,7 +9,7 @@ use memory_store::MemoryStore;
 use s3_store::S3Store;
 use verify_store::VerifyStore;
 
-pub use traits::{StoreTrait as Store, StoreType};
+pub use traits::{StoreTrait as Store, StoreType, UploadSizeInfo};
 
 pub struct StoreManager {
     stores: HashMap<String, Arc<dyn Store>>,
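
The re-exported UploadSizeInfo feeds the revised StoreTrait::update signature. Since traits.rs itself is not rendered on this page, here is a sketch of the declaration inferred from the MemoryStore implementation in the next file; treat the exact bounds and parameter names as assumptions:

    // Sketch of the updated trait method, inferred from the MemoryStore
    // impl below; the real declaration in cas/store/traits.rs may differ.
    fn update<'a>(
        self: std::pin::Pin<&'a Self>,
        digest: DigestInfo,
        reader: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
        size_info: UploadSizeInfo, // was: expected_size: usize
    ) -> ResultFuture<'a, ()>;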
11 changes: 8 additions & 3 deletions cas/store/memory_store.rs
@@ -12,7 +12,7 @@ use common::DigestInfo;
 use config;
 use error::{Code, ResultExt};
 use evicting_map::EvictingMap;
-use traits::{ResultFuture, StoreTrait};
+use traits::{ResultFuture, StoreTrait, UploadSizeInfo};
 
 pub struct MemoryStore {
     map: Mutex<EvictingMap<Instant>>,
@@ -41,11 +41,16 @@ impl StoreTrait for MemoryStore {
         self: std::pin::Pin<&'a Self>,
         digest: DigestInfo,
         mut reader: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
-        expected_size: usize,
+        size_info: UploadSizeInfo,
     ) -> ResultFuture<'a, ()> {
         Box::pin(async move {
-            let mut buffer = Vec::with_capacity(expected_size);
+            let max_size = match size_info {
+                UploadSizeInfo::ExactSize(sz) => sz,
+                UploadSizeInfo::MaxSize(sz) => sz,
+            };
+            let mut buffer = Vec::with_capacity(max_size);
             reader.read_to_end(&mut buffer).await?;
+            buffer.shrink_to_fit();
             let mut map = self.map.lock().await;
             map.insert(digest, Arc::new(buffer));
             Ok(())
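
With MaxSize, MemoryStore over-allocates to the upper bound and relies on the newly added shrink_to_fit() to release the excess once the true length is known from read_to_end. A hypothetical call site for the non-exact path (the names store, digest, compressed_data, and upper_bound are illustrative, not from this commit; std::io::Cursor and std::pin::Pin are assumed to be in scope):

    // Hypothetical usage sketch: upload where only an upper bound on the
    // payload size is known when update() is called, e.g. the compressed
    // uploads planned in the commit message's follow-up change.
    Pin::new(store.as_ref())
        .update(
            digest,
            Box::new(Cursor::new(compressed_data)),
            UploadSizeInfo::MaxSize(upper_bound),
        )
        .await?;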
(The remaining 24 changed files are not rendered on this page.)
