Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate mixed workload with Get, Put, Seek in db_bench #4788

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 293 additions & 0 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#endif
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
Expand Down Expand Up @@ -102,6 +103,7 @@ DEFINE_string(
"compact,"
"compactall,"
"multireadrandom,"
"mixgraph,"
"readseq,"
"readtocache,"
"readreverse,"
Expand Down Expand Up @@ -932,6 +934,52 @@ DEFINE_uint64(
"If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
"is the global rate in bytes/second.");

// the parameters of mix_graph
DEFINE_double(key_dist_a, 0.0,
"The parameter 'a' of key access distribution model "
"f(x)=a*x^b");
DEFINE_double(key_dist_b, 0.0,
"The parameter 'b' of key access distribution model "
"f(x)=a*x^b");
DEFINE_double(value_theta, 0.0,
sagar0 marked this conversation as resolved.
Show resolved Hide resolved
"The parameter 'theta' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_k, 0.0,
"The parameter 'k' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_sigma, 0.0,
"The parameter 'theta' of Generized Pareto Distribution "
sagar0 marked this conversation as resolved.
Show resolved Hide resolved
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_theta, 0.0,
"The parameter 'theta' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_k, 0.0,
"The parameter 'k' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_sigma, 0.0,
"The parameter 'sigma' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(mix_get_ratio, 1.0,
"The ratio of Get queries of mix_graph workload");
DEFINE_double(mix_put_ratio, 0.0,
"The ratio of Put queries of mix_graph workload");
DEFINE_double(mix_seek_ratio, 0.0,
"The ratio of Seek queries of mix_graph workload");
DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
DEFINE_int64(mix_ave_kv_size, 512,
"The average key-value size of this workload");
DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
DEFINE_double(
sine_mix_rate_noise, 0.0,
"Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
DEFINE_bool(sine_mix_rate, false,
"Enable the sine QPS control on the mix workload");
DEFINE_uint64(
sine_mix_rate_interval_milliseconds, 10000,
"Interval of which the sine wave read_rate_limit is recalculated");
DEFINE_int64(mix_accesses, -1,
"The total query accesses of mix_graph workload");

DEFINE_uint64(
benchmark_read_rate_limit, 0,
"If non-zero, db_bench will rate-limit the reads from RocksDB. This "
Expand Down Expand Up @@ -2626,6 +2674,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
} else if (name == "mixgraph") {
method = &Benchmark::MixGraph;
} else if (name == "readmissing") {
++key_size_;
method = &Benchmark::ReadRandom;
Expand Down Expand Up @@ -4528,6 +4578,249 @@ void VerifyDBFromDB(std::string& truth_db_name) {
thread->stats.AddMessage(msg);
}

// THe reverse function of Pareto function
int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
double ret;
if (k == 0.0) {
ret = theta - sigma * std::log(u);
} else {
ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
}
return static_cast<int64_t>(ceil(ret));
}
// inversion of y=ax^b
int64_t PowerCdfInversion(double u, double a, double b) {
double ret;
ret = std::pow((u / a), (1 / b));
return static_cast<int64_t>(ceil(ret));
}

// Add the noice to the QPS
double AddNoise(double origin, double noise_ratio) {
if (noise_ratio < 0.0 || noise_ratio > 1.0) {
return origin;
}
int band_int = static_cast<int>(FLAGS_sine_a);
double delta = (rand() % band_int - band_int / 2) * noise_ratio;
if (origin + delta < 0) {
return origin;
} else {
return (origin + delta);
}
}

// decide the query type
// 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
class QueryDecider {
public:
std::vector<int> type_;
std::vector<double> ratio_;
int range_;

QueryDecider() {}
~QueryDecider() {}

Status Initiate(std::vector<double> ratio_input) {
int range_max = 1000;
double sum = 0.0;
for (auto& ratio : ratio_input) {
sum += ratio;
}
range_ = 0;
for (auto& ratio : ratio_input) {
range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
type_.push_back(range_);
ratio_.push_back(ratio / sum);
}
return Status::OK();
}

int GetType(int64_t rand_num) {
if (rand_num < 0) {
rand_num = rand_num * (-1);
}
int pos = static_cast<int>(rand_num % range_);
for (int i = 0; i < static_cast<int>(type_.size()); i++) {
if (pos < type_[i]) {
return i;
}
}
return 0;
}
};

// The graph wokrload mixed with Get, Put, Iterator
void MixGraph(ThreadState* thread) {
int64_t read = 0; // including single gets and Next of iterators
int64_t gets = 0;
int64_t puts = 0;
int64_t found = 0;
int64_t seek = 0;
int64_t seek_found = 0;
int64_t bytes = 0;
int64_t value_max = FLAGS_mix_max_value_size;
int64_t scan_len_max = FLAGS_mix_max_scan_len;
double write_rate = 1000000.0;
double read_rate = 1000000.0;
std::vector<double> ratio;
char value_buffer[2 * value_max];
QueryDecider query;
RandomGenerator gen;
Status s;

ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
PinnableSlice pinnable_val;
ratio.push_back(FLAGS_mix_get_ratio);
ratio.push_back(FLAGS_mix_put_ratio);
ratio.push_back(FLAGS_mix_seek_ratio);
query.Initiate(ratio);

// the limit of qps initiation
if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
RateLimiter::Mode::kReadsOnly));
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(write_rate));
}

Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
int64_t rand_v, key_rand, key_seed;
rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
double u = static_cast<double>(rand_v) / FLAGS_num;
key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
Random64 rand(key_seed);
key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
int query_type = query.GetType(rand_v);

