Skip to content

Commit

Permalink
Enable arbitrary commands to be run on cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecosta90 committed May 3, 2021
1 parent 18d2646 commit 631b731
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 30 deletions.
101 changes: 85 additions & 16 deletions cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,54 @@ bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned l
}
}


void cluster_client::create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id) {
int cmd_size = 0;

benchmark_debug_log("%s [%s]:\n", cmd->command_name.c_str(), cmd->command.c_str());

for (unsigned int i = 0; i < cmd->command_args.size(); i++) {
const command_arg* arg = &cmd->command_args[i];

if (arg->type == const_type) {
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg);
} else if (arg->type == key_type) {
unsigned long long key_index;

// get key
if (!get_key_for_conn(conn_id, get_arbitrary_obj_iter_type(cmd, m_executed_command_index), &key_index)) {
return;
}

assert(key_index >= 0);
assert(m_key_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_key_buffer, m_key_len);
} else if (arg->type == data_type) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(0, &value_len);

assert(value != NULL);
assert(value_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, value, value_len);
}
}

m_connections[conn_id]->send_arbitrary_command_end(m_executed_command_index, &timestamp, cmd_size);
}

// This function could use some urgent TLC -- but we need to do it without altering the behavior
void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id)
{
// are we using arbitrary command?
if (m_config->arbitrary_commands->is_defined()) {
const arbitrary_command* executed_command = m_config->arbitrary_commands->get_next_executed_command(m_arbitrary_command_ratio_count,
m_executed_command_index);
create_arbitrary_request(executed_command, timestamp, conn_id);
return;
}

// If the Set:Wait ratio is not 0, start off with WAITs
if (m_config->wait_ratio.b &&
(m_tot_wait_ops == 0 ||
Expand Down Expand Up @@ -416,16 +461,28 @@ void cluster_client::create_request(struct timeval timestamp, unsigned int conn_
void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response) {
// update stats
if (request->m_type == rt_get) {
m_stats.update_moved_get_op(&timestamp,
switch (request->m_type) {
case rt_get:
m_stats.update_moved_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else if (request->m_type == rt_set) {
m_stats.update_moved_set_op(&timestamp,
break;
case rt_set:
m_stats.update_moved_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else {
assert(0);
break;
case rt_arbitrary: {
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
m_stats.update_moved_arbitrary_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
ar->index);
break;
}
default:
assert(0);
break;
}

// connection already issued 'cluster slots' command, wait for slots mapping to be updated
Expand All @@ -444,16 +501,28 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
void cluster_client::handle_ask(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response) {
// update stats
if (request->m_type == rt_get) {
m_stats.update_ask_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else if (request->m_type == rt_set) {
m_stats.update_ask_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else {
assert(0);
switch (request->m_type) {
case rt_get:
m_stats.update_ask_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
case rt_set:
m_stats.update_ask_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
case rt_arbitrary: {
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
m_stats.update_ask_arbitrary_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
ar->index);
break;
}
default:
assert(0);
break;
}
}

Expand Down
1 change: 1 addition & 0 deletions cluster_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class cluster_client : public client {

// client manager api's
virtual void handle_cluster_slots(protocol_response *r);
virtual void create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id);
virtual void create_request(struct timeval timestamp, unsigned int conn_id);
virtual bool hold_pipeline(unsigned int conn_id);
virtual void handle_response(unsigned int conn_id, struct timeval timestamp,
Expand Down
3 changes: 0 additions & 3 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,6 @@ static bool verify_cluster_option(struct benchmark_config *cfg) {
} else if (cfg->unix_socket) {
fprintf(stderr, "error: cluster mode dose not support unix-socket option.\n");
return false;
} else if (cfg->arbitrary_commands->is_defined()) {
fprintf(stderr, "error: cluster mode dose not support arbitrary command option.\n");
return false;
}

return true;
Expand Down
47 changes: 41 additions & 6 deletions run_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ void run_stats::update_moved_set_op(struct timeval* ts, unsigned int bytes, unsi
hdr_record_value(m_set_latency_histogram,latency);
}

void run_stats::update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_moved_op(bytes, latency);
m_totals.update_op(bytes, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
}

void run_stats::update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency)
{
roll_cur_stats(ts);
Expand All @@ -214,6 +225,17 @@ void run_stats::update_ask_set_op(struct timeval* ts, unsigned int bytes, unsign
hdr_record_value(m_set_latency_histogram,latency);
}

void run_stats::update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_ask_op(bytes, latency);
m_totals.update_op(bytes, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
}

void run_stats::update_wait_op(struct timeval *ts, unsigned int latency)
{
roll_cur_stats(ts);
Expand Down Expand Up @@ -975,11 +997,18 @@ void run_stats::print_moved_sec_column(output_table &table) {

column.elements.push_back(*el.init_str("%12s ", "MOVED/sec"));
column.elements.push_back(*el.init_str("%s", "-------------"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec));
column.elements.push_back(*el.init_str("%12s ", "---"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec));

if (print_arbitrary_commands_results()) {
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_moved_sec));
}
} else {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec));
column.elements.push_back(*el.init_str("%12s ", "---"));

}
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec));
table.add_column(column);
}

Expand All @@ -989,11 +1018,17 @@ void run_stats::print_ask_sec_column(output_table &table) {

column.elements.push_back(*el.init_str("%12s ", "ASK/sec"));
column.elements.push_back(*el.init_str("%s", "-------------"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_ask_sec));
if (print_arbitrary_commands_results()) {
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_ask_sec));
}
} else {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_ask_sec));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_ask_sec));
column.elements.push_back(*el.init_str("%12s ", "---"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ask_sec));

}
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ask_sec));
table.add_column(column);
}

