Skip to content

Commit

Permalink
Merge b44bee6 into faed4e1
Browse files Browse the repository at this point in the history
  • Loading branch information
hathawsh committed Feb 12, 2020
2 parents faed4e1 + b44bee6 commit 24895a2
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 87 deletions.
87 changes: 77 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ impl Client {
return self.get_connection(key).get()?.get(key);
}

/// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce netwark workloads.
/// Map a key to a connection index.
fn hash_key(&self, key: &str) -> usize {
let connections_count = self.connections.len();
(self.hash_function)(key) as usize % connections_count
}

/// Get multiple keys from memcached server. Using this function instead of calling `get` multiple times can reduce network workloads.
///
/// Example:
///
Expand All @@ -200,19 +206,20 @@ impl Client {
/// assert_eq!(result.len(), 1);
/// assert_eq!(result["foo"], "42");
/// ```
pub fn gets<V: FromMemcacheValueExt>(&self, keys: &[&str]) -> Result<HashMap<String, V>, MemcacheError> {
let mut con_keys: HashMap<usize, Vec<&str>> = HashMap::new();
pub fn gets<V, K>(&self, keys: &[K]) -> Result<HashMap<String, V>, MemcacheError>
where
V: FromMemcacheValueExt,
K: AsRef<str>,
{
let mut con_keys: HashMap<usize, Vec<&K>> = HashMap::new();
let mut result: HashMap<String, V> = HashMap::new();
let connections_count = self.connections.len();

for key in keys {
let connection_index = (self.hash_function)(key) as usize % connections_count;
let array = con_keys.entry(connection_index).or_insert_with(Vec::new);
array.push(key);
for k in keys {
con_keys.entry(self.hash_key(k.as_ref())).or_default().push(k);
}
for (&connection_index, keys) in con_keys.iter() {
for (connection_index, keys) in con_keys {
let connection = self.connections[connection_index].clone();
result.extend(connection.get()?.gets(keys)?);
result.extend(connection.get()?.gets(&keys)?);
}
return Ok(result);
}
Expand All @@ -230,6 +237,39 @@ impl Client {
return self.get_connection(key).get()?.set(key, value, expiration);
}

/// Set multiple keys with associated values into memcached server with expiration seconds.
///
/// Uses pipelining to reduce the number of server round trips.
///
/// Example:
///
/// ```rust
/// let client = memcache::Client::connect("memcache://localhost:12345").unwrap();
/// client.sets(vec![("foo", "Foo"), ("bar", "Bar")], 10).unwrap();
/// # client.flush().unwrap();
/// ```
pub fn sets<V, K, I>(&self, entries: I, expiration: u32) -> Result<(), MemcacheError>
where
V: ToMemcacheValue<Stream>,
K: AsRef<str>,
I: IntoIterator<Item = (K, V)>,
{
let mut entry_map: HashMap<usize, Vec<(K, V)>> = HashMap::new();
for (key, value) in entries {
entry_map
.entry(self.hash_key(key.as_ref()))
.or_default()
.push((key, value));
}

for (connection_index, entries_subset) in entry_map {
let connection = self.connections[connection_index].clone();
connection.get()?.sets(entries_subset, expiration)?;
}

Ok(())
}

/// Compare and swap a key with the associate value into memcached server with expiration seconds.
/// `cas_id` should be obtained from a previous `gets` call.
///
Expand Down Expand Up @@ -337,6 +377,33 @@ impl Client {
return self.get_connection(key).get()?.delete(key);
}

/// Delete multiple keys from memcached server.
///
/// Uses pipelining to reduce the number of server round trips.
///
/// Example:
///
/// ```rust
/// let client = memcache::Client::connect("memcache://localhost:12345").unwrap();
/// client.deletes(&["foo", "bar"]).unwrap();
/// # client.flush().unwrap();
/// ```
pub fn deletes<'a, K: AsRef<str> + Eq + Hash>(&self, keys: &'a [K]) -> Result<HashMap<&'a K, bool>, MemcacheError> {
let mut con_keys: HashMap<usize, Vec<&K>> = HashMap::new();
for key in keys {
con_keys.entry(self.hash_key(key.as_ref())).or_default().push(key);
}

let mut result: HashMap<&K, bool> = HashMap::new();
for (connection_index, keys_subset) in con_keys {
let connection = self.connections[connection_index].clone();
for (deleted, key) in connection.get()?.deletes(&keys_subset)?.into_iter().zip(keys_subset) {
result.insert(key, deleted);
}
}
Ok(result)
}

/// Increment the value with amount.
///
/// Example:
Expand Down

0 comments on commit 24895a2

Please sign in to comment.