Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable arbitrary commands to be run on cluster mode #117

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3ab8c0f
Enable arbitrary commands to be run on cluster mode
filipecosta90 Apr 21, 2022
d74d161
Added multi-placeholder test for arbitrary command
filipecosta90 Apr 23, 2022
6ef066a
Added multi-key command test
filipecosta90 May 4, 2022
5150d09
Restrict key placeholder usage for 1 per command. Enabled arbitrary c…
filipecosta90 Jun 15, 2022
2d86e8c
Don't fill key_index_pool for other connections on cluster client
filipecosta90 Jun 15, 2022
26351b4
Merge branch 'master' into cluster.arbitrary.command
filipecosta90 Feb 2, 2023
e10d66a
Removed unrequired changes on the PR
filipecosta90 Feb 2, 2023
a443f2f
Making CI cluster tests more verbose
filipecosta90 Feb 2, 2023
2b664dd
ensuring that when there are multiple shards the keys pool is bellow …
filipecosta90 Feb 6, 2023
f9f324b
Revert "ensuring that when there are multiple shards the keys pool is…
filipecosta90 Feb 7, 2023
fa06f87
Merge remote-tracking branch 'origin/master' into cluster.arbitrary.c…
filipecosta90 Mar 13, 2023
9700d43
Fixed assert_minimum_memtier_outcomes inputs in test_default_arbitrar…
filipecosta90 Mar 13, 2023
ec241aa
Fixed debug print on new tests
filipecosta90 Mar 13, 2023
417ae5c
Ensuring that when there are multiple shards the keys pool is bellow …
filipecosta90 Mar 13, 2023
a51f434
Merge remote-tracking branch 'origin/master' into cluster.arbitrary.c…
filipecosta90 May 17, 2023
5d0b8f2
Revert key pool changes
filipecosta90 May 17, 2023
ae39cb6
Increase request count on oss cluster benchmarks
filipecosta90 May 17, 2023
b1c31fe
Increase request count on oss cluster benchmarks
filipecosta90 May 17, 2023
1433b7d
Cleaned unit tests
filipecosta90 May 22, 2023
643a1b3
ensuring that when there are multiple shards the keys pool is bellow …
filipecosta90 May 22, 2023
7e79da5
Don't store key from different slot in pool if cluster and arbitrary …
filipecosta90 May 22, 2023
288876a
Enabled all tests on cluster mode
filipecosta90 May 22, 2023
5247d3d
fix support for cluster mode and arbitrary commands
YaacovHazan Jun 13, 2023
5e6175f
Included keyless command test. always consume generated keys
filipecosta90 Jun 19, 2023
9cd9c82
fix cluster mode with keyless command
YaacovHazan Jun 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ jobs:
- name: Test OSS-CLUSTER TCP
timeout-minutes: 10
run: |
OSS_STANDALONE=0 OSS_CLUSTER=1 \
OSS_STANDALONE=0 OSS_CLUSTER=1 VERBOSE=1 \
./tests/run_tests.sh

- name: Test OSS-CLUSTER TCP TLS
timeout-minutes: 10
if: matrix.platform == 'ubuntu-latest'
run: |
OSS_STANDALONE=0 OSS_CLUSTER=1 TLS=1 \
OSS_STANDALONE=0 OSS_CLUSTER=1 TLS=1 VERBOSE=1 \
./tests/run_tests.sh

- name: Capture code coverage
Expand Down
180 changes: 113 additions & 67 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ bool client::setup_client(benchmark_config *config, abstract_protocol *protocol,
else if (config->distinct_client_seed)
m_obj_gen->set_random_seed(config->next_client_idx);

// Setup first arbitrary command
if (config->arbitrary_commands->is_defined())
advance_arbitrary_command_index();

// Parallel key-pattern determined according to the first command
if ((config->arbitrary_commands->is_defined() && config->arbitrary_commands->at(0).key_pattern == 'P') ||
(config->key_pattern[key_pattern_set]=='P')) {
Expand Down Expand Up @@ -244,25 +248,36 @@ bool client::hold_pipeline(unsigned int conn_id) {
return false;
}

void client::create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id) {
get_key_response client::get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index) {
int iter;
if (m_config->arbitrary_commands->is_defined())
iter = arbitrary_obj_iter_type(command_index);
else
iter = obj_iter_type(m_config, command_index);

*key_index = m_obj_gen->get_key_index(iter);
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);

return available_for_conn;
}

