Skip to content

Commit

Permalink
Move to FuturesUnordered in cas_server.
Browse files Browse the repository at this point in the history
Taking inspiration from the work in CacheLookupScheduler, modify the
CasServer to use FuturesUnordered to perform batch operations.
  • Loading branch information
chrisstaite-menlo committed Jul 12, 2023
1 parent e85faed commit 5dc31e2
Showing 1 changed file with 92 additions and 107 deletions.
199 changes: 92 additions & 107 deletions cas/grpc_service/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::{stream::Stream, FutureExt, StreamExt};
use futures::{stream::FuturesUnordered, stream::Stream, StreamExt, TryStreamExt};
use proto::google::rpc::Status as GrpcStatus;
use tonic::{Request, Response, Status};

Expand All @@ -31,7 +31,7 @@ use proto::build::bazel::remote::execution::v2::{
batch_read_blobs_response, batch_update_blobs_response,
content_addressable_storage_server::ContentAddressableStorage,
content_addressable_storage_server::ContentAddressableStorageServer as Server, BatchReadBlobsRequest,
BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse, FindMissingBlobsRequest,
BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse, Digest, FindMissingBlobsRequest,
FindMissingBlobsResponse, GetTreeRequest, GetTreeResponse,
};
use store::{Store, StoreManager};
Expand Down Expand Up @@ -78,41 +78,34 @@ impl CasServer {
return grpc_store.find_missing_blobs(Request::new(inner_request)).await;
}

let mut futures = futures::stream::FuturesOrdered::new();
for digest in inner_request.blob_digests.into_iter() {
let digest: DigestInfo = digest.try_into()?;
let store_clone = store.clone();
futures.push_back(tokio::spawn(async move {
let store = Pin::new(store_clone.as_ref());
store.has(digest.clone()).await.map_or_else(
|e| {
let store_pin = Pin::new(store.as_ref());
let check_futures: FuturesUnordered<_> = inner_request
.blob_digests
.into_iter()
.map(|digest| async move {
let digest_info = match DigestInfo::try_from(digest.clone()) {
Ok(digest_info) => digest_info,
Err(err) => return Some(Err(err)),
};
match store_pin.has(digest_info.clone()).await {
Ok(maybe_size) => maybe_size.map_or(Some(Ok(digest)), |_| None),
Err(err) => {
log::error!(
"Error during .has() call in .find_missing_blobs() : {:?} - {}",
e,
digest.str()
err,
digest_info.str()
);
Some(digest.clone())
},
|maybe_sz| {
if maybe_sz.is_some() {
None
} else {
Some(digest.clone())
}
},
)
}));
}
let mut responses = Vec::with_capacity(futures.len());
while let Some(result) = futures.next().await {
let val = result.err_tip(|| "Internal error joining future")?;
if let Some(digest) = val {
responses.push(digest.into());
}
}
Ok(Response::new(FindMissingBlobsResponse {
missing_blob_digests: responses,
}))
Some(Ok(digest))
}
}
})
.collect();
let missing_blob_digests = check_futures
.filter_map(|result| async move { result })
.try_collect::<Vec<Digest>>()
.await?;

Ok(Response::new(FindMissingBlobsResponse { missing_blob_digests }))
}

