Skip to content

Commit

Permalink
feat(client,service): completely remove tower-service crate, reimplem…
Browse files Browse the repository at this point in the history
…ent service::oneshot::Oneshot
  • Loading branch information
tomkarw committed Jul 27, 2022
1 parent ad706df commit 99ed036
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 134 deletions.
11 changes: 0 additions & 11 deletions Cargo.toml
Expand Up @@ -65,7 +65,6 @@ tokio = { version = "1", features = [
] }
tokio-test = "0.4"
tokio-util = { version = "0.7", features = ["codec"] }
tower = { version = "0.4", features = ["make", "util"] }
url = "2.2"

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
Expand Down Expand Up @@ -187,16 +186,6 @@ name = "state"
path = "examples/state.rs"
required-features = ["full"]

[[example]]
name = "tower_client"
path = "examples/tower_client.rs"
required-features = ["full"]

[[example]]
name = "tower_server"
path = "examples/tower_server.rs"
required-features = ["full"]

[[example]]
name = "upgrades"
path = "examples/upgrades.rs"
Expand Down
1 change: 0 additions & 1 deletion examples/service_struct_impl.rs
Expand Up @@ -3,7 +3,6 @@ use hyper::{Body, Request, Response, Server};

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

type Counter = i32;

Expand Down
13 changes: 3 additions & 10 deletions src/client/client.rs
Expand Up @@ -20,6 +20,7 @@ use super::HttpConnector;
use crate::body::{Body, HttpBody};
use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
use crate::rt::Executor;
use crate::service::Service;

/// A Client to make outgoing HTTP requests.
///
Expand Down Expand Up @@ -521,7 +522,7 @@ where
}
}

impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
impl<C, B> Service<Request<B>> for Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: HttpBody + Send + 'static,
Expand All @@ -532,16 +533,12 @@ where
type Error = crate::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<B>) -> Self::Future {
self.request(req)
}
}

impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
impl<C, B> Service<Request<B>> for &'_ Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: HttpBody + Send + 'static,
Expand All @@ -552,10 +549,6 @@ where
type Error = crate::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<B>) -> Self::Future {
self.request(req)
}
Expand Down
20 changes: 3 additions & 17 deletions src/client/connect/dns.rs
Expand Up @@ -30,8 +30,8 @@ use std::task::{self, Poll};
use std::{fmt, io, vec};

use tokio::task::JoinHandle;
use tower_service::Service;
use tracing::debug;
use crate::service::Service;

pub(super) use self::sealed::Resolve;