bool client::create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id) {
int cmd_size = 0;

benchmark_debug_log("%s [%s]:\n", cmd->command_name.c_str(), cmd->command.c_str());
const arbitrary_command& cmd = get_arbitrary_command(command_index);

for (unsigned int i = 0; i < cmd->command_args.size(); i++) {
const command_arg* arg = &cmd->command_args[i];
benchmark_debug_log("%s: %s:\n", m_connections[conn_id]->get_readable_id(), cmd.command.c_str());

for (unsigned int i = 0; i < cmd.command_args.size(); i++) {
const command_arg* arg = &cmd.command_args[i];
if (arg->type == const_type) {
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg);
} else if (arg->type == key_type) {
int iter = get_arbitrary_obj_iter_type(cmd, m_executed_command_index);
unsigned int key_len;
const char *key = m_obj_gen->get_key(iter, &key_len);

assert(key != NULL);
assert(key_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, key, key_len);
unsigned long long key_index;
get_key_response res = get_key_for_conn(command_index, conn_id, &key_index);
/* If key not available for this connection, we have a bug of sending partial request */
assert(res == available_for_conn);
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_key_buffer, m_key_len);
} else if (arg->type == data_type) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(0, &value_len);
Expand All @@ -274,87 +289,118 @@ void client::create_arbitrary_request(const arbitrary_command* cmd, struct timev
}
}

m_connections[conn_id]->send_arbitrary_command_end(m_executed_command_index, &timestamp, cmd_size);
m_reqs_generated++;
m_connections[conn_id]->send_arbitrary_command_end(command_index, &timestamp, cmd_size);
return true;
}

bool client::create_wait_request(struct timeval& timestamp, unsigned int conn_id) {
unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max);
unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min,
m_config->wait_timeout.max, 0,
((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min);

m_connections[conn_id]->send_wait_command(&timestamp, num_slaves, timeout);
return true;
}

bool client::create_set_request(struct timeval& timestamp, unsigned int conn_id) {
unsigned long long key_index;
get_key_response res = get_key_for_conn(SET_CMD_IDX, conn_id, &key_index);
if (res == not_available)
return false;

if (res == available_for_conn) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(key_index, &value_len);

m_connections[conn_id]->send_set_command(&timestamp, m_key_buffer, m_key_len,
value, value_len, m_obj_gen->get_expiry(),
m_config->data_offset);
}

return true;
}

bool client::create_get_request(struct timeval& timestamp, unsigned int conn_id) {
unsigned long long key_index;
get_key_response res = get_key_for_conn(GET_CMD_IDX, conn_id, &key_index);
if (res == not_available)
return false;

if (res == available_for_conn) {
m_connections[conn_id]->send_get_command(&timestamp, m_key_buffer, m_key_len, m_config->data_offset);
}

return true;
}

bool client::create_mget_request(struct timeval& timestamp, unsigned int conn_id) {
unsigned long long key_index;
unsigned int keys_count = m_config->ratio.b - m_get_ratio_count;
if ((int)keys_count > m_config->multi_key_get)
keys_count = m_config->multi_key_get;

m_keylist->clear();
for (unsigned int i = 0; i < keys_count; i++) {
get_key_response res = get_key_for_conn(GET_CMD_IDX, conn_id, &key_index);
/* Not supported in cluster mode */
assert(res == available_for_conn);

m_keylist->add_key(m_key_buffer, m_key_len);
}

m_connections[conn_id]->send_mget_command(&timestamp, m_keylist);
return true;
}

