diff --git a/Cargo.toml b/Cargo.toml index 11d15ce3b0..0126eae8ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ h2 = { version = "0.3.9", optional = true } itoa = "1" tracing = { version = "0.1", default-features = false, features = ["std"] } pin-project-lite = "0.2.4" -tower-service = "0.3" tokio = { version = "1", features = ["sync"] } want = "0.3" @@ -65,7 +64,6 @@ tokio = { version = "1", features = [ ] } tokio-test = "0.4" tokio-util = { version = "0.7", features = ["codec"] } -tower = { version = "0.4", features = ["make", "util"] } url = "2.2" [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] diff --git a/examples/service_struct_impl.rs b/examples/service_struct_impl.rs index 781e1e7a16..ad73c6855c 100644 --- a/examples/service_struct_impl.rs +++ b/examples/service_struct_impl.rs @@ -8,7 +8,6 @@ use tokio::net::TcpListener; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use std::task::{Context, Poll}; type Counter = i32; @@ -42,10 +41,6 @@ impl Service> for Svc { type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: Request) -> Self::Future { fn mk_response(s: String) -> Result>, hyper::Error> { Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap()) diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index ae5ce15b71..7a2f3bc3a6 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -18,7 +18,6 @@ //! use http_body_util::Empty; //! use hyper::client::conn; //! use tokio::net::TcpStream; -//! use tower::ServiceExt; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -41,9 +40,6 @@ //! let response = request_sender.send_request(request).await?; //! assert!(response.status() == StatusCode::OK); //! -//! // To send via the same connection again, it may not work as it may not be ready, -//! // so we have to wait until the request_sender becomes ready. -//! request_sender.ready().await?; //! let request = Request::builder() //! .header("Host", "example.com") //! .method("GET") @@ -69,7 +65,6 @@ use futures_util::future; use httparse::ParserConfig; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use tower_service::Service; use tracing::{debug, trace}; use super::dispatch; @@ -266,23 +261,6 @@ where } } -impl Service> for SendRequest -where - B: Body + 'static, -{ - type Response = Response; - type Error = crate::Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.send_request(req) - } -} - impl fmt::Debug for SendRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendRequest").finish() diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 5c75f302fa..8605f246d5 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -233,7 +233,7 @@ where } fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll> { - // can dispatch receive, or does it still care about, an incoming message? + // can dispatch receive, or does it still care about other incoming message? match ready!(self.dispatch.poll_ready(cx)) { Ok(()) => (), Err(()) => { @@ -242,6 +242,7 @@ where return Poll::Ready(Ok(())); } } + // dispatch is ready for a message, try to read one match ready!(self.conn.poll_read_head(cx)) { Some(Ok((mut head, body_len, wants))) => { @@ -511,14 +512,11 @@ cfg_server! { Ok(()) } - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { if self.in_flight.is_some() { Poll::Pending } else { - self.service.poll_ready(cx).map_err(|_e| { - // FIXME: return error value. - trace!("service closed"); - }) + Poll::Ready(Ok(())) } } diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index f2c2e7d763..c0e1df47be 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -257,38 +257,6 @@ where loop { self.poll_ping(cx); - // Check that the service is ready to accept a new request. - // - // - If not, just drive the connection some. - // - If ready, try to accept a new request from the connection. - match service.poll_ready(cx) { - Poll::Ready(Ok(())) => (), - Poll::Pending => { - // use `poll_closed` instead of `poll_accept`, - // in order to avoid accepting a request. - ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; - trace!("incoming connection complete"); - return Poll::Ready(Ok(())); - } - Poll::Ready(Err(err)) => { - let err = crate::Error::new_user_service(err); - debug!("service closed: {}", err); - - let reason = err.h2_reason(); - if reason == Reason::NO_ERROR { - // NO_ERROR is only used for graceful shutdowns... - trace!("interpreting NO_ERROR user error as graceful_shutdown"); - self.conn.graceful_shutdown(); - } else { - trace!("abruptly shutting down with {:?}", reason); - self.conn.abrupt_shutdown(reason); - } - self.closing = Some(err); - break; - } - } - - // When the service is ready, accepts an incoming request. match ready!(self.conn.poll_accept(cx)) { Some(Ok((req, mut respond))) => { trace!("incoming request"); diff --git a/src/service/http.rs b/src/service/http.rs index ff05586498..dbbdaa107b 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,7 +1,8 @@ use std::error::Error as StdError; use crate::body::Body; -use crate::common::{task, Future, Poll}; +use crate::common::Future; +use crate::service::service::Service; use crate::{Request, Response}; /// An asynchronous function from `Request` to `Response`. @@ -19,16 +20,13 @@ pub trait HttpService: sealed::Sealed { /// The `Future` returned by this `Service`. type Future: Future, Self::Error>>; - #[doc(hidden)] - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; - #[doc(hidden)] fn call(&mut self, req: Request) -> Self::Future; } impl HttpService for T where - T: tower_service::Service, Response = Response>, + T: Service, Response = Response>, B2: Body, T::Error: Into>, { @@ -37,18 +35,14 @@ where type Error = T::Error; type Future = T::Future; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - tower_service::Service::poll_ready(self, cx) - } - fn call(&mut self, req: Request) -> Self::Future { - tower_service::Service::call(self, req) + Service::call(self, req) } } impl sealed::Sealed for T where - T: tower_service::Service, Response = Response>, + T: Service, Response = Response>, B2: Body, { } diff --git a/src/service/mod.rs b/src/service/mod.rs index c82939d632..d149acf063 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -21,12 +21,16 @@ //! if you need to implement `Service` for a type manually, you can follow the example //! in `service_struct_impl.rs`. -pub use tower_service::Service; - mod http; +mod service; mod util; #[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] pub(super) use self::http::HttpService; +#[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "server", feature = "client") +))] +pub use self::service::Service; pub use self::util::service_fn; diff --git a/src/service/service.rs b/src/service/service.rs new file mode 100644 index 0000000000..b5de9bec20 --- /dev/null +++ b/src/service/service.rs @@ -0,0 +1,32 @@ +use std::future::Future; + +/// An asynchronous function from a `Request` to a `Response`. +/// +/// The `Service` trait is a simplified interface making it easy to write +/// network applications in a modular and reusable way, decoupled from the +/// underlying protocol. +/// +/// # Functional +/// +/// A `Service` is a function of a `Request`. It immediately returns a +/// `Future` representing the eventual completion of processing the +/// request. The actual request processing may happen at any time in the +/// future, on any thread or executor. The processing may depend on calling +/// other services. At some point in the future, the processing will complete, +/// and the `Future` will resolve to a response or error. +/// +/// At a high level, the `Service::call` function represents an RPC request. The +/// `Service` value can be a server or a client. +pub trait Service { + /// Responses given by the service. + type Response; + + /// Errors produced by the service. + type Error; + + /// The future response value. + type Future: Future>; + + /// Process the request and return the response asynchronously. + fn call(&mut self, req: Request) -> Self::Future; +} diff --git a/src/service/util.rs b/src/service/util.rs index f6dda7711f..a41945951c 100644 --- a/src/service/util.rs +++ b/src/service/util.rs @@ -3,7 +3,8 @@ use std::fmt; use std::marker::PhantomData; use crate::body::Body; -use crate::common::{task, Future, Poll}; +use crate::common::Future; +use crate::service::service::Service; use crate::{Request, Response}; /// Create a `Service` from a function. @@ -43,8 +44,7 @@ pub struct ServiceFn { _req: PhantomData, } -impl tower_service::Service> - for ServiceFn +impl Service> for ServiceFn where F: FnMut(Request) -> Ret, ReqBody: Body, @@ -56,10 +56,6 @@ where type Error = E; type Future = Ret; - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: Request) -> Self::Future { (self.f)(req) } diff --git a/tests/server.rs b/tests/server.rs index c294a70f21..0486357a75 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -29,7 +29,7 @@ use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpSt use hyper::body::Body; use hyper::server::conn::Http; -use hyper::service::service_fn; +use hyper::service::{service_fn, Service}; use hyper::{Method, Recv, Request, Response, StatusCode, Uri, Version}; mod support; @@ -2310,77 +2310,6 @@ fn http2_body_user_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } -struct Http2ReadyErrorSvc; - -impl tower_service::Service> for Http2ReadyErrorSvc { - type Response = Response; - type Error = h2::Error; - type Future = Box< - dyn futures_core::Future> - + Send - + Sync - + Unpin, - >; - - fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { - Poll::Ready(Err::<(), _>(h2::Error::from( - h2::Reason::INADEQUATE_SECURITY, - ))) - } - - fn call(&mut self, _: hyper::Request) -> Self::Future { - unreachable!("poll_ready error should have shutdown conn"); - } -} - -#[tokio::test] -#[ignore] // sometimes ECONNRESET wins the race -async fn http2_service_poll_ready_error_sends_goaway() { - use std::error::Error; - - let _ = pretty_env_logger::try_init(); - - let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) - .await - .unwrap(); - - let addr_str = format!("http://{}", listener.local_addr().unwrap()); - - tokio::task::spawn(async move { - loop { - tokio::select! { - res = listener.accept() => { - let (stream, _) = res.unwrap(); - - tokio::task::spawn(async move { - let mut http = Http::new(); - http.http2_only(true); - - let service = Http2ReadyErrorSvc; - http.serve_connection(stream, service).await.unwrap(); - }); - } - } - } - }); - - let uri = addr_str.parse().expect("server addr should parse"); - let err = dbg!(TestClient::new() - .http2_only() - .get(uri) - .await - .expect_err("client.get should fail")); - - // client request should have gotten the specific GOAWAY error... - let h2_err = err - .source() - .expect("source") - .downcast_ref::() - .expect("downcast"); - - assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); -} - #[test] fn skips_content_length_for_304_responses() { let server = serve(); @@ -2789,15 +2718,11 @@ enum Msg { End, } -impl tower_service::Service> for TestService { +impl Service> for TestService { type Response = Response; type Error = BoxError; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - fn call(&mut self, mut req: Request) -> Self::Future { let tx = self.tx.clone(); let replies = self.reply.clone(); @@ -2856,22 +2781,18 @@ const HELLO: &str = "hello"; struct HelloWorld; -impl tower_service::Service> for HelloWorld { +impl Service> for HelloWorld { type Response = Response>; type Error = hyper::Error; type Future = future::Ready>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - fn call(&mut self, _req: Request) -> Self::Future { let response = Response::new(Full::new(HELLO.into())); future::ok(response) } } -fn unreachable_service() -> impl tower_service::Service< +fn unreachable_service() -> impl Service< http::Request, Response = http::Response, Error = BoxError,