Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing reconfig, sync, exists commands for keeper-client #54201

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions programs/keeper-client/Commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace DB

bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;

node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}

Expand Down Expand Up @@ -42,11 +42,11 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con

bool CDCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;

node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}

Expand All @@ -64,11 +64,12 @@ void CDCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con

bool SetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
Expand All @@ -93,11 +94,12 @@ void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co

bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
Expand Down Expand Up @@ -143,10 +145,10 @@ void TouchCommand::execute(const ASTKeeperQuery * query, KeeperClient * client)

bool GetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

return true;
}
Expand All @@ -158,11 +160,11 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co

bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;

node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}

Expand Down Expand Up @@ -325,10 +327,10 @@ void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client)

bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

return true;
}
Expand All @@ -340,10 +342,10 @@ void RMCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con

bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

return true;
}
Expand All @@ -355,6 +357,49 @@ void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
[client, path]{ client->zookeeper->removeRecursive(path); });
}

bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
ReconfigCommand::Operation operation;
if (ParserKeyword{"ADD"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::ADD;
else if (ParserKeyword{"REMOVE"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::REMOVE;
else if (ParserKeyword{"SET"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::SET;
else
return false;

node->args.push_back(operation);
ParserToken{TokenType::Whitespace}.ignore(pos);

String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));

return true;
}

void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
String joining;
String leaving;
String new_members;

auto operation = query->args[0].get<ReconfigCommand::Operation>();
if (operation == static_cast<Int64>(ReconfigCommand::Operation::ADD))
joining = query->args[1].safeGet<DB::String>();
else if (operation == static_cast<Int64>(ReconfigCommand::Operation::REMOVE))
leaving = query->args[1].safeGet<DB::String>();
else if (operation == static_cast<Int64>(ReconfigCommand::Operation::SET))
new_members = query->args[1].safeGet<DB::String>();
else
UNREACHABLE();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch statements?


auto response = client->zookeeper->reconfig(joining, leaving, new_members);
std::cout << response.value << '\n';
}

bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
Expand Down
17 changes: 17 additions & 0 deletions programs/keeper-client/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ class RMRCommand : public IKeeperClientCommand
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
};

class ReconfigCommand : public IKeeperClientCommand
{
enum class Operation : Int64 {
ADD = 0,
REMOVE = 1,
SET = 2,
};

String getName() const override { return "reconfig"; }

bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

String getHelpMessage() const override { return "{} <add|remove|set> \"<arg>\" [version] -- Reconfigures a ZooKeeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; }
};

class HelpCommand : public IKeeperClientCommand
{
String getName() const override { return "help"; }
Expand Down
63 changes: 37 additions & 26 deletions programs/keeper-client/KeeperClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ std::vector<String> KeeperClient::getCompletions(const String & prefix) const

void KeeperClient::askConfirmation(const String & prompt, std::function<void()> && callback)
{
if (!ask_confirmation)
return callback();

std::cout << prompt << " Continue?\n";
need_confirmation = true;
waiting_confirmation = true;
confirmation_callback = callback;
}

Expand Down Expand Up @@ -184,6 +187,7 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<ReconfigCommand>(),
std::make_shared<HelpCommand>(),
std::make_shared<FourLetterWordCommand>(),
});
Expand Down Expand Up @@ -216,17 +220,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
EventNotifier::init();
}

void KeeperClient::executeQuery(const String & query)
{
std::vector<String> queries;
boost::algorithm::split(queries, query, boost::is_any_of(";"));

for (const auto & query_text : queries)
{
if (!query_text.empty())
processQueryText(query_text);
}
}

