Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ set(PROJECT_VERSION_STRING ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${P

# todo: add version header

if ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug")
add_definitions (-D_DEBUG=1 -O0 -g)
endif()

#-------------------
# setup deps
#-------------------
Expand Down
8 changes: 4 additions & 4 deletions extra/ccm_bridge/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
#

IP_PREFIX=192.168.13.
CASSANDRA_VERSION=1.2.5
CASSANDRA_VERSION=1.2.8

# Addresses MUST be in IP form

# Amazon
# SSH_HOST=54.224.125.58
SSH_HOST=127.0.0.1
SSH_HOST=192.168.13.1
SSH_PORT=22
SSH_USERNAME=!mc
SSH_PASSWORD=passpass
SSH_USERNAME=user
SSH_PASSWORD=password

#
# Use NO or YES for binary options
Expand Down
21 changes: 20 additions & 1 deletion include/cql/cql_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class cql_pooling_options_t
class cql_policies_t {
public:
cql_policies_t() :
_load_balancing_policy(new cql_round_robin_policy_t()),
_load_balancing_policy(default_load_balancing_policy()),
_reconnection_policy(
new cql_exponential_reconnection_policy_t(
boost::posix_time::seconds(1), // base dealy:
Expand Down Expand Up @@ -288,6 +288,21 @@ class cql_policies_t {
return _retry_policy;
}

static boost::shared_ptr<cql_load_balancing_policy_t>
default_load_balancing_policy()
{
boost::unique_lock<boost::mutex> lock(_mutex);
boost::shared_ptr<cql_load_balancing_policy_t> retVal = _default_load_balancing_policy;
return retVal;
}

static void
default_load_balancing_policy(boost::shared_ptr<cql_load_balancing_policy_t> policy)
{
boost::unique_lock<boost::mutex> lock(_mutex);
_default_load_balancing_policy = policy;
}

private:
friend class cql_configuration_t;

Expand All @@ -301,6 +316,10 @@ class cql_policies_t {
boost::shared_ptr<cql_load_balancing_policy_t> _load_balancing_policy;
boost::shared_ptr<cql_reconnection_policy_t> _reconnection_policy;
boost::shared_ptr<cql_retry_policy_t> _retry_policy;

static boost::shared_ptr<cql_load_balancing_policy_t> _default_load_balancing_policy;

static boost::mutex _mutex;
};

class cql_configuration_t
Expand Down
5 changes: 5 additions & 0 deletions include/cql/cql_cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class CQL_EXPORT cql_cluster_t: boost::noncopyable {
virtual boost::shared_ptr<cql_metadata_t>
metadata() const = 0;

// refresh hosts. generally managed internally, but if a host is stuck on having
// datacenter or rack with the initialized but unset value "unknown", then this
// call may be able to fix it.
virtual void refresh_hosts() = 0;

virtual inline
~cql_cluster_t() { }
};
Expand Down
20 changes: 16 additions & 4 deletions include/cql/cql_host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,27 @@ namespace cql {
return _endpoint;
}

inline const std::string&
inline const std::string
datacenter() const
{
return _datacenter;
boost::unique_lock<boost::mutex> lock(_mutex);
std::string retVal = _datacenter.c_str(); // dont share buffer across threads
return retVal;
}

inline const std::string&
// avoid string allocation
inline bool in_datacenter(const std::string& datacenter) const
{
boost::unique_lock<boost::mutex> lock(_mutex);
return _datacenter == datacenter;
}

inline const std::string
rack() const
{
return _rack;
boost::unique_lock<boost::mutex> lock(_mutex);
std::string retVal = _rack.c_str(); // dont share buffer across threads
return retVal;
}

bool
Expand Down Expand Up @@ -90,6 +101,7 @@ namespace cql {
boost::posix_time::ptime _next_up_time;
boost::shared_ptr<cql_reconnection_policy_t> _reconnection_policy;
boost::shared_ptr<cql_reconnection_schedule_t> _reconnection_schedule;
mutable boost::mutex _mutex;
};
}

Expand Down
8 changes: 8 additions & 0 deletions include/cql/internal/cql_cluster_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ class cql_cluster_impl_t :
return _metadata;
}

virtual void refresh_hosts()
{
if (_control_connection)
{
_control_connection->refresh_hosts();
}
}

friend class cql_metadata_t;

private:
Expand Down
26 changes: 16 additions & 10 deletions include/cql/internal/cql_connection_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1068,17 +1068,23 @@ class cql_connection_impl_t : public cql::cql_connection_t
inline void
check_transport_err(const boost::system::error_code& err)
{
if (!_closing && !_transport->lowest_layer().is_open()) {
_ready = false;
_defunct = true;
}
try
{
if (!_closing && !_transport->lowest_layer().is_open()) {
_ready = false;
_defunct = true;
}

if (_connect_errback && !_closing) {
cql::cql_error_t e;
e.transport = true;
e.code = err.value();
e.message = err.message();
_connect_errback(*this, e);
if (_connect_errback && !_closing) {
cql::cql_error_t e;
e.transport = true;
e.code = err.value();
e.message = err.message();
_connect_errback(*this, e);
}
} catch (std::exception& ex)
{
std::cout << "in check_transport_err caught: " << ex.what() << std::endl;
}
}

Expand Down
6 changes: 3 additions & 3 deletions include/cql/internal/cql_control_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class cql_control_connection_t :
virtual
~cql_control_connection_t();

bool
refresh_hosts();

private:

void
Expand All @@ -60,9 +63,6 @@ class cql_control_connection_t :
setup_control_connection(
bool refresh_only = false);

bool
refresh_hosts();

void
reconnection_callback(
const boost::system::error_code& err);
Expand Down
2 changes: 2 additions & 0 deletions include/cql/internal/cql_promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class cql_promise_t {
{
boost::mutex::scoped_lock lock(*_mutex);
if (!*_value_set) {
*_value_set = true;
_promise->set_value(value);
return true;
}
Expand All @@ -44,6 +45,7 @@ class cql_promise_t {
{
boost::mutex::scoped_lock lock(*_mutex);
if (!*_value_set) {
*_value_set = true;
_promise->set_exception(exception);
return true;
}
Expand Down
4 changes: 4 additions & 0 deletions src/cql/cql_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#include "cql/cql_builder.hpp"
#include "cql/internal/cql_util.hpp"

boost::shared_ptr<cql::cql_load_balancing_policy_t>
cql::cql_policies_t::_default_load_balancing_policy(new cql::cql_round_robin_policy_t());
boost::mutex cql::cql_policies_t::_mutex;

cql::cql_builder_t&
cql::cql_builder_t::add_contact_point(
const ::boost::asio::ip::address& address)
Expand Down
1 change: 1 addition & 0 deletions src/cql/cql_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ cql::cql_host_t::set_location_info(
const std::string& datacenter,
const std::string& rack )
{
boost::unique_lock<boost::mutex> lock(_mutex);
_datacenter = datacenter;
_rack = rack;
}
Expand Down
40 changes: 39 additions & 1 deletion src/cql/internal/cql_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,42 @@
#include "cql/internal/cql_socket.hpp"
#include "cql/internal/cql_connection_impl.hpp"

namespace
{
class endless_dump_t
{
public:

void store_connection(boost::shared_ptr<cql::cql_connection_t> bad_conn,
boost::shared_ptr<cql::cql_promise_t<cql::cql_future_connection_t> > promise)
{
MyLock lock(_mutex);
if (bad_conn)
{
bad_conn->close();
_bad_conns.push_back(bad_conn_data(bad_conn, promise));
}
}

private:
typedef boost::unique_lock<boost::mutex> MyLock;
boost::mutex _mutex;
struct bad_conn_data
{
bad_conn_data( boost::shared_ptr<cql::cql_connection_t> conn,
boost::shared_ptr<cql::cql_promise_t<cql::cql_future_connection_t> > promise)
: _conn(conn),
_promise(promise)
{
}

boost::shared_ptr<cql::cql_connection_t> _conn;
boost::shared_ptr<cql::cql_promise_t<cql::cql_future_connection_t> > _promise;
};
std::list< bad_conn_data > _bad_conns;
};
endless_dump_t endless_dump;
}

cql::cql_session_impl_t::cql_session_impl_t(
const cql_session_callback_info_t& callbacks,
Expand Down Expand Up @@ -227,9 +263,9 @@ cql::cql_session_impl_t::allocate_connection(

if (shared_future.get().error.is_err()) {
decrease_connection_counter(host);
endless_dump.store_connection(connection, promise);
throw cql_connection_allocation_error(
("Error when connecting to host: " + host->endpoint().to_string()).c_str());
connection.reset();
}

return connection;
Expand Down Expand Up @@ -320,6 +356,8 @@ cql::cql_session_impl_t::connect(
continue;
}

boost::recursive_mutex::scoped_lock lock(_mutex);

cql_endpoint_t host_address = host->endpoint();
tried_hosts->push_back(host_address);
cql_connections_collection_t* connections = add_to_connection_pool(host_address);
Expand Down