Skip to content

Commit

Permalink
upstream_retry when multi-GET not-my-vbucket err
Browse files Browse the repository at this point in the history
Change-Id: Iebbae35b0509cc16cbaf4610a3e433b60b71b94b

In this implementation of handling not-my-vbucket errors during
ascii-to-binary proxying, we use the binary header's opaque field.

moxi uses the opaque field to store either a vbucket id or a key
index, depending on the command opcode.  The key index is an offset
into the command's char array, and is used to retrieve the key for a
given command.  Given the key, moxi can then re-retrieve the vbucket id.

Given the vbucket id, finally, during a not-my-vbucket error, moxi can
then call the incorrect_master() API of libvbucket.
  • Loading branch information
steveyen committed Jun 18, 2010
1 parent c5516f4 commit 8f7f015
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 43 deletions.
71 changes: 54 additions & 17 deletions cproxy.c
Expand Up @@ -28,6 +28,8 @@ conn *conn_list_remove(conn *head, conn **tail,


bool is_compatible_request(conn *existing, conn *candidate); bool is_compatible_request(conn *existing, conn *candidate);


void propagate_error(downstream *d);

int init_mcs_st(mcs_st *mst, char *config); int init_mcs_st(mcs_st *mst, char *config);


// Function tables. // Function tables.
Expand Down Expand Up @@ -363,6 +365,7 @@ void cproxy_on_close_upstream_conn(conn *c) {
c, &found); c, &found);
if (d->upstream_conn == NULL) { if (d->upstream_conn == NULL) {
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0;


// Don't need to do anything else, as we'll now just // Don't need to do anything else, as we'll now just
// read and drop any remaining inflight downstream replies. // read and drop any remaining inflight downstream replies.
Expand Down Expand Up @@ -474,6 +477,7 @@ void cproxy_on_close_downstream_conn(conn *c) {
// //
if (d->upstream_suffix == NULL) { if (d->upstream_suffix == NULL) {
d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n"; d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";
d->upstream_retry = 0;
} }


// We sometimes see that drive_machine/transmit will not see // We sometimes see that drive_machine/transmit will not see
Expand Down Expand Up @@ -503,6 +507,7 @@ void cproxy_on_close_downstream_conn(conn *c) {
d->upstream_conn->cmd_retries++; d->upstream_conn->cmd_retries++;
uc_retry = d->upstream_conn; uc_retry = d->upstream_conn;
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0;
} }
} }
} }
Expand Down Expand Up @@ -612,18 +617,18 @@ downstream *cproxy_reserve_downstream(proxy_td *ptd) {


assert(d->upstream_conn == NULL); assert(d->upstream_conn == NULL);
assert(d->upstream_suffix == NULL); assert(d->upstream_suffix == NULL);
assert(d->upstream_retry == 0);
assert(d->downstream_used == 0); assert(d->downstream_used == 0);
assert(d->downstream_used_start == 0); assert(d->downstream_used_start == 0);
assert(d->multiget == NULL);
assert(d->merger == NULL); assert(d->merger == NULL);
assert(d->timeout_tv.tv_sec == 0); assert(d->timeout_tv.tv_sec == 0);
assert(d->timeout_tv.tv_usec == 0); assert(d->timeout_tv.tv_usec == 0);


d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0;
d->downstream_used = 0; d->downstream_used = 0;
d->downstream_used_start = 0; d->downstream_used_start = 0;
d->multiget = NULL;
d->merger = NULL; d->merger = NULL;
d->timeout_tv.tv_sec = 0; d->timeout_tv.tv_sec = 0;
d->timeout_tv.tv_usec = 0; d->timeout_tv.tv_usec = 0;
Expand Down Expand Up @@ -654,6 +659,24 @@ bool cproxy_release_downstream(downstream *d, bool force) {
fprintf(stderr, "release_downstream\n"); fprintf(stderr, "release_downstream\n");
} }


