Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/conn/pool/futures/disconnect_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures_core::ready;
use tokio::sync::mpsc::UnboundedSender;

use crate::{
conn::pool::{Inner, Pool},
conn::pool::{Inner, Pool, QueuedWaker, QUEUE_END_ID},
error::Error,
Conn,
};
Expand Down Expand Up @@ -50,7 +50,9 @@ impl Future for DisconnectPool {
self.pool_inner.close.store(true, atomic::Ordering::Release);
let mut exchange = self.pool_inner.exchange.lock().unwrap();
exchange.spawn_futures_if_needed(&self.pool_inner);
exchange.waiting.push_back(cx.waker().clone());
exchange
.waiting
.push(QueuedWaker::new(QUEUE_END_ID, cx.waker().clone()));
drop(exchange);

if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
Expand Down
42 changes: 25 additions & 17 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{
use futures_core::ready;

use crate::{
conn::{pool::Pool, Conn},
conn::{
pool::{Pool, QueueId},
Conn,
},
error::*,
};

Expand Down Expand Up @@ -58,13 +61,15 @@ impl GetConnInner {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetConn {
pub(crate) queue_id: Option<QueueId>,
pub(crate) pool: Option<Pool>,
pub(crate) inner: GetConnInner,
}

impl GetConn {
pub(crate) fn new(pool: &Pool) -> GetConn {
GetConn {
queue_id: None,
pool: Some(pool.clone()),
inner: GetConnInner::New,
}
Expand All @@ -91,23 +96,26 @@ impl Future for GetConn {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.inner {
GetConnInner::New => match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx))?
.inner
.take()
{
GetConnInner::Connecting(conn_fut) => {
self.inner = GetConnInner::Connecting(conn_fut);
}
GetConnInner::Checking(conn_fut) => {
self.inner = GetConnInner::Checking(conn_fut);
}
GetConnInner::Done => unreachable!(
"Pool::poll_new_conn never gives out already-consumed GetConns"
),
GetConnInner::New => {
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
GetConnInner::New => {
let queued = self.queue_id.is_some();
let queue_id = *self.queue_id.get_or_insert_with(QueueId::next);
let next =
ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?;
match next {
GetConnInner::Connecting(conn_fut) => {
self.inner = GetConnInner::Connecting(conn_fut);
}
GetConnInner::Checking(conn_fut) => {
self.inner = GetConnInner::Checking(conn_fut);
}
GetConnInner::Done => unreachable!(
"Pool::poll_new_conn never gives out already-consumed GetConns"
),
GetConnInner::New => {
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
}
}
},
}
GetConnInner::Done => {
unreachable!("GetConn::poll polled after returning Async::Ready");
}
Expand Down
118 changes: 90 additions & 28 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use futures_util::FutureExt;
use tokio::sync::mpsc;

use std::{
collections::VecDeque,
cmp::{Ordering, Reverse},
collections::{BinaryHeap, VecDeque},
convert::TryFrom,
pin::Pin,
str::FromStr,
Expand Down Expand Up @@ -62,7 +63,7 @@ impl From<Conn> for IdlingConn {
/// This is fine as long as we never do expensive work while holding the lock!
#[derive(Debug)]
struct Exchange {
waiting: VecDeque<Waker>,
waiting: BinaryHeap<QueuedWaker>,
available: VecDeque<IdlingConn>,
exist: usize,
// only used to spawn the recycler the first time we're in async context
Expand All @@ -87,6 +88,51 @@ impl Exchange {
}
}

const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));

#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub(crate) struct QueueId(Reverse<u64>);

impl QueueId {
fn next() -> Self {
static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
QueueId(Reverse(id))
}
}

#[derive(Debug)]
struct QueuedWaker {
queue_id: QueueId,
waker: Waker,
}

impl QueuedWaker {
fn new(queue_id: QueueId, waker: Waker) -> Self {
QueuedWaker { queue_id, waker }
}
}

impl Eq for QueuedWaker {}

impl PartialEq for QueuedWaker {
fn eq(&self, other: &Self) -> bool {
self.queue_id == other.queue_id
}
}

impl Ord for QueuedWaker {
fn cmp(&self, other: &Self) -> Ordering {
self.queue_id.cmp(&other.queue_id)
}
}

impl PartialOrd for QueuedWaker {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Connection pool data.
#[derive(Debug)]
pub struct Inner {
Expand Down Expand Up @@ -131,7 +177,7 @@ impl Pool {
closed: false.into(),
exchange: Mutex::new(Exchange {
available: VecDeque::with_capacity(pool_opts.constraints().max()),
waiting: VecDeque::new(),
waiting: BinaryHeap::new(),
exist: 0,
recycler: Some((rx, pool_opts)),
}),
Expand Down Expand Up @@ -181,8 +227,8 @@ impl Pool {
let mut exchange = self.inner.exchange.lock().unwrap();
if exchange.available.len() < self.opts.pool_opts().active_bound() {
exchange.available.push_back(conn.into());
if let Some(w) = exchange.waiting.pop_front() {
w.wake();
if let Some(qw) = exchange.waiting.pop() {
qw.waker.wake();
}
return;
}
Expand Down Expand Up @@ -216,17 +262,27 @@ impl Pool {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.exist -= 1;
// we just enabled the creation of a new connection!
if let Some(w) = exchange.waiting.pop_front() {
w.wake();
if let Some(qw) = exchange.waiting.pop() {
qw.waker.wake();
}
}

/// Poll the pool for an available connection.
fn poll_new_conn(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<GetConn>> {
self.poll_new_conn_inner(cx)
}

fn poll_new_conn_inner(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<GetConn>> {
fn poll_new_conn(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
queued: bool,
queue_id: QueueId,
) -> Poll<Result<GetConnInner>> {
self.poll_new_conn_inner(cx, queued, queue_id)
}

fn poll_new_conn_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
queued: bool,
queue_id: QueueId,
) -> Poll<Result<GetConnInner>> {
let mut exchange = self.inner.exchange.lock().unwrap();

// NOTE: this load must happen while we hold the lock,
Expand All @@ -238,18 +294,23 @@ impl Pool {

exchange.spawn_futures_if_needed(&self.inner);

// Check if others are waiting and we're not queued.
if !exchange.waiting.is_empty() && !queued {
exchange
.waiting
.push(QueuedWaker::new(queue_id, cx.waker().clone()));
return Poll::Pending;
}

while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
if !conn.expired() {
return Poll::Ready(Ok(GetConn {
pool: Some(self.clone()),
inner: GetConnInner::Checking(
async move {
conn.stream_mut()?.check().await?;
Ok(conn)
}
.boxed(),
),
}));
return Poll::Ready(Ok(GetConnInner::Checking(
async move {
conn.stream_mut()?.check().await?;
Ok(conn)
}
.boxed(),
)));
} else {
self.send_to_recycler(conn);
}
Expand All @@ -261,14 +322,15 @@ impl Pool {
// we are allowed to make a new connection, so we will!
exchange.exist += 1;

return Poll::Ready(Ok(GetConn {
pool: Some(self.clone()),
inner: GetConnInner::Connecting(Conn::new(self.opts.clone()).boxed()),
}));
return Poll::Ready(Ok(GetConnInner::Connecting(
Conn::new(self.opts.clone()).boxed(),
)));
}

// no go -- we have to wait
exchange.waiting.push_back(cx.waker().clone());
// Polled, but no conn available? Back into the queue.
exchange
.waiting
.push(QueuedWaker::new(queue_id, cx.waker().clone()));
Poll::Pending
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/conn/pool/recycler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ impl Future for Recycler {
$self.discard.push($conn.close_conn().boxed());
} else {
exchange.available.push_back($conn.into());
if let Some(w) = exchange.waiting.pop_front() {
w.wake();
if let Some(qw) = exchange.waiting.pop() {
qw.waker.wake();
}
}
}
Expand Down Expand Up @@ -163,8 +163,8 @@ impl Future for Recycler {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.exist -= self.discarded;
for _ in 0..self.discarded {
if let Some(w) = exchange.waiting.pop_front() {
w.wake();
if let Some(qw) = exchange.waiting.pop() {
qw.waker.wake();
}
}
drop(exchange);
Expand Down Expand Up @@ -197,8 +197,8 @@ impl Future for Recycler {
if self.inner.closed.load(Ordering::Acquire) {
// `DisconnectPool` might still wait to be woken up.
let mut exchange = self.inner.exchange.lock().unwrap();
while let Some(w) = exchange.waiting.pop_front() {
w.wake();
while let Some(qw) = exchange.waiting.pop() {
qw.waker.wake();
}
// we're about to exit, so there better be no outstanding connections
assert_eq!(exchange.exist, 0);
Expand Down