Skip to content

Commit 16b90b0

Browse files
committed
Prevent the control connection from timing out as a result of rebuilding the token map
1 parent b3fe72a commit 16b90b0

File tree

4 files changed

+36
-23
lines changed

4 files changed

+36
-23
lines changed

src/control_connection.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,9 @@ void ControlConnection::on_event(EventResponse* response) {
400400
void ControlConnection::query_meta_hosts() {
401401
ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler(
402402
new ControlMultipleRequestHandler<UnusedData>(this, ControlConnection::on_query_hosts, UnusedData()));
403+
// This needs to happen before other schema metadata queries so that we have
404+
// a valid Cassandra version because this version determines which follow up
405+
// schema metadata queries are executed.
403406
handler->execute_query("local", token_aware_routing_ ? SELECT_LOCAL_TOKENS : SELECT_LOCAL);
404407
handler->execute_query("peers", token_aware_routing_ ? SELECT_PEERS_TOKENS : SELECT_PEERS);
405408
}
@@ -415,7 +418,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
415418
Session* session = control_connection->session_;
416419

417420
if (session->token_map_) {
418-
session->token_map_->clear_hosts();
421+
// Clearing token/hosts will not invalidate the replicas
422+
session->token_map_->clear_tokens_and_hosts();
419423
}
420424

421425
bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
@@ -433,7 +437,7 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
433437
ResultResponse* local_result;
434438
if (MultipleRequestHandler::get_result_response(responses, "local", &local_result) &&
435439
local_result->row_count() > 0) {
436-
control_connection->update_node_info(host, &local_result->first_row(), ADD_NODE);
440+
control_connection->update_node_info(host, &local_result->first_row(), ADD_HOST);
437441
control_connection->cassandra_version_ = host->cassandra_version();
438442
} else {
439443
LOG_WARN("No row found in %s's local system table",
@@ -472,7 +476,7 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
472476

473477
host->set_mark(session->current_host_mark_);
474478

475-
control_connection->update_node_info(host, rows.row(), ADD_NODE);
479+
control_connection->update_node_info(host, rows.row(), ADD_HOST);
476480
if (is_new && !is_initial_connection) {
477481
session->on_add(host, false);
478482
}
@@ -546,12 +550,12 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti
546550
bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
547551

548552
if (session->token_map_) {
549-
session->token_map_->clear_keyspaces();
550-
551553
ResultResponse* keyspaces_result;
552554
if (MultipleRequestHandler::get_result_response(responses, "keyspaces", &keyspaces_result)) {
555+
session->token_map_->clear_replicas_and_strategies(); // Only clear replicas once we have the new keyspaces
553556
session->token_map_->add_keyspaces(cassandra_version, keyspaces_result);
554557
}
558+
session->token_map_->build();
555559
}
556560

557561
if (control_connection->use_schema_) {
@@ -600,10 +604,6 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti
600604
session->metadata().swap_to_back_and_update_front();
601605
}
602606

603-
if (session->token_map_) {
604-
session->token_map_->build();
605-
}
606-
607607
if (is_initial_connection) {
608608
control_connection->state_ = CONTROL_STATE_READY;
609609
session->on_control_connection_ready();
@@ -674,7 +674,7 @@ void ControlConnection::on_refresh_node_info(ControlConnection* control_connecti
674674
host_address_str.c_str());
675675
return;
676676
}
677-
control_connection->update_node_info(data.host, &result->first_row(), UPDATE_NODE);
677+
control_connection->update_node_info(data.host, &result->first_row(), UPDATE_HOST_AND_BUILD);
678678

679679
if (data.is_new_node) {
680680
control_connection->session_->on_add(data.host, false);
@@ -712,7 +712,7 @@ void ControlConnection::on_refresh_node_info_all(ControlConnection* control_conn
712712
row->get_by_name("rpc_address"),
713713
&address);
714714
if (is_valid_address && data.host->address().compare(address) == 0) {
715-
control_connection->update_node_info(data.host, row, UPDATE_NODE);
715+
control_connection->update_node_info(data.host, row, UPDATE_HOST_AND_BUILD);
716716
if (data.is_new_node) {
717717
control_connection->session_->on_add(data.host, false);
718718
}
@@ -721,7 +721,7 @@ void ControlConnection::on_refresh_node_info_all(ControlConnection* control_conn
721721
}
722722
}
723723

724-
void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row, UpdateNodeType type) {
724+
void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row, UpdateHostType type) {
725725
const Value* v;
726726

727727
std::string rack;
@@ -774,7 +774,7 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
774774
v = row->get_by_name("tokens");
775775
if (v != NULL && v->is_collection()) {
776776
if (session_->token_map_) {
777-
if (type == UPDATE_NODE) {
777+
if (type == UPDATE_HOST_AND_BUILD) {
778778
session_->token_map_->update_host_and_build(host, v);
779779
} else {
780780
session_->token_map_->add_host(host, v);
@@ -1082,6 +1082,17 @@ void ControlConnection::on_reconnect(Timer* timer) {
10821082
control_connection->reconnect(false);
10831083
}
10841084

1085+
template<class T>
1086+
void ControlConnection::ControlMultipleRequestHandler<T>::execute_query(
1087+
const std::string& index, const std::string& query) {
1088+
// We need to update the loop time to prevent new requests from timing out
1089+
// in cases where a callback took a long time to execute. In the future,
1090+
// we might improve this by executing the these long running callbacks
1091+
// on a seperate thread.
1092+
uv_update_time(control_connection_->session_->loop());
1093+
MultipleRequestHandler::execute_query(index, query);
1094+
}
1095+
10851096
template<class T>
10861097
void ControlConnection::ControlMultipleRequestHandler<T>::on_set(
10871098
const MultipleRequestHandler::ResponseMap& responses) {

src/control_connection.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class ControlConnection : public Connection::Listener {
8787
, response_callback_(response_callback)
8888
, data_(data) {}
8989

90+
void execute_query(const std::string& index, const std::string& query);
91+
9092
virtual void on_set(const MultipleRequestHandler::ResponseMap& responses);
9193

9294
virtual void on_error(CassError code, const std::string& message) {
@@ -177,9 +179,9 @@ class ControlConnection : public Connection::Listener {
177179
bool is_aggregate;
178180
};
179181

180-
enum UpdateNodeType {
181-
ADD_NODE,
182-
UPDATE_NODE
182+
enum UpdateHostType {
183+
ADD_HOST,
184+
UPDATE_HOST_AND_BUILD
183185
};
184186

185187
void schedule_reconnect(uint64_t ms = 0);
@@ -217,7 +219,7 @@ class ControlConnection : public Connection::Listener {
217219
const RefreshNodeData& data,
218220
Response* response);
219221

220-
void update_node_info(SharedRefPtr<Host> host, const Row* row, UpdateNodeType type);
222+
void update_node_info(SharedRefPtr<Host> host, const Row* row, UpdateHostType type);
221223

222224
void refresh_keyspace(const StringRef& keyspace_name);
223225
static void on_refresh_keyspace(ControlConnection* control_connection, const std::string& keyspace_name, Response* response);

src/token_map.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ class TokenMap {
3737
virtual void add_host(const Host::Ptr& host, const Value* tokens) = 0;
3838
virtual void update_host_and_build(const Host::Ptr& host, const Value* tokens) = 0;
3939
virtual void remove_host_and_build(const Host::Ptr& host) = 0;
40-
virtual void clear_hosts() = 0;
40+
virtual void clear_tokens_and_hosts() = 0;
4141

4242
virtual void add_keyspaces(const VersionNumber& cassandra_version, ResultResponse* result) = 0;
4343
virtual void update_keyspaces_and_build(const VersionNumber& cassandra_version, ResultResponse* result) = 0;
4444
virtual void drop_keyspace(const std::string& keyspace_name) = 0;
45-
virtual void clear_keyspaces() = 0;
45+
virtual void clear_replicas_and_strategies() = 0;
4646

4747
virtual void build() = 0;
4848

src/token_map_impl.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -527,12 +527,12 @@ class TokenMapImpl : public TokenMap {
527527
virtual void add_host(const Host::Ptr& host, const Value* tokens);
528528
virtual void update_host_and_build(const Host::Ptr& host, const Value* tokens);
529529
virtual void remove_host_and_build(const Host::Ptr& host);
530-
virtual void clear_hosts();
530+
virtual void clear_tokens_and_hosts();
531531

532532
virtual void add_keyspaces(const VersionNumber& cassandra_version, ResultResponse* result);
533533
virtual void update_keyspaces_and_build(const VersionNumber& cassandra_version, ResultResponse* result);
534534
virtual void drop_keyspace(const std::string& keyspace_name);
535-
virtual void clear_keyspaces();
535+
virtual void clear_replicas_and_strategies();
536536

537537
virtual void build();
538538

@@ -626,7 +626,7 @@ void TokenMapImpl<Partitioner>::remove_host_and_build(const Host::Ptr& host) {
626626
}
627627

628628
template <class Partitioner>
629-
void TokenMapImpl<Partitioner>::clear_hosts() {
629+
void TokenMapImpl<Partitioner>::clear_tokens_and_hosts() {
630630
tokens_.clear();
631631
hosts_.clear();
632632
}
@@ -650,7 +650,7 @@ void TokenMapImpl<Partitioner>::drop_keyspace(const std::string& keyspace_name)
650650
}
651651

652652
template <class Partitioner>
653-
void TokenMapImpl<Partitioner>::clear_keyspaces() {
653+
void TokenMapImpl<Partitioner>::clear_replicas_and_strategies() {
654654
replicas_.clear();
655655
strategies_.clear();
656656
}

0 commit comments

Comments
 (0)