Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into multi_get
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored and brother-jin committed May 28, 2024
2 parents d6b77c0 + 1b2bfbe commit 2b50e37
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 7 deletions.
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class PikaReplicaManager {

// write_queue related
void ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector<WriteTask>& tasks);
void DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name);
void DropItemInWriteQueue(const std::string& ip, int port);
int ConsumeWriteQueue();

Expand Down
8 changes: 6 additions & 2 deletions include/pika_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ class SPopCmd : public Cmd {
public:
SPopCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::SET)) {
srem_cmd_ = std::make_shared<SRemCmd>(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet);
}
srem_cmd_ = std::make_shared<SRemCmd>(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet);
}
SPopCmd(const SPopCmd& other)
: Cmd(other), key_(other.key_), members_(other.members_), count_(other.count_), s_(other.s_) {
srem_cmd_ = std::make_shared<SRemCmd>(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet);
}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
Expand Down
4 changes: 4 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,10 @@ std::vector<std::string> Cmd::SubCommand() const { return subCmdName_; };
bool Cmd::IsAdminRequire() const { return (flag_ & kCmdFlagsAdminRequire); }
bool Cmd::IsNeedUpdateCache() const { return (flag_ & kCmdFlagsUpdateCache); }
bool Cmd::IsNeedCacheDo() const {
if (g_pika_conf->IsCacheDisabledTemporarily()) {
return false;
}

if (hasFlag(kCmdFlagsKv)) {
if (!g_pika_conf->GetCacheString()) {
return false;
Expand Down
6 changes: 4 additions & 2 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ void MgetCmd::DoInitial() {
keys_ = argv_;
keys_.erase(keys_.begin());
split_res_.resize(keys_.size());
cache_miss_keys_.clear();
}

void MgetCmd::AssembleResponseFromCache() {
Expand All @@ -526,7 +527,9 @@ void MgetCmd::AssembleResponseFromCache() {
}

void MgetCmd::Do() {

if(cache_miss_keys_.size() == 0){
cache_miss_keys_ = keys_;
}
db_value_status_array_.clear();
s_ = db_->storage()->MGet(cache_miss_keys_, &db_value_status_array_);
if (!s_.ok()) {
Expand Down Expand Up @@ -576,7 +579,6 @@ void MgetCmd::DoThroughDB() {
}

void MgetCmd::ReadCache() {
cache_miss_keys_.clear();
for (const auto key : keys_) {
std::string value;
auto s = db_->cache()->Get(const_cast<std::string&>(key), &value);
Expand Down
12 changes: 10 additions & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Status SyncMasterDB::ActivateSlaveBinlogSync(const std::string& ip, int port, co
}
//Since we init a new reader, we should drop items in write queue and reset sync_window.
//Or the sent_offset and acked_offset will not match
g_pika_rm->DropItemInWriteQueue(ip, port);
g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName());
slave_ptr->sync_win.Reset();
slave_ptr->b_state = kReadFromFile;
}
Expand Down Expand Up @@ -335,7 +335,7 @@ Status SyncMasterDB::CheckSyncTimeout(uint64_t now) {

for (auto& node : to_del) {
coordinator_.SyncPros().RemoveSlaveNode(node.Ip(), node.Port());
g_pika_rm->DropItemInWriteQueue(node.Ip(), node.Port());
g_pika_rm->DropItemInOneWriteQueue(node.Ip(), node.Port(), DBName());
LOG(WARNING) << SyncDBInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString();
}
return Status::OK();
Expand Down Expand Up @@ -645,6 +645,14 @@ int PikaReplicaManager::ConsumeWriteQueue() {
return counter;
}

void PikaReplicaManager::DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
if (write_queues_.find(index) != write_queues_.end()) {
write_queues_[index].erase(db_name);
}
}

void PikaReplicaManager::DropItemInWriteQueue(const std::string& ip, int port) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
Expand Down
5 changes: 4 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,7 @@ void PikaServer::ClearHitRatio(std::shared_ptr<DB> db) {

void PikaServer::OnCacheStartPosChanged(int zset_cache_start_direction, std::shared_ptr<DB> db) {
ResetCacheConfig(db);
ClearCacheDbAsync(db);
ClearCacheDbAsyncV2(db);
}

void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
Expand All @@ -1726,6 +1726,9 @@ void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
arg->conf = std::move(g_pika_conf);
arg->reenable_cache = true;
common_bg_thread_.Schedule(&DoCacheBGTask, static_cast<void*>(arg));
if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus()) {
g_pika_conf->UnsetCacheDisableFlag();
}
}

void PikaServer::ProcessCronTask() {
Expand Down

0 comments on commit 2b50e37

Please sign in to comment.