Skip to content

Commit

Permalink
feat(server): SetConfig() for ClusterConfig. (#1202)
Browse files Browse the repository at this point in the history
feat(server): SetConfig() for ClusterConfig.
  • Loading branch information
chakaz committed May 14, 2023
1 parent 89b6971 commit e2bb965
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 28 deletions.
104 changes: 87 additions & 17 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ extern "C" {
#include <shared_mutex>
#include <string_view>

#include "base/logging.h"
#include "cluster_config.h"

using namespace std;

namespace dfly {

bool ClusterConfig::cluster_enabled = false;

static constexpr SlotId kMaxSlotNum = 0x3FFF;

std::string_view ClusterConfig::KeyTag(std::string_view key) {
string_view ClusterConfig::KeyTag(string_view key) {
size_t start = key.find('{');
if (start == key.npos) {
return key;
Expand All @@ -25,29 +26,98 @@ std::string_view ClusterConfig::KeyTag(std::string_view key) {
return key.substr(start + 1, end - start - 1);
}

SlotId ClusterConfig::KeySlot(std::string_view key) {
std::string_view tag = KeyTag(key);
SlotId ClusterConfig::KeySlot(string_view key) {
string_view tag = KeyTag(key);
return crc16(tag.data(), tag.length()) & kMaxSlotNum;
}

ClusterConfig::ClusterConfig() {
ClusterConfig::ClusterConfig(string_view my_id) : my_id_(my_id) {
cluster_enabled = true;
AddSlots();
}

void ClusterConfig::AddSlots() {
// TODO update logic acording to config
// currently add all slots to owned slots
std::lock_guard lk{slots_mu_};
for (SlotId slot_id = 0; slot_id <= kMaxSlotNum; ++slot_id) {
owned_slots_.emplace(slot_id);
bool ClusterConfig::IsConfigValid(const vector<ClusterShard>& new_config) {
// Make sure that all slots are set exactly once.
array<bool, tuple_size<decltype(slots_)>::value> slots_found = {};
for (const auto& shard : new_config) {
for (const auto& slot_range : shard.slot_ranges) {
if (slot_range.start > slot_range.end) {
LOG(WARNING) << "Invalid cluster config: start=" << slot_range.start
<< " is larger than end=" << slot_range.end;
return false;
}

for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) {
if (slot >= slots_found.size()) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " is bigger than allowed max=" << slots_found.size();
return false;
}

if (slots_found[slot]) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " was already configured by another slot range.";
return false;
}

slots_found[slot] = true;
}
}
}

if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) {
LOG(WARNING) << "Invalid cluster config: some slots were missing.";
return false;
}
return;

return true;
}

bool ClusterConfig::IsMySlot(SlotId id) {
std::shared_lock sl(slots_mu_);
return owned_slots_.contains(id);
bool ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
if (!IsConfigValid(new_config)) {
return false;
}

lock_guard gu(mu_);

config_ = new_config;

for (const auto& shard : config_) {
for (const auto& slot_range : shard.slot_ranges) {
bool owned_by_me =
shard.master.id == my_id_ || any_of(shard.replicas.begin(), shard.replicas.end(),
[&](const Node& node) { return node.id == my_id_; });
for (SlotId i = slot_range.start; i <= slot_range.end; ++i) {
slots_[i] = {.shard = &shard, .owned_by_me = owned_by_me};
}
}
}

return true;
}

bool ClusterConfig::IsMySlot(SlotId id) const {
if (id >= slots_.size()) {
DCHECK(false) << "Requesting a non-existing slot id " << id;
return false;
}

return slots_[id].owned_by_me;
}

ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
shared_lock gu(mu_);

CHECK_LT(id, slots_.size()) << "Requesting a non-existing slot id " << id;
CHECK_NE(slots_[id].shard, nullptr)
<< "Calling GetMasterNodeForSlot(" << id << ") before SetConfig()";

return slots_[id].shard->master;
}

ClusterConfig::ClusterShards ClusterConfig::GetConfig() const {
shared_lock gu(mu_);

return config_;
}

} // namespace dfly
64 changes: 56 additions & 8 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,84 @@
//

#pragma once
#include <absl/container/flat_hash_set.h>

#include <absl/base/thread_annotations.h>

#include <array>
#include <optional>
#include <string_view>
#include <tuple>
#include <vector>

#include "src/core/fibers.h"

namespace dfly {

typedef uint16_t SlotId;
using SlotId = uint16_t;

class ClusterConfig {
public:
ClusterConfig();
struct Node {
std::string id;
std::string ip;
uint16_t port = 0;
};

struct SlotRange {
SlotId start = 0;
SlotId end = 0;
};

struct ClusterShard {
std::vector<SlotRange> slot_ranges;
Node master;
std::vector<Node> replicas;
};

using ClusterShards = std::vector<ClusterShard>;

explicit ClusterConfig(std::string_view my_id);

static SlotId KeySlot(std::string_view key);

static bool IsClusterEnabled() {
return cluster_enabled;
}

// If the key contains the {...} pattern, return only the part between { and }
static std::string_view KeyTag(std::string_view key);

// If key is in my slots ownership return true
bool IsMySlot(SlotId id);
bool IsMySlot(SlotId id) const;

// Returns the master configured for `id`. Returns a default-initialized `Node` if `SetConfig()`
// was never completed successfully.
Node GetMasterNodeForSlot(SlotId id) const;

ClusterShards GetConfig() const;

// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
// nothing otherwise.
bool SetConfig(const ClusterShards& new_config);

private:
void AddSlots();
struct SlotEntry {
const ClusterShard* shard = nullptr;
bool owned_by_me = false;
};

bool IsConfigValid(const ClusterShards& new_config);

util::SharedMutex slots_mu_;
absl::flat_hash_set<SlotId> owned_slots_;
static bool cluster_enabled;
static constexpr SlotId kMaxSlotNum = 0x3FFF;

const std::string my_id_;

mutable util::SharedMutex mu_;

ClusterShards config_ ABSL_GUARDED_BY(mu_);

// This array covers the whole range of possible slots for fast access. It points into `config_`.
std::array<SlotEntry, kMaxSlotNum + 1> slots_ ABSL_GUARDED_BY(mu_) = {};
};

} // namespace dfly
95 changes: 93 additions & 2 deletions src/server/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,28 @@

#include "server/cluster/cluster_config.h"

#include <gmock/gmock-matchers.h>

#include "base/gtest.h"
#include "base/logging.h"

using namespace std;
using Node = dfly::ClusterConfig::Node;

namespace dfly {

class ClusterConfigTest : public ::testing::Test {};
MATCHER_P(NodeMatches, expected, "") {
return arg.id == expected.id && arg.ip == expected.ip && arg.port == expected.port;
}

class ClusterConfigTest : public ::testing::Test {
protected:
const string kMyId = "my-id";
ClusterConfig config_{kMyId};
};

TEST_F(ClusterConfigTest, KeyTagTest) {
std::string key = "{user1000}.following";
string key = "{user1000}.following";
ASSERT_EQ("user1000", ClusterConfig::KeyTag(key));

key = " foo{}{bar}";
Expand All @@ -27,4 +41,81 @@ TEST_F(ClusterConfigTest, KeyTagTest) {
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
}

TEST_F(ClusterConfigTest, ConfigEmpty) {
// Test that empty-initialization causes none of the slots to be owned locally.
for (SlotId i : {0, 1, 10, 100, 1'000, 10'000, 16'000, 0x3FFF}) {
EXPECT_FALSE(config_.IsMySlot(i));
}
}

TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
EXPECT_FALSE(config_.SetConfig({}));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
EXPECT_FALSE(config_.SetConfig({{.slot_ranges = {{.start = 0, .end = 16000}},
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {}}}));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
EXPECT_FALSE(config_.SetConfig({{.slot_ranges = {{.start = 0, .end = 0x3FFF}},
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {}},
{.slot_ranges = {{.start = 0, .end = 0}},
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
.replicas = {}}}));
}

TEST_F(ClusterConfigTest, ConfigSetOk) {
EXPECT_TRUE(config_.SetConfig({{.slot_ranges = {{.start = 0, .end = 0x3FFF}},
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {}}}));
EXPECT_THAT(config_.GetMasterNodeForSlot(0),
NodeMatches(Node{.id = "other", .ip = "192.168.0.100", .port = 7000}));
}

TEST_F(ClusterConfigTest, ConfigSetOkWithReplicas) {
EXPECT_TRUE(config_.SetConfig(
{{.slot_ranges = {{.start = 0, .end = 0x3FFF}},
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}}}));
EXPECT_THAT(config_.GetMasterNodeForSlot(0),
NodeMatches(Node{.id = "other-master", .ip = "192.168.0.100", .port = 7000}));
}

TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
EXPECT_TRUE(config_.SetConfig(
{{.slot_ranges = {{.start = 0, .end = 5'000}},
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}},
{.slot_ranges = {{.start = 5'001, .end = 10'000}},
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}}},
{.slot_ranges = {{.start = 10'001, .end = 0x3FFF}},
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}}}));
{
for (int i = 0; i <= 5'000; ++i) {
EXPECT_THAT(config_.GetMasterNodeForSlot(i),
NodeMatches(Node{.id = "other-master", .ip = "192.168.0.100", .port = 7000}));
EXPECT_FALSE(config_.IsMySlot(i));
}
}
{
for (int i = 5'001; i <= 10'000; ++i) {
EXPECT_THAT(config_.GetMasterNodeForSlot(i),
NodeMatches(Node{.id = kMyId, .ip = "192.168.0.102", .port = 7002}));
EXPECT_TRUE(config_.IsMySlot(i));
}
}
{
for (int i = 10'001; i <= 0x3FFF; ++i) {
EXPECT_THAT(config_.GetMasterNodeForSlot(i),
NodeMatches(Node{.id = "other-master3", .ip = "192.168.0.104", .port = 7004}));
EXPECT_FALSE(config_.IsMySlot(i));
}
}
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else if (cluster_mode == "yes") {
cluster_config_.reset(new ClusterConfig());
cluster_config_ = std::make_unique<ClusterConfig>(master_id_);
} else if (!cluster_mode.empty()) {
LOG(ERROR) << "invalid cluster_mode. Exiting...";
exit(1);
Expand Down

0 comments on commit e2bb965

Please sign in to comment.