Skip to content

Commit

Permalink
Move giant failout loop to its own function
Browse files Browse the repository at this point in the history
It's about time..

Change-Id: I27df01e3eefb2ea1ad2a7c05844eff7f63f6e9f7
Reviewed-on: http://review.couchbase.org/29782
Tested-by: Mark Nunberg <mnunberg@haskalah.org>
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information
mnunberg authored and avsej committed Oct 27, 2013
1 parent b4bc0fb commit 28da13d
Showing 1 changed file with 182 additions and 172 deletions.
354 changes: 182 additions & 172 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,188 @@ void lcb_failout_observe_request(lcb_server_t *server,
lcb_observe_invoke_callback(instance, command_data, err, &resp);
}
}
static void failout_single_request(lcb_server_t *server,
protocol_binary_request_header *req,
struct lcb_command_data_st *ct,
lcb_error_t error,
const void *keyptr,
lcb_size_t nkey,
const void *packet)
{
lcb_t root = server->instance;
union {
lcb_get_resp_t get;
lcb_store_resp_t store;
lcb_remove_resp_t remove;
lcb_touch_resp_t touch;
lcb_unlock_resp_t unlock;
lcb_arithmetic_resp_t arithmetic;
lcb_observe_resp_t observe;
lcb_server_stat_resp_t stats;
lcb_server_version_resp_t versions;
lcb_verbosity_resp_t verbosity;
lcb_flush_resp_t flush;
} resp;

switch (req->request.opcode) {
case PROTOCOL_BINARY_CMD_NOOP:
break;
case CMD_GET_LOCKED:
case PROTOCOL_BINARY_CMD_GAT:
case PROTOCOL_BINARY_CMD_GATQ:
case PROTOCOL_BINARY_CMD_GET:
case PROTOCOL_BINARY_CMD_GETQ:
setup_lcb_get_resp_t(&resp.get, keyptr, nkey, NULL, 0, 0, 0, 0);
TRACE_GET_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.get);
root->callbacks.get(root, ct->cookie, error, &resp.get);
break;
case CMD_UNLOCK_KEY:
setup_lcb_unlock_resp_t(&resp.unlock, keyptr, nkey);
TRACE_UNLOCK_END(req->request.opaque, ntohs(req->request.vbucket),
error, &resp.unlock);
root->callbacks.unlock(root, ct->cookie, error, &resp.unlock);
break;
case PROTOCOL_BINARY_CMD_FLUSH:
setup_lcb_flush_resp_t(&resp.flush, server->authority);
TRACE_FLUSH_PROGRESS(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.flush);
root->callbacks.flush(root, ct->cookie, error, &resp.flush);
if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_FLUSH,
req->request.opaque,
server) < 0) {
setup_lcb_flush_resp_t(&resp.flush, NULL);
TRACE_FLUSH_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error);
root->callbacks.flush(root, ct->cookie, error, &resp.flush);
}
break;
case PROTOCOL_BINARY_CMD_ADD:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.store);
root->callbacks.store(root, ct->cookie, LCB_ADD, error, &resp.store);
break;
case PROTOCOL_BINARY_CMD_REPLACE:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.store);
root->callbacks.store(root, ct->cookie, LCB_REPLACE, error,
&resp.store);
break;
case PROTOCOL_BINARY_CMD_SET:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.store);
root->callbacks.store(root, ct->cookie, LCB_SET, error, &resp.store);
break;
case PROTOCOL_BINARY_CMD_APPEND:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.store);
root->callbacks.store(root, ct->cookie, LCB_APPEND, error,
&resp.store);
break;
case PROTOCOL_BINARY_CMD_PREPEND:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.store);
root->callbacks.store(root, ct->cookie, LCB_PREPEND, error,
&resp.store);
break;
case PROTOCOL_BINARY_CMD_DELETE:
setup_lcb_remove_resp_t(&resp.remove, keyptr, nkey, 0);
TRACE_REMOVE_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.remove);
root->callbacks.remove(root, ct->cookie, error, &resp.remove);
break;

case PROTOCOL_BINARY_CMD_INCREMENT:
case PROTOCOL_BINARY_CMD_DECREMENT:
setup_lcb_arithmetic_resp_t(&resp.arithmetic, keyptr, nkey, 0, 0);
TRACE_ARITHMETIC_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.arithmetic);
root->callbacks.arithmetic(root, ct->cookie, error,
&resp.arithmetic);
break;
case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
case PROTOCOL_BINARY_CMD_SASL_AUTH:
case PROTOCOL_BINARY_CMD_SASL_STEP:
/* no need to notify user about these commands */
break;

