Skip to content

Commit

Permalink
Fixed last second metric rolling. Fixed totals json section missing t…
Browse files Browse the repository at this point in the history
…he latency quantiles (#251)
  • Loading branch information
filipecosta90 authored Apr 11, 2024
1 parent b2a4041 commit 9c0238a
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 23 deletions.
1 change: 0 additions & 1 deletion client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ void client::handle_response(unsigned int conn_id, struct timeval timestamp,
m_connections[conn_id]->get_readable_id(),
response->get_status());
}

switch (request->m_type) {
case rt_get:
m_stats.update_get_op(&timestamp,
Expand Down
75 changes: 53 additions & 22 deletions run_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,30 @@ void run_stats::set_end_time(struct timeval* end_time)
end_time = &tv;
}
m_end_time = *end_time;
summarize_current_second();
m_stats.push_back(m_cur_stats);
}

void run_stats::summarize_current_second(){
m_cur_stats.m_get_cmd.summarize_quantiles(inst_m_get_latency_histogram,quantiles_list);
m_cur_stats.m_set_cmd.summarize_quantiles(inst_m_set_latency_histogram,quantiles_list);
m_cur_stats.m_wait_cmd.summarize_quantiles(inst_m_wait_latency_histogram,quantiles_list);
m_cur_stats.m_total_cmd.summarize_quantiles(inst_m_totals_latency_histogram,quantiles_list);
for (unsigned int i=0; i<m_cur_stats.m_ar_commands.size(); i++) {
m_cur_stats.m_ar_commands[i].summarize_quantiles(inst_m_ar_commands_latency_histograms[i],quantiles_list);
hdr_reset(inst_m_ar_commands_latency_histograms[i]);
}
hdr_reset(inst_m_get_latency_histogram);
hdr_reset(inst_m_set_latency_histogram);
hdr_reset(inst_m_wait_latency_histogram);
hdr_reset(inst_m_totals_latency_histogram);
}

