Skip to content

Commit

Permalink
binary-to-binary handling of not-my-vbucket responses
Browse files Browse the repository at this point in the history
Change-Id: Ib7c493f19d964fb48d8d29d1d69238a74e2ce755
Reviewed-on: http://review.northscale.com/975
Reviewed-by: Dustin Sallings <dustin@spy.net>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed Jul 7, 2010
1 parent b1805aa commit ecbd542
Showing 1 changed file with 58 additions and 4 deletions.
62 changes: 58 additions & 4 deletions cproxy_protocol_b2b.c
Expand Up @@ -342,23 +342,24 @@ void cproxy_process_b2b_downstream_nread(conn *c) {
int extlen = header->response.extlen;
int keylen = header->response.keylen;
uint32_t bodylen = header->response.bodylen;
int status = header->response.status;

if (settings.verbose > 2) {
fprintf(stderr,
"<%d cproxy_process_b2b_downstream_nread %x %d %d %u %d\n",
c->sfd, c->cmd, extlen, keylen, bodylen, c->noreply);
"<%d cproxy_process_b2b_downstream_nread %x %d %d %u %d %x\n",
c->sfd, c->cmd, extlen, keylen, bodylen, c->noreply, status);
}

downstream *d = c->extra;
assert(d != NULL);
assert(d->ptd != NULL);
assert(d->ptd->proxy != NULL);

// TODO: Need to handle not-my-vbucket error by retrying.
// TODO: Need to handle quiet binary command error response,
// in the right order.
// TODO: Need to handle not-my-vbucket error during a quiet cmd.
//
conn *uc = d->upstream_conn;
item *it = c->item;
assert(it != NULL);
assert(it->refcount == 1);
Expand All @@ -367,9 +368,62 @@ void cproxy_process_b2b_downstream_nread(conn *c) {
conn_set_state(c, conn_new_cmd);
} else {
conn_set_state(c, conn_pause);

// If the client is still there, we should handle
// a not-my-vbucket error with a possible retry.
//
if (uc != NULL &&
status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
if (settings.verbose > 2) {
fprintf(stderr,
"<%d cproxy_process_b2b_downstream_nread not-my-vbucket, "
"cmd: %x %d\n",
c->sfd, header->response.opcode, uc->item != NULL);
}

assert(uc->item != NULL);

protocol_binary_request_header *req =
(protocol_binary_request_header *) ITEM_data((item *) uc->item);

int vbucket = ntohs(req->request.reserved);
int sindex = downstream_conn_index(d, c);

if (settings.verbose > 2) {
fprintf(stderr,
"<%d cproxy_process_b2b_downstream_nread not-my-vbucket, "
"cmd: %x not multi-key get, sindex %d, vbucket %d, retries %d\n",
c->sfd, header->response.opcode,
sindex, vbucket, uc->cmd_retries);
}

mcs_server_invalid_vbucket(&d->mst, sindex, vbucket);

// As long as the upstream is still open and we haven't
// retried too many times already.
//
int max_retries = (mcs_server_count(&d->mst) * 2);

if (uc->cmd_retries < max_retries) {
uc->cmd_retries++;

// TODO: Add a stats counter here for this case.
//
d->upstream_retry++;

return;
}

if (settings.verbose > 2) {
fprintf(stderr,
"%d: cproxy_process_b2b_downstream_nread not-my-vbucket, "
"cmd: %x skipping retry %d >= %d\n",
c->sfd, header->response.opcode, uc->cmd_retries,
max_retries);
}
}
}

conn *uc = d->upstream_conn;
if (uc != NULL) {
if (settings.verbose > 2) {
fprintf(stderr,
Expand Down

0 comments on commit ecbd542

Please sign in to comment.