Skip to content

Commit

Permalink
Add JSON output
Browse files Browse the repository at this point in the history
  • Loading branch information
antonio2368 committed Apr 14, 2023
1 parent 32adebb commit f4363d0
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 116 deletions.
2 changes: 1 addition & 1 deletion utils/check-style/check-style
Expand Up @@ -13,7 +13,7 @@
# and then to run formatter only for the specified files.

ROOT_PATH=$(git rev-parse --show-toplevel)
EXCLUDE_DIRS='build/|integration/|widechar_width/|glibc-compatibility/|poco/|memcpy/|consistent-hashing|benchmark|tests/'
EXCLUDE_DIRS='build/|integration/|widechar_width/|glibc-compatibility/|poco/|memcpy/|consistent-hashing|benchmark|tests/|utils/keeper-bench/example.yaml'

# From [1]:
# But since array_to_string_internal() in array.c still loops over array
Expand Down
2 changes: 1 addition & 1 deletion utils/keeper-bench/CMakeLists.txt
@@ -1,2 +1,2 @@
clickhouse_add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp)
target_link_libraries(keeper-bench PRIVATE clickhouse_common_config_no_zookeeper_log)
target_link_libraries(keeper-bench PRIVATE clickhouse_common_config_no_zookeeper_log ch_contrib::rapidjson)
72 changes: 54 additions & 18 deletions utils/keeper-bench/Generator.cpp
Expand Up @@ -12,6 +12,7 @@ using namespace zkutil;
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}

namespace
Expand Down Expand Up @@ -308,7 +309,7 @@ RequestGetter RequestGetter::fromConfig(const std::string & key, const Poco::Uti
auto weight = request_generator->getWeight();
use_weights |= weight != 1;
weight_sum += weight;

generators.push_back(std::move(request_generator));
}

Expand Down Expand Up @@ -575,7 +576,7 @@ Coordination::ZooKeeperRequestPtr MultiRequestGenerator::generateImpl(const Coor

if (size)
{
auto request_count = size->getNumber();
auto request_count = size->getNumber();

for (size_t i = 0; i < request_count; ++i)
ops.push_back(request_getter.getRequestGenerator()->generate(acls));
Expand Down Expand Up @@ -604,28 +605,42 @@ Generator::Generator(const Poco::Util::AbstractConfiguration & config)

static const std::string generator_key = "generator";

std::cout << "---- Parsing setup ---- " << std::endl;
std::cerr << "---- Parsing setup ---- " << std::endl;
static const std::string setup_key = generator_key + ".setup";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(setup_key, keys);
for (const auto & key : keys)
{
if (key.starts_with("node"))
{
const auto & node = root_nodes.emplace_back(parseNode(setup_key + "." + key, config));
auto node_key = setup_key + "." + key;
auto parsed_root_node = parseNode(node_key, config);
const auto node = root_nodes.emplace_back(parsed_root_node);

if (config.has(node_key + ".repeat"))
{
if (!node->name.isRandom())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key);

auto repeat_count = config.getUInt64(node_key + ".repeat");
node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
root_nodes.emplace_back(node->clone());
}

std::cerr << "Tree to create:" << std::endl;

std::cout << "Tree to create:" << std::endl;
node->dumpTree();
std::cout << std::endl;
std::cerr << std::endl;
}
}
std::cout << "---- Done parsing data setup ----\n" << std::endl;
std::cerr << "---- Done parsing data setup ----\n" << std::endl;

std::cout << "---- Collecting request generators ----" << std::endl;
std::cerr << "---- Collecting request generators ----" << std::endl;
static const std::string requests_key = generator_key + ".requests";
request_getter = RequestGetter::fromConfig(requests_key, config);
std::cout << request_getter.description() << std::endl;
std::cout << "---- Done collecting request generators ----\n" << std::endl;
std::cerr << request_getter.description() << std::endl;
std::cerr << "---- Done collecting request generators ----\n" << std::endl;
}

std::shared_ptr<Generator::Node> Generator::parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config)
Expand Down Expand Up @@ -654,6 +669,7 @@ std::shared_ptr<Generator::Node> Generator::parseNode(const std::string & key, c
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key_string);

