Skip to content

Commit

Permalink
Merge pull request #8 from harrydevnull/feature/fmt
Browse files Browse the repository at this point in the history
Feature/fmt
  • Loading branch information
harrydevnull committed Feb 8, 2017
2 parents 59c7ed0 + 871316d commit ffa7a95
Show file tree
Hide file tree
Showing 44 changed files with 1,032 additions and 1,015 deletions.
15 changes: 9 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
language: rust
cache: cargo
rust:
- stable
- beta
Expand All @@ -7,9 +8,6 @@ matrix:
allow_failures:
- rust: nightly

#before_install:
#- sudo sh -c "echo 'JVM_OPTS=\"\${JVM_OPTS} -Djava.net.preferIPv4Stack=false\"' >> /usr/local/cassandra/conf/cassandra-env.sh"
#- sudo service cassandra start

before_install:
- sudo update-java-alternatives -s java-8-oracle
Expand All @@ -21,9 +19,14 @@ before_install:
- sleep 20


#services:
#- cassandra
install:
- (cargo install rustfmt || true)
- PATH=$PATH:/home/travis/.cargo/bin

script:
- cargo fmt -- --write-mode=diff
- cargo build --verbose
- cargo test --verbose

addons:
apt:
Expand All @@ -39,7 +42,7 @@ addons:
- oracle-java8-installer



after_success:
wget https://github.com/SimonKagstrom/kcov/archive/master.tar.gz &&
tar xzf master.tar.gz &&
Expand Down
1 change: 0 additions & 1 deletion examples/batch_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ fn main() {

println!("batch result {:?}", batched.get_body());
}

2 changes: 1 addition & 1 deletion examples/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ fn main() {

match session.query(create_table_query, with_tracing, with_warnings) {
Ok(ref res) => println!("table created: {:?}", res.get_body()),
Err(ref err) => println!("Error occured: {:?}", err)
Err(ref err) => println!("Error occured: {:?}", err),
}
}
1 change: 0 additions & 1 deletion examples/prepare_execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,3 @@ fn main() {

println!("executed:\n{:?}", executed);
}

1 change: 0 additions & 1 deletion examples/read_table_into_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,3 @@ fn main() {
}

}

10 changes: 2 additions & 8 deletions examples/server_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use std::thread;
use cdrs::client::CDRS;
use cdrs::authenticators::PasswordAuthenticator;
use cdrs::compression::Compression;
use cdrs::frame::events::{
SimpleServerEvent,
ServerEvent,
TopologyChangeType
};
use cdrs::frame::events::{SimpleServerEvent, ServerEvent, TopologyChangeType};
use cdrs::transport::TransportPlain;

// default credentials
Expand All @@ -27,9 +23,7 @@ fn main() {

let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::SchemaChange]).unwrap();

thread::spawn(move|| {
listener.start(&Compression::None).unwrap()
});
thread::spawn(move || listener.start(&Compression::None).unwrap());

let topology_changes = stream
// inspects all events in a stream
Expand Down
18 changes: 9 additions & 9 deletions examples/simple_with_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ fn main() {
let with_warnings = false;
let query_op = session.query(select_query, with_tracing, with_warnings);

match query_op {

Ok(res) => {
println!("Result frame: {:?},\nparsed body: {:?}",
res,
res.get_body())
}
Err(err) => println!("{:?}", err),
}
match query_op {

Ok(res) => {
println!("Result frame: {:?},\nparsed body: {:?}",
res,
res.get_body())
}
Err(err) => println!("{:?}", err),
}

}
Err(err) => println!("{:?}", err),
Expand Down
8 changes: 4 additions & 4 deletions src/authenticators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ pub trait Authenticator: Clone {
#[derive(Debug, Clone)]
pub struct PasswordAuthenticator<'a> {
username: &'a str,
password: &'a str
password: &'a str,
}

impl<'a> PasswordAuthenticator<'a> {
pub fn new<'b>(username: &'b str, password: &'b str) -> PasswordAuthenticator<'b> {
return PasswordAuthenticator {
username: username,
password: password
password: password,
};
}
}
Expand Down Expand Up @@ -47,7 +47,6 @@ impl Authenticator for NoneAuthenticator {
fn get_cassandra_name(&self) -> Option<&str> {
return None;
}

}


Expand All @@ -69,7 +68,8 @@ mod tests {
#[test]
fn test_password_authenticator_get_cassandra_name() {
let auth = PasswordAuthenticator::new("foo", "bar");
assert_eq!(auth.get_cassandra_name(), Some("org.apache.cassandra.auth.PasswordAuthenticator"));
assert_eq!(auth.get_cassandra_name(),
Some("org.apache.cassandra.auth.PasswordAuthenticator"));
}

#[test]
Expand Down
123 changes: 63 additions & 60 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,26 @@ use events::{Listener, EventStream, new_listener};
/// CDRS driver structure that provides a basic functionality to work with DB including
/// establishing new connection, getting supported options, preparing and executing CQL
/// queries, using compression and other.
pub struct CDRS<T: Authenticator,X:CDRSTransport> {
pub struct CDRS<T: Authenticator, X: CDRSTransport> {
compressor: Compression,
authenticator: T,
transport: X
transport: X,
}

/// Map of options supported by Cassandra server.
pub type CassandraOptions = HashMap<String, Vec<String>>;

impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T,X> {
impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T, X> {
/// The method creates new instance of CDRS driver. At this step an instance doesn't
/// connected to DB Server. To create new instance two parameters are needed to be
/// provided - `addr` is IP address of DB Server, `authenticator` is a selected authenticator
/// that is supported by particular DB Server. There are few authenticators already
/// provided by this trait.
pub fn new(transport: X, authenticator: T) -> CDRS<T,X> {
pub fn new(transport: X, authenticator: T) -> CDRS<T, X> {
return CDRS {
compressor: Compression::None,
authenticator: authenticator,
transport: transport
transport: transport,
};
}

Expand All @@ -54,8 +54,8 @@ impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T,X> {
.map(|frame| match frame.get_body() {
ResponseBody::Supported(ref supported_body) => {
return supported_body.data.clone();
},
_ => unreachable!()
}
_ => unreachable!(),
});
}

Expand All @@ -66,7 +66,7 @@ impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T,X> {
/// method provided by CRDR driver, it's `Compression::None` that tells drivers that it
/// should work without compression. If compression is provided then incomming frames
/// will be decompressed automatically.
pub fn start(mut self, compressor: Compression) -> error::Result<Session<T,X>> {
pub fn start(mut self, compressor: Compression) -> error::Result<Session<T, X>> {
self.compressor = compressor;
let startup_frame = Frame::new_req_startup(compressor.as_str()).into_cbytes();

Expand All @@ -83,23 +83,24 @@ impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T,X> {
.expect("Cassandra Server did communicate that it needed password
authentication but the auth schema was missing in the body response");

//This creates a new scope; avoiding a clone
// This creates a new scope; avoiding a clone
// and we check whether
// 1. any authenticators has been passed in by client and if not send error back
// 2. authenticator is provided by the client and `auth_scheme` presented by the server and client are same
// if not send error back
// 2. authenticator is provided by the client and `auth_scheme` presented by
// the server and client are same if not send error back
// 3. if it falls through it means the preliminary conditions are true

let auth_check = self.authenticator.get_cassandra_name()
let auth_check = self.authenticator
.get_cassandra_name()
.ok_or(error::Error::General("No authenticator was provided ".to_string()))
.map(|auth| {
if authenticator.as_str() != auth {
let io_err = io::Error::new(
io::ErrorKind::NotFound,
format!("Unsupported type of authenticator. {:?} got,
let io_err =
io::Error::new(io::ErrorKind::NotFound,
format!("Unsupported type of authenticator. {:?} got,
but {} is supported.",
authenticator,
authenticator.as_str()));
authenticator,
authenticator.as_str()));
return Err(error::Error::Io(io_err));
}
return Ok(());
Expand All @@ -110,7 +111,8 @@ impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T,X> {
}

let auth_token_bytes = self.authenticator.get_auth_token().into_cbytes();
try!(self.transport.write(Frame::new_req_auth_response(auth_token_bytes).into_cbytes().as_slice()));
try!(self.transport
.write(Frame::new_req_auth_response(auth_token_bytes).into_cbytes().as_slice()));
try!(parse_frame(&mut self.transport, &compressor));

return Ok(Session::start(self));
Expand All @@ -122,26 +124,27 @@ impl<'a, T: Authenticator + 'a, X: CDRSTransport + 'a> CDRS<T,X> {
}

fn drop_connection(&mut self) -> error::Result<()> {
return self.transport.close(net::Shutdown::Both)
return self.transport
.close(net::Shutdown::Both)
.map_err(|err| error::Error::Io(err));
}
}

/// The object that provides functionality for communication with Cassandra server.
pub struct Session<T: Authenticator,X: CDRSTransport> {
pub struct Session<T: Authenticator, X: CDRSTransport> {
started: bool,
cdrs: CDRS<T,X>,
compressor: Compression
cdrs: CDRS<T, X>,
compressor: Compression,
}

impl<T: Authenticator,X: CDRSTransport> Session<T,X> {
impl<T: Authenticator, X: CDRSTransport> Session<T, X> {
/// Creates new session basing on CDRS instance.
pub fn start(cdrs: CDRS<T,X>) -> Session<T,X> {
pub fn start(cdrs: CDRS<T, X>) -> Session<T, X> {
let compressor = cdrs.compressor.clone();
return Session {
cdrs: cdrs,
started: true,
compressor: compressor
compressor: compressor,
};
}

Expand All @@ -166,12 +169,11 @@ impl<T: Authenticator,X: CDRSTransport> Session<T,X> {
}

/// The method makes a request to DB Server to prepare provided query.
pub fn prepare(
&mut self,
query: String,
with_tracing: bool,
with_warnings: bool
) -> error::Result<Frame> {
pub fn prepare(&mut self,
query: String,
with_tracing: bool,
with_warnings: bool)
-> error::Result<Frame> {
let mut flags = vec![];
if with_tracing {
flags.push(Flag::Tracing);
Expand All @@ -190,13 +192,12 @@ impl<T: Authenticator,X: CDRSTransport> Session<T,X> {
/// The method makes a request to DB Server to execute a query with provided id
/// using provided query parameters. `id` is an ID of a query which Server
/// returns back to a driver as a response to `prepare` request.
pub fn execute(
&mut self,
id: CBytesShort,
query_parameters: QueryParams,
with_tracing: bool,
with_warnings: bool
) -> error::Result<Frame> {
pub fn execute(&mut self,
id: CBytesShort,
query_parameters: QueryParams,
with_tracing: bool,
with_warnings: bool)
-> error::Result<Frame> {

let mut flags = vec![];
if with_tracing {
Expand All @@ -214,15 +215,15 @@ impl<T: Authenticator,X: CDRSTransport> Session<T,X> {
/// The method makes a request to DB Server to execute a query provided in `query` argument.
/// you can build the query with QueryBuilder
/// ```
/// let qb = QueryBuilder::new().query("select * from emp").consistency(Consistency::One).page_size(Some(4));
/// let qb = QueryBuilder::new().query("select * from emp")
/// .consistency(Consistency::One).page_size(Some(4));
/// session.query_with_builder(qb);
/// ```
pub fn query(
&mut self,
query: Query,
with_tracing: bool,
with_warnings: bool
) -> error::Result<Frame> {
pub fn query(&mut self,
query: Query,
with_tracing: bool,
with_warnings: bool)
-> error::Result<Frame> {
let mut flags = vec![];

if with_tracing {
Expand All @@ -234,25 +235,25 @@ impl<T: Authenticator,X: CDRSTransport> Session<T,X> {
}

let query_frame = Frame::new_req_query(query.query,
query.consistency,
query.values,
query.with_names,
query.page_size,
query.paging_state,
query.serial_consistency,
query.timestamp,
flags).into_cbytes();
query.consistency,
query.values,
query.with_names,
query.page_size,
query.paging_state,
query.serial_consistency,
query.timestamp,
flags)
.into_cbytes();

try!(self.cdrs.transport.write(query_frame.as_slice()));
return parse_frame(&mut self.cdrs.transport, &self.compressor);
}

pub fn batch(
&mut self,
batch_query: QueryBatch,
with_tracing: bool,
with_warnings: bool
) -> error::Result<Frame> {
pub fn batch(&mut self,
batch_query: QueryBatch,
with_tracing: bool,
with_warnings: bool)
-> error::Result<Frame> {
let mut flags = vec![];

if with_tracing {
Expand All @@ -270,7 +271,9 @@ impl<T: Authenticator,X: CDRSTransport> Session<T,X> {
}

/// It consumes CDRS
pub fn listen_for<'a>(mut self, events: Vec<SimpleServerEvent>) -> error::Result<(Listener<X>, EventStream)> {
pub fn listen_for<'a>(mut self,
events: Vec<SimpleServerEvent>)
-> error::Result<(Listener<X>, EventStream)> {
let query_frame = Frame::new_req_register(events).into_cbytes();
try!(self.cdrs.transport.write(query_frame.as_slice()));
try!(parse_frame(&mut self.cdrs.transport, &self.compressor));
Expand Down
Loading

0 comments on commit ffa7a95

Please sign in to comment.