From 7fd547fec3d85967d0c2d3fbd760c54bd986a608 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Tue, 23 Feb 2016 11:33:50 -0700 Subject: [PATCH 1/4] Use string instead of integer index for multiple query responses --- src/control_connection.cpp | 165 +++++++++++++++++-------------- src/control_connection.hpp | 10 +- src/multiple_request_handler.cpp | 18 +++- src/multiple_request_handler.hpp | 17 ++-- src/schema_change_handler.cpp | 61 ++++++------ src/schema_change_handler.hpp | 4 +- 6 files changed, 156 insertions(+), 119 deletions(-) diff --git a/src/control_connection.cpp b/src/control_connection.cpp index f2f4a12a6..32f262be5 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -369,13 +369,13 @@ void ControlConnection::on_event(EventResponse* response) { void ControlConnection::query_meta_hosts() { ScopedRefPtr > handler( new ControlMultipleRequestHandler(this, ControlConnection::on_query_hosts, UnusedData())); - handler->execute_query(SELECT_LOCAL_TOKENS); - handler->execute_query(SELECT_PEERS_TOKENS); + handler->execute_query("local", SELECT_LOCAL_TOKENS); + handler->execute_query("peers", SELECT_PEERS_TOKENS); } void ControlConnection::on_query_hosts(ControlConnection* control_connection, const UnusedData& data, - const MultipleRequestHandler::ResponseVec& responses) { + const MultipleRequestHandler::ResponseMap& responses) { Connection* connection = control_connection->connection_; if (connection == NULL) { return; @@ -395,10 +395,9 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection, if (host) { host->set_mark(session->current_host_mark_); - ResultResponse* local_result = - static_cast(responses[0].get()); - - if (local_result->row_count() > 0) { + ResultResponse* local_result; + if (MultipleRequestHandler::get_result_response(responses, "local", &local_result) && + local_result->row_count() > 0) { local_result->decode_first_row(); control_connection->update_node_info(host, &local_result->first_row()); session->metadata().set_cassandra_version(host->cassandra_version()); @@ -417,32 +416,33 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection, } { - ResultResponse* peers_result = - static_cast(responses[1].get()); - peers_result->decode_first_row(); - ResultIterator rows(peers_result); - while (rows.next()) { - Address address; - const Row* row = rows.row(); - if (!determine_address_for_peer_host(connection->address(), - row->get_by_name("peer"), - row->get_by_name("rpc_address"), - &address)) { - continue; - } + ResultResponse* peers_result; + if (MultipleRequestHandler::get_result_response(responses, "peers", &peers_result)) { + peers_result->decode_first_row(); + ResultIterator rows(peers_result); + while (rows.next()) { + Address address; + const Row* row = rows.row(); + if (!determine_address_for_peer_host(connection->address(), + row->get_by_name("peer"), + row->get_by_name("rpc_address"), + &address)) { + continue; + } - SharedRefPtr host = session->get_host(address); - bool is_new = false; - if (!host) { - is_new = true; - host = session->add_host(address); - } + SharedRefPtr host = session->get_host(address); + bool is_new = false; + if (!host) { + is_new = true; + host = session->add_host(address); + } - host->set_mark(session->current_host_mark_); + host->set_mark(session->current_host_mark_); - control_connection->update_node_info(host, rows.row()); - if (is_new && !is_initial_connection) { - session->on_add(host, false); + control_connection->update_node_info(host, rows.row()); + if (is_new && !is_initial_connection) { + session->on_add(host, false); + } } } } @@ -467,29 +467,29 @@ void ControlConnection::query_meta_schema() { new ControlMultipleRequestHandler(this, ControlConnection::on_query_meta_schema, UnusedData())); if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) { - handler->execute_query(SELECT_KEYSPACES_30); - handler->execute_query(SELECT_TABLES_30); - handler->execute_query(SELECT_COLUMNS_30); - handler->execute_query(SELECT_USERTYPES_30); - handler->execute_query(SELECT_FUNCTIONS_30); - handler->execute_query(SELECT_AGGREGATES_30); + handler->execute_query("keyspaces", SELECT_KEYSPACES_30); + handler->execute_query("tables", SELECT_TABLES_30); + handler->execute_query("columns", SELECT_COLUMNS_30); + handler->execute_query("user_types", SELECT_USERTYPES_30); + handler->execute_query("functions", SELECT_FUNCTIONS_30); + handler->execute_query("aggregates", SELECT_AGGREGATES_30); } else { - handler->execute_query(SELECT_KEYSPACES_20); - handler->execute_query(SELECT_COLUMN_FAMILIES_20); - handler->execute_query(SELECT_COLUMNS_20); + handler->execute_query("keyspaces", SELECT_KEYSPACES_20); + handler->execute_query("tables", SELECT_COLUMN_FAMILIES_20); + handler->execute_query("columns", SELECT_COLUMNS_20); if (session_->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) { - handler->execute_query(SELECT_USERTYPES_21); + handler->execute_query("user_types", SELECT_USERTYPES_21); } if (session_->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) { - handler->execute_query(SELECT_FUNCTIONS_22); - handler->execute_query(SELECT_AGGREGATES_22); + handler->execute_query("functions", SELECT_FUNCTIONS_22); + handler->execute_query("aggregates", SELECT_AGGREGATES_22); } } } void ControlConnection::on_query_meta_schema(ControlConnection* control_connection, const UnusedData& unused, - const MultipleRequestHandler::ResponseVec& responses) { + const MultipleRequestHandler::ResponseMap& responses) { Connection* connection = control_connection->connection_; if (connection == NULL) { return; @@ -501,17 +501,31 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW); - session->metadata().update_keyspaces(static_cast(responses[0].get())); - session->metadata().update_tables(static_cast(responses[1].get()), - static_cast(responses[2].get())); + ResultResponse* keyspaces_result; + if (MultipleRequestHandler::get_result_response(responses, "keyspaces", &keyspaces_result)) { + session->metadata().update_keyspaces(keyspaces_result); + } + + ResultResponse* tables_result; + if (MultipleRequestHandler::get_result_response(responses, "tables", &tables_result)) { + ResultResponse* columns_result = NULL; + MultipleRequestHandler::get_result_response(responses, "columns", &columns_result); + session->metadata().update_tables(tables_result, columns_result); + } + + ResultResponse* user_types_result; + if (MultipleRequestHandler::get_result_response(responses, "user_types", &user_types_result)) { + session->metadata().update_user_types(user_types_result); + } - if (session->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) { - session->metadata().update_user_types(static_cast(responses[3].get())); + ResultResponse* functions_result; + if (MultipleRequestHandler::get_result_response(responses, "functions", &functions_result)) { + session->metadata().update_functions(functions_result); } - if (session->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) { - session->metadata().update_functions(static_cast(responses[4].get())); - session->metadata().update_aggregates(static_cast(responses[5].get())); + ResultResponse* aggregates_result; + if (MultipleRequestHandler::get_result_response(responses, "aggregates", &aggregates_result)) { + session->metadata().update_aggregates(aggregates_result); } session->metadata().swap_to_back_and_update_front(); @@ -733,50 +747,55 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio void ControlConnection::refresh_table(const StringRef& keyspace_name, const StringRef& table_name) { - std::string cf_query; - std::string col_query; + std::string table_query; + std::string column_query; if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) { - cf_query.assign(SELECT_TABLES_30); - cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) + table_query.assign(SELECT_TABLES_30); + table_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'"); - col_query.assign(SELECT_COLUMNS_30); - col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) + column_query.assign(SELECT_COLUMNS_30); + column_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'"); + + LOG_DEBUG("Refreshing table %s; %s", table_query.c_str(), column_query.c_str()); } else { - cf_query.assign(SELECT_COLUMN_FAMILIES_20); - cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) + table_query.assign(SELECT_COLUMN_FAMILIES_20); + table_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'"); - col_query.assign(SELECT_COLUMNS_20); - col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) + column_query.assign(SELECT_COLUMNS_20); + column_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'"); - } - LOG_DEBUG("Refreshing table %s; %s", cf_query.c_str(), col_query.c_str()); + LOG_DEBUG("Refreshing table %s; %s", table_query.c_str(), column_query.c_str()); + } ScopedRefPtr > handler( new ControlMultipleRequestHandler(this, ControlConnection::on_refresh_table, RefreshTableData(keyspace_name.to_string(), table_name.to_string()))); - handler->execute_query(cf_query); - handler->execute_query(col_query); + handler->execute_query("tables", table_query); + handler->execute_query("columns", column_query); } void ControlConnection::on_refresh_table(ControlConnection* control_connection, const RefreshTableData& data, - const MultipleRequestHandler::ResponseVec& responses) { - ResultResponse* column_family_result = static_cast(responses[0].get()); - if (column_family_result->row_count() == 0) { + const MultipleRequestHandler::ResponseMap& responses) { + ResultResponse* tables_result; + if (!MultipleRequestHandler::get_result_response(responses, "tables", &tables_result) || + tables_result->row_count() == 0) { LOG_ERROR("No row found for column family %s.%s in system schema table.", data.keyspace_name.c_str(), data.table_name.c_str()); return; } + ResultResponse* columns_result = NULL; + MultipleRequestHandler::get_result_response(responses, "columns", &columns_result); + Session* session = control_connection->session_; - session->metadata().update_tables(static_cast(responses[0].get()), - static_cast(responses[1].get())); + session->metadata().update_tables(tables_result, columns_result); } @@ -945,11 +964,11 @@ void ControlConnection::on_reconnect(Timer* timer) { template void ControlConnection::ControlMultipleRequestHandler::on_set( - const MultipleRequestHandler::ResponseVec& responses) { + const MultipleRequestHandler::ResponseMap& responses) { bool has_error = false; - for (MultipleRequestHandler::ResponseVec::const_iterator it = responses.begin(), + for (MultipleRequestHandler::ResponseMap::const_iterator it = responses.begin(), end = responses.end(); it != end; ++it) { - if (control_connection_->handle_query_invalid_response(it->get())) { + if (control_connection_->handle_query_invalid_response(it->second.get())) { has_error = true; } } diff --git a/src/control_connection.hpp b/src/control_connection.hpp index 472167de3..deb4c3731 100644 --- a/src/control_connection.hpp +++ b/src/control_connection.hpp @@ -71,7 +71,7 @@ class ControlConnection : public Connection::Listener { template class ControlMultipleRequestHandler : public MultipleRequestHandler { public: - typedef void (*ResponseCallback)(ControlConnection*, const T&, const MultipleRequestHandler::ResponseVec&); + typedef void (*ResponseCallback)(ControlConnection*, const T&, const MultipleRequestHandler::ResponseMap&); ControlMultipleRequestHandler(ControlConnection* control_connection, ResponseCallback response_callback, @@ -81,7 +81,7 @@ class ControlConnection : public Connection::Listener { , response_callback_(response_callback) , data_(data) {} - virtual void on_set(const MultipleRequestHandler::ResponseVec& responses); + virtual void on_set(const MultipleRequestHandler::ResponseMap& responses); virtual void on_error(CassError code, const std::string& message) { control_connection_->handle_query_failure(code, message); @@ -189,12 +189,12 @@ class ControlConnection : public Connection::Listener { void query_meta_hosts(); static void on_query_hosts(ControlConnection* control_connection, const UnusedData& data, - const MultipleRequestHandler::ResponseVec& responses); + const MultipleRequestHandler::ResponseMap& responses); void query_meta_schema(); static void on_query_meta_schema(ControlConnection* control_connection, const UnusedData& data, - const MultipleRequestHandler::ResponseVec& responses); + const MultipleRequestHandler::ResponseMap& responses); void refresh_node_info(SharedRefPtr host, bool is_new_node, @@ -215,7 +215,7 @@ class ControlConnection : public Connection::Listener { const StringRef& table_name); static void on_refresh_table(ControlConnection* control_connection, const RefreshTableData& data, - const MultipleRequestHandler::ResponseVec& responses); + const MultipleRequestHandler::ResponseMap& responses); void refresh_type(const StringRef& keyspace_name, const StringRef& type_name); diff --git a/src/multiple_request_handler.cpp b/src/multiple_request_handler.cpp index 694211d8b..6eb218db9 100644 --- a/src/multiple_request_handler.cpp +++ b/src/multiple_request_handler.cpp @@ -21,10 +21,22 @@ namespace cass { -void MultipleRequestHandler::execute_query(const std::string& query) { +bool MultipleRequestHandler::get_result_response(const ResponseMap& responses, + const std::string& index, + ResultResponse** response) { + ResponseMap::const_iterator it = responses.find(index); + if (it == responses.end() || it->second->opcode() != CQL_OPCODE_RESULT) { + return false; + } + *response = static_cast(it->second.get()); + return true; +} + +void MultipleRequestHandler::execute_query(const std::string& index, const std::string& query) { if (has_errors_or_timeouts_) return; - responses_.push_back(SharedRefPtr()); - SharedRefPtr handler(new InternalHandler(this, new QueryRequest(query), remaining_++)); + responses_[index] = SharedRefPtr(); + SharedRefPtr handler(new InternalHandler(this, new QueryRequest(query), index)); + remaining_++; if (!connection_->write(handler.get())) { on_error(CASS_ERROR_LIB_NO_STREAMS, "No more streams available"); } diff --git a/src/multiple_request_handler.hpp b/src/multiple_request_handler.hpp index b13357e35..d507a855c 100644 --- a/src/multiple_request_handler.hpp +++ b/src/multiple_request_handler.hpp @@ -28,10 +28,11 @@ namespace cass { class Connection; class Response; +class ResultResponse; class MultipleRequestHandler : public RefCounted { public: - typedef std::vector > ResponseVec; + typedef std::map > ResponseMap; MultipleRequestHandler(Connection* connection) : connection_(connection) @@ -40,9 +41,13 @@ class MultipleRequestHandler : public RefCounted { virtual ~MultipleRequestHandler() { } - void execute_query(const std::string& query); + static bool get_result_response(const ResponseMap& responses, + const std::string& index, + ResultResponse** response); - virtual void on_set(const ResponseVec& responses) = 0; + void execute_query(const std::string& index, const std::string& query); + + virtual void on_set(const ResponseMap& responses) = 0; virtual void on_error(CassError code, const std::string& message) = 0; virtual void on_timeout() = 0; @@ -53,7 +58,7 @@ class MultipleRequestHandler : public RefCounted { private: class InternalHandler : public Handler { public: - InternalHandler(MultipleRequestHandler* parent, const Request* request, int index) + InternalHandler(MultipleRequestHandler* parent, const Request* request, const std::string& index) : Handler(request) , parent_(parent) , index_(index) { } @@ -64,13 +69,13 @@ class MultipleRequestHandler : public RefCounted { private: ScopedRefPtr parent_; - int index_; + std::string index_; }; Connection* connection_; bool has_errors_or_timeouts_; int remaining_; - ResponseVec responses_; + ResponseMap responses_; }; } // namespace cass diff --git a/src/schema_change_handler.cpp b/src/schema_change_handler.cpp index fd1aa68e3..1f12e66ff 100644 --- a/src/schema_change_handler.cpp +++ b/src/schema_change_handler.cpp @@ -45,17 +45,17 @@ SchemaChangeHandler::SchemaChangeHandler(Connection* connection, , elapsed_ms_(elapsed) {} void SchemaChangeHandler::execute() { - execute_query("SELECT schema_version FROM system.local WHERE key='local'"); - execute_query("SELECT peer, rpc_address, schema_version FROM system.peers"); + execute_query("local", "SELECT schema_version FROM system.local WHERE key='local'"); + execute_query("peers", "SELECT peer, rpc_address, schema_version FROM system.peers"); } -bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) { +bool SchemaChangeHandler::has_schema_agreement(const ResponseMap& responses) { StringRef current_version; - ResultResponse* local_result = - static_cast(responses[0].get()); + ResultResponse* local_result; - if (local_result->row_count() > 0) { + if (MultipleRequestHandler::get_result_response(responses, "local", &local_result) && + local_result->row_count() > 0) { local_result->decode_first_row(); const Row* row = &local_result->first_row(); @@ -69,27 +69,28 @@ bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) { connection()->address_string().c_str()); } - ResultResponse* peers_result = - static_cast(responses[1].get()); - peers_result->decode_first_row(); - - ResultIterator rows(peers_result); - while (rows.next()) { - const Row* row = rows.row(); - - Address address; - bool is_valid_address - = ControlConnection::determine_address_for_peer_host(connection()->address(), - row->get_by_name("peer"), - row->get_by_name("rpc_address"), - &address); - - if (is_valid_address && request_handler_->is_host_up(address)) { - const Value* v = row->get_by_name("schema_version"); - if (!row->get_by_name("rpc_address")->is_null() && !v->is_null()) { - StringRef version(v->to_string_ref()); - if (version != current_version) { - return false; + ResultResponse* peers_result; + if (MultipleRequestHandler::get_result_response(responses, "peers", &peers_result)) { + peers_result->decode_first_row(); + + ResultIterator rows(peers_result); + while (rows.next()) { + const Row* row = rows.row(); + + Address address; + bool is_valid_address + = ControlConnection::determine_address_for_peer_host(connection()->address(), + row->get_by_name("peer"), + row->get_by_name("rpc_address"), + &address); + + if (is_valid_address && request_handler_->is_host_up(address)) { + const Value* v = row->get_by_name("schema_version"); + if (!row->get_by_name("rpc_address")->is_null() && !v->is_null()) { + StringRef version(v->to_string_ref()); + if (version != current_version) { + return false; + } } } } @@ -98,13 +99,13 @@ bool SchemaChangeHandler::has_schema_agreement(const ResponseVec& responses) { return true; } -void SchemaChangeHandler::on_set(const ResponseVec& responses) { +void SchemaChangeHandler::on_set(const ResponseMap& responses) { elapsed_ms_ += get_time_since_epoch_ms() - start_ms_; bool has_error = false; - for (MultipleRequestHandler::ResponseVec::const_iterator it = responses.begin(), + for (MultipleRequestHandler::ResponseMap::const_iterator it = responses.begin(), end = responses.end(); it != end; ++it) { - if (check_error_or_invalid_response("SchemaChangeHandler", CQL_OPCODE_RESULT, it->get())) { + if (check_error_or_invalid_response("SchemaChangeHandler", CQL_OPCODE_RESULT, it->second.get())) { has_error = true; } } diff --git a/src/schema_change_handler.hpp b/src/schema_change_handler.hpp index d6980b4c7..24d87788f 100644 --- a/src/schema_change_handler.hpp +++ b/src/schema_change_handler.hpp @@ -39,13 +39,13 @@ class SchemaChangeHandler : public MultipleRequestHandler { void execute(); - virtual void on_set(const ResponseVec& responses); + virtual void on_set(const ResponseMap& responses); virtual void on_error(CassError code, const std::string& message); virtual void on_timeout(); void on_closing(); private: - bool has_schema_agreement(const ResponseVec& responses); + bool has_schema_agreement(const ResponseMap& responses); ScopedRefPtr request_handler_; SharedRefPtr request_response_; From c760a8bc801a2d866b4cc8428a235d0bcb42e628 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Tue, 23 Feb 2016 09:28:25 -0700 Subject: [PATCH 2/4] Added support for secondary index metadata --- include/cassandra.h | 232 ++++++++++++++- src/control_connection.cpp | 20 +- src/external_types.hpp | 1 + src/host.hpp | 4 + src/metadata.cpp | 276 ++++++++++++++++-- src/metadata.hpp | 71 ++++- src/utils.cpp | 46 ++- src/utils.hpp | 2 + .../src/test_schema_metadata.cpp | 130 +++++++++ test/unit_tests/src/test_utils.cpp | 17 ++ 10 files changed, 754 insertions(+), 45 deletions(-) diff --git a/include/cassandra.h b/include/cassandra.h index 8d1153e8c..0c020f38a 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -353,6 +353,13 @@ typedef struct CassTableMeta_ CassTableMeta; */ typedef struct CassColumnMeta_ CassColumnMeta; +/** + * @struct CassIndexMeta + * + * Index metadata + */ +typedef struct CassIndexMeta_ CassIndexMeta; + /** * @struct CassUuidGen * @@ -468,6 +475,21 @@ typedef enum CassWriteType_ { XX(CASS_WRITE_TYPE_BATCH_LOG, "BATCH_LOG") \ XX(CASS_WRITE_TYPE_CAS, "CAS") +typedef enum CassColumnType_ { + CASS_COLUMN_TYPE_REGULAR, + CASS_COLUMN_TYPE_PARTITION_KEY, + CASS_COLUMN_TYPE_CLUSTERING_KEY, + CASS_COLUMN_TYPE_STATIC, + CASS_COLUMN_TYPE_COMPACT_VALUE +} CassColumnType; + +typedef enum CassIndexType_ { + CASS_INDEX_TYPE_UNKNOWN, + CASS_INDEX_TYPE_KEYS, + CASS_INDEX_TYPE_CUSTOM, + CASS_INDEX_TYPE_COMPOSITES +} CassIndexType; + typedef enum CassValueType_ { CASS_VALUE_TYPE_UNKNOWN = 0xFFFF, CASS_VALUE_TYPE_CUSTOM = 0x0000, @@ -503,14 +525,14 @@ typedef enum CassValueType_ { typedef enum CassCollectionType_ { CASS_COLLECTION_TYPE_LIST = CASS_VALUE_TYPE_LIST, - CASS_COLLECTION_TYPE_MAP = CASS_VALUE_TYPE_MAP, - CASS_COLLECTION_TYPE_SET = CASS_VALUE_TYPE_SET + CASS_COLLECTION_TYPE_MAP = CASS_VALUE_TYPE_MAP, + CASS_COLLECTION_TYPE_SET = CASS_VALUE_TYPE_SET } CassCollectionType; typedef enum CassBatchType_ { - CASS_BATCH_TYPE_LOGGED = 0, - CASS_BATCH_TYPE_UNLOGGED = 1, - CASS_BATCH_TYPE_COUNTER = 2 + CASS_BATCH_TYPE_LOGGED = 0x00, + CASS_BATCH_TYPE_UNLOGGED = 0x01, + CASS_BATCH_TYPE_COUNTER = 0x02 } CassBatchType; typedef enum CassIteratorType_ { @@ -526,7 +548,8 @@ typedef enum CassIteratorType_ { CASS_ITERATOR_TYPE_TYPE_META, CASS_ITERATOR_TYPE_FUNCTION_META, CASS_ITERATOR_TYPE_AGGREGATE_META, - CASS_ITERATOR_TYPE_COLUMN_META + CASS_ITERATOR_TYPE_COLUMN_META, + CASS_ITERATOR_TYPE_INDEX_META } CassIteratorType; #define CASS_LOG_LEVEL_MAP(XX) \ @@ -553,14 +576,6 @@ typedef enum CassSslVerifyFlags { CASS_SSL_VERIFY_PEER_IDENTITY } CassSslVerifyFlags; -typedef enum CassColumnType_ { - CASS_COLUMN_TYPE_REGULAR, - CASS_COLUMN_TYPE_PARTITION_KEY, - CASS_COLUMN_TYPE_CLUSTERING_KEY, - CASS_COLUMN_TYPE_STATIC, - CASS_COLUMN_TYPE_COMPACT_VALUE -} CassColumnType; - typedef enum CassErrorSource_ { CASS_ERROR_SOURCE_NONE, CASS_ERROR_SOURCE_LIB, @@ -1862,6 +1877,38 @@ cass_table_meta_column_by_name_n(const CassTableMeta* table_meta, const char* column, size_t column_length); +/** + * Gets the index metadata for the provided index name. + * + * @public @memberof CassTableMeta + * + * @param[in] table_meta + * @param[in] index + * + * @return The metadata for a index. NULL if index does not exist. + */ +CASS_EXPORT const CassIndexMeta* +cass_table_meta_index_by_name(const CassTableMeta* table_meta, + const char* index); + +/** + * Same as cass_table_meta_index_by_name(), but with lengths for string + * parameters. + * + * @public @memberof CassTableMeta + * + * @param[in] table_meta + * @param[in] index + * @param[in] index_length + * @return same as cass_table_meta_index_by_name() + * + * @see cass_table_meta_index_by_name() + */ +CASS_EXPORT const CassIndexMeta* +cass_table_meta_index_by_name_n(const CassTableMeta* table_meta, + const char* index, + size_t index_length); + /** * Gets the name of the table. * @@ -1900,6 +1947,30 @@ CASS_EXPORT const CassColumnMeta* cass_table_meta_column(const CassTableMeta* table_meta, size_t index); +/** + * Gets the total number of indexes for the table. + * + * @public @memberof CassTableMeta + * + * @param[in] table_meta + * @return The total index count. + */ +CASS_EXPORT size_t +cass_table_meta_index_count(const CassTableMeta* table_meta); + +/** + * Gets the index metadata for the provided index. + * + * @public @memberof CassTableMeta + * + * @param[in] table_meta + * @param[in] index + * @return The metadata for a index. NULL returned if the index is out of range. + */ +CASS_EXPORT const CassIndexMeta* +cass_table_meta_index(const CassTableMeta* table_meta, + size_t index); + /** * Gets the number of columns for the table's partition key. * @@ -2049,6 +2120,88 @@ cass_column_meta_field_by_name_n(const CassColumnMeta* column_meta, const char* name, size_t name_length); +/** + * Gets the name of the index. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @param[out] name + * @param[out] name_length + */ +CASS_EXPORT void +cass_index_meta_name(const CassIndexMeta* index_meta, + const char** name, + size_t* name_length); + +/** + * Gets the type of the index. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @return The index's type. + */ +CASS_EXPORT CassIndexType +cass_index_meta_type(const CassIndexMeta* index_meta); + +/** + * Gets the target of the index. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @param[out] target + * @param[out] target_length + */ +CASS_EXPORT void +cass_index_meta_target(const CassIndexMeta* index_meta, + const char** target, + size_t* target_length); + +/** + * Gets the options of the index. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @return The index's options. + */ +CASS_EXPORT const CassValue* +cass_index_meta_options(const CassIndexMeta* index_meta); + +/** + * Gets a metadata field for the provided name. Metadata fields allow direct + * access to the index data found in the underlying "indexes" metadata table. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @param[in] name + * @return A metadata field value. NULL if the field does not exist. + */ +CASS_EXPORT const CassValue* +cass_index_meta_field_by_name(const CassIndexMeta* index_meta, + const char* name); + +/** + * Same as cass_index_meta_field_by_name(), but with lengths for string + * parameters. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @param[in] name + * @param[in] name_length + * @return same as cass_index_meta_field_by_name() + * + * @see cass_index_meta_field_by_name() + */ +CASS_EXPORT const CassValue* +cass_index_meta_field_by_name_n(const CassIndexMeta* index_meta, + const char* name, + size_t name_length); + /** * Gets the name of the function. * @@ -6611,6 +6764,21 @@ cass_iterator_fields_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta); CASS_EXPORT CassIterator* cass_iterator_columns_from_table_meta(const CassTableMeta* table_meta); +/** + * Creates a new iterator for the specified table metadata. + * This can be used to iterate over indexes. + * + * @public @memberof CassTableMeta + * + * @param[in] table_meta + * @return A new iterator that must be freed. + * + * @see cass_iterator_get_index_meta() + * @see cass_iterator_free() + */ +CASS_EXPORT CassIterator* +cass_iterator_indexes_from_table_meta(const CassTableMeta* table_meta); + /** * Creates a new fields iterator for the specified table metadata. Metadata * fields allow direct access to the column data found in the underlying @@ -6647,13 +6815,31 @@ cass_iterator_fields_from_table_meta(const CassTableMeta* table_meta); CASS_EXPORT CassIterator* cass_iterator_fields_from_column_meta(const CassColumnMeta* column_meta); +/** + * Creates a new fields iterator for the specified index metadata. Metadata + * fields allow direct access to the index data found in the underlying + * "indexes" metadata table. This can be used to iterate those metadata + * field entries. + * + * @public @memberof CassIndexMeta + * + * @param[in] index_meta + * @return A new iterator that must be freed. + * + * @see cass_iterator_get_meta_field_name() + * @see cass_iterator_get_meta_field_value() + * @see cass_iterator_free() + */ +CASS_EXPORT CassIterator* +cass_iterator_fields_from_index_meta(const CassIndexMeta* index_meta); + /** * Creates a new fields iterator for the specified function metadata. Metadata * fields allow direct access to the column data found in the underlying * "functions" metadata table. This can be used to iterate those metadata * field entries. * - * @public @memberof CassColumnMeta + * @public @memberof CassFunctionMeta * * @param[in] function_meta * @return A new iterator that must be freed. @@ -6670,7 +6856,7 @@ cass_iterator_fields_from_function_meta(const CassFunctionMeta* function_meta); * "aggregates" metadata table. This can be used to iterate those metadata * field entries. * - * @public @memberof CassColumnMeta + * @public @memberof CassAggregateMeta * * @param[in] aggregate_meta * @return A new iterator that must be freed. @@ -6879,6 +7065,20 @@ cass_iterator_get_aggregate_meta(const CassIterator* iterator); CASS_EXPORT const CassColumnMeta* cass_iterator_get_column_meta(const CassIterator* iterator); +/** + * Gets the index metadata entry at the iterator's current position. + * + * Calling cass_iterator_next() will invalidate the previous + * value returned by this method. + * + * @public @memberof CassIterator + * + * @param[in] iterator + * @return A index metadata entry + */ +CASS_EXPORT const CassIndexMeta* +cass_iterator_get_index_meta(const CassIterator* iterator); + /** * Gets the metadata field name at the iterator's current position. * diff --git a/src/control_connection.cpp b/src/control_connection.cpp index 32f262be5..ee99bcf45 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -48,6 +48,7 @@ #define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces" #define SELECT_TABLES_30 "SELECT * FROM system_schema.tables" #define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns" +#define SELECT_INDEXES_30 "SELECT * FROM system_schema.indexes" #define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types" #define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions" #define SELECT_AGGREGATES_30 "SELECT * FROM system_schema.aggregates" @@ -470,6 +471,7 @@ void ControlConnection::query_meta_schema() { handler->execute_query("keyspaces", SELECT_KEYSPACES_30); handler->execute_query("tables", SELECT_TABLES_30); handler->execute_query("columns", SELECT_COLUMNS_30); + handler->execute_query("indexes", SELECT_INDEXES_30); handler->execute_query("user_types", SELECT_USERTYPES_30); handler->execute_query("functions", SELECT_FUNCTIONS_30); handler->execute_query("aggregates", SELECT_AGGREGATES_30); @@ -510,7 +512,9 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti if (MultipleRequestHandler::get_result_response(responses, "tables", &tables_result)) { ResultResponse* columns_result = NULL; MultipleRequestHandler::get_result_response(responses, "columns", &columns_result); - session->metadata().update_tables(tables_result, columns_result); + ResultResponse* indexes_result = NULL; + MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result); + session->metadata().update_tables(tables_result, columns_result, indexes_result); } ResultResponse* user_types_result; @@ -749,6 +753,7 @@ void ControlConnection::refresh_table(const StringRef& keyspace_name, const StringRef& table_name) { std::string table_query; std::string column_query; + std::string index_query; if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) { table_query.assign(SELECT_TABLES_30); @@ -759,7 +764,11 @@ void ControlConnection::refresh_table(const StringRef& keyspace_name, column_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'"); - LOG_DEBUG("Refreshing table %s; %s", table_query.c_str(), column_query.c_str()); + index_query.assign(SELECT_INDEXES_30); + index_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) + .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'"); + + LOG_DEBUG("Refreshing table %s; %s; %s", table_query.c_str(), column_query.c_str(), index_query.c_str()); } else { table_query.assign(SELECT_COLUMN_FAMILIES_20); table_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size()) @@ -778,6 +787,9 @@ void ControlConnection::refresh_table(const StringRef& keyspace_name, RefreshTableData(keyspace_name.to_string(), table_name.to_string()))); handler->execute_query("tables", table_query); handler->execute_query("columns", column_query); + if (!index_query.empty()) { + handler->execute_query("indexes", index_query); + } } void ControlConnection::on_refresh_table(ControlConnection* control_connection, @@ -793,9 +805,11 @@ void ControlConnection::on_refresh_table(ControlConnection* control_connection, ResultResponse* columns_result = NULL; MultipleRequestHandler::get_result_response(responses, "columns", &columns_result); + ResultResponse* indexes_result = NULL; + MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result); Session* session = control_connection->session_; - session->metadata().update_tables(tables_result, columns_result); + session->metadata().update_tables(tables_result, columns_result, indexes_result); } diff --git a/src/external_types.hpp b/src/external_types.hpp index 3e6002be5..04a1edbc4 100644 --- a/src/external_types.hpp +++ b/src/external_types.hpp @@ -75,6 +75,7 @@ EXTERNAL_TYPE(cass::Metadata::SchemaSnapshot, CassSchemaMeta); EXTERNAL_TYPE(cass::KeyspaceMetadata, CassKeyspaceMeta); EXTERNAL_TYPE(cass::TableMetadata, CassTableMeta); EXTERNAL_TYPE(cass::ColumnMetadata, CassColumnMeta); +EXTERNAL_TYPE(cass::IndexMetadata, CassIndexMeta); EXTERNAL_TYPE(cass::FunctionMetadata, CassFunctionMeta); EXTERNAL_TYPE(cass::AggregateMetadata, CassAggregateMeta); EXTERNAL_TYPE(cass::UuidGen, CassUuidGen); diff --git a/src/host.hpp b/src/host.hpp index 9f3c4a716..a023b984d 100644 --- a/src/host.hpp +++ b/src/host.hpp @@ -62,6 +62,10 @@ class VersionNumber { return compare(other) >= 0; } + bool operator <(const VersionNumber& other) const { + return compare(other) < 0; + } + int compare(const VersionNumber& other) const { if (major_ < other.major_) return -1; if (major_ > other.major_) return 1; diff --git a/src/metadata.cpp b/src/metadata.cpp index 9af2bb829..fd91be64b 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -27,6 +27,7 @@ #include "row_iterator.hpp" #include "scoped_lock.hpp" #include "data_type_parser.hpp" +#include "util.h" #include "value.hpp" #include "third_party/rapidjson/rapidjson/document.h" @@ -165,6 +166,17 @@ const CassColumnMeta* cass_table_meta_column_by_name_n(const CassTableMeta* tabl return CassColumnMeta::to(table_meta->get_column(std::string(column, column_length))); } +const CassIndexMeta* cass_table_meta_index_by_name(const CassTableMeta* table_meta, + const char* index) { + return CassIndexMeta::to(table_meta->get_index(index)); +} + +const CassIndexMeta* cass_table_meta_index_by_name_n(const CassTableMeta* table_meta, + const char* index, + size_t index_length) { + return CassIndexMeta::to(table_meta->get_index(std::string(index, index_length))); +} + void cass_table_meta_name(const CassTableMeta* table_meta, const char** name, size_t* name_length) { *name = table_meta->name().data(); @@ -193,6 +205,18 @@ const CassColumnMeta* cass_table_meta_column(const CassTableMeta* table_meta, return CassColumnMeta::to(table_meta->columns()[index].get()); } +size_t cass_table_meta_index_count(const CassTableMeta* table_meta) { + return table_meta->indexes().size(); +} + +const CassIndexMeta* cass_table_meta_index(const CassTableMeta* table_meta, + size_t index) { + if (index >= table_meta->indexes().size()) { + return NULL; + } + return CassIndexMeta::to(table_meta->indexes()[index].get()); +} + size_t cass_table_meta_partition_key_count(const CassTableMeta* table_meta) { return table_meta->partition_key().size(); } @@ -243,6 +267,38 @@ cass_column_meta_field_by_name_n(const CassColumnMeta* column_meta, return CassValue::to(column_meta->get_field(std::string(name, name_length))); } +void cass_index_meta_name(const CassIndexMeta* index_meta, + const char** name, size_t* name_length) { + *name = index_meta->name().data(); + *name_length = index_meta->name().size(); +} + +CassIndexType cass_index_meta_type(const CassIndexMeta* index_meta) { + return index_meta->type(); +} + +void cass_index_meta_target(const CassIndexMeta* index_meta, + const char** target, size_t* target_length) { + *target = index_meta->target().data(); + *target_length = index_meta->target().size(); +} + +const CassValue* cass_index_meta_options(const CassIndexMeta* index_meta) { + return CassValue::to(index_meta->options()); +} + +const CassValue* +cass_index_meta_field_by_name(const CassIndexMeta* index_meta, + const char* name) { + return CassValue::to(index_meta->get_field(name)); +} + +const CassValue* +cass_index_meta_field_by_name_n(const CassIndexMeta* index_meta, + const char* name, size_t name_length) { + return CassValue::to(index_meta->get_field(std::string(name, name_length))); +} + void cass_function_meta_name(const CassFunctionMeta* function_meta, const char** name, size_t* name_length) { @@ -405,6 +461,10 @@ CassIterator* cass_iterator_columns_from_table_meta(const CassTableMeta* table_m return CassIterator::to(table_meta->iterator_columns()); } +CassIterator* cass_iterator_indexes_from_table_meta(const CassTableMeta* table_meta) { + return CassIterator::to(table_meta->iterator_indexes()); +} + CassIterator* cass_iterator_fields_from_table_meta(const CassTableMeta* table_meta) { return CassIterator::to(table_meta->iterator_fields()); } @@ -413,6 +473,10 @@ CassIterator* cass_iterator_fields_from_column_meta(const CassColumnMeta* column return CassIterator::to(column_meta->iterator_fields()); } +CassIterator* cass_iterator_fields_from_index_meta(const CassIndexMeta* index_meta) { + return CassIterator::to(index_meta->iterator_fields()); +} + CassIterator* cass_iterator_fields_from_function_meta(const CassFunctionMeta* function_meta) { return CassIterator::to(function_meta->iterator_fields()); } @@ -475,6 +539,15 @@ const CassColumnMeta* cass_iterator_get_column_meta(const CassIterator* iterator iterator->from())->column()); } +const CassIndexMeta* cass_iterator_get_index_meta(const CassIterator* iterator) { + if (iterator->type() != CASS_ITERATOR_TYPE_INDEX_META) { + return NULL; + } + return CassIndexMeta::to( + static_cast( + iterator->from())->index()); +} + CassError cass_iterator_get_meta_field_name(const CassIterator* iterator, const char** name, size_t* name_length) { @@ -587,14 +660,16 @@ void Metadata::update_keyspaces(ResultResponse* result) { } } -void Metadata::update_tables(ResultResponse* tables_result, ResultResponse* columns_result) { +void Metadata::update_tables(ResultResponse* tables_result, + ResultResponse* columns_result, + ResultResponse* indexes_result) { schema_snapshot_version_++; if (is_front_buffer()) { ScopedMutex l(&mutex_); - updating_->update_tables(config_, tables_result, columns_result); + updating_->update_tables(config_, tables_result, columns_result, indexes_result); } else { - updating_->update_tables(config_, tables_result, columns_result); + updating_->update_tables(config_, tables_result, columns_result, indexes_result); } } @@ -1008,16 +1083,6 @@ const ColumnMetadata* TableMetadata::get_column(const std::string& name) const { return i->second.get(); } -const ColumnMetadata::Ptr& TableMetadata::get_or_create_column(const std::string& name) { - ColumnMetadata::Map::iterator i = columns_by_name_.find(name); - if (i == columns_by_name_.end()) { - ColumnMetadata::Ptr column(new ColumnMetadata(name)); - i = columns_by_name_.insert(std::make_pair(name, column)).first; - columns_.push_back(column); - } - return i->second; -} - void TableMetadata::add_column(const ColumnMetadata::Ptr& column) { columns_.push_back(column); columns_by_name_[column->name()] = column; @@ -1179,6 +1244,22 @@ void TableMetadata::key_aliases(const NativeDataTypes& native_types, KeyAliases* } } +const IndexMetadata* TableMetadata::get_index(const std::string& name) const { + IndexMetadata::Map::const_iterator i = indexes_by_name_.find(name); + if (i == indexes_by_name_.end()) return NULL; + return i->second.get(); +} + +void TableMetadata::add_index(const IndexMetadata::Ptr& index) { + indexes_.push_back(index); + indexes_by_name_[index->name()] = index; +} + +void TableMetadata::clear_indexes() { + indexes_.clear(); + indexes_by_name_.clear(); +} + FunctionMetadata::FunctionMetadata(const MetadataConfig& config, const std::string& name, const Value* signature, KeyspaceMetadata* keyspace, @@ -1338,9 +1419,99 @@ AggregateMetadata::AggregateMetadata(const MetadataConfig& config, } } +IndexMetadata::Ptr IndexMetadata::from_row(const std::string& index_name, + const SharedRefPtr& buffer, const Row* row) { + IndexMetadata::Ptr index(new IndexMetadata(index_name)); + + StringRef kind; + const Value* value = index->add_field(buffer, row, "kind"); + if (value != NULL && + value->value_type() == CASS_VALUE_TYPE_VARCHAR) { + kind = value->to_string_ref(); + } + + const Value* options = index->add_field(buffer, row, "options"); + index->update(kind, options); + + return index; +} + +void IndexMetadata::update(StringRef kind, const Value* options) { + type_ = index_type_from_string(kind); + + MapIterator iterator(options); + while(iterator.next()) { + if (iterator.key()->to_string_ref() == "target") { + target_ = iterator.value()->to_string(); + } + } + + options_ = options; +} + +IndexMetadata::Ptr IndexMetadata::from_legacy(const MetadataConfig& config, + const std::string& index_name, const ColumnMetadata* column, + const SharedRefPtr& buffer, const Row* row) { + IndexMetadata::Ptr index(new IndexMetadata(index_name)); + + index->add_field(buffer, row, "index_name"); + + StringRef index_type; + const Value* value = index->add_field(buffer, row, "index_type"); + if (value != NULL && + value->value_type() == CASS_VALUE_TYPE_VARCHAR) { + index_type = value->to_string_ref(); + } + + const Value* options = index->add_json_map_field(config.protocol_version, row, "index_options"); + index->update_legacy(index_type, column, options); + + return index; +} + +void IndexMetadata::update_legacy(StringRef index_type, const ColumnMetadata* column, const Value* options) { + type_ = index_type_from_string(index_type); + target_ = target_from_legacy(column, options); + options_ = options; +} + +std::string IndexMetadata::target_from_legacy(const ColumnMetadata* column, + const Value* options) { + std::string column_name(column->name()); + + escape_id(column_name); + + MapIterator iterator(options); + + while (iterator.next()) { + std::string key(iterator.key()->to_string()); + if (key.find("index_keys") != std::string::npos) { + return "keys(" + column_name + ")"; + } else if (key.find("index_keys_and_values") != std::string::npos) { + return "entries(" + column_name + ")"; + } else if (column->data_type()->is_collection()) { // TODO(mpenick): && is_frozen() + return "full(" + column_name + ")"; + } + } + + return column_name; +} + +CassIndexType IndexMetadata::index_type_from_string(StringRef index_type) { + if (index_type.iequals("keys")) { + return CASS_INDEX_TYPE_KEYS; + } else if (index_type.iequals("custom")) { + return CASS_INDEX_TYPE_CUSTOM; + } else if (index_type.iequals("composites")) { + return CASS_INDEX_TYPE_COMPOSITES; + } + return CASS_INDEX_TYPE_UNKNOWN; +} + ColumnMetadata::ColumnMetadata(const MetadataConfig& config, const std::string& name, KeyspaceMetadata* keyspace, + TableMetadata* table, const SharedRefPtr& buffer, const Row* row) : MetadataBase(name) , type_(CASS_COLUMN_TYPE_REGULAR) @@ -1416,9 +1587,20 @@ ColumnMetadata::ColumnMetadata(const MetadataConfig& config, data_type_ = DataTypeClassNameParser::parse_one(validator, config.native_types); } - add_field(buffer, row, "index_name"); - add_json_map_field(config.protocol_version, row, "index_options"); - add_field(buffer, row, "index_type"); + value = add_field(buffer, row, "index_type"); + if (value != NULL && !value->is_null()) { + std::string index_name; + value = add_field(buffer, row, "index_name"); + if (value != NULL && + value->value_type() == CASS_VALUE_TYPE_VARCHAR) { + index_name = value->to_string(); + table->add_index(IndexMetadata::from_legacy(config, index_name, this, buffer, row)); + } + add_json_map_field(config.protocol_version, row, "index_options"); + } else { + add_field(buffer, row, "index_name"); + add_json_map_field(config.protocol_version, row, "index_options"); + } } } @@ -1444,7 +1626,9 @@ void Metadata::InternalData::update_keyspaces(const MetadataConfig& config, } void Metadata::InternalData::update_tables(const MetadataConfig& config, - ResultResponse* tables_result, ResultResponse* columns_result) { + ResultResponse* tables_result, + ResultResponse* columns_result, + ResultResponse* indexes_result) { SharedRefPtr buffer = tables_result->buffer(); tables_result->decode_first_row(); @@ -1472,7 +1656,13 @@ void Metadata::InternalData::update_tables(const MetadataConfig& config, keyspace->add_table(TableMetadata::Ptr(new TableMetadata(config, table_name, buffer, row))); } - update_columns(config, columns_result); + if (columns_result != NULL) { + update_columns(config, columns_result); + } + + if (indexes_result != NULL) { + update_indexes(config, indexes_result); + } } void Metadata::InternalData::update_user_types(const MetadataConfig& config, ResultResponse* result) { @@ -1695,10 +1885,14 @@ void Metadata::InternalData::update_columns(const MetadataConfig& config, Result table_name = temp_table_name; table = keyspace->get_or_create_table(table_name); table->clear_columns(); + if (config.cassandra_version < VersionNumber(3, 0, 0)) { + table->clear_indexes(); + } } table->add_column(ColumnMetadata::Ptr(new ColumnMetadata(config, column_name, - keyspace, buffer, row))); + keyspace, table.get(), + buffer, row))); } // Build keys for the last table @@ -1707,6 +1901,50 @@ void Metadata::InternalData::update_columns(const MetadataConfig& config, Result } } +void Metadata::InternalData::update_indexes(const MetadataConfig& config, ResultResponse* result) { + SharedRefPtr buffer = result->buffer(); + + result->decode_first_row(); + ResultIterator rows(result); + + std::string keyspace_name; + std::string table_name; + std::string index_name; + + KeyspaceMetadata* keyspace = NULL; + TableMetadata::Ptr table; + + while (rows.next()) { + std::string temp_keyspace_name; + std::string temp_table_name; + const Row* row = rows.row(); + + if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) || + !row->get_string_by_name("table_name", &temp_table_name) || + !row->get_string_by_name("index_name", &index_name)) { + LOG_ERROR("Unable to get column value for 'keyspace_name', 'table_name' or 'index_name'"); + continue; + } + + if (keyspace_name != temp_keyspace_name) { + keyspace_name = temp_keyspace_name; + keyspace = get_or_create_keyspace(keyspace_name); + } + + if (table_name != temp_table_name) { + // Build keys for the previous table + if (table) { + table->build_keys_and_sort(config); + } + table_name = temp_table_name; + table = keyspace->get_or_create_table(table_name); + table->clear_indexes(); + } + + table->add_index(IndexMetadata::from_row(index_name, buffer, row)); + } +} + KeyspaceMetadata* Metadata::InternalData::get_or_create_keyspace(const std::string& name) { KeyspaceMetadata::Map::iterator i = keyspaces_->find(name); if (i == keyspaces_->end()) { diff --git a/src/metadata.hpp b/src/metadata.hpp index 60004d95b..f2f938ff5 100644 --- a/src/metadata.hpp +++ b/src/metadata.hpp @@ -34,6 +34,8 @@ namespace cass { +class ColumnMetadata; +class TableMetadata; class KeyspaceMetadata; class Row; class ResultResponse; @@ -112,8 +114,8 @@ class MetadataField { : name_(name) {} MetadataField(const std::string& name, - const Value& value, - const SharedRefPtr& buffer) + const Value& value, + const SharedRefPtr& buffer) : name_(name) , value_(value) , buffer_(buffer) {} @@ -259,6 +261,43 @@ class AggregateMetadata : public MetadataBase, public RefCounted { +public: + typedef SharedRefPtr Ptr; + typedef std::map Map; + typedef std::vector Vec; + + CassIndexType type() const { return type_; } + const std::string& target() const { return target_; } + const Value* options() const { return options_; } + + IndexMetadata(const std::string& index_name) + : MetadataBase(index_name) { } + + static IndexMetadata::Ptr from_row(const std::string& index_name, + const SharedRefPtr& buffer, const Row* row); + void update(StringRef index_type, const Value* options); + + static IndexMetadata::Ptr from_legacy(const MetadataConfig& config, + const std::string& index_name, const ColumnMetadata* column, + const SharedRefPtr& buffer, const Row* row); + void update_legacy(StringRef index_type, const ColumnMetadata* column, const Value* options); + + +private: + static CassIndexType index_type_from_string(StringRef index_type); + static std::string target_from_legacy(const ColumnMetadata* column, + const Value* options); + +private: + CassIndexType type_; + std::string target_; + const Value* options_; + +private: + DISALLOW_COPY_AND_ASSIGN(IndexMetadata); +}; + class ColumnMetadata : public MetadataBase, public RefCounted { public: typedef SharedRefPtr Ptr; @@ -282,6 +321,7 @@ class ColumnMetadata : public MetadataBase, public RefCounted { ColumnMetadata(const MetadataConfig& config, const std::string& name, KeyspaceMetadata* keyspace, + TableMetadata* table, const SharedRefPtr& buffer, const Row* row); CassColumnType type() const { return type_; } @@ -310,6 +350,13 @@ class TableMetadata : public MetadataBase, public RefCounted { const ColumnMetadata* column() const { return impl_.item().get(); } }; + class IndexIterator : public MetadataIteratorImpl > { + public: + IndexIterator(const IndexIterator::Collection& collection) + : MetadataIteratorImpl >(CASS_ITERATOR_TYPE_INDEX_META, collection) { } + const IndexMetadata* index() const { return impl_.item().get(); } + }; + TableMetadata(const std::string& name) : MetadataBase(name) { } @@ -320,19 +367,27 @@ class TableMetadata : public MetadataBase, public RefCounted { const ColumnMetadata::Vec& partition_key() const { return partition_key_; } const ColumnMetadata::Vec& clustering_key() const { return clustering_key_; } + const IndexMetadata::Vec& indexes() const { return indexes_; } + Iterator* iterator_columns() const { return new ColumnIterator(columns_); } const ColumnMetadata* get_column(const std::string& name) const; - const ColumnMetadata::Ptr& get_or_create_column(const std::string& name); void add_column(const ColumnMetadata::Ptr& column); void clear_columns(); void build_keys_and_sort(const MetadataConfig& config); void key_aliases(const NativeDataTypes& native_types, KeyAliases* output) const; + Iterator* iterator_indexes() const { return new IndexIterator(indexes_); } + const IndexMetadata* get_index(const std::string& name) const; + void add_index(const IndexMetadata::Ptr& index); + void clear_indexes(); + private: ColumnMetadata::Vec columns_; ColumnMetadata::Map columns_by_name_; ColumnMetadata::Vec partition_key_; ColumnMetadata::Vec clustering_key_; + IndexMetadata::Vec indexes_; + IndexMetadata::Map indexes_by_name_; private: DISALLOW_COPY_AND_ASSIGN(TableMetadata); @@ -468,7 +523,9 @@ class Metadata { SchemaSnapshot schema_snapshot() const; void update_keyspaces(ResultResponse* result); - void update_tables(ResultResponse* tables_result, ResultResponse* columns_result); + void update_tables(ResultResponse* tables_result, + ResultResponse* columns_result, + ResultResponse* indexes_result); void update_user_types(ResultResponse* result); void update_functions(ResultResponse* result); void update_aggregates(ResultResponse* result); @@ -517,7 +574,10 @@ class Metadata { const KeyspaceMetadata::MapPtr& keyspaces() const { return keyspaces_; } void update_keyspaces(const MetadataConfig& config, ResultResponse* result, KeyspaceMetadata::Map& updates); - void update_tables(const MetadataConfig& config, ResultResponse* tables_result, ResultResponse* columns_result); + void update_tables(const MetadataConfig& config, + ResultResponse* tables_result, + ResultResponse* columns_result, + ResultResponse* indexes_result); void update_user_types(const MetadataConfig& config, ResultResponse* result); void update_functions(const MetadataConfig& config, ResultResponse* result); void update_aggregates(const MetadataConfig& config, ResultResponse* result); @@ -538,6 +598,7 @@ class Metadata { private: void update_columns(const MetadataConfig& config, ResultResponse* result); + void update_indexes(const MetadataConfig& config, ResultResponse* result); KeyspaceMetadata* get_or_create_keyspace(const std::string& name); diff --git a/src/utils.cpp b/src/utils.cpp index f7815b4a4..d8b7f9275 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -89,21 +89,63 @@ std::string& trim(std::string& str) { return str; } -static bool is_valid_cql_id_char(int c) { +static bool is_word_char(int c) { return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_'; } +static bool is_lower_word_char(int c) { + return (c >= 'a' && c <= 'z') || + (c >= '0' && c <= '9') || c == '_'; +} + bool is_valid_cql_id(const std::string& str) { for (std::string::const_iterator i = str.begin(), end = str.end(); i != end; ++i) { - if (!is_valid_cql_id_char(*i)) { + if (!is_word_char(*i)) { return false; } } return true; } +bool is_valid_lower_cql_id(const std::string& str) { + if (str.empty() || !is_lower_word_char(str.front())) { + return false; + } + if (str.size() > 1) { + for (std::string::const_iterator i = str.begin() + 1, + end = str.end(); i != end; ++i) { + if (!is_lower_word_char(*i)) { + return false; + } + } + } + return true; +} + +std::string& quote_id(std::string& str) { + std::string temp(str); + str.clear(); + str.push_back('"'); + for (std::string::const_iterator i = temp.begin(), + end = temp.end(); i != end; ++i) { + if (*i == '"') { + str.push_back('"'); + str.push_back('"'); + } else { + str.push_back(*i); + } + } + str.push_back('"'); + + return str; +} + +std::string& escape_id(std::string& str) { + return is_valid_lower_cql_id(str) ? str : quote_id(str); +} + std::string& to_cql_id(std::string& str) { if (is_valid_cql_id(str)) { std::transform(str.begin(), str.end(), str.begin(), tolower); diff --git a/src/utils.hpp b/src/utils.hpp index efed2bd5b..ea6e96038 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -84,6 +84,8 @@ bool is_valid_cql_id(const std::string& str); std::string& to_cql_id(std::string& str); +std::string& escape_id(std::string& str); + } // namespace cass #endif diff --git a/test/integration_tests/src/test_schema_metadata.cpp b/test/integration_tests/src/test_schema_metadata.cpp index 5186facb7..4f701cc86 100644 --- a/test/integration_tests/src/test_schema_metadata.cpp +++ b/test/integration_tests/src/test_schema_metadata.cpp @@ -42,6 +42,15 @@ #define USER_DEFINED_AGGREGATE_NAME "user_defined_aggregate" #define USER_DEFINED_AGGREGATE_FINAL_FUNCTION_NAME "uda_udf_final" +namespace std { + +std::ostream& operator<<(std::ostream& os, const std::pair& p) +{ + return os << "(\"" << p.first << "\", \"" << p.second << "\")"; +} + +} // namespace std + /** * Schema Metadata Test Class * @@ -221,6 +230,46 @@ struct TestSchemaMetadata : public test_utils::SingleSessionTest { } } + void verify_index(const CassIndexMeta* index_meta, + const std::string& index_name, CassIndexType index_type, + const std::string& index_target, const std::map& index_options) { + BOOST_REQUIRE(index_meta != NULL); + + CassString name; + cass_index_meta_name(index_meta, &name.data, &name.length); + BOOST_CHECK(name == CassString(index_name.c_str())); + + CassString target; + cass_index_meta_target(index_meta, &target.data, &target.length); + BOOST_CHECK(target == CassString(index_target.c_str())); + + CassIndexType type = cass_index_meta_type(index_meta); + BOOST_CHECK(type == index_type); + + const CassValue* options = cass_index_meta_options(index_meta); + + if (cass_value_is_null(options)) { + BOOST_CHECK(index_options.empty()); + return; + } + + test_utils::CassIteratorPtr iterator(cass_iterator_from_map(options)); + + std::map actual_index_options; + while (cass_iterator_next(iterator.get())) { + CassString k, v; + const CassValue* key = cass_iterator_get_map_key(iterator.get()); + cass_value_get_string(key, &k.data, &k.length); + const CassValue* value = cass_iterator_get_map_value(iterator.get()); + cass_value_get_string(value, &v.data, &v.length); + actual_index_options[std::string(k.data, k.length)] = std::string(v.data, v.length); + } + + BOOST_CHECK_EQUAL_COLLECTIONS(actual_index_options.begin(), actual_index_options.end(), + index_options.begin(), index_options.end()); + } + + const std::set& column_fields() { static std::set fields; if (fields.empty()) { @@ -972,5 +1021,86 @@ BOOST_AUTO_TEST_CASE(disable) { test_utils::execute_query_with_error(session, str(boost::format(test_utils::DROP_KEYSPACE_FORMAT) % "ks2")); } + +/** + * Test secondary indexes + * + * Verifies that index metadata is correctly updated and returned. + * + * @since 2.3.0 + * @jira_ticket CPP-321 + * @test_category schema + * @cassandra_version 1.2.x + */ +BOOST_AUTO_TEST_CASE(indexes) { + { + test_utils::execute_query(session, "CREATE KEYSPACE indexes WITH replication = " + "{ 'class' : 'SimpleStrategy', 'replication_factor' : 3 }"); + + test_utils::execute_query(session, "CREATE TABLE indexes.table1 (key1 text, value1 int, value2 map, PRIMARY KEY(key1))"); + + refresh_schema_meta(); + const CassTableMeta* table_meta = schema_get_table("indexes", "table1"); + + BOOST_CHECK(cass_table_meta_index_count(table_meta) == 0); + BOOST_CHECK(cass_table_meta_index_by_name(table_meta, "invalid") == NULL); + BOOST_CHECK(cass_table_meta_index(table_meta, 0) == NULL); + } + + // Index + { + test_utils::execute_query(session, "CREATE INDEX index1 ON indexes.table1 (value1)"); + + refresh_schema_meta(); + const CassTableMeta* table_meta = schema_get_table("indexes", "table1"); + + BOOST_REQUIRE(cass_table_meta_index_count(table_meta) == 1); + std::map index_options; + if (version >= "3.0.0") { + index_options["target"] = "value1"; + } + verify_index(cass_table_meta_index_by_name(table_meta, "index1"), + "index1", CASS_INDEX_TYPE_COMPOSITES, "value1", index_options); + verify_index(cass_table_meta_index(table_meta, 0), + "index1", CASS_INDEX_TYPE_COMPOSITES, "value1", index_options); + } + + // Index on map keys + { + test_utils::execute_query(session, "CREATE INDEX index2 ON indexes.table1 (KEYS(value2))"); + + refresh_schema_meta(); + const CassTableMeta* table_meta = schema_get_table("indexes", "table1"); + + BOOST_REQUIRE(cass_table_meta_index_count(table_meta) == 2); + + std::map index_options; + if (version >= "3.0.0") { + index_options["target"] = "keys(value2)"; + } else { + index_options["index_keys"] = ""; + } + verify_index(cass_table_meta_index_by_name(table_meta, "index2"), + "index2", CASS_INDEX_TYPE_COMPOSITES, "keys(value2)", index_options); + verify_index(cass_table_meta_index(table_meta, 1), + "index2", CASS_INDEX_TYPE_COMPOSITES, "keys(value2)", index_options); + } + + // Iterator + { + const CassTableMeta* table_meta = schema_get_table("indexes", "table1"); + + test_utils::CassIteratorPtr iterator(cass_iterator_indexes_from_table_meta(table_meta)); + while (cass_iterator_next(iterator.get())) { + const CassIndexMeta* index_meta = cass_iterator_get_index_meta(iterator.get()); + BOOST_REQUIRE(index_meta != NULL); + + CassString name; + cass_index_meta_name(index_meta, &name.data, &name.length); + BOOST_CHECK(name == CassString("index1") || name == CassString("index2")); + } + } +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/unit_tests/src/test_utils.cpp b/test/unit_tests/src/test_utils.cpp index 4fff8c223..6fc21b789 100644 --- a/test/unit_tests/src/test_utils.cpp +++ b/test/unit_tests/src/test_utils.cpp @@ -48,4 +48,21 @@ BOOST_AUTO_TEST_CASE(cql_id) BOOST_CHECK_EQUAL(cass::to_cql_id(s), std::string("!@#")); } +BOOST_AUTO_TEST_CASE(escape_id) +{ + std::string s; + + s = "abc"; + BOOST_CHECK_EQUAL(cass::escape_id(s), std::string("abc")); + + s = "aBc"; + BOOST_CHECK_EQUAL(cass::escape_id(s), std::string("\"aBc\"")); + + s = "\""; + BOOST_CHECK_EQUAL(cass::escape_id(s), std::string("\"\"\"\"")); + + s = "a\"Bc"; + BOOST_CHECK_EQUAL(cass::escape_id(s), std::string("\"a\"\"Bc\"")); +} + BOOST_AUTO_TEST_SUITE_END() From 3dce03e6756cab6988f9ccfdd05d4c8e19d06513 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Wed, 24 Feb 2016 16:18:57 -0700 Subject: [PATCH 3/4] Separated column/index metadata from table metadata update --- src/control_connection.cpp | 33 ++++++++++++++++-------- src/metadata.cpp | 51 ++++++++++++++++++++++---------------- src/metadata.hpp | 15 +++++------ 3 files changed, 58 insertions(+), 41 deletions(-) diff --git a/src/control_connection.cpp b/src/control_connection.cpp index ee99bcf45..9f7dc9ba7 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -510,11 +510,17 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti ResultResponse* tables_result; if (MultipleRequestHandler::get_result_response(responses, "tables", &tables_result)) { - ResultResponse* columns_result = NULL; - MultipleRequestHandler::get_result_response(responses, "columns", &columns_result); - ResultResponse* indexes_result = NULL; - MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result); - session->metadata().update_tables(tables_result, columns_result, indexes_result); + session->metadata().update_tables(tables_result); + } + + ResultResponse* columns_result; + if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) { + session->metadata().update_columns(columns_result); + } + + ResultResponse* indexes_result; + if (MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result)) { + session->metadata().update_indexes(indexes_result); } ResultResponse* user_types_result; @@ -803,13 +809,18 @@ void ControlConnection::on_refresh_table(ControlConnection* control_connection, return; } - ResultResponse* columns_result = NULL; - MultipleRequestHandler::get_result_response(responses, "columns", &columns_result); - ResultResponse* indexes_result = NULL; - MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result); - Session* session = control_connection->session_; - session->metadata().update_tables(tables_result, columns_result, indexes_result); + session->metadata().update_tables(tables_result); + + ResultResponse* columns_result; + if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) { + session->metadata().update_columns(columns_result); + } + + ResultResponse* indexes_result; + if (MultipleRequestHandler::get_result_response(responses, "indexes", &indexes_result)) { + session->metadata().update_indexes(indexes_result); + } } diff --git a/src/metadata.cpp b/src/metadata.cpp index fd91be64b..3c13e5c97 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -27,7 +27,7 @@ #include "row_iterator.hpp" #include "scoped_lock.hpp" #include "data_type_parser.hpp" -#include "util.h" +#include "utils.hpp" #include "value.hpp" #include "third_party/rapidjson/rapidjson/document.h" @@ -660,16 +660,36 @@ void Metadata::update_keyspaces(ResultResponse* result) { } } -void Metadata::update_tables(ResultResponse* tables_result, - ResultResponse* columns_result, - ResultResponse* indexes_result) { +void Metadata::update_tables(ResultResponse* result) { schema_snapshot_version_++; if (is_front_buffer()) { ScopedMutex l(&mutex_); - updating_->update_tables(config_, tables_result, columns_result, indexes_result); + updating_->update_tables(config_, result); } else { - updating_->update_tables(config_, tables_result, columns_result, indexes_result); + updating_->update_tables(config_, result); + } +} + +void Metadata::update_columns(ResultResponse* result) { + schema_snapshot_version_++; + + if (is_front_buffer()) { + ScopedMutex l(&mutex_); + updating_->update_columns(config_, result); + } else { + updating_->update_columns(config_, result); + } +} + +void Metadata::update_indexes(ResultResponse* result) { + schema_snapshot_version_++; + + if (is_front_buffer()) { + ScopedMutex l(&mutex_); + updating_->update_indexes(config_, result); + } else { + updating_->update_indexes(config_, result); } } @@ -1625,14 +1645,11 @@ void Metadata::InternalData::update_keyspaces(const MetadataConfig& config, } } -void Metadata::InternalData::update_tables(const MetadataConfig& config, - ResultResponse* tables_result, - ResultResponse* columns_result, - ResultResponse* indexes_result) { - SharedRefPtr buffer = tables_result->buffer(); +void Metadata::InternalData::update_tables(const MetadataConfig& config, ResultResponse* result) { + SharedRefPtr buffer = result->buffer(); - tables_result->decode_first_row(); - ResultIterator rows(tables_result); + result->decode_first_row(); + ResultIterator rows(result); std::string keyspace_name; std::string table_name; @@ -1655,14 +1672,6 @@ void Metadata::InternalData::update_tables(const MetadataConfig& config, keyspace->add_table(TableMetadata::Ptr(new TableMetadata(config, table_name, buffer, row))); } - - if (columns_result != NULL) { - update_columns(config, columns_result); - } - - if (indexes_result != NULL) { - update_indexes(config, indexes_result); - } } void Metadata::InternalData::update_user_types(const MetadataConfig& config, ResultResponse* result) { diff --git a/src/metadata.hpp b/src/metadata.hpp index f2f938ff5..767368b6d 100644 --- a/src/metadata.hpp +++ b/src/metadata.hpp @@ -523,9 +523,9 @@ class Metadata { SchemaSnapshot schema_snapshot() const; void update_keyspaces(ResultResponse* result); - void update_tables(ResultResponse* tables_result, - ResultResponse* columns_result, - ResultResponse* indexes_result); + void update_tables(ResultResponse* result); + void update_columns(ResultResponse* result); + void update_indexes(ResultResponse* result); void update_user_types(ResultResponse* result); void update_functions(ResultResponse* result); void update_aggregates(ResultResponse* result); @@ -574,10 +574,9 @@ class Metadata { const KeyspaceMetadata::MapPtr& keyspaces() const { return keyspaces_; } void update_keyspaces(const MetadataConfig& config, ResultResponse* result, KeyspaceMetadata::Map& updates); - void update_tables(const MetadataConfig& config, - ResultResponse* tables_result, - ResultResponse* columns_result, - ResultResponse* indexes_result); + void update_tables(const MetadataConfig& config, ResultResponse* result); + void update_columns(const MetadataConfig& config, ResultResponse* result); + void update_indexes(const MetadataConfig& config, ResultResponse* result); void update_user_types(const MetadataConfig& config, ResultResponse* result); void update_functions(const MetadataConfig& config, ResultResponse* result); void update_aggregates(const MetadataConfig& config, ResultResponse* result); @@ -597,8 +596,6 @@ class Metadata { } private: - void update_columns(const MetadataConfig& config, ResultResponse* result); - void update_indexes(const MetadataConfig& config, ResultResponse* result); KeyspaceMetadata* get_or_create_keyspace(const std::string& name); From ee5b8d0b67ce58deba2dcbddf5186897fceb9147 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Thu, 25 Feb 2016 09:17:21 -0700 Subject: [PATCH 4/4] Fix: std::string::front() is C++11 only --- src/utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.cpp b/src/utils.cpp index d8b7f9275..d519ce587 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -110,7 +110,7 @@ bool is_valid_cql_id(const std::string& str) { } bool is_valid_lower_cql_id(const std::string& str) { - if (str.empty() || !is_lower_word_char(str.front())) { + if (str.empty() || !is_lower_word_char(str[0])) { return false; } if (str.size() > 1) {