-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(transport): Support timeouts with "grpc-timeout" header #606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
38a3074
83098f9
bc177d3
1d89316
5602072
3452d30
afaa51d
3c5d8c5
a7e8f61
19132a2
d1e153b
08cb752
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
use integration_tests::pb::{test_client, test_server, Input, Output}; | ||
use std::{net::SocketAddr, time::Duration}; | ||
use tokio::net::TcpListener; | ||
use tonic::{transport::Server, Code, Request, Response, Status}; | ||
|
||
#[tokio::test] | ||
async fn cancelation_on_timeout() { | ||
let addr = run_service_in_background(Duration::from_secs(1), Duration::from_secs(100)).await; | ||
|
||
let mut client = test_client::TestClient::connect(format!("http://{}", addr)) | ||
.await | ||
.unwrap(); | ||
|
||
let mut req = Request::new(Input {}); | ||
req.metadata_mut() | ||
// 500 ms | ||
.insert("grpc-timeout", "500m".parse().unwrap()); | ||
|
||
let res = client.unary_call(req).await; | ||
|
||
let err = res.unwrap_err(); | ||
assert!(err.message().contains("Timeout expired")); | ||
assert_eq!(err.code(), Code::Cancelled); | ||
} | ||
|
||
#[tokio::test] | ||
async fn picks_server_timeout_if_thats_sorter() { | ||
let addr = run_service_in_background(Duration::from_secs(1), Duration::from_millis(100)).await; | ||
|
||
let mut client = test_client::TestClient::connect(format!("http://{}", addr)) | ||
.await | ||
.unwrap(); | ||
|
||
let mut req = Request::new(Input {}); | ||
req.metadata_mut() | ||
// 10 hours | ||
.insert("grpc-timeout", "10H".parse().unwrap()); | ||
|
||
let res = client.unary_call(req).await; | ||
let err = res.unwrap_err(); | ||
assert!(err.message().contains("Timeout expired")); | ||
assert_eq!(err.code(), Code::Cancelled); | ||
} | ||
|
||
#[tokio::test] | ||
async fn picks_client_timeout_if_thats_sorter() { | ||
let addr = run_service_in_background(Duration::from_secs(1), Duration::from_secs(100)).await; | ||
|
||
let mut client = test_client::TestClient::connect(format!("http://{}", addr)) | ||
.await | ||
.unwrap(); | ||
|
||
let mut req = Request::new(Input {}); | ||
req.metadata_mut() | ||
// 100 ms | ||
.insert("grpc-timeout", "100m".parse().unwrap()); | ||
|
||
let res = client.unary_call(req).await; | ||
let err = res.unwrap_err(); | ||
assert!(err.message().contains("Timeout expired")); | ||
assert_eq!(err.code(), Code::Cancelled); | ||
} | ||
|
||
async fn run_service_in_background(latency: Duration, server_timeout: Duration) -> SocketAddr { | ||
struct Svc { | ||
latency: Duration, | ||
} | ||
|
||
#[tonic::async_trait] | ||
impl test_server::Test for Svc { | ||
async fn unary_call(&self, _req: Request<Input>) -> Result<Response<Output>, Status> { | ||
tokio::time::sleep(self.latency).await; | ||
Ok(Response::new(Output {})) | ||
} | ||
} | ||
|
||
let svc = test_server::TestServer::new(Svc { latency }); | ||
|
||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
let addr = listener.local_addr().unwrap(); | ||
|
||
tokio::spawn(async move { | ||
Server::builder() | ||
.timeout(server_timeout) | ||
.add_service(svc) | ||
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) | ||
.await | ||
.unwrap(); | ||
}); | ||
|
||
addr | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
use crate::{body::BoxBody, Status}; | ||
use futures_util::ready; | ||
use http::Response; | ||
use pin_project::pin_project; | ||
use std::{ | ||
future::Future, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
use tower::Service; | ||
|
||
/// Middleware that attempts to recover from service errors by turning them into a response built | ||
/// from the `Status`. | ||
#[derive(Debug, Clone)] | ||
pub(crate) struct RecoverError<S> { | ||
inner: S, | ||
} | ||
|
||
impl<S> RecoverError<S> { | ||
pub(crate) fn new(inner: S) -> Self { | ||
Self { inner } | ||
} | ||
} | ||
|
||
impl<S, R> Service<R> for RecoverError<S> | ||
where | ||
S: Service<R, Response = Response<BoxBody>>, | ||
S::Error: Into<crate::Error>, | ||
{ | ||
type Response = Response<BoxBody>; | ||
type Error = crate::Error; | ||
type Future = ResponseFuture<S::Future>; | ||
|
||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
self.inner.poll_ready(cx).map_err(Into::into) | ||
} | ||
|
||
fn call(&mut self, req: R) -> Self::Future { | ||
ResponseFuture { | ||
inner: self.inner.call(req), | ||
} | ||
} | ||
} | ||
|
||
#[pin_project] | ||
pub(crate) struct ResponseFuture<F> { | ||
#[pin] | ||
inner: F, | ||
} | ||
|
||
impl<F, E> Future for ResponseFuture<F> | ||
where | ||
F: Future<Output = Result<Response<BoxBody>, E>>, | ||
E: Into<crate::Error>, | ||
{ | ||
type Output = Result<Response<BoxBody>, crate::Error>; | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let result: Result<Response<BoxBody>, crate::Error> = | ||
ready!(self.project().inner.poll(cx)).map_err(Into::into); | ||
|
||
match result { | ||
Ok(res) => Poll::Ready(Ok(res)), | ||
Err(err) => { | ||
if let Some(status) = Status::try_from_error(&*err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be good to document this actually since its quite unclear. What errors would be returned as a status and which ones would fail? I do think this is the right approach though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @seanmonstar what do you think here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any error that |
||
let mut res = Response::new(BoxBody::empty()); | ||
status.add_header(res.headers_mut()).unwrap(); | ||
Poll::Ready(Ok(res)) | ||
} else { | ||
Poll::Ready(Err(err)) | ||
} | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.