diff --git a/include/cql/cql_builder.hpp b/include/cql/cql_builder.hpp index e419e3790..c4852e60f 100644 --- a/include/cql/cql_builder.hpp +++ b/include/cql/cql_builder.hpp @@ -1,7 +1,6 @@ #ifndef CQL_BUILDER_H_ #define CQL_BUILDER_H_ - #include #include #include @@ -9,9 +8,7 @@ #include #include #include - #include "cql/cql_config.hpp" - #include "cql/policies/cql_round_robin_policy.hpp" #include "cql/policies/cql_exponential_reconnection_policy_t.hpp" #include "cql/policies/cql_default_retry_policy.hpp" @@ -257,6 +254,7 @@ class cql_pooling_options_t int _max_connections_for_remote; }; + class cql_policies_t { public: cql_policies_t() : @@ -266,25 +264,50 @@ class cql_policies_t { boost::posix_time::seconds(1), // base dealy: boost::posix_time::minutes(10))), // max delay _retry_policy(new cql_default_retry_policy_t()) - {} - - - cql_policies_t( + {} + + + cql_policies_t( boost::shared_ptr load_balancing_policy, boost::shared_ptr reconnection_policy, boost::shared_ptr retry_policy) : _load_balancing_policy(load_balancing_policy), _reconnection_policy(reconnection_policy), _retry_policy(retry_policy) - {} - - inline boost::shared_ptr + { + if( _load_balancing_policy.get() == NULL ) + _load_balancing_policy.reset( new cql_round_robin_policy_t() ); + + if( _reconnection_policy.get() == NULL ) + _reconnection_policy.reset( new cql_exponential_reconnection_policy_t( boost::posix_time::seconds(1), boost::posix_time::minutes(10) ) ); + + if( _retry_policy.get() == NULL ) + _retry_policy.reset( new cql_default_retry_policy_t() ); + } + + + void with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t > balancing_policy ) + { + _load_balancing_policy = balancing_policy; + } + + void with_reconnection_policy( boost::shared_ptr< cql::cql_reconnection_policy_t > reconnection_policy ) + { + _reconnection_policy = reconnection_policy; + } + + void with_retry_policy( boost::shared_ptr< cql::cql_retry_policy_t > retry_policy ) + { + _retry_policy = retry_policy; + } + + inline boost::shared_ptr load_balancing_policy() const - { + { return _load_balancing_policy; - } - - inline boost::shared_ptr + } + + inline boost::shared_ptr reconnection_policy() const { return _reconnection_policy; @@ -299,18 +322,20 @@ class cql_policies_t { private: friend class cql_configuration_t; - virtual void - init(cql_cluster_t* cluster) - { + virtual void + init(cql_cluster_t* cluster) + { assert(cluster != NULL); _load_balancing_policy->init(cluster); - } - + } + boost::shared_ptr _load_balancing_policy; boost::shared_ptr _reconnection_policy; boost::shared_ptr _retry_policy; }; + + class cql_configuration_t { public: @@ -319,16 +344,18 @@ class cql_configuration_t const cql_client_options_t& client_options, const cql_protocol_options_t& protocol_options, const cql_pooling_options_t& pooling_options, - const cql_policies_t& policies, - const cql_credentials_t& credentials) : + const cql_policies_t& policies, + const cql_credentials_t& credentials + ) : _io_service(io_service), _client_options(client_options), _protocol_options(protocol_options), _pooling_options(pooling_options), - _policies(policies), + _policies(policies), _credentials(credentials) - {} - + { + } + inline const cql_protocol_options_t& protocol_options() const { @@ -345,14 +372,14 @@ class cql_configuration_t pooling_options() const { return _pooling_options; - } - - inline const cql_policies_t& + } + + inline const cql_policies_t& policies() const - { + { return _policies; } - + inline const cql_credentials_t& credentials() const { @@ -363,24 +390,27 @@ class cql_configuration_t io_service() { return _io_service; - } - + } + private: friend class cql_cluster_impl_t; - - void + + void init(cql_cluster_t* cluster) { _policies.init(cluster); } - + + boost::shared_ptr _io_service; cql_client_options_t _client_options; cql_protocol_options_t _protocol_options; cql_pooling_options_t _pooling_options; - cql_policies_t _policies; - cql_credentials_t _credentials; -}; + cql_policies_t _policies; + cql_credentials_t _credentials; +}; + + class cql_initializer_t { @@ -392,6 +422,8 @@ class cql_initializer_t configuration() = 0; }; + + class CQL_EXPORT cql_builder_t : public cql_initializer_t, boost::noncopyable @@ -418,16 +450,17 @@ class CQL_EXPORT cql_builder_t : { return boost::shared_ptr( new cql_configuration_t( - _io_service, - cql_client_options_t(_log_callback,_thread_pool_size), - cql_protocol_options_t( - _contact_points, - _ssl_context), - cql_pooling_options_t(), - cql_policies_t(), - _credentials)); - } - + _io_service, + cql_client_options_t(_log_callback,_thread_pool_size), + cql_protocol_options_t( + _contact_points, + _ssl_context), + cql_pooling_options_t(), + cql_policies_t( _load_balancing_policy, _reconnection_policy, _retry_policy ), + _credentials + )); + } + boost::shared_ptr build(); @@ -488,18 +521,31 @@ class CQL_EXPORT cql_builder_t : { _thread_pool_size=thread_pool_size; return *this; - } - - -private: + } + + cql::cql_builder_t& + with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t > load_balancing_policy ); + + cql::cql_builder_t& + with_reconnection_policy( boost::shared_ptr< cql::cql_reconnection_policy_t > reconnection_policy ); + + cql::cql_builder_t& + with_retry_policy( boost::shared_ptr< cql::cql_retry_policy_t > retry_policy ); + +private: boost::shared_ptr _io_service; std::list _contact_points; boost::shared_ptr _ssl_context; cql_connection_t::cql_log_callback_t _log_callback; cql_credentials_t _credentials; int _thread_pool_size; -}; - -} // namespace cql - -#endif // CQL_BUILDER_H_ + + boost::shared_ptr _load_balancing_policy; + boost::shared_ptr _reconnection_policy; + boost::shared_ptr _retry_policy; +}; + +} // namespace cql + +#endif // CQL_BUILDER_H_ + diff --git a/include/cql/cql_metadata.hpp b/include/cql/cql_metadata.hpp index d83b1337e..9a9232fb1 100644 --- a/include/cql/cql_metadata.hpp +++ b/include/cql/cql_metadata.hpp @@ -100,8 +100,8 @@ namespace cql { private: }; - - class cql_metadata_t: boost::noncopyable { + + class CQL_EXPORT cql_metadata_t: boost::noncopyable { public: typedef boost::signals2::signal)> diff --git a/include/cql/policies/cql_constant_reconnection_policy_t.hpp b/include/cql/policies/cql_constant_reconnection_policy_t.hpp new file mode 100644 index 000000000..261300b02 --- /dev/null +++ b/include/cql/policies/cql_constant_reconnection_policy_t.hpp @@ -0,0 +1,48 @@ +#ifndef CQL_CONSTANT_RECONNECTION_POLICY_T_HPP_ +#define CQL_CONSTANT_RECONNECTION_POLICY_T_HPP_ + +#include +#include "cql/policies/cql_reconnection_policy.hpp" + +namespace cql { + class cql_constant_reconnection_schedule_t; + + class CQL_EXPORT cql_constant_reconnection_policy_t + : public cql_reconnection_policy_t + { + public: + + inline boost::posix_time::time_duration + base_delay() const { return _base_delay; } + + virtual boost::shared_ptr + new_schedule(); + + cql_constant_reconnection_policy_t( + const boost::posix_time::time_duration& base_delay ); + + private: + boost::posix_time::time_duration _base_delay; + }; + + + + class CQL_EXPORT cql_constant_reconnection_schedule_t + : public cql_reconnection_schedule_t + { + public: + virtual boost::posix_time::time_duration + get_delay(); + + private: + cql_constant_reconnection_schedule_t( + boost::posix_time::time_duration const & base_delay ) : + _base_delay( base_delay ) {} + + friend class cql_constant_reconnection_policy_t; + boost::posix_time::time_duration const _base_delay; + }; +} + +#endif /* CQL_CONSTANT_RECONNECTION_POLICY_T_HPP_ */ + diff --git a/include/cql/policies/cql_dcaware_round_robin_balancing_policy.hpp b/include/cql/policies/cql_dcaware_round_robin_balancing_policy.hpp new file mode 100644 index 000000000..9b6e44a27 --- /dev/null +++ b/include/cql/policies/cql_dcaware_round_robin_balancing_policy.hpp @@ -0,0 +1,74 @@ +#ifndef CQL_DCAWARE_ROUND_ROBIN_BALANCING_POLICY_HPP_ +#define CQL_DCAWARE_ROUND_ROBIN_BALANCING_POLICY_HPP_ + +#include "cql/policies/cql_round_robin_policy.hpp" + +namespace cql { + class cql_host_t; + class cql_cluster_t; + + std::string DC( std::string const & dc, std::string localDc ); // conwert the data center name if the name is incorrect. + + class CQL_EXPORT cql_dcaware_round_robin_balancing_policy_t : public cql_load_balancing_policy_t + { + public: + + cql_dcaware_round_robin_balancing_policy_t( std::string localDc ) : + _usedHostsPerRemoteDc( 0 ), + _index2( 0 ), + _cluster2( NULL ), + _localDc( localDc ) {} + + cql_dcaware_round_robin_balancing_policy_t( std::string localDc, int usedHostsPerRemoteDc ) : + _usedHostsPerRemoteDc( usedHostsPerRemoteDc ), + _index2( 0 ), + _cluster2( NULL ), + _localDc( localDc ) {} + + virtual boost::shared_ptr + new_query_plan( + const boost::shared_ptr& query); + + virtual void + init( + cql_cluster_t* cluster); + + virtual cql::cql_host_distance_enum + distance(const cql::cql_host_t& host); + + private: + + std::string _localDc; + boost::mutex _mutex2; + cql_cluster_t * _cluster2; // need to have its own version of this pointer + int _usedHostsPerRemoteDc; + int _index2; + }; + + + + class CQL_EXPORT cql_dcaware_round_robin_query_plan_t : public cql_round_robin_query_plan_t + { + public: + + cql_dcaware_round_robin_query_plan_t( + const cql_cluster_t* cluster2, + unsigned index2, + std::string localDc, + int usedHostsPerRemoteDc + ); // the only one address the query will go to + + virtual boost::shared_ptr next_host_to_query(); // Returns next host to query. + + private: + boost::mutex _mutex2; + std::string _localDc; // the address IP of the node the query will go to + const cql_cluster_t* _cluster2; // + int _index2; // + std::map< std::string, long > _dcQueryCount; // how many times we queried each data center. + int _usedHostsPerRemoteDc; + }; + +} + +#endif // CQL_DCAWARE_ROUND_ROBIN_BALANCING_POLICY_HPP_ \ No newline at end of file diff --git a/include/cql/policies/cql_downgrading_consistency_retry_policy.hpp b/include/cql/policies/cql_downgrading_consistency_retry_policy.hpp new file mode 100644 index 000000000..9a561a1ae --- /dev/null +++ b/include/cql/policies/cql_downgrading_consistency_retry_policy.hpp @@ -0,0 +1,52 @@ +#ifndef CQL_DEFAULT_RETRY_POLICY_H_ +#define CQL_DEFAULT_RETRY_POLICY_H_ + +#include +#include + +#include "cql/policies/cql_retry_policy.hpp" + +namespace cql { + class cql_downgrading_consistency_retry_policy_t: + public cql_retry_policy_t, + boost::noncopyable + { + public: + virtual cql_retry_decision_t + read_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_responses, + int received_responses, + bool data_retrieved, + int retry_number); + + virtual cql_retry_decision_t + write_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + const std::string& write_type, + int required_acks, + int received_acks, + int retry_number + ); + + virtual cql_retry_decision_t + unavailable( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_replica, + int alive_replica, + int retry_number); + + cql_downgrading_consistency_retry_policy_t() {}; + + private: + + cql::cql_retry_decision_t + cql::cql_downgrading_consistency_retry_policy_t::max_likely_to_work_cl( int knownOk ); + + }; +} + +#endif diff --git a/include/cql/policies/cql_fallthrough_retry_policy.hpp b/include/cql/policies/cql_fallthrough_retry_policy.hpp new file mode 100644 index 000000000..49dd0fa14 --- /dev/null +++ b/include/cql/policies/cql_fallthrough_retry_policy.hpp @@ -0,0 +1,45 @@ +#ifndef CQL_DEFAULT_RETRY_POLICY_H_ +#define CQL_DEFAULT_RETRY_POLICY_H_ + +#include +#include +#include "cql/policies/cql_retry_policy.hpp" + +namespace cql { + class cql_fallthrough_retry_policy_t: + public cql_retry_policy_t, + boost::noncopyable + { + public: + virtual cql_retry_decision_t + read_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_responses, + int received_responses, + bool data_retrieved, + int retry_number); + + virtual cql_retry_decision_t + write_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + const std::string& write_type, + int required_acks, + int received_acks, + int retry_number + ); + + virtual cql_retry_decision_t + unavailable( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_replica, + int alive_replica, + int retry_number); + + cql_fallthrough_retry_policy_t() { }; + }; +} + +#endif diff --git a/include/cql/policies/cql_round_robin_policy.hpp b/include/cql/policies/cql_round_robin_policy.hpp index d9889ee3a..fea7f087d 100644 --- a/include/cql/policies/cql_round_robin_policy.hpp +++ b/include/cql/policies/cql_round_robin_policy.hpp @@ -29,8 +29,8 @@ namespace cql { unsigned _current; std::vector > _hosts; }; - - class cql_round_robin_policy_t : + + class CQL_EXPORT cql_round_robin_policy_t : public cql_load_balancing_policy_t, boost::noncopyable { diff --git a/src/cql/cql_builder.cpp b/src/cql/cql_builder.cpp index 407492f59..7be53b28c 100644 --- a/src/cql/cql_builder.cpp +++ b/src/cql/cql_builder.cpp @@ -14,8 +14,8 @@ * limitations under the License. */ #include -#include - +#include + #include "cql/cql_cluster.hpp" #include "cql/cql_builder.hpp" #include "cql/internal/cql_util.hpp" @@ -105,4 +105,47 @@ cql::cql_builder_t::with_credentials( _credentials = credentials; return *this; -} +} + + +cql::cql_builder_t& cql::cql_builder_t::with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t > load_balancing_policy ) +{ + if( load_balancing_policy == NULL ) + return *this; + + if( load_balancing_policy == nullptr ) + return *this; + + _load_balancing_policy = load_balancing_policy; + + return *this; +} + + +cql::cql_builder_t& cql::cql_builder_t::with_reconnection_policy( boost::shared_ptr< cql::cql_reconnection_policy_t > reconnection_policy ) +{ + + if( reconnection_policy == NULL ) + return *this; + + if( reconnection_policy == nullptr ) + return *this; + + _reconnection_policy = reconnection_policy; + return *this; +} + + +cql::cql_builder_t& cql::cql_builder_t::with_retry_policy( boost::shared_ptr< cql::cql_retry_policy_t > retry_policy ) +{ + if( retry_policy == NULL ) + return *this; + + if( retry_policy == nullptr ) + return *this; + + _retry_policy = retry_policy; + return *this; +} + + diff --git a/src/cql/policies/cql_constant_reconnection_policy_t.cpp b/src/cql/policies/cql_constant_reconnection_policy_t.cpp new file mode 100644 index 000000000..8efcb1675 --- /dev/null +++ b/src/cql/policies/cql_constant_reconnection_policy_t.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2013 DataStax Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include "cql/policies/cql_constant_reconnection_policy_t.hpp" + +boost::posix_time::time_duration +cql::cql_constant_reconnection_schedule_t::get_delay() +{ + return _base_delay; +} + + +cql::cql_constant_reconnection_policy_t::cql_constant_reconnection_policy_t( + boost::posix_time::time_duration const & base_delay ) : + _base_delay(base_delay) +{ + if(base_delay.is_negative()) + throw std::invalid_argument("base_delay cannot be negative."); + + if(base_delay.total_milliseconds() == 0) + throw std::invalid_argument("base_delay must be at least 1 milisecond long."); +} + + +boost::shared_ptr +cql::cql_constant_reconnection_policy_t::new_schedule() +{ + return boost::shared_ptr( + new cql_constant_reconnection_schedule_t( base_delay() )); +} + + \ No newline at end of file diff --git a/src/cql/policies/cql_dcaware_round_robin_balancing_policy.cpp b/src/cql/policies/cql_dcaware_round_robin_balancing_policy.cpp new file mode 100644 index 000000000..fcacbcae1 --- /dev/null +++ b/src/cql/policies/cql_dcaware_round_robin_balancing_policy.cpp @@ -0,0 +1,210 @@ + +//#ifdef CQL_UNIT_TESTS //// In unit test we must use a mock-object to simulate hundreds of virtual nodes. +// #include "../test/integration_tests/include/dcaware_test_metadata_mock_object.hpp" +//#endif + +#include "cql/policies/cql_dcaware_round_robin_balancing_policy.hpp" +#include "cql/cql_host.hpp" +#include "cql/cql_cluster.hpp" +#include "cql/cql_metadata.hpp" + +std::string cql::DC( std::string const & dc, std::string localDc ) // conwert the data center name if the name is incorrect. +{ + if( dc == "" || dc == "unknown" ) // if there is something incorrect with the name of the datacenter, convert it to the local data center name. + return localDc; + + return dc; +} + + +boost::shared_ptr +cql::cql_dcaware_round_robin_balancing_policy_t::new_query_plan( + const boost::shared_ptr&) +{ + boost::mutex::scoped_lock lock( _mutex2 ); + return boost::shared_ptr( new cql_dcaware_round_robin_query_plan_t( _cluster2, ++_index2, _localDc, _usedHostsPerRemoteDc ) ); +} + + +cql::cql_host_distance_enum + cql::cql_dcaware_round_robin_balancing_policy_t::distance(const cql::cql_host_t& host) +{ + if( host.datacenter().empty() ) + return cql::CQL_HOST_DISTANCE_LOCAL; + + if( DC( host.datacenter(), _localDc ) == _localDc ) + return cql::CQL_HOST_DISTANCE_LOCAL; + + std::vector > collection; + _cluster2->metadata()->get_hosts( collection ); + int const host_size = collection.size(); + + int ix = 0; + + for( int i = 0; i < host_size; ++i ) + { + if( collection[ i ]->address() == host.address() ) + { + if ( ix < _usedHostsPerRemoteDc) + return cql::CQL_HOST_DISTANCE_REMOTE; + else + return cql::CQL_HOST_DISTANCE_IGNORE; + } + else if( collection[ i ]->datacenter() == host.datacenter() ) + { + ix++; + } + } + return cql::CQL_HOST_DISTANCE_IGNORE; +} + + + +void +cql::cql_dcaware_round_robin_balancing_policy_t::init(cql_cluster_t* cluster2) // j.kasprzak - zmiany. +{ + boost::mutex::scoped_lock lock(_mutex2); + _cluster2 = cluster2; + + if( _cluster2->metadata().get() != NULL ) //// if we here know the number of nodes, + { //// set at the beginning the starting node as random. + std::vector > collection; //// It should not start always with the same node. + _cluster2->metadata()->get_hosts( collection ); + int const host_size = collection.size(); + + if( host_size > 0 ) + { + _index2 = rand() % host_size; + } + } + else + { + _index2 = rand() % 1000; // set a random starting point. It should start differently every start. + } +} + + +cql::cql_dcaware_round_robin_query_plan_t::cql_dcaware_round_robin_query_plan_t( + const cql_cluster_t* cluster2, + unsigned index2, + std::string localDc, + int usedHostsPerRemoteDc + ) : + cql_round_robin_query_plan_t( cluster2, index2 ), + _localDc( localDc ), + _cluster2( cluster2), + _index2( index2 ), + _usedHostsPerRemoteDc( usedHostsPerRemoteDc ) +{ +} + + + +boost::shared_ptr +cql::cql_dcaware_round_robin_query_plan_t::next_host_to_query() +{ + boost::mutex::scoped_lock lock( _mutex2 ); + + if( _cluster2->metadata().get() == NULL ) //// Check if the metadata pointer is NULL. + return boost::shared_ptr(); //// + + std::vector > collection_hosts; //// all hosts. + _cluster2->metadata()->get_hosts( collection_hosts ); + + if( collection_hosts.empty() ) + return boost::shared_ptr(); //// there are no hosts available for this cluster. + + std::vector > local_hosts; //// local hosts + std::vector > remote_hosts; //// remote hosts + + for( int i = 0; i < collection_hosts.size(); ++i ) //// filter which divides hosts into two vectors: local and remote + { + if( !collection_hosts[ i ]->is_considerably_up() ) + continue; + + if( DC( collection_hosts[ i ]->datacenter(), _localDc ) == _localDc ) + { + local_hosts.push_back( collection_hosts[ i ] ); + } + else + { + remote_hosts.push_back( collection_hosts[ i ] ); + } + } + + ++_index2; // try the next host from the vector of hosts if we look in the local hosts. It must be a round robin algorithm. + + int const size_of_local( local_hosts.size() ); + if( !local_hosts.empty() ) + { + return local_hosts[ _index2 % size_of_local ]; // take the next host from the local pool + } + + if( remote_hosts.empty() ) + return boost::shared_ptr(); + + if( _usedHostsPerRemoteDc == 0 ) + return boost::shared_ptr(); + + int const remote_size( remote_hosts.size() ); + + bool const IS_RANDOM_SELECTION( true ); //// USE IMPROVED ALGORITHM WITH RANDOM SELECTION OF NODES AND DATACENTERS. + + if( IS_RANDOM_SELECTION ) + { + _index2 = rand() % remote_size; //// for every query to a remote data center take a random taken node. + + for( int j = 0; j < 10; ++j ) //// make ten tries of totaly random index. + { + std::string const data_center_name = DC( remote_hosts[ _index2 ]->datacenter(), _localDc ); + + std::map< std::string, long >::iterator p = _dcQueryCount.find( data_center_name ); + + if( p == _dcQueryCount.end() ) //// it is the first query to a host from this datacenter. + { + _dcQueryCount.insert( std::make_pair( data_center_name, 1 ) ); + return remote_hosts[ _index2 ]; + } + else + { + if( p->second < _usedHostsPerRemoteDc ) + { + ++( p->second ); // increment the number of usage hosts from this data center. + return remote_hosts[ _index2 ]; + } + } + _index2 = rand() % remote_size; //// for every query to a remote data center take a random taken node. + } + } + + + //// After 10 unsuccessful random indexes, make the linear search through the sorted array. + //// It gives the lowest address for the next available dataceter much more frequently than the high IP address. + for( int i = 0; i < remote_size; ++i ) // check each node from romote data centers. + { + int index3 = ( _index2 + i ) % remote_size; //// loop through the remote hosts. + std::string const data_center_name = DC( remote_hosts[ index3 ]->datacenter(), _localDc ); + + std::map< std::string, long >::iterator p = _dcQueryCount.find( data_center_name ); + + if( p == _dcQueryCount.end() ) //// it is the first query to a host from this datacenter. + { + _dcQueryCount.insert( std::make_pair( data_center_name, 1 ) ); + _index2 += i; + return remote_hosts[ index3 ]; + } + else + { + if( p->second < _usedHostsPerRemoteDc ) + { + ++( p->second ); // increment the number of usage hosts from this data center. + _index2 += i; + return remote_hosts[ index3 ]; + } + } + } + + return boost::shared_ptr(); //// return NULL. Not found a host. +} + + \ No newline at end of file diff --git a/src/cql/policies/cql_downgrading_consistency_retry_policy.cpp b/src/cql/policies/cql_downgrading_consistency_retry_policy.cpp new file mode 100644 index 000000000..129720dc9 --- /dev/null +++ b/src/cql/policies/cql_downgrading_consistency_retry_policy.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2013 DataStax Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "cql/policies/cql_downgrading_consistency_retry_policy.hpp" + +cql::cql_retry_decision_t +cql::cql_downgrading_consistency_retry_policy_t::read_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_responses, + int received_responses, + bool data_retrieved, + int retry_number + ) +{ + if( retry_number != 0 ) + return cql_retry_decision_t::rethrow_decision(); + + if (received_responses < required_responses) + { + return max_likely_to_work_cl(received_responses); + } + + return !data_retrieved ? cql_retry_decision_t::retry_decision_with( consistency ) : cql_retry_decision_t::rethrow_decision(); +} + +cql::cql_retry_decision_t +cql::cql_downgrading_consistency_retry_policy_t::write_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + const std::string& write_type, + int required_acks, + int received_acks, + int retry_number + ) +{ + if (retry_number != 0) + return cql_retry_decision_t::rethrow_decision(); + + if( write_type == "SIMPLE" || write_type == "BATCH" ) + { + // Since we provide atomicity there is no point in retrying + return cql_retry_decision_t::ignore(); + } + else if( write_type == "COUNTER" ) + { + // We should not retry counters, period! + return cql_retry_decision_t::ignore(); + } + else if( write_type == "UNLOGGED_BATCH" ) + { + // Since only part of the batch could have been persisted, + // retry with whatever consistency should allow to persist all + return max_likely_to_work_cl( received_acks ); + } + else if( write_type == "BATCH_LOG" ) + { + return cql_retry_decision_t::retry_decision_with( consistency ); + } + + return cql_retry_decision_t::rethrow_decision(); +} + + +cql::cql_retry_decision_t +cql::cql_downgrading_consistency_retry_policy_t::unavailable( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_replica, + int alive_replica, + int retry_number) +{ + return retry_number != 0 ? cql_retry_decision_t::rethrow_decision() : max_likely_to_work_cl( alive_replica ); +} + + +cql::cql_retry_decision_t +cql::cql_downgrading_consistency_retry_policy_t::max_likely_to_work_cl(int knownOk) +{ + if (knownOk >= 3) + return cql_retry_decision_t::retry_decision_with( cql_consistency_enum::CQL_CONSISTENCY_THREE ); + else if (knownOk >= 2) + return cql_retry_decision_t::retry_decision_with( cql_consistency_enum::CQL_CONSISTENCY_TWO ); + else if (knownOk >= 1) + return cql_retry_decision_t::retry_decision_with( cql_consistency_enum::CQL_CONSISTENCY_ONE ); + else + return cql_retry_decision_t::rethrow_decision(); +} + + diff --git a/src/cql/policies/cql_fallthrough_retry_policy.cpp b/src/cql/policies/cql_fallthrough_retry_policy.cpp new file mode 100644 index 000000000..f869b05ac --- /dev/null +++ b/src/cql/policies/cql_fallthrough_retry_policy.cpp @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2013 DataStax Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "cql/policies/cql_fallthrough_retry_policy.hpp" + +cql::cql_retry_decision_t +cql::cql_fallthrough_retry_policy_t::read_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_responses, + int received_responses, + bool data_retrieved, + int retry_number + ) +{ + return cql_retry_decision_t::rethrow_decision(); +} + +cql::cql_retry_decision_t +cql::cql_fallthrough_retry_policy_t::write_timeout( + const cql_query_t& query, + cql_consistency_enum consistency, + const std::string& write_type, + int required_acks, + int received_acks, + int retry_number + ) +{ + return cql_retry_decision_t::rethrow_decision(); +} + +cql::cql_retry_decision_t +cql::cql_fallthrough_retry_policy_t::unavailable( + const cql_query_t& query, + cql_consistency_enum consistency, + int required_replica, + int alive_replica, + int retry_number) +{ + return cql_retry_decision_t::rethrow_decision(); +} + + diff --git a/test/integration_tests/include/consistency_tests.hpp b/test/integration_tests/include/consistency_tests.hpp new file mode 100644 index 000000000..c227cd38c --- /dev/null +++ b/test/integration_tests/include/consistency_tests.hpp @@ -0,0 +1,11 @@ +#include + +namespace cql +{ + class cql_ccm_bridge_t; + class cql_builder_t; +} + +void ContinueTheConsistencyTest( + boost::shared_ptr ccm, + boost::shared_ptr builder ); diff --git a/test/integration_tests/include/one_node_balancing.hpp b/test/integration_tests/include/one_node_balancing.hpp new file mode 100644 index 000000000..750593efa --- /dev/null +++ b/test/integration_tests/include/one_node_balancing.hpp @@ -0,0 +1,51 @@ +#ifndef CQL_ONE_NODE_BALANCING_POLICY_HPP_ +#define CQL_ONE_NODE_BALANCING_POLICY_HPP_ + +#include "cql/policies/cql_round_robin_policy.hpp" + +namespace cql { + class cql_host_t; + class cql_cluster_t; + + class CQL_EXPORT cql_one_node_balancing_policy_t : public cql_load_balancing_policy_t + { + public: + + cql_one_node_balancing_policy_t( std::string host_address ) : + _host_address( host_address ) {} // the IP of the node we connect to. + + virtual boost::shared_ptr + new_query_plan( + const boost::shared_ptr& query); + + virtual void + init( + cql_cluster_t* cluster); + + private: + boost::mutex _mutex2; + std::string _host_address; // the address IP of the node the query will go to + cql_cluster_t * _cluster2; // need to have its own version of this pointer + }; + + + class CQL_EXPORT cql_one_node_query_plan_t : public cql_query_plan_t + { + public: + + cql_one_node_query_plan_t( + const cql_cluster_t* cluster, + unsigned index, + std::string hostAddress ); // the only one address the query will go to + + virtual boost::shared_ptr next_host_to_query(); // Returns next host to query. + + private: + std::string _hostAddress; // the address IP of the node the query will go to + std::vector > _hosts; + boost::mutex _mutex; + }; + +} + +#endif \ No newline at end of file diff --git a/test/integration_tests/include/policy_tools.hpp b/test/integration_tests/include/policy_tools.hpp index 605e02e8b..99e37b65d 100644 --- a/test/integration_tests/include/policy_tools.hpp +++ b/test/integration_tests/include/policy_tools.hpp @@ -4,12 +4,14 @@ namespace policy_tools{ extern std::map coordinators; - + + void show_coordinators(); // show what queries went to what node IP. + void create_schema( boost::shared_ptr session, int replicationFactor); - + int init( boost::shared_ptr session, diff --git a/test/integration_tests/src/consistency_dcaware_tests.cpp b/test/integration_tests/src/consistency_dcaware_tests.cpp new file mode 100644 index 000000000..f9804eec4 --- /dev/null +++ b/test/integration_tests/src/consistency_dcaware_tests.cpp @@ -0,0 +1,61 @@ +#define BOOST_TEST_DYN_LINK + +#ifdef STAND_ALONE +# define BOOST_TEST_MODULE cassandra +#endif + +#include "cql/cql.hpp" +#include "cql_ccm_bridge.hpp" +#include "test_utils.hpp" +#include "policy_tools.hpp" + +#include "cql/cql_session.hpp" +#include "cql/cql_cluster.hpp" +#include "cql/cql_builder.hpp" + +#include +#include +#include "cql/policies/cql_round_robin_policy.hpp" +#include "cql/policies/cql_dcaware_round_robin_balancing_policy.hpp" + +#include "consistency_tests.hpp" + +struct CONSISTENCY_CCM_DC_AWARE_SETUP : test_utils::CCM_SETUP { + CONSISTENCY_CCM_DC_AWARE_SETUP() : CCM_SETUP(3,3) {} // create 3 local and 3 remote data centers. +}; + +BOOST_FIXTURE_TEST_SUITE( consistency_dcaware_tests, CONSISTENCY_CCM_DC_AWARE_SETUP ) + +//// consistency_dcaware_tests/testDcAwareRFOneTokenAware +BOOST_AUTO_TEST_CASE(testDcAwareRFOneTokenAware) //// Ask local nodes. At first nodes 1,2,3, later only nodes 1,3 after the second is removed. +{ + builder->with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t >( new cql::cql_dcaware_round_robin_balancing_policy_t( "dc1" ) ) ); + ContinueTheConsistencyTest( ccm, builder ); +} + + +//// consistency_dcaware_tests/testDcAwareSecondRFOneTokenAware +BOOST_AUTO_TEST_CASE(testDcAwareSecondRFOneTokenAware) //// Ask local nodes, it means the nodes 4,5,6. Nodes 1,2,3 should be ignored as remote nodes. +{ + builder->with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t >( new cql::cql_dcaware_round_robin_balancing_policy_t( "dc2" ) ) ); + ContinueTheConsistencyTest( ccm, builder ); +} + + +//// consistency_dcaware_tests/testDcAwareRemoteOnlyRFOneTokenAware +BOOST_AUTO_TEST_CASE(testDcAwareRemoteOnlyRFOneTokenAware) //// Ask all six nodes ( 1,2,3,4,5,6 ) because all nodes are treaded as remote nodes. There are no data centers named: "treat_all_as_remote" +{ + builder->with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t >( new cql::cql_dcaware_round_robin_balancing_policy_t( "treat_all_as_remote", 150 ) ) ); + ContinueTheConsistencyTest( ccm, builder ); +} + + +////// consistency_dcaware_tests/testDcAwareRemoteFailAlwaysOnlyRFOneTokenAware +//BOOST_AUTO_TEST_CASE(testDcAwareRemoteFailAlwaysOnlyRFOneTokenAware) //// This test should fail. We connect only to remote nodes, but the limit is zero as default. +//{ +// builder->with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t >( new cql::cql_dcaware_round_robin_balancing_policy_t( "treat_all_as_remote" ) ) ); +// ContinueTheConsistencyTest( ccm, builder ); +//} + + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file diff --git a/test/integration_tests/src/consistency_tests.cpp b/test/integration_tests/src/consistency_tests.cpp new file mode 100644 index 000000000..8d4d8b5f9 --- /dev/null +++ b/test/integration_tests/src/consistency_tests.cpp @@ -0,0 +1,147 @@ +#define BOOST_TEST_DYN_LINK + +#ifdef STAND_ALONE +# define BOOST_TEST_MODULE cassandra +#endif + +#include "cql/cql.hpp" +#include "cql_ccm_bridge.hpp" +#include "test_utils.hpp" +#include "policy_tools.hpp" + +#include "cql/cql_session.hpp" +#include "cql/cql_cluster.hpp" +#include "cql/cql_builder.hpp" + +#include +#include +#include "cql/policies/cql_round_robin_policy.hpp" + +#include "consistency_tests.hpp" + +struct CONSISTENCY_CCM_SETUP : test_utils::CCM_SETUP { + CONSISTENCY_CCM_SETUP() : CCM_SETUP(3,0) {} +}; + + +BOOST_FIXTURE_TEST_SUITE( consistency_tests, CONSISTENCY_CCM_SETUP ) //// consistency_tests/testRFOneTokenAware + + +/// --run_test=consistency_tests/testRFOneTokenAware +BOOST_AUTO_TEST_CASE(testRFOneTokenAware) +{ + builder->with_load_balancing_policy( boost::shared_ptr< cql::cql_load_balancing_policy_t >( new cql::cql_round_robin_policy_t() ) ); + ContinueTheConsistencyTest( ccm, builder ); +} + +BOOST_AUTO_TEST_SUITE_END() + + + + + +std::string GetNameOfConsistency( cql::cql_consistency_enum const a_consistencyName ) +{ + int i = static_cast< int >( a_consistencyName ); + if( i < 0 || i > 7 ) + return "????"; + + static std::string constistNameArr[] = { + "ANY ", + "ONE ", + "TWO ", + "THREE ", + "QUORUM ", + "ALL ", + "LOCAL_QUORUM", + "EACH_QUORUM " }; + + return constistNameArr[ i ]; +} + + +std::string GetResultName( int result ) +{ + return ( result == 0 ? "Ok " : "Failed" ); +} + + +void ContinueTheConsistencyTest( + boost::shared_ptr ccm, + boost::shared_ptr builder ) +{ + boost::shared_ptr cluster(builder->build()); + boost::shared_ptr session(cluster->connect()); + + if (!session) + { + BOOST_FAIL("Session creation failure."); + } + + policy_tools::create_schema(session, 1); + + policy_tools::init(session, 12, cql::CQL_CONSISTENCY_ONE); //// make the table and + policy_tools::query(session, 12, cql::CQL_CONSISTENCY_ONE); //// make 12 reads from the nodes. + policy_tools::show_coordinators(); //// Show what nodes are read from. + + + ccm->decommission(2); //// kill node number 2 + boost::this_thread::sleep(boost::posix_time::seconds(20)); //// wait for node 1 to be down + + policy_tools::reset_coordinators(); + policy_tools::query(session, 12, cql::CQL_CONSISTENCY_ONE); //// make 12 reads from the nodes. + policy_tools::show_coordinators(); //// Show what nodes are read from. + + std::vector< cql::cql_consistency_enum > failConsistencyV; + + failConsistencyV.push_back( cql::CQL_CONSISTENCY_ANY ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_ONE ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_TWO ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_THREE ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_QUORUM ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_ALL ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_LOCAL_QUORUM ); + failConsistencyV.push_back( cql::CQL_CONSISTENCY_EACH_QUORUM ); + + std::map< cql::cql_consistency_enum, std::pair< int, int > > resultsTogether; + + for( std::size_t i = 0; i < failConsistencyV.size(); ++i ) + { + int initResult = 1; + try + { + initResult = policy_tools::init( session, 12, failConsistencyV[ i ] ); + } //// zero means that the opertion performed with SUCCESS + catch( ... ) + { + initResult = 1; + } + + int queryResult = 1; + try + { + queryResult = policy_tools::query( session, 12, failConsistencyV[ i ] ); //// make 12 reads from the nodes. + } //// zero means that the opertion performed with SUCCESS + catch( ... ) + { + queryResult = 1; + } + + resultsTogether.insert( std::make_pair( failConsistencyV[ i ], std::make_pair( initResult, queryResult ) ) ); + } + + std::cout << std::endl << std::endl; + std::cout << "RESULTS FOR ALL CONSISTENCIES: " << std::endl; + std::cout << "CONSISTENCY INSERT SELECT " << std::endl; + for( std::map< cql::cql_consistency_enum, std::pair< int, int > >::const_iterator p = resultsTogether.begin(); p != resultsTogether.end(); ++p ) + { + std::cout << GetNameOfConsistency( p->first ) << " " << GetResultName( p->second.first ) << " " << GetResultName( p->second.second ) << std::endl; + } + + std::cout << std::endl << std::endl << "Press any key..."; + + { + std::string k; + std::cin >> k; //// Wait for the user to read the results from the screen. + } +} diff --git a/test/integration_tests/src/one_node_balancing.cpp b/test/integration_tests/src/one_node_balancing.cpp new file mode 100644 index 000000000..9ff91f997 --- /dev/null +++ b/test/integration_tests/src/one_node_balancing.cpp @@ -0,0 +1,77 @@ +#include "one_node_balancing.hpp" +#include "cql/cql_host.hpp" +#include "cql/cql_metadata.hpp" +#include "cql/cql_cluster.hpp" + + +boost::shared_ptr +cql::cql_one_node_balancing_policy_t::new_query_plan( + const boost::shared_ptr&) +{ + boost::mutex::scoped_lock lock(_mutex2 ); + return boost::shared_ptr( new cql_one_node_query_plan_t( _cluster2, 0, _host_address ) ); +} + +void +cql::cql_one_node_balancing_policy_t::init(cql_cluster_t* cluster) +{ + boost::mutex::scoped_lock lock(_mutex2); + _cluster2 = cluster; +} + + + +cql::cql_one_node_query_plan_t::cql_one_node_query_plan_t( + const cql_cluster_t* cluster, + unsigned index, + std::string hostAddress + ) : + _hostAddress( hostAddress ) +{ + + boost::mutex::scoped_lock lock(_mutex); + + if( cluster != NULL ) + { + if( cluster->metadata().get() != NULL ) + { + cluster->metadata()->get_hosts(_hosts); + } + } +} + + + +boost::shared_ptr +cql::cql_one_node_query_plan_t::next_host_to_query() +{ + boost::mutex::scoped_lock lock(_mutex); + + if( !_hosts.empty() ) + { + for( int i = 0; i < _hosts.size(); ++i ) + { + if( _hosts[ i ].get() != nullptr ) + { + if( _hosts[ i ].get()->address().to_string() == _hostAddress ) + { + if (_hosts[ i ].get()->is_considerably_up()) + { + return _hosts[ i ]; + } + } + } + } + + //// If the address does not fit, take the first address from the pool. + boost::shared_ptr host = _hosts[ 0 ]; + if (host->is_considerably_up()) + { + return host; + } + } + + return boost::shared_ptr(); +} + + diff --git a/test/integration_tests/src/policy_tools.cpp b/test/integration_tests/src/policy_tools.cpp index 9e1afb84d..ada530384 100644 --- a/test/integration_tests/src/policy_tools.cpp +++ b/test/integration_tests/src/policy_tools.cpp @@ -13,6 +13,15 @@ namespace policy_tools{ std::map coordinators; + +void show_coordinators() // show what queries went to what nodes IP. +{ + for( std::map::const_iterator p = coordinators.begin(); p != coordinators.end(); ++p ) + { + std::cout << p->first.to_string() << " : " << p->second << std::endl; + } +} + void create_schema( boost::shared_ptr session, diff --git a/test/unit_tests/cql_dcaware_load_balancing_test.cpp b/test/unit_tests/cql_dcaware_load_balancing_test.cpp new file mode 100644 index 000000000..a3924965b --- /dev/null +++ b/test/unit_tests/cql_dcaware_load_balancing_test.cpp @@ -0,0 +1,237 @@ +#ifndef CQL_METADATA_H_ +#define CQL_METADATA_H_ + +#include +#include + +namespace cql +{ + class cql_host_t; + class cql_metadata_t // the mock-object for testing dcaware_round_robin_balancing_policy. + { // this mock-object enables to create the virtual nodes to test balancing node selection policy + public: + + void get_hosts(std::vector >& collection) const + { + collection = _collection; + }; + + std::vector > _collection; + }; +} + +#endif // CQL_METADATA_H_ + +#include "../src/cql/policies/cql_dcaware_round_robin_balancing_policy.cpp" // the definition of dcaware_round_balancing_policy must be added to cql_test project again, to exist in the main cql_test.exe. + // It must be called locally, to call the mock-object instead of the real cql_metadata_t class. + +#include +#include "cql/policies/cql_constant_reconnection_policy_t.hpp" + +namespace cql +{ + class cql_cluster_dcaware_testing_t : public cql::cql_cluster_t //// a mock-object for testing the dcaware_round_robin_policy. Enables to manually create a lot of virtual nodes. + { + public: + cql_cluster_dcaware_testing_t() { m_metadata_dcaware.reset( new cql_metadata_t() ); } + + virtual boost::shared_ptr metadata() const + { + return m_metadata_dcaware; + } + + virtual boost::shared_ptr connect(){ return boost::shared_ptr(); } + virtual boost::shared_ptr connect(const std::string& keyspace){ return boost::shared_ptr(); } + virtual void shutdown(int timeout_ms = -1){} + + boost::shared_ptr< cql_metadata_t > m_metadata_dcaware; + }; +} + + +class DC_AWARE_ALGORITHM_TEST +{ +}; + + +int get_last_part_of_ip( std::string ip ) // get the last part of IP address. +{ + int p1 = ip.rfind( ":" ); + int p2 = ip.rfind( "." ); + std::string ret = ip.substr( p2 + 1, ( p1 - p2 - 1 ) ); + int ret2 = atoi( ret.c_str() ); + return ret2; +} + + +std::string Tx( int i ) // simple conversion from int to string. +{ + return boost::str(boost::format("%d") % i ); +} + + +BOOST_FIXTURE_TEST_SUITE( dcaware_algorithm_test, DC_AWARE_ALGORITHM_TEST ) //// dcaware_algorithm_test/dc_aware_algorithm + +BOOST_AUTO_TEST_CASE(dc_aware_algorithm) +{ + cql::cql_cluster_dcaware_testing_t cluster_dcaware_testing; + + srand( (unsigned)time(NULL) ); + + int the_number_of_data_centers( 30 ); + int the_number_of_nodes_in_each_data_center( 40 ); + int the_limit_of_number_of_queries_for_each_remote_data_center( 20 ); + + //// populate the cluster with many nodes in many remote datacenters. + for( int i1 = 10; i1 < 10 + the_number_of_data_centers; ++i1 ) // the number of remote data centers. + { + for( int i2 = 10; i2 < 10 + the_number_of_nodes_in_each_data_center; ++i2 ) // the number of nodes in each data centers. + { + std::string add = "192.168." + Tx( i1 ) + "." + Tx( i2 ); + boost::asio::ip::address address( boost::asio::ip::address::from_string( add ) ); + std::string kkk = address.to_string(); + cql::cql_endpoint_t end_point( address, 30000 ); + std::string end_point_data = end_point.to_string(); + boost::shared_ptr new_host = cql::cql_host_t::create( end_point, boost::shared_ptr< cql::cql_reconnection_policy_t>( new cql::cql_constant_reconnection_policy_t( boost::posix_time::seconds(1))) ); + new_host->set_location_info( "dc" + Tx( i1 ), "rack" + Tx( i1 ) ); // set the datacenter name and rack name. + cluster_dcaware_testing.metadata()->_collection.push_back( new_host ); + } + } + + + + + { //// Select only the local nodes. Ask for a local node which exists. Count the queries to each node. It should be linear round-robin on local nodes. + //// The same nodes should be taken many times. + cql::cql_dcaware_round_robin_balancing_policy_t dc_balancing( "dc10", the_limit_of_number_of_queries_for_each_remote_data_center ); + dc_balancing.init( &cluster_dcaware_testing ); + + boost::shared_ptr< cql::cql_query_plan_t> query_plan_dc = dc_balancing.new_query_plan( boost::shared_ptr< cql::cql_query_t>() ); + + std::map< std::string, long > dc_query_quantity; + std::map< std::string, long > ip_query_quantity; + + for( int i = 0; i < cluster_dcaware_testing.metadata()->_collection.size(); ++i) + { + boost::shared_ptr< cql::cql_host_t> host = query_plan_dc->next_host_to_query(); // Returns next host to query. + if( host.get() != NULL ) + { + std::map< std::string, long >::iterator p = dc_query_quantity.find( host->datacenter() ); + + if( p == dc_query_quantity.end() ) + { + dc_query_quantity.insert( std::make_pair( host->datacenter(), 1 ) ); + } + else + { + ++( p->second ); + } + + std::map< std::string, long >::iterator p2 = ip_query_quantity.find( host->endpoint().to_string() ); + + if( p2 == ip_query_quantity.end() ) + { + ip_query_quantity.insert( std::make_pair( host->endpoint().to_string(), 1 ) ); + } + else + { + ++( p2->second ); + } + } + else + { + break; + } + } + + BOOST_REQUIRE( dc_query_quantity.size() == 1 ); //// there should be only one datacenter which was queried. + + std::map< std::string, long >::const_iterator p2 = dc_query_quantity.find( "dc10" ); //// check if this is the local datacenter. + + BOOST_REQUIRE( p2 != dc_query_quantity.end() ); + BOOST_REQUIRE( p2->second == cluster_dcaware_testing.metadata()->_collection.size() ); + + for( std::map< std::string, long >::const_iterator p = ip_query_quantity.begin(); p != ip_query_quantity.end(); ++p ) + { // each node should be taken the same number of times, + BOOST_REQUIRE( p->second == the_number_of_data_centers ); // because this is the linear round-robin. + } + } + + + + + { // Select only the remote nodes. Ask for a local node which does not exist. Count the queries to each last number of IP address. + cql::cql_dcaware_round_robin_balancing_policy_t dc_balancing( "dc1", the_limit_of_number_of_queries_for_each_remote_data_center ); + dc_balancing.init( &cluster_dcaware_testing ); + + boost::shared_ptr< cql::cql_query_plan_t> query_plan_dc = dc_balancing.new_query_plan( boost::shared_ptr< cql::cql_query_t>() ); + + std::map< std::string, long > dc_query_quantity; // how many queries for each data centers. + std::map< long, long > last_ip_part_query_quantity; // how many queries for each last part of ip address. + + for( int i2 = 10; i2 < 10 + the_number_of_nodes_in_each_data_center; ++i2 ) // the number of nodes in each data centers. + { + last_ip_part_query_quantity.insert( std::make_pair( i2, 0 ) ); //// prepare the map for counting the number of queries for each last part of address. + } + + int const query_qnt = the_number_of_data_centers * the_limit_of_number_of_queries_for_each_remote_data_center / 2; // how many queries. + + for( int i = 0; i < query_qnt; ++i) + { + boost::shared_ptr< cql::cql_host_t> host = query_plan_dc->next_host_to_query(); // Returns next host to query. + if( host.get() != NULL ) + { + int const last_part_of_ip = get_last_part_of_ip( host->endpoint().to_string() ); + + std::map< std::string, long >::iterator p = dc_query_quantity.find( host->datacenter() ); + + if( p == dc_query_quantity.end() ) + { + dc_query_quantity.insert( std::make_pair( host->datacenter(), 1 ) ); + } + else + { + ++( p->second ); + } + + std::map< long, long >::iterator p3 = last_ip_part_query_quantity.find( last_part_of_ip ); + if( p3 != last_ip_part_query_quantity.end() ) + { + ++( p3->second ); + } + } + else + { + break; + } + } + + std::vector< int > occu; //// the number of queries to each fourth part of IP address. + + for( std::map< long, long >::const_iterator p4 = last_ip_part_query_quantity.begin(); p4 != last_ip_part_query_quantity.end(); ++p4 ) + occu.push_back( p4->second ); + + if( occu.size() < 4 ) + { + BOOST_REQUIRE( false ); //// to few nodes to compute the reliable reulsts of the test. + } + + std::sort( occu.begin(), occu.end() ); //// Sort by the number of queries by each last number of IP address. + + int index1 = occu.size() / 4; //// Take the value of the vector of about 25% of the length and of about the 75% of the length. + int index2 = index1 * 3; //// Compute what is the difference. There should be small difference. The less the better. + + if( index1 < occu.size() && index2 < occu.size() ) + { + int lowValue = occu[ index1 ]; + int highValue = occu[ index2 ]; + + BOOST_REQUIRE( lowValue > 0 ); + BOOST_REQUIRE( highValue / lowValue < 3 ); + } + } + +} + +BOOST_AUTO_TEST_SUITE_END() +