auto repeat_count = config.getUInt64(node_key_string + ".repeat");
child_node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
node->children.push_back(child_node);
}
Expand All @@ -666,10 +682,30 @@ void Generator::Node::dumpTree(int level) const
{
std::string data_string
= data.has_value() ? fmt::format("{}", data->description()) : "no data";
std::cout << fmt::format("{}name: {}, data: {}", std::string(level, '\t'), name.description(), data_string) << std::endl;

for (const auto & child : children)
std::string repeat_count_string = repeat_count != 0 ? fmt::format(", repeated {} times", repeat_count) : "";

std::cerr << fmt::format("{}name: {}, data: {}{}", std::string(level, '\t'), name.description(), data_string, repeat_count_string) << std::endl;

for (auto it = children.begin(); it != children.end();)
{
const auto & child = *it;
child->dumpTree(level + 1);
std::advance(it, child->repeat_count != 0 ? child->repeat_count : 1);
}
}

std::shared_ptr<Generator::Node> Generator::Node::clone() const
{
auto new_node = std::make_shared<Node>();
new_node->name = name;
new_node->data = data;
new_node->repeat_count = repeat_count;

// don't do deep copy of children because we will do clone only for root nodes
new_node->children = children;

return new_node;
}

void Generator::Node::createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const
Expand All @@ -693,21 +729,21 @@ void Generator::Node::createNode(Coordination::ZooKeeper & zookeeper, const std:

void Generator::startup(Coordination::ZooKeeper & zookeeper)
{
std::cout << "---- Creating test data ----" << std::endl;
std::cerr << "---- Creating test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
node->name.setString(node_name);

std::string root_path = std::filesystem::path("/") / node_name;
std::cout << "Cleaning up " << root_path << std::endl;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);

node->createNode(zookeeper, "/", default_acls);
}
std::cout << "---- Created test data ----\n" << std::endl;
std::cerr << "---- Created test data ----\n" << std::endl;

std::cout << "---- Initializing generators ----" << std::endl;
std::cerr << "---- Initializing generators ----" << std::endl;

request_getter.startup(zookeeper);
}
Expand All @@ -719,12 +755,12 @@ Coordination::ZooKeeperRequestPtr Generator::generate()

void Generator::cleanup(Coordination::ZooKeeper & zookeeper)
{
std::cout << "---- Cleaning up test data ----" << std::endl;
std::cerr << "---- Cleaning up test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
std::string root_path = std::filesystem::path("/") / node_name;
std::cout << "Cleaning up " << root_path << std::endl;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);
}
}
3 changes: 3 additions & 0 deletions utils/keeper-bench/Generator.h
Expand Up @@ -180,6 +180,9 @@ class Generator
StringGetter name;
std::optional<StringGetter> data;
std::vector<std::shared_ptr<Node>> children;
size_t repeat_count = 0;

std::shared_ptr<Node> clone() const;

void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const;
void dumpTree(int level = 0) const;
Expand Down
49 changes: 30 additions & 19 deletions utils/keeper-bench/Runner.cpp
Expand Up @@ -5,10 +5,18 @@
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/EventNotifier.h>
#include <Common/Config/ConfigProcessor.h>
#include "IO/WriteBufferFromFile.h"

namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}

namespace DB::ErrorCodes
{
extern const int CANNOT_BLOCK_SIGNAL;
extern const int BAD_ARGUMENTS;
}

