Skip to content

Commit

Permalink
Retry Safe/Read-Only Requests on Timeout (#5278)
Browse files Browse the repository at this point in the history
* Retry safe requests on timeout

* Docs
  • Loading branch information
tustvold committed Jan 4, 2024
1 parent 5a67f1f commit 8fda518
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
21 changes: 17 additions & 4 deletions object_store/src/client/mock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use futures::future::BoxFuture;
use futures::FutureExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;
/// A boxed, single-use request handler: it is called with the incoming
/// request and returns a boxed future that resolves to the response.
pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static, Response<Body>> + Send>;

/// A mock server
pub struct MockServer {
Expand All @@ -46,9 +49,10 @@ impl MockServer {
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let r = Arc::clone(&r);
let next = r.lock().pop_front();
async move {
Ok::<_, Infallible>(match r.lock().pop_front() {
Some(r) => r(req),
Ok::<_, Infallible>(match next {
Some(r) => r(req).await,
None => Response::new(Body::from("Hello World")),
})
}
Expand Down Expand Up @@ -93,7 +97,16 @@ impl MockServer {
where
F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
{
self.responses.lock().push_back(Box::new(f))
let f = Box::new(|req| async move { f(req) }.boxed());
self.responses.lock().push_back(f)
}

pub fn push_async_fn<F, Fut>(&self, f: F)
where
F: FnOnce(Request<Body>) -> Fut + Send + 'static,
Fut: Future<Output = Response<Body>> + Send + 'static,
{
self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
}

/// Shutdown the mock server
Expand Down
50 changes: 43 additions & 7 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,19 @@ impl From<Error> for std::io::Error {

pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Contains the configuration for how to respond to server errors
/// The configuration for how to respond to request errors
///
/// By default they will be retried up to some limit, using exponential
/// The following categories of error will be retried:
///
/// * 5xx server errors
/// * Connection errors
/// * Dropped connections
/// * Timeouts for [safe] / read-only requests
///
/// Requests will be retried up to some limit, using exponential
/// backoff with jitter. See [`BackoffConfig`] for more information.
///
/// [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// The backoff configuration
Expand Down Expand Up @@ -173,13 +181,16 @@ impl RetryExt for reqwest::RequestBuilder {
let max_retries = config.max_retries;
let retry_timeout = config.retry_timeout;

let (client, req) = self.build_split();
let req = req.expect("request must be valid");

async move {
let mut retries = 0;
let now = Instant::now();

loop {
let s = self.try_clone().expect("request body must be cloneable");
match s.send().await {
let s = req.try_clone().expect("request body must be cloneable");
match client.execute(s).await {
Ok(r) => match r.error_for_status_ref() {
Ok(_) if r.status().is_success() => return Ok(r),
Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
Expand Down Expand Up @@ -242,7 +253,9 @@ impl RetryExt for reqwest::RequestBuilder {
Err(e) =>
{
let mut do_retry = false;
if let Some(source) = e.source() {
if req.method().is_safe() && e.is_timeout() {
do_retry = true
} else if let Some(source) = e.source() {
if let Some(e) = source.downcast_ref::<hyper::Error>() {
if e.is_connect() || e.is_closed() || e.is_incomplete_message() {
do_retry = true;
Expand Down Expand Up @@ -294,7 +307,11 @@ mod tests {
retry_timeout: Duration::from_secs(1000),
};

let client = Client::new();
let client = Client::builder()
.timeout(Duration::from_millis(100))
.build()
.unwrap();

let do_request = || client.request(Method::GET, mock.url()).send_retry(&retry);

// Simple request should work
Expand Down Expand Up @@ -419,7 +436,7 @@ mod tests {

let e = do_request().await.unwrap_err().to_string();
assert!(
e.contains("Error after 2 retries in") &&
e.contains("Error after 2 retries in") &&
e.contains("max_retries:2, retry_timeout:1000s, source:HTTP status server error (502 Bad Gateway) for url"),
"{e}"
);
Expand All @@ -442,6 +459,25 @@ mod tests {
"{e}"
);

// Retries on client timeout
mock.push_async_fn(|_| async move {
tokio::time::sleep(Duration::from_secs(10)).await;
panic!()
});
do_request().await.unwrap();

// Does not retry PUT request
mock.push_async_fn(|_| async move {
tokio::time::sleep(Duration::from_secs(10)).await;
panic!()
});
let res = client.request(Method::PUT, mock.url()).send_retry(&retry);
let e = res.await.unwrap_err().to_string();
assert!(
e.contains("Error after 0 retries in") && e.contains("operation timed out"),
"{e}"
);

// Shutdown
mock.shutdown().await
}
Expand Down

0 comments on commit 8fda518

Please sign in to comment.