Skip to content

Commit

Permalink
Read replica implementation
Browse files Browse the repository at this point in the history
Change-Id: I51ca76ec9f6209937fd8ee71546c4d57998b90ad
Reviewed-on: http://review.couchbase.org/18291
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
  • Loading branch information
avsej committed Jul 13, 2012
1 parent d8c6b7f commit a6928d0
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -13,6 +13,8 @@
*.obj
*.orig
*.profile
*.swp
*.swo
*~
._*
.deps/
Expand Down
16 changes: 16 additions & 0 deletions include/libcouchbase/couchbase.h
Expand Up @@ -344,6 +344,22 @@ extern "C" {
libcouchbase_size_t nkey,
libcouchbase_cas_t cas);

LIBCOUCHBASE_API
libcouchbase_error_t libcouchbase_get_replica_by_key(libcouchbase_t instance,
const void *command_cookie,
const void *hashkey,
libcouchbase_size_t nhashkey,
libcouchbase_size_t num_keys,
const void *const *keys,
const libcouchbase_size_t *nkey);

LIBCOUCHBASE_API
libcouchbase_error_t libcouchbase_get_replica(libcouchbase_t instance,
const void *command_cookie,
libcouchbase_size_t num_keys,
const void *const *keys,
const libcouchbase_size_t *nkey);

