diff --git a/.gitignore b/.gitignore
index 1b5cd3946..9cda32e6f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@ libcassandra.so*
*.a
# cmake output
+src/third_party/sparsehash/src/sparsehash/internal/sparseconfig.h
*.cmake
!Find*.cmake
*.build
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e38d8071b..0f8ab52c4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -64,7 +64,6 @@ option(CASS_USE_BOOST_ATOMIC "Use Boost atomics library" OFF)
option(CASS_USE_STD_ATOMIC "Use C++11 atomics library" OFF)
option(CASS_USE_OPENSSL "Use OpenSSL" ON)
option(CASS_USE_TCMALLOC "Use tcmalloc" OFF)
-option(CASS_USE_SPARSEHASH "Use sparsehash" OFF)
option(CASS_USE_ZLIB "Use zlib" OFF)
option(CASS_USE_LIBSSH2 "Use libssh2 for integration tests" ON)
@@ -108,12 +107,6 @@ if(CASS_USE_BOOST_ATOMIC OR CASS_BUILD_INTEGRATION_TESTS OR CASS_BUILD_UNIT_TEST
CassUseBoost()
endif()
-# Sparsehash
-if(CASS_USE_SPARSEHASH)
- CassUseSparshHash()
-endif()
-
-
# OpenSSL
if(CASS_USE_OPENSSL)
CassUseOpenSSL()
diff --git a/cmake/modules/CppDriver.cmake b/cmake/modules/CppDriver.cmake
index 8538aef7e..ef56260a1 100644
--- a/cmake/modules/CppDriver.cmake
+++ b/cmake/modules/CppDriver.cmake
@@ -158,40 +158,6 @@ macro(CassUseBoost)
endif()
endmacro()
-#------------------------
-# CassUseSparseHash
-#
-# Add includes required for using SparseHash.
-#
-# Input: CASS_INCLUDES
-# Output: CASS_INCLUDES
-#------------------------
-macro(CassUseSparseHash)
- # Setup the paths and hints for sparsehash
- set(_SPARSEHASH_ROOT_PATHS "${PROJECT_SOURCE_DIR}/lib/sparsehash/")
- set(_SPARSEHASH_ROOT_HINTS ${SPARSEHASH_ROOT_DIR} $ENV{SPARSEHASH_ROOT_DIR})
- if(NOT WIN32)
- set(_SPARSEHASH_ROOT_PATHS ${_SPARSEHASH_ROOT_PATHS} "/usr/" "/usr/local/")
- endif()
- set(_SPARSEHASH_ROOT_HINTS_AND_PATHS
- HINTS ${_SPARSEHASH_ROOT_HINTS}
- PATHS ${_SPARSEHASH_ROOT_PATHS})
-
- # Ensure sparsehash headers were found
- find_path(SPARSEHASH_INCLUDE_DIR
- NAMES google/dense_hash_map
- HINTS ${_SPARSEHASH_INCLUDE_DIR} ${_SPARSEHASH_ROOT_HINTS_AND_PATHS}
- PATH_SUFFIXES include)
- find_package_handle_standard_args(SparseJash "Could NOT find sparsehash, try to set the path to the sparsehash root folder in the system variable SPARSEHASH_ROOT_DIR"
- SPARSEHASH_INCLUDE_DIR)
-
- set(CASS_INCLUDES ${CASS_INCLUDES} ${SPARSEHASH_INCLUDE_DIR})
-
- if (SPARSEHASH_INCLUDE_DIR)
- add_definitions("-DCASS_USE_SPARSEHASH")
- endif()
-endmacro()
-
#------------------------
# CassUseOpenSSL
#
@@ -458,8 +424,11 @@ macro(CassAddIncludes)
${CASS_SOURCE_DIR}/src
${CASS_SOURCE_DIR}/src/ssl
${CASS_SOURCE_DIR}/src/third_party/rapidjson
+ ${CASS_SOURCE_DIR}/src/third_party/rapidjson
+ ${CASS_SOURCE_DIR}/src/third_party/sparsehash/src
${CASS_INCLUDES}
)
+ add_subdirectory(src/third_party/sparsehash)
endmacro()
#------------------------
diff --git a/include/cassandra.h b/include/cassandra.h
index 9f7007ef3..0fd4654da 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -1329,9 +1329,9 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
/**
* Configures the cluster to use token-aware request routing or not.
*
- * Important: Token-aware routing depends on keyspace information.
- * For this reason enabling token-aware routing will also enable the usage
- * of schema metadata.
+ * Important: Token-aware routing depends on keyspace metadata.
+ * For this reason enabling token-aware routing will also enable retrieving
+ * and updating keyspace schema metadata.
*
* Default: cass_true (enabled).
*
@@ -1343,8 +1343,6 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
*
* @param[in] cluster
* @param[in] enabled
- *
- * @see cass_cluster_set_use_schema();
*/
CASS_EXPORT void
cass_cluster_set_token_aware_routing(CassCluster* cluster,
@@ -1665,9 +1663,8 @@ cass_cluster_set_retry_policy(CassCluster* cluster,
/**
* Enable/Disable retrieving and updating schema metadata. If disabled
* this is allows the driver to skip over retrieving and updating schema
- * metadata, but it also disables the usage of token-aware routing and
- * cass_session_get_schema_meta() will always return an empty object. This can
- * be useful for reducing the startup overhead of short-lived sessions.
+ * metadata and cass_session_get_schema_meta() will always return an empty object.
+ * This can be useful for reducing the startup overhead of short-lived sessions.
*
* Default: cass_true (enabled).
*
@@ -1677,7 +1674,6 @@ cass_cluster_set_retry_policy(CassCluster* cluster,
* @param[in] enabled
*
* @see cass_session_get_schema_meta()
- * @see cass_cluster_set_token_aware_routing()
*/
CASS_EXPORT void
cass_cluster_set_use_schema(CassCluster* cluster,
diff --git a/src/address.cpp b/src/address.cpp
index 4675ca78e..7553d8292 100644
--- a/src/address.cpp
+++ b/src/address.cpp
@@ -21,6 +21,9 @@
namespace cass {
+const Address Address::EMPTY_KEY("0.0.0.0", 0);
+const Address Address::DELETED_KEY("0.0.0.0", 1);
+
Address::Address() {
init();
}
@@ -153,6 +156,9 @@ int Address::compare(const Address& a) const {
if (family() != a.family()) {
return family() < a.family() ? -1 : 1;
}
+ if (port() != a.port()) {
+ return port() < a.port() ? -1 : 1;
+ }
if (family() == AF_INET) {
if (addr_in()->sin_addr.s_addr != a.addr_in()->sin_addr.s_addr) {
return addr_in()->sin_addr.s_addr < a.addr_in()->sin_addr.s_addr ? -1 : 1;
diff --git a/src/address.hpp b/src/address.hpp
index b50f4c2a3..5c4dd107d 100644
--- a/src/address.hpp
+++ b/src/address.hpp
@@ -17,8 +17,11 @@
#ifndef __CASS_ADDRESS_HPP_INCLUDED__
#define __CASS_ADDRESS_HPP_INCLUDED__
+#include "hash.hpp"
#include "utils.hpp"
+#include
+
#include
#include
#include
@@ -29,6 +32,9 @@ namespace cass {
class Address {
public:
+ static const Address EMPTY_KEY;
+ static const Address DELETED_KEY;
+
Address();
Address(const std::string& ip, int port);
@@ -77,8 +83,21 @@ class Address {
struct sockaddr_storage addr_;
};
+struct AddressHash {
+ std::size_t operator()(const cass::Address& a) const {
+ if (a.family() == AF_INET) {
+ return cass::hash::fnv1a(reinterpret_cast(a.addr_in()),
+ sizeof(struct sockaddr_in));
+ } else if (a.family() == AF_INET6) {
+ return cass::hash::fnv1a(reinterpret_cast(a.addr_in6()),
+ sizeof(struct sockaddr_in6));
+ }
+ return 0;
+ }
+};
+
typedef std::vector AddressVec;
-typedef std::set AddressSet;
+typedef sparsehash::dense_hash_set AddressSet;
inline bool operator<(const Address& a, const Address& b) {
return a.compare(b) < 0;
diff --git a/src/cluster.cpp b/src/cluster.cpp
index 677a6806d..1cd1ca5d1 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -259,10 +259,6 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
void cass_cluster_set_token_aware_routing(CassCluster* cluster,
cass_bool_t enabled) {
cluster->config().set_token_aware_routing(enabled == cass_true);
- // Token-aware routing relies on up-to-date schema information
- if (enabled == cass_true) {
- cluster->config().set_use_schema(true);
- }
}
void cass_cluster_set_latency_aware_routing(CassCluster* cluster,
@@ -407,10 +403,6 @@ void cass_cluster_set_timestamp_gen(CassCluster* cluster,
void cass_cluster_set_use_schema(CassCluster* cluster,
cass_bool_t enabled) {
cluster->config().set_use_schema(enabled == cass_true);
- // Token-aware routing relies on up-to-date schema information
- if (enabled == cass_false) {
- cluster->config().set_token_aware_routing(false);
- }
}
CassError cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
diff --git a/src/connection.hpp b/src/connection.hpp
index dd46cae50..8c98d07bc 100644
--- a/src/connection.hpp
+++ b/src/connection.hpp
@@ -20,6 +20,7 @@
#include "buffer.hpp"
#include "cassandra.h"
#include "handler.hpp"
+#include "hash.hpp"
#include "host.hpp"
#include "list.hpp"
#include "macros.hpp"
diff --git a/src/constants.hpp b/src/constants.hpp
index 60f41c1ec..eff58c96c 100644
--- a/src/constants.hpp
+++ b/src/constants.hpp
@@ -17,6 +17,8 @@
#ifndef __CASS_CONSTANTS_HPP_INCLUDED__
#define __CASS_CONSTANTS_HPP_INCLUDED__
+#define CASS_UINT32_MAX 4294967295UL
+
#define CASS_INT64_MAX 9223372036854775807LL
#define CASS_INT64_MIN (-CASS_INT64_MAX - 1)
diff --git a/src/control_connection.cpp b/src/control_connection.cpp
index cb61e2895..1665cce06 100644
--- a/src/control_connection.cpp
+++ b/src/control_connection.cpp
@@ -112,7 +112,8 @@ ControlConnection::ControlConnection()
, session_(NULL)
, connection_(NULL)
, protocol_version_(0)
- , should_query_tokens_(false) {}
+ , use_schema_(false)
+ , token_aware_routing_(false) { }
const SharedRefPtr& ControlConnection::connected_host() const {
return current_host_;
@@ -126,19 +127,21 @@ void ControlConnection::clear() {
query_plan_.reset();
protocol_version_ = 0;
last_connection_error_.clear();
- should_query_tokens_ = false;
+ use_schema_ = false;
+ token_aware_routing_ = false;
}
void ControlConnection::connect(Session* session) {
session_ = session;
query_plan_.reset(new ControlStartupQueryPlan(session_->hosts_)); // No hosts lock necessary (read-only)
protocol_version_ = session_->config().protocol_version();
- should_query_tokens_ = session_->config().token_aware_routing();
+ use_schema_ = session_->config().use_schema();
+ token_aware_routing_ = session_->config().token_aware_routing();
if (protocol_version_ < 0) {
protocol_version_ = CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION;
}
- if (session_->config().use_schema()) {
+ if (use_schema_ || token_aware_routing_) {
set_event_types(CASS_EVENT_TOPOLOGY_CHANGE | CASS_EVENT_STATUS_CHANGE |
CASS_EVENT_SCHEMA_CHANGE);
} else {
@@ -199,9 +202,6 @@ void ControlConnection::on_ready(Connection* connection) {
LOG_DEBUG("Connection ready on host %s",
connection->address().to_string().c_str());
- // A protocol version is need to encode/decode maps properly
- session_->metadata().set_protocol_version(protocol_version_);
-
// The control connection has to refresh meta when there's a reconnect because
// events could have been missed while not connected.
query_meta_hosts();
@@ -284,7 +284,9 @@ void ControlConnection::on_event(EventResponse* response) {
SharedRefPtr host = session_->get_host(response->affected_node());
if (host) {
session_->on_remove(host);
- session_->metadata().remove_host(host);
+ if (session_->token_map_) {
+ session_->token_map_->remove_host_and_build(host);
+ }
} else {
LOG_DEBUG("Tried to remove host %s that doesn't exist", address_str.c_str());
}
@@ -298,7 +300,9 @@ void ControlConnection::on_event(EventResponse* response) {
refresh_node_info(host, false, true);
} else {
LOG_DEBUG("Move event for host %s that doesn't exist", address_str.c_str());
- session_->metadata().remove_host(host);
+ if (session_->token_map_) {
+ session_->token_map_->remove_host_and_build(host);
+ }
}
break;
}
@@ -324,10 +328,17 @@ void ControlConnection::on_event(EventResponse* response) {
}
case CASS_EVENT_SCHEMA_CHANGE:
+ // Only handle keyspace events when using token-aware routing
+ if (!use_schema_ &&
+ response->schema_change_target() != EventResponse::KEYSPACE) {
+ return;
+ }
+
LOG_DEBUG("Schema change (%d): %.*s %.*s\n",
response->schema_change(),
(int)response->keyspace().size(), response->keyspace().data(),
(int)response->target().size(), response->target().data());
+
switch (response->schema_change()) {
case EventResponse::CREATED:
case EventResponse::UPDATED:
@@ -389,8 +400,11 @@ void ControlConnection::on_event(EventResponse* response) {
void ControlConnection::query_meta_hosts() {
ScopedRefPtr > handler(
new ControlMultipleRequestHandler(this, ControlConnection::on_query_hosts, UnusedData()));
- handler->execute_query("local", SELECT_LOCAL_TOKENS);
- handler->execute_query("peers", SELECT_PEERS_TOKENS);
+ // This needs to happen before other schema metadata queries so that we have
+ // a valid Cassandra version because this version determines which follow up
+ // schema metadata queries are executed.
+ handler->execute_query("local", token_aware_routing_ ? SELECT_LOCAL_TOKENS : SELECT_LOCAL);
+ handler->execute_query("peers", token_aware_routing_ ? SELECT_PEERS_TOKENS : SELECT_PEERS);
}
void ControlConnection::on_query_hosts(ControlConnection* control_connection,
@@ -403,6 +417,11 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
Session* session = control_connection->session_;
+ if (session->token_map_) {
+ // Clearing token/hosts will not invalidate the replicas
+ session->token_map_->clear_tokens_and_hosts();
+ }
+
bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
// If the 'system.local' table is empty the connection isn't used as a control
@@ -418,9 +437,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
ResultResponse* local_result;
if (MultipleRequestHandler::get_result_response(responses, "local", &local_result) &&
local_result->row_count() > 0) {
- local_result->decode_first_row();
- control_connection->update_node_info(host, &local_result->first_row());
- session->metadata().set_cassandra_version(host->cassandra_version());
+ control_connection->update_node_info(host, &local_result->first_row(), ADD_HOST);
+ control_connection->cassandra_version_ = host->cassandra_version();
} else {
LOG_WARN("No row found in %s's local system table",
connection->address_string().c_str());
@@ -438,7 +456,6 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
{
ResultResponse* peers_result;
if (MultipleRequestHandler::get_result_response(responses, "peers", &peers_result)) {
- peers_result->decode_first_row();
ResultIterator rows(peers_result);
while (rows.next()) {
Address address;
@@ -459,7 +476,7 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
host->set_mark(session->current_host_mark_);
- control_connection->update_node_info(host, rows.row());
+ control_connection->update_node_info(host, rows.row(), ADD_HOST);
if (is_new && !is_initial_connection) {
session->on_add(host, false);
}
@@ -469,7 +486,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
session->purge_hosts(is_initial_connection);
- if (session->config().use_schema()) {
+ if (control_connection->use_schema_ ||
+ control_connection->token_aware_routing_) {
control_connection->query_meta_schema();
} else if (is_initial_connection) {
control_connection->state_ = CONTROL_STATE_READY;
@@ -486,25 +504,33 @@ void ControlConnection::query_meta_schema() {
ScopedRefPtr > handler(
new ControlMultipleRequestHandler(this, ControlConnection::on_query_meta_schema, UnusedData()));
- if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
- handler->execute_query("keyspaces", SELECT_KEYSPACES_30);
- handler->execute_query("tables", SELECT_TABLES_30);
- handler->execute_query("views", SELECT_VIEWS_30);
- handler->execute_query("columns", SELECT_COLUMNS_30);
- handler->execute_query("indexes", SELECT_INDEXES_30);
- handler->execute_query("user_types", SELECT_USERTYPES_30);
- handler->execute_query("functions", SELECT_FUNCTIONS_30);
- handler->execute_query("aggregates", SELECT_AGGREGATES_30);
+ if (cassandra_version_ >= VersionNumber(3, 0, 0)) {
+ if (use_schema_ || token_aware_routing_) {
+ handler->execute_query("keyspaces", SELECT_KEYSPACES_30);
+ }
+ if (use_schema_) {
+ handler->execute_query("tables", SELECT_TABLES_30);
+ handler->execute_query("views", SELECT_VIEWS_30);
+ handler->execute_query("columns", SELECT_COLUMNS_30);
+ handler->execute_query("indexes", SELECT_INDEXES_30);
+ handler->execute_query("user_types", SELECT_USERTYPES_30);
+ handler->execute_query("functions", SELECT_FUNCTIONS_30);
+ handler->execute_query("aggregates", SELECT_AGGREGATES_30);
+ }
} else {
- handler->execute_query("keyspaces", SELECT_KEYSPACES_20);
- handler->execute_query("tables", SELECT_COLUMN_FAMILIES_20);
- handler->execute_query("columns", SELECT_COLUMNS_20);
- if (session_->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
- handler->execute_query("user_types", SELECT_USERTYPES_21);
+ if (use_schema_ || token_aware_routing_) {
+ handler->execute_query("keyspaces", SELECT_KEYSPACES_20);
}
- if (session_->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
- handler->execute_query("functions", SELECT_FUNCTIONS_22);
- handler->execute_query("aggregates", SELECT_AGGREGATES_22);
+ if (use_schema_) {
+ handler->execute_query("tables", SELECT_COLUMN_FAMILIES_20);
+ handler->execute_query("columns", SELECT_COLUMNS_20);
+ if (cassandra_version_ >= VersionNumber(2, 1, 0)) {
+ handler->execute_query("user_types", SELECT_USERTYPES_21);
+ }
+ if (cassandra_version_ >= VersionNumber(2, 2, 0)) {
+ handler->execute_query("functions", SELECT_FUNCTIONS_22);
+ handler->execute_query("aggregates", SELECT_AGGREGATES_22);
+ }
}
}
}
@@ -518,53 +544,65 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti
}
Session* session = control_connection->session_;
-
- session->metadata().clear_and_update_back();
+ int protocol_version = control_connection->protocol_version_;
+ const VersionNumber& cassandra_version = control_connection->cassandra_version_;
bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
- ResultResponse* keyspaces_result;
- if (MultipleRequestHandler::get_result_response(responses, "keyspaces", &keyspaces_result)) {
- session->metadata().update_keyspaces(keyspaces_result);
+ if (session->token_map_) {
+ ResultResponse* keyspaces_result;
+ if (MultipleRequestHandler::get_result_response(responses, "keyspaces", &keyspaces_result)) {
+ session->token_map_->clear_replicas_and_strategies(); // Only clear replicas once we have the new keyspaces
+ session->token_map_->add_keyspaces(cassandra_version, keyspaces_result);
+ }
+ session->token_map_->build();
}
- ResultResponse* tables_result;
- if (MultipleRequestHandler::get_result_response(responses, "tables", &tables_result)) {
- session->metadata().update_tables(tables_result);
- }
+ if (control_connection->use_schema_) {
+ session->metadata().clear_and_update_back(cassandra_version);
- ResultResponse* views_result;
- if (MultipleRequestHandler::get_result_response(responses, "views", &views_result)) {
- session->metadata().update_views(views_result);
- }
+ ResultResponse* keyspaces_result;
+ if (MultipleRequestHandler::get_result_response(responses, "keyspaces", &keyspaces_result)) {
+ session->metadata().update_keyspaces(protocol_version, cassandra_version, keyspaces_result);
+ }
- ResultResponse* columns_result = NULL;
- if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) {
- session->metadata().update_columns(columns_result);
- }
+ ResultResponse* tables_result;
+ if (MultipleRequestHandler::get_result_response(responses, "tables", &tables_result)) {
+ session->metadata().update_tables(protocol_version, cassandra_version, tables_result);
+ }
- ResultResponse* indexes_result;
- if (MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result)) {
- session->metadata().update_indexes(indexes_result);
- }
+ ResultResponse* views_result;
+ if (MultipleRequestHandler::get_result_response(responses, "views", &views_result)) {
+ session->metadata().update_views(protocol_version, cassandra_version, views_result);
+ }
- ResultResponse* user_types_result;
- if (MultipleRequestHandler::get_result_response(responses, "user_types", &user_types_result)) {
- session->metadata().update_user_types(user_types_result);
- }
+ ResultResponse* columns_result = NULL;
+ if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) {
+ session->metadata().update_columns(protocol_version, cassandra_version, columns_result);
+ }
- ResultResponse* functions_result;
- if (MultipleRequestHandler::get_result_response(responses, "functions", &functions_result)) {
- session->metadata().update_functions(functions_result);
- }
+ ResultResponse* indexes_result;
+ if (MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result)) {
+ session->metadata().update_indexes(protocol_version, cassandra_version, indexes_result);
+ }
- ResultResponse* aggregates_result;
- if (MultipleRequestHandler::get_result_response(responses, "aggregates", &aggregates_result)) {
- session->metadata().update_aggregates(aggregates_result);
- }
+ ResultResponse* user_types_result;
+ if (MultipleRequestHandler::get_result_response(responses, "user_types", &user_types_result)) {
+ session->metadata().update_user_types(protocol_version, cassandra_version, user_types_result);
+ }
- session->metadata().swap_to_back_and_update_front();
- if (control_connection->should_query_tokens_) session->metadata().build();
+ ResultResponse* functions_result;
+ if (MultipleRequestHandler::get_result_response(responses, "functions", &functions_result)) {
+ session->metadata().update_functions(protocol_version, cassandra_version, functions_result);
+ }
+
+ ResultResponse* aggregates_result;
+ if (MultipleRequestHandler::get_result_response(responses, "aggregates", &aggregates_result)) {
+ session->metadata().update_aggregates(protocol_version, cassandra_version, aggregates_result);
+ }
+
+ session->metadata().swap_to_back_and_update_front();
+ }
if (is_initial_connection) {
control_connection->state_ = CONTROL_STATE_READY;
@@ -587,7 +625,7 @@ void ControlConnection::refresh_node_info(SharedRefPtr host,
std::string query;
ControlHandler::ResponseCallback response_callback;
- bool token_query = should_query_tokens_ && (host->was_just_added() || query_tokens);
+ bool token_query = token_aware_routing_ && (host->was_just_added() || query_tokens);
if (is_connected_host || !host->listen_address().empty()) {
if (is_connected_host) {
query.assign(token_query ? SELECT_LOCAL_TOKENS : SELECT_LOCAL);
@@ -636,8 +674,7 @@ void ControlConnection::on_refresh_node_info(ControlConnection* control_connecti
host_address_str.c_str());
return;
}
- result->decode_first_row();
- control_connection->update_node_info(data.host, &result->first_row());
+ control_connection->update_node_info(data.host, &result->first_row(), UPDATE_HOST_AND_BUILD);
if (data.is_new_node) {
control_connection->session_->on_add(data.host, false);
@@ -665,7 +702,6 @@ void ControlConnection::on_refresh_node_info_all(ControlConnection* control_conn
return;
}
- result->decode_first_row();
ResultIterator rows(result);
while (rows.next()) {
const Row* row = rows.row();
@@ -676,7 +712,7 @@ void ControlConnection::on_refresh_node_info_all(ControlConnection* control_conn
row->get_by_name("rpc_address"),
&address);
if (is_valid_address && data.host->address().compare(address) == 0) {
- control_connection->update_node_info(data.host, row);
+ control_connection->update_node_info(data.host, row, UPDATE_HOST_AND_BUILD);
if (data.is_new_node) {
control_connection->session_->on_add(data.host, false);
}
@@ -685,7 +721,7 @@ void ControlConnection::on_refresh_node_info_all(ControlConnection* control_conn
}
}
-void ControlConnection::update_node_info(SharedRefPtr host, const Row* row) {
+void ControlConnection::update_node_info(SharedRefPtr host, const Row* row, UpdateHostType type) {
const Value* v;
std::string rack;
@@ -727,21 +763,22 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row
host->address().to_string().c_str());
}
- if (should_query_tokens_) {
+ if (token_aware_routing_) {
bool is_connected_host = connection_ != NULL && host->address().compare(connection_->address()) == 0;
std::string partitioner;
if (is_connected_host && row->get_string_by_name("partitioner", &partitioner)) {
- session_->metadata().set_partitioner(partitioner);
+ if (!session_->token_map_) {
+ session_->token_map_.reset(TokenMap::from_partitioner(partitioner));
+ }
}
v = row->get_by_name("tokens");
- if (v != NULL) {
- CollectionIterator i(v);
- TokenStringList tokens;
- while (i.next()) {
- tokens.push_back(i.value()->to_string_ref());
- }
- if (!tokens.empty()) {
- session_->metadata().update_host(host, tokens);
+ if (v != NULL && v->is_collection()) {
+ if (session_->token_map_) {
+ if (type == UPDATE_HOST_AND_BUILD) {
+ session_->token_map_->update_host_and_build(host, v);
+ } else {
+ session_->token_map_->add_host(host, v);
+ }
}
}
}
@@ -750,7 +787,7 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row
void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
std::string query;
- if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ if (cassandra_version_ >= VersionNumber(3, 0, 0)) {
query.assign(SELECT_KEYSPACES_30);
} else {
query.assign(SELECT_KEYSPACES_20);
@@ -777,7 +814,18 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio
keyspace_name.c_str());
return;
}
- control_connection->session_->metadata().update_keyspaces(result);
+
+ Session* session = control_connection->session_;
+ int protocol_version = control_connection->protocol_version_;
+ const VersionNumber& cassandra_version = control_connection->cassandra_version_;
+
+ if (session->token_map_) {
+ session->token_map_->update_keyspaces_and_build(cassandra_version, result);
+ }
+
+ if (control_connection->use_schema_) {
+ session->metadata().update_keyspaces(protocol_version, cassandra_version, result);
+ }
}
void ControlConnection::refresh_table_or_view(const StringRef& keyspace_name,
@@ -787,7 +835,7 @@ void ControlConnection::refresh_table_or_view(const StringRef& keyspace_name,
std::string column_query;
std::string index_query;
- if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ if (cassandra_version_ >= VersionNumber(3, 0, 0)) {
table_query.assign(SELECT_TABLES_30);
table_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='").append(table_or_view_name.data(), table_or_view_name.size()).append("'");
@@ -837,6 +885,8 @@ void ControlConnection::on_refresh_table_or_view(ControlConnection* control_conn
const MultipleRequestHandler::ResponseMap& responses) {
ResultResponse* tables_result;
Session* session = control_connection->session_;
+ int protocol_version = control_connection->protocol_version_;
+ const VersionNumber& cassandra_version = control_connection->cassandra_version_;
if (!MultipleRequestHandler::get_result_response(responses, "tables", &tables_result) ||
tables_result->row_count() == 0) {
ResultResponse* views_result;
@@ -846,19 +896,19 @@ void ControlConnection::on_refresh_table_or_view(ControlConnection* control_conn
data.keyspace_name.c_str(), data.table_or_view_name.c_str());
return;
}
- session->metadata().update_views(views_result);
+ session->metadata().update_views(protocol_version, cassandra_version, views_result);
} else {
- session->metadata().update_tables(tables_result);
+ session->metadata().update_tables(protocol_version, cassandra_version, tables_result);
}
ResultResponse* columns_result;
if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) {
- session->metadata().update_columns(columns_result);
+ session->metadata().update_columns(protocol_version, cassandra_version, columns_result);
}
ResultResponse* indexes_result;
if (MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result)) {
- session->metadata().update_indexes(indexes_result);
+ session->metadata().update_indexes(protocol_version, cassandra_version, indexes_result);
}
}
@@ -867,7 +917,7 @@ void ControlConnection::refresh_type(const StringRef& keyspace_name,
const StringRef& type_name) {
std::string query;
- if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ if (cassandra_version_ >= VersionNumber(3, 0, 0)) {
query.assign(SELECT_USERTYPES_30);
} else {
query.assign(SELECT_USERTYPES_21);
@@ -895,7 +945,10 @@ void ControlConnection::on_refresh_type(ControlConnection* control_connection,
keyspace_and_type_names.second.c_str());
return;
}
- control_connection->session_->metadata().update_user_types(result);
+ Session* session = control_connection->session_;
+ int protocol_version = control_connection->protocol_version_;
+ const VersionNumber& cassandra_version = control_connection->cassandra_version_;
+ session->metadata().update_user_types(protocol_version, cassandra_version, result);
}
void ControlConnection::refresh_function(const StringRef& keyspace_name,
@@ -904,7 +957,7 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,
bool is_aggregate) {
std::string query;
- if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ if (cassandra_version_ >= VersionNumber(3, 0, 0)) {
if (is_aggregate) {
query.assign(SELECT_AGGREGATES_30);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND argument_types=?");
@@ -959,10 +1012,13 @@ void ControlConnection::on_refresh_function(ControlConnection* control_connectio
Metadata::full_function_name(data.function, data.arg_types).c_str());
return;
}
+ Session* session = control_connection->session_;
+ int protocol_version = control_connection->protocol_version_;
+ const VersionNumber& cassandra_version = control_connection->cassandra_version_;
if (data.is_aggregate) {
- control_connection->session_->metadata().update_aggregates(result);
+ session->metadata().update_aggregates(protocol_version, cassandra_version, result);
} else {
- control_connection->session_->metadata().update_functions(result);
+ session->metadata().update_functions(protocol_version, cassandra_version, result);
}
}
@@ -1026,6 +1082,17 @@ void ControlConnection::on_reconnect(Timer* timer) {
control_connection->reconnect(false);
}
+template
+void ControlConnection::ControlMultipleRequestHandler::execute_query(
+ const std::string& index, const std::string& query) {
+ // We need to update the loop time to prevent new requests from timing out
+ // in cases where a callback took a long time to execute. In the future,
+ // we might improve this by executing the these long running callbacks
+ // on a seperate thread.
+ uv_update_time(control_connection_->session_->loop());
+ MultipleRequestHandler::execute_query(index, query);
+}
+
template
void ControlConnection::ControlMultipleRequestHandler::on_set(
const MultipleRequestHandler::ResponseMap& responses) {
diff --git a/src/control_connection.hpp b/src/control_connection.hpp
index 579b96161..011e345e9 100644
--- a/src/control_connection.hpp
+++ b/src/control_connection.hpp
@@ -19,7 +19,6 @@
#include "address.hpp"
#include "connection.hpp"
-#include "token_map.hpp"
#include "handler.hpp"
#include "host.hpp"
#include "load_balancing.hpp"
@@ -27,6 +26,9 @@
#include "multiple_request_handler.hpp"
#include "response.hpp"
#include "scoped_ptr.hpp"
+#include "token_map.hpp"
+
+#include
namespace cass {
@@ -57,6 +59,10 @@ class ControlConnection : public Connection::Listener {
return protocol_version_;
}
+ const VersionNumber& cassandra_version() const {
+ return cassandra_version_;
+ }
+
const SharedRefPtr& connected_host() const;
void clear();
@@ -81,6 +87,8 @@ class ControlConnection : public Connection::Listener {
, response_callback_(response_callback)
, data_(data) {}
+ void execute_query(const std::string& index, const std::string& query);
+
virtual void on_set(const MultipleRequestHandler::ResponseMap& responses);
virtual void on_error(CassError code, const std::string& message) {
@@ -171,6 +179,11 @@ class ControlConnection : public Connection::Listener {
bool is_aggregate;
};
+ enum UpdateHostType {
+ ADD_HOST,
+ UPDATE_HOST_AND_BUILD
+ };
+
void schedule_reconnect(uint64_t ms = 0);
void reconnect(bool retry_current_host);
@@ -206,7 +219,7 @@ class ControlConnection : public Connection::Listener {
const RefreshNodeData& data,
Response* response);
- void update_node_info(SharedRefPtr host, const Row* row);
+ void update_node_info(SharedRefPtr host, const Row* row, UpdateHostType type);
void refresh_keyspace(const StringRef& keyspace_name);
static void on_refresh_keyspace(ControlConnection* control_connection, const std::string& keyspace_name, Response* response);
@@ -239,8 +252,10 @@ class ControlConnection : public Connection::Listener {
ScopedPtr query_plan_;
Host::Ptr current_host_;
int protocol_version_;
+ VersionNumber cassandra_version_;
std::string last_connection_error_;
- bool should_query_tokens_;
+ bool use_schema_;
+ bool token_aware_routing_;
static Address bind_any_ipv4_;
static Address bind_any_ipv6_;
diff --git a/src/copy_on_write_ptr.hpp b/src/copy_on_write_ptr.hpp
index 062180688..533d67c03 100644
--- a/src/copy_on_write_ptr.hpp
+++ b/src/copy_on_write_ptr.hpp
@@ -45,6 +45,10 @@ class CopyOnWritePtr {
return *this;
}
+ operator bool() const {
+ return ptr_->ref != NULL;
+ }
+
const T& operator*() const {
return *(ptr_->ref);
}
diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp
index ea0d387d3..6dbe6e3f7 100644
--- a/src/dc_aware_policy.cpp
+++ b/src/dc_aware_policy.cpp
@@ -57,7 +57,7 @@ CassHostDistance DCAwarePolicy::distance(const SharedRefPtr& host) const {
QueryPlan* DCAwarePolicy::new_query_plan(const std::string& connected_keyspace,
const Request* request,
- const TokenMap& token_map,
+ const TokenMap* token_map,
Request::EncodingCache* cache) {
CassConsistency cl = request != NULL ? request->consistency() : Request::DEFAULT_CONSISTENCY;
return new DCAwareQueryPlan(this, cl, index_++);
diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp
index d510c30fd..9eb6cc74f 100644
--- a/src/dc_aware_policy.hpp
+++ b/src/dc_aware_policy.hpp
@@ -52,7 +52,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
virtual QueryPlan* new_query_plan(const std::string& connected_keyspace,
const Request* request,
- const TokenMap& token_map,
+ const TokenMap* token_map,
Request::EncodingCache* cache);
virtual void on_add(const SharedRefPtr& host);
diff --git a/src/future.cpp b/src/future.cpp
index aa044303b..1db89ac26 100644
--- a/src/future.cpp
+++ b/src/future.cpp
@@ -59,7 +59,6 @@ const CassResult* cass_future_get_result(CassFuture* future) {
cass::SharedRefPtr result(response_future->response());
if (!result) return NULL;
- result->decode_first_row();
result->inc_ref();
return CassResult::to(result.get());
}
diff --git a/src/hash.hpp b/src/hash.hpp
new file mode 100644
index 000000000..63c1a3841
--- /dev/null
+++ b/src/hash.hpp
@@ -0,0 +1,58 @@
+/*
+ Copyright (c) 2014-2016 DataStax
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#ifndef __CASS_HASH_HPP_INCLUDED__
+#define __CASS_HASH_HPP_INCLUDED__
+
+#include
+#include
+
+namespace cass { namespace hash {
+
+typedef int (Op)(int);
+
+inline int nop(int c) { return c; }
+
+#if defined(__x86_64__) || defined(_M_X64) || defined(__aarch64__)
+#define FNV1_64_INIT 0xcbf29ce484222325ULL
+#define FNV1_64_PRIME 0x100000001b3ULL
+
+inline uint64_t fnv1a(const char* data, size_t length, Op op = nop) {
+ uint64_t h = FNV1_64_INIT;
+ for (size_t i = 0; i < length; ++i) {
+ h ^= static_cast(op(data[i]));
+ h *= FNV1_64_PRIME;
+ }
+ return h;
+}
+#else
+#define FNV1_32_INIT 0x811c9dc5
+#define FNV1_32_PRIME 0x01000193
+
+inline uint32_t fnv1a(const char* data, size_t length, Op op = nop) {
+ uint32_t h = FNV1_32_INIT;
+ for (size_t i = 0; i < length; ++i) {
+ h ^= static_cast(op(data[i]));
+ h *= FNV1_32_PRIME;
+ }
+ return h;
+}
+#endif
+
+} } // namespace cass::hash
+
+
+#endif
diff --git a/src/hash_table.hpp b/src/hash_table.hpp
index e23550fad..f3f8a8f24 100644
--- a/src/hash_table.hpp
+++ b/src/hash_table.hpp
@@ -18,6 +18,7 @@
#define __CASS_HASH_INDEX_HPP_INCLUDED__
#include "fixed_vector.hpp"
+#include "hash.hpp"
#include "macros.hpp"
#include "string_ref.hpp"
#include "utils.hpp"
@@ -30,36 +31,6 @@
namespace cass {
-#if defined(__x86_64__) || defined(_M_X64) || defined(__aarch64__)
-#define FNV1_64_INIT 0xcbf29ce484222325ULL
-#define FNV1_64_PRIME 0x100000001b3ULL
-
-inline uint64_t fnv1a_hash_lower(StringRef s) {
- uint64_t h = FNV1_64_INIT;
- for(StringRef::const_iterator i = s.begin(), end = s.end(); i != end; ++i) {
- h ^= static_cast(static_cast(::tolower(*i)));
- h *= FNV1_64_PRIME;
- }
- return h;
-}
-
-#undef FNV1_64_INIT
-#undef FNV1_64_PRIME
-#else
-#define FNV1_32_INIT 0x811c9dc5
-#define FNV1_32_PRIME 0x01000193
-inline uint32_t fnv1a_hash_lower(StringRef s) {
- uint32_t h = FNV1_32_INIT;
- for(StringRef::const_iterator i = s.begin(), end = s.end(); i != end; ++i) {
- h ^= static_cast(static_cast(::tolower(*i)));
- h *= FNV1_32_PRIME;
- }
- return h;
-}
-#undef FNV1_32_INIT
-#undef FNV1_32_PRIME
-#endif
-
typedef FixedVector IndexVec;
template
@@ -73,7 +44,6 @@ struct HashTableEntry {
T* next;
};
-
template
class CaseInsensitiveHashTable {
public:
@@ -129,7 +99,8 @@ size_t CaseInsensitiveHashTable::get_indices(StringRef name, IndexVec* result
name = name.substr(1, name.size() - 2);
}
- size_t h = fnv1a_hash_lower(name) & index_mask_;
+ size_t h = hash::fnv1a(name.data(),
+ name.size(), ::tolower) & index_mask_;
size_t start = h;
while (index_[h] != NULL && !iequals(name, index_[h]->name)) {
@@ -188,7 +159,8 @@ void CaseInsensitiveHashTable::set_entries(const EntryVec& entries) {
template
void CaseInsensitiveHashTable::add_index(T* entry) {
- size_t h = fnv1a_hash_lower(entry->name) & index_mask_;
+ size_t h = hash::fnv1a(entry->name.data(),
+ entry->name.size(), ::tolower) & index_mask_;
if (index_[h] == NULL) {
index_[h] = entry;
diff --git a/src/host.hpp b/src/host.hpp
index 84108f026..928a641ed 100644
--- a/src/host.hpp
+++ b/src/host.hpp
@@ -31,6 +31,7 @@
#include
#include
#include
+#include
#include
namespace cass {
@@ -113,6 +114,8 @@ class Host : public RefCounted {
Host(const Address& address, bool mark)
: address_(address)
+ , rack_id_(0)
+ , dc_id_(0)
, mark_(mark)
, state_(ADDED)
, address_string_(address.to_string()) { }
@@ -140,6 +143,13 @@ class Host : public RefCounted {
dc_ = dc;
}
+ uint32_t rack_id() const { return rack_id_; }
+ uint32_t dc_id() const { return dc_id_; }
+ void set_rack_and_dc_ids(uint32_t rack_id, uint32_t dc_id) {
+ rack_id_ = rack_id;
+ dc_id_ = dc_id;
+ }
+
const std::string& listen_address() const { return listen_address_; }
void set_listen_address(const std::string& listen_address) {
listen_address_ = listen_address;
@@ -219,6 +229,8 @@ class Host : public RefCounted {
}
Address address_;
+ uint32_t rack_id_;
+ uint32_t dc_id_;
bool mark_;
Atomic state_;
std::string address_string_;
diff --git a/src/io_worker.cpp b/src/io_worker.cpp
index df63833b1..0f16275cb 100644
--- a/src/io_worker.cpp
+++ b/src/io_worker.cpp
@@ -35,6 +35,10 @@ IOWorker::IOWorker(Session* session)
, keyspace_(new std::string)
, pending_request_count_(0)
, request_queue_(config_.queue_size_io()) {
+ pools_.set_empty_key(Address::EMPTY_KEY);
+ pools_.set_deleted_key(Address::DELETED_KEY);
+ unavailable_addresses_.set_empty_key(Address::EMPTY_KEY);
+ unavailable_addresses_.set_deleted_key(Address::DELETED_KEY);
prepare_.data = this;
uv_mutex_init(&unavailable_addresses_mutex_);
}
diff --git a/src/io_worker.hpp b/src/io_worker.hpp
index c785d7bf7..64fb5d909 100644
--- a/src/io_worker.hpp
+++ b/src/io_worker.hpp
@@ -29,7 +29,8 @@
#include "spsc_queue.hpp"
#include "timer.hpp"
-#include