Skip to content

Commit

Permalink
Merge 29fa272 into 225f989
Browse files Browse the repository at this point in the history
  • Loading branch information
harrydevnull committed Apr 8, 2017
2 parents 225f989 + 29fa272 commit 85e74f6
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 363 deletions.
12 changes: 5 additions & 7 deletions src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ impl LoadBalancingStrategy {
/// Returns next value for selected load balancing strategy
pub fn next<'a, N>(&'a self, nodes: &'a Vec<N>, i: usize) -> Option<&N> {
println!("node# {:?}", i);
match self {
&LoadBalancingStrategy::Random => {
nodes.iter().nth(self.rnd_idx((0, Some(nodes.len()))))
}
&LoadBalancingStrategy::RoundRobin => {
match *self {
LoadBalancingStrategy::Random => nodes.get(self.rnd_idx((0, Some(nodes.len())))),
LoadBalancingStrategy::RoundRobin => {
let mut cycle = nodes.iter().cycle().skip(i);
cycle.next()
}
Expand Down Expand Up @@ -114,7 +112,7 @@ r2d2::ManageConnection for ClusterConnectionManager<T, X> {

fn connect(&self) -> Result<Self::Connection, Self::Error> {
let transport_res: CResult<X> = self.load_balancer.next()
.ok_or("Cannot get next node".into())
.ok_or_else(|| "Cannot get next node".into())
.and_then(|x| x.try_clone().map_err(|e| e.into()));
let transport = try!(transport_res);
let compression = self.compression.clone();
Expand All @@ -126,7 +124,7 @@ r2d2::ManageConnection for ClusterConnectionManager<T, X> {
fn is_valid(&self, connection: &mut Self::Connection) -> Result<(), Self::Error> {
let query = QueryBuilder::new("SELECT * FROM system.peers;").finalize();

connection.query(query, false, false).map(|_| (()))
connection.query(query, false, false).map(|_| ())
}

fn has_broken(&self, _connection: &mut Self::Connection) -> bool {
Expand Down
38 changes: 19 additions & 19 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,18 @@ pub enum CompressionError {

impl fmt::Display for CompressionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
&CompressionError::Snappy(ref err) => write!(f, "Snappy Error: {:?}", err),
&CompressionError::Lz4(ref err) => write!(f, "Lz4 Error: {:?}", err),
match *self {
CompressionError::Snappy(ref err) => write!(f, "Snappy Error: {:?}", err),
CompressionError::Lz4(ref err) => write!(f, "Lz4 Error: {:?}", err),
}
}
}

impl Error for CompressionError {
fn description(&self) -> &str {
match self {
&CompressionError::Snappy(ref err) => err.description(),
&CompressionError::Lz4(ref err) => err.description(),
match *self {
CompressionError::Snappy(ref err) => err.description(),
CompressionError::Lz4(ref err) => err.description(),
}
}
}
Expand Down Expand Up @@ -116,10 +116,10 @@ impl Compression {
///
/// ```
pub fn encode(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
&Compression::Lz4 => Compression::encode_lz4(bytes),
&Compression::Snappy => Compression::encode_snappy(bytes),
&Compression::None => Ok(bytes),
match *self {
Compression::Lz4 => Compression::encode_lz4(bytes),
Compression::Snappy => Compression::encode_snappy(bytes),
Compression::None => Ok(bytes),
}
}

Expand All @@ -138,19 +138,19 @@ impl Compression {
/// assert_eq!(lz4_compression.decode(input).unwrap(), bytes);
/// ```
pub fn decode(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
match self {
&Compression::Lz4 => Compression::decode_lz4(bytes),
&Compression::Snappy => Compression::decode_snappy(bytes),
&Compression::None => Ok(bytes),
match *self {
Compression::Lz4 => Compression::decode_lz4(bytes),
Compression::Snappy => Compression::decode_snappy(bytes),
Compression::None => Ok(bytes),
}
}

/// It transforms compression method into a `&str`.
pub fn as_str(&self) -> Option<&'static str> {
match self {
&Compression::Lz4 => Some(LZ4),
&Compression::Snappy => Some(SNAPPY),
&Compression::None => None,
match *self {
Compression::Lz4 => Some(LZ4),
Compression::Snappy => Some(SNAPPY),
Compression::None => None,
}
}

Expand All @@ -169,7 +169,7 @@ impl Compression {
}

fn encode_lz4(bytes: Vec<u8>) -> Result<Vec<u8>> {
return Ok(lz4::compress(bytes.as_slice()));
Ok(lz4::compress(bytes.as_slice()))
}

fn decode_lz4(bytes: Vec<u8>) -> Result<Vec<u8>> {
Expand Down
11 changes: 2 additions & 9 deletions src/frame/frame_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@ use super::super::IntoBytes;
use frame::*;

/// The structure which represents a body of a frame of type `options`.
#[derive(Debug)]
#[derive(Debug,Default)]
pub struct BodyReqOptions;

impl BodyReqOptions {
/// Creates new body of a frame of type `options`
pub fn new() -> BodyReqOptions {
BodyReqOptions {}
}
}

impl IntoBytes for BodyReqOptions {
fn into_cbytes(&self) -> Vec<u8> {
vec![]
Expand All @@ -28,7 +21,7 @@ impl Frame {
// sync client
let stream: u64 = 0;
let opcode = Opcode::Options;
let body = BodyReqOptions::new();
let body: BodyReqOptions = Default::default();

Frame {
version: version,
Expand Down
10 changes: 3 additions & 7 deletions src/frame/frame_ready.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::convert::From;
use super::super::IntoBytes;

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq,Default)]
pub struct BodyResReady;

impl BodyResReady {
pub fn new() -> BodyResReady {
BodyResReady {}
}
}

impl From<Vec<u8>> for BodyResReady {
fn from(_vec: Vec<u8>) -> BodyResReady {
Expand All @@ -29,7 +24,8 @@ mod tests {

#[test]
fn body_res_ready_new() {
assert_eq!(BodyResReady::new(), BodyResReady {});
let body: BodyResReady = Default::default();
assert_eq!(body, BodyResReady {});
}

#[test]
Expand Down
34 changes: 17 additions & 17 deletions src/frame/frame_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,32 @@ pub enum ResponseBody {
impl ResponseBody {
pub fn from(bytes: &[u8], response_type: &Opcode) -> error::Result<ResponseBody> {
let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
Ok(match response_type {
Ok(match *response_type {
// request frames
&Opcode::Startup => unreachable!(),
&Opcode::Options => unreachable!(),
&Opcode::Query => unreachable!(),
&Opcode::Prepare => unreachable!(),
&Opcode::Execute => unreachable!(),
&Opcode::Register => unreachable!(),
&Opcode::Batch => unreachable!(),
&Opcode::AuthResponse => unreachable!(),
Opcode::Startup => unreachable!(),
Opcode::Options => unreachable!(),
Opcode::Query => unreachable!(),
Opcode::Prepare => unreachable!(),
Opcode::Execute => unreachable!(),
Opcode::Register => unreachable!(),
Opcode::Batch => unreachable!(),
Opcode::AuthResponse => unreachable!(),

// response frames
&Opcode::Error => ResponseBody::Error(CDRSError::from_cursor(&mut cursor)?),
&Opcode::Ready => ResponseBody::Ready(BodyResResultVoid::from_cursor(&mut cursor)?),
&Opcode::Authenticate => {
Opcode::Error => ResponseBody::Error(CDRSError::from_cursor(&mut cursor)?),
Opcode::Ready => ResponseBody::Ready(BodyResResultVoid::from_cursor(&mut cursor)?),
Opcode::Authenticate => {
ResponseBody::Authenticate(BodyResAuthenticate::from_cursor(&mut cursor)?)
}
&Opcode::Supported => {
Opcode::Supported => {
ResponseBody::Supported(BodyResSupported::from_cursor(&mut cursor)?)
}
&Opcode::Result => ResponseBody::Result(ResResultBody::from_cursor(&mut cursor)?),
&Opcode::Event => ResponseBody::Event(BodyResEvent::from_cursor(&mut cursor)?),
&Opcode::AuthChallenge => {
Opcode::Result => ResponseBody::Result(ResResultBody::from_cursor(&mut cursor)?),
Opcode::Event => ResponseBody::Event(BodyResEvent::from_cursor(&mut cursor)?),
Opcode::AuthChallenge => {
ResponseBody::AuthChallenge(BodyResAuthChallenge::from_cursor(&mut cursor)?)
}
&Opcode::AuthSuccess => {
Opcode::AuthSuccess => {
ResponseBody::AuthSuccess(BodyReqAuthSuccess::from_cursor(&mut cursor)?)
}
})
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
//! It's under a hard development as of now.
extern crate snap;
extern crate byteorder;
#[macro_use]
pub mod macros;

#[macro_use]
extern crate log;
extern crate lz4_compress;
Expand Down
Loading

0 comments on commit 85e74f6

Please sign in to comment.