Runner::Runner(
Expand Down Expand Up @@ -40,41 +48,41 @@ Runner::Runner(
parseHostsFromConfig(*config);
}

std::cout << "---- Run options ---- " << std::endl;
std::cerr << "---- Run options ---- " << std::endl;
static constexpr uint64_t DEFAULT_CONCURRENCY = 1;
if (concurrency_)
concurrency = *concurrency_;
else
concurrency = config->getUInt64("concurrency", DEFAULT_CONCURRENCY);
std::cout << "Concurrency: " << concurrency << std::endl;
std::cerr << "Concurrency: " << concurrency << std::endl;

static constexpr uint64_t DEFAULT_ITERATIONS = 0;
if (max_iterations_)
max_iterations = *max_iterations_;
else
max_iterations = config->getUInt64("iterations", DEFAULT_ITERATIONS);
std::cout << "Iterations: " << max_iterations << std::endl;
std::cerr << "Iterations: " << max_iterations << std::endl;

static constexpr double DEFAULT_DELAY = 1.0;
if (delay_)
delay = *delay_;
else
delay = config->getDouble("report_delay", DEFAULT_DELAY);
std::cout << "Report delay: " << delay << std::endl;
std::cerr << "Report delay: " << delay << std::endl;

static constexpr double DEFAULT_TIME_LIMIT = 1.0;
if (max_time_)
max_time = *max_time_;
else
max_time = config->getDouble("timelimit", DEFAULT_TIME_LIMIT);
std::cout << "Time limit: " << max_time << std::endl;
std::cerr << "Time limit: " << max_time << std::endl;

if (continue_on_error_)
continue_on_error = *continue_on_error_;
else
continue_on_error = config->getBool("continue_on_error", false);
std::cout << "Continue on error: " << continue_on_error << std::endl;
std::cout << "---- Run options ----\n" << std::endl;
std::cerr << "Continue on error: " << continue_on_error << std::endl;
std::cerr << "---- Run options ----\n" << std::endl;

pool.emplace(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, concurrency);
queue.emplace(concurrency);
Expand Down Expand Up @@ -173,7 +181,7 @@ void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookee
else if (response.error == Coordination::Error::ZNONODE)
{
/// remove can fail with ZNONODE because of different order of execution
/// of generated create and remove requests
/// of generated create and remove requests
/// this is okay for concurrent runs
if (dynamic_cast<const Coordination::ZooKeeperRemoveResponse *>(&response))
set_exception = false;
Expand Down Expand Up @@ -203,14 +211,14 @@ void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookee
try
{
auto response_size = future.get();
double seconds = watch.elapsedSeconds();
auto microseconds = watch.elapsedMicroseconds();

std::lock_guard lock(mutex);

if (request->isReadRequest())
info->addRead(seconds, 1, request->bytesSize() + response_size);
info->addRead(microseconds, 1, request->bytesSize() + response_size);
else
info->addWrite(seconds, 1, request->bytesSize() + response_size);
info->addWrite(microseconds, 1, request->bytesSize() + response_size);
}
catch (...)
{
Expand Down Expand Up @@ -268,7 +276,7 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re
//if (i % 10000 == 0)
//{
// for (const auto & [op_num, count] : counts)
// std::cout << fmt::format("{}: {}", op_num, count) << std::endl;
// std::cerr << fmt::format("{}: {}", op_num, count) << std::endl;
//}

bool inserted = false;
Expand All @@ -285,13 +293,13 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re

if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
std::cerr << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}

if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
std::cerr << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}

Expand All @@ -300,7 +308,7 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re
printNumberOfRequestsExecuted(requests_executed);

std::lock_guard lock(mutex);
report(info, concurrency);
info->report(concurrency);
delay_watch.restart();
}
}
Expand Down Expand Up @@ -350,18 +358,21 @@ void Runner::runBenchmark()
printNumberOfRequestsExecuted(requests_executed);

std::lock_guard lock(mutex);
report(info, concurrency);
info->report(concurrency);

DB::WriteBufferFromFile out("result.json");
info->writeJSON(out, concurrency);
}


void Runner::createConnections()
{
DB::EventNotifier::init();
std::cout << "---- Creating connections ---- " << std::endl;
std::cerr << "---- Creating connections ---- " << std::endl;
for (size_t connection_info_idx = 0; connection_info_idx < connection_infos.size(); ++connection_info_idx)
{
const auto & connection_info = connection_infos[connection_info_idx];
std::cout << fmt::format("Creating {} session(s) for:\n"
std::cerr << fmt::format("Creating {} session(s) for:\n"
"- host: {}\n"
"- secure: {}\n"
"- session timeout: {}ms\n"
Expand All @@ -380,7 +391,7 @@ void Runner::createConnections()
connections_to_info_map[connections.size() - 1] = connection_info_idx;
}
}
std::cout << "---- Done creating connections ----\n" << std::endl;
std::cerr << "---- Done creating connections ----\n" << std::endl;
}

std::shared_ptr<Coordination::ZooKeeper> Runner::getConnection(const ConnectionInfo & connection_info)
Expand Down
11 changes: 0 additions & 11 deletions utils/keeper-bench/Runner.h
Expand Up @@ -19,17 +19,6 @@
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;

namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}

namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
}

class Runner
{
public:
Expand Down

0 comments on commit f4363d0

Please sign in to comment.