Skip to content

Commit

Permalink
fix: incr sync shouldn't be established after full sync corrupted (#2756
Browse files Browse the repository at this point in the history
)

* add error_exited for rsyncClient

* add some logs

* add some state transition based on reviewer's opinion

---------

Co-authored-by: cheniujh <1271435567@qq.com>
  • Loading branch information
cheniujh and cheniujh committed Jul 6, 2024
1 parent d859e24 commit fb67dbb
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
2 changes: 1 addition & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class SyncSlaveDB : public SyncDB {
void SetLocalIp(const std::string& local_ip);
void StopRsync();
pstd::Status ActivateRsync();
bool IsRsyncRunning() { return rsync_cli_->IsRunning(); }
bool IsRsyncExited() { return rsync_cli_->IsExitedFromRunning(); }

private:
std::unique_ptr<rsync::RsyncClient> rsync_cli_;
Expand Down
5 changes: 5 additions & 0 deletions include/rsync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class RsyncClient : public net::Thread {
bool IsRunning() {
return state_.load() == RUNNING;
}
bool IsExitedFromRunning() {
return state_.load() == STOP && all_worker_exited_.load();
}
bool IsStop() {
return state_.load() == STOP;
}
Expand Down Expand Up @@ -92,6 +95,8 @@ class RsyncClient : public net::Thread {
std::atomic<int> finished_work_cnt_ = 0;

std::atomic<State> state_;
std::atomic<bool> error_stopped_{false};
std::atomic<bool> all_worker_exited_{true};
int max_retries_ = 10;
std::unique_ptr<WaitObjectManager> wo_mgr_;
std::condition_variable cond_;
Expand Down
18 changes: 11 additions & 7 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,19 @@ Status DB::GetBgSaveUUID(std::string* snapshot_uuid) {
// 2, Replace the old db
// 3, Update master offset, and the PikaAuxiliaryThread cron will connect and do slaveof task with master
bool DB::TryUpdateMasterOffset() {
std::string info_path = dbsync_path_ + kBgsaveInfoFile;
if (!pstd::FileExists(info_path)) {
LOG(WARNING) << "info path: " << info_path << " not exist";
return false;
}

std::shared_ptr<SyncSlaveDB> slave_db =
g_pika_rm->GetSyncSlaveDBByName(DBInfo(db_name_));
if (!slave_db) {
LOG(WARNING) << "Slave DB: " << db_name_ << " not exist";
LOG(ERROR) << "Slave DB: " << db_name_ << " not exist";
slave_db->SetReplState(ReplState::kError);
return false;
}

std::string info_path = dbsync_path_ + kBgsaveInfoFile;
if (!pstd::FileExists(info_path)) {
LOG(WARNING) << "info path: " << info_path << " not exist, Slave DB:" << GetDBName() << " will restart the sync process...";
// May failed in RsyncClient, thus the complete snapshot dir got deleted
slave_db->SetReplState(ReplState::kTryConnect);
return false;
}

Expand Down Expand Up @@ -475,6 +478,7 @@ bool DB::TryUpdateMasterOffset() {
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name_));
if (!master_db) {
LOG(WARNING) << "Master DB: " << db_name_ << " not exist";
slave_db->SetReplState(ReplState::kError);
return false;
}
master_db->Logger()->SetProducerStatus(filenum, offset);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ Status PikaReplicaManager::RunSyncSlaveDBStateMachine() {
std::shared_ptr<DB> db =
g_pika_server->GetDB(p_info.db_name_);
if (db) {
if (!s_db->IsRsyncRunning()) {
if (s_db->IsRsyncExited()) {
db->TryUpdateMasterOffset();
}
} else {
Expand Down
34 changes: 26 additions & 8 deletions src/rsync_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ void RsyncClient::Copy(const std::set<std::string>& file_set, int index) {
break;
}
}
LOG(INFO) << "work_thread index: " << index << " copy remote files done";
if (!error_stopped_.load()) {
LOG(INFO) << "work_thread index: " << index << " copy remote files done";
}
finished_work_cnt_.fetch_add(1);
cond_.notify_all();
}
Expand Down Expand Up @@ -77,8 +79,10 @@ bool RsyncClient::Init() {

void* RsyncClient::ThreadMain() {
if (file_set_.empty()) {
LOG(INFO) << "No remote files need copy, RsyncClient exit";
LOG(INFO) << "No remote files need copy, RsyncClient exit and going to delete dir:" << dir_;
DeleteDirIfExist(dir_);
state_.store(STOP);
all_worker_exited_.store(true);
return nullptr;
}

Expand All @@ -89,7 +93,7 @@ void* RsyncClient::ThreadMain() {
for (const auto& file : file_set_) {
file_vec[index++ % GetParallelNum()].insert(file);
}

all_worker_exited_.store(false);
for (int i = 0; i < GetParallelNum(); i++) {
work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i));
}
Expand All @@ -98,8 +102,9 @@ void* RsyncClient::ThreadMain() {
std::ofstream outfile;
outfile.open(meta_file_path, std::ios_base::app);
if (!outfile.is_open()) {
LOG(FATAL) << "unable to open meta file " << meta_file_path << ", error:" << strerror(errno);
return nullptr;
LOG(ERROR) << "unable to open meta file " << meta_file_path << ", error:" << strerror(errno);
error_stopped_.store(true);
state_.store(STOP);
}
DEFER {
outfile.close();
Expand Down Expand Up @@ -144,7 +149,18 @@ void* RsyncClient::ThreadMain() {
}
finished_work_cnt_.store(0);
state_.store(STOP);
LOG(INFO) << "RsyncClient copy remote files done";
if (!error_stopped_.load()) {
LOG(INFO) << "RsyncClient copy remote files done";
} else {
if (DeleteDirIfExist(dir_)) {
//the dir_ doesn't not exist OR it's existing but successfully deleted
LOG(ERROR) << "RsyncClient stopped with errors, deleted:" << dir_;
} else {
//the dir_ exists but failed to delete
LOG(ERROR) << "RsyncClient stopped with errors, but failed to delete " << dir_ << " when cleaning";
}
}
all_worker_exited_.store(true);
return nullptr;
}

Expand Down Expand Up @@ -217,8 +233,9 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {

if (resp->snapshot_uuid() != snapshot_uuid_) {
LOG(WARNING) << "receive newer dump, reset state to STOP, local_snapshot_uuid:"
<< snapshot_uuid_ << "remote snapshot uuid: " << resp->snapshot_uuid();
<< snapshot_uuid_ << ", remote snapshot uuid: " << resp->snapshot_uuid();
state_.store(STOP);
error_stopped_.store(true);
return s;
}

Expand Down Expand Up @@ -314,7 +331,8 @@ bool RsyncClient::ComparisonUpdate() {
return false;
}

state_ = RUNNING;
state_.store(RUNNING);
error_stopped_.store(false);
LOG(INFO) << "copy meta data done, db name: " << db_name_
<< " snapshot_uuid: " << snapshot_uuid_
<< " file count: " << file_set_.size()
Expand Down

0 comments on commit fb67dbb

Please sign in to comment.