Remove DigestInfo.trust_size & add expected_size to update() function
allada committed Nov 3, 2021
1 parent 519fa9f commit e8a83eb
Showing 14 changed files with 95 additions and 59 deletions.
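At a glance: DigestInfo loses its trust_size flag, and every StoreTrait::update() caller now passes the number of bytes it is about to write. A minimal before/after sketch of the trait signature (the "before" form is inferred from the deletions below, not shown verbatim in the commit):

    // Before: size trust was a flag carried inside DigestInfo.
    fn update<'a>(
        self: Pin<&'a Self>,
        digest: DigestInfo,
        reader: Box<dyn AsyncRead + Send + Unpin + Sync + 'static>,
    ) -> ResultFuture<'a, ()>;

    // After: the caller states the upload size explicitly.
    fn update<'a>(
        self: Pin<&'a Self>,
        digest: DigestInfo,
        reader: Box<dyn AsyncRead + Send + Unpin + Sync + 'static>,
        upload_size: usize,
    ) -> ResultFuture<'a, ()>;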
17 changes: 10 additions & 7 deletions cas/grpc_service/ac_server.rs
@@ -20,6 +20,10 @@ use config::cas_server::{AcStoreConfig, InstanceName};
 use error::{make_input_err, Code, Error, ResultExt};
 use store::{Store, StoreManager};
 
+// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
+// 1.2k. Giving a bit more just in case to reduce allocs.
+const ESTIMATED_DIGEST_SIZE: usize = 2048;
+
 pub struct AcServer {
     stores: HashMap<String, Arc<dyn Store>>,
 }
@@ -53,10 +57,7 @@ impl AcServer {
             .err_tip(|| "Action digest was not set in message")?
             .try_into()?;
 
-        // TODO(allada) There is a security risk here of someone taking all the memory on the instance.
-        // Note: We don't know the real size of action cache results, so we apx it here with size_bytes * 2
-        // but it might be larger or smaller.
-        let mut store_data = Vec::with_capacity((digest.size_bytes as usize) * 2);
+        let mut store_data = Vec::with_capacity(ESTIMATED_DIGEST_SIZE);
         let mut cursor = Cursor::new(&mut store_data);
         let instance_name = get_action_request.instance_name;
         let store = Pin::new(
@@ -90,8 +91,7 @@ impl AcServer {
             .action_result
             .err_tip(|| "Action result was not set in message")?;
 
-        // TODO(allada) There is a security risk here of someone taking all the memory on the instance.
-        let mut store_data = Vec::new();
+        let mut store_data = Vec::with_capacity(ESTIMATED_DIGEST_SIZE);
         action_result
             .encode(&mut store_data)
             .err_tip(|| "Provided ActionResult could not be serialized")?;
@@ -103,7 +103,10 @@ impl AcServer {
                 .err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))?
                 .as_ref(),
         );
-        store.update(digest, Box::new(Cursor::new(store_data))).await?;
+        let expected_size = store_data.len();
+        store
+            .update(digest, Box::new(Cursor::new(store_data)), expected_size)
+            .await?;
         Ok(Response::new(action_result))
     }
 }
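The ESTIMATED_DIGEST_SIZE constant above is a starting capacity, not a limit. A small illustrative sketch (not from this repository) of why an under-estimate stays safe:

    // Vec::with_capacity only tunes the initial allocation; the vector
    // still grows on demand, so a fixed 2 KiB estimate removes the old
    // attacker-controlled `size_bytes * 2` allocation without capping
    // how large a serialized ActionResult may actually be.
    let mut buf: Vec<u8> = Vec::with_capacity(2048);
    buf.extend_from_slice(&[0u8; 4096]); // exceeds the estimate; just reallocates
    assert!(buf.capacity() >= 4096);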
4 changes: 3 additions & 1 deletion cas/grpc_service/bytestream_server.rs
@@ -191,7 +191,9 @@ impl ByteStreamServer {
         tokio::spawn(async move {
             let rx = Box::new(rx.take(expected_size as u64));
             let store = Pin::new(store_clone.as_ref());
-            store.update(DigestInfo::try_new(&hash, expected_size)?, rx).await
+            store
+                .update(DigestInfo::try_new(&hash, expected_size)?, rx, expected_size)
+                .await
         })
     };

11 changes: 4 additions & 7 deletions cas/grpc_service/cas_server.rs
@@ -54,8 +54,7 @@ impl CasServer {
         let inner_request = grpc_request.into_inner();
         let instance_name = inner_request.instance_name;
         for digest in inner_request.blob_digests.into_iter() {
-            let mut digest: DigestInfo = digest.try_into()?;
-            digest.trust_size = true;
+            let digest: DigestInfo = digest.try_into()?;
             let store_owned = self
                 .stores
                 .get(&instance_name)
@@ -89,8 +88,7 @@ impl CasServer {
         let inner_request = grpc_request.into_inner();
         let instance_name = inner_request.instance_name;
         for request in inner_request.requests {
-            let mut digest: DigestInfo = request.digest.err_tip(|| "Digest not found in request")?.try_into()?;
-            digest.trust_size = true;
+            let digest: DigestInfo = request.digest.err_tip(|| "Digest not found in request")?.try_into()?;
             let digest_copy = digest.clone();
             let store_owned = self
                 .stores
@@ -111,7 +109,7 @@
                 let cursor = Box::new(Cursor::new(request_data));
                 let store = Pin::new(store_owned.as_ref());
                 store
-                    .update(digest_copy, cursor)
+                    .update(digest_copy, cursor, size_bytes)
                     .await
                     .err_tip(|| "Error writing to store")
             }
@@ -136,8 +134,7 @@
         let inner_request = grpc_request.into_inner();
         let instance_name = inner_request.instance_name;
         for digest in inner_request.digests {
-            let mut digest: DigestInfo = digest.try_into()?;
-            digest.trust_size = true;
+            let digest: DigestInfo = digest.try_into()?;
             let digest_copy = digest.clone();
             let store_owned = self
                 .stores
7 changes: 5 additions & 2 deletions cas/grpc_service/tests/ac_server_test.rs
@@ -25,8 +25,11 @@ async fn insert_into_store<T: Message>(
 ) -> Result<i64, Box<dyn std::error::Error>> {
     let mut store_data = Vec::new();
     action_result.encode(&mut store_data)?;
-    let digest = DigestInfo::try_new(&hash, store_data.len() as i64)?;
-    store.update(digest.clone(), Box::new(Cursor::new(store_data))).await?;
+    let data_len = store_data.len();
+    let digest = DigestInfo::try_new(&hash, data_len as i64)?;
+    store
+        .update(digest.clone(), Box::new(Cursor::new(store_data)), data_len)
+        .await?;
     Ok(digest.size_bytes)
 }

11 changes: 8 additions & 3 deletions cas/grpc_service/tests/bytestream_server_test.rs
@@ -194,7 +194,9 @@ pub mod read_tests {
         const VALUE1: &str = "12456789abcdefghijk";
 
         let digest = DigestInfo::try_new(&HASH1, VALUE1.len())?;
-        store.update(digest, Box::new(Cursor::new(VALUE1))).await?;
+        store
+            .update(digest, Box::new(Cursor::new(VALUE1)), VALUE1.len())
+            .await?;
 
         let read_request = ReadRequest {
             resource_name: format!(
Expand Down Expand Up @@ -238,8 +240,11 @@ pub mod read_tests {
raw_data[5] = 42u8;
raw_data[DATA_SIZE - 2] = 43u8;

let digest = DigestInfo::try_new(&HASH1, raw_data.len())?;
store.update(digest, Box::new(Cursor::new(raw_data.clone()))).await?;
let data_len = raw_data.len();
let digest = DigestInfo::try_new(&HASH1, data_len)?;
store
.update(digest, Box::new(Cursor::new(raw_data.clone())), data_len)
.await?;

let read_request = ReadRequest {
resource_name: format!(
15 changes: 13 additions & 2 deletions cas/grpc_service/tests/cas_server_test.rs
@@ -81,7 +81,11 @@ mod find_missing_blobs {
 
         let store = Pin::new(store_owned.as_ref());
         store
-            .update(DigestInfo::try_new(HASH1, VALUE.len())?, Box::new(Cursor::new(VALUE)))
+            .update(
+                DigestInfo::try_new(HASH1, VALUE.len())?,
+                Box::new(Cursor::new(VALUE)),
+                VALUE.len(),
+            )
             .await?;
         let raw_response = cas_server
             .find_missing_blobs(Request::new(FindMissingBlobsRequest {
@@ -108,7 +112,11 @@
 
         let store = Pin::new(store_owned.as_ref());
         store
-            .update(DigestInfo::try_new(HASH1, VALUE.len())?, Box::new(Cursor::new(VALUE)))
+            .update(
+                DigestInfo::try_new(HASH1, VALUE.len())?,
+                Box::new(Cursor::new(VALUE)),
+                VALUE.len(),
+            )
             .await?;
         let raw_response = cas_server
             .find_missing_blobs(Request::new(FindMissingBlobsRequest {
@@ -169,6 +177,7 @@ mod batch_update_blobs {
             .update(
                 DigestInfo::try_new(&HASH1, VALUE1.len())?,
                 Box::new(Cursor::new(VALUE1)),
+                VALUE1.len(),
             )
             .await
             .expect("Update should have succeeded");
@@ -249,13 +258,15 @@ mod batch_read_blobs {
             .update(
                 DigestInfo::try_new(&HASH1, VALUE1.len())?,
                 Box::new(Cursor::new(VALUE1)),
+                VALUE1.len(),
             )
             .await
             .expect("Update should have succeeded");
         store
             .update(
                 DigestInfo::try_new(&HASH2, VALUE2.len())?,
                 Box::new(Cursor::new(VALUE2)),
+                VALUE2.len(),
             )
             .await
             .expect("Update should have succeeded");
3 changes: 2 additions & 1 deletion cas/store/memory_store.rs
@@ -41,9 +41,10 @@ impl StoreTrait for MemoryStore {
         self: std::pin::Pin<&'a Self>,
         digest: DigestInfo,
         mut reader: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
+        expected_size: usize,
     ) -> ResultFuture<'a, ()> {
         Box::pin(async move {
-            let mut buffer = Vec::with_capacity(digest.size_bytes as usize);
+            let mut buffer = Vec::with_capacity(expected_size);
             reader.read_to_end(&mut buffer).await?;
             let mut map = self.map.lock().await;
             map.insert(digest, Arc::new(buffer));
14 changes: 5 additions & 9 deletions cas/store/s3_store.rs
@@ -31,7 +31,7 @@ use async_read_taker::AsyncReadTaker;
 
 // S3 parts cannot be smaller than this number. See:
 // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
-const MIN_MULTIPART_SIZE: i64 = 5_000_000; // 5mb.
+const MIN_MULTIPART_SIZE: usize = 5_000_000; // 5mb.
 
 pub struct S3Store {
     s3_client: S3Client,
@@ -154,6 +154,7 @@ impl StoreTrait for S3Store {
         self: Pin<&'a Self>,
         digest: DigestInfo,
         reader: Box<dyn AsyncRead + Send + Unpin + Sync + 'static>,
+        expected_size: usize,
     ) -> ResultFuture<'a, ()> {
         Box::pin(async move {
             let s3_path = &self.make_s3_path(&digest);
@@ -163,15 +164,11 @@
             // it should be `trust_size = true` which we can upload in one chunk if small enough (and more efficient)
             // but only if the size is small enough. We could use MAX_UPLOAD_PART_SIZE (5gb), but I think it's fine
             // to use 5mb as a limit too.
-            if !digest.trust_size || digest.size_bytes < MIN_MULTIPART_SIZE {
+            if expected_size < MIN_MULTIPART_SIZE {
                 let put_object_request = PutObjectRequest {
                     bucket: self.bucket.to_owned(),
                     key: s3_path.to_owned(),
-                    content_length: if digest.trust_size {
-                        Some(digest.size_bytes)
-                    } else {
-                        None
-                    },
+                    content_length: Some(expected_size as i64),
                     body: Some(ByteStream::new(ReaderStream::new(reader))),
                     ..Default::default()
                 };
@@ -185,8 +182,7 @@
 
             // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
             // 5mb and can have up to 10,000 parts.
-            let bytes_per_upload_part =
-                cmp::max(MIN_MULTIPART_SIZE, digest.size_bytes / (MIN_MULTIPART_SIZE - 1)) as usize;
+            let bytes_per_upload_part = cmp::max(MIN_MULTIPART_SIZE, expected_size / (MIN_MULTIPART_SIZE - 1));
 
             let response = self
                 .s3_client
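As a worked check of the part-size line above (values are illustrative, not from the commit): for a 6 GB upload the division term is tiny, so the 5 MB floor wins and the upload needs about 1,200 parts, well under S3's 10,000-part cap.

    // Hypothetical spot-check of the formula, assuming a 64-bit target.
    const MIN_MULTIPART_SIZE: usize = 5_000_000;
    let expected_size: usize = 6_000_000_000;
    let part = std::cmp::max(MIN_MULTIPART_SIZE, expected_size / (MIN_MULTIPART_SIZE - 1));
    assert_eq!(part, 5_000_000);             // 6_000_000_000 / 4_999_999 ≈ 1_200, so the floor wins
    assert_eq!(expected_size / part, 1_200); // ≈1,200 parts of 5 MB each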
1 change: 1 addition & 0 deletions cas/store/store_trait.rs
@@ -24,6 +24,7 @@ pub trait StoreTrait: Sync + Send + Unpin {
         self: Pin<&'a Self>,
         digest: DigestInfo,
         reader: Box<dyn AsyncRead + Send + Unpin + Sync + 'static>,
+        upload_size: usize,
     ) -> ResultFuture<'a, ()>;
 
     fn get_part<'a>(
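On the caller side the pattern is uniform across these files: measure the payload once, then hand both the reader and the byte count to the store. A minimal sketch (serialize_payload is a hypothetical stand-in for whatever produces the bytes):

    // Hypothetical caller under the new signature.
    let data: Vec<u8> = serialize_payload();
    let expected_size = data.len();
    store
        .update(digest, Box::new(Cursor::new(data)), expected_size)
        .await?;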
6 changes: 4 additions & 2 deletions cas/store/tests/memory_store_test.rs
@@ -29,6 +29,7 @@ mod memory_store_tests {
             .update(
                 DigestInfo::try_new(&VALID_HASH1, VALUE1.len())?,
                 Box::new(Cursor::new(VALUE1)),
+                VALUE1.len(),
             )
             .await?;
         assert!(
@@ -46,6 +47,7 @@
             .update(
                 DigestInfo::try_new(&VALID_HASH1, VALUE2.len())?,
                 Box::new(Cursor::new(VALUE2)),
+                VALUE2.len(),
             )
             .await?;
         store
@@ -78,7 +80,7 @@
 
         const VALUE1: &str = "1234";
         let digest = DigestInfo::try_new(&VALID_HASH1, 4).unwrap();
-        store.update(digest.clone(), Box::new(Cursor::new(VALUE1))).await?;
+        store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 4).await?;
 
         let mut store_data = Vec::new();
         store
@@ -127,7 +129,7 @@
         assert!(
             digest.is_err()
                 || store
-                    .update(digest.unwrap(), Box::new(Cursor::new(value)))
+                    .update(digest.unwrap(), Box::new(Cursor::new(value)), expected_size)
                     .await
                     .is_err(),
             ".has() should have failed: {} {} {}",
7 changes: 4 additions & 3 deletions cas/store/tests/s3_store_test.rs
@@ -176,6 +176,7 @@ mod s3_store_tests {
             .update(
                 DigestInfo::try_new(&VALID_HASH1, 199)?,
                 Box::new(Cursor::new(send_data.clone())),
+                199,
             )
             .await?;
     }
@@ -329,8 +330,7 @@
 
     {
         // Send payload.
-        let mut digest = DigestInfo::try_new(&VALID_HASH1, send_data.len())?;
-        digest.trust_size = true;
+        let digest = DigestInfo::try_new(&VALID_HASH1, send_data.len())?;
         let store = S3Store::new_with_client_and_jitter(
             &config::backends::S3Store {
                 bucket: BUCKET_NAME.to_string(),
@@ -340,8 +340,9 @@
             Box::new(move |_delay| Duration::from_secs(0)),
         )?;
         let store_pin = Pin::new(&store);
+        let data_len = send_data.len();
         store_pin
-            .update(digest, Box::new(Cursor::new(send_data.clone())))
+            .update(digest, Box::new(Cursor::new(send_data.clone())), data_len)
             .await?;
     }

12 changes: 6 additions & 6 deletions cas/store/tests/verify_store_test.rs
@@ -36,7 +36,7 @@ mod verify_store_tests {
 
         const VALUE1: &str = "123";
         let digest = DigestInfo::try_new(&VALID_HASH1, 100).unwrap();
-        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1))).await;
+        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 3).await;
         assert_eq!(
             result,
             Ok(()),
@@ -66,7 +66,7 @@
 
         const VALUE1: &str = "123";
         let digest = DigestInfo::try_new(&VALID_HASH1, 100).unwrap();
-        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1))).await;
+        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 100).await;
         assert!(result.is_err(), "Expected error, got: {:?}", &result);
         const EXPECTED_ERR: &str = "Expected size 100 but got size 3 on insert";
         let err = result.unwrap_err().to_string();
@@ -99,7 +99,7 @@
 
         const VALUE1: &str = "123";
         let digest = DigestInfo::try_new(&VALID_HASH1, 3).unwrap();
-        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1))).await;
+        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE1)), 3).await;
         assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
         assert_eq!(
             Pin::new(inner_store.as_ref()).has(digest).await?,
@@ -126,7 +126,7 @@
 
         let digest = DigestInfo::try_new(&VALID_HASH1, 6).unwrap();
         let digest_clone = digest.clone();
-        let future = tokio::spawn(async move { Pin::new(&store_owned).update(digest_clone, Box::new(rx)).await });
+        let future = tokio::spawn(async move { Pin::new(&store_owned).update(digest_clone, Box::new(rx), 6).await });
         tx.write_all("foo".as_bytes()).await?;
         tx.flush().await?;
         tx.write_all("bar".as_bytes()).await?;
@@ -158,7 +158,7 @@
         const HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
         const VALUE: &str = "123";
         let digest = DigestInfo::try_new(&HASH, 3).unwrap();
-        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE))).await;
+        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE)), 3).await;
         assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
         assert_eq!(
             Pin::new(inner_store.as_ref()).has(digest).await?,
@@ -185,7 +185,7 @@
         const HASH: &str = "6b51d431df5d7f141cbececcf79edf3dd861c3b4069f0b11661a3eefacbba918";
         const VALUE: &str = "123";
         let digest = DigestInfo::try_new(&HASH, 3).unwrap();
-        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE))).await;
+        let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE)), 3).await;
         let err = result.unwrap_err().to_string();
         const ACTUAL_HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
         let expected_err = format!("Hashes do not match, got: {} but digest hash was {}", HASH, ACTUAL_HASH);
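The two size tests above pin down the contract the explicit parameter enables: the verifying wrapper can reject a write whose actual byte count disagrees with what the caller promised. A sketch of that check (an assumption about VerifyStore's internals, not code from this commit; the error text mirrors EXPECTED_ERR above):

    // Hypothetical size verification after draining the reader.
    fn check_size(expected_size: usize, actual_size: usize) -> Result<(), String> {
        if expected_size != actual_size {
            return Err(format!(
                "Expected size {} but got size {} on insert",
                expected_size, actual_size
            ));
        }
        Ok(())
    }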