bool KeeperClient::processQueryText(const String & text)
{
Expand All @@ -235,29 +228,44 @@ bool KeeperClient::processQueryText(const String & text)

try
{
if (need_confirmation)
if (waiting_confirmation)
{
need_confirmation = false;
waiting_confirmation = false;
if (text.size() == 1 && (text == "y" || text == "Y"))
confirmation_callback();
return true;
}

KeeperParser parser;
String message;
const char * begin = text.data();
ASTPtr res = tryParseQuery(parser, begin, begin + text.size(), message, true, "", false, 0, 0, false);
const char * end = begin + text.size();

if (!res)
while (begin < end)
{
std::cerr << message << "\n";
return true;
String message;
ASTPtr res = tryParseQuery(
parser,
begin,
end,
/* out_error_message = */ message,
/* hilite = */ true,
/* description = */ "",
/* allow_multi_statements = */ true,
/* max_query_size = */ 0,
/* max_parser_depth = */ 0,
/* skip_insignificant = */ false);

if (!res)
{
std::cerr << message << "\n";
return true;
}

auto * query = res->as<ASTKeeperQuery>();

auto command = KeeperClient::commands.find(query->command);
command->second->execute(query, this);
}

auto * query = res->as<ASTKeeperQuery>();

auto command = KeeperClient::commands.find(query->command);
command->second->execute(query, this);
}
catch (Coordination::Exception & err)
{
Expand Down Expand Up @@ -286,7 +294,7 @@ void KeeperClient::runInteractive()
while (true)
{
String prompt;
if (need_confirmation)
if (waiting_confirmation)
prompt = "[y/n] ";
else
prompt = cwd.string() + " :) ";
Expand Down Expand Up @@ -320,7 +328,10 @@ int KeeperClient::main(const std::vector<String> & /* args */)
zookeeper = std::make_unique<zkutil::ZooKeeper>(zk_args);

if (config().has("query"))
executeQuery(config().getString("query"));
{
ask_confirmation = false;
processQueryText(config().getString("query"));
}
else
runInteractive();

Expand Down
4 changes: 2 additions & 2 deletions programs/keeper-client/KeeperClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class KeeperClient: public Poco::Util::Application
protected:
void runInteractive();
bool processQueryText(const String & text);
void executeQuery(const String & query);

void loadCommands(std::vector<Command> && new_commands);

Expand All @@ -61,7 +60,8 @@ class KeeperClient: public Poco::Util::Application

zkutil::ZooKeeperArgs zk_args;

bool need_confirmation = false;
bool ask_confirmation = true;
bool waiting_confirmation = false;

std::vector<String> registered_commands_and_four_letter_words;
};
Expand Down
33 changes: 12 additions & 21 deletions programs/keeper-client/Parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,34 @@ namespace DB

bool parseKeeperArg(IParser::Pos & pos, Expected & expected, String & result)
{
expected.add(pos, getTokenName(TokenType::BareWord));

if (pos->type == TokenType::BareWord)
if (pos->type == TokenType::QuotedIdentifier || pos->type == TokenType::StringLiteral)
{
result = String(pos->begin, pos->end);
++pos;
if (!parseIdentifierOrStringLiteral(pos, expected, result))
return false;

ParserToken{TokenType::Whitespace}.ignore(pos);
return true;
}

bool status = parseIdentifierOrStringLiteral(pos, expected, result);
ParserToken{TokenType::Whitespace}.ignore(pos);
return status;
}

bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path)
{
expected.add(pos, "path");

if (pos->type == TokenType::QuotedIdentifier || pos->type == TokenType::StringLiteral)
return parseIdentifierOrStringLiteral(pos, expected, path);

String result;
while (pos->type != TokenType::Whitespace && pos->type != TokenType::EndOfStream)
while (pos->type != TokenType::Whitespace && pos->type != TokenType::EndOfStream && pos->type != TokenType::Semicolon)
{
result.append(pos->begin, pos->end);
++pos;
}

ParserToken{TokenType::Whitespace}.ignore(pos);

if (result.empty())
return false;

path = result;
return true;
}

bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path)
{
expected.add(pos, "path");
return parseKeeperArg(pos, expected, path);
}

bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTKeeperQuery>();
Expand Down
Loading
Loading