diff --git a/cmake/modules/CppDriver.cmake b/cmake/modules/CppDriver.cmake index a1b4eaf3d..acf94fcb6 100644 --- a/cmake/modules/CppDriver.cmake +++ b/cmake/modules/CppDriver.cmake @@ -429,7 +429,7 @@ macro(CassAddIncludes) ${CASS_SOURCE_DIR}/src/third_party/sparsehash/src ${CASS_INCLUDES} ) - add_subdirectory(src/third_party/sparsehash) + add_subdirectory(${CASS_SOURCE_DIR}/src/third_party/sparsehash) endmacro() #------------------------ diff --git a/src/address.cpp b/src/address.cpp index 7553d8292..c8ddc617a 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -18,6 +18,7 @@ #include #include +#include namespace cass { @@ -25,11 +26,11 @@ const Address Address::EMPTY_KEY("0.0.0.0", 0); const Address Address::DELETED_KEY("0.0.0.0", 1); Address::Address() { - init(); + memset(&addr_, 0, sizeof(addr_)); } Address::Address(const std::string& ip, int port) { - init(); + memset(&addr_, 0, sizeof(addr_)); from_string(ip, port, this); } @@ -117,7 +118,6 @@ int Address::port() const { } else if (family() == AF_INET6) { return htons(addr_in6()->sin6_port); } else { - assert(false); return -1; } } @@ -136,8 +136,6 @@ std::string Address::to_string(bool with_port) const { if (with_port) ss << "["; ss << host; if (with_port) ss << "]:" << port(); - } else { - assert(false); } return ss.str(); } @@ -166,9 +164,6 @@ int Address::compare(const Address& a) const { } else if (family() == AF_INET6) { return memcmp(&(addr_in6()->sin6_addr), &(a.addr_in6()->sin6_addr), sizeof(addr_in6()->sin6_addr)); - } else { - assert(false); - return -1; } return 0; } diff --git a/src/address.hpp b/src/address.hpp index 5c4dd107d..204be847d 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -22,10 +22,8 @@ #include -#include -#include -#include #include +#include #include namespace cass { @@ -39,7 +37,6 @@ class Address { Address(const std::string& ip, int port); - static bool from_string(const std::string& ip, int port, Address* output = NULL); @@ -68,6 +65,8 @@ class Address { return copy_cast(&addr_); } + bool is_valid() const { return family() == AF_INET || family() == AF_INET6; } + int family() const { return addr_.ss_family; } int port() const; @@ -78,8 +77,6 @@ class Address { int compare(const Address& a) const; private: - void init() { memset(&addr_, 0, sizeof(addr_)); } - struct sockaddr_storage addr_; }; diff --git a/src/auth.cpp b/src/auth.cpp index 2b0277f2f..69f2e2bb3 100644 --- a/src/auth.cpp +++ b/src/auth.cpp @@ -15,7 +15,7 @@ */ #include "auth.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "cassandra.h" diff --git a/src/auth.hpp b/src/auth.hpp index d5e4baaea..926bd4831 100644 --- a/src/auth.hpp +++ b/src/auth.hpp @@ -18,6 +18,7 @@ #define __CASS_AUTH_HPP_INCLUDED__ #include "buffer.hpp" +#include "external.hpp" #include "host.hpp" #include "macros.hpp" #include "ref_counted.hpp" @@ -169,4 +170,6 @@ class PlainTextAuthProvider : public AuthProvider { } // namespace cass +EXTERNAL_TYPE(cass::ExternalAuthenticator, CassAuthenticator) + #endif diff --git a/src/batch_request.cpp b/src/batch_request.cpp index 3373f8b60..30a82f5c0 100644 --- a/src/batch_request.cpp +++ b/src/batch_request.cpp @@ -18,7 +18,8 @@ #include "constants.hpp" #include "execute_request.hpp" -#include "external_types.hpp" +#include "external.hpp" +#include "handler.hpp" #include "serialization.hpp" #include "statement.hpp" diff --git a/src/batch_request.hpp b/src/batch_request.hpp index 5a4960744..cddf8554d 100644 --- a/src/batch_request.hpp +++ b/src/batch_request.hpp @@ -19,12 +19,13 @@ #include "cassandra.h" #include "constants.hpp" +#include "external.hpp" #include "request.hpp" #include "ref_counted.hpp" -#include #include #include +#include namespace cass { @@ -33,7 +34,7 @@ class ExecuteRequest; class BatchRequest : public RoutableRequest { public: - typedef std::list > StatementList; + typedef std::vector > StatementList; BatchRequest(uint8_t type_) : RoutableRequest(CQL_OPCODE_BATCH) @@ -62,4 +63,6 @@ class BatchRequest : public RoutableRequest { } // namespace cass +EXTERNAL_TYPE(cass::BatchRequest, CassBatch) + #endif diff --git a/src/cluster.cpp b/src/cluster.cpp index 217cfb86b..27826d193 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -19,7 +19,7 @@ #include "dc_aware_policy.hpp" #include "logger.hpp" #include "round_robin_policy.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "utils.hpp" #include diff --git a/src/cluster.hpp b/src/cluster.hpp index 3ecfc2a02..2adbf1300 100644 --- a/src/cluster.hpp +++ b/src/cluster.hpp @@ -18,6 +18,7 @@ #define __CASS_CLUSTER_HPP_INCLUDED__ #include "config.hpp" +#include "external.hpp" namespace cass { @@ -32,4 +33,6 @@ class Cluster { } // namespace cass +EXTERNAL_TYPE(cass::Cluster, CassCluster) + #endif diff --git a/src/collection.cpp b/src/collection.cpp index dc075aab8..b09dfe9fc 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -17,8 +17,9 @@ #include "collection.hpp" #include "constants.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "macros.hpp" +#include "tuple.hpp" #include "user_type_value.hpp" #include diff --git a/src/collection.hpp b/src/collection.hpp index ed1e99599..42a99932c 100644 --- a/src/collection.hpp +++ b/src/collection.hpp @@ -20,6 +20,7 @@ #include "cassandra.h" #include "data_type.hpp" #include "encode.hpp" +#include "external.hpp" #include "buffer.hpp" #include "ref_counted.hpp" #include "types.hpp" @@ -139,5 +140,7 @@ class Collection : public RefCounted { } // namespace cass +EXTERNAL_TYPE(cass::Collection, CassCollection) + #endif diff --git a/src/config.hpp b/src/config.hpp index e3538ccb7..3bf408566 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -20,6 +20,7 @@ #include "auth.hpp" #include "cassandra.h" #include "dc_aware_policy.hpp" +#include "host_targeting_policy.hpp" #include "latency_aware_policy.hpp" #include "retry_policy.hpp" #include "ssl.hpp" @@ -66,6 +67,7 @@ class Config { , load_balancing_policy_(new DCAwarePolicy()) , token_aware_routing_(true) , latency_aware_routing_(false) + , host_targeting_(false) , tcp_nodelay_enable_(true) , tcp_keepalive_enable_(false) , tcp_keepalive_delay_secs_(0) @@ -260,6 +262,9 @@ class Config { if (latency_aware()) { chain = new LatencyAwarePolicy(chain, latency_aware_routing_settings_); } + if (host_targeting()) { + chain = new HostTargetingPolicy(chain); + } return chain; } @@ -282,6 +287,10 @@ class Config { void set_latency_aware_routing(bool is_latency_aware) { latency_aware_routing_ = is_latency_aware; } + bool host_targeting() const { return host_targeting_; } + + void set_host_targeting(bool is_host_targeting) { host_targeting_ = is_host_targeting; } + void set_latency_aware_routing_settings(const LatencyAwarePolicy::Settings& settings) { latency_aware_routing_settings_ = settings; } @@ -394,6 +403,7 @@ class Config { SharedRefPtr ssl_context_; bool token_aware_routing_; bool latency_aware_routing_; + bool host_targeting_; LatencyAwarePolicy::Settings latency_aware_routing_settings_; ContactPointList whitelist_; ContactPointList blacklist_; diff --git a/src/data_type.cpp b/src/data_type.cpp index 679628cc1..cb809de8f 100644 --- a/src/data_type.cpp +++ b/src/data_type.cpp @@ -17,7 +17,7 @@ #include "data_type.hpp" #include "collection.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "tuple.hpp" #include "types.hpp" #include "user_type_value.hpp" diff --git a/src/data_type.hpp b/src/data_type.hpp index 3e6350e1c..901b4b474 100644 --- a/src/data_type.hpp +++ b/src/data_type.hpp @@ -18,6 +18,7 @@ #define __CASS_DATA_TYPE_HPP_INCLUDED__ #include "cassandra.h" +#include "external.hpp" #include "hash_table.hpp" #include "macros.hpp" #include "ref_counted.hpp" @@ -591,4 +592,6 @@ struct IsValidDataType { } // namespace cass +EXTERNAL_TYPE(cass::DataType, CassDataType) + #endif diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp index 036975c8e..5129eca8c 100644 --- a/src/dc_aware_policy.cpp +++ b/src/dc_aware_policy.cpp @@ -17,7 +17,7 @@ #include "dc_aware_policy.hpp" #include "logger.hpp" - +#include "request_handler.hpp" #include "scoped_lock.hpp" #include @@ -62,10 +62,9 @@ CassHostDistance DCAwarePolicy::distance(const SharedRefPtr& host) const { } QueryPlan* DCAwarePolicy::new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache) { - CassConsistency cl = request != NULL ? request->consistency() : Request::DEFAULT_CONSISTENCY; + RequestHandler* request_handler, + const TokenMap* token_map) { + CassConsistency cl = request_handler != NULL ? request_handler->request()->consistency() : Request::DEFAULT_CONSISTENCY; return new DCAwareQueryPlan(this, cl, index_++); } diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp index 21b001540..0dbcf49bc 100644 --- a/src/dc_aware_policy.hpp +++ b/src/dc_aware_policy.hpp @@ -51,9 +51,8 @@ class DCAwarePolicy : public LoadBalancingPolicy { virtual CassHostDistance distance(const SharedRefPtr& host) const; virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache); + RequestHandler* request_handler, + const TokenMap* token_map); virtual void on_add(const SharedRefPtr& host); diff --git a/src/error_response.cpp b/src/error_response.cpp index 0dc7201cc..d7a906c46 100644 --- a/src/error_response.cpp +++ b/src/error_response.cpp @@ -16,7 +16,7 @@ #include "error_response.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "logger.hpp" #include "serialization.hpp" diff --git a/src/error_response.hpp b/src/error_response.hpp index 644287b32..5934a735f 100644 --- a/src/error_response.hpp +++ b/src/error_response.hpp @@ -17,6 +17,7 @@ #ifndef __CASS_ERROR_RESPONSE_HPP_INCLUDED__ #define __CASS_ERROR_RESPONSE_HPP_INCLUDED__ +#include "external.hpp" #include "response.hpp" #include "constants.hpp" #include "scoped_ptr.hpp" @@ -84,4 +85,6 @@ bool check_error_or_invalid_response(const std::string& prefix, uint8_t expected } // namespace cass +EXTERNAL_TYPE(cass::ErrorResponse, CassErrorResult) + #endif diff --git a/src/external_types.cpp b/src/external.cpp similarity index 98% rename from src/external_types.cpp rename to src/external.cpp index 26c4fff8e..b8fd3cbc8 100644 --- a/src/external_types.cpp +++ b/src/external.cpp @@ -14,8 +14,11 @@ limitations under the License. */ -#include "external_types.hpp" +#include "external.hpp" +#include "cassandra.h" + +#include #include #define NUM_SECONDS_PER_DAY (24U * 60U * 60U) diff --git a/src/external.hpp b/src/external.hpp new file mode 100644 index 000000000..763bb5a0d --- /dev/null +++ b/src/external.hpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2014-2016 DataStax + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __CASS_EXTERNAL_HPP_INCLUDED__ +#define __CASS_EXTERNAL_HPP_INCLUDED__ + +// This abstraction allows us to separate internal types from the +// external opaque pointers that we expose. +template +struct External : public In { + In* from() { return static_cast(this); } + const In* from() const { return static_cast(this); } + static Ex* to(In* in) { return static_cast(in); } + static const Ex* to(const In* in) { return static_cast(in); } +}; + +#define EXTERNAL_TYPE(InternalType, ExternalType) \ + extern "C" { \ + struct ExternalType##_ : public External { \ + private: \ + ~ExternalType##_() { } \ + }; \ + } + +#endif diff --git a/src/external_types.hpp b/src/external_types.hpp deleted file mode 100644 index 357150376..000000000 --- a/src/external_types.hpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - Copyright (c) 2014-2016 DataStax - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#ifndef __CASS_EXTERNAL_TYPES_HPP_INCLUDED__ -#define __CASS_EXTERNAL_TYPES_HPP_INCLUDED__ - -#include "auth.hpp" -#include "cassandra.h" -#include "batch_request.hpp" -#include "cluster.hpp" -#include "collection.hpp" -#include "data_type.hpp" -#include "error_response.hpp" -#include "future.hpp" -#include "iterator.hpp" -#include "metadata.hpp" -#include "prepared.hpp" -#include "result_response.hpp" -#include "request.hpp" -#include "retry_policy.hpp" -#include "row.hpp" -#include "session.hpp" -#include "ssl.hpp" -#include "statement.hpp" -#include "timestamp_generator.hpp" -#include "tuple.hpp" -#include "user_type_value.hpp" -#include "uuids.hpp" -#include "value.hpp" - -// This abstraction allows us to separate internal types from the -// external opaque pointers that we expose. -template -struct External : public In { - In* from() { return static_cast(this); } - const In* from() const { return static_cast(this); } - static Ex* to(In* in) { return static_cast(in); } - static const Ex* to(const In* in) { return static_cast(in); } -}; - -#define EXTERNAL_TYPE(InternalType, ExternalType) \ - struct ExternalType##_ : public External { \ - private: \ - ~ExternalType##_() { } \ - } - -extern "C" { - -EXTERNAL_TYPE(cass::Cluster, CassCluster); -EXTERNAL_TYPE(cass::Session, CassSession); -EXTERNAL_TYPE(cass::Statement, CassStatement); -EXTERNAL_TYPE(cass::Future, CassFuture); -EXTERNAL_TYPE(cass::Prepared, CassPrepared); -EXTERNAL_TYPE(cass::BatchRequest, CassBatch); -EXTERNAL_TYPE(cass::ResultResponse, CassResult); -EXTERNAL_TYPE(cass::ErrorResponse, CassErrorResult); -EXTERNAL_TYPE(cass::Collection, CassCollection); -EXTERNAL_TYPE(cass::Iterator, CassIterator); -EXTERNAL_TYPE(cass::Row, CassRow); -EXTERNAL_TYPE(cass::Value, CassValue); -EXTERNAL_TYPE(cass::SslContext, CassSsl); -EXTERNAL_TYPE(cass::ExternalAuthenticator, CassAuthenticator); -EXTERNAL_TYPE(cass::Metadata::SchemaSnapshot, CassSchemaMeta); -EXTERNAL_TYPE(cass::KeyspaceMetadata, CassKeyspaceMeta); -EXTERNAL_TYPE(cass::TableMetadata, CassTableMeta); -EXTERNAL_TYPE(cass::ViewMetadata, CassMaterializedViewMeta); -EXTERNAL_TYPE(cass::ColumnMetadata, CassColumnMeta); -EXTERNAL_TYPE(cass::IndexMetadata, CassIndexMeta); -EXTERNAL_TYPE(cass::FunctionMetadata, CassFunctionMeta); -EXTERNAL_TYPE(cass::AggregateMetadata, CassAggregateMeta); -EXTERNAL_TYPE(cass::UuidGen, CassUuidGen); -EXTERNAL_TYPE(cass::Tuple, CassTuple); -EXTERNAL_TYPE(cass::UserTypeValue, CassUserType); -EXTERNAL_TYPE(cass::DataType, CassDataType); -EXTERNAL_TYPE(cass::TimestampGenerator, CassTimestampGen); -EXTERNAL_TYPE(cass::RetryPolicy, CassRetryPolicy); -EXTERNAL_TYPE(cass::CustomPayload, CassCustomPayload); - -} - -#undef EXTERNAL_TYPE - -#endif diff --git a/src/future.cpp b/src/future.cpp index 1db89ac26..c7d27d31f 100644 --- a/src/future.cpp +++ b/src/future.cpp @@ -16,9 +16,11 @@ #include "future.hpp" +#include "external.hpp" +#include "prepared.hpp" #include "request_handler.hpp" +#include "result_response.hpp" #include "scoped_ptr.hpp" -#include "external_types.hpp" extern "C" { @@ -77,7 +79,7 @@ const CassPrepared* cass_future_get_prepared(CassFuture* future) { cass::Prepared* prepared = new cass::Prepared(result, response_future->statement, - response_future->schema_metadata); + *response_future->schema_metadata); if (prepared) prepared->inc_ref(); return CassPrepared::to(prepared); } @@ -100,7 +102,7 @@ const CassErrorResult* cass_future_get_error_result(CassFuture* future) { } CassError cass_future_error_code(CassFuture* future) { - const cass::Future::Error* error = future->get_error(); + const cass::Future::Error* error = future->error(); if (error != NULL) { return error->code; } else { @@ -111,7 +113,7 @@ CassError cass_future_error_code(CassFuture* future) { void cass_future_error_message(CassFuture* future, const char** message, size_t* message_length) { - const cass::Future::Error* error = future->get_error(); + const cass::Future::Error* error = future->error(); if (error != NULL) { const std::string& m = error->message; *message = m.data(); @@ -182,39 +184,13 @@ void Future::internal_set(ScopedMutex& lock) { is_set_ = true; uv_cond_broadcast(&cond_); if (callback_) { - if (loop_.load() == NULL) { - Callback callback = callback_; - void* data = data_; - lock.unlock(); - callback(CassFuture::to(this), data); - } else { - run_callback_on_work_thread(); - } + Callback callback = callback_; + void* data = data_; + lock.unlock(); + callback(CassFuture::to(this), data); } } -void Future::run_callback_on_work_thread() { - inc_ref(); // Keep the future alive for the callback - work_.data = this; - uv_queue_work(loop_.load(), &work_, on_work, on_after_work); -} - -void Future::on_work(uv_work_t* work) { - Future* future = static_cast(work->data); - - ScopedMutex lock(&future->mutex_); - Callback callback = future->callback_; - void* data = future->data_; - lock.unlock(); - - callback(CassFuture::to(future), data); -} - -void Future::on_after_work(uv_work_t* work, int status) { - Future* future = static_cast(work->data); - future->dec_ref(); -} - } // namespace cass diff --git a/src/future.hpp b/src/future.hpp index c190e4099..2d44cc468 100644 --- a/src/future.hpp +++ b/src/future.hpp @@ -19,6 +19,7 @@ #include "atomic.hpp" #include "cassandra.h" +#include "external.hpp" #include "host.hpp" #include "macros.hpp" #include "scoped_lock.hpp" @@ -82,7 +83,7 @@ class Future : public RefCounted { return internal_wait_for(lock, timeout_us); } - Error* get_error() { + Error* error() { ScopedMutex lock(&mutex_); internal_wait(lock); return error_.get(); @@ -129,18 +130,12 @@ class Future : public RefCounted { uv_mutex_t mutex_; -private: - void run_callback_on_work_thread(); - static void on_work(uv_work_t* work); - static void on_after_work(uv_work_t* work, int status); - private: bool is_set_; uv_cond_t cond_; FutureType type_; ScopedPtr error_; Atomic loop_; - uv_work_t work_; Callback callback_; void* data_; @@ -150,4 +145,6 @@ class Future : public RefCounted { } // namespace cass +EXTERNAL_TYPE(cass::Future, CassFuture) + #endif diff --git a/src/host_targeting_policy.cpp b/src/host_targeting_policy.cpp new file mode 100644 index 000000000..59b104b8a --- /dev/null +++ b/src/host_targeting_policy.cpp @@ -0,0 +1,83 @@ +/* + Copyright (c) 2014-2016 DataStax + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "host_targeting_policy.hpp" + +namespace cass { + +void HostTargetingPolicy::init(const SharedRefPtr& connected_host, + const cass::HostMap& hosts, + Random* random) { + for (cass::HostMap::const_iterator i = hosts.begin(), + end = hosts.end(); i != end; ++i) { + available_hosts_[i->first] = i->second; + } + ChainedLoadBalancingPolicy::init(connected_host, hosts, random); +} + +QueryPlan* HostTargetingPolicy::new_query_plan(const std::string& connected_keyspace, + RequestHandler* request_handler, + const TokenMap* token_map) { + QueryPlan* child_plan = child_policy_->new_query_plan(connected_keyspace, + request_handler, + token_map); + if (request_handler == NULL || + !request_handler->preferred_address().is_valid()) { + return child_plan; + } + + HostMap::const_iterator i = available_hosts_.find(request_handler->preferred_address()); + if (i == available_hosts_.end()) { + return child_plan; + } + + return new HostTargetingQueryPlan(i->second, child_plan); +} + +void HostTargetingPolicy::on_add(const SharedRefPtr& host) { + available_hosts_[host->address()] = host; + ChainedLoadBalancingPolicy::on_add(host); +} + +void HostTargetingPolicy::on_remove(const SharedRefPtr& host) { + available_hosts_.erase(host->address()); + ChainedLoadBalancingPolicy::on_remove(host); +} + +void HostTargetingPolicy::on_up(const SharedRefPtr& host) { + available_hosts_[host->address()] = host; + ChainedLoadBalancingPolicy::on_up(host); +} + +void HostTargetingPolicy::on_down(const SharedRefPtr& host) { + available_hosts_.erase(host->address()); + ChainedLoadBalancingPolicy::on_down(host); +} + +SharedRefPtr HostTargetingPolicy::HostTargetingQueryPlan::compute_next() { + if (first_) { + first_ = false; + return preferred_host_; + } else { + Host::Ptr next = child_plan_->compute_next(); + if (next && next->address() == preferred_host_->address()) { + return child_plan_->compute_next(); + } + return next; + } +} + +} // namespace cass diff --git a/src/host_targeting_policy.hpp b/src/host_targeting_policy.hpp new file mode 100644 index 000000000..fbd4457a6 --- /dev/null +++ b/src/host_targeting_policy.hpp @@ -0,0 +1,75 @@ +/* + Copyright (c) 2014-2016 DataStax + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __CASS_HOST_TARGETING_POLICY_HPP_INCLUDED__ +#define __CASS_HOST_TARGETING_POLICY_HPP_INCLUDED__ + +#include "address.hpp" +#include "load_balancing.hpp" +#include "request_handler.hpp" + +#include + +namespace cass { + +class HostTargetingPolicy : public ChainedLoadBalancingPolicy { +public: + HostTargetingPolicy(LoadBalancingPolicy* child_policy) + : ChainedLoadBalancingPolicy(child_policy) { + available_hosts_.set_empty_key(Address::EMPTY_KEY); + available_hosts_.set_deleted_key(Address::DELETED_KEY); + } + + virtual void init(const SharedRefPtr& connected_host, + const cass::HostMap& hosts, Random* random); + + virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, + RequestHandler* request_handler, + const TokenMap* token_map); + + virtual LoadBalancingPolicy* new_instance() { + return new HostTargetingPolicy(child_policy_->new_instance()); + } + + virtual void on_add(const SharedRefPtr& host); + virtual void on_remove(const SharedRefPtr& host); + virtual void on_up(const SharedRefPtr& host); + virtual void on_down(const SharedRefPtr& host); + +private: + class HostTargetingQueryPlan : public QueryPlan { + public: + HostTargetingQueryPlan(const Host::Ptr& preferred_host, QueryPlan* child_plan) + : first_(true) + , preferred_host_(preferred_host) + , child_plan_(child_plan) { } + + virtual SharedRefPtr compute_next(); + + private: + bool first_; + Host::Ptr preferred_host_; + ScopedPtr child_plan_; + }; + +private: + typedef sparsehash::dense_hash_map HostMap; + HostMap available_hosts_; +}; + +} // namespace cass + +#endif diff --git a/src/iterator.cpp b/src/iterator.cpp index 8b75ae25b..0d9682a2f 100644 --- a/src/iterator.cpp +++ b/src/iterator.cpp @@ -17,7 +17,7 @@ #include "iterator.hpp" #include "collection_iterator.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "map_iterator.hpp" #include "result_iterator.hpp" #include "row_iterator.hpp" diff --git a/src/iterator.hpp b/src/iterator.hpp index 1628861dd..a450de4d2 100644 --- a/src/iterator.hpp +++ b/src/iterator.hpp @@ -18,6 +18,7 @@ #define __CASS_ITERATOR_HPP_INCLUDED__ #include "cassandra.h" +#include "external.hpp" namespace cass { @@ -38,4 +39,6 @@ class Iterator { } // namespace cass +EXTERNAL_TYPE(cass::Iterator, CassIterator) + #endif diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp index ce0061f3b..64d1462af 100644 --- a/src/latency_aware_policy.cpp +++ b/src/latency_aware_policy.cpp @@ -51,12 +51,12 @@ void LatencyAwarePolicy::close_handles() { } QueryPlan* LatencyAwarePolicy::new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache) { + RequestHandler* request_handler, + const TokenMap* token_map) { return new LatencyAwareQueryPlan(this, - child_policy_->new_query_plan(connected_keyspace, request, - token_map, cache)); + child_policy_->new_query_plan(connected_keyspace, + request_handler, + token_map)); } void LatencyAwarePolicy::on_add(const SharedRefPtr& host) { diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp index 824608e70..173f5c0c0 100644 --- a/src/latency_aware_policy.hpp +++ b/src/latency_aware_policy.hpp @@ -57,9 +57,8 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy { virtual void close_handles(); virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache); + RequestHandler* request_handler, + const TokenMap* token_map); virtual LoadBalancingPolicy* new_instance() { return new LatencyAwarePolicy(child_policy_->new_instance(), settings_); diff --git a/src/list_policy.cpp b/src/list_policy.cpp index aea9b9ddb..be3ab8fa3 100644 --- a/src/list_policy.cpp +++ b/src/list_policy.cpp @@ -47,13 +47,11 @@ CassHostDistance ListPolicy::distance(const SharedRefPtr& host) const { } QueryPlan* ListPolicy::new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache) { + RequestHandler* request_handler, + const TokenMap* token_map) { return child_policy_->new_query_plan(connected_keyspace, - request, - token_map, - cache); + request_handler, + token_map); } void ListPolicy::on_add(const SharedRefPtr& host) { diff --git a/src/list_policy.hpp b/src/list_policy.hpp index 32cff537a..0be338298 100644 --- a/src/list_policy.hpp +++ b/src/list_policy.hpp @@ -35,9 +35,8 @@ class ListPolicy : public ChainedLoadBalancingPolicy { virtual CassHostDistance distance(const SharedRefPtr& host) const; virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache); + RequestHandler* request_handler, + const TokenMap* token_map); virtual void on_add(const SharedRefPtr& host); virtual void on_remove(const SharedRefPtr& host); diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp index 44e89e39d..531dc875f 100644 --- a/src/load_balancing.hpp +++ b/src/load_balancing.hpp @@ -52,7 +52,7 @@ typedef enum CassHostDistance_ { namespace cass { class Random; -class RoutableRequest; +class RequestHandler; class TokenMap; inline bool is_dc_local(CassConsistency cl) { @@ -89,9 +89,8 @@ class LoadBalancingPolicy : public Host::StateListener, public RefCounted& host) const = 0; virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache) = 0; + RequestHandler* request_handler, + const TokenMap* token_map) = 0; virtual LoadBalancingPolicy* new_instance() = 0; }; diff --git a/src/metadata.cpp b/src/metadata.cpp index 9e223e439..f95c67c89 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -17,8 +17,9 @@ #include "metadata.hpp" #include "buffer.hpp" +#include "collection.hpp" #include "collection_iterator.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "iterator.hpp" #include "logger.hpp" #include "map_iterator.hpp" diff --git a/src/metadata.hpp b/src/metadata.hpp index 5b36339c5..a19d2baa3 100644 --- a/src/metadata.hpp +++ b/src/metadata.hpp @@ -18,6 +18,7 @@ #define __CASS_SCHEMA_METADATA_HPP_INCLUDED__ #include "copy_on_write_ptr.hpp" +#include "external.hpp" #include "host.hpp" #include "iterator.hpp" #include "macros.hpp" @@ -719,6 +720,15 @@ class Metadata { } // namespace cass +EXTERNAL_TYPE(cass::Metadata::SchemaSnapshot, CassSchemaMeta) +EXTERNAL_TYPE(cass::KeyspaceMetadata, CassKeyspaceMeta) +EXTERNAL_TYPE(cass::TableMetadata, CassTableMeta) +EXTERNAL_TYPE(cass::ViewMetadata, CassMaterializedViewMeta) +EXTERNAL_TYPE(cass::ColumnMetadata, CassColumnMeta) +EXTERNAL_TYPE(cass::IndexMetadata, CassIndexMeta) +EXTERNAL_TYPE(cass::FunctionMetadata, CassFunctionMeta) +EXTERNAL_TYPE(cass::AggregateMetadata, CassAggregateMeta) + #endif diff --git a/src/prepared.cpp b/src/prepared.cpp index a0bbe4786..6d05f82ce 100644 --- a/src/prepared.cpp +++ b/src/prepared.cpp @@ -18,7 +18,7 @@ #include "execute_request.hpp" #include "logger.hpp" -#include "external_types.hpp" +#include "external.hpp" extern "C" { diff --git a/src/prepared.hpp b/src/prepared.hpp index e4a39b1cc..99e6b0d17 100644 --- a/src/prepared.hpp +++ b/src/prepared.hpp @@ -17,6 +17,7 @@ #ifndef __CASS_PREPARED_HPP_INCLUDED__ #define __CASS_PREPARED_HPP_INCLUDED__ +#include "external.hpp" #include "ref_counted.hpp" #include "result_response.hpp" #include "metadata.hpp" @@ -46,4 +47,6 @@ class Prepared : public RefCounted { } // namespace cass +EXTERNAL_TYPE(cass::Prepared, CassPrepared) + #endif diff --git a/src/request.cpp b/src/request.cpp index 051015be4..105d1f855 100644 --- a/src/request.cpp +++ b/src/request.cpp @@ -16,7 +16,7 @@ #include "request.hpp" -#include "external_types.hpp" +#include "external.hpp" extern "C" { diff --git a/src/request.hpp b/src/request.hpp index 128f83dda..a2b2be9d4 100644 --- a/src/request.hpp +++ b/src/request.hpp @@ -20,6 +20,7 @@ #include "buffer.hpp" #include "cassandra.h" #include "constants.hpp" +#include "external.hpp" #include "macros.hpp" #include "ref_counted.hpp" #include "retry_policy.hpp" @@ -146,4 +147,7 @@ class RoutableRequest : public Request { } // namespace cass +EXTERNAL_TYPE(cass::CustomPayload, CassCustomPayload) + + #endif diff --git a/src/request_handler.cpp b/src/request_handler.cpp index 842f5120f..aa3c0244a 100644 --- a/src/request_handler.cpp +++ b/src/request_handler.cpp @@ -105,7 +105,7 @@ void RequestHandler::set_error(CassError code, const std::string& message) { if (is_query_plan_exhausted_) { future_->set_error(code, message); } else { - future_->set_error_with_host_address(current_host_->address(), code, message); + future_->set_error_with_address(current_host_->address(), code, message); } return_connection_and_finish(); } diff --git a/src/request_handler.hpp b/src/request_handler.hpp index d63bf65dc..48805c693 100644 --- a/src/request_handler.hpp +++ b/src/request_handler.hpp @@ -41,9 +41,13 @@ class Timer; class ResponseFuture : public Future { public: - ResponseFuture(int protocol_version, const VersionNumber& cassandra_version, const Metadata& metadata) + ResponseFuture() + : Future(CASS_FUTURE_TYPE_RESPONSE) { } + + + ResponseFuture(const Metadata::SchemaSnapshot& schema_metadata) : Future(CASS_FUTURE_TYPE_RESPONSE) - , schema_metadata(metadata.schema_snapshot(protocol_version, cassandra_version)) { } + , schema_metadata(new Metadata::SchemaSnapshot(schema_metadata)) { } void set_response(Address address, const SharedRefPtr& response) { ScopedMutex lock(&mutex_); @@ -58,7 +62,7 @@ class ResponseFuture : public Future { return response_; } - void set_error_with_host_address(Address address, CassError code, const std::string& message) { + void set_error_with_address(Address address, CassError code, const std::string& message) { ScopedMutex lock(&mutex_); address_ = address; internal_set_error(code, message, lock); @@ -72,14 +76,14 @@ class ResponseFuture : public Future { internal_set_error(code, message, lock); } - Address get_host_address() { + Address address() { ScopedMutex lock(&mutex_); internal_wait(lock); return address_; } std::string statement; - Metadata::SchemaSnapshot schema_metadata; + ScopedPtr schema_metadata; private: Address address_; @@ -108,6 +112,14 @@ class RequestHandler : public Handler { virtual void retry(); + const Address& preferred_address() const { + return preferred_address_; + } + + void set_preferred_address(const Address& preferred_address) { + preferred_address_ = preferred_address; + } + void set_query_plan(QueryPlan* query_plan) { query_plan_.reset(query_plan); } @@ -144,6 +156,7 @@ class RequestHandler : public Handler { ScopedRefPtr future_; RetryPolicy* retry_policy_; int num_retries_; + Address preferred_address_; bool is_query_plan_exhausted_; SharedRefPtr current_host_; ScopedPtr query_plan_; diff --git a/src/result_response.cpp b/src/result_response.cpp index 0670ff44a..6f4cce662 100644 --- a/src/result_response.cpp +++ b/src/result_response.cpp @@ -16,7 +16,7 @@ #include "result_response.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "result_metadata.hpp" #include "serialization.hpp" diff --git a/src/result_response.hpp b/src/result_response.hpp index 0a174da82..7abb11d28 100644 --- a/src/result_response.hpp +++ b/src/result_response.hpp @@ -117,4 +117,6 @@ class ResultResponse : public Response { } // namespace cass +EXTERNAL_TYPE(cass::ResultResponse, CassResult) + #endif diff --git a/src/retry_policy.cpp b/src/retry_policy.cpp index 3c643adc3..fa55473d8 100644 --- a/src/retry_policy.cpp +++ b/src/retry_policy.cpp @@ -16,7 +16,7 @@ #include "retry_policy.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "logger.hpp" extern "C" { diff --git a/src/retry_policy.hpp b/src/retry_policy.hpp index fd4c83b66..f93ea13dd 100644 --- a/src/retry_policy.hpp +++ b/src/retry_policy.hpp @@ -18,6 +18,7 @@ #define __CASS_RETRY_POLICY_HPP_INCLUDED__ #include "cassandra.h" +#include "external.hpp" #include "ref_counted.hpp" #ifdef _WIN32 @@ -137,5 +138,7 @@ class LoggingRetryPolicy : public RetryPolicy { } // namespace cass +EXTERNAL_TYPE(cass::RetryPolicy, CassRetryPolicy) + #endif diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp index f4405f65f..9201fb021 100644 --- a/src/round_robin_policy.cpp +++ b/src/round_robin_policy.cpp @@ -36,9 +36,8 @@ CassHostDistance RoundRobinPolicy::distance(const SharedRefPtr& host) cons } QueryPlan* RoundRobinPolicy::new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache) { + RequestHandler* request_handler, + const TokenMap* token_map) { return new RoundRobinQueryPlan(hosts_, index_++); } diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp index 14fa780a8..2c71f5310 100644 --- a/src/round_robin_policy.hpp +++ b/src/round_robin_policy.hpp @@ -36,9 +36,8 @@ class RoundRobinPolicy : public LoadBalancingPolicy { virtual CassHostDistance distance(const SharedRefPtr& host) const; virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache); + RequestHandler* request_handler, + const TokenMap* token_map); virtual void on_add(const SharedRefPtr& host); virtual void on_remove(const SharedRefPtr& host); diff --git a/src/row.cpp b/src/row.cpp index 065989205..5eee47e51 100644 --- a/src/row.cpp +++ b/src/row.cpp @@ -16,9 +16,10 @@ #include "row.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "result_metadata.hpp" #include "result_response.hpp" +#include "serialization.hpp" #include "string_ref.hpp" extern "C" { diff --git a/src/row.hpp b/src/row.hpp index 1e762e6c3..0a9bde301 100644 --- a/src/row.hpp +++ b/src/row.hpp @@ -17,6 +17,7 @@ #ifndef __CASS_ROW_HPP_INCLUDED__ #define __CASS_ROW_HPP_INCLUDED__ +#include "external.hpp" #include "string_ref.hpp" #include "value.hpp" @@ -52,4 +53,6 @@ char* decode_row(char* row, const ResultResponse* result, OutputValueVec& output } // namespace cass +EXTERNAL_TYPE(cass::Row, CassRow) + #endif diff --git a/src/session.cpp b/src/session.cpp index b44f2fe18..deacdaf9c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -16,14 +16,17 @@ #include "session.hpp" +#include "batch_request.hpp" +#include "cluster.hpp" #include "config.hpp" #include "constants.hpp" #include "logger.hpp" #include "prepare_request.hpp" #include "request_handler.hpp" #include "scoped_lock.hpp" +#include "statement.hpp" #include "timer.hpp" -#include "external_types.hpp" +#include "external.hpp" extern "C" { @@ -579,7 +582,7 @@ Future* Session::prepare(const char* statement, size_t length) { PrepareRequest* prepare = new PrepareRequest(); prepare->set_query(statement, length); - ResponseFuture* future = new ResponseFuture(protocol_version(), cassandra_version(), metadata_); + ResponseFuture* future = new ResponseFuture(metadata_.schema_snapshot(protocol_version(), cassandra_version())); future->inc_ref(); // External reference future->statement.assign(statement, length); @@ -668,8 +671,9 @@ void Session::on_down(SharedRefPtr host) { } } -Future* Session::execute(const RoutableRequest* request) { - ResponseFuture* future = new ResponseFuture(protocol_version(), cassandra_version(), metadata_); +Future* Session::execute(const RoutableRequest* request, + const Address* preferred_address) { + ResponseFuture* future = new ResponseFuture(); future->inc_ref(); // External reference RetryPolicy* retry_policy @@ -679,6 +683,9 @@ Future* Session::execute(const RoutableRequest* request) { RequestHandler* request_handler = new RequestHandler(request, future, retry_policy); + if (preferred_address) { + request_handler->set_preferred_address(*preferred_address); + } request_handler->inc_ref(); // IOWorker reference execute(request_handler); @@ -698,8 +705,7 @@ void Session::on_execute(uv_async_t* data) { RequestHandler* request_handler = NULL; while (session->request_queue_->dequeue(request_handler)) { if (request_handler != NULL) { - request_handler->set_query_plan(session->new_query_plan(request_handler->request(), - request_handler->encoding_cache())); + request_handler->set_query_plan(session->new_query_plan(request_handler)); if (request_handler->timestamp() == CASS_INT64_MIN) { request_handler->set_timestamp(session->config_.timestamp_gen()->next()); @@ -743,9 +749,9 @@ void Session::on_execute(uv_async_t* data) { } } -QueryPlan* Session::new_query_plan(const Request* request, Request::EncodingCache* cache) { +QueryPlan* Session::new_query_plan(RequestHandler* request_handler) { const CopyOnWritePtr keyspace(keyspace_); - return load_balancing_policy_->new_query_plan(*keyspace, request, token_map_.get(), cache); + return load_balancing_policy_->new_query_plan(*keyspace, request_handler, token_map_.get()); } } // namespace cass diff --git a/src/session.hpp b/src/session.hpp index 56e6c357b..fa25751a0 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,6 +20,7 @@ #include "config.hpp" #include "control_connection.hpp" #include "event_thread.hpp" +#include "external.hpp" #include "future.hpp" #include "host.hpp" #include "io_worker.hpp" @@ -35,7 +36,6 @@ #include "scoped_ptr.hpp" #include "token_map.hpp" -#include #include #include #include @@ -101,7 +101,8 @@ class Session : public EventThread { void close_async(Future* future, bool force = false); Future* prepare(const char* statement, size_t length); - Future* execute(const RoutableRequest* statement); + Future* execute(const RoutableRequest* statement, + const Address* preferred_address = NULL); const Metadata& metadata() const { return metadata_; } @@ -159,7 +160,7 @@ class Session : public EventThread { static void on_execute(uv_async_t* data); #endif - QueryPlan* new_query_plan(const Request* request = NULL, Request::EncodingCache* cache = NULL); + QueryPlan* new_query_plan(RequestHandler* request_handler = NULL); void on_reconnect(Timer* timer); @@ -222,4 +223,6 @@ class SessionFuture : public Future { } // namespace cass +EXTERNAL_TYPE(cass::Session, CassSession) + #endif diff --git a/src/ssl.cpp b/src/ssl.cpp index 2d5b91347..8e0d73d0e 100644 --- a/src/ssl.cpp +++ b/src/ssl.cpp @@ -17,7 +17,7 @@ #include "ssl.hpp" #include "cassandra.h" -#include "external_types.hpp" +#include "external.hpp" #include diff --git a/src/ssl.hpp b/src/ssl.hpp index 0b69cc4e1..09f0d0d19 100644 --- a/src/ssl.hpp +++ b/src/ssl.hpp @@ -19,6 +19,7 @@ #include "cassandra.h" #include "cassconfig.hpp" +#include "external.hpp" #include "host.hpp" #include "ref_counted.hpp" #include "ring_buffer.hpp" @@ -107,4 +108,6 @@ class SslContextFactoryBase { #include "ssl/ssl_no_impl.hpp" #endif +EXTERNAL_TYPE(cass::SslContext, CassSsl) + #endif diff --git a/src/statement.cpp b/src/statement.cpp index 2329003c8..a634d8d82 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -18,12 +18,14 @@ #include "collection.hpp" #include "execute_request.hpp" -#include "external_types.hpp" +#include "external.hpp" +#include "handler.hpp" #include "macros.hpp" #include "prepared.hpp" #include "query_request.hpp" #include "scoped_ptr.hpp" #include "string_ref.hpp" +#include "tuple.hpp" #include "user_type_value.hpp" #include diff --git a/src/statement.hpp b/src/statement.hpp index b435dabc0..aef3b0c2e 100644 --- a/src/statement.hpp +++ b/src/statement.hpp @@ -19,6 +19,7 @@ #include "abstract_data.hpp" #include "constants.hpp" +#include "external.hpp" #include "macros.hpp" #include "request.hpp" #include "result_metadata.hpp" @@ -113,4 +114,6 @@ class Statement : public RoutableRequest, public AbstractData { } // namespace cass +EXTERNAL_TYPE(cass::Statement, CassStatement) + #endif diff --git a/src/testing.cpp b/src/testing.cpp index b1153499b..3637eb38a 100644 --- a/src/testing.cpp +++ b/src/testing.cpp @@ -17,12 +17,16 @@ #include "testing.hpp" #include "address.hpp" +#include "cluster.hpp" +#include "external.hpp" +#include "future.hpp" #include "get_time.hpp" #include "logger.hpp" #include "metadata.hpp" #include "murmur3.hpp" +#include "request_handler.hpp" #include "result_response.hpp" -#include "external_types.hpp" +#include "session.hpp" namespace cass { @@ -32,7 +36,7 @@ std::string get_host_from_future(CassFuture* future) { } cass::ResponseFuture* response_future = static_cast(future->from()); - return response_future->get_host_address().to_string(); + return response_future->address().to_string(); } unsigned get_connect_timeout_from_cluster(CassCluster* cluster) { diff --git a/src/third_party/sparsehash/CMakeLists.txt b/src/third_party/sparsehash/CMakeLists.txt index e604ec395..703210f87 100644 --- a/src/third_party/sparsehash/CMakeLists.txt +++ b/src/third_party/sparsehash/CMakeLists.txt @@ -104,5 +104,5 @@ check_type_size("uint16_t" UINT16_T) check_type_size("u_int16_t" U_INT16_T) check_type_size("__uint16_t" __UINT16_T) -configure_file("${PROJECT_SOURCE_DIR}/src/third_party/sparsehash/config.h.cmake" - "${PROJECT_SOURCE_DIR}/src/third_party/sparsehash/src/sparsehash/internal/sparseconfig.h") +configure_file("${CASS_SOURCE_DIR}/src/third_party/sparsehash/config.h.cmake" + "${CASS_SOURCE_DIR}/src/third_party/sparsehash/src/sparsehash/internal/sparseconfig.h") diff --git a/src/timestamp_generator.cpp b/src/timestamp_generator.cpp index 63f05511b..b9f096970 100644 --- a/src/timestamp_generator.cpp +++ b/src/timestamp_generator.cpp @@ -16,7 +16,7 @@ #include "timestamp_generator.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "get_time.hpp" #include "logger.hpp" diff --git a/src/timestamp_generator.hpp b/src/timestamp_generator.hpp index afcd36eb9..bab72b065 100644 --- a/src/timestamp_generator.hpp +++ b/src/timestamp_generator.hpp @@ -19,6 +19,7 @@ #include "atomic.hpp" #include "constants.hpp" +#include "external.hpp" #include "macros.hpp" #include "ref_counted.hpp" #include "request.hpp" @@ -74,5 +75,7 @@ class MonotonicTimestampGenerator : public TimestampGenerator { } // namespace cass +EXTERNAL_TYPE(cass::TimestampGenerator, CassTimestampGen) + #endif diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp index 47110db02..0aa60ee00 100644 --- a/src/token_aware_policy.cpp +++ b/src/token_aware_policy.cpp @@ -17,6 +17,7 @@ #include "token_aware_policy.hpp" #include "random.hpp" +#include "request_handler.hpp" #include @@ -44,26 +45,27 @@ void TokenAwarePolicy::init(const SharedRefPtr& connected_host, } QueryPlan* TokenAwarePolicy::new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache) { - if (request != NULL) { + RequestHandler* request_handler, + const TokenMap* token_map) { + if (request_handler != NULL) { + const RoutableRequest* request = static_cast(request_handler->request()); switch (request->opcode()) { { case CQL_OPCODE_QUERY: case CQL_OPCODE_EXECUTE: case CQL_OPCODE_BATCH: - const RoutableRequest* rr = static_cast(request); - const std::string& statement_keyspace = rr->keyspace(); + const std::string& statement_keyspace = request->keyspace(); const std::string& keyspace = statement_keyspace.empty() ? connected_keyspace : statement_keyspace; std::string routing_key; - if (rr->get_routing_key(&routing_key, cache) && !keyspace.empty()) { + if (request->get_routing_key(&routing_key, request_handler->encoding_cache()) && !keyspace.empty()) { if (token_map != NULL) { CopyOnWriteHostVec replicas = token_map->get_replicas(keyspace, routing_key); if (replicas && !replicas->empty()) { return new TokenAwareQueryPlan(child_policy_.get(), - child_policy_->new_query_plan(connected_keyspace, request, token_map, cache), + child_policy_->new_query_plan(connected_keyspace, + request_handler, + token_map), replicas, index_++); } @@ -76,7 +78,9 @@ QueryPlan* TokenAwarePolicy::new_query_plan(const std::string& connected_keyspac break; } } - return child_policy_->new_query_plan(connected_keyspace, request, token_map, cache); + return child_policy_->new_query_plan(connected_keyspace, + request_handler, + token_map); } SharedRefPtr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() { diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp index 2b86f66fb..af66f2214 100644 --- a/src/token_aware_policy.hpp +++ b/src/token_aware_policy.hpp @@ -35,9 +35,8 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { virtual void init(const SharedRefPtr& connected_host, const HostMap& hosts, Random* random); virtual QueryPlan* new_query_plan(const std::string& connected_keyspace, - const Request* request, - const TokenMap* token_map, - Request::EncodingCache* cache); + RequestHandler* request_handler, + const TokenMap* token_map); LoadBalancingPolicy* new_instance() { return new TokenAwarePolicy(child_policy_->new_instance()); } diff --git a/src/tuple.cpp b/src/tuple.cpp index 5f8579292..c4586d8d2 100644 --- a/src/tuple.cpp +++ b/src/tuple.cpp @@ -14,11 +14,12 @@ limitations under the License. */ -#include "collection.hpp" +#include "tuple.hpp" +#include "collection.hpp" #include "constants.hpp" #include "encode.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "macros.hpp" #include "user_type_value.hpp" diff --git a/src/tuple.hpp b/src/tuple.hpp index ea3ba145c..2018caf3a 100644 --- a/src/tuple.hpp +++ b/src/tuple.hpp @@ -20,6 +20,7 @@ #include "cassandra.h" #include "data_type.hpp" #include "encode.hpp" +#include "external.hpp" #include "buffer.hpp" #include "ref_counted.hpp" #include "types.hpp" @@ -108,5 +109,7 @@ class Tuple { } // namespace cass +EXTERNAL_TYPE(cass::Tuple, CassTuple) + #endif diff --git a/src/user_type_value.cpp b/src/user_type_value.cpp index ddfad02b5..e53e724b7 100644 --- a/src/user_type_value.cpp +++ b/src/user_type_value.cpp @@ -18,7 +18,8 @@ #include "collection.hpp" #include "macros.hpp" -#include "external_types.hpp" +#include "external.hpp" +#include "tuple.hpp" #include "utils.hpp" #include diff --git a/src/user_type_value.hpp b/src/user_type_value.hpp index 1e60ee60a..af03c76d8 100644 --- a/src/user_type_value.hpp +++ b/src/user_type_value.hpp @@ -20,6 +20,7 @@ #include "abstract_data.hpp" #include "cassandra.h" #include "data_type.hpp" +#include "external.hpp" #include "ref_counted.hpp" namespace cass { @@ -50,4 +51,6 @@ class UserTypeValue : public AbstractData { } // namespace cass +EXTERNAL_TYPE(cass::UserTypeValue, CassUserType) + #endif diff --git a/src/uuids.cpp b/src/uuids.cpp index e891dcba3..14355307e 100644 --- a/src/uuids.cpp +++ b/src/uuids.cpp @@ -22,7 +22,7 @@ #include "md5.hpp" #include "serialization.hpp" #include "scoped_lock.hpp" -#include "external_types.hpp" +#include "external.hpp" #include #include diff --git a/src/uuids.hpp b/src/uuids.hpp index 857fab6ac..941cadf83 100644 --- a/src/uuids.hpp +++ b/src/uuids.hpp @@ -19,6 +19,7 @@ #include "atomic.hpp" #include "cassandra.h" +#include "external.hpp" #include "random.hpp" #include @@ -50,4 +51,6 @@ class UuidGen { } // namespace cass +EXTERNAL_TYPE(cass::UuidGen, CassUuidGen) + #endif diff --git a/src/value.cpp b/src/value.cpp index 55bba4758..ffe0daa84 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -18,7 +18,7 @@ #include "collection_iterator.hpp" #include "data_type.hpp" -#include "external_types.hpp" +#include "external.hpp" #include "serialization.hpp" extern "C" { diff --git a/src/value.hpp b/src/value.hpp index 81e8c6cd2..c0cc3da44 100644 --- a/src/value.hpp +++ b/src/value.hpp @@ -18,6 +18,7 @@ #define __CASS_VALUE_HPP_INCLUDED__ #include "cassandra.h" +#include "external.hpp" #include "result_metadata.hpp" #include "string_ref.hpp" @@ -161,4 +162,6 @@ typedef std::vector OutputValueVec; } // namespace cass +EXTERNAL_TYPE(cass::Value, CassValue) + #endif diff --git a/test/ccm_bridge/data/config.txt b/test/ccm_bridge/data/config.txt index e671d3ebc..a73eb6c04 100644 --- a/test/ccm_bridge/data/config.txt +++ b/test/ccm_bridge/data/config.txt @@ -27,6 +27,18 @@ ## #BRANCH_TAG= ## +# Flag to determine if Cassandra/DSE version should be obtained from ASF/GitHub +# +# Uncomment to specify use of installation directory +## +#USE_INSTALL_DIR=false +## +# Cassandra/DSE installation directory to utilize when creating CCM cluster +# +# Uncomment to specify installation directory to use +## +#INSTALL_DIR=/path/to/installation +## # CCM Deployment Type (local|remote) # # Setting to indicate how CCM commands should be run (locally or through SSH) @@ -52,7 +64,7 @@ # # Uncomment to specify DSE version ## -#DSE_VERSION=4.8.8 +#DSE_VERSION=5.0.2 ## # CCM DSE Credentials Type (username_password|ini_file) # diff --git a/test/ccm_bridge/src/bridge.cpp b/test/ccm_bridge/src/bridge.cpp index 4694c355a..5ea04e38e 100644 --- a/test/ccm_bridge/src/bridge.cpp +++ b/test/ccm_bridge/src/bridge.cpp @@ -113,6 +113,8 @@ typedef SSIZE_T ssize_t; #define CCM_CONFIGURATION_KEY_CASSANDRA_VERSION "cassandra_version" #define CCM_CONFIGURATION_KEY_USE_GIT "use_git" #define CCM_CONFIGURATION_KEY_BRANCH_TAG "branch_tag" +#define CCM_CONFIGURATION_KEY_USE_INSTALL_DIR "use_install_dir" +#define CCM_CONFIGURATION_KEY_INSTALL_DIR "install_dir" #define CCM_CONFIGURATION_KEY_DEPLOYMENT_TYPE "deployment_type" #define CCM_CONFIGURATION_KEY_USE_DSE "use_dse" #define CCM_CONFIGURATION_KEY_DSE_VERSION "dse_version" @@ -139,14 +141,22 @@ const std::string DSE_WORKLOADS[] = { }; const std::vector CCM::Bridge::dse_workloads_(DSE_WORKLOADS, DSE_WORKLOADS + sizeof(DSE_WORKLOADS) / sizeof(DSE_WORKLOADS[0])); +const CCM::DseWorkload DEFAULT_WORKLOAD[] = { + CCM::DSE_WORKLOAD_CASSANDRA +}; +const std::vector CCM::Bridge::DEFAULT_DSE_WORKLOAD( + DEFAULT_WORKLOAD, DEFAULT_WORKLOAD + + sizeof(DEFAULT_WORKLOAD) / sizeof(DEFAULT_WORKLOAD[0])); using namespace CCM; CCM::Bridge::Bridge(CassVersion server_version /*= DEFAULT_CASSANDRA_VERSION*/, bool use_git /*= DEFAULT_USE_GIT*/, const std::string& branch_tag /* ""*/, + bool use_install_dir /*=DEFAULT_USE_INSTALL_DIR*/, + const std::string& install_dir /*=""*/, bool use_dse /*= DEFAULT_USE_DSE*/, - DseWorkload dse_workload /*= DEFAULT_DSE_WORKLOAD*/, + std::vector dse_workload /*= DEFAULT_DSE_WORKLOAD*/, const std::string& cluster_prefix /*= DEFAULT_CLUSTER_PREFIX*/, DseCredentialsType dse_credentials_type /*= DEFAULT_DSE_CREDENTIALS*/, const std::string& dse_username /*= ""*/, @@ -163,6 +173,8 @@ CCM::Bridge::Bridge(CassVersion server_version /*= DEFAULT_CASSANDRA_VERSION*/, , dse_version_(DEFAULT_DSE_VERSION) , use_git_(use_git) , branch_tag_(branch_tag) + , use_install_dir_(use_install_dir) + , install_dir_(install_dir) , use_dse_(use_dse) , dse_workload_(dse_workload) , cluster_prefix_(cluster_prefix) @@ -192,6 +204,11 @@ CCM::Bridge::Bridge(CassVersion server_version /*= DEFAULT_CASSANDRA_VERSION*/, dse_version_ = DseVersion(server_version.to_string()); cassandra_version_ = dse_version_.get_cass_version(); } + + // Determine if installation directory can be used + if (use_install_dir_ && install_dir_.empty()) { + throw BridgeException("Unable to use Installation Directory: Directory must not be blank"); + } #ifdef CASS_USE_LIBSSH2 // Determine if libssh2 needs to be initialized if (deployment_type_ == DeploymentType::REMOTE) { @@ -217,6 +234,7 @@ CCM::Bridge::Bridge(const std::string& configuration_file) : cassandra_version_(DEFAULT_CASSANDRA_VERSION) , dse_version_(DEFAULT_DSE_VERSION) , use_git_(DEFAULT_USE_GIT) + , use_install_dir_(DEFAULT_USE_INSTALL_DIR) , use_dse_(DEFAULT_USE_DSE) , dse_workload_(DEFAULT_DSE_WORKLOAD) , cluster_prefix_(DEFAULT_CLUSTER_PREFIX) @@ -233,7 +251,6 @@ CCM::Bridge::Bridge(const std::string& configuration_file) , deployment_type_(DeploymentType::LOCAL) , host_("127.0.0.1") { #endif - // Initialize the default remote configuration settings short port = DEFAULT_REMOTE_DEPLOYMENT_PORT; std::string username = DEFAULT_REMOTE_DEPLOYMENT_USERNAME; @@ -271,6 +288,17 @@ CCM::Bridge::Bridge(const std::string& configuration_file) } } else if (key.compare(CCM_CONFIGURATION_KEY_BRANCH_TAG) == 0) { branch_tag_ = value; + } else if (key.compare(CCM_CONFIGURATION_KEY_USE_INSTALL_DIR) == 0) { + //Convert the value + std::stringstream valueStream(value); + if (!(valueStream >> std::boolalpha >> use_install_dir_).fail()) { + continue; + } else { + LOG_ERROR("Invalid Flag [" << value << "] for Use Install Directory: Using default [" << DEFAULT_USE_INSTALL_DIR << "]"); + use_install_dir_ = DEFAULT_USE_INSTALL_DIR; + } + } else if (key.compare(CCM_CONFIGURATION_KEY_INSTALL_DIR) == 0) { + install_dir_ = value; } else if (key.compare(CCM_CONFIGURATION_KEY_USE_DSE) == 0) { //Convert the value std::stringstream valueStream(value); @@ -366,6 +394,11 @@ CCM::Bridge::Bridge(const std::string& configuration_file) cassandra_version_ = dse_version_.get_cass_version(); } + // Determine if installation directory can be used + if (use_install_dir_ && install_dir_.empty()) { + throw BridgeException("Unable to use Installation Directory: Directory must not be blank"); + } + // Display the configuration settings being used LOG("Host: " << host_); LOG("Cassandra Version: " << cassandra_version_.to_string()); @@ -375,6 +408,9 @@ CCM::Bridge::Bridge(const std::string& configuration_file) if (use_git_ && !branch_tag_.empty()) { LOG(" Branch/Tag: " << branch_tag_); } + if (use_install_dir_ && !install_dir_.empty()) { + LOG(" Installation Directory: " << install_dir_); + } LOG("Cluster Prefix: " << cluster_prefix_); LOG("Deployment Type: " << deployment_type_.to_string()); #ifdef CASS_USE_LIBSSH2 @@ -524,8 +560,11 @@ bool CCM::Bridge::create_cluster(std::vector data_center_nodes, std::string active_cluster_name = get_active_cluster(); std::string cluster_name = generate_cluster_name(cassandra_version_, data_center_nodes, with_vnodes, is_ssl, is_client_authentication); - if (use_dse_ && dse_workload_ != DSE_WORKLOAD_CASSANDRA) { - cluster_name.append("-").append(dse_workloads_[dse_workload_]); + for (std::vector::iterator iterator = dse_workload_.begin(); + iterator != dse_workload_.end(); ++iterator) { + if (use_dse_ && *iterator != DSE_WORKLOAD_CASSANDRA) { + cluster_name.append("-").append(dse_workloads_[*iterator]); + } } if (!switch_cluster(cluster_name)) { // Ensure any active cluster is stopped @@ -536,31 +575,35 @@ bool CCM::Bridge::create_cluster(std::vector data_center_nodes, // Create the cluster create command and execute std::vector create_command; create_command.push_back("create"); - create_command.push_back("-v"); - if (use_dse_) { - if (use_git_) { - if (branch_tag_.empty()) { - create_command.push_back("git:" + dse_version_.to_string()); + if (use_install_dir_ && !install_dir_.empty()) { + create_command.push_back("--install-dir=" + install_dir_); + } else { + create_command.push_back("-v"); + if (use_dse_) { + if (use_git_) { + if (branch_tag_.empty()) { + create_command.push_back("git:" + dse_version_.to_string()); + } else { + create_command.push_back("git:" + branch_tag_); + } } else { - create_command.push_back("git:" + branch_tag_); + create_command.push_back(dse_version_.to_string()); + } + create_command.push_back("--dse"); + if (dse_credentials_type_ == DseCredentialsType::USERNAME_PASSWORD) { + create_command.push_back("--dse-username=" + dse_username_); + create_command.push_back("--dse-password=" + dse_password_); } } else { - create_command.push_back(dse_version_.to_string()); - } - create_command.push_back("--dse"); - if (dse_credentials_type_ == DseCredentialsType::USERNAME_PASSWORD) { - create_command.push_back("--dse-username=" + dse_username_); - create_command.push_back("--dse-password=" + dse_password_); - } - } else { - if (use_git_) { - if (branch_tag_.empty()) { - create_command.push_back("git:cassandra-" + cassandra_version_.to_string()); + if (use_git_) { + if (branch_tag_.empty()) { + create_command.push_back("git:cassandra-" + cassandra_version_.to_string()); + } else { + create_command.push_back("git:" + branch_tag_); + } } else { - create_command.push_back("git:" + branch_tag_); + create_command.push_back(cassandra_version_.to_string()); } - } else { - create_command.push_back(cassandra_version_.to_string()); } } create_command.push_back("-b"); @@ -603,8 +646,9 @@ bool CCM::Bridge::create_cluster(std::vector data_center_nodes, } // Set the DSE workload (if applicable) - if (use_dse_ && dse_workload_ != DSE_WORKLOAD_CASSANDRA) { - set_dse_workload(dse_workload_); + if (use_dse_ && + !(dse_workload_.size() == 1 && dse_workload_[0] == DSE_WORKLOAD_CASSANDRA)) { + set_dse_workloads(dse_workload_); } } @@ -863,6 +907,16 @@ void CCM::Bridge::disable_node_gossip(unsigned int node) { execute_ccm_command(disable_node_gossip_command); } +void CCM::Bridge::disable_node_trace(unsigned int node) { + // Create the disable node trace command and execute + std::vector disable_node_trace_command; + disable_node_trace_command.push_back(generate_node_name(node)); + disable_node_trace_command.push_back("nodetool"); + disable_node_trace_command.push_back("settraceprobability"); + disable_node_trace_command.push_back("0"); + execute_ccm_command(disable_node_trace_command); +} + void CCM::Bridge::enable_node_binary_protocol(unsigned int node) { // Create the enable node binary protocol command and execute std::vector enable_node_binary_protocol_command; @@ -881,6 +935,16 @@ void CCM::Bridge::enable_node_gossip(unsigned int node) { execute_ccm_command(disable_node_gossip_command); } +void CCM::Bridge::enable_node_trace(unsigned int node) { + // Create the enable node trace command and execute + std::vector enable_node_trace_command; + enable_node_trace_command.push_back(generate_node_name(node)); + enable_node_trace_command.push_back("nodetool"); + enable_node_trace_command.push_back("settraceprobability"); + enable_node_trace_command.push_back("1"); + execute_ccm_command(enable_node_trace_command); +} + void CCM::Bridge::execute_cql_on_node(unsigned int node, const std::string& cql) { // Update the CQL statement for the command line std::stringstream execute_statement; @@ -1097,14 +1161,29 @@ DseVersion CCM::Bridge::get_dse_version(const std::string& configuration_file) { return dse_version; } -bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload, bool is_kill /*= false */) { - // Update the member variable - dse_workload_ = workload; +bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload, + bool is_kill /*= false */) { + std::vector workloads; + workloads.push_back(workload); + return set_dse_workloads(1, workloads, is_kill); +} + +bool CCM::Bridge::set_dse_workloads(unsigned int node, + std::vector workloads, bool is_kill /*= false */) { + // Ensure the workloads can be processed + if (workloads.empty()) { + throw BridgeException("No workloads to assign"); + } + + // Update the member variable with the workloads and generate workloads + dse_workload_.clear(); + dse_workload_ = workloads; + std::string dse_workloads = generate_dse_workloads(workloads); // Determine if the node is currently active/up bool was_node_active = false; if (!is_node_down(node)) { - LOG("Stopping Active Node to Set Workload: " << dse_workloads_[workload] + LOG("Stopping Active Node to Set Workload: " << dse_workloads << " workload on node " << node); stop_node(node, is_kill); was_node_active = true; @@ -1114,12 +1193,12 @@ bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload, bool std::vector dse_workload_command; dse_workload_command.push_back(generate_node_name(node)); dse_workload_command.push_back("setworkload"); - dse_workload_command.push_back(dse_workloads_[workload]); + dse_workload_command.push_back(dse_workloads); execute_ccm_command(dse_workload_command); // Determine if the node should be restarted if (was_node_active) { - LOG("Restarting Node to Apply Workload: " << dse_workloads_[workload] + LOG("Restarting Node to Apply Workload: " << dse_workloads << " workload on node " << node); start_node(node); } @@ -1127,11 +1206,25 @@ bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload, bool return was_node_active; } -bool CCM::Bridge::set_dse_workload(DseWorkload workload, bool is_kill /*= false */) { +bool CCM::Bridge::set_dse_workload(DseWorkload workload, + bool is_kill /*= false */) { + std::vector workloads; + workloads.push_back(workload); + return set_dse_workloads(workloads, is_kill); +} + +bool CCM::Bridge::set_dse_workloads(std::vector workloads, + bool is_kill /*= false */) { + // Ensure the workloads can be processed + if (workloads.empty()) { + throw BridgeException("No workloads to assign"); + } + // Determine if the cluster is currently active/up bool was_cluster_active = false; if (!is_cluster_down()) { - LOG("Stopping Active Cluster to Set Workload: " << dse_workloads_[workload] << " workload"); + LOG("Stopping Active Cluster to Set Workload: " << + generate_dse_workloads(workloads) << " workload"); stop_cluster(is_kill); was_cluster_active = true; } @@ -1139,12 +1232,13 @@ bool CCM::Bridge::set_dse_workload(DseWorkload workload, bool is_kill /*= false // Iterate over each node and set the DSE workload ClusterStatus status = cluster_status(); for (unsigned int i = 1; i <= status.node_count; ++i) { - set_dse_workload(i, workload); + set_dse_workloads(i, workloads, false); } // Determine if the cluster should be restarted if (was_cluster_active) { - LOG("Restarting Cluster to Apply Workload: " << dse_workloads_[workload] << " workload"); + LOG("Restarting Cluster to Apply Workload: " << + generate_dse_workloads(workloads) << " workload"); start_cluster(); } @@ -1646,39 +1740,42 @@ std::vector CCM::Bridge::generate_create_updateconf_command(CassVer // Create the update configuration command (common updates) std::vector updateconf_command; updateconf_command.push_back("updateconf"); - updateconf_command.push_back("--rt=10000"); - updateconf_command.push_back("read_request_timeout_in_ms:10000"); - updateconf_command.push_back("write_request_timeout_in_ms:10000"); - updateconf_command.push_back("request_timeout_in_ms:10000"); - updateconf_command.push_back("phi_convict_threshold:16"); - updateconf_command.push_back("hinted_handoff_enabled:false"); - updateconf_command.push_back("dynamic_snitch_update_interval_in_ms:1000"); - updateconf_command.push_back("native_transport_max_threads:1"); - updateconf_command.push_back("rpc_min_threads:1"); - updateconf_command.push_back("rpc_max_threads:1"); - updateconf_command.push_back("concurrent_reads:2"); - updateconf_command.push_back("concurrent_writes:2"); - updateconf_command.push_back("concurrent_compactors:1"); - updateconf_command.push_back("compaction_throughput_mb_per_sec:0"); - updateconf_command.push_back("key_cache_size_in_mb:0"); - updateconf_command.push_back("key_cache_save_period:0"); - updateconf_command.push_back("memtable_flush_writers:1"); - updateconf_command.push_back("max_hints_delivery_threads:1"); - - // Create Cassandra version specific updates (C* v1.2.x) - if (cassandra_version < "2.0.0") { - updateconf_command.push_back("reduce_cache_sizes_at:0"); - updateconf_command.push_back("reduce_cache_capacity_to:0"); - updateconf_command.push_back("flush_largest_memtables_at:0"); - updateconf_command.push_back("index_interval:512"); - } else { - updateconf_command.push_back("cas_contention_timeout_in_ms:10000"); - updateconf_command.push_back("file_cache_size_in_mb:0"); - } + // Disable optimizations (limits) when using DSE + if (!use_dse_) { + updateconf_command.push_back("--rt=10000"); + updateconf_command.push_back("read_request_timeout_in_ms:10000"); + updateconf_command.push_back("write_request_timeout_in_ms:10000"); + updateconf_command.push_back("request_timeout_in_ms:10000"); + updateconf_command.push_back("phi_convict_threshold:16"); + updateconf_command.push_back("hinted_handoff_enabled:false"); + updateconf_command.push_back("dynamic_snitch_update_interval_in_ms:1000"); + updateconf_command.push_back("native_transport_max_threads:1"); + updateconf_command.push_back("rpc_min_threads:1"); + updateconf_command.push_back("rpc_max_threads:1"); + updateconf_command.push_back("concurrent_reads:2"); + updateconf_command.push_back("concurrent_writes:2"); + updateconf_command.push_back("concurrent_compactors:1"); + updateconf_command.push_back("compaction_throughput_mb_per_sec:0"); + updateconf_command.push_back("key_cache_size_in_mb:0"); + updateconf_command.push_back("key_cache_save_period:0"); + updateconf_command.push_back("memtable_flush_writers:1"); + updateconf_command.push_back("max_hints_delivery_threads:1"); + + // Create Cassandra version specific updates (C* v1.2.x) + if (cassandra_version < "2.0.0") { + updateconf_command.push_back("reduce_cache_sizes_at:0"); + updateconf_command.push_back("reduce_cache_capacity_to:0"); + updateconf_command.push_back("flush_largest_memtables_at:0"); + updateconf_command.push_back("index_interval:512"); + } else { + updateconf_command.push_back("cas_contention_timeout_in_ms:10000"); + updateconf_command.push_back("file_cache_size_in_mb:0"); + } - // Create Cassandra version specific updates (C* < v2.1) - if (cassandra_version < "2.1.0") { - updateconf_command.push_back("in_memory_compaction_limit_in_mb:1"); + // Create Cassandra version specific updates (C* < v2.1) + if (cassandra_version < "2.1.0") { + updateconf_command.push_back("in_memory_compaction_limit_in_mb:1"); + } } // Create Cassandra version specific updated (C* 2.2+) @@ -1694,6 +1791,18 @@ std::vector CCM::Bridge::generate_create_updateconf_command(CassVer return updateconf_command; } +std::string CCM::Bridge::generate_dse_workloads(std::vector workloads) { + std::string dse_workloads; + for (std::vector::iterator iterator = workloads.begin(); + iterator != workloads.end(); ++iterator) { + dse_workloads += dse_workloads_[*iterator]; + if ((iterator + 1) != workloads.end()) { + dse_workloads += ","; + } + } + return dse_workloads; +} + std::string CCM::Bridge::generate_node_name(unsigned int node) { std::stringstream node_name; node_name << "node" << node; diff --git a/test/ccm_bridge/src/bridge.hpp b/test/ccm_bridge/src/bridge.hpp index dbf0e1bae..393631591 100644 --- a/test/ccm_bridge/src/bridge.hpp +++ b/test/ccm_bridge/src/bridge.hpp @@ -43,10 +43,10 @@ typedef struct _LIBSSH2_CHANNEL LIBSSH2_CHANNEL; // Default values #define DEFAULT_CASSANDRA_VERSION CassVersion("3.7") -#define DEFAULT_DSE_VERSION DseVersion("4.8.8") +#define DEFAULT_DSE_VERSION DseVersion("5.0.2") #define DEFAULT_USE_GIT false +#define DEFAULT_USE_INSTALL_DIR false #define DEFAULT_USE_DSE false -#define DEFAULT_DSE_WORKLOAD DSE_WORKLOAD_CASSANDRA #define DEFAULT_CLUSTER_PREFIX "cpp-driver" #define DEFAULT_DSE_CREDENTIALS DseCredentialsType::USERNAME_PASSWORD #define DEFAULT_DEPLOYMENT DeploymentType::LOCAL @@ -131,6 +131,11 @@ namespace CCM { class Bridge { public: + /** + * Default DSE workload to apply (Cassandra) + */ + static const std::vector DEFAULT_DSE_WORKLOAD; + /** * Constructor * @@ -145,10 +150,15 @@ namespace CCM { * @param branch_tag Branch/Tag to use when use_git is enabled * (default: Empty). This value is independent of the * version specified. + * @param use_install_dir True if CCM should use a particular installation + * directory; false otherwise + * (default: DEAFAULT_USE_INSTALL_DIR) + * @param install_dir Installation directory to use when use_install_dir is + * enabled (default: Empty) * @param use_dse True if CCM should load DSE for provided version; false * otherwise (default: DEFAULT_USE_DSE) * @param dse_workload DSE workload to utilize - * (default: DSE_WORKLOAD_CASSANDRA) + * (default: DEFAULT_DSE_WORKLOAD) * @param cluster_prefix Prefix to use when creating a cluster name * (default: DEFAULT_CLUSTER_PREFIX) * @param dse_credentials_type Username|Password/INI file credentials @@ -178,8 +188,10 @@ namespace CCM { Bridge(CassVersion cassandra_version = DEFAULT_CASSANDRA_VERSION, bool use_git = DEFAULT_USE_GIT, const std::string& branch_tag = "", + bool use_install_dir = DEFAULT_USE_INSTALL_DIR, + const std::string& install_dir = "", bool use_dse = DEFAULT_USE_DSE, - DseWorkload dse_workload = DSE_WORKLOAD_CASSANDRA, + std::vector dse_workload = DEFAULT_DSE_WORKLOAD, const std::string& cluster_prefix = DEFAULT_CLUSTER_PREFIX, DseCredentialsType dse_credentials_type = DEFAULT_DSE_CREDENTIALS, const std::string& dse_username = "", @@ -437,6 +449,13 @@ namespace CCM { */ void disable_node_gossip(unsigned int node); + /** + * Disable trace for a node on the active Cassandra cluster + * + * @param node Node to disable tracing + */ + void disable_node_trace(unsigned int node); + /** * Enable binary protocol for a node on the active Cassandra cluster * @@ -451,6 +470,13 @@ namespace CCM { */ void enable_node_gossip(unsigned int node); + /** + * Enable trace for a node on the active Cassandra cluster + * + * @param node Node to enable tracing + */ + void enable_node_trace(unsigned int node); + /** * Execute a CQL statement on a particular node * @@ -578,6 +604,21 @@ namespace CCM { */ bool set_dse_workload(unsigned int node, DseWorkload workload, bool is_kill = false); + /** + * Set the DSE workloads on a node + * + * NOTE: This operation should be performed before starting the node; + * otherwise the node will be stopped and restarted + * + * @param node Node to set DSE workload on + * @param workloads Workloads to be set + * @param is_kill True if forced termination requested; false otherwise + * (default: false) + * @return True if node was restarted; false otherwise + */ + bool set_dse_workloads(unsigned int node, std::vector workloads, + bool is_kill = false); + /** * Set the DSE workload on the cluster * @@ -591,6 +632,20 @@ namespace CCM { */ bool set_dse_workload(DseWorkload workload, bool is_kill = false); + /** + * Set the DSE workloads on the cluster + * + * NOTE: This operation should be performed before starting the cluster; + * otherwise the cluster will be stopped and restarted + * + * @param workloads Workloads to be set + * @param is_kill True if forced termination requested; false otherwise + * (default: false) + * @return True if cluster was restarted; false otherwise + */ + bool set_dse_workloads(std::vector workloads, + bool is_kill = false); + /** * Check to see if a node has been decommissioned * @@ -636,14 +691,25 @@ namespace CCM { * Branch/Tag to retrieve from ASF/GitHub */ std::string branch_tag_; + /** + * Flag to determine if installation directory should be used (passed to + * CCM) + */ + bool use_install_dir_; + /** + * Installation directory to pass to CCM + */ + std::string install_dir_; /** * Flag to determine if DSE is being used */ bool use_dse_; /** * Workload to apply to the DSE cluster + * + * NOTE: Multiple workloads will be applied simultaneously via CCM */ - DseWorkload dse_workload_; + std::vector dse_workload_; /** * Cluster prefix to apply to cluster name during create command */ @@ -853,6 +919,15 @@ namespace CCM { */ std::vector generate_create_updateconf_command(CassVersion cassandra_version); + /** + * Generate the command separated list for have a single or multiple + * workloads for the CCM setworkload command + * + * @param workloads Workloads to be set + * @return String representing the workloads for the setworkload command + */ + std::string generate_dse_workloads(std::vector workloads); + /** * Get the next available node * diff --git a/test/integration_tests/src/test_timestamps.cpp b/test/integration_tests/src/test_timestamps.cpp index 8a0586312..8a1fc533c 100644 --- a/test/integration_tests/src/test_timestamps.cpp +++ b/test/integration_tests/src/test_timestamps.cpp @@ -17,7 +17,6 @@ #include "test_utils.hpp" #include "cassandra.h" -#include "external_types.hpp" #include "timestamp_generator.hpp" #include "get_time.hpp" diff --git a/test/unit_tests/src/test_load_balancing.cpp b/test/unit_tests/src/test_load_balancing.cpp index fa897ec13..a320b2b28 100644 --- a/test/unit_tests/src/test_load_balancing.cpp +++ b/test/unit_tests/src/test_load_balancing.cpp @@ -25,6 +25,7 @@ #include "loop_thread.hpp" #include "murmur3.hpp" #include "query_request.hpp" +#include "request_handler.hpp" #include "token_aware_policy.hpp" #include "whitelist_policy.hpp" #include "blacklist_policy.hpp" @@ -161,17 +162,17 @@ BOOST_AUTO_TEST_CASE(simple) { policy.init(cass::SharedRefPtr(), hosts, NULL); // start on first elem - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq1[] = {1, 2}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); // rotate starting element - boost::scoped_ptr qp2(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp2(policy.new_query_plan("ks", NULL, NULL)); const size_t seq2[] = {2, 1}; verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2)); // back around - boost::scoped_ptr qp3(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp3(policy.new_query_plan("ks", NULL, NULL)); verify_sequence(qp3.get(), VECTOR_FROM(size_t, seq1)); } @@ -184,7 +185,7 @@ BOOST_AUTO_TEST_CASE(on_add) policy.init(cass::SharedRefPtr(), hosts, NULL); // baseline - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq1[] = {1, 2}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); @@ -193,7 +194,7 @@ BOOST_AUTO_TEST_CASE(on_add) cass::SharedRefPtr host = host_for_addr(addr_new); policy.on_add(host); - boost::scoped_ptr qp2(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp2(policy.new_query_plan("ks", NULL, NULL)); const size_t seq2[] = {2, seq_new, 1}; verify_sequence(qp2.get(), VECTOR_FROM(size_t, seq2)); } @@ -206,11 +207,11 @@ BOOST_AUTO_TEST_CASE(on_remove) cass::RoundRobinPolicy policy; policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); cass::SharedRefPtr host = hosts.begin()->second; policy.on_remove(host); - boost::scoped_ptr qp2(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp2(policy.new_query_plan("ks", NULL, NULL)); // first query plan has it // (note: not manipulating Host::state_ for dynamic removal) @@ -230,8 +231,8 @@ BOOST_AUTO_TEST_CASE(on_down_on_up) cass::RoundRobinPolicy policy; policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp_before1(policy.new_query_plan("ks", NULL, NULL, NULL)); - boost::scoped_ptr qp_before2(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp_before1(policy.new_query_plan("ks", NULL, NULL)); + boost::scoped_ptr qp_before2(policy.new_query_plan("ks", NULL, NULL)); cass::SharedRefPtr host = hosts.begin()->second; policy.on_down(host); @@ -252,8 +253,8 @@ BOOST_AUTO_TEST_CASE(on_down_on_up) // host is added to the list, but not 'up' policy.on_up(host); - boost::scoped_ptr qp_after1(policy.new_query_plan("ks", NULL, NULL, NULL)); - boost::scoped_ptr qp_after2(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp_after1(policy.new_query_plan("ks", NULL, NULL)); + boost::scoped_ptr qp_after2(policy.new_query_plan("ks", NULL, NULL)); // 1 is dynamically excluded from plan { @@ -284,7 +285,7 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) { const size_t total_hosts = local_count + remote_count; - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); std::vector seq(total_hosts); for (size_t i = 0; i < total_hosts; ++i) seq[i] = i + 1; verify_sequence(qp.get(), seq); @@ -308,7 +309,7 @@ BOOST_AUTO_TEST_CASE(some_dc_local_unspecified) cass::DCAwarePolicy policy(LOCAL_DC, 1, false); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq[] = {2, 3, 1}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); @@ -324,10 +325,10 @@ BOOST_AUTO_TEST_CASE(single_local_down) cass::DCAwarePolicy policy(LOCAL_DC, 1, false); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL, NULL));// has down host ptr in plan + boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL));// has down host ptr in plan target_host->set_down(); policy.on_down(target_host); - boost::scoped_ptr qp_after(policy.new_query_plan("ks", NULL, NULL, NULL));// should not have down host ptr in plan + boost::scoped_ptr qp_after(policy.new_query_plan("ks", NULL, NULL));// should not have down host ptr in plan { const size_t seq[] = {2, 3, 4}; @@ -350,10 +351,10 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned) cass::DCAwarePolicy policy(LOCAL_DC, 1, false); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL, NULL));// has down host ptr in plan + boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL));// has down host ptr in plan target_host->set_down(); policy.on_down(target_host); - boost::scoped_ptr qp_after(policy.new_query_plan("ks", NULL, NULL, NULL));// should not have down host ptr in plan + boost::scoped_ptr qp_after(policy.new_query_plan("ks", NULL, NULL));// should not have down host ptr in plan { const size_t seq[] = {2}; @@ -365,7 +366,7 @@ BOOST_AUTO_TEST_CASE(all_local_removed_returned) policy.on_up(target_host); // make sure we get the local node first after on_up - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); { const size_t seq[] = {1, 2}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); @@ -383,10 +384,10 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned) cass::DCAwarePolicy policy(LOCAL_DC, 1, false); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL, NULL));// has down host ptr in plan + boost::scoped_ptr qp_before(policy.new_query_plan("ks", NULL, NULL));// has down host ptr in plan target_host->set_down(); policy.on_down(target_host); - boost::scoped_ptr qp_after(policy.new_query_plan("ks", NULL, NULL, NULL));// should not have down host ptr in plan + boost::scoped_ptr qp_after(policy.new_query_plan("ks", NULL, NULL));// should not have down host ptr in plan { const size_t seq[] = {1}; @@ -398,7 +399,7 @@ BOOST_AUTO_TEST_CASE(remote_removed_returned) policy.on_up(target_host); // make sure we get both nodes, correct order after - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); { const size_t seq[] = {1, 2}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); @@ -415,7 +416,7 @@ BOOST_AUTO_TEST_CASE(used_hosts_per_remote_dc) cass::DCAwarePolicy policy(LOCAL_DC, used_hosts, false); policy.init(cass::SharedRefPtr(), hosts, NULL); - cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); size_t total_hosts = 3 + used_hosts; std::vector seq(total_hosts); for (size_t i = 0; i < total_hosts; ++i) seq[i] = i + 1; @@ -438,9 +439,11 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl) // Set local CL cass::SharedRefPtr request(new cass::QueryRequest()); request->set_consistency(CASS_CONSISTENCY_LOCAL_ONE); + cass::SharedRefPtr request_handler( + new cass::RequestHandler(request.get(), NULL, NULL)); // Check for only local hosts are used - cass::ScopedPtr qp(policy.new_query_plan("ks", request.get(), NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); const size_t seq[] = {1, 2, 3}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -454,9 +457,11 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl) // Set local CL cass::SharedRefPtr request(new cass::QueryRequest()); request->set_consistency(CASS_CONSISTENCY_LOCAL_QUORUM); + cass::SharedRefPtr request_handler( + new cass::RequestHandler(request.get(), NULL, NULL)); // Check for only local hosts are used - cass::ScopedPtr qp(policy.new_query_plan("ks", request.get(), NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); const size_t seq[] = {1, 2, 3, 4, 5, 6}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -473,7 +478,7 @@ BOOST_AUTO_TEST_CASE(start_with_empty_local_dc) cass::DCAwarePolicy policy("", 0, false); policy.init(hosts[cass::Address("2.0.0.0", 9042)], hosts, NULL); - cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq[] = {2, 3, 4}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -484,7 +489,7 @@ BOOST_AUTO_TEST_CASE(start_with_empty_local_dc) policy.init(cass::SharedRefPtr( new cass::Host(cass::Address("0.0.0.0", 9042), false)), hosts, NULL); - cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq[] = {1}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -527,9 +532,11 @@ BOOST_AUTO_TEST_CASE(simple) const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887 request->set(0, cass::CassString(value, strlen(value))); request->add_key_index(0); + cass::SharedRefPtr request_handler( + new cass::RequestHandler(request.get(), NULL, NULL)); { - cass::ScopedPtr qp(policy.new_query_plan("test", request.get(), token_map.get(), NULL)); + cass::ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); const size_t seq[] = { 4, 1, 2, 3 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -539,7 +546,7 @@ BOOST_AUTO_TEST_CASE(simple) curr_host_it->second->set_down(); { - cass::ScopedPtr qp(policy.new_query_plan("test", request.get(), token_map.get(), NULL)); + cass::ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); const size_t seq[] = { 2, 4, 3 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -552,7 +559,7 @@ BOOST_AUTO_TEST_CASE(simple) curr_host_it->second->set_down(); { - cass::ScopedPtr qp(policy.new_query_plan("test", request.get(), token_map.get(), NULL)); + cass::ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); const size_t seq[] = { 2, 1, 3 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -605,9 +612,11 @@ BOOST_AUTO_TEST_CASE(network_topology) const char* value = "abc"; // hash: -5434086359492102041 request->set(0, cass::CassString(value, strlen(value))); request->add_key_index(0); + cass::SharedRefPtr request_handler( + new cass::RequestHandler(request.get(), NULL, NULL)); { - cass::ScopedPtr qp(policy.new_query_plan("test", request.get(), token_map.get(), NULL)); + cass::ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); const size_t seq[] = { 3, 5, 7, 1, 4, 6, 2 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -617,7 +626,7 @@ BOOST_AUTO_TEST_CASE(network_topology) curr_host_it->second->set_down(); { - cass::ScopedPtr qp(policy.new_query_plan("test", request.get(), token_map.get(), NULL)); + cass::ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); const size_t seq[] = { 3, 5, 7, 6, 2, 4 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -629,7 +638,7 @@ BOOST_AUTO_TEST_CASE(network_topology) curr_host_it->second->set_down(); { - cass::ScopedPtr qp(policy.new_query_plan("test", request.get(), token_map.get(), NULL)); + cass::ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); const size_t seq[] = { 5, 7, 1, 2, 4, 6 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -739,7 +748,7 @@ BOOST_AUTO_TEST_CASE(simple) // 1 and 4 are under the minimum, but 2 and 3 will be skipped { - cass::ScopedPtr qp(policy.new_query_plan("", NULL, NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("", NULL, NULL)); const size_t seq1[] = {1, 4, 2, 3}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); } @@ -749,7 +758,7 @@ BOOST_AUTO_TEST_CASE(simple) // After waiting no hosts should be skipped (notice 2 and 3 tried first) { - cass::ScopedPtr qp(policy.new_query_plan("", NULL, NULL, NULL)); + cass::ScopedPtr qp(policy.new_query_plan("", NULL, NULL)); const size_t seq1[] = {2, 3, 4, 1}; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq1)); } @@ -804,7 +813,7 @@ BOOST_AUTO_TEST_CASE(simple) cass::WhitelistPolicy policy(new cass::RoundRobinPolicy(), whitelist_hosts); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); // Verify only hosts 37 and 83 are computed in the query plan const size_t seq1[] = { 37, 83 }; @@ -826,7 +835,7 @@ BOOST_AUTO_TEST_CASE(dc) cass::WhitelistDCPolicy policy(new cass::RoundRobinPolicy(), whitelist_dcs); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); // Verify only hosts LOCAL_DC and REMOTE_DC are computed in the query plan const size_t seq1[] = { 1, 2, 3, 7, 8, 9 }; @@ -851,7 +860,7 @@ BOOST_AUTO_TEST_CASE(simple) cass::BlacklistPolicy policy(new cass::RoundRobinPolicy(), blacklist_hosts); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); // Verify only hosts 1, 4 and 5 are computed in the query plan const size_t seq1[] = { 1, 4, 5 }; @@ -873,7 +882,7 @@ BOOST_AUTO_TEST_CASE(dc) cass::BlacklistDCPolicy policy(new cass::RoundRobinPolicy(), blacklist_dcs); policy.init(cass::SharedRefPtr(), hosts, NULL); - boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL, NULL)); + boost::scoped_ptr qp(policy.new_query_plan("ks", NULL, NULL)); // Verify only hosts from BACKUP_DC are computed in the query plan const size_t seq1[] = { 4, 5, 6 };