Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): SetConfig() for ClusterConfig. #1202

Merged
merged 7 commits into from
May 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 85 additions & 17 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
#include <mutex>
extern "C" {
#include "redis/crc16.h"
}

#include <shared_mutex>
#include <string_view>

#include "base/logging.h"
#include "cluster_config.h"

using namespace std;

namespace dfly {

bool ClusterConfig::cluster_enabled = false;

static constexpr SlotId kMaxSlotNum = 0x3FFF;

std::string_view ClusterConfig::KeyTag(std::string_view key) {
string_view ClusterConfig::KeyTag(string_view key) {
size_t start = key.find('{');
if (start == key.npos) {
return key;
Expand All @@ -25,29 +27,95 @@ std::string_view ClusterConfig::KeyTag(std::string_view key) {
return key.substr(start + 1, end - start - 1);
}

SlotId ClusterConfig::KeySlot(std::string_view key) {
std::string_view tag = KeyTag(key);
// Maps `key` to its cluster slot: the CRC16 of the key's hash tag (see KeyTag()), masked with
// kMaxSlotNum.
SlotId ClusterConfig::KeySlot(string_view key) {
  const string_view hashed_part = KeyTag(key);
  const auto checksum = crc16(hashed_part.data(), hashed_part.length());
  return checksum & kMaxSlotNum;
}

ClusterConfig::ClusterConfig() {
ClusterConfig::ClusterConfig(string_view my_id) : my_id_(my_id) {
cluster_enabled = true;
AddSlots();
}

void ClusterConfig::AddSlots() {
// TODO update logic according to config
// currently add all slots to owned slots
std::lock_guard lk{slots_mu_};
for (SlotId slot_id = 0; slot_id <= kMaxSlotNum; ++slot_id) {
owned_slots_.emplace(slot_id);
bool ClusterConfig::IsConfigValid(const vector<ClusterShard>& new_config) {
// Make sure that all slots are set exactly once.
array<bool, tuple_size<decltype(slots_)>::value> slots_found = {};
for (const auto& shard : new_config) {
for (const auto& slot_range : shard.slot_ranges) {
if (slot_range.start > slot_range.end) {
return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add log prints so it will be easier to understand if we have a problem with the config

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's a great idea!

}

for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) {
if (slot >= slots_found.size()) {
return false;
}

if (slots_found[slot]) {
// `slot` was already seen
return false;
}

slots_found[slot] = true;
}
}
}

if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) {
// Missing slot
return false;
}
return;

return true;
}

bool ClusterConfig::IsMySlot(SlotId id) {
std::shared_lock sl(slots_mu_);
return owned_slots_.contains(id);
// Validates `new_config` and, when it is valid, overwrites the owner record of every slot the
// config covers. Returns false without touching the slot table when validation fails.
bool ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
  if (!IsConfigValid(new_config)) {
    return false;
  }

  // Exclusive lock: the slot table is rewritten below.
  lock_guard gu(slots_mu_);

  for (const auto& shard : new_config) {
    // Ownership depends only on the shard's nodes, so compute it once per shard rather than
    // once per slot range.
    const bool owned_by_me =
        shard.master.id == my_id_ || any_of(shard.replicas.begin(), shard.replicas.end(),
                                            [&](const Node& node) { return node.id == my_id_; });

    for (const auto& slot_range : shard.slot_ranges) {
      for (SlotId i = slot_range.start; i <= slot_range.end; ++i) {
        slots_[i] = {
            .master = shard.master, .replicas = shard.replicas, .owned_by_me = owned_by_me};
      }
    }
  }
  return true;
}

// Returns the `owned_by_me` flag recorded for slot `id`, i.e. whether the last successful
// SetConfig() listed this node as master or replica of the slot. Returns false (and trips a
// DCHECK in debug builds) when `id` is outside the slot table.
bool ClusterConfig::IsMySlot(SlotId id) const {
  // `slots_` is guarded by `slots_mu_`; hold it in shared mode so this read cannot race with
  // the exclusive writer in SetConfig().
  shared_lock sl(slots_mu_);

  if (id >= slots_.size()) {
    DCHECK(false) << "Requesting a non-existing slot id " << id;
    return false;
  }

  return slots_[id].owned_by_me;
}

// Returns the nodes recorded for slot `id`: the master first, followed by its replicas (if any).
// Returns an empty vector (and trips a DCHECK in debug builds) when `id` is outside the slot
// table.
vector<ClusterConfig::Node> ClusterConfig::GetNodesForSlot(SlotId id) const {
  // `slots_` is guarded by `slots_mu_`; hold it in shared mode while copying the nodes out so
  // the copy cannot race with the exclusive writer in SetConfig().
  shared_lock sl(slots_mu_);

  if (id >= slots_.size()) {
    DCHECK(false) << "Requesting a non-existing slot id " << id;
    return {};
  }

  const SlotOwner& slot = slots_[id];
  vector<ClusterConfig::Node> results;
  results.reserve(slot.replicas.size() + 1);
  results.push_back(slot.master);
  results.insert(results.end(), slot.replicas.begin(), slot.replicas.end());
  return results;
}

// Returns a copy of the master node recorded for slot `id`. CHECK-fails when `id` is outside the
// slot table.
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
  // `slots_` is guarded by `slots_mu_`; hold it in shared mode so the copy below cannot race
  // with the exclusive writer in SetConfig().
  shared_lock sl(slots_mu_);

  CHECK_LT(id, slots_.size()) << "Requesting a non-existing slot id " << id;

  return slots_[id].master;
}

} // namespace dfly
63 changes: 55 additions & 8 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,83 @@
//

#pragma once
#include <absl/container/flat_hash_set.h>

#include <absl/base/thread_annotations.h>

#include <array>
#include <optional>
#include <string_view>
#include <tuple>
#include <vector>

#include "src/core/fibers.h"

namespace dfly {

typedef uint16_t SlotId;
using SlotId = uint16_t;

class ClusterConfig {
public:
ClusterConfig();
// Identity and address of a single cluster node.
struct Node {
  std::string id;  // Compared against this instance's own id to decide slot ownership.
  std::string ip;
  uint16_t port{0};
};

// A run of consecutive slot ids. Both bounds are inclusive: consumers iterate
// `start <= slot <= end`.
struct SlotRange {
  SlotId start{0};
  SlotId end{0};
};

// One shard of a cluster configuration as passed to SetConfig(): the slot ranges it serves
// together with the master and replica nodes serving them.
// Callers build this with designated initializers, so the declaration order of the fields
// matters.
struct ClusterShard {
// Slot ranges assigned to this shard.
std::vector<SlotRange> slot_ranges;
// The shard's master node.
Node master;
// Zero or more replicas of `master`.
std::vector<Node> replicas;
};

explicit ClusterConfig(std::string_view my_id);

static SlotId KeySlot(std::string_view key);

// Reports the current value of the class-wide `cluster_enabled` flag.
static bool IsClusterEnabled() { return cluster_enabled; }

// If the key contains the {...} pattern, return only the part between { and }
static std::string_view KeyTag(std::string_view key);

// If key is in my slots ownership return true
bool IsMySlot(SlotId id);
bool IsMySlot(SlotId id) const;

// Returns nodes that own `id`. Result will always have the first element set to be the master,
// and the following 0 or more elements are the replicas.
std::vector<Node> GetNodesForSlot(SlotId id) const;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the flow we will need this api for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLUSTER SHARDS for example

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed per the new API


// Returns the master configured for `id`. Returns a default-initialized `Node` if `SetConfig()`
// was never completed successfully.
Node GetMasterNodeForSlot(SlotId id) const;

// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
// nothing otherwise.
bool SetConfig(const std::vector<ClusterShard>& new_config);

private:
void AddSlots();
struct SlotOwner {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this implementation we use 16384× the space to store the data, instead of storing it only once in the config format, and we will need logic to convert the slot array back to the config format in order to reply to the CLUSTER SHARDS command.

If you would create a member :
std::vector config_;
Which you will set on Set config command and this will be used to reply CLUSTER SHARDS
The SlotOwner struct can be
{
string_view ip;
string_view port;
bool owned_by_me;
}

In this implementation if we have one master and one replica, the data will be saved in the config_ member, the slot array will point to the config data.
In the current implementation we are saving the master and the replica data *16384 times

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that it's a matter of what we're optimizing for. The current optimization is aimed for fast "get owners for slot" questions, such as MOVED replies.
If we're looking to save memory, we could consider using shared_ptr as the value of the internal map. It has the downside of using atomics, but I guess it's not terrible since it's only going to take effect during SetConfig() (I think).
But it's true that the current implementation requires more logic to build the reply of CLUSTER SHARDS and CLUSTER SLOTS. Not much more logic, essentially building a dictionary by iterating all entries (that'll actually be simpler to implement if we moved to shared_ptr as we'll be able to compare Nodes). But still, we could avoid compressing ranges.
Do you suggest having 2 members (slots_ and config_, where slots_ will have 16k entries and will point into config_)? Because that will keep the 16k times data duplication, but without it I don't think we can guarantee O(1) computational time for MOVED replies.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we should optimise for "get owners for slot" as it is in the flow of client database queries.
But why does my suggestion guarantee O(1) computational time?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, I kept 2 members and updated the code according to your suggestion. PTAL.

Node master;
std::vector<Node> replicas;
bool owned_by_me = false;
};

bool IsConfigValid(const std::vector<ClusterShard>& new_config);

util::SharedMutex slots_mu_;
absl::flat_hash_set<SlotId> owned_slots_;
static bool cluster_enabled;
static constexpr SlotId kMaxSlotNum = 0x3FFF;

const std::string my_id_;

mutable util::SharedMutex slots_mu_;

// This array covers the whole range of possible slots.
std::array<SlotOwner, kMaxSlotNum + 1> slots_ ABSL_GUARDED_BY(slots_mu_) = {};
};

} // namespace dfly
109 changes: 107 additions & 2 deletions src/server/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,31 @@

#include "server/cluster/cluster_config.h"

#include <gmock/gmock-matchers.h>

#include "base/gtest.h"
#include "base/logging.h"

using namespace std;
using testing::Pointwise;
using Node = dfly::ClusterConfig::Node;

namespace dfly {

class ClusterConfigTest : public ::testing::Test {};
// Two-argument matcher for use with Pointwise(): matches when both nodes of the pair agree on
// id, ip and port.
MATCHER(NodeMatches, "") {
  const auto& lhs = std::get<0>(arg);
  const auto& rhs = std::get<1>(arg);
  if (lhs.id != rhs.id) {
    return false;
  }
  if (lhs.ip != rhs.ip) {
    return false;
  }
  return lhs.port == rhs.port;
}

// Fixture shared by the tests below: gives each test a fresh ClusterConfig constructed with
// the id `kMyId`.
class ClusterConfigTest : public ::testing::Test {
protected:
// Id handed to the ClusterConfig under test as its own node id.
static constexpr string_view kMyId = "my-id";
// The instance under test.
ClusterConfig config_{kMyId};
};

TEST_F(ClusterConfigTest, KeyTagTest) {
std::string key = "{user1000}.following";
string key = "{user1000}.following";
ASSERT_EQ("user1000", ClusterConfig::KeyTag(key));

key = " foo{}{bar}";
Expand All @@ -27,4 +44,92 @@ TEST_F(ClusterConfigTest, KeyTagTest) {
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
}

TEST_F(ClusterConfigTest, ConfigEmpty) {
// Test that empty-initialization causes none of the slots to be owned locally.
for (SlotId i : {0, 1, 10, 100, 1'000, 10'000, 16'000, 0x3FFF}) {
EXPECT_FALSE(config_.IsMySlot(i));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a test where you check that IsMySlot is true and for some false

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea!
I changed ClusterConfigTest.ConfigSetMultipleInstances to have that.

}
}

TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
// Test that an empty config is rejected as invalid.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the comment

EXPECT_FALSE(config_.SetConfig({}));
}

// A config covering only slots 0..16000 leaves the remaining slots unassigned, so it must be
// rejected and the slot table must keep its default-initialized content.
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
  const vector<ClusterConfig::ClusterShard> partial_config = {
      {.slot_ranges = {{.start = 0, .end = 16000}},
       .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
       .replicas = {}}};
  EXPECT_FALSE(config_.SetConfig(partial_config));

  // Slot 0 still reports one default-constructed master and no replicas.
  const vector<Node> untouched = {{}};
  EXPECT_THAT(config_.GetNodesForSlot(0), Pointwise(NodeMatches(), untouched));
}

// Slot 0 is claimed by two different shards, so the config must be rejected and the slot table
// must keep its default-initialized content.
TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
  const vector<ClusterConfig::ClusterShard> overlapping_config = {
      {.slot_ranges = {{.start = 0, .end = 0x3FFF}},
       .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
       .replicas = {}},
      {.slot_ranges = {{.start = 0, .end = 0}},
       .master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
       .replicas = {}}};
  EXPECT_FALSE(config_.SetConfig(overlapping_config));

  // Slot 0 still reports one default-constructed master and no replicas.
  const vector<Node> untouched = {{}};
  EXPECT_THAT(config_.GetNodesForSlot(0), Pointwise(NodeMatches(), untouched));
}

// A single master owning the whole slot range is a valid config; afterwards slot 0 reports
// exactly that master.
TEST_F(ClusterConfigTest, ConfigSetOk) {
  const Node master{.id = "other", .ip = "192.168.0.100", .port = 7000};

  const vector<ClusterConfig::ClusterShard> full_config = {
      {.slot_ranges = {{.start = 0, .end = 0x3FFF}}, .master = master, .replicas = {}}};
  EXPECT_TRUE(config_.SetConfig(full_config));

  const vector<Node> owners = {master};
  EXPECT_THAT(config_.GetNodesForSlot(0), Pointwise(NodeMatches(), owners));
}

// A master plus one replica owning the whole slot range is a valid config; afterwards slot 0
// reports the master first and the replica second.
TEST_F(ClusterConfigTest, ConfigSetOkWithReplicas) {
  const Node master{.id = "other-master", .ip = "192.168.0.100", .port = 7000};
  const Node replica{.id = "other-replica", .ip = "192.168.0.101", .port = 7001};

  const vector<ClusterConfig::ClusterShard> full_config = {
      {.slot_ranges = {{.start = 0, .end = 0x3FFF}}, .master = master, .replicas = {replica}}};
  EXPECT_TRUE(config_.SetConfig(full_config));

  const vector<Node> owners = {master, replica};
  EXPECT_THAT(config_.GetNodesForSlot(0), Pointwise(NodeMatches(), owners));
}

// Three shards split the slot range between them; every slot must report the master and the
// replica of the shard whose range contains it.
TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
  const Node master1{.id = "other-master", .ip = "192.168.0.100", .port = 7000};
  const Node replica1{.id = "other-replica", .ip = "192.168.0.101", .port = 7001};
  const Node master2{.id = "other-master2", .ip = "192.168.0.102", .port = 7002};
  const Node replica2{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003};
  const Node master3{.id = "other-master3", .ip = "192.168.0.104", .port = 7004};
  const Node replica3{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005};

  const vector<ClusterConfig::ClusterShard> three_shards = {
      {.slot_ranges = {{.start = 0, .end = 5'000}}, .master = master1, .replicas = {replica1}},
      {.slot_ranges = {{.start = 5'001, .end = 10'000}},
       .master = master2,
       .replicas = {replica2}},
      {.slot_ranges = {{.start = 10'001, .end = 0x3FFF}},
       .master = master3,
       .replicas = {replica3}}};
  EXPECT_TRUE(config_.SetConfig(three_shards));

  // Checks every slot in [first, last] (inclusive) against the expected owner list.
  const auto expect_owners = [&](int first, int last, const vector<Node>& owners) {
    for (int slot = first; slot <= last; ++slot) {
      EXPECT_THAT(config_.GetNodesForSlot(slot), Pointwise(NodeMatches(), owners));
    }
  };

  expect_owners(0, 5'000, {master1, replica1});
  expect_owners(5'001, 10'000, {master2, replica2});
  expect_owners(10'001, 0x3FFF, {master3, replica3});
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else if (cluster_mode == "yes") {
cluster_config_.reset(new ClusterConfig());
cluster_config_ = std::make_unique<ClusterConfig>(master_id_);
} else if (!cluster_mode.empty()) {
LOG(ERROR) << "invalid cluster_mode. Exiting...";
exit(1);
Expand Down