Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ serde_json = "1"
socket2 = "0.4.2"
thiserror = "1.0.4"
tokio = { version = "1.0", features = ["io-util", "fs", "net", "time", "rt"] }
tokio-util = { version = "0.6.0", features = ["codec"] }
tokio-util = { version = "0.6.0", features = ["codec", "io"] }
tokio-native-tls = "0.3.0"
twox-hash = "1"
url = "2.1"
Expand Down
187 changes: 174 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,35 +1,196 @@
## mysql-async

Tokio based asynchronous MySql client library for The Rust Programming Language.

[![Gitter](https://badges.gitter.im/rust-mysql/community.svg)](https://gitter.im/rust-mysql/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)

[![Build Status](https://dev.azure.com/aikorsky/mysql%20Rust/_apis/build/status/blackbeam.mysql_async?branchName=master)](https://dev.azure.com/aikorsky/mysql%20Rust/_build/latest?definitionId=2&branchName=master)
[![](https://meritbadge.herokuapp.com/mysql_async)](https://crates.io/crates/mysql_async)
[![](https://img.shields.io/crates/d/mysql_async.svg)](https://crates.io/crates/mysql_async)
[![API Documentation on docs.rs](https://docs.rs/mysql_async/badge.svg)](https://docs.rs/mysql_async)

### Installation
# mysql_async

Tokio based asynchronous MySql client library for The Rust Programming Language.

## Installation

The library is hosted on [crates.io](https://crates.io/crates/mysql_async/).

```toml
[dependencies]
mysql_async = "<desired version>"
```

### Example
## Example

```rust
use mysql_async::prelude::*;

#[derive(Debug, PartialEq, Eq, Clone)]
struct Payment {
customer_id: i32,
amount: i32,
account_name: Option<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
let payments = vec![
Payment { customer_id: 1, amount: 2, account_name: None },
Payment { customer_id: 3, amount: 4, account_name: Some("foo".into()) },
Payment { customer_id: 5, amount: 6, account_name: None },
Payment { customer_id: 7, amount: 8, account_name: None },
Payment { customer_id: 9, amount: 10, account_name: Some("bar".into()) },
];

let database_url = /* ... */
# get_opts();

let pool = mysql_async::Pool::new(database_url);
let mut conn = pool.get_conn().await?;

// Create a temporary table
r"CREATE TEMPORARY TABLE payment (
customer_id int not null,
amount int not null,
account_name text
)".ignore(&mut conn).await?;

// Save payments
r"INSERT INTO payment (customer_id, amount, account_name)
VALUES (:customer_id, :amount, :account_name)"
.with(payments.iter().map(|payment| params! {
"customer_id" => payment.customer_id,
"amount" => payment.amount,
"account_name" => payment.account_name.as_ref(),
}))
.batch(&mut conn)
.await?;

// Load payments from the database. Type inference will work here.
let loaded_payments = "SELECT customer_id, amount, account_name FROM payment"
.with(())
.map(&mut conn, |(customer_id, amount, account_name)| Payment { customer_id, amount, account_name })
.await?;

// Dropped connection will go to the pool
drop(conn);

// The Pool must be disconnected explicitly because
// it's an asynchronous operation.
pool.disconnect().await?;

assert_eq!(loaded_payments, payments);

// the async fn returns Result, so
Ok(())
}
```

## LOCAL INFILE Handlers

**Warning:** You should be aware of [Security Considerations for LOAD DATA LOCAL][1].

There are two flavors of LOCAL INFILE handlers – _global_ and _local_.

I case of a LOCAL INFILE request from the server the driver will try to find a handler for it:

1. It'll try to use _local_ handler installed on the connection, if any;
2. It'll try to use _global_ handler, specified via [`OptsBuilder::local_infile_handler`],
if any;
3. It will emit [`LocalInfileError::NoHandler`] if no handlers found.

The purpose of a handler (_local_ or _global_) is to return [`InfileData`].

### _Global_ LOCAL INFILE handler

See [`prelude::GlobalHandler`].

Simply speaking the _global_ handler is an async function that takes a file name (as `&[u8]`)
and returns `Result<InfileData>`.

You can set it up using [`OptsBuilder::local_infile_handler`]. Server will use it if there is no
_local_ handler installed for the connection. This handler might be called multiple times.

Examles:

1. [`WhiteListFsHandler`] is a _global_ handler.
2. Every `T: Fn(&[u8]) -> BoxFuture<'static, Result<InfileData, LocalInfileError>>`
is a _global_ handler.

### _Local_ LOCAL INFILE handler.

Simply speaking the _local_ handler is a future, that returns `Result<InfileData>`.

This is a one-time handler – it's consumed after use. You can set it up using
[`Conn::set_infile_handler`]. This handler have priority over _global_ handler.

Worth noting:

1. `impl Drop for Conn` will clear _local_ handler, i.e. handler will be removed when
connection is returned to a `Pool`.
2. [`Conn::reset`] will clear _local_ handler.

Example:

```rust
#
let pool = mysql_async::Pool::new(database_url);

let mut conn = pool.get_conn().await?;
"CREATE TEMPORARY TABLE tmp (id INT, val TEXT)".ignore(&mut conn).await?;

// We are going to call `LOAD DATA LOCAL` so let's setup a one-time handler.
conn.set_infile_handler(async move {
// We need to return a stream of `io::Result<Bytes>`
Ok(stream::iter([Bytes::from("1,a\r\n"), Bytes::from("2,b\r\n3,c")]).map(Ok).boxed())
});

let result = r#"LOAD DATA LOCAL INFILE 'whatever'
INTO TABLE `tmp`
FIELDS TERMINATED BY ',' ENCLOSED BY '\"'
LINES TERMINATED BY '\r\n'"#.ignore(&mut conn).await;

match result {
Ok(()) => (),
Err(Error::Server(ref err)) if err.code == 1148 => {
// The used command is not allowed with this MySQL version
return Ok(());
},
Err(Error::Server(ref err)) if err.code == 3948 => {
// Loading local data is disabled;
// this must be enabled on both the client and the server
return Ok(());
}
e @ Err(_) => e.unwrap(),
}

// Now let's verify the result
let result: Vec<(u32, String)> = conn.query("SELECT * FROM tmp ORDER BY id ASC").await?;
assert_eq!(
result,
vec![(1, "a".into()), (2, "b".into()), (3, "c".into())]
);

drop(conn);
pool.disconnect().await?;
```

[1]: https://dev.mysql.com/doc/refman/8.0/en/load-data-local-security.html

Please see the crate docs – [docs.rs](https://docs.rs/mysql_async).
## Change log

### License
Available [here](https://github.com/blackbeam/mysql_async/releases)

## License

Licensed under either of
* Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)

* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or https://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or https://opensource.org/licenses/MIT)

at your option.

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any
additional terms or conditions.
Unless you explicitly state otherwise, any contribution intentionally
submitted for inclusion in the work by you, as defined in the Apache-2.0
license, shall be dual licensed as above, without any additional terms or
conditions.
30 changes: 30 additions & 0 deletions README.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[![Gitter](https://badges.gitter.im/rust-mysql/community.svg)](https://gitter.im/rust-mysql/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)

[![Build Status](https://dev.azure.com/aikorsky/mysql%20Rust/_apis/build/status/blackbeam.mysql_async?branchName=master)](https://dev.azure.com/aikorsky/mysql%20Rust/_build/latest?definitionId=2&branchName=master)
[![](https://meritbadge.herokuapp.com/mysql_async)](https://crates.io/crates/mysql_async)
[![](https://img.shields.io/crates/d/mysql_async.svg)](https://crates.io/crates/mysql_async)
[![API Documentation on docs.rs](https://docs.rs/mysql_async/badge.svg)](https://docs.rs/mysql_async)

# {{crate}}

{{readme}}

## Change log

Available [here](https://github.com/blackbeam/mysql_async/releases)

## License

Licensed under either of

* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or https://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or https://opensource.org/licenses/MIT)

at your option.

### Contribution

Unless you explicitly state otherwise, any contribution intentionally
submitted for inclusion in the work by you, as defined in the Apache-2.0
license, shall be dual licensed as above, without any additional terms or
conditions.
75 changes: 67 additions & 8 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{
transaction::TxStatus,
BinaryProtocol, Queryable, TextProtocol,
},
BinlogStream, OptsBuilder,
BinlogStream, InfileData, OptsBuilder,
};

use self::routines::Routine;
Expand Down Expand Up @@ -111,6 +111,9 @@ struct ConnInner {
auth_switched: bool,
/// Connection is already disconnected.
pub(crate) disconnected: bool,
/// One-time connection-level infile handler.
infile_handler:
Option<Pin<Box<dyn Future<Output = crate::Result<InfileData>> + Send + Sync + 'static>>>,
}

impl fmt::Debug for ConnInner {
Expand Down Expand Up @@ -151,6 +154,7 @@ impl ConnInner {
auth_plugin: AuthPlugin::MysqlNativePassword,
auth_switched: false,
disconnected: false,
infile_handler: None,
}
}

Expand Down Expand Up @@ -374,6 +378,20 @@ impl Conn {
&self.inner.opts
}

/// Setup _local_ `LOCAL INFILE` handler (see ["LOCAL INFILE Handlers"][2] section
/// of the crate-level docs).
///
/// It'll overwrite existing _local_ handler, if any.
///
/// [2]: ../mysql_async/#local-infile-handlers
pub fn set_infile_handler<T>(&mut self, handler: T)
where
T: Future<Output = crate::Result<InfileData>>,
T: Send + Sync + 'static,
{
self.inner.infile_handler = Some(Box::pin(handler));
}

fn take_stream(&mut self) -> Stream {
self.inner.stream.take().unwrap()
}
Expand Down Expand Up @@ -911,6 +929,7 @@ impl Conn {
};

self.inner.stmt_cache.clear();
self.inner.infile_handler = None;
self.inner.pool = pool;
Ok(())
}
Expand Down Expand Up @@ -1022,15 +1041,16 @@ impl Conn {

#[cfg(test)]
mod test {
use futures_util::stream::StreamExt;
use bytes::Bytes;
use futures_util::stream::{self, StreamExt};
use mysql_common::binlog::events::EventData;
use tokio::time::timeout;

use std::time::Duration;

use crate::{
from_row, params, prelude::*, test_misc::get_opts, BinlogDumpFlags, BinlogRequest, Conn,
Error, OptsBuilder, Pool, WhiteListFsLocalInfileHandler,
Error, OptsBuilder, Pool, WhiteListFsHandler,
};

async fn gen_dummy_data() -> super::Result<()> {
Expand Down Expand Up @@ -1676,8 +1696,7 @@ mod test {

write(file_name, b"AAAAAA\nBBBBBB\nCCCCCC\n")?;

let opts = get_opts()
.local_infile_handler(Some(WhiteListFsLocalInfileHandler::new(&[file_name][..])));
let opts = get_opts().local_infile_handler(Some(WhiteListFsHandler::new(&[file_name][..])));

// LOCAL INFILE in the middle of a multi-result set should not break anything.
let mut conn = Conn::new(opts).await.unwrap();
Expand Down Expand Up @@ -1802,7 +1821,48 @@ mod test {
}

#[tokio::test]
async fn should_handle_local_infile() -> super::Result<()> {
async fn should_handle_local_infile_locally() -> super::Result<()> {
let mut conn = Conn::new(get_opts()).await.unwrap();
conn.query_drop("CREATE TEMPORARY TABLE tmp (a TEXT);")
.await
.unwrap();

conn.set_infile_handler(async move {
Ok(
stream::iter([Bytes::from("AAAAAA\n"), Bytes::from("BBBBBB\nCCCCCC\n")])
.map(Ok)
.boxed(),
)
});

match conn
.query_drop(r#"LOAD DATA LOCAL INFILE "dummy" INTO TABLE tmp;"#)
.await
{
Ok(_) => (),
Err(super::Error::Server(ref err)) if err.code == 1148 => {
// The used command is not allowed with this MySQL version
return Ok(());
}
Err(super::Error::Server(ref err)) if err.code == 3948 => {
// Loading local data is disabled;
// this must be enabled on both the client and server sides
return Ok(());
}
e @ Err(_) => e.unwrap(),
};

let result: Vec<String> = conn.query("SELECT * FROM tmp").await?;
assert_eq!(result.len(), 3);
assert_eq!(result[0], "AAAAAA");
assert_eq!(result[1], "BBBBBB");
assert_eq!(result[2], "CCCCCC");

Ok(())
}

#[tokio::test]
async fn should_handle_local_infile_globally() -> super::Result<()> {
use std::fs::write;

let file_path = tempfile::Builder::new().tempfile_in("").unwrap();
Expand All @@ -1811,8 +1871,7 @@ mod test {

write(file_name, b"AAAAAA\nBBBBBB\nCCCCCC\n")?;

let opts = get_opts()
.local_infile_handler(Some(WhiteListFsLocalInfileHandler::new(&[file_name][..])));
let opts = get_opts().local_infile_handler(Some(WhiteListFsHandler::new(&[file_name][..])));

let mut conn = Conn::new(opts).await.unwrap();
conn.query_drop("CREATE TEMPORARY TABLE tmp (a TEXT);")
Expand Down
Loading