@@ -112,7 +112,8 @@ ControlConnection::ControlConnection()
112112 , session_(NULL )
113113 , connection_(NULL )
114114 , protocol_version_(0 )
115- , should_query_tokens_(false ) {}
115+ , use_schema_(false )
116+ , token_aware_routing_(false ) { }
116117
117118const SharedRefPtr<Host>& ControlConnection::connected_host () const {
118119 return current_host_;
@@ -126,19 +127,21 @@ void ControlConnection::clear() {
126127 query_plan_.reset ();
127128 protocol_version_ = 0 ;
128129 last_connection_error_.clear ();
129- should_query_tokens_ = false ;
130+ use_schema_ = false ;
131+ token_aware_routing_ = false ;
130132}
131133
132134void ControlConnection::connect (Session* session) {
133135 session_ = session;
134136 query_plan_.reset (new ControlStartupQueryPlan (session_->hosts_ )); // No hosts lock necessary (read-only)
135137 protocol_version_ = session_->config ().protocol_version ();
136- should_query_tokens_ = session_->config ().token_aware_routing ();
138+ use_schema_ = session_->config ().use_schema ();
139+ token_aware_routing_ = session_->config ().token_aware_routing ();
137140 if (protocol_version_ < 0 ) {
138141 protocol_version_ = CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION;
139142 }
140143
141- if (session_-> config (). use_schema () ) {
144+ if (use_schema_ || token_aware_routing_ ) {
142145 set_event_types (CASS_EVENT_TOPOLOGY_CHANGE | CASS_EVENT_STATUS_CHANGE |
143146 CASS_EVENT_SCHEMA_CHANGE);
144147 } else {
@@ -325,10 +328,17 @@ void ControlConnection::on_event(EventResponse* response) {
325328 }
326329
327330 case CASS_EVENT_SCHEMA_CHANGE:
331+ // Only handle keyspace events when using token-aware routing
332+ if (!use_schema_ &&
333+ response->schema_change_target () != EventResponse::KEYSPACE) {
334+ return ;
335+ }
336+
328337 LOG_DEBUG (" Schema change (%d): %.*s %.*s\n " ,
329338 response->schema_change (),
330339 (int )response->keyspace ().size (), response->keyspace ().data (),
331340 (int )response->target ().size (), response->target ().data ());
341+
332342 switch (response->schema_change ()) {
333343 case EventResponse::CREATED:
334344 case EventResponse::UPDATED:
@@ -390,8 +400,8 @@ void ControlConnection::on_event(EventResponse* response) {
390400void ControlConnection::query_meta_hosts () {
391401 ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler (
392402 new ControlMultipleRequestHandler<UnusedData>(this , ControlConnection::on_query_hosts, UnusedData ()));
393- handler->execute_query (" local" , SELECT_LOCAL_TOKENS);
394- handler->execute_query (" peers" , SELECT_PEERS_TOKENS);
403+ handler->execute_query (" local" , token_aware_routing_ ? SELECT_LOCAL_TOKENS : SELECT_LOCAL );
404+ handler->execute_query (" peers" , token_aware_routing_ ? SELECT_PEERS_TOKENS : SELECT_PEERS );
395405}
396406
397407void ControlConnection::on_query_hosts (ControlConnection* control_connection,
@@ -472,7 +482,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
472482
473483 session->purge_hosts (is_initial_connection);
474484
475- if (session->config ().use_schema ()) {
485+ if (control_connection->use_schema_ ||
486+ control_connection->token_aware_routing_ ) {
476487 control_connection->query_meta_schema ();
477488 } else if (is_initial_connection) {
478489 control_connection->state_ = CONTROL_STATE_READY;
@@ -490,24 +501,32 @@ void ControlConnection::query_meta_schema() {
490501 new ControlMultipleRequestHandler<UnusedData>(this , ControlConnection::on_query_meta_schema, UnusedData ()));
491502
492503 if (cassandra_version_ >= VersionNumber (3 , 0 , 0 )) {
493- handler->execute_query (" keyspaces" , SELECT_KEYSPACES_30);
494- handler->execute_query (" tables" , SELECT_TABLES_30);
495- handler->execute_query (" views" , SELECT_VIEWS_30);
496- handler->execute_query (" columns" , SELECT_COLUMNS_30);
497- handler->execute_query (" indexes" , SELECT_INDEXES_30);
498- handler->execute_query (" user_types" , SELECT_USERTYPES_30);
499- handler->execute_query (" functions" , SELECT_FUNCTIONS_30);
500- handler->execute_query (" aggregates" , SELECT_AGGREGATES_30);
504+ if (use_schema_ || token_aware_routing_) {
505+ handler->execute_query (" keyspaces" , SELECT_KEYSPACES_30);
506+ }
507+ if (use_schema_) {
508+ handler->execute_query (" tables" , SELECT_TABLES_30);
509+ handler->execute_query (" views" , SELECT_VIEWS_30);
510+ handler->execute_query (" columns" , SELECT_COLUMNS_30);
511+ handler->execute_query (" indexes" , SELECT_INDEXES_30);
512+ handler->execute_query (" user_types" , SELECT_USERTYPES_30);
513+ handler->execute_query (" functions" , SELECT_FUNCTIONS_30);
514+ handler->execute_query (" aggregates" , SELECT_AGGREGATES_30);
515+ }
501516 } else {
502- handler->execute_query (" keyspaces" , SELECT_KEYSPACES_20);
503- handler->execute_query (" tables" , SELECT_COLUMN_FAMILIES_20);
504- handler->execute_query (" columns" , SELECT_COLUMNS_20);
505- if (cassandra_version_ >= VersionNumber (2 , 1 , 0 )) {
506- handler->execute_query (" user_types" , SELECT_USERTYPES_21);
517+ if (use_schema_ || token_aware_routing_) {
518+ handler->execute_query (" keyspaces" , SELECT_KEYSPACES_20);
507519 }
508- if (cassandra_version_ >= VersionNumber (2 , 2 , 0 )) {
509- handler->execute_query (" functions" , SELECT_FUNCTIONS_22);
510- handler->execute_query (" aggregates" , SELECT_AGGREGATES_22);
520+ if (use_schema_) {
521+ handler->execute_query (" tables" , SELECT_COLUMN_FAMILIES_20);
522+ handler->execute_query (" columns" , SELECT_COLUMNS_20);
523+ if (cassandra_version_ >= VersionNumber (2 , 1 , 0 )) {
524+ handler->execute_query (" user_types" , SELECT_USERTYPES_21);
525+ }
526+ if (cassandra_version_ >= VersionNumber (2 , 2 , 0 )) {
527+ handler->execute_query (" functions" , SELECT_FUNCTIONS_22);
528+ handler->execute_query (" aggregates" , SELECT_AGGREGATES_22);
529+ }
511530 }
512531 }
513532}
@@ -524,58 +543,63 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti
524543 int protocol_version = control_connection->protocol_version_ ;
525544 const VersionNumber& cassandra_version = control_connection->cassandra_version_ ;
526545
546+ bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
547+
527548 if (session->token_map_ ) {
528549 session->token_map_ ->clear_keyspaces ();
550+
551+ ResultResponse* keyspaces_result;
552+ if (MultipleRequestHandler::get_result_response (responses, " keyspaces" , &keyspaces_result)) {
553+ session->token_map_ ->add_keyspaces (cassandra_version, keyspaces_result);
554+ }
529555 }
530556
531- session->metadata ().clear_and_update_back (cassandra_version);
557+ if (control_connection->use_schema_ ) {
558+ session->metadata ().clear_and_update_back (cassandra_version);
532559
533- bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
560+ ResultResponse* keyspaces_result;
561+ if (MultipleRequestHandler::get_result_response (responses, " keyspaces" , &keyspaces_result)) {
562+ session->metadata ().update_keyspaces (protocol_version, cassandra_version, keyspaces_result);
563+ }
534564
535- ResultResponse* keyspaces_result;
536- if (MultipleRequestHandler::get_result_response (responses, " keyspaces" , &keyspaces_result)) {
537- if (session->token_map_ ) {
538- session->token_map_ ->add_keyspaces (cassandra_version, keyspaces_result);
565+ ResultResponse* tables_result;
566+ if (MultipleRequestHandler::get_result_response (responses, " tables" , &tables_result)) {
567+ session->metadata ().update_tables (protocol_version, cassandra_version, tables_result);
539568 }
540- session->metadata ().update_keyspaces (protocol_version, cassandra_version, keyspaces_result);
541- }
542569
543- ResultResponse* tables_result ;
544- if (MultipleRequestHandler::get_result_response (responses, " tables " , &tables_result )) {
545- session->metadata ().update_tables (protocol_version, cassandra_version, tables_result );
546- }
570+ ResultResponse* views_result ;
571+ if (MultipleRequestHandler::get_result_response (responses, " views " , &views_result )) {
572+ session->metadata ().update_views (protocol_version, cassandra_version, views_result );
573+ }
547574
548- ResultResponse* views_result ;
549- if (MultipleRequestHandler::get_result_response (responses, " views " , &views_result )) {
550- session->metadata ().update_views (protocol_version, cassandra_version, views_result );
551- }
575+ ResultResponse* columns_result = NULL ;
576+ if (MultipleRequestHandler::get_result_response (responses, " columns " , &columns_result )) {
577+ session->metadata ().update_columns (protocol_version, cassandra_version, columns_result );
578+ }
552579
553- ResultResponse* columns_result = NULL ;
554- if (MultipleRequestHandler::get_result_response (responses, " columns " , &columns_result )) {
555- session->metadata ().update_columns (protocol_version, cassandra_version, columns_result );
556- }
580+ ResultResponse* indexes_result ;
581+ if (MultipleRequestHandler::get_result_response (responses, " indexes " , &indexes_result )) {
582+ session->metadata ().update_indexes (protocol_version, cassandra_version, indexes_result );
583+ }
557584
558- ResultResponse* indexes_result ;
559- if (MultipleRequestHandler::get_result_response (responses, " indexes " , &indexes_result )) {
560- session->metadata ().update_indexes (protocol_version, cassandra_version, indexes_result );
561- }
585+ ResultResponse* user_types_result ;
586+ if (MultipleRequestHandler::get_result_response (responses, " user_types " , &user_types_result )) {
587+ session->metadata ().update_user_types (protocol_version, cassandra_version, user_types_result );
588+ }
562589
563- ResultResponse* user_types_result ;
564- if (MultipleRequestHandler::get_result_response (responses, " user_types " , &user_types_result )) {
565- session->metadata ().update_user_types (protocol_version, cassandra_version, user_types_result );
566- }
590+ ResultResponse* functions_result ;
591+ if (MultipleRequestHandler::get_result_response (responses, " functions " , &functions_result )) {
592+ session->metadata ().update_functions (protocol_version, cassandra_version, functions_result );
593+ }
567594
568- ResultResponse* functions_result ;
569- if (MultipleRequestHandler::get_result_response (responses, " functions " , &functions_result )) {
570- session->metadata ().update_functions (protocol_version, cassandra_version, functions_result );
571- }
595+ ResultResponse* aggregates_result ;
596+ if (MultipleRequestHandler::get_result_response (responses, " aggregates " , &aggregates_result )) {
597+ session->metadata ().update_aggregates (protocol_version, cassandra_version, aggregates_result );
598+ }
572599
573- ResultResponse* aggregates_result;
574- if (MultipleRequestHandler::get_result_response (responses, " aggregates" , &aggregates_result)) {
575- session->metadata ().update_aggregates (protocol_version, cassandra_version, aggregates_result);
600+ session->metadata ().swap_to_back_and_update_front ();
576601 }
577602
578- session->metadata ().swap_to_back_and_update_front ();
579603 if (session->token_map_ ) {
580604 session->token_map_ ->build ();
581605 }
@@ -601,7 +625,7 @@ void ControlConnection::refresh_node_info(SharedRefPtr<Host> host,
601625 std::string query;
602626 ControlHandler<RefreshNodeData>::ResponseCallback response_callback;
603627
604- bool token_query = should_query_tokens_ && (host->was_just_added () || query_tokens);
628+ bool token_query = token_aware_routing_ && (host->was_just_added () || query_tokens);
605629 if (is_connected_host || !host->listen_address ().empty ()) {
606630 if (is_connected_host) {
607631 query.assign (token_query ? SELECT_LOCAL_TOKENS : SELECT_LOCAL);
@@ -739,7 +763,7 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
739763 host->address ().to_string ().c_str ());
740764 }
741765
742- if (should_query_tokens_ ) {
766+ if (token_aware_routing_ ) {
743767 bool is_connected_host = connection_ != NULL && host->address ().compare (connection_->address ()) == 0 ;
744768 std::string partitioner;
745769 if (is_connected_host && row->get_string_by_name (" partitioner" , &partitioner)) {
@@ -798,7 +822,10 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio
798822 if (session->token_map_ ) {
799823 session->token_map_ ->update_keyspaces_and_build (cassandra_version, result);
800824 }
801- session->metadata ().update_keyspaces (protocol_version, cassandra_version, result);
825+
826+ if (control_connection->use_schema_ ) {
827+ session->metadata ().update_keyspaces (protocol_version, cassandra_version, result);
828+ }
802829}
803830
804831void ControlConnection::refresh_table_or_view (const StringRef& keyspace_name,
0 commit comments