diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index e52508b..46a03bc 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -119,4 +119,4 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@nightly - - run: cargo rustdoc -- --cfg docsrs -D rustdoc::broken-intra-doc-links + - run: cargo rustdoc --features full -- --cfg docsrs -D rustdoc::broken-intra-doc-links diff --git a/Cargo.toml b/Cargo.toml index b7646ca..130e48f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,7 @@ full = [ client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"] client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] -client-pool = [] +client-pool = ["dep:futures-util"] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] diff --git a/src/client/pool/cache.rs b/src/client/pool/cache.rs new file mode 100644 index 0000000..1353c74 --- /dev/null +++ b/src/client/pool/cache.rs @@ -0,0 +1,457 @@ +//! A cache of services +//! +//! The cache is a single list of cached services, bundled with a `MakeService`. +//! Calling the cache returns either an existing service, or makes a new one. +//! The returned `impl Service` can be used to send requests, and when dropped, +//! it will try to be returned back to the cache. + +pub use self::internal::builder; + +#[cfg(docsrs)] +pub use self::internal::Builder; +#[cfg(docsrs)] +pub use self::internal::Cache; +#[cfg(docsrs)] +pub use self::internal::Cached; + +// For now, nothing else in this module is nameable. We can always make things +// more public, but we can't change type shapes (generics) once things are +// public. +mod internal { + use std::fmt; + use std::future::Future; + use std::pin::Pin; + use std::sync::{Arc, Mutex, Weak}; + use std::task::{self, Poll}; + + use futures_core::ready; + use futures_util::future; + use tokio::sync::oneshot; + use tower_service::Service; + + use super::events; + + /// Start a builder to construct a `Cache` pool. + pub fn builder() -> Builder { + Builder { + events: events::Ignore, + } + } + + /// A cache pool of services from the inner make service. + /// + /// Created with [`builder()`]. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + #[derive(Debug)] + pub struct Cache + where + M: Service, + { + connector: M, + shared: Arc>>, + events: Ev, + } + + /// A builder to configure a `Cache`. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + #[derive(Debug)] + pub struct Builder { + events: Ev, + } + + /// A cached service returned from a [`Cache`]. + /// + /// Implements `Service` by delegating to the inner service. Once dropped, + /// tries to reinsert into the `Cache`. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + pub struct Cached { + inner: Option, + shared: Weak>>, + // todo: on_idle + } + + pub enum CacheFuture + where + M: Service, + { + Racing { + shared: Arc>>, + select: future::Select, M::Future>, + events: Ev, + }, + Connecting { + // TODO: could be Weak even here... + shared: Arc>>, + future: M::Future, + }, + Cached { + svc: Option>, + }, + } + + // shouldn't be pub + #[derive(Debug)] + pub struct Shared { + services: Vec, + waiters: Vec>, + } + + // impl Builder + + impl Builder { + /// Provide a `Future` executor to be used by the `Cache`. + /// + /// The executor is used handle some optional background tasks that + /// can improve the behavior of the cache, such as reducing connection + /// thrashing when a race is won. If not configured with an executor, + /// the default behavior is to ignore any of these optional background + /// tasks. + /// + /// The executor should implmenent [`hyper::rt::Executor`]. + /// + /// # Example + /// + /// ```rust + /// # #[cfg(feature = "tokio")] + /// # fn run() { + /// let _builder = hyper_util::client::pool::cache::builder() + /// .executor(hyper_util::rt::TokioExecutor::new()); + /// # } + /// ``` + pub fn executor(self, exec: E) -> Builder> { + Builder { + events: events::WithExecutor(exec), + } + } + + /// Build a `Cache` pool around the `connector`. + pub fn build(self, connector: M) -> Cache + where + M: Service, + { + Cache { + connector, + events: self.events, + shared: Arc::new(Mutex::new(Shared { + services: Vec::new(), + waiters: Vec::new(), + })), + } + } + } + + // impl Cache + + impl Service for Cache + where + M: Service, + M::Future: Unpin, + M::Response: Unpin, + Ev: events::Events> + Clone + Unpin, + { + type Response = Cached; + type Error = M::Error; + type Future = CacheFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + if !self.shared.lock().unwrap().services.is_empty() { + Poll::Ready(Ok(())) + } else { + self.connector.poll_ready(cx) + } + } + + fn call(&mut self, target: Dst) -> Self::Future { + // 1. If already cached, easy! + let waiter = { + let mut locked = self.shared.lock().unwrap(); + if let Some(found) = locked.take() { + return CacheFuture::Cached { + svc: Some(Cached::new(found, Arc::downgrade(&self.shared))), + }; + } + + let (tx, rx) = oneshot::channel(); + locked.waiters.push(tx); + rx + }; + + // 2. Otherwise, we start a new connect, and also listen for + // any newly idle. + CacheFuture::Racing { + shared: self.shared.clone(), + select: future::select(waiter, self.connector.call(target)), + events: self.events.clone(), + } + } + } + + impl Clone for Cache + where + M: Service + Clone, + Ev: Clone, + { + fn clone(&self) -> Self { + Self { + connector: self.connector.clone(), + events: self.events.clone(), + shared: self.shared.clone(), + } + } + } + + impl Future for CacheFuture + where + M: Service, + M::Future: Unpin, + M::Response: Unpin, + Ev: events::Events> + Unpin, + { + type Output = Result, M::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + loop { + match &mut *self.as_mut() { + CacheFuture::Racing { + shared, + select, + events, + } => { + match ready!(Pin::new(select).poll(cx)) { + future::Either::Left((Err(_pool_closed), connecting)) => { + // pool was dropped, so we'll never get it from a waiter, + // but if this future still exists, then the user still + // wants a connection. just wait for the connecting + *self = CacheFuture::Connecting { + shared: shared.clone(), + future: connecting, + }; + } + future::Either::Left((Ok(pool_got), connecting)) => { + events.on_race_lost(BackgroundConnect { + future: connecting, + shared: Arc::downgrade(&shared), + }); + return Poll::Ready(Ok(Cached::new( + pool_got, + Arc::downgrade(&shared), + ))); + } + future::Either::Right((connected, _waiter)) => { + let inner = connected?; + return Poll::Ready(Ok(Cached::new( + inner, + Arc::downgrade(&shared), + ))); + } + } + } + CacheFuture::Connecting { shared, future } => { + let inner = ready!(Pin::new(future).poll(cx))?; + return Poll::Ready(Ok(Cached::new(inner, Arc::downgrade(&shared)))); + } + CacheFuture::Cached { svc } => { + return Poll::Ready(Ok(svc.take().unwrap())); + } + } + } + } + } + + // impl Cached + + impl Cached { + fn new(inner: S, shared: Weak>>) -> Self { + Cached { + inner: Some(inner), + shared, + } + } + } + + impl Service for Cached + where + S: Service, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.inner.as_mut().unwrap().poll_ready(cx) + } + + fn call(&mut self, req: Req) -> Self::Future { + self.inner.as_mut().unwrap().call(req) + } + } + + impl Drop for Cached { + fn drop(&mut self) { + if let Some(value) = self.inner.take() { + if let Some(shared) = self.shared.upgrade() { + if let Ok(mut shared) = shared.lock() { + shared.put(value); + } + } + } + } + } + + impl fmt::Debug for Cached { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Cached") + .field(self.inner.as_ref().unwrap()) + .finish() + } + } + + // impl Shared + + impl Shared { + fn put(&mut self, val: V) { + let mut val = Some(val); + while let Some(tx) = self.waiters.pop() { + if !tx.is_closed() { + match tx.send(val.take().unwrap()) { + Ok(()) => break, + Err(v) => { + val = Some(v); + } + } + } + } + + if let Some(val) = val { + self.services.push(val); + } + } + + fn take(&mut self) -> Option { + // TODO: take in a loop + self.services.pop() + } + } + + pub struct BackgroundConnect { + future: CF, + shared: Weak>>, + } + + impl Future for BackgroundConnect + where + CF: Future> + Unpin, + { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match ready!(Pin::new(&mut self.future).poll(cx)) { + Ok(svc) => { + if let Some(shared) = self.shared.upgrade() { + if let Ok(mut locked) = shared.lock() { + locked.put(svc); + } + } + Poll::Ready(()) + } + Err(_e) => Poll::Ready(()), + } + } + } +} + +mod events { + #[derive(Clone, Debug)] + #[non_exhaustive] + pub struct Ignore; + + #[derive(Clone, Debug)] + pub struct WithExecutor(pub(super) E); + + pub trait Events { + fn on_race_lost(&self, fut: CF); + } + + impl Events for Ignore { + fn on_race_lost(&self, _fut: CF) {} + } + + impl Events for WithExecutor + where + E: hyper::rt::Executor, + { + fn on_race_lost(&self, fut: CF) { + self.0.execute(fut); + } + } +} + +#[cfg(test)] +mod tests { + use futures_util::future; + use tower_service::Service; + use tower_test::assert_request_eq; + + #[tokio::test] + async fn test_makes_svc_when_empty() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + handle.allow(1); + + crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + + let f = cache.call(1); + + future::join(f, async move { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + } + + #[tokio::test] + async fn test_reuses_after_idle() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + + // only 1 connection should ever be made + handle.allow(1); + + crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let f = cache.call(1); + let cached = future::join(f, async { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + drop(cached); + + crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let f = cache.call(1); + let cached = f.await.expect("call"); + drop(cached); + } +} diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs index c0e9bfe..a17acd3 100644 --- a/src/client/pool/mod.rs +++ b/src/client/pool/mod.rs @@ -1,3 +1,4 @@ //! Composable pool services +pub mod cache; pub mod singleton;