Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions examples/callbacks/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,33 @@

uv_mutex_t mutex;
uv_cond_t cond;
int exit_flag = 0;
CassFuture* close_future = NULL;
CassUuidGen* uuid_gen = NULL;

void wait_exit() {
uv_mutex_lock(&mutex);
while (close_future == NULL) {
while (exit_flag == 0) {
uv_cond_wait(&cond, &mutex);
}
uv_mutex_unlock(&mutex);
cass_future_wait(close_future);
cass_future_free(close_future);
if (close_future) {
cass_future_wait(close_future);
cass_future_free(close_future);
}
}

void signal_exit(CassSession* session) {
uv_mutex_lock(&mutex);
close_future = cass_session_close(session);
if (session) {
close_future = cass_session_close(session);
}
exit_flag = 1;
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}

void on_set_keyspace(CassFuture* future, void* data);
void on_create_keyspace(CassFuture* future, void* data);
void on_create_table(CassFuture* future, void* data);

Expand All @@ -79,7 +86,7 @@ CassCluster* create_cluster() {
}

void connect_session(CassSession* session, const CassCluster* cluster, CassFutureCallback callback) {
CassFuture* future = cass_session_connect_keyspace(session, cluster, "examples");
CassFuture* future = cass_session_connect(session, cluster);
cass_future_set_callback(future, callback, session);
cass_future_free(future);
}
Expand All @@ -99,7 +106,7 @@ void on_session_connect(CassFuture* future, void* data) {

if (code != CASS_OK) {
print_error(future);
uv_cond_signal(&cond);
signal_exit(NULL);
return;
}

Expand All @@ -115,6 +122,17 @@ void on_create_keyspace(CassFuture* future, void* data) {
print_error(future);
}

execute_query((CassSession*)data,
"USE examples",
on_set_keyspace);
}

void on_set_keyspace(CassFuture* future, void* data) {
CassError code = cass_future_error_code(future);
if (code != CASS_OK) {
print_error(future);
}

execute_query((CassSession*)data,
"CREATE TABLE callbacks "
"(key timeuuid PRIMARY KEY, value bigint)",
Expand Down
8 changes: 6 additions & 2 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ void Connection::StartupHandler::on_set(ResponseMessage* response) {
ErrorResponse* error
= static_cast<ErrorResponse*>(response->response_body().get());
if (error->code() == CQL_ERROR_PROTOCOL_ERROR &&
error->message().to_string().find("Invalid or unsupported protocol version") != std::string::npos) {
error->message().find("Invalid or unsupported protocol version") != StringRef::npos) {
connection_->notify_error(error->message().to_string(), CONNECTION_ERROR_INVALID_PROTOCOL);
} else if (error->code() == CQL_ERROR_BAD_CREDENTIALS) {
connection_->notify_error(error->message().to_string(), CONNECTION_ERROR_AUTH);
} else if (error->code() == CQL_ERROR_INVALID_QUERY &&
error->message().find("Keyspace") == 0 &&
error->message().find("does not exist") != StringRef::npos) {
connection_->notify_error("Received error response " + error->error_message(), CONNECTION_ERROR_KEYSPACE);
} else {
connection_->notify_error("Received error response " + error->error_message());
}
Expand Down Expand Up @@ -743,7 +747,7 @@ void Connection::on_ready() {
if (keyspace_.empty()) {
notify_ready();
} else {
write(new StartupHandler(this, new QueryRequest("use \"" + keyspace_ + "\"")));
write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\"")));
}
}

Expand Down
9 changes: 2 additions & 7 deletions src/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class Connection {
CONNECTION_ERROR_TIMEOUT,
CONNECTION_ERROR_INVALID_PROTOCOL,
CONNECTION_ERROR_AUTH,
CONNECTION_ERROR_SSL
CONNECTION_ERROR_SSL,
CONNECTION_ERROR_KEYSPACE
};

class Listener {
Expand Down Expand Up @@ -130,12 +131,6 @@ class Connection {
bool is_ssl_error() const { return error_code_ == CONNECTION_ERROR_SSL; }
bool is_timeout_error() const { return error_code_ == CONNECTION_ERROR_TIMEOUT; }

bool is_critical_failure() const {
return error_code_ == CONNECTION_ERROR_INVALID_PROTOCOL ||
error_code_ == CONNECTION_ERROR_AUTH ||
error_code_ == CONNECTION_ERROR_SSL;
}

ConnectionError error_code() const { return error_code_; }
const std::string& error_message() const { return error_message_; }

Expand Down
29 changes: 7 additions & 22 deletions src/io_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ IOWorker::IOWorker(Session* session)
, config_(session->config())
, metrics_(session->metrics())
, protocol_version_(-1)
, keyspace_(new std::string)
, pending_request_count_(0)
, request_queue_(config_.queue_size_io()) {
prepare_.data = this;
uv_mutex_init(&keyspace_mutex_);
uv_mutex_init(&unavailable_addresses_mutex_);
}

IOWorker::~IOWorker() {
uv_mutex_destroy(&keyspace_mutex_);
uv_mutex_destroy(&unavailable_addresses_mutex_);
}

Expand All @@ -56,26 +55,8 @@ int IOWorker::init() {
return rc;
}

std::string IOWorker::keyspace() {
// Not returned by reference on purpose. This memory can't be shared
// because it could be updated as a result of a "USE <keyspace" query.
ScopedMutex lock(&keyspace_mutex_);
return keyspace_;
}

void IOWorker::set_keyspace(const std::string& keyspace) {
ScopedMutex lock(&keyspace_mutex_);
keyspace_ = keyspace;
}

bool IOWorker::is_current_keyspace(const std::string& keyspace) {
// This mutex is locked on the query request path, but it should
// almost never have any contention. That could only happen when
// there is a "USE <keyspace>" query. These *should* happen
// infrequently. This is preferred over IOWorker::keyspace()
// because it doesn't allocate memory.
ScopedMutex lock(&keyspace_mutex_);
return keyspace_ == keyspace;
keyspace_ = CopyOnWritePtr<std::string>(new std::string(keyspace));
}

void IOWorker::broadcast_keyspace_change(const std::string& keyspace) {
Expand Down Expand Up @@ -185,7 +166,11 @@ void IOWorker::request_finished(RequestHandler* request_handler) {

void IOWorker::notify_pool_ready(Pool* pool) {
if (pool->is_initial_connection()) {
session_->notify_ready_async();
if (pool->is_keyspace_error()) {
session_->notify_keyspace_error_async();
} else {
session_->notify_ready_async();
}
} else if (is_ready() && pool->is_ready()){
session_->notify_up_async(pool->address());
}
Expand Down
8 changes: 3 additions & 5 deletions src/io_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "address.hpp"
#include "atomic.hpp"
#include "async_queue.hpp"
#include "copy_on_write_ptr.hpp"
#include "constants.hpp"
#include "event_thread.hpp"
#include "logger.hpp"
Expand Down Expand Up @@ -84,10 +85,8 @@ class IOWorker
protocol_version_.store(protocol_version);
}

std::string keyspace();
const CopyOnWritePtr<std::string> keyspace() const { return keyspace_; }
void set_keyspace(const std::string& keyspace);

bool is_current_keyspace(const std::string& keyspace);
void broadcast_keyspace_change(const std::string& keyspace);

void set_host_is_available(const Address& address, bool is_available);
Expand Down Expand Up @@ -141,8 +140,7 @@ class IOWorker
Atomic<int> protocol_version_;
uv_prepare_t prepare_;

std::string keyspace_;
uv_mutex_t keyspace_mutex_;
CopyOnWritePtr<std::string> keyspace_;

AddressSet unavailable_addresses_;
uv_mutex_t unavailable_addresses_mutex_;
Expand Down
22 changes: 12 additions & 10 deletions src/pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ Pool::Pool(IOWorker* io_worker,
, config_(io_worker->config())
, metrics_(io_worker->metrics())
, state_(POOL_STATE_NEW)
, connection_error_(Connection::CONNECTION_OK)
, available_connection_count_(0)
, is_available_(false)
, is_initial_connection_(is_initial_connection)
, is_critical_failure_(false)
, is_pending_flush_(false)
, cancel_reconnect_(false) { }

Expand Down Expand Up @@ -196,16 +196,16 @@ void Pool::set_is_available(bool is_available) {

bool Pool::write(Connection* connection, RequestHandler* request_handler) {
request_handler->set_pool(this);
if (io_worker_->is_current_keyspace(connection->keyspace())) {
if (*io_worker_->keyspace() == connection->keyspace()) {
if (!connection->write(request_handler, false)) {
return false;
}
} else {
LOG_DEBUG("Setting keyspace %s on connection(%p) pool(%p)",
io_worker_->keyspace().c_str(),
io_worker_->keyspace()->c_str(),
static_cast<void*>(connection),
static_cast<void*>(this));
if (!connection->write(new SetKeyspaceHandler(connection, io_worker_->keyspace(),
if (!connection->write(new SetKeyspaceHandler(connection, *io_worker_->keyspace(),
request_handler), false)) {
return false;
}
Expand Down Expand Up @@ -247,7 +247,7 @@ void Pool::spawn_connection() {
Connection* connection =
new Connection(loop_, config_, metrics_,
address_,
io_worker_->keyspace(),
*io_worker_->keyspace(),
io_worker_->protocol_version(),
this);

Expand Down Expand Up @@ -309,16 +309,18 @@ void Pool::on_close(Connection* connection) {
if (connection->is_timeout_error() && !connections_.empty()) {
if (!connect_timer.is_running()) {
connect_timer.start(loop_,
config_.reconnect_wait_time_ms(),
this, on_partial_reconnect);
config_.reconnect_wait_time_ms(),
this, on_partial_reconnect);
}
maybe_notify_ready();
} else if (connection->is_defunct()) {
// If at least one connection has a critical failure then don't try to
// reconnect automatically.
if (connection->is_critical_failure()) {
is_critical_failure_ = true;
// reconnect automatically. Also, don't set the error to something else if
// it has already been set to something critical.
if (!is_critical_failure()) {
connection_error_ = connection->error_code();
}

close();
} else {
maybe_notify_ready();
Expand Down
13 changes: 11 additions & 2 deletions src/pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,18 @@ class Pool : public RefCounted<Pool>

const Address& address() const { return address_; }

Connection::ConnectionError connection_error() const { return connection_error_; }

bool is_initial_connection() const { return is_initial_connection_; }
bool is_ready() const { return state_ == POOL_STATE_READY; }
bool is_critical_failure() const { return is_critical_failure_; }
bool is_keyspace_error() const {
return connection_error_ == Connection::CONNECTION_ERROR_KEYSPACE;
}
bool is_critical_failure() const {
return connection_error_ == Connection::CONNECTION_ERROR_INVALID_PROTOCOL ||
connection_error_ == Connection::CONNECTION_ERROR_AUTH ||
connection_error_ == Connection::CONNECTION_ERROR_SSL;
}
bool cancel_reconnect() const { return cancel_reconnect_; }

void return_connection(Connection* connection);
Expand Down Expand Up @@ -106,13 +115,13 @@ class Pool : public RefCounted<Pool>
Metrics* metrics_;

PoolState state_;
Connection::ConnectionError connection_error_;
ConnectionVec connections_;
ConnectionSet connections_pending_;
List<Handler> pending_requests_;
int available_connection_count_;
bool is_available_;
bool is_initial_connection_;
bool is_critical_failure_;
bool is_pending_flush_;
bool cancel_reconnect_;

Expand Down
Loading