Skip to content

Commit

Permalink
Merge 7507640 into e532196
Browse files Browse the repository at this point in the history
  • Loading branch information
antonok-edm committed Apr 8, 2020
2 parents e532196 + 7507640 commit 6e9e059
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 127 deletions.
144 changes: 68 additions & 76 deletions src/protocol/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::fmt;
use std::io::{Read, Write};

use super::check_key_len;
use super::{check_key_len, ProtocolTrait};
use client::Stats;
use error::{ClientError, CommandError, MemcacheError, ServerError};
use std::borrow::Cow;
Expand Down Expand Up @@ -137,10 +137,6 @@ impl AsciiProtocol<Stream> {
self.reader.get_mut()
}

pub(super) fn auth(&mut self, username: &str, password: &str) -> Result<(), MemcacheError> {
return self.set("auth", format!("{} {}", username, password), 0);
}

fn store<V: ToMemcacheValue<Stream>>(
&mut self,
command: StoreCommand,
Expand Down Expand Up @@ -202,19 +198,6 @@ impl AsciiProtocol<Stream> {
})
}

pub(super) fn version(&mut self) -> Result<String, MemcacheError> {
self.reader.get_mut().write(b"version\r\n")?;
self.reader.get_mut().flush()?;
self.reader.read_line(|response| {
let response = MemcacheError::try_from(response)?;
if !response.starts_with("VERSION") {
Err(ServerError::BadResponse(Cow::Owned(response.into())))?
}
let version = response.trim_start_matches("VERSION ").trim_end_matches("\r\n");
Ok(version.to_string())
})
}

