From 690c7d04d273dcc3d5be50ab98ac4f4d17b96741 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Fri, 23 Oct 2015 11:10:46 -0700 Subject: [PATCH 1/3] Use copy-on-write for current keyspace --- src/io_worker.cpp | 23 ++--------------------- src/io_worker.hpp | 8 +++----- src/pool.cpp | 8 ++++---- src/session.cpp | 16 ++++++++-------- src/session.hpp | 2 ++ 5 files changed, 19 insertions(+), 38 deletions(-) diff --git a/src/io_worker.cpp b/src/io_worker.cpp index 0adb358d2..b2ea4f463 100644 --- a/src/io_worker.cpp +++ b/src/io_worker.cpp @@ -32,15 +32,14 @@ IOWorker::IOWorker(Session* session) , config_(session->config()) , metrics_(session->metrics()) , protocol_version_(-1) + , keyspace_(new std::string) , pending_request_count_(0) , request_queue_(config_.queue_size_io()) { prepare_.data = this; - uv_mutex_init(&keyspace_mutex_); uv_mutex_init(&unavailable_addresses_mutex_); } IOWorker::~IOWorker() { - uv_mutex_destroy(&keyspace_mutex_); uv_mutex_destroy(&unavailable_addresses_mutex_); } @@ -56,26 +55,8 @@ int IOWorker::init() { return rc; } -std::string IOWorker::keyspace() { - // Not returned by reference on purpose. This memory can't be shared - // because it could be updated as a result of a "USE " query. These *should* happen - // infrequently. This is preferred over IOWorker::keyspace() - // because it doesn't allocate memory. - ScopedMutex lock(&keyspace_mutex_); - return keyspace_ == keyspace; + keyspace_ = CopyOnWritePtr(new std::string(keyspace)); } void IOWorker::broadcast_keyspace_change(const std::string& keyspace) { diff --git a/src/io_worker.hpp b/src/io_worker.hpp index 4964280ca..0a3a79f51 100644 --- a/src/io_worker.hpp +++ b/src/io_worker.hpp @@ -20,6 +20,7 @@ #include "address.hpp" #include "atomic.hpp" #include "async_queue.hpp" +#include "copy_on_write_ptr.hpp" #include "constants.hpp" #include "event_thread.hpp" #include "logger.hpp" @@ -84,10 +85,8 @@ class IOWorker protocol_version_.store(protocol_version); } - std::string keyspace(); + const CopyOnWritePtr keyspace() const { return keyspace_; } void set_keyspace(const std::string& keyspace); - - bool is_current_keyspace(const std::string& keyspace); void broadcast_keyspace_change(const std::string& keyspace); void set_host_is_available(const Address& address, bool is_available); @@ -141,8 +140,7 @@ class IOWorker Atomic protocol_version_; uv_prepare_t prepare_; - std::string keyspace_; - uv_mutex_t keyspace_mutex_; + CopyOnWritePtr keyspace_; AddressSet unavailable_addresses_; uv_mutex_t unavailable_addresses_mutex_; diff --git a/src/pool.cpp b/src/pool.cpp index 982c09032..ecaa24d29 100644 --- a/src/pool.cpp +++ b/src/pool.cpp @@ -196,16 +196,16 @@ void Pool::set_is_available(bool is_available) { bool Pool::write(Connection* connection, RequestHandler* request_handler) { request_handler->set_pool(this); - if (io_worker_->is_current_keyspace(connection->keyspace())) { + if (*io_worker_->keyspace() == connection->keyspace()) { if (!connection->write(request_handler, false)) { return false; } } else { LOG_DEBUG("Setting keyspace %s on connection(%p) pool(%p)", - io_worker_->keyspace().c_str(), + io_worker_->keyspace()->c_str(), static_cast(connection), static_cast(this)); - if (!connection->write(new SetKeyspaceHandler(connection, io_worker_->keyspace(), + if (!connection->write(new SetKeyspaceHandler(connection, *io_worker_->keyspace(), request_handler), false)) { return false; } @@ -247,7 +247,7 @@ void Pool::spawn_connection() { Connection* connection = new Connection(loop_, config_, metrics_, address_, - io_worker_->keyspace(), + *io_worker_->keyspace(), io_worker_->protocol_version(), this); diff --git a/src/session.cpp b/src/session.cpp index 9fdf9c485..b8d84ee3b 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -140,7 +140,8 @@ Session::Session() , pending_resolve_count_(0) , pending_pool_count_(0) , pending_workers_count_(0) - , current_io_worker_(0) { + , current_io_worker_(0) + , keyspace_(new std::string){ uv_mutex_init(&state_mutex_); uv_mutex_init(&hosts_mutex_); } @@ -193,13 +194,15 @@ int Session::init() { void Session::broadcast_keyspace_change(const std::string& keyspace, const IOWorker* calling_io_worker) { // This can run on an IO worker thread. This is thread-safe because the IO workers - // vector never changes after initialization and IOWorker::set_keyspace() uses a mutex. - // This also means that calling "USE " frequently is an anti-pattern. + // vector never changes after initialization and IOWorker::set_keyspace() uses + // copy-on-write. This also means that calling "USE " frequently is an + // anti-pattern. for (IOWorkerVec::iterator it = io_workers_.begin(), end = io_workers_.end(); it != end; ++it) { if (*it == calling_io_worker) continue; (*it)->set_keyspace(keyspace); } + keyspace_ = CopyOnWritePtr(new std::string(keyspace)); } SharedRefPtr Session::get_host(const Address& address) { @@ -660,11 +663,8 @@ void Session::on_execute(uv_async_t* data) { } QueryPlan* Session::new_query_plan(const Request* request, Request::EncodingCache* cache) { - std::string connected_keyspace; - if (!io_workers_.empty()) { - connected_keyspace = io_workers_[0]->keyspace(); - } - return load_balancing_policy_->new_query_plan(connected_keyspace, request, + const CopyOnWritePtr keyspace(keyspace_); + return load_balancing_policy_->new_query_plan(*keyspace, request, metadata_.token_map(), cache); } diff --git a/src/session.hpp b/src/session.hpp index 296d384f9..513cb05d6 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -177,6 +177,8 @@ class Session : public EventThread { int pending_pool_count_; int pending_workers_count_; int current_io_worker_; + + CopyOnWritePtr keyspace_; }; class SessionFuture : public Future { From d29d05bff54fa62868c1c66322d39ff30a6f7a53 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Mon, 29 Feb 2016 12:44:01 -0700 Subject: [PATCH 2/3] Handle error when connecting with keyspace that doesn't exist --- src/connection.cpp | 8 +++++-- src/connection.hpp | 9 ++------ src/io_worker.cpp | 6 ++++- src/pool.cpp | 14 +++++++----- src/pool.hpp | 13 +++++++++-- src/session.cpp | 24 ++++++++++++++++++-- src/session.hpp | 6 ++++- src/set_keyspace_handler.cpp | 14 ++++++------ src/string_ref.hpp | 8 +++++++ test/integration_tests/src/test_sessions.cpp | 12 ++-------- test/unit_tests/src/test_string_ref.cpp | 17 ++++++++++++++ 11 files changed, 93 insertions(+), 38 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index b856c0244..7bbc38561 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -76,10 +76,14 @@ void Connection::StartupHandler::on_set(ResponseMessage* response) { ErrorResponse* error = static_cast(response->response_body().get()); if (error->code() == CQL_ERROR_PROTOCOL_ERROR && - error->message().to_string().find("Invalid or unsupported protocol version") != std::string::npos) { + error->message().find("Invalid or unsupported protocol version") != StringRef::npos) { connection_->notify_error(error->message().to_string(), CONNECTION_ERROR_INVALID_PROTOCOL); } else if (error->code() == CQL_ERROR_BAD_CREDENTIALS) { connection_->notify_error(error->message().to_string(), CONNECTION_ERROR_AUTH); + } else if (error->code() == CQL_ERROR_INVALID_QUERY && + error->message().find("Keyspace") == 0 && + error->message().find("does not exist") != StringRef::npos) { + connection_->notify_error("Received error response " + error->error_message(), CONNECTION_ERROR_KEYSPACE); } else { connection_->notify_error("Received error response " + error->error_message()); } @@ -743,7 +747,7 @@ void Connection::on_ready() { if (keyspace_.empty()) { notify_ready(); } else { - write(new StartupHandler(this, new QueryRequest("use \"" + keyspace_ + "\""))); + write(new StartupHandler(this, new QueryRequest("USE \"" + keyspace_ + "\""))); } } diff --git a/src/connection.hpp b/src/connection.hpp index b814e0a65..808833dd7 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -65,7 +65,8 @@ class Connection { CONNECTION_ERROR_TIMEOUT, CONNECTION_ERROR_INVALID_PROTOCOL, CONNECTION_ERROR_AUTH, - CONNECTION_ERROR_SSL + CONNECTION_ERROR_SSL, + CONNECTION_ERROR_KEYSPACE }; class Listener { @@ -130,12 +131,6 @@ class Connection { bool is_ssl_error() const { return error_code_ == CONNECTION_ERROR_SSL; } bool is_timeout_error() const { return error_code_ == CONNECTION_ERROR_TIMEOUT; } - bool is_critical_failure() const { - return error_code_ == CONNECTION_ERROR_INVALID_PROTOCOL || - error_code_ == CONNECTION_ERROR_AUTH || - error_code_ == CONNECTION_ERROR_SSL; - } - ConnectionError error_code() const { return error_code_; } const std::string& error_message() const { return error_message_; } diff --git a/src/io_worker.cpp b/src/io_worker.cpp index b2ea4f463..30ddf5470 100644 --- a/src/io_worker.cpp +++ b/src/io_worker.cpp @@ -166,7 +166,11 @@ void IOWorker::request_finished(RequestHandler* request_handler) { void IOWorker::notify_pool_ready(Pool* pool) { if (pool->is_initial_connection()) { - session_->notify_ready_async(); + if (pool->is_keyspace_error()) { + session_->notify_keyspace_error_async(); + } else { + session_->notify_ready_async(); + } } else if (is_ready() && pool->is_ready()){ session_->notify_up_async(pool->address()); } diff --git a/src/pool.cpp b/src/pool.cpp index ecaa24d29..59abc7cdf 100644 --- a/src/pool.cpp +++ b/src/pool.cpp @@ -42,10 +42,10 @@ Pool::Pool(IOWorker* io_worker, , config_(io_worker->config()) , metrics_(io_worker->metrics()) , state_(POOL_STATE_NEW) + , connection_error_(Connection::CONNECTION_OK) , available_connection_count_(0) , is_available_(false) , is_initial_connection_(is_initial_connection) - , is_critical_failure_(false) , is_pending_flush_(false) , cancel_reconnect_(false) { } @@ -309,16 +309,18 @@ void Pool::on_close(Connection* connection) { if (connection->is_timeout_error() && !connections_.empty()) { if (!connect_timer.is_running()) { connect_timer.start(loop_, - config_.reconnect_wait_time_ms(), - this, on_partial_reconnect); + config_.reconnect_wait_time_ms(), + this, on_partial_reconnect); } maybe_notify_ready(); } else if (connection->is_defunct()) { // If at least one connection has a critical failure then don't try to - // reconnect automatically. - if (connection->is_critical_failure()) { - is_critical_failure_ = true; + // reconnect automatically. Also, don't set the error to something else if + // it has already been set to something critical. + if (!is_critical_failure()) { + connection_error_ = connection->error_code(); } + close(); } else { maybe_notify_ready(); diff --git a/src/pool.hpp b/src/pool.hpp index 594fe6f7f..4d68bfbf5 100644 --- a/src/pool.hpp +++ b/src/pool.hpp @@ -66,9 +66,18 @@ class Pool : public RefCounted const Address& address() const { return address_; } + Connection::ConnectionError connection_error() const { return connection_error_; } + bool is_initial_connection() const { return is_initial_connection_; } bool is_ready() const { return state_ == POOL_STATE_READY; } - bool is_critical_failure() const { return is_critical_failure_; } + bool is_keyspace_error() const { + return connection_error_ == Connection::CONNECTION_ERROR_KEYSPACE; + } + bool is_critical_failure() const { + return connection_error_ == Connection::CONNECTION_ERROR_INVALID_PROTOCOL || + connection_error_ == Connection::CONNECTION_ERROR_AUTH || + connection_error_ == Connection::CONNECTION_ERROR_SSL; + } bool cancel_reconnect() const { return cancel_reconnect_; } void return_connection(Connection* connection); @@ -106,13 +115,13 @@ class Pool : public RefCounted Metrics* metrics_; PoolState state_; + Connection::ConnectionError connection_error_; ConnectionVec connections_; ConnectionSet connections_pending_; List pending_requests_; int available_connection_count_; bool is_available_; bool is_initial_connection_; - bool is_critical_failure_; bool is_pending_flush_; bool cancel_reconnect_; diff --git a/src/session.cpp b/src/session.cpp index b8d84ee3b..a5c86698b 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -136,6 +136,7 @@ namespace cass { Session::Session() : state_(SESSION_STATE_CLOSED) + , connect_error_code_(CASS_OK) , current_host_mark_(true) , pending_resolve_count_(0) , pending_pool_count_(0) @@ -256,6 +257,12 @@ bool Session::notify_ready_async() { return send_event_async(event); } +bool Session::notify_keyspace_error_async() { + SessionEvent event; + event.type = SessionEvent::NOTIFY_KEYSPACE_ERROR; + return send_event_async(event); +} + bool Session::notify_worker_closed_async() { SessionEvent event; event.type = SessionEvent::NOTIFY_WORKER_CLOSED; @@ -366,16 +373,20 @@ void Session::notify_connected() { } void Session::notify_connect_error(CassError code, const std::string& message) { + connect_error_code_ = code; + connect_error_message_ = message; ScopedMutex l(&state_mutex_); state_.store(SESSION_STATE_CLOSING, MEMORY_ORDER_RELAXED); internal_close(); - connect_future_->set_error(code, message); - connect_future_.reset(); } void Session::notify_closed() { ScopedMutex l(&state_mutex_); state_.store(SESSION_STATE_CLOSED, MEMORY_ORDER_RELAXED); + if (connect_future_) { + connect_future_->set_error(connect_error_code_, connect_error_message_); + connect_future_.reset(); + } if (close_future_) { close_future_->set(); close_future_.reset(); @@ -442,6 +453,15 @@ void Session::on_event(const SessionEvent& event) { } break; + case SessionEvent::NOTIFY_KEYSPACE_ERROR: { + // Currently, this is only called when the keyspace does not exist + // and not for any other keyspace related errors. + const CopyOnWritePtr keyspace(keyspace_); + notify_connect_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, + "Keyspace '" + *keyspace + "' does not exist"); + break; + } + case SessionEvent::NOTIFY_WORKER_CLOSED: if (--pending_workers_count_ == 0) { LOG_DEBUG("Session is disconnected"); diff --git a/src/session.hpp b/src/session.hpp index 513cb05d6..2b21b8ee5 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -52,13 +52,14 @@ struct SessionEvent { INVALID, CONNECT, NOTIFY_READY, + NOTIFY_KEYSPACE_ERROR, NOTIFY_WORKER_CLOSED, NOTIFY_UP, NOTIFY_DOWN }; SessionEvent() - : type(INVALID) {} + : type(INVALID) { } Type type; Address address; @@ -89,6 +90,7 @@ class Session : public EventThread { SharedRefPtr get_host(const Address& address); bool notify_ready_async(); + bool notify_keyspace_error_async(); bool notify_worker_closed_async(); bool notify_up_async(const Address& address); bool notify_down_async(const Address& address); @@ -162,6 +164,8 @@ class Session : public EventThread { Config config_; ScopedPtr metrics_; ScopedRefPtr load_balancing_policy_; + CassError connect_error_code_; + std::string connect_error_message_; ScopedRefPtr connect_future_; ScopedRefPtr close_future_; diff --git a/src/set_keyspace_handler.cpp b/src/set_keyspace_handler.cpp index e307bed5e..9fe9c325e 100644 --- a/src/set_keyspace_handler.cpp +++ b/src/set_keyspace_handler.cpp @@ -29,10 +29,10 @@ namespace cass { SetKeyspaceHandler::SetKeyspaceHandler(Connection* connection, const std::string& keyspace, RequestHandler* request_handler) - : Handler(new QueryRequest("use \"" + keyspace + "\"")) - , request_handler_(request_handler) { - set_connection(connection); - } + : Handler(new QueryRequest("USE \"" + keyspace + "\"")) + , request_handler_(request_handler) { + set_connection(connection); +} void SetKeyspaceHandler::on_set(ResponseMessage* response) { switch (response->opcode()) { @@ -42,7 +42,7 @@ void SetKeyspaceHandler::on_set(ResponseMessage* response) { case CQL_OPCODE_ERROR: connection_->defunct(); request_handler_->on_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, - "Unable to set keyspace"); + "Unable to set keyspace"); break; default: break; @@ -52,7 +52,7 @@ void SetKeyspaceHandler::on_set(ResponseMessage* response) { void SetKeyspaceHandler::on_error(CassError code, const std::string& message) { connection_->defunct(); request_handler_->on_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, - "Unable to set keyspace"); + "Unable to set keyspace"); } void SetKeyspaceHandler::on_timeout() { @@ -71,7 +71,7 @@ void SetKeyspaceHandler::on_result_response(ResponseMessage* response) { } else { connection_->defunct(); request_handler_->on_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, - "Unable to set keyspace"); + "Unable to set keyspace"); } } diff --git a/src/string_ref.hpp b/src/string_ref.hpp index d9ae06f37..33223e8e1 100644 --- a/src/string_ref.hpp +++ b/src/string_ref.hpp @@ -99,6 +99,14 @@ class StringRef { return StringRef(ptr_ + pos, std::min(length_ - pos, length)); } + size_t find(const StringRef& ref) const { + if (ref.length_ == 0) return 0; + if (length_ == 0) return npos; + const_iterator i = std::search(ptr_, ptr_ + length_, ref.ptr_, ref.ptr_ + ref.length_); + if (i == end()) return npos; + return i - begin(); + } + int compare(const StringRef& ref) const { return compare(ref, IsEqual()); } diff --git a/test/integration_tests/src/test_sessions.cpp b/test/integration_tests/src/test_sessions.cpp index ee7c9ceda..ac5e308bf 100644 --- a/test/integration_tests/src/test_sessions.cpp +++ b/test/integration_tests/src/test_sessions.cpp @@ -73,8 +73,6 @@ BOOST_AUTO_TEST_CASE(connect_invalid_keyspace) { test_utils::CassLog::reset("Received error response 'Keyspace 'invalid' does not exist"); - CassError code; - { test_utils::CassClusterPtr cluster(cass_cluster_new()); @@ -86,17 +84,11 @@ BOOST_AUTO_TEST_CASE(connect_invalid_keyspace) test_utils::CassSessionPtr session(cass_session_new()); test_utils::CassFuturePtr connect_future(cass_session_connect_keyspace(session.get(), cluster.get(), "invalid")); - - const char* query = "SELECT * FROM table"; - test_utils::CassStatementPtr statement(cass_statement_new(query, 0)); - - test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get())); - - code = cass_future_error_code(future.get()); + CassError code = cass_future_error_code(connect_future.get()); + BOOST_CHECK_EQUAL(code, CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE); } BOOST_CHECK(test_utils::CassLog::message_count() > 0); - BOOST_CHECK_EQUAL(code, CASS_ERROR_LIB_NO_HOSTS_AVAILABLE); } BOOST_AUTO_TEST_CASE(close_timeout_error) diff --git a/test/unit_tests/src/test_string_ref.cpp b/test/unit_tests/src/test_string_ref.cpp index 3e5820548..3167b7920 100644 --- a/test/unit_tests/src/test_string_ref.cpp +++ b/test/unit_tests/src/test_string_ref.cpp @@ -72,6 +72,23 @@ BOOST_AUTO_TEST_CASE(substr) // More tests in "starts_with" and "ends_with" } +BOOST_AUTO_TEST_CASE(find) +{ + cass::StringRef s("abcxyz"); + + BOOST_CHECK(s.find("") == 0); + BOOST_CHECK(s.find("abc") == 0); + BOOST_CHECK(s.find("xyz") == 3); + BOOST_CHECK(s.find("z") == 5); + + BOOST_CHECK(s.find("invalid") == cass::StringRef::npos); + BOOST_CHECK(s.find("abcxyza") == cass::StringRef::npos); + + BOOST_CHECK(s.find("") == 0); + BOOST_CHECK(cass::StringRef("").find("") == 0); +} + + BOOST_AUTO_TEST_CASE(starts_with) { cass::StringRef s("abcxyz"); From 95e3c0a3567560f8de115f81e5e779c5970ba45f Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 2 Mar 2016 15:26:23 -0600 Subject: [PATCH 3/3] Fix callback example - Don't connect_keyspace (in case it's not there) - Fix exit handling for failed session connect --- examples/callbacks/callbacks.c | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/examples/callbacks/callbacks.c b/examples/callbacks/callbacks.c index 55995a4e1..66fa72aa3 100644 --- a/examples/callbacks/callbacks.c +++ b/examples/callbacks/callbacks.c @@ -36,26 +36,33 @@ uv_mutex_t mutex; uv_cond_t cond; +int exit_flag = 0; CassFuture* close_future = NULL; CassUuidGen* uuid_gen = NULL; void wait_exit() { uv_mutex_lock(&mutex); - while (close_future == NULL) { + while (exit_flag == 0) { uv_cond_wait(&cond, &mutex); } uv_mutex_unlock(&mutex); - cass_future_wait(close_future); - cass_future_free(close_future); + if (close_future) { + cass_future_wait(close_future); + cass_future_free(close_future); + } } void signal_exit(CassSession* session) { uv_mutex_lock(&mutex); - close_future = cass_session_close(session); + if (session) { + close_future = cass_session_close(session); + } + exit_flag = 1; uv_cond_signal(&cond); uv_mutex_unlock(&mutex); } +void on_set_keyspace(CassFuture* future, void* data); void on_create_keyspace(CassFuture* future, void* data); void on_create_table(CassFuture* future, void* data); @@ -79,7 +86,7 @@ CassCluster* create_cluster() { } void connect_session(CassSession* session, const CassCluster* cluster, CassFutureCallback callback) { - CassFuture* future = cass_session_connect_keyspace(session, cluster, "examples"); + CassFuture* future = cass_session_connect(session, cluster); cass_future_set_callback(future, callback, session); cass_future_free(future); } @@ -99,7 +106,7 @@ void on_session_connect(CassFuture* future, void* data) { if (code != CASS_OK) { print_error(future); - uv_cond_signal(&cond); + signal_exit(NULL); return; } @@ -115,6 +122,17 @@ void on_create_keyspace(CassFuture* future, void* data) { print_error(future); } + execute_query((CassSession*)data, + "USE examples", + on_set_keyspace); +} + +void on_set_keyspace(CassFuture* future, void* data) { + CassError code = cass_future_error_code(future); + if (code != CASS_OK) { + print_error(future); + } + execute_query((CassSession*)data, "CREATE TABLE callbacks " "(key timeuuid PRIMARY KEY, value bigint)",