Skip to content

Commit

Permalink
refactor: Avoid using async fn so &self does not get captured
Browse files Browse the repository at this point in the history
  • Loading branch information
Markus Westerlind authored and Marwes committed Dec 30, 2019
1 parent 1562fe2 commit 99cc739
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 132 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tokio-net = "0.2.0-alpha.2"
[features]
default = [ "geospatial" ]

executor = []
geospatial = []

[dev-dependencies]
Expand Down
130 changes: 66 additions & 64 deletions src/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tokio::net::tcp::TcpStream;
use tokio::sync::{mpsc, oneshot};

use futures::{
future::Either,
prelude::*,
ready, task,
task::{Spawn, SpawnExt},
Expand Down Expand Up @@ -188,6 +187,7 @@ impl ActualConnection {
}
}

/// Opens a connection.
pub async fn connect(connection_info: ConnectionInfo) -> RedisResult<Connection> {
let con = match *connection_info.addr {
ConnectionAddr::Tcp(ref host, port) => {
Expand All @@ -204,61 +204,64 @@ pub async fn connect(connection_info: ConnectionInfo) -> RedisResult<Connection>
Err(err) => return Err(err.into()),
};

TcpStream::connect(&socket_addr)
.map_ok(|con| ActualConnection::Tcp(WriteWrapper(BufReader::new(con))))
.await?
TcpStream::connect(&socket_addr)
.map_ok(|con| ActualConnection::Tcp(WriteWrapper(BufReader::new(con))))
.await?
}
#[cfg(unix)]
ConnectionAddr::Unix(ref path) =>
ConnectionAddr::Unix(ref path) => {
UnixStream::connect(path.clone())
.map_ok(|stream| ActualConnection::Unix(WriteWrapper(BufReader::new(stream)))).await?,

.map_ok(|stream| ActualConnection::Unix(WriteWrapper(BufReader::new(stream))))
.await?
}

#[cfg(not(unix))]
ConnectionAddr::Unix(_) => return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot connect to unix sockets \
on this platform",
))),
ConnectionAddr::Unix(_) => {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot connect to unix sockets \
on this platform",
)))
}
};

let mut rv = Connection {
con,
db: connection_info.db,
};

match connection_info.passwd {
Some(ref passwd) => {
let mut cmd = cmd("AUTH");
cmd.arg(&**passwd);
let x = cmd.query_async::<_, Value>(&mut rv).await;
match x {
Ok(Value::Okay) => (),
_ => {
fail!((
ErrorKind::AuthenticationFailed,
"Password authentication failed"
));
}
}
}
None => (),
}
let mut rv = Connection {
con,
db: connection_info.db,
};

if connection_info.db != 0 {
let mut cmd = cmd("SELECT");
cmd.arg(connection_info.db);
let result = cmd.query_async::<_, Value>(&mut rv).await;
match result {
Ok(Value::Okay) => Ok(rv),
_ => fail!((
ErrorKind::ResponseError,
"Redis server refused to switch database"
)),
match connection_info.passwd {
Some(ref passwd) => {
let mut cmd = cmd("AUTH");
cmd.arg(&**passwd);
let x = cmd.query_async::<_, Value>(&mut rv).await;
match x {
Ok(Value::Okay) => (),
_ => {
fail!((
ErrorKind::AuthenticationFailed,
"Password authentication failed"
));
}
} else {
Ok(rv)
}
}).await
}
None => (),
}

if connection_info.db != 0 {
let mut cmd = cmd("SELECT");
cmd.arg(connection_info.db);
let result = cmd.query_async::<_, Value>(&mut rv).await;
match result {
Ok(Value::Okay) => Ok(rv),
_ => fail!((
ErrorKind::ResponseError,
"Redis server refused to switch database"
)),
}
} else {
Ok(rv)
}
}

/// An async abstraction over connections.
Expand Down Expand Up @@ -563,6 +566,7 @@ where
}
}

