diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 92a1ce8..c459b54 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,13 +33,21 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - - name: Run tests + - name: Run tests with async off uses: actions-rs/cargo@v1 with: command: test env: CARGO_INCREMENTAL: 0 RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off" + - name: Run tests with async on + uses: actions-rs/cargo@v1 + with: + command: test + args: --all-features + env: + CARGO_INCREMENTAL: 0 + RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off" - name: Gather coverage data id: coverage uses: actions-rs/grcov@v0.1 diff --git a/Cargo.toml b/Cargo.toml index 9556522..386669f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,13 @@ edition = "2018" [features] default = ["tls"] tls = ["openssl"] +async = ["tokio"] [dependencies] byteorder = "1" url = "^2.1" rand = "0.8" enum_dispatch = "0.3" -openssl = { version = "^0.10", optional = true } r2d2 = "^0.8" +openssl = { version = "^0.10", optional = true } +tokio = { version = "^1.37", optional = true, features = ["macros", "rt", "time"] } diff --git a/src/async_client.rs b/src/async_client.rs new file mode 100644 index 0000000..f3306aa --- /dev/null +++ b/src/async_client.rs @@ -0,0 +1,577 @@ +use std::collections::HashMap; +use std::time::Duration; + +use crate::client::Stats; +use crate::error::MemcacheError; +use crate::stream::Stream; +use crate::value::{FromMemcacheValueExt, ToMemcacheValue}; +use crate::Connectable; + +use super::client as blocking; + +pub struct Client { + inner: blocking::Client, +} + +impl From for Client { + fn from(client: blocking::Client) -> Self { + Self { inner: client } + } +} + +impl Client { + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } + + pub fn connect(target: C) -> Result { + Ok(blocking::Client::connect(target)?.into()) + } + + /// Get a reference to the inner `Client` object. + /// This will allow you to call methods on the `Client` object synchronously. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// let blocking_client = client.blocking(); + /// let _: Option = blocking_client.get("foo").unwrap(); + /// ``` + pub fn blocking(&self) -> &blocking::Client { + &self.inner + } + + /// Set the socket read timeout for TCP connections. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// client.set_read_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); + /// ``` + pub fn set_read_timeout(&self, timeout: Option) -> Result<(), MemcacheError> { + self.inner.set_read_timeout(timeout) + } + + /// Set the socket write timeout for TCP connections. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(); + /// client.set_write_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); + /// ``` + pub fn set_write_timeout(&self, timeout: Option) -> Result<(), MemcacheError> { + self.inner.set_write_timeout(timeout) + } + + /// Get the memcached server version. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.version().await.unwrap(); + /// }; + /// ``` + pub async fn version(&self) -> Result, MemcacheError> { + self.inner.version() + } + + /// Flush all cache on memcached server immediately. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn flush(&self) -> Result<(), MemcacheError> { + self.inner.flush() + } + + /// Flush all cache on memcached server with a delay seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.flush_with_delay(10).await.unwrap(); + /// }; + /// ``` + pub async fn flush_with_delay(&self, delay: u32) -> Result<(), MemcacheError> { + self.inner.flush_with_delay(delay) + } + + /// Get a key from memcached server. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// let _: Option = client.get("foo").await.unwrap(); + /// }; + /// ``` + pub async fn get(&self, key: &str) -> Result, MemcacheError> { + self.inner.get(key) + } + + /// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce network workloads. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.set("foo", "42", 0).await.unwrap(); + /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).await.unwrap(); + /// assert_eq!(result.len(), 1); + /// assert_eq!(result["foo"], "42"); + /// }; + /// ``` + pub async fn gets(&self, keys: &[&str]) -> Result, MemcacheError> { + self.inner.gets(keys) + } + + /// Set a key with associate value into memcached server with expiration seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn set>( + &self, + key: &str, + value: V, + expiration: u32, + ) -> Result<(), MemcacheError> { + self.inner.set(key, value, expiration) + } + + /// Compare and swap a key with the associate value into memcached server with expiration seconds. + /// `cas_id` should be obtained from a previous `gets` call. + /// + /// Example: + /// + /// ```rust + /// use std::collections::HashMap; + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).await.unwrap(); + /// let (_, _, cas) = result.get("foo").unwrap(); + /// let cas = cas.unwrap(); + /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).await.unwrap()); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn cas>( + &self, + key: &str, + value: V, + expiration: u32, + cas_id: u64, + ) -> Result { + self.inner.cas(key, value, expiration, cas_id) + } + + /// Add a key with associate value into memcached server with expiration seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// let key = "add_test"; + /// async { + /// client.delete(key).await.unwrap(); + /// client.add(key, "bar", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn add>( + &self, + key: &str, + value: V, + expiration: u32, + ) -> Result<(), MemcacheError> { + self.inner.add(key, value, expiration) + } + + /// Replace a key with associate value into memcached server with expiration seconds. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// let key = "replace_test"; + /// async { + /// client.set(key, "bar", 0).await.unwrap(); + /// client.replace(key, "baz", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn replace>( + &self, + key: &str, + value: V, + expiration: u32, + ) -> Result<(), MemcacheError> { + self.inner.replace(key, value, expiration) + } + + /// Append value to the key. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// let key = "key_to_append"; + /// async { + /// client.set(key, "hello", 0).await.unwrap(); + /// client.append(key, ", world!").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn append>(&self, key: &str, value: V) -> Result<(), MemcacheError> { + self.inner.append(key, value) + } + + /// Prepend value to the key. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// let key = "key_to_append"; + /// async { + /// client.set(key, "world!", 0).await.unwrap(); + /// client.prepend(key, "hello, ").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn prepend>(&self, key: &str, value: V) -> Result<(), MemcacheError> { + self.inner.prepend(key, value) + } + + /// Delete a key from memcached server. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.delete("foo").await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn delete(&self, key: &str) -> Result { + self.inner.delete(key) + } + + /// Increment the value with amount. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.increment("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn increment(&self, key: &str, amount: u64) -> Result { + self.inner.increment(key, amount) + } + + /// Decrement the value with amount. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// client.decrement("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn decrement(&self, key: &str, amount: u64) -> Result { + self.inner.decrement(key, amount) + } + + /// Set a new expiration time for a exist key. + /// + /// Example: + /// + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// assert_eq!(client.touch("not_exists_key", 12345).await.unwrap(), false); + /// client.set("foo", "bar", 123).await.unwrap(); + /// assert_eq!(client.touch("foo", 12345).await.unwrap(), true); + /// client.flush().await.unwrap(); + /// }; + /// ``` + pub async fn touch(&self, key: &str, expiration: u32) -> Result { + self.inner.touch(key, expiration) + } + + /// Get all servers' statistics. + /// + /// Example: + /// ```rust + /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// async { + /// let stats = client.stats().await.unwrap(); + /// }; + /// ``` + pub async fn stats(&self) -> Result, MemcacheError> { + self.inner.stats() + } +} + +pub struct ClientBuilder { + inner: blocking::ClientBuilder, +} + +impl ClientBuilder { + /// Create an empty client builder. + pub fn new() -> Self { + ClientBuilder { + inner: blocking::ClientBuilder::new(), + } + } + + /// Add a memcached server to the pool. + pub fn add_server(mut self, target: C) -> Result { + self.inner = self.inner.add_server(target)?; + Ok(self) + } + + /// Set the maximum number of connections managed by the pool. + pub fn with_max_pool_size(mut self, max_size: u32) -> Self { + self.inner = self.inner.with_max_pool_size(max_size); + self + } + + /// Set the minimum number of idle connections to maintain in the pool. + pub fn with_min_idle_conns(mut self, min_idle: u32) -> Self { + self.inner = self.inner.with_min_idle_conns(min_idle); + self + } + + /// Set the maximum lifetime of connections in the pool. + pub fn with_max_conn_lifetime(mut self, max_lifetime: Duration) -> Self { + self.inner = self.inner.with_max_conn_lifetime(max_lifetime); + self + } + + /// Set the socket read timeout for TCP connections. + pub fn with_read_timeout(mut self, read_timeout: Duration) -> Self { + self.inner = self.inner.with_read_timeout(read_timeout); + self + } + + /// Set the socket write timeout for TCP connections. + pub fn with_write_timeout(mut self, write_timeout: Duration) -> Self { + self.inner = self.inner.with_write_timeout(write_timeout); + self + } + + /// Set the connection timeout for TCP connections. + pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> Self { + self.inner = self.inner.with_connection_timeout(connection_timeout); + self + } + + /// Set the hash function for the client. + pub fn with_hash_function(mut self, hash_function: fn(&str) -> u64) -> Self { + self.inner = self.inner.with_hash_function(hash_function); + self + } + + /// Build the client. This will create a connection pool and return a client, or an error if the connection pool could not be created. + pub fn build(self) -> Result { + Ok(Client { + inner: self.inner.build()?, + }) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + #[tokio::test] + async fn build_client_happy_path() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .build() + .unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[test] + fn build_client_bad_url() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345:") + .unwrap() + .build(); + assert!(client.is_err()); + } + + #[test] + fn build_client_no_url() { + let client = super::Client::builder().build(); + assert!(client.is_err()); + + let client = super::Client::builder().add_server(Vec::::new()); + + assert!(client.is_err()); + } + + #[test] + fn build_client_with_large_pool_size() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + // This is a large pool size, but it should still be valid. + // This does make the test run very slow however. + .with_max_pool_size(100) + .build(); + assert!( + client.is_ok(), + "Expected successful client creation with large pool size" + ); + } + + #[test] + fn build_client_with_custom_hash_function() { + fn custom_hash_function(_key: &str) -> u64 { + 42 // A simple, predictable hash function for testing. + } + + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_hash_function(custom_hash_function) + .build() + .unwrap(); + + // This test assumes that the custom hash function will affect the selection of connections. + // As the implementation details of connection selection are not exposed, this test might need to be adjusted. + assert_eq!( + (client.inner.hash_function)("any_key"), + 42, + "Expected custom hash function to be used" + ); + } + + #[test] + fn build_client_zero_min_idle_conns() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_min_idle_conns(0) + .build(); + assert!(client.is_ok(), "Should handle zero min idle conns"); + } + + #[test] + fn build_client_invalid_hash_function() { + let invalid_hash_function = |_: &str| -> u64 { + panic!("This should not be called"); + }; + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_hash_function(invalid_hash_function) + .build(); + assert!(client.is_ok(), "Should handle custom hash function gracefully"); + } + + #[test] + fn build_client_with_unsupported_protocol() { + let client = super::Client::builder() + .add_server("unsupported://localhost:12345") + .unwrap() + .build(); + assert!(client.is_err(), "Expected error when using an unsupported protocol"); + } + + #[test] + fn build_client_with_all_optional_parameters() { + let client = super::Client::builder() + .add_server("memcache://localhost:12345") + .unwrap() + .with_max_pool_size(10) + .with_min_idle_conns(2) + .with_max_conn_lifetime(Duration::from_secs(30)) + .with_read_timeout(Duration::from_secs(5)) + .with_write_timeout(Duration::from_secs(5)) + .with_connection_timeout(Duration::from_secs(2)) + .build(); + assert!(client.is_ok(), "Should successfully build with all optional parameters"); + } + + #[cfg(unix)] + #[tokio::test] + async fn unix() { + let client = super::Client::connect("memcache:///tmp/memcached.sock").unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[cfg(feature = "tls")] + #[tokio::test] + async fn ssl_noverify() { + let client = super::Client::connect("memcache+tls://localhost:12350?verify_mode=none").unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[cfg(feature = "tls")] + #[tokio::test] + async fn ssl_verify() { + let client = + super::Client::connect("memcache+tls://localhost:12350?ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt") + .unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[cfg(feature = "tls")] + #[tokio::test] + async fn ssl_client_certs() { + let client = super::Client::connect("memcache+tls://localhost:12351?key_path=tests/assets/client.key&cert_path=tests/assets/client.crt&ca_path=tests/assets/RUST_MEMCACHE_TEST_CERT.crt").unwrap(); + assert!(client.version().await.unwrap()[0].1 != ""); + } + + #[tokio::test] + async fn delete() { + let client = super::Client::connect("memcache://localhost:12345").unwrap(); + client.set("an_exists_key", "value", 0).await.unwrap(); + assert_eq!(client.delete("an_exists_key").await.unwrap(), true); + assert_eq!(client.delete("a_not_exists_key").await.unwrap(), false); + } + + #[tokio::test] + async fn increment() { + let client = super::Client::connect("memcache://localhost:12345").unwrap(); + client.delete("counter").await.unwrap(); + client.set("counter", 321, 0).await.unwrap(); + assert_eq!(client.increment("counter", 123).await.unwrap(), 444); + } +} diff --git a/src/client.rs b/src/client.rs index 38274d0..6fc53ba 100644 --- a/src/client.rs +++ b/src/client.rs @@ -131,7 +131,7 @@ impl Client { Self::builder().add_server(target)?.build() } - fn get_connection(&self, key: &str) -> Pool { + pub(super) fn get_connection(&self, key: &str) -> Pool { let connections_count = self.connections.len(); return self.connections[(self.hash_function)(key) as usize % connections_count].clone(); } @@ -180,7 +180,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// client.version().unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// client.version().await.unwrap(); + /// }; /// ``` pub fn version(&self) -> Result, MemcacheError> { let mut result = Vec::with_capacity(self.connections.len()); @@ -198,7 +205,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// client.flush().unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn flush(&self) -> Result<(), MemcacheError> { for connection in self.connections.iter() { @@ -213,7 +227,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// client.flush_with_delay(10).unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// client.flush_with_delay(10).await.unwrap(); + /// }; /// ``` pub fn flush_with_delay(&self, delay: u32) -> Result<(), MemcacheError> { for connection in self.connections.iter() { @@ -228,7 +249,14 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// let _: Option = client.get("foo").unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// let _: Option = client.get("foo").await.unwrap(); + /// }; /// ``` pub fn get(&self, key: &str) -> Result, MemcacheError> { check_key_len(key)?; @@ -241,10 +269,22 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.set("foo", "42", 0).unwrap(); - /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).unwrap(); - /// assert_eq!(result.len(), 1); - /// assert_eq!(result["foo"], "42"); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set("foo", "42", 0).unwrap(); + /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).unwrap(); + /// assert_eq!(result.len(), 1); + /// assert_eq!(result["foo"], "42"); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set("foo", "42", 0).await.unwrap(); + /// let result: std::collections::HashMap = client.gets(&["foo", "bar", "baz"]).await.unwrap(); + /// assert_eq!(result.len(), 1); + /// assert_eq!(result["foo"], "42"); + /// }; /// ``` pub fn gets(&self, keys: &[&str]) -> Result, MemcacheError> { for key in keys { @@ -272,8 +312,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.set("foo", "bar", 10).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set("foo", "bar", 10).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn set>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -288,12 +338,26 @@ impl Client { /// ```rust /// use std::collections::HashMap; /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.set("foo", "bar", 10).unwrap(); - /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).unwrap(); - /// let (_, _, cas) = result.get("foo").unwrap(); - /// let cas = cas.unwrap(); - /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).unwrap()); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set("foo", "bar", 10).unwrap(); + /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).unwrap(); + /// let (_, _, cas) = result.get("foo").unwrap(); + /// let cas = cas.unwrap(); + /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).unwrap()); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set("foo", "bar", 10).await.unwrap(); + /// let result: HashMap, u32, Option)> = client.gets(&["foo"]).await.unwrap(); + /// let (_, _, cas) = result.get("foo").unwrap(); + /// let cas = cas.unwrap(); + /// assert_eq!(true, client.cas("foo", "bar2", 10, cas).await.unwrap()); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn cas>( &self, @@ -313,9 +377,20 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "add_test"; - /// client.delete(key).unwrap(); - /// client.add(key, "bar", 100000000).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.delete(key).unwrap(); + /// client.add(key, "bar", 100000000).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.delete(key).await.unwrap(); + /// client.add(key, "bar", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn add>(&self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -329,9 +404,20 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "replace_test"; - /// client.set(key, "bar", 0).unwrap(); - /// client.replace(key, "baz", 100000000).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set(key, "bar", 0).unwrap(); + /// client.replace(key, "baz", 100000000).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set(key, "bar", 0).await.unwrap(); + /// client.replace(key, "baz", 100000000).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn replace>( &self, @@ -350,11 +436,24 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "key_to_append"; - /// client.set(key, "hello", 0).unwrap(); - /// client.append(key, ", world!").unwrap(); - /// let result: String = client.get(key).unwrap().unwrap(); - /// assert_eq!(result, "hello, world!"); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set(key, "hello", 0).unwrap(); + /// client.append(key, ", world!").unwrap(); + /// let result: String = client.get(key).unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set(key, "hello", 0).await.unwrap(); + /// client.append(key, ", world!").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn append>(&self, key: &str, value: V) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -368,11 +467,24 @@ impl Client { /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); /// let key = "key_to_append"; - /// client.set(key, "world!", 0).unwrap(); - /// client.prepend(key, "hello, ").unwrap(); - /// let result: String = client.get(key).unwrap().unwrap(); - /// assert_eq!(result, "hello, world!"); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.set(key, "world!", 0).unwrap(); + /// client.prepend(key, "hello, ").unwrap(); + /// let result: String = client.get(key).unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.set(key, "world!", 0).await.unwrap(); + /// client.prepend(key, "hello, ").await.unwrap(); + /// let result: String = client.get(key).await.unwrap().unwrap(); + /// assert_eq!(result, "hello, world!"); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn prepend>(&self, key: &str, value: V) -> Result<(), MemcacheError> { check_key_len(key)?; @@ -385,8 +497,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.delete("foo").unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.delete("foo").unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.delete("foo").await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn delete(&self, key: &str) -> Result { check_key_len(key)?; @@ -399,8 +521,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.increment("counter", 42).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.increment("counter", 42).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.increment("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn increment(&self, key: &str, amount: u64) -> Result { check_key_len(key)?; @@ -413,8 +545,18 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// client.decrement("counter", 42).unwrap(); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// client.decrement("counter", 42).unwrap(); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// client.decrement("counter", 42).await.unwrap(); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn decrement(&self, key: &str, amount: u64) -> Result { check_key_len(key)?; @@ -427,10 +569,22 @@ impl Client { /// /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); - /// assert_eq!(client.touch("not_exists_key", 12345).unwrap(), false); - /// client.set("foo", "bar", 123).unwrap(); - /// assert_eq!(client.touch("foo", 12345).unwrap(), true); - /// # client.flush().unwrap(); + /// + /// #[cfg(not(feature = "async"))] + /// { + /// assert_eq!(client.touch("not_exists_key", 12345).unwrap(), false); + /// client.set("foo", "bar", 123).unwrap(); + /// assert_eq!(client.touch("foo", 12345).unwrap(), true); + /// client.flush().unwrap(); + /// } + /// + /// #[cfg(feature = "async")] + /// async { + /// assert_eq!(client.touch("not_exists_key", 12345).await.unwrap(), false); + /// client.set("foo", "bar", 123).await.unwrap(); + /// assert_eq!(client.touch("foo", 12345).await.unwrap(), true); + /// client.flush().await.unwrap(); + /// }; /// ``` pub fn touch(&self, key: &str, expiration: u32) -> Result { check_key_len(key)?; @@ -442,7 +596,14 @@ impl Client { /// Example: /// ```rust /// let client = memcache::Client::connect("memcache://localhost:12345").unwrap(); + /// + /// #[cfg(not(feature = "async"))] /// let stats = client.stats().unwrap(); + /// + /// #[cfg(feature = "async")] + /// async { + /// let stats = client.stats().await.unwrap(); + /// }; /// ``` pub fn stats(&self) -> Result, MemcacheError> { let mut result: Vec<(String, HashMap)> = vec![]; diff --git a/src/lib.rs b/src/lib.rs index 69e8473..c2f91c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,7 @@ -/*! +#![cfg_attr(feature = "cargo-clippy", allow(clippy::needless_return))] +#![cfg_attr( + not(feature = "async"), + doc = r#"/*! rust-memcache is a [memcached](https://memcached.org/) client written in pure rust. # Install: @@ -61,9 +64,77 @@ client.increment("counter", 2).unwrap(); let answer: i32 = client.get("counter").unwrap().unwrap(); assert_eq!(answer, 42); ``` -!*/ +!*/"# +)] +#![cfg_attr( + feature = "async", + doc = r#"/*! +rust-memcache is a [memcached](https://memcached.org/) client written in pure rust. -#![cfg_attr(feature = "cargo-clippy", allow(clippy::needless_return))] +# Install: + +The crate is called `memcache` and you can depend on it via cargo: + +```ini +[dependencies] +memcache = { version = "*", features = ["async"] } +``` + +# Features: + +- All memcached supported protocols + - Binary protocol + - ASCII protocol +- All memcached supported connections + - TCP connection + - UDP connection + - UNIX Domain socket connection + - TLS connection +- Encodings + - Typed interface + - Automatically compress + - Automatically serialize to JSON / msgpack etc +- Mutiple server support with custom key hash algorithm +- Authority + - Binary protocol (plain SASL authority) + - ASCII protocol + +# Basic usage: + +```rust +// create connection with to memcached server node: +let client = memcache::connect("memcache://127.0.0.1:12345?timeout=10&tcp_nodelay=true").unwrap(); + +async { + // flush the database: + client.flush().await.unwrap(); + + // set a string value: + client.set("foo", "bar", 0).await.unwrap(); + + // retrieve from memcached: + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + assert_eq!(value.unwrap(), "bar"); + + // prepend, append: + client.prepend("foo", "foo").await.unwrap(); + client.append("foo", "baz").await.unwrap(); + let value: String = client.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "foobarbaz"); + + // delete value: + client.delete("foo").await.unwrap(); + + // using counter: + client.set("counter", 40, 0).await.unwrap(); + client.increment("counter", 2).await.unwrap(); + let answer: i32 = client.get("counter").await.unwrap().unwrap(); + assert_eq!(answer, 42); +}; +``` +!*/"# +)] extern crate byteorder; extern crate enum_dispatch; @@ -73,14 +144,24 @@ extern crate r2d2; extern crate rand; extern crate url; +#[cfg(feature = "async")] +mod async_client; + mod client; + mod connection; mod error; mod protocol; mod stream; mod value; -pub use crate::client::{Client, ClientBuilder, Connectable}; +#[cfg(feature = "async")] +pub use crate::async_client::Client; + +#[cfg(not(feature = "async"))] +pub use crate::client::Client; + +pub use crate::client::{ClientBuilder, Connectable}; pub use crate::connection::ConnectionManager; pub use crate::error::{ClientError, CommandError, MemcacheError, ServerError}; pub use crate::stream::Stream; @@ -98,6 +179,19 @@ pub type Pool = r2d2::Pool; /// ```rust /// let client = memcache::connect("memcache://localhost:12345").unwrap(); /// ``` +#[cfg(not(feature = "async"))] +pub fn connect(target: C) -> Result { + Client::connect(target) +} + +/// Create an async memcached client instance and connect to memcached server. +/// +/// Example: +/// +/// ```rust +/// let client = memcache::connect("memcache://localhost:12345").unwrap(); +/// ``` +#[cfg(feature = "async")] pub fn connect(target: C) -> Result { Client::connect(target) } diff --git a/tests/test_ascii.rs b/tests/test_ascii.rs index 64fef61..5039a30 100644 --- a/tests/test_ascii.rs +++ b/tests/test_ascii.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::{thread, time}; #[test] +#[cfg(not(feature = "async"))] fn test_ascii() { let client = memcache::Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(); @@ -47,3 +48,52 @@ fn test_ascii() { client.stats().unwrap(); } + +#[tokio::test] +#[cfg(feature = "async")] +async fn test_ascii() { + let client = memcache::Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(); + + client.flush_with_delay(1).await.unwrap(); + thread::sleep(time::Duration::from_secs(1)); + client.flush().await.unwrap(); + + client.set("ascii_foo", "bar", 0).await.unwrap(); + let value: Option = client.get("ascii_foo").await.unwrap(); + assert_eq!(value, Some("bar".into())); + + client.set("ascii_baz", "qux", 0).await.unwrap(); + let values: HashMap, u32)> = client + .gets(&["ascii_foo", "ascii_baz", "not_exists_key"]) + .await + .unwrap(); + assert_eq!(values.len(), 2); + let ascii_foo_value = values.get("ascii_foo").unwrap(); + let ascii_baz_value = values.get("ascii_baz").unwrap(); + assert_eq!(String::from_utf8(ascii_foo_value.0.clone()).unwrap(), "bar".to_string()); + assert_eq!(String::from_utf8(ascii_baz_value.0.clone()).unwrap(), "qux".to_string()); + + client.touch("ascii_foo", 1000).await.unwrap(); + + let value: Option = client.get("not_exists_key").await.unwrap(); + assert_eq!(value, None); + + client.set("ascii_pend", "y", 0).await.unwrap(); + client.append("ascii_pend", "z").await.unwrap(); + let value: Option = client.get("ascii_pend").await.unwrap(); + assert_eq!(value, Some("yz".into())); + client.prepend("ascii_pend", "x").await.unwrap(); + let value: Option = client.get("ascii_pend").await.unwrap(); + assert_eq!(value, Some("xyz".into())); + + client.delete("ascii_pend").await.unwrap(); + let value: Option = client.get("ascii_pend").await.unwrap(); + assert_eq!(value, None); + + assert!(client.increment("ascii_counter", 1).await.is_err()); + client.set("ascii_counter", 3, 0).await.unwrap(); + assert_eq!(client.increment("ascii_counter", 100).await.unwrap(), 103); + assert_eq!(client.decrement("ascii_counter", 3).await.unwrap(), 100); + + client.stats().await.unwrap(); +} diff --git a/tests/tests.rs b/tests/tests.rs index 947084b..4489955 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -4,8 +4,6 @@ extern crate rand; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::iter; -use std::thread; -use std::thread::JoinHandle; use std::time; fn gen_random_key() -> String { @@ -16,237 +14,493 @@ fn gen_random_key() -> String { return String::from_utf8(bs).unwrap(); } -#[test] -fn test() { - let mut urls = vec![ - "memcache://localhost:12346?tcp_nodelay=true", - "memcache://localhost:12347?timeout=10", - "memcache://localhost:12348?protocol=ascii", - "memcache://localhost:12349?", - "memcache+tls://localhost:12350?verify_mode=none", - ]; - if cfg!(unix) { - urls.push("memcache:///tmp/memcached2.sock"); - } - let client = memcache::Client::connect(urls).unwrap(); - - client.version().unwrap(); - - client.set("foo", "bar", 0).unwrap(); - client.flush().unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.set("foo", "bar", 0).unwrap(); - client.flush_with_delay(3).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bar"))); - thread::sleep(time::Duration::from_secs(4)); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - let mut keys: Vec = Vec::new(); - for _ in 0..1000 { - let key = gen_random_key(); - keys.push(key.clone()); - client.set(key.as_str(), "xxx", 0).unwrap(); +#[cfg(not(feature = "async"))] +mod blocking { + use super::*; + use std::thread; + use std::thread::JoinHandle; + + #[test] + fn test() { + let mut urls = vec![ + "memcache://localhost:12346?tcp_nodelay=true", + "memcache://localhost:12347?timeout=10", + "memcache://localhost:12348?protocol=ascii", + "memcache://localhost:12349?", + "memcache+tls://localhost:12350?verify_mode=none", + ]; + if cfg!(unix) { + urls.push("memcache:///tmp/memcached2.sock"); + } + let client = memcache::Client::connect(urls).unwrap(); + + client.version().unwrap(); + + client.set("foo", "bar", 0).unwrap(); + client.flush().unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).unwrap(); + client.flush_with_delay(3).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bar"))); + thread::sleep(time::Duration::from_secs(4)); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).unwrap().unwrap(); + assert_eq!(value, "xxx"); + } } - for key in keys { - let value: String = client.get(key.as_str()).unwrap().unwrap(); - assert_eq!(value, "xxx"); + #[test] + fn issue74() { + use memcache::{Client, CommandError, MemcacheError}; + let client = Client::connect("memcache://localhost:12346?tcp_nodelay=true").unwrap(); + client.delete("issue74").unwrap(); + client.add("issue74", 1, 0).unwrap(); + + match client.add("issue74", 1, 0) { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0) { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0) { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } } -} -#[test] -fn issue74() { - use memcache::{Client, CommandError, MemcacheError}; - let client = Client::connect("memcache://localhost:12346?tcp_nodelay=true").unwrap(); - client.delete("issue74").unwrap(); - client.add("issue74", 1, 0).unwrap(); - - match client.add("issue74", 1, 0) { - Ok(_) => panic!("Should got an error!"), - Err(e) => match e { - MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), - _ => panic!("Unexpected error!"), - }, - } + #[test] + fn udp_test() { + let urls = vec!["memcache+udp://localhost:22345"]; + let client = memcache::Client::connect(urls).unwrap(); - match client.add("issue74", 1, 0) { - Ok(_) => panic!("Should got an error!"), - Err(e) => match e { - MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), - _ => panic!("Unexpected error!"), - }, - } + client.version().unwrap(); - match client.add("issue74", 1, 0) { - Ok(_) => panic!("Should got an error!"), - Err(e) => match e { - MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), - _ => panic!("Unexpected error!"), - }, + client.set("foo", "bar", 0).unwrap(); + client.flush().unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).unwrap(); + client.flush_with_delay(3).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bar"))); + thread::sleep(time::Duration::from_secs(4)); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).unwrap(); + let value = client.add("foo", "baz", 0); + assert_eq!(value.is_err(), true); + + client.delete("foo").unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, None); + + client.add("foo", "bar", 0).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bar"))); + + client.replace("foo", "baz", 0).unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("baz"))); + + client.append("foo", "bar").unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("bazbar"))); + + client.prepend("foo", "bar").unwrap(); + let value: Option = client.get("foo").unwrap(); + assert_eq!(value, Some(String::from("barbazbar"))); + + client.set("fooo", 0, 0).unwrap(); + client.increment("fooo", 1).unwrap(); + let value: Option = client.get("fooo").unwrap(); + assert_eq!(value, Some(String::from("1"))); + + client.decrement("fooo", 1).unwrap(); + let value: Option = client.get("fooo").unwrap(); + assert_eq!(value, Some(String::from("0"))); + + assert_eq!(client.touch("foooo", 123).unwrap(), false); + assert_eq!(client.touch("fooo", 12345).unwrap(), true); + + // gets is not supported for udp + let value: Result, _> = client.gets(&["foo", "fooo"]); + assert_eq!(value.is_ok(), false); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).unwrap().unwrap(); + + assert_eq!(value, "xxx"); + } + + // test with multiple udp connections + let mut handles: Vec>> = Vec::new(); + for i in 0..10 { + handles.push(Some(thread::spawn(move || { + let key = format!("key{}", i); + let value = format!("value{}", i); + let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); + for j in 0..50 { + let value = format!("{}{}", value, j); + client.set(key.as_str(), &value, 0).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + let result = client.add(key.as_str(), &value, 0); + assert_eq!(result.is_err(), true); + + client.delete(key.as_str()).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result, None); + + client.add(key.as_str(), &value, 0).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.replace(key.as_str(), &value, 0).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.append(key.as_str(), &value).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result, Some(format!("{}{}", value, value))); + + client.prepend(key.as_str(), &value).unwrap(); + let result: Option = client.get(key.as_str()).unwrap(); + assert_eq!(result, Some(format!("{}{}{}", value, value, value))); + } + }))); + } + + for i in 0..10 { + handles[i].take().unwrap().join().unwrap(); + } } -} -#[test] -fn udp_test() { - let urls = vec!["memcache+udp://localhost:22345"]; - let client = memcache::Client::connect(urls).unwrap(); - - client.version().unwrap(); - - client.set("foo", "bar", 0).unwrap(); - client.flush().unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.set("foo", "bar", 0).unwrap(); - client.flush_with_delay(3).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bar"))); - thread::sleep(time::Duration::from_secs(4)); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.set("foo", "bar", 0).unwrap(); - let value = client.add("foo", "baz", 0); - assert_eq!(value.is_err(), true); - - client.delete("foo").unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, None); - - client.add("foo", "bar", 0).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bar"))); - - client.replace("foo", "baz", 0).unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("baz"))); - - client.append("foo", "bar").unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("bazbar"))); - - client.prepend("foo", "bar").unwrap(); - let value: Option = client.get("foo").unwrap(); - assert_eq!(value, Some(String::from("barbazbar"))); - - client.set("fooo", 0, 0).unwrap(); - client.increment("fooo", 1).unwrap(); - let value: Option = client.get("fooo").unwrap(); - assert_eq!(value, Some(String::from("1"))); - - client.decrement("fooo", 1).unwrap(); - let value: Option = client.get("fooo").unwrap(); - assert_eq!(value, Some(String::from("0"))); - - assert_eq!(client.touch("foooo", 123).unwrap(), false); - assert_eq!(client.touch("fooo", 12345).unwrap(), true); - - // gets is not supported for udp - let value: Result, _> = client.gets(&["foo", "fooo"]); - assert_eq!(value.is_ok(), false); - - let mut keys: Vec = Vec::new(); - for _ in 0..1000 { - let key = gen_random_key(); - keys.push(key.clone()); - client.set(key.as_str(), "xxx", 0).unwrap(); + #[test] + fn test_cas() { + use memcache::Client; + use std::collections::HashMap; + let clients = vec![ + Client::connect("memcache://localhost:12345").unwrap(), + Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(), + ]; + for client in clients { + client.flush().unwrap(); + + client.set("ascii_foo", "bar", 0).unwrap(); + let value: Option = client.get("ascii_foo").unwrap(); + assert_eq!(value, Some("bar".into())); + + client.set("ascii_baz", "qux", 0).unwrap(); + + let values: HashMap, u32, Option)> = + client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).unwrap(); + assert_eq!(values.len(), 2); + let ascii_foo_value = values.get("ascii_foo").unwrap(); + let ascii_baz_value = values.get("ascii_baz").unwrap(); + + assert!(ascii_foo_value.2.is_some()); + assert!(ascii_baz_value.2.is_some()); + assert_eq!( + true, + client.cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()).unwrap() + ); + assert_eq!( + false, + client.cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()).unwrap() + ); + + assert_eq!( + false, + client + .cas("not_exists_key", "bar", 0, ascii_foo_value.2.unwrap()) + .unwrap() + ); + client.flush().unwrap(); + } } +} - for key in keys { - let value: String = client.get(key.as_str()).unwrap().unwrap(); - - assert_eq!(value, "xxx"); +#[cfg(feature = "async")] +mod nonblocking { + use super::*; + + #[tokio::test] + async fn test() { + let mut urls = vec![ + "memcache://localhost:12346?tcp_nodelay=true", + "memcache://localhost:12347?timeout=10", + "memcache://localhost:12348?protocol=ascii", + "memcache://localhost:12349?", + "memcache+tls://localhost:12350?verify_mode=none", + ]; + if cfg!(unix) { + urls.push("memcache:///tmp/memcached2.sock"); + } + let client = memcache::Client::connect(urls).unwrap(); + + client.version().await.unwrap(); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush().await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush_with_delay(3).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + tokio::time::sleep(time::Duration::from_secs(4)).await; + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).await.unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).await.unwrap().unwrap(); + assert_eq!(value, "xxx"); + } } - // test with multiple udp connections - let mut handles: Vec>> = Vec::new(); - for i in 0..10 { - handles.push(Some(thread::spawn(move || { - let key = format!("key{}", i); - let value = format!("value{}", i); - let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); - for j in 0..50 { - let value = format!("{}{}", value, j); - client.set(key.as_str(), &value, 0).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - let result = client.add(key.as_str(), &value, 0); - assert_eq!(result.is_err(), true); - - client.delete(key.as_str()).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result, None); - - client.add(key.as_str(), &value, 0).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - client.replace(key.as_str(), &value, 0).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result.as_ref(), Some(&value)); - - client.append(key.as_str(), &value).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result, Some(format!("{}{}", value, value))); - - client.prepend(key.as_str(), &value).unwrap(); - let result: Option = client.get(key.as_str()).unwrap(); - assert_eq!(result, Some(format!("{}{}{}", value, value, value))); - } - }))); + #[tokio::test] + async fn issue74() { + use memcache::{Client, CommandError, MemcacheError}; + let client = Client::connect("memcache://localhost:12346?tcp_nodelay=true").unwrap(); + client.delete("issue74").await.unwrap(); + client.add("issue74", 1, 0).await.unwrap(); + + match client.add("issue74", 1, 0).await { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0).await { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } + + match client.add("issue74", 1, 0).await { + Ok(_) => panic!("Should got an error!"), + Err(e) => match e { + MemcacheError::CommandError(e) => assert!(e == CommandError::KeyExists), + _ => panic!("Unexpected error!"), + }, + } } - for i in 0..10 { - handles[i].take().unwrap().join().unwrap(); + #[tokio::test] + async fn udp_test() { + let urls = vec!["memcache+udp://localhost:22345"]; + let client = memcache::Client::connect(urls).unwrap(); + + client.version().await.unwrap(); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush().await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).await.unwrap(); + client.flush_with_delay(3).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + tokio::time::sleep(time::Duration::from_secs(4)).await; + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.set("foo", "bar", 0).await.unwrap(); + let value = client.add("foo", "baz", 0).await; + assert_eq!(value.is_err(), true); + + client.delete("foo").await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, None); + + client.add("foo", "bar", 0).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bar"))); + + client.replace("foo", "baz", 0).await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("baz"))); + + client.append("foo", "bar").await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("bazbar"))); + + client.prepend("foo", "bar").await.unwrap(); + let value: Option = client.get("foo").await.unwrap(); + assert_eq!(value, Some(String::from("barbazbar"))); + + client.set("fooo", 0, 0).await.unwrap(); + client.increment("fooo", 1).await.unwrap(); + let value: Option = client.get("fooo").await.unwrap(); + assert_eq!(value, Some(String::from("1"))); + + client.decrement("fooo", 1).await.unwrap(); + let value: Option = client.get("fooo").await.unwrap(); + assert_eq!(value, Some(String::from("0"))); + + assert_eq!(client.touch("foooo", 123).await.unwrap(), false); + assert_eq!(client.touch("fooo", 12345).await.unwrap(), true); + + // gets is not supported for udp + let value: Result, _> = client.gets(&["foo", "fooo"]).await; + assert_eq!(value.is_ok(), false); + + let mut keys: Vec = Vec::new(); + for _ in 0..1000 { + let key = gen_random_key(); + keys.push(key.clone()); + client.set(key.as_str(), "xxx", 0).await.unwrap(); + } + + for key in keys { + let value: String = client.get(key.as_str()).await.unwrap().unwrap(); + + assert_eq!(value, "xxx"); + } + + // test with multiple udp connections + let mut handles: Vec> = Vec::new(); + for i in 0..10 { + handles.push(tokio::spawn(async move { + let key = format!("key{}", i); + let value = format!("value{}", i); + let client = memcache::Client::connect("memcache://localhost:22345?udp=true").unwrap(); + for j in 0..50 { + let value = format!("{}{}", value, j); + client.set(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + let result = client.add(key.as_str(), &value, 0).await; + assert_eq!(result.is_err(), true); + + client.delete(key.as_str()).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, None); + + client.add(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.replace(key.as_str(), &value, 0).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result.as_ref(), Some(&value)); + + client.append(key.as_str(), &value).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, Some(format!("{}{}", value, value))); + + client.prepend(key.as_str(), &value).await.unwrap(); + let result: Option = client.get(key.as_str()).await.unwrap(); + assert_eq!(result, Some(format!("{}{}{}", value, value, value))); + } + })); + } + + for handle in handles.into_iter() { + handle.await.unwrap(); + } } -} -#[test] -fn test_cas() { - use memcache::Client; - use std::collections::HashMap; - let clients = vec![ - Client::connect("memcache://localhost:12345").unwrap(), - Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(), - ]; - for client in clients { - client.flush().unwrap(); - - client.set("ascii_foo", "bar", 0).unwrap(); - let value: Option = client.get("ascii_foo").unwrap(); - assert_eq!(value, Some("bar".into())); - - client.set("ascii_baz", "qux", 0).unwrap(); - - let values: HashMap, u32, Option)> = - client.gets(&["ascii_foo", "ascii_baz", "not_exists_key"]).unwrap(); - assert_eq!(values.len(), 2); - let ascii_foo_value = values.get("ascii_foo").unwrap(); - let ascii_baz_value = values.get("ascii_baz").unwrap(); - - assert!(ascii_foo_value.2.is_some()); - assert!(ascii_baz_value.2.is_some()); - assert_eq!( - true, - client.cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()).unwrap() - ); - assert_eq!( - false, - client.cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()).unwrap() - ); - - assert_eq!( - false, - client - .cas("not_exists_key", "bar", 0, ascii_foo_value.2.unwrap()) - .unwrap() - ); - client.flush().unwrap(); + #[tokio::test] + async fn test_cas() { + use memcache::Client; + use std::collections::HashMap; + let clients = vec![ + Client::connect("memcache://localhost:12345").unwrap(), + Client::connect("memcache://localhost:12345?protocol=ascii").unwrap(), + ]; + for client in clients { + client.flush().await.unwrap(); + + client.set("ascii_foo", "bar", 0).await.unwrap(); + let value: Option = client.get("ascii_foo").await.unwrap(); + assert_eq!(value, Some("bar".into())); + + client.set("ascii_baz", "qux", 0).await.unwrap(); + + let values: HashMap, u32, Option)> = client + .gets(&["ascii_foo", "ascii_baz", "not_exists_key"]) + .await + .unwrap(); + assert_eq!(values.len(), 2); + let ascii_foo_value = values.get("ascii_foo").unwrap(); + let ascii_baz_value = values.get("ascii_baz").unwrap(); + + assert!(ascii_foo_value.2.is_some()); + assert!(ascii_baz_value.2.is_some()); + assert_eq!( + true, + client + .cas("ascii_foo", "bar2", 0, ascii_foo_value.2.unwrap()) + .await + .unwrap() + ); + assert_eq!( + false, + client + .cas("ascii_foo", "bar3", 0, ascii_foo_value.2.unwrap()) + .await + .unwrap() + ); + + assert_eq!( + false, + client + .cas("not_exists_key", "bar", 0, ascii_foo_value.2.unwrap()) + .await + .unwrap() + ); + client.flush().await.unwrap(); + } } }