Permalink
Browse files

MB-3389 - b2b not-my-vbucket handling

This bug fix required these changes...

- binary protocol ntohs(status) conversion, so the NOT_MY_VBUCKET
  status code comparison actually works.

- remove an assert() so that b2b request retry codepath works.

- send binary error message to binary upstream clients instead of
  an ASCII protocol error string.

Test hint -- point: memcachetest -h HOST:11211 -i 1 -c 1000 -l -L 2
at moxi while adding & removing servers in a cluster.

Change-Id: I06aba8cc3390042cc7430a3b4d5c659421f112de
Reviewed-on: http://review.membase.org/4374
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
  • Loading branch information...
1 parent a5cbcfd commit 57a53494ae2903c1e8135e323edf2f3246a8d0d7 @steveyen steveyen committed with alk Jan 29, 2011
Showing with 60 additions and 5 deletions.
  1. +28 −3 cproxy.c
  2. +5 −0 cproxy.h
  3. +24 −0 cproxy_protocol_b.c
  4. +3 −2 cproxy_protocol_b2b.c
View
31 cproxy.c
@@ -721,8 +721,18 @@ void cproxy_on_close_downstream_conn(conn *c) {
// - last downstream conn closes? Current behavior.
//
if (d->upstream_suffix == NULL) {
- d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";
- d->upstream_suffix_len = 0;
+ if (settings.verbose > 2) {
+ moxi_log_write("<%d proxy downstream closed, upstream %d (%x)\n",
+ c->sfd,
+ d->upstream_conn->sfd,
+ d->upstream_conn->protocol);
+ }
+
+ if (IS_ASCII(d->upstream_conn->protocol)) {
+ d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";
+ d->upstream_suffix_len = 0;
+ }
+
d->upstream_retry = 0;
}
@@ -758,6 +768,21 @@ void cproxy_on_close_downstream_conn(conn *c) {
}
}
}
+
+ if (uc_retry == NULL &&
+ d->upstream_suffix == NULL &&
+ IS_BINARY(d->upstream_conn->protocol)) {
+ protocol_binary_response_header *rh =
+ cproxy_make_bin_error(d->upstream_conn,
+ PROTOCOL_BINARY_RESPONSE_ENOMEM);
+ if (rh != NULL) {
+ d->upstream_suffix = rh;
+ d->upstream_suffix_len = sizeof(protocol_binary_response_header);
+ } else {
+ d->ptd->stats.stats.err_oom++;
+ cproxy_close_conn(d->upstream_conn);
+ }
+ }
}
// Are we over-decrementing here, and in handling conn_pause?
@@ -1886,7 +1911,7 @@ void upstream_error(conn *uc) {
upstream_error_msg(uc, NULL);
}
- void upstream_error_msg(conn *uc, char *ascii_msg) {
+void upstream_error_msg(conn *uc, char *ascii_msg) {
assert(uc);
assert(uc->state == conn_pause);
View
5 cproxy.h
@@ -835,6 +835,11 @@ void key_stats_dec_ref(void *it);
bool add_conn_item(conn *c, item *it);
char *add_conn_suffix(conn *c);
+void *cproxy_make_bin_header(conn *c, uint8_t magic);
+
+protocol_binary_response_header *cproxy_make_bin_error(conn *c,
+ uint16_t status);
+
size_t scan_tokens(char *command, token_t *tokens, const size_t max_tokens,
int *command_len);
View
24 cproxy_protocol_b.c
@@ -440,3 +440,27 @@ static void cproxy_sasl_plain_auth(conn *c, char *req_bytes) {
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
}
+void *cproxy_make_bin_header(conn *c, uint8_t magic) {
+ protocol_binary_response_header *rh =
+ (protocol_binary_response_header *) add_conn_suffix(c);
+ if (rh != NULL) {
+ memset(rh, 0, sizeof(protocol_binary_response_header));
+ rh->response.magic = magic;
+ rh->response.datatype = (uint8_t) PROTOCOL_BINARY_RAW_BYTES;
+ return rh;
+ }
+ return NULL;
+}
+
+protocol_binary_response_header *
+cproxy_make_bin_error(conn *c, uint16_t status) {
+ protocol_binary_response_header *rh =
+ cproxy_make_bin_header(c, PROTOCOL_BINARY_RES);
+ if (rh != NULL) {
+ rh->response.opcode = c->binary_header.request.opcode;
+ rh->response.status = (uint16_t) htons(status);
+ rh->response.opaque = c->opaque;
+ return rh;
+ }
+ return NULL;
+}
View
5 cproxy_protocol_b2b.c
@@ -34,11 +34,12 @@ bool cproxy_forward_b2b_downstream(downstream *d) {
assert(d->ptd != NULL);
assert(d->ptd->proxy != NULL);
assert(d->downstream_conns != NULL);
- assert(d->downstream_used_start == 0);
assert(d->downstream_used == 0);
assert(d->multiget == NULL);
assert(d->merger == NULL);
+ d->downstream_used_start = 0;
+
conn *uc = d->upstream_conn;
if (settings.verbose > 2) {
@@ -401,7 +402,7 @@ void cproxy_process_b2b_downstream_nread(conn *c) {
int extlen = header->response.extlen;
int keylen = header->response.keylen;
uint32_t bodylen = header->response.bodylen;
- int status = header->response.status;
+ int status = ntohs(header->response.status);
int opcode = header->response.opcode;
if (settings.verbose > 2) {

0 comments on commit 57a5349

Please sign in to comment.