From 9ba2a9538dca4c02a43a92c01cc691f5f87a74f7 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 17:45:52 +0300 Subject: [PATCH 01/10] Bump mysql_common version --- Cargo.toml | 7 +++---- src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 87702800..901ed47c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ futures-sink = "0.3" lazy_static = "1" lru = "0.6.0" mio = "0.7.7" -mysql_common = { version = "0.27.2", default-features = false } +mysql_common = { version = "0.28.0", default-features = false } native-tls = "0.2" once_cell = "1.7.2" pem = "0.8.1" @@ -47,10 +47,9 @@ rand = "0.8.0" [features] default = [ "flate2/zlib", - "mysql_common/bigdecimal", - "mysql_common/chrono", + "mysql_common/bigdecimal03", "mysql_common/rust_decimal", - "mysql_common/time", + "mysql_common/time03", "mysql_common/uuid", "mysql_common/frunk", ] diff --git a/src/lib.rs b/src/lib.rs index 545f62f4..3592daf7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,7 +91,7 @@ #[cfg(feature = "nightly")] extern crate test; -pub use mysql_common::{chrono, constants as consts, params, time, uuid}; +pub use mysql_common::{constants as consts, params, time03, uuid}; use std::sync::Arc; From 2751544214c507c43371a78b99d3501fb1490905 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 17:58:11 +0300 Subject: [PATCH 02/10] Fix MariaDb CI --- azure-pipelines.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 62f453ed..f8938577 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -156,6 +156,8 @@ jobs: displayName: Install docker - bash: | git clone https://github.com/blackbeam/rust-mysql-simple.git + cd rust-mysql-simple + git checkout 8d745ee displayName: Clone rust-mysql-simple (for ssl certs) - bash: | docker run --rm -d \ From 9531dc11d0ba22303514a504308398cb970bb253 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 18:01:05 +0300 Subject: [PATCH 03/10] Remove mysql_common deps reexports --- src/lib.rs | 2 +- tests/exports.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3592daf7..f71d3b7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,7 +91,7 @@ #[cfg(feature = "nightly")] extern crate test; -pub use mysql_common::{constants as consts, params, time03, uuid}; +pub use mysql_common::{constants as consts, params}; use std::sync::Arc; diff --git a/tests/exports.rs b/tests/exports.rs index fa75224f..3df07ebc 100644 --- a/tests/exports.rs +++ b/tests/exports.rs @@ -1,15 +1,15 @@ #[allow(unused_imports)] use mysql_async::{ - chrono, consts, from_row, from_row_opt, from_value, from_value_opt, + consts, from_row, from_row_opt, from_value, from_value_opt, futures::{DisconnectPool, GetConn}, params, prelude::{ BatchQuery, ConvIr, FromRow, FromValue, LocalInfileHandler, Protocol, Query, Queryable, StatementLike, ToValue, }, - time, uuid, BinaryProtocol, Column, Conn, Deserialized, DriverError, Error, FromRowError, - FromValueError, IoError, IsolationLevel, Opts, OptsBuilder, Params, ParseError, Pool, - PoolConstraints, PoolOpts, QueryResult, Result, Row, Serialized, ServerError, SslOpts, - Statement, TextProtocol, Transaction, TxOpts, UrlError, Value, WhiteListFsLocalInfileHandler, + BinaryProtocol, Column, Conn, Deserialized, DriverError, Error, FromRowError, FromValueError, + IoError, IsolationLevel, Opts, OptsBuilder, Params, ParseError, Pool, PoolConstraints, + PoolOpts, QueryResult, Result, Row, Serialized, ServerError, SslOpts, Statement, TextProtocol, + Transaction, TxOpts, UrlError, Value, WhiteListFsLocalInfileHandler, DEFAULT_INACTIVE_CONNECTION_TTL, DEFAULT_TTL_CHECK_INTERVAL, }; From 490d053846c165001ee1d248a6b84276451c87e9 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 18:01:33 +0300 Subject: [PATCH 04/10] Bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 901ed47c..4bcfa46d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ license = "MIT/Apache-2.0" name = "mysql_async" readme = "README.md" repository = "https://github.com/blackbeam/mysql_async" -version = "0.28.1" +version = "0.29.0" exclude = ["test/*"] edition = "2018" categories = ["asynchronous", "database"] From 2f19df95bba2d8c4fc39053f73e8985209ca6dc0 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 19:24:36 +0300 Subject: [PATCH 05/10] Update dependencies: lru, mio, pem --- Cargo.toml | 7 ++++--- src/io/mod.rs | 29 ++++++++++++++++++++++++----- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4bcfa46d..54d6390b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,16 +20,17 @@ futures-core = "0.3" futures-util = "0.3" futures-sink = "0.3" lazy_static = "1" -lru = "0.6.0" -mio = "0.7.7" +lru = "0.7.0" +mio = { version = "0.8.0", features = ["os-poll", "net"] } mysql_common = { version = "0.28.0", default-features = false } native-tls = "0.2" once_cell = "1.7.2" -pem = "0.8.1" +pem = "1.0.1" percent-encoding = "2.1.0" pin-project = "1.0.2" serde = "1" serde_json = "1" +socket2 = "0.4.2" thiserror = "1.0.4" tokio = { version = "1.0", features = ["io-util", "fs", "net", "time", "rt"] } tokio-util = { version = "0.6.0", features = ["codec"] } diff --git a/src/io/mod.rs b/src/io/mod.rs index 11e22be0..2346a696 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -11,10 +11,10 @@ pub use self::{read_packet::ReadPacket, write_packet::WritePacket}; use bytes::BytesMut; use futures_core::{ready, stream}; use futures_util::stream::{FuturesUnordered, StreamExt}; -use mio::net::{TcpKeepalive, TcpSocket}; use mysql_common::proto::codec::PacketCodec as PacketCodecInner; use native_tls::{Certificate, Identity, TlsConnector}; use pin_project::pin_project; +use socket2::{Domain, Protocol, Socket as Socket2Socket, TcpKeepalive, Type}; #[cfg(unix)] use tokio::io::AsyncWriteExt; use tokio::{ @@ -208,6 +208,7 @@ impl Endpoint { .map(|x| vec![x]) .or_else(|_| { pem::parse_many(&*root_cert_data) + .unwrap_or_default() .iter() .map(pem::encode) .map(|s| Certificate::from_pem(s.as_bytes())) @@ -364,20 +365,38 @@ impl Stream { keepalive_opts: Option, ) -> io::Result { let socket = if addr.is_ipv6() { - TcpSocket::new_v6()? + Socket2Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))? } else { - TcpSocket::new_v4()? + Socket2Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))? }; if let Some(keepalive_opts) = keepalive_opts { - socket.set_keepalive_params(keepalive_opts)?; + socket.set_tcp_keepalive(&keepalive_opts)?; } let stream = tokio::task::spawn_blocking(move || { - let mut stream = socket.connect(addr)?; + let addr = addr.into(); + socket.connect(&addr)?; let mut poll = mio::Poll::new()?; let mut events = mio::Events::with_capacity(1024); + socket.set_nonblocking(true)?; + + #[cfg(unix)] + let mut stream = unsafe { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + + let fd = socket.into_raw_fd(); + mio::net::TcpStream::from_raw_fd(fd) + }; + #[cfg(windows)] + let mut stream = unsafe { + use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + + let sock = socket.into_raw_socket(); + mio::net::TcpStream::from_raw_socket(sock) + }; + poll.registry() .register(&mut stream, mio::Token(0), mio::Interest::WRITABLE)?; From b238500ed5f377300195774de90eec175437ec13 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 19:29:13 +0300 Subject: [PATCH 06/10] CI: update windows image --- azure-pipelines.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index f8938577..bded7336 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -51,7 +51,7 @@ jobs: - job: "TestBasicWindows" pool: - vmImage: "vs2017-win2016" + vmImage: "windows-2019" strategy: maxParallel: 10 matrix: From 1131cf2b98d763cdbd3641cb523e479018111d7e Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 15 Dec 2021 21:46:04 +0300 Subject: [PATCH 07/10] Simplify Stream::connect_tcp --- src/io/mod.rs | 145 +++++++++++--------------------------------------- src/opts.rs | 18 +------ 2 files changed, 33 insertions(+), 130 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 2346a696..33aac3c2 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,11 +10,10 @@ pub use self::{read_packet::ReadPacket, write_packet::WritePacket}; use bytes::BytesMut; use futures_core::{ready, stream}; -use futures_util::stream::{FuturesUnordered, StreamExt}; use mysql_common::proto::codec::PacketCodec as PacketCodecInner; use native_tls::{Certificate, Identity, TlsConnector}; use pin_project::pin_project; -use socket2::{Domain, Protocol, Socket as Socket2Socket, TcpKeepalive, Type}; +use socket2::{Socket as Socket2Socket, TcpKeepalive}; #[cfg(unix)] use tokio::io::AsyncWriteExt; use tokio::{ @@ -35,14 +34,17 @@ use std::{ Read, }, mem::replace, - net::{SocketAddr, ToSocketAddrs}, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}, time::Duration, }; -use crate::{buffer_pool::PooledBuf, error::IoError, opts::SslOpts}; +use crate::{ + buffer_pool::PooledBuf, + error::IoError, + opts::{HostPortOrUrl, SslOpts, DEFAULT_PORT}, +}; #[cfg(unix)] use crate::io::socket::Socket; @@ -355,126 +357,41 @@ impl Stream { } } - pub(crate) async fn connect_tcp(addr: S, keepalive: Option) -> io::Result - where - S: ToSocketAddrs, - { - // TODO: Use tokio to setup keepalive (see tokio-rs/tokio#3082) - async fn connect_stream( - addr: SocketAddr, - keepalive_opts: Option, - ) -> io::Result { - let socket = if addr.is_ipv6() { - Socket2Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))? - } else { - Socket2Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))? - }; - - if let Some(keepalive_opts) = keepalive_opts { - socket.set_tcp_keepalive(&keepalive_opts)?; + pub(crate) async fn connect_tcp( + addr: &HostPortOrUrl, + keepalive: Option, + ) -> io::Result { + let tcp_stream = match addr { + HostPortOrUrl::HostPort(host, port) => { + TcpStream::connect((host.as_str(), *port)).await? } + HostPortOrUrl::Url(url) => { + let addrs = url.socket_addrs(|| Some(DEFAULT_PORT))?; + TcpStream::connect(&*addrs).await? + } + }; - let stream = tokio::task::spawn_blocking(move || { - let addr = addr.into(); - socket.connect(&addr)?; - let mut poll = mio::Poll::new()?; - let mut events = mio::Events::with_capacity(1024); - - socket.set_nonblocking(true)?; - - #[cfg(unix)] - let mut stream = unsafe { - use std::os::unix::io::{FromRawFd, IntoRawFd}; - - let fd = socket.into_raw_fd(); - mio::net::TcpStream::from_raw_fd(fd) - }; - #[cfg(windows)] - let mut stream = unsafe { - use std::os::windows::io::{FromRawSocket, IntoRawSocket}; - - let sock = socket.into_raw_socket(); - mio::net::TcpStream::from_raw_socket(sock) - }; - - poll.registry() - .register(&mut stream, mio::Token(0), mio::Interest::WRITABLE)?; - - loop { - poll.poll(&mut events, None)?; - - for event in &events { - if event.token() == mio::Token(0) && event.is_error() { - return Err(io::Error::new( - io::ErrorKind::ConnectionRefused, - "Connection refused", - )); - } - - if event.token() == mio::Token(0) && event.is_writable() { - // The socket connected (probably, it could still be a spurious - // wakeup) - return Ok::<_, io::Error>(stream); - } - } - } - }) - .await??; - + if let Some(duration) = keepalive { #[cfg(unix)] - let std_stream = unsafe { + let socket = unsafe { use std::os::unix::prelude::*; - let fd = stream.into_raw_fd(); - std::net::TcpStream::from_raw_fd(fd) + let fd = tcp_stream.as_raw_fd(); + Socket2Socket::from_raw_fd(fd) }; - #[cfg(windows)] - let std_stream = unsafe { + let socket = unsafe { use std::os::windows::prelude::*; - let fd = stream.into_raw_socket(); - std::net::TcpStream::from_raw_socket(fd) + let sock = tcp_stream.as_raw_socket(); + Socket2Socket::from_raw_socket(sock) }; - - Ok(TcpStream::from_std(std_stream)?) + socket.set_tcp_keepalive(&TcpKeepalive::new().with_time(duration))?; + std::mem::forget(socket); } - let keepalive_opts = keepalive.map(|time| TcpKeepalive::new().with_time(time)); - - match addr.to_socket_addrs() { - Ok(addresses) => { - let mut streams = FuturesUnordered::new(); - - for address in addresses { - streams.push(connect_stream(address, keepalive_opts.clone())); - } - - let mut err = None; - while let Some(stream) = streams.next().await { - match stream { - Err(e) => { - err = Some(e); - } - Ok(stream) => { - return Ok(Stream { - closed: false, - codec: Box::new(Framed::new(stream.into(), PacketCodec::default())) - .into(), - }); - } - } - } - - if let Some(e) = err { - Err(e) - } else { - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any address", - )) - } - } - Err(err) => Err(err), - } + Ok(Stream { + closed: false, + codec: Box::new(Framed::new(tcp_stream.into(), PacketCodec::default())).into(), + }) } #[cfg(unix)] diff --git a/src/opts.rs b/src/opts.rs index 2f2e2c8d..e2c4de4f 100644 --- a/src/opts.rs +++ b/src/opts.rs @@ -12,8 +12,7 @@ use url::{Host, Url}; use std::{ borrow::Cow, convert::TryFrom, - io, - net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, + net::{Ipv4Addr, Ipv6Addr}, path::Path, str::FromStr, sync::Arc, @@ -40,7 +39,7 @@ const_assert!( pub const DEFAULT_STMT_CACHE_SIZE: usize = 32; /// Default server port. -const DEFAULT_PORT: u16 = 3306; +pub const DEFAULT_PORT: u16 = 3306; /// Default `inactive_connection_ttl` of a pool. /// @@ -67,19 +66,6 @@ impl Default for HostPortOrUrl { } } -impl ToSocketAddrs for HostPortOrUrl { - type Iter = vec::IntoIter; - - fn to_socket_addrs(&self) -> io::Result> { - let res = match self { - Self::Url(url) => url.socket_addrs(|| Some(DEFAULT_PORT))?.into_iter(), - Self::HostPort(host, port) => (host.as_ref(), *port).to_socket_addrs()?, - }; - - Ok(res) - } -} - impl HostPortOrUrl { pub fn get_ip_or_hostname(&self) -> &str { match self { From 428d3a42d1ab23b1b8d49d134893e6c3577aa527 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Thu, 16 Dec 2021 13:54:19 +0300 Subject: [PATCH 08/10] ci: Add MariaDb 10.6 and 10.7 --- azure-pipelines.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index bded7336..fd177e5a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -136,6 +136,10 @@ jobs: strategy: maxParallel: 10 matrix: + v107: + DB_VERSION: "10.7" + v106: + DB_VERSION: "10.6" v105: DB_VERSION: "10.5" v104: @@ -164,7 +168,7 @@ jobs: --name container \ -v `pwd`:/root \ -p 3307:3306 \ - -e MYSQL_ROOT_PASSWORD=password \ + -e MARIADB_ROOT_PASSWORD=password \ mariadb:$(DB_VERSION) \ --max-allowed-packet=36700160 \ --local-infile \ From 1949a3fce9e882137a85b70988afa645c0f9a9ae Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Fri, 17 Dec 2021 12:42:42 +0300 Subject: [PATCH 09/10] Update nonce handling for mysql_common 0.28 --- src/conn/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 3b828862..872a34f4 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -429,9 +429,14 @@ impl Conn { async fn handle_handshake(&mut self) -> Result<()> { let packet = self.read_packet().await?; let handshake = ParseBuf(&*packet).parse::(())?; + + // Handshake scramble is always 21 bytes length (20 + zero terminator) self.inner.nonce = { let mut nonce = Vec::from(handshake.scramble_1_ref()); nonce.extend_from_slice(handshake.scramble_2_ref().unwrap_or(&[][..])); + // Trim zero terminator. Fill with zeroes if nonce + // is somehow smaller than 20 bytes (this matches the server behavior). + nonce.resize(20, 0); nonce }; From df6328f9f118405ec47457b5ce0bedb392312281 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Fri, 17 Dec 2021 13:22:30 +0300 Subject: [PATCH 10/10] ci: Fix container environment for MariaDb 10.1 --- azure-pipelines.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index fd177e5a..fe273551 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -169,6 +169,7 @@ jobs: -v `pwd`:/root \ -p 3307:3306 \ -e MARIADB_ROOT_PASSWORD=password \ + -e MYSQL_ROOT_PASSWORD=password \ mariadb:$(DB_VERSION) \ --max-allowed-packet=36700160 \ --local-infile \