From 8c2e8269047824b10a303263caf830c6ea76ee1e Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Thu, 1 Sep 2016 12:49:58 -0700 Subject: [PATCH 1/2] Fix: Connection incorrectly marked READY after being marked DEFUNCT --- src/connection.cpp | 5 +++-- src/control_connection.cpp | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 1d91c55b8..374a015a7 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -435,11 +435,10 @@ void Connection::consume(char* input, size_t size) { // A successful read means the connection is still responsive restart_terminate_timer(); - while (remaining != 0) { + while (remaining != 0 && !is_closing()) { ssize_t consumed = response_->decode(buffer, remaining); if (consumed <= 0) { notify_error("Error consuming message"); - remaining = 0; continue; } @@ -460,6 +459,7 @@ void Connection::consume(char* input, size_t size) { } else { notify_error("Invalid response opcode for event stream: " + opcode_to_string(response->opcode())); + continue; } } else { Handler* handler = NULL; @@ -500,6 +500,7 @@ void Connection::consume(char* input, size_t size) { } } else { notify_error("Invalid stream ID"); + continue; } } } diff --git a/src/control_connection.cpp b/src/control_connection.cpp index bbddffa8f..a0ea1f871 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -624,7 +624,7 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti void ControlConnection::refresh_node_info(SharedRefPtr host, bool is_new_node, bool query_tokens) { - if (connection_ == NULL) { + if (connection_ == NULL || !connection_->is_ready()) { return; } From 52d422b04f5601be0ba530ad4cc88cee867fa3b9 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Thu, 1 Sep 2016 12:58:50 -0700 Subject: [PATCH 2/2] Improvement: Removed redundant call and better error handling --- src/connection.cpp | 4 ---- src/control_connection.cpp | 36 ++++++++++++++++++++++++------------ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 374a015a7..012343754 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -349,10 +349,6 @@ void Connection::internal_close(ConnectionState close_state) { heartbeat_timer_.stop(); terminate_timer_.stop(); connect_timer_.stop(); - if (state_ == CONNECTION_STATE_CONNECTED || - state_ == CONNECTION_STATE_READY) { - uv_read_stop(copy_cast(&socket_)); - } set_state(close_state); uv_close(handle, on_close); } diff --git a/src/control_connection.cpp b/src/control_connection.cpp index a0ea1f871..6d20b9657 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -659,6 +659,7 @@ void ControlConnection::refresh_node_info(SharedRefPtr host, data)); if (!connection_->write(handler.get())) { LOG_ERROR("No more stream available while attempting to refresh node info"); + connection_->defunct(); } } @@ -806,11 +807,14 @@ void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) { LOG_DEBUG("Refreshing keyspace %s", query.c_str()); - connection_->write( + if (!connection_->write( new ControlHandler(new QueryRequest(query), this, ControlConnection::on_refresh_keyspace, - keyspace_name.to_string())); + keyspace_name.to_string()))) { + LOG_ERROR("No more stream available while attempting to refresh keyspace info"); + connection_->defunct(); + } } void ControlConnection::on_refresh_keyspace(ControlConnection* control_connection, @@ -936,11 +940,15 @@ void ControlConnection::refresh_type(const StringRef& keyspace_name, LOG_DEBUG("Refreshing type %s", query.c_str()); - connection_->write( - new ControlHandler >(new QueryRequest(query), - this, - ControlConnection::on_refresh_type, - std::make_pair(keyspace_name.to_string(), type_name.to_string()))); + if (!connection_->write( + new ControlHandler >( + new QueryRequest(query), + this, + ControlConnection::on_refresh_type, + std::make_pair(keyspace_name.to_string(), type_name.to_string())))) { + LOG_ERROR("No more stream available while attempting to refresh type info"); + connection_->defunct(); + } } void ControlConnection::on_refresh_type(ControlConnection* control_connection, @@ -1002,11 +1010,15 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name, request->set(1, CassString(function_name.data(), function_name.size())); request->set(2, signature.get()); - connection_->write( - new ControlHandler(request.get(), - this, - ControlConnection::on_refresh_function, - RefreshFunctionData(keyspace_name, function_name, arg_types, is_aggregate))); + if (!connection_->write( + new ControlHandler( + request.get(), + this, + ControlConnection::on_refresh_function, + RefreshFunctionData(keyspace_name, function_name, arg_types, is_aggregate)))) { + LOG_ERROR("No more stream available while attempting to refresh function info"); + connection_->defunct(); + } } void ControlConnection::on_refresh_function(ControlConnection* control_connection,