Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into bugfix/bgsave_inconsi…
Browse files Browse the repository at this point in the history
…stent
  • Loading branch information
cheniujh committed Jul 22, 2024
2 parents 9c8f04e + 6c0e0d4 commit ada3f48
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 73 deletions.
7 changes: 4 additions & 3 deletions codis/pkg/models/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (g *Group) SelectNewMaster() (string, int) {
var newMasterIndex = -1

for index, server := range g.Servers {
if index == 0 || server.State != GroupServerStateNormal {
if index == 0 || server.State != GroupServerStateNormal || !server.IsEligibleForMasterElection {
continue
}

Expand Down Expand Up @@ -84,8 +84,9 @@ type GroupServer struct {
// master or slave
Role GroupServerRole `json:"role"`
// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
IsEligibleForMasterElection bool `json:"is_eligible_for_master_election"`

// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
// If marked as 2 , no service is provided
Expand Down
11 changes: 10 additions & 1 deletion codis/pkg/topom/topom_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
groupServer.Role = models.GroupServerRole(state.Replication.Role)
groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
groupServer.Action.State = models.ActionSynced
err = s.storeUpdateGroup(group)
// clean cache whether err is nil or not
Expand Down Expand Up @@ -531,7 +532,12 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
continue
}

err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
if server.IsEligibleForMasterElection {
err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
} else {
err = updateMasterToNewOneForcefully(server.Addr, newMasterAddr, s.config.ProductAuth)
}

if err != nil {
// skip err, and retry to update master-slave replication relationship through next heartbeat check
err = nil
Expand All @@ -548,14 +554,17 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
}

func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) {
log.Infof("[%s] switch master to server [%s]", serverAddr, masterAddr)
return setNewRedisMaster(serverAddr, masterAddr, auth, false)
}

func promoteServerToNewMaster(serverAddr, auth string) (err error) {
log.Infof("[%s] switch master to NO:ONE", serverAddr)
return setNewRedisMaster(serverAddr, "NO:ONE", auth, false)
}

func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) {
log.Infof("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
return setNewRedisMaster(serverAddr, masterAddr, auth, true)
}

Expand Down
3 changes: 2 additions & 1 deletion codis/pkg/topom/topom_sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *mo

if len(recoveredGroupServersState) > 0 {
// offline GroupServer's service has recovered, check and fix it's master-slave replication relationship
s.tryFixReplicationRelationships(ctx, recoveredGroupServersState,len(masterOfflineGroups))
s.tryFixReplicationRelationships(ctx, recoveredGroupServersState, len(masterOfflineGroups))
}

return nil
Expand Down Expand Up @@ -92,6 +92,7 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
groupServer.Role = models.GroupServerRole(state.Replication.Role)
groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
groupServer.Action.State = models.ActionSynced
}
}
Expand Down
18 changes: 10 additions & 8 deletions codis/pkg/utils/redis/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ func (i *InfoSlave) UnmarshalJSON(b []byte) error {
}

type InfoReplication struct {
Role string `json:"role"`
ConnectedSlaves int `json:"connected_slaves"`
MasterHost string `json:"master_host"`
MasterPort string `json:"master_port"`
MasterLinkStatus string `json:"master_link_status"` // down; up
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
Slaves []InfoSlave `json:"-"`
Role string `json:"role"`
ConnectedSlaves int `json:"connected_slaves"`
MasterHost string `json:"master_host"`
MasterPort string `json:"master_port"`
MasterLinkStatus string `json:"master_link_status"` // down; up
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
IsEligibleForMasterElection bool `json:"is_eligible_for_master_election"`
Slaves []InfoSlave `json:"-"`
}

