From a43d5a224ab5a477e9165387397cf22641497339 Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 16:43:32 -0500 Subject: [PATCH 1/8] :sparkles: add support for async clients While we use Tokio for testing, this will support other runtimes too. --- Cargo.toml | 4 +- src/async_client.rs | 479 ++++++++++++++++++++++++++++++++++++++++++++ src/client.rs | 17 +- src/lib.rs | 18 ++ 4 files changed, 515 insertions(+), 3 deletions(-) create mode 100644 src/async_client.rs diff --git a/Cargo.toml b/Cargo.toml index 9556522..37c4339 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,13 @@ edition = "2018" [features] default = ["tls"] tls = ["openssl"] +async = ["tokio"] [dependencies] byteorder = "1" url = "^2.1" rand = "0.8" enum_dispatch = "0.3" -openssl = { version = "^0.10", optional = true } r2d2 = "^0.8" +openssl = { version = "^0.10", optional = true } +tokio = { version = "^1.37", optional = true, features = ["macros", "rt"] } diff --git a/src/async_client.rs b/src/async_client.rs new file mode 100644 index 0000000..30f5dec --- /dev/null +++ b/src/async_client.rs @@ -0,0 +1,479 @@ +use std::collections::HashMap; +use std::time::Duration; + +use crate::client::{Client, Stats}; +use crate::error::MemcacheError; +use crate::stream::Stream; +use crate::value::{FromMemcacheValueExt, ToMemcacheValue}; +use crate::Connectable; + +pub struct AsyncClient { + inner: Client, +} + +impl From for AsyncClient { + fn from(client: Client) -> Self { + AsyncClient { inner: client } + } +} + +impl AsyncClient { + pub fn connect(target: C) -> Result { + Ok(Client::connect(target)?.into()) + } + + /// Set the socket read timeout for TCP connections. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// client.set_read_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); + /// ``` + pub fn set_read_timeout(&self, timeout: Option) -> Result<(), MemcacheError> { + self.inner.set_read_timeout(timeout) + } + + /// Set the socket write timeout for TCP connections. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345?protocol=ascii").unwrap(); + /// client.set_write_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); + /// ``` + pub fn set_write_timeout(&self, timeout: Option) -> Result<(), MemcacheError> { + self.inner.set_write_timeout(timeout) + } + + /// Get the memcached server version. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.version().await.unwrap(); + /// }; + /// ``` + pub async fn version(&self) -> Result, MemcacheError> { + self.inner.version() + } + + /// Flush all cache on memcached server immediately. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn flush(&self) -> Result<(), MemcacheError> { + self.inner.flush() + } + + /// Flush all cache on memcached server with a delay seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.flush_with_delay(10).await.unwrap(); + /// }; + /// ``` + pub async fn flush_with_delay(&self, delay: u32) -> Result<(), MemcacheError> { + self.inner.flush_with_delay(delay) + } + + /// Get a key from memcached server. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// let _: Option = client.get("foo").await.unwrap(); + /// }; + /// ``` + pub async fn get(&self, key: &str) -> Result, MemcacheError> { + self.inner.get(key) + } + + /// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce network workloads. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.set("foo", "42", 0).await.unwrap(); + /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).await.unwrap(); + /// assert_eq!(result.len(), 1); + /// assert_eq!(result["foo"], "42"); + /// }; + /// ``` + pub async fn gets(&self, keys: &[&str]) -> Result, MemcacheError> { + self.inner.gets(keys) + } + + /// Set a key with associate value into memcached server with expiration seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn set>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { + self.inner.set(key, value, expiration) + } + + /// Compare and swap a key with the associate value into memcached server with expiration seconds. + /// `cas_id` should be obtained from a previous `gets` call. + /// + /// Example: + /// + /// ```rust + /// use std::collections::HashMap; + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).await.unwrap(); + /// let (_, _, cas) = result.get("foo").unwrap(); + /// let cas = cas.unwrap(); + /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).await.unwrap()); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn cas>( + &self, + key: &str, + value: V, + expiration: u32, + cas_id: u64, + ) -> Result { + self.inner.cas(key, value, expiration, cas_id) + } + + /// Add a key with associate value into memcached server with expiration seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let key = "add_test"; + /// async { + /// client.delete(key).await.unwrap(); + /// client.add(key, "bar", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn add>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { + self.inner.add(key, value, expiration) + } + + /// Replace a key with associate value into memcached server with expiration seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let key = "replace_test"; + /// async { + /// client.set(key, "bar", 0).await.unwrap(); + /// client.replace(key, "baz", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn replace>( + &self, + key: &str, + value: V, + expiration: u32, + ) -> Result<(), MemcacheError> { + self.inner.replace(key, value, expiration) + } + + /// Append value to the key. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let key = "key_to_append"; + /// async { + /// client.set(key, "hello", 0).await.unwrap(); + /// client.append(key, ", world!").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn append>(&self, key: &str, value: V) -> Result<(), MemcacheError> { + self.inner.append(key, value) + } + + /// Prepend value to the key. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let key = "key_to_append"; + /// async { + /// client.set(key, "world!", 0).await.unwrap(); + /// client.prepend(key, "hello, ").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn prepend>(&self, key: &str, value: V) -> Result<(), MemcacheError> { + self.inner.prepend(key, value) + } + + /// Delete a key from memcached server. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.delete("foo").await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn delete(&self, key: &str) -> Result { + self.inner.delete(key) + } + + /// Increment the value with amount. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.increment("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn increment(&self, key: &str, amount: u64) -> Result { + self.inner.increment(key, amount) + } + + /// Decrement the value with amount. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.decrement("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn decrement(&self, key: &str, amount: u64) -> Result { + self.inner.decrement(key, amount) + } + + /// Set a new expiration time for a exist key. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// assert_eq!(client.touch("not_exists_key", 12345).await.unwrap(), false); + /// client.set("foo", "bar", 123).await.unwrap(); + /// assert_eq!(client.touch("foo", 12345).await.unwrap(), true); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn touch(&self, key: &str, expiration: u32) -> Result { + self.inner.touch(key, expiration) + } + + /// Get all servers' statistics. + /// + /// Example: + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// async { + /// let stats = client.stats().await.unwrap(); + /// }; + /// ``` + pub async fn stats(&self) -> Result, MemcacheError> { + self.inner.stats() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + #[tokio::test] + async fn build_client_happy_path() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .build_async() + .unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[test] + fn build_client_bad_url() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345:") + .unwrap() + .build_async(); + assert!(client.is_err()); + } + + #[test] + fn build_client_no_url() { + let client = super::Client::builder().build_async(); + assert!(client.is_err()); + + let client = super::Client::builder().add_server(Vec::::new()); + + assert!(client.is_err()); + } + + #[test] + fn build_client_with_large_pool_size() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + // This is a large pool size, but it should still be valid. + // This does make the test run very slow however. + .with_max_pool_size(100) + .build_async(); + assert!( + client.is_ok(), + "Expected successful client creation with large pool size" + ); + } + + #[test] + fn build_client_with_custom_hash_function() { + fn custom_hash_function(_key: &str) -> u64 { + 42 // A simple, predictable hash function for testing. + } + + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_hash_function(custom_hash_function) + .build_async() + .unwrap(); + + // This test assumes that the custom hash function will affect the selection of connections. + // As the implementation details of connection selection are not exposed, this test might need to be adjusted. + assert_eq!( + (client.inner.hash_function)("any_key"), + 42, + "Expected custom hash function to be used" + ); + } + + #[test] + fn build_client_zero_min_idle_conns() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_min_idle_conns(0) + .build_async(); + assert!(client.is_ok(), "Should handle zero min idle conns"); + } + + #[test] + fn build_client_invalid_hash_function() { + let invalid_hash_function = |_: &str| -> u64 { + panic!("This should not be called"); + }; + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_hash_function(invalid_hash_function) + .build_async(); + assert!(client.is_ok(), "Should handle custom hash function gracefully"); + } + + #[test] + fn build_client_with_unsupported_protocol() { + let client = super::Client::builder() + .add_server("unsupported://localhost:12345") + .unwrap() + .build_async(); + assert!(client.is_err(), "Expected error when using an unsupported protocol"); + } + + #[test] + fn build_client_with_all_optional_parameters() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_max_pool_size(10) + .with_min_idle_conns(2) + .with_max_conn_lifetime(Duration::from_secs(30)) + .with_read_timeout(Duration::from_secs(5)) + .with_write_timeout(Duration::from_secs(5)) + .with_connection_timeout(Duration::from_secs(2)) + .build_async(); + assert!(client.is_ok(), "Should successfully build with all optional parameters"); + } + + #[cfg(unix)] + #[tokio::test] + async fn unix() { + let client = super::AsyncClient::connect("memcache:///tmp/memcached.sock").unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[cfg(feature = "tls")] + #[tokio::test] + async fn ssl_noverify() { + let client = super::AsyncClient::connect("memcache+tls://localhost:12350?verify_mode=none").unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[cfg(feature = "tls")] + #[tokio::test] + async fn ssl_verify() { + let client = + super::AsyncClient::connect("memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt") + .unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[cfg(feature = "tls")] + #[tokio::test] + async fn ssl_client_certs() { + let client = super::AsyncClient::connect("memcache+tls://localhost:12351?key_path=tests/assets/client.key&cert_path=tests/assets/client.crt&ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt").unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[tokio::test] + async fn delete() { + let client = super::AsyncClient::connect("memcache://localhost:12345").unwrap(); + client.set("an_exists_key", "value", 0).await.unwrap(); + assert_eq!(client.delete("an_exists_key").await.unwrap(), true); + assert_eq!(client.delete("a_not_exists_key").await.unwrap(), false); + } + + #[tokio::test] + async fn increment() { + let client = super::AsyncClient::connect("memcache://localhost:12345").unwrap(); + client.delete("counter").await.unwrap(); + client.set("counter", 321, 0).await.unwrap(); + assert_eq!(client.increment("counter", 123).await.unwrap(), 444); + } +} diff --git a/src/client.rs b/src/client.rs index 38274d0..19a6dfe 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,6 +5,9 @@ use std::time::Duration; use url::Url; +#[cfg(feature = "async")] +use crate::async_client::AsyncClient; + use crate::connection::ConnectionManager; use crate::error::{ClientError, MemcacheError}; use crate::protocol::{Protocol, ProtocolTrait}; @@ -66,7 +69,7 @@ impl Connectable for Vec<&str> { #[derive(Clone)] pub struct Client { - connections: Vec>, + pub connections: Vec>, pub hash_function: fn(&str) -> u64, } @@ -131,7 +134,12 @@ impl Client { Self::builder().add_server(target)?.build() } - fn get_connection(&self, key: &str) -> Pool { + #[cfg(feature = "async")] + pub fn connect_async(target: C) -> Result { + Ok(Self::connect(target)?.into()) + } + + pub(super) fn get_connection(&self, key: &str) -> Pool { let connections_count = self.connections.len(); return self.connections[(self.hash_function)(key) as usize % connections_count].clone(); } @@ -587,6 +595,11 @@ impl ClientBuilder { Ok(client) } + + #[cfg(feature = "async")] + pub fn build_async(self) -> Result { + Ok(self.build()?.into()) + } } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index 69e8473..d8e1b48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,9 @@ extern crate r2d2; extern crate rand; extern crate url; +#[cfg(feature = "async")] +mod async_client; + mod client; mod connection; mod error; @@ -80,6 +83,9 @@ mod protocol; mod stream; mod value; +#[cfg(feature = "async")] +pub use crate::async_client::AsyncClient; + pub use crate::client::{Client, ClientBuilder, Connectable}; pub use crate::connection::ConnectionManager; pub use crate::error::{ClientError, CommandError, MemcacheError, ServerError}; @@ -101,3 +107,15 @@ pub type Pool = r2d2::Pool; pub fn connect(target: C) -> Result { Client::connect(target) } + +/// Create an async memcached client instance and connect to memcached server. +/// +/// Example: +/// +/// ```rust +/// let client = memcache::connect_async("memcache://localhost:12345").unwrap(); +/// ``` +#[cfg(feature = "async")] +pub fn connect_async(target: C) -> Result { + AsyncClient::connect(target) +} From fcef669dc6dca166d921ec691871deab5776e171 Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 16:48:18 -0500 Subject: [PATCH 2/8] connections doesn't need to be pub --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 19a6dfe..d8492da 100644 --- a/src/client.rs +++ b/src/client.rs @@ -69,7 +69,7 @@ impl Connectable for Vec<&str> { #[derive(Clone)] pub struct Client { - pub connections: Vec>, + connections: Vec>, pub hash_function: fn(&str) -> u64, } From 0311406a3cf5a4974c06e90272f7b605aad4fe29 Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 16:48:29 -0500 Subject: [PATCH 3/8] cargo fmt --- src/async_client.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/async_client.rs b/src/async_client.rs index 30f5dec..0a0c89f 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -130,7 +130,12 @@ impl AsyncClient { /// client.flush().await.unwrap(); /// }; /// ``` - pub async fn set>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { + pub async fn set>( + &self, + key: &str, + value: V, + expiration: u32, + ) -> Result<(), MemcacheError> { self.inner.set(key, value, expiration) } @@ -174,7 +179,12 @@ impl AsyncClient { /// client.flush().await.unwrap(); /// }; /// ``` - pub async fn add>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { + pub async fn add>( + &self, + key: &str, + value: V, + expiration: u32, + ) -> Result<(), MemcacheError> { self.inner.add(key, value, expiration) } @@ -448,9 +458,10 @@ mod tests { #[cfg(feature = "tls")] #[tokio::test] async fn ssl_verify() { - let client = - super::AsyncClient::connect("memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt") - .unwrap(); + let client = super::AsyncClient::connect( + "memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt", + ) + .unwrap(); assert!(client.version().await.unwrap()[0].1 != ""); } From 3de93ac8e01d61285edbf3af17d5e5e768de53a2 Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 16:51:33 -0500 Subject: [PATCH 4/8] have ci run async tests as well --- .github/workflows/ci.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 92a1ce8..e94a989 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -37,6 +37,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: test + args: --all-features env: CARGO_INCREMENTAL: 0 RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off" From 11eb0501532fb2db52029b051e730846f4b8536a Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 17:14:40 -0500 Subject: [PATCH 5/8] add escape hatch into async client to allow sync calls --- src/async_client.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/async_client.rs b/src/async_client.rs index 0a0c89f..0a9e0fc 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -22,6 +22,20 @@ impl AsyncClient { Ok(Client::connect(target)?.into()) } + /// Get a reference to the inner `Client` object. + /// This will allow you to call methods on the `Client` object synchronously. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let blocking_client = client.blocking(); + /// let _: Option = blocking_client.get("foo").unwrap(); + /// ``` + pub fn blocking(&self) -> &Client { + &self.inner + } + /// Set the socket read timeout for TCP connections. /// /// Example: From 7c774e87ebe2f8a1bf28e7b702931be0cc16c01c Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 20:43:13 -0500 Subject: [PATCH 6/8] make the async feature flag mututally exclusive with blocking --- .github/workflows/ci.yaml | 7 + Cargo.toml | 2 +- src/async_client.rs | 160 ++++++--- src/client.rs | 250 +++++++++++--- src/lib.rs | 92 ++++- tests/test_ascii.rs | 47 +++ tests/tests.rs | 684 ++++++++++++++++++++++++++------------ 7 files changed, 921 insertions(+), 321 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e94a989..499aba6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,6 +33,13 @@ jobs: uses: actions-rs/cargo@v1 with: command: build + - name: Run tests + uses: actions-rs/cargo@v1 + with: + command: test + env: + CARGO_INCREMENTAL: 0 + RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off" - name: Run tests uses: actions-rs/cargo@v1 with: diff --git a/Cargo.toml b/Cargo.toml index 37c4339..386669f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,4 @@ rand = "0.8" enum_dispatch = "0.3" r2d2 = "^0.8" openssl = { version = "^0.10", optional = true } -tokio = { version = "^1.37", optional = true, features = ["macros", "rt"] } +tokio = { version = "^1.37", optional = true, features = ["macros", "rt", "time"] } diff --git a/src/async_client.rs b/src/async_client.rs index 0a9e0fc..efe22d6 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -1,25 +1,31 @@ use std::collections::HashMap; use std::time::Duration; -use crate::client::{Client, Stats}; +use crate::client::Stats; use crate::error::MemcacheError; use crate::stream::Stream; use crate::value::{FromMemcacheValueExt, ToMemcacheValue}; use crate::Connectable; -pub struct AsyncClient { - inner: Client, +use super::client as blocking; + +pub struct Client { + inner: blocking::Client, } -impl From for AsyncClient { - fn from(client: Client) -> Self { - AsyncClient { inner: client } +impl From for Client { + fn from(client: blocking::Client) -> Self { + Self { inner: client } } } -impl AsyncClient { +impl Client { + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } + pub fn connect(target: C) -> Result { - Ok(Client::connect(target)?.into()) + Ok(blocking::Client::connect(target)?.into()) } /// Get a reference to the inner `Client` object. @@ -28,11 +34,11 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let blocking_client = client.blocking(); /// let _: Option = blocking_client.get("foo").unwrap(); /// ``` - pub fn blocking(&self) -> &Client { + pub fn blocking(&self) -> &blocking::Client { &self.inner } @@ -41,7 +47,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// client.set_read_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); /// ``` pub fn set_read_timeout(&self, timeout: Option) -> Result<(), MemcacheError> { @@ -53,7 +59,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345?protocol=ascii").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(); /// client.set_write_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); /// ``` pub fn set_write_timeout(&self, timeout: Option) -> Result<(), MemcacheError> { @@ -65,7 +71,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.version().await.unwrap(); /// }; @@ -79,7 +85,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.flush().await.unwrap(); /// }; @@ -93,7 +99,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.flush_with_delay(10).await.unwrap(); /// }; @@ -107,7 +113,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// let _: Option = client.get("foo").await.unwrap(); /// }; @@ -121,7 +127,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.set("foo", "42", 0).await.unwrap(); /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).await.unwrap(); @@ -138,7 +144,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.set("foo", "bar", 10).await.unwrap(); /// client.flush().await.unwrap(); @@ -160,7 +166,7 @@ impl AsyncClient { /// /// ```rust /// use std::collections::HashMap; - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.set("foo", "bar", 10).await.unwrap(); /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).await.unwrap(); @@ -185,7 +191,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "add_test"; /// async { /// client.delete(key).await.unwrap(); @@ -207,7 +213,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "replace_test"; /// async { /// client.set(key, "bar", 0).await.unwrap(); @@ -229,7 +235,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "key_to_append"; /// async { /// client.set(key, "hello", 0).await.unwrap(); @@ -248,7 +254,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "key_to_append"; /// async { /// client.set(key, "world!", 0).await.unwrap(); @@ -267,7 +273,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.delete("foo").await.unwrap(); /// client.flush().await.unwrap(); @@ -282,7 +288,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.increment("counter", 42).await.unwrap(); /// client.flush().await.unwrap(); @@ -297,7 +303,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// client.decrement("counter", 42).await.unwrap(); /// client.flush().await.unwrap(); @@ -312,7 +318,7 @@ impl AsyncClient { /// Example: /// /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// assert_eq!(client.touch("not_exists_key", 12345).await.unwrap(), false); /// client.set("foo", "bar", 123).await.unwrap(); @@ -328,7 +334,7 @@ impl AsyncClient { /// /// Example: /// ```rust - /// let client = memcache::AsyncClient::connect("memcache://localhost:12345").unwrap(); + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// async { /// let stats = client.stats().await.unwrap(); /// }; @@ -338,6 +344,74 @@ impl AsyncClient { } } +pub struct ClientBuilder { + inner: blocking::ClientBuilder, +} + +impl ClientBuilder { + /// Create an empty client builder. + pub fn new() -> Self { + ClientBuilder { + inner: blocking::ClientBuilder::new(), + } + } + + /// Add a memcached server to the pool. + pub fn add_server(mut self, target: C) -> Result { + self.inner = self.inner.add_server(target)?; + Ok(self) + } + + /// Set the maximum number of connections managed by the pool. + pub fn with_max_pool_size(mut self, max_size: u32) -> Self { + self.inner = self.inner.with_max_pool_size(max_size); + self + } + + /// Set the minimum number of idle connections to maintain in the pool. + pub fn with_min_idle_conns(mut self, min_idle: u32) -> Self { + self.inner = self.inner.with_min_idle_conns(min_idle); + self + } + + /// Set the maximum lifetime of connections in the pool. + pub fn with_max_conn_lifetime(mut self, max_lifetime: Duration) -> Self { + self.inner = self.inner.with_max_conn_lifetime(max_lifetime); + self + } + + /// Set the socket read timeout for TCP connections. + pub fn with_read_timeout(mut self, read_timeout: Duration) -> Self { + self.inner = self.inner.with_read_timeout(read_timeout); + self + } + + /// Set the socket write timeout for TCP connections. + pub fn with_write_timeout(mut self, write_timeout: Duration) -> Self { + self.inner = self.inner.with_write_timeout(write_timeout); + self + } + + /// Set the connection timeout for TCP connections. + pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> Self { + self.inner = self.inner.with_connection_timeout(connection_timeout); + self + } + + /// Set the hash function for the client. + pub fn with_hash_function(mut self, hash_function: fn(&str) -> u64) -> Self { + self.inner = self.inner.with_hash_function(hash_function); + self + } + + /// Build the client. This will create a connection pool and return a client, or an error if the connection pool could not be created. + pub fn build(self) -> Result { + Ok(Client { + inner: self.inner.build()?, + }) + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -347,7 +421,7 @@ mod tests { let client = super::Client::builder() .add_server("memcache://localhost:12345") .unwrap() - .build_async() + .build() .unwrap(); assert!(client.version().await.unwrap()[0].1 != ""); } @@ -357,13 +431,13 @@ mod tests { let client = super::Client::builder() .add_server("memcache://localhost:12345:") .unwrap() - .build_async(); + .build(); assert!(client.is_err()); } #[test] fn build_client_no_url() { - let client = super::Client::builder().build_async(); + let client = super::Client::builder().build(); assert!(client.is_err()); let client = super::Client::builder().add_server(Vec::::new()); @@ -379,7 +453,7 @@ mod tests { // This is a large pool size, but it should still be valid. // This does make the test run very slow however. .with_max_pool_size(100) - .build_async(); + .build(); assert!( client.is_ok(), "Expected successful client creation with large pool size" @@ -396,7 +470,7 @@ mod tests { .add_server("memcache://localhost:12345") .unwrap() .with_hash_function(custom_hash_function) - .build_async() + .build() .unwrap(); // This test assumes that the custom hash function will affect the selection of connections. @@ -414,7 +488,7 @@ mod tests { .add_server("memcache://localhost:12345") .unwrap() .with_min_idle_conns(0) - .build_async(); + .build(); assert!(client.is_ok(), "Should handle zero min idle conns"); } @@ -427,7 +501,7 @@ mod tests { .add_server("memcache://localhost:12345") .unwrap() .with_hash_function(invalid_hash_function) - .build_async(); + .build(); assert!(client.is_ok(), "Should handle custom hash function gracefully"); } @@ -436,7 +510,7 @@ mod tests { let client = super::Client::builder() .add_server("unsupported://localhost:12345") .unwrap() - .build_async(); + .build(); assert!(client.is_err(), "Expected error when using an unsupported protocol"); } @@ -451,28 +525,28 @@ mod tests { .with_read_timeout(Duration::from_secs(5)) .with_write_timeout(Duration::from_secs(5)) .with_connection_timeout(Duration::from_secs(2)) - .build_async(); + .build(); assert!(client.is_ok(), "Should successfully build with all optional parameters"); } #[cfg(unix)] #[tokio::test] async fn unix() { - let client = super::AsyncClient::connect("memcache:///tmp/memcached.sock").unwrap(); + let client = super::Client::connect("memcache:///tmp/memcached.sock").unwrap(); assert!(client.version().await.unwrap()[0].1 != ""); } #[cfg(feature = "tls")] #[tokio::test] async fn ssl_noverify() { - let client = super::AsyncClient::connect("memcache+tls://localhost:12350?verify_mode=none").unwrap(); + let client = super::Client::connect("memcache+tls://localhost:12350?verify_mode=none").unwrap(); assert!(client.version().await.unwrap()[0].1 != ""); } #[cfg(feature = "tls")] #[tokio::test] async fn ssl_verify() { - let client = super::AsyncClient::connect( + let client = super::Client::connect( "memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt", ) .unwrap(); @@ -482,13 +556,13 @@ mod tests { #[cfg(feature = "tls")] #[tokio::test] async fn ssl_client_certs() { - let client = super::AsyncClient::connect("memcache+tls://localhost:12351?key_path=tests/assets/client.key&cert_path=tests/assets/client.crt&ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt").unwrap(); + let client = super::Client::connect("memcache+tls://localhost:12351?key_path=tests/assets/client.key&cert_path=tests/assets/client.crt&ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt").unwrap(); assert!(client.version().await.unwrap()[0].1 != ""); } #[tokio::test] async fn delete() { - let client = super::AsyncClient::connect("memcache://localhost:12345").unwrap(); + let client = super::Client::connect("memcache://localhost:12345").unwrap(); client.set("an_exists_key", "value", 0).await.unwrap(); assert_eq!(client.delete("an_exists_key").await.unwrap(), true); assert_eq!(client.delete("a_not_exists_key").await.unwrap(), false); @@ -496,7 +570,7 @@ mod tests { #[tokio::test] async fn increment() { - let client = super::AsyncClient::connect("memcache://localhost:12345").unwrap(); + let client = super::Client::connect("memcache://localhost:12345").unwrap(); client.delete("counter").await.unwrap(); client.set("counter", 321, 0).await.unwrap(); assert_eq!(client.increment("counter", 123).await.unwrap(), 444); diff --git a/src/client.rs b/src/client.rs index d8492da..6fc53ba 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,9 +5,6 @@ use std::time::Duration; use url::Url; -#[cfg(feature = "async")] -use crate::async_client::AsyncClient; - use crate::connection::ConnectionManager; use crate::error::{ClientError, MemcacheError}; use crate::protocol::{Protocol, ProtocolTrait}; @@ -134,11 +131,6 @@ impl Client { Self::builder().add_server(target)?.build() } - #[cfg(feature = "async")] - pub fn connect_async(target: C) -> Result { - Ok(Self::connect(target)?.into()) - } - pub(super) fn get_connection(&self, key: &str) -> Pool { let connections_count = self.connections.len(); return self.connections[(self.hash_function)(key) as usize % connections_count].clone(); @@ -188,7 +180,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// client.version().unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// client.version().await.unwrap(); + /// }; /// ``` pub fn version(&self) -> Result, MemcacheError> { let mut result = Vec::with_capacity(self.connections.len()); @@ -206,7 +205,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// client.flush().unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn flush(&self) -> Result<(), MemcacheError> { for connection in self.connections.iter() { @@ -221,7 +227,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// client.flush_with_delay(10).unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// client.flush_with_delay(10).await.unwrap(); + /// }; /// ``` pub fn flush_with_delay(&self, delay: u32) -> Result<(), MemcacheError> { for connection in self.connections.iter() { @@ -236,7 +249,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// let _: Option = client.get("foo").unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// let _: Option = client.get("foo").await.unwrap(); + /// }; /// ``` pub fn get(&self, key: &str) -> Result, MemcacheError> { check_key_len(key)?; @@ -249,10 +269,22 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.set("foo", "42", 0).unwrap(); - /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).unwrap(); - /// assert_eq!(result.len(), 1); - /// assert_eq!(result["foo"], "42"); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set("foo", "42", 0).unwrap(); + /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).unwrap(); + /// assert_eq!(result.len(), 1); + /// assert_eq!(result["foo"], "42"); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set("foo", "42", 0).await.unwrap(); + /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).await.unwrap(); + /// assert_eq!(result.len(), 1); + /// assert_eq!(result["foo"], "42"); + /// }; /// ``` pub fn gets(&self, keys: &[&str]) -> Result, MemcacheError> { for key in keys { @@ -280,8 +312,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.set("foo", "bar", 10).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set("foo", "bar", 10).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn set>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -296,12 +338,26 @@ impl Client { /// ```rust /// use std::collections::HashMap; /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.set("foo", "bar", 10).unwrap(); - /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).unwrap(); - /// let (_, _, cas) = result.get("foo").unwrap(); - /// let cas = cas.unwrap(); - /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).unwrap()); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set("foo", "bar", 10).unwrap(); + /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).unwrap(); + /// let (_, _, cas) = result.get("foo").unwrap(); + /// let cas = cas.unwrap(); + /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).unwrap()); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).await.unwrap(); + /// let (_, _, cas) = result.get("foo").unwrap(); + /// let cas = cas.unwrap(); + /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).await.unwrap()); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn cas>( &self, @@ -321,9 +377,20 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "add_test"; - /// client.delete(key).unwrap(); - /// client.add(key, "bar", 100000000).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.delete(key).unwrap(); + /// client.add(key, "bar", 100000000).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.delete(key).await.unwrap(); + /// client.add(key, "bar", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn add>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -337,9 +404,20 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "replace_test"; - /// client.set(key, "bar", 0).unwrap(); - /// client.replace(key, "baz", 100000000).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set(key, "bar", 0).unwrap(); + /// client.replace(key, "baz", 100000000).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set(key, "bar", 0).await.unwrap(); + /// client.replace(key, "baz", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn replace>( &self, @@ -358,11 +436,24 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "key_to_append"; - /// client.set(key, "hello", 0).unwrap(); - /// client.append(key, ", world!").unwrap(); - /// let result: String = client.get(key).unwrap().unwrap(); - /// assert_eq!(result, "hello, world!"); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set(key, "hello", 0).unwrap(); + /// client.append(key, ", world!").unwrap(); + /// let result: String = client.get(key).unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set(key, "hello", 0).await.unwrap(); + /// client.append(key, ", world!").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn append>(&self, key: &str, value: V) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -376,11 +467,24 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "key_to_append"; - /// client.set(key, "world!", 0).unwrap(); - /// client.prepend(key, "hello, ").unwrap(); - /// let result: String = client.get(key).unwrap().unwrap(); - /// assert_eq!(result, "hello, world!"); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set(key, "world!", 0).unwrap(); + /// client.prepend(key, "hello, ").unwrap(); + /// let result: String = client.get(key).unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set(key, "world!", 0).await.unwrap(); + /// client.prepend(key, "hello, ").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn prepend>(&self, key: &str, value: V) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -393,8 +497,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.delete("foo").unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.delete("foo").unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.delete("foo").await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn delete(&self, key: &str) -> Result { check_key_len(key)?; @@ -407,8 +521,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.increment("counter", 42).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.increment("counter", 42).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.increment("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn increment(&self, key: &str, amount: u64) -> Result { check_key_len(key)?; @@ -421,8 +545,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.decrement("counter", 42).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.decrement("counter", 42).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.decrement("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn decrement(&self, key: &str, amount: u64) -> Result { check_key_len(key)?; @@ -435,10 +569,22 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// assert_eq!(client.touch("not_exists_key", 12345).unwrap(), false); - /// client.set("foo", "bar", 123).unwrap(); - /// assert_eq!(client.touch("foo", 12345).unwrap(), true); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// assert_eq!(client.touch("not_exists_key", 12345).unwrap(), false); + /// client.set("foo", "bar", 123).unwrap(); + /// assert_eq!(client.touch("foo", 12345).unwrap(), true); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// assert_eq!(client.touch("not_exists_key", 12345).await.unwrap(), false); + /// client.set("foo", "bar", 123).await.unwrap(); + /// assert_eq!(client.touch("foo", 12345).await.unwrap(), true); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn touch(&self, key: &str, expiration: u32) -> Result { check_key_len(key)?; @@ -450,7 +596,14 @@ impl Client { /// Example: /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// let stats = client.stats().unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// let stats = client.stats().await.unwrap(); + /// }; /// ``` pub fn stats(&self) -> Result, MemcacheError> { let mut result: Vec<(String, HashMap)> = vec![]; @@ -595,11 +748,6 @@ impl ClientBuilder { Ok(client) } - - #[cfg(feature = "async")] - pub fn build_async(self) -> Result { - Ok(self.build()?.into()) - } } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index d8e1b48..c2f91c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,7 @@ -/*! +#![cfg_attr(feature = "cargo-clippy", allow(clippy::needless_return))] +#![cfg_attr( + not(feature = "async"), + doc = r#"/*! rust-memcache is a [memcached](https://memcached.org/) client written in pure rust. # Install: @@ -61,9 +64,77 @@ client.increment("counter", 2).unwrap(); let answer: i32 = client.get("counter").unwrap().unwrap(); assert_eq!(answer, 42); ``` -!*/ +!*/"# +)] +#![cfg_attr( + feature = "async", + doc = r#"/*! +rust-memcache is a [memcached](https://memcached.org/) client written in pure rust. -#![cfg_attr(feature = "cargo-clippy", allow(clippy::needless_return))] +# Install: + +The crate is called `memcache` and you can depend on it via cargo: + +```ini +[dependencies] +memcache = { version = "*", features = ["async"] } +``` + +# Features: + +- All memcached supported protocols + - Binary protocol + - ASCII protocol +- All memcached supported connections + - TCP connection + - UDP connection + - UNIX Domain socket connection + - TLS connection +- Encodings + - Typed interface + - Automatically compress + - Automatically serialize to JSON / msgpack etc +- Mutiple server support with custom key hash algorithm +- Authority + - Binary protocol (plain SASL authority) + - ASCII protocol + +# Basic usage: + +```rust +// create connection with to memcached server node: +let client = memcache::connect("memcache://127.0.0.1:12345?timeout=10&tcp_nodelay=true").unwrap(); + +async { + // flush the database: + client.flush().await.unwrap(); + + // set a string value: + client.set("foo", "bar", 0).await.unwrap(); + + // retrieve from memcached: + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + assert_eq!(value.unwrap(), "bar"); + + // prepend, append: + client.prepend("foo", "foo").await.unwrap(); + client.append("foo", "baz").await.unwrap(); + let value: String = client.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "foobarbaz"); + + // delete value: + client.delete("foo").await.unwrap(); + + // using counter: + client.set("counter", 40, 0).await.unwrap(); + client.increment("counter", 2).await.unwrap(); + let answer: i32 = client.get("counter").await.unwrap().unwrap(); + assert_eq!(answer, 42); +}; +``` +!*/"# +)] extern crate byteorder; extern crate enum_dispatch; @@ -77,6 +148,7 @@ extern crate url; mod async_client; mod client; + mod connection; mod error; mod protocol; @@ -84,9 +156,12 @@ mod stream; mod value; #[cfg(feature = "async")] -pub use crate::async_client::AsyncClient; +pub use crate::async_client::Client; + +#[cfg(not(feature = "async"))] +pub use crate::client::Client; -pub use crate::client::{Client, ClientBuilder, Connectable}; +pub use crate::client::{ClientBuilder, Connectable}; pub use crate::connection::ConnectionManager; pub use crate::error::{ClientError, CommandError, MemcacheError, ServerError}; pub use crate::stream::Stream; @@ -104,6 +179,7 @@ pub type Pool = r2d2::Pool; /// ```rust /// let client = memcache::connect("memcache://localhost:12345").unwrap(); /// ``` +#[cfg(not(feature = "async"))] pub fn connect(target: C) -> Result { Client::connect(target) } @@ -113,9 +189,9 @@ pub fn connect(target: C) -> Result { /// Example: /// /// ```rust -/// let client = memcache::connect_async("memcache://localhost:12345").unwrap(); +/// let client = memcache::connect("memcache://localhost:12345").unwrap(); /// ``` #[cfg(feature = "async")] -pub fn connect_async(target: C) -> Result { - AsyncClient::connect(target) +pub fn connect(target: C) -> Result { + Client::connect(target) } diff --git a/tests/test_ascii.rs b/tests/test_ascii.rs index 64fef61..4003702 100644 --- a/tests/test_ascii.rs +++ b/tests/test_ascii.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::{thread, time}; #[test] +#[cfg(not(feature = "async"))] fn test_ascii() { let client = memcache::Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(); @@ -47,3 +48,49 @@ fn test_ascii() { client.stats().unwrap(); } + +#[tokio::test] +#[cfg(feature = "async")] +async fn test_ascii() { + let client = memcache::Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(); + + client.flush_with_delay(1).await.unwrap(); + thread::sleep(time::Duration::from_secs(1)); + client.flush().await.unwrap(); + + client.set("ascii_foo", "bar", 0).await.unwrap(); + let value: Option = client.get("ascii_foo").await.unwrap(); + assert_eq!(value, Some("bar".into())); + + client.set("ascii_baz", "qux", 0).await.unwrap(); + let values: HashMap, u32)> = client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).await.unwrap(); + assert_eq!(values.len(), 2); + let ascii_foo_value = values.get("ascii_foo").unwrap(); + let ascii_baz_value = values.get("ascii_baz").unwrap(); + assert_eq!(String::from_utf8(ascii_foo_value.0.clone()).unwrap(), "bar".to_string()); + assert_eq!(String::from_utf8(ascii_baz_value.0.clone()).unwrap(), "qux".to_string()); + + client.touch("ascii_foo", 1000).await.unwrap(); + + let value: Option = client.get("not_exists_key").await.unwrap(); + assert_eq!(value, None); + + client.set("ascii_pend", "y", 0).await.unwrap(); + client.append("ascii_pend", "z").await.unwrap(); + let value: Option = client.get("ascii_pend").await.unwrap(); + assert_eq!(value, Some("yz".into())); + client.prepend("ascii_pend", "x").await.unwrap(); + let value: Option = client.get("ascii_pend").await.unwrap(); + assert_eq!(value, Some("xyz".into())); + + client.delete("ascii_pend").await.unwrap(); + let value: Option = client.get("ascii_pend").await.unwrap(); + assert_eq!(value, None); + + assert!(client.increment("ascii_counter", 1).await.is_err()); + client.set("ascii_counter", 3, 0).await.unwrap(); + assert_eq!(client.increment("ascii_counter", 100).await.unwrap(), 103); + assert_eq!(client.decrement("ascii_counter", 3).await.unwrap(), 100); + + client.stats().await.unwrap(); +} diff --git a/tests/tests.rs b/tests/tests.rs index 947084b..f7e899f 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -4,8 +4,6 @@ extern crate rand; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::iter; -use std::thread; -use std::thread::JoinHandle; use std::time; fn gen_random_key() -> String { @@ -16,237 +14,487 @@ fn gen_random_key() -> String { return String::from_utf8(bs).unwrap(); } -#[test] -fn test() { - let mut urls = vec![ - "memcache://localhost:12346?tcp_nodelay=true", - "memcache://localhost:12347?timeout=10", - "memcache://localhost:12348?protocol=ascii", - "memcache://localhost:12349?", - "memcache+tls://localhost:12350?verify_mode=none", - ]; - if cfg!(unix) { - urls.push("memcache:///tmp/memcached2.sock"); - } - let client = memcache::Client::connect(urls).unwrap(); - - client.version().unwrap(); - - client.set("foo", "bar", 0).unwrap(); - client.flush().unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.set("foo", "bar", 0).unwrap(); - client.flush_with_delay(3).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bar"))); - thread::sleep(time::Duration::from_secs(4)); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - let mut keys: Vec = Vec::new(); - for _ in 0..1000 { - let key = gen_random_key(); - keys.push(key.clone()); - client.set(key.as_str(), "xxx", 0).unwrap(); +#[cfg(not(feature = "async"))] +mod blocking { + use super::*; + use std::thread; + use std::thread::JoinHandle; + + #[test] + fn test() { + let mut urls = vec![ + "memcache://localhost:12346?tcp_nodelay=true", + "memcache://localhost:12347?timeout=10", + "memcache://localhost:12348?protocol=ascii", + "memcache://localhost:12349?", + "memcache+tls://localhost:12350?verify_mode=none", + ]; + if cfg!(unix) { + urls.push("memcache:///tmp/memcached2.sock"); + } + let client = memcache::Client::connect(urls).unwrap(); + + client.version().unwrap(); + + client.set("foo", "bar", 0).unwrap(); + client.flush().unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).unwrap(); + client.flush_with_delay(3).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bar"))); + thread::sleep(time::Duration::from_secs(4)); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).unwrap().unwrap(); + assert_eq!(value, "xxx"); + } } - for key in keys { - let value: String = client.get(key.as_str()).unwrap().unwrap(); - assert_eq!(value, "xxx"); + #[test] + fn issue74() { + use memcache::{Client, CommandError, MemcacheError}; + let client = Client::connect("memcache://localhost:12346?tcp_nodelay=true").unwrap(); + client.delete("issue74").unwrap(); + client.add("issue74", 1, 0).unwrap(); + + match client.add("issue74", 1, 0) { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0) { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0) { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } } -} -#[test] -fn issue74() { - use memcache::{Client, CommandError, MemcacheError}; - let client = Client::connect("memcache://localhost:12346?tcp_nodelay=true").unwrap(); - client.delete("issue74").unwrap(); - client.add("issue74", 1, 0).unwrap(); - - match client.add("issue74", 1, 0) { - Ok(_) => panic!("Should got an error!"), - Err(e) => match e { - MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), - _ => panic!("Unexpected error!"), - }, - } + #[test] + fn udp_test() { + let urls = vec!["memcache+udp://localhost:22345"]; + let client = memcache::Client::connect(urls).unwrap(); - match client.add("issue74", 1, 0) { - Ok(_) => panic!("Should got an error!"), - Err(e) => match e { - MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), - _ => panic!("Unexpected error!"), - }, - } + client.version().unwrap(); - match client.add("issue74", 1, 0) { - Ok(_) => panic!("Should got an error!"), - Err(e) => match e { - MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), - _ => panic!("Unexpected error!"), - }, + client.set("foo", "bar", 0).unwrap(); + client.flush().unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).unwrap(); + client.flush_with_delay(3).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bar"))); + thread::sleep(time::Duration::from_secs(4)); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).unwrap(); + let value = client.add("foo", "baz", 0); + assert_eq!(value.is_err(), true); + + client.delete("foo").unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.add("foo", "bar", 0).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bar"))); + + client.replace("foo", "baz", 0).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("baz"))); + + client.append("foo", "bar").unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bazbar"))); + + client.prepend("foo", "bar").unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("barbazbar"))); + + client.set("fooo", 0, 0).unwrap(); + client.increment("fooo", 1).unwrap(); + let value: Option = client.get("fooo").unwrap(); + assert_eq!(value, Some(String::from("1"))); + + client.decrement("fooo", 1).unwrap(); + let value: Option = client.get("fooo").unwrap(); + assert_eq!(value, Some(String::from("0"))); + + assert_eq!(client.touch("foooo", 123).unwrap(), false); + assert_eq!(client.touch("fooo", 12345).unwrap(), true); + + // gets is not supported for udp + let value: Result, _> = client.gets(&["foo", "fooo"]); + assert_eq!(value.is_ok(), false); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).unwrap().unwrap(); + + assert_eq!(value, "xxx"); + } + + // test with multiple udp connections + let mut handles: Vec>> = Vec::new(); + for i in 0..10 { + handles.push(Some(thread::spawn(move || { + let key = format!("key{}", i); + let value = format!("value{}", i); + let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); + for j in 0..50 { + let value = format!("{}{}", value, j); + client.set(key.as_str(), &value, 0).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + let result = client.add(key.as_str(), &value, 0); + assert_eq!(result.is_err(), true); + + client.delete(key.as_str()).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result, None); + + client.add(key.as_str(), &value, 0).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.replace(key.as_str(), &value, 0).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.append(key.as_str(), &value).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result, Some(format!("{}{}", value, value))); + + client.prepend(key.as_str(), &value).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result, Some(format!("{}{}{}", value, value, value))); + } + }))); + } + + for i in 0..10 { + handles[i].take().unwrap().join().unwrap(); + } } -} -#[test] -fn udp_test() { - let urls = vec!["memcache+udp://localhost:22345"]; - let client = memcache::Client::connect(urls).unwrap(); - - client.version().unwrap(); - - client.set("foo", "bar", 0).unwrap(); - client.flush().unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.set("foo", "bar", 0).unwrap(); - client.flush_with_delay(3).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bar"))); - thread::sleep(time::Duration::from_secs(4)); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.set("foo", "bar", 0).unwrap(); - let value = client.add("foo", "baz", 0); - assert_eq!(value.is_err(), true); - - client.delete("foo").unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.add("foo", "bar", 0).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bar"))); - - client.replace("foo", "baz", 0).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("baz"))); - - client.append("foo", "bar").unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bazbar"))); - - client.prepend("foo", "bar").unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("barbazbar"))); - - client.set("fooo", 0, 0).unwrap(); - client.increment("fooo", 1).unwrap(); - let value: Option = client.get("fooo").unwrap(); - assert_eq!(value, Some(String::from("1"))); - - client.decrement("fooo", 1).unwrap(); - let value: Option = client.get("fooo").unwrap(); - assert_eq!(value, Some(String::from("0"))); - - assert_eq!(client.touch("foooo", 123).unwrap(), false); - assert_eq!(client.touch("fooo", 12345).unwrap(), true); - - // gets is not supported for udp - let value: Result, _> = client.gets(&["foo", "fooo"]); - assert_eq!(value.is_ok(), false); - - let mut keys: Vec = Vec::new(); - for _ in 0..1000 { - let key = gen_random_key(); - keys.push(key.clone()); - client.set(key.as_str(), "xxx", 0).unwrap(); + #[test] + fn test_cas() { + use memcache::Client; + use std::collections::HashMap; + let clients = vec![ + Client::connect("memcache://localhost:12345").unwrap(), + Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(), + ]; + for client in clients { + client.flush().unwrap(); + + client.set("ascii_foo", "bar", 0).unwrap(); + let value: Option = client.get("ascii_foo").unwrap(); + assert_eq!(value, Some("bar".into())); + + client.set("ascii_baz", "qux", 0).unwrap(); + + let values: HashMap, u32, Option)> = + client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).unwrap(); + assert_eq!(values.len(), 2); + let ascii_foo_value = values.get("ascii_foo").unwrap(); + let ascii_baz_value = values.get("ascii_baz").unwrap(); + + assert!(ascii_foo_value.2.is_some()); + assert!(ascii_baz_value.2.is_some()); + assert_eq!( + true, + client.cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()).unwrap() + ); + assert_eq!( + false, + client.cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()).unwrap() + ); + + assert_eq!( + false, + client + .cas("not_exists_key", "bar", 0, ascii_foo_value.2.unwrap()) + .unwrap() + ); + client.flush().unwrap(); + } } +} - for key in keys { - let value: String = client.get(key.as_str()).unwrap().unwrap(); - - assert_eq!(value, "xxx"); +#[cfg(feature = "async")] +mod nonblocking { + use super::*; + + #[tokio::test] + async fn test() { + let mut urls = vec![ + "memcache://localhost:12346?tcp_nodelay=true", + "memcache://localhost:12347?timeout=10", + "memcache://localhost:12348?protocol=ascii", + "memcache://localhost:12349?", + "memcache+tls://localhost:12350?verify_mode=none", + ]; + if cfg!(unix) { + urls.push("memcache:///tmp/memcached2.sock"); + } + let client = memcache::Client::connect(urls).unwrap(); + + client.version().await.unwrap(); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush().await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush_with_delay(3).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + tokio::time::sleep(time::Duration::from_secs(4)).await; + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).await.unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).await.unwrap().unwrap(); + assert_eq!(value, "xxx"); + } } - // test with multiple udp connections - let mut handles: Vec>> = Vec::new(); - for i in 0..10 { - handles.push(Some(thread::spawn(move || { - let key = format!("key{}", i); - let value = format!("value{}", i); - let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); - for j in 0..50 { - let value = format!("{}{}", value, j); - client.set(key.as_str(), &value, 0).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - let result = client.add(key.as_str(), &value, 0); - assert_eq!(result.is_err(), true); - - client.delete(key.as_str()).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result, None); - - client.add(key.as_str(), &value, 0).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - client.replace(key.as_str(), &value, 0).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - client.append(key.as_str(), &value).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result, Some(format!("{}{}", value, value))); - - client.prepend(key.as_str(), &value).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result, Some(format!("{}{}{}", value, value, value))); - } - }))); + #[tokio::test] + async fn issue74() { + use memcache::{Client, CommandError, MemcacheError}; + let client = Client::connect("memcache://localhost:12346?tcp_nodelay=true").unwrap(); + client.delete("issue74").await.unwrap(); + client.add("issue74", 1, 0).await.unwrap(); + + match client.add("issue74", 1, 0).await { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0).await { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0).await { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } } - for i in 0..10 { - handles[i].take().unwrap().join().unwrap(); + #[tokio::test] + async fn udp_test() { + let urls = vec!["memcache+udp://localhost:22345"]; + let client = memcache::Client::connect(urls).unwrap(); + + client.version().await.unwrap(); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush().await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush_with_delay(3).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + tokio::time::sleep(time::Duration::from_secs(4)).await; + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).await.unwrap(); + let value = client.add("foo", "baz", 0).await; + assert_eq!(value.is_err(), true); + + client.delete("foo").await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.add("foo", "bar", 0).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + + client.replace("foo", "baz", 0).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("baz"))); + + client.append("foo", "bar").await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bazbar"))); + + client.prepend("foo", "bar").await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("barbazbar"))); + + client.set("fooo", 0, 0).await.unwrap(); + client.increment("fooo", 1).await.unwrap(); + let value: Option = client.get("fooo").await.unwrap(); + assert_eq!(value, Some(String::from("1"))); + + client.decrement("fooo", 1).await.unwrap(); + let value: Option = client.get("fooo").await.unwrap(); + assert_eq!(value, Some(String::from("0"))); + + assert_eq!(client.touch("foooo", 123).await.unwrap(), false); + assert_eq!(client.touch("fooo", 12345).await.unwrap(), true); + + // gets is not supported for udp + let value: Result, _> = client.gets(&["foo", "fooo"]).await; + assert_eq!(value.is_ok(), false); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).await.unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).await.unwrap().unwrap(); + + assert_eq!(value, "xxx"); + } + + // test with multiple udp connections + let mut handles: Vec> = Vec::new(); + for i in 0..10 { + handles.push(tokio::spawn( + async move { + let key = format!("key{}", i); + let value = format!("value{}", i); + let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); + for j in 0..50 { + let value = format!("{}{}", value, j); + client.set(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + let result = client.add(key.as_str(), &value, 0).await; + assert_eq!(result.is_err(), true); + + client.delete(key.as_str()).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, None); + + client.add(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.replace(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.append(key.as_str(), &value).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, Some(format!("{}{}", value, value))); + + client.prepend(key.as_str(), &value).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, Some(format!("{}{}{}", value, value, value))); + } + } + )); + } + + for handle in handles.into_iter() { + handle.await.unwrap(); + } } -} -#[test] -fn test_cas() { - use memcache::Client; - use std::collections::HashMap; - let clients = vec![ - Client::connect("memcache://localhost:12345").unwrap(), - Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(), - ]; - for client in clients { - client.flush().unwrap(); - - client.set("ascii_foo", "bar", 0).unwrap(); - let value: Option = client.get("ascii_foo").unwrap(); - assert_eq!(value, Some("bar".into())); - - client.set("ascii_baz", "qux", 0).unwrap(); - - let values: HashMap, u32, Option)> = - client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).unwrap(); - assert_eq!(values.len(), 2); - let ascii_foo_value = values.get("ascii_foo").unwrap(); - let ascii_baz_value = values.get("ascii_baz").unwrap(); - - assert!(ascii_foo_value.2.is_some()); - assert!(ascii_baz_value.2.is_some()); - assert_eq!( - true, - client.cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()).unwrap() - ); - assert_eq!( - false, - client.cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()).unwrap() - ); - - assert_eq!( - false, - client - .cas("not_exists_key", "bar", 0, ascii_foo_value.2.unwrap()) - .unwrap() - ); - client.flush().unwrap(); + #[tokio::test] + async fn test_cas() { + use memcache::Client; + use std::collections::HashMap; + let clients = vec![ + Client::connect("memcache://localhost:12345").unwrap(), + Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(), + ]; + for client in clients { + client.flush().await.unwrap(); + + client.set("ascii_foo", "bar", 0).await.unwrap(); + let value: Option = client.get("ascii_foo").await.unwrap(); + assert_eq!(value, Some("bar".into())); + + client.set("ascii_baz", "qux", 0).await.unwrap(); + + let values: HashMap, u32, Option)> = + client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).await.unwrap(); + assert_eq!(values.len(), 2); + let ascii_foo_value = values.get("ascii_foo").unwrap(); + let ascii_baz_value = values.get("ascii_baz").unwrap(); + + assert!(ascii_foo_value.2.is_some()); + assert!(ascii_baz_value.2.is_some()); + assert_eq!( + true, + client.cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()).await.unwrap() + ); + assert_eq!( + false, + client.cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()).await.unwrap() + ); + + assert_eq!( + false, + client + .cas("not_exists_key", "bar", 0, ascii_foo_value.2.unwrap()) + .await + .unwrap() + ); + client.flush().await.unwrap(); + } } } From c9a651fe289648fcb4997a6bb82eacf986fd68d2 Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 20:44:32 -0500 Subject: [PATCH 7/8] cargo fmt --- src/async_client.rs | 7 ++-- tests/test_ascii.rs | 5 ++- tests/tests.rs | 84 ++++++++++++++++++++++++--------------------- 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/src/async_client.rs b/src/async_client.rs index efe22d6..f3306aa 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -546,10 +546,9 @@ mod tests { #[cfg(feature = "tls")] #[tokio::test] async fn ssl_verify() { - let client = super::Client::connect( - "memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt", - ) - .unwrap(); + let client = + super::Client::connect("memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt") + .unwrap(); assert!(client.version().await.unwrap()[0].1 != ""); } diff --git a/tests/test_ascii.rs b/tests/test_ascii.rs index 4003702..5039a30 100644 --- a/tests/test_ascii.rs +++ b/tests/test_ascii.rs @@ -63,7 +63,10 @@ async fn test_ascii() { assert_eq!(value, Some("bar".into())); client.set("ascii_baz", "qux", 0).await.unwrap(); - let values: HashMap, u32)> = client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).await.unwrap(); + let values: HashMap, u32)> = client + .gets(&["ascii_foo", "ascii_baz", "not_exists_key"]) + .await + .unwrap(); assert_eq!(values.len(), 2); let ascii_foo_value = values.get("ascii_foo").unwrap(); let ascii_baz_value = values.get("ascii_baz").unwrap(); diff --git a/tests/tests.rs b/tests/tests.rs index f7e899f..4489955 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -410,42 +410,40 @@ mod nonblocking { // test with multiple udp connections let mut handles: Vec> = Vec::new(); for i in 0..10 { - handles.push(tokio::spawn( - async move { - let key = format!("key{}", i); - let value = format!("value{}", i); - let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); - for j in 0..50 { - let value = format!("{}{}", value, j); - client.set(key.as_str(), &value, 0).await.unwrap(); - let result: Option = client.get(key.as_str()).await.unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - let result = client.add(key.as_str(), &value, 0).await; - assert_eq!(result.is_err(), true); - - client.delete(key.as_str()).await.unwrap(); - let result: Option = client.get(key.as_str()).await.unwrap(); - assert_eq!(result, None); - - client.add(key.as_str(), &value, 0).await.unwrap(); - let result: Option = client.get(key.as_str()).await.unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - client.replace(key.as_str(), &value, 0).await.unwrap(); - let result: Option = client.get(key.as_str()).await.unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - client.append(key.as_str(), &value).await.unwrap(); - let result: Option = client.get(key.as_str()).await.unwrap(); - assert_eq!(result, Some(format!("{}{}", value, value))); - - client.prepend(key.as_str(), &value).await.unwrap(); - let result: Option = client.get(key.as_str()).await.unwrap(); - assert_eq!(result, Some(format!("{}{}{}", value, value, value))); - } + handles.push(tokio::spawn(async move { + let key = format!("key{}", i); + let value = format!("value{}", i); + let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); + for j in 0..50 { + let value = format!("{}{}", value, j); + client.set(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + let result = client.add(key.as_str(), &value, 0).await; + assert_eq!(result.is_err(), true); + + client.delete(key.as_str()).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, None); + + client.add(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.replace(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.append(key.as_str(), &value).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, Some(format!("{}{}", value, value))); + + client.prepend(key.as_str(), &value).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, Some(format!("{}{}{}", value, value, value))); } - )); + })); } for handle in handles.into_iter() { @@ -470,8 +468,10 @@ mod nonblocking { client.set("ascii_baz", "qux", 0).await.unwrap(); - let values: HashMap, u32, Option)> = - client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).await.unwrap(); + let values: HashMap, u32, Option)> = client + .gets(&["ascii_foo", "ascii_baz", "not_exists_key"]) + .await + .unwrap(); assert_eq!(values.len(), 2); let ascii_foo_value = values.get("ascii_foo").unwrap(); let ascii_baz_value = values.get("ascii_baz").unwrap(); @@ -480,11 +480,17 @@ mod nonblocking { assert!(ascii_baz_value.2.is_some()); assert_eq!( true, - client.cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()).await.unwrap() + client + .cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()) + .await + .unwrap() ); assert_eq!( false, - client.cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()).await.unwrap() + client + .cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()) + .await + .unwrap() ); assert_eq!( From 864cc893f63f478464284cd599d8e5b7c8ecf0fc Mon Sep 17 00:00:00 2001 From: Shahab Dogar Date: Wed, 10 Apr 2024 20:48:54 -0500 Subject: [PATCH 8/8] give tests more descriptive names --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 499aba6..c459b54 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,14 +33,14 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - - name: Run tests + - name: Run tests with async off uses: actions-rs/cargo@v1 with: command: test env: CARGO_INCREMENTAL: 0 RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off" - - name: Run tests + - name: Run tests with async on uses: actions-rs/cargo@v1 with: command: test