From 528d3bcf310d06f2461d136f5c8bac145e5dd9ef Mon Sep 17 00:00:00 2001 From: YaacovHazan Date: Thu, 7 Sep 2017 14:43:27 +0300 Subject: [PATCH] add support for cluster mode new 'cluster_client' add to support cluster mode. the new client use CLUSTER_SLOTS command to retrive the key-->node mapping, create separate connection fot each node, and send the commands to the appropriate connection according to hash calculation. --- Makefile.am | 2 + README.md | 8 + client.cpp | 576 +++++++-------------------------------- client.h | 116 ++++---- cluster_client.cpp | 357 +++++++++++++++++++++++++ cluster_client.h | 55 ++++ connections_manager.h | 44 +++ memtier_benchmark.cpp | 29 +- memtier_benchmark.h | 1 + obj_gen.cpp | 45 +++- obj_gen.h | 6 +- protocol.cpp | 153 ++++++++++- protocol.h | 67 ++++- shard_connection.cpp | 606 ++++++++++++++++++++++++++++++++++++++++++ shard_connection.h | 162 +++++++++++ 15 files changed, 1659 insertions(+), 568 deletions(-) create mode 100644 cluster_client.cpp create mode 100644 cluster_client.h create mode 100644 connections_manager.h create mode 100644 shard_connection.cpp create mode 100644 shard_connection.h diff --git a/Makefile.am b/Makefile.am index c529ff17..fa4d57b2 100644 --- a/Makefile.am +++ b/Makefile.am @@ -25,6 +25,8 @@ memtier_benchmark_CPPFLAGS = $(LIBEVENT_CFLAGS) memtier_benchmark_SOURCES = \ memtier_benchmark.cpp memtier_benchmark.h \ client.cpp client.h \ + cluster_client.cpp cluster_client.h \ + shard_connection.cpp shard_connection.h connections_manager.h \ JSON_handler.cpp JSON_handler.h \ protocol.cpp protocol.h \ obj_gen.cpp obj_gen.h \ diff --git a/README.md b/README.md index 3b1d4705..a0c6c0d5 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ memtier_benchmark is a command line utility developed by Redis Labs (formerly Ga * Read:Write ratio * Random and sequential key name pattern policies * Random or ranged key expiration + * Redis cluster * ...and much more Read more at: @@ -73,6 +74,13 @@ On recent Ubuntu versions, simply install all prerequisites as follows: ``` +### Cluster mode + +In case where there is some asymmetry between the redis nodes, and user set +the number of total requests with sequential key pattern options, it might be +gaps in the generated keys. + + ### Building and installing After downloading the source tree, use standard autoconf/automake commands:: diff --git a/client.cpp b/client.cpp index 075ed974..b644c39d 100755 --- a/client.cpp +++ b/client.cpp @@ -46,10 +46,10 @@ #include #include +#include #include "client.h" -#include "obj_gen.h" -#include "memtier_benchmark.h" +#include "cluster_client.h" float get_2_meaningful_digits(float val) { @@ -60,16 +60,6 @@ float get_2_meaningful_digits(float val) return new_val; } -void client_event_handler(evutil_socket_t sfd, short evtype, void *opaque) -{ - client *c = (client *) opaque; - - assert(c != NULL); - assert(c->get_sockfd() == sfd); - - c->handle_event(evtype); -} - inline long long int ts_diff(struct timeval a, struct timeval b) { unsigned long long aval = a.tv_sec * 1000000 + a.tv_usec; @@ -98,39 +88,14 @@ inline timeval timeval_factorial_average(timeval a, timeval b, unsigned int weig return (tv); } -client::request::request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys) - : m_type(type), m_size(size), m_keys(keys) -{ - if (sent_time != NULL) - m_sent_time = *sent_time; - else { - gettimeofday(&m_sent_time, NULL); - } -} - bool client::setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *objgen) { m_config = config; assert(m_config != NULL); - if (m_config->unix_socket) { - m_unix_sockaddr = (struct sockaddr_un *) malloc(sizeof(struct sockaddr_un)); - assert(m_unix_sockaddr != NULL); - - m_unix_sockaddr->sun_family = AF_UNIX; - strncpy(m_unix_sockaddr->sun_path, m_config->unix_socket, sizeof(m_unix_sockaddr->sun_path)-1); - m_unix_sockaddr->sun_path[sizeof(m_unix_sockaddr->sun_path)-1] = '\0'; - } - - m_read_buf = evbuffer_new(); - assert(m_read_buf != NULL); - - m_write_buf = evbuffer_new(); - assert(m_write_buf != NULL); - - m_protocol = protocol->clone(); - assert(m_protocol != NULL); - m_protocol->set_buffers(m_read_buf, m_write_buf); + // create main connection + shard_connection* conn = new shard_connection(m_connections.size(), this, m_config, m_event_base, protocol); + m_connections.push_back(conn); m_obj_gen = objgen->clone(); assert(m_obj_gen != NULL); @@ -155,43 +120,33 @@ bool client::setup_client(benchmark_config *config, abstract_protocol *protocol, m_keylist = new keylist(m_config->multi_key_get + 1); assert(m_keylist != NULL); + return true; } -client::client(client_group* group) : - m_sockfd(-1), m_unix_sockaddr(NULL), m_event(NULL), m_event_base(NULL), - m_read_buf(NULL), m_write_buf(NULL), m_initialized(false), m_connected(false), - m_authentication(auth_none), m_db_selection(select_none), - m_config(NULL), m_protocol(NULL), m_obj_gen(NULL), - m_reqs_processed(0), - m_set_ratio_count(0), - m_get_ratio_count(0) +client::client(client_group* group) : + m_event_base(NULL), m_initialized(false), m_end_set(false), m_config(NULL), + m_obj_gen(NULL), m_reqs_processed(0), m_set_ratio_count(0), m_get_ratio_count(0), + m_tot_set_ops(0), m_tot_wait_ops(0) { m_event_base = group->get_event_base(); if (!setup_client(group->get_config(), group->get_protocol(), group->get_obj_gen())) { return; } - + benchmark_debug_log("new client %p successfully set up.\n", this); m_initialized = true; } -client::client(struct event_base *event_base, - benchmark_config *config, - abstract_protocol *protocol, - object_generator *obj_gen) : - m_sockfd(-1), m_unix_sockaddr(NULL), m_event(NULL), m_event_base(NULL), - m_read_buf(NULL), m_write_buf(NULL), m_initialized(false), m_connected(false), - m_authentication(auth_none), m_db_selection(select_none), - m_config(NULL), m_protocol(NULL), m_obj_gen(NULL), - m_reqs_processed(0), - m_set_ratio_count(0), - m_get_ratio_count(0), - m_tot_set_ops(0), - m_tot_wait_ops(0) +client::client(struct event_base *event_base, benchmark_config *config, + abstract_protocol *protocol, object_generator *obj_gen) : + m_event_base(NULL), m_initialized(false), m_end_set(false), m_config(NULL), + m_obj_gen(NULL), m_reqs_processed(0), m_set_ratio_count(0), m_get_ratio_count(0), + m_tot_set_ops(0), m_tot_wait_ops(0) { m_event_base = event_base; + if (!setup_client(config, protocol, obj_gen)) { return; } @@ -202,35 +157,11 @@ client::client(struct event_base *event_base, client::~client() { - if (m_event != NULL) { - event_free(m_event); - m_event = NULL; - } - - if (m_unix_sockaddr != NULL) { - free(m_unix_sockaddr); - m_unix_sockaddr = NULL; - } - - if (m_read_buf != NULL) { - evbuffer_free(m_read_buf); - m_read_buf = NULL; - } - - if (m_write_buf != NULL) { - evbuffer_free(m_write_buf); - m_write_buf = NULL; - } - - if (m_sockfd != -1) { - close(m_sockfd); - m_sockfd = -1; - } - - if (m_protocol != NULL) { - delete m_protocol; - m_protocol = NULL; + for (unsigned int i = 0; i < m_connections.size(); i++) { + shard_connection* sc = m_connections[i]; + delete sc; } + m_connections.clear(); if (m_obj_gen != NULL) { delete m_obj_gen; @@ -250,194 +181,43 @@ bool client::initialized(void) void client::disconnect(void) { - if (m_sockfd != -1) { - close(m_sockfd); - m_sockfd = -1; - } - - evbuffer_drain(m_read_buf, evbuffer_get_length(m_read_buf)); - evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf)); - - int ret = event_del(m_event); - assert(ret == 0); + shard_connection* sc = MAIN_CONNECTION; + assert(sc != NULL); - m_connected = false; - m_authentication = auth_none; - m_db_selection = select_none; + sc->disconnect(); } int client::connect(void) { struct connect_info addr; - // clean up existing socket/buffers - if (m_sockfd != -1) - close(m_sockfd); - evbuffer_drain(m_read_buf, evbuffer_get_length(m_read_buf)); - evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf)); - - if (m_unix_sockaddr != NULL) { - m_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); - if (m_sockfd < 0) { - return -errno; - } - } else { - if (m_config->server_addr->get_connect_info(&addr) != 0) { - benchmark_error_log("connect: resolve error: %s\n", m_config->server_addr->get_last_error()); - return -1; - } - - // initialize socket - m_sockfd = socket(addr.ci_family, addr.ci_socktype, addr.ci_protocol); - if (m_sockfd < 0) { - return -errno; - } - - // configure socket behavior - struct linger ling = {0, 0}; - int flags = 1; - int error = setsockopt(m_sockfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); - assert(error == 0); - - error = setsockopt(m_sockfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); - assert(error == 0); - - error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); - assert(error == 0); - } - - // set non-blocking behavior - int flags = 1; - if ((flags = fcntl(m_sockfd, F_GETFL, 0)) < 0 || - fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK) < 0) { - benchmark_error_log("connect: failed to set non-blocking flag.\n"); - close(m_sockfd); - m_sockfd = -1; - return -1; - } - - // set up event - if (!m_event) { - m_event = event_new(m_event_base, - m_sockfd, EV_WRITE, client_event_handler, (void *)this); - assert(m_event != NULL); - } else { - int ret = event_del(m_event); - assert(ret == 0); + // get primary connection + shard_connection* sc = MAIN_CONNECTION; + assert(sc != NULL); - ret = event_assign(m_event, m_event_base, - m_sockfd, EV_WRITE, client_event_handler, (void *)this); - assert(ret == 0); - } - - int ret = event_add(m_event, NULL); - assert(ret == 0); - - // call connect - if (::connect(m_sockfd, - m_unix_sockaddr ? (struct sockaddr *) m_unix_sockaddr : addr.ci_addr, - m_unix_sockaddr ? sizeof(struct sockaddr_un) : addr.ci_addrlen) == -1) { - if (errno == EINPROGRESS || errno == EWOULDBLOCK) - return 0; - benchmark_error_log("connect failed, error = %s\n", strerror(errno)); + // get address information + if (m_config->server_addr->get_connect_info(&addr) != 0) { + benchmark_error_log("connect: resolve error: %s\n", m_config->server_addr->get_last_error()); return -1; } - - return 0; -} -void client::handle_event(short evtype) -{ - // connect() returning to us? normally we expect EV_WRITE, but for UNIX domain - // sockets we workaround since connect() returned immediately, but we don't want - // to do any I/O from the client::connect() call... - if (!m_connected && (evtype == EV_WRITE || m_unix_sockaddr != NULL)) { - int error = -1; - socklen_t errsz = sizeof(error); - - if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, (void *) &error, &errsz) == -1) { - benchmark_error_log("connect: error getting connect response (getsockopt): %s\n", strerror(errno)); - return; - } + // Just in case we got domain name and not ip, we convert it + struct sockaddr_in *ipv4 = (struct sockaddr_in *)addr.ci_addr; + char address[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(ipv4->sin_addr), address, INET_ADDRSTRLEN); - if (error != 0) { - benchmark_error_log("connect: connection failed: %s\n", strerror(error)); - return; - } + char port_str[20]; + snprintf(port_str, sizeof(port_str)-1, "%u", m_config->port); - m_connected = true; - if (!m_reqs_processed) { - process_first_request(); - } else { - benchmark_debug_log("reconnection complete, proceeding with test\n"); - fill_pipeline(); - } - } - - assert(m_connected == true); - if ((evtype & EV_WRITE) == EV_WRITE && evbuffer_get_length(m_write_buf) > 0) { - if (evbuffer_write(m_write_buf, m_sockfd) < 0) { - if (errno != EWOULDBLOCK) { - benchmark_error_log("write error: %s\n", strerror(errno)); - disconnect(); - - return; - } - } - } - - if ((evtype & EV_READ) == EV_READ) { - int ret = 1; - while (ret > 0) { - ret = evbuffer_read(m_read_buf, m_sockfd, -1); - } + // save address and port + sc->set_address_port(address, port_str); - if (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - benchmark_error_log("read error: %s\n", strerror(errno)); - disconnect(); - - return; - } - if (ret == 0) { - benchmark_error_log("connection dropped.\n"); - disconnect(); - - return; - } - - if (evbuffer_get_length(m_read_buf) > 0) { - process_response(); - - // process_response may have disconnected, in which case - // we just abort and wait for libevent to call us back sometime - if (!m_connected) { - return; - } - - } - } - - // update event - short new_evtype = 0; - if (!finished()) { - new_evtype = EV_READ; - } - if (evbuffer_get_length(m_write_buf) > 0) { - assert(finished() == false); - new_evtype |= EV_WRITE; - } - - if (new_evtype) { - int ret = event_assign(m_event, m_event_base, - m_sockfd, new_evtype, client_event_handler, (void *)this); - assert(ret == 0); + // call connect + int ret = sc->connect(&addr); + if (ret) + return ret; - ret = event_add(m_event, NULL); - assert(ret == 0); - } else { - benchmark_debug_log("nothing else to do, test is finished.\n"); - m_stats.set_end_time(NULL); - } + return 0; } bool client::finished(void) @@ -449,59 +229,42 @@ bool client::finished(void) return false; } -bool client::send_conn_setup_commands(struct timeval timestamp) -{ - bool sent = false; - - if (m_config->authenticate && m_authentication != auth_done) { - if (m_authentication == auth_none) { - benchmark_debug_log("sending authentication command.\n"); - m_protocol->authenticate(m_config->authenticate); - m_pipeline.push(new client::request(rt_auth, 0, ×tamp, 0)); - m_authentication = auth_sent; - sent = true; - } - } - if (m_config->select_db && m_db_selection != select_done) { - if (m_db_selection == select_none) { - benchmark_debug_log("sending db selection command.\n"); - m_protocol->select_db(m_config->select_db); - m_pipeline.push(new client::request(rt_select_db, 0, ×tamp, 0)); - m_db_selection = select_sent; - sent = true; - } - } +void client::set_start_time() { + struct timeval now; - return sent; + gettimeofday(&now, NULL); + m_stats.set_start_time(&now); } -bool client::is_conn_setup_done(void) -{ - if (m_config->authenticate && m_authentication != auth_done) - return false; - if (m_config->select_db && m_db_selection != select_done) - return false; - return true; +void client::set_end_time() { + // update only once + if (!m_end_set) { + benchmark_debug_log("nothing else to do, test is finished.\n"); + + m_stats.set_end_time(NULL); + m_end_set = true; + } } -/* - * Utility function to get the object iterator type based on the config - */ -static inline -int obj_iter_type(benchmark_config *cfg, unsigned char index) -{ - if (cfg->key_pattern[index] == 'R') - return OBJECT_GENERATOR_KEY_RANDOM; - else if (cfg->key_pattern[index] == 'G') - return OBJECT_GENERATOR_KEY_GAUSSIAN; - return OBJECT_GENERATOR_KEY_SET_ITER; +bool client::hold_pipeline(unsigned int conn_id) { + // don't exceed requests + if (m_config->requests) { + if (m_reqs_generated >= m_config->requests) + return true; + } + + // if we have reconnect_interval stop enlarging the pipeline on time + if (m_config->reconnect_interval) { + if ((m_reqs_processed % m_config->reconnect_interval) + (m_reqs_generated - m_reqs_processed) >= m_config->reconnect_interval) + return true; + } + + return false; } // This function could use some urgent TLC -- but we need to do it without altering the behavior -void client::create_request(struct timeval timestamp) +void client::create_request(struct timeval timestamp, unsigned int conn_id) { - int cmd_size = 0; - // If the Set:Wait ratio is not 0, start off with WAITs if (m_config->wait_ratio.b && (m_tot_wait_ops == 0 || @@ -514,9 +277,8 @@ void client::create_request(struct timeval timestamp) m_config->wait_timeout.max, 0, ((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min); - benchmark_debug_log("WAIT num_slaves=%u timeout=%u\n", num_slaves, timeout); - cmd_size = m_protocol->write_command_wait(num_slaves, timeout); - m_pipeline.push(new client::request(rt_wait, cmd_size, ×tamp, 0)); + m_connections[conn_id]->send_wait_command(×tamp, num_slaves, timeout); + m_reqs_generated++; } // are we set or get? this depends on the ratio else if (m_set_ratio_count < m_config->ratio.a) { @@ -527,15 +289,12 @@ void client::create_request(struct timeval timestamp) unsigned int value_len; const char *value = obj->get_value(&value_len); + m_connections[conn_id]->send_set_command(×tamp, key, key_len, + value, value_len, obj->get_expiry(), + m_config->data_offset); + m_reqs_generated++; m_set_ratio_count++; m_tot_set_ops++; - - benchmark_debug_log("SET key=[%.*s] value_len=%u expiry=%u\n", - key_len, key, value_len, obj->get_expiry()); - cmd_size = m_protocol->write_command_set(key, key_len, value, value_len, - obj->get_expiry(), m_config->data_offset); - - m_pipeline.push(new client::request(rt_set, cmd_size, ×tamp, 1)); } else if (m_get_ratio_count < m_config->ratio.b) { // get command int iter = obj_iter_type(m_config, 2); @@ -558,28 +317,18 @@ void client::create_request(struct timeval timestamp) m_keylist->add_key(key, keylen); } - const char *first_key, *last_key; - unsigned int first_key_len, last_key_len; - first_key = m_keylist->get_key(0, &first_key_len); - last_key = m_keylist->get_key(m_keylist->get_keys_count()-1, &last_key_len); - - benchmark_debug_log("MGET %d keys [%.*s] .. [%.*s]\n", - m_keylist->get_keys_count(), first_key_len, first_key, last_key_len, last_key); - - cmd_size = m_protocol->write_command_multi_get(m_keylist); + m_connections[conn_id]->send_mget_command(×tamp, m_keylist); + m_reqs_generated++; m_get_ratio_count += keys_count; - m_pipeline.push(new client::request(rt_get, cmd_size, ×tamp, m_keylist->get_keys_count())); } else { unsigned int keylen; const char *key = m_obj_gen->get_key(iter, &keylen); assert(key != NULL); assert(keylen > 0); - - benchmark_debug_log("GET key=[%.*s]\n", keylen, key); - cmd_size = m_protocol->write_command_get(key, keylen, m_config->data_offset); + m_connections[conn_id]->send_get_command(×tamp, key, keylen, m_config->data_offset); + m_reqs_generated++; m_get_ratio_count++; - m_pipeline.push(new client::request(rt_get, cmd_size, ×tamp, 1)); } } else { // overlap counters @@ -587,35 +336,9 @@ void client::create_request(struct timeval timestamp) } } -void client::fill_pipeline(void) -{ - struct timeval now; - gettimeofday(&now, NULL); - - while (!finished() && m_pipeline.size() < m_config->pipeline) { - if (!is_conn_setup_done()) { - send_conn_setup_commands(now); - return; - } - - // don't exceed requests - if (m_config->requests > 0 && m_reqs_processed + m_pipeline.size() >= m_config->requests) - break; - - // if we have reconnect_interval stop enlarging the pipeline - // on time - if (m_config->reconnect_interval) { - if ((m_reqs_processed % m_config->reconnect_interval) + m_pipeline.size() >= m_config->reconnect_interval) - return; - } - - create_request(now); - } -} - int client::prepare(void) -{ - if (!m_unix_sockaddr && (!m_config->server_addr || !m_protocol)) +{ + if (MAIN_CONNECTION == NULL) return -1; int ret = this->connect(); @@ -627,15 +350,6 @@ int client::prepare(void) return 0; } -void client::process_first_request(void) -{ - struct timeval now; - - gettimeofday(&now, NULL); - m_stats.set_start_time(&now); - fill_pipeline(); -} - void client::handle_response(struct timeval timestamp, request *request, protocol_response *response) { switch (request->m_type) { @@ -661,120 +375,15 @@ void client::handle_response(struct timeval timestamp, request *request, protoco } } -void client::process_response(void) -{ - int ret; - bool responses_handled = false; - - struct timeval now; - gettimeofday(&now, NULL); - - while ((ret = m_protocol->parse_response()) > 0) { - bool error = false; - protocol_response *r = m_protocol->get_response(); - - client::request* req = m_pipeline.front(); - m_pipeline.pop(); - - if (req->m_type == rt_auth) { - if (r->is_error()) { - benchmark_error_log("error: authentication failed [%s]\n", r->get_status()); - error = true; - } else { - m_authentication = auth_done; - benchmark_debug_log("authentication successful.\n"); - } - } else if (req->m_type == rt_select_db) { - if (strcmp(r->get_status(), "+OK") != 0) { - benchmark_error_log("database selection failed.\n"); - error = true; - } else { - benchmark_debug_log("database selection successful.\n"); - m_db_selection = select_done; - } - } else { - benchmark_debug_log("handled response (first line): %s, %d hits, %d misses\n", - r->get_status(), - r->get_hits(), - req->m_keys - r->get_hits()); - - if (r->is_error()) { - benchmark_error_log("error response: %s\n", r->get_status()); - } - - handle_response(now, req, r); - - m_reqs_processed++; - responses_handled = true; - } - delete req; - if (error) { - return; - } - } - - if (ret == -1) { - benchmark_error_log("error: response parsing failed.\n"); - } - - if (m_config->reconnect_interval > 0 && responses_handled) { - if ((m_reqs_processed % m_config->reconnect_interval) == 0) { - assert(m_pipeline.size() == 0); - benchmark_debug_log("reconnecting, m_reqs_processed = %u\n", m_reqs_processed); - disconnect(); - - ret = connect(); - assert(ret == 0); - - return; - } - } - - fill_pipeline(); -} - /////////////////////////////////////////////////////////////////////////// -verify_client::verify_request::verify_request(request_type type, - unsigned int size, - struct timeval* sent_time, - unsigned int keys, - const char *key, - unsigned int key_len, - const char *value, - unsigned int value_len) : - client::request(type, size, sent_time, keys), - m_key(NULL), m_key_len(0), - m_value(NULL), m_value_len(0) -{ - m_key_len = key_len; - m_key = (char *)malloc(key_len); - memcpy(m_key, key, m_key_len); - - m_value_len = value_len; - m_value = (char *)malloc(value_len); - memcpy(m_value, value, m_value_len); -} - -verify_client::verify_request::~verify_request(void) -{ - if (m_key != NULL) { - free((void *) m_key); - m_key = NULL; - } - if (m_value != NULL) { - free((void *) m_value); - m_value = NULL; - } -} - verify_client::verify_client(struct event_base *event_base, benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen) : client(event_base, config, protocol, obj_gen), m_finished(false), m_verified_keys(0), m_errors(0) { - m_protocol->set_keep_value(true); + MAIN_CONNECTION->get_protocol()->set_keep_value(true); } unsigned long long int verify_client::get_verified_keys(void) @@ -787,7 +396,7 @@ unsigned long long int verify_client::get_errors(void) return m_errors; } -void verify_client::create_request(struct timeval timestamp) +void verify_client::create_request(struct timeval timestamp, unsigned int conn_id) { // TODO: Refactor client::create_request so this can be unified. if (m_set_ratio_count < m_config->ratio.a) { @@ -798,13 +407,12 @@ void verify_client::create_request(struct timeval timestamp) const char *key = obj->get_key(&key_len); unsigned int value_len; const char *value = obj->get_value(&value_len); - unsigned int cmd_size; - m_set_ratio_count++; - cmd_size = m_protocol->write_command_get(key, key_len, m_config->data_offset); + m_connections[conn_id]->send_verify_get_command(×tamp, key, key_len, + value, value_len, obj->get_expiry(), + m_config->data_offset); - m_pipeline.push(new verify_client::verify_request(rt_get, - cmd_size, ×tamp, 1, key, key_len, value, value_len)); + m_set_ratio_count++; } else if (m_get_ratio_count < m_config->ratio.b) { // We don't really care about GET operations, all we do here is keep // the object generator synced. @@ -904,7 +512,13 @@ client_group::~client_group(void) int client_group::create_clients(int num) { for (int i = 0; i < num; i++) { - client* c = new client(this); + client* c; + + if (m_config->cluster_mode) + c = new cluster_client(this); + else + c = new client(this); + assert(c != NULL); if (!c->initialized()) { diff --git a/client.h b/client.h index 8f736f04..afc7ab54 100755 --- a/client.h +++ b/client.h @@ -33,6 +33,13 @@ #include "protocol.h" #include "JSON_handler.h" +#include "config_types.h" +#include "shard_connection.h" +#include "connections_manager.h" +#include "obj_gen.h" +#include "memtier_benchmark.h" + +#define MAIN_CONNECTION m_connections[0] class client; // forward decl class client_group; // forward decl @@ -131,42 +138,22 @@ class run_stats { unsigned long int get_total_latency(void); }; -class client { +class client : public connections_manager { protected: - friend void client_event_handler(evutil_socket_t sfd, short evtype, void *opaque); - // connection related - int m_sockfd; - struct sockaddr_un* m_unix_sockaddr; - struct event* m_event; + std::vector m_connections; + struct event_base* m_event_base; - struct evbuffer *m_read_buf; - struct evbuffer *m_write_buf; bool m_initialized; - bool m_connected; - enum authentication_state { auth_none, auth_sent, auth_done } m_authentication; - enum select_db_state { select_none, select_sent, select_done } m_db_selection; + bool m_end_set; // test related benchmark_config* m_config; - abstract_protocol* m_protocol; object_generator* m_obj_gen; run_stats m_stats; - // pipeline management - enum request_type { rt_unknown, rt_set, rt_get, rt_wait,rt_auth, rt_select_db }; - struct request { - request_type m_type; - struct timeval m_sent_time; - unsigned int m_size; - unsigned int m_keys; - - request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys); - virtual ~request(void) {} - }; - std::queue m_pipeline; - unsigned int m_reqs_processed; // requests processed (responses received) + unsigned int m_reqs_generated; // requests generated (wait for responses) unsigned int m_set_ratio_count; // number of sets counter (overlaps on ratio) unsigned int m_get_ratio_count; // number of gets counter (overlaps on ratio) @@ -175,56 +162,67 @@ class client { keylist *m_keylist; // used to construct multi commands - bool setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); - int connect(void); - void disconnect(void); - - void handle_event(short evtype); - int get_sockfd(void) { return m_sockfd; } - - virtual bool finished(); - virtual void create_request(struct timeval timestamp); - virtual void handle_response(struct timeval timestamp, request *request, protocol_response *response); - - bool send_conn_setup_commands(struct timeval timestamp); - bool is_conn_setup_done(void); - void fill_pipeline(void); - void process_first_request(void); - void process_response(void); public: client(client_group* group); client(struct event_base *event_base, benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); virtual ~client(); + virtual bool setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); + virtual int prepare(void); bool initialized(void); - int prepare(void); + run_stats* get_stats(void) { return &m_stats; } + + // client manager api's + unsigned int get_reqs_processed() { + return m_reqs_processed; + } + + void inc_reqs_processed() { + m_reqs_processed++; + } + + unsigned int get_reqs_generated() { + return m_reqs_generated; + } + + void inc_reqs_generated() { + m_reqs_generated++; + } + + virtual void handle_cluster_slots(protocol_response *r) { + assert(false && "handle_cluster_slots not supported"); + } + + virtual void handle_response(struct timeval timestamp, request *request, protocol_response *response); + virtual bool finished(void); + virtual void set_start_time(); + virtual void set_end_time(); + virtual void create_request(struct timeval timestamp, unsigned int conn_id); + virtual bool hold_pipeline(unsigned int conn_id); + virtual int connect(void); + virtual void disconnect(void); + // + + // Utility function to get the object iterator type based on the config + inline int obj_iter_type(benchmark_config *cfg, unsigned char index) + { + if (cfg->key_pattern[index] == 'R') + return OBJECT_GENERATOR_KEY_RANDOM; + else if (cfg->key_pattern[index] == 'G') + return OBJECT_GENERATOR_KEY_GAUSSIAN; + return OBJECT_GENERATOR_KEY_SET_ITER; + } }; class verify_client : public client { protected: - struct verify_request : public request { - char *m_key; - unsigned int m_key_len; - char *m_value; - unsigned int m_value_len; - - verify_request(request_type type, - unsigned int size, - struct timeval* sent_time, - unsigned int keys, - const char *key, - unsigned int key_len, - const char *value, - unsigned int value_len); - virtual ~verify_request(void); - }; bool m_finished; unsigned long long int m_verified_keys; unsigned long long int m_errors; virtual bool finished(void); - virtual void create_request(struct timeval timestamp); + virtual void create_request(struct timeval timestamp, unsigned int conn_id); virtual void handle_response(struct timeval timestamp, request *request, protocol_response *response); public: verify_client(struct event_base *event_base, benchmark_config *config, abstract_protocol *protocol, object_generator *obj_gen); diff --git a/cluster_client.cpp b/cluster_client.cpp new file mode 100644 index 00000000..230d2f17 --- /dev/null +++ b/cluster_client.cpp @@ -0,0 +1,357 @@ +/* + * Copyright (C) 2011-2017 Redis Labs Ltd. + * + * This file is part of memtier_benchmark. + * + * memtier_benchmark is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 2. + * + * memtier_benchmark is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with memtier_benchmark. If not, see . + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_FCNTL_H +#include +#endif +#include +#include +#include +#include +#ifdef HAVE_SYS_SOCKET_H +#include +#endif +#ifdef HAVE_NETINET_TCP_H +#include +#endif +#ifdef HAVE_LIMITS_H +#include +#endif + +#ifdef HAVE_ASSERT_H +#include +#endif + +#include "cluster_client.h" +#include "memtier_benchmark.h" +#include "obj_gen.h" +#include "shard_connection.h" + +#define KEY_INDEX_QUEUE_MAX_SIZE 1000000 + +#define MAX_CLUSTER_HSLOT 16383 +static const uint16_t crc16tab[256]= { + 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, + 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, + 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, + 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, + 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, + 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, + 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, + 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, + 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, + 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, + 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, + 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, + 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, + 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, + 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, + 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, + 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, + 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, + 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, + 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, + 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, + 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, + 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, + 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, + 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, + 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, + 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, + 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, + 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, + 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, + 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, + 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 +}; + +static inline uint16_t crc16(const char *buf, size_t len) { + size_t counter; + uint16_t crc = 0; + for (counter = 0; counter < len; counter++) + crc = (crc<<8) ^ crc16tab[((crc>>8) ^ *buf++)&0x00FF]; + return crc; +} + +static uint32_t calc_hslot_crc16_cluster(const char *str, size_t length) +{ + uint32_t rv = (uint32_t) crc16(str, length) & MAX_CLUSTER_HSLOT; + return rv; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////// + +cluster_client::cluster_client(client_group* group) : client(group) +{ +} + +cluster_client::~cluster_client() { + for (unsigned int i = 0; i < m_key_index_pools.size(); i++) { + key_index_pool* key_idx_pool = m_key_index_pools[i]; + delete key_idx_pool; + } + m_key_index_pools.clear(); +} + +int cluster_client::connect(void) { + // get main connection + shard_connection* sc = MAIN_CONNECTION; + assert(sc != NULL); + + // set main connection to send 'CLUSTER SLOTS' command + sc->set_cluster_slots(); + + // create key index pool for main connection + key_index_pool* key_idx_pool = new key_index_pool; + m_key_index_pools.push_back(key_idx_pool); + assert(m_connections.size() == m_key_index_pools.size()); + + // continue with base class + client::connect(); + + return 0; +} + +void cluster_client::disconnect(void) +{ + unsigned int conn_size = m_connections.size(); + unsigned int i; + + // disconnect all connections + for (i = 0; i < m_connections.size(); i++) { + shard_connection* sc = m_connections[i]; + sc->disconnect(); + } + + // delete all connections except main connection + for (i = conn_size - 1; i > 0; i--) { + shard_connection* sc = m_connections.back(); + m_connections.pop_back(); + delete sc; + } +} + +shard_connection* cluster_client::add_shard_connection(char* address, char* port) { + shard_connection* sc = new shard_connection(m_connections.size(), this, + m_config, m_event_base, + MAIN_CONNECTION->get_protocol()); + m_connections.push_back(sc); + + // create key index pool + key_index_pool* key_idx_pool = new key_index_pool; + m_key_index_pools.push_back(key_idx_pool); + assert(m_connections.size() == m_key_index_pools.size()); + + // set which setup command required + sc->set_authentication(); + sc->set_select_db(); + sc->set_cluster_slots(); + + // save address and port + sc->set_address_port(address, port); + + // get address information + struct connect_info ci; + struct addrinfo *addr_info; + struct addrinfo hints; + int ret; + + memset(&hints, 0, sizeof(hints)); + hints.ai_flags = AI_PASSIVE; + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = AF_INET; + + int res = getaddrinfo(address, port, &hints, &addr_info); + if (res != 0) { + benchmark_error_log("connect: resolve error: %s\n", gai_strerror(res)); + return NULL; + } + + ci.ci_family = addr_info->ai_family; + ci.ci_socktype = addr_info->ai_socktype; + ci.ci_protocol = addr_info->ai_protocol; + assert(addr_info->ai_addrlen <= sizeof(ci.addr_buf)); + memcpy(ci.addr_buf, addr_info->ai_addr, addr_info->ai_addrlen); + ci.ci_addr = (struct sockaddr *) ci.addr_buf; + ci.ci_addrlen = addr_info->ai_addrlen; + + // call connect + ret = sc->connect(&ci); + if (ret) + return NULL; + + return sc; +} + +void cluster_client::handle_cluster_slots(protocol_response *r) { + // run over response and create connections + for (unsigned int i=0; iget_mbulk_value()->mbulk_array.size(); i++) { + // create connection + mbulk_element* shard = r->get_mbulk_value()->mbulk_array[i]; + + int min_slot = strtol(shard->mbulk_array[0]->value, NULL, 10); + int max_slot = strtol(shard->mbulk_array[1]->value, NULL, 10); + + // hostname/ip + mbulk_element* mbulk_addr_el = shard->mbulk_array[2]->mbulk_array[0]; + char* addr = (char*) malloc(mbulk_addr_el->value_len + 1); + memcpy(addr, mbulk_addr_el->value, mbulk_addr_el->value_len); + addr[mbulk_addr_el->value_len] = '\0'; + + // port + mbulk_element* mbulk_port_el = shard->mbulk_array[2]->mbulk_array[1]; + char* port = (char*) malloc(mbulk_port_el->value_len + 1); + memcpy(port, mbulk_port_el->value, mbulk_port_el->value_len); + port[mbulk_port_el->value_len] = '\0'; + + // check if connection already exist + shard_connection* sc = NULL; + unsigned int j; + + for (j = 0; j < m_connections.size(); j++) { + if (strcmp(addr, m_connections[j]->get_address()) == 0 && + strcmp(port, m_connections[j]->get_port()) == 0) { + sc = m_connections[j]; + break; + } + } + + // if connection doesn't exist, add it + if (sc == NULL) { + sc = add_shard_connection(addr, port); + assert(sc != NULL); + } + + // update range + for (int j = min_slot; j <= max_slot; j++) { + m_slot_to_shard[j] = sc->get_id(); + } + + free(addr); + free(port); + } +} + +bool cluster_client::hold_pipeline(unsigned int conn_id) { + // don't exceed requests + if (m_config->requests) { + if (m_key_index_pools[conn_id]->empty() && + m_reqs_generated >= m_config->requests) { + return true; + } + } + + return false; +} + +bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned long long* key_index) { + // first check if we already have key in pool + if (!m_key_index_pools[conn_id]->empty()) { + *key_index = m_key_index_pools[conn_id]->front(); + m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index); + + m_key_index_pools[conn_id]->pop(); + return true; + } + + // keep generate key till it match for this connection, or requests reached + while (true) { + // generate key + *key_index = m_obj_gen->get_key_index(iter); + m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index); + + // check if the key match for this connection + unsigned int hslot = calc_hslot_crc16_cluster(m_key_buffer, m_key_len); + if (m_slot_to_shard[hslot] == conn_id) { + m_reqs_generated++; + return true; + } + + // store key for other connection, if queue is not full + key_index_pool* key_idx_pool = m_key_index_pools[m_slot_to_shard[hslot]]; + if (key_idx_pool->size() < KEY_INDEX_QUEUE_MAX_SIZE) { + key_idx_pool->push(*key_index); + m_reqs_generated++; + } + + // don't exceed requests + if (m_config->requests > 0 && m_reqs_generated >= m_config->requests) + return false; + } +} + +// This function could use some urgent TLC -- but we need to do it without altering the behavior +void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id) +{ + // If the Set:Wait ratio is not 0, start off with WAITs + if (m_config->wait_ratio.b && + (m_tot_wait_ops == 0 || + (m_tot_set_ops/m_tot_wait_ops > m_config->wait_ratio.a/m_config->wait_ratio.b))) { + + m_tot_wait_ops++; + + unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max); + unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min, + m_config->wait_timeout.max, 0, + ((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min); + + m_connections[conn_id]->send_wait_command(×tamp, num_slaves, timeout); + m_reqs_generated++; + } + // are we set or get? this depends on the ratio + else if (m_set_ratio_count < m_config->ratio.a) { + // set command + unsigned long long key_index; + + // get key + if (!get_key_for_conn(conn_id, obj_iter_type(m_config, 0), &key_index)) { + return; + } + + // get value + unsigned int value_len; + const char *value = m_obj_gen->get_value(key_index, &value_len); + + m_connections[conn_id]->send_set_command(×tamp, m_key_buffer, m_key_len, + value, value_len, m_obj_gen->get_expiry(), + m_config->data_offset); + m_set_ratio_count++; + m_tot_set_ops++; + } else if (m_get_ratio_count < m_config->ratio.b) { + // get command + unsigned long long key_index; + + // get key + if (!get_key_for_conn(conn_id, obj_iter_type(m_config, 2), &key_index)) + return; + + m_connections[conn_id]->send_get_command(×tamp, m_key_buffer, m_key_len, m_config->data_offset); + m_get_ratio_count++; + } else { + // overlap counters + m_get_ratio_count = m_set_ratio_count = 0; + } +} + diff --git a/cluster_client.h b/cluster_client.h new file mode 100644 index 00000000..4c82c747 --- /dev/null +++ b/cluster_client.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2011-2017 Redis Labs Ltd. + * + * This file is part of memtier_benchmark. + * + * memtier_benchmark is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 2. + * + * memtier_benchmark is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with memtier_benchmark. If not, see . + */ + +#ifndef MEMTIER_BENCHMARK_CLUSTER_CLIENT_H +#define MEMTIER_BENCHMARK_CLUSTER_CLIENT_H + +#include +#include "client.h" + +typedef std::queue key_index_pool; + +// forward decleration +class shard_connection; + +class cluster_client : public client { +protected: + std::vector m_key_index_pools; + unsigned int m_slot_to_shard[16384]; + + char m_key_buffer[250]; + int m_key_len; + + virtual int connect(void); + virtual void disconnect(void); + + shard_connection* add_shard_connection(char* address, char* port); + bool get_key_for_conn(unsigned int conn_id, int iter, unsigned long long* key_index); + +public: + cluster_client(client_group* group); + virtual ~cluster_client(); + + // client manager api's + virtual void handle_cluster_slots(protocol_response *r); + virtual void create_request(struct timeval timestamp, unsigned int conn_id); + virtual bool hold_pipeline(unsigned int conn_id); +}; + + +#endif //MEMTIER_BENCHMARK_CLUSTER_CLIENT_H diff --git a/connections_manager.h b/connections_manager.h new file mode 100644 index 00000000..23640170 --- /dev/null +++ b/connections_manager.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2011-2017 Redis Labs Ltd. + * + * This file is part of memtier_benchmark. + * + * memtier_benchmark is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 2. + * + * memtier_benchmark is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with memtier_benchmark. If not, see . + */ + +#ifndef MEMTIER_BENCHMARK_CLIENT_DATA_MANAGER_H +#define MEMTIER_BENCHMARK_CLIENT_DATA_MANAGER_H + +class connections_manager { +public: + virtual unsigned int get_reqs_processed(void) = 0; + virtual void inc_reqs_processed(void) = 0; + virtual unsigned int get_reqs_generated(void) = 0; + virtual void inc_reqs_generated(void) = 0; + virtual bool finished(void) = 0; + + virtual void set_start_time(void) = 0; + virtual void set_end_time(void) = 0; + + virtual void handle_cluster_slots(protocol_response *r) = 0; + virtual void handle_response(struct timeval timestamp, request *request, protocol_response *response) = 0; + + virtual void create_request(struct timeval timestamp, unsigned int conn_id) = 0; + virtual bool hold_pipeline(unsigned int conn_id) = 0; + + virtual int connect(void) = 0; + virtual void disconnect(void) = 0; +}; + + +#endif //MEMTIER_BENCHMARK_CLIENT_DATA_MANAGER_H diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index 76c5fa03..ffb0aab5 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -261,6 +261,24 @@ static int generate_random_seed() return (int)time(NULL)^getpid()^R; } +static bool verify_cluster_option(struct benchmark_config *cfg) { + if (cfg->reconnect_interval) { + fprintf(stderr, "error: cluster mode dose not support reconnect-interval option.\n"); + return false; + } else if (cfg->multi_key_get) { + fprintf(stderr, "error: cluster mode dose not support multi-key-get option.\n"); + return false; + } else if (cfg->wait_ratio.is_defined()) { + fprintf(stderr, "error: cluster mode dose not support wait-ratio option.\n"); + return false; + } else if (cfg->protocol && strcmp(cfg->protocol, "redis")) { + fprintf(stderr, "error: cluster mode supported only in redis protocol.\n"); + return false; + } + + return true; +} + static int config_parse_args(int argc, char *argv[], struct benchmark_config *cfg) { enum extended_options { @@ -294,7 +312,8 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf o_wait_ratio, o_num_slaves, o_wait_timeout, - o_json_out_file + o_json_out_file, + o_cluster_mode }; static struct option long_options[] = { @@ -342,6 +361,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf { "num-slaves", 1, 0, o_num_slaves }, { "wait-timeout", 1, 0, o_wait_timeout }, { "json-out-file", 1, 0, o_json_out_file }, + { "cluster-mode", 0, 0, o_cluster_mode }, { "help", 0, 0, 'h' }, { "version", 0, 0, 'v' }, { NULL, 0, 0, 0 } @@ -635,12 +655,18 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf case o_json_out_file: cfg->json_out_file = optarg; break; + case o_cluster_mode: + cfg->cluster_mode = true; + break; default: return -1; break; } } + if (cfg->cluster_mode && !verify_cluster_option(cfg)) + return -1; + return 0; } @@ -662,6 +688,7 @@ void usage() { " --json-out-file=FILE Name of JSON output file, if not set, will not print to json\n" " --show-config Print detailed configuration before running\n" " --hide-histogram Don't print detailed latency histogram\n" + " --cluster-mode Run client in cluster mode\n" " --help Display this help\n" " --version Display version information\n" "\n" diff --git a/memtier_benchmark.h b/memtier_benchmark.h index a30464cb..ac87be40 100644 --- a/memtier_benchmark.h +++ b/memtier_benchmark.h @@ -81,6 +81,7 @@ struct benchmark_config { config_range wait_timeout; // JSON additions const char *json_out_file; + bool cluster_mode; }; diff --git a/obj_gen.cpp b/obj_gen.cpp index e717f7c6..3c1de290 100644 --- a/obj_gen.cpp +++ b/obj_gen.cpp @@ -391,7 +391,6 @@ const char* object_generator::get_key(int iter, unsigned int *len) return m_key_buffer; } - data_object* object_generator::get_object(int iter) { // compute key @@ -436,6 +435,50 @@ data_object* object_generator::get_object(int iter) return &m_object; } +const char* object_generator::get_key_prefix() { + return m_key_prefix; +} + +const char* object_generator::get_value(unsigned long long key_index, unsigned int *len) { + // compute size + unsigned int new_size = 0; + if (m_data_size_type == data_size_fixed) { + new_size = m_data_size.size_fixed; + } else if (m_data_size_type == data_size_range) { + if (m_data_size_pattern && *m_data_size_pattern=='S') { + double a = (key_index-m_key_min)/static_cast(m_key_max-m_key_min); + new_size = (m_data_size.size_range.size_max-m_data_size.size_range.size_min)*a + m_data_size.size_range.size_min; + } else { + new_size = random_range(m_data_size.size_range.size_min > 0 ? m_data_size.size_range.size_min : 1, + m_data_size.size_range.size_max); + } + } else if (m_data_size_type == data_size_weighted) { + new_size = m_data_size.size_list->get_next_size(); + } else { + assert(0); + } + + // modify object content in case of random data + if (m_random_data) { + m_value_buffer[m_value_buffer_mutation_pos++]++; + if (m_value_buffer_mutation_pos >= m_value_buffer_size) + m_value_buffer_mutation_pos = 0; + } + + *len = new_size; + return m_value_buffer; +} + +unsigned int object_generator::get_expiry() { + // compute expiry + unsigned int expiry = 0; + if (m_expiry_max > 0) { + expiry = random_range(m_expiry_min, m_expiry_max); + } + + return expiry; +} + /////////////////////////////////////////////////////////////////////////// data_object::data_object() : diff --git a/obj_gen.h b/obj_gen.h index 2e3ac786..78cf9d72 100644 --- a/obj_gen.h +++ b/obj_gen.h @@ -113,7 +113,6 @@ class object_generator { void alloc_value_buffer(void); void alloc_value_buffer(const char* copy_from); void random_init(void); - unsigned long long get_key_index(int iter); public: object_generator(); object_generator(const object_generator& copy); @@ -134,8 +133,13 @@ class object_generator { void set_key_distribution(double key_stddev, double key_median); void set_random_seed(int seed); + unsigned long long get_key_index(int iter); virtual const char* get_key(int iter, unsigned int *len); virtual data_object* get_object(int iter); + + const char * get_key_prefix(); + const char* get_value(unsigned long long key_index, unsigned int *len); + unsigned int get_expiry(); }; class imported_keylist; diff --git a/protocol.cpp b/protocol.cpp index a218a90b..5403ead3 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -56,7 +56,7 @@ void abstract_protocol::set_keep_value(bool flag) ///////////////////////////////////////////////////////////////////////// protocol_response::protocol_response() - : m_status(NULL), m_value(NULL), m_value_len(0), m_hits(0), m_error(false) + : m_status(NULL), m_value(NULL), m_mbulk_value(NULL), m_value_len(0), m_hits(0), m_error(false) { } @@ -133,25 +133,61 @@ void protocol_response::clear(void) free((void *)m_value); m_value = NULL; } + if (m_mbulk_value != NULL) { + m_mbulk_value->free_mbulk(); + free((void *)m_mbulk_value); + m_mbulk_value = NULL; + } m_value_len = 0; m_total_len = 0; m_hits = 0; m_error = 0; } +void protocol_response::set_mbulk_value(mbulk_element* element) { + m_mbulk_value = element; +} + +void protocol_response::add_mbulk_array(mbulk_level_array* array, mbulk_element* element, int count) { + add_mbulk_value(array, element); + + // create new nested level + mbulk_level* new_level = new mbulk_level(count, element); + array->push_back(new_level); +} + +void protocol_response::add_mbulk_value(mbulk_level_array* array, mbulk_element* element) { + mbulk_level* cur_level = array->back(); + + // insert new object into current nested mbulk + cur_level->mbulk->mbulk_array.push_back(element); + + // update current mbulk + cur_level->mbulk_count--; + + if (cur_level->mbulk_count == 0) + array->pop_back(); +} + +const mbulk_element* protocol_response::get_mbulk_value() { + return m_mbulk_value; +} + ///////////////////////////////////////////////////////////////////////// class redis_protocol : public abstract_protocol { protected: - enum response_state { rs_initial, rs_read_bulk }; + enum response_state { rs_initial, rs_read_bulk, rs_read_mbulk }; response_state m_response_state; unsigned int m_bulk_len; size_t m_response_len; + mbulk_level_array current_mbulk_level; public: redis_protocol() : m_response_state(rs_initial), m_bulk_len(0), m_response_len(0) { } virtual redis_protocol* clone(void) { return new redis_protocol(); } virtual int select_db(int db); virtual int authenticate(const char *credentials); + virtual int write_command_cluster_slots(); virtual int write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset); virtual int write_command_get(const char *key, int key_len, unsigned int offset); virtual int write_command_multi_get(const keylist *keylist); @@ -190,6 +226,21 @@ int redis_protocol::authenticate(const char *credentials) return size; } +int redis_protocol::write_command_cluster_slots() +{ + int size = 0; + + size = evbuffer_add(m_write_buf, + "*2\r\n" + "$7\r\n" + "CLUSTER\r\n" + "$5\r\n" + "SLOTS\r\n", + 28); + + return size; +} + int redis_protocol::write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset) { assert(key != NULL); @@ -336,19 +387,30 @@ int redis_protocol::parse_response(void) if (line == NULL) return 0; // maybe we didn't get it yet? m_response_len += 2; // count CRLF - - // todo: support multi-bulk reply - if (line[0] == '*') { - benchmark_debug_log("multi-bulk replies not currently supported.\n"); - free(line); - return -1; - } // clear last response m_last_response.clear(); - // bulk? - if (line[0] == '$') { + if (line[0] == '*') { + int count = strtol(line + 1, NULL, 10); + if (count == -1) { + m_last_response.set_status(line); + return 1; + } + + // start new mbulk response + current_mbulk_level.clear(); + + mbulk_element* mbulk = new mbulk_element(); + m_last_response.set_mbulk_value(mbulk); + + // update mbulk level + mbulk_level* new_level = new mbulk_level(count, mbulk); + current_mbulk_level.push_back(new_level); + + m_response_state = rs_read_mbulk; + m_last_response.set_status(line); + } else if (line[0] == '$') { int len = strtol(line + 1, NULL, 10); if (len == -1) { m_last_response.set_status(line); @@ -371,6 +433,63 @@ int redis_protocol::parse_response(void) return -1; } + break; + case rs_read_mbulk: + line = evbuffer_readln(m_read_buf, &m_response_len, EVBUFFER_EOL_CRLF_STRICT); + if (line == NULL) + return 0; + + if (line[0] == '*') { + int count = strtol(line + 1, NULL, 10); + + // add new mbulk array + mbulk_element *mbulk = new mbulk_element(); + m_last_response.add_mbulk_array(¤t_mbulk_level, mbulk, count); + + m_response_state = rs_read_mbulk; + m_last_response.set_status(line); + + } else if (line[0] == ':') { + char *bulk_value = strdup(line + 1); + assert(bulk_value != NULL); + + // add new mbulk value + mbulk_element* mbulk = new mbulk_element(bulk_value, strlen(bulk_value)); + m_last_response.add_mbulk_value(¤t_mbulk_level, mbulk); + + // check if we got all mbulk + if (current_mbulk_level.size() == 0) { + m_response_state = rs_initial; + return 1; + } + + } else if (line[0] == '$') { + int len = strtol(line + 1, NULL, 10); + m_bulk_len = (unsigned int) len; + + if (evbuffer_get_length(m_read_buf) >= m_bulk_len + 2) { + char *bulk_value = (char *) malloc(m_bulk_len); + assert(bulk_value != NULL); + + int ret = evbuffer_remove(m_read_buf, bulk_value, m_bulk_len); + assert(ret != -1); + + // drain CRLF + ret = evbuffer_drain(m_read_buf, 2); + assert(ret != -1); + + // create new mbulk array + mbulk_element* obj = new mbulk_element(bulk_value, m_bulk_len); + m_last_response.add_mbulk_value(¤t_mbulk_level, obj); + + if (current_mbulk_level.size() == 0) { + m_response_state = rs_initial; + return 1; + } + } else { + return 0; + } + } break; case rs_read_bulk: if (evbuffer_get_length(m_read_buf) >= m_bulk_len + 2) { @@ -421,6 +540,7 @@ class memcache_text_protocol : public abstract_protocol { virtual memcache_text_protocol* clone(void) { return new memcache_text_protocol(); } virtual int select_db(int db); virtual int authenticate(const char *credentials); + virtual int write_command_cluster_slots(); virtual int write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset); virtual int write_command_get(const char *key, int key_len, unsigned int offset); virtual int write_command_multi_get(const keylist *keylist); @@ -438,6 +558,11 @@ int memcache_text_protocol::authenticate(const char *credentials) assert(0); } +int memcache_text_protocol::write_command_cluster_slots() +{ + assert(0); +} + int memcache_text_protocol::write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset) { assert(key != NULL); @@ -613,6 +738,7 @@ class memcache_binary_protocol : public abstract_protocol { virtual memcache_binary_protocol* clone(void) { return new memcache_binary_protocol(); } virtual int select_db(int db); virtual int authenticate(const char *credentials); + virtual int write_command_cluster_slots(); virtual int write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset); virtual int write_command_get(const char *key, int key_len, unsigned int offset); virtual int write_command_multi_get(const keylist *keylist); @@ -662,6 +788,11 @@ int memcache_binary_protocol::authenticate(const char *credentials) return sizeof(req) + user_len + passwd_len + 2 + sizeof(mechanism) - 1; } +int memcache_binary_protocol::write_command_cluster_slots() +{ + assert(0); +} + int memcache_binary_protocol::write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset) { assert(key != NULL); diff --git a/protocol.h b/protocol.h index 61dd91d9..7e5f1f8c 100644 --- a/protocol.h +++ b/protocol.h @@ -20,36 +20,74 @@ #define _PROTOCOL_H #include +#include + +struct mbulk_element { + mbulk_element() : value(NULL), value_len(0) {;} + mbulk_element(char* val, unsigned int len) : value(val), value_len(len) {;} + + void free_mbulk() { + if (value != NULL) { + free((void *) value); + } else { + for (unsigned int i=0; ifree_mbulk(); + free((void *)mbulk_array[i]); + } + mbulk_array.clear(); + } + } + + char* value; + unsigned int value_len; + + std::vector mbulk_array; +}; + +struct mbulk_level { + mbulk_level (int count, mbulk_element* mbulk_el) : mbulk_count(count), mbulk(mbulk_el) {;} + + unsigned int mbulk_count; + mbulk_element* mbulk; +}; + +typedef std::vector mbulk_level_array; class protocol_response { protected: const char *m_status; const char *m_value; + mbulk_element *m_mbulk_value; unsigned int m_value_len; unsigned int m_total_len; unsigned int m_hits; bool m_error; public: - protocol_response(); - virtual ~protocol_response(); + protocol_response(); + virtual ~protocol_response(); + + void set_status(const char *status); + const char *get_status(void); + + void set_error(bool error); + bool is_error(void); - void set_status(const char *status); - const char *get_status(void); + void set_value(const char *value, unsigned int value_len); + const char *get_value(unsigned int *value_len); - void set_error(bool error); - bool is_error(void); + void set_total_len(unsigned int total_len); + unsigned int get_total_len(void); - void set_value(const char *value, unsigned int value_len); - const char *get_value(unsigned int *value_len); - - void set_total_len(unsigned int total_len); - unsigned int get_total_len(void); + void incr_hits(void); + unsigned int get_hits(void); - void incr_hits(void); - unsigned int get_hits(void); + void clear(); - void clear(); + void set_mbulk_value(mbulk_element* element); + void add_mbulk_array(mbulk_level_array* mbulk_level, mbulk_element* element, int count); + void add_mbulk_value(mbulk_level_array* mbulk_level, mbulk_element* element); + const mbulk_element* get_mbulk_value(); }; class keylist { @@ -94,6 +132,7 @@ class abstract_protocol { virtual int select_db(int db) = 0; virtual int authenticate(const char *credentials) = 0; + virtual int write_command_cluster_slots() = 0; virtual int write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset) = 0; virtual int write_command_get(const char *key, int key_len, unsigned int offset) = 0; virtual int write_command_multi_get(const keylist *keylist) = 0; diff --git a/shard_connection.cpp b/shard_connection.cpp new file mode 100644 index 00000000..37396c3c --- /dev/null +++ b/shard_connection.cpp @@ -0,0 +1,606 @@ +/* + * Copyright (C) 2011-2017 Redis Labs Ltd. + * + * This file is part of memtier_benchmark. + * + * memtier_benchmark is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 2. + * + * memtier_benchmark is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with memtier_benchmark. If not, see . + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_FCNTL_H +#include +#endif +#include +#include +#include +#include +#ifdef HAVE_SYS_SOCKET_H +#include +#endif +#ifdef HAVE_NETINET_TCP_H +#include +#endif +#ifdef HAVE_LIMITS_H +#include +#endif + +#ifdef HAVE_ASSERT_H +#include +#endif + +#include "shard_connection.h" +#include "obj_gen.h" +#include "memtier_benchmark.h" +#include "connections_manager.h" + +void cluster_client_event_handler(evutil_socket_t sfd, short evtype, void *opaque) +{ + shard_connection *sc = (shard_connection *) opaque; + + assert(sc != NULL); + assert(sc->m_sockfd == sfd); + + sc->handle_event(evtype); +} + +request::request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys) + : m_type(type), m_size(size), m_keys(keys) +{ + if (sent_time != NULL) + m_sent_time = *sent_time; + else { + gettimeofday(&m_sent_time, NULL); + } +} + +verify_request::verify_request(request_type type, + unsigned int size, + struct timeval* sent_time, + unsigned int keys, + const char *key, + unsigned int key_len, + const char *value, + unsigned int value_len) : + request(type, size, sent_time, keys), + m_key(NULL), m_key_len(0), + m_value(NULL), m_value_len(0) +{ + m_key_len = key_len; + m_key = (char *)malloc(key_len); + memcpy(m_key, key, m_key_len); + + m_value_len = value_len; + m_value = (char *)malloc(value_len); + memcpy(m_value, value, m_value_len); +} + +verify_request::~verify_request(void) +{ + if (m_key != NULL) { + free((void *) m_key); + m_key = NULL; + } + if (m_value != NULL) { + free((void *) m_value); + m_value = NULL; + } +} + +shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config, + struct event_base* event_base, abstract_protocol* abs_protocol) : + m_sockfd(-1), m_unix_sockaddr(NULL), m_event(NULL), m_pending_resp(0), m_connected(false), + m_authentication(auth_done), m_db_selection(select_done), m_cluster_slots(slots_done) { + m_id = id; + m_conns_manager = conns_man; + m_config = config; + m_event_base = event_base; + + if (m_config->unix_socket) { + m_unix_sockaddr = (struct sockaddr_un *) malloc(sizeof(struct sockaddr_un)); + assert(m_unix_sockaddr != NULL); + + m_unix_sockaddr->sun_family = AF_UNIX; + strncpy(m_unix_sockaddr->sun_path, m_config->unix_socket, sizeof(m_unix_sockaddr->sun_path)-1); + m_unix_sockaddr->sun_path[sizeof(m_unix_sockaddr->sun_path)-1] = '\0'; + } + + m_read_buf = evbuffer_new(); + assert(m_read_buf != NULL); + + m_write_buf = evbuffer_new(); + assert(m_write_buf != NULL); + + m_protocol = abs_protocol->clone(); + assert(m_protocol != NULL); + m_protocol->set_buffers(m_read_buf, m_write_buf); + + m_pipeline = new std::queue; + assert(m_pipeline != NULL); +} + +shard_connection::~shard_connection() { + if (m_sockfd != -1) { + close(m_sockfd); + m_sockfd = -1; + } + + if (m_address != NULL) { + free(m_address); + m_address = NULL; + } + + if (m_port != NULL) { + free(m_port); + m_port = NULL; + } + + if (m_unix_sockaddr != NULL) { + free(m_unix_sockaddr); + m_unix_sockaddr = NULL; + } + + if (m_read_buf != NULL) { + evbuffer_free(m_read_buf); + m_read_buf = NULL; + } + + if (m_write_buf != NULL) { + evbuffer_free(m_write_buf); + m_write_buf = NULL; + } + + if (m_event != NULL) { + event_free(m_event); + m_event = NULL; + } + + if (m_protocol != NULL) { + delete m_protocol; + m_protocol = NULL; + } + + if (m_pipeline != NULL) { + delete m_pipeline; + m_pipeline = NULL; + } +} + +void shard_connection::setup_event() { + int ret; + + if (!m_event) { + m_event = event_new(m_event_base, m_sockfd, EV_WRITE, + cluster_client_event_handler, (void *)this); + assert(m_event != NULL); + } else { + ret = event_del(m_event); + assert(ret ==0); + + ret = event_assign(m_event, m_event_base, m_sockfd, EV_WRITE, + cluster_client_event_handler, (void *)this); + assert(ret ==0); + } + + ret = event_add(m_event, NULL); + assert(ret == 0); +} + +int shard_connection::setup_socket(struct connect_info* addr) { + int flags; + + // clean up existing socket + if (m_sockfd != -1) + close(m_sockfd); + + if (m_unix_sockaddr != NULL) { + m_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (m_sockfd < 0) { + return -errno; + } + } else { + // initialize socket + m_sockfd = socket(addr->ci_family, addr->ci_socktype, addr->ci_protocol); + if (m_sockfd < 0) { + return -errno; + } + + // configure socket behavior + struct linger ling = {0, 0}; + int flags = 1; + int error = setsockopt(m_sockfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flags, sizeof(flags)); + assert(error == 0); + + error = setsockopt(m_sockfd, SOL_SOCKET, SO_LINGER, (void *) &ling, sizeof(ling)); + assert(error == 0); + + error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *) &flags, sizeof(flags)); + assert(error == 0); + } + + // set non-blocking behavior + flags = 1; + if ((flags = fcntl(m_sockfd, F_GETFL, 0)) < 0 || + fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK) < 0) { + benchmark_error_log("connect: failed to set non-blocking flag.\n"); + close(m_sockfd); + m_sockfd = -1; + return -1; + } + + return 0; +} + +int shard_connection::connect(struct connect_info* addr) { + // set required setup commands + m_authentication = m_config->authenticate ? auth_none : auth_done; + m_db_selection = m_config->select_db ? select_none : select_done; + + // clean up existing buffers + evbuffer_drain(m_read_buf, evbuffer_get_length(m_read_buf)); + evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf)); + + // setup socket + setup_socket(addr); + + // set up event + setup_event(); + + // call connect + if (::connect(m_sockfd, + m_unix_sockaddr ? (struct sockaddr *) m_unix_sockaddr : addr->ci_addr, + m_unix_sockaddr ? sizeof(struct sockaddr_un) : addr->ci_addrlen) == -1) { + if (errno == EINPROGRESS || errno == EWOULDBLOCK) + return 0; + benchmark_error_log("connect failed, error = %s\n", strerror(errno)); + return -1; + } + + return 0; +} + +void shard_connection::disconnect() { + if (m_sockfd != -1) { + close(m_sockfd); + m_sockfd = -1; + } + + evbuffer_drain(m_read_buf, evbuffer_get_length(m_read_buf)); + evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf)); + + int ret = event_del(m_event); + assert(ret == 0); + + m_connected = false; + + // by default no need to send any setup request + m_authentication = auth_done; + m_db_selection = select_done; + m_cluster_slots = slots_done; +} + +void shard_connection::set_address_port(const char* address, const char* port) { + m_address = strdup(address); + m_port = strdup(port); +} + +request* shard_connection::pop_req() { + request* req = m_pipeline->front(); + m_pipeline->pop(); + + m_pending_resp--; + assert(m_pending_resp >= 0); + + return req; +} + +void shard_connection::push_req(request* req) { + m_pipeline->push(req); + m_pending_resp++; +} + +bool shard_connection::is_conn_setup_done() { + return m_authentication == auth_done && + m_db_selection == select_done && + m_cluster_slots == slots_done; +} + +void shard_connection::send_conn_setup_commands(struct timeval timestamp) { + if (m_authentication == auth_none) { + benchmark_debug_log("sending authentication command.\n"); + m_protocol->authenticate(m_config->authenticate); + push_req(new request(rt_auth, 0, ×tamp, 0)); + m_authentication = auth_sent; + } + + if (m_db_selection == select_none) { + benchmark_debug_log("sending db selection command.\n"); + m_protocol->select_db(m_config->select_db); + push_req(new request(rt_select_db, 0, ×tamp, 0)); + m_db_selection = select_sent; + } + + if (m_cluster_slots == slots_none) { + benchmark_debug_log("sending cluster slots command.\n"); + m_protocol->write_command_cluster_slots(); + push_req(new request(rt_cluster_slots, 0, ×tamp, 0)); + m_cluster_slots = slots_sent; + } +} + +void shard_connection::process_response(void) +{ + int ret; + bool responses_handled = false; + + struct timeval now; + gettimeofday(&now, NULL); + + while ((ret = m_protocol->parse_response()) > 0) { + bool error = false; + protocol_response *r = m_protocol->get_response(); + + request* req = pop_req(); + + if (req->m_type == rt_auth) { + if (r->is_error()) { + benchmark_error_log("error: authentication failed [%s]\n", r->get_status()); + error = true; + } else { + m_authentication = auth_done; + benchmark_debug_log("authentication successful.\n"); + } + } else if (req->m_type == rt_select_db) { + if (strcmp(r->get_status(), "+OK") != 0) { + benchmark_error_log("database selection failed.\n"); + error = true; + } else { + benchmark_debug_log("database selection successful.\n"); + m_db_selection = select_done; + } + } else if (req->m_type == rt_cluster_slots) { + if (r->get_mbulk_value() == NULL || r->get_mbulk_value()->mbulk_array.size() == 0) { + benchmark_error_log("cluster slot failed.\n"); + error = true; + } else { + // parse response + m_conns_manager->handle_cluster_slots(r); + + m_cluster_slots = slots_done; + benchmark_debug_log("cluster slot command successful\n"); + } + } else { + benchmark_debug_log("handled response (first line): %s, %d hits, %d misses\n", + r->get_status(), + r->get_hits(), + req->m_keys - r->get_hits()); + + if (r->is_error()) { + benchmark_error_log("error response: %s\n", r->get_status()); + } + + m_conns_manager->handle_response(now, req, r); + m_conns_manager->inc_reqs_processed(); + responses_handled = true; + } + delete req; + if (error) { + return; + } + } + + if (ret == -1) { + benchmark_error_log("error: response parsing failed.\n"); + } + + if (m_config->reconnect_interval > 0 && responses_handled) { + if ((m_conns_manager->get_reqs_processed() % m_config->reconnect_interval) == 0) { + assert(m_pipeline->size() == 0); + benchmark_debug_log("reconnecting, m_reqs_processed = %u\n", m_conns_manager->get_reqs_processed()); + + // client manage connection & disconnection of shard + m_conns_manager->disconnect(); + ret = m_conns_manager->connect(); + assert(ret == 0); + + return; + } + } + + fill_pipeline(); +} + +void shard_connection::process_first_request() { + m_conns_manager->set_start_time(); + fill_pipeline(); +} + +void shard_connection::fill_pipeline(void) +{ + struct timeval now; + gettimeofday(&now, NULL); + + while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) { + if (!is_conn_setup_done()) { + send_conn_setup_commands(now); + return; + } + + // don't exceed requests + if (m_conns_manager->hold_pipeline(m_id)) + break; + + // client manage requests logic + m_conns_manager->create_request(now, m_id); + } +} + +void shard_connection::handle_event(short evtype) +{ + // connect() returning to us? normally we expect EV_WRITE, but for UNIX domain + // sockets we workaround since connect() returned immediately, but we don't want + // to do any I/O from the client::connect() call... + if (!m_connected && (evtype == EV_WRITE || m_unix_sockaddr != NULL)) { + int error = -1; + socklen_t errsz = sizeof(error); + + if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, (void *) &error, &errsz) == -1) { + benchmark_error_log("connect: error getting connect response (getsockopt): %s\n", strerror(errno)); + return; + } + + if (error != 0) { + benchmark_error_log("connect: connection failed: %s\n", strerror(error)); + return; + } + + m_connected = true; + if (!m_conns_manager->get_reqs_processed()) { + process_first_request(); + } else { + benchmark_debug_log("reconnection complete, proceeding with test\n"); + fill_pipeline(); + } + } + + assert(m_connected == true); + if ((evtype & EV_WRITE) == EV_WRITE && evbuffer_get_length(m_write_buf) > 0) { + if (evbuffer_write(m_write_buf, m_sockfd) < 0) { + if (errno != EWOULDBLOCK) { + benchmark_error_log("write error: %s\n", strerror(errno)); + disconnect(); + + return; + } + } + } + + if ((evtype & EV_READ) == EV_READ) { + int ret = 1; + while (ret > 0) { + ret = evbuffer_read(m_read_buf, m_sockfd, -1); + } + + if (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + benchmark_error_log("read error: %s\n", strerror(errno)); + disconnect(); + + return; + } + if (ret == 0) { + benchmark_error_log("connection dropped.\n"); + disconnect(); + + return; + } + + if (evbuffer_get_length(m_read_buf) > 0) { + process_response(); + + // process_response may have disconnected, in which case + // we just abort and wait for libevent to call us back sometime + if (!m_connected) { + return; + } + + } + } + + // update event + short new_evtype = 0; + if (m_pending_resp) { + new_evtype = EV_READ; + } + + if (evbuffer_get_length(m_write_buf) > 0) { + assert(!m_conns_manager->finished()); + new_evtype |= EV_WRITE; + } + + if (new_evtype) { + int ret = event_assign(m_event, m_event_base, + m_sockfd, new_evtype, cluster_client_event_handler, (void *)this); + assert(ret == 0); + + ret = event_add(m_event, NULL); + assert(ret == 0); + } else if (m_conns_manager->finished()) { + m_conns_manager->set_end_time(); + } +} + +void shard_connection::send_wait_command(struct timeval* sent_time, + unsigned int num_slaves, unsigned int timeout) { + int cmd_size = 0; + + benchmark_debug_log("WAIT num_slaves=%u timeout=%u\n", num_slaves, timeout); + + cmd_size = m_protocol->write_command_wait(num_slaves, timeout); + push_req(new request(rt_wait, cmd_size, sent_time, 0)); +} + +void shard_connection::send_set_command(struct timeval* sent_time, const char *key, int key_len, + const char *value, int value_len, int expiry, unsigned int offset) { + int cmd_size = 0; + + benchmark_debug_log("SET key=[%.*s] value_len=%u expiry=%u\n", + key_len, key, value_len, expiry); + + cmd_size = m_protocol->write_command_set(key, key_len, value, value_len, + expiry, offset); + + push_req(new request(rt_set, cmd_size, sent_time, 1)); +} + +void shard_connection::send_get_command(struct timeval* sent_time, + const char *key, int key_len, unsigned int offset) { + int cmd_size = 0; + + benchmark_debug_log("GET key=[%.*s]\n", key_len, key); + cmd_size = m_protocol->write_command_get(key, key_len, offset); + + push_req(new request(rt_get, cmd_size, sent_time, 1)); +} + +void shard_connection::send_mget_command(struct timeval* sent_time, const keylist* key_list) { + int cmd_size = 0; + + const char *first_key, *last_key; + unsigned int first_key_len, last_key_len; + first_key = key_list->get_key(0, &first_key_len); + last_key = key_list->get_key(key_list->get_keys_count()-1, &last_key_len); + + benchmark_debug_log("MGET %d keys [%.*s] .. [%.*s]\n", + key_list->get_keys_count(), first_key_len, first_key, last_key_len, last_key); + + cmd_size = m_protocol->write_command_multi_get(key_list); + + push_req(new request(rt_get, cmd_size, sent_time, key_list->get_keys_count())); +} + +void shard_connection::send_verify_get_command(struct timeval* sent_time, const char *key, int key_len, + const char *value, int value_len, int expiry, unsigned int offset) { + int cmd_size = 0; + + benchmark_debug_log("GET key=[%.*s] value_len=%u expiry=%u\n", + key_len, key, value_len, expiry); + + cmd_size = m_protocol->write_command_get(key, key_len, offset); + + push_req(new verify_request(rt_get, cmd_size, sent_time, 1, key, key_len, value, value_len)); +} diff --git a/shard_connection.h b/shard_connection.h new file mode 100644 index 00000000..4c62a961 --- /dev/null +++ b/shard_connection.h @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2011-2017 Redis Labs Ltd. + * + * This file is part of memtier_benchmark. + * + * memtier_benchmark is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 2. + * + * memtier_benchmark is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with memtier_benchmark. If not, see . + */ + +#ifndef MEMTIER_BENCHMARK_SHARD_CONNECTION_H +#define MEMTIER_BENCHMARK_SHARD_CONNECTION_H + +#include +#include +#include +#include +#include +#include + +#include "protocol.h" + +// forward decleration +class connections_manager; +struct benchmark_config; +class abstract_protocol; +class object_generator; + +enum authentication_state { auth_none, auth_sent, auth_done }; +enum select_db_state { select_none, select_sent, select_done }; +enum cluster_slots_state { slots_none, slots_sent, slots_done }; + +enum request_type { rt_unknown, rt_set, rt_get, rt_wait, rt_auth, rt_select_db, rt_cluster_slots }; +struct request { + request_type m_type; + struct timeval m_sent_time; + unsigned int m_size; + unsigned int m_keys; + + request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys); + virtual ~request(void) {} +}; + +struct verify_request : public request { + char *m_key; + unsigned int m_key_len; + char *m_value; + unsigned int m_value_len; + + verify_request(request_type type, + unsigned int size, + struct timeval* sent_time, + unsigned int keys, + const char *key, + unsigned int key_len, + const char *value, + unsigned int value_len); + virtual ~verify_request(void); +}; + +class shard_connection { + friend void cluster_client_event_handler(evutil_socket_t sfd, short evtype, void *opaque); + +public: + shard_connection(unsigned int id, connections_manager* conn_man, benchmark_config* config, + struct event_base* event_base, abstract_protocol* abs_protocol); + ~shard_connection(); + + void set_address_port(const char* address, const char* port); + int connect(struct connect_info* addr); + void disconnect(); + + void send_wait_command(struct timeval* sent_time, + unsigned int num_slaves, unsigned int timeout); + void send_set_command(struct timeval* sent_time, const char *key, int key_len, + const char *value, int value_len, int expiry, unsigned int offset); + void send_get_command(struct timeval* sent_time, + const char *key, int key_len, unsigned int offset); + void send_mget_command(struct timeval* sent_time, const keylist* key_list); + void send_verify_get_command(struct timeval* sent_time, const char *key, int key_len, + const char *value, int value_len, int expiry, unsigned int offset); + + void set_authentication() { + m_authentication = auth_none; + } + + void set_select_db() { + m_db_selection = select_none; + } + + void set_cluster_slots() { + m_cluster_slots = slots_none; + } + + unsigned int get_id() { + return m_id; + } + + abstract_protocol* get_protocol() { + return m_protocol; + } + + const char* get_address() { + return m_address; + } + + const char* get_port() { + return m_port; + } + + +private: + void setup_event(); + int setup_socket(struct connect_info* addr); + + bool is_conn_setup_done(); + void send_conn_setup_commands(struct timeval timestamp); + + request* pop_req(); + void push_req(request* req); + + void process_response(void); + void process_first_request(); + void fill_pipeline(void); + + void handle_event(short evtype); + + unsigned int m_id; + connections_manager* m_conns_manager; + benchmark_config* m_config; + + int m_sockfd; + char* m_address; + char* m_port; + struct sockaddr_un* m_unix_sockaddr; + + struct evbuffer* m_read_buf; + struct evbuffer* m_write_buf; + + struct event_base* m_event_base; + struct event* m_event; + + abstract_protocol* m_protocol; + std::queue* m_pipeline; + + int m_pending_resp; + bool m_connected; + + enum authentication_state m_authentication; + enum select_db_state m_db_selection; + enum cluster_slots_state m_cluster_slots; +}; + +#endif //MEMTIER_BENCHMARK_SHARD_CONNECTION_H