Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flushdb and flushall bug #2533

Merged
merged 5 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,14 @@ class FlushallCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::KEYSPACE)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void DoUpdateCache(std::shared_ptr<DB> db);
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void Execute() override;
void FlushAllWithoutLock();
void DoBinlog(std::shared_ptr<SyncMasterDB> sync_db_);

private:
void DoInitial() override;
std::string ToRedisProtocol() override;
void DoWithoutLock(std::shared_ptr<DB> db);
};

Expand All @@ -212,7 +209,6 @@ class FlushdbCmd : public Cmd {
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllDBsWithoutLock();
void Execute() override;
std::string GetFlushDBname() { return db_name_; }

private:
Expand Down Expand Up @@ -265,7 +261,6 @@ class InfoCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new InfoCmd(*this); }
void Execute() override;

private:
InfoSection info_section_;
Expand Down Expand Up @@ -333,7 +328,6 @@ class ConfigCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new ConfigCmd(*this); }
void Execute() override;

private:
std::vector<std::string> config_args_v_;
Expand Down
5 changes: 1 addition & 4 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void Init();
bool TryUpdateMasterOffset();
/*
* FlushDB & FlushSubDB use
* FlushDB used
*/
bool FlushDB();
bool FlushSubDB(const std::string& db_name);
bool FlushDBWithoutLock();
bool FlushSubDBWithoutLock(const std::string& db_name);
bool ChangeDb(const std::string& new_path);
pstd::Status GetBgSaveUUID(std::string* snapshot_uuid);
void PrepareRsync();
Expand Down
2 changes: 0 additions & 2 deletions include/pika_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class ExecCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::TRANSACTION)) {}
void Do() override;
Cmd* Clone() override { return new ExecCmd(*this); }
void Execute() override;
void Split(const HintKeys& hint_keys) override {}
void Merge() override {}
std::vector<std::string> current_key() const override { return {}; }
Expand Down Expand Up @@ -79,7 +78,6 @@ class WatchCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::TRANSACTION)) {}

void Do() override;
void Execute() override;
void Split(const HintKeys& hint_keys) override {}
Cmd* Clone() override { return new WatchCmd(*this); }
void Merge() override {}
Expand Down
112 changes: 20 additions & 92 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,39 +526,8 @@ void FlushallCmd::DoInitial() {
return;
}
}
void FlushallCmd::Do() {
if (!db_) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db_->FlushDB();
}
}

void FlushallCmd::DoThroughDB() {
Do();
}

void FlushallCmd::DoUpdateCache() {
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_model()) {
g_pika_server->ClearCacheDbAsync(db_);
}
}

// flushall convert flushdb writes to every db binlog
std::string FlushallCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 1, "*");

// to flushdb cmd
std::string flushdb_cmd("flushdb");
RedisAppendLenUint64(content, flushdb_cmd.size(), "$");
RedisAppendContent(content, flushdb_cmd);
return content;
}

void FlushallCmd::Execute() {
void FlushallCmd::Do() {
std::lock_guard l_trw(g_pika_server->GetDBLock());
for (const auto& db_item : g_pika_server->GetDB()) {
if (db_item.second->IsKeyScaning()) {
Expand All @@ -580,6 +549,17 @@ void FlushallCmd::Execute() {
}
}

void FlushallCmd::DoThroughDB() {
Do();
}

void FlushallCmd::DoUpdateCache(std::shared_ptr<DB> db) {
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_model()) {
g_pika_server->ClearCacheDbAsync(db);
}
}

void FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_ptr<DB> db = db_item.second;
Expand All @@ -589,45 +569,18 @@ void FlushallCmd::FlushAllWithoutLock() {
return;
}
DoWithoutLock(db);
DoBinlog(g_pika_rm->GetSyncMasterDBs()[p_info]);
}
if (res_.ok()) {
res_.SetRes(CmdRes::kOk);
}
}

void FlushallCmd::DoBinlog(std::shared_ptr<SyncMasterDB> sync_db) {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
// Consider that dummy cmd appended by system, both conn and resp are null.
if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
if (!conn_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " conn empty.";
}
if (!resp_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " resp empty.";
}
res().SetRes(CmdRes::kErrOther);
return;
}

Status s = sync_db->ConsensusProposeLog(shared_from_this());
if (!s.ok()) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device "
<< s.ToString();
res().SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}

void FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
if (!db) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db->FlushDBWithoutLock();
DoUpdateCache();
DoUpdateCache(db);
}
}

Expand All @@ -646,18 +599,19 @@ void FlushdbCmd::DoInitial() {

void FlushdbCmd::Do() {
if (!db_) {
LOG(INFO) << "Flushdb, but DB not found";
res_.SetRes(CmdRes::kInvalidDB);
} else {
if (db_name_ == "all") {
db_->FlushDB();
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
}
}
}


void FlushdbCmd::DoThroughDB() {
Do();
}
Expand All @@ -676,7 +630,6 @@ void FlushdbCmd::FlushAllDBsWithoutLock() {
return;
}
DoWithoutLock();
DoBinlog();
}

