From a7a8b625c6d79d203702709983b736137be2a9bd Mon Sep 17 00:00:00 2001 From: doujiang Date: Tue, 28 Sep 2021 21:55:45 +0800 Subject: [PATCH 1/3] improvement: should throw Lua runtime error instead of let process crash when no memory. (#42) Co-authored-by: doujiang24 --- chash.c | 32 ++++++++++++++++++++++++----- chash.h | 6 +++--- lib/resty/chash.lua | 44 ++++++++++++++++++++++++++++------------ lib/resty/roundrobin.lua | 1 + 4 files changed, 62 insertions(+), 21 deletions(-) diff --git a/chash.c b/chash.c index 5c3148b..b8c1fec 100644 --- a/chash.c +++ b/chash.c @@ -10,6 +10,8 @@ #define u_char unsigned char #endif +#define CHASH_OK 0 +#define CHASH_ERR -1 #define crc32_final(crc) \ crc ^= 0xffffffff @@ -144,7 +146,7 @@ chash_point_init(chash_point_t *arr, uint32_t base_hash, uint32_t start, } -void +int chash_point_sort(chash_point_t arr[], uint32_t n) { chash_point_t *points; @@ -163,6 +165,9 @@ chash_point_sort(chash_point_t arr[], uint32_t n) step = pow(2, 32) / m; points = (chash_point_t *) calloc(m, sizeof(chash_point_t)); + if (points == NULL) { + return CHASH_ERR; + } for (i = 0; i < n; i++) { node = &arr[i]; @@ -246,10 +251,12 @@ chash_point_sort(chash_point_t arr[], uint32_t n) } free(points); + + return CHASH_OK; } -void +int chash_point_add(chash_point_t *old_points, uint32_t old_length, uint32_t base_hash, uint32_t from, uint32_t num, uint32_t id, chash_point_t *new_points) @@ -258,9 +265,16 @@ chash_point_add(chash_point_t *old_points, uint32_t old_length, chash_point_t *tmp_points; tmp_points = (chash_point_t *) calloc(num, sizeof(chash_point_t)); + if (tmp_points == NULL) { + return CHASH_ERR; + } chash_point_init_crc(tmp_points, 0, base_hash, from, num, id); - chash_point_sort(tmp_points, num); + + if (chash_point_sort(tmp_points, num) != CHASH_OK) { + free(tmp_points); + return CHASH_ERR; + } j = num - 1; k = old_length + num - 1; @@ -283,10 +297,12 @@ chash_point_add(chash_point_t *old_points, uint32_t old_length, } free(tmp_points); + + return CHASH_OK; } -void +int chash_point_reduce(chash_point_t *old_points, uint32_t old_length, uint32_t base_hash, uint32_t from, uint32_t num, uint32_t id) { @@ -296,7 +312,11 @@ chash_point_reduce(chash_point_t *old_points, uint32_t old_length, tmp_points = (chash_point_t *) calloc(num, sizeof(chash_point_t)); chash_point_init_crc(tmp_points, 0, base_hash, from, num, id); - chash_point_sort(tmp_points, num); + + if (chash_point_sort(tmp_points, num) != CHASH_OK) { + free(tmp_points); + return CHASH_ERR; + } for (i = 0, j = 0, k = 0; i < old_length; i++) { if (j < num @@ -315,6 +335,8 @@ chash_point_reduce(chash_point_t *old_points, uint32_t old_length, } free(tmp_points); + + return CHASH_OK; } diff --git a/chash.h b/chash.h index b7f864c..8fb97b0 100644 --- a/chash.h +++ b/chash.h @@ -39,12 +39,12 @@ typedef struct { */ void chash_point_init(chash_point_t *points, uint32_t base_hash, uint32_t start, uint32_t num, uint32_t id) LCH_EXPORT; -void chash_point_sort(chash_point_t *points, uint32_t npoints) LCH_EXPORT; +int chash_point_sort(chash_point_t *points, uint32_t npoints) LCH_EXPORT; -void chash_point_add(chash_point_t *old_points, uint32_t old_length, +int chash_point_add(chash_point_t *old_points, uint32_t old_length, uint32_t base_hash, uint32_t from, uint32_t num, uint32_t id, chash_point_t *new_points) LCH_EXPORT; -void chash_point_reduce(chash_point_t *old_points, uint32_t old_length, +int chash_point_reduce(chash_point_t *old_points, uint32_t old_length, uint32_t base_hash, uint32_t from, uint32_t num, uint32_t id) LCH_EXPORT; void chash_point_delete(chash_point_t *old_points, uint32_t old_length, uint32_t id) LCH_EXPORT; diff --git a/lib/resty/chash.lua b/lib/resty/chash.lua index 90e73bc..930ebad 100644 --- a/lib/resty/chash.lua +++ b/lib/resty/chash.lua @@ -19,6 +19,10 @@ local pairs = pairs local tostring = tostring local tonumber = tonumber local bxor = bit.bxor +local error = error + + +local CHASH_OK = 0 ffi.cdef[[ @@ -31,12 +35,12 @@ typedef struct { void chash_point_init(chash_point_t *points, uint32_t base_hash, uint32_t start, uint32_t num, uint32_t id); -void chash_point_sort(chash_point_t *points, uint32_t size); +int chash_point_sort(chash_point_t *points, uint32_t size); -void chash_point_add(chash_point_t *old_points, uint32_t old_length, +int chash_point_add(chash_point_t *old_points, uint32_t old_length, uint32_t base_hash, uint32_t from, uint32_t num, uint32_t id, chash_point_t *new_points); -void chash_point_reduce(chash_point_t *old_points, uint32_t old_length, +int chash_point_reduce(chash_point_t *old_points, uint32_t old_length, uint32_t base_hash, uint32_t from, uint32_t num, uint32_t id); void chash_point_delete(chash_point_t *old_points, uint32_t old_length, uint32_t id); @@ -117,7 +121,9 @@ local function _precompute(nodes) start = start + num end - clib.chash_point_sort(points, npoints) + if clib.chash_point_sort(points, npoints) ~= CHASH_OK then + error("no memory") + end return ids, points, npoints, newnodes end @@ -194,14 +200,21 @@ local function _incr(self, id, weight) local new_npoints = self.npoints + weight * CONSISTENT_POINTS if self.size < new_npoints then new_points = ffi_new(chash_point_t, new_npoints) - self.size = new_npoints end local base_hash = bxor(crc32(tostring(id)), 0xffffffff) - clib.chash_point_add(self.points, self.npoints, base_hash, - old_weight * CONSISTENT_POINTS, - weight * CONSISTENT_POINTS, - index, new_points) + local rc = clib.chash_point_add(self.points, self.npoints, base_hash, + old_weight * CONSISTENT_POINTS, + weight * CONSISTENT_POINTS, + index, new_points) + + if rc ~= CHASH_OK then + error("no memory") + end + + if self.size < new_npoints then + self.size = new_npoints + end self.points = new_points self.npoints = new_npoints @@ -230,10 +243,15 @@ local function _decr(self, id, weight) end local base_hash = bxor(crc32(tostring(id)), 0xffffffff) - clib.chash_point_reduce(self.points, self.npoints, base_hash, - (old_weight - weight) * CONSISTENT_POINTS, - CONSISTENT_POINTS * weight, - index) + local from = (old_weight - weight) * CONSISTENT_POINTS + local num = CONSISTENT_POINTS * weight + + local rc = clib.chash_point_reduce(self.points, self.npoints, base_hash, + from, num, index) + + if rc ~= CHASH_OK then + error("no memory") + end nodes[id] = old_weight - weight self.npoints = self.npoints - CONSISTENT_POINTS * weight diff --git a/lib/resty/roundrobin.lua b/lib/resty/roundrobin.lua index 97743b5..be727d0 100644 --- a/lib/resty/roundrobin.lua +++ b/lib/resty/roundrobin.lua @@ -4,6 +4,7 @@ local next = next local tonumber = tonumber local setmetatable = setmetatable local math_random = math.random +local error = error local utils = require "resty.balancer.utils" From 92b38d2ed650b1be53d0f59ba83822461dcaf8d5 Mon Sep 17 00:00:00 2001 From: jizhuozhi Date: Sun, 16 Oct 2022 07:24:46 +0000 Subject: [PATCH 2/3] feature: add nginx swrr support. --- lib/resty/swrr.lua | 145 +++++++++++++++++++++++++++++++ t/swrr.t | 211 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 lib/resty/swrr.lua create mode 100644 t/swrr.t diff --git a/lib/resty/swrr.lua b/lib/resty/swrr.lua new file mode 100644 index 0000000..7f281c1 --- /dev/null +++ b/lib/resty/swrr.lua @@ -0,0 +1,145 @@ +local pairs = pairs +local next = next +local tonumber = tonumber +local setmetatable = setmetatable +local math_random = math.random +local error = error + +local utils = require "resty.balancer.utils" + +local copy = utils.copy +local nkeys = utils.nkeys +local new_tab = utils.new_tab + +local _M = {} +local mt = { __index = _M } + + +local function new_current_weights(nodes) + local current_weights = new_tab(0, nkeys(nodes)) + for id, _ in pairs(nodes) do + current_weights[id] = 0 + end + return current_weights +end + + +local function random_start(self) + local count = nkeys(self.nodes) + local random_times = math_random(count) + + for _ = 1, random_times do + self:next() + end +end + + +function _M.new(_, nodes) + local newnodes = copy(nodes) + local current_weights = new_current_weights(nodes) + + local self = { + nodes = newnodes, -- it's safer to copy one + current_weights = current_weights, + } + self = setmetatable(self, mt) + random_start(self) + return self +end + + +function _M.reinit(self, nodes) + local newnodes = copy(nodes) + local current_weights = new_current_weights(newnodes) + + self.nodes = newnodes + self.current_weights = current_weights + random_start(self) +end + + +local function _delete(self, id) + local nodes = self.nodes + local current_weights = self.current_weights + + nodes[id] = nil + current_weights[id] = nil +end +_M.delete = _delete + + +local function _decr(self, id, weight) + local weight = tonumber(weight) or 1 + local nodes = self.nodes + + local old_weight = nodes[id] + + if not old_weight then + return + end + + if old_weight <= weight then + return _delete(self, id) + end + + nodes[id] = old_weight - weight +end +_M.decr = _decr + + +local function _incr(self, id, weight) + local weight = tonumber(weight) or 1 + local nodes = self.nodes + + nodes[id] = (nodes[id] or 0) + weight +end +_M.incr = _incr + + +function _M.set(self, id, new_weight) + local new_weight = tonumber(new_weight) or 0 + local old_weight = self.nodes[id] or 0 + + if old_weight == new_weight then + return + end + + if old_weight < new_weight then + return _incr(self, id, new_weight - old_weight) + end + + return _decr(self, id, old_weight - new_weight) +end + + +local function find(self) + local nodes = self.nodes + local current_weights = self.current_weights + + local best_id = nil + local best_current_weight = 0 + local total = 0 + + for id, weight in pairs(nodes) do + local current_weight = current_weights[id] + total = total + weight + current_weight = current_weight + weight + current_weights[id] = current_weight + + if best_id == nil or best_current_weight < current_weight then + best_id = id + best_current_weight = current_weight + end + end + + if best_id ~= nil then + current_weights[best_id] = best_current_weight - total + end + + return best_id +end +_M.find = find +_M.next = find + + +return _M diff --git a/t/swrr.t b/t/swrr.t new file mode 100644 index 0000000..7f07330 --- /dev/null +++ b/t/swrr.t @@ -0,0 +1,211 @@ +# vim:set ft= ts=4 sw=4 et: + +use Test::Nginx::Socket::Lua; +use Cwd qw(cwd); + +repeat_each(2); + +plan tests => repeat_each() * (3 * blocks()); + +my $pwd = cwd(); + +$ENV{TEST_NGINX_CWD} = $pwd; + +our $HttpConfig = qq{ + lua_package_path "$pwd/lib/?.lua;;"; + lua_package_cpath "$pwd/?.so;;"; +}; + +no_long_string(); +#no_diff(); + +run_tests(); + +__DATA__ + +=== TEST 1: sanity +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + math.randomseed(75098) + + local swrr = require "resty.swrr" + + local servers = { + ["server1"] = 8, + ["server2"] = 4, + ["server3"] = 2, + } + + local rr = swrr:new(servers) + + for i = 1, 14 do + local id = rr:find() + if type(id) ~= "string" or not servers[id] then + return ngx.say("fail") + end + end + + ngx.say("success") + } + } +--- request +GET /t +--- response_body +success +--- no_error_log +[error] + + + +=== TEST 2: find count +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + math.randomseed(75098) + + local swrr = require "resty.swrr" + + local servers = { + ["server1"] = 6, + ["server2"] = 3, + ["server3"] = 1, + } + + local rr = swrr:new(servers) + + local res = {} + for i = 1, 100 * 1000 do + local id = rr:find() + + if res[id] then + res[id] = res[id] + 1 + else + res[id] = 1 + end + end + + local keys = {} + for id, num in pairs(res) do + keys[#keys + 1] = id + end + + if #keys ~= 3 then + ngx.exit(400) + end + + ngx.say("server1: ", res['server1']) + ngx.say("server2: ", res['server2']) + ngx.say("server3: ", res['server3']) + } + } +--- request +GET /t +--- response_body +server1: 60000 +server2: 30000 +server3: 10000 +--- no_error_log +[error] + + + +=== TEST 3: random start +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + math.randomseed(9975098) + + local swrr = require "resty.swrr" + + local servers = { + ["server1"] = 1, + ["server2"] = 1, + ["server3"] = 1, + ["server4"] = 1, + } + + local rr = swrr:new(servers, true) + local id = rr:find() + + local rr2 = swrr:new(servers, true) + local id2 = rr2:find() + ngx.log(ngx.INFO, "id: ", id, " id2: ", id2) + ngx.say(id == id2) + } + } +--- request +GET /t +--- response_body +false +--- no_error_log +[error] + + + +=== TEST 4: weight is "0" +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + math.randomseed(9975098) + + local swrr = require "resty.swrr" + + local servers = { + ["server1"] = "0", + ["server2"] = "1", + ["server3"] = "0", + ["server4"] = "0", + } + + local rr = swrr:new(servers, true) + local id = rr:find() + + ngx.say("id: ", id) + } + } +--- request +GET /t +--- response_body +id: server2 +--- no_error_log +[error] + + + +=== TEST 5: all weights are 0, behavior like weights are 1. +It's not recommends to use 0, this test just make sure it won't be worse, like crash. +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + math.randomseed(9975098) + + local swrr = require "resty.swrr" + + local servers = { + ["server1"] = 0, + ["server2"] = 0, + ["server3"] = 0, + ["server4"] = 0, + } + + local rr = swrr:new(servers, true) + + for i = 1, 4 do + local id = rr:find() + end + + ngx.say("ok") + } + } +--- request +GET /t +--- response_body +ok +--- no_error_log +[error] From 1cd4363c0a239afe4765ec607dcfbbb4e5900eea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BA=AA=E5=8D=93=E5=BF=97?= Date: Wed, 24 May 2023 19:32:57 +0800 Subject: [PATCH 3/3] docs: add example for swrr. --- README.markdown | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/README.markdown b/README.markdown index ed2b180..e69c4b8 100644 --- a/README.markdown +++ b/README.markdown @@ -47,6 +47,7 @@ Synopsis init_by_lua_block { local resty_chash = require "resty.chash" local resty_roundrobin = require "resty.roundrobin" + local resty_swrr = require "resty.swrr" local server_list = { ["127.0.0.1:1985"] = 2, @@ -73,6 +74,9 @@ Synopsis local rr_up = resty_roundrobin:new(server_list) package.loaded.my_rr_up = rr_up + + local swrr_up = resty_swrr:new(server_list) + package.loaded.my_swrr_up = swrr_up } upstream backend_chash { @@ -105,6 +109,20 @@ Synopsis } } + upstream backend_swrr { + server 0.0.0.1; + balancer_by_lua_block { + local b = require "ngx.balancer" + + local swrr_up = package.loaded.my_swrr_up + + -- Note that SWRR picks the first server randomly + local server = swrr_up:find() + + assert(b.set_current_peer(server)) + } + } + server { location /chash { proxy_pass http://backend_chash; @@ -113,6 +131,10 @@ Synopsis location /roundrobin { proxy_pass http://backend_rr; } + + location /swrr { + proxy_pass http://backend_swrr; + } } ``` @@ -121,7 +143,7 @@ Synopsis Methods ======= -Both `resty.chash` and `resty.roundrobin` have the same apis. +Both `resty.chash`, `resty.roundrobin` and `resty.swrr` have the same apis. [Back to TOC](#table-of-contents)