Skip to content

Commit

Permalink
Merge pull request #54201 from ClickHouse/pufit/keeper-client-reconfig
Browse files Browse the repository at this point in the history
Implementing `reconfig`, `sync`, `exists` commands for keeper-client
  • Loading branch information
pufit committed Sep 12, 2023
2 parents 38e0689 + 4f5d132 commit a54a7b7
Show file tree
Hide file tree
Showing 20 changed files with 648 additions and 494 deletions.
5 changes: 5 additions & 0 deletions docs/en/operations/utilities/clickhouse-keeper-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ A client application to interact with clickhouse-keeper by its native protocol.
- `--session-timeout=TIMEOUT` — Set session timeout in seconds. Default value: 10s.
- `--operation-timeout=TIMEOUT` — Set operation timeout in seconds. Default value: 10s.
- `--history-file=FILE_PATH` — Set path of history file. Default value: `~/.keeper-client-history`.
- `--log-level=LEVEL` — Set log level. Default value: `information`.
- `--no-confirmation` — If set, will not require a confirmation on several commands. Default value `false` for interactive and `true` for query
- `--help` — Shows the help message.

## Example {#clickhouse-keeper-client-example}
Expand Down Expand Up @@ -44,6 +46,7 @@ keeper foo bar

- `ls [path]` -- Lists the nodes for the given path (default: cwd)
- `cd [path]` -- Change the working path (default `.`)
- `exists <path>` -- Returns `1` if node exists, `0` otherwise
- `set <path> <value> [version]` -- Updates the node's value. Only update if version matches (default: -1)
- `create <path> <value> [mode]` -- Creates new node with the set value
- `touch <path>` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
Expand All @@ -56,3 +59,5 @@ keeper foo bar
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)
- `sync <path>` -- Synchronizes node between processes and leader
- `reconfig <add|remove|set> "<arg>" [version]` -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration
129 changes: 105 additions & 24 deletions programs/keeper-client/Commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace DB

bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;

node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}

Expand Down Expand Up @@ -42,11 +42,11 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con

bool CDCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;

node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}

Expand All @@ -64,11 +64,12 @@ void CDCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con

bool SetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
Expand All @@ -93,11 +94,12 @@ void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co

bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));
Expand Down Expand Up @@ -143,10 +145,10 @@ void TouchCommand::execute(const ASTKeeperQuery * query, KeeperClient * client)

bool GetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

return true;
}
Expand All @@ -156,13 +158,28 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}

bool ExistsCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(path));

return true;
}

void ExistsCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
std::cout << client->zookeeper->exists(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}

bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return true;

node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));
return true;
}

Expand Down Expand Up @@ -325,10 +342,10 @@ void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client)

bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

return true;
}
Expand All @@ -340,10 +357,10 @@ void RMCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con

bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(arg));
node->args.push_back(std::move(path));

return true;
}
Expand All @@ -355,6 +372,70 @@ void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
[client, path]{ client->zookeeper->removeRecursive(path); });
}

bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
ReconfigCommand::Operation operation;
if (ParserKeyword{"ADD"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::ADD;
else if (ParserKeyword{"REMOVE"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::REMOVE;
else if (ParserKeyword{"SET"}.ignore(pos, expected))
operation = ReconfigCommand::Operation::SET;
else
return false;

node->args.push_back(operation);
ParserToken{TokenType::Whitespace}.ignore(pos);

String arg;
if (!parseKeeperArg(pos, expected, arg))
return false;
node->args.push_back(std::move(arg));

return true;
}

void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
String joining;
String leaving;
String new_members;

auto operation = query->args[0].get<ReconfigCommand::Operation>();
switch (operation)
{
case static_cast<UInt8>(ReconfigCommand::Operation::ADD):
joining = query->args[1].safeGet<DB::String>();
break;
case static_cast<UInt8>(ReconfigCommand::Operation::REMOVE):
leaving = query->args[1].safeGet<DB::String>();
break;
case static_cast<UInt8>(ReconfigCommand::Operation::SET):
new_members = query->args[1].safeGet<DB::String>();
break;
default:
UNREACHABLE();
}

auto response = client->zookeeper->reconfig(joining, leaving, new_members);
std::cout << response.value << '\n';
}

bool SyncCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(path));

return true;
}

void SyncCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
std::cout << client->zookeeper->sync(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}

bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
Expand Down
40 changes: 40 additions & 0 deletions programs/keeper-client/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ class GetCommand : public IKeeperClientCommand
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
};

class ExistsCommand : public IKeeperClientCommand
{
String getName() const override { return "exists"; }

bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

String getHelpMessage() const override { return "{} <path> -- Returns `1` if node exists, `0` otherwise"; }
};

class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
Expand Down Expand Up @@ -177,6 +188,35 @@ class RMRCommand : public IKeeperClientCommand
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
};

class ReconfigCommand : public IKeeperClientCommand
{
enum class Operation : UInt8
{
ADD = 0,
REMOVE = 1,
SET = 2,
};

String getName() const override { return "reconfig"; }

bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

String getHelpMessage() const override { return "{} <add|remove|set> \"<arg>\" [version] -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; }
};

class SyncCommand: public IKeeperClientCommand
{
String getName() const override { return "sync"; }

bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

String getHelpMessage() const override { return "{} <path> -- Synchronizes node between processes and leader"; }
};

class HelpCommand : public IKeeperClientCommand
{
String getName() const override { return "help"; }
Expand Down
Loading

0 comments on commit a54a7b7

Please sign in to comment.