Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 102 additions & 56 deletions include/cql/cql_builder.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
#ifndef CQL_BUILDER_H_
#define CQL_BUILDER_H_


#include <list>
#include <map>
#include <string>
#include <cassert>
#include <boost/asio/ssl.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/asio/ip/address.hpp>

#include "cql/cql_config.hpp"

#include "cql/policies/cql_round_robin_policy.hpp"
#include "cql/policies/cql_exponential_reconnection_policy_t.hpp"
#include "cql/policies/cql_default_retry_policy.hpp"
Expand Down Expand Up @@ -257,6 +254,7 @@ class cql_pooling_options_t
int _max_connections_for_remote;
};


class cql_policies_t {
public:
cql_policies_t() :
Expand All @@ -266,25 +264,50 @@ class cql_policies_t {
boost::posix_time::seconds(1), // base dealy:
boost::posix_time::minutes(10))), // max delay
_retry_policy(new cql_default_retry_policy_t())
{}


cql_policies_t(
{}
cql_policies_t(
boost::shared_ptr<cql_load_balancing_policy_t> load_balancing_policy,
boost::shared_ptr<cql_reconnection_policy_t> reconnection_policy,
boost::shared_ptr<cql_retry_policy_t> retry_policy) :
_load_balancing_policy(load_balancing_policy),
_reconnection_policy(reconnection_policy),
_retry_policy(retry_policy)
{}

inline boost::shared_ptr<cql_load_balancing_policy_t>
{
if( _load_balancing_policy.get() == NULL )
_load_balancing_policy.reset( new cql_round_robin_policy_t() );

if( _reconnection_policy.get() == NULL )
_reconnection_policy.reset( new cql_exponential_reconnection_policy_t( boost::posix_time::seconds(1), boost::posix_time::minutes(10) ) );

if( _retry_policy.get() == NULL )
_retry_policy.reset( new cql_default_retry_policy_t() );
}


void with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t > balancing_policy )
{
_load_balancing_policy = balancing_policy;
}

void with_reconnection_policy( boost::shared_ptr< cql::cql_reconnection_policy_t > reconnection_policy )
{
_reconnection_policy = reconnection_policy;
}

void with_retry_policy( boost::shared_ptr< cql::cql_retry_policy_t > retry_policy )
{
_retry_policy = retry_policy;
}

inline boost::shared_ptr<cql_load_balancing_policy_t>
load_balancing_policy() const
{
{
return _load_balancing_policy;
}

inline boost::shared_ptr<cql_reconnection_policy_t>
}
inline boost::shared_ptr<cql_reconnection_policy_t>
reconnection_policy() const
{
return _reconnection_policy;
Expand All @@ -299,18 +322,20 @@ class cql_policies_t {
private:
friend class cql_configuration_t;

virtual void
init(cql_cluster_t* cluster)
{
virtual void
init(cql_cluster_t* cluster)
{
assert(cluster != NULL);
_load_balancing_policy->init(cluster);
}

}
boost::shared_ptr<cql_load_balancing_policy_t> _load_balancing_policy;
boost::shared_ptr<cql_reconnection_policy_t> _reconnection_policy;
boost::shared_ptr<cql_retry_policy_t> _retry_policy;
};



class cql_configuration_t
{
public:
Expand All @@ -319,16 +344,18 @@ class cql_configuration_t
const cql_client_options_t& client_options,
const cql_protocol_options_t& protocol_options,
const cql_pooling_options_t& pooling_options,
const cql_policies_t& policies,
const cql_credentials_t& credentials) :
const cql_policies_t& policies,
const cql_credentials_t& credentials
) :
_io_service(io_service),
_client_options(client_options),
_protocol_options(protocol_options),
_pooling_options(pooling_options),
_policies(policies),
_policies(policies),
_credentials(credentials)
{}

{
}

inline const cql_protocol_options_t&
protocol_options() const
{
Expand All @@ -345,14 +372,14 @@ class cql_configuration_t
pooling_options() const
{
return _pooling_options;
}

inline const cql_policies_t&
}
inline const cql_policies_t&
policies() const
{
{
return _policies;
}

inline const cql_credentials_t&
credentials() const
{
Expand All @@ -363,24 +390,27 @@ class cql_configuration_t
io_service()
{
return _io_service;
}

}
private:
friend class cql_cluster_impl_t;

void
void
init(cql_cluster_t* cluster)
{
_policies.init(cluster);
}



boost::shared_ptr<boost::asio::io_service> _io_service;
cql_client_options_t _client_options;
cql_protocol_options_t _protocol_options;
cql_pooling_options_t _pooling_options;
cql_policies_t _policies;
cql_credentials_t _credentials;
};
cql_policies_t _policies;
cql_credentials_t _credentials;
};