fn parse_ok_response(&mut self) -> Result<(), MemcacheError> {
self.reader.read_line(|response| {
let response = MemcacheError::try_from(response)?;
Expand All @@ -226,35 +209,6 @@ impl AsciiProtocol<Stream> {
})
}

pub(super) fn flush(&mut self) -> Result<(), MemcacheError> {
write!(self.reader.get_mut(), "flush_all\r\n")?;
self.parse_ok_response()
}

pub(super) fn flush_with_delay(&mut self, delay: u32) -> Result<(), MemcacheError> {
write!(self.reader.get_mut(), "flush_all {}\r\n", delay)?;
self.reader.get_mut().flush()?;
self.parse_ok_response()
}

pub(super) fn get<V: FromMemcacheValueExt>(&mut self, key: &str) -> Result<Option<V>, MemcacheError> {
write!(self.reader.get_mut(), "get {}\r\n", key)?;

if let Some((k, v)) = self.parse_get_response(false)? {
if k != key {
Err(ServerError::BadResponse(Cow::Borrowed(
"key doesn't match in the response",
)))?
} else if self.parse_get_response::<V>(false)?.is_none() {
Ok(Some(v))
} else {
Err(ServerError::BadResponse(Cow::Borrowed("Expected end of get response")))?
}
} else {
Ok(None)
}
}

fn parse_get_response<V: FromMemcacheValueExt>(
&mut self,
has_cas: bool,
Expand Down Expand Up @@ -301,7 +255,62 @@ impl AsciiProtocol<Stream> {
}
}

pub(super) fn gets<V: FromMemcacheValueExt>(&mut self, keys: &[&str]) -> Result<HashMap<String, V>, MemcacheError> {
fn parse_u64_response(&mut self) -> Result<u64, MemcacheError> {
self.reader.read_line(|response| {
let s = MemcacheError::try_from(response)?;
Ok(s.trim_end_matches("\r\n").parse::<u64>()?)
})
}
}

impl ProtocolTrait for AsciiProtocol<Stream> {
fn auth(&mut self, username: &str, password: &str) -> Result<(), MemcacheError> {
return self.set("auth", format!("{} {}", username, password), 0);
}

fn version(&mut self) -> Result<String, MemcacheError> {
self.reader.get_mut().write(b"version\r\n")?;
self.reader.get_mut().flush()?;
self.reader.read_line(|response| {
let response = MemcacheError::try_from(response)?;
if !response.starts_with("VERSION") {
Err(ServerError::BadResponse(Cow::Owned(response.into())))?
}
let version = response.trim_start_matches("VERSION ").trim_end_matches("\r\n");
Ok(version.to_string())
})
}

fn flush(&mut self) -> Result<(), MemcacheError> {
write!(self.reader.get_mut(), "flush_all\r\n")?;
self.parse_ok_response()
}

fn flush_with_delay(&mut self, delay: u32) -> Result<(), MemcacheError> {
write!(self.reader.get_mut(), "flush_all {}\r\n", delay)?;
self.reader.get_mut().flush()?;
self.parse_ok_response()
}

fn get<V: FromMemcacheValueExt>(&mut self, key: &str) -> Result<Option<V>, MemcacheError> {
write!(self.reader.get_mut(), "get {}\r\n", key)?;

if let Some((k, v)) = self.parse_get_response(false)? {
if k != key {
Err(ServerError::BadResponse(Cow::Borrowed(
"key doesn't match in the response",
)))?
} else if self.parse_get_response::<V>(false)?.is_none() {
Ok(Some(v))
} else {
Err(ServerError::BadResponse(Cow::Borrowed("Expected end of get response")))?
}
} else {
Ok(None)
}
}

fn gets<V: FromMemcacheValueExt>(&mut self, keys: &[&str]) -> Result<HashMap<String, V>, MemcacheError> {
for key in keys {
check_key_len(key)?;
}
Expand All @@ -321,7 +330,7 @@ impl AsciiProtocol<Stream> {
Err(ServerError::BadResponse(Cow::Borrowed("Expected end of gets response")))?
}

pub(super) fn cas<V: ToMemcacheValue<Stream>>(
fn cas<V: ToMemcacheValue<Stream>>(
&mut self,
key: &str,
value: V,
Expand All @@ -342,33 +351,23 @@ impl AsciiProtocol<Stream> {
}
}

pub(super) fn set<V: ToMemcacheValue<Stream>>(
&mut self,
key: &str,
value: V,
expiration: u32,
) -> Result<(), MemcacheError> {
fn set<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> {
let options = Options {
exptime: expiration,
..Default::default()
};
self.store(StoreCommand::Set, key, value, &options).map(|_| ())
}

pub(super) fn add<V: ToMemcacheValue<Stream>>(
&mut self,
key: &str,
value: V,
expiration: u32,
) -> Result<(), MemcacheError> {
fn add<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V, expiration: u32) -> Result<(), MemcacheError> {
let options = Options {
exptime: expiration,
..Default::default()
};
self.store(StoreCommand::Add, key, value, &options).map(|_| ())
}

pub(super) fn replace<V: ToMemcacheValue<Stream>>(
fn replace<V: ToMemcacheValue<Stream>>(
&mut self,
key: &str,
value: V,
Expand All @@ -381,19 +380,19 @@ impl AsciiProtocol<Stream> {
self.store(StoreCommand::Replace, key, value, &options).map(|_| ())
}

pub(super) fn append<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
fn append<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
check_key_len(key)?;
self.store(StoreCommand::Append, key, value, &Default::default())
.map(|_| ())
}

pub(super) fn prepend<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
fn prepend<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
check_key_len(key)?;
self.store(StoreCommand::Prepend, key, value, &Default::default())
.map(|_| ())
}

pub(super) fn delete(&mut self, key: &str) -> Result<bool, MemcacheError> {
fn delete(&mut self, key: &str) -> Result<bool, MemcacheError> {
check_key_len(key)?;
write!(self.reader.get_mut(), "delete {}\r\n", key)?;
self.reader.get_mut().flush()?;
Expand All @@ -411,26 +410,19 @@ impl AsciiProtocol<Stream> {
})
}

fn parse_u64_response(&mut self) -> Result<u64, MemcacheError> {
self.reader.read_line(|response| {
let s = MemcacheError::try_from(response)?;
Ok(s.trim_end_matches("\r\n").parse::<u64>()?)
})
}

pub(super) fn increment(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
fn increment(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
check_key_len(key)?;
write!(self.reader.get_mut(), "incr {} {}\r\n", key, amount)?;
self.parse_u64_response()
}

pub(super) fn decrement(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
fn decrement(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
check_key_len(key)?;
write!(self.reader.get_mut(), "decr {} {}\r\n", key, amount)?;
self.parse_u64_response()
}

pub(super) fn touch(&mut self, key: &str, expiration: u32) -> Result<bool, MemcacheError> {
fn touch(&mut self, key: &str, expiration: u32) -> Result<bool, MemcacheError> {
check_key_len(key)?;
write!(self.reader.get_mut(), "touch {} {}\r\n", key, expiration)?;
self.reader.get_mut().flush()?;
Expand All @@ -448,7 +440,7 @@ impl AsciiProtocol<Stream> {
})
}

pub(super) fn stats(&mut self) -> Result<Stats, MemcacheError> {
fn stats(&mut self) -> Result<Stats, MemcacheError> {
self.reader.get_mut().write(b"stats\r\n")?;
self.reader.get_mut().flush()?;

Expand Down

0 comments on commit 6e9e059

Please sign in to comment.