diff --git a/include/cassandra.h b/include/cassandra.h
index 0fd4654da..bba2b4b8a 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -1701,6 +1701,26 @@ CASS_EXPORT CassError
cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
cass_bool_t enabled);
+/**
+ * Enable/Disable the randomization of the contact points list.
+ *
+ * Default: cass_true (enabled).
+ *
+ * Important: This setting should only be disabled for debugging or
+ * tests.
+ *
+ * @public @memberof CassCluster
+ *
+ * @param[in] cluster
+ * @param[in] enabled
+ * @return CASS_OK if successful, otherwise an error occurred
+ *
+ * @see cass_cluster_set_resolve_timeout()
+ */
+CASS_EXPORT CassError
+cass_cluster_set_use_randomized_contact_points(CassCluster* cluster,
+ cass_bool_t enabled);
+
/***********************************************************************************
*
* Session
diff --git a/src/cluster.cpp b/src/cluster.cpp
index 1cd1ca5d1..217cfb86b 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -415,6 +415,12 @@ CassError cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
#endif
}
+CassError cass_cluster_set_use_randomized_contact_points(CassCluster* cluster,
+ cass_bool_t enabled) {
+ cluster->config().set_use_randomized_contact_points(enabled);
+ return CASS_OK;
+}
+
void cass_cluster_free(CassCluster* cluster) {
delete cluster->from();
}
diff --git a/src/config.hpp b/src/config.hpp
index 180938726..e3538ccb7 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -74,7 +74,8 @@ class Config {
, timestamp_gen_(new ServerSideTimestampGenerator())
, retry_policy_(new DefaultRetryPolicy())
, use_schema_(true)
- , use_hostname_resolution_(false) { }
+ , use_hostname_resolution_(false)
+ , use_randomized_contact_points_(true) { }
unsigned thread_count_io() const { return thread_count_io_; }
@@ -359,6 +360,11 @@ class Config {
use_hostname_resolution_ = enable;
}
+ bool use_randomized_contact_points() const { return use_randomized_contact_points_; }
+ void set_use_randomized_contact_points(bool enable) {
+ use_randomized_contact_points_ = enable;
+ }
+
private:
int port_;
int protocol_version_;
@@ -402,6 +408,7 @@ class Config {
SharedRefPtr retry_policy_;
bool use_schema_;
bool use_hostname_resolution_;
+ bool use_randomized_contact_points_;
};
} // namespace cass
diff --git a/src/connection.cpp b/src/connection.cpp
index b4d06c31d..4c96b42ec 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -158,7 +158,6 @@ Connection::HeartbeatHandler::HeartbeatHandler(Connection* connection)
void Connection::HeartbeatHandler::on_set(ResponseMessage* response) {
LOG_TRACE("Heartbeat completed on host %s",
connection_->address_string().c_str());
- connection_->idle_start_time_ms_ = 0;
connection_->heartbeat_outstanding_ = false;
}
@@ -196,7 +195,6 @@ Connection::Connection(uv_loop_t* loop,
, response_(new ResponseMessage())
, stream_manager_(protocol_version)
, ssl_session_(NULL)
- , idle_start_time_ms_(0)
, heartbeat_outstanding_(false) {
socket_.data = this;
uv_tcp_init(loop_, &socket_);
@@ -237,10 +235,14 @@ void Connection::connect() {
}
bool Connection::write(Handler* handler, bool flush_immediately) {
- return internal_write(handler, flush_immediately, true);
+ bool result = internal_write(handler, flush_immediately);
+ if (result) {
+ restart_heartbeat_timer();
+ }
+ return result;
}
-bool Connection::internal_write(Handler* handler, bool flush_immediately, bool reset_idle_time) {
+bool Connection::internal_write(Handler* handler, bool flush_immediately) {
int stream = stream_manager_.acquire(handler);
if (stream < 0) {
return false;
@@ -304,11 +306,6 @@ bool Connection::internal_write(Handler* handler, bool flush_immediately, bool r
pending_write->flush();
}
- if (reset_idle_time) {
- idle_start_time_ms_ = 0;
- restart_heartbeat_timer();
- }
-
return true;
}
@@ -344,6 +341,7 @@ void Connection::internal_close(ConnectionState close_state) {
uv_handle_t* handle = copy_cast(&socket_);
if (!uv_is_closing(handle)) {
heartbeat_timer_.stop();
+ terminate_timer_.stop();
connect_timer_.stop();
if (state_ == CONNECTION_STATE_CONNECTED ||
state_ == CONNECTION_STATE_READY) {
@@ -428,6 +426,9 @@ void Connection::consume(char* input, size_t size) {
char* buffer = input;
size_t remaining = size;
+ // A successful read means the connection is still responsive
+ restart_terminate_timer();
+
while (remaining != 0) {
ssize_t consumed = response_->decode(buffer, remaining);
if (consumed <= 0) {
@@ -722,7 +723,7 @@ void Connection::on_timeout(Timer* timer) {
}
void Connection::on_connected() {
- write(new StartupHandler(this, new OptionsRequest()));
+ internal_write(new StartupHandler(this, new OptionsRequest()));
}
void Connection::on_authenticate(const std::string& class_name) {
@@ -742,7 +743,7 @@ void Connection::on_auth_challenge(const AuthResponseRequest* request,
}
AuthResponseRequest* auth_response = new AuthResponseRequest(response,
request->auth());
- write(new StartupHandler(this, auth_response));
+ internal_write(new StartupHandler(this, auth_response));
}
void Connection::on_auth_success(const AuthResponseRequest* request,
@@ -757,14 +758,14 @@ void Connection::on_auth_success(const AuthResponseRequest* request,
void Connection::on_ready() {
if (state_ == CONNECTION_STATE_CONNECTED && listener_->event_types() != 0) {
set_state(CONNECTION_STATE_REGISTERING_EVENTS);
- write(new StartupHandler(this, new RegisterRequest(listener_->event_types())));
+ internal_write(new StartupHandler(this, new RegisterRequest(listener_->event_types())));
return;
}
if (keyspace_.empty()) {
notify_ready();
} else {
- write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\"")));
+ internal_write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\"")));
}
}
@@ -779,7 +780,7 @@ void Connection::on_supported(ResponseMessage* response) {
// TODO(mstump) do something with the supported info
(void)supported;
- write(new StartupHandler(this, new StartupRequest()));
+ internal_write(new StartupHandler(this, new StartupRequest()));
}
void Connection::on_pending_schema_agreement(Timer* timer) {
@@ -794,6 +795,7 @@ void Connection::on_pending_schema_agreement(Timer* timer) {
void Connection::notify_ready() {
connect_timer_.stop();
restart_heartbeat_timer();
+ restart_terminate_timer();
set_state(CONNECTION_STATE_READY);
listener_->on_ready(this);
}
@@ -845,7 +847,7 @@ void Connection::send_credentials(const std::string& class_name) {
if (v1_auth) {
V1Authenticator::Credentials credentials;
v1_auth->get_credentials(&credentials);
- write(new StartupHandler(this, new CredentialsRequest(credentials)));
+ internal_write(new StartupHandler(this, new CredentialsRequest(credentials)));
} else {
send_initial_auth_response(class_name);
}
@@ -862,7 +864,7 @@ void Connection::send_initial_auth_response(const std::string& class_name) {
return;
}
AuthResponseRequest* auth_response = new AuthResponseRequest(response, auth);
- write(new StartupHandler(this, auth_response));
+ internal_write(new StartupHandler(this, auth_response));
}
}
@@ -877,18 +879,8 @@ void Connection::restart_heartbeat_timer() {
void Connection::on_heartbeat(Timer* timer) {
Connection* connection = static_cast(timer->data());
- if (connection->idle_start_time_ms_ == 0) {
- connection->idle_start_time_ms_ = get_time_since_epoch_ms();
- } else if ((get_time_since_epoch_ms() - connection->idle_start_time_ms_) / 1000 >
- connection->config().connection_idle_timeout_secs()){
- connection->notify_error("Failed to send a heartbeat within connection idle interval. "
- "Terminating connection...",
- CONNECTION_ERROR_TIMEOUT);
- return;
- }
-
if (!connection->heartbeat_outstanding_) {
- if (!connection->internal_write(new HeartbeatHandler(connection), true, false)) {
+ if (!connection->internal_write(new HeartbeatHandler(connection))) {
// Recycling only this connection with a timeout error. This is unlikely and
// it means the connection ran out of stream IDs as a result of requests
// that never returned and as a result timed out.
@@ -903,6 +895,24 @@ void Connection::on_heartbeat(Timer* timer) {
connection->restart_heartbeat_timer();
}
+void Connection::restart_terminate_timer() {
+ // The terminate timer shouldn't be started without having heartbeats enabled,
+ // otherwise connections would be terminated in periods of request inactivity.
+ if (config_.connection_heartbeat_interval_secs() > 0 &&
+ config_.connection_idle_timeout_secs() > 0) {
+ terminate_timer_.start(loop_,
+ 1000 * config_.connection_idle_timeout_secs(),
+ this, on_terminate);
+ }
+}
+
+void Connection::on_terminate(Timer* timer) {
+ Connection* connection = static_cast(timer->data());
+ connection->notify_error("Failed to send a heartbeat within connection idle interval. "
+ "Terminating connection...",
+ CONNECTION_ERROR_TIMEOUT);
+}
+
void Connection::PendingSchemaAgreement::stop_timer() {
timer.stop();
}
diff --git a/src/connection.hpp b/src/connection.hpp
index 8c98d07bc..6bdc2984a 100644
--- a/src/connection.hpp
+++ b/src/connection.hpp
@@ -254,7 +254,7 @@ class Connection {
Timer timer;
};
- bool internal_write(Handler* request, bool flush_immediately, bool reset_idle_time);
+ bool internal_write(Handler* request, bool flush_immediately = true);
void internal_close(ConnectionState close_state);
void set_state(ConnectionState state);
void consume(char* input, size_t size);
@@ -298,6 +298,8 @@ class Connection {
void restart_heartbeat_timer();
static void on_heartbeat(Timer* timer);
+ void restart_terminate_timer();
+ static void on_terminate(Timer* timer);
private:
ConnectionState state_;
@@ -325,9 +327,9 @@ class Connection {
Timer connect_timer_;
ScopedPtr ssl_session_;
- uint64_t idle_start_time_ms_;
bool heartbeat_outstanding_;
Timer heartbeat_timer_;
+ Timer terminate_timer_;
// buffer reuse for libuv
std::stack buffer_reuse_list_;
diff --git a/src/control_connection.cpp b/src/control_connection.cpp
index 1665cce06..bbddffa8f 100644
--- a/src/control_connection.cpp
+++ b/src/control_connection.cpp
@@ -29,7 +29,9 @@
#include "session.hpp"
#include "timer.hpp"
+#include
#include
+#include
#include
#include
@@ -58,20 +60,25 @@ namespace cass {
class ControlStartupQueryPlan : public QueryPlan {
public:
- ControlStartupQueryPlan(const HostMap& hosts)
- : hosts_(hosts)
- , it_(hosts_.begin()) {}
+ ControlStartupQueryPlan(const HostMap& hosts, Random* random)
+ : index_(random != NULL ? random->next(std::max(static_cast(1), hosts.size())) : 0)
+ , count_(0) {
+ hosts_.reserve(hosts.size());
+ std::transform(hosts.begin(), hosts.end(), std::back_inserter(hosts_), GetHost());
+ }
virtual SharedRefPtr compute_next() {
- if (it_ == hosts_.end()) return SharedRefPtr();
- const SharedRefPtr& host = it_->second;
- ++it_;
- return host;
+ const size_t size = hosts_.size();
+ if (count_ >= size) return SharedRefPtr();
+ size_t index = (index_ + count_) % size;
+ ++count_;
+ return hosts_[index];
}
private:
- const HostMap hosts_;
- HostMap::const_iterator it_;
+ HostVec hosts_;
+ size_t index_;
+ size_t count_;
};
bool ControlConnection::determine_address_for_peer_host(const Address& connected_address,
@@ -133,7 +140,8 @@ void ControlConnection::clear() {
void ControlConnection::connect(Session* session) {
session_ = session;
- query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_)); // No hosts lock necessary (read-only)
+ query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_, // No hosts lock necessary (read-only)
+ session_->random_.get()));
protocol_version_ = session_->config().protocol_version();
use_schema_ = session_->config().use_schema();
token_aware_routing_ = session_->config().token_aware_routing();
diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp
index 6dbe6e3f7..036975c8e 100644
--- a/src/dc_aware_policy.cpp
+++ b/src/dc_aware_policy.cpp
@@ -20,12 +20,15 @@
#include "scoped_lock.hpp"
-namespace cass {
+#include
+namespace cass {
static const CopyOnWriteHostVec NO_HOSTS(new HostVec());
-void DCAwarePolicy::init(const SharedRefPtr& connected_host, const HostMap& hosts) {
+void DCAwarePolicy::init(const SharedRefPtr& connected_host,
+ const HostMap& hosts,
+ Random* random) {
if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) {
LOG_INFO("Using '%s' for the local data center "
"(if this is incorrect, please provide the correct data center)",
@@ -37,6 +40,9 @@ void DCAwarePolicy::init(const SharedRefPtr& connected_host, const HostMap
end = hosts.end(); i != end; ++i) {
on_add(i->second);
}
+ if (random != NULL) {
+ index_ = random->next(std::max(static_cast(1), hosts.size()));
+ }
}
CassHostDistance DCAwarePolicy::distance(const SharedRefPtr& host) const {
diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp
index 9eb6cc74f..21b001540 100644
--- a/src/dc_aware_policy.hpp
+++ b/src/dc_aware_policy.hpp
@@ -46,7 +46,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
, local_dc_live_hosts_(new HostVec)
, index_(0) {}
- virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts);
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random);
virtual CassHostDistance distance(const SharedRefPtr& host) const;
diff --git a/src/host.cpp b/src/host.cpp
index 120e0efe1..217625efb 100644
--- a/src/host.cpp
+++ b/src/host.cpp
@@ -18,14 +18,6 @@
namespace cass {
-void copy_hosts(const HostMap& from_hosts, CopyOnWriteHostVec& to_hosts) {
- to_hosts->reserve(from_hosts.size());
- for (HostMap::const_iterator i = from_hosts.begin(),
- end = from_hosts.end(); i != end; ++i) {
- to_hosts->push_back(i->second);
- }
-}
-
void add_host(CopyOnWriteHostVec& hosts, const SharedRefPtr& host) {
HostVec::iterator i;
for (i = hosts->begin(); i != hosts->end(); ++i) {
diff --git a/src/host.hpp b/src/host.hpp
index 928a641ed..c0a0a9a51 100644
--- a/src/host.hpp
+++ b/src/host.hpp
@@ -247,11 +247,16 @@ class Host : public RefCounted {
};
typedef std::map > HostMap;
+struct GetHost {
+ typedef std::pair Pair;
+ Host::Ptr operator()(const Pair& pair) const {
+ return pair.second;
+ }
+};
typedef std::pair > HostPair;
typedef std::vector > HostVec;
typedef CopyOnWritePtr CopyOnWriteHostVec;
-void copy_hosts(const HostMap& from_hosts, CopyOnWriteHostVec& to_hosts);
void add_host(CopyOnWriteHostVec& hosts, const SharedRefPtr& host);
void remove_host(CopyOnWriteHostVec& hosts, const SharedRefPtr& host);
diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp
index 005543c56..ce0061f3b 100644
--- a/src/latency_aware_policy.cpp
+++ b/src/latency_aware_policy.cpp
@@ -19,15 +19,21 @@
#include "get_time.hpp"
#include "logger.hpp"
+#include
+#include
+
namespace cass {
-void LatencyAwarePolicy::init(const SharedRefPtr& connected_host, const HostMap& hosts) {
- copy_hosts(hosts, hosts_);
+void LatencyAwarePolicy::init(const SharedRefPtr& connected_host,
+ const HostMap& hosts,
+ Random* random) {
+ hosts_->reserve(hosts.size());
+ std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
for (HostMap::const_iterator i = hosts.begin(),
end = hosts.end(); i != end; ++i) {
i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured);
}
- ChainedLoadBalancingPolicy::init(connected_host, hosts);
+ ChainedLoadBalancingPolicy::init(connected_host, hosts, random);
}
void LatencyAwarePolicy::register_handles(uv_loop_t* loop) {
diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp
index 1f2ee4a6a..824608e70 100644
--- a/src/latency_aware_policy.hpp
+++ b/src/latency_aware_policy.hpp
@@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy {
virtual ~LatencyAwarePolicy() {}
- virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts);
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random);
virtual void register_handles(uv_loop_t* loop);
virtual void close_handles();
diff --git a/src/list_policy.cpp b/src/list_policy.cpp
index 891f6aa83..aea9b9ddb 100644
--- a/src/list_policy.cpp
+++ b/src/list_policy.cpp
@@ -16,21 +16,27 @@
#include "list_policy.hpp"
+#include "logger.hpp"
+
namespace cass {
void ListPolicy::init(const SharedRefPtr& connected_host,
- const HostMap& hosts) {
- HostMap whitelist_hosts;
+ const HostMap& hosts,
+ Random* random) {
+ HostMap valid_hosts;
for (HostMap::const_iterator i = hosts.begin(),
end = hosts.end(); i != end; ++i) {
const SharedRefPtr& host = i->second;
if (is_valid_host(host)) {
- whitelist_hosts.insert(HostPair(i->first, host));
+ valid_hosts.insert(HostPair(i->first, host));
}
}
- assert(!whitelist_hosts.empty());
- child_policy_->init(connected_host, whitelist_hosts);
+ if (valid_hosts.empty()) {
+ LOG_ERROR("No valid hosts available for list policy");
+ }
+
+ ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random);
}
CassHostDistance ListPolicy::distance(const SharedRefPtr& host) const {
@@ -41,9 +47,9 @@ CassHostDistance ListPolicy::distance(const SharedRefPtr& host) const {
}
QueryPlan* ListPolicy::new_query_plan(const std::string& connected_keyspace,
- const Request* request,
- const TokenMap* token_map,
- Request::EncodingCache* cache) {
+ const Request* request,
+ const TokenMap* token_map,
+ Request::EncodingCache* cache) {
return child_policy_->new_query_plan(connected_keyspace,
request,
token_map,
diff --git a/src/list_policy.hpp b/src/list_policy.hpp
index 5dfbc6e9e..32cff537a 100644
--- a/src/list_policy.hpp
+++ b/src/list_policy.hpp
@@ -30,7 +30,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy {
virtual ~ListPolicy() {}
- virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts);
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random);
virtual CassHostDistance distance(const SharedRefPtr& host) const;
diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp
index 8c1731204..44e89e39d 100644
--- a/src/load_balancing.hpp
+++ b/src/load_balancing.hpp
@@ -51,6 +51,7 @@ typedef enum CassHostDistance_ {
namespace cass {
+class Random;
class RoutableRequest;
class TokenMap;
@@ -80,7 +81,7 @@ class LoadBalancingPolicy : public Host::StateListener, public RefCounted& connected_host, const HostMap& hosts) = 0;
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random) = 0;
virtual void register_handles(uv_loop_t* loop) {}
virtual void close_handles() {}
@@ -103,8 +104,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
virtual ~ChainedLoadBalancingPolicy() {}
- virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts) {
- return child_policy_->init(connected_host, hosts);
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random) {
+ return child_policy_->init(connected_host, hosts, random);
}
virtual CassHostDistance distance(const SharedRefPtr& host) const { return child_policy_->distance(host); }
diff --git a/src/random.cpp b/src/random.cpp
index 618804ce7..a22019bdf 100644
--- a/src/random.cpp
+++ b/src/random.cpp
@@ -14,6 +14,12 @@
limitations under the License.
*/
+#include "random.hpp"
+
+#include "cassandra.h"
+#include "logger.hpp"
+#include "scoped_lock.hpp"
+
#if defined(_WIN32)
#ifndef _WINSOCKAPI_
#define _WINSOCKAPI_
@@ -29,10 +35,22 @@
#include
#endif
-#include "logger.hpp"
-
namespace cass {
+Random::Random()
+ // Use high resolution time if we can't get a real random seed
+ : rng_(get_random_seed(uv_hrtime())) {
+}
+
+uint64_t Random::next(uint64_t max) {
+ const uint64_t limit = CASS_UINT64_MAX - CASS_UINT64_MAX % max;
+ uint64_t r;
+ do {
+ r = rng_();
+ } while(r >= limit);
+ return r % max;
+}
+
#if defined(_WIN32)
uint64_t get_random_seed(uint64_t seed) {
diff --git a/src/random.hpp b/src/random.hpp
index be6177567..38f1923dc 100644
--- a/src/random.hpp
+++ b/src/random.hpp
@@ -19,8 +19,20 @@
#include "third_party/mt19937_64/mt19937_64.hpp"
+#include
+
namespace cass {
+class Random {
+public:
+ Random();
+
+ uint64_t next(uint64_t max);
+
+private:
+ MT19937_64 rng_;
+};
+
uint64_t get_random_seed(uint64_t seed);
} // namespace cass
diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp
new file mode 100644
index 000000000..f4405f65f
--- /dev/null
+++ b/src/round_robin_policy.cpp
@@ -0,0 +1,72 @@
+/*
+ Copyright (c) 2014-2016 DataStax
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#include "round_robin_policy.hpp"
+
+#include
+#include
+
+namespace cass {
+
+void RoundRobinPolicy::init(const SharedRefPtr& connected_host,
+ const HostMap& hosts,
+ Random* random) {
+ hosts_->reserve(hosts.size());
+ std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
+ if (random != NULL) {
+ index_ = random->next(std::max(static_cast(1), hosts.size()));
+ }
+}
+
+CassHostDistance RoundRobinPolicy::distance(const SharedRefPtr& host) const {
+ return CASS_HOST_DISTANCE_LOCAL;
+}
+
+QueryPlan* RoundRobinPolicy::new_query_plan(const std::string& connected_keyspace,
+ const Request* request,
+ const TokenMap* token_map,
+ Request::EncodingCache* cache) {
+ return new RoundRobinQueryPlan(hosts_, index_++);
+}
+
+void RoundRobinPolicy::on_add(const SharedRefPtr& host) {
+ add_host(hosts_, host);
+}
+
+void RoundRobinPolicy::on_remove(const SharedRefPtr& host) {
+ remove_host(hosts_, host);
+}
+
+void RoundRobinPolicy::on_up(const SharedRefPtr& host) {
+ on_add(host);
+}
+
+void RoundRobinPolicy::on_down(const SharedRefPtr& host) {
+ on_remove(host);
+}
+
+SharedRefPtr RoundRobinPolicy::RoundRobinQueryPlan::compute_next() {
+ while (remaining_ > 0) {
+ --remaining_;
+ const SharedRefPtr& host((*hosts_)[index_++ % hosts_->size()]);
+ if (host->is_up()) {
+ return host;
+ }
+ }
+ return SharedRefPtr();
+}
+
+} // namespace cass
diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp
index b6dec324b..14fa780a8 100644
--- a/src/round_robin_policy.hpp
+++ b/src/round_robin_policy.hpp
@@ -21,8 +21,7 @@
#include "copy_on_write_ptr.hpp"
#include "load_balancing.hpp"
#include "host.hpp"
-
-#include
+#include "random.hpp"
namespace cass {
@@ -30,38 +29,21 @@ class RoundRobinPolicy : public LoadBalancingPolicy {
public:
RoundRobinPolicy()
: hosts_(new HostVec)
- , index_(0) {}
+ , index_(0) { }
- virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts) {
- copy_hosts(hosts, hosts_);
- }
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random);
- virtual CassHostDistance distance(const SharedRefPtr& host) const {
- return CASS_HOST_DISTANCE_LOCAL;
- }
+ virtual CassHostDistance distance(const SharedRefPtr& host) const;
virtual QueryPlan* new_query_plan(const std::string& connected_keyspace,
const Request* request,
const TokenMap* token_map,
- Request::EncodingCache* cache) {
- return new RoundRobinQueryPlan(hosts_, index_++);
- }
-
- virtual void on_add(const SharedRefPtr& host) {
- add_host(hosts_, host);
- }
-
- virtual void on_remove(const SharedRefPtr& host) {
- remove_host(hosts_, host);
- }
+ Request::EncodingCache* cache);
- virtual void on_up(const SharedRefPtr& host) {
- on_add(host);
- }
-
- virtual void on_down(const SharedRefPtr& host) {
- on_remove(host);
- }
+ virtual void on_add(const SharedRefPtr& host);
+ virtual void on_remove(const SharedRefPtr& host);
+ virtual void on_up(const SharedRefPtr& host);
+ virtual void on_down(const SharedRefPtr& host);
virtual LoadBalancingPolicy* new_instance() { return new RoundRobinPolicy(); }
@@ -71,18 +53,9 @@ class RoundRobinPolicy : public LoadBalancingPolicy {
RoundRobinQueryPlan(const CopyOnWriteHostVec& hosts, size_t start_index)
: hosts_(hosts)
, index_(start_index)
- , remaining_(hosts->size()) {}
-
- SharedRefPtr compute_next() {
- while (remaining_ > 0) {
- --remaining_;
- const SharedRefPtr& host((*hosts_)[index_++ % hosts_->size()]);
- if (host->is_up()) {
- return host;
- }
- }
- return SharedRefPtr();
- }
+ , remaining_(hosts->size()) { }
+
+ virtual SharedRefPtr compute_next();
private:
const CopyOnWriteHostVec hosts_;
diff --git a/src/session.cpp b/src/session.cpp
index 68e21026a..b44f2fe18 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -153,6 +153,7 @@ Session::~Session() {
void Session::clear(const Config& config) {
config_ = config;
+ random_.reset();
metrics_.reset(new Metrics(config_.thread_count_io() + 1));
load_balancing_policy_.reset(config.load_balancing_policy());
connect_future_.reset();
@@ -419,6 +420,12 @@ void Session::on_event(const SessionEvent& event) {
case SessionEvent::CONNECT: {
int port = config_.port();
+ // This needs to be done on the session thread because it could pause
+ // generating a new random seed.
+ if (config_.use_randomized_contact_points()) {
+ random_.reset(new Random());
+ }
+
MultiResolver::Ptr resolver(
new MultiResolver(this, on_resolve,
#if UV_VERSION_MAJOR >= 1
@@ -548,7 +555,7 @@ void Session::on_add_resolve_name(NameResolver* resolver) {
void Session::on_control_connection_ready() {
// No hosts lock necessary (only called on session thread and read-only)
- load_balancing_policy_->init(control_connection_.connected_host(), hosts_);
+ load_balancing_policy_->init(control_connection_.connected_host(), hosts_, random_.get());
load_balancing_policy_->register_handles(loop());
for (IOWorkerVec::iterator it = io_workers_.begin(),
end = io_workers_.end(); it != end; ++it) {
diff --git a/src/session.hpp b/src/session.hpp
index 61372d085..56e6c357b 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -27,6 +27,7 @@
#include "metadata.hpp"
#include "metrics.hpp"
#include "mpmc_queue.hpp"
+#include "random.hpp"
#include "ref_counted.hpp"
#include "resolver.hpp"
#include "row.hpp"
@@ -203,7 +204,7 @@ class Session : public EventThread {
ScopedPtr token_map_;
Metadata metadata_;
-
+ ScopedPtr random_;
ControlConnection control_connection_;
bool current_host_mark_;
int pending_pool_count_;
diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp
index 3615573d1..47110db02 100644
--- a/src/token_aware_policy.cpp
+++ b/src/token_aware_policy.cpp
@@ -16,6 +16,10 @@
#include "token_aware_policy.hpp"
+#include "random.hpp"
+
+#include
+
namespace cass {
// The number of replicas is bounded by replication factor per DC. In practice, the number
@@ -30,6 +34,15 @@ static inline bool contains(const CopyOnWriteHostVec& replicas, const Address& a
return false;
}
+void TokenAwarePolicy::init(const SharedRefPtr& connected_host,
+ const HostMap& hosts,
+ Random* random) {
+ if (random != NULL) {
+ index_ = random->next(std::max(static_cast(1), hosts.size()));
+ }
+ ChainedLoadBalancingPolicy::init(connected_host, hosts, random);
+}
+
QueryPlan* TokenAwarePolicy::new_query_plan(const std::string& connected_keyspace,
const Request* request,
const TokenMap* token_map,
diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp
index e1ea6d191..2b86f66fb 100644
--- a/src/token_aware_policy.hpp
+++ b/src/token_aware_policy.hpp
@@ -32,6 +32,8 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
virtual ~TokenAwarePolicy() {}
+ virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random);
+
virtual QueryPlan* new_query_plan(const std::string& connected_keyspace,
const Request* request,
const TokenMap* token_map,
diff --git a/test/integration_tests/src/test_basics.cpp b/test/integration_tests/src/test_basics.cpp
index 123afb247..20d13075e 100644
--- a/test/integration_tests/src/test_basics.cpp
+++ b/test/integration_tests/src/test_basics.cpp
@@ -537,7 +537,7 @@ BOOST_AUTO_TEST_CASE(rows_in_rows_out)
// Create insert statement for bound parameters
std::string insert_query(boost::str(boost::format("INSERT INTO %s (tweet_id, t1, t2, t3) VALUES (?, ?, ?, ?);") % test_utils::SIMPLE_TABLE));
- const size_t num_rows = 100000;
+ const size_t num_rows = 1000;
for (size_t i = 0; i < num_rows; ++i) {
test_utils::CassStatementPtr statement(cass_statement_new(insert_query.c_str(), 4));
diff --git a/test/integration_tests/src/test_consistency.cpp b/test/integration_tests/src/test_consistency.cpp
index a0598d3c1..350e4dda0 100644
--- a/test/integration_tests/src/test_consistency.cpp
+++ b/test/integration_tests/src/test_consistency.cpp
@@ -199,7 +199,8 @@ BOOST_AUTO_TEST_CASE(retry_policy_downgrading)
test_utils::CassClusterPtr cluster(cass_cluster_new());
CassRetryPolicy* downgrading_policy = cass_retry_policy_downgrading_consistency_new();
cass_cluster_set_retry_policy(cluster.get(), downgrading_policy);
- cass_cluster_set_connection_heartbeat_interval(cluster.get(), 0);
+ cass_cluster_set_connection_heartbeat_interval(cluster.get(), 1);
+ cass_cluster_set_connection_idle_timeout(cluster.get(), 10);
if (ccm->create_cluster(3)) {
ccm->start_cluster();
diff --git a/test/integration_tests/src/test_control_connection.cpp b/test/integration_tests/src/test_control_connection.cpp
index 6ef9cb828..2f19e0569 100644
--- a/test/integration_tests/src/test_control_connection.cpp
+++ b/test/integration_tests/src/test_control_connection.cpp
@@ -39,21 +39,32 @@ struct ControlConnectionTests {
, ip_prefix(ccm->get_ip_prefix())
, version(test_utils::get_version()) {}
- void check_for_live_hosts(test_utils::CassSessionPtr session,
+ std::string get_executing_host(test_utils::CassSessionPtr session) {
+ std::stringstream query;
+ query << "SELECT * FROM " << (version >= "3.0.0" ? "system_schema.keyspaces" : "system.schema_keyspaces");
+ test_utils::CassStatementPtr statement(cass_statement_new(query.str().c_str(), 0));
+ test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get()));
+ if (cass_future_error_code(future.get()) == CASS_OK) {
+ return cass::get_host_from_future(future.get());
+ } else {
+ CassString message;
+ cass_future_error_message(future.get(), &message.data, &message.length);
+ std::cerr << "Failed to query host: " << std::string(message.data, message.length) << std::endl;
+ }
+
+ return "";
+ }
+
+ void check_for_live_hosts(test_utils::CassSessionPtr session,
const std::set& should_be_present) {
std::set hosts;
std::stringstream query;
query << "SELECT * FROM " << (version >= "3.0.0" ? "system_schema.keyspaces" : "system.schema_keyspaces");
for (size_t i = 0; i < should_be_present.size() + 2; ++i) {
- test_utils::CassStatementPtr statement(cass_statement_new(query.str().c_str(), 0));
- test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get()));
- if (cass_future_error_code(future.get()) == CASS_OK) {
- hosts.insert(cass::get_host_from_future(future.get()));
- } else {
- CassString message;
- cass_future_error_message(future.get(), &message.data, &message.length);
- std::cerr << "Failed to query host: " << std::string(message.data, message.length) << std::endl;
+ std::string host = get_executing_host(session);
+ if (!host.empty()) {
+ hosts.insert(host);
}
}
@@ -395,4 +406,63 @@ BOOST_AUTO_TEST_CASE(node_decommission)
ccm->remove_cluster();
}
+/**
+ * Randomized contact points
+ *
+ * This test ensures the driver will randomize the contact points when executing
+ * a query plan
+ *
+ * @since 2.4.3
+ * @jira_ticket CPP-193
+ * @test_category control_connection
+ */
+BOOST_AUTO_TEST_CASE(randomized_contact_points)
+{
+ std::string starting_host = ip_prefix + "1";
+ size_t retries = 0;
+ test_utils::CassSessionPtr session;
+
+ {
+ test_utils::CassClusterPtr cluster(cass_cluster_new());
+ if (ccm->create_cluster(4)) {
+ ccm->start_cluster();
+ }
+
+ test_utils::initialize_contact_points(cluster.get(), ip_prefix, 4);
+ cass_cluster_set_use_randomized_contact_points(cluster.get(), cass_true);
+
+ // Make sure the first host executing a statement is not .1
+ do {
+ test_utils::CassLog::reset("Adding pool for host " + ip_prefix);
+ session = test_utils::CassSessionPtr(test_utils::create_session(cluster.get()));
+
+ // Wait for all hosts to be added to the pool; timeout after 10 seconds
+ boost::chrono::steady_clock::time_point end = boost::chrono::steady_clock::now() + boost::chrono::milliseconds(10000);
+ while (test_utils::CassLog::message_count() != 4ul && boost::chrono::steady_clock::now() < end) {
+ boost::this_thread::sleep_for(boost::chrono::seconds(1));
+ }
+ BOOST_CHECK_EQUAL(test_utils::CassLog::message_count(), 4ul);
+
+ starting_host = get_executing_host(session);
+ } while (starting_host == ip_prefix + "1" && retries++ < 10);
+ }
+
+ BOOST_CHECK_NE(ip_prefix + "1", starting_host);
+ BOOST_CHECK_LT(retries, 10);
+
+ // Ensure the remaining hosts are executed (round robin))
+ {
+ int node = starting_host.at(starting_host.length() - 1) - '0';
+ for (int i = 0; i < 3; ++i) {
+ node = (node + 1 > 4) ? 1 : node + 1;
+ std::string expected_host = ip_prefix + boost::lexical_cast(node);
+ std::string host = get_executing_host(session);
+ BOOST_CHECK_EQUAL(expected_host, host);
+ }
+ }
+
+ // Ensure the next host is the starting host
+ BOOST_CHECK_EQUAL(starting_host, get_executing_host(session));
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/test/integration_tests/src/test_logging.cpp b/test/integration_tests/src/test_logging.cpp
index cdf345c74..8cec5a960 100644
--- a/test/integration_tests/src/test_logging.cpp
+++ b/test/integration_tests/src/test_logging.cpp
@@ -134,7 +134,11 @@ BOOST_AUTO_TEST_CASE(logging_pool_error_reduced)
// Create a connection error by pausing the node during async connection (ERROR)
test_utils::CassFuturePtr connect_future(cass_session_connect(session.get(), cluster.get()));
ccm->pause_node(1);
- cass_future_error_code(connect_future.get());
+ while (CASS_ERROR_LIB_NO_HOSTS_AVAILABLE == cass_future_error_code(connect_future.get())) {
+ ccm->resume_node(1);
+ connect_future = test_utils::CassFuturePtr(cass_session_connect(session.get(), cluster.get()));
+ ccm->pause_node(1);
+ }
BOOST_CHECK_EQUAL(test_utils::CassLog::message_count(), 1);
// Sleep to allow the connection pool failure on the paused node (WARN)
diff --git a/test/integration_tests/src/test_pool.cpp b/test/integration_tests/src/test_pool.cpp
index 3ca1f4ab8..5ad084d6a 100644
--- a/test/integration_tests/src/test_pool.cpp
+++ b/test/integration_tests/src/test_pool.cpp
@@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(no_hosts_backpressure)
BOOST_AUTO_TEST_CASE(connection_spawn)
{
TestPool tester;
- const std::string SPAWN_MSG = "Spawning new connection to host " + tester.ccm->get_ip_prefix() + "1:9042";
+ const std::string SPAWN_MSG = "Spawning new connection to host " + tester.ccm->get_ip_prefix() + "1";
test_utils::CassLog::reset(SPAWN_MSG);
test_utils::MultipleNodesTest inst(1, 0);
diff --git a/test/integration_tests/src/test_schema_metadata.cpp b/test/integration_tests/src/test_schema_metadata.cpp
index 8d14c6316..2e5612b2e 100644
--- a/test/integration_tests/src/test_schema_metadata.cpp
+++ b/test/integration_tests/src/test_schema_metadata.cpp
@@ -1494,7 +1494,7 @@ BOOST_AUTO_TEST_CASE(lookup) {
BOOST_CHECK_EQUAL(CASS_VALUE_TYPE_BIGINT, cass_data_type_type(cass_data_type_sub_data_type(datatype, 1)));
datatype = cass_aggregate_meta_return_type(agg_meta);
BOOST_CHECK_EQUAL(CASS_VALUE_TYPE_DOUBLE, cass_data_type_type(datatype));
- func_meta = cass_aggregate_meta_state_func(agg_meta);
+ func_meta = cass_aggregate_meta_final_func(agg_meta);
BOOST_CHECK_EQUAL(1, cass_function_meta_argument_count(func_meta));
datatype = cass_function_meta_argument_type_by_name(func_meta, "state");
BOOST_CHECK_EQUAL(CASS_VALUE_TYPE_TUPLE, cass_data_type_type(datatype));
diff --git a/test/integration_tests/src/test_sessions.cpp b/test/integration_tests/src/test_sessions.cpp
index 001397503..27bb63a3d 100644
--- a/test/integration_tests/src/test_sessions.cpp
+++ b/test/integration_tests/src/test_sessions.cpp
@@ -51,7 +51,7 @@ BOOST_FIXTURE_TEST_SUITE(sessions, SessionTests)
BOOST_AUTO_TEST_CASE(connect_invalid_name)
{
- test_utils::CassLog::reset("Unable to resolve host node.domain-does-not-exist.dne:9042");
+ test_utils::CassLog::reset("Unable to resolve address for node.domain-does-not-exist.dne");
CassError code;
diff --git a/test/integration_tests/src/test_utils.cpp b/test/integration_tests/src/test_utils.cpp
index 254358a62..f01d031ef 100644
--- a/test/integration_tests/src/test_utils.cpp
+++ b/test/integration_tests/src/test_utils.cpp
@@ -225,6 +225,7 @@ MultipleNodesTest::MultipleNodesTest(unsigned int num_nodes_dc1,
cass_cluster_set_num_threads_io(cluster, 4);
cass_cluster_set_max_concurrent_creation(cluster, 8);
cass_cluster_set_protocol_version(cluster, protocol_version);
+ cass_cluster_set_use_randomized_contact_points(cluster, cass_false);
}
MultipleNodesTest::~MultipleNodesTest() {
diff --git a/test/unit_tests/src/test_load_balancing.cpp b/test/unit_tests/src/test_load_balancing.cpp
index 247806b72..fa897ec13 100644
--- a/test/unit_tests/src/test_load_balancing.cpp
+++ b/test/unit_tests/src/test_load_balancing.cpp
@@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(simple) {
populate_hosts(2, "rack", "dc", &hosts);
cass::RoundRobinPolicy policy;
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
// start on first elem
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE(on_add)
populate_hosts(2, "rack", "dc", &hosts);
cass::RoundRobinPolicy policy;
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
// baseline
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -204,7 +204,7 @@ BOOST_AUTO_TEST_CASE(on_remove)
populate_hosts(3, "rack", "dc", &hosts);
cass::RoundRobinPolicy policy;
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
cass::SharedRefPtr host = hosts.begin()->second;
@@ -228,7 +228,7 @@ BOOST_AUTO_TEST_CASE(on_down_on_up)
populate_hosts(3, "rack", "dc", &hosts);
cass::RoundRobinPolicy policy;
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp_before1(policy.new_query_plan("ks", NULL, NULL, NULL));
boost::scoped_ptr qp_before2(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -280,7 +280,7 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) {
populate_hosts(local_count, "rack", LOCAL_DC, &hosts);
populate_hosts(remote_count, "rack", REMOTE_DC, &hosts);
cass::DCAwarePolicy policy(LOCAL_DC, remote_count, false);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
const size_t total_hosts = local_count + remote_count;
@@ -306,7 +306,7 @@ BOOST_AUTO_TEST_CASE(some_dc_local_unspecified)
h->set_rack_and_dc("", "");
cass::DCAwarePolicy policy(LOCAL_DC, 1, false);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -322,7 +322,7 @@ BOOST_AUTO_TEST_CASE(single_local_down)
populate_hosts(1, "rack", REMOTE_DC, &hosts);
cass::DCAwarePolicy policy(LOCAL_DC, 1, false);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL, NULL));// has down host ptr in plan
target_host->set_down();
@@ -348,7 +348,7 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned)
populate_hosts(1, "rack", REMOTE_DC, &hosts);
cass::DCAwarePolicy policy(LOCAL_DC, 1, false);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL, NULL));// has down host ptr in plan
target_host->set_down();
@@ -381,7 +381,7 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned)
cass::SharedRefPtr target_host = hosts[target_addr];
cass::DCAwarePolicy policy(LOCAL_DC, 1, false);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL, NULL));// has down host ptr in plan
target_host->set_down();
@@ -413,7 +413,7 @@ BOOST_AUTO_TEST_CASE(used_hosts_per_remote_dc)
for (size_t used_hosts = 0; used_hosts < 3; ++used_hosts) {
cass::DCAwarePolicy policy(LOCAL_DC, used_hosts, false);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
size_t total_hosts = 3 + used_hosts;
@@ -433,7 +433,7 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl)
// Not allowing remote DCs for local CLs
bool allow_remote_dcs_for_local_cl = false;
cass::DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
// Set local CL
cass::SharedRefPtr request(new cass::QueryRequest());
@@ -449,7 +449,7 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl)
// Allowing remote DCs for local CLs
bool allow_remote_dcs_for_local_cl = true;
cass::DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
// Set local CL
cass::SharedRefPtr request(new cass::QueryRequest());
@@ -471,7 +471,7 @@ BOOST_AUTO_TEST_CASE(start_with_empty_local_dc)
// Set local DC using connected host
{
cass::DCAwarePolicy policy("", 0, false);
- policy.init(hosts[cass::Address("2.0.0.0", 9042)], hosts);
+ policy.init(hosts[cass::Address("2.0.0.0", 9042)], hosts, NULL);
cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
const size_t seq[] = {2, 3, 4};
@@ -482,7 +482,7 @@ BOOST_AUTO_TEST_CASE(start_with_empty_local_dc)
{
cass::DCAwarePolicy policy("", 0, false);
policy.init(cass::SharedRefPtr(
- new cass::Host(cass::Address("0.0.0.0", 9042), false)), hosts);
+ new cass::Host(cass::Address("0.0.0.0", 9042), false)), hosts, NULL);
cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
const size_t seq[] = {1};
@@ -521,7 +521,7 @@ BOOST_AUTO_TEST_CASE(simple)
token_map->build();
cass::TokenAwarePolicy policy(new cass::RoundRobinPolicy());
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
cass::SharedRefPtr request(new cass::QueryRequest(1));
const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887
@@ -599,7 +599,7 @@ BOOST_AUTO_TEST_CASE(network_topology)
token_map->build();
cass::TokenAwarePolicy policy(new cass::DCAwarePolicy(LOCAL_DC, num_hosts / 2, false));
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
cass::SharedRefPtr request(new cass::QueryRequest(1));
const char* value = "abc"; // hash: -5434086359492102041
@@ -706,7 +706,7 @@ BOOST_AUTO_TEST_CASE(simple)
cass::HostMap hosts;
populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts);
cass::LatencyAwarePolicy policy(new cass::RoundRobinPolicy(), settings);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
// Record some latencies with 100 ns being the minimum
for (cass::HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) {
@@ -763,7 +763,7 @@ BOOST_AUTO_TEST_CASE(min_average_under_min_measured)
cass::HostMap hosts;
populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts);
cass::LatencyAwarePolicy policy(new cass::RoundRobinPolicy(), settings);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
int count = 1;
for (cass::HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) {
@@ -802,7 +802,7 @@ BOOST_AUTO_TEST_CASE(simple)
whitelist_hosts.push_back("37.0.0.0");
whitelist_hosts.push_back("83.0.0.0");
cass::WhitelistPolicy policy(new cass::RoundRobinPolicy(), whitelist_hosts);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -824,7 +824,7 @@ BOOST_AUTO_TEST_CASE(dc)
whitelist_dcs.push_back(LOCAL_DC);
whitelist_dcs.push_back(REMOTE_DC);
cass::WhitelistDCPolicy policy(new cass::RoundRobinPolicy(), whitelist_dcs);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -849,7 +849,7 @@ BOOST_AUTO_TEST_CASE(simple)
blacklist_hosts.push_back("2.0.0.0");
blacklist_hosts.push_back("3.0.0.0");
cass::BlacklistPolicy policy(new cass::RoundRobinPolicy(), blacklist_hosts);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));
@@ -871,7 +871,7 @@ BOOST_AUTO_TEST_CASE(dc)
blacklist_dcs.push_back(LOCAL_DC);
blacklist_dcs.push_back(REMOTE_DC);
cass::BlacklistDCPolicy policy(new cass::RoundRobinPolicy(), blacklist_dcs);
- policy.init(cass::SharedRefPtr(), hosts);
+ policy.init(cass::SharedRefPtr(), hosts, NULL);
boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL));