Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ license = "MIT/Apache-2.0"
name = "mysql_async"
readme = "README.md"
repository = "https://github.com/blackbeam/mysql_async"
version = "0.27.1"
version = "0.28.0"
exclude = ["test/*"]
edition = "2018"
categories = ["asynchronous", "database"]

[dependencies]
bytes = "1.0"
flate2 = { version = "1.0", default-features = false }
futures-core = "0.3"
futures-util = "0.3"
futures-sink = "0.3"
lazy_static = "1"
lru = "0.6.0"
mio = "0.7.7"
mysql_common = "0.26.0"
mysql_common = { version = "0.27.2", default-features = false }
native-tls = "0.2"
once_cell = "1.7.2"
pem = "0.8.1"
percent-encoding = "2.1.0"
pin-project = "1.0.2"
Expand All @@ -36,11 +39,20 @@ uuid = { version = "0.8.1", features = ["v4"] }

[dev-dependencies]
tempfile = "3.1.0"
socket2 = "0.3.17"
socket2 = { version = "0.4.0", features = ["all"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
rand = "0.8.0"

[features]
default = [
"flate2/zlib",
"mysql_common/bigdecimal",
"mysql_common/chrono",
"mysql_common/rust_decimal",
"mysql_common/time",
"mysql_common/uuid",
"mysql_common/frunk",
]
nightly = []

[lib]
Expand Down
56 changes: 19 additions & 37 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ jobs:
sudo apt-get -y install mysql-server libmysqlclient-dev curl
sudo service mysql start
mysql -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot -proot
mysql -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = WARN;" -uroot -proot
mysql -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;" -uroot -proot
mysql -e "SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;" -uroot -proot
mysql -e "SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;" -uroot -proot
mysql -e "SET @@GLOBAL.GTID_MODE = ON;" -uroot -proot
mysql -e "PURGE BINARY LOGS BEFORE now();" -uroot -proot
displayName: Install MySql
- bash: |
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $(RUST_TOOLCHAIN)
Expand All @@ -43,40 +49,6 @@ jobs:
DATABASE_URL: mysql://root:root@127.0.0.1:3306/mysql
displayName: Run tests

# - job: "TestBasicMacOs"
# pool:
# vmImage: "macOS-10.15"
# strategy:
# maxParallel: 10
# matrix:
# stable:
# RUST_TOOLCHAIN: stable
# steps:
# - bash: |
# brew update
# brew install mysql
# brew services start mysql
# brew services stop mysql
# sleep 3
# echo 'local_infile=1' >> /usr/local/etc/my.cnf
# echo 'socket=/tmp/mysql.sock' >> /usr/local/etc/my.cnf
# brew services start mysql
# sleep 5
# /usr/local/Cellar/mysql/*/bin/mysql -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot
# displayName: Install MySql
# - bash: |
# curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUST_TOOLCHAIN
# displayName: Install rust (MacOs)
# - bash: |
# SSL=false COMPRESS=false cargo test
# SSL=true COMPRESS=false cargo test
# SSL=false COMPRESS=true cargo test
# SSL=true COMPRESS=true cargo test
# env:
# RUST_BACKTRACE: 1
# DATABASE_URL: mysql://root@127.0.0.1/mysql
# displayName: Run tests

- job: "TestBasicWindows"
pool:
vmImage: "vs2017-win2016"
Expand All @@ -95,6 +67,11 @@ jobs:
call "C:\Program Files (x86)\MySQL\MySQL Installer for Windows\MySQLInstallerConsole.exe" community install server;8.0.11;x64:*:port=3306;rootpasswd=password;servicename=MySQL -silent
netsh advfirewall firewall add rule name="Allow mysql" dir=in action=allow edge=yes remoteip=any protocol=TCP localport=80,8080,3306
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot -ppassword
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = WARN;" -uroot -ppassword
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;" -uroot -ppassword
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;" -uroot -ppassword
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;" -uroot -ppassword
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.GTID_MODE = ON;" -uroot -ppassword
displayName: Install MySql
- bash: |
rustup install $RUST_TOOLCHAIN
Expand Down Expand Up @@ -130,16 +107,20 @@ jobs:
docker --version
displayName: Install docker
- bash: |
docker run --rm --name container -v `pwd`:/root -p 3307:3306 -d -e MYSQL_ROOT_PASSWORD=password mysql:$(DB_VERSION) --max-allowed-packet=36700160 --local-infile
if [[ "5.6" == "$(DB_VERSION)" ]]; then ARG="--secure-auth=OFF"; fi
docker run -d --name container -v `pwd`:/root -p 3307:3306 -e MYSQL_ROOT_PASSWORD=password mysql:$(DB_VERSION) --max-allowed-packet=36700160 --local-infile --log-bin=mysql-bin --log-slave-updates --gtid_mode=ON --enforce_gtid_consistency=ON --server-id=1 $ARG
while ! nc -W 1 localhost 3307 | grep -q -P '.+'; do sleep 1; done
displayName: Run MySql in Docker
- bash: |
docker exec container bash -l -c "mysql -uroot -ppassword -e \"SET old_passwords = 1; GRANT ALL PRIVILEGES ON *.* TO 'root2'@'%' IDENTIFIED WITH mysql_old_password AS 'password'; SET PASSWORD FOR 'root2'@'%' = OLD_PASSWORD('password')\"";
condition: eq(variables['DB_VERSION'], '5.6')
- bash: |
docker exec container bash -l -c "apt-get update"
docker exec container bash -l -c "apt-get install -y curl clang libssl-dev pkg-config"
docker exec container bash -l -c "curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable"
displayName: Install Rust in docker
- bash: |
if [[ "5.6" != "$(DB_VERSION)" ]]; then SSL=true; fi
if [[ "5.6" != "$(DB_VERSION)" ]]; then SSL=true; else DATABASE_URL="mysql://root2:password@127.0.0.1/mysql?secure_auth=false"; fi
docker exec container bash -l -c "cd \$HOME && DATABASE_URL=$DATABASE_URL cargo test"
docker exec container bash -l -c "cd \$HOME && DATABASE_URL=$DATABASE_URL COMPRESS=true cargo test"
docker exec container bash -l -c "cd \$HOME && DATABASE_URL=$DATABASE_URL SSL=$SSL cargo test"
Expand Down Expand Up @@ -186,10 +167,11 @@ jobs:
--max-allowed-packet=36700160 \
--local-infile \
--performance-schema=on \
--log-bin=mysql-bin --gtid-domain-id=1 --server-id=1 \
--ssl \
--ssl-ca=/root/rust-mysql-simple/tests/ca-cert.pem \
--ssl-cert=/root/rust-mysql-simple/tests/server-cert.pem \
--ssl-key=/root/rust-mysql-simple/tests/server-key.pem
--ssl-key=/root/rust-mysql-simple/tests/server-key.pem &
while ! nc -W 1 localhost 3307 | grep -q -P '.+'; do sleep 1; done
displayName: Run MariaDb in Docker
- bash: |
Expand Down
103 changes: 103 additions & 0 deletions src/buffer_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2021 Anatoly Ikorsky
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

use std::{
mem::replace,
ops::Deref,
sync::{Arc, Mutex},
};

#[derive(Debug)]
pub struct BufferPool {
pool_cap: usize,
buffer_cap: usize,
pool: Mutex<Vec<Vec<u8>>>,
}

impl BufferPool {
pub fn new() -> Self {
let pool_cap = std::env::var("MYSQL_ASYNC_BUFFER_POOL_CAP")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(128_usize);

let buffer_cap = std::env::var("MYSQL_ASYNC_BUFFER_SIZE_CAP")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(4 * 1024 * 1024);

Self {
pool: Default::default(),
pool_cap,
buffer_cap,
}
}

pub fn get(self: &Arc<Self>) -> PooledBuf {
let mut buf = self.pool.lock().unwrap().pop().unwrap_or_default();

// SAFETY:
// 1. OK – 0 is always within capacity
// 2. OK - nothing to initialize
unsafe { buf.set_len(0) }

PooledBuf(buf, self.clone())
}

pub fn get_with<T: AsRef<[u8]>>(self: &Arc<Self>, content: T) -> PooledBuf {
let mut buf = self.get();
buf.as_mut().extend_from_slice(content.as_ref());
buf
}

fn put(self: &Arc<Self>, mut buf: Vec<u8>) {
if buf.len() > self.buffer_cap {
// TODO: until `Vec::shrink_to` stabilization

// SAFETY:
// 1. OK – new_len <= capacity
// 2. OK - 0..new_len is initialized
unsafe { buf.set_len(self.buffer_cap) }
buf.shrink_to_fit();
}

let mut pool = self.pool.lock().unwrap();
if pool.len() < self.pool_cap {
pool.push(buf);
}
}
}

impl Default for BufferPool {
fn default() -> Self {
Self::new()
}
}

#[derive(Debug)]
pub struct PooledBuf(Vec<u8>, Arc<BufferPool>);

impl AsMut<Vec<u8>> for PooledBuf {
fn as_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
}
}

impl Deref for PooledBuf {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.0.deref()
}
}

