Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/conn/binlog_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use mysql_common::{

use std::{
future::Future,
io::ErrorKind,
pin::Pin,
task::{Context, Poll},
};

use crate::{error::DriverError, io::ReadPacket, Conn, Result};
use crate::connection_like::Connection;
use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};

/// Binlog event stream.
///
Expand All @@ -46,6 +48,28 @@ impl BinlogStream {
pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
self.esr.get_tme(table_id)
}

/// Closes the stream's `Conn`. Additionally, the connection is dropped, so its associated
/// pool (if any) will regain a connection slot.
pub async fn close(self) -> Result<()> {
match self.read_packet.0 {
// `close_conn` requires ownership of `Conn`. That's okay, because
// `BinLogStream`'s connection is always owned.
Connection::Conn(conn) => {
if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
// If the binlog was requested with the flag BINLOG_DUMP_NON_BLOCK,
// the connection's file handler will already have been closed (EOF).
if error.kind() == ErrorKind::BrokenPipe {
return Ok(());
}
}
}
Connection::ConnMut(_) => {}
Connection::Tx(_) => {}
}

Ok(())
}
}

impl futures_core::stream::Stream for BinlogStream {
Expand Down
100 changes: 70 additions & 30 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,8 +1031,6 @@ impl Conn {
}

pub async fn get_binlog_stream(mut self, request: BinlogRequest<'_>) -> Result<BinlogStream> {
// We'll disconnect this connection from a pool before requesting the binlog.
self.inner.pool = None;
self.request_binlog(request).await?;

Ok(BinlogStream::new(self))
Expand Down Expand Up @@ -1072,42 +1070,72 @@ mod test {
Ok(())
}

async fn create_binlog_stream_conn(pool: Option<&Pool>) -> super::Result<(Conn, Vec<u8>, u64)> {
let mut conn = match pool {
None => Conn::new(get_opts()).await.unwrap(),
Some(pool) => pool.get_conn().await.unwrap(),
};

if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE"
.first::<String, _>(&mut conn)
.await
{
if !gtid_mode.starts_with("ON") {
panic!(
"GTID_MODE is disabled \
(enable using --gtid_mode=ON --enforce_gtid_consistency=ON)"
);
}
}

let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await.unwrap().unwrap();
let filename = row.get(0).unwrap();
let position = row.get(1).unwrap();

gen_dummy_data().await.unwrap();
Ok((conn, filename, position))
}

#[tokio::test]
async fn should_read_binlog() -> super::Result<()> {
async fn get_conn() -> super::Result<(Conn, Vec<u8>, u64)> {
let mut conn = Conn::new(get_opts()).await?;
read_binlog_streams_and_close_their_connections(None, (12, 13, 14))
.await
.unwrap();

if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE"
.first::<String, _>(&mut conn)
.await
{
if !gtid_mode.starts_with("ON") {
panic!(
"GTID_MODE is disabled \
(enable using --gtid_mode=ON --enforce_gtid_consistency=ON)"
);
}
}
let pool = Pool::new(get_opts());
read_binlog_streams_and_close_their_connections(Some(&pool), (15, 16, 17))
.await
.unwrap();

let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await?.unwrap();
let filename = row.get(0).unwrap();
let position = row.get(1).unwrap();
// Disconnecting the pool verifies that closing the binlog connections
// left the pool in a sane state.
timeout(Duration::from_secs(10), pool.disconnect())
.await
.unwrap()
.unwrap();

gen_dummy_data().await.unwrap();
Ok((conn, filename, position))
}
Ok(())
}

async fn read_binlog_streams_and_close_their_connections(
pool: Option<&Pool>,
binlog_server_ids: (u32, u32, u32),
) -> super::Result<()> {
// iterate using COM_BINLOG_DUMP
let (conn, filename, pos) = get_conn().await.unwrap();
let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
let is_mariadb = conn.inner.is_mariadb;

let mut binlog_stream = conn
.get_binlog_stream(BinlogRequest::new(12).with_filename(filename).with_pos(pos))
.get_binlog_stream(
BinlogRequest::new(binlog_server_ids.0)
.with_filename(filename)
.with_pos(pos),
)
.await
.unwrap();

let mut events_num = 0;
while let Ok(Some(event)) = timeout(Duration::from_secs(1), binlog_stream.next()).await {
while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await {
let event = event.unwrap();
events_num += 1;

Expand All @@ -1126,14 +1154,18 @@ mod test {
}
}
assert!(events_num > 0);
timeout(Duration::from_secs(10), binlog_stream.close())
.await
.unwrap()
.unwrap();

if !is_mariadb {
// iterate using COM_BINLOG_DUMP_GTID
let (conn, filename, pos) = get_conn().await.unwrap();
let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

let mut binlog_stream = conn
.get_binlog_stream(
BinlogRequest::new(13)
BinlogRequest::new(binlog_server_ids.1)
.with_use_gtid(true)
.with_filename(filename)
.with_pos(pos),
Expand All @@ -1142,7 +1174,7 @@ mod test {
.unwrap();

events_num = 0;
while let Ok(Some(event)) = timeout(Duration::from_secs(1), binlog_stream.next()).await
while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await
{
let event = event.unwrap();
events_num += 1;
Expand All @@ -1162,14 +1194,18 @@ mod test {
}
}
assert!(events_num > 0);
timeout(Duration::from_secs(10), binlog_stream.close())
.await
.unwrap()
.unwrap();
}

// iterate using COM_BINLOG_DUMP with BINLOG_DUMP_NON_BLOCK flag
let (conn, filename, pos) = get_conn().await.unwrap();
let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

let mut binlog_stream = conn
.get_binlog_stream(
BinlogRequest::new(14)
BinlogRequest::new(binlog_server_ids.2)
.with_filename(filename)
.with_pos(pos)
.with_flags(BinlogDumpFlags::BINLOG_DUMP_NON_BLOCK),
Expand All @@ -1182,9 +1218,13 @@ mod test {
let event = event.unwrap();
events_num += 1;
event.header().event_type().unwrap();
event.read_data()?;
event.read_data().unwrap();
}
assert!(events_num > 0);
timeout(Duration::from_secs(10), binlog_stream.close())
.await
.unwrap()
.unwrap();

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/read_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{buffer_pool::PooledBuf, connection_like::Connection, error::IoError,
/// Reads a packet.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadPacket<'a, 't>(Connection<'a, 't>);
pub struct ReadPacket<'a, 't>(pub(crate) Connection<'a, 't>);

impl<'a, 't> ReadPacket<'a, 't> {
pub(crate) fn new<T: Into<Connection<'a, 't>>>(conn: T) -> Self {
Expand Down