void run_stats::roll_cur_stats(struct timeval* ts)
{
const unsigned int sec = ts_diff(m_start_time, *ts) / 1000000;
if (sec > m_cur_stats.m_second) {
m_cur_stats.m_get_cmd.summarize_quantiles(inst_m_get_latency_histogram,quantiles_list);
m_cur_stats.m_set_cmd.summarize_quantiles(inst_m_set_latency_histogram,quantiles_list);
m_cur_stats.m_wait_cmd.summarize_quantiles(inst_m_wait_latency_histogram,quantiles_list);
for (unsigned int i=0; i<m_cur_stats.m_ar_commands.size(); i++) {
m_cur_stats.m_ar_commands[i].summarize_quantiles(inst_m_ar_commands_latency_histograms[i],quantiles_list);
hdr_reset(inst_m_ar_commands_latency_histograms[i]);
}
hdr_reset(inst_m_get_latency_histogram);
hdr_reset(inst_m_set_latency_histogram);
hdr_reset(inst_m_wait_latency_histogram);
summarize_current_second();
m_stats.push_back(m_cur_stats);
m_cur_stats.reset(sec);
}
Expand All @@ -176,104 +183,134 @@ void run_stats::update_get_op(struct timeval* ts, unsigned int bytes_rx, unsigne
{
roll_cur_stats(ts);
m_cur_stats.m_get_cmd.update_op(bytes_rx, bytes_tx, latency, hits, misses);
m_cur_stats.m_total_cmd.update_op(bytes_rx, bytes_tx, latency, hits, misses);
m_totals.update_op(bytes_rx, bytes_tx, latency);
hdr_record_value(m_get_latency_histogram,latency);
hdr_record_value(inst_m_get_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_set_op(struct timeval* ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency)
{
roll_cur_stats(ts);

m_cur_stats.m_set_cmd.update_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);
hdr_record_value(m_set_latency_histogram,latency);
hdr_record_value(inst_m_set_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_moved_get_op(struct timeval* ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency)
{
roll_cur_stats(ts);

m_cur_stats.m_get_cmd.update_moved_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);
hdr_record_value(m_get_latency_histogram,latency);
hdr_record_value(inst_m_get_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_moved_set_op(struct timeval* ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency)
{
roll_cur_stats(ts);

m_cur_stats.m_set_cmd.update_moved_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_moved_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);
hdr_record_value(m_set_latency_histogram,latency);
hdr_record_value(inst_m_set_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_moved_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_ask_get_op(struct timeval* ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency)
{
roll_cur_stats(ts);

m_cur_stats.m_get_cmd.update_ask_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_ask_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);
hdr_record_value(m_get_latency_histogram,latency);
hdr_record_value(inst_m_get_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_ask_set_op(struct timeval* ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency)
{
roll_cur_stats(ts);

m_cur_stats.m_set_cmd.update_ask_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_ask_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);
hdr_record_value(m_set_latency_histogram,latency);
hdr_record_value(inst_m_set_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_ask_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_ask_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_wait_op(struct timeval *ts, unsigned int latency)
{
roll_cur_stats(ts);

m_cur_stats.m_wait_cmd.update_op(0,0, latency);
m_cur_stats.m_total_cmd.update_op(0,0, latency);
m_totals.update_op(0,0, latency);
hdr_record_value(m_wait_latency_histogram,latency);
hdr_record_value(inst_m_wait_latency_histogram,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

void run_stats::update_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_op(bytes_rx, bytes_tx, latency);
m_cur_stats.m_total_cmd.update_op(bytes_rx, bytes_tx, latency);
m_totals.update_op(bytes_rx, bytes_tx, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
struct hdr_histogram* inst_hist = inst_m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
hdr_record_value(inst_hist,latency);
hdr_record_value(m_totals_latency_histogram,latency);
hdr_record_value(inst_m_totals_latency_histogram,latency);
}

unsigned int run_stats::get_duration(void)
Expand Down Expand Up @@ -384,17 +421,9 @@ std::vector<one_sec_cmd_stats> run_stats::get_one_sec_cmd_stats_wait() {
std::vector<one_sec_cmd_stats> run_stats::get_one_sec_cmd_stats_totals() {
std::vector<one_sec_cmd_stats> result;
result.reserve(m_stats.size());
for (std::list<one_second_stats>::iterator i = m_stats.begin(); i != m_stats.end(); ++i)
{
one_second_stats current_second_stats = *i;
one_sec_cmd_stats total_stat = one_sec_cmd_stats(current_second_stats.m_get_cmd);
total_stat.merge(current_second_stats.m_set_cmd);
total_stat.merge(current_second_stats.m_wait_cmd);
for (size_t j = 0; j < current_second_stats.m_ar_commands.size(); j++)
{
total_stat.merge(current_second_stats.m_ar_commands.at(j));
}
result.push_back(total_stat);
for (std::list<one_second_stats>::iterator i = m_stats.begin();
i != m_stats.end(); i++) {
result.push_back(i->m_total_cmd);
}
return result;
}
Expand Down Expand Up @@ -734,6 +763,7 @@ void run_stats::aggregate_average(const std::vector<run_stats>& all_stats)
hdr_add(m_get_latency_histogram,i->m_get_latency_histogram);
hdr_add(m_set_latency_histogram,i->m_set_latency_histogram);
hdr_add(m_wait_latency_histogram,i->m_wait_latency_histogram);
hdr_add(m_totals_latency_histogram,i->m_totals_latency_histogram);

for (unsigned int j=0; j < i->m_ar_commands_latency_histograms.size(); j++) {
hdr_add(m_ar_commands_latency_histograms.at(j),i->m_ar_commands_latency_histograms.at(j));
Expand All @@ -743,6 +773,7 @@ void run_stats::aggregate_average(const std::vector<run_stats>& all_stats)
m_totals.m_set_cmd.aggregate_average(all_stats.size());
m_totals.m_get_cmd.aggregate_average(all_stats.size());
m_totals.m_wait_cmd.aggregate_average(all_stats.size());
m_totals.m_total_cmd.aggregate_average(all_stats.size());
m_totals.m_ar_commands.aggregate_average(all_stats.size());
m_totals.m_ops_sec /= all_stats.size();
m_totals.m_hits_sec /= all_stats.size();
Expand Down Expand Up @@ -790,7 +821,7 @@ void run_stats::merge(const run_stats& other, int iteration)
m_totals.add(other.m_totals);

// aggregate latency data
// hdr_add(m_totals.latency_histogram,other.m_totals.latency_histogram);
hdr_add(m_totals_latency_histogram,other.m_totals.latency_histogram);
hdr_add(m_get_latency_histogram,other.m_get_latency_histogram);
hdr_add(m_set_latency_histogram,other.m_set_latency_histogram);
hdr_add(m_wait_latency_histogram,other.m_wait_latency_histogram);
Expand Down
3 changes: 3 additions & 0 deletions run_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ class run_stats {
safe_hdr_histogram m_set_latency_histogram;
safe_hdr_histogram m_wait_latency_histogram;
std::vector<safe_hdr_histogram> m_ar_commands_latency_histograms;
safe_hdr_histogram m_totals_latency_histogram;

// instantaneous command stats ( used in the per second latencies )
safe_hdr_histogram inst_m_get_latency_histogram;
safe_hdr_histogram inst_m_set_latency_histogram;
safe_hdr_histogram inst_m_wait_latency_histogram;
std::vector<safe_hdr_histogram> inst_m_ar_commands_latency_histograms;
safe_hdr_histogram inst_m_totals_latency_histogram;

void roll_cur_stats(struct timeval* ts);

Expand Down Expand Up @@ -140,6 +142,7 @@ class run_stats {

void aggregate_average(const std::vector<run_stats>& all_stats);
void summarize(totals& result) const;
void summarize_current_second();
void merge(const run_stats& other, int iteration);
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_get();
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_set();
Expand Down
5 changes: 5 additions & 0 deletions run_stats_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ one_second_stats::one_second_stats(unsigned int second) :
m_set_cmd(),
m_get_cmd(),
m_wait_cmd(),
m_total_cmd(),
m_ar_commands()
{
reset(second);
Expand All @@ -177,13 +178,15 @@ void one_second_stats::reset(unsigned int second) {
m_get_cmd.reset();
m_set_cmd.reset();
m_wait_cmd.reset();
m_total_cmd.reset();
m_ar_commands.reset();
}

void one_second_stats::merge(const one_second_stats& other) {
m_get_cmd.merge(other.m_get_cmd);
m_set_cmd.merge(other.m_set_cmd);
m_wait_cmd.merge(other.m_wait_cmd);
m_total_cmd.merge(other.m_total_cmd);
m_ar_commands.merge(other.m_ar_commands);
}

Expand Down Expand Up @@ -269,6 +272,7 @@ totals::totals() :
m_set_cmd(),
m_get_cmd(),
m_wait_cmd(),
m_total_cmd(),
m_ar_commands(),
m_ops_sec(0),
m_bytes_sec(0),
Expand All @@ -290,6 +294,7 @@ void totals::add(const totals& other) {
m_set_cmd.add(other.m_set_cmd);
m_get_cmd.add(other.m_get_cmd);
m_wait_cmd.add(other.m_wait_cmd);
m_total_cmd.add(other.m_total_cmd);
m_ar_commands.add(other.m_ar_commands);

m_ops_sec += other.m_ops_sec;
Expand Down
3 changes: 3 additions & 0 deletions run_stats_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ class one_second_stats {
one_sec_cmd_stats m_set_cmd;
one_sec_cmd_stats m_get_cmd;
one_sec_cmd_stats m_wait_cmd;
one_sec_cmd_stats m_total_cmd;
ar_one_sec_cmd_stats m_ar_commands;
one_second_stats(unsigned int second);
void setup_arbitrary_commands(size_t n_arbitrary_commands);
void reset(unsigned int second);
void summarize();
void merge(const one_second_stats& other);
};

Expand Down Expand Up @@ -172,6 +174,7 @@ class totals {
totals_cmd m_set_cmd;
totals_cmd m_get_cmd;
totals_cmd m_wait_cmd;
totals_cmd m_total_cmd;
ar_totals_cmd m_ar_commands;
safe_hdr_histogram latency_histogram;
double m_ops_sec;
Expand Down
44 changes: 44 additions & 0 deletions tests/tests_oss_simple_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,29 @@ def test_default_set_get(env):
count = second_data["Count"]
# if we had commands on that second the BW needs to be > 0
if count > 0:
p50 = second_data["p50.00"]
p99 = second_data["p99.00"]
p999 = second_data["p99.90"]
env.assertTrue(bytes_rx > 0)
env.assertTrue(bytes_tx > 0)
env.assertTrue(p50 > 0.0)
env.assertTrue(p99 > 0.0)
env.assertTrue(p999 > 0.0)

for second_data in get_metrics_ts.values():
bytes_rx = second_data["Bytes RX"]
bytes_tx = second_data["Bytes TX"]
count = second_data["Count"]
# if we had commands on that second the BW needs to be > 0
if count > 0:
p50 = second_data["p50.00"]
p99 = second_data["p99.00"]
p999 = second_data["p99.90"]
env.assertTrue(bytes_rx > 0)
env.assertTrue(bytes_tx > 0)
env.assertTrue(p50 > 0.0)
env.assertTrue(p99 > 0.0)
env.assertTrue(p999 > 0.0)

def test_default_set_get_with_print_percentiles(env):
p_str = '0,10,20,30,40,50,60,70,80,90,95,100'
Expand Down Expand Up @@ -408,6 +421,37 @@ def test_default_arbitrary_command_keyless(env):
if not benchmark.run():
debugPrintMemtierOnError(config, env)

json_filename = '{0}/mb.json'.format(config.results_dir)
## Assert that all BW metrics are properly stored and calculated
with open(json_filename) as results_json:
results_dict = json.load(results_json)
metrics = results_dict['ALL STATS']['Pings']
metrics_ts = results_dict['ALL STATS']['Pings']["Time-Serie"]
totals_metrics_ts = results_dict['ALL STATS']['Totals']["Time-Serie"]
for metric_name in ["KB/sec RX/TX","KB/sec RX","KB/sec TX","KB/sec"]:
# assert the metric exists
env.assertTrue(metric_name in metrics)
# assert the metric value is non zero given we've had write and read
metric_value_kbs = metrics[metric_name]
env.assertTrue(metric_value_kbs > 0)

totals_metrics_ts_v = list(totals_metrics_ts.values())
for pos, second_data in enumerate(metrics_ts.values()):
bytes_rx = second_data["Bytes RX"]
bytes_tx = second_data["Bytes TX"]
count = second_data["Count"]
second_data_total = totals_metrics_ts_v[pos]
for metric_name in ["p50.00","p99.00","p99.90"]:
if count > 0:
metric_value_second_data = second_data[metric_name]
metric_value_totals_second_data = second_data_total[metric_name]
env.assertTrue(metric_value_totals_second_data == metric_value_second_data)
env.assertTrue(metric_value_second_data > 0.0)
# if we had commands on that second the BW needs to be > 0
if count > 0:
env.assertTrue(bytes_rx > 0)
env.assertTrue(bytes_tx > 0)


def test_default_arbitrary_command_set(env):
benchmark_specs = {"name": env.testName, "args": []}
Expand Down

0 comments on commit 9c0238a

Please sign in to comment.