Skip to content

Commit

Permalink
Retry GrpcStore get_part_ref (#646)
Browse files Browse the repository at this point in the history
If a read request fails, it is retried in GrpcStore::read, however if a stream
breaks while it's being read from then it is not retried.  Therefore, break
the read retry logic from the get_part_ref retry logic to allow a read to
resume from where it left off.
  • Loading branch information
chrisstaite-menlo authored Jan 30, 2024
1 parent c8473ac commit d46180c
Showing 1 changed file with 102 additions and 58 deletions.
160 changes: 102 additions & 58 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,7 @@ impl GrpcStore {
.await
}

pub async fn read(
&self,
grpc_request: impl IntoRequest<ReadRequest>,
) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
error_if!(
matches!(self.store_type, nativelink_config::stores::StoreType::ac),
"CAS operation on AC store"
);

let mut request = grpc_request.into_request().into_inner();

fn get_read_request(&self, mut request: ReadRequest) -> Result<ReadRequest, Error> {
// `resource_name` pattern is: "{instance_name}/blobs/{hash}/{size}".
let first_slash_pos = request
.resource_name
Expand All @@ -267,25 +257,42 @@ impl GrpcStore {
self.instance_name,
request.resource_name.get((first_slash_pos + 1)..).unwrap()
);
Ok(request)
}

self.perform_request(request, |request| async move {
let mut response = self
.bytestream_client
.clone()
.read(Request::new(request))
.await
.err_tip(|| "in GrpcStore::read")?
.into_inner();
let first_response = response
.message()
.await
.err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
Ok(FirstStream {
first_response: Some(first_response),
stream: response,
})
async fn read_internal(
&self,
request: ReadRequest,
) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
let mut response = self
.bytestream_client
.clone()
.read(Request::new(request))
.await
.err_tip(|| "in GrpcStore::read")?
.into_inner();
let first_response = response
.message()
.await
.err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
Ok(FirstStream {
first_response: Some(first_response),
stream: response,
})
.await
}

pub async fn read(
&self,
grpc_request: impl IntoRequest<ReadRequest>,
) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
error_if!(
matches!(self.store_type, nativelink_config::stores::StoreType::ac),
"CAS operation on AC store"
);

let request = self.get_read_request(grpc_request.into_request().into_inner())?;
self.perform_request(request, |request| async move { self.read_internal(request).await })
.await
}

pub async fn write<T, E>(&self, stream: WriteRequestStreamWrapper<T, E>) -> Result<Response<WriteResponse>, Error>
Expand Down Expand Up @@ -659,38 +666,75 @@ impl Store for GrpcStore {
digest.size_bytes,
);

let mut stream = self
.read(Request::new(ReadRequest {
resource_name,
read_offset: offset as i64,
read_limit: length.unwrap_or(0) as i64,
}))
.await
.err_tip(|| "in GrpcStore::get_part()")?;

loop {
let Some(maybe_message) = stream.next().await else {
writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")?;
break; // EOF.
};
let message = maybe_message.err_tip(|| "While fetching message in GrpcStore::get_part()")?;
if message.data.is_empty() {
writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")?;
break; // EOF.
}
writer
.send(message.data)
.await
.err_tip(|| "While sending in GrpcStore::get_part()")?;
struct LocalState<'a> {
resource_name: String,
writer: &'a mut DropCloserWriteHalf,
read_offset: i64,
read_limit: i64,
}

Ok(())
let local_state = LocalState {
resource_name,
writer,
read_offset: offset as i64,
read_limit: length.unwrap_or(0) as i64,
};

let retry_config = self.get_retry_config();
self.retrier
.retry(
retry_config,
unfold(local_state, move |mut local_state| async move {
let request = ReadRequest {
resource_name: local_state.resource_name.clone(),
read_offset: local_state.read_offset,
read_limit: local_state.read_limit,
};
let mut stream = match self.read_internal(request).await.err_tip(|| "in GrpcStore::get_part()") {
Ok(stream) => stream,
Err(err) => return Some((RetryResult::Retry(err), local_state)),
};

loop {
let data = match stream.next().await {
// Create an empty response to represent EOF.
None => bytes::Bytes::new(),
Some(Ok(message)) => message.data,
Some(Err(status)) => {
return Some((
RetryResult::Retry(
Into::<Error>::into(status)
.append("While fetching message in GrpcStore::get_part()"),
),
local_state,
))
}
};
let length = data.len() as i64;
// This is the usual exit from the loop at EOF.
if length == 0 {
let eof_result = local_state
.writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")
.map_or_else(RetryResult::Err, RetryResult::Ok);
return Some((eof_result, local_state));
}
// Forward the data upstream.
if let Err(err) = local_state
.writer
.send(data)
.await
.err_tip(|| "While sending in GrpcStore::get_part()")
{
return Some((RetryResult::Err(err), local_state));
}
local_state.read_offset += length;
}
}),
)
.await
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
Expand Down

0 comments on commit d46180c

Please sign in to comment.