From f0fea88a7eca7e3a644ed4c8bf508c953b5aae02 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Mon, 1 Aug 2016 15:11:38 -0700 Subject: [PATCH 1/7] Randomize contact points and intial host in load balancing policies --- include/cassandra.h | 20 ++++++ src/cluster.cpp | 6 ++ src/config.hpp | 9 ++- src/control_connection.cpp | 28 +++++--- src/dc_aware_policy.cpp | 10 ++- src/dc_aware_policy.hpp | 2 +- src/host.cpp | 8 --- src/host.hpp | 7 +- src/latency_aware_policy.cpp | 12 +++- src/latency_aware_policy.hpp | 2 +- src/list_policy.cpp | 22 ++++--- src/list_policy.hpp | 2 +- src/load_balancing.hpp | 7 +- src/random.cpp | 22 ++++++- src/random.hpp | 12 ++++ src/round_robin_policy.cpp | 72 +++++++++++++++++++++ src/round_robin_policy.hpp | 51 ++++----------- src/session.cpp | 9 ++- src/session.hpp | 2 + src/token_aware_policy.cpp | 13 ++++ src/token_aware_policy.hpp | 2 + test/unit_tests/src/test_load_balancing.cpp | 44 ++++++------- 22 files changed, 259 insertions(+), 103 deletions(-) create mode 100644 src/round_robin_policy.cpp diff --git a/include/cassandra.h b/include/cassandra.h index 9f7007ef3..ddd2ffa84 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -1705,6 +1705,26 @@ CASS_EXPORT CassError cass_cluster_set_use_hostname_resolution(CassCluster* cluster, cass_bool_t enabled); +/** + * Enable/Disable the randomization of the contact points list. + * + * Default: cass_true (enabled). + * + * Important: This setting should only be disabled for debugging or + * tests. + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] enabled + * @return CASS_OK if successful, otherwise an error occurred + * + * @see cass_cluster_set_resolve_timeout() + */ +CASS_EXPORT CassError +cass_cluster_set_use_randomized_contact_points(CassCluster* cluster, + cass_bool_t enabled); + /*********************************************************************************** * * Session diff --git a/src/cluster.cpp b/src/cluster.cpp index 677a6806d..4c58ba630 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -423,6 +423,12 @@ CassError cass_cluster_set_use_hostname_resolution(CassCluster* cluster, #endif } +CassError cass_cluster_set_use_randomized_contact_points(CassCluster* cluster, + cass_bool_t enabled) { + cluster->config().set_use_randomized_contact_points(enabled); + return CASS_OK; +} + void cass_cluster_free(CassCluster* cluster) { delete cluster->from(); } diff --git a/src/config.hpp b/src/config.hpp index 180938726..e3538ccb7 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -74,7 +74,8 @@ class Config { , timestamp_gen_(new ServerSideTimestampGenerator()) , retry_policy_(new DefaultRetryPolicy()) , use_schema_(true) - , use_hostname_resolution_(false) { } + , use_hostname_resolution_(false) + , use_randomized_contact_points_(true) { } unsigned thread_count_io() const { return thread_count_io_; } @@ -359,6 +360,11 @@ class Config { use_hostname_resolution_ = enable; } + bool use_randomized_contact_points() const { return use_randomized_contact_points_; } + void set_use_randomized_contact_points(bool enable) { + use_randomized_contact_points_ = enable; + } + private: int port_; int protocol_version_; @@ -402,6 +408,7 @@ class Config { SharedRefPtr retry_policy_; bool use_schema_; bool use_hostname_resolution_; + bool use_randomized_contact_points_; }; } // namespace cass diff --git a/src/control_connection.cpp b/src/control_connection.cpp index cb61e2895..bf00fd29c 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -29,7 +29,9 @@ #include "session.hpp" #include "timer.hpp" +#include #include +#include #include #include @@ -58,20 +60,25 @@ namespace cass { class ControlStartupQueryPlan : public QueryPlan { public: - ControlStartupQueryPlan(const HostMap& hosts) - : hosts_(hosts) - , it_(hosts_.begin()) {} + ControlStartupQueryPlan(const HostMap& hosts, Random* random) + : index_(random != NULL ? random->next(std::max(static_cast(1), hosts.size())) : 0) + , count_(0) { + hosts_.reserve(hosts.size()); + std::transform(hosts.begin(), hosts.end(), std::back_inserter(hosts_), GetHost()); + } virtual SharedRefPtr compute_next() { - if (it_ == hosts_.end()) return SharedRefPtr(); - const SharedRefPtr& host = it_->second; - ++it_; - return host; + const size_t size = hosts_.size(); + if (count_ >= size) return SharedRefPtr(); + size_t index = (index_ + count_) % size; + ++count_; + return hosts_[index]; } private: - const HostMap hosts_; - HostMap::const_iterator it_; + HostVec hosts_; + size_t index_; + size_t count_; }; bool ControlConnection::determine_address_for_peer_host(const Address& connected_address, @@ -131,7 +138,8 @@ void ControlConnection::clear() { void ControlConnection::connect(Session* session) { session_ = session; - query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_)); // No hosts lock necessary (read-only) + query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_, // No hosts lock necessary (read-only) + session_->random_.get())); protocol_version_ = session_->config().protocol_version(); should_query_tokens_ = session_->config().token_aware_routing(); if (protocol_version_ < 0) { diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp index ea0d387d3..0d14e79f4 100644 --- a/src/dc_aware_policy.cpp +++ b/src/dc_aware_policy.cpp @@ -20,12 +20,15 @@ #include "scoped_lock.hpp" -namespace cass { +#include +namespace cass { static const CopyOnWriteHostVec NO_HOSTS(new HostVec()); -void DCAwarePolicy::init(const SharedRefPtr& connected_host, const HostMap& hosts) { +void DCAwarePolicy::init(const SharedRefPtr& connected_host, + const HostMap& hosts, + Random* random) { if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) { LOG_INFO("Using '%s' for the local data center " "(if this is incorrect, please provide the correct data center)", @@ -37,6 +40,9 @@ void DCAwarePolicy::init(const SharedRefPtr& connected_host, const HostMap end = hosts.end(); i != end; ++i) { on_add(i->second); } + if (random != NULL) { + index_ = random->next(std::max(static_cast(1), hosts.size())); + } } CassHostDistance DCAwarePolicy::distance(const SharedRefPtr& host) const { diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp index d510c30fd..b0231a9ab 100644 --- a/src/dc_aware_policy.hpp +++ b/src/dc_aware_policy.hpp @@ -46,7 +46,7 @@ class DCAwarePolicy : public LoadBalancingPolicy { , local_dc_live_hosts_(new HostVec) , index_(0) {} - virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts); + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random); virtual CassHostDistance distance(const SharedRefPtr& host) const; diff --git a/src/host.cpp b/src/host.cpp index 120e0efe1..217625efb 100644 --- a/src/host.cpp +++ b/src/host.cpp @@ -18,14 +18,6 @@ namespace cass { -void copy_hosts(const HostMap& from_hosts, CopyOnWriteHostVec& to_hosts) { - to_hosts->reserve(from_hosts.size()); - for (HostMap::const_iterator i = from_hosts.begin(), - end = from_hosts.end(); i != end; ++i) { - to_hosts->push_back(i->second); - } -} - void add_host(CopyOnWriteHostVec& hosts, const SharedRefPtr& host) { HostVec::iterator i; for (i = hosts->begin(); i != hosts->end(); ++i) { diff --git a/src/host.hpp b/src/host.hpp index 84108f026..eea584049 100644 --- a/src/host.hpp +++ b/src/host.hpp @@ -235,11 +235,16 @@ class Host : public RefCounted { }; typedef std::map > HostMap; +struct GetHost { + typedef std::pair Pair; + Host::Ptr operator()(const Pair& pair) const { + return pair.second; + } +}; typedef std::pair > HostPair; typedef std::vector > HostVec; typedef CopyOnWritePtr CopyOnWriteHostVec; -void copy_hosts(const HostMap& from_hosts, CopyOnWriteHostVec& to_hosts); void add_host(CopyOnWriteHostVec& hosts, const SharedRefPtr& host); void remove_host(CopyOnWriteHostVec& hosts, const SharedRefPtr& host); diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp index c48406eda..97eab77dc 100644 --- a/src/latency_aware_policy.cpp +++ b/src/latency_aware_policy.cpp @@ -19,15 +19,21 @@ #include "get_time.hpp" #include "logger.hpp" +#include +#include + namespace cass { -void LatencyAwarePolicy::init(const SharedRefPtr& connected_host, const HostMap& hosts) { - copy_hosts(hosts, hosts_); +void LatencyAwarePolicy::init(const SharedRefPtr& connected_host, + const HostMap& hosts, + Random* random) { + hosts_->reserve(hosts.size()); + std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost()); for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured); } - ChainedLoadBalancingPolicy::init(connected_host, hosts); + ChainedLoadBalancingPolicy::init(connected_host, hosts, random); } void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp index c71dfab9a..f911adc35 100644 --- a/src/latency_aware_policy.hpp +++ b/src/latency_aware_policy.hpp @@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy { virtual ~LatencyAwarePolicy() {} - virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts); + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random); virtual void register_handles(uv_loop_t* loop); virtual void close_handles(); diff --git a/src/list_policy.cpp b/src/list_policy.cpp index 19e8f9aec..5d03696e4 100644 --- a/src/list_policy.cpp +++ b/src/list_policy.cpp @@ -16,21 +16,27 @@ #include "list_policy.hpp" +#include "logger.hpp" + namespace cass { void ListPolicy::init(const SharedRefPtr& connected_host, - const HostMap& hosts) { - HostMap whitelist_hosts; + const HostMap& hosts, + Random* random) { + HostMap valid_hosts; for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { const SharedRefPtr& host = i->second; if (is_valid_host(host)) { - whitelist_hosts.insert(HostPair(i->first, host)); + valid_hosts.insert(HostPair(i->first, host)); } } - assert(!whitelist_hosts.empty()); - child_policy_->init(connected_host, whitelist_hosts); + if (valid_hosts.empty()) { + LOG_ERROR("No valid hosts available for list policy"); + } + + ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random); } CassHostDistance ListPolicy::distance(const SharedRefPtr& host) const { @@ -41,9 +47,9 @@ CassHostDistance ListPolicy::distance(const SharedRefPtr& host) const { } QueryPlan* ListPolicy::new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap& token_map, - Request::EncodingCache* cache) { + const Request* request, + const TokenMap& token_map, + Request::EncodingCache* cache) { return child_policy_->new_query_plan(connected_keyspace, request, token_map, diff --git a/src/list_policy.hpp b/src/list_policy.hpp index 0d6ef4ce6..1af0959dd 100644 --- a/src/list_policy.hpp +++ b/src/list_policy.hpp @@ -30,7 +30,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy { virtual ~ListPolicy() {} - virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts); + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random); virtual CassHostDistance distance(const SharedRefPtr& host) const; diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp index 37e6410c2..7b2ae0011 100644 --- a/src/load_balancing.hpp +++ b/src/load_balancing.hpp @@ -51,6 +51,7 @@ typedef enum CassHostDistance_ { namespace cass { +class Random; class RoutableRequest; class TokenMap; @@ -80,7 +81,7 @@ class LoadBalancingPolicy : public Host::StateListener, public RefCounted& connected_host, const HostMap& hosts) = 0; + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random) = 0; virtual void register_handles(uv_loop_t* loop) {} virtual void close_handles() {} @@ -103,8 +104,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy { virtual ~ChainedLoadBalancingPolicy() {} - virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts) { - return child_policy_->init(connected_host, hosts); + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random) { + return child_policy_->init(connected_host, hosts, random); } virtual CassHostDistance distance(const SharedRefPtr& host) const { return child_policy_->distance(host); } diff --git a/src/random.cpp b/src/random.cpp index 618804ce7..a22019bdf 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -14,6 +14,12 @@ limitations under the License. */ +#include "random.hpp" + +#include "cassandra.h" +#include "logger.hpp" +#include "scoped_lock.hpp" + #if defined(_WIN32) #ifndef _WINSOCKAPI_ #define _WINSOCKAPI_ @@ -29,10 +35,22 @@ #include #endif -#include "logger.hpp" - namespace cass { +Random::Random() + // Use high resolution time if we can't get a real random seed + : rng_(get_random_seed(uv_hrtime())) { +} + +uint64_t Random::next(uint64_t max) { + const uint64_t limit = CASS_UINT64_MAX - CASS_UINT64_MAX % max; + uint64_t r; + do { + r = rng_(); + } while(r >= limit); + return r % max; +} + #if defined(_WIN32) uint64_t get_random_seed(uint64_t seed) { diff --git a/src/random.hpp b/src/random.hpp index be6177567..38f1923dc 100644 --- a/src/random.hpp +++ b/src/random.hpp @@ -19,8 +19,20 @@ #include "third_party/mt19937_64/mt19937_64.hpp" +#include + namespace cass { +class Random { +public: + Random(); + + uint64_t next(uint64_t max); + +private: + MT19937_64 rng_; +}; + uint64_t get_random_seed(uint64_t seed); } // namespace cass diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp new file mode 100644 index 000000000..e06f080b7 --- /dev/null +++ b/src/round_robin_policy.cpp @@ -0,0 +1,72 @@ +/* + Copyright (c) 2014-2016 DataStax + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "round_robin_policy.hpp" + +#include +#include + +namespace cass { + +void RoundRobinPolicy::init(const SharedRefPtr& connected_host, + const HostMap& hosts, + Random* random) { + hosts_->reserve(hosts.size()); + std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost()); + if (random != NULL) { + index_ = random->next(std::max(static_cast(1), hosts.size())); + } +} + +CassHostDistance RoundRobinPolicy::distance(const SharedRefPtr& host) const { + return CASS_HOST_DISTANCE_LOCAL; +} + +QueryPlan* RoundRobinPolicy::new_query_plan(const std::string& connected_keyspace, + const Request* request, + const TokenMap& token_map, + Request::EncodingCache* cache) { + return new RoundRobinQueryPlan(hosts_, index_++); +} + +void RoundRobinPolicy::on_add(const SharedRefPtr& host) { + add_host(hosts_, host); +} + +void RoundRobinPolicy::on_remove(const SharedRefPtr& host) { + remove_host(hosts_, host); +} + +void RoundRobinPolicy::on_up(const SharedRefPtr& host) { + on_add(host); +} + +void RoundRobinPolicy::on_down(const SharedRefPtr& host) { + on_remove(host); +} + +SharedRefPtr RoundRobinPolicy::RoundRobinQueryPlan::compute_next() { + while (remaining_ > 0) { + --remaining_; + const SharedRefPtr& host((*hosts_)[index_++ % hosts_->size()]); + if (host->is_up()) { + return host; + } + } + return SharedRefPtr(); +} + +} // namespace cass diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp index 96600e39e..7ef0483b9 100644 --- a/src/round_robin_policy.hpp +++ b/src/round_robin_policy.hpp @@ -21,8 +21,7 @@ #include "copy_on_write_ptr.hpp" #include "load_balancing.hpp" #include "host.hpp" - -#include +#include "random.hpp" namespace cass { @@ -30,38 +29,21 @@ class RoundRobinPolicy : public LoadBalancingPolicy { public: RoundRobinPolicy() : hosts_(new HostVec) - , index_(0) {} + , index_(0) { } - virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts) { - copy_hosts(hosts, hosts_); - } + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random); - virtual CassHostDistance distance(const SharedRefPtr& host) const { - return CASS_HOST_DISTANCE_LOCAL; - } + virtual CassHostDistance distance(const SharedRefPtr& host) const; virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, const Request* request, const TokenMap& token_map, - Request::EncodingCache* cache) { - return new RoundRobinQueryPlan(hosts_, index_++); - } - - virtual void on_add(const SharedRefPtr& host) { - add_host(hosts_, host); - } - - virtual void on_remove(const SharedRefPtr& host) { - remove_host(hosts_, host); - } + Request::EncodingCache* cache); - virtual void on_up(const SharedRefPtr& host) { - on_add(host); - } - - virtual void on_down(const SharedRefPtr& host) { - on_remove(host); - } + virtual void on_add(const SharedRefPtr& host); + virtual void on_remove(const SharedRefPtr& host); + virtual void on_up(const SharedRefPtr& host); + virtual void on_down(const SharedRefPtr& host); virtual LoadBalancingPolicy* new_instance() { return new RoundRobinPolicy(); } @@ -71,18 +53,9 @@ class RoundRobinPolicy : public LoadBalancingPolicy { RoundRobinQueryPlan(const CopyOnWriteHostVec& hosts, size_t start_index) : hosts_(hosts) , index_(start_index) - , remaining_(hosts->size()) {} - - SharedRefPtr compute_next() { - while (remaining_ > 0) { - --remaining_; - const SharedRefPtr& host((*hosts_)[index_++ % hosts_->size()]); - if (host->is_up()) { - return host; - } - } - return SharedRefPtr(); - } + , remaining_(hosts->size()) { } + + virtual SharedRefPtr compute_next(); private: const CopyOnWriteHostVec hosts_; diff --git a/src/session.cpp b/src/session.cpp index 90e14543d..bee0c9ee4 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -153,6 +153,7 @@ Session::~Session() { void Session::clear(const Config& config) { config_ = config; + random_.reset(); metrics_.reset(new Metrics(config_.thread_count_io() + 1)); load_balancing_policy_.reset(config.load_balancing_policy()); connect_future_.reset(); @@ -419,6 +420,12 @@ void Session::on_event(const SessionEvent& event) { case SessionEvent::CONNECT: { int port = config_.port(); + // This needs to be done on the session thread because it could pause + // generating a new random seed. + if (config_.use_randomized_contact_points()) { + random_.reset(new Random()); + } + MultiResolver::Ptr resolver( new MultiResolver(this, on_resolve, #if UV_VERSION_MAJOR >= 1 @@ -548,7 +555,7 @@ void Session::on_add_resolve_name(NameResolver* resolver) { void Session::on_control_connection_ready() { // No hosts lock necessary (only called on session thread and read-only) - load_balancing_policy_->init(control_connection_.connected_host(), hosts_); + load_balancing_policy_->init(control_connection_.connected_host(), hosts_, random_.get()); load_balancing_policy_->register_handles(loop()); for (IOWorkerVec::iterator it = io_workers_.begin(), end = io_workers_.end(); it != end; ++it) { diff --git a/src/session.hpp b/src/session.hpp index 93163003a..a211d834e 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -27,6 +27,7 @@ #include "metadata.hpp" #include "metrics.hpp" #include "mpmc_queue.hpp" +#include "random.hpp" #include "ref_counted.hpp" #include "resolver.hpp" #include "row.hpp" @@ -196,6 +197,7 @@ class Session : public EventThread { IOWorkerVec io_workers_; ScopedPtr > > request_queue_; Metadata metadata_; + ScopedPtr random_; ControlConnection control_connection_; bool current_host_mark_; int pending_pool_count_; diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp index 06e0c1033..267d2641b 100644 --- a/src/token_aware_policy.cpp +++ b/src/token_aware_policy.cpp @@ -16,6 +16,10 @@ #include "token_aware_policy.hpp" +#include "random.hpp" + +#include + namespace cass { // The number of replicas is bounded by replication factor per DC. In practice, the number @@ -30,6 +34,15 @@ static inline bool contains(const CopyOnWriteHostVec& replicas, const Address& a return false; } +void TokenAwarePolicy::init(const SharedRefPtr& connected_host, + const HostMap& hosts, + Random* random) { + if (random != NULL) { + index_ = random->next(std::max(static_cast(1), hosts.size())); + } + ChainedLoadBalancingPolicy::init(connected_host, hosts, random); +} + QueryPlan* TokenAwarePolicy::new_query_plan(const std::string& connected_keyspace, const Request* request, const TokenMap& token_map, diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp index b18cdab45..1d6cf63da 100644 --- a/src/token_aware_policy.hpp +++ b/src/token_aware_policy.hpp @@ -32,6 +32,8 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { virtual ~TokenAwarePolicy() {} + virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random); + virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, const Request* request, const TokenMap& token_map, diff --git a/test/unit_tests/src/test_load_balancing.cpp b/test/unit_tests/src/test_load_balancing.cpp index 1ed776f68..1c47692fa 100644 --- a/test/unit_tests/src/test_load_balancing.cpp +++ b/test/unit_tests/src/test_load_balancing.cpp @@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(simple) { populate_hosts(2, "rack", "dc", &hosts); cass::RoundRobinPolicy policy; - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -183,7 +183,7 @@ BOOST_AUTO_TEST_CASE(on_add) populate_hosts(2, "rack", "dc", &hosts); cass::RoundRobinPolicy policy; - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -208,7 +208,7 @@ BOOST_AUTO_TEST_CASE(on_remove) populate_hosts(3, "rack", "dc", &hosts); cass::RoundRobinPolicy policy; - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -234,7 +234,7 @@ BOOST_AUTO_TEST_CASE(on_down_on_up) populate_hosts(3, "rack", "dc", &hosts); cass::RoundRobinPolicy policy; - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -288,7 +288,7 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) { populate_hosts(local_count, "rack", LOCAL_DC, &hosts); populate_hosts(remote_count, "rack", REMOTE_DC, &hosts); cass::DCAwarePolicy policy(LOCAL_DC, remote_count, false); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); const size_t total_hosts = local_count + remote_count; cass::TokenMap tokenMap; @@ -315,7 +315,7 @@ BOOST_AUTO_TEST_CASE(some_dc_local_unspecified) h->set_rack_and_dc("", ""); cass::DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, tokenMap, NULL)); @@ -332,7 +332,7 @@ BOOST_AUTO_TEST_CASE(single_local_down) populate_hosts(1, "rack", REMOTE_DC, &hosts); cass::DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -360,7 +360,7 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned) populate_hosts(1, "rack", REMOTE_DC, &hosts); cass::DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -395,7 +395,7 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned) cass::SharedRefPtr target_host = hosts[target_addr]; cass::DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; @@ -429,7 +429,7 @@ BOOST_AUTO_TEST_CASE(used_hosts_per_remote_dc) for (size_t used_hosts = 0; used_hosts < 3; ++used_hosts) { cass::DCAwarePolicy policy(LOCAL_DC, used_hosts, false); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, cass::TokenMap(), NULL)); size_t total_hosts = 3 + used_hosts; @@ -449,7 +449,7 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl) // Not allowing remote DCs for local CLs bool allow_remote_dcs_for_local_cl = false; cass::DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); // Set local CL cass::SharedRefPtr request(new cass::QueryRequest()); @@ -465,7 +465,7 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl) // Allowing remote DCs for local CLs bool allow_remote_dcs_for_local_cl = true; cass::DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); // Set local CL cass::SharedRefPtr request(new cass::QueryRequest()); @@ -487,7 +487,7 @@ BOOST_AUTO_TEST_CASE(start_with_empty_local_dc) // Set local DC using connected host { cass::DCAwarePolicy policy("", 0, false); - policy.init(hosts[cass::Address("2.0.0.0", 4092)], hosts); + policy.init(hosts[cass::Address("2.0.0.0", 4092)], hosts, NULL); cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, cass::TokenMap(), NULL)); const size_t seq[] = {2, 3, 4}; @@ -498,7 +498,7 @@ BOOST_AUTO_TEST_CASE(start_with_empty_local_dc) { cass::DCAwarePolicy policy("", 0, false); policy.init(cass::SharedRefPtr( - new cass::Host(cass::Address("0.0.0.0", 4092), false)), hosts); + new cass::Host(cass::Address("0.0.0.0", 4092), false)), hosts, NULL); cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, cass::TokenMap(), NULL)); const size_t seq[] = {1}; @@ -544,7 +544,7 @@ BOOST_AUTO_TEST_CASE(simple) } token_map.build(); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::SharedRefPtr request(new cass::QueryRequest(1)); const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887 @@ -625,7 +625,7 @@ BOOST_AUTO_TEST_CASE(network_topology) } token_map.build(); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::SharedRefPtr request(new cass::QueryRequest(1)); const char* value = "abc"; // hash: -5434086359492102041 @@ -732,7 +732,7 @@ BOOST_AUTO_TEST_CASE(simple) cass::HostMap hosts; populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts); cass::LatencyAwarePolicy policy(new cass::RoundRobinPolicy(), settings); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); // Record some latencies with 100 ns being the minimum for (cass::HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) { @@ -789,7 +789,7 @@ BOOST_AUTO_TEST_CASE(min_average_under_min_measured) cass::HostMap hosts; populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts); cass::LatencyAwarePolicy policy(new cass::RoundRobinPolicy(), settings); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); int count = 1; for (cass::HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) { @@ -828,7 +828,7 @@ BOOST_AUTO_TEST_CASE(simple) whitelist_hosts.push_back("37.0.0.0"); whitelist_hosts.push_back("83.0.0.0"); cass::WhitelistPolicy policy(new cass::RoundRobinPolicy(), whitelist_hosts); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, tokenMap, NULL)); @@ -851,7 +851,7 @@ BOOST_AUTO_TEST_CASE(dc) whitelist_dcs.push_back(LOCAL_DC); whitelist_dcs.push_back(REMOTE_DC); cass::WhitelistDCPolicy policy(new cass::RoundRobinPolicy(), whitelist_dcs); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, tokenMap, NULL)); @@ -877,7 +877,7 @@ BOOST_AUTO_TEST_CASE(simple) blacklist_hosts.push_back("2.0.0.0"); blacklist_hosts.push_back("3.0.0.0"); cass::BlacklistPolicy policy(new cass::RoundRobinPolicy(), blacklist_hosts); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, tokenMap, NULL)); @@ -900,7 +900,7 @@ BOOST_AUTO_TEST_CASE(dc) blacklist_dcs.push_back(LOCAL_DC); blacklist_dcs.push_back(REMOTE_DC); cass::BlacklistDCPolicy policy(new cass::RoundRobinPolicy(), blacklist_dcs); - policy.init(cass::SharedRefPtr(), hosts); + policy.init(cass::SharedRefPtr(), hosts, NULL); cass::TokenMap tokenMap; boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, tokenMap, NULL)); From 42b4747a24fb10a9ee7c56e5a2209670d82d0069 Mon Sep 17 00:00:00 2001 From: Michael Fero Date: Wed, 10 Aug 2016 16:09:09 +0000 Subject: [PATCH 2/7] test: Adding test to ensure randomized contacts - Updated test harness to disable randomize contacts by default --- .../src/test_control_connection.cpp | 90 ++++++++++++++++--- test/integration_tests/src/test_utils.cpp | 1 + 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/test/integration_tests/src/test_control_connection.cpp b/test/integration_tests/src/test_control_connection.cpp index cd1696691..cf3712779 100644 --- a/test/integration_tests/src/test_control_connection.cpp +++ b/test/integration_tests/src/test_control_connection.cpp @@ -39,21 +39,32 @@ struct ControlConnectionTests { , ip_prefix(ccm->get_ip_prefix()) , version(test_utils::get_version()) {} - void check_for_live_hosts(test_utils::CassSessionPtr session, + std::string get_executing_host(test_utils::CassSessionPtr session) { + std::stringstream query; + query << "SELECT * FROM " << (version >= "3.0.0" ? "system_schema.keyspaces" : "system.schema_keyspaces"); + test_utils::CassStatementPtr statement(cass_statement_new(query.str().c_str(), 0)); + test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get())); + if (cass_future_error_code(future.get()) == CASS_OK) { + return cass::get_host_from_future(future.get()); + } else { + CassString message; + cass_future_error_message(future.get(), &message.data, &message.length); + std::cerr << "Failed to query host: " << std::string(message.data, message.length) << std::endl; + } + + return ""; + } + + void check_for_live_hosts(test_utils::CassSessionPtr session, const std::set& should_be_present) { std::set hosts; std::stringstream query; query << "SELECT * FROM " << (version >= "3.0.0" ? "system_schema.keyspaces" : "system.schema_keyspaces"); for (size_t i = 0; i < should_be_present.size() + 2; ++i) { - test_utils::CassStatementPtr statement(cass_statement_new(query.str().c_str(), 0)); - test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get())); - if (cass_future_error_code(future.get()) == CASS_OK) { - hosts.insert(cass::get_host_from_future(future.get())); - } else { - CassString message; - cass_future_error_message(future.get(), &message.data, &message.length); - std::cerr << "Failed to query host: " << std::string(message.data, message.length) << std::endl; + std::string host = get_executing_host(session); + if (!host.empty()) { + hosts.insert(host); } } @@ -83,7 +94,7 @@ BOOST_FIXTURE_TEST_SUITE(control_connection, ControlConnectionTests) BOOST_AUTO_TEST_CASE(connect_invalid_ip) { - test_utils::CassLog::reset("Host 1.1.1.1 had the following error on startup: Connection timeout"); + test_utils::CassLog::reset("Unable to establish a control connection to host 1.1.1.1 because of the following error: Connection timeout"); test_utils::CassClusterPtr cluster(cass_cluster_new()); cass_cluster_set_contact_points(cluster.get(), "1.1.1.1"); @@ -394,4 +405,63 @@ BOOST_AUTO_TEST_CASE(node_decommission) ccm->remove_cluster(); } +/** + * Randomized contact points + * + * This test ensures the driver will randomize the contact points when executing + * a query plan + * + * @since 2.4.3 + * @jira_ticket CPP-193 + * @test_category control_connection + */ +BOOST_AUTO_TEST_CASE(randomized_contact_points) +{ + std::string starting_host = ip_prefix + "1"; + size_t retries = 0; + test_utils::CassSessionPtr session; + + { + test_utils::CassClusterPtr cluster(cass_cluster_new()); + if (ccm->create_cluster(4)) { + ccm->start_cluster(); + } + + test_utils::initialize_contact_points(cluster.get(), ip_prefix, 4, 0); + cass_cluster_set_use_randomized_contact_points(cluster.get(), cass_true); + + // Make sure the first host executing a statement is not .1 + do { + test_utils::CassLog::reset("Adding pool for host " + ip_prefix); + session = test_utils::CassSessionPtr(test_utils::create_session(cluster.get())); + + // Wait for all hosts to be added to the pool; timeout after 10 seconds + boost::chrono::steady_clock::time_point end = boost::chrono::steady_clock::now() + boost::chrono::milliseconds(10000); + while (test_utils::CassLog::message_count() != 4ul && boost::chrono::steady_clock::now() < end) { + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + } + BOOST_CHECK_EQUAL(test_utils::CassLog::message_count(), 4ul); + + starting_host = get_executing_host(session); + } while (starting_host == ip_prefix + "1" && retries++ < 10); + } + + BOOST_CHECK_NE(ip_prefix + "1", starting_host); + BOOST_CHECK_LT(retries, 10); + + // Ensure the remaining hosts are executed (round robin)) + { + int node = starting_host.at(starting_host.length() - 1) - '0'; + for (int i = 0; i < 3; ++i) { + node = (node + 1 > 4) ? 1 : node + 1; + std::string expected_host = ip_prefix + boost::lexical_cast(node); + std::string host = get_executing_host(session); + BOOST_CHECK_EQUAL(expected_host, host); + } + } + + // Ensure the next host is the starting host + BOOST_CHECK_EQUAL(starting_host, get_executing_host(session)); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/integration_tests/src/test_utils.cpp b/test/integration_tests/src/test_utils.cpp index 091e5a6f5..5b7eb46d6 100644 --- a/test/integration_tests/src/test_utils.cpp +++ b/test/integration_tests/src/test_utils.cpp @@ -222,6 +222,7 @@ MultipleNodesTest::MultipleNodesTest(unsigned int num_nodes_dc1, unsigned int nu cass_cluster_set_num_threads_io(cluster, 4); cass_cluster_set_max_concurrent_creation(cluster, 8); cass_cluster_set_protocol_version(cluster, protocol_version); + cass_cluster_set_use_randomized_contact_points(cluster, cass_false); } MultipleNodesTest::~MultipleNodesTest() { From 8af7eee666ddf90d95ac20b4145a6442637a595d Mon Sep 17 00:00:00 2001 From: Michael Fero Date: Wed, 10 Aug 2016 18:46:42 +0000 Subject: [PATCH 3/7] test: Fixing broken aggregate lookup test --- test/integration_tests/src/test_schema_metadata.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration_tests/src/test_schema_metadata.cpp b/test/integration_tests/src/test_schema_metadata.cpp index 3eeaf596e..db6796366 100644 --- a/test/integration_tests/src/test_schema_metadata.cpp +++ b/test/integration_tests/src/test_schema_metadata.cpp @@ -1490,7 +1490,7 @@ BOOST_AUTO_TEST_CASE(lookup) { BOOST_CHECK_EQUAL(CASS_VALUE_TYPE_BIGINT, cass_data_type_type(cass_data_type_sub_data_type(datatype, 1))); datatype = cass_aggregate_meta_return_type(agg_meta); BOOST_CHECK_EQUAL(CASS_VALUE_TYPE_DOUBLE, cass_data_type_type(datatype)); - func_meta = cass_aggregate_meta_state_func(agg_meta); + func_meta = cass_aggregate_meta_final_func(agg_meta); BOOST_CHECK_EQUAL(1, cass_function_meta_argument_count(func_meta)); datatype = cass_function_meta_argument_type_by_name(func_meta, "state"); BOOST_CHECK_EQUAL(CASS_VALUE_TYPE_TUPLE, cass_data_type_type(datatype)); From 854360b1e39ab49cd262b7c4f2fdb3ba85cbbf7f Mon Sep 17 00:00:00 2001 From: Michael Fero Date: Wed, 10 Aug 2016 18:48:52 +0000 Subject: [PATCH 4/7] test: Decreasing time and increasing stability of tests --- test/integration_tests/src/test_basics.cpp | 2 +- test/integration_tests/src/test_consistency.cpp | 3 ++- test/integration_tests/src/test_logging.cpp | 7 ++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/test/integration_tests/src/test_basics.cpp b/test/integration_tests/src/test_basics.cpp index ba8bdfdab..ae24bf348 100644 --- a/test/integration_tests/src/test_basics.cpp +++ b/test/integration_tests/src/test_basics.cpp @@ -537,7 +537,7 @@ BOOST_AUTO_TEST_CASE(rows_in_rows_out) // Create insert statement for bound parameters std::string insert_query(boost::str(boost::format("INSERT INTO %s (tweet_id, t1, t2, t3) VALUES (?, ?, ?, ?);") % test_utils::SIMPLE_TABLE)); - const size_t num_rows = 100000; + const size_t num_rows = 1000; for (size_t i = 0; i < num_rows; ++i) { test_utils::CassStatementPtr statement(cass_statement_new(insert_query.c_str(), 4)); diff --git a/test/integration_tests/src/test_consistency.cpp b/test/integration_tests/src/test_consistency.cpp index 1307e22c3..05ad8bafc 100644 --- a/test/integration_tests/src/test_consistency.cpp +++ b/test/integration_tests/src/test_consistency.cpp @@ -199,7 +199,8 @@ BOOST_AUTO_TEST_CASE(retry_policy_downgrading) test_utils::CassClusterPtr cluster(cass_cluster_new()); CassRetryPolicy* downgrading_policy = cass_retry_policy_downgrading_consistency_new(); cass_cluster_set_retry_policy(cluster.get(), downgrading_policy); - cass_cluster_set_connection_heartbeat_interval(cluster.get(), 0); + cass_cluster_set_connection_heartbeat_interval(cluster.get(), 1); + cass_cluster_set_connection_idle_timeout(cluster.get(), 10); if (ccm->create_cluster(3)) { ccm->start_cluster(); diff --git a/test/integration_tests/src/test_logging.cpp b/test/integration_tests/src/test_logging.cpp index c41f4bd4b..7497cbf99 100644 --- a/test/integration_tests/src/test_logging.cpp +++ b/test/integration_tests/src/test_logging.cpp @@ -114,6 +114,7 @@ BOOST_AUTO_TEST_CASE(logging_connection_error_reduced) */ BOOST_AUTO_TEST_CASE(logging_pool_error_reduced) { + test_utils::CassLog::set_output_log_level(CASS_LOG_DEBUG); test_utils::CassLog::reset("Connection pool was unable to connect to host"); test_utils::CassLog::set_expected_log_level(CASS_LOG_ERROR); @@ -134,7 +135,11 @@ BOOST_AUTO_TEST_CASE(logging_pool_error_reduced) // Create a connection error by pausing the node during async connection (ERROR) test_utils::CassFuturePtr connect_future(cass_session_connect(session.get(), cluster.get())); ccm->pause_node(1); - cass_future_error_code(connect_future.get()); + while (CASS_ERROR_LIB_NO_HOSTS_AVAILABLE == cass_future_error_code(connect_future.get())) { + ccm->resume_node(1); + connect_future = test_utils::CassFuturePtr(cass_session_connect(session.get(), cluster.get())); + ccm->pause_node(1); + } BOOST_CHECK_EQUAL(test_utils::CassLog::message_count(), 1); // Sleep to allow the connection pool failure on the paused node (WARN) From 4f822cd22f6fa3a8dac2ac37a218b8d681dc3c03 Mon Sep 17 00:00:00 2001 From: Michael Fero Date: Wed, 10 Aug 2016 18:49:22 +0000 Subject: [PATCH 5/7] test: Fixing broken log message checks --- test/integration_tests/src/test_pool.cpp | 2 +- test/integration_tests/src/test_sessions.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration_tests/src/test_pool.cpp b/test/integration_tests/src/test_pool.cpp index f56ada8eb..0314a7d3b 100644 --- a/test/integration_tests/src/test_pool.cpp +++ b/test/integration_tests/src/test_pool.cpp @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(no_hosts_backpressure) BOOST_AUTO_TEST_CASE(connection_spawn) { TestPool tester; - const std::string SPAWN_MSG = "Spawning new connection to host " + tester.ccm->get_ip_prefix() + "1:9042"; + const std::string SPAWN_MSG = "Spawning new connection to host " + tester.ccm->get_ip_prefix() + "1"; test_utils::CassLog::reset(SPAWN_MSG); test_utils::MultipleNodesTest inst(1, 0); diff --git a/test/integration_tests/src/test_sessions.cpp b/test/integration_tests/src/test_sessions.cpp index 268c3dcc6..353f66025 100644 --- a/test/integration_tests/src/test_sessions.cpp +++ b/test/integration_tests/src/test_sessions.cpp @@ -51,7 +51,7 @@ BOOST_FIXTURE_TEST_SUITE(sessions, SessionTests) BOOST_AUTO_TEST_CASE(connect_invalid_name) { - test_utils::CassLog::reset("Unable to resolve host node.domain-does-not-exist.dne:9042"); + test_utils::CassLog::reset("Unable to resolve address for node.domain-does-not-exist.dne"); CassError code; From edfe4b632247788fe56fc6df988ab1e5bc988306 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Thu, 11 Aug 2016 15:28:51 -0700 Subject: [PATCH 6/7] Fix: Properly terminate unresponsive connections --- src/connection.cpp | 64 +++++++++++++++++++++++++++------------------- src/connection.hpp | 6 +++-- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index b4d06c31d..4c96b42ec 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -158,7 +158,6 @@ Connection::HeartbeatHandler::HeartbeatHandler(Connection* connection) void Connection::HeartbeatHandler::on_set(ResponseMessage* response) { LOG_TRACE("Heartbeat completed on host %s", connection_->address_string().c_str()); - connection_->idle_start_time_ms_ = 0; connection_->heartbeat_outstanding_ = false; } @@ -196,7 +195,6 @@ Connection::Connection(uv_loop_t* loop, , response_(new ResponseMessage()) , stream_manager_(protocol_version) , ssl_session_(NULL) - , idle_start_time_ms_(0) , heartbeat_outstanding_(false) { socket_.data = this; uv_tcp_init(loop_, &socket_); @@ -237,10 +235,14 @@ void Connection::connect() { } bool Connection::write(Handler* handler, bool flush_immediately) { - return internal_write(handler, flush_immediately, true); + bool result = internal_write(handler, flush_immediately); + if (result) { + restart_heartbeat_timer(); + } + return result; } -bool Connection::internal_write(Handler* handler, bool flush_immediately, bool reset_idle_time) { +bool Connection::internal_write(Handler* handler, bool flush_immediately) { int stream = stream_manager_.acquire(handler); if (stream < 0) { return false; @@ -304,11 +306,6 @@ bool Connection::internal_write(Handler* handler, bool flush_immediately, bool r pending_write->flush(); } - if (reset_idle_time) { - idle_start_time_ms_ = 0; - restart_heartbeat_timer(); - } - return true; } @@ -344,6 +341,7 @@ void Connection::internal_close(ConnectionState close_state) { uv_handle_t* handle = copy_cast(&socket_); if (!uv_is_closing(handle)) { heartbeat_timer_.stop(); + terminate_timer_.stop(); connect_timer_.stop(); if (state_ == CONNECTION_STATE_CONNECTED || state_ == CONNECTION_STATE_READY) { @@ -428,6 +426,9 @@ void Connection::consume(char* input, size_t size) { char* buffer = input; size_t remaining = size; + // A successful read means the connection is still responsive + restart_terminate_timer(); + while (remaining != 0) { ssize_t consumed = response_->decode(buffer, remaining); if (consumed <= 0) { @@ -722,7 +723,7 @@ void Connection::on_timeout(Timer* timer) { } void Connection::on_connected() { - write(new StartupHandler(this, new OptionsRequest())); + internal_write(new StartupHandler(this, new OptionsRequest())); } void Connection::on_authenticate(const std::string& class_name) { @@ -742,7 +743,7 @@ void Connection::on_auth_challenge(const AuthResponseRequest* request, } AuthResponseRequest* auth_response = new AuthResponseRequest(response, request->auth()); - write(new StartupHandler(this, auth_response)); + internal_write(new StartupHandler(this, auth_response)); } void Connection::on_auth_success(const AuthResponseRequest* request, @@ -757,14 +758,14 @@ void Connection::on_auth_success(const AuthResponseRequest* request, void Connection::on_ready() { if (state_ == CONNECTION_STATE_CONNECTED && listener_->event_types() != 0) { set_state(CONNECTION_STATE_REGISTERING_EVENTS); - write(new StartupHandler(this, new RegisterRequest(listener_->event_types()))); + internal_write(new StartupHandler(this, new RegisterRequest(listener_->event_types()))); return; } if (keyspace_.empty()) { notify_ready(); } else { - write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\""))); + internal_write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\""))); } } @@ -779,7 +780,7 @@ void Connection::on_supported(ResponseMessage* response) { // TODO(mstump) do something with the supported info (void)supported; - write(new StartupHandler(this, new StartupRequest())); + internal_write(new StartupHandler(this, new StartupRequest())); } void Connection::on_pending_schema_agreement(Timer* timer) { @@ -794,6 +795,7 @@ void Connection::on_pending_schema_agreement(Timer* timer) { void Connection::notify_ready() { connect_timer_.stop(); restart_heartbeat_timer(); + restart_terminate_timer(); set_state(CONNECTION_STATE_READY); listener_->on_ready(this); } @@ -845,7 +847,7 @@ void Connection::send_credentials(const std::string& class_name) { if (v1_auth) { V1Authenticator::Credentials credentials; v1_auth->get_credentials(&credentials); - write(new StartupHandler(this, new CredentialsRequest(credentials))); + internal_write(new StartupHandler(this, new CredentialsRequest(credentials))); } else { send_initial_auth_response(class_name); } @@ -862,7 +864,7 @@ void Connection::send_initial_auth_response(const std::string& class_name) { return; } AuthResponseRequest* auth_response = new AuthResponseRequest(response, auth); - write(new StartupHandler(this, auth_response)); + internal_write(new StartupHandler(this, auth_response)); } } @@ -877,18 +879,8 @@ void Connection::restart_heartbeat_timer() { void Connection::on_heartbeat(Timer* timer) { Connection* connection = static_cast(timer->data()); - if (connection->idle_start_time_ms_ == 0) { - connection->idle_start_time_ms_ = get_time_since_epoch_ms(); - } else if ((get_time_since_epoch_ms() - connection->idle_start_time_ms_) / 1000 > - connection->config().connection_idle_timeout_secs()){ - connection->notify_error("Failed to send a heartbeat within connection idle interval. " - "Terminating connection...", - CONNECTION_ERROR_TIMEOUT); - return; - } - if (!connection->heartbeat_outstanding_) { - if (!connection->internal_write(new HeartbeatHandler(connection), true, false)) { + if (!connection->internal_write(new HeartbeatHandler(connection))) { // Recycling only this connection with a timeout error. This is unlikely and // it means the connection ran out of stream IDs as a result of requests // that never returned and as a result timed out. @@ -903,6 +895,24 @@ void Connection::on_heartbeat(Timer* timer) { connection->restart_heartbeat_timer(); } +void Connection::restart_terminate_timer() { + // The terminate timer shouldn't be started without having heartbeats enabled, + // otherwise connections would be terminated in periods of request inactivity. + if (config_.connection_heartbeat_interval_secs() > 0 && + config_.connection_idle_timeout_secs() > 0) { + terminate_timer_.start(loop_, + 1000 * config_.connection_idle_timeout_secs(), + this, on_terminate); + } +} + +void Connection::on_terminate(Timer* timer) { + Connection* connection = static_cast(timer->data()); + connection->notify_error("Failed to send a heartbeat within connection idle interval. " + "Terminating connection...", + CONNECTION_ERROR_TIMEOUT); +} + void Connection::PendingSchemaAgreement::stop_timer() { timer.stop(); } diff --git a/src/connection.hpp b/src/connection.hpp index dd46cae50..dec32d65c 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -253,7 +253,7 @@ class Connection { Timer timer; }; - bool internal_write(Handler* request, bool flush_immediately, bool reset_idle_time); + bool internal_write(Handler* request, bool flush_immediately = true); void internal_close(ConnectionState close_state); void set_state(ConnectionState state); void consume(char* input, size_t size); @@ -297,6 +297,8 @@ class Connection { void restart_heartbeat_timer(); static void on_heartbeat(Timer* timer); + void restart_terminate_timer(); + static void on_terminate(Timer* timer); private: ConnectionState state_; @@ -324,9 +326,9 @@ class Connection { Timer connect_timer_; ScopedPtr ssl_session_; - uint64_t idle_start_time_ms_; bool heartbeat_outstanding_; Timer heartbeat_timer_; + Timer terminate_timer_; // buffer reuse for libuv std::stack buffer_reuse_list_; From fbadda89543d01e530cc3e1067ff32155de4409a Mon Sep 17 00:00:00 2001 From: Michael Fero Date: Mon, 15 Aug 2016 19:06:12 +0000 Subject: [PATCH 7/7] [ci-skip] Removing noisy log output --- test/integration_tests/src/test_logging.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration_tests/src/test_logging.cpp b/test/integration_tests/src/test_logging.cpp index 7497cbf99..43cb48347 100644 --- a/test/integration_tests/src/test_logging.cpp +++ b/test/integration_tests/src/test_logging.cpp @@ -114,7 +114,6 @@ BOOST_AUTO_TEST_CASE(logging_connection_error_reduced) */ BOOST_AUTO_TEST_CASE(logging_pool_error_reduced) { - test_utils::CassLog::set_output_log_level(CASS_LOG_DEBUG); test_utils::CassLog::reset("Connection pool was unable to connect to host"); test_utils::CassLog::set_expected_log_level(CASS_LOG_ERROR);