/// A connection object bound to an executor.
#[derive(Clone)]
pub struct SharedConnection {
pipeline: Pipeline<Vec<u8>, Value, RedisError>,
Expand All @@ -571,26 +575,24 @@ pub struct SharedConnection {

impl SharedConnection {
/// Creates a shared connection from a connection and executor.
pub fn new<E>(con: Connection, executor: E) -> impl Future<Output = RedisResult<Self>>
pub fn new<E>(con: Connection, executor: E) -> RedisResult<Self>
where
E: Spawn,
{
future::lazy(|_| {
let pipeline = match con.con {
ActualConnection::Tcp(tcp) => {
let codec = ValueCodec::default().framed(tcp.0.into_inner());
Pipeline::new(codec, executor)
}
#[cfg(unix)]
ActualConnection::Unix(unix) => {
let codec = ValueCodec::default().framed(unix.0.into_inner());
Pipeline::new(codec, executor)
}
};
Ok(SharedConnection {
pipeline,
db: con.db,
})
let pipeline = match con.con {
ActualConnection::Tcp(tcp) => {
let codec = ValueCodec::default().framed(tcp.0.into_inner());
Pipeline::new(codec, executor)
}
#[cfg(unix)]
ActualConnection::Unix(unix) => {
let codec = ValueCodec::default().framed(unix.0.into_inner());
Pipeline::new(codec, executor)
}
};
Ok(SharedConnection {
pipeline,
db: con.db,
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ impl Client {
}
}

self.get_async_connection()
.and_then(move |con| crate::aio::SharedConnection::new(con, TokioExecutor))
self.get_async_connection().and_then(move |con| {
future::ready(crate::aio::SharedConnection::new(con, TokioExecutor))
})
}

/// Returns a async shared connection with a specific executor.
Expand All @@ -93,7 +94,7 @@ impl Client {
E: task::Spawn,
{
self.get_async_connection()
.and_then(move |con| crate::aio::SharedConnection::new(con, executor))
.and_then(move |con| future::ready(crate::aio::SharedConnection::new(con, executor)))
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::types::{
from_redis_value, ErrorKind, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value,
};

use futures::{future::Either, prelude::*};
use futures::prelude::*;

#[derive(Clone)]
enum Arg<D> {
Expand Down Expand Up @@ -327,14 +327,19 @@ impl Cmd {

/// Async version of `query`.
#[inline]
pub async fn query_async<C, T: FromRedisValue>(&self, con: &mut C) -> RedisResult<T>
pub fn query_async<'c, C, T: FromRedisValue>(
&self,
con: &'c mut C,
) -> impl Future<Output = RedisResult<T>> + 'c
where
C: crate::aio::ConnectionLike + Send + 'static,
T: Send + 'static,
{
let pcmd = self.get_packed_command();
let val = con.req_packed_command(pcmd).await?;
from_redis_value(&val)
async move {
let val = con.req_packed_command(pcmd).await?;
from_redis_value(&val)
}
}

/// Similar to `query()` but returns an iterator over the items of the
Expand Down
17 changes: 6 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,25 +303,20 @@
//! # #[tokio::main]
//! # async fn main() -> redis::RedisResult<()> {
//! let client = redis::Client::open("redis://127.0.0.1/").unwrap();
//! let con = client.get_async_connection().await?;
//! let mut con = client.get_async_connection().await?;
//!
//! let (con, ()) = redis::cmd("SET")
//! let () = redis::cmd("SET")
//! .arg("key1")
//! .arg(b"foo")
//! // `query_async` acts in the same way as `query` but requires the connection to be
//! // taken by value as the method returns a `Future` instead of `Result`.
//! // This connection will be returned after the future has been completed allowing it to
//! // be used again.
//! .query_async(con)
//! .query_async(&mut con)
//! .await?;
//!
//! let (con, ()) = redis::cmd("SET").arg(&["key2", "bar"]).query_async(con).await?;
//! let () = redis::cmd("SET").arg(&["key2", "bar"]).query_async(&mut con).await?;
//!
//! let result = redis::cmd("MGET")
//! .arg(&["key1", "key2"])
//! .query_async(con)
//! .await
//! .map(|t| t.1);
//! .query_async(&mut con)
//! .await;
//! assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
//! Ok(())
//! # }
Expand Down
2 changes: 1 addition & 1 deletion src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ where
match opt {
Some(value) => {
self.reader().consume(removed);
let reader = self.reader.take().unwrap();
self.reader.take().unwrap();
return Ok(value?).into();
}
None => {
Expand Down
39 changes: 22 additions & 17 deletions src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,24 +155,23 @@ impl<'a> ScriptInvocation<'a> {
/// Asynchronously invokes the script and returns the result.
#[inline]
pub fn invoke_async<'c, C, T: FromRedisValue + Send + 'static>(
&'c self,
&self,
con: &'c mut C,
) -> impl Future<Output = RedisResult<T>> + 'c
where
C: aio::ConnectionLike + Clone + Send + 'static,
T: FromRedisValue + Send + 'static,
{
let mut eval_cmd = cmd("EVALSHA");
eval_cmd
.arg(self.script.hash.as_bytes())
.arg(self.keys.len())
.arg(&*self.keys)
.arg(&*self.args);

let mut load_cmd = cmd("SCRIPT");
load_cmd.arg("LOAD").arg(self.script.code.as_bytes());
async move {
let mut eval_cmd = cmd("EVALSHA");
eval_cmd
.arg(self.script.hash.as_bytes())
.arg(self.keys.len())
.arg(&*self.keys)
.arg(&*self.args);

let mut load_cmd = cmd("SCRIPT");
load_cmd.arg("LOAD").arg(self.script.code.as_bytes());

let future = {
let mut con = con.clone();
let eval_cmd = eval_cmd.clone();
Expand All @@ -182,7 +181,6 @@ impl<'a> ScriptInvocation<'a> {
con: con.clone(),
eval_cmd,
load_cmd,
status: ScriptStatus::NotLoaded,
future,
}
.await
Expand Down Expand Up @@ -217,18 +215,25 @@ where
self_.future = match self_.future {
CmdFuture::Load(ref mut future) => {
// When we're done loading the script into Redis, try eval'ing it again
let (con, _hash) = ready!(future.as_mut().poll(cx))?;
CmdFuture::Eval(self_.eval_cmd.query_async(con).boxed())
let _hash = ready!(future.as_mut().poll(cx))?;

let mut con = self_.con.clone();
let eval_cmd = self_.eval_cmd.clone();
CmdFuture::Eval(async move { eval_cmd.query_async(&mut con).await }.boxed())
}
CmdFuture::Eval(ref mut future) => match ready!(future.as_mut().poll(cx)) {
Ok((con, val)) => {
Ok(val) => {
// Return the value from the script evaluation
return Ok((con, val)).into();
return Ok(val).into();
}
Err(err) => {
// Load the script into Redis if the script hash wasn't there already
if err.kind() == ErrorKind::NoScriptError {
CmdFuture::Load(self_.load_cmd.query_async(self_.con.clone()).boxed())
let load_cmd = self_.load_cmd.clone();
let mut con = self_.con.clone();
CmdFuture::Load(
async move { load_cmd.query_async(&mut con).await }.boxed(),
)
} else {
return Err(err).into();
}
Expand Down
Loading

0 comments on commit 99cc739

Please sign in to comment.