Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ void Connection::internal_close(ConnectionState close_state) {
heartbeat_timer_.stop();
terminate_timer_.stop();
connect_timer_.stop();
if (state_ == CONNECTION_STATE_CONNECTED ||
state_ == CONNECTION_STATE_READY) {
uv_read_stop(copy_cast<uv_tcp_t*, uv_stream_t*>(&socket_));
}
set_state(close_state);
uv_close(handle, on_close);
}
Expand Down Expand Up @@ -435,11 +431,10 @@ void Connection::consume(char* input, size_t size) {
// A successful read means the connection is still responsive
restart_terminate_timer();

while (remaining != 0) {
while (remaining != 0 && !is_closing()) {
ssize_t consumed = response_->decode(buffer, remaining);
if (consumed <= 0) {
notify_error("Error consuming message");
remaining = 0;
continue;
}

Expand All @@ -460,6 +455,7 @@ void Connection::consume(char* input, size_t size) {
} else {
notify_error("Invalid response opcode for event stream: " +
opcode_to_string(response->opcode()));
continue;
}
} else {
Handler* handler = NULL;
Expand Down Expand Up @@ -500,6 +496,7 @@ void Connection::consume(char* input, size_t size) {
}
} else {
notify_error("Invalid stream ID");
continue;
}
}
}
Expand Down
38 changes: 25 additions & 13 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti
void ControlConnection::refresh_node_info(SharedRefPtr<Host> host,
bool is_new_node,
bool query_tokens) {
if (connection_ == NULL) {
if (connection_ == NULL || !connection_->is_ready()) {
return;
}

Expand Down Expand Up @@ -659,6 +659,7 @@ void ControlConnection::refresh_node_info(SharedRefPtr<Host> host,
data));
if (!connection_->write(handler.get())) {
LOG_ERROR("No more stream available while attempting to refresh node info");
connection_->defunct();
}
}

Expand Down Expand Up @@ -806,11 +807,14 @@ void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {

LOG_DEBUG("Refreshing keyspace %s", query.c_str());

connection_->write(
if (!connection_->write(
new ControlHandler<std::string>(new QueryRequest(query),
this,
ControlConnection::on_refresh_keyspace,
keyspace_name.to_string()));
keyspace_name.to_string()))) {
LOG_ERROR("No more stream available while attempting to refresh keyspace info");
connection_->defunct();
}
}

void ControlConnection::on_refresh_keyspace(ControlConnection* control_connection,
Expand Down Expand Up @@ -936,11 +940,15 @@ void ControlConnection::refresh_type(const StringRef& keyspace_name,

LOG_DEBUG("Refreshing type %s", query.c_str());

connection_->write(
new ControlHandler<std::pair<std::string, std::string> >(new QueryRequest(query),
this,
ControlConnection::on_refresh_type,
std::make_pair(keyspace_name.to_string(), type_name.to_string())));
if (!connection_->write(
new ControlHandler<std::pair<std::string, std::string> >(
new QueryRequest(query),
this,
ControlConnection::on_refresh_type,
std::make_pair(keyspace_name.to_string(), type_name.to_string())))) {
LOG_ERROR("No more stream available while attempting to refresh type info");
connection_->defunct();
}
}

void ControlConnection::on_refresh_type(ControlConnection* control_connection,
Expand Down Expand Up @@ -1002,11 +1010,15 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,
request->set(1, CassString(function_name.data(), function_name.size()));
request->set(2, signature.get());

connection_->write(
new ControlHandler<RefreshFunctionData>(request.get(),
this,
ControlConnection::on_refresh_function,
RefreshFunctionData(keyspace_name, function_name, arg_types, is_aggregate)));
if (!connection_->write(
new ControlHandler<RefreshFunctionData>(
request.get(),
this,
ControlConnection::on_refresh_function,
RefreshFunctionData(keyspace_name, function_name, arg_types, is_aggregate)))) {
LOG_ERROR("No more stream available while attempting to refresh function info");
connection_->defunct();
}
}

void ControlConnection::on_refresh_function(ControlConnection* control_connection,
Expand Down