Skip to content

Commit

Permalink
feat(rt): make private executor traits public (but sealed) in `rt::bo…
Browse files Browse the repository at this point in the history
…unds` (hyperium#3127)

Define public trait aliases that are sealed, but can be named externally, and have documentation showing how to provide a Executor to match the bounds, and how to express the bounds in your own API.

Closes hyperium#2051
Closes hyperium#3097

Signed-off-by: Sven Pfennig <s.pfennig@reply.de>
  • Loading branch information
programatik29 authored and 0xE282B0 committed Jan 16, 2024
1 parent 3b76f3f commit 3986f77
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 44 deletions.
34 changes: 0 additions & 34 deletions src/common/exec.rs
Expand Up @@ -3,17 +3,8 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

#[cfg(feature = "server")]
use crate::body::Body;
#[cfg(all(feature = "http2", feature = "server"))]
use crate::proto::h2::server::H2Stream;
use crate::rt::Executor;

#[cfg(feature = "server")]
pub trait ConnStreamExec<F, B: Body>: Clone {
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
}

pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// Executor must be provided by the user
Expand Down Expand Up @@ -44,31 +35,6 @@ impl fmt::Debug for Exec {
}
}

#[cfg(feature = "server")]
impl<F, B> ConnStreamExec<F, B> for Exec
where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: Body,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(fut)
}
}

// ==== impl Executor =====

#[cfg(feature = "server")]
impl<E, F, B> ConnStreamExec<F, B> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Output = ()>,
B: Body,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(fut)
}
}

// If http2 is not enable, we just have a stub here, so that the trait bounds
// that *would* have been needed are still checked. Why?
//
Expand Down
8 changes: 4 additions & 4 deletions src/proto/h2/server.rs
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, trace, warn};

use super::{ping, PipeToSendStream, SendBuf};
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::exec::ConnStreamExec;
use crate::rt::bounds::Http2ConnExec;
use crate::common::time::Time;
use crate::common::{date, task, Future, Pin, Poll};
use crate::ext::Protocol;
Expand Down Expand Up @@ -110,7 +110,7 @@ where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
E: ConnStreamExec<S::Future, B>,
E: Http2ConnExec<S::Future, B>,
{
pub(crate) fn new(
io: T,
Expand Down Expand Up @@ -186,7 +186,7 @@ where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
E: ConnStreamExec<S::Future, B>,
E: Http2ConnExec<S::Future, B>,
{
type Output = crate::Result<Dispatched>;

Expand Down Expand Up @@ -240,7 +240,7 @@ where
where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ConnStreamExec<S::Future, B>,
E: Http2ConnExec<S::Future, B>,
{
if self.closing.is_none() {
loop {
Expand Down
69 changes: 69 additions & 0 deletions src/rt/bounds.rs
@@ -0,0 +1,69 @@
//! Trait aliases
//!
//! Traits in this module ease setting bounds and usually automatically
//! implemented by implementing another trait.

#[cfg(all(feature = "server", feature = "http2"))]
pub use self::h2::Http2ConnExec;

#[cfg(all(feature = "server", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "server", feature = "http2"))))]
mod h2 {
use crate::{common::exec::Exec, proto::h2::server::H2Stream, rt::Executor};
use http_body::Body;
use std::future::Future;

/// An executor to spawn http2 connections.
///
/// This trait is implemented for any type that implements [`Executor`]
/// trait for any future.
///
/// This trait is sealed and cannot be implemented for types outside this crate.
///
/// [`Executor`]: crate::rt::Executor
pub trait Http2ConnExec<F, B: Body>: sealed::Sealed<(F, B)> + Clone {
#[doc(hidden)]
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
}

impl<F, B> Http2ConnExec<F, B> for Exec
where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: Body,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(fut)
}
}

impl<F, B> sealed::Sealed<(F, B)> for Exec
where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: Body,
{
}

#[doc(hidden)]
impl<E, F, B> Http2ConnExec<F, B> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Output = ()>,
B: Body,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(fut)
}
}

impl<E, F, B> sealed::Sealed<(F, B)> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Output = ()>,
B: Body,
{
}

mod sealed {
pub trait Sealed<T> {}
}
}
22 changes: 22 additions & 0 deletions src/rt.rs → src/rt/mod.rs
Expand Up @@ -5,13 +5,35 @@
//! If the `runtime` feature is disabled, the types in this module can be used
//! to plug in other runtimes.

pub mod bounds;

use std::{
future::Future,
pin::Pin,
time::{Duration, Instant},
};

/// An executor of futures.
///
/// This trait should be implemented for any future.
///
/// # Example
///
/// ```
/// # use hyper::rt::Executor;
/// # use std::future::Future;
/// struct TokioExecutor;
///
/// impl<F> Executor<F> for TokioExecutor
/// where
/// F: Future + Send + 'static,
/// F::Output: Send + 'static,
/// {
/// fn execute(&self, future: F) {
/// tokio::spawn(future);
/// }
/// }
/// ```
pub trait Executor<Fut> {
/// Place the future into the executor to be run.
fn execute(&self, fut: Fut);
Expand Down
13 changes: 8 additions & 5 deletions src/server/conn/http2.rs
Expand Up @@ -9,9 +9,9 @@ use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::body::{Body, Incoming as IncomingBody};
use crate::common::exec::ConnStreamExec;
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::proto;
use crate::rt::bounds::Http2ConnExec;
use crate::service::HttpService;
use crate::{common::time::Time, rt::Timer};

Expand Down Expand Up @@ -54,7 +54,7 @@ where
I: AsyncRead + AsyncWrite + Unpin,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ConnStreamExec<S::Future, B>,
E: Http2ConnExec<S::Future, B>,
{
/// Start a graceful shutdown process for this connection.
///
Expand All @@ -78,7 +78,7 @@ where
I: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ConnStreamExec<S::Future, B>,
E: Http2ConnExec<S::Future, B>,
{
type Output = crate::Result<()>;

Expand All @@ -99,7 +99,10 @@ where
impl<E> Builder<E> {
/// Create a new connection builder.
///
/// This starts with the default options, and an executor.
/// This starts with the default options, and an executor which is a type
/// that implements [`Http2ConnExec`] trait.
///
/// [`Http2ConnExec`]: crate::rt::bounds::Http2ConnExec
pub fn new(exec: E) -> Self {
Self {
exec: exec,
Expand Down Expand Up @@ -259,7 +262,7 @@ impl<E> Builder<E> {
Bd: Body + 'static,
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
E: ConnStreamExec<S::Future, Bd>,
E: Http2ConnExec<S::Future, Bd>,
{
let proto = proto::h2::Server::new(
io,
Expand Down
2 changes: 1 addition & 1 deletion src/service/mod.rs
Expand Up @@ -26,7 +26,7 @@ mod service;
mod util;

#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
pub(super) use self::http::HttpService;
pub use self::http::HttpService;
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
Expand Down

0 comments on commit 3986f77

Please sign in to comment.