Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: drop aggr tables in drop table #3908

Merged
merged 6 commits into from
Jul 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 68 additions & 82 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -875,54 +875,41 @@ bool SQLClusterRouter::DropTable(const std::string& db, const std::string& table
}
}

// delete pre-aggr meta info if need
if (table_info->base_table_tid() > 0) {
std::string meta_db = openmldb::nameserver::INTERNAL_DB;
std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME;
std::string select_aggr_info =
absl::StrCat("select base_db,base_table,aggr_func,aggr_col,partition_cols,order_by_col,filter_col from ",
meta_db, ".", meta_table, " where aggr_table = '", table_info->name(), "';");
auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "get aggr info failed", false);
if (rs->Size() != 1) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError,
"duplicate records generate with aggr table name: " + table_info->name());
return false;
}
std::string idx_key;
if (rs->Next()) {
for (int i = 0; i < rs->GetSchema()->GetColumnCnt(); i++) {
if (!idx_key.empty()) {
idx_key += "|";
}
auto k = rs->GetAsStringUnsafe(i);
if (k.empty()) {
idx_key += hybridse::codec::EMPTY_STRING;
} else {
idx_key += k;
}
// delete related pre-aggr tables first
std::string meta_db = openmldb::nameserver::INTERNAL_DB;
std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME;
std::string select_aggr_info =
absl::StrCat("select aggr_db, aggr_table from ", meta_db, ".", meta_table, " where base_table = '",
table_info->name(), "' and base_db='", table_info->db(), "';");
auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "get aggr info failed", false);
if (rs->Size() > 0) {
// drop aggr-tables, if got error, delete manully
std::vector<std::pair<std::string, std::string>> aggr_tables;
while (rs->Next()) {
std::string aggr_db = rs->GetStringUnsafe(0);
std::string aggr_table = rs->GetStringUnsafe(1);

if (aggr_db.empty() || aggr_table.empty()) {
WARN_NOT_OK_AND_RET(
status, absl::StrCat("aggr table ", aggr_db, " or ", aggr_table, " is empty, can't delete"), false);
}
} else {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "access ResultSet failed");
return false;
}
auto tablet_accessor = cluster_sdk_->GetTablet(meta_db, meta_table, (uint32_t)0);
if (!tablet_accessor) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "get tablet accessor failed");
return false;
}
auto tablet_client = tablet_accessor->GetClient();
if (!tablet_client) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "get tablet client failed");
return false;
if (!DropTable(aggr_db, aggr_table, true, status)) {
WARN_NOT_OK_AND_RET(status, absl::StrCat("drop aggr table ", aggr_db, ".", aggr_table, " failed"),
false);
}
aggr_tables.emplace_back(aggr_db, aggr_table);
vagetablechicken marked this conversation as resolved.
Show resolved Hide resolved
}
auto tid = cluster_sdk_->GetTableId(meta_db, meta_table);
std::string msg;
if (!tablet_client->Delete(tid, 0, table_info->name(), "aggr_table", msg) ||
!tablet_client->Delete(tid, 0, idx_key, "unique_key", msg)) {
SET_STATUS_AND_WARN(status, StatusCode::kCmdError, "delete aggr meta failed");
return false;
// drop pre-agg meta in meta table
for (auto& aggr_table : aggr_tables) {
LOG(INFO) << "drop aggr meta " << aggr_table.first << "." << aggr_table.second << "by table name";
std::string delete_aggr_info =
absl::StrCat("delete from ", meta_db, ".", meta_table, " where aggr_table='", aggr_table.second, "';");
auto rs = ExecuteSQL("", delete_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "delete aggr info failed", false);
}
} else {
LOG(INFO) << "no related pre-aggr tables";
}

