Skip to content

Commit

Permalink
handle comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed May 6, 2024
1 parent a4a6623 commit 66682c3
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 79 deletions.
66 changes: 21 additions & 45 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ extern pikiwidb::PConfig g_config;

namespace pikiwidb {

DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) {
DB::DB(int db_index, const std::string& db_path)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {
storage::StorageOptions storage_options;
storage_options.options = g_config.GetRocksDBOptions();
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_instance_num = g_config.db_instance_num.load();
storage_options.db_id = db_index_;

// options for CF
Expand All @@ -44,48 +44,26 @@ DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
INFO("Open DB{} success!", db_index_);
}

void DB::DoCheckpoint(const std::string& path, int i) {
// 1) always hold the storage's shared lock
std::shared_lock sharedLock(storage_mutex_);

// 2)Create the checkpoint of rocksdb i.
auto status = storage_->CreateCheckpoint(path, i);
}

void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) {
// 1) Already holding the storage's exclusion lock

// 2) Load the checkpoint of rocksdb i.
auto status = storage_->LoadCheckpoint(path, db_path, i);
}

void DB::CreateCheckpoint(const std::string& path, bool sync) {
auto tmp_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::CreatePath(tmp_path)) {
WARN("Create dir {} fail !", tmp_path);
void DB::CreateCheckpoint(const std::string& checkpoint_path, bool sync) {
auto checkpoint_sub_path = checkpoint_path + '/' + std::to_string(db_index_);
if (0 != pstd::CreatePath(checkpoint_sub_path)) {
WARN("Create dir {} fail !", checkpoint_sub_path);
return;
}

std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, create a checkpoint for the specified rocksdb i
// In DB::DoBgSave, a read lock is always held to protect the Storage
// corresponding to this rocksdb i.
auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i);
result.push_back(std::move(res));
}
std::shared_lock sharedLock(storage_mutex_);
auto result = storage_->CreateCheckpoint(checkpoint_sub_path);
if (sync) {
for (auto& r : result) {
r.get();
}
}
}

void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {
auto checkpoint_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::IsDir(path)) {
WARN("Checkpoint dir {} does not exist!", checkpoint_path);
void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[maybe_unused]]) {
auto checkpoint_sub_path = checkpoint_path + '/' + std::to_string(db_index_);
if (0 != pstd::IsDir(checkpoint_sub_path)) {
WARN("Checkpoint dir {} does not exist!", checkpoint_sub_path);
return;
}
if (0 != pstd::IsDir(db_path_)) {
Expand All @@ -97,20 +75,15 @@ void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {

std::lock_guard<std::shared_mutex> lock(storage_mutex_);
opened_ = false;
std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, Load a checkpoint for the specified rocksdb i
auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
result.push_back(std::move(res));
}
auto result = storage_->LoadCheckpoint(checkpoint_sub_path, db_path_);

for (auto& r : result) {
r.get();
}

storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.options = g_config.GetRocksDBOptions();
storage_options.db_instance_num = g_config.db_instance_num.load();
storage_options.db_id = db_index_;

// options for CF
Expand All @@ -121,14 +94,17 @@ void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
storage_options.do_snapshot_function =
std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2);
}
storage_ = std::make_unique<storage::Storage>();

if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
ERROR("Storage open failed! {}", s.ToString());
abort();
}

