Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Almost complete rewrite of load balancing logic

The round_robin module dependencies have been almost entirely removed;
the few that remain are mostly confined to the setup stage.

Conflicts:

	src/http/modules/ngx_http_upstream_fair_module.c
  • Loading branch information...
commit bd35d8802b764b139ddeb532f0e675a15e1ba26f 1 parent 1e2b7a3
@gnosek authored
Showing with 211 additions and 131 deletions.
  1. +211 −131 ngx_http_upstream_fair_module.c
View
342 ngx_http_upstream_fair_module.c
@@ -34,12 +34,18 @@ typedef struct {
ngx_shm_zone_t *shm_zone;
ngx_http_upstream_fair_shared_t *shared;
ngx_http_upstream_rr_peers_t *rrp;
+ ngx_uint_t current;
} ngx_http_upstream_fair_peers_t;
+#define NGX_PEER_INVALID (~0UL)
+
+
typedef struct {
ngx_http_upstream_rr_peer_data_t rrpd;
ngx_http_upstream_fair_shared_t *shared;
+ ngx_http_upstream_fair_peers_t *peer_data;
+ ngx_uint_t current;
} ngx_http_upstream_fair_peer_data_t;
@@ -151,6 +157,7 @@ ngx_http_upstream_init_fair(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
}
peers->shm_zone->init = ngx_http_upstream_fair_init_shm_zone;
peers->shared = NULL;
+ peers->current = n - 1;
us->peer.init = ngx_http_upstream_init_fair_peer;
@@ -159,17 +166,19 @@ ngx_http_upstream_init_fair(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
static void
-ngx_http_upstream_fair_update_nreq(ngx_http_upstream_fair_peer_data_t *fp, int delta)
+ngx_http_upstream_fair_update_nreq(ngx_http_upstream_fair_peer_data_t *fp, int delta, ngx_log_t *log)
{
ngx_http_upstream_fair_shared_t *fs;
ngx_uint_t slot;
- fs = &fp->shared[fp->rrpd.current];
+ fs = &fp->shared[fp->current];
ngx_atomic_fetch_add(&fs->nreq, delta);
slot = ngx_atomic_fetch_add(&fs->slot, 1) % FS_TIME_SLOTS;
fs->last_active[slot] = ngx_current_msec;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, log, 0, "[upstream_fair] nreq for peer %ui now %d", fp->current, fs->nreq);
}
@@ -181,32 +190,10 @@ ngx_http_upstream_fair_update_nreq(ngx_http_upstream_fair_peer_data_t *fp, int d
*/
#define FS_TIME_SCALE_OFFSET 1000
-static ngx_inline
-ngx_int_t ngx_http_upstream_fair_peer_is_down(
- ngx_http_upstream_rr_peer_t *peer, time_t now)
-{
- if (peer->down) {
- return 1;
- }
-
- if (peer->max_fails == 0
- || peer->fails < peer->max_fails)
- {
- return 0;
- }
-
- if (now - peer->accessed > peer->fail_timeout) {
- peer->fails = 0;
- return 0;
- }
-
- return 1;
-}
-
static ngx_int_t
ngx_http_upstream_fair_sched_score(ngx_peer_connection_t *pc,
ngx_http_upstream_fair_shared_t *fs,
- ngx_http_upstream_rr_peer_t *peer)
+ ngx_http_upstream_rr_peer_t *peer, ngx_uint_t n)
{
ngx_int_t slot;
ngx_msec_t last_active_delta;
@@ -214,20 +201,28 @@ ngx_http_upstream_fair_sched_score(ngx_peer_connection_t *pc,
slot = (fs->slot - 1) % FS_TIME_SLOTS;
last_active_delta = ngx_current_msec - fs->last_active[slot];
if ((ngx_int_t) last_active_delta < 0) {
- ngx_log_error(NGX_LOG_WARN, pc->log, 0, "[rr ] Clock skew of at least %i msec detected", -(ngx_int_t) last_active_delta);
+ ngx_log_error(NGX_LOG_WARN, pc->log, 0, "[upstream_fair] Clock skew of at least %i msec detected", -(ngx_int_t) last_active_delta);
/* a pretty arbitrary value */
last_active_delta = FS_TIME_SCALE_OFFSET;
}
+ /* sanity check */
+ if (fs->nreq > INT_MAX) {
+ ngx_log_error(NGX_LOG_WARN, pc->log, 0, "[upstream_fair] upstream %ui has negative nreq (%i)", n, fs->nreq);
+ return -FS_TIME_SCALE_OFFSET;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] nreq = %i, last_active_delta = %ui", fs->nreq, last_active_delta);
+
/*
* should be pretty unlikely to process a request for many days and still not time out,
* or to become swamped with requests this heavily; still, we shouldn't drop this backend
* completely as it wouldn't ever get a chance to recover
*/
- if (fs->nreq > 1 && last_active_delta > 0 && (INT_MAX / ( last_active_delta + FS_TIME_SCALE_OFFSET )) < fs->nreq - 1) {
- ngx_log_error(NGX_LOG_WARN, pc->log, 0, "Upstream %V has been active for %lu seconds",
- &peer->name, last_active_delta / 1000);
+ if (fs->nreq > 1 && last_active_delta > 0 && (INT_MAX / ( last_active_delta + FS_TIME_SCALE_OFFSET )) < (fs->nreq - 1)) {
+ ngx_log_error(NGX_LOG_WARN, pc->log, 0, "[upstream_fair] upstream %ui has been active for %ul seconds",
+ n, last_active_delta / 1000);
/*
* schedule behind "sane" backends with the same number of requests pending
@@ -240,129 +235,180 @@ ngx_http_upstream_fair_sched_score(ngx_peer_connection_t *pc,
}
/*
- * the two methods below are the core of load balancing logic
+ * the core of load balancing logic
*/
-ngx_int_t
-ngx_http_upstream_get_fair_peer(ngx_peer_connection_t *pc, void *data)
+static ngx_int_t
+ngx_http_upstream_fair_try_peer(ngx_peer_connection_t *pc,
+ ngx_http_upstream_rr_peer_data_t *rrp,
+ ngx_uint_t peer_id,
+ time_t now)
{
- ngx_int_t ret;
- ngx_http_upstream_fair_peer_data_t *fp = data;
- ngx_http_upstream_fair_shared_t *fs;
- ngx_http_upstream_fair_shared_t fsc;
+ ngx_uint_t n, m;
+ ngx_http_upstream_rr_peer_t *peer;
- ngx_int_t first_sched_score = INT_MIN;
- ngx_int_t sched_score = INT_MIN;
- ngx_int_t prev_sched_score = INT_MIN;
- time_t now;
- ngx_uint_t i;
- ngx_uint_t first_peer;
- ngx_uint_t next_peer;
+ n = peer_id / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << peer_id % (8 * sizeof(uintptr_t));
- /*
- * ngx_http_upstream_rr_peer_data_t is the first member,
- * so just passing data is safe
- */
+ if (rrp->tried[n] & m)
+ return NGX_BUSY;
+
+ peer = &rrp->peers->peer[peer_id];
+
+ if (!peer->down) {
+ if (peer->max_fails == 0 || peer->fails < peer->max_fails) {
+ return NGX_OK;
+ }
- ret = ngx_http_upstream_get_round_robin_peer(pc, data);
- if (ret != NGX_OK) {
- return ret;
+ if (now - peer->accessed > peer->fail_timeout) {
+ peer->fails = 0;
+ return NGX_OK;
+ }
}
- now = ngx_time();
+ rrp->tried[n] |= m;
+ if (pc)
+ pc->tries--;
+ return NGX_BUSY;
+}
- first_peer = fp->rrpd.current;
- for (i = 0; i < fp->rrpd.peers->number; i++) {
- fs = &fp->shared[fp->rrpd.current];
+static ngx_int_t
+ngx_http_upstream_choose_fair_peer(ngx_peer_connection_t *pc,
+ ngx_http_upstream_fair_peer_data_t *fp, ngx_uint_t *peer_id)
+{
+ ngx_uint_t i, n;
+ ngx_uint_t npeers, total_npeers;
+ ngx_http_upstream_fair_shared_t fsc;
+ time_t now;
+ ngx_int_t prev_sched_score, sched_score = 0;
- /* the fast path -- current backend is idle */
- if (ngx_atomic_fetch_add(&fs->nreq, 0) == 0)
- break;
+ total_npeers = npeers = fp->rrpd.peers->number;
- /* keep a local copy -- TODO: proper locking */
- fsc = *fs;
+ /* just a single backend */
+ if (npeers == 1) {
+ *peer_id = 0;
+ return NGX_OK;
+ }
- /* calculate scheduler score for currently selected backend */
- sched_score = ngx_http_upstream_fair_sched_score(pc, &fsc,
- &fp->rrpd.peers->peer[fp->rrpd.current]);
+ now = ngx_time();
- /* keep the first score for logging purposes */
- if (!i) {
- first_sched_score = sched_score;
- }
+ /* any idle backends? */
+ for (i = 0, n = fp->current; i < npeers; i++, n = (n + 1) % total_npeers) {
+ if (ngx_atomic_fetch_add(&fp->shared[n].nreq, 0) == 0 &&
+ ngx_http_upstream_fair_try_peer(pc, &fp->rrpd, n, now) == NGX_OK) {
- /*
- * the new score isn't any better than the old one, roll back to the
- * previous backend
- *
- * we roll back also when the score is equal to keep close to the round
- * robin model where it makes sense
- */
- if (sched_score <= prev_sched_score) {
- ngx_log_debug3(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] i=%i, %i <= %i, rolling back", i, sched_score, prev_sched_score);
-
- /* the last change was bad, roll it back */
- sched_score = prev_sched_score;
- if (fp->rrpd.current == 0) {
- fp->rrpd.current = fp->rrpd.peers->number - 1;
- } else {
- fp->rrpd.current--;
- }
- /* don't look any further for two reasons:
- * 1. as we're "almost" round robin, it's unlikely that there are much
- * better backends futher down the list
- * 2. we don't want the cache lines to bounce too much (they're already
- * quite hot)
- */
- break;
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] peer %i is idle", n);
+ *peer_id = n;
+ return NGX_OK;
}
+ }
- /* check the next peer */
- next_peer = fp->rrpd.current + 1;
- if (next_peer >= fp->rrpd.peers->number) {
- next_peer = 0;
- }
- fp->rrpd.current = next_peer;
- prev_sched_score = sched_score;
+ /* no idle backends, choose the least loaded one */
- if (ngx_http_upstream_fair_peer_is_down(&fp->rrpd.peers->peer[fp->rrpd.current], now)) {
- /* should probably use fancier logic, but for now it'll do */
- ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] next peer %ui is down", next_peer);
+ /* skip the nearest failed backends */
+ n = fp->current;
+ while (npeers && pc->tries) {
+ if (ngx_http_upstream_fair_try_peer(pc, &fp->rrpd, n, now) == NGX_OK) {
break;
}
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] backend %d is down, npeers = %d", n, npeers - 1);
+ n = (n + 1) % total_npeers;
+ npeers--;
}
- /*
- * tell the upstream core to use the new backend
- * and advance the "current peer" for the next run of round-robin
+ /* all backends down or failed? */
+ if (!npeers || !pc->tries) {
+ return NGX_BUSY;
+ }
+
+ /* calc our current sched score */
+ fsc = fp->shared[n];
+ prev_sched_score = ngx_http_upstream_fair_sched_score(pc,
+ &fsc, &fp->rrpd.peers->peer[n], n);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] pss = %i (n = %d)", prev_sched_score, n);
+
+ *peer_id = n;
+
+ n = (n + 1) % total_npeers;
+
+ /* calc sched scores for all the peers, until it no longer
+ * increases, or we wrap around to the beginning
*/
- if (first_peer != fp->rrpd.current) {
- ngx_http_upstream_rr_peer_t *orig_peer = &fp->rrpd.peers->peer[first_peer];
- ngx_http_upstream_rr_peer_t *peer = &fp->rrpd.peers->peer[fp->rrpd.current];
+ for (i = 0; i < npeers; i++, n = (n + 1) % total_npeers) {
+ ngx_http_upstream_rr_peer_t *peer;
- /* undo ajdustments made to the original peer by rr algorithm */
- if (orig_peer->current_weight == orig_peer->weight) {
- orig_peer->current_weight = 1;
- } else {
- orig_peer->current_weight++;
+ if (ngx_http_upstream_fair_try_peer(pc, &fp->rrpd, n, now) != NGX_OK) {
+ if (!pc->tries) {
+ ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] all backends exhausted");
+ return NGX_BUSY;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] backend %d is dead", n);
+ continue;
}
- /* adjust the new peer accordingly */
- if (peer->current_weight == 1) {
+ peer = &fp->rrpd.peers->peer[n];
+
+ if (peer->current_weight-- == 0) {
peer->current_weight = peer->weight;
- } else {
- peer->current_weight--;
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] peer %d expired weight, reset to %d", n, peer->weight);
+ continue;
+ }
+
+ fsc = fp->shared[n];
+ if (i) {
+ prev_sched_score = sched_score;
}
+ sched_score = ngx_http_upstream_fair_sched_score(pc, &fsc, peer, n);
+ ngx_log_debug3(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] pss = %i, ss = %i (n = %d)", prev_sched_score, sched_score, n);
- pc->sockaddr = peer->sockaddr;
- pc->socklen = peer->socklen;
- pc->name = &peer->name;
+ if (sched_score <= prev_sched_score)
+ return NGX_OK;
- ngx_log_debug5(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] sched_score [%ui] %i -> [%ui] %i in %ui iterations",
- first_peer, first_sched_score, fp->rrpd.current, sched_score, i);
+ *peer_id = n;
}
- ngx_http_upstream_fair_update_nreq(data, 1);
+ return NGX_OK;
+}
+
+ngx_int_t
+ngx_http_upstream_get_fair_peer(ngx_peer_connection_t *pc, void *data)
+{
+ ngx_int_t ret;
+ ngx_uint_t peer_id, i;
+ ngx_http_upstream_fair_peer_data_t *fp = data;
+ ngx_http_upstream_rr_peer_t *peer;
+
+ peer_id = fp->current;
+ fp->current = (fp->current + 1) % fp->rrpd.peers->number;
+
+ ret = ngx_http_upstream_choose_fair_peer(pc, fp, &peer_id);
+ ngx_log_debug(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] fp->current = %d, peer_id = %d, ret = %d",
+ fp->current, peer_id, ret);
+
+ if (ret == NGX_BUSY) {
+ for (i = 0; i < fp->rrpd.peers->number; i++) {
+ fp->rrpd.peers->peer[i].fails = 0;
+ }
+
+ pc->name = fp->rrpd.peers->name;
+ fp->current = NGX_PEER_INVALID;
+ if (pc->tries > 0) {
+ pc->tries--;
+ }
+ return NGX_BUSY;
+ }
+
+ /* assert(ret == NGX_OK); */
+ peer = &fp->rrpd.peers->peer[peer_id];
+ fp->current = peer_id;
+ fp->peer_data->current = peer_id;
+ pc->sockaddr = peer->sockaddr;
+ pc->socklen = peer->socklen;
+ pc->name = &peer->name;
+
+ ngx_http_upstream_fair_update_nreq(data, 1, pc->log);
return ret;
}
@@ -371,8 +417,41 @@ void
ngx_http_upstream_free_fair_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
- ngx_http_upstream_fair_update_nreq(data, -1);
- ngx_http_upstream_free_round_robin_peer(pc, data, state);
+ ngx_http_upstream_fair_peer_data_t *fp = data;
+ ngx_http_upstream_rr_peer_t *peer;
+ ngx_uint_t weight_delta;
+
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] fp->current = %d, state = %ui, pc->tries = %d, pc->data = %p",
+ fp->current, state, pc->tries, pc->data);
+
+ if (fp->current == NGX_PEER_INVALID) {
+ return;
+ }
+
+ ngx_http_upstream_fair_update_nreq(data, -1, pc->log);
+
+ if (state == 0 && pc->tries == 0) {
+ return;
+ }
+
+ if (fp->rrpd.peers->number == 1) {
+ pc->tries = 0;
+ }
+
+ if (state & NGX_PEER_FAILED) {
+ peer = &fp->rrpd.peers->peer[fp->current];
+
+ peer->fails++;
+ peer->accessed = ngx_time();
+
+ weight_delta = peer->weight / peer->max_fails;
+
+ if (peer->current_weight < weight_delta) {
+ peer->current_weight = 0;
+ } else {
+ peer->current_weight -= weight_delta;
+ }
+ }
}
@@ -410,20 +489,21 @@ ngx_http_upstream_init_fair_peer(ngx_http_request_t *r,
if (!usfp->shared) {
ngx_uint_t i;
- time_t now = ngx_time();
usfp->shared = ngx_slab_alloc(shpool, usfp->rrp->number * sizeof(ngx_http_upstream_fair_shared_t));
for (i = 0; i < usfp->rrp->number; i++) {
usfp->shared[i].nreq = 0;
- usfp->shared[i].slot = 0;
- usfp->shared[i].last_active[0] = now;
+ usfp->shared[i].slot = 1;
+ usfp->shared[i].last_active[0] = ngx_current_msec;
}
}
- if (usfp->shared) {
- fp->shared = usfp->shared;
- r->upstream->peer.get = ngx_http_upstream_get_fair_peer;
- r->upstream->peer.free = ngx_http_upstream_free_fair_peer;
- } /* else just fallback on round-robin behaviour */
+ fp->shared = usfp->shared;
+ fp->peer_data = usfp;
+ fp->current = usfp->current;
+ r->upstream->peer.get = ngx_http_upstream_get_fair_peer;
+ r->upstream->peer.free = ngx_http_upstream_free_fair_peer;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "[upstream_fair] peer->tries = %d", r->upstream->peer.tries);
/* keep the rest of configuration from rr, including e.g. SSL sessions */
Please sign in to comment.
Something went wrong with that request. Please try again.