Skip to content

Commit

Permalink
Merge 337433e into faed4e1
Browse files Browse the repository at this point in the history
  • Loading branch information
hathawsh authored Feb 9, 2020
2 parents faed4e1 + 337433e commit 51ca27b
Show file tree
Hide file tree
Showing 6 changed files with 461 additions and 87 deletions.
102 changes: 92 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,30 +189,43 @@ impl Client {
return self.get_connection(key).get()?.get(key);
}

/// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce netwark workloads.
/// Map a key to a connection index.
fn hash_key(&self, key: &str) -> usize {
let connections_count = self.connections.len();
(self.hash_function)(key) as usize % connections_count
}

/// Alias for get_multi().
pub fn gets<V: FromMemcacheValueExt>(&self, keys: &[&str]) -> Result<HashMap<String, V>, MemcacheError> {
self.get_multi(keys)
}

/// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce network workloads.
///
/// Example:
///
/// ```rust
/// let client = memcache::Client::connect("memcache://localhost:12345").unwrap();
/// client.set("foo", "42", 0).unwrap();
/// let result: std::collections::HashMap<String, String> = client.gets(&["foo", "bar", "baz"]).unwrap();
/// let result: std::collections::HashMap<String, String> = client.get_multi(&["foo", "bar", "baz"]).unwrap();
/// assert_eq!(result.len(), 1);
/// assert_eq!(result["foo"], "42");
/// ```
pub fn gets<V: FromMemcacheValueExt>(&self, keys: &[&str]) -> Result<HashMap<String, V>, MemcacheError> {
let mut con_keys: HashMap<usize, Vec<&str>> = HashMap::new();
pub fn get_multi<V, K, I>(&self, keys: I) -> Result<HashMap<String, V>, MemcacheError>
where
V: FromMemcacheValueExt,
K: AsRef<str>,
I: IntoIterator<Item = K>,
{
let mut con_keys: HashMap<usize, Vec<K>> = HashMap::new();
let mut result: HashMap<String, V> = HashMap::new();
let connections_count = self.connections.len();

for key in keys {
let connection_index = (self.hash_function)(key) as usize % connections_count;
let array = con_keys.entry(connection_index).or_insert_with(Vec::new);
array.push(key);
for k in keys {
con_keys.entry(self.hash_key(k.as_ref())).or_default().push(k);
}
for (&connection_index, keys) in con_keys.iter() {
let connection = self.connections[connection_index].clone();
result.extend(connection.get()?.gets(keys)?);
result.extend(connection.get()?.get_multi(keys)?);
}
return Ok(result);
}
Expand All @@ -230,6 +243,39 @@ impl Client {
return self.get_connection(key).get()?.set(key, value, expiration);
}

/// Set multiple keys with associated values into memcached server with expiration seconds.
///
/// Uses pipelining to reduce the number of server round trips.
///
/// Example:
///
/// ```rust
/// let client = memcache::Client::connect("memcache://localhost:12345").unwrap();
/// client.set_multi(vec![("foo", "Foo"), ("bar", "Bar")], 10).unwrap();
/// # client.flush().unwrap();
/// ```
pub fn set_multi<V, K, I>(&self, entries: I, expiration: u32) -> Result<(), MemcacheError>
where
V: ToMemcacheValue<Stream>,
K: AsRef<str>,
I: IntoIterator<Item = (K, V)>,
{
let mut entry_map: HashMap<usize, Vec<(K, V)>> = HashMap::new();
for (key, value) in entries.into_iter() {
entry_map
.entry(self.hash_key(key.as_ref()))
.or_default()
.push((key, value));
}

for (connection_index, entries_subset) in entry_map.into_iter() {
let connection = self.connections[connection_index].clone();
connection.get()?.set_multi(entries_subset, expiration)?;
}

Ok(())
}

/// Compare and swap a key with the associate value into memcached server with expiration seconds.
/// `cas_id` should be obtained from a previous `gets` call.
///
Expand Down Expand Up @@ -337,6 +383,42 @@ impl Client {
return self.get_connection(key).get()?.delete(key);
}

/// Delete keys from memcached server.
///
/// Uses pipelining to reduce the number of server round trips.
///
/// Example:
///
/// ```rust
/// let client = memcache::Client::connect("memcache://localhost:12345").unwrap();
/// client.delete_multi(&["foo", "bar"]).unwrap();
/// # client.flush().unwrap();
/// ```
pub fn delete_multi<K, I>(&self, keys: I) -> Result<HashMap<K, bool>, MemcacheError>
where
K: AsRef<str> + Eq + Hash,
I: IntoIterator<Item = K>,
{
let mut con_keys: HashMap<usize, Vec<K>> = HashMap::new();
for key in keys.into_iter() {
con_keys.entry(self.hash_key(key.as_ref())).or_default().push(key);
}

let mut result: HashMap<K, bool> = HashMap::new();
for (connection_index, keys_subset) in con_keys.into_iter() {
let connection = self.connections[connection_index].clone();
for (deleted, key) in connection
.get()?
.delete_multi(&keys_subset)?
.into_iter()
.zip(keys_subset)
{
result.insert(key, deleted);
}
}
Ok(result)
}

/// Increment the value with amount.
///
/// Example:
Expand Down
Loading

0 comments on commit 51ca27b

Please sign in to comment.