type ReplicationState struct {
Expand Down Expand Up @@ -108,6 +109,7 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
i.MasterPort = kvmap["master_host"]
i.MasterHost = kvmap["master_port"]
i.MasterLinkStatus = kvmap["master_link_status"]
i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"

if val, ok := kvmap["binlog_file_num"]; ok {
if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
Expand Down
7 changes: 7 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,10 @@ cache-lfu-decay-time: 1
#
# Example:
# rename-command : FLUSHDB 360flushdb

# [You can ignore this item]
# This is NOT a regular conf item, it is a internal used metric that relies on pika.conf for persistent storage.
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
# which serves for the scenario of codis-pika cluster reelection
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
39 changes: 39 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ class PikaConf : public pstd::BaseConf {
int64_t rsync_timeout_ms() {
return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed);
}

// Slow Commands configuration
const std::string GetSlowCmd() {
std::shared_lock l(rwlock_);
Expand Down Expand Up @@ -825,6 +826,7 @@ class PikaConf : public pstd::BaseConf {
}

int64_t cache_maxmemory() { return cache_maxmemory_; }

void SetSlowCmd(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
Expand All @@ -841,6 +843,40 @@ class PikaConf : public pstd::BaseConf {
pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
}

void SetInternalUsedUnFinishedFullSync(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
pstd::StringToLower(lower_value);
TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
pstd::StringSplit2Set(lower_value, ',', internal_used_unfinished_full_sync_);
}

void AddInternalUsedUnfinishedFullSync(const std::string& db_name) {
{
std::lock_guard l(rwlock_);
internal_used_unfinished_full_sync_.insert(db_name);
std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
pstd::StringToLower(lower_value);
TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
}
ConfigRewrite();
}

void RemoveInternalUsedUnfinishedFullSync(const std::string& db_name) {
{
std::lock_guard l(rwlock_);
internal_used_unfinished_full_sync_.erase(db_name);
std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
pstd::StringToLower(lower_value);
TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
}
ConfigRewrite();
}

size_t GetUnfinishedFullSyncCount() {
std::shared_lock l(rwlock_);
return internal_used_unfinished_full_sync_.size();
}
void SetCacheType(const std::string &value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
Expand Down Expand Up @@ -1014,6 +1050,9 @@ class PikaConf : public pstd::BaseConf {
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

//Internal used metrics Persisted by pika.conf
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
};

#endif
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class PikaReplicaManager {
void RmStatus(std::string* debug_info);
pstd::Status CheckDBRole(const std::string& table, int* role);
pstd::Status LostConnection(const std::string& ip, int port);
pstd::Status DeactivateSyncSlaveDB(const std::string& ip, int port);

// Update binlog win and try to send next binlog
pstd::Status UpdateSyncBinlogStatus(const RmNode& slave, const LogOffset& offset_start, const LogOffset& offset_end);
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
// Adding timer tasks and run timertaskThread
timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });
timer_task_thread_.set_thread_name("TimerTaskThread");
timer_task_thread_.set_thread_name("DispacherTimerTaskThread");
timer_task_thread_.StartThread();
return ServerThread::StartThread();
}
Expand Down
55 changes: 23 additions & 32 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,39 @@ int Setnonblocking(int sockfd) {
return flags;
}

uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
TimerTaskID TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
const std::function<void()>& task) {
TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task};
id_to_task_[new_task.task_id] = new_task;

int64_t next_expired_time = NowInMs() + interval_ms;
exec_queue_.insert({next_expired_time, new_task.task_id});

if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
min_interval_ms_ = interval_ms;
}
// return the id of this task
return new_task.task_id;
}

int64_t TimerTaskManager::NowInMs() {
auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
}
int TimerTaskManager::ExecTimerTask() {

int64_t TimerTaskManager::ExecTimerTask() {
std::vector<ExecTsWithId> fired_tasks_;
int64_t now_in_ms = NowInMs();
// traverse in ascending order
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
if (pair->exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair->id);
// traverse in ascending order, and exec expired tasks
for (const auto& task : exec_queue_) {
if (task.exec_ts <= now_in_ms) {
auto it = id_to_task_.find(task.id);
assert(it != id_to_task_.end());
it->second.fun();
fired_tasks_.push_back({pair->exec_ts, pair->id});
fired_tasks_.push_back({task.exec_ts, task.id});
now_in_ms = NowInMs();
} else {
break;
}
}

for (auto task : fired_tasks_) {
exec_queue_.erase(task);
auto it = id_to_task_.find(task.id);
Expand All @@ -69,16 +69,21 @@ int TimerTaskManager::ExecTimerTask() {
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
} else {
// this task only need to be exec once, completely remove this task
int interval_del = it->second.interval_ms;
id_to_task_.erase(task.id);
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}
}
}
return min_interval_ms_;

if (exec_queue_.empty()) {
//to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec
return 5000;
}

int64_t gap_between_now_and_next_task = exec_queue_.begin()->exec_ts - NowInMs();
gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task;
return gap_between_now_and_next_task;
}
bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {

bool TimerTaskManager::DelTimerTaskByTaskId(TimerTaskID task_id) {
// remove the task
auto task_to_del = id_to_task_.find(task_id);
if (task_to_del == id_to_task_.end()) {
Expand All @@ -87,11 +92,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
int interval_del = task_to_del->second.interval_ms;
id_to_task_.erase(task_to_del);

// renew the min_interval_ms_
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}

// remove from exec queue
ExecTsWithId target_key = {-1, 0};
for (auto pair : exec_queue_) {
Expand All @@ -106,15 +106,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
return true;
}

void TimerTaskManager::RenewMinIntervalMs() {
min_interval_ms_ = -1;
for (auto pair : id_to_task_) {
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
min_interval_ms_ = pair.second.interval_ms;
}
}
}

TimerTaskThread::~TimerTaskThread() {
if (!timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread exit !!!";
Expand All @@ -140,9 +131,9 @@ int TimerTaskThread::StopThread() {
}

void* TimerTaskThread::ThreadMain() {
int timeout;
int32_t timeout;
while (!should_stop()) {
timeout = timer_task_manager_.ExecTimerTask();
timeout = static_cast<int32_t>(timer_task_manager_.ExecTimerTask());
net_multiplexer_->NetPoll(timeout);
}
return nullptr;
Expand Down
Loading

0 comments on commit ada3f48

Please sign in to comment.