class cql_initializer_t
{
Expand All @@ -392,6 +422,8 @@ class cql_initializer_t
configuration() = 0;
};



class CQL_EXPORT cql_builder_t :
public cql_initializer_t,
boost::noncopyable
Expand All @@ -418,16 +450,17 @@ class CQL_EXPORT cql_builder_t :
{
return boost::shared_ptr<cql_configuration_t>(
new cql_configuration_t(
_io_service,
cql_client_options_t(_log_callback,_thread_pool_size),
cql_protocol_options_t(
_contact_points,
_ssl_context),
cql_pooling_options_t(),
cql_policies_t(),
_credentials));
}

_io_service,
cql_client_options_t(_log_callback,_thread_pool_size),
cql_protocol_options_t(
_contact_points,
_ssl_context),
cql_pooling_options_t(),
cql_policies_t( _load_balancing_policy, _reconnection_policy, _retry_policy ),
_credentials
));
}

boost::shared_ptr<cql_cluster_t>
build();

Expand Down Expand Up @@ -488,18 +521,31 @@ class CQL_EXPORT cql_builder_t :
{
_thread_pool_size=thread_pool_size;
return *this;
}


private:
}

cql::cql_builder_t&
with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t > load_balancing_policy );

cql::cql_builder_t&
with_reconnection_policy( boost::shared_ptr< cql::cql_reconnection_policy_t > reconnection_policy );

cql::cql_builder_t&
with_retry_policy( boost::shared_ptr< cql::cql_retry_policy_t > retry_policy );

private:
boost::shared_ptr<boost::asio::io_service> _io_service;
std::list<cql_endpoint_t> _contact_points;
boost::shared_ptr<boost::asio::ssl::context> _ssl_context;
cql_connection_t::cql_log_callback_t _log_callback;
cql_credentials_t _credentials;
int _thread_pool_size;
};

} // namespace cql

#endif // CQL_BUILDER_H_

boost::shared_ptr<cql_load_balancing_policy_t> _load_balancing_policy;
boost::shared_ptr<cql_reconnection_policy_t> _reconnection_policy;
boost::shared_ptr<cql_retry_policy_t> _retry_policy;
};

} // namespace cql

#endif // CQL_BUILDER_H_

4 changes: 2 additions & 2 deletions include/cql/cql_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ namespace cql {
private:

};

class cql_metadata_t: boost::noncopyable {
class CQL_EXPORT cql_metadata_t: boost::noncopyable {
public:
typedef
boost::signals2::signal<void(boost::shared_ptr<cql_host_state_changed_info_t>)>
Expand Down
48 changes: 48 additions & 0 deletions include/cql/policies/cql_constant_reconnection_policy_t.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#ifndef CQL_CONSTANT_RECONNECTION_POLICY_T_HPP_
#define CQL_CONSTANT_RECONNECTION_POLICY_T_HPP_

#include <boost/date_time/posix_time/posix_time.hpp>
#include "cql/policies/cql_reconnection_policy.hpp"

namespace cql {
class cql_constant_reconnection_schedule_t;

class CQL_EXPORT cql_constant_reconnection_policy_t
: public cql_reconnection_policy_t
{
public:

inline boost::posix_time::time_duration
base_delay() const { return _base_delay; }

virtual boost::shared_ptr<cql_reconnection_schedule_t>
new_schedule();

cql_constant_reconnection_policy_t(
const boost::posix_time::time_duration& base_delay );

private:
boost::posix_time::time_duration _base_delay;
};



class CQL_EXPORT cql_constant_reconnection_schedule_t
: public cql_reconnection_schedule_t
{
public:
virtual boost::posix_time::time_duration
get_delay();

private:
cql_constant_reconnection_schedule_t(
boost::posix_time::time_duration const & base_delay ) :
_base_delay( base_delay ) {}

friend class cql_constant_reconnection_policy_t;
boost::posix_time::time_duration const _base_delay;
};
}

#endif /* CQL_CONSTANT_RECONNECTION_POLICY_T_HPP_ */

Loading