async fn inner_batch_update_blobs(
Expand All @@ -135,37 +128,36 @@ impl CasServer {
return grpc_store.batch_update_blobs(Request::new(inner_request)).await;
}

let mut futures = futures::stream::FuturesOrdered::new();
for request in inner_request.requests {
let store_clone = store.clone();
let digest: DigestInfo = request.digest.err_tip(|| "Digest not found in request")?.try_into()?;
let digest_copy = digest.clone();
let request_data = request.data;
futures.push_back(tokio::spawn(
async move {
let size_bytes = usize::try_from(digest_copy.size_bytes)
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
error_if!(
size_bytes != request_data.len(),
"Digest for upload had mismatching sizes, digest said {} data said {}",
size_bytes,
request_data.len()
);
Pin::new(store_clone.as_ref())
.update_oneshot(digest_copy, request_data)
.await
.err_tip(|| "Error writing to store")
}
.map(|result| batch_update_blobs_response::Response {
digest: Some(digest.into()),
let store_pin = Pin::new(store.as_ref());
let update_futures: FuturesUnordered<_> = inner_request
.requests
.into_iter()
.map(|request| async move {
let digest = request.digest.clone().err_tip(|| "Digest not found in request")?;
let request_data = request.data;
let digest_info = DigestInfo::try_from(digest.clone())?;
let size_bytes = usize::try_from(digest_info.size_bytes)
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
error_if!(
size_bytes != request_data.len(),
"Digest for upload had mismatching sizes, digest said {} data said {}",
size_bytes,
request_data.len()
);
let result = store_pin
.update_oneshot(digest_info, request_data)
.await
.err_tip(|| "Error writing to store");
Ok::<_, Error>(batch_update_blobs_response::Response {
digest: Some(digest),
status: Some(result.map_or_else(|e| e.into(), |_| GrpcStatus::default())),
}),
));
}
let mut responses = Vec::with_capacity(futures.len());
while let Some(result) = futures.next().await {
responses.push(result.err_tip(|| "Internal error joining future")?);
}
})
})
.collect();
let responses = update_futures
.try_collect::<Vec<batch_update_blobs_response::Response>>()
.await?;

Ok(Response::new(BatchUpdateBlobsResponse { responses }))
}

Expand All @@ -189,49 +181,42 @@ impl CasServer {
return grpc_store.batch_read_blobs(Request::new(inner_request)).await;
}

let mut futures = futures::stream::FuturesOrdered::new();
for digest in inner_request.digests {
let digest: DigestInfo = digest.try_into()?;
let digest_copy = digest.clone();
let store_clone = store.clone();

futures.push_back(tokio::spawn(
async move {
let size_bytes = usize::try_from(digest_copy.size_bytes)
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
let store_data = Pin::new(store_clone.as_ref())
.get_part_unchunked(digest_copy, 0, None, Some(size_bytes))
.await
.err_tip(|| "Error reading from store")?;
Ok(store_data)
}
.map(|result: Result<Bytes, Error>| {
let (status, data) = result.map_or_else(
|mut e| {
if e.code == Code::NotFound {
// Trim the error code. Not Found is quite common and we don't want to send a large
// error (debug) message for something that is common. We resize to just the last
// message as it will be the most relevant.
e.messages.resize_with(1, || "".to_string());
}
(e.into(), Bytes::new())
},
|v| (GrpcStatus::default(), v),
);
batch_read_blobs_response::Response {
status: Some(status),
digest: Some(digest.into()),
data,
}
}),
));
}
let store_pin = Pin::new(store.as_ref());
let read_futures: FuturesUnordered<_> = inner_request
.digests
.into_iter()
.map(|digest| async move {
let digest_copy = DigestInfo::try_from(digest.clone())?;
let size_bytes = usize::try_from(digest_copy.size_bytes)
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
// TODO(allada) There is a security risk here of someone taking all the memory on the instance.
let result = store_pin
.get_part_unchunked(digest_copy, 0, None, Some(size_bytes))
.await
.err_tip(|| "Error reading from store");
let (status, data) = result.map_or_else(
|mut e| {
if e.code == Code::NotFound {
// Trim the error code. Not Found is quite common and we don't want to send a large
// error (debug) message for something that is common. We resize to just the last
// message as it will be the most relevant.
e.messages.resize_with(1, || "".to_string());
}
(e.into(), Bytes::new())
},
|v| (GrpcStatus::default(), v),
);
Ok::<_, Error>(batch_read_blobs_response::Response {
status: Some(status),
digest: Some(digest),
data,
})
})
.collect();
let responses = read_futures
.try_collect::<Vec<batch_read_blobs_response::Response>>()
.await?;

let mut responses = Vec::with_capacity(futures.len());
while let Some(result) = futures.next().await {
responses.push(result.err_tip(|| "Internal error joining future")?);
}
Ok(Response::new(BatchReadBlobsResponse { responses }))
}

Expand Down

0 comments on commit 5dc31e2

Please sign in to comment.