Skip to content

Commit

Permalink
feat(server): expose server::conn::http1::UpgradeableConnection (#3457
Browse files Browse the repository at this point in the history
)

This is the type returned by `Connection::with_upgrades()`. Making it nameable allows others to use its graceful shutdown method.
  • Loading branch information
davidpdrsn committed Dec 11, 2023
1 parent bb818cc commit 6e3042a
Showing 1 changed file with 44 additions and 51 deletions.
95 changes: 44 additions & 51 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::task::{Context, Poll};
use std::time::Duration;

use crate::rt::{Read, Write};
use crate::upgrade::Upgraded;
use bytes::Bytes;

use crate::body::{Body, Incoming as IncomingBody};
Expand Down Expand Up @@ -191,11 +192,11 @@ where
/// Enable this connection to support higher-level HTTP upgrades.
///
/// See [the `upgrade` module](crate::upgrade) for more.
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<I, S>
pub fn with_upgrades(self) -> UpgradeableConnection<I, S>
where
I: Send,
{
upgrades::UpgradeableConnection { inner: Some(self) }
UpgradeableConnection { inner: Some(self) }
}
}

Expand Down Expand Up @@ -433,60 +434,52 @@ impl Builder {
}
}

mod upgrades {
use crate::upgrade::Upgraded;

use super::*;

// A future binding a connection with a Service with Upgrade support.
//
// This type is unnameable outside the crate.
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S>
where
S: HttpService<IncomingBody>,
{
pub(super) inner: Option<Connection<T, S>>,
}
/// A future binding a connection with a Service with Upgrade support.
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S>
where
S: HttpService<IncomingBody>,
{
pub(super) inner: Option<Connection<T, S>>,
}

impl<I, B, S> UpgradeableConnection<I, S>
where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Start a graceful shutdown process for this connection.
///
/// This `Connection` should continue to be polled until shutdown
/// can finish.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
}
impl<I, B, S> UpgradeableConnection<I, S>
where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Start a graceful shutdown process for this connection.
///
/// This `Connection` should continue to be polled until shutdown
/// can finish.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
}
}

impl<I, B, S> Future for UpgradeableConnection<I, S>
where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;
impl<I, B, S> Future for UpgradeableConnection<I, S>
where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(e)),
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(e)),
}
}
}

0 comments on commit 6e3042a

Please sign in to comment.