Skip to content

Commit

Permalink
fix several memory leaks in cbc-gen
Browse files Browse the repository at this point in the history
Change-Id: I9404903129b40b5a5ece40191f4a1960ede4816e
Reviewed-on: http://review.couchbase.org/c/libcouchbase/+/139679
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information
avsej committed Nov 5, 2020
1 parent d76b5e1 commit 57a7ca9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 60 deletions.
2 changes: 1 addition & 1 deletion src/hdr_timings.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ lcb_HISTOGRAM *lcb_histogram_create(void)
LCB_INTERNAL_API
void lcb_histogram_destroy(lcb_HISTOGRAM *hg)
{
free(hg->hdr_histogram);
hdr_close(hg->hdr_histogram);
free(hg);
}

Expand Down
121 changes: 69 additions & 52 deletions tools/cbc-gen.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

#include "config.h"
#include <signal.h>
#include <csignal>
#include <iostream>
#include <map>
#include <cstdio>
Expand Down Expand Up @@ -57,8 +57,6 @@ static void do_or_die(lcb_STATUS rc, const std::string &msg = "")
}
}

static Histogram hg;

class Configuration
{
public:
Expand Down Expand Up @@ -119,8 +117,8 @@ class KeyGenerator
class DistributedKeyGenerator : public KeyGenerator
{
private:
std::vector< std::string > key_pool;
std::vector< std::string >::const_iterator itr;
std::vector<std::string> key_pool;
std::vector<std::string>::const_iterator itr;

public:
explicit DistributedKeyGenerator(lcb_INSTANCE *instance, const std::string &prefix = "key_",
Expand All @@ -132,10 +130,10 @@ class DistributedKeyGenerator : public KeyGenerator
if (num_vbuckets == 0) {
throw std::runtime_error("The configuration does not contain any vBuckets");
}
std::vector< std::vector< std::string > > key_groups(num_vbuckets);
std::vector<std::vector<std::string>> key_groups(num_vbuckets);
size_t left = num_keys_per_vbucket * num_vbuckets;
size_t i = 0;
while (left > 0 && i < std::numeric_limits< size_t >::max()) {
while (left > 0 && i < std::numeric_limits<size_t>::max()) {
std::stringstream ss;
ss << prefix << std::setw(8) << std::setfill('0') << i++;
std::string key = ss.str();
Expand Down Expand Up @@ -186,8 +184,8 @@ size_t value_size_max = 128;
class BoundedValueGenerator : public ValueGenerator
{
private:
std::vector< std::string > value_pool;
std::vector< std::string >::const_iterator itr;
std::vector<std::string> value_pool;
std::vector<std::string>::const_iterator itr;

public:
explicit BoundedValueGenerator(size_t minimum_size = value_size_min, size_t maximum_size = value_size_max,
Expand All @@ -204,7 +202,7 @@ class BoundedValueGenerator : public ValueGenerator
}
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
auto rnd = std::default_random_engine(seed);
std::uniform_int_distribution< size_t > dist(minimum_size, maximum_size);
std::uniform_int_distribution<size_t> dist(minimum_size, maximum_size);

for (size_t idx = 0; idx < pool_size; idx++) {
size_t value_size = dist(rnd) - 12;
Expand Down Expand Up @@ -275,10 +273,10 @@ class Workload

private:
std::default_random_engine rnd;
std::uniform_int_distribution< unsigned int > dist;
std::uniform_int_distribution<unsigned int> dist;

typedef std::pair< op_type, unsigned int > op;
std::vector< op > weights_;
typedef std::pair<op_type, unsigned int> op;
std::vector<op> weights_;
};

class Worker;
Expand Down Expand Up @@ -308,21 +306,21 @@ extern "C" {
void store_callback(lcb_INSTANCE *, int, lcb_RESPSTORE *resp)
{
Stats *stats = nullptr;
lcb_respstore_cookie(resp, reinterpret_cast< void ** >(&stats));
lcb_respstore_cookie(resp, reinterpret_cast<void **>(&stats));
stats->writes++;
stats->total++;
}
void get_callback(lcb_INSTANCE *, int, lcb_RESPGET *resp)
{
Stats *stats = nullptr;
lcb_respget_cookie(resp, reinterpret_cast< void ** >(&stats));
lcb_respget_cookie(resp, reinterpret_cast<void **>(&stats));
stats->reads++;
stats->total++;
}
void remove_callback(lcb_INSTANCE *, int, lcb_RESPREMOVE *resp)
{
Stats *stats = nullptr;
lcb_respremove_cookie(resp, reinterpret_cast< void ** >(&stats));
lcb_respremove_cookie(resp, reinterpret_cast<void **>(&stats));
stats->deletes++;
stats->total++;
}
Expand All @@ -339,7 +337,6 @@ class Worker
: is_running(false), instance(nullptr), io_thr(nullptr), gen_thr(nullptr), keygen(nullptr), valgen(nullptr)
{
lcb_CREATEOPTS *cropts = nullptr;
memset(&cropts, 0, sizeof cropts);
config.fillCropts(cropts);
do_or_die(lcb_create(&instance, cropts), "Failed to create connection");
lcb_createopts_destroy(cropts);
Expand Down Expand Up @@ -371,6 +368,7 @@ class Worker

~Worker()
{
stop();
if (instance) {
if (config.shouldDump()) {
lcb_dump(instance, stderr, LCB_DUMP_ALL);
Expand Down Expand Up @@ -409,72 +407,82 @@ class Worker
void stop()
{
is_running = false;
gen_thr->join();
io_thr->join();
join();
delete io_thr;
io_thr = nullptr;
delete gen_thr;
gen_thr = nullptr;
}

void join()
void join() const
{
gen_thr->join();
io_thr->join();
if (gen_thr != nullptr && gen_thr->joinable()) {
gen_thr->join();
}
if (io_thr != nullptr && io_thr->joinable()) {
io_thr->join();
}
}

void push_batch(std::list< std::pair< Workload::op_type, lcb_CMDBASE * > > &batch)
void push_batch(std::list<std::pair<Workload::op_type, void *>> &batch)
{
std::unique_lock< std::mutex > lock(mutex_);
std::unique_lock<std::mutex> lock(mutex_);
for (auto &cmd : batch) {
list_.push_back(cmd);
list_.emplace_back(cmd);
}
}

bool want_more()
{
std::unique_lock< std::mutex > lock(mutex_);
std::unique_lock<std::mutex> lock(mutex_);
return list_.size() < batch_size;
}

void flush()
std::size_t flush()
{
std::unique_lock< std::mutex > lock(mutex_);
std::unique_lock<std::mutex> lock(mutex_);
if (list_.empty()) {
return;
return 0;
}

std::size_t scheduled = 0;
lcb_sched_enter(instance);
for (auto &entry : list_) {
lcb_STATUS rc = LCB_SUCCESS;
switch (entry.first) {
case Workload::op_write: {
auto *cmd = reinterpret_cast< lcb_CMDSTORE * >(entry.second);
rc = lcb_store(instance, reinterpret_cast< void * >(&stats_), cmd);
auto *cmd = reinterpret_cast<lcb_CMDSTORE *>(entry.second);
rc = lcb_store(instance, reinterpret_cast<void *>(&stats_), cmd);
lcb_cmdstore_destroy(cmd);
} break;
case Workload::op_read: {
auto *cmd = reinterpret_cast< lcb_CMDGET * >(entry.second);
rc = lcb_get(instance, reinterpret_cast< void * >(&stats_), cmd);
auto *cmd = reinterpret_cast<lcb_CMDGET *>(entry.second);
rc = lcb_get(instance, reinterpret_cast<void *>(&stats_), cmd);
lcb_cmdget_destroy(cmd);
} break;
case Workload::op_delete: {
auto *cmd = reinterpret_cast< lcb_CMDREMOVE * >(entry.second);
rc = lcb_remove(instance, reinterpret_cast< void * >(&stats_), cmd);
auto *cmd = reinterpret_cast<lcb_CMDREMOVE *>(entry.second);
rc = lcb_remove(instance, reinterpret_cast<void *>(&stats_), cmd);
lcb_cmdremove_destroy(cmd);
} break;
}
if (rc != LCB_SUCCESS) {
lcb_sched_fail(instance);
break;
}
++scheduled;
}
lcb_sched_leave(instance);
list_.clear();
return scheduled;
}

const Stats &stats()
{
return stats_;
}

size_t total_ops()
size_t total_ops() const
{
return stats_.total;
}
Expand All @@ -489,11 +497,13 @@ class Worker
static int next_id;

std::mutex mutex_;
std::list< std::pair< Workload::op_type, lcb_CMDBASE * > > list_;
std::list<std::pair<Workload::op_type, void *>> list_;

KeyGenerator *keygen;
ValueGenerator *valgen;
Stats stats_;

Histogram hg{};
};
int Worker::next_id = 0;

Expand Down Expand Up @@ -528,7 +538,7 @@ void generator_loop(Worker *worker, size_t num_items)
{
bool has_limit = num_items > 0;
size_t items_left = num_items;
std::list< std::pair< Workload::op_type, lcb_CMDBASE * > > batch;
std::list<std::pair<Workload::op_type, void *>> batch;

while (worker->is_running) {
if (has_limit && items_left == 0) {
Expand All @@ -550,7 +560,7 @@ void generator_loop(Worker *worker, size_t num_items)
lcb_cmdstore_key(cmd, key.data(), key.size());
lcb_cmdstore_value(cmd, value.data(), value.size());
lcb_cmdstore_durability(cmd, durability_level);
batch.emplace_back(type, reinterpret_cast< lcb_CMDBASE * >(cmd));
batch.emplace_back(type, reinterpret_cast<void *>(cmd));
} break;
case Workload::op_read: {
lcb_CMDGET *cmd = nullptr;
Expand All @@ -560,7 +570,7 @@ void generator_loop(Worker *worker, size_t num_items)
}
const std::string &key = worker->next_key();
lcb_cmdget_key(cmd, key.data(), key.size());
batch.emplace_back(type, reinterpret_cast< lcb_CMDBASE * >(cmd));
batch.emplace_back(type, reinterpret_cast<void *>(cmd));
} break;
case Workload::op_delete: {
lcb_CMDREMOVE *cmd = nullptr;
Expand All @@ -571,7 +581,7 @@ void generator_loop(Worker *worker, size_t num_items)
const std::string &key = worker->next_key();
lcb_cmdremove_key(cmd, key.data(), key.size());
lcb_cmdremove_durability(cmd, durability_level);
batch.emplace_back(type, reinterpret_cast< lcb_CMDBASE * >(cmd));
batch.emplace_back(type, reinterpret_cast<void *>(cmd));
} break;
}
items_left--;
Expand All @@ -586,7 +596,7 @@ void generator_loop(Worker *worker, size_t num_items)
}
}

std::map< std::string, Worker * > workers;
std::map<std::string, Worker *> workers;

static const char *handlers_sorted[] = {"help", // HelpHandler
"create", // CreateHandler
Expand All @@ -613,19 +623,17 @@ static void command_completion(const char *buf, linenoiseCompletions *lc)
}

struct bm_COMMAND {
std::string name;
std::vector< std::string > args;
std::map< std::string, std::string > options;

bm_COMMAND() : name(""), args(), options() {}
std::string name{};
std::vector<std::string> args{};
std::map<std::string, std::string> options{};
};

namespace gen
{
class Handler;
}

static std::map< std::string, gen::Handler * > handlers;
static std::map<std::string, gen::Handler *> handlers;

namespace gen
{
Expand All @@ -651,18 +659,21 @@ class Handler
}

virtual ~Handler() = default;

virtual const char *description() const
{
return nullptr;
}

virtual const char *usagestr() const
{
return nullptr;
}

virtual void execute(bm_COMMAND &cmd) = 0;

protected:
std::string cmdname;
std::string cmdname{};
};

class HelpHandler : public Handler
Expand Down Expand Up @@ -728,7 +739,7 @@ class CreateHandler : public Handler
protected:
void execute(bm_COMMAND &) override
{
Worker *worker = new Worker();
auto *worker = new Worker();
workers[worker->id] = worker;
std::cout << "# worker " << worker->id << " has been created and connected" << std::endl;
}
Expand All @@ -747,7 +758,7 @@ class DestroyHandler : public Handler
std::string id = wpair.first;
delete wpair.second;
workers.erase(id);
std::cout << "# worker " << wpair.first << " has been destroyed" << std::endl;
std::cout << "# worker " << id << " has been destroyed" << std::endl;
}
}
};
Expand Down Expand Up @@ -1013,6 +1024,11 @@ static void cleanup()
bm_COMMAND cmd;
handlers["stop"]->execute(cmd);
handlers["destroy"]->execute(cmd);

for (const auto &handler : handlers) {
delete handler.second;
}
handlers.clear();
}

static void sigint_handler(int)
Expand Down Expand Up @@ -1051,7 +1067,7 @@ static void real_main(int argc, char **argv)
FILE *finput = stdin;
if (!config.scriptPath().empty()) {
finput = fopen(config.scriptPath().c_str(), "r");
if (finput == NULL) {
if (finput == nullptr) {
perror("unable to open script file");
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -1089,6 +1105,7 @@ static void real_main(int argc, char **argv)
}
if (strlen(line) == 0 || line[0] == '#') {
/* ignore empty lines and comments */
linenoiseFree(line);
continue;
}
if (finput == stdin && isatty(fileno(stdin))) {
Expand Down
Loading

0 comments on commit 57a7ca9

Please sign in to comment.