Skip to content

Commit

Permalink
Add retry to GrpcStore.
Browse files Browse the repository at this point in the history
The GrpcStore does not currently perform any retrying on failure.  This
is contrary to the S3Store that does.  This causes issues when the upstream
is under heavy load.

Implement retry in the same manner as the S3Store in the GrpcStore.  No
retry is implemented for write, the same as the S3Store, as this would
require buffering of the file data which is not easily possible.
  • Loading branch information
chrisstaite-menlo committed Jul 19, 2023
1 parent 5f5b2c4 commit 259224b
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 41 deletions.
2 changes: 2 additions & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,13 @@ rust_library(
"//util:buf_channel",
"//util:common",
"//util:error",
"//util:retry",
"//util:write_request_stream_wrapper",
"@crate_index//:bytes",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:prost",
"@crate_index//:rand",
"@crate_index//:shellexpand",
"@crate_index//:tokio",
"@crate_index//:tonic",
Expand Down
162 changes: 121 additions & 41 deletions cas/store/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::Send;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::BytesMut;
use futures::stream::{unfold, FuturesUnordered};
use futures::{future, Stream, TryStreamExt};
use futures::{future, Future, Stream, TryStreamExt};
use prost::Message;
use rand::{rngs::OsRng, Rng};
use tokio::time::sleep;
use tonic::{transport, IntoRequest, Request, Response, Streaming};
use uuid::Uuid;

Expand All @@ -38,6 +42,7 @@ use proto::google::bytestream::{
byte_stream_client::ByteStreamClient, QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse,
WriteRequest, WriteResponse,
};
use retry::{ExponentialBackoff, Retrier, RetryResult};
use traits::{StoreTrait, UploadSizeInfo};
use write_request_stream_wrapper::WriteRequestStreamWrapper;

Expand All @@ -50,10 +55,32 @@ pub struct GrpcStore {
bytestream_client: ByteStreamClient<transport::Channel>,
ac_client: ActionCacheClient<transport::Channel>,
store_type: config::stores::StoreType,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
retry: config::stores::Retry,
retrier: Retrier,
}

impl GrpcStore {
pub async fn new(config: &config::stores::GrpcStore) -> Result<Self, Error> {
let jitter_amt = config.retry.jitter;
Self::new_with_jitter(
config,
Box::new(move |delay: Duration| {
if jitter_amt == 0. {
return delay;
}
let min = 1. - (jitter_amt / 2.);
let max = 1. + (jitter_amt / 2.);
delay.mul_f32(OsRng.gen_range(min..max))
}),
)
.await
}

pub async fn new_with_jitter(
config: &config::stores::GrpcStore,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
error_if!(config.endpoints.is_empty(), "Expected at least 1 endpoint in GrpcStore");
let mut endpoints = Vec::with_capacity(config.endpoints.len());
for endpoint in &config.endpoints {
Expand All @@ -77,9 +104,38 @@ impl GrpcStore {
bytestream_client: ByteStreamClient::new(conn.clone()),
ac_client: ActionCacheClient::new(conn),
store_type: config.store_type,
jitter_fn,
retry: config.retry.to_owned(),
retrier: Retrier::new(Box::new(|duration| Box::pin(sleep(duration)))),
})
}

async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
where
F: FnMut(I) -> Fut + Send + Copy,
Fut: Future<Output = Result<R, Error>> + Send,
R: Send,
I: Send + Clone,
{
let retry_config = ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
.map(|d| (self.jitter_fn)(d))
.take(self.retry.max_retries); // Remember this is number of retries, so will run max_retries + 1.
self.retrier
.retry(
retry_config,
unfold(input, move |input| async move {
let input_clone = input.clone();
Some((
request(input_clone)
.await
.map_or_else(RetryResult::Retry, RetryResult::Ok),
input,
))
}),
)
.await
}

pub async fn find_missing_blobs(
&self,
grpc_request: Request<FindMissingBlobsRequest>,
Expand All @@ -91,11 +147,14 @@ impl GrpcStore {

let mut request = grpc_request.into_inner();
request.instance_name = self.instance_name.clone();
let mut client = self.cas_client.clone();
client
.find_missing_blobs(Request::new(request))
.await
.err_tip(|| "in GrpcStore::find_missing_blobs")
self.perform_request(request, |request| async move {
self.cas_client
.clone()
.find_missing_blobs(Request::new(request))
.await
.err_tip(|| "in GrpcStore::find_missing_blobs")
})
.await
}

pub async fn batch_update_blobs(
Expand All @@ -109,11 +168,14 @@ impl GrpcStore {

let mut request = grpc_request.into_inner();
request.instance_name = self.instance_name.clone();
let mut client = self.cas_client.clone();
client
.batch_update_blobs(Request::new(request))
.await
.err_tip(|| "in GrpcStore::batch_update_blobs")
self.perform_request(request, |request| async move {
self.cas_client
.clone()
.batch_update_blobs(Request::new(request))
.await
.err_tip(|| "in GrpcStore::batch_update_blobs")
})
.await
}

pub async fn batch_read_blobs(
Expand All @@ -127,11 +189,14 @@ impl GrpcStore {

let mut request = grpc_request.into_inner();
request.instance_name = self.instance_name.clone();
let mut client = self.cas_client.clone();
client
.batch_read_blobs(Request::new(request))
.await
.err_tip(|| "in GrpcStore::batch_read_blobs")
self.perform_request(request, |request| async move {
self.cas_client
.clone()
.batch_read_blobs(Request::new(request))
.await
.err_tip(|| "in GrpcStore::batch_read_blobs")
})
.await
}

pub async fn get_tree(
Expand All @@ -145,11 +210,14 @@ impl GrpcStore {

let mut request = grpc_request.into_inner();
request.instance_name = self.instance_name.clone();
let mut client = self.cas_client.clone();
client
.get_tree(Request::new(request))
.await
.err_tip(|| "in GrpcStore::get_tree")
self.perform_request(request, |request| async move {
self.cas_client
.clone()
.get_tree(Request::new(request))
.await
.err_tip(|| "in GrpcStore::get_tree")
})
.await
}

pub async fn read(
Expand All @@ -174,11 +242,14 @@ impl GrpcStore {
request.resource_name.get((first_slash_pos + 1)..).unwrap()
);

let mut client = self.bytestream_client.clone();
client
.read(Request::new(request))
.await
.err_tip(|| "in GrpcStore::read")
self.perform_request(request, |request| async move {
self.bytestream_client
.clone()
.read(Request::new(request))
.await
.err_tip(|| "in GrpcStore::read")
})
.await
}

pub async fn write<T, E>(&self, stream: WriteRequestStreamWrapper<T, E>) -> Result<Response<WriteResponse>, Error>
Expand Down Expand Up @@ -259,11 +330,14 @@ impl GrpcStore {
request.resource_name.get((first_slash_pos + 1)..).unwrap()
);

let mut client = self.bytestream_client.clone();
client
.query_write_status(Request::new(request))
.await
.err_tip(|| "in GrpcStore::query_write_status")
self.perform_request(request, |request| async move {
self.bytestream_client
.clone()
.query_write_status(Request::new(request))
.await
.err_tip(|| "in GrpcStore::query_write_status")
})
.await
}

pub async fn get_action_result(
Expand All @@ -272,11 +346,14 @@ impl GrpcStore {
) -> Result<Response<ActionResult>, Error> {
let mut request = grpc_request.into_inner();
request.instance_name = self.instance_name.clone();
let mut client = self.ac_client.clone();
client
.get_action_result(Request::new(request))
.await
.err_tip(|| "in GrpcStore::get_action_result")
self.perform_request(request, |request| async move {
self.ac_client
.clone()
.get_action_result(Request::new(request))
.await
.err_tip(|| "in GrpcStore::get_action_result")
})
.await
}

pub async fn update_action_result(
Expand All @@ -285,11 +362,14 @@ impl GrpcStore {
) -> Result<Response<ActionResult>, Error> {
let mut request = grpc_request.into_inner();
request.instance_name = self.instance_name.clone();
let mut client = self.ac_client.clone();
client
.update_action_result(Request::new(request))
.await
.err_tip(|| "in GrpcStore::update_action_result")
self.perform_request(request, |request| async move {
self.ac_client
.clone()
.update_action_result(Request::new(request))
.await
.err_tip(|| "in GrpcStore::update_action_result")
})
.await
}

async fn get_action_result_from_digest(&self, digest: DigestInfo) -> Result<Response<ActionResult>, Error> {
Expand Down
4 changes: 4 additions & 0 deletions config/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ pub struct GrpcStore {

/// The type of the upstream store, this ensures that the correct server calls are made.
pub store_type: StoreType,

/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,
}

/// Retry configuration. This configuration is exponential and each iteration
Expand Down

0 comments on commit 259224b

Please sign in to comment.