From 194e6f984763f5dc1c376082170a85cc4db40ce4 Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Wed, 26 Jul 2023 21:47:06 +0200 Subject: [PATCH] fix(client): early server response shouldn't propagate NO_ERROR (#3275) Closes #2872 --- src/body/incoming.rs | 11 ++++++++- src/proto/mod.rs | 2 +- tests/client.rs | 58 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index c8f3b06770..cdebd3db58 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -201,7 +201,16 @@ impl Body for Incoming { ping.record_data(bytes.len()); return Poll::Ready(Some(Ok(Frame::data(bytes)))); } - Some(Err(e)) => return Poll::Ready(Some(Err(crate::Error::new_body(e)))), + Some(Err(e)) => { + return match e.reason() { + // These reasons should cause the body reading to stop, but not fail it. + // The same logic as for `Read for H2Upgraded` is applied here. + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => { + Poll::Ready(None) + } + _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + }; + } None => { *data_done = true; // fall through to trailers diff --git a/src/proto/mod.rs b/src/proto/mod.rs index f938bf532b..3628576dc1 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -50,7 +50,7 @@ pub(crate) enum BodyLength { Unknown, } -/// Status of when a Disaptcher future completes. +/// Status of when a Dispatcher future completes. pub(crate) enum Dispatched { /// Dispatcher completely shutdown connection. Shutdown, diff --git a/tests/client.rs b/tests/client.rs index ef80596c01..3d46c3fbfa 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1338,7 +1338,7 @@ mod conn { use bytes::{Buf, Bytes}; use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; - use http_body_util::{BodyExt, Empty, StreamBody}; + use http_body_util::{BodyExt, Empty, Full, StreamBody}; use hyper::rt::Timer; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; @@ -2126,6 +2126,62 @@ mod conn { .expect("client should be open"); } + #[tokio::test] + async fn http2_responds_before_consuming_request_body() { + // Test that a early-response from server works correctly (request body wasn't fully consumed). + // https://github.com/hyperium/hyper/issues/2872 + use hyper::service::service_fn; + + let _ = pretty_env_logger::try_init(); + + let (listener, addr) = setup_tk_test_server().await; + + // Spawn an HTTP2 server that responds before reading the whole request body. + // It's normal case to decline the request due to headers or size of the body. + tokio::spawn(async move { + let sock = TokioIo::new(listener.accept().await.unwrap().0); + hyper::server::conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .serve_connection( + sock, + service_fn(|_req| async move { + Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from( + "No bread for you!", + )))) + }), + ) + .await + .expect("serve_connection"); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .handshake(io) + .await + .expect("http handshake"); + + tokio::spawn(async move { + conn.await.expect("client conn shouldn't error"); + }); + + // Use a channel to keep request stream open + let (_tx, recv) = mpsc::channel::, Box>>(0); + let req = Request::post("/a").body(StreamBody::new(recv)).unwrap(); + let resp = client.send_request(req).await.expect("send_request"); + assert!(resp.status().is_success()); + + let mut body = String::new(); + concat(resp.into_body()) + .await + .unwrap() + .reader() + .read_to_string(&mut body) + .unwrap(); + + assert_eq!(&body, "No bread for you!"); + } + #[tokio::test] async fn h2_connect() { let (listener, addr) = setup_tk_test_server().await;