Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): SetConfig() for ClusterConfig. #1202

Merged
merged 7 commits into from
May 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
104 changes: 87 additions & 17 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ extern "C" {
#include <shared_mutex>
#include <string_view>

#include "base/logging.h"
#include "cluster_config.h"

using namespace std;

namespace dfly {

bool ClusterConfig::cluster_enabled = false;

static constexpr SlotId kMaxSlotNum = 0x3FFF;

std::string_view ClusterConfig::KeyTag(std::string_view key) {
string_view ClusterConfig::KeyTag(string_view key) {
size_t start = key.find('{');
if (start == key.npos) {
return key;
Expand All @@ -25,29 +26,98 @@ std::string_view ClusterConfig::KeyTag(std::string_view key) {
return key.substr(start + 1, end - start - 1);
}

SlotId ClusterConfig::KeySlot(std::string_view key) {
std::string_view tag = KeyTag(key);
// Maps `key` to a slot id: the key's tag (see KeyTag()) is hashed with crc16 and the result is
// masked with kMaxSlotNum.
SlotId ClusterConfig::KeySlot(string_view key) {
  const string_view hashed_part = KeyTag(key);
  const auto checksum = crc16(hashed_part.data(), hashed_part.size());
  return checksum & kMaxSlotNum;
}

ClusterConfig::ClusterConfig() {
ClusterConfig::ClusterConfig(string_view my_id) : my_id_(my_id) {
cluster_enabled = true;
AddSlots();
}

void ClusterConfig::AddSlots() {
// TODO update logic according to config
// currently add all slots to owned slots
std::lock_guard lk{slots_mu_};
for (SlotId slot_id = 0; slot_id <= kMaxSlotNum; ++slot_id) {
owned_slots_.emplace(slot_id);
bool ClusterConfig::IsConfigValid(const vector<ClusterShard>& new_config) {
// Make sure that all slots are set exactly once.
array<bool, tuple_size<decltype(slots_)>::value> slots_found = {};
for (const auto& shard : new_config) {
for (const auto& slot_range : shard.slot_ranges) {
if (slot_range.start > slot_range.end) {
LOG(WARNING) << "Invalid cluster config: start=" << slot_range.start
<< " is larger than end=" << slot_range.end;
return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add log prints so it will be easier to understand if we have a problem with the config

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's a great idea!

}

for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) {
if (slot >= slots_found.size()) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " is bigger than allowed max=" << slots_found.size();
return false;
}

if (slots_found[slot]) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " was already configured by another slot range.";
return false;
}

slots_found[slot] = true;
}
}
}

if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) {
LOG(WARNING) << "Invalid cluster config: some slots were missing.";
return false;
}
return;

return true;
}

bool ClusterConfig::IsMySlot(SlotId id) {
std::shared_lock sl(slots_mu_);
return owned_slots_.contains(id);
// Validates `new_config` and, when it is valid, installs it as the active configuration.
//
// Returns false without touching any state if IsConfigValid() rejects `new_config`. Otherwise
// takes `mu_` exclusively, copies `new_config` into `config_`, rebuilds the per-slot table
// `slots_` from that copy and returns true.
bool ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
  if (!IsConfigValid(new_config)) {
    return false;
  }

  lock_guard gu(mu_);

  config_ = new_config;

  // Iterate over `config_` rather than `new_config`: `slots_` stores pointers to the shards, and
  // those must refer to the copy owned by this object.
  for (const auto& shard : config_) {
    // Ownership depends only on the shard's nodes, so it is computed once per shard instead of
    // once per slot range.
    const bool owned_by_me =
        shard.master.id == my_id_ || any_of(shard.replicas.begin(), shard.replicas.end(),
                                            [&](const Node& node) { return node.id == my_id_; });

    for (const auto& slot_range : shard.slot_ranges) {
      // Both bounds are inclusive. IsConfigValid() already rejected ranges that reach past the
      // end of `slots_`, so indexing is in bounds.
      for (SlotId i = slot_range.start; i <= slot_range.end; ++i) {
        slots_[i] = {.shard = &shard, .owned_by_me = owned_by_me};
      }
    }
  }

  return true;
}

// Returns true if slot `id` is served by a shard in which this node is the master or one of the
// replicas, as recorded by the last successful SetConfig().
//
// Returns false for an out-of-range `id` (after a DCHECK failure in debug builds).
bool ClusterConfig::IsMySlot(SlotId id) const {
  // `slots_` is guarded by `mu_` and rewritten by SetConfig() under an exclusive lock, so readers
  // must hold the shared lock, exactly like GetMasterNodeForSlot() and GetConfig().
  shared_lock gu(mu_);

  if (id >= slots_.size()) {
    DCHECK(false) << "Requesting a non-existing slot id " << id;
    return false;
  }

  return slots_[id].owned_by_me;
}

// Returns a copy of the master node of the shard that serves slot `id`.
//
// CHECK-fails when `id` is outside `slots_` or when the slot has no shard attached yet, i.e. when
// it is called before SetConfig() succeeded.
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
  shared_lock read_guard(mu_);

  CHECK_LT(id, slots_.size()) << "Requesting a non-existing slot id " << id;

  const ClusterShard* owner = slots_[id].shard;
  CHECK_NE(owner, nullptr) << "Calling GetMasterNodeForSlot(" << id << ") before SetConfig()";

  return owner->master;
}

// Returns a copy of the currently installed configuration. The copy is made while holding a
// shared lock on `mu_`.
ClusterConfig::ClusterShards ClusterConfig::GetConfig() const {
  shared_lock read_guard(mu_);
  ClusterShards snapshot = config_;
  return snapshot;
}

} // namespace dfly
64 changes: 56 additions & 8 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,84 @@
//

#pragma once
#include <absl/container/flat_hash_set.h>

#include <absl/base/thread_annotations.h>

#include <array>
#include <optional>
#include <string_view>
#include <tuple>
#include <vector>

#include "src/core/fibers.h"

namespace dfly {

typedef uint16_t SlotId;
using SlotId = uint16_t;

class ClusterConfig {
public:
ClusterConfig();
// Describes one node of the cluster.
struct Node {
// Identifier of the node. SetConfig() compares it with this instance's own id to decide which
// slots are owned locally.
std::string id;
// Presumably the node's IP address (the unit tests use dotted IPv4 strings) - confirm with
// callers.
std::string ip;
// Port of the node; 0 unless set explicitly.
uint16_t port = 0;
};

// A contiguous range of slot ids. Both bounds are inclusive: the code iterating over a range
// runs while `slot <= end`.
struct SlotRange {
SlotId start = 0;
SlotId end = 0;
};

// One shard of the cluster: the slot ranges assigned to it, its master node and its replicas.
struct ClusterShard {
// Slot ranges assigned to this shard.
std::vector<SlotRange> slot_ranges;
// The node returned by GetMasterNodeForSlot() for every slot of this shard.
Node master;
// May be empty.
std::vector<Node> replicas;
};

using ClusterShards = std::vector<ClusterShard>;

explicit ClusterConfig(std::string_view my_id);

static SlotId KeySlot(std::string_view key);

static bool IsClusterEnabled() {
return cluster_enabled;
}

// If the key contains the {...} pattern, return only the part between { and }
static std::string_view KeyTag(std::string_view key);

// If key is in my slots ownership return true
bool IsMySlot(SlotId id);
bool IsMySlot(SlotId id) const;

// Returns the master configured for `id`. CHECK-fails if `id` is out of range or if `SetConfig()`
// was never completed successfully.
Node GetMasterNodeForSlot(SlotId id) const;

ClusterShards GetConfig() const;

// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
// nothing otherwise.
bool SetConfig(const ClusterShards& new_config);

private:
void AddSlots();
// Per-slot lookup record stored in `slots_`.
struct SlotEntry {
// Shard that serves the slot; nullptr until SetConfig() fills the table.
const ClusterShard* shard = nullptr;
// True when this instance's id equals the id of the shard's master or of one of its replicas.
bool owned_by_me = false;
};

bool IsConfigValid(const ClusterShards& new_config);

util::SharedMutex slots_mu_;
absl::flat_hash_set<SlotId> owned_slots_;
static bool cluster_enabled;
static constexpr SlotId kMaxSlotNum = 0x3FFF;

const std::string my_id_;

mutable util::SharedMutex mu_;

ClusterShards config_ ABSL_GUARDED_BY(mu_);

// This array covers the whole range of possible slots for fast access. It points into `config_`.
std::array<SlotEntry, kMaxSlotNum + 1> slots_ ABSL_GUARDED_BY(mu_) = {};
};

} // namespace dfly
95 changes: 93 additions & 2 deletions src/server/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,28 @@

#include "server/cluster/cluster_config.h"

#include <gmock/gmock-matchers.h>

#include "base/gtest.h"
#include "base/logging.h"

using namespace std;
using Node = dfly::ClusterConfig::Node;

namespace dfly {

class ClusterConfigTest : public ::testing::Test {};
// Matches a node whose id, ip and port are all equal to those of `expected`.
MATCHER_P(NodeMatches, expected, "") {
  if (!(arg.id == expected.id)) {
    return false;
  }
  if (!(arg.ip == expected.ip)) {
    return false;
  }
  return arg.port == expected.port;
}

// Fixture giving every test its own ClusterConfig, constructed with the node id `kMyId`.
class ClusterConfigTest : public ::testing::Test {
 protected:
  // Id the config under test treats as its own.
  const string kMyId{"my-id"};
  ClusterConfig config_{kMyId};
};

TEST_F(ClusterConfigTest, KeyTagTest) {
std::string key = "{user1000}.following";
string key = "{user1000}.following";
ASSERT_EQ("user1000", ClusterConfig::KeyTag(key));

key = " foo{}{bar}";
Expand All @@ -27,4 +41,81 @@ TEST_F(ClusterConfigTest, KeyTagTest) {
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
}

TEST_F(ClusterConfigTest, ConfigEmpty) {
// Test that empty-initialization causes none of the slots to be owned locally.
for (SlotId i : {0, 1, 10, 100, 1'000, 10'000, 16'000, 0x3FFF}) {
EXPECT_FALSE(config_.IsMySlot(i));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test that checks that IsMySlot returns true for some slots and false for others.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea!
I changed ClusterConfigTest.ConfigSetMultipleInstances to have that.

}
}

// A configuration without any shard must be rejected.
TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
  const ClusterConfig::ClusterShards no_shards;
  EXPECT_FALSE(config_.SetConfig(no_shards));
}

// A configuration whose only range stops at slot 16000 leaves the slots up to 0x3FFF unassigned
// and must be rejected.
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
  const ClusterConfig::ClusterShards shards = {
      {.slot_ranges = {{.start = 0, .end = 16000}},
       .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
       .replicas = {}}};

  EXPECT_FALSE(config_.SetConfig(shards));
}

// Slot 0 is claimed by both shards, so the configuration must be rejected.
TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
  const ClusterConfig::ClusterShards shards = {
      {.slot_ranges = {{.start = 0, .end = 0x3FFF}},
       .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
       .replicas = {}},
      {.slot_ranges = {{.start = 0, .end = 0}},
       .master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
       .replicas = {}}};

  EXPECT_FALSE(config_.SetConfig(shards));
}

// A single shard covering every slot is accepted, and its master is reported for slot 0.
TEST_F(ClusterConfigTest, ConfigSetOk) {
  const Node master{.id = "other", .ip = "192.168.0.100", .port = 7000};

  EXPECT_TRUE(config_.SetConfig(
      {{.slot_ranges = {{.start = 0, .end = 0x3FFF}}, .master = master, .replicas = {}}}));

  EXPECT_THAT(config_.GetMasterNodeForSlot(0), NodeMatches(master));
}

// A single shard with one replica is accepted; slot 0 still reports the shard's master.
TEST_F(ClusterConfigTest, ConfigSetOkWithReplicas) {
  const Node master{.id = "other-master", .ip = "192.168.0.100", .port = 7000};
  const Node replica{.id = "other-replica", .ip = "192.168.0.101", .port = 7001};

  EXPECT_TRUE(config_.SetConfig(
      {{.slot_ranges = {{.start = 0, .end = 0x3FFF}}, .master = master, .replicas = {replica}}}));

  EXPECT_THAT(config_.GetMasterNodeForSlot(0), NodeMatches(master));
}

// Three shards split the slot space. Only the middle shard has `kMyId` as its master, so only
// its slots (5001..10000) must be reported as owned by this instance.
TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
  const Node first_master{.id = "other-master", .ip = "192.168.0.100", .port = 7000};
  const Node my_master{.id = kMyId, .ip = "192.168.0.102", .port = 7002};
  const Node last_master{.id = "other-master3", .ip = "192.168.0.104", .port = 7004};

  EXPECT_TRUE(config_.SetConfig(
      {{.slot_ranges = {{.start = 0, .end = 5'000}},
        .master = first_master,
        .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}},
       {.slot_ranges = {{.start = 5'001, .end = 10'000}},
        .master = my_master,
        .replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}}},
       {.slot_ranges = {{.start = 10'001, .end = 0x3FFF}},
        .master = last_master,
        .replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}}}));

  // First shard: served by another master, not owned locally.
  for (int slot = 0; slot <= 5'000; ++slot) {
    EXPECT_THAT(config_.GetMasterNodeForSlot(slot), NodeMatches(first_master));
    EXPECT_FALSE(config_.IsMySlot(slot));
  }

  // Second shard: this instance is the master.
  for (int slot = 5'001; slot <= 10'000; ++slot) {
    EXPECT_THAT(config_.GetMasterNodeForSlot(slot), NodeMatches(my_master));
    EXPECT_TRUE(config_.IsMySlot(slot));
  }

  // Third shard: served by another master, not owned locally.
  for (int slot = 10'001; slot <= 0x3FFF; ++slot) {
    EXPECT_THAT(config_.GetMasterNodeForSlot(slot), NodeMatches(last_master));
    EXPECT_FALSE(config_.IsMySlot(slot));
  }
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else if (cluster_mode == "yes") {
cluster_config_.reset(new ClusterConfig());
cluster_config_ = std::make_unique<ClusterConfig>(master_id_);
} else if (!cluster_mode.empty()) {
LOG(ERROR) << "invalid cluster_mode. Exiting...";
exit(1);
Expand Down