Skip to content

Commit

Permalink
Switch away from future::lock::Mutex
Browse files Browse the repository at this point in the history
Use the runtime-specific locks instead. This should resolve #20.
A test was added to make sure that this does not come up again.
  • Loading branch information
Bunogi committed Oct 19, 2020
1 parent f315307 commit d6ec723
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -30,7 +30,7 @@ members = ["deadpool-darkredis"]
async-std = { version = "1.0", features = ["attributes"], optional = true }
futures = "0.3.4"
quick-error = "2.0.0"
tokio = { version = "0.3", optional = true, features = ["io-util", "net", "macros", "time", "rt-multi-thread"] }
tokio = { version = "0.3", optional = true, features = ["io-util", "net", "macros", "time", "rt-multi-thread", "sync"] }

[dev-dependencies]
num_cpus = "1.12.0"
Expand Down
4 changes: 3 additions & 1 deletion src/connection.rs
@@ -1,10 +1,11 @@
use crate::{Command, CommandList, DataType, Error, Result, Value};
use futures::{future::BoxFuture, lock::Mutex, FutureExt};
use futures::{future::BoxFuture, FutureExt};

#[cfg(feature = "runtime_async_std")]
use async_std::{
io,
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};
#[cfg(feature = "runtime_async_std")]
use futures::{AsyncReadExt, AsyncWriteExt};
Expand All @@ -13,6 +14,7 @@ use futures::{AsyncReadExt, AsyncWriteExt};
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
net::{TcpStream, ToSocketAddrs},
sync::Mutex,
};

use std::sync::Arc;
Expand Down
5 changes: 2 additions & 3 deletions src/connection/stream.rs
@@ -1,15 +1,14 @@
use super::{Connection, Result, Value};
use futures::{
lock::Mutex,
task::{Context, Poll},
Future, FutureExt, Stream,
};
use std::{pin::Pin, sync::Arc};

#[cfg(feature = "runtime_async_std")]
use async_std::net::TcpStream;
use async_std::{net::TcpStream, sync::Mutex};
#[cfg(feature = "runtime_tokio")]
use tokio::net::TcpStream;
use tokio::{net::TcpStream, sync::Mutex};

///A message received from a channel.
#[derive(Debug, Clone)]
Expand Down
48 changes: 43 additions & 5 deletions src/connectionpool.rs
@@ -1,7 +1,12 @@
use crate::{Command, Connection, Result};
use futures::lock::{Mutex, MutexGuard};
use futures::stream::StreamExt;
use std::sync::Arc;

#[cfg(feature = "runtime_async_std")]
use async_std::sync::{Mutex, MutexGuard};
#[cfg(feature = "runtime_tokio")]
use tokio::sync::{Mutex, MutexGuard};

///A connection pool. Clones are cheap and is the expected way to send the pool around your application.
#[derive(Clone, Debug)]
pub struct ConnectionPool {
Expand Down Expand Up @@ -63,14 +68,24 @@ impl ConnectionPool {
///available.
pub async fn get(&self) -> MutexGuard<'_, Connection> {
for conn in self.connections.iter() {
if let Some(lock) = conn.try_lock() {
return lock;
#[cfg(feature = "runtime_tokio")]
{
if let Ok(lock) = conn.try_lock() {
return lock;
}
}
#[cfg(feature = "runtime_async_std")]
{
if let Some(lock) = conn.try_lock() {
return lock;
}
}
}

//No free connections found, get the first available one
let lockers = self.connections.iter().map(|l| l.lock());
futures::future::select_all(lockers).await.0
let mut lockers: futures::stream::FuturesUnordered<_> =
self.connections.iter().map(|l| l.lock()).collect();
lockers.next().await.unwrap()
}

///Create a new, owned connection using the settings of the current pool. Useful for subscribers or blocking operations that may not yield a value for a long time.
Expand Down Expand Up @@ -129,4 +144,27 @@ mod test {
Value::String("darkredis-named".to_string().into_bytes())
);
}

#[cfg_attr(feature = "runtime_tokio", tokio::test)]
#[cfg_attr(feature = "runtime_async_std", async_std::test)]
async fn timeout() {
let pool = ConnectionPool::create(crate::test::TEST_ADDRESS.into(), None, 1)
.await
.unwrap();

let mut _conn = pool.get().await;

#[cfg(feature = "runtime_tokio")]
assert!(
tokio::time::timeout(std::time::Duration::from_millis(100), pool.get())
.await
.is_err()
);
#[cfg(feature = "runtime_async_std")]
assert!(
async_std::future::timeout(std::time::Duration::from_millis(100), pool.get())
.await
.is_err()
);
}
}

0 comments on commit d6ec723

Please sign in to comment.