Skip to content

Commit

Permalink
feat(service): create own Service trait (#2920)
Browse files Browse the repository at this point in the history
This removes the dependency on `tower-service`, and simplifies the `Service` trait to be used by hyper's server connections.

Closes #2853 

BREAKING CHANGE: Change any manual `impl tower::Service` to implement `hyper::service::Service` instead. The `poll_ready` method has been removed.
  • Loading branch information
tomkarw committed Sep 8, 2022
1 parent fae97ce commit fee7d36
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 170 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Expand Up @@ -35,7 +35,6 @@ h2 = { version = "0.3.9", optional = true }
itoa = "1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project-lite = "0.2.4"
tower-service = "0.3"
tokio = { version = "1", features = ["sync"] }
want = "0.3"

Expand Down Expand Up @@ -65,7 +64,6 @@ tokio = { version = "1", features = [
] }
tokio-test = "0.4"
tokio-util = { version = "0.7", features = ["codec"] }
tower = { version = "0.4", features = ["make", "util"] }
url = "2.2"

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
Expand Down
5 changes: 0 additions & 5 deletions examples/service_struct_impl.rs
Expand Up @@ -8,7 +8,6 @@ use tokio::net::TcpListener;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

type Counter = i32;

Expand Down Expand Up @@ -42,10 +41,6 @@ impl Service<Request<Recv>> for Svc {
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Recv>) -> Self::Future {
fn mk_response(s: String) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap())
Expand Down
22 changes: 0 additions & 22 deletions src/client/conn/mod.rs
Expand Up @@ -18,7 +18,6 @@
//! use http_body_util::Empty;
//! use hyper::client::conn;
//! use tokio::net::TcpStream;
//! use tower::ServiceExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -41,9 +40,6 @@
//! let response = request_sender.send_request(request).await?;
//! assert!(response.status() == StatusCode::OK);
//!
//! // To send via the same connection again, it may not work as it may not be ready,
//! // so we have to wait until the request_sender becomes ready.
//! request_sender.ready().await?;
//! let request = Request::builder()
//! .header("Host", "example.com")
//! .method("GET")
Expand All @@ -69,7 +65,6 @@ use futures_util::future;
use httparse::ParserConfig;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;
use tracing::{debug, trace};

use super::dispatch;
Expand Down Expand Up @@ -266,23 +261,6 @@ where
}
}

impl<B> Service<Request<B>> for SendRequest<B>
where
B: Body + 'static,
{
type Response = Response<Recv>;
type Error = crate::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
self.send_request(req)
}
}

impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendRequest").finish()
Expand Down
10 changes: 4 additions & 6 deletions src/proto/h1/dispatch.rs
Expand Up @@ -233,7 +233,7 @@ where
}

fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// can dispatch receive, or does it still care about, an incoming message?
// can dispatch receive, or does it still care about other incoming message?
match ready!(self.dispatch.poll_ready(cx)) {
Ok(()) => (),
Err(()) => {
Expand All @@ -242,6 +242,7 @@ where
return Poll::Ready(Ok(()));
}
}

// dispatch is ready for a message, try to read one
match ready!(self.conn.poll_read_head(cx)) {
Some(Ok((mut head, body_len, wants))) => {
Expand Down Expand Up @@ -511,14 +512,11 @@ cfg_server! {
Ok(())
}

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
if self.in_flight.is_some() {
Poll::Pending
} else {
self.service.poll_ready(cx).map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
})
Poll::Ready(Ok(()))
}
}

