From f246cd73b16870a1ff01a7e9ca7a2c28a741f40a Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 1 Jun 2022 12:04:09 +0300 Subject: [PATCH] Resolve DisconnectPool in case of dead Recycler (fix #199) --- src/conn/pool/futures/disconnect_pool.rs | 23 +++++++++++++++++++++-- src/conn/pool/mod.rs | 9 --------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index 4d6616b9..c409e18e 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -12,9 +12,13 @@ use std::{ task::{Context, Poll}, }; +use futures_core::ready; +use tokio::sync::mpsc::UnboundedSender; + use crate::{ conn::pool::{Inner, Pool}, error::Error, + Conn, }; use std::sync::{atomic, Arc}; @@ -27,12 +31,14 @@ use std::sync::{atomic, Arc}; #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct DisconnectPool { pool_inner: Arc, + drop: Option>>, } impl DisconnectPool { pub(crate) fn new(pool: Pool) -> Self { Self { pool_inner: pool.inner, + drop: Some(pool.drop), } } } @@ -40,7 +46,8 @@ impl DisconnectPool { impl Future for DisconnectPool { type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.pool_inner.close.store(true, atomic::Ordering::Release); let mut exchange = self.pool_inner.exchange.lock().unwrap(); exchange.spawn_futures_if_needed(&self.pool_inner); exchange.waiting.push_back(cx.waker().clone()); @@ -49,7 +56,19 @@ impl Future for DisconnectPool { if self.pool_inner.closed.load(atomic::Ordering::Acquire) { Poll::Ready(Ok(())) } else { - Poll::Pending + match self.drop.take() { + Some(drop) => match drop.send(None) { + Ok(_) => { + // Recycler is alive. Waiting for it to finish. + Poll::Ready(Ok(ready!(Box::pin(drop.closed()).as_mut().poll(cx)))) + } + Err(_) => { + // Recycler seem dead. No one will wake us. + Poll::Ready(Ok(())) + } + }, + None => Poll::Pending, + } } } } diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index ffbc2b89..d7a8ea10 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -162,15 +162,6 @@ impl Pool { /// **Note:** This Future won't resolve until all active connections, taken from it, /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. pub fn disconnect(self) -> DisconnectPool { - let was_closed = self.inner.close.swap(true, atomic::Ordering::AcqRel); - if !was_closed { - // make sure we wake up the Recycler. - // - // note the lack of an .expect() here, because the Recycler may decide that there are - // no connections to wait for and exit quickly! - let _ = self.drop.send(None).is_ok(); - } - DisconnectPool::new(self) }