From 6cd3e64f490b159d62695996ff2fa24a37bfae26 Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Fri, 24 May 2024 19:30:17 +0800 Subject: [PATCH 1/2] fix the problem that BinlogAckEnd smaller than BinlogAckStart(due to the Master clean un-relevant WriteQueue when one DB timeout) (#2666) Co-authored-by: cjh <1271435567@qq.com> --- include/pika_rm.h | 1 + src/pika_rm.cc | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/include/pika_rm.h b/include/pika_rm.h index b3e310ddf..d035b400f 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -172,6 +172,7 @@ class PikaReplicaManager { // write_queue related void ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector& tasks); + void DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name); void DropItemInWriteQueue(const std::string& ip, int port); int ConsumeWriteQueue(); diff --git a/src/pika_rm.cc b/src/pika_rm.cc index a996463bc..99cbd548d 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -99,7 +99,7 @@ Status SyncMasterDB::ActivateSlaveBinlogSync(const std::string& ip, int port, co } //Since we init a new reader, we should drop items in write queue and reset sync_window. //Or the sent_offset and acked_offset will not match - g_pika_rm->DropItemInWriteQueue(ip, port); + g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName()); slave_ptr->sync_win.Reset(); slave_ptr->b_state = kReadFromFile; } @@ -335,7 +335,7 @@ Status SyncMasterDB::CheckSyncTimeout(uint64_t now) { for (auto& node : to_del) { coordinator_.SyncPros().RemoveSlaveNode(node.Ip(), node.Port()); - g_pika_rm->DropItemInWriteQueue(node.Ip(), node.Port()); + g_pika_rm->DropItemInOneWriteQueue(node.Ip(), node.Port(), DBName()); LOG(WARNING) << SyncDBInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString(); } return Status::OK(); @@ -645,6 +645,14 @@ int PikaReplicaManager::ConsumeWriteQueue() { return counter; } +void PikaReplicaManager::DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name) { + std::lock_guard l(write_queue_mu_); + std::string index = ip + ":" + std::to_string(port); + if (write_queues_.find(index) != write_queues_.end()) { + write_queues_[index].erase(db_name); + } +} + void PikaReplicaManager::DropItemInWriteQueue(const std::string& ip, int port) { std::lock_guard l(write_queue_mu_); std::string index = ip + ":" + std::to_string(port); From 1b2bfbe40b6fc4eb3e9f1314deab3eca2ca6a347 Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Fri, 24 May 2024 19:32:22 +0800 Subject: [PATCH 2/2] fix: fix data race around spop Dobinlog (#2674) * fix spop binlog data race * adjust format --------- Co-authored-by: cjh <1271435567@qq.com> --- include/pika_set.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/include/pika_set.h b/include/pika_set.h index 34e47d5ed..c4b8eb203 100644 --- a/include/pika_set.h +++ b/include/pika_set.h @@ -66,8 +66,12 @@ class SPopCmd : public Cmd { public: SPopCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag, static_cast(AclCategory::SET)) { - srem_cmd_ = std::make_shared(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet); - } + srem_cmd_ = std::make_shared(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet); + } + SPopCmd(const SPopCmd& other) + : Cmd(other), key_(other.key_), members_(other.members_), count_(other.count_), s_(other.s_) { + srem_cmd_ = std::make_shared(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet); + } std::vector current_key() const override { std::vector res; res.push_back(key_);