@@ -112,7 +112,8 @@ ControlConnection::ControlConnection()
112
112
, session_(NULL )
113
113
, connection_(NULL )
114
114
, protocol_version_(0 )
115
- , should_query_tokens_(false ) {}
115
+ , use_schema_(false )
116
+ , token_aware_routing_(false ) { }
116
117
117
118
const SharedRefPtr<Host>& ControlConnection::connected_host () const {
118
119
return current_host_;
@@ -126,19 +127,21 @@ void ControlConnection::clear() {
126
127
query_plan_.reset ();
127
128
protocol_version_ = 0 ;
128
129
last_connection_error_.clear ();
129
- should_query_tokens_ = false ;
130
+ use_schema_ = false ;
131
+ token_aware_routing_ = false ;
130
132
}
131
133
132
134
void ControlConnection::connect (Session* session) {
133
135
session_ = session;
134
136
query_plan_.reset (new ControlStartupQueryPlan (session_->hosts_ )); // No hosts lock necessary (read-only)
135
137
protocol_version_ = session_->config ().protocol_version ();
136
- should_query_tokens_ = session_->config ().token_aware_routing ();
138
+ use_schema_ = session_->config ().use_schema ();
139
+ token_aware_routing_ = session_->config ().token_aware_routing ();
137
140
if (protocol_version_ < 0 ) {
138
141
protocol_version_ = CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION;
139
142
}
140
143
141
- if (session_-> config (). use_schema () ) {
144
+ if (use_schema_ || token_aware_routing_ ) {
142
145
set_event_types (CASS_EVENT_TOPOLOGY_CHANGE | CASS_EVENT_STATUS_CHANGE |
143
146
CASS_EVENT_SCHEMA_CHANGE);
144
147
} else {
@@ -325,10 +328,17 @@ void ControlConnection::on_event(EventResponse* response) {
325
328
}
326
329
327
330
case CASS_EVENT_SCHEMA_CHANGE:
331
+ // Only handle keyspace events when using token-aware routing
332
+ if (!use_schema_ &&
333
+ response->schema_change_target () != EventResponse::KEYSPACE) {
334
+ return ;
335
+ }
336
+
328
337
LOG_DEBUG (" Schema change (%d): %.*s %.*s\n " ,
329
338
response->schema_change (),
330
339
(int )response->keyspace ().size (), response->keyspace ().data (),
331
340
(int )response->target ().size (), response->target ().data ());
341
+
332
342
switch (response->schema_change ()) {
333
343
case EventResponse::CREATED:
334
344
case EventResponse::UPDATED:
@@ -390,8 +400,8 @@ void ControlConnection::on_event(EventResponse* response) {
390
400
void ControlConnection::query_meta_hosts () {
391
401
ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler (
392
402
new ControlMultipleRequestHandler<UnusedData>(this , ControlConnection::on_query_hosts, UnusedData ()));
393
- handler->execute_query (" local" , SELECT_LOCAL_TOKENS);
394
- handler->execute_query (" peers" , SELECT_PEERS_TOKENS);
403
+ handler->execute_query (" local" , token_aware_routing_ ? SELECT_LOCAL_TOKENS : SELECT_LOCAL );
404
+ handler->execute_query (" peers" , token_aware_routing_ ? SELECT_PEERS_TOKENS : SELECT_PEERS );
395
405
}
396
406
397
407
void ControlConnection::on_query_hosts (ControlConnection* control_connection,
@@ -472,7 +482,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
472
482
473
483
session->purge_hosts (is_initial_connection);
474
484
475
- if (session->config ().use_schema ()) {
485
+ if (control_connection->use_schema_ ||
486
+ control_connection->token_aware_routing_ ) {
476
487
control_connection->query_meta_schema ();
477
488
} else if (is_initial_connection) {
478
489
control_connection->state_ = CONTROL_STATE_READY;
@@ -490,24 +501,32 @@ void ControlConnection::query_meta_schema() {
490
501
new ControlMultipleRequestHandler<UnusedData>(this , ControlConnection::on_query_meta_schema, UnusedData ()));
491
502
492
503
if (cassandra_version_ >= VersionNumber (3 , 0 , 0 )) {
493
- handler->execute_query (" keyspaces" , SELECT_KEYSPACES_30);
494
- handler->execute_query (" tables" , SELECT_TABLES_30);
495
- handler->execute_query (" views" , SELECT_VIEWS_30);
496
- handler->execute_query (" columns" , SELECT_COLUMNS_30);
497
- handler->execute_query (" indexes" , SELECT_INDEXES_30);
498
- handler->execute_query (" user_types" , SELECT_USERTYPES_30);
499
- handler->execute_query (" functions" , SELECT_FUNCTIONS_30);
500
- handler->execute_query (" aggregates" , SELECT_AGGREGATES_30);
504
+ if (use_schema_ || token_aware_routing_) {
505
+ handler->execute_query (" keyspaces" , SELECT_KEYSPACES_30);
506
+ }
507
+ if (use_schema_) {
508
+ handler->execute_query (" tables" , SELECT_TABLES_30);
509
+ handler->execute_query (" views" , SELECT_VIEWS_30);
510
+ handler->execute_query (" columns" , SELECT_COLUMNS_30);
511
+ handler->execute_query (" indexes" , SELECT_INDEXES_30);
512
+ handler->execute_query (" user_types" , SELECT_USERTYPES_30);
513
+ handler->execute_query (" functions" , SELECT_FUNCTIONS_30);
514
+ handler->execute_query (" aggregates" , SELECT_AGGREGATES_30);
515
+ }
501
516
} else {
502
- handler->execute_query (" keyspaces" , SELECT_KEYSPACES_20);
503
- handler->execute_query (" tables" , SELECT_COLUMN_FAMILIES_20);
504
- handler->execute_query (" columns" , SELECT_COLUMNS_20);
505
- if (cassandra_version_ >= VersionNumber (2 , 1 , 0 )) {
506
- handler->execute_query (" user_types" , SELECT_USERTYPES_21);
517
+ if (use_schema_ || token_aware_routing_) {
518
+ handler->execute_query (" keyspaces" , SELECT_KEYSPACES_20);
507
519
}
508
- if (cassandra_version_ >= VersionNumber (2 , 2 , 0 )) {
509
- handler->execute_query (" functions" , SELECT_FUNCTIONS_22);
510
- handler->execute_query (" aggregates" , SELECT_AGGREGATES_22);
520
+ if (use_schema_) {
521
+ handler->execute_query (" tables" , SELECT_COLUMN_FAMILIES_20);
522
+ handler->execute_query (" columns" , SELECT_COLUMNS_20);
523
+ if (cassandra_version_ >= VersionNumber (2 , 1 , 0 )) {
524
+ handler->execute_query (" user_types" , SELECT_USERTYPES_21);
525
+ }
526
+ if (cassandra_version_ >= VersionNumber (2 , 2 , 0 )) {
527
+ handler->execute_query (" functions" , SELECT_FUNCTIONS_22);
528
+ handler->execute_query (" aggregates" , SELECT_AGGREGATES_22);
529
+ }
511
530
}
512
531
}
513
532
}
@@ -524,58 +543,63 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti
524
543
int protocol_version = control_connection->protocol_version_ ;
525
544
const VersionNumber& cassandra_version = control_connection->cassandra_version_ ;
526
545
546
+ bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
547
+
527
548
if (session->token_map_ ) {
528
549
session->token_map_ ->clear_keyspaces ();
550
+
551
+ ResultResponse* keyspaces_result;
552
+ if (MultipleRequestHandler::get_result_response (responses, " keyspaces" , &keyspaces_result)) {
553
+ session->token_map_ ->add_keyspaces (cassandra_version, keyspaces_result);
554
+ }
529
555
}
530
556
531
- session->metadata ().clear_and_update_back (cassandra_version);
557
+ if (control_connection->use_schema_ ) {
558
+ session->metadata ().clear_and_update_back (cassandra_version);
532
559
533
- bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
560
+ ResultResponse* keyspaces_result;
561
+ if (MultipleRequestHandler::get_result_response (responses, " keyspaces" , &keyspaces_result)) {
562
+ session->metadata ().update_keyspaces (protocol_version, cassandra_version, keyspaces_result);
563
+ }
534
564
535
- ResultResponse* keyspaces_result;
536
- if (MultipleRequestHandler::get_result_response (responses, " keyspaces" , &keyspaces_result)) {
537
- if (session->token_map_ ) {
538
- session->token_map_ ->add_keyspaces (cassandra_version, keyspaces_result);
565
+ ResultResponse* tables_result;
566
+ if (MultipleRequestHandler::get_result_response (responses, " tables" , &tables_result)) {
567
+ session->metadata ().update_tables (protocol_version, cassandra_version, tables_result);
539
568
}
540
- session->metadata ().update_keyspaces (protocol_version, cassandra_version, keyspaces_result);
541
- }
542
569
543
- ResultResponse* tables_result ;
544
- if (MultipleRequestHandler::get_result_response (responses, " tables " , &tables_result )) {
545
- session->metadata ().update_tables (protocol_version, cassandra_version, tables_result );
546
- }
570
+ ResultResponse* views_result ;
571
+ if (MultipleRequestHandler::get_result_response (responses, " views " , &views_result )) {
572
+ session->metadata ().update_views (protocol_version, cassandra_version, views_result );
573
+ }
547
574
548
- ResultResponse* views_result ;
549
- if (MultipleRequestHandler::get_result_response (responses, " views " , &views_result )) {
550
- session->metadata ().update_views (protocol_version, cassandra_version, views_result );
551
- }
575
+ ResultResponse* columns_result = NULL ;
576
+ if (MultipleRequestHandler::get_result_response (responses, " columns " , &columns_result )) {
577
+ session->metadata ().update_columns (protocol_version, cassandra_version, columns_result );
578
+ }
552
579
553
- ResultResponse* columns_result = NULL ;
554
- if (MultipleRequestHandler::get_result_response (responses, " columns " , &columns_result )) {
555
- session->metadata ().update_columns (protocol_version, cassandra_version, columns_result );
556
- }
580
+ ResultResponse* indexes_result ;
581
+ if (MultipleRequestHandler::get_result_response (responses, " indexes " , &indexes_result )) {
582
+ session->metadata ().update_indexes (protocol_version, cassandra_version, indexes_result );
583
+ }
557
584
558
- ResultResponse* indexes_result ;
559
- if (MultipleRequestHandler::get_result_response (responses, " indexes " , &indexes_result )) {
560
- session->metadata ().update_indexes (protocol_version, cassandra_version, indexes_result );
561
- }
585
+ ResultResponse* user_types_result ;
586
+ if (MultipleRequestHandler::get_result_response (responses, " user_types " , &user_types_result )) {
587
+ session->metadata ().update_user_types (protocol_version, cassandra_version, user_types_result );
588
+ }
562
589
563
- ResultResponse* user_types_result ;
564
- if (MultipleRequestHandler::get_result_response (responses, " user_types " , &user_types_result )) {
565
- session->metadata ().update_user_types (protocol_version, cassandra_version, user_types_result );
566
- }
590
+ ResultResponse* functions_result ;
591
+ if (MultipleRequestHandler::get_result_response (responses, " functions " , &functions_result )) {
592
+ session->metadata ().update_functions (protocol_version, cassandra_version, functions_result );
593
+ }
567
594
568
- ResultResponse* functions_result ;
569
- if (MultipleRequestHandler::get_result_response (responses, " functions " , &functions_result )) {
570
- session->metadata ().update_functions (protocol_version, cassandra_version, functions_result );
571
- }
595
+ ResultResponse* aggregates_result ;
596
+ if (MultipleRequestHandler::get_result_response (responses, " aggregates " , &aggregates_result )) {
597
+ session->metadata ().update_aggregates (protocol_version, cassandra_version, aggregates_result );
598
+ }
572
599
573
- ResultResponse* aggregates_result;
574
- if (MultipleRequestHandler::get_result_response (responses, " aggregates" , &aggregates_result)) {
575
- session->metadata ().update_aggregates (protocol_version, cassandra_version, aggregates_result);
600
+ session->metadata ().swap_to_back_and_update_front ();
576
601
}
577
602
578
- session->metadata ().swap_to_back_and_update_front ();
579
603
if (session->token_map_ ) {
580
604
session->token_map_ ->build ();
581
605
}
@@ -601,7 +625,7 @@ void ControlConnection::refresh_node_info(SharedRefPtr<Host> host,
601
625
std::string query;
602
626
ControlHandler<RefreshNodeData>::ResponseCallback response_callback;
603
627
604
- bool token_query = should_query_tokens_ && (host->was_just_added () || query_tokens);
628
+ bool token_query = token_aware_routing_ && (host->was_just_added () || query_tokens);
605
629
if (is_connected_host || !host->listen_address ().empty ()) {
606
630
if (is_connected_host) {
607
631
query.assign (token_query ? SELECT_LOCAL_TOKENS : SELECT_LOCAL);
@@ -739,7 +763,7 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
739
763
host->address ().to_string ().c_str ());
740
764
}
741
765
742
- if (should_query_tokens_ ) {
766
+ if (token_aware_routing_ ) {
743
767
bool is_connected_host = connection_ != NULL && host->address ().compare (connection_->address ()) == 0 ;
744
768
std::string partitioner;
745
769
if (is_connected_host && row->get_string_by_name (" partitioner" , &partitioner)) {
@@ -798,7 +822,10 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio
798
822
if (session->token_map_ ) {
799
823
session->token_map_ ->update_keyspaces_and_build (cassandra_version, result);
800
824
}
801
- session->metadata ().update_keyspaces (protocol_version, cassandra_version, result);
825
+
826
+ if (control_connection->use_schema_ ) {
827
+ session->metadata ().update_keyspaces (protocol_version, cassandra_version, result);
828
+ }
802
829
}
803
830
804
831
void ControlConnection::refresh_table_or_view (const StringRef& keyspace_name,
0 commit comments