Skip to content

Commit

Permalink
fix(client): try to reuse connections when pool checkout wins
Browse files Browse the repository at this point in the history
If a checkout wins, meaning an idle connection became available before
a connect future completed, instead of just dropping the connect future,
it spawns it into the background executor to allow being placed into
the pool on completion.
  • Loading branch information
seanmonstar committed Jun 28, 2018
1 parent 1f95f58 commit f2d464a
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 37 deletions.
60 changes: 52 additions & 8 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ use http::uri::Scheme;

use body::{Body, Payload};
use common::Exec;
use common::lazy as hyper_lazy;
use self::connect::{Connect, Destination};
use self::pool::{Pool, Poolable, Reservation};

Expand Down Expand Up @@ -274,7 +275,7 @@ where C: Connect + Sync + 'static,
let dst = Destination {
uri: url,
};
future::lazy(move || {
hyper_lazy(move || {
if let Some(connecting) = pool.connecting(&pool_key) {
Either::A(connector.connect(dst)
.map_err(::Error::new_connect)
Expand Down Expand Up @@ -318,9 +319,43 @@ where C: Connect + Sync + 'static,
})
};

let race = checkout.select(connect)
.map(|(pooled, _work)| pooled)
.or_else(|(e, other)| {
let executor = self.executor.clone();
// The order of the `select` is depended on below...
let race = checkout.select2(connect)
.map(move |either| match either {
// Checkout won, connect future may have been started or not.
//
// If it has, let it finish and insert back into the pool,
// so as to not waste the socket...
Either::A((checked_out, connecting)) => {
// This depends on the `select` above having the correct
// order, such that if the checkout future were ready
// immediately, the connect future will never have been
// started.
//
// If it *wasn't* ready yet, then the connect future will
// have been started...
if connecting.started() {
let bg = connecting
.map(|_pooled| {
// dropping here should just place it in
// the Pool for us...
})
.map_err(|err| {
trace!("background connect error: {}", err);
});
// An execute error here isn't important, we're just trying
// to prevent a waste of a socket...
let _ = executor.execute(bg);
}
checked_out
},
// Connect won, checkout can just be dropped.
Either::B((connected, _checkout)) => {
connected
},
})
.or_else(|either| match either {
// Either checkout or connect could get canceled:
//
// 1. Connect is canceled if this is HTTP/2 and there is
Expand All @@ -329,10 +364,19 @@ where C: Connect + Sync + 'static,
// idle connection reliably.
//
// In both cases, we should just wait for the other future.
if e.is_canceled() {
Either::A(other.map_err(ClientError::Normal))
} else {
Either::B(future::err(ClientError::Normal(e)))
Either::A((err, connecting)) => {
if err.is_canceled() {
Either::A(Either::A(connecting.map_err(ClientError::Normal)))
} else {
Either::B(future::err(ClientError::Normal(err)))
}
},
Either::B((err, checkout)) => {
if err.is_canceled() {
Either::A(Either::B(checkout.map_err(ClientError::Normal)))
} else {
Either::B(future::err(ClientError::Normal(err)))
}
}
});

Expand Down
20 changes: 20 additions & 0 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,26 @@ impl<T: Poolable> Pool<T> {
}
}

#[cfg(feature = "runtime")]
#[cfg(test)]
pub(super) fn h1_key(&self, s: &str) -> Key {
(Arc::new(s.to_string()), Ver::Http1)
}

#[cfg(feature = "runtime")]
#[cfg(test)]
pub(super) fn idle_count(&self, key: &Key) -> usize {
self
.inner
.connections
.lock()
.unwrap()
.idle
.get(key)
.map(|list| list.len())
.unwrap_or(0)
}

fn take(&self, key: &Key) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.connections.lock().unwrap();
Expand Down
97 changes: 94 additions & 3 deletions src/client/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#![cfg(feature = "runtime")]
extern crate pretty_env_logger;

use futures::Async;
use futures::{Async, Future, Stream};
use futures::future::poll_fn;
use futures::sync::oneshot;
use tokio::runtime::current_thread::Runtime;

use mock::MockConnector;
Expand Down Expand Up @@ -73,8 +74,6 @@ fn conn_reset_after_write() {
{
let req = Request::builder()
.uri("http://mock.local/a")
//TODO: remove this header when auto lengths are fixed
.header("content-length", "0")
.body(Default::default())
.unwrap();
let res1 = client.request(req);
Expand Down Expand Up @@ -110,3 +109,95 @@ fn conn_reset_after_write() {
}
}

#[test]
fn checkout_win_allows_connect_future_to_be_pooled() {
let _ = pretty_env_logger::try_init();

let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();


let (tx, rx) = oneshot::channel::<()>();
let sock1 = connector.mock("http://mock.local");
let sock2 = connector.mock_fut("http://mock.local", rx);

let client = Client::builder()
.build::<_, ::Body>(connector);

client.pool.no_timer();

let uri = "http://mock.local/a".parse::<::Uri>().expect("uri parse");

// First request just sets us up to have a connection able to be put
// back in the pool. *However*, it doesn't insert immediately. The
// body has 1 pending byte, and we will only drain in request 2, once
// the connect future has been started.
let mut body = {
let res1 = client.get(uri.clone())
.map(|res| res.into_body().concat2());
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nx"));
Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e));

rt.block_on(res1.join(srv1)).expect("res1").0
};


// The second request triggers the only mocked connect future, but then
// the drained body allows the first socket to go back to the pool,
// "winning" the checkout race.
{
let res2 = client.get(uri.clone());
let drain = poll_fn(move || {
body.poll()
});
let srv2 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx"));
Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e));

rt.block_on(res2.join(drain).join(srv2)).expect("res2");
}

// "Release" the mocked connect future, and let the runtime spin once so
// it's all setup...
{
let mut tx = Some(tx);
let client = &client;
let key = client.pool.h1_key("http://mock.local");
let mut tick_cnt = 0;
let fut = poll_fn(move || {
tx.take();

if client.pool.idle_count(&key) == 0 {
tick_cnt += 1;
assert!(tick_cnt < 10, "ticked too many times waiting for idle");
trace!("no idle yet; tick count: {}", tick_cnt);
::futures::task::current().notify();
Ok(Async::NotReady)
} else {
Ok::<_, ()>(Async::Ready(()))
}
});
rt.block_on(fut).unwrap();
}

// Third request just tests out that the "loser" connection was pooled. If
// it isn't, this will panic since the MockConnector doesn't have any more
// mocks to give out.
{
let res3 = client.get(uri);
let srv3 = poll_fn(|| {
try_ready!(sock2.read(&mut [0u8; 512]));
try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv3 poll_fn error: {}", e));

rt.block_on(res3.join(srv3)).expect("res3");
}
}


63 changes: 63 additions & 0 deletions src/common/lazy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::mem;

use futures::{Future, IntoFuture, Poll};

pub(crate) fn lazy<F, R>(func: F) -> Lazy<F, R>
where
F: FnOnce() -> R,
R: IntoFuture,
{
Lazy {
inner: Inner::Init(func),
}
}

pub struct Lazy<F, R: IntoFuture> {
inner: Inner<F, R::Future>
}

enum Inner<F, R> {
Init(F),
Fut(R),
Empty,
}

impl<F, R> Lazy<F, R>
where
F: FnOnce() -> R,
R: IntoFuture,
{
pub fn started(&self) -> bool {
match self.inner {
Inner::Init(_) => false,
Inner::Fut(_) |
Inner::Empty => true,
}
}
}

impl<F, R> Future for Lazy<F, R>
where
F: FnOnce() -> R,
R: IntoFuture,
{
type Item = R::Item;
type Error = R::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Inner::Fut(ref mut f) => return f.poll(),
_ => (),
}

match mem::replace(&mut self.inner, Inner::Empty) {
Inner::Init(func) => {
let mut fut = func().into_future();
let ret = fut.poll();
self.inner = Inner::Fut(fut);
ret
},
_ => unreachable!("lazy state wrong"),
}
}
}
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
mod buf;
mod exec;
pub(crate) mod io;
mod lazy;
mod never;

pub(crate) use self::buf::StaticBuf;
pub(crate) use self::exec::Exec;
pub(crate) use self::lazy::lazy;
pub use self::never::Never;
Loading

0 comments on commit f2d464a

Please sign in to comment.