Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,26 @@ CASS_EXPORT CassError
cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
cass_bool_t enabled);

/**
* Enable/Disable the randomization of the contact points list.
*
* <b>Default:</b> cass_true (enabled).
*
* <b>Important:</b> This setting should only be disabled for debugging or
* tests.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] enabled
* @return CASS_OK if successful, otherwise an error occurred
*
* @see cass_cluster_set_resolve_timeout()
*/
CASS_EXPORT CassError
cass_cluster_set_use_randomized_contact_points(CassCluster* cluster,
cass_bool_t enabled);

/***********************************************************************************
*
* Session
Expand Down
6 changes: 6 additions & 0 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ CassError cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
#endif
}

CassError cass_cluster_set_use_randomized_contact_points(CassCluster* cluster,
cass_bool_t enabled) {
cluster->config().set_use_randomized_contact_points(enabled);
return CASS_OK;
}

void cass_cluster_free(CassCluster* cluster) {
delete cluster->from();
}
Expand Down
9 changes: 8 additions & 1 deletion src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class Config {
, timestamp_gen_(new ServerSideTimestampGenerator())
, retry_policy_(new DefaultRetryPolicy())
, use_schema_(true)
, use_hostname_resolution_(false) { }
, use_hostname_resolution_(false)
, use_randomized_contact_points_(true) { }

unsigned thread_count_io() const { return thread_count_io_; }

Expand Down Expand Up @@ -359,6 +360,11 @@ class Config {
use_hostname_resolution_ = enable;
}

bool use_randomized_contact_points() const { return use_randomized_contact_points_; }
void set_use_randomized_contact_points(bool enable) {
use_randomized_contact_points_ = enable;
}

private:
int port_;
int protocol_version_;
Expand Down Expand Up @@ -402,6 +408,7 @@ class Config {
SharedRefPtr<RetryPolicy> retry_policy_;
bool use_schema_;
bool use_hostname_resolution_;
bool use_randomized_contact_points_;
};

} // namespace cass
Expand Down
64 changes: 37 additions & 27 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ Connection::HeartbeatHandler::HeartbeatHandler(Connection* connection)
void Connection::HeartbeatHandler::on_set(ResponseMessage* response) {
LOG_TRACE("Heartbeat completed on host %s",
connection_->address_string().c_str());
connection_->idle_start_time_ms_ = 0;
connection_->heartbeat_outstanding_ = false;
}

Expand Down Expand Up @@ -196,7 +195,6 @@ Connection::Connection(uv_loop_t* loop,
, response_(new ResponseMessage())
, stream_manager_(protocol_version)
, ssl_session_(NULL)
, idle_start_time_ms_(0)
, heartbeat_outstanding_(false) {
socket_.data = this;
uv_tcp_init(loop_, &socket_);
Expand Down Expand Up @@ -237,10 +235,14 @@ void Connection::connect() {
}

bool Connection::write(Handler* handler, bool flush_immediately) {
return internal_write(handler, flush_immediately, true);
bool result = internal_write(handler, flush_immediately);
if (result) {
restart_heartbeat_timer();
}
return result;
}

bool Connection::internal_write(Handler* handler, bool flush_immediately, bool reset_idle_time) {
bool Connection::internal_write(Handler* handler, bool flush_immediately) {
int stream = stream_manager_.acquire(handler);
if (stream < 0) {
return false;
Expand Down Expand Up @@ -304,11 +306,6 @@ bool Connection::internal_write(Handler* handler, bool flush_immediately, bool r
pending_write->flush();
}

if (reset_idle_time) {
idle_start_time_ms_ = 0;
restart_heartbeat_timer();
}

return true;
}

Expand Down Expand Up @@ -344,6 +341,7 @@ void Connection::internal_close(ConnectionState close_state) {
uv_handle_t* handle = copy_cast<uv_tcp_t*, uv_handle_t*>(&socket_);
if (!uv_is_closing(handle)) {
heartbeat_timer_.stop();
terminate_timer_.stop();
connect_timer_.stop();
if (state_ == CONNECTION_STATE_CONNECTED ||
state_ == CONNECTION_STATE_READY) {
Expand Down Expand Up @@ -428,6 +426,9 @@ void Connection::consume(char* input, size_t size) {
char* buffer = input;
size_t remaining = size;

// A successful read means the connection is still responsive
restart_terminate_timer();

while (remaining != 0) {
ssize_t consumed = response_->decode(buffer, remaining);
if (consumed <= 0) {
Expand Down Expand Up @@ -722,7 +723,7 @@ void Connection::on_timeout(Timer* timer) {
}

void Connection::on_connected() {
write(new StartupHandler(this, new OptionsRequest()));
internal_write(new StartupHandler(this, new OptionsRequest()));
}

void Connection::on_authenticate(const std::string& class_name) {
Expand All @@ -742,7 +743,7 @@ void Connection::on_auth_challenge(const AuthResponseRequest* request,
}
AuthResponseRequest* auth_response = new AuthResponseRequest(response,
request->auth());
write(new StartupHandler(this, auth_response));
internal_write(new StartupHandler(this, auth_response));
}

void Connection::on_auth_success(const AuthResponseRequest* request,
Expand All @@ -757,14 +758,14 @@ void Connection::on_auth_success(const AuthResponseRequest* request,
void Connection::on_ready() {
if (state_ == CONNECTION_STATE_CONNECTED && listener_->event_types() != 0) {
set_state(CONNECTION_STATE_REGISTERING_EVENTS);
write(new StartupHandler(this, new RegisterRequest(listener_->event_types())));
internal_write(new StartupHandler(this, new RegisterRequest(listener_->event_types())));
return;
}

if (keyspace_.empty()) {
notify_ready();
} else {
write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\"")));
internal_write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\"")));
}
}

Expand All @@ -779,7 +780,7 @@ void Connection::on_supported(ResponseMessage* response) {
// TODO(mstump) do something with the supported info
(void)supported;

write(new StartupHandler(this, new StartupRequest()));
internal_write(new StartupHandler(this, new StartupRequest()));
}

void Connection::on_pending_schema_agreement(Timer* timer) {
Expand All @@ -794,6 +795,7 @@ void Connection::on_pending_schema_agreement(Timer* timer) {
void Connection::notify_ready() {
connect_timer_.stop();
restart_heartbeat_timer();
restart_terminate_timer();
set_state(CONNECTION_STATE_READY);
listener_->on_ready(this);
}
Expand Down Expand Up @@ -845,7 +847,7 @@ void Connection::send_credentials(const std::string& class_name) {
if (v1_auth) {
V1Authenticator::Credentials credentials;
v1_auth->get_credentials(&credentials);
write(new StartupHandler(this, new CredentialsRequest(credentials)));
internal_write(new StartupHandler(this, new CredentialsRequest(credentials)));
} else {
send_initial_auth_response(class_name);
}
Expand All @@ -862,7 +864,7 @@ void Connection::send_initial_auth_response(const std::string& class_name) {
return;
}
AuthResponseRequest* auth_response = new AuthResponseRequest(response, auth);
write(new StartupHandler(this, auth_response));
internal_write(new StartupHandler(this, auth_response));
}
}

Expand All @@ -877,18 +879,8 @@ void Connection::restart_heartbeat_timer() {
void Connection::on_heartbeat(Timer* timer) {
Connection* connection = static_cast<Connection*>(timer->data());

if (connection->idle_start_time_ms_ == 0) {
connection->idle_start_time_ms_ = get_time_since_epoch_ms();
} else if ((get_time_since_epoch_ms() - connection->idle_start_time_ms_) / 1000 >
connection->config().connection_idle_timeout_secs()){
connection->notify_error("Failed to send a heartbeat within connection idle interval. "
"Terminating connection...",
CONNECTION_ERROR_TIMEOUT);
return;
}

if (!connection->heartbeat_outstanding_) {
if (!connection->internal_write(new HeartbeatHandler(connection), true, false)) {
if (!connection->internal_write(new HeartbeatHandler(connection))) {
// Recycling only this connection with a timeout error. This is unlikely and
// it means the connection ran out of stream IDs as a result of requests
// that never returned and as a result timed out.
Expand All @@ -903,6 +895,24 @@ void Connection::on_heartbeat(Timer* timer) {
connection->restart_heartbeat_timer();
}

void Connection::restart_terminate_timer() {
// The terminate timer shouldn't be started without having heartbeats enabled,
// otherwise connections would be terminated in periods of request inactivity.
if (config_.connection_heartbeat_interval_secs() > 0 &&
config_.connection_idle_timeout_secs() > 0) {
terminate_timer_.start(loop_,
1000 * config_.connection_idle_timeout_secs(),
this, on_terminate);
}
}

void Connection::on_terminate(Timer* timer) {
Connection* connection = static_cast<Connection*>(timer->data());
connection->notify_error("Failed to send a heartbeat within connection idle interval. "
"Terminating connection...",
CONNECTION_ERROR_TIMEOUT);
}

void Connection::PendingSchemaAgreement::stop_timer() {
timer.stop();
}
Expand Down
6 changes: 4 additions & 2 deletions src/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class Connection {
Timer timer;
};

bool internal_write(Handler* request, bool flush_immediately, bool reset_idle_time);
bool internal_write(Handler* request, bool flush_immediately = true);
void internal_close(ConnectionState close_state);
void set_state(ConnectionState state);
void consume(char* input, size_t size);
Expand Down Expand Up @@ -298,6 +298,8 @@ class Connection {

void restart_heartbeat_timer();
static void on_heartbeat(Timer* timer);
void restart_terminate_timer();
static void on_terminate(Timer* timer);

private:
ConnectionState state_;
Expand Down Expand Up @@ -325,9 +327,9 @@ class Connection {
Timer connect_timer_;
ScopedPtr<SslSession> ssl_session_;

uint64_t idle_start_time_ms_;
bool heartbeat_outstanding_;
Timer heartbeat_timer_;
Timer terminate_timer_;

// buffer reuse for libuv
std::stack<uv_buf_t> buffer_reuse_list_;
Expand Down
28 changes: 18 additions & 10 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
#include "session.hpp"
#include "timer.hpp"

#include <algorithm>
#include <iomanip>
#include <iterator>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -58,20 +60,25 @@ namespace cass {

class ControlStartupQueryPlan : public QueryPlan {
public:
ControlStartupQueryPlan(const HostMap& hosts)
: hosts_(hosts)
, it_(hosts_.begin()) {}
ControlStartupQueryPlan(const HostMap& hosts, Random* random)
: index_(random != NULL ? random->next(std::max(static_cast<size_t>(1), hosts.size())) : 0)
, count_(0) {
hosts_.reserve(hosts.size());
std::transform(hosts.begin(), hosts.end(), std::back_inserter(hosts_), GetHost());
}

virtual SharedRefPtr<Host> compute_next() {
if (it_ == hosts_.end()) return SharedRefPtr<Host>();
const SharedRefPtr<Host>& host = it_->second;
++it_;
return host;
const size_t size = hosts_.size();
if (count_ >= size) return SharedRefPtr<Host>();
size_t index = (index_ + count_) % size;
++count_;
return hosts_[index];
}

private:
const HostMap hosts_;
HostMap::const_iterator it_;
HostVec hosts_;
size_t index_;
size_t count_;
};

bool ControlConnection::determine_address_for_peer_host(const Address& connected_address,
Expand Down Expand Up @@ -133,7 +140,8 @@ void ControlConnection::clear() {

void ControlConnection::connect(Session* session) {
session_ = session;
query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_)); // No hosts lock necessary (read-only)
query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_, // No hosts lock necessary (read-only)
session_->random_.get()));
protocol_version_ = session_->config().protocol_version();
use_schema_ = session_->config().use_schema();
token_aware_routing_ = session_->config().token_aware_routing();
Expand Down
10 changes: 8 additions & 2 deletions src/dc_aware_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

#include "scoped_lock.hpp"

namespace cass {
#include <algorithm>

namespace cass {

static const CopyOnWriteHostVec NO_HOSTS(new HostVec());

void DCAwarePolicy::init(const SharedRefPtr<Host>& connected_host, const HostMap& hosts) {
void DCAwarePolicy::init(const SharedRefPtr<Host>& connected_host,
const HostMap& hosts,
Random* random) {
if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) {
LOG_INFO("Using '%s' for the local data center "
"(if this is incorrect, please provide the correct data center)",
Expand All @@ -37,6 +40,9 @@ void DCAwarePolicy::init(const SharedRefPtr<Host>& connected_host, const HostMap
end = hosts.end(); i != end; ++i) {
on_add(i->second);
}
if (random != NULL) {
index_ = random->next(std::max(static_cast<size_t>(1), hosts.size()));
}
}

CassHostDistance DCAwarePolicy::distance(const SharedRefPtr<Host>& host) const {
Expand Down
2 changes: 1 addition & 1 deletion src/dc_aware_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
, local_dc_live_hosts_(new HostVec)
, index_(0) {}

virtual void init(const SharedRefPtr<Host>& connected_host, const HostMap& hosts);
virtual void init(const SharedRefPtr<Host>& connected_host, const HostMap& hosts, Random* random);

virtual CassHostDistance distance(const SharedRefPtr<Host>& host) const;

Expand Down
8 changes: 0 additions & 8 deletions src/host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@

namespace cass {

void copy_hosts(const HostMap& from_hosts, CopyOnWriteHostVec& to_hosts) {
to_hosts->reserve(from_hosts.size());
for (HostMap::const_iterator i = from_hosts.begin(),
end = from_hosts.end(); i != end; ++i) {
to_hosts->push_back(i->second);
}
}

void add_host(CopyOnWriteHostVec& hosts, const SharedRefPtr<Host>& host) {
HostVec::iterator i;
for (i = hosts->begin(); i != hosts->end(); ++i) {
Expand Down
Loading