Skip to content

Commit

Permalink
Merge pull request #280 from AlexPikalov/refactor/279
Browse files Browse the repository at this point in the history
refactor: make cdrs compatible with Rust edition 2018
  • Loading branch information
AlexPikalov committed Aug 22, 2019
2 parents 661f84b + 50447d5 commit d304532
Show file tree
Hide file tree
Showing 63 changed files with 4,555 additions and 4,306 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -20,7 +20,7 @@ script:
# - cargo fmt -- --write-mode=diff
- cargo build --verbose
- cargo test --verbose
- cargo test --verbose -- --ignored
- cargo test --verbose --features e2e-tests
- cargo test --features ssl

deploy:
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -2,6 +2,7 @@
name = "cdrs"
version = "2.1.1"
authors = ["Alex Pikalov <alex.pikalov.khar@gmail.com>"]
edition = "2018"

description = "Cassandra DB driver written in Rust"
documentation = "https://docs.rs/cdrs"
Expand All @@ -18,7 +19,7 @@ v3 = []
v4 = []
# enable v5 feature when it's actually implemented
# v5 = []
appveyor = []
e2e-tests = []

[dependencies]
byteorder = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/authenticators.rs
@@ -1,4 +1,4 @@
use types::CBytes;
use crate::types::CBytes;

pub trait Authenticator: Clone + Send + Sync {
fn get_auth_token(&self) -> CBytes;
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/config_ssl.rs
Expand Up @@ -2,7 +2,7 @@
use openssl::ssl::SslConnector;
use std::time::Duration;

use authenticators::Authenticator;
use crate::authenticators::Authenticator;

/// Cluster configuration that holds per node SSL configs
pub struct ClusterSslConfig<'a, A: Authenticator + Sized>(pub Vec<NodeSslConfig<'a, A>>);
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/config_tcp.rs
@@ -1,6 +1,6 @@
use std::time::Duration;

use authenticators::Authenticator;
use crate::authenticators::Authenticator;

/// Cluster configuration that holds per node TCP configs
pub struct ClusterTcpConfig<'a, A: Authenticator + Sized>(pub Vec<NodeTcpConfig<'a, A>>);
Expand Down
20 changes: 11 additions & 9 deletions src/cluster/mod.rs
Expand Up @@ -11,19 +11,21 @@ mod ssl_connection_pool;
mod tcp_connection_pool;

