From bb3af17ce1a3841e9170adabcce595c7c8743ea7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 15 Aug 2022 09:15:59 -0700 Subject: [PATCH] feat(client): remove higher-level `hyper::Client` (#2941) This removes the following types and methods from hyper: - `Client` - `Error::is_connect()` BREAKING CHANGE: A pooling client is in the hyper-util crate. --- src/body/body.rs | 93 +-- src/client/client.rs | 1356 ------------------------------------ src/client/conn/http2.rs | 14 +- src/client/conn/mod.rs | 123 +--- src/client/connect/mod.rs | 2 + src/client/dispatch.rs | 17 + src/client/mod.rs | 26 +- src/client/pool.rs | 1044 --------------------------- src/common/lazy.rs | 76 -- src/common/mod.rs | 6 - src/common/sync_wrapper.rs | 110 --- src/error.rs | 54 -- src/lib.rs | 3 - tests/client.rs | 1034 +-------------------------- 14 files changed, 34 insertions(+), 3924 deletions(-) delete mode 100644 src/client/client.rs delete mode 100644 src/client/pool.rs delete mode 100644 src/common/lazy.rs delete mode 100644 src/common/sync_wrapper.rs diff --git a/src/body/body.rs b/src/body/body.rs index 856aea3e29..008c37404f 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -10,8 +10,6 @@ use http_body::{Body as HttpBody, SizeHint}; use super::DecodedLength; use crate::common::Future; -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -use crate::common::Never; use crate::common::{task, watch, Pin, Poll}; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; @@ -29,9 +27,6 @@ type TrailersSender = oneshot::Sender; #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, - /// Keep the extra bits in an `Option>`, so that - /// Body stays small in the common case (no extras needed). - extra: Option>, } enum Kind { @@ -52,34 +47,6 @@ enum Kind { Ffi(crate::ffi::UserBody), } -struct Extra { - /// Allow the client to pass a future to delay the `Body` from returning - /// EOF. This allows the `Client` to try to put the idle connection - /// back into the pool before the body is "finished". - /// - /// The reason for this is so that creating a new request after finishing - /// streaming the body of a response could sometimes result in creating - /// a brand new connection, since the pool didn't know about the idle - /// connection yet. - delayed_eof: Option, -} - -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -type DelayEofUntil = oneshot::Receiver; - -enum DelayEof { - /// Initial state, stream hasn't seen EOF yet. - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - NotEof(DelayEofUntil), - /// Transitions to this state once we've seen `poll` try to - /// return EOF (`None`). This future is then polled, and - /// when it completes, the Body finally returns EOF (`None`). - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - Eof(DelayEofUntil), -} - /// A sender half created through [`Body::channel()`]. /// /// Useful when wanting to stream chunks from another thread. @@ -153,7 +120,7 @@ impl Body { } fn new(kind: Kind) -> Body { - Body { kind, extra: None } + Body { kind } } #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] @@ -176,62 +143,6 @@ impl Body { body } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { - self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); - } - - fn take_delayed_eof(&mut self) -> Option { - self.extra - .as_mut() - .and_then(|extra| extra.delayed_eof.take()) - } - - #[cfg(any(feature = "http1", feature = "http2"))] - fn extra_mut(&mut self) -> &mut Extra { - self.extra - .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) - } - - fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll>> { - match self.take_delayed_eof() { - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { - ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { - self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); - ok - } - Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { - Poll::Ready(Ok(never)) => match never {}, - Poll::Pending => { - self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); - Poll::Pending - } - Poll::Ready(Err(_done)) => Poll::Ready(None), - }, - Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), - }, - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { - Poll::Ready(Ok(never)) => match never {}, - Poll::Pending => { - self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); - Poll::Pending - } - Poll::Ready(Err(_done)) => Poll::Ready(None), - }, - #[cfg(any( - not(any(feature = "http1", feature = "http2")), - not(feature = "client") - ))] - Some(delay_eof) => match delay_eof {}, - None => self.poll_inner(cx), - } - } - #[cfg(feature = "ffi")] pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { match self.kind { @@ -313,7 +224,7 @@ impl HttpBody for Body { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>> { - self.poll_eof(cx) + self.poll_inner(cx) } fn poll_trailers( diff --git a/src/client/client.rs b/src/client/client.rs deleted file mode 100644 index e605b4506a..0000000000 --- a/src/client/client.rs +++ /dev/null @@ -1,1356 +0,0 @@ -use std::error::Error as StdError; -use std::fmt; -use std::mem; -use std::time::Duration; - -use futures_channel::oneshot; -use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; -use http::header::{HeaderValue, HOST}; -use http::uri::{Port, Scheme}; -use http::{Method, Request, Response, Uri, Version}; -use tracing::{debug, trace, warn}; - -use super::conn; -use super::connect::{self, sealed::Connect, Alpn, Connected, Connection}; -use super::pool::{ - self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation, -}; -use crate::body::{Body, HttpBody}; -use crate::common::{ - exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin, - Poll, -}; -use crate::rt::Executor; - -/// A Client to make outgoing HTTP requests. -/// -/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The -/// underlying connection pool will be reused. -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -pub struct Client { - config: Config, - conn_builder: conn::Builder, - connector: C, - pool: Pool>, -} - -#[derive(Clone, Copy, Debug)] -struct Config { - retry_canceled_requests: bool, - set_host: bool, - ver: Ver, -} - -/// A `Future` that will resolve to an HTTP Response. -/// -/// This is returned by `Client::request` (and `Client::get`). -#[must_use = "futures do nothing unless polled"] -pub struct ResponseFuture { - inner: SyncWrapper>> + Send>>>, -} - -// ===== impl Client ===== - -impl Client<(), Body> { - /// Create a builder to configure a new `Client`. - #[inline] - pub fn builder() -> Builder { - Builder::default() - } -} - -impl Client -where - C: Connect + Clone + Send + Sync + 'static, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - /// Send a `GET` request to the supplied `Uri`. - /// - /// # Note - /// - /// This requires that the `HttpBody` type have a `Default` implementation. - /// It *should* return an "empty" version of itself, such that - /// `HttpBody::is_end_stream` is `true`. - pub fn get(&self, uri: Uri) -> ResponseFuture - where - B: Default, - { - let body = B::default(); - if !body.is_end_stream() { - warn!("default HttpBody used for get() does not return true for is_end_stream"); - } - - let mut req = Request::new(body); - *req.uri_mut() = uri; - self.request(req) - } - - /// Send a constructed `Request` using this `Client`. - pub fn request(&self, mut req: Request) -> ResponseFuture { - let is_http_connect = req.method() == Method::CONNECT; - match req.version() { - Version::HTTP_11 => (), - Version::HTTP_10 => { - if is_http_connect { - warn!("CONNECT is not allowed for HTTP/1.0"); - return ResponseFuture::new(future::err( - crate::Error::new_user_unsupported_request_method(), - )); - } - } - Version::HTTP_2 => (), - // completely unsupported HTTP version (like HTTP/0.9)! - other => return ResponseFuture::error_version(other), - }; - - let pool_key = match extract_domain(req.uri_mut(), is_http_connect) { - Ok(s) => s, - Err(err) => { - return ResponseFuture::new(future::err(err)); - } - }; - - ResponseFuture::new(self.clone().retryably_send_request(req, pool_key)) - } - - async fn retryably_send_request( - self, - mut req: Request, - pool_key: PoolKey, - ) -> crate::Result> { - let uri = req.uri().clone(); - - loop { - req = match self.send_request(req, pool_key.clone()).await { - Ok(resp) => return Ok(resp), - Err(ClientError::Normal(err)) => return Err(err), - Err(ClientError::Canceled { - connection_reused, - mut req, - reason, - }) => { - if !self.config.retry_canceled_requests || !connection_reused { - // if client disabled, don't retry - // a fresh connection means we definitely can't retry - return Err(reason); - } - - trace!( - "unstarted request canceled, trying again (reason={:?})", - reason - ); - *req.uri_mut() = uri.clone(); - req - } - } - } - } - - async fn send_request( - &self, - mut req: Request, - pool_key: PoolKey, - ) -> Result, ClientError> { - let mut pooled = match self.connection_for(pool_key).await { - Ok(pooled) => pooled, - Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)), - Err(ClientConnectError::H2CheckoutIsClosed(reason)) => { - return Err(ClientError::Canceled { - connection_reused: true, - req, - reason, - }) - } - }; - - if pooled.is_http1() { - if req.version() == Version::HTTP_2 { - warn!("Connection is HTTP/1, but request requires HTTP/2"); - return Err(ClientError::Normal( - crate::Error::new_user_unsupported_version(), - )); - } - - if self.config.set_host { - let uri = req.uri().clone(); - req.headers_mut().entry(HOST).or_insert_with(|| { - let hostname = uri.host().expect("authority implies host"); - if let Some(port) = get_non_default_port(&uri) { - let s = format!("{}:{}", hostname, port); - HeaderValue::from_str(&s) - } else { - HeaderValue::from_str(hostname) - } - .expect("uri host is valid header value") - }); - } - - // CONNECT always sends authority-form, so check it first... - if req.method() == Method::CONNECT { - authority_form(req.uri_mut()); - } else if pooled.conn_info.is_proxied { - absolute_form(req.uri_mut()); - } else { - origin_form(req.uri_mut()); - } - } else if req.method() == Method::CONNECT { - authority_form(req.uri_mut()); - } - - let fut = pooled - .send_request_retryable(req) - .map_err(ClientError::map_with_reused(pooled.is_reused())); - - // If the Connector included 'extra' info, add to Response... - let extra_info = pooled.conn_info.extra.clone(); - let fut = fut.map_ok(move |mut res| { - if let Some(extra) = extra_info { - extra.set(res.extensions_mut()); - } - res - }); - - // As of futures@0.1.21, there is a race condition in the mpsc - // channel, such that sending when the receiver is closing can - // result in the message being stuck inside the queue. It won't - // ever notify until the Sender side is dropped. - // - // To counteract this, we must check if our senders 'want' channel - // has been closed after having tried to send. If so, error out... - if pooled.is_closed() { - return fut.await; - } - - let mut res = fut.await?; - - // If pooled is HTTP/2, we can toss this reference immediately. - // - // when pooled is dropped, it will try to insert back into the - // pool. To delay that, spawn a future that completes once the - // sender is ready again. - // - // This *should* only be once the related `Connection` has polled - // for a new request to start. - // - // It won't be ready if there is a body to stream. - if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() { - drop(pooled); - } else if !res.body().is_end_stream() { - let (delayed_tx, delayed_rx) = oneshot::channel(); - res.body_mut().delayed_eof(delayed_rx); - let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { - // At this point, `pooled` is dropped, and had a chance - // to insert into the pool (if conn was idle) - drop(delayed_tx); - }); - - self.conn_builder.exec.execute(on_idle); - } else { - // There's no body to delay, but the connection isn't - // ready yet. Only re-insert when it's ready - let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); - - self.conn_builder.exec.execute(on_idle); - } - - Ok(res) - } - - async fn connection_for( - &self, - pool_key: PoolKey, - ) -> Result>, ClientConnectError> { - // This actually races 2 different futures to try to get a ready - // connection the fastest, and to reduce connection churn. - // - // - If the pool has an idle connection waiting, that's used - // immediately. - // - Otherwise, the Connector is asked to start connecting to - // the destination Uri. - // - Meanwhile, the pool Checkout is watching to see if any other - // request finishes and tries to insert an idle connection. - // - If a new connection is started, but the Checkout wins after - // (an idle connection became available first), the started - // connection future is spawned into the runtime to complete, - // and then be inserted into the pool as an idle connection. - let checkout = self.pool.checkout(pool_key.clone()); - let connect = self.connect_to(pool_key); - let is_ver_h2 = self.config.ver == Ver::Http2; - - // The order of the `select` is depended on below... - - match future::select(checkout, connect).await { - // Checkout won, connect future may have been started or not. - // - // If it has, let it finish and insert back into the pool, - // so as to not waste the socket... - Either::Left((Ok(checked_out), connecting)) => { - // This depends on the `select` above having the correct - // order, such that if the checkout future were ready - // immediately, the connect future will never have been - // started. - // - // If it *wasn't* ready yet, then the connect future will - // have been started... - if connecting.started() { - let bg = connecting - .map_err(|err| { - trace!("background connect error: {}", err); - }) - .map(|_pooled| { - // dropping here should just place it in - // the Pool for us... - }); - // An execute error here isn't important, we're just trying - // to prevent a waste of a socket... - self.conn_builder.exec.execute(bg); - } - Ok(checked_out) - } - // Connect won, checkout can just be dropped. - Either::Right((Ok(connected), _checkout)) => Ok(connected), - // Either checkout or connect could get canceled: - // - // 1. Connect is canceled if this is HTTP/2 and there is - // an outstanding HTTP/2 connecting task. - // 2. Checkout is canceled if the pool cannot deliver an - // idle connection reliably. - // - // In both cases, we should just wait for the other future. - Either::Left((Err(err), connecting)) => { - if err.is_canceled() { - connecting.await.map_err(ClientConnectError::Normal) - } else { - Err(ClientConnectError::Normal(err)) - } - } - Either::Right((Err(err), checkout)) => { - if err.is_canceled() { - checkout.await.map_err(move |err| { - if is_ver_h2 - && err.is_canceled() - && err.find_source::().is_some() - { - ClientConnectError::H2CheckoutIsClosed(err) - } else { - ClientConnectError::Normal(err) - } - }) - } else { - Err(ClientConnectError::Normal(err)) - } - } - } - } - - fn connect_to( - &self, - pool_key: PoolKey, - ) -> impl Lazy>>> + Unpin { - let executor = self.conn_builder.exec.clone(); - let pool = self.pool.clone(); - #[cfg(not(feature = "http2"))] - let conn_builder = self.conn_builder.clone(); - #[cfg(feature = "http2")] - let mut conn_builder = self.conn_builder.clone(); - let ver = self.config.ver; - let is_ver_h2 = ver == Ver::Http2; - let connector = self.connector.clone(); - let dst = domain_as_uri(pool_key.clone()); - hyper_lazy(move || { - // Try to take a "connecting lock". - // - // If the pool_key is for HTTP/2, and there is already a - // connection being established, then this can't take a - // second lock. The "connect_to" future is Canceled. - let connecting = match pool.connecting(&pool_key, ver) { - Some(lock) => lock, - None => { - let canceled = - crate::Error::new_canceled().with("HTTP/2 connection in progress"); - return Either::Right(future::err(canceled)); - } - }; - Either::Left( - connector - .connect(connect::sealed::Internal, dst) - .map_err(crate::Error::new_connect) - .and_then(move |io| { - let connected = io.connected(); - // If ALPN is h2 and we aren't http2_only already, - // then we need to convert our pool checkout into - // a single HTTP2 one. - let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 { - match connecting.alpn_h2(&pool) { - Some(lock) => { - trace!("ALPN negotiated h2, updating pool"); - lock - } - None => { - // Another connection has already upgraded, - // the pool checkout should finish up for us. - let canceled = crate::Error::new_canceled() - .with("ALPN upgraded to HTTP/2"); - return Either::Right(future::err(canceled)); - } - } - } else { - connecting - }; - - #[cfg_attr(not(feature = "http2"), allow(unused))] - let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; - #[cfg(feature = "http2")] - { - conn_builder.http2_only(is_h2); - } - - Either::Left(Box::pin(async move { - let (tx, conn) = conn_builder.handshake(io).await?; - - trace!("handshake complete, spawning background dispatcher task"); - executor.execute( - conn.map_err(|e| debug!("client connection error: {}", e)) - .map(|_| ()), - ); - - // Wait for 'conn' to ready up before we - // declare this tx as usable - let tx = tx.when_ready().await?; - - let tx = { - #[cfg(feature = "http2")] - { - if is_h2 { - PoolTx::Http2(tx.into_http2()) - } else { - PoolTx::Http1(tx) - } - } - #[cfg(not(feature = "http2"))] - PoolTx::Http1(tx) - }; - - Ok(pool.pooled( - connecting, - PoolClient { - conn_info: connected, - tx, - }, - )) - })) - }), - ) - }) - } -} - -impl tower_service::Service> for Client -where - C: Connect + Clone + Send + Sync + 'static, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - type Response = Response; - type Error = crate::Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.request(req) - } -} - -impl tower_service::Service> for &'_ Client -where - C: Connect + Clone + Send + Sync + 'static, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - type Response = Response; - type Error = crate::Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.request(req) - } -} - -impl Clone for Client { - fn clone(&self) -> Client { - Client { - config: self.config.clone(), - conn_builder: self.conn_builder.clone(), - connector: self.connector.clone(), - pool: self.pool.clone(), - } - } -} - -impl fmt::Debug for Client { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Client").finish() - } -} - -// ===== impl ResponseFuture ===== - -impl ResponseFuture { - fn new(value: F) -> Self - where - F: Future>> + Send + 'static, - { - Self { - inner: SyncWrapper::new(Box::pin(value)), - } - } - - fn error_version(ver: Version) -> Self { - warn!("Request has unsupported version \"{:?}\"", ver); - ResponseFuture::new(Box::pin(future::err( - crate::Error::new_user_unsupported_version(), - ))) - } -} - -impl fmt::Debug for ResponseFuture { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Future") - } -} - -impl Future for ResponseFuture { - type Output = crate::Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.inner.get_mut().as_mut().poll(cx) - } -} - -// ===== impl PoolClient ===== - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -struct PoolClient { - conn_info: Connected, - tx: PoolTx, -} - -enum PoolTx { - Http1(conn::SendRequest), - #[cfg(feature = "http2")] - Http2(conn::Http2SendRequest), -} - -impl PoolClient { - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - match self.tx { - PoolTx::Http1(ref mut tx) => tx.poll_ready(cx), - #[cfg(feature = "http2")] - PoolTx::Http2(_) => Poll::Ready(Ok(())), - } - } - - fn is_http1(&self) -> bool { - !self.is_http2() - } - - fn is_http2(&self) -> bool { - match self.tx { - PoolTx::Http1(_) => false, - #[cfg(feature = "http2")] - PoolTx::Http2(_) => true, - } - } - - fn is_ready(&self) -> bool { - match self.tx { - PoolTx::Http1(ref tx) => tx.is_ready(), - #[cfg(feature = "http2")] - PoolTx::Http2(ref tx) => tx.is_ready(), - } - } - - fn is_closed(&self) -> bool { - match self.tx { - PoolTx::Http1(ref tx) => tx.is_closed(), - #[cfg(feature = "http2")] - PoolTx::Http2(ref tx) => tx.is_closed(), - } - } -} - -impl PoolClient { - fn send_request_retryable( - &mut self, - req: Request, - ) -> impl Future, (crate::Error, Option>)>> - where - B: Send, - { - match self.tx { - #[cfg(not(feature = "http2"))] - PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req), - #[cfg(feature = "http2")] - PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)), - #[cfg(feature = "http2")] - PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)), - } - } -} - -impl Poolable for PoolClient -where - B: Send + 'static, -{ - fn is_open(&self) -> bool { - match self.tx { - PoolTx::Http1(ref tx) => tx.is_ready(), - #[cfg(feature = "http2")] - PoolTx::Http2(ref tx) => tx.is_ready(), - } - } - - fn reserve(self) -> Reservation { - match self.tx { - PoolTx::Http1(tx) => Reservation::Unique(PoolClient { - conn_info: self.conn_info, - tx: PoolTx::Http1(tx), - }), - #[cfg(feature = "http2")] - PoolTx::Http2(tx) => { - let b = PoolClient { - conn_info: self.conn_info.clone(), - tx: PoolTx::Http2(tx.clone()), - }; - let a = PoolClient { - conn_info: self.conn_info, - tx: PoolTx::Http2(tx), - }; - Reservation::Shared(a, b) - } - } - } - - fn can_share(&self) -> bool { - self.is_http2() - } -} - -// ===== impl ClientError ===== - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -enum ClientError { - Normal(crate::Error), - Canceled { - connection_reused: bool, - req: Request, - reason: crate::Error, - }, -} - -impl ClientError { - fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option>)) -> Self { - move |(err, orig_req)| { - if let Some(req) = orig_req { - ClientError::Canceled { - connection_reused: conn_reused, - reason: err, - req, - } - } else { - ClientError::Normal(err) - } - } - } -} - -enum ClientConnectError { - Normal(crate::Error), - H2CheckoutIsClosed(crate::Error), -} - -/// A marker to identify what version a pooled connection is. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub(super) enum Ver { - Auto, - Http2, -} - -fn origin_form(uri: &mut Uri) { - let path = match uri.path_and_query() { - Some(path) if path.as_str() != "/" => { - let mut parts = ::http::uri::Parts::default(); - parts.path_and_query = Some(path.clone()); - Uri::from_parts(parts).expect("path is valid uri") - } - _none_or_just_slash => { - debug_assert!(Uri::default() == "/"); - Uri::default() - } - }; - *uri = path -} - -fn absolute_form(uri: &mut Uri) { - debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme"); - debug_assert!( - uri.authority().is_some(), - "absolute_form needs an authority" - ); - // If the URI is to HTTPS, and the connector claimed to be a proxy, - // then it *should* have tunneled, and so we don't want to send - // absolute-form in that case. - if uri.scheme() == Some(&Scheme::HTTPS) { - origin_form(uri); - } -} - -fn authority_form(uri: &mut Uri) { - if let Some(path) = uri.path_and_query() { - // `https://hyper.rs` would parse with `/` path, don't - // annoy people about that... - if path != "/" { - warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path); - } - } - *uri = match uri.authority() { - Some(auth) => { - let mut parts = ::http::uri::Parts::default(); - parts.authority = Some(auth.clone()); - Uri::from_parts(parts).expect("authority is valid") - } - None => { - unreachable!("authority_form with relative uri"); - } - }; -} - -fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result { - let uri_clone = uri.clone(); - match (uri_clone.scheme(), uri_clone.authority()) { - (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())), - (None, Some(auth)) if is_http_connect => { - let scheme = match auth.port_u16() { - Some(443) => { - set_scheme(uri, Scheme::HTTPS); - Scheme::HTTPS - } - _ => { - set_scheme(uri, Scheme::HTTP); - Scheme::HTTP - } - }; - Ok((scheme, auth.clone())) - } - _ => { - debug!("Client requires absolute-form URIs, received: {:?}", uri); - Err(crate::Error::new_user_absolute_uri_required()) - } - } -} - -fn domain_as_uri((scheme, auth): PoolKey) -> Uri { - http::uri::Builder::new() - .scheme(scheme) - .authority(auth) - .path_and_query("/") - .build() - .expect("domain is valid Uri") -} - -fn set_scheme(uri: &mut Uri, scheme: Scheme) { - debug_assert!( - uri.scheme().is_none(), - "set_scheme expects no existing scheme" - ); - let old = mem::replace(uri, Uri::default()); - let mut parts: ::http::uri::Parts = old.into(); - parts.scheme = Some(scheme); - parts.path_and_query = Some("/".parse().expect("slash is a valid path")); - *uri = Uri::from_parts(parts).expect("scheme is valid"); -} - -fn get_non_default_port(uri: &Uri) -> Option> { - match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) { - (Some(443), true) => None, - (Some(80), false) => None, - _ => uri.port(), - } -} - -fn is_schema_secure(uri: &Uri) -> bool { - uri.scheme_str() - .map(|scheme_str| matches!(scheme_str, "wss" | "https")) - .unwrap_or_default() -} - -/// A builder to configure a new [`Client`](Client). -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -#[derive(Clone)] -pub struct Builder { - client_config: Config, - conn_builder: conn::Builder, - pool_config: pool::Config, -} - -impl Default for Builder { - fn default() -> Self { - Self { - client_config: Config { - retry_canceled_requests: true, - set_host: true, - ver: Ver::Auto, - }, - conn_builder: conn::Builder::new(), - pool_config: pool::Config { - idle_timeout: Some(Duration::from_secs(90)), - max_idle_per_host: std::usize::MAX, - }, - } - } -} - -impl Builder { - #[doc(hidden)] - #[deprecated( - note = "name is confusing, to disable the connection pool, call pool_max_idle_per_host(0)" - )] - pub fn keep_alive(&mut self, val: bool) -> &mut Self { - if !val { - // disable - self.pool_max_idle_per_host(0) - } else if self.pool_config.max_idle_per_host == 0 { - // enable - self.pool_max_idle_per_host(std::usize::MAX) - } else { - // already enabled - self - } - } - - #[doc(hidden)] - #[deprecated(note = "renamed to `pool_idle_timeout`")] - pub fn keep_alive_timeout(&mut self, val: D) -> &mut Self - where - D: Into>, - { - self.pool_idle_timeout(val) - } - - /// Set an optional timeout for idle sockets being kept-alive. - /// - /// Pass `None` to disable timeout. - /// - /// Default is 90 seconds. - pub fn pool_idle_timeout(&mut self, val: D) -> &mut Self - where - D: Into>, - { - self.pool_config.idle_timeout = val.into(); - self - } - - #[doc(hidden)] - #[deprecated(note = "renamed to `pool_max_idle_per_host`")] - pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self { - self.pool_config.max_idle_per_host = max_idle; - self - } - - /// Sets the maximum idle connection per host allowed in the pool. - /// - /// Default is `usize::MAX` (no limit). - pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self { - self.pool_config.max_idle_per_host = max_idle; - self - } - - // HTTP/1 options - - /// Sets the exact size of the read buffer to *always* use. - /// - /// Note that setting this option unsets the `http1_max_buf_size` option. - /// - /// Default is an adaptive read buffer. - pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self { - self.conn_builder.http1_read_buf_exact_size(Some(sz)); - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// Note that setting this option unsets the `http1_read_exact_buf_size` option. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { - self.conn_builder.http1_max_buf_size(max); - self - } - - /// Set whether HTTP/1 connections will accept spaces between header names - /// and the colon that follow them in responses. - /// - /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when - /// parsing. - /// - /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has - /// to say about it: - /// - /// > No whitespace is allowed between the header field-name and colon. In - /// > the past, differences in the handling of such whitespace have led to - /// > security vulnerabilities in request routing and response handling. A - /// > server MUST reject any received request message that contains - /// > whitespace between a header field-name and colon with a response code - /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a - /// > response message before forwarding the message downstream. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - /// - /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 - pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self { - self.conn_builder - .http1_allow_spaces_after_header_name_in_responses(val); - self - } - - /// Set whether HTTP/1 connections will accept obsolete line folding for - /// header values. - /// - /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has - /// to say about it: - /// - /// > A server that receives an obs-fold in a request message that is not - /// > within a message/http container MUST either reject the message by - /// > sending a 400 (Bad Request), preferably with a representation - /// > explaining that obsolete line folding is unacceptable, or replace - /// > each received obs-fold with one or more SP octets prior to - /// > interpreting the field value or forwarding the message downstream. - /// - /// > A proxy or gateway that receives an obs-fold in a response message - /// > that is not within a message/http container MUST either discard the - /// > message and replace it with a 502 (Bad Gateway) response, preferably - /// > with a representation explaining that unacceptable line folding was - /// > received, or replace each received obs-fold with one or more SP - /// > octets prior to interpreting the field value or forwarding the - /// > message downstream. - /// - /// > A user agent that receives an obs-fold in a response message that is - /// > not within a message/http container MUST replace each received - /// > obs-fold with one or more SP octets prior to interpreting the field - /// > value. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - /// - /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 - pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self { - self.conn_builder - .http1_allow_obsolete_multiline_headers_in_responses(val); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder { - self.conn_builder.http1_writev(enabled); - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self { - self.conn_builder.http1_title_case_headers(val); - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Response`. It will also look for and use - /// such an extension in any provided `Request`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self { - self.conn_builder.http1_preserve_header_case(val); - self - } - - /// Set whether HTTP/0.9 responses should be tolerated. - /// - /// Default is false. - pub fn http09_responses(&mut self, val: bool) -> &mut Self { - self.conn_builder.http09_responses(val); - self - } - - /// Set whether the connection **must** use HTTP/2. - /// - /// The destination must either allow HTTP2 Prior Knowledge, or the - /// `Connect` should be configured to do use ALPN to upgrade to `h2` - /// as part of the connection process. This will not make the `Client` - /// utilize ALPN by itself. - /// - /// Note that setting this to true prevents HTTP/1 from being allowed. - /// - /// Default is false. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_only(&mut self, val: bool) -> &mut Self { - self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto }; - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - self.conn_builder - .http2_initial_stream_window_size(sz.into()); - self - } - - /// Sets the max connection-level flow control for HTTP2 - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { - self.conn_builder - .http2_initial_connection_window_size(sz.into()); - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { - self.conn_builder.http2_adaptive_window(enabled); - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - self.conn_builder.http2_max_frame_size(sz); - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { - self.conn_builder.http2_keep_alive_interval(interval); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.conn_builder.http2_keep_alive_timeout(timeout); - self - } - - /// Sets whether HTTP2 keep-alive should apply while the connection is idle. - /// - /// If disabled, keep-alive pings are only sent while there are open - /// request/responses streams. If enabled, pings are also sent when no - /// streams are active. Does nothing if `http2_keep_alive_interval` is - /// disabled. - /// - /// Default is `false`. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { - self.conn_builder.http2_keep_alive_while_idle(enabled); - self - } - - /// Sets the maximum number of HTTP2 concurrent locally reset streams. - /// - /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more - /// details. - /// - /// The default value is determined by the `h2` crate. - /// - /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { - self.conn_builder.http2_max_concurrent_reset_streams(max); - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently 1MB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { - self.conn_builder.http2_max_send_buf_size(max); - self - } - - /// Set whether to retry requests that get disrupted before ever starting - /// to write. - /// - /// This means a request that is queued, and gets given an idle, reused - /// connection, and then encounters an error immediately as the idle - /// connection was found to be unusable. - /// - /// When this is set to `false`, the related `ResponseFuture` would instead - /// resolve to an `Error::Cancel`. - /// - /// Default is `true`. - #[inline] - pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self { - self.client_config.retry_canceled_requests = val; - self - } - - /// Set whether to automatically add the `Host` header to requests. - /// - /// If true, and a request does not include a `Host` header, one will be - /// added automatically, derived from the authority of the `Uri`. - /// - /// Default is `true`. - #[inline] - pub fn set_host(&mut self, val: bool) -> &mut Self { - self.client_config.set_host = val; - self - } - - /// Provide an executor to execute background `Connection` tasks. - pub fn executor(&mut self, exec: E) -> &mut Self - where - E: Executor + Send + Sync + 'static, - { - self.conn_builder.executor(exec); - self - } - - /// Combine the configuration of this builder with a connector to create a `Client`. - pub fn build(&self, connector: C) -> Client - where - C: Connect + Clone, - B: HttpBody + Send, - B::Data: Send, - { - Client { - config: self.client_config, - conn_builder: self.conn_builder.clone(), - connector, - pool: Pool::new(self.pool_config, &self.conn_builder.exec), - } - } -} - -impl fmt::Debug for Builder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Builder") - .field("client_config", &self.client_config) - .field("conn_builder", &self.conn_builder) - .field("pool_config", &self.pool_config) - .finish() - } -} - -#[cfg(test)] -mod unit_tests { - use super::*; - - #[test] - fn response_future_is_sync() { - fn assert_sync() {} - assert_sync::(); - } - - #[test] - fn set_relative_uri_with_implicit_path() { - let mut uri = "http://hyper.rs".parse().unwrap(); - origin_form(&mut uri); - assert_eq!(uri.to_string(), "/"); - } - - #[test] - fn test_origin_form() { - let mut uri = "http://hyper.rs/guides".parse().unwrap(); - origin_form(&mut uri); - assert_eq!(uri.to_string(), "/guides"); - - let mut uri = "http://hyper.rs/guides?foo=bar".parse().unwrap(); - origin_form(&mut uri); - assert_eq!(uri.to_string(), "/guides?foo=bar"); - } - - #[test] - fn test_absolute_form() { - let mut uri = "http://hyper.rs/guides".parse().unwrap(); - absolute_form(&mut uri); - assert_eq!(uri.to_string(), "http://hyper.rs/guides"); - - let mut uri = "https://hyper.rs/guides".parse().unwrap(); - absolute_form(&mut uri); - assert_eq!(uri.to_string(), "/guides"); - } - - #[test] - fn test_authority_form() { - let _ = pretty_env_logger::try_init(); - - let mut uri = "http://hyper.rs".parse().unwrap(); - authority_form(&mut uri); - assert_eq!(uri.to_string(), "hyper.rs"); - - let mut uri = "hyper.rs".parse().unwrap(); - authority_form(&mut uri); - assert_eq!(uri.to_string(), "hyper.rs"); - } - - #[test] - fn test_extract_domain_connect_no_port() { - let mut uri = "hyper.rs".parse().unwrap(); - let (scheme, host) = extract_domain(&mut uri, true).expect("extract domain"); - assert_eq!(scheme, *"http"); - assert_eq!(host, "hyper.rs"); - } - - #[test] - fn test_is_secure() { - assert_eq!( - is_schema_secure(&"http://hyper.rs".parse::().unwrap()), - false - ); - assert_eq!(is_schema_secure(&"hyper.rs".parse::().unwrap()), false); - assert_eq!( - is_schema_secure(&"wss://hyper.rs".parse::().unwrap()), - true - ); - assert_eq!( - is_schema_secure(&"ws://hyper.rs".parse::().unwrap()), - false - ); - } - - #[test] - fn test_get_non_default_port() { - assert!(get_non_default_port(&"http://hyper.rs".parse::().unwrap()).is_none()); - assert!(get_non_default_port(&"http://hyper.rs:80".parse::().unwrap()).is_none()); - assert!(get_non_default_port(&"https://hyper.rs:443".parse::().unwrap()).is_none()); - assert!(get_non_default_port(&"hyper.rs:80".parse::().unwrap()).is_none()); - - assert_eq!( - get_non_default_port(&"http://hyper.rs:123".parse::().unwrap()) - .unwrap() - .as_u16(), - 123 - ); - assert_eq!( - get_non_default_port(&"https://hyper.rs:80".parse::().unwrap()) - .unwrap() - .as_u16(), - 80 - ); - assert_eq!( - get_non_default_port(&"hyper.rs:123".parse::().unwrap()) - .unwrap() - .as_u16(), - 123 - ); - } -} diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 5e63f51b35..3be24ed080 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -22,7 +22,7 @@ use super::super::dispatch; /// The sender side of an established connection. pub struct SendRequest { - dispatch: dispatch::Sender, Response>, + dispatch: dispatch::UnboundedSender, Response>, } /// A future that processes all HTTP state for the IO object. @@ -66,8 +66,12 @@ impl SendRequest { /// Polls to determine whether this sender can be used yet for a request. /// /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.dispatch.poll_ready(cx) + pub fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + if self.is_closed() { + Poll::Ready(Err(crate::Error::new_closed())) + } else { + Poll::Ready(Ok(())) + } } /* @@ -83,11 +87,11 @@ impl SendRequest { pub(super) fn is_ready(&self) -> bool { self.dispatch.is_ready() } + */ pub(super) fn is_closed(&self) -> bool { self.dispatch.is_closed() } - */ } impl SendRequest @@ -423,7 +427,7 @@ impl Builder { proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec) .await?; Ok(( - SendRequest { dispatch: tx }, + SendRequest { dispatch: tx.unbound() }, Connection { inner: (PhantomData, h2) }, )) } diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 430497a7e3..7880c65a95 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -63,7 +63,7 @@ use std::sync::Arc; use std::time::Duration; use bytes::Bytes; -use futures_util::future::{self, Either, FutureExt as _}; +use futures_util::future; use httparse::ParserConfig; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; @@ -214,16 +214,6 @@ pub struct Parts { _inner: (), } -// ========== internal client api - -// A `SendRequest` that can be cloned to send HTTP2 requests. -// private for now, probably not a great idea of a type... -#[must_use = "futures do nothing unless polled"] -#[cfg(feature = "http2")] -pub(super) struct Http2SendRequest { - dispatch: dispatch::UnboundedSender, Response>, -} - // ===== impl SendRequest impl SendRequest { @@ -233,30 +223,6 @@ impl SendRequest { pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { self.dispatch.poll_ready(cx) } - - pub(super) async fn when_ready(self) -> crate::Result { - let mut me = Some(self); - future::poll_fn(move |cx| { - ready!(me.as_mut().unwrap().poll_ready(cx))?; - Poll::Ready(Ok(me.take().unwrap())) - }) - .await - } - - pub(super) fn is_ready(&self) -> bool { - self.dispatch.is_ready() - } - - pub(super) fn is_closed(&self) -> bool { - self.dispatch.is_closed() - } - - #[cfg(feature = "http2")] - pub(super) fn into_http2(self) -> Http2SendRequest { - Http2SendRequest { - dispatch: self.dispatch.unbound(), - } - } } impl SendRequest @@ -316,32 +282,6 @@ where ResponseFuture { inner } } - - pub(super) fn send_request_retryable( - &mut self, - req: Request, - ) -> impl Future, (crate::Error, Option>)>> + Unpin - where - B: Send, - { - match self.dispatch.try_send(req) { - Ok(rx) => { - Either::Left(rx.then(move |res| { - match res { - Ok(Ok(res)) => future::ok(res), - Ok(Err(err)) => future::err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - } - })) - } - Err(req) => { - debug!("connection was not ready"); - let err = crate::Error::new_canceled().with("connection was not ready"); - Either::Right(future::err((err, Some(req)))) - } - } - } } impl Service> for SendRequest @@ -367,67 +307,6 @@ impl fmt::Debug for SendRequest { } } -// ===== impl Http2SendRequest - -#[cfg(feature = "http2")] -impl Http2SendRequest { - pub(super) fn is_ready(&self) -> bool { - self.dispatch.is_ready() - } - - pub(super) fn is_closed(&self) -> bool { - self.dispatch.is_closed() - } -} - -#[cfg(feature = "http2")] -impl Http2SendRequest -where - B: HttpBody + 'static, -{ - pub(super) fn send_request_retryable( - &mut self, - req: Request, - ) -> impl Future, (crate::Error, Option>)>> - where - B: Send, - { - match self.dispatch.try_send(req) { - Ok(rx) => { - Either::Left(rx.then(move |res| { - match res { - Ok(Ok(res)) => future::ok(res), - Ok(Err(err)) => future::err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - } - })) - } - Err(req) => { - debug!("connection was not ready"); - let err = crate::Error::new_canceled().with("connection was not ready"); - Either::Right(future::err((err, Some(req)))) - } - } - } -} - -#[cfg(feature = "http2")] -impl fmt::Debug for Http2SendRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Http2SendRequest").finish() - } -} - -#[cfg(feature = "http2")] -impl Clone for Http2SendRequest { - fn clone(&self) -> Self { - Http2SendRequest { - dispatch: self.dispatch.clone(), - } - } -} - // ===== impl Connection impl Connection diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 7a54048d03..7b8d5c7e87 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -166,6 +166,7 @@ impl Connected { self.alpn == Alpn::H2 } + /* // Don't public expose that `Connected` is `Clone`, unsure if we want to // keep that contract... #[cfg(feature = "http2")] @@ -176,6 +177,7 @@ impl Connected { extra: self.extra.clone(), } } + */ } // ===== impl Extra ===== diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 21b111bbf2..68de43b347 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -7,6 +7,7 @@ use tokio::sync::{mpsc, oneshot}; use crate::common::Pin; use crate::common::{task, Poll}; +#[cfg(test)] pub(crate) type RetryPromise = oneshot::Receiver)>>; pub(crate) type Promise = oneshot::Receiver>; @@ -58,13 +59,16 @@ impl Sender { .map_err(|_| crate::Error::new_closed()) } + #[cfg(test)] pub(crate) fn is_ready(&self) -> bool { self.giver.is_wanting() } + /* pub(crate) fn is_closed(&self) -> bool { self.giver.is_canceled() } + */ fn can_send(&mut self) -> bool { if self.giver.give() || !self.buffered_once { @@ -79,6 +83,7 @@ impl Sender { } } + #[cfg(test)] pub(crate) fn try_send(&mut self, val: T) -> Result, T> { if !self.can_send() { return Err(val); @@ -112,14 +117,17 @@ impl Sender { #[cfg(feature = "http2")] impl UnboundedSender { + /* pub(crate) fn is_ready(&self) -> bool { !self.giver.is_canceled() } + */ pub(crate) fn is_closed(&self) -> bool { self.giver.is_canceled() } + #[cfg(test)] pub(crate) fn try_send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); self.inner @@ -127,6 +135,14 @@ impl UnboundedSender { .map(move |_| rx) .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } + + pub(crate) fn send(&mut self, val: T) -> Result, T> { + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::NoRetry(tx))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } } #[cfg(feature = "http2")] @@ -198,6 +214,7 @@ impl Drop for Envelope { } pub(crate) enum Callback { + #[allow(unused)] Retry(oneshot::Sender)>>), NoRetry(oneshot::Sender>), } diff --git a/src/client/mod.rs b/src/client/mod.rs index fbd8eae6ac..62e3d25e7c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,31 +1,11 @@ //! HTTP Client //! -//! There are two levels of APIs provided for construct HTTP clients: -//! -//! - The higher-level [`Client`](Client) type. -//! - The lower-level [`conn`](conn) module. -//! -//! # Client -//! -//! The [`Client`](Client) is the main way to send HTTP requests to a server. -//! The default `Client` provides these things on top of the lower-level API: -//! -//! - A default **connector**, able to resolve hostnames and connect to -//! destinations over plain-text TCP. -//! - A **pool** of existing connections, allowing better performance when -//! making multiple requests to the same hostname. -//! - Automatic setting of the `Host` header, based on the request `Uri`. -//! - Automatic request **retries** when a pooled connection is closed by the -//! server before any bytes have been written. -//! -//! Many of these features can configured, by making use of -//! [`Client::builder`](Client::builder). +//! hyper provides HTTP over a single connection. See the [`conn`](conn) module. //! //! ## Example //! //! For a small example program simply fetching a URL, take a look at the //! [full client example](https://github.com/hyperium/hyper/blob/master/examples/client.rs). -//! pub mod connect; #[cfg(all(test, feature = "runtime"))] @@ -34,10 +14,6 @@ mod tests; cfg_feature! { #![any(feature = "http1", feature = "http2")] - pub use self::client::{Builder, Client, ResponseFuture}; - - mod client; pub mod conn; pub(super) mod dispatch; - mod pool; } diff --git a/src/client/pool.rs b/src/client/pool.rs deleted file mode 100644 index b9772d688d..0000000000 --- a/src/client/pool.rs +++ /dev/null @@ -1,1044 +0,0 @@ -use std::collections::{HashMap, HashSet, VecDeque}; -use std::error::Error as StdError; -use std::fmt; -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex, Weak}; - -#[cfg(not(feature = "runtime"))] -use std::time::{Duration, Instant}; - -use futures_channel::oneshot; -#[cfg(feature = "runtime")] -use tokio::time::{Duration, Instant, Interval}; -use tracing::{debug, trace}; - -use super::client::Ver; -use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -pub(super) struct Pool { - // If the pool is disabled, this is None. - inner: Option>>>, -} - -// Before using a pooled connection, make sure the sender is not dead. -// -// This is a trait to allow the `client::pool::tests` to work for `i32`. -// -// See https://github.com/hyperium/hyper/issues/1429 -pub(super) trait Poolable: Unpin + Send + Sized + 'static { - fn is_open(&self) -> bool; - /// Reserve this connection. - /// - /// Allows for HTTP/2 to return a shared reservation. - fn reserve(self) -> Reservation; - fn can_share(&self) -> bool; -} - -/// When checking out a pooled connection, it might be that the connection -/// only supports a single reservation, or it might be usable for many. -/// -/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be -/// used for multiple requests. -// FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -pub(super) enum Reservation { - /// This connection could be used multiple times, the first one will be - /// reinserted into the `idle` pool, and the second will be given to - /// the `Checkout`. - #[cfg(feature = "http2")] - Shared(T, T), - /// This connection requires unique access. It will be returned after - /// use is complete. - Unique(T), -} - -/// Simple type alias in case the key type needs to be adjusted. -pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc; - -struct PoolInner { - // A flag that a connection is being established, and the connection - // should be shared. This prevents making multiple HTTP/2 connections - // to the same host. - connecting: HashSet, - // These are internal Conns sitting in the event loop in the KeepAlive - // state, waiting to receive a new Request to send on the socket. - idle: HashMap>>, - max_idle_per_host: usize, - // These are outstanding Checkouts that are waiting for a socket to be - // able to send a Request one. This is used when "racing" for a new - // connection. - // - // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait - // for the Pool to receive an idle Conn. When a Conn becomes idle, - // this list is checked for any parked Checkouts, and tries to notify - // them that the Conn could be used instead of waiting for a brand new - // connection. - waiters: HashMap>>, - // A oneshot channel is used to allow the interval to be notified when - // the Pool completely drops. That way, the interval can cancel immediately. - #[cfg(feature = "runtime")] - idle_interval_ref: Option>, - #[cfg(feature = "runtime")] - exec: Exec, - timeout: Option, -} - -// This is because `Weak::new()` *allocates* space for `T`, even if it -// doesn't need it! -struct WeakOpt(Option>); - -#[derive(Clone, Copy, Debug)] -pub(super) struct Config { - pub(super) idle_timeout: Option, - pub(super) max_idle_per_host: usize, -} - -impl Config { - pub(super) fn is_enabled(&self) -> bool { - self.max_idle_per_host > 0 - } -} - -impl Pool { - pub(super) fn new(config: Config, __exec: &Exec) -> Pool { - let inner = if config.is_enabled() { - Some(Arc::new(Mutex::new(PoolInner { - connecting: HashSet::new(), - idle: HashMap::new(), - #[cfg(feature = "runtime")] - idle_interval_ref: None, - max_idle_per_host: config.max_idle_per_host, - waiters: HashMap::new(), - #[cfg(feature = "runtime")] - exec: __exec.clone(), - timeout: config.idle_timeout, - }))) - } else { - None - }; - - Pool { inner } - } - - fn is_enabled(&self) -> bool { - self.inner.is_some() - } - - #[cfg(test)] - pub(super) fn no_timer(&self) { - // Prevent an actual interval from being created for this pool... - #[cfg(feature = "runtime")] - { - let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); - assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); - let (tx, _) = oneshot::channel(); - inner.idle_interval_ref = Some(tx); - } - } -} - -impl Pool { - /// Returns a `Checkout` which is a future that resolves if an idle - /// connection becomes available. - pub(super) fn checkout(&self, key: Key) -> Checkout { - Checkout { - key, - pool: self.clone(), - waiter: None, - } - } - - /// Ensure that there is only ever 1 connecting task for HTTP/2 - /// connections. This does nothing for HTTP/1. - pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option> { - if ver == Ver::Http2 { - if let Some(ref enabled) = self.inner { - let mut inner = enabled.lock().unwrap(); - return if inner.connecting.insert(key.clone()) { - let connecting = Connecting { - key: key.clone(), - pool: WeakOpt::downgrade(enabled), - }; - Some(connecting) - } else { - trace!("HTTP/2 connecting already in progress for {:?}", key); - None - }; - } - } - - // else - Some(Connecting { - key: key.clone(), - // in HTTP/1's case, there is never a lock, so we don't - // need to do anything in Drop. - pool: WeakOpt::none(), - }) - } - - #[cfg(test)] - fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner> { - self.inner.as_ref().expect("enabled").lock().expect("lock") - } - - /* Used in client/tests.rs... - #[cfg(feature = "runtime")] - #[cfg(test)] - pub(super) fn h1_key(&self, s: &str) -> Key { - Arc::new(s.to_string()) - } - - #[cfg(feature = "runtime")] - #[cfg(test)] - pub(super) fn idle_count(&self, key: &Key) -> usize { - self - .locked() - .idle - .get(key) - .map(|list| list.len()) - .unwrap_or(0) - } - */ - - pub(super) fn pooled( - &self, - #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting, - value: T, - ) -> Pooled { - let (value, pool_ref) = if let Some(ref enabled) = self.inner { - match value.reserve() { - #[cfg(feature = "http2")] - Reservation::Shared(to_insert, to_return) => { - let mut inner = enabled.lock().unwrap(); - inner.put(connecting.key.clone(), to_insert, enabled); - // Do this here instead of Drop for Connecting because we - // already have a lock, no need to lock the mutex twice. - inner.connected(&connecting.key); - // prevent the Drop of Connecting from repeating inner.connected() - connecting.pool = WeakOpt::none(); - - // Shared reservations don't need a reference to the pool, - // since the pool always keeps a copy. - (to_return, WeakOpt::none()) - } - Reservation::Unique(value) => { - // Unique reservations must take a reference to the pool - // since they hope to reinsert once the reservation is - // completed - (value, WeakOpt::downgrade(enabled)) - } - } - } else { - // If pool is not enabled, skip all the things... - - // The Connecting should have had no pool ref - debug_assert!(connecting.pool.upgrade().is_none()); - - (value, WeakOpt::none()) - }; - Pooled { - key: connecting.key.clone(), - is_reused: false, - pool: pool_ref, - value: Some(value), - } - } - - fn reuse(&self, key: &Key, value: T) -> Pooled { - debug!("reuse idle connection for {:?}", key); - // TODO: unhack this - // In Pool::pooled(), which is used for inserting brand new connections, - // there's some code that adjusts the pool reference taken depending - // on if the Reservation can be shared or is unique. By the time - // reuse() is called, the reservation has already been made, and - // we just have the final value, without knowledge of if this is - // unique or shared. So, the hack is to just assume Ver::Http2 means - // shared... :( - let mut pool_ref = WeakOpt::none(); - if !value.can_share() { - if let Some(ref enabled) = self.inner { - pool_ref = WeakOpt::downgrade(enabled); - } - } - - Pooled { - is_reused: true, - key: key.clone(), - pool: pool_ref, - value: Some(value), - } - } -} - -/// Pop off this list, looking for a usable connection that hasn't expired. -struct IdlePopper<'a, T> { - key: &'a Key, - list: &'a mut Vec>, -} - -impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { - fn pop(self, expiration: &Expiration) -> Option> { - while let Some(entry) = self.list.pop() { - // If the connection has been closed, or is older than our idle - // timeout, simply drop it and keep looking... - if !entry.value.is_open() { - trace!("removing closed connection for {:?}", self.key); - continue; - } - // TODO: Actually, since the `idle` list is pushed to the end always, - // that would imply that if *this* entry is expired, then anything - // "earlier" in the list would *have* to be expired also... Right? - // - // In that case, we could just break out of the loop and drop the - // whole list... - if expiration.expires(entry.idle_at) { - trace!("removing expired connection for {:?}", self.key); - continue; - } - - let value = match entry.value.reserve() { - #[cfg(feature = "http2")] - Reservation::Shared(to_reinsert, to_checkout) => { - self.list.push(Idle { - idle_at: Instant::now(), - value: to_reinsert, - }); - to_checkout - } - Reservation::Unique(unique) => unique, - }; - - return Some(Idle { - idle_at: entry.idle_at, - value, - }); - } - - None - } -} - -impl PoolInner { - fn put(&mut self, key: Key, value: T, __pool_ref: &Arc>>) { - if value.can_share() && self.idle.contains_key(&key) { - trace!("put; existing idle HTTP/2 connection for {:?}", key); - return; - } - trace!("put; add idle connection for {:?}", key); - let mut remove_waiters = false; - let mut value = Some(value); - if let Some(waiters) = self.waiters.get_mut(&key) { - while let Some(tx) = waiters.pop_front() { - if !tx.is_canceled() { - let reserved = value.take().expect("value already sent"); - let reserved = match reserved.reserve() { - #[cfg(feature = "http2")] - Reservation::Shared(to_keep, to_send) => { - value = Some(to_keep); - to_send - } - Reservation::Unique(uniq) => uniq, - }; - match tx.send(reserved) { - Ok(()) => { - if value.is_none() { - break; - } else { - continue; - } - } - Err(e) => { - value = Some(e); - } - } - } - - trace!("put; removing canceled waiter for {:?}", key); - } - remove_waiters = waiters.is_empty(); - } - if remove_waiters { - self.waiters.remove(&key); - } - - match value { - Some(value) => { - // borrow-check scope... - { - let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new); - if self.max_idle_per_host <= idle_list.len() { - trace!("max idle per host for {:?}, dropping connection", key); - return; - } - - debug!("pooling idle connection for {:?}", key); - idle_list.push(Idle { - value, - idle_at: Instant::now(), - }); - } - - #[cfg(feature = "runtime")] - { - self.spawn_idle_interval(__pool_ref); - } - } - None => trace!("put; found waiter for {:?}", key), - } - } - - /// A `Connecting` task is complete. Not necessarily successfully, - /// but the lock is going away, so clean up. - fn connected(&mut self, key: &Key) { - let existed = self.connecting.remove(key); - debug_assert!(existed, "Connecting dropped, key not in pool.connecting"); - // cancel any waiters. if there are any, it's because - // this Connecting task didn't complete successfully. - // those waiters would never receive a connection. - self.waiters.remove(key); - } - - #[cfg(feature = "runtime")] - fn spawn_idle_interval(&mut self, pool_ref: &Arc>>) { - let (dur, rx) = { - if self.idle_interval_ref.is_some() { - return; - } - - if let Some(dur) = self.timeout { - let (tx, rx) = oneshot::channel(); - self.idle_interval_ref = Some(tx); - (dur, rx) - } else { - return; - } - }; - - let interval = IdleTask { - interval: tokio::time::interval(dur), - pool: WeakOpt::downgrade(pool_ref), - pool_drop_notifier: rx, - }; - - self.exec.execute(interval); - } -} - -impl PoolInner { - /// Any `FutureResponse`s that were created will have made a `Checkout`, - /// and possibly inserted into the pool that it is waiting for an idle - /// connection. If a user ever dropped that future, we need to clean out - /// those parked senders. - fn clean_waiters(&mut self, key: &Key) { - let mut remove_waiters = false; - if let Some(waiters) = self.waiters.get_mut(key) { - waiters.retain(|tx| !tx.is_canceled()); - remove_waiters = waiters.is_empty(); - } - if remove_waiters { - self.waiters.remove(key); - } - } -} - -#[cfg(feature = "runtime")] -impl PoolInner { - /// This should *only* be called by the IdleTask - fn clear_expired(&mut self) { - let dur = self.timeout.expect("interval assumes timeout"); - - let now = Instant::now(); - //self.last_idle_check_at = now; - - self.idle.retain(|key, values| { - values.retain(|entry| { - if !entry.value.is_open() { - trace!("idle interval evicting closed for {:?}", key); - return false; - } - - // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470. - if now.saturating_duration_since(entry.idle_at) > dur { - trace!("idle interval evicting expired for {:?}", key); - return false; - } - - // Otherwise, keep this value... - true - }); - - // returning false evicts this key/val - !values.is_empty() - }); - } -} - -impl Clone for Pool { - fn clone(&self) -> Pool { - Pool { - inner: self.inner.clone(), - } - } -} - -/// A wrapped poolable value that tries to reinsert to the Pool on Drop. -// Note: The bounds `T: Poolable` is needed for the Drop impl. -pub(super) struct Pooled { - value: Option, - is_reused: bool, - key: Key, - pool: WeakOpt>>, -} - -impl Pooled { - pub(super) fn is_reused(&self) -> bool { - self.is_reused - } - - pub(super) fn is_pool_enabled(&self) -> bool { - self.pool.0.is_some() - } - - fn as_ref(&self) -> &T { - self.value.as_ref().expect("not dropped") - } - - fn as_mut(&mut self) -> &mut T { - self.value.as_mut().expect("not dropped") - } -} - -impl Deref for Pooled { - type Target = T; - fn deref(&self) -> &T { - self.as_ref() - } -} - -impl DerefMut for Pooled { - fn deref_mut(&mut self) -> &mut T { - self.as_mut() - } -} - -impl Drop for Pooled { - fn drop(&mut self) { - if let Some(value) = self.value.take() { - if !value.is_open() { - // If we *already* know the connection is done here, - // it shouldn't be re-inserted back into the pool. - return; - } - - if let Some(pool) = self.pool.upgrade() { - if let Ok(mut inner) = pool.lock() { - inner.put(self.key.clone(), value, &pool); - } - } else if !value.can_share() { - trace!("pool dropped, dropping pooled ({:?})", self.key); - } - // Ver::Http2 is already in the Pool (or dead), so we wouldn't - // have an actual reference to the Pool. - } - } -} - -impl fmt::Debug for Pooled { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Pooled").field("key", &self.key).finish() - } -} - -struct Idle { - idle_at: Instant, - value: T, -} - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -pub(super) struct Checkout { - key: Key, - pool: Pool, - waiter: Option>, -} - -#[derive(Debug)] -pub(super) struct CheckoutIsClosedError; - -impl StdError for CheckoutIsClosedError {} - -impl fmt::Display for CheckoutIsClosedError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("checked out connection was closed") - } -} - -impl Checkout { - fn poll_waiter( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll>>> { - if let Some(mut rx) = self.waiter.take() { - match Pin::new(&mut rx).poll(cx) { - Poll::Ready(Ok(value)) => { - if value.is_open() { - Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) - } else { - Poll::Ready(Some(Err( - crate::Error::new_canceled().with(CheckoutIsClosedError) - ))) - } - } - Poll::Pending => { - self.waiter = Some(rx); - Poll::Pending - } - Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( - crate::Error::new_canceled().with("request has been canceled") - ))), - } - } else { - Poll::Ready(None) - } - } - - fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option> { - let entry = { - let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); - let expiration = Expiration::new(inner.timeout); - let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { - trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); - // A block to end the mutable borrow on list, - // so the map below can check is_empty() - { - let popper = IdlePopper { - key: &self.key, - list, - }; - popper.pop(&expiration) - } - .map(|e| (e, list.is_empty())) - }); - - let (entry, empty) = if let Some((e, empty)) = maybe_entry { - (Some(e), empty) - } else { - // No entry found means nuke the list for sure. - (None, true) - }; - if empty { - //TODO: This could be done with the HashMap::entry API instead. - inner.idle.remove(&self.key); - } - - if entry.is_none() && self.waiter.is_none() { - let (tx, mut rx) = oneshot::channel(); - trace!("checkout waiting for idle connection: {:?}", self.key); - inner - .waiters - .entry(self.key.clone()) - .or_insert_with(VecDeque::new) - .push_back(tx); - - // register the waker with this oneshot - assert!(Pin::new(&mut rx).poll(cx).is_pending()); - self.waiter = Some(rx); - } - - entry - }; - - entry.map(|e| self.pool.reuse(&self.key, e.value)) - } -} - -impl Future for Checkout { - type Output = crate::Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - if let Some(pooled) = ready!(self.poll_waiter(cx)?) { - return Poll::Ready(Ok(pooled)); - } - - if let Some(pooled) = self.checkout(cx) { - Poll::Ready(Ok(pooled)) - } else if !self.pool.is_enabled() { - Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled"))) - } else { - // There's a new waiter, already registered in self.checkout() - debug_assert!(self.waiter.is_some()); - Poll::Pending - } - } -} - -impl Drop for Checkout { - fn drop(&mut self) { - if self.waiter.take().is_some() { - trace!("checkout dropped for {:?}", self.key); - if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) { - inner.clean_waiters(&self.key); - } - } - } -} - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -pub(super) struct Connecting { - key: Key, - pool: WeakOpt>>, -} - -impl Connecting { - pub(super) fn alpn_h2(self, pool: &Pool) -> Option { - debug_assert!( - self.pool.0.is_none(), - "Connecting::alpn_h2 but already Http2" - ); - - pool.connecting(&self.key, Ver::Http2) - } -} - -impl Drop for Connecting { - fn drop(&mut self) { - if let Some(pool) = self.pool.upgrade() { - // No need to panic on drop, that could abort! - if let Ok(mut inner) = pool.lock() { - inner.connected(&self.key); - } - } - } -} - -struct Expiration(Option); - -impl Expiration { - fn new(dur: Option) -> Expiration { - Expiration(dur) - } - - fn expires(&self, instant: Instant) -> bool { - match self.0 { - // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470. - Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout, - None => false, - } - } -} - -#[cfg(feature = "runtime")] -pin_project_lite::pin_project! { - struct IdleTask { - #[pin] - interval: Interval, - pool: WeakOpt>>, - // This allows the IdleTask to be notified as soon as the entire - // Pool is fully dropped, and shutdown. This channel is never sent on, - // but Err(Canceled) will be received when the Pool is dropped. - #[pin] - pool_drop_notifier: oneshot::Receiver, - } -} - -#[cfg(feature = "runtime")] -impl Future for IdleTask { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut this = self.project(); - loop { - match this.pool_drop_notifier.as_mut().poll(cx) { - Poll::Ready(Ok(n)) => match n {}, - Poll::Pending => (), - Poll::Ready(Err(_canceled)) => { - trace!("pool closed, canceling idle interval"); - return Poll::Ready(()); - } - } - - ready!(this.interval.as_mut().poll_tick(cx)); - - if let Some(inner) = this.pool.upgrade() { - if let Ok(mut inner) = inner.lock() { - trace!("idle interval checking for expired"); - inner.clear_expired(); - continue; - } - } - return Poll::Ready(()); - } - } -} - -impl WeakOpt { - fn none() -> Self { - WeakOpt(None) - } - - fn downgrade(arc: &Arc) -> Self { - WeakOpt(Some(Arc::downgrade(arc))) - } - - fn upgrade(&self) -> Option> { - self.0.as_ref().and_then(Weak::upgrade) - } -} - -#[cfg(test)] -mod tests { - use std::task::Poll; - use std::time::Duration; - - use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; - use crate::common::{exec::Exec, task, Future, Pin}; - - /// Test unique reservations. - #[derive(Debug, PartialEq, Eq)] - struct Uniq(T); - - impl Poolable for Uniq { - fn is_open(&self) -> bool { - true - } - - fn reserve(self) -> Reservation { - Reservation::Unique(self) - } - - fn can_share(&self) -> bool { - false - } - } - - fn c(key: Key) -> Connecting { - Connecting { - key, - pool: WeakOpt::none(), - } - } - - fn host_key(s: &str) -> Key { - (http::uri::Scheme::HTTP, s.parse().expect("host key")) - } - - fn pool_no_timer() -> Pool { - pool_max_idle_no_timer(::std::usize::MAX) - } - - fn pool_max_idle_no_timer(max_idle: usize) -> Pool { - let pool = Pool::new( - super::Config { - idle_timeout: Some(Duration::from_millis(100)), - max_idle_per_host: max_idle, - }, - &Exec::Default, - ); - pool.no_timer(); - pool - } - - #[tokio::test] - async fn test_pool_checkout_smoke() { - let pool = pool_no_timer(); - let key = host_key("foo"); - let pooled = pool.pooled(c(key.clone()), Uniq(41)); - - drop(pooled); - - match pool.checkout(key).await { - Ok(pooled) => assert_eq!(*pooled, Uniq(41)), - Err(_) => panic!("not ready"), - }; - } - - /// Helper to check if the future is ready after polling once. - struct PollOnce<'a, F>(&'a mut F); - - impl Future for PollOnce<'_, F> - where - F: Future> + Unpin, - { - type Output = Option<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match Pin::new(&mut self.0).poll(cx) { - Poll::Ready(Ok(_)) => Poll::Ready(Some(())), - Poll::Ready(Err(_)) => Poll::Ready(Some(())), - Poll::Pending => Poll::Ready(None), - } - } - } - - #[tokio::test] - async fn test_pool_checkout_returns_none_if_expired() { - let pool = pool_no_timer(); - let key = host_key("foo"); - let pooled = pool.pooled(c(key.clone()), Uniq(41)); - - drop(pooled); - tokio::time::sleep(pool.locked().timeout.unwrap()).await; - let mut checkout = pool.checkout(key); - let poll_once = PollOnce(&mut checkout); - let is_not_ready = poll_once.await.is_none(); - assert!(is_not_ready); - } - - #[cfg(feature = "runtime")] - #[tokio::test] - async fn test_pool_checkout_removes_expired() { - let pool = pool_no_timer(); - let key = host_key("foo"); - - pool.pooled(c(key.clone()), Uniq(41)); - pool.pooled(c(key.clone()), Uniq(5)); - pool.pooled(c(key.clone()), Uniq(99)); - - assert_eq!( - pool.locked().idle.get(&key).map(|entries| entries.len()), - Some(3) - ); - tokio::time::sleep(pool.locked().timeout.unwrap()).await; - - let mut checkout = pool.checkout(key.clone()); - let poll_once = PollOnce(&mut checkout); - // checkout.await should clean out the expired - poll_once.await; - assert!(pool.locked().idle.get(&key).is_none()); - } - - #[test] - fn test_pool_max_idle_per_host() { - let pool = pool_max_idle_no_timer(2); - let key = host_key("foo"); - - pool.pooled(c(key.clone()), Uniq(41)); - pool.pooled(c(key.clone()), Uniq(5)); - pool.pooled(c(key.clone()), Uniq(99)); - - // pooled and dropped 3, max_idle should only allow 2 - assert_eq!( - pool.locked().idle.get(&key).map(|entries| entries.len()), - Some(2) - ); - } - - #[cfg(feature = "runtime")] - #[tokio::test] - async fn test_pool_timer_removes_expired() { - let _ = pretty_env_logger::try_init(); - tokio::time::pause(); - - let pool = Pool::new( - super::Config { - idle_timeout: Some(Duration::from_millis(10)), - max_idle_per_host: std::usize::MAX, - }, - &Exec::Default, - ); - - let key = host_key("foo"); - - pool.pooled(c(key.clone()), Uniq(41)); - pool.pooled(c(key.clone()), Uniq(5)); - pool.pooled(c(key.clone()), Uniq(99)); - - assert_eq!( - pool.locked().idle.get(&key).map(|entries| entries.len()), - Some(3) - ); - - // Let the timer tick passed the expiration... - tokio::time::advance(Duration::from_millis(30)).await; - // Yield so the Interval can reap... - tokio::task::yield_now().await; - - assert!(pool.locked().idle.get(&key).is_none()); - } - - #[tokio::test] - async fn test_pool_checkout_task_unparked() { - use futures_util::future::join; - use futures_util::FutureExt; - - let pool = pool_no_timer(); - let key = host_key("foo"); - let pooled = pool.pooled(c(key.clone()), Uniq(41)); - - let checkout = join(pool.checkout(key), async { - // the checkout future will park first, - // and then this lazy future will be polled, which will insert - // the pooled back into the pool - // - // this test makes sure that doing so will unpark the checkout - drop(pooled); - }) - .map(|(entry, _)| entry); - - assert_eq!(*checkout.await.unwrap(), Uniq(41)); - } - - #[tokio::test] - async fn test_pool_checkout_drop_cleans_up_waiters() { - let pool = pool_no_timer::>(); - let key = host_key("foo"); - - let mut checkout1 = pool.checkout(key.clone()); - let mut checkout2 = pool.checkout(key.clone()); - - let poll_once1 = PollOnce(&mut checkout1); - let poll_once2 = PollOnce(&mut checkout2); - - // first poll needed to get into Pool's parked - poll_once1.await; - assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); - poll_once2.await; - assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); - - // on drop, clean up Pool - drop(checkout1); - assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); - - drop(checkout2); - assert!(pool.locked().waiters.get(&key).is_none()); - } - - #[derive(Debug)] - struct CanClose { - #[allow(unused)] - val: i32, - closed: bool, - } - - impl Poolable for CanClose { - fn is_open(&self) -> bool { - !self.closed - } - - fn reserve(self) -> Reservation { - Reservation::Unique(self) - } - - fn can_share(&self) -> bool { - false - } - } - - #[test] - fn pooled_drop_if_closed_doesnt_reinsert() { - let pool = pool_no_timer(); - let key = host_key("foo"); - pool.pooled( - c(key.clone()), - CanClose { - val: 57, - closed: true, - }, - ); - - assert!(!pool.locked().idle.contains_key(&key)); - } -} diff --git a/src/common/lazy.rs b/src/common/lazy.rs deleted file mode 100644 index 2722077303..0000000000 --- a/src/common/lazy.rs +++ /dev/null @@ -1,76 +0,0 @@ -use pin_project_lite::pin_project; - -use super::{task, Future, Pin, Poll}; - -pub(crate) trait Started: Future { - fn started(&self) -> bool; -} - -pub(crate) fn lazy(func: F) -> Lazy -where - F: FnOnce() -> R, - R: Future + Unpin, -{ - Lazy { - inner: Inner::Init { func }, - } -} - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -pin_project! { - #[allow(missing_debug_implementations)] - pub(crate) struct Lazy { - #[pin] - inner: Inner, - } -} - -pin_project! { - #[project = InnerProj] - #[project_replace = InnerProjReplace] - enum Inner { - Init { func: F }, - Fut { #[pin] fut: R }, - Empty, - } -} - -impl Started for Lazy -where - F: FnOnce() -> R, - R: Future, -{ - fn started(&self) -> bool { - match self.inner { - Inner::Init { .. } => false, - Inner::Fut { .. } | Inner::Empty => true, - } - } -} - -impl Future for Lazy -where - F: FnOnce() -> R, - R: Future, -{ - type Output = R::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut this = self.project(); - - if let InnerProj::Fut { fut } = this.inner.as_mut().project() { - return fut.poll(cx); - } - - match this.inner.as_mut().project_replace(Inner::Empty) { - InnerProjReplace::Init { func } => { - this.inner.set(Inner::Fut { fut: func() }); - if let InnerProj::Fut { fut } = this.inner.project() { - return fut.poll(cx); - } - unreachable!() - } - _ => unreachable!("lazy state wrong"), - } - } -} diff --git a/src/common/mod.rs b/src/common/mod.rs index 68a81538f6..bc1781e832 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -13,16 +13,10 @@ pub(crate) mod date; #[cfg(any(feature = "http1", feature = "http2", feature = "server"))] pub(crate) mod exec; pub(crate) mod io; -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -mod lazy; mod never; -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -pub(crate) mod sync_wrapper; pub(crate) mod task; pub(crate) mod watch; -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -pub(crate) use self::lazy::{lazy, Started as Lazy}; #[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))] pub(crate) use self::never::Never; pub(crate) use self::task::Poll; diff --git a/src/common/sync_wrapper.rs b/src/common/sync_wrapper.rs deleted file mode 100644 index 704d1a6712..0000000000 --- a/src/common/sync_wrapper.rs +++ /dev/null @@ -1,110 +0,0 @@ -/* - * This is a copy of the sync_wrapper crate. - */ - -/// A mutual exclusion primitive that relies on static type information only -/// -/// In some cases synchronization can be proven statically: whenever you hold an exclusive `&mut` -/// reference, the Rust type system ensures that no other part of the program can hold another -/// reference to the data. Therefore it is safe to access it even if the current thread obtained -/// this reference via a channel. Whenever this is the case, the overhead of allocating and locking -/// a [`Mutex`] can be avoided by using this static version. -/// -/// One example where this is often applicable is [`Future`], which requires an exclusive reference -/// for its [`poll`] method: While a given `Future` implementation may not be safe to access by -/// multiple threads concurrently, the executor can only run the `Future` on one thread at any -/// given time, making it [`Sync`] in practice as long as the implementation is `Send`. You can -/// therefore use the sync wrapper to prove that your data structure is `Sync` even though it -/// contains such a `Future`. -/// -/// # Example -/// -/// ```ignore -/// use hyper::common::sync_wrapper::SyncWrapper; -/// use std::future::Future; -/// -/// struct MyThing { -/// future: SyncWrapper + Send>>, -/// } -/// -/// impl MyThing { -/// // all accesses to `self.future` now require an exclusive reference or ownership -/// } -/// -/// fn assert_sync() {} -/// -/// assert_sync::(); -/// ``` -/// -/// [`Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html -/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html -/// [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll -/// [`Sync`]: https://doc.rust-lang.org/std/marker/trait.Sync.html -#[repr(transparent)] -pub(crate) struct SyncWrapper(T); - -impl SyncWrapper { - /// Creates a new SyncWrapper containing the given value. - /// - /// # Examples - /// - /// ```ignore - /// use hyper::common::sync_wrapper::SyncWrapper; - /// - /// let wrapped = SyncWrapper::new(42); - /// ``` - pub(crate) fn new(value: T) -> Self { - Self(value) - } - - /// Acquires a reference to the protected value. - /// - /// This is safe because it requires an exclusive reference to the wrapper. Therefore this method - /// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which - /// returns an error if another thread panicked while holding the lock. It is not recommended - /// to send an exclusive reference to a potentially damaged value to another thread for further - /// processing. - /// - /// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut - /// - /// # Examples - /// - /// ```ignore - /// use hyper::common::sync_wrapper::SyncWrapper; - /// - /// let mut wrapped = SyncWrapper::new(42); - /// let value = wrapped.get_mut(); - /// *value = 0; - /// assert_eq!(*wrapped.get_mut(), 0); - /// ``` - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.0 - } - - /// Consumes this wrapper, returning the underlying data. - /// - /// This is safe because it requires ownership of the wrapper, aherefore this method will neither - /// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which - /// returns an error if another thread panicked while holding the lock. It is not recommended - /// to send an exclusive reference to a potentially damaged value to another thread for further - /// processing. - /// - /// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner - /// - /// # Examples - /// - /// ```ignore - /// use hyper::common::sync_wrapper::SyncWrapper; - /// - /// let mut wrapped = SyncWrapper::new(42); - /// assert_eq!(wrapped.into_inner(), 42); - /// ``` - #[allow(dead_code)] - pub(crate) fn into_inner(self) -> T { - self.0 - } -} - -// this is safe because the only operations permitted on this data structure require exclusive -// access or ownership -unsafe impl Sync for SyncWrapper {} diff --git a/src/error.rs b/src/error.rs index 2bf134de3a..e949ea4ae9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -34,9 +34,6 @@ pub(super) enum Kind { /// An `io::Error` that occurred while trying to read or write to a network stream. #[cfg(any(feature = "http1", feature = "http2"))] Io, - /// Error occurred while connecting. - #[allow(unused)] - Connect, /// Error creating a TcpListener. #[cfg(all(feature = "tcp", feature = "server"))] Listen, @@ -101,22 +98,10 @@ pub(super) enum User { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] UnexpectedHeader, - /// User tried to create a Request with bad version. - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - UnsupportedVersion, - /// User tried to create a CONNECT Request with the Client. - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - UnsupportedRequestMethod, /// User tried to respond with a 1xx (not 101) response code. #[cfg(feature = "http1")] #[cfg(feature = "server")] UnsupportedStatusCode, - /// User tried to send a Request with Client with non-absolute URI. - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - AbsoluteUriRequired, /// User tried polling for an upgrade that doesn't exist. NoUpgrade, @@ -173,11 +158,6 @@ impl Error { matches!(self.inner.kind, Kind::ChannelClosed) } - /// Returns true if this was an error from `Connect`. - pub fn is_connect(&self) -> bool { - matches!(self.inner.kind, Kind::Connect) - } - /// Returns true if the connection closed before a message could complete. pub fn is_incomplete_message(&self) -> bool { matches!(self.inner.kind, Kind::IncompleteMessage) @@ -270,12 +250,6 @@ impl Error { Error::new(Kind::Listen).with(cause) } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - pub(super) fn new_connect>(cause: E) -> Error { - Error::new(Kind::Connect).with(cause) - } - pub(super) fn new_closed() -> Error { Error::new(Kind::ChannelClosed) } @@ -309,30 +283,12 @@ impl Error { Error::new(Kind::HeaderTimeout) } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - pub(super) fn new_user_unsupported_version() -> Error { - Error::new_user(User::UnsupportedVersion) - } - - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - pub(super) fn new_user_unsupported_request_method() -> Error { - Error::new_user(User::UnsupportedRequestMethod) - } - #[cfg(feature = "http1")] #[cfg(feature = "server")] pub(super) fn new_user_unsupported_status_code() -> Error { Error::new_user(User::UnsupportedStatusCode) } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - pub(super) fn new_user_absolute_uri_required() -> Error { - Error::new_user(User::AbsoluteUriRequired) - } - pub(super) fn new_user_no_upgrade() -> Error { Error::new_user(User::NoUpgrade) } @@ -411,7 +367,6 @@ impl Error { #[cfg(feature = "http1")] Kind::UnexpectedMessage => "received unexpected message from connection", Kind::ChannelClosed => "channel closed", - Kind::Connect => "error trying to connect", Kind::Canceled => "operation was canceled", #[cfg(all(feature = "server", feature = "tcp"))] Kind::Listen => "error creating server listener", @@ -436,20 +391,11 @@ impl Error { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] Kind::User(User::UnexpectedHeader) => "user sent unexpected header", - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - Kind::User(User::UnsupportedVersion) => "request has unsupported HTTP version", - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - Kind::User(User::UnsupportedRequestMethod) => "request has unsupported HTTP method", #[cfg(feature = "http1")] #[cfg(feature = "server")] Kind::User(User::UnsupportedStatusCode) => { "response has 1xx status code, not supported by server" } - #[cfg(any(feature = "http1", feature = "http2"))] - #[cfg(feature = "client")] - Kind::User(User::AbsoluteUriRequired) => "client requires absolute-form URIs", Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(feature = "http1")] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", diff --git a/src/lib.rs b/src/lib.rs index 04926928d4..865027459c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,9 +93,6 @@ cfg_feature! { #![feature = "client"] pub mod client; - #[cfg(any(feature = "http1", feature = "http2"))] - #[doc(no_inline)] - pub use crate::client::Client; } cfg_feature! { diff --git a/tests/client.rs b/tests/client.rs index 3408547822..5360b71ad5 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -9,7 +9,6 @@ use std::fmt; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::pin::Pin; -use std::task::{Context, Poll}; use std::thread; use std::time::Duration; @@ -17,11 +16,11 @@ use http::uri::PathAndQuery; use http_body_util::{BodyExt, StreamBody}; use hyper::body::to_bytes as concat; use hyper::header::HeaderValue; -use hyper::{Body, Method, Request, StatusCode, Uri, Version}; +use hyper::{Method, Request, StatusCode, Uri, Version}; use bytes::Bytes; use futures_channel::oneshot; -use futures_core::{Future, Stream, TryFuture}; +use futures_core::{Future, TryFuture}; use futures_util::future::{self, FutureExt, TryFutureExt}; use tokio::net::TcpStream; mod support; @@ -1325,1035 +1324,6 @@ test! { body: None, } -mod dispatch_impl { - use super::*; - use std::io::{self, Read, Write}; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - use std::thread; - use std::time::Duration; - - use futures_channel::{mpsc, oneshot}; - use futures_core::{self, Future}; - use futures_util::future::{FutureExt, TryFutureExt}; - use futures_util::stream::StreamExt; - use http::Uri; - use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use tokio::net::TcpStream; - - use super::support; - use hyper::body::HttpBody; - use hyper::client::connect::{Connected, Connection}; - use hyper::Client; - - #[test] - fn drop_body_before_eof_closes_connection() { - // https://github.com/hyperium/hyper/issues/1353 - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - let (closes_tx, closes) = mpsc::channel(10); - let client = Client::builder().build(DebugConnector::with_closes(closes_tx)); - - let (tx1, rx1) = oneshot::channel(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - let body = vec![b'x'; 1024 * 128]; - write!( - sock, - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", - body.len() - ) - .expect("write head"); - let _ = sock.write_all(&body); - let _ = tx1.send(()); - }); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req).map_ok(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - }); - let rx = rx1.expect("thread panicked"); - rt.block_on(async move { - let (res, ()) = future::join(res, rx).await; - res.unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - }); - - rt.block_on(closes.into_future()).0.expect("closes"); - } - - #[test] - fn dropped_client_closes_connection() { - // https://github.com/hyperium/hyper/issues/1353 - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - let (closes_tx, closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - let body = [b'x'; 64]; - write!( - sock, - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", - body.len() - ) - .expect("write head"); - let _ = sock.write_all(&body); - let _ = tx1.send(()); - }); - - let res = { - let client = Client::builder().build(DebugConnector::with_closes(closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - client - .request(req) - .and_then(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res) - }) - .map_ok(|_| ()) - }; - // client is dropped - let rx = rx1.expect("thread panicked"); - rt.block_on(async move { - let (res, ()) = future::join(res, rx).await; - res.unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - }); - - rt.block_on(closes.into_future()).0.expect("closes"); - } - - #[tokio::test] - async fn drop_client_closes_idle_connections() { - use futures_util::future; - - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let (closes_tx, mut closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - let body = [b'x'; 64]; - write!( - sock, - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", - body.len() - ) - .expect("write head"); - let _ = sock.write_all(&body); - let _ = tx1.send(()); - - // prevent this thread from closing until end of test, so the connection - // stays open and idle until Client is dropped - support::runtime().block_on(client_drop_rx.into_future()) - }); - - let client = Client::builder().build(DebugConnector::with_closes(closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req).and_then(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res) - }); - let rx = rx1.expect("thread panicked"); - - let (res, ()) = future::join(res, rx).await; - res.unwrap(); - - // not closed yet, just idle - future::poll_fn(|ctx| { - assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); - Poll::Ready(()) - }) - .await; - - // drop to start the connections closing - drop(client); - - // and wait a few ticks for the connections to close - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); - let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); - future::select(t, close).await; - } - - #[tokio::test] - async fn drop_response_future_closes_in_progress_connection() { - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let (closes_tx, closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - let (_client_drop_tx, client_drop_rx) = std::sync::mpsc::channel::<()>(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - // we never write a response head - // simulates a slow server operation - let _ = tx1.send(()); - - // prevent this thread from closing until end of test, so the connection - // stays open and idle until Client is dropped - let _ = client_drop_rx.recv(); - }); - - let res = { - let client = Client::builder().build(DebugConnector::with_closes(closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - client.request(req).map(|_| unreachable!()) - }; - - future::select(res, rx1).await; - - // res now dropped - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); - let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); - future::select(t, close).await; - } - - #[tokio::test] - async fn drop_response_body_closes_in_progress_connection() { - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let (closes_tx, closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - let (_client_drop_tx, client_drop_rx) = std::sync::mpsc::channel::<()>(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - write!( - sock, - "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n" - ) - .expect("write head"); - let _ = tx1.send(()); - - // prevent this thread from closing until end of test, so the connection - // stays open and idle until Client is dropped - let _ = client_drop_rx.recv(); - }); - - let rx = rx1.expect("thread panicked"); - let res = { - let client = Client::builder().build(DebugConnector::with_closes(closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - // notably, haven't read body yet - client.request(req) - }; - - let (res, ()) = future::join(res, rx).await; - // drop the body - res.unwrap(); - - // and wait a few ticks to see the connection drop - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); - let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); - future::select(t, close).await; - } - - #[tokio::test] - async fn no_keep_alive_closes_connection() { - // https://github.com/hyperium/hyper/issues/1383 - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let (closes_tx, closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - let (_tx2, rx2) = std::sync::mpsc::channel::<()>(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .unwrap(); - let _ = tx1.send(()); - - // prevent this thread from closing until end of test, so the connection - // stays open and idle until Client is dropped - let _ = rx2.recv(); - }); - - let client = Client::builder() - .pool_max_idle_per_host(0) - .build(DebugConnector::with_closes(closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req).and_then(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res) - }); - let rx = rx1.expect("thread panicked"); - - let (res, ()) = future::join(res, rx).await; - res.unwrap(); - - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); - let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); - future::select(t, close).await; - } - - #[tokio::test] - async fn socket_disconnect_closes_idle_conn() { - // notably when keep-alive is enabled - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let (closes_tx, closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .unwrap(); - let _ = tx1.send(()); - }); - - let client = Client::builder().build(DebugConnector::with_closes(closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req).and_then(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res) - }); - let rx = rx1.expect("thread panicked"); - - let (res, ()) = future::join(res, rx).await; - res.unwrap(); - - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); - let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); - future::select(t, close).await; - } - - #[test] - fn connect_call_is_lazy() { - // We especially don't want connects() triggered if there's - // idle connections that the Checkout would have found - let _ = pretty_env_logger::try_init(); - - let _rt = support::runtime(); - let connector = DebugConnector::new(); - let connects = connector.connects.clone(); - - let client = Client::builder().build(connector); - - assert_eq!(connects.load(Ordering::Relaxed), 0); - let req = Request::builder() - .uri("http://hyper.local/a") - .body(Body::empty()) - .unwrap(); - let _fut = client.request(req); - // internal Connect::connect should have been lazy, and not - // triggered an actual connect yet. - assert_eq!(connects.load(Ordering::Relaxed), 0); - } - - #[test] - fn client_keep_alive_0() { - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - let connector = DebugConnector::new(); - let connects = connector.connects.clone(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - //drop(server); - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 1"); - let _ = tx1.send(()); - - let n2 = sock.read(&mut buf).expect("read 2"); - assert_ne!(n2, 0); - let second_get = "GET /b HTTP/1.1\r\n"; - assert_eq!(s(&buf[..second_get.len()]), second_get); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 2"); - let _ = tx2.send(()); - }); - - assert_eq!(connects.load(Ordering::SeqCst), 0); - - let rx = rx1.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req); - rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - - assert_eq!(connects.load(Ordering::SeqCst), 1); - - // sleep real quick to let the threadpool put connection in ready - // state and back into client pool - thread::sleep(Duration::from_millis(50)); - - let rx = rx2.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/b", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req); - rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - - assert_eq!( - connects.load(Ordering::SeqCst), - 1, - "second request should still only have 1 connect" - ); - drop(client); - } - - #[test] - fn client_keep_alive_extra_body() { - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - - let connector = DebugConnector::new(); - let connects = connector.connects.clone(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello") - .expect("write 1"); - // the body "hello", while ignored because its a HEAD request, should mean the connection - // cannot be put back in the pool - let _ = tx1.send(()); - - let mut sock2 = server.accept().unwrap().0; - let n2 = sock2.read(&mut buf).expect("read 2"); - assert_ne!(n2, 0); - let second_get = "GET /b HTTP/1.1\r\n"; - assert_eq!(s(&buf[..second_get.len()]), second_get); - sock2 - .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 2"); - let _ = tx2.send(()); - }); - - assert_eq!(connects.load(Ordering::Relaxed), 0); - - let rx = rx1.expect("thread panicked"); - let req = Request::builder() - .method("HEAD") - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req); - rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - - assert_eq!(connects.load(Ordering::Relaxed), 1); - - let rx = rx2.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/b", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req); - rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - - assert_eq!(connects.load(Ordering::Relaxed), 2); - } - - #[test] - fn client_keep_alive_when_response_before_request_body_ends() { - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - - let connector = DebugConnector::new(); - let connects = connector.connects.clone(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - let (tx3, rx3) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 1"); - // after writing the response, THEN stream the body - let _ = tx1.send(()); - - sock.read(&mut buf).expect("read 2"); - let _ = tx2.send(()); - - let n2 = sock.read(&mut buf).expect("read 3"); - assert_ne!(n2, 0); - let second_get = "GET /b HTTP/1.1\r\n"; - assert_eq!(s(&buf[..second_get.len()]), second_get); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 2"); - let _ = tx3.send(()); - }); - - assert_eq!(connects.load(Ordering::Relaxed), 0); - - let delayed_body = rx1 - .then(|_| tokio::time::sleep(Duration::from_millis(200))) - .map(|_| Ok::<_, ()>(Bytes::from("hello a"))) - .map_err(|_| -> std::convert::Infallible { panic!("rx1") }) - .into_stream(); - - let rx = rx2.expect("thread panicked"); - let req = Request::builder() - .method("POST") - .uri(&*format!("http://{}/a", addr)) - .body(BodyExt::boxed(StreamBody::new(delayed_body))) - .unwrap(); - let client2 = client.clone(); - - // req 1 - let fut = future::join(client.request(req), rx) - .then(|_| tokio::time::sleep(Duration::from_millis(200))) - // req 2 - .then(move |()| { - let rx = rx3.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/b", addr)) - .body(BodyExt::boxed(http_body_util::Empty::new())) - .unwrap(); - future::join(client2.request(req), rx).map(|r| r.0) - }); - - rt.block_on(fut).unwrap(); - - assert_eq!(connects.load(Ordering::Relaxed), 1); - } - - #[tokio::test] - async fn client_keep_alive_eager_when_chunked() { - // If a response body has been read to completion, with completion - // determined by some other factor, like decompression, and thus - // it is in't polled a final time to clear the final 0-len chunk, - // try to eagerly clear it so the connection can still be used. - - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let connector = DebugConnector::new(); - let connects = connector.connects.clone(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - //drop(server); - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all( - b"\ - HTTP/1.1 200 OK\r\n\ - transfer-encoding: chunked\r\n\ - \r\n\ - 5\r\n\ - hello\r\n\ - 0\r\n\r\n\ - ", - ) - .expect("write 1"); - let _ = tx1.send(()); - - let n2 = sock.read(&mut buf).expect("read 2"); - assert_ne!(n2, 0, "bytes of second request"); - let second_get = "GET /b HTTP/1.1\r\n"; - assert_eq!(s(&buf[..second_get.len()]), second_get); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 2"); - let _ = tx2.send(()); - }); - - assert_eq!(connects.load(Ordering::SeqCst), 0); - - let rx = rx1.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let fut = client.request(req); - - let mut resp = future::join(fut, rx).map(|r| r.0).await.unwrap(); - assert_eq!(connects.load(Ordering::SeqCst), 1); - assert_eq!(resp.status(), 200); - assert_eq!(resp.headers()["transfer-encoding"], "chunked"); - - // Read the "hello" chunk... - let chunk = resp.body_mut().data().await.unwrap().unwrap(); - assert_eq!(chunk, "hello"); - - // With our prior knowledge, we know that's the end of the body. - // So just drop the body, without polling for the `0\r\n\r\n` end. - drop(resp); - - // sleep real quick to let the threadpool put connection in ready - // state and back into client pool - tokio::time::sleep(Duration::from_millis(50)).await; - - let rx = rx2.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/b", addr)) - .body(Body::empty()) - .unwrap(); - let fut = client.request(req); - future::join(fut, rx).map(|r| r.0).await.unwrap(); - - assert_eq!( - connects.load(Ordering::SeqCst), - 1, - "second request should still only have 1 connect" - ); - drop(client); - } - - #[test] - fn connect_proxy_sends_absolute_uri() { - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - let connector = DebugConnector::new().proxy(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - //drop(server); - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - let n = sock.read(&mut buf).expect("read 1"); - let expected = format!( - "GET http://{addr}/foo/bar HTTP/1.1\r\nhost: {addr}\r\n\r\n", - addr = addr - ); - assert_eq!(s(&buf[..n]), expected); - - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 1"); - let _ = tx1.send(()); - }); - - let rx = rx1.expect("thread panicked"); - let req = Request::builder() - .uri(&*format!("http://{}/foo/bar", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req); - rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - } - - #[test] - fn connect_proxy_http_connect_sends_authority_form() { - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - let connector = DebugConnector::new().proxy(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - //drop(server); - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - let n = sock.read(&mut buf).expect("read 1"); - let expected = format!( - "CONNECT {addr} HTTP/1.1\r\nhost: {addr}\r\n\r\n", - addr = addr - ); - assert_eq!(s(&buf[..n]), expected); - - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - .expect("write 1"); - let _ = tx1.send(()); - }); - - let rx = rx1.expect("thread panicked"); - let req = Request::builder() - .method("CONNECT") - .uri(&*format!("http://{}/useless/path", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req); - rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - } - - #[test] - fn client_upgrade() { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = support::runtime(); - - let connector = DebugConnector::new(); - - let client = Client::builder().build(connector); - - let (tx1, rx1) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all( - b"\ - HTTP/1.1 101 Switching Protocols\r\n\ - Upgrade: foobar\r\n\ - \r\n\ - foobar=ready\ - ", - ) - .unwrap(); - let _ = tx1.send(()); - - let n = sock.read(&mut buf).expect("read 2"); - assert_eq!(&buf[..n], b"foo=bar"); - sock.write_all(b"bar=foo").expect("write 2"); - }); - - let rx = rx1.expect("thread panicked"); - - let req = Request::builder() - .method("GET") - .uri(&*format!("http://{}/up", addr)) - .body(Body::empty()) - .unwrap(); - - let res = client.request(req); - let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - - assert_eq!(res.status(), 101); - let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade"); - - let parts = upgraded.downcast::().unwrap(); - assert_eq!(s(&parts.read_buf), "foobar=ready"); - - let mut io = parts.io; - rt.block_on(io.write_all(b"foo=bar")).unwrap(); - let mut vec = vec![]; - rt.block_on(io.read_to_end(&mut vec)).unwrap(); - assert_eq!(vec, b"bar=foo"); - } - - #[test] - fn alpn_h2() { - use hyper::server::conn::Http; - use hyper::service::service_fn; - use hyper::Response; - use tokio::net::TcpListener; - - let _ = pretty_env_logger::try_init(); - let rt = support::runtime(); - let listener = rt - .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))) - .unwrap(); - let addr = listener.local_addr().unwrap(); - let mut connector = DebugConnector::new(); - connector.alpn_h2 = true; - let connects = connector.connects.clone(); - - let client = Client::builder().build::<_, ::hyper::Body>(connector); - - rt.spawn(async move { - let (socket, _addr) = listener.accept().await.expect("accept"); - Http::new() - .http2_only(true) - .serve_connection( - socket, - service_fn(|req| async move { - assert_eq!(req.headers().get("host"), None); - Ok::<_, hyper::Error>(Response::new(Body::empty())) - }), - ) - .await - .expect("server"); - }); - - assert_eq!(connects.load(Ordering::SeqCst), 0); - - let url = format!("http://{}/a", addr) - .parse::<::hyper::Uri>() - .unwrap(); - let res1 = client.get(url.clone()); - let res2 = client.get(url.clone()); - let res3 = client.get(url.clone()); - rt.block_on(future::try_join3(res1, res2, res3)).unwrap(); - - // Since the client doesn't know it can ALPN at first, it will have - // started 3 connections. But, the server above will only handle 1, - // so the unwrapped responses futures show it still worked. - assert_eq!(connects.load(Ordering::SeqCst), 3); - - let res4 = client.get(url.clone()); - rt.block_on(res4).unwrap(); - - // HTTP/2 request allowed - let res5 = client.request( - Request::builder() - .uri(url) - .version(hyper::Version::HTTP_2) - .body(Default::default()) - .unwrap(), - ); - rt.block_on(res5).unwrap(); - - assert_eq!( - connects.load(Ordering::SeqCst), - 3, - "after ALPN, no more connects" - ); - drop(client); - } - - #[derive(Clone)] - struct DebugConnector { - closes: mpsc::Sender<()>, - connects: Arc, - is_proxy: bool, - alpn_h2: bool, - } - - impl DebugConnector { - fn new() -> DebugConnector { - let (tx, _) = mpsc::channel(10); - DebugConnector::with_closes(tx) - } - - fn with_closes(closes: mpsc::Sender<()>) -> DebugConnector { - DebugConnector { - closes, - connects: Arc::new(AtomicUsize::new(0)), - is_proxy: false, - alpn_h2: false, - } - } - - fn proxy(mut self) -> Self { - self.is_proxy = true; - self - } - } - - impl hyper::service::Service for DebugConnector { - type Response = DebugStream; - type Error = std::io::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, dst: Uri) -> Self::Future { - self.connects.fetch_add(1, Ordering::SeqCst); - let closes = self.closes.clone(); - let is_proxy = self.is_proxy; - let is_alpn_h2 = self.alpn_h2; - - Box::pin(async move { - let host = dst.host().expect("no host in uri"); - let port = dst.port_u16().expect("no port in uri"); - - let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; - - Ok(DebugStream { - tcp: stream, - on_drop: closes, - is_alpn_h2, - is_proxy, - }) - }) - } - } - - struct DebugStream { - tcp: TcpStream, - on_drop: mpsc::Sender<()>, - is_alpn_h2: bool, - is_proxy: bool, - } - - impl Drop for DebugStream { - fn drop(&mut self) { - let _ = self.on_drop.try_send(()); - } - } - - impl AsyncWrite for DebugStream { - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.tcp).poll_shutdown(cx) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.tcp).poll_flush(cx) - } - - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.tcp).poll_write(cx, buf) - } - } - - impl AsyncRead for DebugStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.tcp).poll_read(cx, buf) - } - } - - impl Connection for DebugStream { - fn connected(&self) -> Connected { - let connected = Connected::new().proxy(self.is_proxy); - - if self.is_alpn_h2 { - connected.negotiated_h2() - } else { - connected - } - } - } -} - mod conn { use std::io::{self, Read, Write}; use std::net::{SocketAddr, TcpListener};