From 9b4fc0b1fcb428862df3054e9fdad3d201a23660 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Wed, 2 Nov 2022 00:03:30 +0100 Subject: [PATCH 1/4] GetConnInner::Queued --- src/conn/pool/futures/get_conn.rs | 60 +++++++++++++++++++++++-------- src/conn/pool/mod.rs | 28 +++++++++++---- 2 files changed, 66 insertions(+), 22 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 429a016a..06645db2 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -23,6 +23,7 @@ use crate::{ /// States of the GetConn future. pub(crate) enum GetConnInner { New, + Queued, Done, // TODO: one day this should be an existential Connecting(crate::BoxFuture<'static, Conn>), @@ -34,6 +35,7 @@ impl fmt::Debug for GetConnInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { GetConnInner::New => f.debug_tuple("GetConnInner::New").finish(), + GetConnInner::Queued => f.debug_tuple("GetConnInner::Queued").finish(), GetConnInner::Done => f.debug_tuple("GetConnInner::Done").finish(), GetConnInner::Connecting(_) => f .debug_tuple("GetConnInner::Connecting") @@ -91,23 +93,51 @@ impl Future for GetConn { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match self.inner { - GetConnInner::New => match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx))? - .inner - .take() - { - GetConnInner::Connecting(conn_fut) => { - self.inner = GetConnInner::Connecting(conn_fut); - } - GetConnInner::Checking(conn_fut) => { - self.inner = GetConnInner::Checking(conn_fut); - } - GetConnInner::Done => unreachable!( - "Pool::poll_new_conn never gives out already-consumed GetConns" - ), - GetConnInner::New => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") + GetConnInner::New => match Pin::new(self.pool_mut()).poll_new_conn(cx, false) { + Poll::Pending => { + self.inner = GetConnInner::Queued; + return Poll::Pending; } + Poll::Ready(res) => match res?.inner.take() { + GetConnInner::Connecting(conn_fut) => { + self.inner = GetConnInner::Connecting(conn_fut); + } + GetConnInner::Checking(conn_fut) => { + self.inner = GetConnInner::Checking(conn_fut); + } + GetConnInner::Done => unreachable!( + "Pool::poll_new_conn never gives out already-consumed GetConns" + ), + GetConnInner::New => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") + } + GetConnInner::Queued => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::Queued") + } + }, }, + GetConnInner::Queued => { + match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, true))? + .inner + .take() + { + GetConnInner::Connecting(conn_fut) => { + self.inner = GetConnInner::Connecting(conn_fut); + } + GetConnInner::Checking(conn_fut) => { + self.inner = GetConnInner::Checking(conn_fut); + } + GetConnInner::Done => unreachable!( + "Pool::poll_new_conn never gives out already-consumed GetConns" + ), + GetConnInner::New => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") + } + GetConnInner::Queued => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::Queued") + } + } + } GetConnInner::Done => { unreachable!("GetConn::poll polled after returning Async::Ready"); } diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 64792c5e..f8eddeae 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -222,11 +222,19 @@ impl Pool { } /// Poll the pool for an available connection. - fn poll_new_conn(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_new_conn_inner(cx) - } - - fn poll_new_conn_inner(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_new_conn( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + queued: bool, + ) -> Poll> { + self.poll_new_conn_inner(cx, queued) + } + + fn poll_new_conn_inner( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + queued: bool, + ) -> Poll> { let mut exchange = self.inner.exchange.lock().unwrap(); // NOTE: this load must happen while we hold the lock, @@ -238,6 +246,12 @@ impl Pool { exchange.spawn_futures_if_needed(&self.inner); + // Check if others are waiting and we're not queued. + if !exchange.waiting.is_empty() && !queued { + exchange.waiting.push_back(cx.waker().clone()); + return Poll::Pending; + } + while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() { if !conn.expired() { return Poll::Ready(Ok(GetConn { @@ -267,8 +281,8 @@ impl Pool { })); } - // no go -- we have to wait - exchange.waiting.push_back(cx.waker().clone()); + // Polled, but no conn available? Back to the front of the queue. + exchange.waiting.push_front(cx.waker().clone()); Poll::Pending } } From b6f3cb3942af57571913d23dee580a1c20e39152 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Fri, 4 Nov 2022 19:56:37 +0100 Subject: [PATCH 2/4] Use a BinaryHeap for the waiting queue --- src/conn/pool/futures/disconnect_pool.rs | 6 +- src/conn/pool/futures/get_conn.rs | 69 +++++++++++++--------- src/conn/pool/mod.rs | 74 ++++++++++++++++++++---- src/conn/pool/recycler.rs | 12 ++-- 4 files changed, 114 insertions(+), 47 deletions(-) diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index c409e18e..978ca734 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -16,7 +16,7 @@ use futures_core::ready; use tokio::sync::mpsc::UnboundedSender; use crate::{ - conn::pool::{Inner, Pool}, + conn::pool::{Inner, Pool, QueuedWaker, QUEUE_END_ID}, error::Error, Conn, }; @@ -50,7 +50,9 @@ impl Future for DisconnectPool { self.pool_inner.close.store(true, atomic::Ordering::Release); let mut exchange = self.pool_inner.exchange.lock().unwrap(); exchange.spawn_futures_if_needed(&self.pool_inner); - exchange.waiting.push_back(cx.waker().clone()); + exchange + .waiting + .push(QueuedWaker::new(QUEUE_END_ID, cx.waker().clone())); drop(exchange); if self.pool_inner.closed.load(atomic::Ordering::Acquire) { diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 06645db2..9fdc0bcd 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -16,14 +16,17 @@ use std::{ use futures_core::ready; use crate::{ - conn::{pool::Pool, Conn}, + conn::{ + pool::{Pool, QueueId}, + Conn, + }, error::*, }; /// States of the GetConn future. pub(crate) enum GetConnInner { New, - Queued, + Queued(QueueId), Done, // TODO: one day this should be an existential Connecting(crate::BoxFuture<'static, Conn>), @@ -35,7 +38,10 @@ impl fmt::Debug for GetConnInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { GetConnInner::New => f.debug_tuple("GetConnInner::New").finish(), - GetConnInner::Queued => f.debug_tuple("GetConnInner::Queued").finish(), + GetConnInner::Queued(queue_id) => f + .debug_tuple("GetConnInner::Queued") + .field(queue_id) + .finish(), GetConnInner::Done => f.debug_tuple("GetConnInner::Done").finish(), GetConnInner::Connecting(_) => f .debug_tuple("GetConnInner::Connecting") @@ -93,31 +99,38 @@ impl Future for GetConn { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match self.inner { - GetConnInner::New => match Pin::new(self.pool_mut()).poll_new_conn(cx, false) { - Poll::Pending => { - self.inner = GetConnInner::Queued; - return Poll::Pending; - } - Poll::Ready(res) => match res?.inner.take() { - GetConnInner::Connecting(conn_fut) => { - self.inner = GetConnInner::Connecting(conn_fut); + GetConnInner::New => { + let queue_id = QueueId::next(); + match Pin::new(self.pool_mut()).poll_new_conn(cx, false, queue_id) { + Poll::Pending => { + self.inner = GetConnInner::Queued(queue_id); + return Poll::Pending; } - GetConnInner::Checking(conn_fut) => { - self.inner = GetConnInner::Checking(conn_fut); - } - GetConnInner::Done => unreachable!( - "Pool::poll_new_conn never gives out already-consumed GetConns" - ), - GetConnInner::New => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") - } - GetConnInner::Queued => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::Queued") - } - }, - }, - GetConnInner::Queued => { - match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, true))? + Poll::Ready(res) => match res?.inner.take() { + GetConnInner::Connecting(conn_fut) => { + self.inner = GetConnInner::Connecting(conn_fut); + } + GetConnInner::Checking(conn_fut) => { + self.inner = GetConnInner::Checking(conn_fut); + } + GetConnInner::Done => unreachable!( + "Pool::poll_new_conn never gives out already-consumed GetConns" + ), + GetConnInner::New => { + unreachable!( + "Pool::poll_new_conn never gives out GetConnInner::New" + ) + } + GetConnInner::Queued(_) => { + unreachable!( + "Pool::poll_new_conn never gives out GetConnInner::Queued" + ) + } + }, + } + } + GetConnInner::Queued(queue_id) => { + match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, true, queue_id))? .inner .take() { @@ -133,7 +146,7 @@ impl Future for GetConn { GetConnInner::New => { unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") } - GetConnInner::Queued => { + GetConnInner::Queued(_) => { unreachable!("Pool::poll_new_conn never gives out GetConnInner::Queued") } } diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index f8eddeae..6def9510 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -10,7 +10,8 @@ use futures_util::FutureExt; use tokio::sync::mpsc; use std::{ - collections::VecDeque, + cmp::{Ordering, Reverse}, + collections::{BinaryHeap, VecDeque}, convert::TryFrom, pin::Pin, str::FromStr, @@ -62,7 +63,7 @@ impl From for IdlingConn { /// This is fine as long as we never do expensive work while holding the lock! #[derive(Debug)] struct Exchange { - waiting: VecDeque, + waiting: BinaryHeap, available: VecDeque, exist: usize, // only used to spawn the recycler the first time we're in async context @@ -87,6 +88,51 @@ impl Exchange { } } +const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX)); + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] +pub(crate) struct QueueId(Reverse); + +impl QueueId { + fn next() -> Self { + static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0); + let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst); + QueueId(Reverse(id)) + } +} + +#[derive(Debug)] +struct QueuedWaker { + queue_id: QueueId, + waker: Waker, +} + +impl QueuedWaker { + fn new(queue_id: QueueId, waker: Waker) -> Self { + QueuedWaker { queue_id, waker } + } +} + +impl Eq for QueuedWaker {} + +impl PartialEq for QueuedWaker { + fn eq(&self, other: &Self) -> bool { + self.queue_id == other.queue_id + } +} + +impl Ord for QueuedWaker { + fn cmp(&self, other: &Self) -> Ordering { + self.queue_id.cmp(&other.queue_id) + } +} + +impl PartialOrd for QueuedWaker { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Connection pool data. #[derive(Debug)] pub struct Inner { @@ -131,7 +177,7 @@ impl Pool { closed: false.into(), exchange: Mutex::new(Exchange { available: VecDeque::with_capacity(pool_opts.constraints().max()), - waiting: VecDeque::new(), + waiting: BinaryHeap::new(), exist: 0, recycler: Some((rx, pool_opts)), }), @@ -181,8 +227,8 @@ impl Pool { let mut exchange = self.inner.exchange.lock().unwrap(); if exchange.available.len() < self.opts.pool_opts().active_bound() { exchange.available.push_back(conn.into()); - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } return; } @@ -216,8 +262,8 @@ impl Pool { let mut exchange = self.inner.exchange.lock().unwrap(); exchange.exist -= 1; // we just enabled the creation of a new connection! - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } } @@ -226,14 +272,16 @@ impl Pool { self: Pin<&mut Self>, cx: &mut Context<'_>, queued: bool, + queue_id: QueueId, ) -> Poll> { - self.poll_new_conn_inner(cx, queued) + self.poll_new_conn_inner(cx, queued, queue_id) } fn poll_new_conn_inner( self: Pin<&mut Self>, cx: &mut Context<'_>, queued: bool, + queue_id: QueueId, ) -> Poll> { let mut exchange = self.inner.exchange.lock().unwrap(); @@ -248,7 +296,9 @@ impl Pool { // Check if others are waiting and we're not queued. if !exchange.waiting.is_empty() && !queued { - exchange.waiting.push_back(cx.waker().clone()); + exchange + .waiting + .push(QueuedWaker::new(queue_id, cx.waker().clone())); return Poll::Pending; } @@ -281,8 +331,10 @@ impl Pool { })); } - // Polled, but no conn available? Back to the front of the queue. - exchange.waiting.push_front(cx.waker().clone()); + // Polled, but no conn available? Back into the queue. + exchange + .waiting + .push(QueuedWaker::new(queue_id, cx.waker().clone())); Poll::Pending } } diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index b60066d1..02ea3ad3 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -76,8 +76,8 @@ impl Future for Recycler { $self.discard.push($conn.close_conn().boxed()); } else { exchange.available.push_back($conn.into()); - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } } } @@ -163,8 +163,8 @@ impl Future for Recycler { let mut exchange = self.inner.exchange.lock().unwrap(); exchange.exist -= self.discarded; for _ in 0..self.discarded { - if let Some(w) = exchange.waiting.pop_front() { - w.wake(); + if let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } } drop(exchange); @@ -197,8 +197,8 @@ impl Future for Recycler { if self.inner.closed.load(Ordering::Acquire) { // `DisconnectPool` might still wait to be woken up. let mut exchange = self.inner.exchange.lock().unwrap(); - while let Some(w) = exchange.waiting.pop_front() { - w.wake(); + while let Some(qw) = exchange.waiting.pop() { + qw.waker.wake(); } // we're about to exit, so there better be no outstanding connections assert_eq!(exchange.exist, 0); From 8dfc923888392d3474476fe8a42b369655874530 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Sat, 12 Nov 2022 19:01:09 +0300 Subject: [PATCH 3/4] GetConn: move queue_id up one level, remove duplicated code --- src/conn/pool/futures/get_conn.rs | 49 +++++-------------------------- src/conn/pool/mod.rs | 28 ++++++++---------- 2 files changed, 19 insertions(+), 58 deletions(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 9fdc0bcd..3f56d04d 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -26,7 +26,6 @@ use crate::{ /// States of the GetConn future. pub(crate) enum GetConnInner { New, - Queued(QueueId), Done, // TODO: one day this should be an existential Connecting(crate::BoxFuture<'static, Conn>), @@ -38,10 +37,6 @@ impl fmt::Debug for GetConnInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { GetConnInner::New => f.debug_tuple("GetConnInner::New").finish(), - GetConnInner::Queued(queue_id) => f - .debug_tuple("GetConnInner::Queued") - .field(queue_id) - .finish(), GetConnInner::Done => f.debug_tuple("GetConnInner::Done").finish(), GetConnInner::Connecting(_) => f .debug_tuple("GetConnInner::Connecting") @@ -66,6 +61,7 @@ impl GetConnInner { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct GetConn { + pub(crate) queue_id: Option, pub(crate) pool: Option, pub(crate) inner: GetConnInner, } @@ -73,6 +69,7 @@ pub struct GetConn { impl GetConn { pub(crate) fn new(pool: &Pool) -> GetConn { GetConn { + queue_id: None, pool: Some(pool.clone()), inner: GetConnInner::New, } @@ -100,40 +97,11 @@ impl Future for GetConn { loop { match self.inner { GetConnInner::New => { - let queue_id = QueueId::next(); - match Pin::new(self.pool_mut()).poll_new_conn(cx, false, queue_id) { - Poll::Pending => { - self.inner = GetConnInner::Queued(queue_id); - return Poll::Pending; - } - Poll::Ready(res) => match res?.inner.take() { - GetConnInner::Connecting(conn_fut) => { - self.inner = GetConnInner::Connecting(conn_fut); - } - GetConnInner::Checking(conn_fut) => { - self.inner = GetConnInner::Checking(conn_fut); - } - GetConnInner::Done => unreachable!( - "Pool::poll_new_conn never gives out already-consumed GetConns" - ), - GetConnInner::New => { - unreachable!( - "Pool::poll_new_conn never gives out GetConnInner::New" - ) - } - GetConnInner::Queued(_) => { - unreachable!( - "Pool::poll_new_conn never gives out GetConnInner::Queued" - ) - } - }, - } - } - GetConnInner::Queued(queue_id) => { - match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, true, queue_id))? - .inner - .take() - { + let queued = self.queue_id.is_some(); + let queue_id = *self.queue_id.get_or_insert_with(|| QueueId::next()); + let next = + ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?; + match next { GetConnInner::Connecting(conn_fut) => { self.inner = GetConnInner::Connecting(conn_fut); } @@ -146,9 +114,6 @@ impl Future for GetConn { GetConnInner::New => { unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") } - GetConnInner::Queued(_) => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::Queued") - } } } GetConnInner::Done => { diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 6def9510..a88e7a9a 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -273,7 +273,7 @@ impl Pool { cx: &mut Context<'_>, queued: bool, queue_id: QueueId, - ) -> Poll> { + ) -> Poll> { self.poll_new_conn_inner(cx, queued, queue_id) } @@ -282,7 +282,7 @@ impl Pool { cx: &mut Context<'_>, queued: bool, queue_id: QueueId, - ) -> Poll> { + ) -> Poll> { let mut exchange = self.inner.exchange.lock().unwrap(); // NOTE: this load must happen while we hold the lock, @@ -304,16 +304,13 @@ impl Pool { while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() { if !conn.expired() { - return Poll::Ready(Ok(GetConn { - pool: Some(self.clone()), - inner: GetConnInner::Checking( - async move { - conn.stream_mut()?.check().await?; - Ok(conn) - } - .boxed(), - ), - })); + return Poll::Ready(Ok(GetConnInner::Checking( + async move { + conn.stream_mut()?.check().await?; + Ok(conn) + } + .boxed(), + ))); } else { self.send_to_recycler(conn); } @@ -325,10 +322,9 @@ impl Pool { // we are allowed to make a new connection, so we will! exchange.exist += 1; - return Poll::Ready(Ok(GetConn { - pool: Some(self.clone()), - inner: GetConnInner::Connecting(Conn::new(self.opts.clone()).boxed()), - })); + return Poll::Ready(Ok(GetConnInner::Connecting( + Conn::new(self.opts.clone()).boxed(), + ))); } // Polled, but no conn available? Back into the queue. From b1798b797aa00d2eafb3eccd640df0e8265fa1a8 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Sat, 12 Nov 2022 23:19:17 +0300 Subject: [PATCH 4/4] apply suggestion --- src/conn/pool/futures/get_conn.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index 3f56d04d..843f8298 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -98,7 +98,7 @@ impl Future for GetConn { match self.inner { GetConnInner::New => { let queued = self.queue_id.is_some(); - let queue_id = *self.queue_id.get_or_insert_with(|| QueueId::next()); + let queue_id = *self.queue_id.get_or_insert_with(QueueId::next); let next = ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?; match next {