diff --git a/include/cassandra.h b/include/cassandra.h
index 573fd8826..3e1c55cca 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -557,7 +557,8 @@ typedef enum CassColumnType_ {
CASS_COLUMN_TYPE_REGULAR,
CASS_COLUMN_TYPE_PARTITION_KEY,
CASS_COLUMN_TYPE_CLUSTERING_KEY,
- CASS_COLUMN_TYPE_STATIC
+ CASS_COLUMN_TYPE_STATIC,
+ CASS_COLUMN_TYPE_COMPACT_VALUE
} CassColumnType;
typedef enum CassErrorSource_ {
@@ -2322,6 +2323,9 @@ cass_aggregate_meta_final_func(const CassAggregateMeta* aggregate_meta);
/**
* Gets the initial condition value for the aggregate.
*
+ * Note: The value of the initial condition will always be
+ * a "varchar" type for Cassandra 3.0+.
+ *
* @public @memberof CassAggregateMeta
*
* @param[in] aggregate_meta
@@ -4278,6 +4282,18 @@ cass_data_type_set_class_name_n(CassDataType* data_type,
const char* class_name,
size_t class_name_length);
+/**
+ * Gets the sub-data type count of a UDT (user defined type), tuple
+ * or collection.
+ *
+ * Note: Only valid for UDT, tuple and collection data types.
+ *
+ * @param[in] data_type
+ * @return Returns the number of sub-data types
+ */
+CASS_EXPORT size_t
+cass_data_sub_type_count(const CassDataType* data_type);
+
/**
* Gets the sub-data type of a UDT (user defined type), tuple or collection at
* the specified index.
diff --git a/src/abstract_data.hpp b/src/abstract_data.hpp
index acac4e927..3fd5fad19 100644
--- a/src/abstract_data.hpp
+++ b/src/abstract_data.hpp
@@ -143,7 +143,7 @@ class AbstractData {
protected:
virtual size_t get_indices(StringRef name,
IndexVec* indices) = 0;
- virtual const SharedRefPtr& get_type(size_t index) const = 0;
+ virtual const DataType::ConstPtr& get_type(size_t index) const = 0;
private:
template
@@ -152,7 +152,7 @@ class AbstractData {
return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
}
IsValidDataType is_valid_type;
- SharedRefPtr data_type(get_type(index));
+ DataType::ConstPtr data_type(get_type(index));
if (data_type && !is_valid_type(value, data_type)) {
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
}
diff --git a/src/collection.hpp b/src/collection.hpp
index 1c2dbf337..7863bc132 100644
--- a/src/collection.hpp
+++ b/src/collection.hpp
@@ -41,7 +41,7 @@ class Collection : public RefCounted {
items_.reserve(item_count);
}
- Collection(const SharedRefPtr& data_type,
+ Collection(const CollectionType::ConstPtr& data_type,
size_t item_count)
: data_type_(data_type) {
items_.reserve(item_count);
@@ -51,7 +51,7 @@ class Collection : public RefCounted {
return static_cast(data_type_->value_type());
}
- const SharedRefPtr& data_type() const { return data_type_; }
+ const CollectionType::ConstPtr& data_type() const { return data_type_; }
const BufferVec& items() const { return items_; }
#define APPEND_TYPE(Type) \
@@ -129,7 +129,7 @@ class Collection : public RefCounted {
void encode_items_uint16(char* buf) const;
private:
- SharedRefPtr data_type_;
+ CollectionType::ConstPtr data_type_;
BufferVec items_;
private:
diff --git a/src/collection_iterator.cpp b/src/collection_iterator.cpp
index 76c92e734..c652c5f3e 100644
--- a/src/collection_iterator.cpp
+++ b/src/collection_iterator.cpp
@@ -33,7 +33,7 @@ char* CollectionIterator::decode_value(char* position) {
int32_t size;
char* buffer = decode_size(protocol_version, position, size);
- SharedRefPtr data_type;
+ DataType::ConstPtr data_type;
if (collection_->value_type() == CASS_VALUE_TYPE_MAP) {
data_type = (index_ % 2 == 0) ? collection_->primary_data_type()
: collection_->secondary_data_type();
diff --git a/src/collection_iterator.hpp b/src/collection_iterator.hpp
index fbbe2dd30..fee7bb7c4 100644
--- a/src/collection_iterator.hpp
+++ b/src/collection_iterator.hpp
@@ -67,7 +67,7 @@ class TupleIterator : public ValueIterator {
: ValueIterator(CASS_ITERATOR_TYPE_TUPLE)
, tuple_(tuple)
, position_(tuple->data()) {
- SharedRefPtr collection_type(tuple->data_type());
+ CollectionType::ConstPtr collection_type(tuple->data_type());
next_ = collection_type->types().begin();
end_ = collection_type->types().end();
}
diff --git a/src/control_connection.cpp b/src/control_connection.cpp
index 60b1637e8..61df6d3f3 100644
--- a/src/control_connection.cpp
+++ b/src/control_connection.cpp
@@ -38,12 +38,19 @@
#define SELECT_PEERS "SELECT peer, data_center, rack, release_version, rpc_address FROM system.peers"
#define SELECT_PEERS_TOKENS "SELECT peer, data_center, rack, release_version, rpc_address, tokens FROM system.peers"
-#define SELECT_KEYSPACES "SELECT * FROM system.schema_keyspaces"
-#define SELECT_COLUMN_FAMILIES "SELECT * FROM system.schema_columnfamilies"
-#define SELECT_COLUMNS "SELECT * FROM system.schema_columns"
-#define SELECT_USERTYPES "SELECT * FROM system.schema_usertypes"
-#define SELECT_FUNCTIONS "SELECT * FROM system.schema_functions"
-#define SELECT_AGGREGATES "SELECT * FROM system.schema_aggregates"
+#define SELECT_KEYSPACES_20 "SELECT * FROM system.schema_keyspaces"
+#define SELECT_COLUMN_FAMILIES_20 "SELECT * FROM system.schema_columnfamilies"
+#define SELECT_COLUMNS_20 "SELECT * FROM system.schema_columns"
+#define SELECT_USERTYPES_21 "SELECT * FROM system.schema_usertypes"
+#define SELECT_FUNCTIONS_22 "SELECT * FROM system.schema_functions"
+#define SELECT_AGGREGATES_22 "SELECT * FROM system.schema_aggregates"
+
+#define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces"
+#define SELECT_TABLES_30 "SELECT * FROM system_schema.tables"
+#define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns"
+#define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types"
+#define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions"
+#define SELECT_AGGREGATES_30 "SELECT * FROM system_schema.aggregates"
namespace cass {
@@ -194,7 +201,7 @@ void ControlConnection::on_ready(Connection* connection) {
// The control connection has to refresh meta when there's a reconnect because
// events could have been missed while not connected.
- query_meta_all();
+ query_meta_hosts();
}
void ControlConnection::on_close(Connection* connection) {
@@ -359,31 +366,16 @@ void ControlConnection::on_event(EventResponse* response) {
}
}
-//TODO: query and callbacks should be in Metadata
-// punting for now because of tight coupling of Session and CC state
-void ControlConnection::query_meta_all() {
- ScopedRefPtr > handler(
- new ControlMultipleRequestHandler(this, ControlConnection::on_query_meta_all, QueryMetadataAllData()));
+void ControlConnection::query_meta_hosts() {
+ ScopedRefPtr > handler(
+ new ControlMultipleRequestHandler(this, ControlConnection::on_query_hosts, UnusedData()));
handler->execute_query(SELECT_LOCAL_TOKENS);
handler->execute_query(SELECT_PEERS_TOKENS);
-
- if (session_->config().use_schema()) {
- handler->execute_query(SELECT_KEYSPACES);
- handler->execute_query(SELECT_COLUMN_FAMILIES);
- handler->execute_query(SELECT_COLUMNS);
- if (protocol_version_ >= 3) {
- handler->execute_query(SELECT_USERTYPES);
- }
- if (protocol_version_ >= 4) {
- handler->execute_query(SELECT_FUNCTIONS);
- handler->execute_query(SELECT_AGGREGATES);
- }
- }
}
-void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
- const QueryMetadataAllData& unused,
- const MultipleRequestHandler::ResponseVec& responses) {
+void ControlConnection::on_query_hosts(ControlConnection* control_connection,
+ const UnusedData& data,
+ const MultipleRequestHandler::ResponseVec& responses) {
Connection* connection = control_connection->connection_;
if (connection == NULL) {
return;
@@ -391,8 +383,6 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
Session* session = control_connection->session_;
- session->metadata().clear_and_update_back();
-
bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
// If the 'system.local' table is empty the connection isn't used as a control
@@ -460,20 +450,73 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
session->purge_hosts(is_initial_connection);
if (session->config().use_schema()) {
- session->metadata().update_keyspaces(static_cast(responses[2].get()));
- session->metadata().update_tables(static_cast(responses[3].get()),
- static_cast(responses[4].get()));
- if (control_connection->protocol_version_ >= 3) {
- session->metadata().update_user_types(static_cast(responses[5].get()));
+ control_connection->query_meta_schema();
+ } else {
+ control_connection->state_ = CONTROL_STATE_READY;
+ session->on_control_connection_ready();
+ // Create a new query plan that considers all the new hosts from the
+ // "system" tables.
+ control_connection->query_plan_.reset(session->new_query_plan());
+ }
+}
+
+//TODO: query and callbacks should be in Metadata
+// punting for now because of tight coupling of Session and CC state
+void ControlConnection::query_meta_schema() {
+ ScopedRefPtr > handler(
+ new ControlMultipleRequestHandler(this, ControlConnection::on_query_meta_schema, UnusedData()));
+
+ if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ handler->execute_query(SELECT_KEYSPACES_30);
+ handler->execute_query(SELECT_TABLES_30);
+ handler->execute_query(SELECT_COLUMNS_30);
+ handler->execute_query(SELECT_USERTYPES_30);
+ handler->execute_query(SELECT_FUNCTIONS_30);
+ handler->execute_query(SELECT_AGGREGATES_30);
+ } else {
+ handler->execute_query(SELECT_KEYSPACES_20);
+ handler->execute_query(SELECT_COLUMN_FAMILIES_20);
+ handler->execute_query(SELECT_COLUMNS_20);
+ if (session_->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
+ handler->execute_query(SELECT_USERTYPES_21);
}
- if (control_connection->protocol_version_ >= 4) {
- session->metadata().update_functions(static_cast(responses[6].get()));
- session->metadata().update_aggregates(static_cast(responses[7].get()));
+ if (session_->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
+ handler->execute_query(SELECT_FUNCTIONS_22);
+ handler->execute_query(SELECT_AGGREGATES_22);
}
- session->metadata().swap_to_back_and_update_front();
- if (control_connection->should_query_tokens_) session->metadata().build();
+ }
+}
+
+void ControlConnection::on_query_meta_schema(ControlConnection* control_connection,
+ const UnusedData& unused,
+ const MultipleRequestHandler::ResponseVec& responses) {
+ Connection* connection = control_connection->connection_;
+ if (connection == NULL) {
+ return;
}
+ Session* session = control_connection->session_;
+
+ session->metadata().clear_and_update_back();
+
+ bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);
+
+ session->metadata().update_keyspaces(static_cast(responses[0].get()));
+ session->metadata().update_tables(static_cast(responses[1].get()),
+ static_cast(responses[2].get()));
+
+ if (session->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
+ session->metadata().update_user_types(static_cast(responses[3].get()));
+ }
+
+ if (session->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
+ session->metadata().update_functions(static_cast(responses[4].get()));
+ session->metadata().update_aggregates(static_cast(responses[5].get()));
+ }
+
+ session->metadata().swap_to_back_and_update_front();
+ if (control_connection->should_query_tokens_) session->metadata().build();
+
if (is_initial_connection) {
control_connection->state_ = CONTROL_STATE_READY;
session->on_control_connection_ready();
@@ -636,8 +679,9 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row
}
if (should_query_tokens_) {
+ bool is_connected_host = connection_ != NULL && host->address().compare(connection_->address()) == 0;
std::string partitioner;
- if (row->get_string_by_name("partitioner", &partitioner)) {
+ if (is_connected_host && row->get_string_by_name("partitioner", &partitioner)) {
session_->metadata().set_partitioner(partitioner);
}
v = row->get_by_name("tokens");
@@ -655,7 +699,13 @@ void ControlConnection::update_node_info(SharedRefPtr host, const Row* row
}
void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
- std::string query(SELECT_KEYSPACES);
+ std::string query;
+
+ if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ query.assign(SELECT_KEYSPACES_30);
+ } else {
+ query.assign(SELECT_KEYSPACES_20);
+ }
query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("'");
@@ -683,13 +733,26 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio
void ControlConnection::refresh_table(const StringRef& keyspace_name,
const StringRef& table_name) {
- std::string cf_query(SELECT_COLUMN_FAMILIES);
- cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
- .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+ std::string cf_query;
+ std::string col_query;
+
+ if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ cf_query.assign(SELECT_TABLES_30);
+ cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+ .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
- std::string col_query(SELECT_COLUMNS);
- col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
- .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+ col_query.assign(SELECT_COLUMNS_30);
+ col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+ .append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
+ } else {
+ cf_query.assign(SELECT_COLUMN_FAMILIES_20);
+ cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+ .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+
+ col_query.assign(SELECT_COLUMNS_20);
+ col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
+ .append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
+ }
LOG_DEBUG("Refreshing table %s; %s", cf_query.c_str(), col_query.c_str());
@@ -720,7 +783,13 @@ void ControlConnection::on_refresh_table(ControlConnection* control_connection,
void ControlConnection::refresh_type(const StringRef& keyspace_name,
const StringRef& type_name) {
- std::string query(SELECT_USERTYPES);
+ std::string query;
+ if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ query.assign(SELECT_USERTYPES_30);
+ } else {
+ query.assign(SELECT_USERTYPES_21);
+ }
+
query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND type_name='").append(type_name.data(), type_name.size()).append("'");
@@ -752,12 +821,22 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,
bool is_aggregate) {
std::string query;
- if (is_aggregate) {
- query.assign(SELECT_AGGREGATES);
- query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
+ if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
+ if (is_aggregate) {
+ query.assign(SELECT_AGGREGATES_30);
+ query.append(" WHERE keyspace_name=? AND aggregate_name=? AND argument_types=?");
+ } else {
+ query.assign(SELECT_FUNCTIONS_30);
+ query.append(" WHERE keyspace_name=? AND function_name=? AND argument_types=?");
+ }
} else {
- query.assign(SELECT_FUNCTIONS);
- query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
+ if (is_aggregate) {
+ query.assign(SELECT_AGGREGATES_22);
+ query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
+ } else {
+ query.assign(SELECT_FUNCTIONS_22);
+ query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
+ }
}
LOG_DEBUG("Refreshing %s %s in keyspace %s",
diff --git a/src/control_connection.hpp b/src/control_connection.hpp
index 4a72555e1..472167de3 100644
--- a/src/control_connection.hpp
+++ b/src/control_connection.hpp
@@ -106,7 +106,7 @@ class ControlConnection : public Connection::Listener {
std::string table_name;
};
- struct QueryMetadataAllData {};
+ struct UnusedData {};
template
class ControlHandler : public Handler {
@@ -180,28 +180,32 @@ class ControlConnection : public Connection::Listener {
virtual void on_availability_change(Connection* connection) {}
virtual void on_event(EventResponse* response);
- //TODO: possibly reorder callback functions to pair with initiator
- static void on_query_meta_all(ControlConnection* control_connection,
- const QueryMetadataAllData& data,
- const MultipleRequestHandler::ResponseVec& responses);
- static void on_refresh_node_info(ControlConnection* control_connection,
- const RefreshNodeData& data,
- Response* response);
- static void on_refresh_node_info_all(ControlConnection* control_connection,
- const RefreshNodeData& data,
- Response* response);
- void on_local_query(ResponseMessage* response);
- void on_peer_query(ResponseMessage* response);
static void on_reconnect(Timer* timer);
bool handle_query_invalid_response(Response* response);
void handle_query_failure(CassError code, const std::string& message);
void handle_query_timeout();
- void query_meta_all();
+ void query_meta_hosts();
+ static void on_query_hosts(ControlConnection* control_connection,
+ const UnusedData& data,
+ const MultipleRequestHandler::ResponseVec& responses);
+
+ void query_meta_schema();
+ static void on_query_meta_schema(ControlConnection* control_connection,
+ const UnusedData& data,
+ const MultipleRequestHandler::ResponseVec& responses);
+
void refresh_node_info(SharedRefPtr host,
bool is_new_node,
bool query_tokens = false);
+ static void on_refresh_node_info(ControlConnection* control_connection,
+ const RefreshNodeData& data,
+ Response* response);
+ static void on_refresh_node_info_all(ControlConnection* control_connection,
+ const RefreshNodeData& data,
+ Response* response);
+
void update_node_info(SharedRefPtr host, const Row* row);
void refresh_keyspace(const StringRef& keyspace_name);
diff --git a/src/data_type.cpp b/src/data_type.cpp
index 101850cf4..286f42198 100644
--- a/src/data_type.cpp
+++ b/src/data_type.cpp
@@ -238,6 +238,19 @@ CassError cass_data_type_set_class_name_n(CassDataType* data_type,
return CASS_OK;
}
+size_t cass_data_sub_type_count(const CassDataType* data_type) {
+ if (data_type->is_collection() || data_type->is_tuple()) {
+ const cass::SubTypesDataType* sub_types
+ = static_cast(data_type->from());
+ return sub_types->types().size();
+ } else if (data_type->is_user_type()) {
+ const cass::UserType* user_type
+ = static_cast(data_type->from());
+ return user_type->fields().size();
+ }
+ return 0;
+}
+
CassError cass_data_type_sub_type_name(const CassDataType* data_type,
size_t index,
const char** name,
@@ -360,20 +373,81 @@ void cass_data_type_free(CassDataType* data_type) {
namespace cass {
-const SharedRefPtr DataType::NIL;
+const DataType::ConstPtr DataType::NIL;
+
+void NativeDataTypes::init_class_names() {
+ if (!by_class_names_.empty()) return;
+ by_class_names_["org.apache.cassandra.db.marshal.AsciiType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_ASCII));
+ by_class_names_["org.apache.cassandra.db.marshal.BooleanType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BOOLEAN));
+ by_class_names_["org.apache.cassandra.db.marshal.ByteType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TINY_INT));
+ by_class_names_["org.apache.cassandra.db.marshal.BytesType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BLOB));
+ by_class_names_["org.apache.cassandra.db.marshal.CounterColumnType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_COUNTER));
+ by_class_names_["org.apache.cassandra.db.marshal.DateType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMESTAMP));
+ by_class_names_["org.apache.cassandra.db.marshal.DecimalType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DECIMAL));
+ by_class_names_["org.apache.cassandra.db.marshal.DoubleType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DOUBLE));
+ by_class_names_["org.apache.cassandra.db.marshal.FloatType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_FLOAT));
+ by_class_names_["org.apache.cassandra.db.marshal.InetAddressType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INET));
+ by_class_names_["org.apache.cassandra.db.marshal.Int32Type"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INT));
+ by_class_names_["org.apache.cassandra.db.marshal.IntegerType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INT));
+ by_class_names_["org.apache.cassandra.db.marshal.LongType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BIGINT));
+ by_class_names_["org.apache.cassandra.db.marshal.ShortType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_SMALL_INT));
+ by_class_names_["org.apache.cassandra.db.marshal.SimpleDateType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DATE));
+ by_class_names_["org.apache.cassandra.db.marshal.TimeType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIME));
+ by_class_names_["org.apache.cassandra.db.marshal.TimestampType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMESTAMP));
+ by_class_names_["org.apache.cassandra.db.marshal.TimeUUIDType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMEUUID));
+ by_class_names_["org.apache.cassandra.db.marshal.UTF8Type"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TEXT));
+ by_class_names_["org.apache.cassandra.db.marshal.UUIDType"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_UUID));
+}
+
+const DataType::ConstPtr& NativeDataTypes::by_class_name(const std::string& name) const {
+ DataTypeMap::const_iterator i = by_class_names_.find(name);
+ if (i == by_class_names_.end()) return DataType::NIL;
+ return i->second;
+}
+
+void NativeDataTypes::init_cql_names() {
+ if (!by_cql_names_.empty()) return;
+ by_cql_names_["ascii"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_ASCII));
+ by_cql_names_["bigint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BIGINT));
+ by_cql_names_["blob"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BLOB));
+ by_cql_names_["boolean"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_BOOLEAN));
+ by_cql_names_["counter"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_COUNTER));
+ by_cql_names_["date"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DATE));
+ by_cql_names_["decimal"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DECIMAL));
+ by_cql_names_["double"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_DOUBLE));
+ by_cql_names_["float"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_FLOAT));
+ by_cql_names_["inet"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INET));
+ by_cql_names_["int"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_INT));
+ by_cql_names_["smallint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_SMALL_INT));
+ by_cql_names_["time"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIME));
+ by_cql_names_["timestamp"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMESTAMP));
+ by_cql_names_["timeuuid"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TIMEUUID));
+ by_cql_names_["tinyint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TINY_INT));
+ by_cql_names_["text"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_TEXT));
+ by_cql_names_["uuid"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_UUID));
+ by_cql_names_["varchar"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_VARCHAR));
+ by_cql_names_["varint"] = DataType::ConstPtr(new DataType(CASS_VALUE_TYPE_VARINT));
+}
+
+const DataType::ConstPtr& NativeDataTypes::by_cql_name(const std::string& name) const {
+ DataTypeMap::const_iterator i = by_cql_names_.find(name);
+ if (i == by_cql_names_.end()) return DataType::NIL;
+ return i->second;
+}
+
bool cass::IsValidDataType::operator()(const Collection* value,
- const SharedRefPtr& data_type) const {
+ const DataType::ConstPtr& data_type) const {
return value->data_type()->equals(data_type);
}
bool cass::IsValidDataType::operator()(const Tuple* value,
- const SharedRefPtr& data_type) const {
+ const DataType::ConstPtr& data_type) const {
return value->data_type()->equals(data_type);
}
bool cass::IsValidDataType::operator()(const UserTypeValue* value,
- const SharedRefPtr& data_type) const {
+ const DataType::ConstPtr& data_type) const {
return value->data_type()->equals(data_type);
}
diff --git a/src/data_type.hpp b/src/data_type.hpp
index 638728ca7..411cee0cb 100644
--- a/src/data_type.hpp
+++ b/src/data_type.hpp
@@ -65,10 +65,10 @@ inline bool equals_both_not_empty(const std::string& s1,
class DataType : public RefCounted {
public:
- typedef SharedRefPtr Ptr;
- typedef std::vector Vec;
+ typedef SharedRefPtr ConstPtr;
+ typedef std::vector Vec;
- static const SharedRefPtr NIL;
+ static const DataType::ConstPtr NIL;
DataType(CassValueType value_type)
: value_type_(value_type) { }
@@ -99,7 +99,7 @@ class DataType : public RefCounted {
return value_type_ == CASS_VALUE_TYPE_CUSTOM;
}
- virtual bool equals(const SharedRefPtr& data_type) const {
+ virtual bool equals(const DataType::ConstPtr& data_type) const {
switch (value_type_) {
// "text" is an alias for "varchar"
case CASS_VALUE_TYPE_TEXT:
@@ -168,7 +168,7 @@ class CustomType : public DataType {
class_name_ = class_name;
}
- virtual bool equals(const SharedRefPtr& data_type) const {
+ virtual bool equals(const DataType::ConstPtr& data_type) const {
assert(value_type() == CASS_VALUE_TYPE_CUSTOM);
if (data_type->value_type() != CASS_VALUE_TYPE_CUSTOM) {
return false;
@@ -221,6 +221,8 @@ class SubTypesDataType : public DataType {
class CollectionType : public SubTypesDataType {
public:
+ typedef SharedRefPtr ConstPtr;
+
CollectionType(CassValueType collection_type)
: SubTypesDataType(collection_type) { }
@@ -233,7 +235,7 @@ class CollectionType : public SubTypesDataType {
CollectionType(CassValueType collection_type, const DataType::Vec& types)
: SubTypesDataType(collection_type, types) { }
- virtual bool equals(const SharedRefPtr& data_type) const {
+ virtual bool equals(const DataType::ConstPtr& data_type) const {
assert(value_type() == CASS_VALUE_TYPE_LIST ||
value_type() == CASS_VALUE_TYPE_SET ||
value_type() == CASS_VALUE_TYPE_MAP);
@@ -242,7 +244,7 @@ class CollectionType : public SubTypesDataType {
return false;
}
- const SharedRefPtr& collection_type(data_type);
+ const CollectionType::ConstPtr& collection_type(data_type);
// Only compare sub-types if both have sub-types
if(!types_.empty() && !collection_type->types_.empty()) {
@@ -264,36 +266,38 @@ class CollectionType : public SubTypesDataType {
}
public:
- static SharedRefPtr list(SharedRefPtr element_type) {
+ static DataType::ConstPtr list(DataType::ConstPtr element_type) {
DataType::Vec types;
types.push_back(element_type);
- return SharedRefPtr(new CollectionType(CASS_VALUE_TYPE_LIST, types));
+ return DataType::ConstPtr(new CollectionType(CASS_VALUE_TYPE_LIST, types));
}
- static SharedRefPtr set(SharedRefPtr element_type) {
+ static DataType::ConstPtr set(DataType::ConstPtr element_type) {
DataType::Vec types;
types.push_back(element_type);
- return SharedRefPtr(new CollectionType(CASS_VALUE_TYPE_SET, types));
+ return DataType::ConstPtr(new CollectionType(CASS_VALUE_TYPE_SET, types));
}
- static SharedRefPtr map(SharedRefPtr key_type,
- SharedRefPtr value_type) {
+ static DataType::ConstPtr map(DataType::ConstPtr key_type,
+ DataType::ConstPtr value_type) {
DataType::Vec types;
types.push_back(key_type);
types.push_back(value_type);
- return SharedRefPtr(new CollectionType(CASS_VALUE_TYPE_MAP, types));
+ return DataType::ConstPtr(new CollectionType(CASS_VALUE_TYPE_MAP, types));
}
};
class TupleType : public SubTypesDataType {
public:
+ typedef SharedRefPtr ConstPtr;
+
TupleType()
: SubTypesDataType(CASS_VALUE_TYPE_TUPLE) { }
TupleType(const DataType::Vec& types)
: SubTypesDataType(CASS_VALUE_TYPE_TUPLE, types) { }
- virtual bool equals(const SharedRefPtr& data_type) const {
+ virtual bool equals(const DataType::ConstPtr& data_type) const {
assert(value_type() == CASS_VALUE_TYPE_TUPLE);
if (value_type() != data_type->value_type()) {
@@ -322,17 +326,20 @@ class TupleType : public SubTypesDataType {
}
};
-
class UserType : public DataType {
public:
+ typedef SharedRefPtr Ptr;
+ typedef SharedRefPtr ConstPtr;
+ typedef std::map Map;
+
struct Field : public HashTableEntry {
Field(const std::string& field_name,
- const SharedRefPtr& type)
+ const DataType::ConstPtr& type)
: name(field_name)
, type(type) { }
std::string name;
- SharedRefPtr type;
+ DataType::ConstPtr type;
};
typedef CaseInsensitiveHashTable::EntryVec FieldVec;
@@ -344,6 +351,12 @@ class UserType : public DataType {
: DataType(CASS_VALUE_TYPE_UDT)
, fields_(field_count) { }
+ UserType(const std::string& keyspace,
+ const std::string& type_name )
+ : DataType(CASS_VALUE_TYPE_UDT)
+ , keyspace_(keyspace)
+ , type_name_(type_name) { }
+
UserType(const std::string& keyspace,
const std::string& type_name,
const FieldVec& fields)
@@ -370,17 +383,21 @@ class UserType : public DataType {
return fields_.get_indices(name, result);
}
- void add_field(const std::string name, const SharedRefPtr& data_type) {
+ void add_field(const std::string name, const DataType::ConstPtr& data_type) {
fields_.add(Field(name, data_type));
}
- virtual bool equals(const SharedRefPtr& data_type) const {
+ void set_fields(const FieldVec& fields) {
+ fields_.set_entries(fields);
+ }
+
+ virtual bool equals(const DataType::ConstPtr& data_type) const {
assert(value_type() == CASS_VALUE_TYPE_UDT);
if (data_type->value_type() != CASS_VALUE_TYPE_UDT) {
return false;
}
- const SharedRefPtr& user_type(data_type);
+ const UserType::ConstPtr& user_type(data_type);
if (!equals_both_not_empty(keyspace_, user_type->keyspace_)) {
return false;
@@ -418,120 +435,134 @@ class UserType : public DataType {
CaseInsensitiveHashTable fields_;
};
+class NativeDataTypes {
+public:
+ void init_class_names();
+ const DataType::ConstPtr& by_class_name(const std::string& name) const;
+
+ void init_cql_names();
+ const DataType::ConstPtr& by_cql_name(const std::string& name) const;
+
+private:
+ typedef std::map DataTypeMap;
+ DataTypeMap by_class_names_;
+ DataTypeMap by_cql_names_;
+};
+
template
struct IsValidDataType;
template<>
struct IsValidDataType {
- bool operator()(CassNull, const SharedRefPtr& data_type) const {
+ bool operator()(CassNull, const DataType::ConstPtr& data_type) const {
return true;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_int8_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_int8_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_TINY_INT;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_int16_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_int16_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_SMALL_INT;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_int32_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_int32_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_INT;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_uint32_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_uint32_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_DATE;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_int64_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_int64_t, const DataType::ConstPtr& data_type) const {
return is_int64_type(data_type->value_type());
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_float_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_float_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_FLOAT;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_double_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_double_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_DOUBLE;
}
};
template<>
struct IsValidDataType {
- bool operator()(cass_bool_t, const SharedRefPtr& data_type) const {
+ bool operator()(cass_bool_t, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_BOOLEAN;
}
};
template<>
struct IsValidDataType {
- bool operator()(CassString, const SharedRefPtr& data_type) const {
+ bool operator()(CassString, const DataType::ConstPtr& data_type) const {
return is_string_type(data_type->value_type());
}
};
template<>
struct IsValidDataType {
- bool operator()(CassBytes, const SharedRefPtr& data_type) const {
+ bool operator()(CassBytes, const DataType::ConstPtr& data_type) const {
return is_bytes_type(data_type->value_type());
}
};
template<>
struct IsValidDataType {
- bool operator()(CassUuid, const SharedRefPtr& data_type) const {
+ bool operator()(CassUuid, const DataType::ConstPtr& data_type) const {
return is_uuid_type(data_type->value_type());
}
};
template<>
struct IsValidDataType {
- bool operator()(CassInet, const SharedRefPtr& data_type) const {
+ bool operator()(CassInet, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_INET;
}
};
template<>
struct IsValidDataType {
- bool operator()(CassDecimal, const SharedRefPtr& data_type) const {
+ bool operator()(CassDecimal, const DataType::ConstPtr& data_type) const {
return data_type->value_type() == CASS_VALUE_TYPE_DECIMAL;
}
};
template<>
struct IsValidDataType {
- bool operator()(const Collection* value, const SharedRefPtr& data_type) const;
+ bool operator()(const Collection* value, const DataType::ConstPtr& data_type) const;
};
template<>
struct IsValidDataType {
- bool operator()(const Tuple* value, const SharedRefPtr& data_type) const;
+ bool operator()(const Tuple* value, const DataType::ConstPtr& data_type) const;
};
template<>
struct IsValidDataType {
- bool operator()(const UserTypeValue* value, const SharedRefPtr& data_type) const;
+ bool operator()(const UserTypeValue* value, const DataType::ConstPtr& data_type) const;
};
} // namespace cass
diff --git a/src/type_parser.cpp b/src/data_type_parser.cpp
similarity index 54%
rename from src/type_parser.cpp
rename to src/data_type_parser.cpp
index 79dc81494..f233a10d6 100644
--- a/src/type_parser.cpp
+++ b/src/data_type_parser.cpp
@@ -14,7 +14,7 @@
limitations under the License.
*/
-#include "type_parser.hpp"
+#include "data_type_parser.hpp"
#include "utils.hpp"
#include "logger.hpp"
@@ -34,71 +34,8 @@
#define UDT_TYPE "org.apache.cassandra.db.marshal.UserType"
#define TUPLE_TYPE "org.apache.cassandra.db.marshal.TupleType"
-#define MARSHAL_PACKAGE "org.apache.cassandra.db.marshal."
-
namespace cass {
-static CassValueType get_value_type(const std::string& str) {
- if (starts_with(str, MARSHAL_PACKAGE)) {
- StringRef type(StringRef(str).substr(sizeof(MARSHAL_PACKAGE) - 1));
- switch (type.front()) {
- case 'A':
- if (type == "AsciiType") return CASS_VALUE_TYPE_ASCII;
- break;
-
- case 'B':
- if (type == "BooleanType") return CASS_VALUE_TYPE_BOOLEAN;
- if (type == "ByteType") return CASS_VALUE_TYPE_TINY_INT;
- if (type == "BytesType") return CASS_VALUE_TYPE_BLOB;
- break;
-
- case 'C':
- if (type == "CounterColumnType") return CASS_VALUE_TYPE_COUNTER;
- break;
-
- case 'D':
- if (type == "DateType") return CASS_VALUE_TYPE_TIMESTAMP;
- if (type == "DecimalType") return CASS_VALUE_TYPE_DECIMAL;
- if (type == "DoubleType") return CASS_VALUE_TYPE_DOUBLE;
- break;
-
- case 'F':
- if (type == "FloatType") return CASS_VALUE_TYPE_FLOAT;
- break;
-
- case 'I':
- if (type == "InetAddressType") return CASS_VALUE_TYPE_INET;
- if (type == "Int32Type") return CASS_VALUE_TYPE_INT;
- if (type == "IntegerType") return CASS_VALUE_TYPE_INT;
- break;
-
- case 'L':
- if (type == "LongType") return CASS_VALUE_TYPE_BIGINT;
- break;
-
- case 'S':
- if (type == "ShortType") return CASS_VALUE_TYPE_SMALL_INT;
- if (type == "SimpleDateType") return CASS_VALUE_TYPE_DATE;
- break;
-
- case 'T':
- if (type == "TimeType") return CASS_VALUE_TYPE_TIME;
- if (type == "TimestampType") return CASS_VALUE_TYPE_TIMESTAMP;
- if (type == "TimeUUIDType") return CASS_VALUE_TYPE_TIMEUUID;
- break;
-
- case 'U':
- if (type == "UTF8Type") return CASS_VALUE_TYPE_TEXT;
- if (type == "UUIDType") return CASS_VALUE_TYPE_UUID;
- break;
-
- default:
- break;
- }
- }
- return CASS_VALUE_TYPE_UNKNOWN;
-}
-
int hex_value(int c) {
if (c >= '0' && c <= '9') {
return c - '0';
@@ -127,38 +64,218 @@ bool from_hex(const std::string& hex, std::string* result) {
return true;
}
-bool TypeParser::is_reversed(const std::string& type) {
+DataType::ConstPtr DataTypeCqlNameParser::parse(const std::string& type,
+ const NativeDataTypes& native_types,
+ KeyspaceMetadata* keyspace) {
+ Parser parser(type, 0);
+ std::string type_name;
+ Parser::TypeParamsVec params;
+
+ parser.parse_type_name(&type_name);
+ std::transform(type_name.begin(), type_name.end(), type_name.begin(), tolower);
+
+ DataType::ConstPtr native_type(native_types.by_cql_name(type_name));
+ if (native_type) {
+ return native_type;
+ }
+
+ if (type_name == "list") {
+ parser.parse_type_parameters(¶ms);
+ if (params.size() != 1) {
+ LOG_ERROR("Expecting single parameter for list %s", type.c_str());
+ return DataType::NIL;
+ }
+ DataType::ConstPtr element_type = parse(params[0], native_types, keyspace);
+ return CollectionType::list(element_type);
+ }
+
+ if (type_name == "set") {
+ parser.parse_type_parameters(¶ms);
+ if (params.size() != 1) {
+ LOG_ERROR("Expecting single parameter for set %s", type.c_str());
+ return DataType::NIL;
+ }
+ DataType::ConstPtr element_type = parse(params[0], native_types, keyspace);
+ return CollectionType::set(element_type);
+ }
+
+ if (type_name == "map") {
+ parser.parse_type_parameters(¶ms);
+ if (params.size() != 2) {
+ LOG_ERROR("Expecting two parameters for set %s", type.c_str());
+ return DataType::NIL;
+ }
+ DataType::ConstPtr key_type = parse(params[0], native_types, keyspace);
+ DataType::ConstPtr value_type = parse(params[1], native_types, keyspace);
+ return CollectionType::map(key_type, value_type);
+ }
+
+ if (type_name == "tuple") {
+ parser.parse_type_parameters(¶ms);
+ if (params.empty()) {
+ LOG_ERROR("Expecting at least a one parameter for tuple %s", type.c_str());
+ return DataType::NIL;
+ }
+ DataType::Vec types;
+ for (Parser::TypeParamsVec::iterator i = params.begin(),
+ end = params.end();
+ i != end; ++i) {
+ types.push_back(parse(*i, native_types, keyspace));
+ }
+ return DataType::ConstPtr(new TupleType(types));
+ }
+
+ if (type_name == "frozen") {
+ parser.parse_type_parameters(¶ms);
+ if (params.size() != 1) {
+ LOG_ERROR("Expecting single parameter for frozen keyword %s", type.c_str());
+ return DataType::NIL;
+ }
+ return parse(params[0], native_types, keyspace);
+ }
+
+ if (type_name == "empty") {
+ return DataType::ConstPtr(new CustomType(type_name));
+ }
+
+ if (type_name.empty()) {
+ return DataType::NIL;
+ }
+
+ return keyspace->get_or_create_user_type(type_name);
+}
+
+void DataTypeCqlNameParser::Parser::parse_type_name(std::string* name) {
+ skip_blank();
+ read_next_identifier(name);
+}
+
+void DataTypeCqlNameParser::Parser::parse_type_parameters(TypeParamsVec* params) {
+ params->clear();
+
+ if (is_eos()) return;
+
+ skip_blank_and_comma();
+
+ if (str_[index_] != '<') {
+ LOG_ERROR("Expecting char %u of %s to be '<' but '%c' found",
+ (unsigned int)index_, str_.c_str(), str_[index_]);
+ return;
+ }
+
+ ++index_; // Skip '<'
+
+ std::string name;
+ std::string args;
+ while (skip_blank_and_comma()) {
+ if (str_[index_] == '>') {
+ ++index_;
+ return;
+ }
+ parse_type_name(&name);
+ if (!read_raw_type_parameters(&args))
+ return;
+ params->push_back(name + args);
+ }
+}
+
+void DataTypeCqlNameParser::Parser::read_next_identifier(std::string* name) {
+ size_t start_index = index_;
+ if (str_[start_index] == '"') {
+ ++index_;
+ while (!is_eos()) {
+ bool is_quote = str_[index_] == '"';
+ ++index_;
+ if (is_quote) {
+ if (!is_eos() && str_[index_] == '"') {
+ ++index_;
+ } else {
+ break;
+ }
+ }
+ }
+ } else {
+ while (!is_eos() && (is_identifier_char(str_[index_]) || str_[index_] == '"')) {
+ ++index_;
+ }
+ }
+ name->assign(str_.begin() + start_index, str_.begin() + index_);
+}
+
+bool DataTypeCqlNameParser::Parser::read_raw_type_parameters(std::string* params) {
+ skip_blank();
+
+ params->clear();
+
+ if (is_eos() || str_[index_] == '>' || str_[index_] == ',') return true;
+
+ if (str_[index_] != '<') {
+ LOG_ERROR("Expecting char %u of %s to be '<' but '%c' found",
+ (unsigned int)index_, str_.c_str(), str_[index_]);
+ return false;
+ }
+
+ size_t start_index = index_;
+ int open = 1;
+ bool in_quotes = false;
+ while (open > 0) {
+ ++index_;
+
+ if (is_eos()) {
+ LOG_ERROR("Angle brackets not closed in type %s", str_.c_str());
+ return false;
+ }
+
+ if (!in_quotes) {
+ if (str_[index_] == '"') {
+ in_quotes = true;
+ } else if (str_[index_] == '<') {
+ open++;
+ } else if (str_[index_] == '>') {
+ open--;
+ }
+ } else if (str_[index_] == '"') {
+ in_quotes = false;
+ }
+ }
+
+ ++index_; // Move past the trailing '>'
+ params->assign(str_.begin() + start_index, str_.begin() + index_);
+ return true;
+}
+
+bool DataTypeClassNameParser::is_reversed(const std::string& type) {
return starts_with(type, REVERSED_TYPE);
}
-bool TypeParser::is_frozen(const std::string& type) {
+bool DataTypeClassNameParser::is_frozen(const std::string& type) {
return starts_with(type, FROZEN_TYPE);
}
-bool TypeParser::is_composite(const std::string& type) {
+bool DataTypeClassNameParser::is_composite(const std::string& type) {
return starts_with(type, COMPOSITE_TYPE);
}
-bool TypeParser::is_collection(const std::string& type) {
+bool DataTypeClassNameParser::is_collection(const std::string& type) {
return starts_with(type, COLLECTION_TYPE);
}
-bool TypeParser::is_user_type(const std::string& type) {
+bool DataTypeClassNameParser::is_user_type(const std::string& type) {
return starts_with(type, UDT_TYPE);
}
-bool TypeParser::is_tuple_type(const std::string& type) {
+bool DataTypeClassNameParser::is_tuple_type(const std::string& type) {
return starts_with(type, TUPLE_TYPE);
}
-SharedRefPtr TypeParser::parse_one(const std::string& type) {
+DataType::ConstPtr DataTypeClassNameParser::parse_one(const std::string& type, const NativeDataTypes& native_types) {
bool frozen = is_frozen(type);
std::string class_name;
if (is_reversed(type) || frozen) {
if (!get_nested_class_name(type, &class_name)) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
} else {
class_name = type;
@@ -172,32 +289,32 @@ SharedRefPtr TypeParser::parse_one(const std::string& type) {
if (starts_with(next, LIST_TYPE)) {
TypeParamsVec params;
if (!parser.get_type_params(¶ms) || params.empty()) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
- SharedRefPtr element_type(parse_one(params[0]));
+ DataType::ConstPtr element_type(parse_one(params[0], native_types));
if (!element_type) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
return CollectionType::list(element_type);
} else if(starts_with(next, SET_TYPE)) {
TypeParamsVec params;
if (!parser.get_type_params(¶ms) || params.empty()) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
- SharedRefPtr element_type(parse_one(params[0]));
+ DataType::ConstPtr element_type(parse_one(params[0], native_types));
if (!element_type) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
return CollectionType::set(element_type);
} else if(starts_with(next, MAP_TYPE)) {
TypeParamsVec params;
if (!parser.get_type_params(¶ms) || params.size() < 2) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
- SharedRefPtr key_type(parse_one(params[0]));
- SharedRefPtr value_type(parse_one(params[1]));
+ DataType::ConstPtr key_type(parse_one(params[0], native_types));
+ DataType::ConstPtr value_type(parse_one(params[1], native_types));
if (!key_type || !value_type) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
return CollectionType::map(key_type, value_type);
}
@@ -212,77 +329,80 @@ SharedRefPtr TypeParser::parse_one(const std::string& type) {
std::string keyspace;
if (!parser.read_one(&keyspace)) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
parser.skip_blank_and_comma();
std::string hex;
if (!parser.read_one(&hex)) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
std::string type_name;
if (!from_hex(hex, &type_name)) {
LOG_ERROR("Invalid hex string \"%s\" for parameter", hex.c_str());
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
if (keyspace.empty() || type_name.empty()) {
LOG_ERROR("UDT has no keyspace or type name");
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
parser.skip_blank_and_comma();
NameAndTypeParamsVec raw_fields;
if (!parser.get_name_and_type_params(&raw_fields)) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
UserType::FieldVec fields;
for (NameAndTypeParamsVec::const_iterator i = raw_fields.begin(),
end = raw_fields.end(); i != end; ++i) {
- SharedRefPtr data_type = parse_one(i->second);
+ DataType::ConstPtr data_type = parse_one(i->second, native_types);
if (!data_type) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
fields.push_back(UserType::Field(i->first, data_type));
}
- return SharedRefPtr(new UserType(keyspace, type_name, fields));
+ return DataType::ConstPtr(new UserType(keyspace, type_name, fields));
}
if (is_tuple_type(type)) {
TypeParamsVec raw_types;
if (!parser.get_type_params(&raw_types)) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
DataType::Vec types;
for (TypeParamsVec::const_iterator i = raw_types.begin(),
end = raw_types.end(); i != end; ++i) {
- SharedRefPtr data_type = parse_one(*i);
+ DataType::ConstPtr data_type = parse_one(*i, native_types);
if (!data_type) {
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
types.push_back(data_type);
}
- return SharedRefPtr(new TupleType(types));
+ return DataType::ConstPtr(new TupleType(types));
+ }
+
+ DataType::ConstPtr native_type(native_types.by_class_name(next));
+ if (native_type) {
+ return native_type;
}
- CassValueType t = get_value_type(next);
- return t == CASS_VALUE_TYPE_UNKNOWN ? SharedRefPtr(new CustomType(next))
- : SharedRefPtr(new DataType(t));
+ return DataType::ConstPtr(new CustomType(next));
}
-SharedRefPtr TypeParser::parse_with_composite(const std::string& type) {
+SharedRefPtr DataTypeClassNameParser::parse_with_composite(const std::string& type, const NativeDataTypes& native_types) {
Parser parser(type, 0);
std::string next;
parser.get_next_name(&next);
if (!is_composite(next)) {
- SharedRefPtr data_type = parse_one(type);
+ DataType::ConstPtr data_type = parse_one(type, native_types);
if (!data_type) {
return SharedRefPtr();
}
@@ -315,7 +435,7 @@ SharedRefPtr TypeParser::parse_with_composite(const std::string& ty
for (NameAndTypeParamsVec::const_iterator i = params.begin(),
end = params.end(); i != end; ++i) {
- SharedRefPtr data_type = parse_one(i->second);
+ DataType::ConstPtr data_type = parse_one(i->second, native_types);
if (!data_type) {
return SharedRefPtr();
}
@@ -326,7 +446,7 @@ SharedRefPtr TypeParser::parse_with_composite(const std::string& ty
DataType::Vec types;
ParseResult::ReversedVec reversed;
for (size_t i = 0; i < count; ++i) {
- SharedRefPtr data_type = parse_one(sub_class_names[i]);
+ DataType::ConstPtr data_type = parse_one(sub_class_names[i], native_types);
if (!data_type) {
return SharedRefPtr();
}
@@ -337,7 +457,7 @@ SharedRefPtr TypeParser::parse_with_composite(const std::string& ty
return SharedRefPtr(new ParseResult(true, types, reversed, collections));
}
-bool TypeParser::get_nested_class_name(const std::string& type, std::string* class_name) {
+bool DataTypeClassNameParser::get_nested_class_name(const std::string& type, std::string* class_name) {
Parser parser(type, 0);
parser.get_next_name();
TypeParamsVec params;
@@ -349,7 +469,7 @@ bool TypeParser::get_nested_class_name(const std::string& type, std::string* cla
return true;
}
-bool TypeParser::Parser::read_one(std::string* name_and_args) {
+bool DataTypeClassNameParser::Parser::read_one(std::string* name_and_args) {
std::string name;
get_next_name(&name);
std::string args;
@@ -360,12 +480,12 @@ bool TypeParser::Parser::read_one(std::string* name_and_args) {
return true;
}
-void TypeParser::Parser::get_next_name(std::string* name) {
+void DataTypeClassNameParser::Parser::get_next_name(std::string* name) {
skip_blank();
read_next_identifier(name);
}
-bool TypeParser::Parser::get_type_params(TypeParamsVec* params) {
+bool DataTypeClassNameParser::Parser::get_type_params(TypeParamsVec* params) {
if (is_eos()) {
params->clear();
return true;
@@ -395,7 +515,7 @@ bool TypeParser::Parser::get_type_params(TypeParamsVec* params) {
return false;
}
-bool TypeParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params) {
+bool DataTypeClassNameParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params) {
while (skip_blank_and_comma()) {
if (str_[index_] == ')') {
++index_;
@@ -408,7 +528,7 @@ bool TypeParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params)
std::string name;
if (!from_hex(hex, &name)) {
LOG_ERROR("Invalid hex string \"%s\" for parameter", hex.c_str());
- return SharedRefPtr();
+ return DataType::ConstPtr();
}
skip_blank();
@@ -434,7 +554,7 @@ bool TypeParser::Parser::get_name_and_type_params(NameAndTypeParamsVec* params)
return false;
}
-bool TypeParser::Parser::get_collection_params(NameAndTypeParamsVec* params) {
+bool DataTypeClassNameParser::Parser::get_collection_params(NameAndTypeParamsVec* params) {
if (is_eos()) {
params->clear();
return true;
@@ -450,31 +570,7 @@ bool TypeParser::Parser::get_collection_params(NameAndTypeParamsVec* params) {
return get_name_and_type_params(params);
}
-void TypeParser::Parser::skip_blank() {
- while (!is_eos() && is_blank(str_[index_])) {
- ++index_;
- }
-}
-
-bool TypeParser::Parser::skip_blank_and_comma() {
- bool comma_found = false;
- while (!is_eos()) {
- int c = str_[index_];
- if (c == ',') {
- if (comma_found) {
- return true;
- } else {
- comma_found = true;
- }
- } else if (!is_blank(c)) {
- return true;
- }
- ++index_;
- }
- return false;
-}
-
-bool TypeParser::Parser::read_raw_arguments(std::string* args) {
+bool DataTypeClassNameParser::Parser::read_raw_arguments(std::string* args) {
skip_blank();
if (is_eos() || str_[index_] == ')' || str_[index_] == ',') {
@@ -509,7 +605,7 @@ bool TypeParser::Parser::read_raw_arguments(std::string* args) {
return true;
}
-void TypeParser::Parser::read_next_identifier(std::string* name) {
+void DataTypeClassNameParser::Parser::read_next_identifier(std::string* name) {
size_t i = index_;
while (!is_eos() && is_identifier_char(str_[index_]))
++index_;
@@ -522,7 +618,7 @@ void TypeParser::Parser::read_next_identifier(std::string* name) {
}
}
-void TypeParser::Parser::parse_error(const std::string& str,
+void DataTypeClassNameParser::Parser::parse_error(const std::string& str,
size_t index,
const char* error) {
LOG_ERROR("Error parsing '%s' at %u index: %s",
diff --git a/src/type_parser.hpp b/src/data_type_parser.hpp
similarity index 65%
rename from src/type_parser.hpp
rename to src/data_type_parser.hpp
index cec0aac54..b1e0aaa3e 100644
--- a/src/type_parser.hpp
+++ b/src/data_type_parser.hpp
@@ -19,6 +19,7 @@
#include "cassandra.h"
#include "data_type.hpp"
+#include "metadata.hpp"
#include "ref_counted.hpp"
#include "value.hpp"
@@ -28,12 +29,86 @@
namespace cass {
+class ParserBase {
+public:
+ ParserBase(const std::string& str, size_t index)
+ : str_(str)
+ , index_(index) { }
+
+ void skip() { ++index_; }
+
+ void skip_blank() {
+ while (!is_eos() && is_blank(str_[index_])) {
+ ++index_;
+ }
+ }
+
+ bool skip_blank_and_comma() {
+ bool comma_found = false;
+ while (!is_eos()) {
+ int c = str_[index_];
+ if (c == ',') {
+ if (comma_found) {
+ return true;
+ } else {
+ comma_found = true;
+ }
+ } else if (!is_blank(c)) {
+ return true;
+ }
+ ++index_;
+ }
+ return false;
+ }
+
+ bool is_eos() const {
+ return index_ >= str_.length();
+ }
+
+ static bool is_identifier_char(int c) {
+ return (c >= '0' && c <= '9')
+ || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
+ || c == '-' || c == '+' || c == '.' || c == '_' || c == '&';
+ }
+
+ static bool is_blank(int c) {
+ return c == ' ' || c == '\t' || c == '\n';
+ }
+
+protected:
+ const std::string str_;
+ size_t index_;
+};
+
+class DataTypeCqlNameParser {
+public:
+ static DataType::ConstPtr parse(const std::string& type,
+ const NativeDataTypes& native_types,
+ KeyspaceMetadata* keyspace);
+
+private:
+ class Parser : public ParserBase {
+ public:
+ typedef std::vector TypeParamsVec;
+
+ Parser(const std::string& str, size_t index)
+ : ParserBase(str, index) { }
+
+ void parse_type_name(std::string* name);
+ void parse_type_parameters(TypeParamsVec* params);
+
+ private:
+ void read_next_identifier(std::string* name);
+ bool read_raw_type_parameters(std::string* param);
+ };
+};
+
class ParseResult : public RefCounted {
public:
typedef std::vector ReversedVec;
- typedef std::map > CollectionMap;
+ typedef std::map CollectionMap;
- ParseResult(SharedRefPtr type, bool reversed)
+ ParseResult(DataType::ConstPtr type, bool reversed)
: is_composite_(false) {
types_.push_back(type);
reversed_.push_back(reversed);
@@ -60,7 +135,7 @@ class ParseResult : public RefCounted {
CollectionMap collections_;
};
-class TypeParser {
+class DataTypeClassNameParser {
public:
static bool is_reversed(const std::string& type);
static bool is_frozen(const std::string& type);
@@ -70,8 +145,8 @@ class TypeParser {
static bool is_user_type(const std::string& type);
static bool is_tuple_type(const std::string& type);
- static SharedRefPtr parse_one(const std::string& type);
- static SharedRefPtr parse_with_composite(const std::string& type);
+ static DataType::ConstPtr parse_one(const std::string& type, const NativeDataTypes& native_types);
+ static SharedRefPtr parse_with_composite(const std::string& type, const NativeDataTypes& native_types);
private:
static bool get_nested_class_name(const std::string& type, std::string* class_name);
@@ -79,15 +154,10 @@ class TypeParser {
typedef std::vector TypeParamsVec;
typedef std::vector > NameAndTypeParamsVec;
- class Parser {
+ class Parser : public ParserBase {
public:
Parser(const std::string& str, size_t index)
- : str_(str)
- , index_(index) {}
-
- void skip() { ++index_; }
- void skip_blank();
- bool skip_blank_and_comma();
+ : ParserBase(str, index) { }
bool read_one(std::string* name_and_args);
void get_next_name(std::string* name = NULL);
@@ -102,27 +172,9 @@ class TypeParser {
static void parse_error(const std::string& str,
size_t index,
const char* error);
-
- bool is_eos() const {
- return index_ >= str_.length();
- }
-
- static bool is_identifier_char(int c) {
- return (c >= '0' && c <= '9')
- || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
- || c == '-' || c == '+' || c == '.' || c == '_' || c == '&';
- }
-
- static bool is_blank(int c) {
- return c == ' ' || c == '\t' || c == '\n';
- }
-
- private:
- const std::string str_;
- size_t index_;
};
- TypeParser();
+ DataTypeClassNameParser();
};
} // namespace cass
diff --git a/src/execute_request.hpp b/src/execute_request.hpp
index 91e31b610..7aeec814c 100644
--- a/src/execute_request.hpp
+++ b/src/execute_request.hpp
@@ -50,7 +50,7 @@ class ExecuteRequest : public Statement {
return metadata_->get_indices(name, indices);
}
- virtual const SharedRefPtr& get_type(size_t index) const {
+ virtual const DataType::ConstPtr& get_type(size_t index) const {
return metadata_->get_column_definition(index).data_type;
}
diff --git a/src/hash_table.hpp b/src/hash_table.hpp
index 19c4b40ee..3c70b85f2 100644
--- a/src/hash_table.hpp
+++ b/src/hash_table.hpp
@@ -89,6 +89,8 @@ class CaseInsensitiveHashTable {
size_t add(const T& entry);
const EntryVec& entries() const { return entries_; }
+ void set_entries(const EntryVec& entries);
+
size_t size() const { return entries_.size(); }
private:
@@ -114,10 +116,7 @@ CaseInsensitiveHashTable::CaseInsensitiveHashTable(size_t capacity) {
template
CaseInsensitiveHashTable::CaseInsensitiveHashTable(const EntryVec& entries) {
- reset(entries.size());
- for (size_t i = 0; i < entries.size(); ++i) {
- add(entries[i]);
- }
+ set_entries(entries);
}
template
@@ -177,6 +176,16 @@ size_t CaseInsensitiveHashTable::add(const T& entry) {
return index;
}
+
+template
+void CaseInsensitiveHashTable::set_entries(const EntryVec& entries) {
+ entries_.clear();
+ reset(entries.size());
+ for (size_t i = 0; i < entries.size(); ++i) {
+ add(entries[i]);
+ }
+}
+
template
void CaseInsensitiveHashTable::add_index(T* entry) {
size_t h = fnv1a_hash_lower(entry->name) & index_mask_;
diff --git a/src/host.hpp b/src/host.hpp
index 8dba02e46..9f3c4a716 100644
--- a/src/host.hpp
+++ b/src/host.hpp
@@ -53,6 +53,28 @@ class VersionNumber {
, minor_(0)
, patch_(0) { }
+ VersionNumber(int major, int minor, int patch)
+ : major_(major)
+ , minor_(minor)
+ , patch_(patch) { }
+
+ bool operator >=(const VersionNumber& other) const {
+ return compare(other) >= 0;
+ }
+
+ int compare(const VersionNumber& other) const {
+ if (major_ < other.major_) return -1;
+ if (major_ > other.major_) return 1;
+
+ if (minor_ < other.minor_) return -1;
+ if (minor_ > other.minor_) return 1;
+
+ if (patch_ < other.patch_) return -1;
+ if (patch_ > other.patch_) return 1;
+
+ return 0;
+ }
+
bool parse(const std::string& version);
int major() const { return major_; }
diff --git a/src/metadata.cpp b/src/metadata.cpp
index b1d7f7189..9af2bb829 100644
--- a/src/metadata.cpp
+++ b/src/metadata.cpp
@@ -26,7 +26,7 @@
#include "row.hpp"
#include "row_iterator.hpp"
#include "scoped_lock.hpp"
-#include "type_parser.hpp"
+#include "data_type_parser.hpp"
#include "value.hpp"
#include "third_party/rapidjson/rapidjson/document.h"
@@ -231,10 +231,6 @@ const CassDataType* cass_column_meta_data_type(const CassColumnMeta* column_meta
return CassDataType::to(column_meta->data_type().get());
}
-cass_bool_t cass_column_meta_is_reversed(const CassColumnMeta* column_meta) {
- return column_meta->is_reversed() ? cass_true : cass_false;
-}
-
const CassValue*
cass_column_meta_field_by_name(const CassColumnMeta* column_meta,
const char* name) {
@@ -505,6 +501,14 @@ const CassValue* cass_iterator_get_meta_field_value(const CassIterator* iterator
namespace cass {
+static const char* table_column_name(const cass::VersionNumber& cassandra_version) {
+ return cassandra_version >= VersionNumber(3, 0, 0) ? "table_name" : "columnfamily_name";
+}
+
+static const char* signature_column_name(const cass::VersionNumber& cassandra_version) {
+ return cassandra_version >= VersionNumber(3, 0, 0) ? "argument_types" : "signature";
+}
+
template
const T& as_const(const T& x) { return x; }
@@ -562,7 +566,7 @@ std::string Metadata::full_function_name(const std::string& name, const StringVe
Metadata::SchemaSnapshot Metadata::schema_snapshot() const {
ScopedMutex l(&mutex_);
return SchemaSnapshot(schema_snapshot_version_,
- protocol_version_,
+ config_.protocol_version,
front_.keyspaces());
}
@@ -573,9 +577,9 @@ void Metadata::update_keyspaces(ResultResponse* result) {
if (is_front_buffer()) {
ScopedMutex l(&mutex_);
- updating_->update_keyspaces(protocol_version_, result, updates);
+ updating_->update_keyspaces(config_, result, updates);
} else {
- updating_->update_keyspaces(protocol_version_, result, updates);
+ updating_->update_keyspaces(config_, result, updates);
}
for (KeyspaceMetadata::Map::const_iterator i = updates.begin(); i != updates.end(); ++i) {
@@ -588,9 +592,9 @@ void Metadata::update_tables(ResultResponse* tables_result, ResultResponse* colu
if (is_front_buffer()) {
ScopedMutex l(&mutex_);
- updating_->update_tables(protocol_version_, cassandra_version_, tables_result, columns_result);
+ updating_->update_tables(config_, tables_result, columns_result);
} else {
- updating_->update_tables(protocol_version_, cassandra_version_, tables_result, columns_result);
+ updating_->update_tables(config_, tables_result, columns_result);
}
}
@@ -599,9 +603,9 @@ void Metadata::update_user_types(ResultResponse* result) {
if (is_front_buffer()) {
ScopedMutex l(&mutex_);
- updating_->update_user_types(result);
+ updating_->update_user_types(config_, result);
} else {
- updating_->update_user_types(result);
+ updating_->update_user_types(config_, result);
}
}
@@ -610,9 +614,9 @@ void Metadata::update_functions(ResultResponse* result) {
if (is_front_buffer()) {
ScopedMutex l(&mutex_);
- updating_->update_functions(result);
+ updating_->update_functions(config_, result);
} else {
- updating_->update_functions(result);
+ updating_->update_functions(config_, result);
}
}
@@ -621,9 +625,9 @@ void Metadata::update_aggregates(ResultResponse* result) {
if (is_front_buffer()) {
ScopedMutex l(&mutex_);
- updating_->update_aggregates(protocol_version_, result);
+ updating_->update_aggregates(config_, result);
} else {
- updating_->update_aggregates(protocol_version_, result);
+ updating_->update_aggregates(config_, result);
}
}
@@ -683,6 +687,11 @@ void Metadata::drop_aggregate(const std::string& keyspace_name, const std::strin
}
void Metadata::clear_and_update_back() {
+ if (config_.cassandra_version >= VersionNumber(3, 0, 0)) {
+ config_.native_types.init_cql_names();
+ } else {
+ config_.native_types.init_class_names();
+ }
token_map_.clear();
back_.clear();
updating_ = &back_;
@@ -731,7 +740,11 @@ const Value* MetadataBase::add_field(const SharedRefPtr& buffer, cons
return value;
}
-void MetadataBase::add_json_list_field(int version, const Row* row, const std::string& name) {
+void MetadataBase::add_field(const SharedRefPtr& buffer, const Value& value, const std::string& name) {
+ fields_[name] = MetadataField(name, value, buffer);
+}
+
+void MetadataBase::add_json_list_field(int protocol_version, const Row* row, const std::string& name) {
const Value* value = row->get_by_name(name);
if (value == NULL) return;
if (value->size() <= 0) {
@@ -764,12 +777,12 @@ void MetadataBase::add_json_list_field(int version, const Row* row, const std::s
collection.append(cass::CassString(i->GetString(), i->GetStringLength()));
}
- size_t encoded_size = collection.get_items_size(version);
+ size_t encoded_size = collection.get_items_size(protocol_version);
SharedRefPtr encoded(RefBuffer::create(encoded_size));
- collection.encode_items(version, encoded->data());
+ collection.encode_items(protocol_version, encoded->data());
- Value list(version,
+ Value list(protocol_version,
collection.data_type(),
d.Size(),
encoded->data(),
@@ -777,12 +790,12 @@ void MetadataBase::add_json_list_field(int version, const Row* row, const std::s
fields_[name] = MetadataField(name, list, encoded);
}
-void MetadataBase::add_json_map_field(int version, const Row* row, const std::string& name) {
+const Value* MetadataBase::add_json_map_field(int protocol_version, const Row* row, const std::string& name) {
const Value* value = row->get_by_name(name);
- if (value == NULL) return;
+ if (value == NULL) return NULL;
if (value->size() <= 0) {
fields_[name] = MetadataField(name);
- return;
+ return NULL;
}
int32_t buffer_size = value->size();
@@ -795,13 +808,13 @@ void MetadataBase::add_json_map_field(int version, const Row* row, const std::st
if (d.HasParseError()) {
LOG_ERROR("Unable to parse JSON (object) for column '%s'", name.c_str());
- return;
+ return NULL;
}
if (!d.IsObject()) {
LOG_DEBUG("Expected JSON object for column '%s' (probably null or empty)", name.c_str());
fields_[name] = MetadataField(name);
- return;
+ return NULL;
}
Collection collection(CollectionType::map(SharedRefPtr(new DataType(CASS_VALUE_TYPE_TEXT)),
@@ -812,17 +825,18 @@ void MetadataBase::add_json_map_field(int version, const Row* row, const std::st
collection.append(CassString(i->value.GetString(), i->value.GetStringLength()));
}
- size_t encoded_size = collection.get_items_size(version);
+ size_t encoded_size = collection.get_items_size(protocol_version);
SharedRefPtr encoded(RefBuffer::create(encoded_size));
- collection.encode_items(version, encoded->data());
+ collection.encode_items(protocol_version, encoded->data());
- Value map(version,
+ Value map(protocol_version,
collection.data_type(),
d.MemberCount(),
encoded->data(),
encoded_size);
- fields_[name] = MetadataField(name, map, encoded);
+
+ return (fields_[name] = MetadataField(name, map, encoded)).value();
}
const TableMetadata* KeyspaceMetadata::get_table(const std::string& name) const {
@@ -848,21 +862,60 @@ void KeyspaceMetadata::drop_table(const std::string& table_name) {
tables_->erase(table_name);
}
+const UserType::Ptr& KeyspaceMetadata::get_or_create_user_type(const std::string& name) {
+ UserType::Map::iterator i = user_types_->find(name);
+ if (i == user_types_->end()) {
+ i = user_types_->insert(std::make_pair(name,
+ UserType::Ptr(new UserType(MetadataBase::name(), name)))).first;
+ }
+ return i->second;
+}
+
const UserType* KeyspaceMetadata::get_user_type(const std::string& name) const {
- UserTypeMap::const_iterator i = user_types_->find(name);
+ UserType::Map::const_iterator i = user_types_->find(name);
if (i == user_types_->end()) return NULL;
return i->second.get();
}
-void KeyspaceMetadata::update(int version, const SharedRefPtr& buffer, const Row* row) {
+void KeyspaceMetadata::update(const MetadataConfig& config, const SharedRefPtr& buffer, const Row* row) {
add_field(buffer, row, "keyspace_name");
add_field(buffer, row, "durable_writes");
- add_field(buffer, row, "strategy_class");
- add_json_map_field(version, row, "strategy_options");
-}
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ const Value* map = add_field(buffer, row, "replication");
+ if (map != NULL &&
+ map->value_type() == CASS_VALUE_TYPE_MAP &&
+ is_string_type(map->primary_value_type()) &&
+ is_string_type(map->secondary_value_type())) {
+ MapIterator iterator(map);
+ while (iterator.next()) {
+ const Value* key = iterator.key();
+ const Value* value = iterator.value();
+ if (key->to_string_ref() == "class") {
+ strategy_class_ = value->to_string_ref();
+ }
+ strategy_options_[key->to_string_ref()] = value->to_string_ref();
+ }
+ }
+ } else {
+ const Value* value = add_field(buffer, row, "strategy_class");
+ if (value != NULL &&
+ is_string_type(value->value_type())) {
+ strategy_class_ = value->to_string_ref();
+ }
-void KeyspaceMetadata::add_user_type(const SharedRefPtr& user_type) {
- (*user_types_)[user_type->type_name()] = user_type;
+ const Value* map = add_json_map_field(config.protocol_version, row, "strategy_options");
+ if (map != NULL &&
+ map->value_type() == CASS_VALUE_TYPE_MAP &&
+ is_string_type(map->primary_value_type()) &&
+ is_string_type(map->secondary_value_type())) {
+ MapIterator iterator(map);
+ while (iterator.next()) {
+ const Value* key = iterator.key();
+ const Value* value = iterator.value();
+ strategy_options_[key->to_string_ref()] = value->to_string_ref();
+ }
+ }
+ }
}
void KeyspaceMetadata::drop_user_type(const std::string& type_name) {
@@ -897,43 +950,56 @@ void KeyspaceMetadata::drop_aggregate(const std::string& full_aggregate_name) {
aggregates_->erase(full_aggregate_name);
}
-TableMetadata::TableMetadata(const std::string& name,
- int version, const SharedRefPtr& buffer, const Row* row)
+TableMetadata::TableMetadata(const MetadataConfig& config,
+ const std::string& name, const SharedRefPtr& buffer, const Row* row)
: MetadataBase(name) {
add_field(buffer, row, "keyspace_name");
- add_field(buffer, row, "columnfamily_name");
- add_field(buffer, row, "cf_id");
+ add_field(buffer, row, table_column_name(config.cassandra_version));
add_field(buffer, row, "bloom_filter_fp_chance");
add_field(buffer, row, "caching");
- add_field(buffer, row, "id");
- add_json_list_field(version, row, "column_aliases");
add_field(buffer, row, "comment");
- add_field(buffer, row, "compaction_strategy_class");
- add_json_map_field(version, row, "compaction_strategy_options");
- add_field(buffer, row, "comparator");
- add_json_map_field(version, row, "compression_parameters");
add_field(buffer, row, "default_time_to_live");
- add_field(buffer, row, "default_validator");
- add_field(buffer, row, "dropped_columns");
add_field(buffer, row, "gc_grace_seconds");
- add_field(buffer, row, "index_interval");
- add_field(buffer, row, "is_dense");
- add_field(buffer, row, "key_alias");
- add_json_list_field(version, row, "key_aliases");
- add_field(buffer, row, "key_validator");
- add_field(buffer, row, "local_read_repair_chance");
- add_field(buffer, row, "max_compaction_threshold");
+ add_field(buffer, row, "id");
+ add_field(buffer, row, "speculative_retry");
add_field(buffer, row, "max_index_interval");
- add_field(buffer, row, "memtable_flush_period_in_ms");
- add_field(buffer, row, "min_compaction_threshold");
add_field(buffer, row, "min_index_interval");
- add_field(buffer, row, "populate_io_cache_on_flush");
+ add_field(buffer, row, "memtable_flush_period_in_ms");
add_field(buffer, row, "read_repair_chance");
- add_field(buffer, row, "replicate_on_write");
- add_field(buffer, row, "speculative_retry");
- add_field(buffer, row, "subcomparator");
- add_field(buffer, row, "type");
- add_field(buffer, row, "value_alias");
+
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ add_field(buffer, row, "dclocal_read_repair_chance");
+ add_field(buffer, row, "crc_check_chance");
+ add_field(buffer, row, "compaction");
+ add_field(buffer, row, "compression");
+ add_field(buffer, row, "extensions");
+ add_field(buffer, row, "flags");
+ } else {
+ add_field(buffer, row, "cf_id");
+ add_field(buffer, row, "local_read_repair_chance");
+
+ add_field(buffer, row, "compaction_strategy_class");
+ add_json_map_field(config.protocol_version, row, "compaction_strategy_options");
+ add_json_map_field(config.protocol_version, row, "compression_parameters");
+
+ add_json_list_field(config.protocol_version, row, "column_aliases");
+ add_field(buffer, row, "comparator");
+ add_field(buffer, row, "subcomparator");
+ add_field(buffer, row, "default_validator");
+ add_field(buffer, row, "key_alias");
+ add_json_list_field(config.protocol_version, row, "key_aliases");
+ add_field(buffer, row, "value_alias");
+ add_field(buffer, row, "key_validator");
+ add_field(buffer, row, "type");
+
+ add_field(buffer, row, "dropped_columns");
+ add_field(buffer, row, "index_interval");
+ add_field(buffer, row, "is_dense");
+ add_field(buffer, row, "max_compaction_threshold");
+ add_field(buffer, row, "min_compaction_threshold");
+ add_field(buffer, row, "populate_io_cache_on_flush");
+ add_field(buffer, row, "replicate_on_write");
+ }
}
const ColumnMetadata* TableMetadata::get_column(const std::string& name) const {
@@ -973,13 +1039,13 @@ size_t get_column_count(const ColumnMetadata::Vec& columns, CassColumnType type)
return count;
}
-void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version) {
+void TableMetadata::build_keys_and_sort(const MetadataConfig& config) {
// Also, Reorders columns so that the order is:
// 1) Parition key
// 2) Clustering keys
// 3) Other columns
- if (cassandra_version.major() >= 2) {
+ if (config.cassandra_version.major() >= 2) {
partition_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_PARTITION_KEY));
clustering_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_CLUSTERING_KEY));
for (ColumnMetadata::Vec::const_iterator i = columns_.begin(),
@@ -1012,7 +1078,8 @@ void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version)
}
}
- SharedRefPtr key_validator = TypeParser::parse_with_composite(get_string_field("key_validator"));
+ SharedRefPtr key_validator
+ = DataTypeClassNameParser::parse_with_composite(get_string_field("key_validator"), config.native_types);
size_t size = key_validator->types().size();
partition_key_.reserve(size);
for (size_t i = 0; i < size; ++i) {
@@ -1045,7 +1112,8 @@ void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version)
}
// TODO: Figure out how to test these special cases and properly document them here
- SharedRefPtr comparator = TypeParser::parse_with_composite(get_string_field("comparator"));
+ SharedRefPtr comparator
+ = DataTypeClassNameParser::parse_with_composite(get_string_field("comparator"), config.native_types);
size_t size = comparator->types().size();
if (comparator->is_composite()) {
if (!comparator->collections().empty() ||
@@ -1087,7 +1155,7 @@ void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version)
}
}
-void TableMetadata::key_aliases(KeyAliases* output) const {
+void TableMetadata::key_aliases(const NativeDataTypes& native_types, KeyAliases* output) const {
const Value* aliases = get_field("key_aliases");
if (aliases != NULL) {
output->reserve(aliases->count());
@@ -1097,7 +1165,8 @@ void TableMetadata::key_aliases(KeyAliases* output) const {
}
}
if (output->empty()) {// C* 1.2 tables created via CQL2 or thrift don't have col meta or key aliases
- SharedRefPtr key_validator_type = TypeParser::parse_with_composite(get_string_field("key_validator"));
+ SharedRefPtr key_validator_type
+ = DataTypeClassNameParser::parse_with_composite(get_string_field("key_validator"), native_types);
const size_t count = key_validator_type->types().size();
std::ostringstream ss("key");
for (size_t i = 0; i < count; ++i) {
@@ -1110,7 +1179,9 @@ void TableMetadata::key_aliases(KeyAliases* output) const {
}
}
-FunctionMetadata::FunctionMetadata(const std::string& name, const Value* signature,
+FunctionMetadata::FunctionMetadata(const MetadataConfig& config,
+ const std::string& name, const Value* signature,
+ KeyspaceMetadata* keyspace,
const SharedRefPtr& buffer, const Row* row)
: MetadataBase(Metadata::full_function_name(name, signature->as_stringlist()))
, simple_name_(name) {
@@ -1130,18 +1201,31 @@ FunctionMetadata::FunctionMetadata(const std::string& name, const Value* signatu
value2->primary_value_type() == CASS_VALUE_TYPE_VARCHAR) {
CollectionIterator iterator1(value1);
CollectionIterator iterator2(value2);
- while (iterator1.next() && iterator2.next()) {
- StringRef arg_name(iterator1.value()->to_string_ref());
- DataType::Ptr arg_type(TypeParser::parse_one(iterator2.value()->to_string()));
- args_.push_back(Argument(arg_name, arg_type));
- args_by_name_[arg_name] = arg_type;
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ while (iterator1.next() && iterator2.next()) {
+ StringRef arg_name(iterator1.value()->to_string_ref());
+ DataType::ConstPtr arg_type(DataTypeCqlNameParser::parse(iterator2.value()->to_string(), config.native_types, keyspace));
+ args_.push_back(Argument(arg_name, arg_type));
+ args_by_name_[arg_name] = arg_type;
+ }
+ } else {
+ while (iterator1.next() && iterator2.next()) {
+ StringRef arg_name(iterator1.value()->to_string_ref());
+ DataType::ConstPtr arg_type(DataTypeClassNameParser::parse_one(iterator2.value()->to_string(), config.native_types));
+ args_.push_back(Argument(arg_name, arg_type));
+ args_by_name_[arg_name] = arg_type;
+ }
}
}
value1 = add_field(buffer, row, "return_type");
if (value1 != NULL &&
value1->value_type() == CASS_VALUE_TYPE_VARCHAR) {
- return_type_ = TypeParser::parse_one(value1->to_string());
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ return_type_ = DataTypeCqlNameParser::parse(value1->to_string(), config.native_types, keyspace);
+ } else {
+ return_type_ = DataTypeClassNameParser::parse_one(value1->to_string(), config.native_types);
+ }
}
value1 = add_field(buffer, row, "body");
@@ -1169,12 +1253,14 @@ const DataType* FunctionMetadata::get_arg_type(StringRef name) const {
return i->second.get();
}
-AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signature,
- const FunctionMetadata::Map& functions,
- int version, const SharedRefPtr& buffer, const Row* row)
+AggregateMetadata::AggregateMetadata(const MetadataConfig& config,
+ const std::string& name, const Value* signature,
+ KeyspaceMetadata* keyspace,
+ const SharedRefPtr& buffer, const Row* row)
: MetadataBase(Metadata::full_function_name(name, signature->as_stringlist()))
, simple_name_(name) {
const Value* value;
+ const FunctionMetadata::Map& functions = keyspace->functions();
add_field(buffer, row, "keyspace_name");
add_field(buffer, row, "aggregate_name");
@@ -1184,21 +1270,35 @@ AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signa
value->value_type() == CASS_VALUE_TYPE_LIST &&
value->primary_value_type() == CASS_VALUE_TYPE_VARCHAR) {
CollectionIterator iterator(value);
- while (iterator.next()) {
- arg_types_.push_back(TypeParser::parse_one(iterator.value()->to_string()));
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ while (iterator.next()) {
+ arg_types_.push_back(DataTypeCqlNameParser::parse(iterator.value()->to_string(), config.native_types, keyspace));
+ }
+ } else {
+ while (iterator.next()) {
+ arg_types_.push_back(DataTypeClassNameParser::parse_one(iterator.value()->to_string(), config.native_types));
+ }
}
}
value = add_field(buffer, row, "return_type");
if (value != NULL &&
value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
- return_type_ = TypeParser::parse_one(value->to_string());
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ return_type_ = DataTypeCqlNameParser::parse(value->to_string(), config.native_types, keyspace);
+ } else {
+ return_type_ = DataTypeClassNameParser::parse_one(value->to_string(), config.native_types);
+ }
}
value = add_field(buffer, row, "state_type");
if (value != NULL &&
value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
- state_type_ = TypeParser::parse_one(value->to_string());
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ state_type_ = DataTypeCqlNameParser::parse(value->to_string(), config.native_types, keyspace);
+ } else {
+ state_type_ = DataTypeClassNameParser::parse_one(value->to_string(), config.native_types);
+ }
}
value = add_field(buffer, row, "final_func");
@@ -1226,74 +1326,104 @@ AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signa
}
value = add_field(buffer, row, "initcond");
- if (value != NULL &&
- value->value_type() == CASS_VALUE_TYPE_BLOB) {
- init_cond_ = Value(version, state_type_, value->data(), value->size());
+ if (value != NULL) {
+ if (value->value_type() == CASS_VALUE_TYPE_BLOB) {
+ init_cond_ = Value(config.protocol_version, state_type_, value->data(), value->size());
+ } else if (config.cassandra_version >= VersionNumber(3, 0, 0) &&
+ value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+ init_cond_ = Value(config.protocol_version,
+ config.native_types.by_cql_name("varchar"),
+ value->data(), value->size());
+ }
}
}
-ColumnMetadata::ColumnMetadata(const std::string& name,
- int version, const SharedRefPtr& buffer, const Row* row)
+ColumnMetadata::ColumnMetadata(const MetadataConfig& config,
+ const std::string& name,
+ KeyspaceMetadata* keyspace,
+ const SharedRefPtr& buffer, const Row* row)
: MetadataBase(name)
, type_(CASS_COLUMN_TYPE_REGULAR)
- , position_(0)
- , is_reversed_(false) {
+ , position_(0) {
const Value* value;
add_field(buffer, row, "keyspace_name");
- add_field(buffer, row, "columnfamily_name");
+ add_field(buffer, row, table_column_name(config.cassandra_version));
add_field(buffer, row, "column_name");
- value = add_field(buffer, row, "type");
- if (value != NULL &&
- value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
- StringRef type = value->to_string_ref();
- if (type == "partition_key") {
- type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
- } else if (type == "clustering_key") {
- type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
- } else if (type == "static") {
- type_ = CASS_COLUMN_TYPE_STATIC;
- } else {
- type_ = CASS_COLUMN_TYPE_REGULAR;
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ add_field(buffer, row, "clustering_order");
+ add_field(buffer, row, "column_name_bytes");
+
+ value = add_field(buffer, row, "kind");
+ if (value != NULL &&
+ value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+ StringRef type = value->to_string_ref();
+ if (type == "partition_key") {
+ type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
+ } else if (type == "clustering") {
+ type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
+ } else if (type == "static") {
+ type_ = CASS_COLUMN_TYPE_STATIC;
+ } else {
+ type_ = CASS_COLUMN_TYPE_REGULAR;
+ }
}
- }
- value = add_field(buffer, row, "component_index");
- // For C* 2.0 to 2.2 this is "null" for single component partition keys
- // so the default position of 0 works. C* 1.2 and below don't use this.
- if (value != NULL &&
- value->value_type() == CASS_VALUE_TYPE_INT) {
- position_ = value->as_int32();
- }
+ value = add_field(buffer, row, "position");
+ if (value != NULL &&
+ value->value_type() == CASS_VALUE_TYPE_INT) {
+ position_ = value->as_int32();
+ if (position_ < 0) position_ = 0;
+ }
- value = add_field(buffer, row, "validator");
- if (value != NULL &&
- value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
- std::string validator(value->to_string());
- // TODO: Use const changes from CPP-295
- data_type_ = SharedRefPtr(TypeParser::parse_one(validator));
- is_reversed_ = TypeParser::is_reversed(validator);
- }
+ value = add_field(buffer, row, "type");
+ if (value != NULL &&
+ value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+ std::string type(value->to_string());
+ data_type_ = DataTypeCqlNameParser::parse(type, config.native_types, keyspace);
+ }
+ } else {
+ value = add_field(buffer, row, "type");
+ if (value != NULL &&
+ value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+ StringRef type = value->to_string_ref();
+ if (type == "partition_key") {
+ type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
+ } else if (type == "clustering_key") {
+ type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
+ } else if (type == "static") {
+ type_ = CASS_COLUMN_TYPE_STATIC;
+ } else if (type == "compact_value") {
+ type_ = CASS_COLUMN_TYPE_COMPACT_VALUE;
+ } else {
+ type_ = CASS_COLUMN_TYPE_REGULAR;
+ }
+ }
- add_field(buffer, row, "index_name");
- add_json_map_field(version, row, "index_options");
- add_field(buffer, row, "index_type");
-}
+ value = add_field(buffer, row, "component_index");
+ // For C* 2.0 to 2.2 this is "null" for single component partition keys
+ // so the default position of 0 works. C* 1.2 and below don't use this.
+ if (value != NULL &&
+ value->value_type() == CASS_VALUE_TYPE_INT) {
+ position_ = value->as_int32();
+ }
-void Metadata::SchemaSnapshot::get_table_key_columns(const std::string& ks_name,
- const std::string& table_name,
- std::vector* output) const {
- const KeyspaceMetadata* keyspace = get_keyspace(ks_name);
- if (keyspace != NULL) {
- const TableMetadata* table = keyspace->get_table(table_name);
- if (table != NULL) {
- table->key_aliases(output);
+ value = add_field(buffer, row, "validator");
+ if (value != NULL &&
+ value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
+ std::string validator(value->to_string());
+ data_type_ = DataTypeClassNameParser::parse_one(validator, config.native_types);
}
+
+ add_field(buffer, row, "index_name");
+ add_json_map_field(config.protocol_version, row, "index_options");
+ add_field(buffer, row, "index_type");
}
}
-void Metadata::InternalData::update_keyspaces(int version, ResultResponse* result, KeyspaceMetadata::Map& updates) {
+void Metadata::InternalData::update_keyspaces(const MetadataConfig& config,
+ ResultResponse* result, KeyspaceMetadata::Map& updates) {
SharedRefPtr buffer = result->buffer();
result->decode_first_row();
ResultIterator rows(result);
@@ -1308,19 +1438,20 @@ void Metadata::InternalData::update_keyspaces(int version, ResultResponse* resul
}
KeyspaceMetadata* keyspace = get_or_create_keyspace(keyspace_name);
- keyspace->update(version, buffer, row);
+ keyspace->update(config, buffer, row);
updates.insert(std::make_pair(keyspace_name, *keyspace));
}
}
-void Metadata::InternalData::update_tables(int version, const VersionNumber& cassandra_version, ResultResponse* tables_result, ResultResponse* columns_result) {
+void Metadata::InternalData::update_tables(const MetadataConfig& config,
+ ResultResponse* tables_result, ResultResponse* columns_result) {
SharedRefPtr buffer = tables_result->buffer();
tables_result->decode_first_row();
ResultIterator rows(tables_result);
std::string keyspace_name;
- std::string columnfamily_name;
+ std::string table_name;
KeyspaceMetadata* keyspace = NULL;
while (rows.next()) {
@@ -1328,8 +1459,8 @@ void Metadata::InternalData::update_tables(int version, const VersionNumber& cas
const Row* row = rows.row();
if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
- !row->get_string_by_name("columnfamily_name", &columnfamily_name)) {
- LOG_ERROR("Unable to get column value for 'keyspace_name' or 'columnfamily_name'");
+ !row->get_string_by_name(table_column_name(config.cassandra_version), &table_name)) {
+ LOG_ERROR("Unable to get column value for 'keyspace_name' or '%s'", table_column_name(config.cassandra_version));
continue;
}
@@ -1338,13 +1469,13 @@ void Metadata::InternalData::update_tables(int version, const VersionNumber& cas
keyspace = get_or_create_keyspace(keyspace_name);
}
- keyspace->add_table(TableMetadata::Ptr(new TableMetadata(columnfamily_name, version, buffer, row)));
+ keyspace->add_table(TableMetadata::Ptr(new TableMetadata(config, table_name, buffer, row)));
}
- update_columns(version, cassandra_version, columns_result);
+ update_columns(config, columns_result);
}
-void Metadata::InternalData::update_user_types(ResultResponse* result) {
+void Metadata::InternalData::update_user_types(const MetadataConfig& config, ResultResponse* result) {
result->decode_first_row();
ResultIterator rows(result);
@@ -1404,7 +1535,14 @@ void Metadata::InternalData::update_user_types(ResultResponse* result) {
std::string field_name(name->to_string());
- SharedRefPtr data_type = TypeParser::parse_one(type->to_string());
+ DataType::ConstPtr data_type;
+
+ if (config.cassandra_version >= VersionNumber(3, 0, 0)) {
+ data_type = DataTypeCqlNameParser::parse(type->to_string(), config.native_types, keyspace);
+ } else {
+ data_type = DataTypeClassNameParser::parse_one(type->to_string(), config.native_types);
+ }
+
if (!data_type) {
LOG_ERROR("Invalid 'field_type' for field \"%s\", keyspace \"%s\" and type \"%s\"",
field_name.c_str(),
@@ -1416,12 +1554,12 @@ void Metadata::InternalData::update_user_types(ResultResponse* result) {
fields.push_back(UserType::Field(field_name, data_type));
}
- keyspace->add_user_type(
- SharedRefPtr(new UserType(keyspace_name, type_name, fields)));
+ keyspace->get_or_create_user_type(type_name)->set_fields(fields);
}
}
-void Metadata::InternalData::update_functions(ResultResponse* result) {
+void Metadata::InternalData::update_functions(const MetadataConfig& config,
+ ResultResponse* result) {
SharedRefPtr buffer = result->buffer();
result->decode_first_row();
@@ -1435,7 +1573,7 @@ void Metadata::InternalData::update_functions(ResultResponse* result) {
std::string function_name;
const Row* row = rows.row();
- const Value* signature = row->get_by_name("signature");
+ const Value* signature = row->get_by_name(signature_column_name(config.cassandra_version));
if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
!row->get_string_by_name("function_name", &function_name) ||
signature == NULL) {
@@ -1448,13 +1586,15 @@ void Metadata::InternalData::update_functions(ResultResponse* result) {
keyspace = get_or_create_keyspace(keyspace_name);
}
- keyspace->add_function(FunctionMetadata::Ptr(new FunctionMetadata(function_name,
- signature, buffer, row)));
+ keyspace->add_function(FunctionMetadata::Ptr(new FunctionMetadata(config,
+ function_name, signature,
+ keyspace,
+ buffer, row)));
}
}
-void Metadata::InternalData::update_aggregates(int version, ResultResponse* result) {
+void Metadata::InternalData::update_aggregates(const MetadataConfig& config, ResultResponse* result) {
SharedRefPtr buffer = result->buffer();
result->decode_first_row();
@@ -1468,7 +1608,7 @@ void Metadata::InternalData::update_aggregates(int version, ResultResponse* resu
std::string aggregate_name;
const Row* row = rows.row();
- const Value* signature = row->get_by_name("signature");
+ const Value* signature = row->get_by_name(signature_column_name(config.cassandra_version));
if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
!row->get_string_by_name("aggregate_name", &aggregate_name) ||
signature == NULL) {
@@ -1481,9 +1621,10 @@ void Metadata::InternalData::update_aggregates(int version, ResultResponse* resu
keyspace = get_or_create_keyspace(keyspace_name);
}
- keyspace->add_aggregate(AggregateMetadata::Ptr(new AggregateMetadata(aggregate_name, signature,
- keyspace->functions(),
- version, buffer, row)));
+ keyspace->add_aggregate(AggregateMetadata::Ptr(new AggregateMetadata(config,
+ aggregate_name, signature,
+ keyspace,
+ buffer, row)));
}
}
@@ -1515,14 +1656,14 @@ void Metadata::InternalData::drop_aggregate(const std::string& keyspace_name, co
i->second.drop_aggregate(full_aggregate_name);
}
-void Metadata::InternalData::update_columns(int version, const VersionNumber& cassandra_version, ResultResponse* result) {
+void Metadata::InternalData::update_columns(const MetadataConfig& config, ResultResponse* result) {
SharedRefPtr buffer = result->buffer();
result->decode_first_row();
ResultIterator rows(result);
std::string keyspace_name;
- std::string columnfamily_name;
+ std::string table_name;
std::string column_name;
KeyspaceMetadata* keyspace = NULL;
@@ -1530,13 +1671,14 @@ void Metadata::InternalData::update_columns(int version, const VersionNumber& ca
while (rows.next()) {
std::string temp_keyspace_name;
- std::string temp_columnfamily_name;
+ std::string temp_table_name;
const Row* row = rows.row();
if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
- !row->get_string_by_name("columnfamily_name", &temp_columnfamily_name) ||
+ !row->get_string_by_name(table_column_name(config.cassandra_version), &temp_table_name) ||
!row->get_string_by_name("column_name", &column_name)) {
- LOG_ERROR("Unable to get column value for 'keyspace_name', 'columnfamily_name' or 'column_name'");
+ LOG_ERROR("Unable to get column value for 'keyspace_name', '%s' or 'column_name'",
+ table_column_name(config.cassandra_version));
continue;
}
@@ -1545,22 +1687,23 @@ void Metadata::InternalData::update_columns(int version, const VersionNumber& ca
keyspace = get_or_create_keyspace(keyspace_name);
}
- if (columnfamily_name != temp_columnfamily_name) {
+ if (table_name != temp_table_name) {
// Build keys for the previous table
if (table) {
- table->build_keys_and_sort(cassandra_version);
+ table->build_keys_and_sort(config);
}
- columnfamily_name = temp_columnfamily_name;
- table = keyspace->get_or_create_table(columnfamily_name);
+ table_name = temp_table_name;
+ table = keyspace->get_or_create_table(table_name);
table->clear_columns();
}
- table->add_column(ColumnMetadata::Ptr(new ColumnMetadata(column_name, version, buffer, row)));
+ table->add_column(ColumnMetadata::Ptr(new ColumnMetadata(config, column_name,
+ keyspace, buffer, row)));
}
// Build keys for the last table
if (table) {
- table->build_keys_and_sort(cassandra_version);
+ table->build_keys_and_sort(config);
}
}
diff --git a/src/metadata.hpp b/src/metadata.hpp
index ec39012b4..60004d95b 100644
--- a/src/metadata.hpp
+++ b/src/metadata.hpp
@@ -24,7 +24,8 @@
#include "scoped_lock.hpp"
#include "scoped_ptr.hpp"
#include "token_map.hpp"
-#include "type_parser.hpp"
+#include "data_type.hpp"
+#include "value.hpp"
#include