if (!force &&
d->upstream_retry > 0) {
if (settings.verbose > 2) {
fprintf(stderr, "%d: release_downstream, instead retrying %d\n",
d->upstream_conn->sfd, d->upstream_retry);
}

d->upstream_retry = 0;

if (d->propagate(d) == true) {
return true;
} else {
d->ptd->stats.stats.tot_downstream_propagate_failed++;

propagate_error(d);
}
}

d->ptd->stats.stats.tot_downstream_released++; d->ptd->stats.stats.tot_downstream_released++;


// Delink upstream conns. // Delink upstream conns.
Expand Down Expand Up @@ -720,6 +743,7 @@ bool cproxy_release_downstream(downstream *d, bool force) {


d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; // No free(), expecting a static string. d->upstream_suffix = NULL; // No free(), expecting a static string.
d->upstream_retry = 0;
d->downstream_used = 0; d->downstream_used = 0;
d->downstream_used_start = 0; d->downstream_used_start = 0;
d->multiget = NULL; d->multiget = NULL;
Expand Down Expand Up @@ -1209,21 +1233,7 @@ void cproxy_assign_downstream(proxy_td *ptd) {
break; break;
} }


while (d->upstream_conn != NULL) { propagate_error(d);
conn *uc = d->upstream_conn;

if (settings.verbose > 1) {
fprintf(stderr,
"ERROR: %d could not forward upstream to downstream\n",
uc->sfd);
}

upstream_error(uc);

conn *curr = d->upstream_conn;
d->upstream_conn = d->upstream_conn->next;
curr->next = NULL;
}


cproxy_release_downstream(d, false); cproxy_release_downstream(d, false);
} }
Expand All @@ -1234,6 +1244,26 @@ void cproxy_assign_downstream(proxy_td *ptd) {
} }
} }


/* Report a forwarding failure to every upstream conn still linked
 * to the downstream, unlinking each conn from the downstream's
 * upstream_conn list as it is handled.
 *
 * On return, d->upstream_conn is NULL.
 */
void propagate_error(downstream *d) {
    assert(d != NULL);

    for (;;) {
        conn *failed = d->upstream_conn;
        if (failed == NULL) {
            break;
        }

        if (settings.verbose > 1) {
            fprintf(stderr,
                    "ERROR: %d could not forward upstream to downstream\n",
                    failed->sfd);
        }

        upstream_error(failed);

        // Re-read the list head after upstream_error() before
        // unlinking, rather than reusing the pointer captured above.
        //
        conn *head = d->upstream_conn;
        d->upstream_conn = head->next;
        head->next = NULL;
    }
}