case PROTOCOL_BINARY_CMD_TOUCH:
setup_lcb_touch_resp_t(&resp.touch, keyptr, nkey, 0);
TRACE_TOUCH_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.touch);
root->callbacks.touch(root, ct->cookie, error, &resp.touch);
break;

case PROTOCOL_BINARY_CMD_STAT:
setup_lcb_server_stat_resp_t(&resp.stats, server->authority,
NULL, 0, NULL, 0);
TRACE_STATS_PROGRESS(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.stats);
root->callbacks.stat(root, ct->cookie, error, &resp.stats);

if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_STAT,
req->request.opaque,
server) < 0) {
setup_lcb_server_stat_resp_t(&resp.stats,
NULL, NULL, 0, NULL, 0);
TRACE_STATS_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error);
root->callbacks.stat(root, ct->cookie, error, &resp.stats);
}
break;

case PROTOCOL_BINARY_CMD_VERBOSITY:
setup_lcb_verbosity_resp_t(&resp.verbosity, server->authority);
TRACE_VERBOSITY_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.verbosity);
root->callbacks.verbosity(root, ct->cookie, error, &resp.verbosity);

if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_VERBOSITY,
req->request.opaque,
server) < 0) {
setup_lcb_verbosity_resp_t(&resp.verbosity, NULL);
TRACE_VERBOSITY_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.verbosity);
root->callbacks.verbosity(root, ct->cookie, error, &resp.verbosity);
}
break;

case PROTOCOL_BINARY_CMD_VERSION:
setup_lcb_server_version_resp_t(&resp.versions, server->authority,
NULL, 0);
TRACE_VERSIONS_PROGRESS(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error, &resp.versions);
root->callbacks.version(root, ct->cookie, error, &resp.versions);
if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_VERSION,
req->request.opaque,
server) < 0) {
TRACE_VERSIONS_END(req->request.opaque, ntohs(req->request.vbucket),
req->request.opcode, error);
setup_lcb_server_version_resp_t(&resp.versions, NULL, NULL, 0);
root->callbacks.version(root, ct->cookie, error, &resp.versions);
}
break;

case CMD_OBSERVE:
lcb_failout_observe_request(server, ct, packet,
sizeof(req->bytes) + ntohl(req->request.bodylen),
error);
break;

default:
lcb_assert("unexpected opcode while purging the server" && 0);
}

}

static void purge_single_server(lcb_server_t *server, lcb_error_t error,
hrtime_t min_nonstale)
Expand Down Expand Up @@ -106,19 +287,6 @@ static void purge_single_server(lcb_server_t *server, lcb_error_t error,
int allocated = 0;
lcb_uint32_t headersize;
lcb_uint16_t nkey;
union {
lcb_get_resp_t get;
lcb_store_resp_t store;
lcb_remove_resp_t remove;
lcb_touch_resp_t touch;
lcb_unlock_resp_t unlock;
lcb_arithmetic_resp_t arithmetic;
lcb_observe_resp_t observe;
lcb_server_stat_resp_t stats;
lcb_server_version_resp_t versions;
lcb_verbosity_resp_t verbosity;
lcb_flush_resp_t flush;
} resp;

nr = ringbuffer_peek(cookies, &ct, sizeof(ct));
if (nr != sizeof(ct)) {
Expand Down Expand Up @@ -180,165 +348,7 @@ static void purge_single_server(lcb_server_t *server, lcb_error_t error,
keyptr = packet + sizeof(req) + req.request.extlen;
nkey = ntohs(req.request.keylen);

/* It would have been awesome if we could have a generic error */
/* handler we could call */
switch (req.request.opcode) {
case PROTOCOL_BINARY_CMD_NOOP:
break;
case CMD_GET_LOCKED:
case PROTOCOL_BINARY_CMD_GAT:
case PROTOCOL_BINARY_CMD_GATQ:
case PROTOCOL_BINARY_CMD_GET:
case PROTOCOL_BINARY_CMD_GETQ:
setup_lcb_get_resp_t(&resp.get, keyptr, nkey, NULL, 0, 0, 0, 0);
TRACE_GET_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.get);
root->callbacks.get(root, ct.cookie, error, &resp.get);
break;
case CMD_UNLOCK_KEY:
setup_lcb_unlock_resp_t(&resp.unlock, keyptr, nkey);
TRACE_UNLOCK_END(req.request.opaque, ntohs(req.request.vbucket),
error, &resp.unlock);
root->callbacks.unlock(root, ct.cookie, error, &resp.unlock);
break;
case PROTOCOL_BINARY_CMD_FLUSH:
setup_lcb_flush_resp_t(&resp.flush, server->authority);
TRACE_FLUSH_PROGRESS(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.flush);
root->callbacks.flush(root, ct.cookie, error, &resp.flush);
if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_FLUSH,
req.request.opaque,
server) < 0) {
setup_lcb_flush_resp_t(&resp.flush, NULL);
TRACE_FLUSH_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error);
root->callbacks.flush(root, ct.cookie, error, &resp.flush);
}
break;
case PROTOCOL_BINARY_CMD_ADD:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.store);
root->callbacks.store(root, ct.cookie, LCB_ADD, error, &resp.store);
break;
case PROTOCOL_BINARY_CMD_REPLACE:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.store);
root->callbacks.store(root, ct.cookie, LCB_REPLACE, error,
&resp.store);
break;
case PROTOCOL_BINARY_CMD_SET:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.store);
root->callbacks.store(root, ct.cookie, LCB_SET, error, &resp.store);
break;
case PROTOCOL_BINARY_CMD_APPEND:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.store);
root->callbacks.store(root, ct.cookie, LCB_APPEND, error,
&resp.store);
break;
case PROTOCOL_BINARY_CMD_PREPEND:
setup_lcb_store_resp_t(&resp.store, keyptr, nkey, 0);
TRACE_STORE_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.store);
root->callbacks.store(root, ct.cookie, LCB_PREPEND, error,
&resp.store);
break;
case PROTOCOL_BINARY_CMD_DELETE:
setup_lcb_remove_resp_t(&resp.remove, keyptr, nkey, 0);
TRACE_REMOVE_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.remove);
root->callbacks.remove(root, ct.cookie, error, &resp.remove);
break;

