Skip to content

Commit

Permalink
refactor(server): server::conn::http1::Connection always holds a proto (
Browse files Browse the repository at this point in the history
#3018)

* server::conn::http1::Connection always holds a proto

* Remove `Connection::try_into_parts` as it can't fail
  • Loading branch information
bossmc committed Oct 24, 2022
1 parent 0766d3f commit 91e83b7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 55 deletions.
4 changes: 3 additions & 1 deletion src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ impl Sender {
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
#[cfg(feature = "http1")]
pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.data_tx
.try_send(Ok(chunk))
Expand Down Expand Up @@ -447,7 +448,7 @@ mod tests {
assert!(err.is_body_write_aborted(), "{:?}", err);
}

#[cfg(not(miri))]
#[cfg(all(not(miri), feature = "http1"))]
#[tokio::test]
async fn channel_abort_when_buffer_is_full() {
let (mut tx, mut rx) = Recv::channel();
Expand All @@ -463,6 +464,7 @@ mod tests {
assert!(err.is_body_write_aborted(), "{:?}", err);
}

#[cfg(feature = "http1")]
#[test]
fn channel_buffers_one() {
let (mut tx, _rx) = Recv::channel();
Expand Down
1 change: 1 addition & 0 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use http_body::SizeHint;

pub use self::aggregate::aggregate;
pub use self::body::Recv;
#[cfg(feature = "http1")]
pub(crate) use self::body::Sender;
pub(crate) use self::length::DecodedLength;
pub use self::to_bytes::to_bytes;
Expand Down
13 changes: 0 additions & 13 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ pub(super) enum User {
#[cfg(feature = "http1")]
ManualUpgrade,

/// User called `server::Connection::without_shutdown()` on an HTTP/2 conn.
#[cfg(feature = "server")]
WithoutShutdownNonHttp1,

/// User aborted in an FFI callback.
#[cfg(feature = "ffi")]
AbortedByCallback,
Expand Down Expand Up @@ -308,11 +304,6 @@ impl Error {
Error::new_user(User::Body).with(cause)
}

#[cfg(feature = "server")]
pub(super) fn new_without_shutdown_not_h1() -> Error {
Error::new(Kind::User(User::WithoutShutdownNonHttp1))
}

#[cfg(feature = "http1")]
pub(super) fn new_shutdown(cause: std::io::Error) -> Error {
Error::new(Kind::Shutdown).with(cause)
Expand Down Expand Up @@ -399,10 +390,6 @@ impl Error {
Kind::User(User::NoUpgrade) => "no upgrade available",
#[cfg(feature = "http1")]
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",
#[cfg(feature = "server")]
Kind::User(User::WithoutShutdownNonHttp1) => {
"without_shutdown() called on a non-HTTP/1 connection"
}
#[cfg(feature = "ffi")]
Kind::User(User::AbortedByCallback) => "operation aborted by an application callback",
}
Expand Down
61 changes: 20 additions & 41 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pin_project_lite::pin_project! {
where
S: HttpService<Recv>,
{
conn: Option<Http1Dispatcher<T, S::ResBody, S>>,
conn: Http1Dispatcher<T, S::ResBody, S>,
}
}

Expand Down Expand Up @@ -98,12 +98,7 @@ where
/// pending. If called after `Connection::poll` has resolved, this does
/// nothing.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
match self.conn {
Some(ref mut h1) => {
h1.disable_keep_alive();
}
None => (),
}
self.conn.disable_keep_alive();
}

/// Return the inner IO object, and additional information.
Expand All @@ -116,25 +111,13 @@ where
/// # Panics
/// This method will panic if this connection is using an h2 protocol.
pub fn into_parts(self) -> Parts<I, S> {
self.try_into_parts()
.unwrap_or_else(|| panic!("h2 cannot into_inner"))
}

/// Return the inner IO object, and additional information, if available.
///
///
/// TODO:(mike) does this need to return none for h1 or is it expected to always be present? previously used an "unwrap"
/// This method will return a `None` if this connection is using an h2 protocol.
pub fn try_into_parts(self) -> Option<Parts<I, S>> {
self.conn.map(|h1| {
let (io, read_buf, dispatch) = h1.into_inner();
Parts {
io,
read_buf,
service: dispatch.into_service(),
_inner: (),
}
})
let (io, read_buf, dispatch) = self.conn.into_inner();
Parts {
io,
read_buf,
service: dispatch.into_service(),
_inner: (),
}
}

/// Poll the connection for completion, but without calling `shutdown`
Expand All @@ -150,7 +133,7 @@ where
S::Future: Unpin,
B: Unpin,
{
self.conn.as_mut().unwrap().poll_without_shutdown(cx)
self.conn.poll_without_shutdown(cx)
}

/// Prevent shutdown of the underlying IO object at the end of service the request,
Expand All @@ -165,15 +148,11 @@ where
S::Future: Unpin,
B: Unpin,
{
// TODO(mike): "new_without_shutdown_not_h1" is not possible here
let mut conn = Some(self);
let mut zelf = Some(self);
futures_util::future::poll_fn(move |cx| {
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
Poll::Ready(
conn.take()
.unwrap()
.try_into_parts()
.ok_or_else(crate::Error::new_without_shutdown_not_h1),
Ok(zelf.take().unwrap().into_parts())
)
})
}
Expand All @@ -185,7 +164,7 @@ where
where
I: Send,
{
upgrades::UpgradeableConnection { inner: self }
upgrades::UpgradeableConnection { inner: Some(self) }
}
}

Expand All @@ -201,7 +180,7 @@ where
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
match ready!(Pin::new(&mut self.conn).poll(cx)) {
Ok(done) => {
match done {
proto::Dispatched::Shutdown => {}
Expand Down Expand Up @@ -417,7 +396,7 @@ impl Builder {
let sd = proto::h1::dispatch::Server::new(service);
let proto = proto::h1::Dispatcher::new(sd, conn);
Connection {
conn: Some(proto),
conn: proto,
}
}
}
Expand All @@ -436,7 +415,7 @@ mod upgrades {
where
S: HttpService<Recv>,
{
pub(super) inner: Connection<T, S>,
pub(super) inner: Option<Connection<T, S>>,
}

impl<I, B, S> UpgradeableConnection<I, S>
Expand All @@ -452,7 +431,7 @@ mod upgrades {
/// This `Connection` should continue to be polled until shutdown
/// can finish.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
Pin::new(&mut self.inner).graceful_shutdown()
Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
}
}

Expand All @@ -467,10 +446,10 @@ mod upgrades {
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let (io, buf, _) = self.inner.conn.take().unwrap().into_inner();
let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
}
Expand Down

0 comments on commit 91e83b7

Please sign in to comment.