Skip to content

Commit

Permalink
Updated flushall write Binlog logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Mixficsol committed Mar 22, 2024
1 parent 5612645 commit 110a95d
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 49 deletions.
4 changes: 1 addition & 3 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,9 @@ class FlushallCmd : public Cmd {
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void FlushAllWithoutLock();
void DoBinlog(std::shared_ptr<SyncMasterDB> sync_db_);

private:
void DoInitial() override;
std::string ToRedisProtocol() override;
void DoWithoutLock(std::shared_ptr<DB> db);
};

Expand All @@ -211,7 +209,7 @@ class FlushdbCmd : public Cmd {
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllDBsWithoutLock();
std::string GetFlushDname() { return db_name_; }
std::string GetFlushDBname() { return db_name_; }

private:
std::string db_name_;
Expand Down
46 changes: 1 addition & 45 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,19 +559,6 @@ void FlushallCmd::DoUpdateCache() {
}
}

// flushall convert flushdb writes to every db binlog
std::string FlushallCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 1, "*");

// to flushdb cmd
std::string flushdb_cmd("flushdb");
RedisAppendLenUint64(content, flushdb_cmd.size(), "$");
RedisAppendContent(content, flushdb_cmd);
return content;
}

void FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_ptr<DB> db = db_item.second;
Expand All @@ -581,45 +568,17 @@ void FlushallCmd::FlushAllWithoutLock() {
return;
}
DoWithoutLock(db);
DoBinlog(g_pika_rm->GetSyncMasterDBs()[p_info]);
}
if (res_.ok()) {
res_.SetRes(CmdRes::kOk);
}
}

void FlushallCmd::DoBinlog(std::shared_ptr<SyncMasterDB> sync_db) {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
// Consider that dummy cmd appended by system, both conn and resp are null.
if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
if (!conn_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " conn empty.";
}
if (!resp_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " resp empty.";
}
res().SetRes(CmdRes::kErrOther);
return;
}

Status s = sync_db->ConsensusProposeLog(shared_from_this());
if (!s.ok()) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device "
<< s.ToString();
res().SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}

void FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
if (!db) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db->FlushDBWithoutLock();
DoUpdateCache();
}
}

Expand All @@ -643,15 +602,14 @@ void FlushdbCmd::Do() {
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard l_prw(db_->GetDBLock());
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
}
}
}


void FlushdbCmd::DoThroughDB() {
Do();
}
Expand All @@ -670,7 +628,6 @@ void FlushdbCmd::FlushAllDBsWithoutLock() {
return;
}
DoWithoutLock();
DoBinlog();
}

void FlushdbCmd::DoWithoutLock() {
Expand All @@ -683,7 +640,6 @@ void FlushdbCmd::DoWithoutLock() {
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
}
DoUpdateCache();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) {
if (c_ptr->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(c_ptr);
SetTxnFailedFromDBs(flushdb->GetFlushDname());
SetTxnFailedFromDBs(flushdb->GetFlushDBname());
} else if (c_ptr->name() == kCmdNameFlushall) {
SetAllTxnFailed();
} else {
Expand Down

0 comments on commit 110a95d

Please sign in to comment.