Skip to content

Commit

Permalink
[fix][store] Fixup delete bdb region hang apply worker issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
rock-git authored and ketor committed Apr 29, 2024
1 parent 39e0308 commit 4a7b1c4
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 58 deletions.
10 changes: 9 additions & 1 deletion proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ message DestroyExecutorRequest {
int64 region_id = 1;
}

// Payload of a CMD_DELETE_DATA region command: describes the key range and
// the column families whose data the store should delete.
message DeleteDataRequest {
dingodb.pb.common.Range range = 1; // key range of the data to delete
repeated string raw_cf_names = 2; // column families deleted using the range as-is
repeated string txn_cf_names = 3; // column families deleted using the mem-comparable form of the range
}

// RegionCmdType
enum RegionCmdType {
CMD_NONE = 0; // this is a placeholder
Expand All @@ -174,6 +180,7 @@ enum RegionCmdType {

CMD_STOP = 30; // when region state ORPHAN, stop region
CMD_DESTROY_EXECUTOR = 31; // destroy region executor
CMD_DELETE_DATA = 32; // bdb engine delete data when delete region
}

// Region cmd status
Expand Down Expand Up @@ -207,6 +214,7 @@ message RegionCmd {

StopRequest stop_request = 30; // when region state ORPHAN, stop region
DestroyExecutorRequest destroy_executor_request = 31; // destroy region executor
DeleteDataRequest delete_data_request = 32;
}

bool is_notify = 40; // store need to notify coordinator when this cmd is done
Expand Down Expand Up @@ -554,7 +562,7 @@ message GetCoordinatorMapResponse {
dingodb.pb.common.Location kv_leader_location = 5;
dingodb.pb.common.Location tso_leader_location = 6;
dingodb.pb.common.Location auto_increment_leader_location = 7;
repeated dingodb.pb.common.Location coordinator_locations = 8; // deprecated
repeated dingodb.pb.common.Location coordinator_locations = 8; // deprecated
dingodb.pb.common.CoordinatorMap coordinator_map = 9;
}

Expand Down
172 changes: 117 additions & 55 deletions src/store/region_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,43 +264,62 @@ butil::Status DeleteRegionTask::DeleteRegion(std::shared_ptr<Context> ctx, int64
// Delete raft meta
store_meta_manager->GetStoreRaftMeta()->DeleteRaftMeta(region_id);

// Index region
if (GetRole() == pb::common::ClusterRole::INDEX) {
auto vector_index_wrapper = region->VectorIndexWrapper();
if (vector_index_wrapper != nullptr) {
vector_index_wrapper->Destroy();
}
}

auto region_controller = Server::GetInstance().GetRegionController();

// Delete data
DINGO_LOG(DEBUG) << fmt::format("[control.region][region({})] delete region, delete data", region_id);
if (!Helper::InvalidRange(region->Range())) {
std::vector<std::string> raw_cf_names;
std::vector<std::string> txn_cf_names;

Helper::GetColumnFamilyNames(region->Range().start_key(), raw_cf_names, txn_cf_names);

if (!raw_cf_names.empty()) {
status = region_raw_engine->Writer()->KvDeleteRange(raw_cf_names, region->Range());
if (!status.ok()) {
DINGO_LOG(FATAL) << fmt::format("[control.region][region({})] delete region data raw failed, error: {}",
region->Id(), status.error_str());
if (region_raw_engine->GetRawEngineType() != pb::common::RAW_ENG_BDB) {
if (!raw_cf_names.empty()) {
status = region_raw_engine->Writer()->KvDeleteRange(raw_cf_names, region->Range());
CHECK(status.ok()) << fmt::format("[control.region][region({})] delete region data raw failed, error: {}",
region->Id(), status.error_str());
}
}

if (!txn_cf_names.empty()) {
pb::common::Range txn_range = Helper::GetMemComparableRange(region->Range());
status = region_raw_engine->Writer()->KvDeleteRange(txn_cf_names, txn_range);
if (!status.ok()) {
DINGO_LOG(FATAL) << fmt::format("[control.region][region({})] delete region data txn failed, error: {}",
region->Id(), status.error_str());
if (!txn_cf_names.empty()) {
pb::common::Range txn_range = Helper::GetMemComparableRange(region->Range());
status = region_raw_engine->Writer()->KvDeleteRange(txn_cf_names, txn_range);
CHECK(status.ok()) << fmt::format("[control.region][region({})] delete region data txn failed, error: {}",
region->Id(), status.error_str());
}
} else {
auto command = std::make_shared<pb::coordinator::RegionCmd>();
command->set_id(Helper::TimestampNs());
command->set_region_id(region_id);
command->set_create_timestamp(Helper::TimestampMs());
command->set_region_cmd_type(pb::coordinator::CMD_DELETE_DATA);
auto* mut_request = command->mutable_delete_data_request();
*mut_request->mutable_range() = region->Range();
Helper::VectorToPbRepeated(raw_cf_names, mut_request->mutable_raw_cf_names());
Helper::VectorToPbRepeated(txn_cf_names, mut_request->mutable_txn_cf_names());

auto ctx = std::make_shared<Context>();
auto cond = ctx->CreateSyncModeCond();
status = region_controller->DispatchRegionControlCommand(ctx, command);
DINGO_LOG_IF(ERROR, !status.ok()) << fmt::format(
"[control.region][region({})] dispatch region executor command failed, error: {} {}", region_id,
status.error_code(), status.error_str());

if (status.ok()) cond->IncreaseWait();
}
}

// Index region
if (GetRole() == pb::common::ClusterRole::INDEX) {
auto vector_index_wrapper = region->VectorIndexWrapper();
if (vector_index_wrapper != nullptr) {
vector_index_wrapper->Destroy();
}
} else {
DINGO_LOG(WARNING) << fmt::format("[control.region][region({})] delete region, invalid range {}", region_id,
region->RangeToString());
}

// Delete region executor
auto region_controller = Server::GetInstance().GetRegionController();

auto command = std::make_shared<pb::coordinator::RegionCmd>();
command->set_id(Helper::TimestampNs());
command->set_region_id(region_id);
Expand All @@ -309,11 +328,9 @@ butil::Status DeleteRegionTask::DeleteRegion(std::shared_ptr<Context> ctx, int64
command->mutable_destroy_executor_request()->set_region_id(region_id);

status = region_controller->DispatchRegionControlCommand(std::make_shared<Context>(), command);
if (!status.ok()) {
DINGO_LOG(ERROR) << fmt::format(
"[control.region][region({})] dispatch region executor command failed, error: {} {}", region_id,
status.error_code(), status.error_str());
}
DINGO_LOG_IF(ERROR, !status.ok()) << fmt::format(
"[control.region][region({})] dispatch region executor command failed, error: {} {}", region_id,
status.error_code(), status.error_str());

// Purge region for coordinator recycle_orphan_region mechanism
// TODO: need to implement a better mechanism of tombstone for region's meta info
Expand Down Expand Up @@ -543,7 +560,7 @@ butil::Status CheckChangeRegionLog(int64_t region_id, int64_t min_applied_log_id
if (log_entry.type == LogEntryType::kEntryTypeData) {
auto raft_cmd = std::make_shared<pb::raft::RaftCmdRequest>();
butil::IOBufAsZeroCopyInputStream wrapper(log_entry.data);
CHECK(raft_cmd->ParseFromZeroCopyStream(&wrapper));
CHECK(raft_cmd->ParseFromZeroCopyStream(&wrapper)) << "parse raft log fail.";
for (const auto& request : raft_cmd->requests()) {
if (request.cmd_type() == pb::raft::CmdType::SPLIT || request.cmd_type() == pb::raft::CmdType::PREPARE_MERGE ||
request.cmd_type() == pb::raft::CmdType::COMMIT_MERGE ||
Expand Down Expand Up @@ -1412,7 +1429,7 @@ void SnapshotVectorIndexTask::Run() {
} else {
auto status = SaveSnapshotAsync(ctx_, region_cmd_);
if (!status.ok()) {
DINGO_LOG(WARNING) << fmt::format("[control.region][region({})] save vector index failed, {}",
DINGO_LOG(WARNING) << fmt::format("[control.region][region({})] save vector index failed, {}",
region_cmd_->switch_split_request().region_id(), status.error_str());
}

Expand All @@ -1427,6 +1444,49 @@ void SnapshotVectorIndexTask::Run() {
}
}

// Deletes the data described by a CMD_DELETE_DATA command from the BDB raw engine.
//
// The column family names are taken from the command's delete_data_request. Only
// when the request carries no names at all are they derived from the start key of
// the range via Helper::GetColumnFamilyNames; this keeps the names supplied by the
// sender authoritative instead of re-deriving them on top of the request's lists.
//
// The first (context) argument is unused.
// A missing BDB engine or a failed KvDeleteRange aborts through CHECK, so the
// function only ever returns an OK status.
butil::Status DeleteDataTask::DeleteData(std::shared_ptr<Context>, RegionCmdPtr region_cmd) {
  const auto& request = region_cmd->delete_data_request();
  const auto& range = request.range();
  std::vector<std::string> raw_cf_names = Helper::PbRepeatedToVector(request.raw_cf_names());
  std::vector<std::string> txn_cf_names = Helper::PbRepeatedToVector(request.txn_cf_names());

  auto region_raw_engine = Server::GetInstance().GetRawEngine(pb::common::RAW_ENG_BDB);
  CHECK(region_raw_engine != nullptr) << fmt::format(
      "[control.region][region({})] delete region, delete data, raw engine is null", region_cmd->region_id());

  // Fall back to deriving the column families from the key only if the request did not name any.
  if (raw_cf_names.empty() && txn_cf_names.empty()) {
    Helper::GetColumnFamilyNames(range.start_key(), raw_cf_names, txn_cf_names);
  }

  if (!raw_cf_names.empty()) {
    auto status = region_raw_engine->Writer()->KvDeleteRange(raw_cf_names, range);
    CHECK(status.ok()) << fmt::format("[control.region][region({})] delete region data raw failed, error: {}",
                                      region_cmd->region_id(), status.error_str());
  }

  if (!txn_cf_names.empty()) {
    // Txn column families are deleted using the mem-comparable form of the range.
    pb::common::Range txn_range = Helper::GetMemComparableRange(range);
    auto status = region_raw_engine->Writer()->KvDeleteRange(txn_cf_names, txn_range);
    CHECK(status.ok()) << fmt::format("[control.region][region({})] delete region data txn failed, error: {}",
                                      region_cmd->region_id(), status.error_str());
  }

  return butil::Status::OK();
}

// Task entry point: performs the delete described by region_cmd_, logs the
// outcome together with the elapsed milliseconds, and finally signals the
// sync-mode condition of the context (when there is one) so that a caller
// waiting on it can proceed.
void DeleteDataTask::Run() {
  const int64_t begin_ms = Helper::TimestampMs();

  auto result = DeleteData(ctx_, region_cmd_);
  if (result.ok()) {
    DINGO_LOG(INFO) << fmt::format("[control.region][region({})][elapse_time({}ms)] delete data finish.",
                                   region_cmd_->region_id(), Helper::TimestampMs() - begin_ms);
  } else {
    DINGO_LOG(ERROR) << fmt::format("[control.region][region({})][time({})] delete data failed, {}",
                                    region_cmd_->region_id(), Helper::TimestampMs() - begin_ms, result.error_str());
  }

  // Without a context there is nobody to notify.
  if (ctx_ == nullptr) {
    return;
  }

  auto sync_cond = ctx_->SyncModeCond();
  if (sync_cond != nullptr) {
    sync_cond->DecreaseSignal();
  }
}

bool ControlExecutor::Init() { return worker_->Init(); }

bool ControlExecutor::Execute(TaskRunnablePtr task) { return worker_->Execute(task); }
Expand Down Expand Up @@ -1558,6 +1618,12 @@ bool RegionController::Init() {
return false;
}

heavy_task_executor_ = std::make_shared<HeavyTaskExecutor>();
if (!heavy_task_executor_->Init()) {
DINGO_LOG(ERROR) << "[control.region] heavy task executor init failed.";
return false;
}

auto regions = Server::GetInstance().GetAllAliveRegion();
for (auto& region : regions) {
if (!RegisterExecutor(region->Id())) {
Expand All @@ -1577,7 +1643,7 @@ bool RegionController::Recover() {
for (auto& command : commands) {
auto ctx = std::make_shared<Context>();

auto status = InnerDispatchRegionControlCommand(ctx, command);
auto status = DispatchRegionControlCommandImpl(ctx, command);
if (!status.ok()) {
DINGO_LOG(ERROR) << fmt::format("[control.region] recover region control command failed, error: {}",
status.error_str());
Expand All @@ -1594,6 +1660,7 @@ void RegionController::Destroy() {
}

share_executor_->Stop();
heavy_task_executor_->Stop();
}

std::vector<int64_t> RegionController::GetAllRegion() {
Expand Down Expand Up @@ -1647,7 +1714,7 @@ std::shared_ptr<RegionControlExecutor> RegionController::GetRegionControlExecuto
return it->second;
}

butil::Status RegionController::InnerDispatchRegionControlCommand(std::shared_ptr<Context> ctx, RegionCmdPtr command) {
butil::Status RegionController::DispatchRegionControlCommandImpl(std::shared_ptr<Context> ctx, RegionCmdPtr command) {
DINGO_LOG(DEBUG) << fmt::format("[control.region][region({})] dispatch region control command, commad id: {} {}",
command->region_id(), command->id(),
pb::coordinator::RegionCmdType_Name(command->region_cmd_type()));
Expand All @@ -1657,39 +1724,30 @@ butil::Status RegionController::InnerDispatchRegionControlCommand(std::shared_pt
RegisterExecutor(command->region_id());
}

auto executor = (command->region_cmd_type() == pb::coordinator::RegionCmdType::CMD_PURGE ||
command->region_cmd_type() == pb::coordinator::RegionCmdType::CMD_DESTROY_EXECUTOR)
? share_executor_
: GetRegionControlExecutor(command->region_id());
ControlExecutorPtr executor;
if (command->region_cmd_type() == pb::coordinator::RegionCmdType::CMD_PURGE ||
command->region_cmd_type() == pb::coordinator::RegionCmdType::CMD_DESTROY_EXECUTOR) {
executor = share_executor_;
} else if (command->region_cmd_type() == pb::coordinator::RegionCmdType::CMD_DELETE_DATA) {
executor = heavy_task_executor_;
} else {
executor = GetRegionControlExecutor(command->region_id());
}
if (executor == nullptr) {
DINGO_LOG(ERROR) << fmt::format("[control.region][region({})] not find region control executor.",
command->region_id());
return butil::Status(pb::error::EREGION_NOT_FOUND, "Not find regon control executor");
}

auto it = task_builders.find(command->region_cmd_type());
if (it == task_builders.end()) {
DINGO_LOG(ERROR) << "[control.region] not exist region control command.";
return butil::Status(pb::error::EINTERNAL, "Not exist region control command");
}
CHECK(it != task_builders.end()) << "[control.region] not exist region control command.";

// Free at ExecuteRoutine()
auto task = it->second(ctx, command);
if (task == nullptr) {
DINGO_LOG(ERROR) << "[control.region] not support region control command.";
return butil::Status(pb::error::EINTERNAL, "Not support region control command");
}
CHECK(task != nullptr) << "[control.region] not support region control command.";

// Because DeleteRegion task is very heavy, we use apply_wkr to execute it for limiting the concurrency
if (command->region_cmd_type() == pb::coordinator::RegionCmdType::CMD_DELETE) {
auto apply_wkr = Server::GetInstance().GetRaftApplyWorkerSet();
if (!apply_wkr->Execute(task)) {
return butil::Status(pb::error::EINTERNAL, "Execute region control command failed");
}
} else {
if (!executor->Execute(task)) {
return butil::Status(pb::error::EINTERNAL, "Execute region control command failed");
}
if (!executor->Execute(task)) {
return butil::Status(pb::error::EINTERNAL, "Execute region control command failed");
}

return butil::Status();
Expand All @@ -1705,7 +1763,7 @@ butil::Status RegionController::DispatchRegionControlCommand(std::shared_ptr<Con
// Save region command
region_command_manager->AddCommand(command);

return InnerDispatchRegionControlCommand(ctx, command);
return DispatchRegionControlCommandImpl(ctx, command);
}

RegionController::ValidateFunc RegionController::GetValidater(pb::coordinator::RegionCmdType cmd_type) {
Expand Down Expand Up @@ -1775,6 +1833,10 @@ RegionController::TaskBuilderMap RegionController::task_builders = {
[](std::shared_ptr<Context> ctx, RegionCmdPtr command) -> TaskRunnablePtr {
return std::make_shared<HoldVectorIndexTask>(ctx, command);
}},
{pb::coordinator::CMD_DELETE_DATA,
[](std::shared_ptr<Context> ctx, RegionCmdPtr command) -> TaskRunnablePtr {
return std::make_shared<DeleteDataTask>(ctx, command);
}},
};

RegionController::ValidaterMap RegionController::validaters = {
Expand Down
35 changes: 33 additions & 2 deletions src/store/region_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -310,6 +311,22 @@ class SnapshotVectorIndexTask : public TaskRunnable {
RegionCmdPtr region_cmd_;
};

// Runnable task built for a CMD_DELETE_DATA region command. It holds the
// dispatch context and the command, and hands both to DeleteData() from Run().
class DeleteDataTask : public TaskRunnable {
 public:
  DeleteDataTask(std::shared_ptr<Context> ctx, RegionCmdPtr region_cmd)
      : ctx_(ctx),
        region_cmd_(region_cmd) {}

  ~DeleteDataTask() override = default;

  // Executes the task; defined in the .cc file.
  void Run() override;

  // Label identifying this kind of task.
  std::string Type() override { return "DELETE_DATA"; }

 private:
  // Performs the actual data deletion for the given command.
  static butil::Status DeleteData(std::shared_ptr<Context> ctx, RegionCmdPtr region_cmd);

  // Context supplied when the command was dispatched.
  std::shared_ptr<Context> ctx_;
  // The region command this task executes.
  RegionCmdPtr region_cmd_;
};

class ControlExecutor {
public:
explicit ControlExecutor() { worker_ = Worker::New(); }
Expand All @@ -325,6 +342,17 @@ class ControlExecutor {
WorkerPtr worker_;
};

using ControlExecutorPtr = std::shared_ptr<ControlExecutor>;

// Executor for heavy tasks, e.g. bdb engine DeleteRange.
// Adds no members or behavior of its own; everything is inherited from
// ControlExecutor. The distinct type only marks the executor's purpose.
class HeavyTaskExecutor : public ControlExecutor {
public:
HeavyTaskExecutor() = default;
~HeavyTaskExecutor() override = default;
};

using HeavyTaskExecutorPtr = std::shared_ptr<HeavyTaskExecutor>;

class RegionControlExecutor : public ControlExecutor {
public:
explicit RegionControlExecutor(int64_t region_id) : region_id_(region_id) {}
Expand Down Expand Up @@ -406,13 +434,16 @@ class RegionController {

private:
std::shared_ptr<RegionControlExecutor> GetRegionControlExecutor(int64_t region_id);
butil::Status InnerDispatchRegionControlCommand(std::shared_ptr<Context> ctx, RegionCmdPtr command);
butil::Status DispatchRegionControlCommandImpl(std::shared_ptr<Context> ctx, RegionCmdPtr command);

bthread_mutex_t mutex_;
std::unordered_map<int64_t, std::shared_ptr<RegionControlExecutor>> executors_;

// When there is no region executor, use this executor, e.g. for PURGE.
std::shared_ptr<ControlExecutor> share_executor_;
ControlExecutorPtr share_executor_;

// execute heavy task, e.g. bdb engine DeleteRange
HeavyTaskExecutorPtr heavy_task_executor_;

// task builder
using TaskBuildFunc = std::function<TaskRunnablePtr(std::shared_ptr<Context>, RegionCmdPtr)>;
Expand Down

0 comments on commit 4a7b1c4

Please sign in to comment.