#[cfg(feature = "ssl")]
pub use cluster::config_ssl::{ClusterSslConfig, NodeSslConfig, NodeSslConfigBuilder};
pub use cluster::config_tcp::{ClusterTcpConfig, NodeTcpConfig, NodeTcpConfigBuilder};
pub use cluster::pager::{QueryPager, SessionPager, PagerState};
pub use crate::cluster::config_ssl::{ClusterSslConfig, NodeSslConfig, NodeSslConfigBuilder};
pub use crate::cluster::config_tcp::{ClusterTcpConfig, NodeTcpConfig, NodeTcpConfigBuilder};
pub use crate::cluster::pager::{PagerState, QueryPager, SessionPager};
#[cfg(feature = "ssl")]
pub use cluster::ssl_connection_pool::{new_ssl_pool, SslConnectionPool, SslConnectionsManager};
pub use cluster::tcp_connection_pool::{
pub use crate::cluster::ssl_connection_pool::{
new_ssl_pool, SslConnectionPool, SslConnectionsManager,
};
pub use crate::cluster::tcp_connection_pool::{
new_tcp_pool, startup, TcpConnectionPool, TcpConnectionsManager,
};

use compression::Compression;
use error;
use query::{BatchExecutor, ExecExecutor, PrepareExecutor, QueryExecutor};
use transport::CDRSTransport;
use crate::compression::Compression;
use crate::error;
use crate::query::{BatchExecutor, ExecExecutor, PrepareExecutor, QueryExecutor};
use crate::transport::CDRSTransport;

/// `GetConnection` trait provides a unified interface for Session to get a connection
/// from a load balancer
Expand Down
170 changes: 98 additions & 72 deletions src/cluster/pager.rs
Expand Up @@ -2,63 +2,75 @@ use r2d2;
use std::cell::RefCell;
use std::marker::PhantomData;

use cluster::CDRSSession;
use error;
use frame::frame_result::{RowsMetadata, RowsMetadataFlag};
use query::{PreparedQuery, QueryParamsBuilder};
use transport::CDRSTransport;
use types::rows::Row;
use types::CBytes;

pub struct SessionPager<'a,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error>
+ Sized,
S: CDRSSession<'static, T, M> + 'a,
T: CDRSTransport + 'static>
{
use crate::cluster::CDRSSession;
use crate::error;
use crate::frame::frame_result::{RowsMetadata, RowsMetadataFlag};
use crate::query::{PreparedQuery, QueryParamsBuilder};
use crate::transport::CDRSTransport;
use crate::types::rows::Row;
use crate::types::CBytes;

pub struct SessionPager<
'a,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M> + 'a,
T: CDRSTransport + 'static,
> {
page_size: i32,
session: &'a S,
transport_type: PhantomData<&'a T>,
connection_type: PhantomData<&'a M>,
}

impl<'a,
'b: 'a,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M>,
T: CDRSTransport + 'static> SessionPager<'a, M, S, T>
impl<
'a,
'b: 'a,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M>,
T: CDRSTransport + 'static,
> SessionPager<'a, M, S, T>
{
pub fn new(session: &'b S, page_size: i32) -> SessionPager<'a, M, S, T> {
SessionPager { session,
page_size,
transport_type: PhantomData,
connection_type: PhantomData }
SessionPager {
session,
page_size,
transport_type: PhantomData,
connection_type: PhantomData,
}
}

pub fn query_with_pager_state<Q>(&'a mut self,
query: Q,
state: PagerState)
-> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
where Q: ToString
pub fn query_with_pager_state<Q>(
&'a mut self,
query: Q,
state: PagerState,
) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
where
Q: ToString,
{
QueryPager { pager: self,
pager_state: state,
query }
QueryPager {
pager: self,
pager_state: state,
query,
}
}

pub fn query<Q>(&'a mut self, query: Q) -> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
where Q: ToString
where
Q: ToString,
{
self.query_with_pager_state(query, PagerState::new())
}

pub fn exec_with_pager_state(&'a mut self,
query: &'a PreparedQuery,
state: PagerState)
-> ExecPager<'a, SessionPager<'a, M, S, T>> {
ExecPager { pager: self,
pager_state: state,
query }
pub fn exec_with_pager_state(
&'a mut self,
query: &'a PreparedQuery,
state: PagerState,
) -> ExecPager<'a, SessionPager<'a, M, S, T>> {
ExecPager {
pager: self,
pager_state: state,
query,
}
}

pub fn exec(&'a mut self, query: &'a PreparedQuery) -> ExecPager<'a, SessionPager<'a, M, S, T>> {
Expand All @@ -72,33 +84,37 @@ pub struct QueryPager<'a, Q: ToString, P: 'a> {
query: Q,
}

impl<'a,
Q: ToString,
T: CDRSTransport + 'static,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M>> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
impl<
'a,
Q: ToString,
T: CDRSTransport + 'static,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M>,
> QueryPager<'a, Q, SessionPager<'a, M, S, T>>
{
pub fn next(&mut self) -> error::Result<Vec<Row>> {
let mut params = QueryParamsBuilder::new().page_size(self.pager.page_size);
if self.pager_state.cursor.is_some() {
params = params.paging_state(self.pager_state.cursor.clone().unwrap());
}

let body = self.pager
.session
.query_with_params(self.query.to_string(), params.finalize())
.and_then(|frame| frame.get_body())?;
let body = self
.pager
.session
.query_with_params(self.query.to_string(), params.finalize())
.and_then(|frame| frame.get_body())?;

let metadata_res: error::Result<RowsMetadata> =
body.as_rows_metadata()
.ok_or("Pager query should yield a vector of rows".into());
let metadata_res: error::Result<RowsMetadata> = body
.as_rows_metadata()
.ok_or("Pager query should yield a vector of rows".into());
let metadata = metadata_res?;

self.pager_state.has_more_pages =
Some(RowsMetadataFlag::has_has_more_pages(metadata.flags.clone()));
self.pager_state.cursor = metadata.paging_state.clone();
body.into_rows()
.ok_or("Pager query should yield a vector of rows".into())
body
.into_rows()
.ok_or("Pager query should yield a vector of rows".into())
}

pub fn has_more(&self) -> bool {
Expand All @@ -118,32 +134,36 @@ pub struct ExecPager<'a, P: 'a> {
query: &'a PreparedQuery,
}

impl<'a,
T: CDRSTransport + 'static,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M>> ExecPager<'a, SessionPager<'a, M, S, T>>
impl<
'a,
T: CDRSTransport + 'static,
M: r2d2::ManageConnection<Connection = RefCell<T>, Error = error::Error> + Sized,
S: CDRSSession<'static, T, M>,
> ExecPager<'a, SessionPager<'a, M, S, T>>
{
pub fn next(&mut self) -> error::Result<Vec<Row>> {
let mut params = QueryParamsBuilder::new().page_size(self.pager.page_size);
if self.pager_state.cursor.is_some() {
params = params.paging_state(self.pager_state.cursor.clone().unwrap());
}

let body = self.pager
.session
.exec_with_params(self.query, params.finalize())
.and_then(|frame| frame.get_body())?;
let body = self
.pager
.session
.exec_with_params(self.query, params.finalize())
.and_then(|frame| frame.get_body())?;

let metadata_res: error::Result<RowsMetadata> =
body.as_rows_metadata()
.ok_or("Pager query should yield a vector of rows".into());
let metadata_res: error::Result<RowsMetadata> = body
.as_rows_metadata()
.ok_or("Pager query should yield a vector of rows".into());
let metadata = metadata_res?;

self.pager_state.has_more_pages =
Some(RowsMetadataFlag::has_has_more_pages(metadata.flags.clone()));
self.pager_state.cursor = metadata.paging_state.clone();
body.into_rows()
.ok_or("Pager query should yield a vector of rows".into())
body
.into_rows()
.ok_or("Pager query should yield a vector of rows".into())
}

pub fn has_more(&self) -> bool {
Expand All @@ -165,18 +185,24 @@ pub struct PagerState {

impl PagerState {
pub fn new() -> Self {
PagerState { cursor: None,
has_more_pages: None }
PagerState {
cursor: None,
has_more_pages: None,
}
}

pub fn with_cursor(cursor: CBytes) -> Self {
PagerState { cursor: Some(cursor),
has_more_pages: None }
PagerState {
cursor: Some(cursor),
has_more_pages: None,
}
}

pub fn with_cursor_and_more_flag(cursor: CBytes, has_more: bool) -> Self {
PagerState { cursor: Some(cursor),
has_more_pages: Some(has_more) }
PagerState {
cursor: Some(cursor),
has_more_pages: Some(has_more),
}
}

pub fn has_more(&self) -> bool {
Expand Down
32 changes: 16 additions & 16 deletions src/cluster/session.rs
Expand Up @@ -3,28 +3,28 @@ use std::cell::RefCell;
use std::io::Write;

#[cfg(feature = "ssl")]
use cluster::{new_ssl_pool, ClusterSslConfig, SslConnectionPool};
use cluster::{
use crate::cluster::{new_ssl_pool, ClusterSslConfig, SslConnectionPool};
use crate::cluster::{
new_tcp_pool, startup, CDRSSession, ClusterTcpConfig, GetCompressor, GetConnection,
TcpConnectionPool,
};
use error;
use load_balancing::LoadBalancingStrategy;
use transport::{CDRSTransport, TransportTcp};

use authenticators::Authenticator;
use cluster::SessionPager;
use compression::Compression;
use events::{new_listener, EventStream, Listener};
use frame::events::SimpleServerEvent;
use frame::parser::parse_frame;
use frame::{Frame, IntoBytes};
use query::{BatchExecutor, ExecExecutor, PrepareExecutor, QueryExecutor};
use crate::error;
use crate::load_balancing::LoadBalancingStrategy;
use crate::transport::{CDRSTransport, TransportTcp};

use crate::authenticators::Authenticator;
use crate::cluster::SessionPager;
use crate::compression::Compression;
use crate::events::{new_listener, EventStream, Listener};
use crate::frame::events::SimpleServerEvent;
use crate::frame::parser::parse_frame;
use crate::frame::{Frame, IntoBytes};
use crate::query::{BatchExecutor, ExecExecutor, PrepareExecutor, QueryExecutor};

#[cfg(feature = "ssl")]
use openssl::ssl::SslConnector;
use crate::transport::TransportTls;
#[cfg(feature = "ssl")]
use transport::TransportTls;
use openssl::ssl::SslConnector;

/// CDRS session that holds one pool of authorized connecitons per node.
/// `compression` field contains data compressor that will be used
Expand Down

0 comments on commit d304532

Please sign in to comment.