opened_ = true;
INFO("DB{} load a checkpoint from {} success!", db_index_, path);
INFO("DB{} load a checkpoint from {} success!", db_index_, checkpoint_path);
}
} // namespace pikiwidb
7 changes: 1 addition & 6 deletions src/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace pikiwidb {

class DB {
public:
DB(int db_index, const std::string& db_path, int rocksdb_inst_num);
DB(int db_index, const std::string& db_path);

std::unique_ptr<storage::Storage>& GetStorage() { return storage_; }

Expand All @@ -36,14 +36,9 @@ class DB {

int GetDbIndex() { return db_index_; }

private:
void DoCheckpoint(const std::string&, int i);
void LoadCheckpoint(const std::string&, const std::string& db_path, int i);

private:
const int db_index_ = 0;
const std::string db_path_;
int rocksdb_inst_num_ = 0;
/**
* If you want to change the pointer that points to storage,
* you must first acquire a mutex lock.
Expand Down
8 changes: 6 additions & 2 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,13 @@ class Storage {

Status Open(const StorageOptions& storage_options, const std::string& db_path);

Status CreateCheckpoint(const std::string& dump_path, int index);
std::vector<std::future<Status>> CreateCheckpoint(const std::string& checkpoint_path);

Status LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int index);
Status CreateCheckpointInternal(const std::string& checkpoint_path, int db_index);

std::vector<std::future<Status>> LoadCheckpoint(const std::string& checkpoint_path, const std::string& db_path);

Status LoadCheckpointInternal(const std::string& dump_path, const std::string& db_path, int index);

Status LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key);

Expand Down
77 changes: 52 additions & 25 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

#include <algorithm>
#include <filesystem>
#include <future>
#include <string_view>
#include <utility>
#include <vector>

#include "binlog.pb.h"
#include "config.h"
Expand Down Expand Up @@ -148,83 +150,109 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d
return Status::OK();
}

Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
INFO("DB{}'s RocksDB {} begin to generate a checkpoint!", db_id_, i);
auto source_dir = AppendSubDirectory(dump_path, db_id_);
source_dir = AppendSubDirectory(source_dir, i);
std::vector<std::future<Status>> Storage::CreateCheckpoint(const std::string& checkpoint_path) {
INFO("DB{} begin to generate a checkpoint to {}", db_id_, checkpoint_path);
// auto source_dir = AppendSubDirectory(checkpoint_path, db_id_);

std::vector<std::future<Status>> result;
result.reserve(db_instance_num_);
for (int i = 0; i < db_instance_num_; ++i) {
// In a new thread, create a checkpoint for the specified rocksdb i.
auto res = std::async(std::launch::async, &Storage::CreateCheckpointInternal, this, checkpoint_path, i);
result.push_back(std::move(res));
}
return result;
}

Status Storage::CreateCheckpointInternal(const std::string& checkpoint_path, int index) {
auto source_dir = AppendSubDirectory(checkpoint_path, index);

auto tmp_dir = source_dir + ".tmp";
// 1) Make sure the temporary directory does not exist
if (!pstd::DeleteDirIfExist(tmp_dir)) {
WARN("DB{}'s RocksDB {} delete directory fail!", db_id_, i);
WARN("DB{}'s RocksDB {} delete directory fail!", db_id_, index);
return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", tmp_dir);
}

// 2) Create checkpoint object of this RocksDB
rocksdb::Checkpoint* checkpoint = nullptr;
auto db = insts_[i]->GetDB();
auto db = insts_[index]->GetDB();
rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
if (!s.ok()) {
WARN("DB{}'s RocksDB {} create checkpoint object failed!. Error: ", db_id_, i, s.ToString());
WARN("DB{}'s RocksDB {} create checkpoint object failed!. Error: ", db_id_, index, s.ToString());
return s;
}

// 3) Create a checkpoint
std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint);
s = checkpoint->CreateCheckpoint(tmp_dir, kFlush, nullptr);
if (!s.ok()) {
WARN("DB{}'s RocksDB {} create checkpoint failed!. Error: {}", db_id_, i, s.ToString());
WARN("DB{}'s RocksDB {} create checkpoint failed!. Error: {}", db_id_, index, s.ToString());
return s;
}

// 4) Make sure the source directory does not exist
if (!pstd::DeleteDirIfExist(source_dir)) {
WARN("DB{}'s RocksDB {} delete directory {} fail!", db_id_, i, source_dir);
WARN("DB{}'s RocksDB {} delete directory {} fail!", db_id_, index, source_dir);
if (!pstd::DeleteDirIfExist(tmp_dir)) {
WARN("DB{}'s RocksDB {} fail to delete the temporary directory {} ", db_id_, i, tmp_dir);
WARN("DB{}'s RocksDB {} fail to delete the temporary directory {} ", db_id_, index, tmp_dir);
}
return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", source_dir);
}

// 5) Rename the temporary directory to source directory
if (auto status = pstd::RenameFile(tmp_dir, source_dir); status != 0) {
WARN("DB{}'s RocksDB {} rename temporary directory {} to source directory {} fail!", db_id_, i, tmp_dir,
WARN("DB{}'s RocksDB {} rename temporary directory {} to source directory {} fail!", db_id_, index, tmp_dir,
source_dir);
if (!pstd::DeleteDirIfExist(tmp_dir)) {
WARN("DB{}'s RocksDB {} fail to delete the rename failed directory {} ", db_id_, i, tmp_dir);
WARN("DB{}'s RocksDB {} fail to delete the rename failed directory {} ", db_id_, index, tmp_dir);
}
return Status::IOError("Rename directory {} fail!", tmp_dir);
}

INFO("DB{}'s RocksDB {} create checkpoint {} success!", db_id_, i, source_dir);
INFO("DB{}'s RocksDB {} create checkpoint {} success!", db_id_, index, source_dir);
return Status::OK();
}

Status Storage::LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int i) {
auto rocksdb_checkpoint_path = AppendSubDirectory(dump_path, i);
INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}", db_id_, i, rocksdb_checkpoint_path);
auto rocksdb_path = AppendSubDirectory(db_path, i); // ./db/db_id/i
auto tmp_rocksdb_path = rocksdb_path + ".tmp"; // ./db/db_id/i.tmp
insts_[i].reset();
std::vector<std::future<Status>> Storage::LoadCheckpoint(const std::string& checkpoint_sub_path,
const std::string& db_sub_path) {
INFO("DB{} begin to load a checkpoint from {} to {}", db_id_, checkpoint_sub_path, db_sub_path);
std::vector<std::future<Status>> result;
result.reserve(db_instance_num_);
for (int i = 0; i < db_instance_num_; ++i) {
// In a new thread, Load a checkpoint for the specified rocksdb i
auto res =
std::async(std::launch::async, &Storage::LoadCheckpointInternal, this, checkpoint_sub_path, db_sub_path, i);
result.push_back(std::move(res));
}
return result;
}

Status Storage::LoadCheckpointInternal(const std::string& checkpoint_sub_path, const std::string& db_sub_path,
int index) {
auto rocksdb_path = AppendSubDirectory(db_sub_path, index); // ./db/db_id/index
auto tmp_rocksdb_path = rocksdb_path + ".tmp"; // ./db/db_id/index.tmp
insts_[index].reset();

auto source_dir = AppendSubDirectory(checkpoint_sub_path, index);
// 1) Rename the original db to db.tmp, and only perform the maximum possible recovery of data
// when loading the checkpoint fails.
if (auto status = pstd::RenameFile(rocksdb_path, tmp_rocksdb_path); status != 0) {
WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path,
WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, index, rocksdb_path,
tmp_rocksdb_path);
return Status::IOError("Rename directory {} fail!", db_path);
return Status::IOError("Rename directory {} fail!", rocksdb_path);
}

// 2) Create a db directory to save the checkpoint.
if (0 != pstd::CreatePath(rocksdb_path)) {
pstd::RenameFile(tmp_rocksdb_path, rocksdb_path);
WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path);
WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, index, checkpoint_sub_path);
return Status::IOError("Create directory {} fail!", rocksdb_path);
}
if (RecursiveLinkAndCopy(rocksdb_checkpoint_path, rocksdb_path) != 0) {
if (RecursiveLinkAndCopy(source_dir, rocksdb_path) != 0) {
pstd::DeleteDir(rocksdb_path);
pstd::RenameFile(tmp_rocksdb_path, rocksdb_path);
WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path);
WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, index, source_dir);
return Status::IOError("recursive link and copy directory {} fail!", rocksdb_path);
}

Expand Down Expand Up @@ -2340,7 +2368,6 @@ Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx) {

rocksdb::WriteBatch batch;
bool is_finished_start = true;
// 提前获取 seq, 每次自增, 需要保证该操作串行执行?
auto seqno = inst->GetDB()->GetLatestSequenceNumber();
for (const auto& entry : log.entries()) {
if (inst->IsRestarting() && inst->IsApplied(entry.cf_idx(), log_idx)) [[unlikely]] {
Expand Down
2 changes: 1 addition & 1 deletion src/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void PStore::Init(int db_number) {
db_number_ = db_number;
backends_.reserve(db_number_);
for (int i = 0; i < db_number_; i++) {
auto db = std::make_unique<DB>(i, g_config.db_path, g_config.db_instance_num);
auto db = std::make_unique<DB>(i, g_config.db_path);
backends_.push_back(std::move(db));
INFO("Open DB_{} success!", i);
}
Expand Down

0 comments on commit 66682c3

Please sign in to comment.