Skip to content

Commit 99e0932

Browse files
committed
feat(pool): add a Singleton pool type
1 parent 3021828 commit 99e0932

File tree

4 files changed

+307
-0
lines changed

4 files changed

+307
-0
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4242
http-body-util = "0.1.0"
4343
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4444
tokio-test = "0.4"
45+
tower-test = "0.4"
4546
pretty_env_logger = "0.5"
4647

4748
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
@@ -60,6 +61,7 @@ default = []
6061
full = [
6162
"client",
6263
"client-legacy",
64+
"client-pool",
6365
"client-proxy",
6466
"client-proxy-system",
6567
"server",
@@ -74,6 +76,7 @@ full = [
7476

7577
client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
7678
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
79+
client-pool = []
7780
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
7881
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
7982

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
#[cfg(feature = "client-legacy")]
55
pub mod legacy;
66

7+
#[cfg(feature = "client-pool")]
8+
pub mod pool;
9+
710
#[cfg(feature = "client-proxy")]
811
pub mod proxy;

src/client/pool/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//! Composable pool services
2+
3+
mod singleton;
4+
5+
pub use self::singleton::Singleton;

src/client/pool/singleton.rs

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::sync::{Arc, Mutex, Weak};
4+
use std::task::{self, Poll};
5+
6+
use futures_core::ready;
7+
use pin_project_lite::pin_project;
8+
use tokio::sync::oneshot;
9+
use tower_service::Service;
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
12+
13+
/// A singleton pool over an inner service.
14+
#[derive(Clone, Debug)]
15+
pub struct Singleton<M, Dst>
16+
where
17+
M: Service<Dst>,
18+
{
19+
mk_svc: M,
20+
state: Arc<Mutex<State<M::Response>>>,
21+
}
22+
23+
pin_project! {
24+
#[project = SingletonFutureProj]
25+
pub enum SingletonFuture<F, S> {
26+
Driving {
27+
#[pin]
28+
future: F,
29+
singleton: DitchGuard<S>,
30+
},
31+
Waiting {
32+
rx: oneshot::Receiver<S>,
33+
},
34+
Made {
35+
svc: Option<S>,
36+
},
37+
}
38+
}
39+
40+
// XXX: pub because of the enum SingletonFuture
41+
pub struct DitchGuard<S>(Weak<Mutex<State<S>>>);
42+
43+
#[derive(Debug)]
44+
enum State<S> {
45+
Empty,
46+
Making(Vec<oneshot::Sender<S>>),
47+
Made(S),
48+
}
49+
50+
impl<M, Target> Singleton<M, Target>
51+
where
52+
M: Service<Target>,
53+
M::Response: Clone,
54+
{
55+
/// Create a new singleton pool over an inner make service.
56+
pub fn new(mk_svc: M) -> Self {
57+
Singleton {
58+
mk_svc,
59+
state: Arc::new(Mutex::new(State::Empty)),
60+
}
61+
}
62+
63+
// pub fn reset?
64+
// pub fn retain?
65+
}
66+
67+
impl<M, Target> Service<Target> for Singleton<M, Target>
68+
where
69+
M: Service<Target>,
70+
M::Response: Clone,
71+
M::Error: Into<BoxError>,
72+
{
73+
type Response = M::Response;
74+
type Error = SingletonError;
75+
type Future = SingletonFuture<M::Future, M::Response>;
76+
77+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
78+
if let State::Empty = *self.state.lock().unwrap() {
79+
return self
80+
.mk_svc
81+
.poll_ready(cx)
82+
.map_err(|e| SingletonError(e.into()));
83+
}
84+
Poll::Ready(Ok(()))
85+
}
86+
87+
fn call(&mut self, dst: Target) -> Self::Future {
88+
let mut locked = self.state.lock().unwrap();
89+
match *locked {
90+
State::Empty => {
91+
let fut = self.mk_svc.call(dst);
92+
*locked = State::Making(Vec::new());
93+
SingletonFuture::Driving {
94+
future: fut,
95+
singleton: DitchGuard(Arc::downgrade(&self.state)),
96+
}
97+
}
98+
State::Making(ref mut waiters) => {
99+
let (tx, rx) = oneshot::channel();
100+
waiters.push(tx);
101+
SingletonFuture::Waiting { rx }
102+
}
103+
State::Made(ref svc) => SingletonFuture::Made {
104+
svc: Some(svc.clone()),
105+
},
106+
}
107+
}
108+
}
109+
110+
impl<F, S, E> Future for SingletonFuture<F, S>
111+
where
112+
F: Future<Output = Result<S, E>>,
113+
E: Into<BoxError>,
114+
S: Clone,
115+
{
116+
type Output = Result<S, SingletonError>;
117+
118+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
119+
match self.project() {
120+
SingletonFutureProj::Driving { future, singleton } => {
121+
match ready!(future.poll(cx)) {
122+
Ok(svc) => {
123+
if let Some(state) = singleton.0.upgrade() {
124+
let mut locked = state.lock().unwrap();
125+
singleton.0 = Weak::new();
126+
match std::mem::replace(&mut *locked, State::Made(svc.clone())) {
127+
State::Making(waiters) => {
128+
for tx in waiters {
129+
let _ = tx.send(svc.clone());
130+
}
131+
}
132+
State::Empty | State::Made(_) => {
133+
// shouldn't happen!
134+
}
135+
}
136+
}
137+
Poll::Ready(Ok(svc))
138+
}
139+
Err(e) => {
140+
if let Some(state) = singleton.0.upgrade() {
141+
let mut locked = state.lock().unwrap();
142+
singleton.0 = Weak::new();
143+
*locked = State::Empty;
144+
}
145+
Poll::Ready(Err(SingletonError(e.into())))
146+
}
147+
}
148+
}
149+
SingletonFutureProj::Waiting { rx } => match ready!(Pin::new(rx).poll(cx)) {
150+
Ok(svc) => Poll::Ready(Ok(svc)),
151+
Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))),
152+
},
153+
SingletonFutureProj::Made { svc } => Poll::Ready(Ok(svc.take().unwrap())),
154+
}
155+
}
156+
}
157+
158+
impl<S> Drop for DitchGuard<S> {
159+
fn drop(&mut self) {
160+
if let Some(state) = self.0.upgrade() {
161+
if let Ok(mut locked) = state.lock() {
162+
*locked = State::Empty;
163+
}
164+
}
165+
}
166+
}
167+
168+
// An opaque error type. By not exposing the type, nor being specifically
169+
// Box<dyn Error>, we can _change_ the type once we no longer need the Canceled
170+
// error type. This will be possible with the refactor to baton passing.
171+
#[derive(Debug)]
172+
pub struct SingletonError(BoxError);
173+
174+
impl std::fmt::Display for SingletonError {
175+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176+
f.write_str("singleton connection error")
177+
}
178+
}
179+
180+
impl std::error::Error for SingletonError {
181+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
182+
Some(&*self.0)
183+
}
184+
}
185+
186+
#[derive(Debug)]
187+
struct Canceled;
188+
189+
impl std::fmt::Display for Canceled {
190+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191+
f.write_str("singleton connection canceled")
192+
}
193+
}
194+
195+
impl std::error::Error for Canceled {}
196+
197+
#[cfg(test)]
198+
mod tests {
199+
use std::future::Future;
200+
use std::pin::Pin;
201+
use std::task::Poll;
202+
203+
use tower_service::Service;
204+
205+
use super::Singleton;
206+
207+
#[tokio::test]
208+
async fn first_call_drives_subsequent_wait() {
209+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
210+
211+
let mut singleton = Singleton::new(mock_svc);
212+
213+
handle.allow(1);
214+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
215+
.await
216+
.unwrap();
217+
// First call: should go into Driving
218+
let fut1 = singleton.call(());
219+
// Second call: should go into Waiting
220+
let fut2 = singleton.call(());
221+
222+
// Expect exactly one request to the inner service
223+
let ((), send_response) = handle.next_request().await.unwrap();
224+
send_response.send_response("svc");
225+
226+
// Both futures should resolve to the same value
227+
assert_eq!(fut1.await.unwrap(), "svc");
228+
assert_eq!(fut2.await.unwrap(), "svc");
229+
}
230+
231+
#[tokio::test]
232+
async fn made_state_returns_immediately() {
233+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
234+
let mut singleton = Singleton::new(mock_svc);
235+
236+
handle.allow(1);
237+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
238+
.await
239+
.unwrap();
240+
// Drive first call to completion
241+
let fut1 = singleton.call(());
242+
let ((), send_response) = handle.next_request().await.unwrap();
243+
send_response.send_response("svc");
244+
assert_eq!(fut1.await.unwrap(), "svc");
245+
246+
// Second call should not hit inner service
247+
let res = singleton.call(()).await.unwrap();
248+
assert_eq!(res, "svc");
249+
}
250+
251+
#[tokio::test]
252+
async fn cancel_waiter_does_not_affect_others() {
253+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
254+
let mut singleton = Singleton::new(mock_svc);
255+
256+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
257+
.await
258+
.unwrap();
259+
let fut1 = singleton.call(());
260+
let fut2 = singleton.call(());
261+
drop(fut2); // cancel one waiter
262+
263+
let ((), send_response) = handle.next_request().await.unwrap();
264+
send_response.send_response("svc");
265+
266+
assert_eq!(fut1.await.unwrap(), "svc");
267+
}
268+
269+
// TODO: this should be able to be improved with a cooperative baton refactor
270+
#[tokio::test]
271+
async fn cancel_driver_cancels_all() {
272+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
273+
let mut singleton = Singleton::new(mock_svc);
274+
275+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
276+
.await
277+
.unwrap();
278+
let mut fut1 = singleton.call(());
279+
let fut2 = singleton.call(());
280+
281+
// poll driver just once, and then drop
282+
crate::common::future::poll_fn(move |cx| {
283+
let _ = Pin::new(&mut fut1).poll(cx);
284+
Poll::Ready(())
285+
})
286+
.await;
287+
288+
let ((), send_response) = handle.next_request().await.unwrap();
289+
send_response.send_response("svc");
290+
291+
assert_eq!(
292+
fut2.await.unwrap_err().0.to_string(),
293+
"singleton connection canceled"
294+
);
295+
}
296+
}

0 commit comments

Comments
 (0)