impl Drop for PooledBuf {
fn drop(&mut self) {
self.1.put(replace(&mut self.0, vec![]))
}
}
96 changes: 96 additions & 0 deletions src/conn/binlog_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2020 Anatoly Ikorsky
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

use futures_core::ready;
use mysql_common::{
binlog::{
consts::BinlogVersion::Version4,
events::{Event, TableMapEvent},
EventStreamReader,
},
io::ParseBuf,
packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
};

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use crate::{error::DriverError, io::ReadPacket, Conn, Result};

/// Binlog event stream.
///
/// Stream initialization is lazy, i.e. binlog won't be requested until this stream is polled.
pub struct BinlogStream {
read_packet: ReadPacket<'static, 'static>,
esr: EventStreamReader,
}

impl BinlogStream {
/// `conn` is a `Conn` with `request_binlog` executed on it.
pub(super) fn new(conn: Conn) -> Self {
BinlogStream {
read_packet: ReadPacket::new(conn),
esr: EventStreamReader::new(Version4),
}
}

/// Returns a table map event for the given table id.
pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
self.esr.get_tme(table_id)
}
}

impl futures_core::stream::Stream for BinlogStream {
type Item = Result<Event>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
Ok(packet) => packet,
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};

let first_byte = packet.get(0).copied();

if first_byte == Some(255) {
if let Ok(ErrPacket::Error(err)) =
ParseBuf(&*packet).parse(self.read_packet.conn_ref().capabilities())
{
return Poll::Ready(Some(Err(From::from(err))));
}
}

if first_byte == Some(254) && packet.len() < 8 {
if ParseBuf(&*packet)
.parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
self.read_packet.conn_ref().capabilities(),
)
.is_ok()
{
return Poll::Ready(None);
}
}

if first_byte == Some(0) {
let event_data = &packet[1..];
match self.esr.read(event_data) {
Ok(event) => {
return Poll::Ready(Some(Ok(event)));
}
Err(err) => return Poll::Ready(Some(Err(err.into()))),
}
} else {
return Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
payload: packet.to_vec(),
}
.into())));
}
}
}
Loading