// Check offline table info first
Expand Down Expand Up @@ -1280,7 +1267,7 @@ bool SQLClusterRouter::ExecuteInsert(const std::string& db, const std::string& s

std::vector<size_t> fails;
if (!codegen_rows.empty()) {
for (size_t i = 0 ; i < codegen_rows.size(); ++i) {
for (size_t i = 0; i < codegen_rows.size(); ++i) {
auto r = codegen_rows[i];
auto row = std::make_shared<SQLInsertRow>(table_info, schema, r, put_if_absent);
if (!PutRow(table_info->tid(), row, tablets, status)) {
Expand Down Expand Up @@ -1695,7 +1682,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::HandleSQLCmd(const h
}

case hybridse::node::kCmdShowUser: {
std::vector<std::string> value = { options_->user };
std::vector<std::string> value = {options_->user};
return ResultSetSQL::MakeResultSet({"User"}, {value}, status);
}

Expand Down Expand Up @@ -2740,7 +2727,8 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
}
case hybridse::node::kPlanTypeCreateUser: {
auto create_node = dynamic_cast<hybridse::node::CreateUserPlanNode*>(node);
UserInfo user_info;;
UserInfo user_info;
;
auto result = GetUser(create_node->Name(), &user_info);
if (!result.ok()) {
*status = {StatusCode::kCmdError, result.status().message()};
Expand Down Expand Up @@ -2954,7 +2942,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
if (is_online_mode) {
// Handle in online mode
config.emplace("spark.insert_memory_usage_limit",
std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed)));
std::to_string(insert_memory_usage_limit_.load(std::memory_order_relaxed)));
base_status = ImportOnlineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info);
} else {
// Handle in offline mode
Expand Down Expand Up @@ -4874,10 +4862,10 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetNameServerJobResu
}

absl::StatusOr<bool> SQLClusterRouter::GetUser(const std::string& name, UserInfo* user_info) {
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME);
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME);
hybridse::sdk::Status status;
auto rs = ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql,
std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
auto rs =
ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
if (rs == nullptr) {
return absl::InternalError(status.msg);
}
Expand All @@ -4897,17 +4885,17 @@ hybridse::sdk::Status SQLClusterRouter::AddUser(const std::string& name, const s
auto real_password = password.empty() ? password : codec::Encrypt(password);
uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000;
std::string sql = absl::StrCat("insert into ", nameserver::USER_INFO_NAME, " values (",
"'%',", // host
"'", name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
cur_ts, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'',", // privileges
"null" // extra_info
");");
"'%',", // host
"'", name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
cur_ts, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'',", // privileges
"null" // extra_info
");");
hybridse::sdk::Status status;
ExecuteInsert(nameserver::INTERNAL_DB, sql, &status);
return status;
Expand All @@ -4917,25 +4905,25 @@ hybridse::sdk::Status SQLClusterRouter::UpdateUser(const UserInfo& user_info, co
auto real_password = password.empty() ? password : codec::Encrypt(password);
uint64_t cur_ts = ::baidu::common::timer::get_micros() / 1000;
std::string sql = absl::StrCat("insert into ", nameserver::USER_INFO_NAME, " values (",
"'%',", // host
"'", user_info.name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
user_info.create_time, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'", user_info.privileges, "',", // privileges
"null" // extra_info
");");
"'%',", // host
"'", user_info.name, "','", // user
real_password, "',", // password
cur_ts, ",", // password_last_changed
"0,", // password_expired_time
user_info.create_time, ", ", // create_time
cur_ts, ",", // update_time
1, // account_type
",'", user_info.privileges, "',", // privileges
"null" // extra_info
");");
hybridse::sdk::Status status;
ExecuteInsert(nameserver::INTERNAL_DB, sql, &status);
return status;
}

hybridse::sdk::Status SQLClusterRouter::DeleteUser(const std::string& name) {
std::string sql = absl::StrCat("delete from ", nameserver::USER_INFO_NAME,
" where host = '%' and user = '", name, "';");
std::string sql =
absl::StrCat("delete from ", nameserver::USER_INFO_NAME, " where host = '%' and user = '", name, "';");
hybridse::sdk::Status status;
ExecuteSQL(nameserver::INTERNAL_DB, sql, &status);
return status;
Expand All @@ -4948,12 +4936,10 @@ void SQLClusterRouter::AddUserToConfig(std::map<std::string, std::string>* confi
}
}

::hybridse::sdk::Status SQLClusterRouter::RevertPut(const nameserver::TableInfo& table_info,
uint32_t end_pid,
const std::map<uint32_t, std::vector<std::pair<std::string, uint32_t>>>& dimensions,
uint64_t ts,
const base::Slice& value,
const std::vector<std::shared_ptr<::openmldb::catalog::TabletAccessor>>& tablets) {
::hybridse::sdk::Status SQLClusterRouter::RevertPut(
const nameserver::TableInfo& table_info, uint32_t end_pid,
const std::map<uint32_t, std::vector<std::pair<std::string, uint32_t>>>& dimensions, uint64_t ts,
const base::Slice& value, const std::vector<std::shared_ptr<::openmldb::catalog::TabletAccessor>>& tablets) {
codec::RowView row_view(table_info.column_desc());
std::map<std::string, uint32_t> column_map;
for (int32_t i = 0; i < table_info.column_desc_size(); i++) {
Expand Down
Loading