Expand Down Expand Up @@ -113,10 +113,6 @@ impl Service<Name> for GaiResolver {
type Error = io::Error;
type Future = GaiFuture;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, name: Name) -> Self::Future {
let blocking = tokio::task::spawn_blocking(move || {
debug!("resolving host={:?}", name.host);
Expand Down Expand Up @@ -286,10 +282,6 @@ impl Service<Name> for TokioThreadpoolGaiResolver {
type Error = io::Error;
type Future = TokioThreadpoolGaiFuture;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, name: Name) -> Self::Future {
TokioThreadpoolGaiFuture { name }
}
Expand Down Expand Up @@ -319,16 +311,15 @@ impl Future for TokioThreadpoolGaiFuture {

mod sealed {
use super::{SocketAddr, Name};
use crate::common::{task, Future, Poll};
use tower_service::Service;
use crate::common::Future;
use crate::service::Service;

// "Trait alias" for `Service<Name, Response = Addrs>`
pub trait Resolve {
type Addrs: Iterator<Item = SocketAddr>;
type Error: Into<Box<dyn std::error::Error + Send + Sync>>;
type Future: Future<Output = Result<Self::Addrs, Self::Error>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
fn resolve(&mut self, name: Name) -> Self::Future;
}

Expand All @@ -342,10 +333,6 @@ mod sealed {
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(self, cx)
}

fn resolve(&mut self, name: Name) -> Self::Future {
Service::call(self, name)
}
Expand All @@ -356,7 +343,6 @@ pub(super) async fn resolve<R>(resolver: &mut R, name: Name) -> Result<R::Addrs,
where
R: Resolve,
{
futures_util::future::poll_fn(|cx| resolver.poll_ready(cx)).await?;
resolver.resolve(name).await
}

Expand Down
8 changes: 2 additions & 6 deletions src/client/connect/http.rs
Expand Up @@ -18,6 +18,7 @@ use tracing::{debug, trace, warn};

use super::dns::{self, resolve, GaiResolver, Resolve};
use super::{Connected, Connection};
use crate::service::Service;
//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;

/// A connector for the `http` scheme.
Expand Down Expand Up @@ -251,7 +252,7 @@ impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
}
}

impl<R> tower_service::Service<Uri> for HttpConnector<R>
impl<R> Service<Uri> for HttpConnector<R>
where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
Expand All @@ -260,11 +261,6 @@ where
type Error = ConnectError;
type Future = HttpConnecting<R>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?;
Poll::Ready(Ok(()))
}

fn call(&mut self, dst: Uri) -> Self::Future {
let mut self_ = self.clone();
HttpConnecting {
Expand Down
16 changes: 6 additions & 10 deletions src/client/connect/mod.rs
Expand Up @@ -43,11 +43,6 @@
//! dyn Future<Output = Result<Self::Response, Self::Error>> + Send
//! >>;
//!
//! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
//! // This connector is always ready, but others might not be.
//! Poll::Ready(Ok(()))
//! }
//!
//! fn call(&mut self, _: Uri) -> Self::Future {
//! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
//! }
Expand Down Expand Up @@ -276,6 +271,7 @@ pub(super) mod sealed {

use super::Connection;
use crate::common::{Future, Unpin};
use crate::service::Service;

/// Connect to a destination, returning an IO transport.
///
Expand Down Expand Up @@ -306,28 +302,28 @@ pub(super) mod sealed {

impl<S, T> Connect for S
where
S: tower_service::Service<Uri, Response = T> + Send + 'static,
S: Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
type _Svc = S;

fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S::Future> {
crate::service::oneshot(self, dst)
}
}

impl<S, T> ConnectSvc for S
where
S: tower_service::Service<Uri, Response = T> + Send + 'static,
S: Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
type Connection = T;
type Error = S::Error;
type Future = crate::service::Oneshot<S, Uri>;
type Future = crate::service::Oneshot<S::Future>;

fn connect(self, _: Internal, dst: Uri) -> Self::Future {
crate::service::oneshot(self, dst)
Expand All @@ -336,7 +332,7 @@ pub(super) mod sealed {

impl<S, T> Sealed for S
where
S: tower_service::Service<Uri, Response = T> + Send,
S: Service<Uri, Response = T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
Expand Down
49 changes: 25 additions & 24 deletions src/client/tests.rs
@@ -1,27 +1,28 @@
use std::io;

use futures_util::future;
use tokio::net::TcpStream;

use super::Client;

#[tokio::test]
async fn client_connect_uri_argument() {
let connector = tower::service_fn(|dst: http::Uri| {
assert_eq!(dst.scheme(), Some(&http::uri::Scheme::HTTP));
assert_eq!(dst.host(), Some("example.local"));
assert_eq!(dst.port(), None);
assert_eq!(dst.path(), "/", "path should be removed");

future::err::<TcpStream, _>(io::Error::new(io::ErrorKind::Other, "expect me"))
});

let client = Client::builder().build::<_, crate::Body>(connector);
let _ = client
.get("http://example.local/and/a/path".parse().unwrap())
.await
.expect_err("response should fail");
}
// use std::io;
//
// use futures_util::future;
// use tokio::net::TcpStream;
// use crate::service::service_fn;
//
// use super::Client;
//
// #[tokio::test]
// async fn client_connect_uri_argument() {
// let connector = service_fn(|dst: http::Uri| {
// assert_eq!(dst.scheme(), Some(&http::uri::Scheme::HTTP));
// assert_eq!(dst.host(), Some("example.local"));
// assert_eq!(dst.port(), None);
// assert_eq!(dst.path(), "/", "path should be removed");
//
// future::err::<TcpStream, _>(io::Error::new(io::ErrorKind::Other, "expect me"))
// });
//
// let client = Client::builder().build::<_, crate::Body>(connector);
// let _ = client
// .get("http://example.local/and/a/path".parse().unwrap())
// .await
// .expect_err("response should fail");
// }

/*
// FIXME: re-implement tests with `async/await`
Expand Down
61 changes: 11 additions & 50 deletions src/service/oneshot.rs
@@ -1,73 +1,34 @@
// TODO: Eventually to be replaced with tower_util::Oneshot.

use pin_project_lite::pin_project;
use tower_service::Service;

use crate::common::{task, Future, Pin, Poll};
use crate::service::Service;

pub(crate) fn oneshot<S, Req>(svc: S, req: Req) -> Oneshot<S, Req>
pub(crate) fn oneshot<S, Req, F>(mut svc: S, req: Req) -> Oneshot<F>
where
S: Service<Req>,
S: Service<Req, Future = F>,
{
Oneshot {
state: State::NotReady { svc, req },
}
Oneshot { fut: svc.call(req) }
}

pin_project! {
// A `Future` consuming a `Service` and request, waiting until the `Service`
// is ready, and then calling `Service::call` with the request, and
// waiting for that `Future`.
#[allow(missing_debug_implementations)]
pub struct Oneshot<S: Service<Req>, Req> {
pub struct Oneshot<F> {
#[pin]
state: State<S, Req>,
}
}

pin_project! {
#[project = StateProj]
#[project_replace = StateProjOwn]
enum State<S: Service<Req>, Req> {
NotReady {
svc: S,
req: Req,
},
Called {
#[pin]
fut: S::Future,
},
Tmp,
fut: F,
}
}

impl<S, Req> Future for Oneshot<S, Req>
impl<F, Response, Error> Future for Oneshot<F>
where
S: Service<Req>,
F: Future<Output = Result<Response, Error>>,
{
type Output = Result<S::Response, S::Error>;
type Output = Result<Response, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();

loop {
match me.state.as_mut().project() {
StateProj::NotReady { ref mut svc, .. } => {
ready!(svc.poll_ready(cx))?;
// fallthrough out of the match's borrow
}
StateProj::Called { fut } => {
return fut.poll(cx);
}
StateProj::Tmp => unreachable!(),
}

match me.state.as_mut().project_replace(State::Tmp) {
StateProjOwn::NotReady { mut svc, req } => {
me.state.set(State::Called { fut: svc.call(req) });
}
_ => unreachable!(),
}
}
let me = self.project();
me.fut.poll(cx)
}
}
5 changes: 0 additions & 5 deletions tests/client.rs
Expand Up @@ -2120,11 +2120,6 @@ mod dispatch_impl {
type Error = <HttpConnector as hyper::service::Service<Uri>>::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// don't forget to check inner service is ready :)
hyper::service::Service::<Uri>::poll_ready(&mut self.http, cx)
}

fn call(&mut self, dst: Uri) -> Self::Future {
self.connects.fetch_add(1, Ordering::SeqCst);
let closes = self.closes.clone();
Expand Down

0 comments on commit 99ed036

Please sign in to comment.