diff --git a/object_store/src/client/mock_server.rs b/object_store/src/client/mock_server.rs index 36c6b650c03..70b856186d7 100644 --- a/object_store/src/client/mock_server.rs +++ b/object_store/src/client/mock_server.rs @@ -15,17 +15,20 @@ // specific language governing permissions and limitations // under the License. +use futures::future::BoxFuture; +use futures::FutureExt; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use parking_lot::Mutex; use std::collections::VecDeque; use std::convert::Infallible; +use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::oneshot; use tokio::task::JoinHandle; -pub type ResponseFn = Box) -> Response + Send>; +pub type ResponseFn = Box) -> BoxFuture<'static, Response> + Send>; /// A mock server pub struct MockServer { @@ -46,9 +49,10 @@ impl MockServer { async move { Ok::<_, Infallible>(service_fn(move |req| { let r = Arc::clone(&r); + let next = r.lock().pop_front(); async move { - Ok::<_, Infallible>(match r.lock().pop_front() { - Some(r) => r(req), + Ok::<_, Infallible>(match next { + Some(r) => r(req).await, None => Response::new(Body::from("Hello World")), }) } @@ -93,7 +97,16 @@ impl MockServer { where F: FnOnce(Request) -> Response + Send + 'static, { - self.responses.lock().push_back(Box::new(f)) + let f = Box::new(|req| async move { f(req) }.boxed()); + self.responses.lock().push_back(f) + } + + pub fn push_async_fn(&self, f: F) + where + F: FnOnce(Request) -> Fut + Send + 'static, + Fut: Future> + Send + 'static, + { + self.responses.lock().push_back(Box::new(|r| f(r).boxed())) } /// Shutdown the mock server diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index 08b9a74e17c..9d21867d8a4 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -119,11 +119,19 @@ impl From for std::io::Error { pub type Result = std::result::Result; -/// Contains the configuration for how to respond to server errors +/// The configuration for how to respond to request errors /// -/// By default they will be retried up to some limit, using exponential +/// The following categories of error will be retried: +/// +/// * 5xx server errors +/// * Connection errors +/// * Dropped connections +/// * Timeouts for [safe] / read-only requests +/// +/// Requests will be retried up to some limit, using exponential /// backoff with jitter. See [`BackoffConfig`] for more information /// +/// [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1 #[derive(Debug, Clone)] pub struct RetryConfig { /// The backoff configuration @@ -173,13 +181,16 @@ impl RetryExt for reqwest::RequestBuilder { let max_retries = config.max_retries; let retry_timeout = config.retry_timeout; + let (client, req) = self.build_split(); + let req = req.expect("request must be valid"); + async move { let mut retries = 0; let now = Instant::now(); loop { - let s = self.try_clone().expect("request body must be cloneable"); - match s.send().await { + let s = req.try_clone().expect("request body must be cloneable"); + match client.execute(s).await { Ok(r) => match r.error_for_status_ref() { Ok(_) if r.status().is_success() => return Ok(r), Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { @@ -242,7 +253,9 @@ impl RetryExt for reqwest::RequestBuilder { Err(e) => { let mut do_retry = false; - if let Some(source) = e.source() { + if req.method().is_safe() && e.is_timeout() { + do_retry = true + } else if let Some(source) = e.source() { if let Some(e) = source.downcast_ref::() { if e.is_connect() || e.is_closed() || e.is_incomplete_message() { do_retry = true; @@ -294,7 +307,11 @@ mod tests { retry_timeout: Duration::from_secs(1000), }; - let client = Client::new(); + let client = Client::builder() + .timeout(Duration::from_millis(100)) + .build() + .unwrap(); + let do_request = || client.request(Method::GET, mock.url()).send_retry(&retry); // Simple request should work @@ -419,7 +436,7 @@ mod tests { let e = do_request().await.unwrap_err().to_string(); assert!( - e.contains("Error after 2 retries in") && + e.contains("Error after 2 retries in") && e.contains("max_retries:2, retry_timeout:1000s, source:HTTP status server error (502 Bad Gateway) for url"), "{e}" ); @@ -442,6 +459,25 @@ mod tests { "{e}" ); + // Retries on client timeout + mock.push_async_fn(|_| async move { + tokio::time::sleep(Duration::from_secs(10)).await; + panic!() + }); + do_request().await.unwrap(); + + // Does not retry PUT request + mock.push_async_fn(|_| async move { + tokio::time::sleep(Duration::from_secs(10)).await; + panic!() + }); + let res = client.request(Method::PUT, mock.url()).send_retry(&retry); + let e = res.await.unwrap_err().to_string(); + assert!( + e.contains("Error after 0 retries in") && e.contains("operation timed out"), + "{e}" + ); + // Shutdown mock.shutdown().await }