// change the qps
uint64_t now = FLAGS_env->NowMicros();
uint64_t usecs_since_last;
if (now > thread->stats.GetSineInterval()) {
usecs_since_last = now - thread->stats.GetSineInterval();
} else {
usecs_since_last = 0;
}

if (usecs_since_last >
(FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
double usecs_since_start =
static_cast<double>(now - thread->stats.GetStart());
thread->stats.ResetSineInterval();
double mix_rate_with_noise = AddNoise(
SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
write_rate =
mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;

thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(write_rate));
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
read_rate,
FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
RateLimiter::Mode::kReadsOnly));
}
// Start the query
if (query_type == 0) {
// the Get query
gets++;
read++;
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
&pinnable_val);
} else {
pinnable_val.Reset();
s = db_with_cfh->db->Get(options,
db_with_cfh->db->DefaultColumnFamily(), key,
&pinnable_val);
}

if (s.ok()) {
found++;
bytes += key.size() + pinnable_val.size();
} else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort();
}

if (thread->shared->read_rate_limiter.get() != nullptr &&
read % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
}

} else if (query_type == 1) {
// the Put query
puts++;
int64_t value_size = ParetoCdfInversion(
u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
if (value_size < 0) {
value_size = 10;
} else if (value_size > value_max) {
value_size = value_size % value_max;
}
s = db_with_cfh->db->Put(write_options_, key, gen.Generate(value_size));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}

if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}

} else if (query_type == 2) {
// Seek query
if (db_with_cfh->db != nullptr) {
Iterator* single_iter = nullptr;
single_iter = db_with_cfh->db->NewIterator(options);
if (single_iter != nullptr) {
single_iter->Seek(key);
seek++;
read++;
if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
seek_found++;
}
int64_t scan_length =
ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
FLAGS_iter_sigma) %
scan_len_max;
for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
Slice value = single_iter->value();
memcpy(value_buffer, value.data(),
std::min(value.size(), sizeof(value_buffer)));
bytes += single_iter->key().size() + single_iter->value().size();
single_iter->Next();
assert(single_iter->status().ok());
}
}
delete single_iter;
}
}
}
char msg[100];
snprintf(msg, sizeof(msg),
"( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
" in %" PRIu64 " found)\n",
gets, puts, seek, found, read);

thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);

if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
}

void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
Expand Down