Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Implement DFLY CLUSTER CONFIG command. #1223

Merged
merged 6 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(search_family_test dfly_test_lib LABELS DFLY)
cxx_test(cluster_config_test dfly_test_lib LABELS DFLY)
cxx_test(dflycmd_test dfly_test_lib LABELS DFLY)



Expand Down
134 changes: 134 additions & 0 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern "C" {
#include "redis/crc16.h"
}

#include <jsoncons/json.hpp>
#include <shared_mutex>
#include <string_view>

Expand Down Expand Up @@ -95,6 +96,139 @@ bool ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
return true;
}

namespace {
constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;

template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
if (!obj.is_number()) {
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
}

return obj.as<T>();
}

optional<vector<ClusterConfig::SlotRange>> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}

vector<ClusterConfig::SlotRange> ranges;

for (const auto& range : slots.array_value()) {
if (!range.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
return nullopt;
}

optional<SlotId> start = ReadNumeric<SlotId>(range.at_or_null("start"));
optional<SlotId> end = ReadNumeric<SlotId>(range.at_or_null("end"));
if (!start.has_value() || !end.has_value()) {
return nullopt;
}

ranges.push_back({.start = start.value(), .end = end.value()});
}

return ranges;
}

optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
}

ClusterConfig::Node node;

{
auto id = json.at_or_null("id");
if (!id.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
return nullopt;
}
node.id = std::move(id).as_string();
}

{
auto ip = json.at_or_null("ip");
if (!ip.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
return nullopt;
}
node.ip = std::move(ip).as_string();
}

{
auto port = ReadNumeric<uint16_t>(json.at_or_null("port"));
if (!port.has_value()) {
return nullopt;
}
node.port = port.value();
}

return node;
}

optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config;

if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
return nullopt;
}

for (const auto& element : json.array_value()) {
ClusterConfig::ClusterShard shard;

if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
return nullopt;
}

auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!slots.has_value()) {
return nullopt;
}
shard.slot_ranges = std::move(slots).value();

auto master = ParseClusterNode(element.at_or_null("master"));
if (!master.has_value()) {
return nullopt;
}
shard.master = std::move(master).value();

auto replicas = element.at_or_null("replicas");
if (!replicas.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
return nullopt;
}

for (const auto& replica : replicas.array_value()) {
auto node = ParseClusterNode(replica);
if (!node.has_value()) {
return nullopt;
}
shard.replicas.push_back(std::move(node).value());
}

config.push_back(std::move(shard));
}

return config;
}
} // namespace

bool ClusterConfig::SetConfig(const JsonType& json) {
optional<ClusterShards> config = BuildClusterConfigFromJson(json);
if (!config.has_value()) {
return false;
}

return SetConfig(config.value());
}

bool ClusterConfig::IsMySlot(SlotId id) const {
if (id >= slots_.size()) {
DCHECK(false) << "Requesting a non-existing slot id " << id;
Expand Down
7 changes: 6 additions & 1 deletion src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string_view>
#include <vector>

#include "core/json_object.h"
#include "src/core/fibers.h"

namespace dfly {
Expand All @@ -19,6 +20,8 @@ using SlotId = uint16_t;

class ClusterConfig {
public:
static constexpr SlotId kMaxSlotNum = 0x3FFF;

struct Node {
std::string id;
std::string ip;
Expand Down Expand Up @@ -61,7 +64,9 @@ class ClusterConfig {
// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
// nothing otherwise.
bool SetConfig(const ClusterShards& new_config);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to private?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, I'd agree. I just tried to move it, but it would then require testing solely by passing json, which is more cumbersome. Also, I guess it's a nicer API to support both, although I agree that if it's not in use there's no point in keeping it.
Again, I'd love to keep it if only to make testing this class easier, OK?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

static constexpr SlotId kMaxSlotNum = 0x3FFF;

// Parses `json` into `ClusterShards` and calls the above overload.
bool SetConfig(const JsonType& json);

private:
struct SlotEntry {
Expand Down
175 changes: 174 additions & 1 deletion src/server/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <gmock/gmock-matchers.h>

#include <jsoncons/json.hpp>

#include "base/gtest.h"
#include "base/logging.h"

Expand All @@ -20,6 +22,12 @@ MATCHER_P(NodeMatches, expected, "") {

class ClusterConfigTest : public ::testing::Test {
protected:
JsonType ParseJson(string_view json_str) {
optional<JsonType> opt_json = JsonFromString(json_str);
CHECK(opt_json.has_value());
return opt_json.value();
}

const string kMyId = "my-id";
ClusterConfig config_{kMyId};
};
Expand Down Expand Up @@ -49,7 +57,7 @@ TEST_F(ClusterConfigTest, ConfigEmpty) {
}

TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
EXPECT_FALSE(config_.SetConfig({}));
EXPECT_FALSE(config_.SetConfig(ClusterConfig::ClusterShards{}));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
Expand Down Expand Up @@ -118,4 +126,169 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
}
}

TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) {
// Note that slot_ranges is not an object
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": "0,16383",
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) {
// Note that slot_ranges.start is not a number
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": "0",
"end": 16383
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) {
// Note that slot_ranges.end is not a number
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": "16383"
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
]
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) {
// Note that master is not an object
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": 123,
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"ip": "10.0.0.0",
"port": 8000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"port": 8000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"ip": "10.0.0.0"
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"ip": "10.0.0.0",
"port": 8000
}
}
])json")));
}

} // namespace dfly
Loading