diff --git a/CMakeLists.txt b/CMakeLists.txt index e459c3b06..194c15806 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,10 @@ set(PROJECT_VERSION_STRING ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${P # todo: add version header +if ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") + add_definitions (-D_DEBUG=1 -O0 -g) +endif() + #------------------- # setup deps #------------------- diff --git a/extra/ccm_bridge/config.txt b/extra/ccm_bridge/config.txt index 19125c657..a88bff857 100644 --- a/extra/ccm_bridge/config.txt +++ b/extra/ccm_bridge/config.txt @@ -5,16 +5,16 @@ # IP_PREFIX=192.168.13. -CASSANDRA_VERSION=1.2.5 +CASSANDRA_VERSION=1.2.8 # Addresses MUST be in IP form # Amazon # SSH_HOST=54.224.125.58 -SSH_HOST=127.0.0.1 +SSH_HOST=192.168.13.1 SSH_PORT=22 -SSH_USERNAME=!mc -SSH_PASSWORD=passpass +SSH_USERNAME=user +SSH_PASSWORD=password # # Use NO or YES for binary options diff --git a/include/cql/cql_builder.hpp b/include/cql/cql_builder.hpp index 21a1e9eeb..d02caa07d 100644 --- a/include/cql/cql_builder.hpp +++ b/include/cql/cql_builder.hpp @@ -252,7 +252,7 @@ class cql_pooling_options_t class cql_policies_t { public: cql_policies_t() : - _load_balancing_policy(new cql_round_robin_policy_t()), + _load_balancing_policy(default_load_balancing_policy()), _reconnection_policy( new cql_exponential_reconnection_policy_t( boost::posix_time::seconds(1), // base dealy: @@ -288,6 +288,21 @@ class cql_policies_t { return _retry_policy; } + static boost::shared_ptr + default_load_balancing_policy() + { + boost::unique_lock lock(_mutex); + boost::shared_ptr retVal = _default_load_balancing_policy; + return retVal; + } + + static void + default_load_balancing_policy(boost::shared_ptr policy) + { + boost::unique_lock lock(_mutex); + _default_load_balancing_policy = policy; + } + private: friend class cql_configuration_t; @@ -301,6 +316,10 @@ class cql_policies_t { boost::shared_ptr _load_balancing_policy; boost::shared_ptr _reconnection_policy; boost::shared_ptr _retry_policy; + + static boost::shared_ptr _default_load_balancing_policy; + + static boost::mutex _mutex; }; class cql_configuration_t diff --git a/include/cql/cql_cluster.hpp b/include/cql/cql_cluster.hpp index 43a4ffba5..ab0a53a2c 100644 --- a/include/cql/cql_cluster.hpp +++ b/include/cql/cql_cluster.hpp @@ -54,6 +54,11 @@ class CQL_EXPORT cql_cluster_t: boost::noncopyable { virtual boost::shared_ptr metadata() const = 0; + // refresh hosts. generally managed internally, but if a host is stuck on having + // datacenter or rack with the initialized but unset value "unknown", then this + // call may be able to fix it. + virtual void refresh_hosts() = 0; + virtual inline ~cql_cluster_t() { } }; diff --git a/include/cql/cql_host.hpp b/include/cql/cql_host.hpp index 8fed53361..663d87de4 100644 --- a/include/cql/cql_host.hpp +++ b/include/cql/cql_host.hpp @@ -44,16 +44,27 @@ namespace cql { return _endpoint; } - inline const std::string& + inline const std::string datacenter() const { - return _datacenter; + boost::unique_lock lock(_mutex); + std::string retVal = _datacenter.c_str(); // dont share buffer across threads + return retVal; } - inline const std::string& + // avoid string allocation + inline bool in_datacenter(const std::string& datacenter) const + { + boost::unique_lock lock(_mutex); + return _datacenter == datacenter; + } + + inline const std::string rack() const { - return _rack; + boost::unique_lock lock(_mutex); + std::string retVal = _rack.c_str(); // dont share buffer across threads + return retVal; } bool @@ -90,6 +101,7 @@ namespace cql { boost::posix_time::ptime _next_up_time; boost::shared_ptr _reconnection_policy; boost::shared_ptr _reconnection_schedule; + mutable boost::mutex _mutex; }; } diff --git a/include/cql/internal/cql_cluster_impl.hpp b/include/cql/internal/cql_cluster_impl.hpp index 1aee8e8dc..3fd8a1752 100644 --- a/include/cql/internal/cql_cluster_impl.hpp +++ b/include/cql/internal/cql_cluster_impl.hpp @@ -191,6 +191,14 @@ class cql_cluster_impl_t : return _metadata; } + virtual void refresh_hosts() + { + if (_control_connection) + { + _control_connection->refresh_hosts(); + } + } + friend class cql_metadata_t; private: diff --git a/include/cql/internal/cql_connection_impl.hpp b/include/cql/internal/cql_connection_impl.hpp index 4635cbe0a..3822bc9df 100644 --- a/include/cql/internal/cql_connection_impl.hpp +++ b/include/cql/internal/cql_connection_impl.hpp @@ -1068,17 +1068,23 @@ class cql_connection_impl_t : public cql::cql_connection_t inline void check_transport_err(const boost::system::error_code& err) { - if (!_closing && !_transport->lowest_layer().is_open()) { - _ready = false; - _defunct = true; - } + try + { + if (!_closing && !_transport->lowest_layer().is_open()) { + _ready = false; + _defunct = true; + } - if (_connect_errback && !_closing) { - cql::cql_error_t e; - e.transport = true; - e.code = err.value(); - e.message = err.message(); - _connect_errback(*this, e); + if (_connect_errback && !_closing) { + cql::cql_error_t e; + e.transport = true; + e.code = err.value(); + e.message = err.message(); + _connect_errback(*this, e); + } + } catch (std::exception& ex) + { + std::cout << "in check_transport_err caught: " << ex.what() << std::endl; } } diff --git a/include/cql/internal/cql_control_connection.hpp b/include/cql/internal/cql_control_connection.hpp index bc9d33601..ff1b75d80 100644 --- a/include/cql/internal/cql_control_connection.hpp +++ b/include/cql/internal/cql_control_connection.hpp @@ -38,6 +38,9 @@ class cql_control_connection_t : virtual ~cql_control_connection_t(); + bool + refresh_hosts(); + private: void @@ -60,9 +63,6 @@ class cql_control_connection_t : setup_control_connection( bool refresh_only = false); - bool - refresh_hosts(); - void reconnection_callback( const boost::system::error_code& err); diff --git a/include/cql/internal/cql_promise.hpp b/include/cql/internal/cql_promise.hpp index d2320df28..52c8236cd 100644 --- a/include/cql/internal/cql_promise.hpp +++ b/include/cql/internal/cql_promise.hpp @@ -32,6 +32,7 @@ class cql_promise_t { { boost::mutex::scoped_lock lock(*_mutex); if (!*_value_set) { + *_value_set = true; _promise->set_value(value); return true; } @@ -44,6 +45,7 @@ class cql_promise_t { { boost::mutex::scoped_lock lock(*_mutex); if (!*_value_set) { + *_value_set = true; _promise->set_exception(exception); return true; } diff --git a/src/cql/cql_builder.cpp b/src/cql/cql_builder.cpp index 407492f59..a69fbc1b7 100644 --- a/src/cql/cql_builder.cpp +++ b/src/cql/cql_builder.cpp @@ -20,6 +20,10 @@ #include "cql/cql_builder.hpp" #include "cql/internal/cql_util.hpp" +boost::shared_ptr + cql::cql_policies_t::_default_load_balancing_policy(new cql::cql_round_robin_policy_t()); +boost::mutex cql::cql_policies_t::_mutex; + cql::cql_builder_t& cql::cql_builder_t::add_contact_point( const ::boost::asio::ip::address& address) diff --git a/src/cql/cql_host.cpp b/src/cql/cql_host.cpp index f8b71f0f4..dfa1123b6 100644 --- a/src/cql/cql_host.cpp +++ b/src/cql/cql_host.cpp @@ -51,6 +51,7 @@ cql::cql_host_t::set_location_info( const std::string& datacenter, const std::string& rack ) { + boost::unique_lock lock(_mutex); _datacenter = datacenter; _rack = rack; } diff --git a/src/cql/internal/cql_session_impl.cpp b/src/cql/internal/cql_session_impl.cpp index a7b4cbe66..2f327f922 100644 --- a/src/cql/internal/cql_session_impl.cpp +++ b/src/cql/internal/cql_session_impl.cpp @@ -31,6 +31,42 @@ #include "cql/internal/cql_socket.hpp" #include "cql/internal/cql_connection_impl.hpp" +namespace +{ + class endless_dump_t + { + public: + + void store_connection(boost::shared_ptr bad_conn, + boost::shared_ptr > promise) + { + MyLock lock(_mutex); + if (bad_conn) + { + bad_conn->close(); + _bad_conns.push_back(bad_conn_data(bad_conn, promise)); + } + } + + private: + typedef boost::unique_lock MyLock; + boost::mutex _mutex; + struct bad_conn_data + { + bad_conn_data( boost::shared_ptr conn, + boost::shared_ptr > promise) + : _conn(conn), + _promise(promise) + { + } + + boost::shared_ptr _conn; + boost::shared_ptr > _promise; + }; + std::list< bad_conn_data > _bad_conns; + }; + endless_dump_t endless_dump; +} cql::cql_session_impl_t::cql_session_impl_t( const cql_session_callback_info_t& callbacks, @@ -227,9 +263,9 @@ cql::cql_session_impl_t::allocate_connection( if (shared_future.get().error.is_err()) { decrease_connection_counter(host); + endless_dump.store_connection(connection, promise); throw cql_connection_allocation_error( ("Error when connecting to host: " + host->endpoint().to_string()).c_str()); - connection.reset(); } return connection; @@ -320,6 +356,8 @@ cql::cql_session_impl_t::connect( continue; } + boost::recursive_mutex::scoped_lock lock(_mutex); + cql_endpoint_t host_address = host->endpoint(); tried_hosts->push_back(host_address); cql_connections_collection_t* connections = add_to_connection_pool(host_address);