Expand Down
4 changes: 4 additions & 0 deletions run_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ class run_stats {

void update_moved_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_moved_set_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t arbitrary_index);

void update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_ask_set_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t arbitrary_index);

void update_wait_op(struct timeval* ts, unsigned int latency);
void update_arbitrary_op(struct timeval *ts, unsigned int bytes,
Expand Down
9 changes: 5 additions & 4 deletions tests/include.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ def addTLSArgs(benchmark_specs, env):
benchmark_specs['args'].append('--cacert={}'.format(TLS_CACERT))


def get_default_memtier_config():
def get_default_memtier_config(threads=10, clients=5, requests=1000):
config = {
"memtier_benchmark": {
"binary": MEMTIER_BINARY,
"threads": 10,
"clients": 5,
"requests": 1000
"threads": threads,
"clients": clients,
"requests": requests
},
}
return config
Expand All @@ -106,3 +106,4 @@ def ensure_clean_benchmark_folder(dirname):
if os.path.exists(dirname):
os.removedirs(dirname)
os.makedirs(dirname)

79 changes: 78 additions & 1 deletion tests/tests_oss_simple_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def test_default_set_get_1_1(env):
# assert same number of gets and sets
env.assertEqual(merged_command_stats['cmdstat_set']['calls'], merged_command_stats['cmdstat_get']['calls'])


# run each test on different env
def test_default_set_get_3_runs(env):
env.skipOnCluster()
Expand Down Expand Up @@ -144,3 +143,81 @@ def test_default_set_get_3_runs(env):
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, merged_command_stats, overall_expected_request_count,
overall_request_count)


def test_default_arbitrary_command_pubsub(env):
benchmark_specs = {"name": env.testName, "args": ['--command=publish \"__key__\" \"__data__\"']}
addTLSArgs(benchmark_specs, env)
config = get_default_memtier_config()
master_nodes_list = env.getMasterNodesList()
overall_expected_request_count = 0

add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)

# Create a temporary directory
test_dir = tempfile.mkdtemp()

config = RunConfig(test_dir, env.testName, config, {})
ensure_clean_benchmark_folder(config.results_dir)

benchmark = Benchmark.from_json(config, benchmark_specs)

# benchmark.run() returns True if the return code of memtier_benchmark was 0
memtier_ok = benchmark.run()
debugPrintMemtierOnError(config, env, memtier_ok)


def test_default_arbitrary_command_set(env):
benchmark_specs = {"name": env.testName, "args": ['--command=SET __key__ __data__']}
addTLSArgs(benchmark_specs, env)
config = get_default_memtier_config()
master_nodes_list = env.getMasterNodesList()
overall_expected_request_count = get_expected_request_count(config)

add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)

# Create a temporary directory
test_dir = tempfile.mkdtemp()

config = RunConfig(test_dir, env.testName, config, {})
ensure_clean_benchmark_folder(config.results_dir)

benchmark = Benchmark.from_json(config, benchmark_specs)

# benchmark.run() returns True if the return code of memtier_benchmark was 0
memtier_ok = benchmark.run()
debugPrintMemtierOnError(config, env, memtier_ok)

master_nodes_connections = env.getOSSMasterNodesConnectionList()
merged_command_stats = {'cmdstat_set': {'calls': 0}}
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, merged_command_stats, overall_expected_request_count,
overall_request_count)


def test_default_arbitrary_command_hset(env):
benchmark_specs = {"name": env.testName, "args": ['--command=HSET __key__ field1 __data__']}
addTLSArgs(benchmark_specs, env)
config = get_default_memtier_config()
master_nodes_list = env.getMasterNodesList()
overall_expected_request_count = get_expected_request_count(config)

add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)

# Create a temporary directory
test_dir = tempfile.mkdtemp()

config = RunConfig(test_dir, env.testName, config, {})
ensure_clean_benchmark_folder(config.results_dir)

benchmark = Benchmark.from_json(config, benchmark_specs)

# benchmark.run() returns True if the return code of memtier_benchmark was 0
memtier_ok = benchmark.run()
debugPrintMemtierOnError(config, env, memtier_ok)

master_nodes_connections = env.getOSSMasterNodesConnectionList()
merged_command_stats = {'cmdstat_hset': {'calls': 0}}
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, merged_command_stats, overall_expected_request_count,
overall_request_count)

0 comments on commit 631b731

Please sign in to comment.