Permalink
Browse files

bug 1429 - stop retrying after too many ascii-to-binary retries

There are two cases of retry tracking added here, so code
needed fixing in 2 different places...

- for simple, single-shot mutation commands
- for scatter-gather GETKQ's and NOOP translation case

Also, fixed a case when the multiget was NULL, introduced
in a previous performance enhancement fix.

Change-Id: I043f12f9733624b225196c29b144f5b1e02e5852
Reviewed-on: http://review.northscale.com:8080/700
Tested-by: Steve Yen <steve.yen@gmail.com>
Reviewed-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information...
1 parent 4642096 commit 18cb8924cddb9269687ef8019882c6c3a0489f11 @steveyen steveyen committed Jun 19, 2010
Showing with 78 additions and 33 deletions.
  1. +26 −10 cproxy.c
  2. +1 −0 cproxy.h
  3. +51 −23 cproxy_protocol_a2b.c
View
@@ -629,6 +629,7 @@ downstream *cproxy_reserve_downstream(proxy_td *ptd) {
d->upstream_conn = NULL;
d->upstream_suffix = NULL;
d->upstream_retry = 0;
+ d->upstream_retries = 0;
d->downstream_used = 0;
d->downstream_used_start = 0;
d->merger = NULL;
@@ -667,19 +668,33 @@ bool cproxy_release_downstream(downstream *d, bool force) {
//
if (!force &&
d->upstream_retry > 0) {
- if (settings.verbose > 2) {
- fprintf(stderr, "%d: release_downstream, instead retrying %d\n",
- d->upstream_conn->sfd, d->upstream_retry);
- }
-
d->upstream_retry = 0;
+ d->upstream_retries++;
- if (cproxy_forward(d) == true) {
- return true;
- } else {
- d->ptd->stats.stats.tot_downstream_propagate_failed++;
+ // But, we can stop retrying if we've tried each server twice.
+ //
+ int max_retries = mcs_server_count(&d->mst) * 2;
- propagate_error(d);
+ if (d->upstream_retries <= max_retries) {
+ if (settings.verbose > 2) {
+ fprintf(stderr, "%d: release_downstream, instead retrying %d, %d <= %d\n",
+ d->upstream_conn->sfd,
+ d->upstream_retry, d->upstream_retries, max_retries);
+ }
+
+ if (cproxy_forward(d) == true) {
+ return true;
+ } else {
+ d->ptd->stats.stats.tot_downstream_propagate_failed++;
+
+ propagate_error(d);
+ }
+ } else {
+ if (settings.verbose > 2) {
+ fprintf(stderr, "%d: release_downstream, skipping retry %d, %d > %d\n",
+ d->upstream_conn->sfd,
+ d->upstream_retry, d->upstream_retries, max_retries);
+ }
}
}
@@ -750,6 +765,7 @@ bool cproxy_release_downstream(downstream *d, bool force) {
d->upstream_conn = NULL;
d->upstream_suffix = NULL; // No free(), expecting a static string.
d->upstream_retry = 0;
+ d->upstream_retries = 0;
d->downstream_used = 0;
d->downstream_used_start = 0;
d->multiget = NULL;
View
@@ -383,6 +383,7 @@ struct downstream {
// the retry, we'll reuse the same multiget
// de-duplication tracking table to avoid
// asking for successful keys again.
+ int upstream_retries; // Count number of upstream_retry attempts.
genhash_t *multiget; // Keyed by string.
genhash_t *merger; // Keyed by string, for merging replies like STATS.
View
@@ -670,18 +670,36 @@ void a2b_process_downstream_response(conn *c) {
mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c),
vbucket);
- conn_set_state(c, conn_pause);
+ // As long as the upstream is still open and we haven't
+ // retried too many times already.
+ //
+ int max_retries = (mcs_server_count(&d->mst) * 2);
- assert(uc->thread);
- assert(uc->thread->work_queue);
+ if (uc != NULL &&
+ uc->cmd_retries < max_retries) {
+ uc->cmd_retries++;
- // TODO: Add a stats counter here for this case.
- //
- // Using work_send() so the call stack unwinds back to libevent.
- //
- work_send(uc->thread->work_queue, upstream_retry, uc->extra, uc);
+ conn_set_state(c, conn_pause);
- return;
+ assert(uc->thread);
+ assert(uc->thread->work_queue);
+
+ // TODO: Add a stats counter here for this case.
+ //
+ // Using work_send() so the call stack unwinds back to libevent.
+ //
+ work_send(uc->thread->work_queue, upstream_retry, uc->extra, uc);
+
+ return;
+ } else {
+ if (settings.verbose > 2) {
+ fprintf(stderr,
+ "%d: cproxy_process_a2b_downstream_response not-my-vbucket, "
+ "cmd: %x skipping retry %d >= %d\n",
+ c->sfd, header->response.opcode, uc->cmd_retries,
+ max_retries);
+ }
+ }
} else {
// TODO: Add a stats counter here for this case.
//
@@ -697,7 +715,6 @@ void a2b_process_downstream_response(conn *c) {
return;
}
- assert(d->multiget != NULL);
assert(uc->cmd_start != NULL);
assert(header->response.opaque != 0);
@@ -719,26 +736,37 @@ void a2b_process_downstream_response(conn *c) {
mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c),
vbucket);
- // Update the de-duplication map, removing the key, so that
- // we'll reattempt another request for the key during the
- // retry.
- //
- multiget_entry *entry = genhash_find(d->multiget, key_buf);
-
if (settings.verbose > 2) {
fprintf(stderr,
"<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
- "cmd: %x get/getk '%s' %d retry: %d, entry: %d, vbucket %d\n",
+ "cmd: %x get/getk '%s' %d retry: %d, vbucket %d\n",
c->sfd, header->response.opcode, key_buf, key_len,
- d->upstream_retry + 1, entry != NULL, vbucket);
+ d->upstream_retry + 1, vbucket);
}
- genhash_delete(d->multiget, key_buf);
+ // Update the de-duplication map, removing the key, so that
+ // we'll reattempt another request for the key during the
+ // retry.
+ //
+ if (d->multiget != NULL) {
+ multiget_entry *entry = genhash_find(d->multiget, key_buf);
+
+ if (settings.verbose > 2) {
+ fprintf(stderr,
+ "<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
+ "cmd: %x get/getk '%s' %d retry: %d, entry: %d, vbucket %d "
+ "deleting multiget entry\n",
+ c->sfd, header->response.opcode, key_buf, key_len,
+ d->upstream_retry + 1, entry != NULL, vbucket);
+ }
+
+ genhash_delete(d->multiget, key_buf);
- while (entry != NULL) {
- multiget_entry *curr = entry;
- entry = entry->next;
- free(curr);
+ while (entry != NULL) {
+ multiget_entry *curr = entry;
+ entry = entry->next;
+ free(curr);
+ }
}
// Signal that we need to retry, where this counter is

0 comments on commit 18cb892

Please sign in to comment.