void FlushdbCmd::DoWithoutLock() {
Expand All @@ -689,22 +642,6 @@ void FlushdbCmd::DoWithoutLock() {
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
}
DoUpdateCache();
}
}

void FlushdbCmd::Execute() {
if (!db_) {
res_.SetRes(CmdRes::kInvalidDB);
} else {
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard l_prw(db_->GetDBLock());
std::lock_guard s_prw(g_pika_rm->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
}
}
}

Expand Down Expand Up @@ -1465,11 +1402,6 @@ std::string InfoCmd::CacheStatusToString(int status) {
}
}

void InfoCmd::Execute() {
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
Do();
}

void ConfigCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameConfig);
Expand Down Expand Up @@ -2793,10 +2725,6 @@ void ConfigCmd::ConfigResetstat(std::string& ret) {
ret = "+OK\r\n";
}

void ConfigCmd::Execute() {
Do();
}

void MonitorCmd::DoInitial() {
if (argv_.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameMonitor);
Expand Down
40 changes: 1 addition & 39 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ DisplayCacheInfo DB::GetCacheInfo() {
}

bool DB::FlushDBWithoutLock() {
std::lock_guard l(bgsave_protector_);
if (bgsave_info_.bgsaving) {
return false;
}
Expand All @@ -225,34 +226,6 @@ bool DB::FlushDBWithoutLock() {
return true;
}

bool DB::FlushSubDBWithoutLock(const std::string& db_name) {
std::lock_guard l(bgsave_protector_);
if (bgsave_info_.bgsaving) {
return false;
}

LOG(INFO) << db_name_ << " Delete old " + db_name + " db...";
storage_.reset();

std::string dbpath = db_path_;
if (dbpath[dbpath.length() - 1] != '/') {
dbpath.append("/");
}

std::string sub_dbpath = dbpath + db_name;
std::string del_dbpath = dbpath + db_name + "_deleting";
pstd::RenameFile(sub_dbpath, del_dbpath);

storage_ = std::make_shared<storage::Storage>(g_pika_conf->db_instance_num(),
g_pika_conf->default_slot_num(), g_pika_conf->classic_mode());
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_);
assert(storage_);
assert(s.ok());
LOG(INFO) << db_name_ << " open new " + db_name + " db success";
g_pika_server->PurgeDir(del_dbpath);
return true;
}

void DB::DoBgSave(void* arg) {
std::unique_ptr<BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));

Expand Down Expand Up @@ -575,11 +548,6 @@ void DB::ClearBgsave() {
bgsave_info_.Clear();
}

bool DB::FlushSubDB(const std::string& db_name) {
std::lock_guard rwl(dbs_rw_);
return FlushSubDBWithoutLock(db_name);
}

void DB::UpdateCacheInfo(CacheInfo& cache_info) {
std::unique_lock<std::shared_mutex> lock(cache_info_rwlock_);

Expand Down Expand Up @@ -628,9 +596,3 @@ void DB::ResetDisplayCacheInfo(int status) {
cache_info_.waitting_load_keys_num = 0;
cache_usage_ = 0;
}

bool DB::FlushDB() {
std::lock_guard rwl(dbs_rw_);
std::lock_guard l(bgsave_protector_);
return FlushDBWithoutLock();
}
40 changes: 16 additions & 24 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ void MultiCmd::DoInitial() {
void ExecCmd::Do() {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
if (client_conn == nullptr) {
res_.SetRes(CmdRes::kErrOther, name());
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI");
return;
}
if (IsTxnFailedAndSetState()) {
client_conn->ExitTxn();
return;
}
SetCmdsVec();
Lock();
conn = GetConn();
client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
std::vector<CmdRes> res_vec = {};
std::vector<std::shared_ptr<std::string>> resp_strs;
for (size_t i = 0; i < cmds_.size(); ++i) {
Expand Down Expand Up @@ -83,26 +99,6 @@ void ExecCmd::Do() {
for (auto &r : res_vec) {
res_.AppendStringRaw(r.message());
}
}

void ExecCmd::Execute() {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
if (client_conn == nullptr) {
res_.SetRes(CmdRes::kErrOther, name());
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI");
return;
}
if (IsTxnFailedAndSetState()) {
client_conn->ExitTxn();
return;
}
SetCmdsVec();
Lock();
Do();
Unlock();
ServeToBLrPopWithKeys();
list_cmd_.clear();
Expand Down Expand Up @@ -245,10 +241,6 @@ void WatchCmd::Do() {
res_.SetRes(CmdRes::kOk);
}

void WatchCmd::Execute() {
Do();
}

void WatchCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, name());
Expand Down
Loading
Loading