Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
366 changes: 351 additions & 15 deletions include/cassandra.h

Large diffs are not rendered by default.

77 changes: 51 additions & 26 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

#define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces"
#define SELECT_TABLES_30 "SELECT * FROM system_schema.tables"
#define SELECT_VIEWS_30 "SELECT * FROM system_schema.views"
#define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns"
#define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types"
#define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions"
Expand Down Expand Up @@ -316,7 +317,7 @@ void ControlConnection::on_event(EventResponse* response) {
refresh_keyspace(response->keyspace());
break;
case EventResponse::TABLE:
refresh_table(response->keyspace(), response->target());
refresh_table_or_view(response->keyspace(), response->target());
break;
case EventResponse::TYPE:
refresh_type(response->keyspace(), response->target());
Expand All @@ -337,8 +338,8 @@ void ControlConnection::on_event(EventResponse* response) {
session_->metadata().drop_keyspace(response->keyspace().to_string());
break;
case EventResponse::TABLE:
session_->metadata().drop_table(response->keyspace().to_string(),
response->target().to_string());
session_->metadata().drop_table_or_view(response->keyspace().to_string(),
response->target().to_string());
break;
case EventResponse::TYPE:
session_->metadata().drop_user_type(response->keyspace().to_string(),
Expand Down Expand Up @@ -469,6 +470,7 @@ void ControlConnection::query_meta_schema() {
if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
handler->execute_query("keyspaces", SELECT_KEYSPACES_30);
handler->execute_query("tables", SELECT_TABLES_30);
handler->execute_query("views", SELECT_VIEWS_30);
handler->execute_query("columns", SELECT_COLUMNS_30);
handler->execute_query("user_types", SELECT_USERTYPES_30);
handler->execute_query("functions", SELECT_FUNCTIONS_30);
Expand Down Expand Up @@ -508,9 +510,17 @@ void ControlConnection::on_query_meta_schema(ControlConnection* control_connecti

ResultResponse* tables_result;
if (MultipleRequestHandler::get_result_response(responses, "tables", &tables_result)) {
ResultResponse* columns_result = NULL;
MultipleRequestHandler::get_result_response(responses, "columns", &columns_result);
session->metadata().update_tables(tables_result, columns_result);
session->metadata().update_tables(tables_result);
}

ResultResponse* views_result;
if (MultipleRequestHandler::get_result_response(responses, "views", &views_result)) {
session->metadata().update_views(views_result);
}

ResultResponse* columns_result = NULL;
if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) {
session->metadata().update_columns(columns_result);
}

ResultResponse* user_types_result;
Expand Down Expand Up @@ -745,57 +755,72 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio
control_connection->session_->metadata().update_keyspaces(result);
}

void ControlConnection::refresh_table(const StringRef& keyspace_name,
const StringRef& table_name) {
void ControlConnection::refresh_table_or_view(const StringRef& keyspace_name,
const StringRef& table_or_view_name) {
std::string table_query;
std::string view_query;
std::string column_query;

if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
table_query.assign(SELECT_TABLES_30);
table_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
.append("' AND table_name='").append(table_or_view_name.data(), table_or_view_name.size()).append("'");

view_query.assign(SELECT_VIEWS_30);
view_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND view_name='").append(table_or_view_name.data(), table_or_view_name.size()).append("'");

column_query.assign(SELECT_COLUMNS_30);
column_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
.append("' AND table_name='").append(table_or_view_name.data(), table_or_view_name.size()).append("'");

LOG_DEBUG("Refreshing table %s; %s", table_query.c_str(), column_query.c_str());
LOG_DEBUG("Refreshing table/view %s; %s; %s", table_query.c_str(), view_query.c_str(), column_query.c_str());
} else {
table_query.assign(SELECT_COLUMN_FAMILIES_20);
table_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
.append("' AND columnfamily_name='").append(table_or_view_name.data(), table_or_view_name.size()).append("'");

column_query.assign(SELECT_COLUMNS_20);
column_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
.append("' AND columnfamily_name='").append(table_or_view_name.data(), table_or_view_name.size()).append("'");

LOG_DEBUG("Refreshing table %s; %s", table_query.c_str(), column_query.c_str());
}

ScopedRefPtr<ControlMultipleRequestHandler<RefreshTableData> > handler(
new ControlMultipleRequestHandler<RefreshTableData>(this,
ControlConnection::on_refresh_table,
RefreshTableData(keyspace_name.to_string(), table_name.to_string())));
ControlConnection::on_refresh_table_or_view,
RefreshTableData(keyspace_name.to_string(), table_or_view_name.to_string())));
handler->execute_query("tables", table_query);
if (!view_query.empty()) {
handler->execute_query("views", view_query);
}
handler->execute_query("columns", column_query);
}

void ControlConnection::on_refresh_table(ControlConnection* control_connection,
const RefreshTableData& data,
const MultipleRequestHandler::ResponseMap& responses) {
void ControlConnection::on_refresh_table_or_view(ControlConnection* control_connection,
const RefreshTableData& data,
const MultipleRequestHandler::ResponseMap& responses) {
ResultResponse* tables_result;
Session* session = control_connection->session_;
if (!MultipleRequestHandler::get_result_response(responses, "tables", &tables_result) ||
tables_result->row_count() == 0) {
LOG_ERROR("No row found for column family %s.%s in system schema table.",
data.keyspace_name.c_str(), data.table_name.c_str());
return;
ResultResponse* views_result;
if (!MultipleRequestHandler::get_result_response(responses, "views", &views_result) ||
views_result->row_count() == 0) {
LOG_ERROR("No row found for table (or view) %s.%s in system schema tables.",
data.keyspace_name.c_str(), data.table_or_view_name.c_str());
return;
}
session->metadata().update_views(views_result);
} else {
session->metadata().update_tables(tables_result);
}

ResultResponse* columns_result = NULL;
MultipleRequestHandler::get_result_response(responses, "columns", &columns_result);

Session* session = control_connection->session_;
session->metadata().update_tables(tables_result, columns_result);
ResultResponse* columns_result;
if (MultipleRequestHandler::get_result_response(responses, "columns", &columns_result)) {
session->metadata().update_columns(columns_result);
}
}


Expand Down
8 changes: 4 additions & 4 deletions src/control_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ class ControlConnection : public Connection::Listener {
RefreshTableData(const std::string& keyspace_name,
const std::string& table_name)
: keyspace_name(keyspace_name)
, table_name(table_name) {}
, table_or_view_name(table_name) {}
std::string keyspace_name;
std::string table_name;
std::string table_or_view_name;
};

struct UnusedData {};
Expand Down Expand Up @@ -211,9 +211,9 @@ class ControlConnection : public Connection::Listener {
void refresh_keyspace(const StringRef& keyspace_name);
static void on_refresh_keyspace(ControlConnection* control_connection, const std::string& keyspace_name, Response* response);

void refresh_table(const StringRef& keyspace_name,
void refresh_table_or_view(const StringRef& keyspace_name,
const StringRef& table_name);
static void on_refresh_table(ControlConnection* control_connection,
static void on_refresh_table_or_view(ControlConnection* control_connection,
const RefreshTableData& data,
const MultipleRequestHandler::ResponseMap& responses);

Expand Down
4 changes: 2 additions & 2 deletions src/data_type_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ bool from_hex(const std::string& hex, std::string* result) {
}

DataType::ConstPtr DataTypeCqlNameParser::parse(const std::string& type,
const NativeDataTypes& native_types,
KeyspaceMetadata* keyspace) {
const NativeDataTypes& native_types,
KeyspaceMetadata* keyspace) {
Parser parser(type, 0);
std::string type_name;
Parser::TypeParamsVec params;
Expand Down
1 change: 1 addition & 0 deletions src/external_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ EXTERNAL_TYPE(cass::SslContext, CassSsl);
EXTERNAL_TYPE(cass::Metadata::SchemaSnapshot, CassSchemaMeta);
EXTERNAL_TYPE(cass::KeyspaceMetadata, CassKeyspaceMeta);
EXTERNAL_TYPE(cass::TableMetadata, CassTableMeta);
EXTERNAL_TYPE(cass::ViewMetadata, CassMaterializedViewMeta);
EXTERNAL_TYPE(cass::ColumnMetadata, CassColumnMeta);
EXTERNAL_TYPE(cass::FunctionMetadata, CassFunctionMeta);
EXTERNAL_TYPE(cass::AggregateMetadata, CassAggregateMeta);
Expand Down
Loading