Expand Down
32 changes: 0 additions & 32 deletions src/proto/h2/server.rs
Expand Up @@ -257,38 +257,6 @@ where
loop {
self.poll_ping(cx);

// Check that the service is ready to accept a new request.
//
// - If not, just drive the connection some.
// - If ready, try to accept a new request from the connection.
match service.poll_ready(cx) {
Poll::Ready(Ok(())) => (),
Poll::Pending => {
// use `poll_closed` instead of `poll_accept`,
// in order to avoid accepting a request.
ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
trace!("incoming connection complete");
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(err)) => {
let err = crate::Error::new_user_service(err);
debug!("service closed: {}", err);

let reason = err.h2_reason();
if reason == Reason::NO_ERROR {
// NO_ERROR is only used for graceful shutdowns...
trace!("interpreting NO_ERROR user error as graceful_shutdown");
self.conn.graceful_shutdown();
} else {
trace!("abruptly shutting down with {:?}", reason);
self.conn.abrupt_shutdown(reason);
}
self.closing = Some(err);
break;
}
}

// When the service is ready, accepts an incoming request.
match ready!(self.conn.poll_accept(cx)) {
Some(Ok((req, mut respond))) => {
trace!("incoming request");
Expand Down
16 changes: 5 additions & 11 deletions src/service/http.rs
@@ -1,7 +1,8 @@
use std::error::Error as StdError;

use crate::body::Body;
use crate::common::{task, Future, Poll};
use crate::common::Future;
use crate::service::service::Service;
use crate::{Request, Response};

/// An asynchronous function from `Request` to `Response`.
Expand All @@ -19,16 +20,13 @@ pub trait HttpService<ReqBody>: sealed::Sealed<ReqBody> {
/// The `Future` returned by this `Service`.
type Future: Future<Output = Result<Response<Self::ResBody>, Self::Error>>;

#[doc(hidden)]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;

#[doc(hidden)]
fn call(&mut self, req: Request<ReqBody>) -> Self::Future;
}

impl<T, B1, B2> HttpService<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
T: Service<Request<B1>, Response = Response<B2>>,
B2: Body,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Expand All @@ -37,18 +35,14 @@ where
type Error = T::Error;
type Future = T::Future;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
tower_service::Service::poll_ready(self, cx)
}

fn call(&mut self, req: Request<B1>) -> Self::Future {
tower_service::Service::call(self, req)
Service::call(self, req)
}
}

impl<T, B1, B2> sealed::Sealed<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
T: Service<Request<B1>, Response = Response<B2>>,
B2: Body,
{
}
Expand Down
8 changes: 6 additions & 2 deletions src/service/mod.rs
Expand Up @@ -21,12 +21,16 @@
//! if you need to implement `Service` for a type manually, you can follow the example
//! in `service_struct_impl.rs`.

pub use tower_service::Service;

mod http;
mod service;
mod util;

#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
pub(super) use self::http::HttpService;
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub use self::service::Service;

pub use self::util::service_fn;
32 changes: 32 additions & 0 deletions src/service/service.rs
@@ -0,0 +1,32 @@
use std::future::Future;

/// An asynchronous function from a `Request` to a `Response`.
///
/// The `Service` trait is a simplified interface making it easy to write
/// network applications in a modular and reusable way, decoupled from the
/// underlying protocol.
///
/// # Functional
///
/// A `Service` is a function of a `Request`. It immediately returns a
/// `Future` representing the eventual completion of processing the
/// request. The actual request processing may happen at any time in the
/// future, on any thread or executor. The processing may depend on calling
/// other services. At some point in the future, the processing will complete,
/// and the `Future` will resolve to a response or error.
///
/// At a high level, the `Service::call` function represents an RPC request. The
/// `Service` value can be a server or a client.
pub trait Service<Request> {
/// Responses given by the service.
type Response;

/// Errors produced by the service.
type Error;

/// The future response value.
type Future: Future<Output = Result<Self::Response, Self::Error>>;

/// Process the request and return the response asynchronously.
fn call(&mut self, req: Request) -> Self::Future;
}
10 changes: 3 additions & 7 deletions src/service/util.rs
Expand Up @@ -3,7 +3,8 @@ use std::fmt;
use std::marker::PhantomData;

use crate::body::Body;
use crate::common::{task, Future, Poll};
use crate::common::Future;
use crate::service::service::Service;
use crate::{Request, Response};

/// Create a `Service` from a function.
Expand Down Expand Up @@ -43,8 +44,7 @@ pub struct ServiceFn<F, R> {
_req: PhantomData<fn(R)>,
}

impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>
for ServiceFn<F, ReqBody>
impl<F, ReqBody, Ret, ResBody, E> Service<Request<ReqBody>> for ServiceFn<F, ReqBody>
where
F: FnMut(Request<ReqBody>) -> Ret,
ReqBody: Body,
Expand All @@ -56,10 +56,6 @@ where
type Error = E;
type Future = Ret;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
(self.f)(req)
}
Expand Down

0 comments on commit fee7d36

Please sign in to comment.