Skip to content

Commit

Permalink
Add retry support to get_part in s3_store
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Nov 7, 2021
1 parent d6cd4f9 commit ea2fc4c
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 90 deletions.
3 changes: 3 additions & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ rust_library(
"//util:common",
"//util:error",
"//util:retry",
"//util:write_counter",
":traits",
],
proc_macro_deps = ["//third_party:async_trait"],
Expand Down Expand Up @@ -148,6 +149,8 @@ rust_test(
"//third_party:rusoto_mock",
"//third_party:rusoto_s3",
"//third_party:tokio",
"//third_party:tokio_util",
"//util:async_fixed_buffer",
"//util:common",
"//util:error",
":s3_store",
Expand Down
202 changes: 134 additions & 68 deletions cas/store/s3_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use config;
use error::{make_err, make_input_err, Code, Error, ResultExt};
use retry::{ExponentialBackoff, Retrier, RetryResult};
use traits::{ResultFuture, StoreTrait, UploadSizeInfo};
use write_counter::WriteCounter;

// S3 parts cannot be smaller than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
Expand All @@ -38,6 +39,35 @@ const DEFAULT_BUFFER_POOL_SIZE: usize = 50;

type ReaderType = Box<dyn AsyncRead + Send + Unpin + Sync + 'static>;

/// Translates the outcome of an S3 (rusoto) request into a `RetryResult`.
///
/// Mapping performed by this function:
/// * `Ok(v)` is passed through as `RetryResult::Ok(v)`.
/// * `RusotoError::HttpDispatch` becomes `RetryResult::Retry` with `Code::Unavailable`.
/// * `RusotoError::Unknown` is classified by its HTTP status:
///   * `404 NOT_FOUND` becomes `RetryResult::Err` with `Code::NotFound`,
///   * `500`, `503` and `409` become `RetryResult::Retry` with `Code::Unavailable`,
///   * any other status becomes `RetryResult::Err` with `Code::Unavailable`.
/// * Every other error variant becomes `RetryResult::Err` with `Code::Unavailable`.
///
/// Service-specific variants (`RusotoError::Service(..)`) are not inspected here and
/// fall into the last case, so callers that care about them must check before calling.
fn should_retry<T, E>(result: Result<T, RusotoError<E>>) -> RetryResult<T>
where
    RusotoError<E>: std::fmt::Display,
{
    // Peel off the success case first so the rest only deals with failures.
    let failure = match result {
        Ok(value) => return RetryResult::Ok(value),
        Err(failure) => failure,
    };

    match failure {
        // Request could not be dispatched (timeout-like); worth another attempt.
        RusotoError::HttpDispatch(dispatch_err) => {
            RetryResult::Retry(make_err!(Code::Unavailable, "{}", dispatch_err.to_string()))
        }

        // A raw HTTP response came back; the status code decides what happens.
        RusotoError::Unknown(response) => {
            let status = response.status;
            match status {
                StatusCode::NOT_FOUND => {
                    RetryResult::Err(make_err!(Code::NotFound, "{}", status.to_string()))
                }
                StatusCode::INTERNAL_SERVER_ERROR
                | StatusCode::SERVICE_UNAVAILABLE
                | StatusCode::CONFLICT => {
                    RetryResult::Retry(make_err!(Code::Unavailable, "{}", status.to_string()))
                }
                _ => RetryResult::Err(make_err!(Code::Unavailable, "{}", status.to_string())),
            }
        }

        // Anything else (eg. Validation, Parse, Credentials) is never retried.
        other_failure => {
            RetryResult::Err(make_err!(Code::Unavailable, "{}", other_failure.to_string()))
        }
    }
}

pub struct S3Store {
s3_client: Arc<S3Client>,
bucket: String,
Expand All @@ -60,6 +90,9 @@ impl S3Store {
.map_err(|e| make_input_err!("{}", e.to_string()))?,
),
Box::new(move |delay: Duration| {
if jitter_amt == 0. {
return delay;
}
let min = 1. - (jitter_amt / 2.);
let max = 1. + (jitter_amt / 2.);
delay.mul_f32(OsRng.gen_range(min..max))
Expand Down Expand Up @@ -117,51 +150,29 @@ impl StoreTrait for S3Store {
..Default::default()
};

let r = self.s3_client.head_object(head_req).await;
let (should_retry, err) = match r {
// Object found in S3.
Ok(_) => return Some((RetryResult::Ok(true), state)),
let result = self.s3_client.head_object(head_req).await;
if let Err(RusotoError::Service(HeadObjectError::NoSuchKey(_))) = &result {
return Some((RetryResult::Ok(false), state));
}

// Object not found in S3.
Err(RusotoError::Service(HeadObjectError::NoSuchKey(_))) => {
return Some((RetryResult::Ok(false), state))
match should_retry(result) {
RetryResult::Ok(_) => Some((RetryResult::Ok(true), state)),
RetryResult::Err(err) => {
if err.code == Code::NotFound {
return Some((RetryResult::Ok(false), state));
}
Some((RetryResult::Err(err), state))
}

// Timeout-like errors. This can retry.
Err(RusotoError::HttpDispatch(e)) => (true, e.to_string()),

// HTTP-level errors. Sometimes can retry.
Err(RusotoError::Unknown(e)) => match e.status {
StatusCode::NOT_FOUND => return Some((RetryResult::Ok(false), state)),
StatusCode::INTERNAL_SERVER_ERROR => (true, e.status.to_string()),
StatusCode::SERVICE_UNAVAILABLE => (true, e.status.to_string()),
StatusCode::CONFLICT => (true, e.status.to_string()),
other_err => (false, other_err.to_string()),
},

// Other errors (eg. Validation, Parse, Credentials). Never retry.
Err(other_err) => (false, other_err.to_string()),
};

if should_retry {
return Some((
RetryResult::Retry(err) => Some((
RetryResult::Retry(make_err!(
Code::Unavailable,
"Error attempting to load s3 result, retried {} times. Error: {}",
self.retry.max_retries + 1,
err,
)),
state,
));
}
Some((
RetryResult::Err(make_err!(
Code::Unavailable,
"Error attempting to load s3 result. This is not a retryable error: {}",
err
)),
state,
))
}
}),
)
.await
Expand Down Expand Up @@ -324,7 +335,7 @@ impl StoreTrait for S3Store {
fn get_part<'a>(
self: Pin<&'a Self>,
digest: DigestInfo,
mut writer: &'a mut (dyn AsyncWrite + Send + Unpin + Sync),
writer: &'a mut (dyn AsyncWrite + Send + Unpin + Sync),
offset: usize,
length: Option<usize>,
) -> ResultFuture<'a, ()> {
Expand All @@ -333,40 +344,95 @@ impl StoreTrait for S3Store {
let end_read_byte = length
.map_or(Some(None), |length| Some(offset.checked_add(length)))
.err_tip(|| "Integer overflow protection triggered")?;
let get_req = GetObjectRequest {
bucket: self.bucket.to_owned(),
key: s3_path.to_owned(),
range: Some(format!(
"bytes={}-{}",
offset,
end_read_byte.map_or_else(|| "".to_string(), |v| v.to_string())
)),
..Default::default()
};

let get_object_output = self.s3_client.get_object(get_req).await.map_err(|e| match e {
RusotoError::Service(GetObjectError::NoSuchKey(err)) => {
return make_err!(Code::NotFound, "File not found in S3: {:?}", err)
}
_ => make_err!(Code::Unknown, "Error reading from S3: {:?}", e),
})?;
let s3_in_stream = get_object_output
.body
.err_tip(|| "Expected body to be set in s3 get request")?;
let mut s3_reader = s3_in_stream.into_async_read();
// TODO(blaise.bruer) We do get a size return from this function, but it is unlikely
// we can validate it is the proper size (other than if it was too large). Maybe we
// could make a debug log or something to print out data in the case it does not match
// expectations of some kind?
tokio::io::copy(&mut s3_reader, &mut writer)
.await
.err_tip(|| "Failed to download from s3")?;
let _ = writer.write(&[]).await; // At this point we really only care about if shutdown() fails.
writer
.shutdown()
let retry_config = ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
.map(|d| (self.jitter_fn)(d))
.take(self.retry.max_retries); // Remember this is number of retries, so will run max_retries + 1.

self.retrier
.retry(
retry_config,
unfold(WriteCounter::new(writer), move |mut write_counter| async move {
let result = self
.s3_client
.get_object(GetObjectRequest {
bucket: self.bucket.to_owned(),
key: s3_path.to_owned(),
range: Some(format!(
"bytes={}-{}",
offset + write_counter.get_bytes_written() as usize,
end_read_byte.map_or_else(|| "".to_string(), |v| v.to_string())
)),
..Default::default()
})
.await;

if let Err(RusotoError::Service(GetObjectError::NoSuchKey(err))) = &result {
return Some((
RetryResult::Err(make_err!(Code::NotFound, "File not found in S3: {:?}", err)),
write_counter,
));
}

let s3_in_stream = match should_retry(result) {
RetryResult::Ok(get_object_output) => {
let body_result = get_object_output
.body
.err_tip(|| "Expected body to be set in s3 get request");
match body_result {
Ok(s3_in_stream) => s3_in_stream,
Err(err) => {
return Some((
RetryResult::Err(make_err!(
Code::Unavailable,
"Error attempting to get s3 result. This is not a retryable error: {}",
err
)),
write_counter,
));
}
}
}
RetryResult::Err(err) => {
return Some((RetryResult::Err(err), write_counter));
}
RetryResult::Retry(err) => {
return Some((
RetryResult::Retry(make_err!(
Code::Unavailable,
"Error attempting to get s3 result, retried {} times. Error: {}",
self.retry.max_retries + 1,
err,
)),
write_counter,
));
}
};

// Copy data from s3 input stream to the writer stream.
let result = tokio::io::copy(&mut s3_in_stream.into_async_read(), &mut write_counter).await;

// We may want to retry, but only if the pipe was broken.
if let Err(e) = result {
if !write_counter.did_fail() && e.kind() == std::io::ErrorKind::BrokenPipe {
return Some((RetryResult::Retry(e.into()), write_counter));
}
return Some((RetryResult::Err(e.into()), write_counter));
}

let _ = write_counter.write(&[]).await; // At this point we really only care about if shutdown() fails.
let shutdown_result = write_counter
.shutdown()
.await
.err_tip(|| "Failed to shutdown write stream in S3Store::get_part");
if let Err(e) = shutdown_result {
return Some((RetryResult::Err(make_err!(Code::Unavailable, "{}", e)), write_counter));
}

Some((RetryResult::Ok(()), write_counter))
}),
)
.await
.err_tip(|| "Failed to shutdown write stream in S3Store::get_part")?;
Ok(())
})
}
}
Loading

0 comments on commit ea2fc4c

Please sign in to comment.