From 5a7086bea40d05525177b0f4bddfa50006e0a402 Mon Sep 17 00:00:00 2001 From: Charles Bracher Date: Thu, 23 Jan 2014 18:28:07 +0000 Subject: [PATCH 1/4] minor fix to protect promise from being set twice. Seems like this is what was intended. --- include/cql/internal/cql_promise.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/cql/internal/cql_promise.hpp b/include/cql/internal/cql_promise.hpp index d2320df28..52c8236cd 100644 --- a/include/cql/internal/cql_promise.hpp +++ b/include/cql/internal/cql_promise.hpp @@ -32,6 +32,7 @@ class cql_promise_t { { boost::mutex::scoped_lock lock(*_mutex); if (!*_value_set) { + *_value_set = true; _promise->set_value(value); return true; } @@ -44,6 +45,7 @@ class cql_promise_t { { boost::mutex::scoped_lock lock(*_mutex); if (!*_value_set) { + *_value_set = true; _promise->set_exception(exception); return true; } From 693159476d2f6de0272fbe9403cf14116d8f7871 Mon Sep 17 00:00:00 2001 From: Charles Bracher Date: Wed, 29 Jan 2014 00:22:03 +0000 Subject: [PATCH 2/4] BE-260 looking to address two issues: cql::cql_session_impl_t::cql_connections_collection_t* cql::cql_session_impl_t::add_to_connection_pool passes out an unlocked/unprotected pointer to cql_connections_collection_t in cql_session_impl_t::connect; adding in a mutex above that, since this pointer could become invalidated by activity in other threads. Also, callbacks on bad connections in cql_session_impl_t::allocate_connection were resulting in many valgrind errors. Holding on to these in a dump list. That will introduce some data leakage, but we can come back to make it clear out, say, based on some time interval. These valgrind errors look to have been associated with segfaults. Note that all of these issues surfaced in an env where the cassandra servers were going up and down. That is where these issues came to light for us. 
--- include/cql/internal/cql_connection_impl.hpp | 26 ++++++++----- src/cql/internal/cql_session_impl.cpp | 40 +++++++++++++++++++- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/include/cql/internal/cql_connection_impl.hpp b/include/cql/internal/cql_connection_impl.hpp index 4635cbe0a..3822bc9df 100644 --- a/include/cql/internal/cql_connection_impl.hpp +++ b/include/cql/internal/cql_connection_impl.hpp @@ -1068,17 +1068,23 @@ class cql_connection_impl_t : public cql::cql_connection_t inline void check_transport_err(const boost::system::error_code& err) { - if (!_closing && !_transport->lowest_layer().is_open()) { - _ready = false; - _defunct = true; - } + try + { + if (!_closing && !_transport->lowest_layer().is_open()) { + _ready = false; + _defunct = true; + } - if (_connect_errback && !_closing) { - cql::cql_error_t e; - e.transport = true; - e.code = err.value(); - e.message = err.message(); - _connect_errback(*this, e); + if (_connect_errback && !_closing) { + cql::cql_error_t e; + e.transport = true; + e.code = err.value(); + e.message = err.message(); + _connect_errback(*this, e); + } + } catch (std::exception& ex) + { + std::cout << "in check_transport_err caught: " << ex.what() << std::endl; } } diff --git a/src/cql/internal/cql_session_impl.cpp b/src/cql/internal/cql_session_impl.cpp index a7b4cbe66..2f327f922 100644 --- a/src/cql/internal/cql_session_impl.cpp +++ b/src/cql/internal/cql_session_impl.cpp @@ -31,6 +31,42 @@ #include "cql/internal/cql_socket.hpp" #include "cql/internal/cql_connection_impl.hpp" +namespace +{ + class endless_dump_t + { + public: + + void store_connection(boost::shared_ptr bad_conn, + boost::shared_ptr > promise) + { + MyLock lock(_mutex); + if (bad_conn) + { + bad_conn->close(); + _bad_conns.push_back(bad_conn_data(bad_conn, promise)); + } + } + + private: + typedef boost::unique_lock MyLock; + boost::mutex _mutex; + struct bad_conn_data + { + bad_conn_data( boost::shared_ptr conn, + boost::shared_ptr > 
promise) + : _conn(conn), + _promise(promise) + { + } + + boost::shared_ptr _conn; + boost::shared_ptr > _promise; + }; + std::list< bad_conn_data > _bad_conns; + }; + endless_dump_t endless_dump; +} cql::cql_session_impl_t::cql_session_impl_t( const cql_session_callback_info_t& callbacks, @@ -227,9 +263,9 @@ cql::cql_session_impl_t::allocate_connection( if (shared_future.get().error.is_err()) { decrease_connection_counter(host); + endless_dump.store_connection(connection, promise); throw cql_connection_allocation_error( ("Error when connecting to host: " + host->endpoint().to_string()).c_str()); - connection.reset(); } return connection; @@ -320,6 +356,8 @@ cql::cql_session_impl_t::connect( continue; } + boost::recursive_mutex::scoped_lock lock(_mutex); + cql_endpoint_t host_address = host->endpoint(); tried_hosts->push_back(host_address); cql_connections_collection_t* connections = add_to_connection_pool(host_address); From 548f83b3c6bf917d51fc23d62c1281b25fd2172a Mon Sep 17 00:00:00 2001 From: Charles Bracher Date: Thu, 6 Mar 2014 21:13:17 +0000 Subject: [PATCH 3/4] added in changes to support our own dca load balancing policy with simple selection of the local datacenter. 
--- include/cql/cql_builder.hpp | 21 ++++++++++++++++++++- src/cql/cql_builder.cpp | 4 ++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/include/cql/cql_builder.hpp b/include/cql/cql_builder.hpp index 21a1e9eeb..d02caa07d 100644 --- a/include/cql/cql_builder.hpp +++ b/include/cql/cql_builder.hpp @@ -252,7 +252,7 @@ class cql_pooling_options_t class cql_policies_t { public: cql_policies_t() : - _load_balancing_policy(new cql_round_robin_policy_t()), + _load_balancing_policy(default_load_balancing_policy()), _reconnection_policy( new cql_exponential_reconnection_policy_t( boost::posix_time::seconds(1), // base dealy: @@ -288,6 +288,21 @@ class cql_policies_t { return _retry_policy; } + static boost::shared_ptr + default_load_balancing_policy() + { + boost::unique_lock lock(_mutex); + boost::shared_ptr retVal = _default_load_balancing_policy; + return retVal; + } + + static void + default_load_balancing_policy(boost::shared_ptr policy) + { + boost::unique_lock lock(_mutex); + _default_load_balancing_policy = policy; + } + private: friend class cql_configuration_t; @@ -301,6 +316,10 @@ class cql_policies_t { boost::shared_ptr _load_balancing_policy; boost::shared_ptr _reconnection_policy; boost::shared_ptr _retry_policy; + + static boost::shared_ptr _default_load_balancing_policy; + + static boost::mutex _mutex; }; class cql_configuration_t diff --git a/src/cql/cql_builder.cpp b/src/cql/cql_builder.cpp index 407492f59..a69fbc1b7 100644 --- a/src/cql/cql_builder.cpp +++ b/src/cql/cql_builder.cpp @@ -20,6 +20,10 @@ #include "cql/cql_builder.hpp" #include "cql/internal/cql_util.hpp" +boost::shared_ptr + cql::cql_policies_t::_default_load_balancing_policy(new cql::cql_round_robin_policy_t()); +boost::mutex cql::cql_policies_t::_mutex; + cql::cql_builder_t& cql::cql_builder_t::add_contact_point( const ::boost::asio::ip::address& address) From be2dde0f4c4dc89383bfd4b362585cf462ada7bd Mon Sep 17 00:00:00 2001 From: Charles Bracher Date: Thu, 13 Mar 2014 
23:26:57 +0000 Subject: [PATCH 4/4] BE-494 - need to be able to refresh hosts setting for datacenter (and rack, though not really using it at the moment). Need this for our own dca aware round robin which is managed by comparing the datacenter names. We can drop all of this once we get a real dca aware option from the datastax client. The issue being addressed here is that sometimes the datacenter and rack were still holding their default (unset) value "unknown", which could make all of the hosts look remote. Now we can detect this "unknown" case and refresh the hosts if needed. Note that it is difficult to see if this is safe to do during execution. Seems like it should be since cql_control_connection is responding to hosts being added and dropped from the cluster. --- CMakeLists.txt | 4 ++++ extra/ccm_bridge/config.txt | 8 ++++---- include/cql/cql_cluster.hpp | 5 +++++ include/cql/cql_host.hpp | 20 +++++++++++++++---- include/cql/internal/cql_cluster_impl.hpp | 8 ++++++++ .../cql/internal/cql_control_connection.hpp | 6 +++--- src/cql/cql_host.cpp | 1 + 7 files changed, 41 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e459c3b06..194c15806 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,10 @@ set(PROJECT_VERSION_STRING ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${P # todo: add version header +if ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") + add_definitions (-D_DEBUG=1 -O0 -g) +endif() + #------------------- # setup deps #------------------- diff --git a/extra/ccm_bridge/config.txt b/extra/ccm_bridge/config.txt index 19125c657..a88bff857 100644 --- a/extra/ccm_bridge/config.txt +++ b/extra/ccm_bridge/config.txt @@ -5,16 +5,16 @@ # IP_PREFIX=192.168.13. 
-CASSANDRA_VERSION=1.2.5 +CASSANDRA_VERSION=1.2.8 # Addresses MUST be in IP form # Amazon # SSH_HOST=54.224.125.58 -SSH_HOST=127.0.0.1 +SSH_HOST=192.168.13.1 SSH_PORT=22 -SSH_USERNAME=!mc -SSH_PASSWORD=passpass +SSH_USERNAME=user +SSH_PASSWORD=password # # Use NO or YES for binary options diff --git a/include/cql/cql_cluster.hpp b/include/cql/cql_cluster.hpp index 43a4ffba5..ab0a53a2c 100644 --- a/include/cql/cql_cluster.hpp +++ b/include/cql/cql_cluster.hpp @@ -54,6 +54,11 @@ class CQL_EXPORT cql_cluster_t: boost::noncopyable { virtual boost::shared_ptr metadata() const = 0; + // refresh hosts. generally managed internally, but if a host is stuck on having + // datacenter or rack with the initialized but unset value "unknown", then this + // call may be able to fix it. + virtual void refresh_hosts() = 0; + virtual inline ~cql_cluster_t() { } }; diff --git a/include/cql/cql_host.hpp b/include/cql/cql_host.hpp index 8fed53361..663d87de4 100644 --- a/include/cql/cql_host.hpp +++ b/include/cql/cql_host.hpp @@ -44,16 +44,27 @@ namespace cql { return _endpoint; } - inline const std::string& + inline const std::string datacenter() const { - return _datacenter; + boost::unique_lock lock(_mutex); + std::string retVal = _datacenter.c_str(); // dont share buffer across threads + return retVal; } - inline const std::string& + // avoid string allocation + inline bool in_datacenter(const std::string& datacenter) const + { + boost::unique_lock lock(_mutex); + return _datacenter == datacenter; + } + + inline const std::string rack() const { - return _rack; + boost::unique_lock lock(_mutex); + std::string retVal = _rack.c_str(); // dont share buffer across threads + return retVal; } bool @@ -90,6 +101,7 @@ namespace cql { boost::posix_time::ptime _next_up_time; boost::shared_ptr _reconnection_policy; boost::shared_ptr _reconnection_schedule; + mutable boost::mutex _mutex; }; } diff --git a/include/cql/internal/cql_cluster_impl.hpp b/include/cql/internal/cql_cluster_impl.hpp index 
1aee8e8dc..3fd8a1752 100644 --- a/include/cql/internal/cql_cluster_impl.hpp +++ b/include/cql/internal/cql_cluster_impl.hpp @@ -191,6 +191,14 @@ class cql_cluster_impl_t : return _metadata; } + virtual void refresh_hosts() + { + if (_control_connection) + { + _control_connection->refresh_hosts(); + } + } + friend class cql_metadata_t; private: diff --git a/include/cql/internal/cql_control_connection.hpp b/include/cql/internal/cql_control_connection.hpp index bc9d33601..ff1b75d80 100644 --- a/include/cql/internal/cql_control_connection.hpp +++ b/include/cql/internal/cql_control_connection.hpp @@ -38,6 +38,9 @@ class cql_control_connection_t : virtual ~cql_control_connection_t(); + bool + refresh_hosts(); + private: void @@ -60,9 +63,6 @@ class cql_control_connection_t : setup_control_connection( bool refresh_only = false); - bool - refresh_hosts(); - void reconnection_callback( const boost::system::error_code& err); diff --git a/src/cql/cql_host.cpp b/src/cql/cql_host.cpp index f8b71f0f4..dfa1123b6 100644 --- a/src/cql/cql_host.cpp +++ b/src/cql/cql_host.cpp @@ -51,6 +51,7 @@ cql::cql_host_t::set_location_info( const std::string& datacenter, const std::string& rack ) { + boost::unique_lock lock(_mutex); _datacenter = datacenter; _rack = rack; }