/**
* Touch (set expiration time) on a number of values in the cache
* You need to run the event loop yourself (or call
Expand Down
6 changes: 3 additions & 3 deletions src/event.c
Expand Up @@ -118,6 +118,7 @@ static int parse_single(libcouchbase_server_t *c, hrtime_t stop)
}
return -1;
}
ct.vbucket = ntohs(req.request.vbucket);

switch (header.response.magic) {
case PROTOCOL_BINARY_REQ:
Expand All @@ -132,14 +133,13 @@ static int parse_single(libcouchbase_server_t *c, hrtime_t stop)
return -1;
}


assert(nr == sizeof(ct));
if (c->instance->histogram) {
libcouchbase_record_metrics(c->instance, stop - ct.start,
header.response.opcode);
}

if (ntohs(header.response.status) != PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
if (ntohs(header.response.status) != PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET
|| header.response.opcode == CMD_GET_REPLICA) {
c->instance->response_handler[header.response.opcode](c, &ct, (void *)packet);
/* keep command and cookie until we get complete STAT response */
if (was_connected &&
Expand Down
74 changes: 73 additions & 1 deletion src/get.c
Expand Up @@ -254,7 +254,7 @@ static libcouchbase_error_t libcouchbase_single_get(libcouchbase_t instance,
{
libcouchbase_server_t *server;
protocol_binary_request_gat req;
int vb, idx;
int vb, idx, ii;
libcouchbase_size_t nbytes;

if (nhashkey == 0) {
Expand Down Expand Up @@ -297,3 +297,75 @@ static libcouchbase_error_t libcouchbase_single_get(libcouchbase_t instance,

return libcouchbase_synchandler_return(instance, LIBCOUCHBASE_SUCCESS);
}

LIBCOUCHBASE_API
libcouchbase_error_t libcouchbase_get_replica_by_key(libcouchbase_t instance,
const void *command_cookie,
const void *hashkey,
libcouchbase_size_t nhashkey,
libcouchbase_size_t num_keys,
const void *const *keys,
const libcouchbase_size_t *nkey)
{
libcouchbase_server_t *server;
protocol_binary_request_get req;
int vb, idx;
libcouchbase_size_t ii, *affected_servers = NULL;

/* we need a vbucket config before we can start getting data.. */
if (instance->vbucket_config == NULL) {
return libcouchbase_synchandler_return(instance, LIBCOUCHBASE_ETMPFAIL);
}

affected_servers = calloc(instance->nservers, sizeof(libcouchbase_size_t));
if (affected_servers == NULL) {
return libcouchbase_synchandler_return(instance, LIBCOUCHBASE_ENOMEM);
}
memset(&req, 0, sizeof(req));
req.message.header.request.magic = PROTOCOL_BINARY_REQ;
req.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
req.message.header.request.opcode = CMD_GET_REPLICA;
for (ii = 0; ii < num_keys; ++ii) {
if (nhashkey == 0) {
nhashkey = nkey[ii];
hashkey = keys[ii];
}
vb = vbucket_get_vbucket_by_key(instance->vbucket_config, hashkey, nhashkey);
idx = vbucket_get_replica(instance->vbucket_config, vb, 0);
if (idx < 0 || idx > (int)instance->nservers) {
/* the config says that there is no server yet at that position (-1) */
free(affected_servers);
return libcouchbase_synchandler_return(instance, LIBCOUCHBASE_NETWORK_ERROR);
}
affected_servers[idx]++;
server = instance->servers + idx;
req.message.header.request.keylen = ntohs((libcouchbase_uint16_t)nkey[ii]);
req.message.header.request.vbucket = ntohs((libcouchbase_uint16_t)vb);
req.message.header.request.bodylen = ntohl((libcouchbase_uint32_t)nkey[ii]);
req.message.header.request.opaque = ++instance->seqno;
libcouchbase_server_start_packet(server, command_cookie, req.bytes, sizeof(req.bytes));
libcouchbase_server_write_packet(server, keys[ii], nkey[ii]);
libcouchbase_server_end_packet(server);
}

for (ii = 0; ii < instance->nservers; ++ii) {
if (affected_servers[ii]) {
server = instance->servers + ii;
libcouchbase_server_send_packets(server);
}
}

free(affected_servers);
return libcouchbase_synchandler_return(instance, LIBCOUCHBASE_SUCCESS);
}

LIBCOUCHBASE_API
libcouchbase_error_t libcouchbase_get_replica(libcouchbase_t instance,
const void *command_cookie,
libcouchbase_size_t num_keys,
const void *const *keys,
const libcouchbase_size_t *nkey)
{
return libcouchbase_get_replica_by_key(instance, command_cookie,
NULL, 0, num_keys, keys, nkey);
}
65 changes: 65 additions & 0 deletions src/handler.c
Expand Up @@ -206,6 +206,70 @@ static void getq_response_handler(libcouchbase_server_t *server,
release_key(server, packet);
}

static void get_replica_response_handler(libcouchbase_server_t *server,
struct libcouchbase_command_data_st *command_data,
protocol_binary_response_header *res)
{
libcouchbase_t root = server->instance;
protocol_binary_response_get *get = (void *)res;
libcouchbase_uint16_t status = ntohs(res->response.status);
libcouchbase_size_t nbytes = ntohl(res->response.bodylen);
char *packet;
libcouchbase_uint16_t nkey;
const char *key = get_key(server, &nkey, &packet);

nbytes -= res->response.extlen;
if (key == NULL) {
libcouchbase_error_handler(server->instance, LIBCOUCHBASE_EINTERNAL,
NULL);
return;
} else if (status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
const char *bytes = (const char *)res;
bytes += sizeof(get->bytes);
root->callbacks.get(root, command_data->cookie, LIBCOUCHBASE_SUCCESS,
key, nkey, bytes, nbytes,
ntohl(get->message.body.flags),
res->response.cas);
} else {
if (status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
/* the config was updated, start from first replica */
command_data->replica = 0;
} else {
command_data->replica++;
}
if (command_data->replica < root->nreplicas) {
/* try next replica */
protocol_binary_request_get req;
int idx = vbucket_get_replica(root->vbucket_config, command_data->vbucket, 0);
if (idx < 0 || idx > (int)root->nservers) {
libcouchbase_error_handler(root, LIBCOUCHBASE_NETWORK_ERROR,
"GET_REPLICA: missing server");
return;
}
server = root->servers + idx;
memset(&req, 0, sizeof(req));
req.message.header.request.magic = PROTOCOL_BINARY_REQ;
req.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
req.message.header.request.opcode = CMD_GET_REPLICA;
req.message.header.request.keylen = ntohs((libcouchbase_uint16_t)nkey);
req.message.header.request.vbucket = ntohs(command_data->vbucket);
req.message.header.request.bodylen = ntohl((libcouchbase_uint32_t)nkey);
req.message.header.request.opaque = ++root->seqno;
libcouchbase_server_start_packet(server, command_data->cookie,
req.bytes, sizeof(req.bytes));
libcouchbase_server_write_packet(server, key, nkey);
libcouchbase_server_end_packet(server);
libcouchbase_server_send_packets(server);
} else {
/* give up and report the error */
root->callbacks.get(root, command_data->cookie,
map_error(status), key, nkey,
NULL, 0, 0, 0);
}
}
release_key(server, packet);
}

static void delete_response_handler(libcouchbase_server_t *server,
struct libcouchbase_command_data_st *command_data,
protocol_binary_response_header *res)
Expand Down Expand Up @@ -1027,6 +1091,7 @@ void libcouchbase_initialize_packet_handlers(libcouchbase_t instance)
instance->response_handler[PROTOCOL_BINARY_CMD_GET] = getq_response_handler;
instance->response_handler[PROTOCOL_BINARY_CMD_GAT] = getq_response_handler;
instance->response_handler[CMD_GET_LOCKED] = getq_response_handler;
instance->response_handler[CMD_GET_REPLICA] = get_replica_response_handler;
instance->response_handler[CMD_UNLOCK_KEY] = unlock_response_handler;
instance->response_handler[PROTOCOL_BINARY_CMD_ADD] = storage_response_handler;
instance->response_handler[PROTOCOL_BINARY_CMD_DELETE] = delete_response_handler;
Expand Down
2 changes: 2 additions & 0 deletions src/internal.h
Expand Up @@ -70,6 +70,8 @@ extern "C" {
struct libcouchbase_command_data_st {
hrtime_t start;
const void *cookie;
int replica;
libcouchbase_uint16_t vbucket;
};

typedef void (*REQUEST_HANDLER)(libcouchbase_server_t *instance,
Expand Down
1 change: 1 addition & 0 deletions src/packet.c
Expand Up @@ -32,6 +32,7 @@ void libcouchbase_server_buffer_start_packet(libcouchbase_server_t *c,
libcouchbase_size_t size)
{
struct libcouchbase_command_data_st ct;
memset(&ct, 0, sizeof(struct libcouchbase_command_data_st));
/* @todo we don't want to call gethrtime for each operation, */
/* so I need to pass it down the chain so that a large */
/* multiget can reuse the same timer... */
Expand Down
40 changes: 29 additions & 11 deletions tools/cbc.cc
Expand Up @@ -609,7 +609,7 @@ static bool rm_impl(libcouchbase_t instance, list<string> &keys)
return true;
}

static bool cat_impl(libcouchbase_t instance, list<string> &keys)
static bool cat_impl(libcouchbase_t instance, list<string> &keys, bool replica)
{
if (keys.empty()) {
cerr << "ERROR: you need to specify the key to get" << endl;
Expand All @@ -618,20 +618,23 @@ static bool cat_impl(libcouchbase_t instance, list<string> &keys)

const char* *k = new const char*[keys.size()];
libcouchbase_size_t *s = new libcouchbase_size_t[keys.size()];

int idx = 0;
libcouchbase_error_t err;

for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter, ++idx) {
k[idx] = iter->c_str();
s[idx] = iter->length();
}

libcouchbase_error_t err = libcouchbase_mget(instance, NULL, idx,
(const void * const *)k,
s, NULL);

if (replica) {
err = libcouchbase_get_replica(instance, NULL, idx,
(const void * const *)k, s);
} else {
err = libcouchbase_mget(instance, NULL, idx,
(const void * const *)k,
s, NULL);
}
delete []k;
delete []s;

if (err != LIBCOUCHBASE_SUCCESS) {
cerr << "Failed to send requests:" << endl
<< libcouchbase_strerror(instance, err) << endl;
Expand Down Expand Up @@ -941,7 +944,7 @@ static bool create_impl(libcouchbase_t instance, list<string> &keys,
static bool verify_impl(libcouchbase_t instance, list<string> &keys)
{
(void)libcouchbase_set_get_callback(instance, verify_callback);
return cat_impl(instance, keys);
return cat_impl(instance, keys, false);
}

static void loadKeys(list<string> &keys)
Expand Down Expand Up @@ -999,6 +1002,12 @@ static void handleCommandLineOptions(enum cbc_command_t cmd, int argc, char **ar
getopt.addOption(new CommandLineOption('t', "timeout", true,
"Specify timeout value"));

bool replica = false;
if (cmd == cbc_cat) {
getopt.addOption(new CommandLineOption('r', "replica", false,
"Read key(s) from replicas"));
}

libcouchbase_uint32_t flags = 0;
libcouchbase_uint32_t exptime = 0;
bool add = false;
Expand Down Expand Up @@ -1105,7 +1114,16 @@ static void handleCommandLineOptions(enum cbc_command_t cmd, int argc, char **ar
// NOTREACHED

default:
if (cmd == cbc_create) {
if (cmd == cbc_cat) {
unknownOpt = false;
switch ((*iter)->shortopt) {
case 'r':
replica = true;
break;
default:
unknownOpt = true;
}
} else if (cmd == cbc_create) {
unknownOpt = false;
switch ((*iter)->shortopt) {
case 'f':
Expand Down Expand Up @@ -1261,7 +1279,7 @@ static void handleCommandLineOptions(enum cbc_command_t cmd, int argc, char **ar
bool success = false;
switch (cmd) {
case cbc_cat:
success = cat_impl(instance, getopt.arguments);
success = cat_impl(instance, getopt.arguments, replica);
break;
case cbc_lock:
success = lock_impl(instance, getopt.arguments, exptime);
Expand Down

0 comments on commit a6928d0

Please sign in to comment.