From 4947b076f5b0b5149ee7f6144515535b85f65db5 Mon Sep 17 00:00:00 2001 From: David Freese Date: Wed, 20 Oct 2021 08:05:27 -0700 Subject: [PATCH] fix(transport): Correctly map hyper errors (#629) Co-authored-by: Lucio Franco --- tests/integration_tests/tests/connection.rs | 9 +++-- tonic/src/status.rs | 37 ++++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/tests/connection.rs b/tests/integration_tests/tests/connection.rs index f27ef09d1..b763c32cf 100644 --- a/tests/integration_tests/tests/connection.rs +++ b/tests/integration_tests/tests/connection.rs @@ -5,7 +5,7 @@ use std::time::Duration; use tokio::sync::oneshot; use tonic::{ transport::{Endpoint, Server}, - Request, Response, Status, + Code, Request, Response, Status, }; struct Svc(Arc>>>); @@ -52,7 +52,8 @@ async fn connect_returns_err_via_call_after_connected() { let res = client.unary_call(Request::new(Input {})).await; - assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.code(), Code::Unavailable); jh.await.unwrap(); } @@ -84,7 +85,9 @@ async fn connect_lazy_reconnects_after_first_failure() { // The server shut down, third call should fail tokio::time::sleep(Duration::from_millis(100)).await; - client.unary_call(Request::new(Input {})).await.unwrap_err(); + let err = client.unary_call(Request::new(Input {})).await.unwrap_err(); + + assert_eq!(err.code(), Code::Unavailable); jh.await.unwrap(); } diff --git a/tonic/src/status.rs b/tonic/src/status.rs index e4e1b32a2..ee560c9f3 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -330,7 +330,8 @@ impl Status { Err(err) => err, }; - if let Some(status) = find_status_in_source_chain(&*err) { + if let Some(mut status) = find_status_in_source_chain(&*err) { + status.source = Some(err); return Ok(status); } @@ -377,6 +378,31 @@ impl Status { reason.into() } + /// Handles hyper errors specifically, which expose a number of different parameters about the + /// http stream's error: https://docs.rs/hyper/0.14.11/hyper/struct.Error.html. + /// + /// Returns Some if there's a way to handle the error, or None if the information from this + /// hyper error, but perhaps not its source, should be ignored. + #[cfg(feature = "transport")] + fn from_hyper_error(err: &hyper::Error) -> Option { + // is_timeout results from hyper's keep-alive logic + // (https://docs.rs/hyper/0.14.11/src/hyper/error.rs.html#192-194). Per the grpc spec + // > An expired client initiated PING will cause all calls to be closed with an UNAVAILABLE + // > status. Note that the frequency of PINGs is highly dependent on the network + // > environment, implementations are free to adjust PING frequency based on network and + // > application requirements, which is why it's mapped to unavailable here. + // + // Likewise, if we are unable to connect to the server, map this to UNAVAILABLE. This is + // consistent with the behavior of a C++ gRPC client when the server is not running, and + // matches the spec of: + // > The service is currently unavailable. This is most likely a transient condition that + // > can be corrected if retried with a backoff. + if err.is_timeout() || err.is_connect() { + return Some(Status::unavailable(err.to_string())); + } + None + } + pub(crate) fn map_error(err: E) -> Status where E: Into>, @@ -556,11 +582,20 @@ fn find_status_in_source_chain(err: &(dyn Error + 'static)) -> Option { return Some(Status::cancelled(timeout.to_string())); } + #[cfg(feature = "transport")] + if let Some(hyper) = err + .downcast_ref::() + .and_then(Status::from_hyper_error) + { + return Some(hyper); + } + source = err.source(); } None } + impl fmt::Debug for Status { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // A manual impl to reduce the noise of frequently empty fields.