void upstream_error(conn *uc) { void upstream_error(conn *uc) {
assert(uc); assert(uc);
assert(uc->state == conn_pause); assert(uc->state == conn_pause);
Expand Down Expand Up @@ -1300,6 +1330,7 @@ bool cproxy_dettach_if_noreply(downstream *d, conn *uc) {
uc->noreply = false; uc->noreply = false;
d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0;


cproxy_reset_upstream(uc); cproxy_reset_upstream(uc);


Expand Down Expand Up @@ -1764,6 +1795,11 @@ downstream *downstream_list_remove(downstream *head, downstream *d) {
* TODO: Handle binary upstream protocol. * TODO: Handle binary upstream protocol.
*/ */
bool is_compatible_request(conn *existing, conn *candidate) { bool is_compatible_request(conn *existing, conn *candidate) {
// The not-my-vbucket error handling requires us to not
// squash ascii multi-GET requests, due to reusing the
// multiget-deduplication machinery during retries and
// to simplify the later codepaths.
/*
assert(existing); assert(existing);
assert(IS_ASCII(existing->protocol)); assert(IS_ASCII(existing->protocol));
assert(IS_PROXY(existing->protocol)); assert(IS_PROXY(existing->protocol));
Expand All @@ -1790,6 +1826,7 @@ bool is_compatible_request(conn *existing, conn *candidate) {
return true; return true;
} }
} }
*/


return false; return false;
} }
Expand Down
8 changes: 7 additions & 1 deletion cproxy.h
Expand Up @@ -392,6 +392,12 @@ struct downstream {
int downstream_used_start; int downstream_used_start;
conn *upstream_conn; // Non-NULL when downstream is reserved. conn *upstream_conn; // Non-NULL when downstream is reserved.
char *upstream_suffix; // Last bit to write when downstreams are done. char *upstream_suffix; // Last bit to write when downstreams are done.
int upstream_retry; // Will be >0 if we should retry the entire
// command again when all downstreams are done.
// Used in not-my-vbucket error case. During
// the retry, we'll reuse the same multiget
// de-duplication tracking table to avoid
// asking for successful keys again.


genhash_t *multiget; // Keyed by string. genhash_t *multiget; // Keyed by string.
genhash_t *merger; // Keyed by string, for merging replies like STATS. genhash_t *merger; // Keyed by string, for merging replies like STATS.
Expand Down Expand Up @@ -576,7 +582,7 @@ struct multiget_entry {
bool multiget_ascii_downstream( bool multiget_ascii_downstream(
downstream *d, conn *uc, downstream *d, conn *uc,
int (*emit_start)(conn *c, char *cmd, int cmd_len), int (*emit_start)(conn *c, char *cmd, int cmd_len),
int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket), int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket, int key_index),
int (*emit_end)(conn *c), int (*emit_end)(conn *c),
mcache *front_cache); mcache *front_cache);


Expand Down
38 changes: 26 additions & 12 deletions cproxy_multiget.c
Expand Up @@ -25,7 +25,6 @@ void multiget_foreach_free(const void *key,
&ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET_KEY]; &ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET_KEY];


int length = 0; int length = 0;

multiget_entry *entry = (multiget_entry*)value; multiget_entry *entry = (multiget_entry*)value;


while (entry != NULL) { while (entry != NULL) {
Expand Down Expand Up @@ -74,12 +73,11 @@ void multiget_remove_upstream(const void *key,


bool multiget_ascii_downstream(downstream *d, conn *uc, bool multiget_ascii_downstream(downstream *d, conn *uc,
int (*emit_start)(conn *c, char *cmd, int cmd_len), int (*emit_start)(conn *c, char *cmd, int cmd_len),
int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket), int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket, int key_index),
int (*emit_end)(conn *c), int (*emit_end)(conn *c),
mcache *front_cache) { mcache *front_cache) {
assert(d != NULL); assert(d != NULL);
assert(d->downstream_conns != NULL); assert(d->downstream_conns != NULL);
assert(d->multiget == NULL);
assert(uc != NULL); assert(uc != NULL);
assert(uc->noreply == false); assert(uc->noreply == false);


Expand All @@ -103,13 +101,17 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
} }
} }


if (uc->next != NULL) { // Always have a de-duplication map, due to not-my-vbucket error
// More than one upstream conn, so we need a hashtable // handling where any retry attempts should avoid retrying already
// to track keys for de-deplication. // successfully attempted keys.
// //
// Previously, we used to only have a map when there was more than
// one upstream conn.
//
if (d->multiget == NULL) {
d->multiget = genhash_init(128, skeyhash_ops); d->multiget = genhash_init(128, skeyhash_ops);
if (settings.verbose > 1) { if (settings.verbose > 1) {
fprintf(stderr, "cproxy multiget hash table new\n"); fprintf(stderr, "%d: cproxy multiget hash table new\n", uc->sfd);
} }
} }


Expand Down Expand Up @@ -139,8 +141,8 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
int cas_emit = (command[3] == 's'); int cas_emit = (command[3] == 's');


if (settings.verbose > 1) { if (settings.verbose > 1) {
fprintf(stderr, "forward multiget %s (%d %d)\n", fprintf(stderr, "%d: forward multiget %s (%d %d)\n",
command, cmd_len, uc_num); uc_cur->sfd, command, cmd_len, uc_num);
} }


while (space != NULL) { while (space != NULL) {
Expand Down Expand Up @@ -278,11 +280,22 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,


// See if we've already requested this key via // See if we've already requested this key via
// the multiget hash table, in order to // the multiget hash table, in order to
// de-deplicate repeated keys. // de-duplicate repeated keys.
// //
bool first_request = true; bool first_request = true;


if (d->multiget != NULL) { if (d->multiget != NULL) {
if (settings.verbose > 2) {
char key_buf[KEY_MAX_LENGTH + 10];
assert(key_len <= KEY_MAX_LENGTH);
memcpy(key_buf, key, key_len);
key_buf[key_len] = '\0';

fprintf(stderr,
"<%d multiget_ascii_downstream '%s' %d %d %d\n",
c->sfd, key_buf, vbucket, (int) (key - command), key_len);
}

// TODO: Use Trond's allocator here. // TODO: Use Trond's allocator here.
// //
multiget_entry *entry = multiget_entry *entry =
Expand Down Expand Up @@ -318,7 +331,7 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
// Provide the preceding space as optimization // Provide the preceding space as optimization
// for ascii-to-ascii configuration. // for ascii-to-ascii configuration.
// //
emit_skey(c, key - 1, key_len + 1, vbucket); emit_skey(c, key - 1, key_len + 1, vbucket, key - command);
} else { } else {
ptd->stats.stats.tot_multiget_keys_dedupe++; ptd->stats.stats.tot_multiget_keys_dedupe++;


Expand Down Expand Up @@ -383,6 +396,7 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,


if (cproxy_dettach_if_noreply(d, uc) == false) { if (cproxy_dettach_if_noreply(d, uc) == false) {
d->upstream_suffix = "END\r\n"; d->upstream_suffix = "END\r\n";
d->upstream_retry = 0;


cproxy_start_downstream_timeout(d, NULL); cproxy_start_downstream_timeout(d, NULL);
} }
Expand Down
2 changes: 2 additions & 0 deletions cproxy_protocol.c
Expand Up @@ -382,6 +382,7 @@ bool cproxy_optimize_set_ascii(downstream *d, conn *uc,
key, key_len, false)) { key, key_len, false)) {
d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0;


out_string(uc, "STORED"); out_string(uc, "STORED");


Expand Down Expand Up @@ -422,6 +423,7 @@ void cproxy_optimize_to_self(downstream *d, conn *uc,


d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0;


cproxy_release_downstream(d, false); cproxy_release_downstream(d, false);
} }
5 changes: 3 additions & 2 deletions cproxy_protocol_a2a.c
Expand Up @@ -16,7 +16,7 @@
#define MAX_TOKENS 8 #define MAX_TOKENS 8


int a2a_multiget_start(conn *c, char *cmd, int cmd_len); int a2a_multiget_start(conn *c, char *cmd, int cmd_len);
int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket); int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index);
int a2a_multiget_end(conn *c); int a2a_multiget_end(conn *c);


void cproxy_init_a2a() { void cproxy_init_a2a() {
Expand Down Expand Up @@ -384,7 +384,7 @@ int a2a_multiget_start(conn *c, char *cmd, int cmd_len) {


/* An skey is a space prefixed key string. /* An skey is a space prefixed key string.
*/ */
int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket) { int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index) {
return add_iov(c, skey, skey_len); return add_iov(c, skey, skey_len);
} }


Expand Down Expand Up @@ -450,6 +450,7 @@ bool cproxy_broadcast_a2a_downstream(downstream *d,


if (cproxy_dettach_if_noreply(d, uc) == false) { if (cproxy_dettach_if_noreply(d, uc) == false) {
d->upstream_suffix = suffix; d->upstream_suffix = suffix;
d->upstream_retry = 0;


cproxy_start_downstream_timeout(d, NULL); cproxy_start_downstream_timeout(d, NULL);
} else { } else {
Expand Down

0 comments on commit 8f7f015

Please sign in to comment.