Skip to content

Commit

Permalink
fix(client): send an error back to client when dispatch misbehaves (f…
Browse files Browse the repository at this point in the history
…ixes #2649)
  • Loading branch information
nox authored and seanmonstar committed Oct 28, 2022
1 parent 78e2c58 commit 9fa3638
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
54 changes: 40 additions & 14 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand All @@ -97,7 +97,7 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand All @@ -124,7 +124,7 @@ impl<T, U> UnboundedSender<T, U> {
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand Down Expand Up @@ -198,33 +198,59 @@ impl<T, U> Drop for Envelope<T, U> {
}

pub(crate) enum Callback<T, U> {
Retry(oneshot::Sender<Result<U, (crate::Error, Option<T>)>>),
NoRetry(oneshot::Sender<Result<U, crate::Error>>),
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
}

impl<T, U> Drop for Callback<T, U> {
fn drop(&mut self) {
// FIXME(nox): What errors do we want here?
let error = crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
"user code panicked"
} else {
"runtime dropped the dispatch task"
});

match self {
Callback::Retry(tx) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Err((error, None)));
}
}
Callback::NoRetry(tx) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Err(error));
}
}
}
}
}

impl<T, U> Callback<T, U> {
#[cfg(feature = "http2")]
pub(crate) fn is_canceled(&self) -> bool {
match *self {
Callback::Retry(ref tx) => tx.is_closed(),
Callback::NoRetry(ref tx) => tx.is_closed(),
Callback::Retry(Some(ref tx)) => tx.is_closed(),
Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
_ => unreachable!(),
}
}

pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
match *self {
Callback::Retry(ref mut tx) => tx.poll_closed(cx),
Callback::NoRetry(ref mut tx) => tx.poll_closed(cx),
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
_ => unreachable!(),
}
}

pub(crate) fn send(self, val: Result<U, (crate::Error, Option<T>)>) {
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
match self {
Callback::Retry(tx) => {
let _ = tx.send(val);
Callback::Retry(ref mut tx) => {
let _ = tx.take().unwrap().send(val);
}
Callback::NoRetry(tx) => {
let _ = tx.send(val.map_err(|e| e.0));
Callback::NoRetry(ref mut tx) => {
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ pub(super) enum User {
#[cfg(feature = "server")]
WithoutShutdownNonHttp1,

/// The dispatch task is gone.
#[cfg(feature = "client")]
DispatchGone,

/// User aborted in an FFI callback.
#[cfg(feature = "ffi")]
AbortedByCallback,
Expand Down Expand Up @@ -387,6 +391,11 @@ impl Error {
Error::new_user(User::AbortedByCallback)
}

#[cfg(feature = "client")]
pub(super) fn new_user_dispatch_gone() -> Error {
Error::new(Kind::User(User::DispatchGone))
}

#[cfg(feature = "http2")]
pub(super) fn new_h2(cause: ::h2::Error) -> Error {
if cause.is_io() {
Expand Down Expand Up @@ -483,6 +492,8 @@ impl Error {
Kind::User(User::WithoutShutdownNonHttp1) => {
"without_shutdown() called on a non-HTTP/1 connection"
}
#[cfg(feature = "client")]
Kind::User(User::DispatchGone) => "dispatch task is gone",
#[cfg(feature = "ffi")]
Kind::User(User::AbortedByCallback) => "operation aborted by an application callback",
}
Expand Down
38 changes: 38 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3114,6 +3114,44 @@ mod conn {
done_tx.send(()).unwrap();
}

#[tokio::test]
async fn test_body_panics() {
use hyper::body::HttpBody;

let _ = pretty_env_logger::try_init();

let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr = listener.local_addr().unwrap();

// spawn a server that reads but doesn't write
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
drain_til_eof(sock).await.expect("server read");
});

let io = tcp_connect(&addr).await.expect("tcp connect");

let (mut client, conn) = conn::Builder::new().handshake(io).await.expect("handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

let req = Request::post("/a")
.body(Body::from("baguette").map_data::<_, &[u8]>(|_| panic!("oopsie")))
.unwrap();

let error = client.send_request(req).await.unwrap_err();

assert!(error.is_user());
assert_eq!(
error.to_string(),
"dispatch task is gone: user code panicked"
);
}

async fn drain_til_eof<T: AsyncRead + Unpin>(mut sock: T) -> io::Result<()> {
let mut buf = [0u8; 1024];
loop {
Expand Down

0 comments on commit 9fa3638

Please sign in to comment.