Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(body): add trailers to Body channel (#2260) #2387

Merged
merged 8 commits into from Jan 15, 2021
3 changes: 2 additions & 1 deletion benches/end_to_end.rs
Expand Up @@ -8,7 +8,7 @@ use std::net::SocketAddr;
use futures_util::future::join_all;

use hyper::client::HttpConnector;
use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server};
use hyper::{body::HttpBody as _, Body, HeaderMap, Method, Request, Response, Server};

// HTTP1

Expand Down Expand Up @@ -313,6 +313,7 @@ impl Opts {
for _ in 0..chunk_cnt {
tx.send_data(chunk.into()).await.expect("send_data");
}
tx.send_trailers(HeaderMap::new()).expect("send_trailers");
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
});
body
} else {
Expand Down
63 changes: 43 additions & 20 deletions src/body/body.rs
Expand Up @@ -5,8 +5,6 @@ use std::fmt;

use bytes::Bytes;
use futures_channel::mpsc;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
Expand All @@ -17,14 +15,16 @@ use http_body::{Body as HttpBody, SizeHint};
use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Pin, Poll};
use crate::common::Future;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
use crate::common::{Future, Never};
use crate::common::Never;
use crate::common::{task, watch, Pin, Poll};
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping;

type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
type TrailersSender = oneshot::Sender<HeaderMap>;

/// A stream of `Bytes`, used when receiving bodies.
///
Expand All @@ -43,7 +43,8 @@ enum Kind {
Chan {
content_length: DecodedLength,
want_tx: watch::Sender,
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
trailers_rx: oneshot::Receiver<HeaderMap>,
},
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
H2 {
Expand Down Expand Up @@ -106,7 +107,8 @@ enum DelayEof {
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
want_rx: watch::Receiver,
tx: BodySender,
data_tx: BodySender,
trailers_tx: Option<TrailersSender>,
}

const WANT_PENDING: usize = 1;
Expand Down Expand Up @@ -137,19 +139,25 @@ impl Body {
}

pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (data_tx, data_rx) = mpsc::channel(0);
let (trailers_tx, trailers_rx) = oneshot::channel();

// If wanter is true, `Sender::poll_ready()` won't becoming ready
// until the `Body` has been polled for data once.
let want = if wanter { WANT_PENDING } else { WANT_READY };

let (want_tx, want_rx) = watch::channel(want);

let tx = Sender { want_rx, tx };
let tx = Sender {
want_rx,
data_tx,
trailers_tx: Some(trailers_tx),
};
let rx = Body::new(Kind::Chan {
content_length,
want_tx,
rx,
data_rx,
trailers_rx,
});

(tx, rx)
Expand Down Expand Up @@ -282,12 +290,13 @@ impl Body {
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
Kind::Chan {
content_length: ref mut len,
ref mut rx,
ref mut data_rx,
ref mut want_tx,
..
} => {
want_tx.send(WANT_READY);

match ready!(Pin::new(rx).poll_next(cx)?) {
match ready!(Pin::new(data_rx).poll_next(cx)?) {
Some(chunk) => {
len.sub_if(chunk.len() as u64);
Poll::Ready(Some(Ok(chunk)))
Expand Down Expand Up @@ -368,10 +377,15 @@ impl HttpBody for Body {
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
},

Kind::Chan {
ref mut trailers_rx,
..
} => match ready!(Pin::new(trailers_rx).poll(cx)) {
Ok(t) => Poll::Ready(Ok(Some(t))),
Err(_) => Poll::Ready(Err(crate::Error::new_closed())),
},
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_trailers(cx),

_ => Poll::Ready(Ok(None)),
}
}
Expand Down Expand Up @@ -527,7 +541,7 @@ impl Sender {
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.tx
self.data_tx
.poll_ready(cx)
.map_err(|_| crate::Error::new_closed())
}
Expand All @@ -545,14 +559,23 @@ impl Sender {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
}

/// Send data on this channel when it is ready.
/// Send data on data channel when it is ready.
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
self.ready().await?;
self.tx
self.data_tx
.try_send(Ok(chunk))
.map_err(|_| crate::Error::new_closed())
}

/// Send trailers on trailers channel.
pub fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though not specifically required due to the internal implementation, I'm thinking this should be an async fn, so that we could adjust the implementation later if need be. But I'm open to hearing why it shouldn't.

let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(crate::Error::new_closed()),
};
tx.send(trailers).map_err(|_| crate::Error::new_closed())
}

/// Try to send data on this channel.
///
/// # Errors
Expand All @@ -566,23 +589,23 @@ impl Sender {
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.tx
self.data_tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

/// Aborts the body in an abnormal fashion.
pub fn abort(self) {
let _ = self
.tx
.data_tx
// clone so the send works even if buffer is full
.clone()
.try_send(Err(crate::Error::new_body_write_aborted()));
}

#[cfg(feature = "http1")]
pub(crate) fn send_error(&mut self, err: crate::Error) {
let _ = self.tx.try_send(Err(err));
let _ = self.data_tx.try_send(Err(err));
}
}

Expand Down Expand Up @@ -628,7 +651,7 @@ mod tests {

assert_eq!(
mem::size_of::<Sender>(),
mem::size_of::<usize>() * 4,
mem::size_of::<usize>() * 5,
aeryz marked this conversation as resolved.
Show resolved Hide resolved
"Sender"
);

Expand Down
5 changes: 4 additions & 1 deletion src/client/conn.rs
Expand Up @@ -63,7 +63,10 @@ use tower_service::Service;

use super::dispatch;
use crate::body::HttpBody;
use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll};
use crate::common::{
aeryz marked this conversation as resolved.
Show resolved Hide resolved
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::proto;
use crate::rt::Executor;
#[cfg(feature = "http1")]
Expand Down