Skip to content

Commit

Permalink
feat(cluster): Implement DFLY CLUSTER CONFIG command. (#1223)
Browse files Browse the repository at this point in the history
* fix: Lock before accessing slots_

* Implement `DFLY CLUSTER CONFIG` command.

* Move JSON parsing logic to ClusterConfig

* clang-tidy

* Rename test fixture class
  • Loading branch information
chakaz committed May 17, 2023
1 parent 5c46c3d commit 3dd6d88
Show file tree
Hide file tree
Showing 7 changed files with 551 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(search_family_test dfly_test_lib LABELS DFLY)
cxx_test(cluster_config_test dfly_test_lib LABELS DFLY)
cxx_test(dflycmd_test dfly_test_lib LABELS DFLY)



Expand Down
134 changes: 134 additions & 0 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern "C" {
#include "redis/crc16.h"
}

#include <jsoncons/json.hpp>
#include <shared_mutex>
#include <string_view>

Expand Down Expand Up @@ -95,6 +96,139 @@ bool ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
return true;
}

namespace {
constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;

template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
if (!obj.is_number()) {
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
}

return obj.as<T>();
}

optional<vector<ClusterConfig::SlotRange>> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}

vector<ClusterConfig::SlotRange> ranges;

for (const auto& range : slots.array_value()) {
if (!range.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
return nullopt;
}

optional<SlotId> start = ReadNumeric<SlotId>(range.at_or_null("start"));
optional<SlotId> end = ReadNumeric<SlotId>(range.at_or_null("end"));
if (!start.has_value() || !end.has_value()) {
return nullopt;
}

ranges.push_back({.start = start.value(), .end = end.value()});
}

return ranges;
}

optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
}

ClusterConfig::Node node;

{
auto id = json.at_or_null("id");
if (!id.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
return nullopt;
}
node.id = std::move(id).as_string();
}

{
auto ip = json.at_or_null("ip");
if (!ip.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
return nullopt;
}
node.ip = std::move(ip).as_string();
}

{
auto port = ReadNumeric<uint16_t>(json.at_or_null("port"));
if (!port.has_value()) {
return nullopt;
}
node.port = port.value();
}

return node;
}

optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config;

if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
return nullopt;
}

for (const auto& element : json.array_value()) {
ClusterConfig::ClusterShard shard;

if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
return nullopt;
}

auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!slots.has_value()) {
return nullopt;
}
shard.slot_ranges = std::move(slots).value();

auto master = ParseClusterNode(element.at_or_null("master"));
if (!master.has_value()) {
return nullopt;
}
shard.master = std::move(master).value();

auto replicas = element.at_or_null("replicas");
if (!replicas.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
return nullopt;
}

for (const auto& replica : replicas.array_value()) {
auto node = ParseClusterNode(replica);
if (!node.has_value()) {
return nullopt;
}
shard.replicas.push_back(std::move(node).value());
}

config.push_back(std::move(shard));
}

return config;
}
} // namespace

bool ClusterConfig::SetConfig(const JsonType& json) {
optional<ClusterShards> config = BuildClusterConfigFromJson(json);
if (!config.has_value()) {
return false;
}

return SetConfig(config.value());
}

bool ClusterConfig::IsMySlot(SlotId id) const {
if (id >= slots_.size()) {
DCHECK(false) << "Requesting a non-existing slot id " << id;
Expand Down
7 changes: 6 additions & 1 deletion src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string_view>
#include <vector>

#include "core/json_object.h"
#include "src/core/fibers.h"

namespace dfly {
Expand All @@ -19,6 +20,8 @@ using SlotId = uint16_t;

class ClusterConfig {
public:
static constexpr SlotId kMaxSlotNum = 0x3FFF;

struct Node {
std::string id;
std::string ip;
Expand Down Expand Up @@ -61,7 +64,9 @@ class ClusterConfig {
// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
// nothing otherwise.
bool SetConfig(const ClusterShards& new_config);
static constexpr SlotId kMaxSlotNum = 0x3FFF;

// Parses `json` into `ClusterShards` and calls the above overload.
bool SetConfig(const JsonType& json);

private:
struct SlotEntry {
Expand Down
175 changes: 174 additions & 1 deletion src/server/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <gmock/gmock-matchers.h>

#include <jsoncons/json.hpp>

#include "base/gtest.h"
#include "base/logging.h"

Expand All @@ -20,6 +22,12 @@ MATCHER_P(NodeMatches, expected, "") {

class ClusterConfigTest : public ::testing::Test {
protected:
JsonType ParseJson(string_view json_str) {
optional<JsonType> opt_json = JsonFromString(json_str);
CHECK(opt_json.has_value());
return opt_json.value();
}

const string kMyId = "my-id";
ClusterConfig config_{kMyId};
};
Expand Down Expand Up @@ -49,7 +57,7 @@ TEST_F(ClusterConfigTest, ConfigEmpty) {
}

TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
EXPECT_FALSE(config_.SetConfig({}));
EXPECT_FALSE(config_.SetConfig(ClusterConfig::ClusterShards{}));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
Expand Down Expand Up @@ -118,4 +126,169 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
}
}

TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) {
// Note that slot_ranges is not an object
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": "0,16383",
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) {
// Note that slot_ranges.start is not a number
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": "0",
"end": 16383
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) {
// Note that slot_ranges.end is not a number
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": "16383"
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
]
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) {
// Note that master is not an object
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": 123,
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"ip": "10.0.0.0",
"port": 8000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"port": 8000
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"ip": "10.0.0.0"
},
"replicas": []
}
])json")));
}

TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"ip": "10.0.0.0",
"port": 8000
}
}
])json")));
}

} // namespace dfly

0 comments on commit 3dd6d88

Please sign in to comment.