Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Implement DFLY CLUSTER CONFIG command. #1223

Merged
merged 6 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(search_family_test dfly_test_lib LABELS DFLY)
cxx_test(cluster_config_test dfly_test_lib LABELS DFLY)
cxx_test(dflycmd_test dfly_test_lib LABELS DFLY)



Expand Down
160 changes: 160 additions & 0 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
#include <absl/strings/str_cat.h>
#include <absl/strings/strip.h>

#include <jsoncons/json.hpp>
#include <limits>
#include <optional>

#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "facade/dragonfly_connection.h"
#include "server/cluster/cluster_config.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
Expand Down Expand Up @@ -370,15 +376,169 @@ void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
}
}

namespace {
constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;

template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
if (!obj.is_number()) {
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
}

return obj.as<T>();
}

optional<vector<ClusterConfig::SlotRange>> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}

vector<ClusterConfig::SlotRange> ranges;

for (const auto& range : slots.array_value()) {
if (!range.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
return nullopt;
}

optional<SlotId> start = ReadNumeric<SlotId>(range.at_or_null("start"));
optional<SlotId> end = ReadNumeric<SlotId>(range.at_or_null("end"));
if (!start.has_value() || !end.has_value()) {
return nullopt;
}

ranges.push_back({.start = start.value(), .end = end.value()});
}

return ranges;
}

optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
}

ClusterConfig::Node node;

{
auto id = json.at_or_null("id");
if (!id.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
return nullopt;
}
node.id = std::move(id).as_string();
}

{
auto ip = json.at_or_null("ip");
if (!ip.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
return nullopt;
}
node.ip = std::move(ip).as_string();
}

{
auto port = ReadNumeric<uint16_t>(json.at_or_null("port"));
if (!port.has_value()) {
return nullopt;
}
node.port = port.value();
}

return node;
}

optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config;

if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
return nullopt;
}

for (const auto& element : json.array_value()) {
ClusterConfig::ClusterShard shard;

if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
return nullopt;
}

auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!slots.has_value()) {
return nullopt;
}
shard.slot_ranges = std::move(slots).value();

auto master = ParseClusterNode(element.at_or_null("master"));
if (!master.has_value()) {
return nullopt;
}
shard.master = std::move(master).value();

auto replicas = element.at_or_null("replicas");
if (!replicas.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
return nullopt;
}

for (const auto& replica : replicas.array_value()) {
auto node = ParseClusterNode(replica);
if (!node.has_value()) {
return nullopt;
}
shard.replicas.push_back(std::move(node).value());
}

config.push_back(std::move(shard));
}

return config;
}
} // namespace

void DflyCmd::ClusterConfig(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder* rb = cntx->reply_builder();

if (args.size() != 3) {
return rb->SendError(kSyntaxErr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check out this error type - WrongNumArgsError

}

string_view json_str = ArgS(args, 2);
optional<JsonType> json = JsonFromString(json_str);
if (!json.has_value()) {
LOG(WARNING) << kInvalidConfigPrefix << "Can't parse JSON " << json_str;
return rb->SendError(kSyntaxErr);
Copy link
Collaborator

@adiholden adiholden May 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better return more details in the error here
rb->SendError(kInvalidConfigPrefix, kSyntaxErrType)

}

optional<ClusterConfig::ClusterShards> config = BuildClusterConfigFromJson(json.value());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it makes sense that the function BuildClusterConfigFromJson impl will be inside the ClusterConfig class
i.e change SetConfig to get json object, and all the logic to parse the json will be inside ClusterConfig

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I actually have thought about this before making the change and (originally) thought that it made sense within dflycmd.cc, but following your comment I'm convinced that it's probably better to be part of ClusterConfig' API.
So as you requested, I moved it.

if (!config.has_value()) {
return rb->SendError(kSyntaxErr);
}

if (!sf_->cluster_config()->SetConfig(config.value())) {
return rb->SendError("Invalid cluster configuration.");
}

return rb->SendOk();
}

void DflyCmd::ClusterManagmentCmd(CmdArgList args, ConnectionContext* cntx) {
if (!ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError("DFLY CLUSTER commands requires --cluster_mode=yes");
}
CHECK_NE(sf_->cluster_config(), nullptr);

// TODO check admin port
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "GETSLOTINFO") {
return ClusterGetSlotInfo(args, cntx);
} else if (sub_cmd == "CONFIG") {
return ClusterConfig(args, cntx);
}

return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLY CLUSTER"), kSyntaxErrType);
Expand Down
3 changes: 3 additions & 0 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ class DflyCmd {
// CLUSTER GETSLOTINFO command
void ClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);

// CLUSTER CONFIG command
void ClusterConfig(CmdArgList args, ConnectionContext* cntx);

// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

Expand Down