case PROTOCOL_BINARY_CMD_INCREMENT:
case PROTOCOL_BINARY_CMD_DECREMENT:
setup_lcb_arithmetic_resp_t(&resp.arithmetic, keyptr, nkey, 0, 0);
TRACE_ARITHMETIC_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.arithmetic);
root->callbacks.arithmetic(root, ct.cookie, error,
&resp.arithmetic);
break;
case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
case PROTOCOL_BINARY_CMD_SASL_AUTH:
case PROTOCOL_BINARY_CMD_SASL_STEP:
/* no need to notify user about these commands */
break;

case PROTOCOL_BINARY_CMD_TOUCH:
setup_lcb_touch_resp_t(&resp.touch, keyptr, nkey, 0);
TRACE_TOUCH_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.touch);
root->callbacks.touch(root, ct.cookie, error, &resp.touch);
break;

case PROTOCOL_BINARY_CMD_STAT:
setup_lcb_server_stat_resp_t(&resp.stats, server->authority,
NULL, 0, NULL, 0);
TRACE_STATS_PROGRESS(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.stats);
root->callbacks.stat(root, ct.cookie, error, &resp.stats);

if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_STAT,
req.request.opaque,
server) < 0) {
setup_lcb_server_stat_resp_t(&resp.stats,
NULL, NULL, 0, NULL, 0);
TRACE_STATS_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error);
root->callbacks.stat(root, ct.cookie, error, &resp.stats);
}
break;

case PROTOCOL_BINARY_CMD_VERBOSITY:
setup_lcb_verbosity_resp_t(&resp.verbosity, server->authority);
TRACE_VERBOSITY_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.verbosity);
root->callbacks.verbosity(root, ct.cookie, error, &resp.verbosity);

if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_VERBOSITY,
req.request.opaque,
server) < 0) {
setup_lcb_verbosity_resp_t(&resp.verbosity, NULL);
TRACE_VERBOSITY_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.verbosity);
root->callbacks.verbosity(root, ct.cookie, error, &resp.verbosity);
}
break;

case PROTOCOL_BINARY_CMD_VERSION:
setup_lcb_server_version_resp_t(&resp.versions, server->authority,
NULL, 0);
TRACE_VERSIONS_PROGRESS(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error, &resp.versions);
root->callbacks.version(root, ct.cookie, error, &resp.versions);
if (lcb_lookup_server_with_command(root,
PROTOCOL_BINARY_CMD_VERSION,
req.request.opaque,
server) < 0) {
TRACE_VERSIONS_END(req.request.opaque, ntohs(req.request.vbucket),
req.request.opcode, error);
setup_lcb_server_version_resp_t(&resp.versions, NULL, NULL, 0);
root->callbacks.version(root, ct.cookie, error, &resp.versions);
}
break;

case CMD_OBSERVE:
lcb_failout_observe_request(server, &ct, packet,
sizeof(req.bytes) + ntohl(req.request.bodylen),
error);
break;

default:
lcb_assert("unexpected opcode while purging the server" && 0);
}
failout_single_request(server, &req, &ct, error, keyptr, nkey, packet);

if (allocated) {
free(packet);
Expand Down

0 comments on commit 28da13d

Please sign in to comment.