Skip to content

Commit

Permalink
Implement most of the suggestions by @letmutx.
Browse files Browse the repository at this point in the history
- In ascii::gets, instead of building a string, use BufWriter. Also
  remove unnecessary allocation. There is some necessary allocation
  due to the signature of the method. We want to maintain the signature.

- Don't add unnecessary indentation just to drop variables. Instead,
  use drop() or avoid creating variables that need to be dropped.

- Reworked the way we collect pipelined responses from the server.
  Now there's a simple ``final_result`` var and we update that var
  for each result.

- Handle recoverable errors differently from unrecoverable errors.
  Exit fast on most errors, since they're unrecoverable, but collect
  all responses on CommandError, since CommandError means we got
  a full response.
  • Loading branch information
hathawsh committed Feb 12, 2020
1 parent 29f88ff commit 25ddf82
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 92 deletions.
170 changes: 92 additions & 78 deletions src/protocol/ascii.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fmt;
use std::io::{Read, Write};
use std::io::{BufWriter, Read, Write};

use super::check_key_len;
use client::Stats;
Expand Down Expand Up @@ -182,79 +182,86 @@ impl AsciiProtocol<Stream> {
let noreply = if options.noreply { " noreply" } else { "" };
let mut sent_count = 0;

{
let reader = self.reader.get_mut();
for (key_ref, value) in entries.into_iter() {
let key = key_ref.as_ref();
check_key_len(key)?;
if options.cas.is_some() {
write!(
reader,
"{command} {key} {flags} {exptime} {vlen} {cas}{noreply}\r\n",
command = command,
key = key,
flags = value.get_flags(),
exptime = options.exptime,
vlen = value.get_length(),
cas = options.cas.unwrap(),
noreply = noreply
)?;
} else {
write!(
reader,
"{command} {key} {flags} {exptime} {vlen}{noreply}\r\n",
command = command,
key = key,
flags = value.get_flags(),
exptime = options.exptime,
vlen = value.get_length(),
noreply = noreply
)?;
}

value.write_to(reader)?;
reader.write(b"\r\n")?;
sent_count += 1;
for (key_ref, value) in entries.into_iter() {
let key = key_ref.as_ref();
check_key_len(key)?;
if options.cas.is_some() {
write!(
self.reader.get_mut(),
"{command} {key} {flags} {exptime} {vlen} {cas}{noreply}\r\n",
command = command,
key = key,
flags = value.get_flags(),
exptime = options.exptime,
vlen = value.get_length(),
cas = options.cas.unwrap(),
noreply = noreply
)?;
} else {
write!(
self.reader.get_mut(),
"{command} {key} {flags} {exptime} {vlen}{noreply}\r\n",
command = command,
key = key,
flags = value.get_flags(),
exptime = options.exptime,
vlen = value.get_length(),
noreply = noreply
)?;
}

// Flush now that all the requests have been written.
reader.flush()?;
value.write_to(self.reader.get_mut())?;
self.reader.get_mut().write_all(b"\r\n")?;
sent_count += 1;
}

// Flush now that all the requests have been written.
self.reader.get_mut().flush()?;

if options.noreply {
return Ok(true);
}

// Receive all the responses. If there were errors, return the first.
// In order to keep the client in sync with the server,
// read all the responses, even after an EXISTS or NOT_FOUND
// error, unless some other error occurs.
// If there were errors, return the first error.

let mut final_result = Ok(true);

let mut all_stored = true;
let mut error_list: Vec<MemcacheError> = Vec::new();
for _ in 0..sent_count {
let one_result = self.reader.read_line(|response| {
let response = MemcacheError::try_from(response)?;
match response {
"STORED\r\n" => Ok(true),
"NOT_STORED\r\n" => Ok(false),
"EXISTS\r\n" => Err(CommandError::KeyExists)?,
"NOT_FOUND\r\n" => Err(CommandError::KeyNotFound)?,
response => Err(ServerError::BadResponse(Cow::Owned(response.into())))?,
"EXISTS\r\n" => Err(CommandError::KeyExists.into()),
"NOT_FOUND\r\n" => Err(CommandError::KeyNotFound.into()),
response => Err(ServerError::BadResponse(Cow::Owned(response.into())).into()),
}
});
match one_result {
Ok(true) => (),
Ok(false) => all_stored = false,
Err(e) => error_list.push(e),
Ok(false) => {
if let Ok(true) = final_result {
final_result = Ok(false)
}
}
Err(MemcacheError::CommandError(e)) => {
// Recoverable error. Report it after reading the rest of the responses.
if final_result.is_ok() {
final_result = Err(MemcacheError::CommandError(e));
}
}
Err(e) => return Err(e), // Unrecoverable error. Stop immediately.
}
}

match error_list.into_iter().next() {
None => Ok(all_stored),
Some(e) => Err(e),
}
final_result
}

pub(super) fn version(&mut self) -> Result<String, MemcacheError> {
self.reader.get_mut().write(b"version\r\n")?;
self.reader.get_mut().write_all(b"version\r\n")?;
self.reader.get_mut().flush()?;
self.reader.read_line(|response| {
let response = MemcacheError::try_from(response)?;
Expand Down Expand Up @@ -356,21 +363,23 @@ impl AsciiProtocol<Stream> {
&mut self,
keys: I,
) -> Result<HashMap<String, V>, MemcacheError> {
let keys: Vec<K> = keys.into_iter().collect();
let mut capacity = 0;
// Note: it would be nice to avoid allocation here, but we have to allocate strings
// anyway because the input key type is a reference while the output key type is String.
let keys: Vec<String> = keys.into_iter().map(|s| s.as_ref().to_string()).collect();

for k in keys.iter() {
let key = k.as_ref();
check_key_len(key)?;
capacity += key.len() + 1;
check_key_len(k)?;
}

let mut keystr = String::with_capacity(capacity);
let mut writer = BufWriter::new(self.reader.get_mut());
writer.write_all(b"gets")?;
for k in keys.iter() {
keystr.push(' ');
keystr.push_str(k.as_ref());
writer.write_all(b" ")?;
writer.write_all(k.as_bytes())?;
}

write!(self.reader.get_mut(), "gets{}\r\n", keystr)?;
writer.write_all(b"\r\n")?;
writer.flush()?;
drop(writer);

let mut result: HashMap<String, V> = HashMap::with_capacity(keys.len());
// there will be atmost keys.len() "VALUE <...>" responses and one END response
Expand Down Expand Up @@ -475,22 +484,19 @@ impl AsciiProtocol<Stream> {
keys: I,
) -> Result<Vec<bool>, MemcacheError> {
let mut sent_count = 0;
{
let reader = self.reader.get_mut();
for k in keys.into_iter() {
let key = k.as_ref();
check_key_len(key)?;
write!(reader, "delete {}\r\n", key)?;
sent_count += 1;
}
// Flush now that all the requests have been written.
reader.flush()?;
for k in keys.into_iter() {
let key = k.as_ref();
check_key_len(key)?;
write!(self.reader.get_mut(), "delete {}\r\n", key)?;
sent_count += 1;
}
// Flush now that all the requests have been written.
self.reader.get_mut().flush()?;

// Receive all the responses. If there were errors, return the first.

let mut deleted_list = Vec::with_capacity(sent_count);
let mut error_list: Vec<MemcacheError> = Vec::new();
let mut final_result = Ok(Vec::with_capacity(sent_count));

for _ in 0..sent_count {
let one_result = self
.reader
Expand All @@ -505,16 +511,24 @@ impl AsciiProtocol<Stream> {
Err(MemcacheError::CommandError(CommandError::KeyNotFound)) => Ok(false),
Err(e) => Err(e),
});

match one_result {
Ok(deleted) => deleted_list.push(deleted),
Err(e) => error_list.push(e),
Ok(deleted) => {
if let Ok(deleted_list) = &mut final_result {
deleted_list.push(deleted);
}
}
Err(MemcacheError::CommandError(e)) => {
// Recoverable error. Report it after reading the rest of the responses.
if final_result.is_ok() {
final_result = Err(MemcacheError::CommandError(e));
}
}
Err(e) => return Err(e), // Unrecoverable error. Stop immediately.
}
}

match error_list.into_iter().next() {
None => Ok(deleted_list),
Some(e) => Err(e),
}
final_result
}

pub(super) fn delete(&mut self, key: &str) -> Result<bool, MemcacheError> {
Expand Down Expand Up @@ -559,7 +573,7 @@ impl AsciiProtocol<Stream> {
}

pub(super) fn stats(&mut self) -> Result<Stats, MemcacheError> {
self.reader.get_mut().write(b"stats\r\n")?;
self.reader.get_mut().write_all(b"stats\r\n")?;
self.reader.get_mut().flush()?;

enum Loop {
Expand Down
43 changes: 29 additions & 14 deletions src/protocol/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,24 @@ impl BinaryProtocol {
}
// Flush now that all the requests have been written.
self.stream.flush()?;

// Receive all the responses. If there were errors, return the first.
let mut error_list = Vec::new();
let mut final_result = Ok(());

for _ in 0..sent_count {
match binary_packet::parse_response(&mut self.stream) {
Ok(_) => (),
Err(e) => error_list.push(e),
Err(MemcacheError::CommandError(e)) => {
// Recoverable error. Report it after reading the rest of the responses.
if final_result.is_ok() {
final_result = Err(MemcacheError::CommandError(e));
}
}
Err(e) => return Err(e), // Unrecoverable error. Stop immediately.
};
}
match error_list.into_iter().next() {
None => Ok(()),
Some(e) => Err(e),
}

final_result
}

pub(super) fn version(&mut self) -> Result<String, MemcacheError> {
Expand Down Expand Up @@ -287,19 +293,28 @@ impl BinaryProtocol {
}
// Flush now that all the requests have been written.
self.stream.flush()?;

// Receive all the responses. If there were errors, return the first.
let mut deleted_list = Vec::with_capacity(sent_count);
let mut error_list: Vec<MemcacheError> = Vec::new();
let mut final_result = Ok(Vec::with_capacity(sent_count));

for _ in 0..sent_count {
match binary_packet::parse_delete_response(&mut self.stream) {
Ok(deleted) => deleted_list.push(deleted),
Err(e) => error_list.push(e),
Ok(deleted) => {
if let Ok(deleted_list) = &mut final_result {
deleted_list.push(deleted);
}
}
Err(MemcacheError::CommandError(e)) => {
// Recoverable error. Report it after reading the rest of the responses.
if final_result.is_ok() {
final_result = Err(MemcacheError::CommandError(e));
}
}
Err(e) => return Err(e), // Unrecoverable error. Stop immediately.
}
}
match error_list.into_iter().next() {
None => Ok(deleted_list),
Some(e) => Err(e),
}

final_result
}

pub(super) fn increment(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
Expand Down

0 comments on commit 25ddf82

Please sign in to comment.