From 41f2440a6bc732728a9cf19817df0641991b765f Mon Sep 17 00:00:00 2001 From: galeaspablo Date: Mon, 23 May 2022 02:15:08 +0100 Subject: [PATCH] BinlogStream's Conn can be closed and returned to Pool --- src/conn/binlog_stream.rs | 26 +++++++++- src/conn/mod.rs | 100 ++++++++++++++++++++++++++------------ src/io/read_packet.rs | 2 +- 3 files changed, 96 insertions(+), 32 deletions(-) diff --git a/src/conn/binlog_stream.rs b/src/conn/binlog_stream.rs index f6ffab13..6aca3305 100644 --- a/src/conn/binlog_stream.rs +++ b/src/conn/binlog_stream.rs @@ -19,11 +19,13 @@ use mysql_common::{ use std::{ future::Future, + io::ErrorKind, pin::Pin, task::{Context, Poll}, }; -use crate::{error::DriverError, io::ReadPacket, Conn, Result}; +use crate::connection_like::Connection; +use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result}; /// Binlog event stream. /// @@ -46,6 +48,28 @@ impl BinlogStream { pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> { self.esr.get_tme(table_id) } + + /// Closes the stream's `Conn`. Additionally, the connection is dropped, so its associated + /// pool (if any) will regain a connection slot. + pub async fn close(self) -> Result<()> { + match self.read_packet.0 { + // `close_conn` requires ownership of `Conn`. That's okay, because + // `BinLogStream`'s connection is always owned. + Connection::Conn(conn) => { + if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await { + // If the binlog was requested with the flag BINLOG_DUMP_NON_BLOCK, + // the connection's file handler will already have been closed (EOF). + if error.kind() == ErrorKind::BrokenPipe { + return Ok(()); + } + } + } + Connection::ConnMut(_) => {} + Connection::Tx(_) => {} + } + + Ok(()) + } } impl futures_core::stream::Stream for BinlogStream { diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 436afff6..0ecd969b 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -1031,8 +1031,6 @@ impl Conn { } pub async fn get_binlog_stream(mut self, request: BinlogRequest<'_>) -> Result { - // We'll disconnect this connection from a pool before requesting the binlog. - self.inner.pool = None; self.request_binlog(request).await?; Ok(BinlogStream::new(self)) @@ -1072,42 +1070,72 @@ mod test { Ok(()) } + async fn create_binlog_stream_conn(pool: Option<&Pool>) -> super::Result<(Conn, Vec, u64)> { + let mut conn = match pool { + None => Conn::new(get_opts()).await.unwrap(), + Some(pool) => pool.get_conn().await.unwrap(), + }; + + if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE" + .first::(&mut conn) + .await + { + if !gtid_mode.starts_with("ON") { + panic!( + "GTID_MODE is disabled \ + (enable using --gtid_mode=ON --enforce_gtid_consistency=ON)" + ); + } + } + + let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await.unwrap().unwrap(); + let filename = row.get(0).unwrap(); + let position = row.get(1).unwrap(); + + gen_dummy_data().await.unwrap(); + Ok((conn, filename, position)) + } + #[tokio::test] async fn should_read_binlog() -> super::Result<()> { - async fn get_conn() -> super::Result<(Conn, Vec, u64)> { - let mut conn = Conn::new(get_opts()).await?; + read_binlog_streams_and_close_their_connections(None, (12, 13, 14)) + .await + .unwrap(); - if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE" - .first::(&mut conn) - .await - { - if !gtid_mode.starts_with("ON") { - panic!( - "GTID_MODE is disabled \ - (enable using --gtid_mode=ON --enforce_gtid_consistency=ON)" - ); - } - } + let pool = Pool::new(get_opts()); + read_binlog_streams_and_close_their_connections(Some(&pool), (15, 16, 17)) + .await + .unwrap(); - let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await?.unwrap(); - let filename = row.get(0).unwrap(); - let position = row.get(1).unwrap(); + // Disconnecting the pool verifies that closing the binlog connections + // left the pool in a sane state. + timeout(Duration::from_secs(10), pool.disconnect()) + .await + .unwrap() + .unwrap(); - gen_dummy_data().await.unwrap(); - Ok((conn, filename, position)) - } + Ok(()) + } + async fn read_binlog_streams_and_close_their_connections( + pool: Option<&Pool>, + binlog_server_ids: (u32, u32, u32), + ) -> super::Result<()> { // iterate using COM_BINLOG_DUMP - let (conn, filename, pos) = get_conn().await.unwrap(); + let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap(); let is_mariadb = conn.inner.is_mariadb; let mut binlog_stream = conn - .get_binlog_stream(BinlogRequest::new(12).with_filename(filename).with_pos(pos)) + .get_binlog_stream( + BinlogRequest::new(binlog_server_ids.0) + .with_filename(filename) + .with_pos(pos), + ) .await .unwrap(); let mut events_num = 0; - while let Ok(Some(event)) = timeout(Duration::from_secs(1), binlog_stream.next()).await { + while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await { let event = event.unwrap(); events_num += 1; @@ -1126,14 +1154,18 @@ mod test { } } assert!(events_num > 0); + timeout(Duration::from_secs(10), binlog_stream.close()) + .await + .unwrap() + .unwrap(); if !is_mariadb { // iterate using COM_BINLOG_DUMP_GTID - let (conn, filename, pos) = get_conn().await.unwrap(); + let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap(); let mut binlog_stream = conn .get_binlog_stream( - BinlogRequest::new(13) + BinlogRequest::new(binlog_server_ids.1) .with_use_gtid(true) .with_filename(filename) .with_pos(pos), @@ -1142,7 +1174,7 @@ mod test { .unwrap(); events_num = 0; - while let Ok(Some(event)) = timeout(Duration::from_secs(1), binlog_stream.next()).await + while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await { let event = event.unwrap(); events_num += 1; @@ -1162,14 +1194,18 @@ mod test { } } assert!(events_num > 0); + timeout(Duration::from_secs(10), binlog_stream.close()) + .await + .unwrap() + .unwrap(); } // iterate using COM_BINLOG_DUMP with BINLOG_DUMP_NON_BLOCK flag - let (conn, filename, pos) = get_conn().await.unwrap(); + let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap(); let mut binlog_stream = conn .get_binlog_stream( - BinlogRequest::new(14) + BinlogRequest::new(binlog_server_ids.2) .with_filename(filename) .with_pos(pos) .with_flags(BinlogDumpFlags::BINLOG_DUMP_NON_BLOCK), @@ -1182,9 +1218,13 @@ mod test { let event = event.unwrap(); events_num += 1; event.header().event_type().unwrap(); - event.read_data()?; + event.read_data().unwrap(); } assert!(events_num > 0); + timeout(Duration::from_secs(10), binlog_stream.close()) + .await + .unwrap() + .unwrap(); Ok(()) } diff --git a/src/io/read_packet.rs b/src/io/read_packet.rs index 9c69e15e..7e14fca0 100644 --- a/src/io/read_packet.rs +++ b/src/io/read_packet.rs @@ -20,7 +20,7 @@ use crate::{buffer_pool::PooledBuf, connection_like::Connection, error::IoError, /// Reads a packet. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReadPacket<'a, 't>(Connection<'a, 't>); +pub struct ReadPacket<'a, 't>(pub(crate) Connection<'a, 't>); impl<'a, 't> ReadPacket<'a, 't> { pub(crate) fn new>>(conn: T) -> Self {