diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index c409e18e..978ca734 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -16,7 +16,7 @@ use futures_core::ready; use tokio::sync::mpsc::UnboundedSender; use crate::{ - conn::pool::{Inner, Pool}, + conn::pool::{Inner, Pool, QueuedWaker, QUEUE_END_ID}, error::Error, Conn, }; @@ -50,7 +50,9 @@ impl Future for DisconnectPool { self.pool_inner.close.store(true, atomic::Ordering::Release); let mut exchange = self.pool_inner.exchange.lock().unwrap(); exchange.spawn_futures_if_needed(&self.pool_inner); - exchange.waiting.push_back(cx.waker().clone()); + exchange + .waiting + .push(QueuedWaker::new(QUEUE_END_ID, cx.waker().clone())); drop(exchange); if self.pool_inner.closed.load(atomic::Ordering::Acquire) { diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 429a016a..843f8298 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -16,7 +16,10 @@ use std::{ use futures_core::ready; use crate::{ - conn::{pool::Pool, Conn}, + conn::{ + pool::{Pool, QueueId}, + Conn, + }, error::*, }; @@ -58,6 +61,7 @@ impl GetConnInner { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct GetConn { + pub(crate) queue_id: Option, pub(crate) pool: Option, pub(crate) inner: GetConnInner, } @@ -65,6 +69,7 @@ pub struct GetConn { impl GetConn { pub(crate) fn new(pool: &Pool) -> GetConn { GetConn { + queue_id: None, pool: Some(pool.clone()), inner: GetConnInner::New, } @@ -91,23 +96,26 @@ impl Future for GetConn { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match self.inner { - GetConnInner::New => match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx))? - .inner - .take() - { - GetConnInner::Connecting(conn_fut) => { - self.inner = GetConnInner::Connecting(conn_fut); - } - GetConnInner::Checking(conn_fut) => { - self.inner = GetConnInner::Checking(conn_fut); - } - GetConnInner::Done => unreachable!( - "Pool::poll_new_conn never gives out already-consumed GetConns" - ), - GetConnInner::New => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") + GetConnInner::New => { + let queued = self.queue_id.is_some(); + let queue_id = *self.queue_id.get_or_insert_with(QueueId::next); + let next = + ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?; + match next { + GetConnInner::Connecting(conn_fut) => { + self.inner = GetConnInner::Connecting(conn_fut); + } + GetConnInner::Checking(conn_fut) => { + self.inner = GetConnInner::Checking(conn_fut); + } + GetConnInner::Done => unreachable!( + "Pool::poll_new_conn never gives out already-consumed GetConns" + ), + GetConnInner::New => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") + } } - }, + } GetConnInner::Done => { unreachable!("GetConn::poll polled after returning Async::Ready"); } diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 64792c5e..a88e7a9a 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -10,7 +10,8 @@ use futures_util::FutureExt; use tokio::sync::mpsc; use std::{ - collections::VecDeque, + cmp::{Ordering, Reverse}, + collections::{BinaryHeap, VecDeque}, convert::TryFrom, pin::Pin, str::FromStr, @@ -62,7 +63,7 @@ impl From for IdlingConn { /// This is fine as long as we never do expensive work while holding the lock! #[derive(Debug)] struct Exchange { - waiting: VecDeque, + waiting: BinaryHeap, available: VecDeque, exist: usize, // only used to spawn the recycler the first time we're in async context @@ -87,6 +88,51 @@ impl Exchange { } } +const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX)); + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] +pub(crate) struct QueueId(Reverse); + +impl QueueId { + fn next() -> Self { + static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0); + let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst); + QueueId(Reverse(id)) + } +} + +#[derive(Debug)] +struct QueuedWaker { + queue_id: QueueId, + waker: Waker, +} + +impl QueuedWaker { + fn new(queue_id: QueueId, waker: Waker) -> Self { + QueuedWaker { queue_id, waker } + } +} + +impl Eq for QueuedWaker {} + +impl PartialEq for QueuedWaker { + fn eq(&self, other: &Self) -> bool { + self.queue_id == other.queue_id + } +} + +impl Ord for QueuedWaker { + fn cmp(&self, other: &Self) -> Ordering { + self.queue_id.cmp(&other.queue_id) + } +} + +impl PartialOrd for QueuedWaker { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Connection pool data. #[derive(Debug)] pub struct Inner { @@ -131,7 +177,7 @@ impl Pool { closed: false.into(), exchange: Mutex::new(Exchange { available: VecDeque::with_capacity(pool_opts.constraints().max()), - waiting: VecDeque::new(), + waiting: BinaryHeap::new(), exist: 0, recycler: Some((rx, pool_opts)), }), @@ -181,8 +227,8 @@ impl Pool { let mut exchange = self.inner.exchange.lock().unwrap(); if exchange.available.len() < self.opts.pool_opts().active_bound() { exchange.available.push_back(conn.into()); - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } return; } @@ -216,17 +262,27 @@ impl Pool { let mut exchange = self.inner.exchange.lock().unwrap(); exchange.exist -= 1; // we just enabled the creation of a new connection! - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } } /// Poll the pool for an available connection. - fn poll_new_conn(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_new_conn_inner(cx) - } - - fn poll_new_conn_inner(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_new_conn( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + queued: bool, + queue_id: QueueId, + ) -> Poll> { + self.poll_new_conn_inner(cx, queued, queue_id) + } + + fn poll_new_conn_inner( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + queued: bool, + queue_id: QueueId, + ) -> Poll> { let mut exchange = self.inner.exchange.lock().unwrap(); // NOTE: this load must happen while we hold the lock, @@ -238,18 +294,23 @@ impl Pool { exchange.spawn_futures_if_needed(&self.inner); + // Check if others are waiting and we're not queued. + if !exchange.waiting.is_empty() && !queued { + exchange + .waiting + .push(QueuedWaker::new(queue_id, cx.waker().clone())); + return Poll::Pending; + } + while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() { if !conn.expired() { - return Poll::Ready(Ok(GetConn { - pool: Some(self.clone()), - inner: GetConnInner::Checking( - async move { - conn.stream_mut()?.check().await?; - Ok(conn) - } - .boxed(), - ), - })); + return Poll::Ready(Ok(GetConnInner::Checking( + async move { + conn.stream_mut()?.check().await?; + Ok(conn) + } + .boxed(), + ))); } else { self.send_to_recycler(conn); } @@ -261,14 +322,15 @@ impl Pool { // we are allowed to make a new connection, so we will! exchange.exist += 1; - return Poll::Ready(Ok(GetConn { - pool: Some(self.clone()), - inner: GetConnInner::Connecting(Conn::new(self.opts.clone()).boxed()), - })); + return Poll::Ready(Ok(GetConnInner::Connecting( + Conn::new(self.opts.clone()).boxed(), + ))); } - // no go -- we have to wait - exchange.waiting.push_back(cx.waker().clone()); + // Polled, but no conn available? Back into the queue. + exchange + .waiting + .push(QueuedWaker::new(queue_id, cx.waker().clone())); Poll::Pending } } diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index b60066d1..02ea3ad3 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -76,8 +76,8 @@ impl Future for Recycler { $self.discard.push($conn.close_conn().boxed()); } else { exchange.available.push_back($conn.into()); - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } } } @@ -163,8 +163,8 @@ impl Future for Recycler { let mut exchange = self.inner.exchange.lock().unwrap(); exchange.exist -= self.discarded; for _ in 0..self.discarded { - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } } drop(exchange); @@ -197,8 +197,8 @@ impl Future for Recycler { if self.inner.closed.load(Ordering::Acquire) { // `DisconnectPool` might still wait to be woken up. let mut exchange = self.inner.exchange.lock().unwrap(); - while let Some(w) = exchange.waiting.pop_front() { - w.wake(); + while let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } // we're about to exit, so there better be no outstanding connections assert_eq!(exchange.exist, 0);