From e0444a9e21d7cf9ad110e3c96452e579d126f18d Mon Sep 17 00:00:00 2001 From: blackbeam Date: Thu, 22 Mar 2018 19:23:15 +0300 Subject: [PATCH 1/3] Store remote in WhiteListFsLocalInfileHandler. Signature of LocalInfileHandler::handle changed to: fn handle(&self, file_name: &[u8]) -> BoxFuture> --- src/connection_like/mod.rs | 5 +++- src/local_infile_handler/builtin.rs | 43 +++++++++++++++++++++++------ src/local_infile_handler/mod.rs | 7 +++-- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/connection_like/mod.rs b/src/connection_like/mod.rs index 353afe85..a8519b86 100644 --- a/src/connection_like/mod.rs +++ b/src/connection_like/mod.rs @@ -388,10 +388,13 @@ pub trait ConnectionLike { let fut = parse_local_infile_packet(&*packet.0) .chain_err(|| Error::from(ErrorKind::UnexpectedPacket)) .and_then(|local_infile| match this.get_local_infile_handler() { - Some(handler) => handler.handle(local_infile.file_name_ref()), + Some(handler) => Ok((local_infile.into_owned(), handler)), None => Err(ErrorKind::NoLocalInfileHandler.into()), }) .into_future() + .and_then(|(local_infile, handler)| { + handler.handle(local_infile.file_name_ref()) + }) .and_then(|reader| { let mut buf = Vec::with_capacity(4096); unsafe { diff --git a/src/local_infile_handler/builtin.rs b/src/local_infile_handler/builtin.rs index cd3ad3b7..de77716b 100644 --- a/src/local_infile_handler/builtin.rs +++ b/src/local_infile_handler/builtin.rs @@ -6,6 +6,8 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +use BoxFuture; +use lib_futures::{Future, IntoFuture, oneshot}; use std::fs; use std::io::{self, Read}; use mio::{Evented, Poll, PollOpt, Ready, Registration, Token}; @@ -16,7 +18,7 @@ use errors::*; use std::collections::HashSet; use std::str::from_utf8; use super::LocalInfileHandler; -use tokio::reactor::{Handle, PollEvented}; +use tokio::reactor::{Handle, Remote, PollEvented}; use tokio_io::AsyncRead; #[derive(Debug)] @@ -163,7 +165,7 @@ impl Evented for File { #[derive(Clone, Debug)] pub struct WhiteListFsLocalInfileHandler { white_list: HashSet, - handle: Handle, + handle: Remote, } impl WhiteListFsLocalInfileHandler { @@ -176,22 +178,45 @@ impl WhiteListFsLocalInfileHandler { for path in white_list.into_iter() { white_list_set.insert(Into::::into(path)); } - WhiteListFsLocalInfileHandler { white_list: white_list_set, handle: handle.clone() } + WhiteListFsLocalInfileHandler { + white_list: white_list_set, + handle: handle.remote().clone(), + } } } impl LocalInfileHandler for WhiteListFsLocalInfileHandler { - fn handle(&self, file_name: &[u8]) -> Result> { + fn handle(&self, file_name: &[u8]) -> BoxFuture> { let path: PathBuf = match from_utf8(file_name) { Ok(path_str) => path_str.into(), - Err(_) => bail!("Invalid file name"), + Err(_) => return Box::new(Err("Invalid file name".into()).into_future()), }; if self.white_list.contains(&path) { - Ok(Box::new( - PollEvented::new(File::new(path), &self.handle)?, - ) as Box) + match self.handle.handle() { + Some(handle) => { + let fut = PollEvented::new(File::new(path), &handle) + .map_err(Into::into) + .map(|poll_evented| Box::new(poll_evented) as Box) + .into_future(); + Box::new(fut) as BoxFuture> + }, + None => { + let (tx, rx) = oneshot(); + self.handle.spawn(|handle| { + let poll_evented_res = PollEvented::new(File::new(path), &handle); + let _ = tx.send(poll_evented_res.map_err(Error::from)); + Ok(()) + }); + let fut = rx + .map_err(|_| Error::from("Future Canceled")) + .and_then(|r| r.into_future()) + .map(|poll_evented| Box::new(poll_evented) as Box); + Box::new(fut) as BoxFuture> + } + } } else { - bail!(format!("Path `{}' is not in white list", path.display())); + let err_msg = format!("Path `{}' is not in white list", path.display()); + return Box::new(Err(err_msg.into()).into_future()); } } } diff --git a/src/local_infile_handler/mod.rs b/src/local_infile_handler/mod.rs index 9373aa60..4013f2a8 100644 --- a/src/local_infile_handler/mod.rs +++ b/src/local_infile_handler/mod.rs @@ -6,6 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +use BoxFuture; use errors::*; use std::fmt; use std::sync::Arc; @@ -32,8 +33,8 @@ pub mod builtin; /// struct ExampleHandler(&'static [u8]); /// /// impl LocalInfileHandler for ExampleHandler { -/// fn handle(&self, _: &[u8]) -> my::errors::Result> { -/// Ok(Box::new(self.0)) +/// fn handle(&self, _: &[u8]) -> Box, Error=my::errors::Error>> { +/// Box::new(futures::future::ok(Box::new(self.0) as Box)) /// } /// } /// @@ -71,7 +72,7 @@ pub mod builtin; pub trait LocalInfileHandler { /// `file_name` is the file name in `LOAD DATA LOCAL INFILE '' INTO TABLE ...;` /// query. - fn handle(&self, file_name: &[u8]) -> Result>; + fn handle(&self, file_name: &[u8]) -> BoxFuture>; } /// Object used to wrap `T: LocalInfileHandler` inside of Opts. From 415c86faa8237c5f3fe3202a3139fbaaddedb625 Mon Sep 17 00:00:00 2001 From: blackbeam Date: Thu, 22 Mar 2018 19:34:08 +0300 Subject: [PATCH 2/3] Require Send and Sync for LocalInfileHandler implementors. Also adds test, which tests that Opts satisfies Sync + Send. --- src/conn/mod.rs | 6 ++++++ src/local_infile_handler/mod.rs | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 32b7a1e9..b63f147b 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -469,6 +469,12 @@ mod test { builder } + #[test] + fn opts_should_satisfy_send_and_sync() { + struct A(T); + A(get_opts()); + } + #[test] fn should_connect() { let mut lp = Core::new().unwrap(); diff --git a/src/local_infile_handler/mod.rs b/src/local_infile_handler/mod.rs index 4013f2a8..05b3f462 100644 --- a/src/local_infile_handler/mod.rs +++ b/src/local_infile_handler/mod.rs @@ -69,7 +69,7 @@ pub mod builtin; /// lp.run(future).unwrap(); /// # } /// ``` -pub trait LocalInfileHandler { +pub trait LocalInfileHandler: Sync + Send { /// `file_name` is the file name in `LOAD DATA LOCAL INFILE '' INTO TABLE ...;` /// query. fn handle(&self, file_name: &[u8]) -> BoxFuture>; From a701da4d242466d45004a7b0842c3edf6e80d82e Mon Sep 17 00:00:00 2001 From: blackbeam Date: Thu, 22 Mar 2018 20:03:53 +0300 Subject: [PATCH 3/3] Remove unused import --- src/local_infile_handler/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/local_infile_handler/mod.rs b/src/local_infile_handler/mod.rs index 05b3f462..b9b88907 100644 --- a/src/local_infile_handler/mod.rs +++ b/src/local_infile_handler/mod.rs @@ -7,7 +7,6 @@ // modified, or distributed except according to those terms. use BoxFuture; -use errors::*; use std::fmt; use std::sync::Arc; use tokio_io::AsyncRead;