Skip to content

Commit

Permalink
Improve code style in cluster (#2272)
Browse files Browse the repository at this point in the history
Co-authored-by: Twice <twice@apache.org>
  • Loading branch information
jihuayu and PragmaTwice committed May 1, 2024
1 parent a6d8386 commit 76757e1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 54 deletions.
96 changes: 48 additions & 48 deletions src/cluster/cluster.cc
Expand Up @@ -22,9 +22,11 @@

#include <config/config_util.h>

#include <array>
#include <cstring>
#include <fstream>
#include <memory>
#include <vector>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
Expand All @@ -37,11 +39,11 @@
#include "time_util.h"

ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots)
const std::bitset<kClusterSlots> &slots)
: id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {}

Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
: srv_(srv), binds_(std::move(binds)), port_(port), size_(0), version_(-1), myself_(nullptr) {
: srv_(srv), binds_(std::move(binds)), port_(port) {
for (auto &slots_node : slots_nodes_) {
slots_node = nullptr;
}
Expand All @@ -53,10 +55,10 @@ Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
// cluster data, so these commands should be executed exclusively, and ReadWriteLock
// also can guarantee accessing data is safe.
bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
for (auto v : {"setnodes", "setnodeid", "setslot", "import", "reset"}) {
if (util::EqualICase(v, subcommand)) return true;
}
return false;
std::array subcommands = {"setnodes", "setnodeid", "setslot", "import", "reset"};

return std::any_of(std::begin(subcommands), std::end(subcommands),
[&subcommand](const std::string &val) { return util::EqualICase(val, subcommand); });
}

Status Cluster::SetNodeId(const std::string &node_id) {
Expand Down Expand Up @@ -170,26 +172,26 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
size_ = 0;

// Update slots to nodes
for (const auto &n : slots_nodes) {
slots_nodes_[n.first] = nodes_[n.second];
for (const auto &[slot, node_id] : slots_nodes) {
slots_nodes_[slot] = nodes_[node_id];
}

// Update replicas info and size
for (auto &n : nodes_) {
if (n.second->role == kClusterSlave) {
if (nodes_.find(n.second->master_id) != nodes_.end()) {
nodes_[n.second->master_id]->replicas.push_back(n.first);
for (const auto &[node_id, node] : nodes_) {
if (node->role == kClusterSlave) {
if (nodes_.find(node->master_id) != nodes_.end()) {
nodes_[node->master_id]->replicas.push_back(node_id);
}
}
if (n.second->role == kClusterMaster && n.second->slots.count() > 0) {
if (node->role == kClusterMaster && node->slots.count() > 0) {
size_++;
}
}

if (myid_.empty() || force) {
for (auto &n : nodes_) {
if (n.second->port == port_ && util::MatchListeningIP(binds_, n.second->host)) {
myid_ = n.first;
for (const auto &[node_id, node] : nodes_) {
if (node->port == port_ && util::MatchListeningIP(binds_, node->host)) {
myid_ = node_id;
break;
}
}
Expand All @@ -210,9 +212,9 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b

// Clear data of migrated slots
if (!migrated_slots_.empty()) {
for (auto &it : migrated_slots_) {
if (slots_nodes_[it.first] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, it.first);
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
}
Expand Down Expand Up @@ -521,34 +523,32 @@ std::string Cluster::genNodesDescription() {

auto now = util::GetTimeStampMS();
std::string nodes_desc;
for (const auto &item : nodes_) {
const std::shared_ptr<ClusterNode> n = item.second;

for (const auto &[_, node] : nodes_) {
std::string node_str;
// ID, host, port
node_str.append(n->id + " ");
node_str.append(fmt::format("{}:{}@{} ", n->host, n->port, n->port + kClusterPortIncr));
node_str.append(node->id + " ");
node_str.append(fmt::format("{}:{}@{} ", node->host, node->port, node->port + kClusterPortIncr));

// Flags
if (n->id == myid_) node_str.append("myself,");
if (n->role == kClusterMaster) {
if (node->id == myid_) node_str.append("myself,");
if (node->role == kClusterMaster) {
node_str.append("master - ");
} else {
node_str.append("slave " + n->master_id + " ");
node_str.append("slave " + node->master_id + " ");
}

// Ping sent, pong received, config epoch, link status
node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));

if (n->role == kClusterMaster) {
auto iter = slots_infos.find(n->id);
if (iter != slots_infos.end() && iter->second.size() > 0) {
if (node->role == kClusterMaster) {
auto iter = slots_infos.find(node->id);
if (iter != slots_infos.end() && !iter->second.empty()) {
node_str.append(" " + iter->second);
}
}

// Just for MYSELF node to show the importing/migrating slot
if (n->id == myid_) {
if (node->id == myid_) {
if (srv_->slot_migrator) {
auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
if (migrating_slot != -1) {
Expand All @@ -567,10 +567,10 @@ std::string Cluster::genNodesDescription() {
return nodes_desc;
}

std::map<std::string, std::string> Cluster::getClusterNodeSlots() const {
std::map<std::string, std::string, std::less<>> Cluster::getClusterNodeSlots() const {
int start = -1;
// node id => slots info string
std::map<std::string, std::string> slots_infos;
std::map<std::string, std::string, std::less<>> slots_infos;

std::shared_ptr<ClusterNode> n = nullptr;
for (int i = 0; i <= kClusterSlots; i++) {
Expand Down Expand Up @@ -600,30 +600,29 @@ std::map<std::string, std::string> Cluster::getClusterNodeSlots() const {
return slots_infos;
}

std::string Cluster::genNodesInfo() {
std::string Cluster::genNodesInfo() const {
auto slots_infos = getClusterNodeSlots();

std::string nodes_info;
for (const auto &item : nodes_) {
const std::shared_ptr<ClusterNode> &n = item.second;
for (const auto &[_, node] : nodes_) {
std::string node_str;
node_str.append("node ");
// ID
node_str.append(n->id + " ");
node_str.append(node->id + " ");
// Host + Port
node_str.append(fmt::format("{} {} ", n->host, n->port));
node_str.append(fmt::format("{} {} ", node->host, node->port));

// Role
if (n->role == kClusterMaster) {
if (node->role == kClusterMaster) {
node_str.append("master - ");
} else {
node_str.append("slave " + n->master_id + " ");
node_str.append("slave " + node->master_id + " ");
}

// Slots
if (n->role == kClusterMaster) {
auto iter = slots_infos.find(n->id);
if (iter != slots_infos.end() && iter->second.size() > 0) {
if (node->role == kClusterMaster) {
auto iter = slots_infos.find(node->id);
if (iter != slots_infos.end() && !iter->second.empty()) {
node_str.append(" " + iter->second);
}
}
Expand Down Expand Up @@ -694,7 +693,7 @@ Status Cluster::LoadClusterNodes(const std::string &file_path) {
Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes) {
std::vector<std::string> nodes_info = util::Split(nodes_str, "\n");
if (nodes_info.size() == 0) {
if (nodes_info.empty()) {
return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
}

Expand Down Expand Up @@ -803,16 +802,17 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return Status::OK();
}

bool Cluster::IsWriteForbiddenSlot(int slot) { return srv_->slot_migrator->GetForbiddenSlot() == slot; }
bool Cluster::IsWriteForbiddenSlot(int slot) const { return srv_->slot_migrator->GetForbiddenSlot() == slot; }

Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn) {
std::vector<int> keys_indexes;
auto s = redis::CommandTable::GetKeysFromCommand(attributes, cmd_tokens, &keys_indexes);

// No keys
if (!s.IsOK()) return Status::OK();
if (auto s = redis::CommandTable::GetKeysFromCommand(attributes, cmd_tokens, &keys_indexes); !s.IsOK())
return Status::OK();

if (keys_indexes.size() == 0) return Status::OK();
if (keys_indexes.empty()) return Status::OK();

int slot = -1;
for (auto i : keys_indexes) {
Expand Down
12 changes: 6 additions & 6 deletions src/cluster/cluster.h
Expand Up @@ -39,7 +39,7 @@
class ClusterNode {
public:
explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots);
const std::bitset<kClusterSlots> &slots);
std::string id;
std::string host;
int port;
Expand Down Expand Up @@ -81,7 +81,7 @@ class Cluster {
int64_t GetVersion() const { return version_; }
static bool IsValidSlot(int slot) { return slot >= 0 && slot < kClusterSlots; }
bool IsNotMaster();
bool IsWriteForbiddenSlot(int slot);
bool IsWriteForbiddenSlot(int slot) const;
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
Expand All @@ -97,16 +97,16 @@ class Cluster {
private:
std::string getNodeIDBySlot(int slot) const;
std::string genNodesDescription();
std::string genNodesInfo();
std::map<std::string, std::string> getClusterNodeSlots() const;
std::string genNodesInfo() const;
std::map<std::string, std::string, std::less<>> getClusterNodeSlots() const;
SlotInfo genSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
static Status parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
Server *srv_;
std::vector<std::string> binds_;
int port_;
int size_;
int64_t version_;
int size_ = 0;
int64_t version_ = -1;
std::string myid_;
std::shared_ptr<ClusterNode> myself_;
ClusterNodes nodes_;
Expand Down

0 comments on commit 76757e1

Please sign in to comment.