0
+--- src/http/modules/ngx_http_upstream_fair_module.c.orig 2008-04-30 00:08:41.842749471 +0000
0
++++ src/http/modules/ngx_http_upstream_fair_module.c
0
++ * Copyright (C) 2007 Grzegorz Nosek
0
++ * Work sponsored by Ezra Zygmuntowicz & EngineYard.com
0
++ * Based on nginx source (C) Igor Sysoev
0
++#include <ngx_config.h>
0
++ ngx_atomic_t last_active;
0
++} ngx_http_upstream_fair_shared_t;
0
++ ngx_rbtree_node_t node;
0
++ void *peers; /* forms a unique cookie together with cycle */
0
++ ngx_int_t refcount; /* accessed only under shmtx_lock */
0
++ ngx_http_upstream_fair_shared_t stats[1];
0
++} ngx_http_upstream_fair_shm_block_t;
0
++ ngx_http_upstream_fair_shm_block_t *shared;
0
++ ngx_http_upstream_rr_peers_t *rrp;
0
++ ngx_uint_t size_err:1;
0
++} ngx_http_upstream_fair_peers_t;
0
++#define NGX_PEER_INVALID (~0UL)
0
++ ngx_http_upstream_fair_shared_t *shared;
0
++ ngx_http_upstream_rr_peers_t *rrp;
0
++ ngx_http_upstream_fair_peers_t *peers;
0
++} ngx_http_upstream_fair_peer_data_t;
0
++static ngx_int_t ngx_http_upstream_fair_init_module(ngx_cycle_t *cycle);
0
++static ngx_int_t ngx_http_upstream_init_fair(ngx_conf_t *cf,
0
++ ngx_http_upstream_srv_conf_t *us);
0
++static ngx_int_t ngx_http_upstream_get_fair_peer(ngx_peer_connection_t *pc,
0
++static void ngx_http_upstream_free_fair_peer(ngx_peer_connection_t *pc,
0
++ void *data, ngx_uint_t state);
0
++static ngx_int_t ngx_http_upstream_init_fair_peer(ngx_http_request_t *r,
0
++ ngx_http_upstream_srv_conf_t *us);
0
++static char *ngx_http_upstream_fair(ngx_conf_t *cf, ngx_command_t *cmd,
0
++static char *ngx_http_upstream_fair_set_shm_size(ngx_conf_t *cf,
0
++ ngx_command_t *cmd, void *conf);
0
++static ngx_int_t ngx_http_upstream_fair_set_session(ngx_peer_connection_t *pc,
0
++static void ngx_http_upstream_fair_save_session(ngx_peer_connection_t *pc,
0
++static ngx_command_t ngx_http_upstream_fair_commands[] = {
0
++ { ngx_string("fair"),
0
++ NGX_HTTP_UPS_CONF|NGX_CONF_NOARGS,
0
++ ngx_http_upstream_fair,
0
++ { ngx_string("upstream_fair_shm_size"),
0
++ NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
0
++ ngx_http_upstream_fair_set_shm_size,
0
++static ngx_http_module_t ngx_http_upstream_fair_module_ctx = {
0
++ NULL, /* preconfiguration */
0
++ NULL, /* postconfiguration */
0
++ NULL, /* create main configuration */
0
++ NULL, /* init main configuration */
0
++ NULL, /* create server configuration */
0
++ NULL, /* merge server configuration */
0
++ NULL, /* create location configuration */
0
++ NULL /* merge location configuration */
0
++ngx_module_t ngx_http_upstream_fair_module = {
0
++ &ngx_http_upstream_fair_module_ctx, /* module context */
0
++ ngx_http_upstream_fair_commands, /* module directives */
0
++ NGX_HTTP_MODULE, /* module type */
0
++ NULL, /* init master */
0
++ NULL, /* init module */
0
++ NULL, /* init process */
0
++ NULL, /* init thread */
0
++ NULL, /* exit thread */
0
++ NULL, /* exit process */
0
++ NULL, /* exit master */
0
++ NGX_MODULE_V1_PADDING
0
++static ngx_uint_t ngx_http_upstream_fair_shm_size;
0
++static ngx_shm_zone_t * ngx_http_upstream_fair_shm_zone;
0
++static ngx_rbtree_t * ngx_http_upstream_fair_rbtree;
0
++ngx_http_upstream_fair_compare_rbtree_node(const ngx_rbtree_node_t *v_left,
0
++ const ngx_rbtree_node_t *v_right)
0
++ ngx_http_upstream_fair_shm_block_t *left, *right;
0
++ left = (ngx_http_upstream_fair_shm_block_t *) v_left;
0
++ right = (ngx_http_upstream_fair_shm_block_t *) v_right;
0
++ if (left->cycle < right->cycle) {
0
++ } else if (left->cycle > right->cycle) {
0
++ } else { /* left->cycle == right->cycle */
0
++ if (left->peers < right->peers) {
0
++ } else if (left->peers > right->peers) {
0
++ * generic functions start here
0
++ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp,
0
++ ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel,
0
++ int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right))
0
++ if (node->key < temp->key) {
0
++ if (temp->left == sentinel) {
0
++ } else if (node->key > temp->key) {
0
++ if (temp->right == sentinel) {
0
++ } else { /* node->key == temp->key */
0
++ if (compare(node, temp) < 0) {
0
++ if (temp->left == sentinel) {
0
++ if (temp->right == sentinel) {
0
++ node->left = sentinel;
0
++ node->right = sentinel;
0
++#define NGX_BITVECTOR_ELT_SIZE (sizeof(uintptr_t) * 8)
0
++ngx_bitvector_alloc(ngx_pool_t *pool, ngx_uint_t size, uintptr_t *small)
0
++ ngx_uint_t nelts = (size + NGX_BITVECTOR_ELT_SIZE - 1) / NGX_BITVECTOR_ELT_SIZE;
0
++ if (small && nelts == 1) {
0
++ return ngx_pcalloc(pool, nelts * NGX_BITVECTOR_ELT_SIZE);
0
++ngx_bitvector_test(uintptr_t *bv, ngx_uint_t bit)
0
++ n = bit / NGX_BITVECTOR_ELT_SIZE;
0
++ m = 1 << (bit % NGX_BITVECTOR_ELT_SIZE);
0
++ngx_bitvector_set(uintptr_t *bv, ngx_uint_t bit)
0
++ n = bit / NGX_BITVECTOR_ELT_SIZE;
0
++ m = 1 << (bit % NGX_BITVECTOR_ELT_SIZE);
0
++ * generic functions end here
0
++ngx_http_upstream_fair_rbtree_insert(ngx_rbtree_node_t *temp,
0
++ ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel) {
0
++ ngx_rbtree_generic_insert(temp, node, sentinel,
0
++ ngx_http_upstream_fair_compare_rbtree_node);
0
++ngx_http_upstream_fair_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
0
++ ngx_slab_pool_t *shpool;
0
++ ngx_rbtree_node_t *sentinel;
0
++ shm_zone->data = data;
0
++ shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
0
++ tree = ngx_slab_alloc(shpool, sizeof *tree);
0
++ sentinel = ngx_slab_alloc(shpool, sizeof *sentinel);
0
++ if (sentinel == NULL) {
0
++ ngx_rbtree_sentinel_init(sentinel);
0
++ tree->root = sentinel;
0
++ tree->sentinel = sentinel;
0
++ tree->insert = ngx_http_upstream_fair_rbtree_insert;
0
++ shm_zone->data = tree;
0
++ ngx_http_upstream_fair_rbtree = tree;
0
++ngx_http_upstream_fair_set_shm_size(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
0
++ ssize_t new_shm_size;
0
++ value = cf->args->elts;
0
++ new_shm_size = ngx_parse_size(&value[1]);
0
++ if (new_shm_size == NGX_ERROR) {
0
++ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "Invalid memory area size `%V'", &value[1]);
0
++ return NGX_CONF_ERROR;
0
++ new_shm_size = ngx_align(new_shm_size, ngx_pagesize);
0
++ if (new_shm_size < 8 * (ssize_t) ngx_pagesize) {
0
++ ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The upstream_fair_shm_size value must be at least %udKiB", (8 * ngx_pagesize) >> 10);
0
++ new_shm_size = 8 * ngx_pagesize;
0
++ if (ngx_http_upstream_fair_shm_size &&
0
++ ngx_http_upstream_fair_shm_size != (ngx_uint_t) new_shm_size) {
0
++ ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Cannot change memory area size without restart, ignoring change");
0
++ ngx_http_upstream_fair_shm_size = new_shm_size;
0
++ ngx_conf_log_error(NGX_LOG_DEBUG, cf, 0, "Using %udKiB of shared memory for upstream_fair", new_shm_size >> 10);
0
++ngx_http_upstream_fair(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
0
++ ngx_http_upstream_srv_conf_t *uscf;
0
++ uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
0
++ uscf->peer.init_upstream = ngx_http_upstream_init_fair;
0
++ uscf->flags = NGX_HTTP_UPSTREAM_CREATE
0
++ |NGX_HTTP_UPSTREAM_WEIGHT
0
++ |NGX_HTTP_UPSTREAM_MAX_FAILS
0
++ |NGX_HTTP_UPSTREAM_FAIL_TIMEOUT
0
++ |NGX_HTTP_UPSTREAM_DOWN;
0
++ngx_http_upstream_init_fair(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
0
++ ngx_http_upstream_fair_peers_t *peers;
0
++ /* do the dirty work using rr module */
0
++ if (ngx_http_upstream_init_round_robin(cf, us) != NGX_OK) {
0
++ /* setup our wrapper around rr */
0
++ peers = ngx_palloc(cf->pool, sizeof *peers);
0
++ peers->rrp = us->peer.data;
0
++ us->peer.data = peers;
0
++ n = peers->rrp->number;
0
++ shm_name = ngx_palloc(cf->pool, sizeof *shm_name);
0
++ shm_name->len = sizeof("upstream_fair");
0
++ shm_name->data = (unsigned char *) "upstream_fair";
0
++ if (ngx_http_upstream_fair_shm_size == 0) {
0
++ ngx_http_upstream_fair_shm_size = 8 * ngx_pagesize;
0
++ ngx_http_upstream_fair_shm_zone = ngx_shared_memory_add(
0
++ cf, shm_name, ngx_http_upstream_fair_shm_size, &ngx_http_upstream_fair_module);
0
++ if (ngx_http_upstream_fair_shm_zone == NULL) {
0
++ ngx_http_upstream_fair_shm_zone->init = ngx_http_upstream_fair_init_shm_zone;
0
++ peers->cycle = cf->cycle;
0
++ peers->shared = NULL;
0
++ peers->current = n - 1;
0
++ us->peer.init = ngx_http_upstream_init_fair_peer;
0
++ngx_http_upstream_fair_update_nreq(ngx_http_upstream_fair_peer_data_t *fp, int delta, ngx_log_t *log)
0
++ ngx_http_upstream_fair_shared_t *fs;
0
++ fs = &fp->shared[fp->current];
0
++ ngx_atomic_fetch_add(&fs->nreq, delta);
0
++ fs->last_active = ngx_current_msec;
0
++ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, log, 0, "[upstream_fair] nreq for peer %ui now %d", fp->current, fs->nreq);
0
++ * SCHED_TIME_BITS is the portion of an ngx_uint_t which represents the
0
++ * last_time_delta part (time since last activity in msec). The rest
0
++ * (top bits) represents the number of currently processed requests.
0
++ * The value is not too critical because overflow is handled via
0
++ * saturation. With the default value of 24, scheduling is exact for
0
++ * requests shorter than 16777 sec and for less than 256 requests per
0
++ * backend (on 32-bit architectures). Beyond these limits, the algorithm
0
++ * essentially falls back to pure weighted round-robin
0
++ * A higher score means less suitable -- this changed from previous
0
++ * The `delta' parameter is bit-negated so that high values yield low
0
++ * scores and get chosen more often.
0
++#define SCHED_TIME_BITS 24
0
++#define SCHED_NREQ_MAX ((~0UL) >> SCHED_TIME_BITS)
0
++#define SCHED_TIME_MAX ((1 << SCHED_TIME_BITS) - 1)
0
++#define SCHED_SCORE(nreq,delta) (((nreq) << SCHED_TIME_BITS) | (~(delta)))
0
++#define ngx_upstream_fair_min(a,b) (((a) < (b)) ? (a) : (b))
0
++ngx_http_upstream_fair_sched_score(ngx_peer_connection_t *pc,
0
++ ngx_http_upstream_fair_shared_t *fs,
0
++ ngx_http_upstream_rr_peer_t *peer, ngx_uint_t n)
0
++ ngx_msec_t last_active_delta;
0
++ last_active_delta = ngx_current_msec - fs->last_active;
0
++ if ((ngx_int_t) last_active_delta < 0) {
0
++ ngx_log_error(NGX_LOG_WARN, pc->log, 0, "[upstream_fair] Clock skew of at least %i msec detected", -(ngx_int_t) last_active_delta);
0
++ /* a pretty arbitrary value */
0
++ last_active_delta = abs(last_active_delta);
0
++ if ((ngx_int_t)fs->nreq < 0) {
0
++ ngx_log_error(NGX_LOG_WARN, pc->log, 0, "[upstream_fair] upstream %ui has negative nreq (%i)", n, fs->nreq);
0
++ return SCHED_SCORE(0, last_active_delta);
0
++ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, "[upstream_fair] nreq = %i, last_active_delta = %ui", fs->nreq, last_active_delta);
0
++ ngx_upstream_fair_min(fs->nreq, SCHED_NREQ_MAX),
0
++ ngx_upstream_fair_min(last_active_delta, SCHED_TIME_MAX));
0
++ * the core of load balancing logic
0
++ngx_http_upstream_fair_try_peer(ngx_peer_connection_t *pc,
0
++ ngx_http_upstream_fair_peer_data_t *fp,
0
++ ngx_http_upstream_rr_peer_t *peer;
0
++ if (ngx_bitvector_test(fp->tried, peer_id))
0
++ peer = &fp->rrp->peer[peer_id];
0
++ if (peer->max_fails == 0 || peer->fails < peer->max_fails) {
0
++ if (now - peer->accessed > peer->fail_timeout) {
0
++ ngx_bitvector_set(fp->tried, peer_id);