From d963e6a9504575116f63df2485d8480fdb9b6f0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oddbj=C3=B8rn=20Gr=C3=B8dem?= <29732646+oddgrd@users.noreply.github.com> Date: Mon, 29 Aug 2022 22:30:42 +0200 Subject: [PATCH] feat(body): make body::Sender and Body::channel private (#2970) Closes #2962 BREAKING CHANGE: A channel body will be available in `hyper-util`. --- src/body/body.rs | 18 +++++++++++------- src/body/mod.rs | 3 ++- tests/client.rs | 30 +++++++++++++++++++----------- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/body/body.rs b/src/body/body.rs index abba23c327..6cae4916da 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -61,7 +61,7 @@ enum Kind { /// [`Body::channel()`]: struct.Body.html#method.channel /// [`Sender::abort()`]: struct.Sender.html#method.abort #[must_use = "Sender does nothing unless sent on"] -pub struct Sender { +pub(crate) struct Sender { want_rx: watch::Receiver, data_tx: BodySender, trailers_tx: Option, @@ -75,7 +75,8 @@ impl Recv { /// /// Useful when wanting to stream chunks from another thread. #[inline] - pub fn channel() -> (Sender, Recv) { + #[allow(unused)] + pub(crate) fn channel() -> (Sender, Recv) { Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) } @@ -289,7 +290,7 @@ impl fmt::Debug for Recv { impl Sender { /// Check to see if this `Sender` can send more data. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { // Check if the receiver end has tried polling for the body yet ready!(self.poll_want(cx)?); self.data_tx @@ -311,7 +312,8 @@ impl Sender { } /// Send data on data channel when it is ready. - pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { + #[allow(unused)] + pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { self.ready().await?; self.data_tx .try_send(Ok(chunk)) @@ -319,7 +321,8 @@ impl Sender { } /// Send trailers on trailers channel. - pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { + #[allow(unused)] + pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { let tx = match self.trailers_tx.take() { Some(tx) => tx, None => return Err(crate::Error::new_closed()), @@ -339,14 +342,15 @@ impl Sender { /// This is mostly useful for when trying to send from some other thread /// that doesn't have an async context. If in an async context, prefer /// `send_data()` instead. - pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { + pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { self.data_tx .try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) } /// Aborts the body in an abnormal fashion. - pub fn abort(self) { + #[allow(unused)] + pub(crate) fn abort(self) { let _ = self .data_tx // clone so the send works even if buffer is full diff --git a/src/body/mod.rs b/src/body/mod.rs index 0348a00efe..369e6f5145 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -20,7 +20,8 @@ pub use http_body::Body as HttpBody; pub use http_body::SizeHint; pub use self::aggregate::aggregate; -pub use self::body::{Recv, Sender}; +pub use self::body::Recv; +pub(crate) use self::body::Sender; pub(crate) use self::length::DecodedLength; pub use self::to_bytes::to_bytes; diff --git a/tests/client.rs b/tests/client.rs index 2bf0eedd14..2f8e352517 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1325,6 +1325,7 @@ test! { } mod conn { + use std::error::Error; use std::io::{self, Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::pin::Pin; @@ -1333,15 +1334,15 @@ mod conn { use std::time::Duration; use bytes::{Buf, Bytes}; - use futures_channel::oneshot; + use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; - use http_body_util::Empty; - use hyper::upgrade::OnUpgrade; + use http_body_util::{Empty, StreamBody}; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; use hyper::body::HttpBody; use hyper::client::conn; + use hyper::upgrade::OnUpgrade; use hyper::{self, Method, Recv, Request, Response, StatusCode}; use super::{concat, s, support, tcp_connect, FutureHyperExt}; @@ -1524,17 +1525,23 @@ mod conn { rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); - let (mut sender, body) = Recv::channel(); + let (mut sender, recv) = mpsc::channel::>>(0); + let sender = thread::spawn(move || { - sender.try_send_data("hello".into()).expect("try_send_data"); + sender.try_send(Ok("hello".into())).expect("try_send_data"); support::runtime().block_on(rx).unwrap(); - sender.abort(); + + // Aborts the body in an abnormal fashion. + let _ = sender.try_send(Err(Box::new(std::io::Error::new( + io::ErrorKind::Other, + "body write aborted", + )))); }); let req = Request::builder() .method(Method::POST) .uri("/") - .body(body) + .body(StreamBody::new(recv)) .unwrap(); let res = client.send_request(req); rt.block_on(res).unwrap_err(); @@ -2111,7 +2118,7 @@ mod conn { .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) - .handshake::<_, Recv>(io) + .handshake(io) .await .expect("http handshake"); @@ -2120,9 +2127,10 @@ mod conn { }); // Use a channel to keep request stream open - let (_tx, body) = hyper::Recv::channel(); - let req1 = http::Request::new(body); - let _resp = client.send_request(req1).await.expect("send_request"); + let (_tx, recv) = mpsc::channel::>>(0); + let req = http::Request::new(StreamBody::new(recv)); + + let _resp = client.send_request(req).await.expect("send_request"); // sleep longer than keepalive would trigger tokio::time::sleep(Duration::from_secs(4)).await;