// This function could use some urgent TLC -- but we need to do it without altering the behavior
void client::create_request(struct timeval timestamp, unsigned int conn_id)
{
// are we using arbitrary command?
if (m_config->arbitrary_commands->is_defined()) {
const arbitrary_command* executed_command = m_config->arbitrary_commands->get_next_executed_command(m_arbitrary_command_ratio_count,
m_executed_command_index);
create_arbitrary_request(executed_command, timestamp, conn_id);

if (create_arbitrary_request(m_executed_command_index, timestamp, conn_id)) {
advance_arbitrary_command_index();
m_reqs_generated++;
}
return;
}

// If the Set:Wait ratio is not 0, start off with WAITs
if (m_config->wait_ratio.b &&
(m_tot_wait_ops == 0 ||
(m_tot_set_ops/m_tot_wait_ops > m_config->wait_ratio.a/m_config->wait_ratio.b))) {
if (!create_wait_request(timestamp, conn_id))
return;

m_tot_wait_ops++;

unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max);
unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min,
m_config->wait_timeout.max, 0,
((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min);

m_connections[conn_id]->send_wait_command(&timestamp, num_slaves, timeout);
m_reqs_generated++;
m_tot_wait_ops++;
}

// are we set or get? this depends on the ratio
else if (m_set_ratio_count < m_config->ratio.a) {
// set command
data_object *obj = m_obj_gen->get_object(obj_iter_type(m_config, 0));
unsigned int key_len;
const char *key = obj->get_key(&key_len);
unsigned int value_len;
const char *value = obj->get_value(&value_len);
if (!create_set_request(timestamp, conn_id))
return;

m_connections[conn_id]->send_set_command(&timestamp, key, key_len,
value, value_len, obj->get_expiry(),
m_config->data_offset);
m_reqs_generated++;
m_set_ratio_count++;
m_reqs_generated++;
m_tot_set_ops++;
} else if (m_get_ratio_count < m_config->ratio.b) {
// get command
int iter = obj_iter_type(m_config, 2);

if (m_config->multi_key_get > 0) {
unsigned int keys_count;

keys_count = m_config->ratio.b - m_get_ratio_count;
if ((int)keys_count > m_config->multi_key_get)
keys_count = m_config->multi_key_get;
// GET command
if (!m_config->multi_key_get) {
if (!create_get_request(timestamp, conn_id))
return;

m_keylist->clear();
while (m_keylist->get_keys_count() < keys_count) {
unsigned int keylen;
const char *key = m_obj_gen->get_key(iter, &keylen);

assert(key != NULL);
assert(keylen > 0);

m_keylist->add_key(key, keylen);
}

m_connections[conn_id]->send_mget_command(&timestamp, m_keylist);
m_reqs_generated++;
m_get_ratio_count += keys_count;
} else {
unsigned int keylen;
const char *key = m_obj_gen->get_key(iter, &keylen);
assert(key != NULL);
assert(keylen > 0);

m_connections[conn_id]->send_get_command(&timestamp, key, keylen, m_config->data_offset);
m_reqs_generated++;
m_get_ratio_count++;
m_reqs_generated++;
return;
}

// MGET command
if (!create_mget_request(timestamp, conn_id))
return;

m_get_ratio_count += m_config->multi_key_get;
m_reqs_generated++;
} else {
// overlap counters
m_get_ratio_count = m_set_ratio_count = 0;
Expand Down
51 changes: 43 additions & 8 deletions client.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ struct benchmark_config;
class object_generator;
class data_object;

#define SET_CMD_IDX 0
#define GET_CMD_IDX 2

enum get_key_response { not_available, available_for_conn, available_for_other_conn };

class client : public connections_manager {
protected:

Expand All @@ -56,6 +61,10 @@ class client : public connections_manager {
bool m_initialized;
bool m_end_set;

// key buffer
char m_key_buffer[250];
int m_key_len;

// test related
benchmark_config* m_config;
object_generator* m_obj_gen;
Expand All @@ -77,13 +86,18 @@ class client : public connections_manager {
client(client_group* group);
client(struct event_base *event_base, benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen);
virtual ~client();
virtual bool setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen);
virtual int prepare(void);

bool setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen);
int prepare(void);
bool initialized(void);

run_stats* get_stats(void) { return &m_stats; }

virtual get_key_response get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index);
virtual bool create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id);
bool create_wait_request(struct timeval& timestamp, unsigned int conn_id);
bool create_set_request(struct timeval& timestamp, unsigned int conn_id);
bool create_get_request(struct timeval& timestamp, unsigned int conn_id);
bool create_mget_request(struct timeval& timestamp, unsigned int conn_id);

// client manager api's
unsigned long long get_reqs_processed() {
return m_reqs_processed;
Expand All @@ -110,13 +124,33 @@ class client : public connections_manager {
virtual bool finished(void);
virtual void set_start_time();
virtual void set_end_time();
virtual void create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id);
virtual void create_request(struct timeval timestamp, unsigned int conn_id);
virtual bool hold_pipeline(unsigned int conn_id);
virtual int connect(void);
virtual void disconnect(void);
//

/* Get current executed arbitrary command */
const arbitrary_command & get_arbitrary_command(unsigned int command_index) {
return m_config->arbitrary_commands->at(command_index);
}

/* Set the arbitrary command index to the next to be executed */
void advance_arbitrary_command_index() {
while(true) {
if (m_arbitrary_command_ratio_count < get_arbitrary_command(m_executed_command_index).ratio) {
m_arbitrary_command_ratio_count++;
return;
} else {
m_arbitrary_command_ratio_count = 0;
m_executed_command_index++;
if (m_executed_command_index == m_config->arbitrary_commands->size()) {
m_executed_command_index = 0;
}
}
}

}
// Utility function to get the object iterator type based on the config
inline int obj_iter_type(benchmark_config *cfg, unsigned char index)
{
Expand All @@ -132,10 +166,11 @@ class client : public connections_manager {
}
}

inline int get_arbitrary_obj_iter_type(const arbitrary_command* cmd, unsigned int index) {
if (cmd->key_pattern == 'R') {
inline int arbitrary_obj_iter_type(unsigned int index) {
const arbitrary_command& cmd = get_arbitrary_command(index);
if (cmd.key_pattern == 'R') {
return OBJECT_GENERATOR_KEY_RANDOM;
} else if (cmd->key_pattern == 'G') {
} else if (cmd.key_pattern == 'G') {
return OBJECT_GENERATOR_KEY_GAUSSIAN;
} else {
return index;
Expand Down
Loading
Loading