From 165ea9c3bd3eff522cd9c0fde859dc58d8ea1779 Mon Sep 17 00:00:00 2001
From: thefosk
Date: Mon, 17 Oct 2016 14:55:47 -0700
Subject: [PATCH 1/3] refactor(events) using a regular DICT for async events

---
 kong/core/cluster.lua         | 65 ++++++++++++++++++++---------------
 kong/core/reports.lua         | 45 +++++++++++++++---------
 kong/templates/nginx_kong.lua |  3 --
 kong/tools/database_cache.lua |  4 +++
 4 files changed, 70 insertions(+), 47 deletions(-)

diff --git a/kong/core/cluster.lua b/kong/core/cluster.lua
index 51c73fbc61ed..36a468ce8854 100644
--- a/kong/core/cluster.lua
+++ b/kong/core/cluster.lua
@@ -1,21 +1,36 @@
 local cache = require "kong.tools.database_cache"
 local singletons = require "kong.singletons"

-local ngx_log = ngx.log
-
-local resty_lock
-local status, res = pcall(require, "resty.lock")
-if status then
-  resty_lock = res
-end

 local KEEPALIVE_INTERVAL = 30
+local KEEPALIVE_KEY = "events:keepalive"
 local ASYNC_AUTOJOIN_INTERVAL = 3
 local ASYNC_AUTOJOIN_RETRIES = 20 -- Try for max a minute (3s * 20)
+local ASYNC_AUTOJOIN_KEY = "events:autojoin"
+
+local function log_error(...)
+  ngx.log(ngx.WARN, "[cluster] ", ...)
+end
+
+local function log_debug(...)
+  ngx.log(ngx.DEBUG, "[cluster] ", ...)
+end
+
+local function get_lock(key, interval)
+  -- the lock is held for the whole interval to prevent multiple
+  -- worker processes from sending the test request simultaneously.
+  -- here we subtract 1ms from the lock expiration time to prevent
+  -- a race condition with the next timer event.
+  local ok, err = cache.rawadd(key, true, interval - 0.001)
+  if not ok then
+    return nil, err
+  end
+  return true
+end

 local function create_timer(at, cb)
   local ok, err = ngx.timer.at(at, cb)
   if not ok then
-    ngx_log(ngx.ERR, "[cluster] failed to create timer: ", err)
+    log_error("failed to create timer: ", err)
   end
 end

@@ -25,29 +40,23 @@ local function async_autojoin(premature)
   -- If this node is the only node in the cluster, but other nodes are present, then try to join them
   -- This usually happens when two nodes are started very fast, and the first node didn't write his
   -- information into the datastore yet. When the second node starts up, there is nothing to join yet.
-  local lock, err = resty_lock:new("cluster_autojoin_locks", {
-    exptime = ASYNC_AUTOJOIN_INTERVAL - 0.001
-  })
-  if not lock then
-    ngx_log(ngx.ERR, "could not create lock: ", err)
-    return
-  end
-  local elapsed = lock:lock("async_autojoin")
-  if elapsed and elapsed == 0 then
+  local ok, err = get_lock(ASYNC_AUTOJOIN_KEY, ASYNC_AUTOJOIN_INTERVAL)
+  if ok then
+    log_debug("auto-joining")
     -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
     -- the DAO, then try to join them
     local count, err = singletons.dao.nodes:count()
     if err then
-      ngx_log(ngx.ERR, tostring(err))
+      log_error(tostring(err))
     elseif count > 1 then
       local members, err = singletons.serf:members()
       if err then
-        ngx_log(ngx.ERR, tostring(err))
+        log_error(tostring(err))
       elseif #members < 2 then
         -- Trigger auto-join
         local _, err = singletons.serf:autojoin()
         if err then
-          ngx_log(ngx.ERR, tostring(err))
+          log_error(tostring(err))
         end
       else
         return -- The node is already in the cluster and no need to continue
@@ -63,23 +72,23 @@ local function async_autojoin(premature)
     if (autojoin_retries < ASYNC_AUTOJOIN_RETRIES) then
       create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin)
     end
+  elseif err ~= "exists" then
+    log_error(err)
   end
 end

 local function send_keepalive(premature)
   if premature then return end

-  local lock = resty_lock:new("cluster_locks", {
-    exptime = KEEPALIVE_INTERVAL - 0.001
-  })
-  local elapsed = lock:lock("keepalive")
-  if elapsed and elapsed == 0 then
+  local ok, err = get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL)
+  if ok then
+    log_debug("sending keepalive")
     -- Send keepalive
     local nodes, err = singletons.dao.nodes:find_all {
       name = singletons.serf.node_name
     }
     if err then
-      ngx_log(ngx.ERR, tostring(err))
+      log_error(tostring(err))
     elseif #nodes == 1 then
       local node = nodes[1]
       local _, err = singletons.dao.nodes:update(node, node, {
@@ -87,9 +96,11 @@ local function send_keepalive(premature)
         quiet = true
       })
       if err then
-        ngx_log(ngx.ERR, tostring(err))
+        log_error(tostring(err))
       end
     end
+  elseif err ~= "exists" then
+    log_error(err)
   end

   create_timer(KEEPALIVE_INTERVAL, send_keepalive)
diff --git a/kong/core/reports.lua b/kong/core/reports.lua
index 1f9cba47b83b..0d6226a31897 100644
--- a/kong/core/reports.lua
+++ b/kong/core/reports.lua
@@ -4,7 +4,6 @@ local cache = require "kong.tools.database_cache"
 local utils = require "kong.tools.utils"
 local pl_utils = require "pl.utils"
 local pl_stringx = require "pl.stringx"
-local resty_lock = require "resty.lock"
 local singletons = require "kong.singletons"
 local constants = require "kong.constants"
 local concat = table.concat
@@ -12,9 +11,11 @@ local udp_sock = ngx.socket.udp
 local ping_handler, system_infos
 local enabled = false
-local ping_interval = 3600
 local unique_str = utils.random_string()

+local PING_INTERVAL = 3600
+local KEY = "events:reports"
+
 --------
 -- utils
 --------
@@ -23,6 +24,10 @@ local function log_error(...)
   ngx.log(ngx.WARN, "[reports] ", ...)
 end

+local function log_debug(...)
+  ngx.log(ngx.DEBUG, "[reports] ", ...)
+end
+
 local function get_system_infos()
   local infos = {
     version = meta._VERSION
@@ -97,8 +102,20 @@ end
 -- ping handler
 ---------------

+local function get_lock()
+  -- the lock is held for the whole interval to prevent multiple
+  -- worker processes from sending the test request simultaneously.
+  -- here we subtract 1ms from the lock expiration time to prevent
+  -- a race condition with the next timer event.
+  local ok, err = cache.rawadd(KEY, true, PING_INTERVAL - 0.001)
+  if not ok then
+    return nil, err
+  end
+  return true
+end
+
 local function create_ping_timer()
-  local ok, err = ngx.timer.at(ping_interval, ping_handler)
+  local ok, err = ngx.timer.at(PING_INTERVAL, ping_handler)
   if not ok then
     log_error("failed to create ping timer: ", err)
   end
@@ -107,25 +124,19 @@ end
 ping_handler = function(premature)
   if premature then return end

-  local lock, err = resty_lock:new("reports_locks", {
-    exptime = ping_interval - 0.001
-  })
-  if not lock then
-    log_error("could not create lock: ", err)
-    return
-  end
-
-  local elapsed, err = lock:lock("ping")
-  if not elapsed then
-    log_error("failed to acquire ping lock: ", err)
-  elseif elapsed == 0 then
+  local ok, err = get_lock()
+  if ok then
+    log_debug("flushing")
+    local requests = cache.get(cache.requests_key()) or 0
     send {
       signal = "ping",
-      requests = cache.get(cache.requests_key()) or 0,
+      requests = requests,
       unique_id = unique_str,
       database = singletons.configuration.database
     }
-    cache.rawset(cache.requests_key(), 0)
+    cache.rawset(cache.requests_key(), -requests)
+  elseif err ~= "exists" then
+    log_error(err)
   end

   create_ping_timer()
diff --git a/kong/templates/nginx_kong.lua b/kong/templates/nginx_kong.lua
index cea3da821613..dc27806ead44 100644
--- a/kong/templates/nginx_kong.lua
+++ b/kong/templates/nginx_kong.lua
@@ -35,9 +35,6 @@ lua_code_cache ${{LUA_CODE_CACHE}};
 lua_max_running_timers 4096;
 lua_max_pending_timers 16384;
 lua_shared_dict cache ${{MEM_CACHE_SIZE}};
-lua_shared_dict reports_locks 100k;
-lua_shared_dict cluster_locks 100k;
-lua_shared_dict cluster_autojoin_locks 100k;
 lua_shared_dict cache_locks 100k;
 lua_shared_dict cassandra 1m;
 lua_shared_dict cassandra_prepared 5m;
diff --git a/kong/tools/database_cache.lua b/kong/tools/database_cache.lua
index 5bae4643401d..2489206c6a89 100644
--- a/kong/tools/database_cache.lua
+++ b/kong/tools/database_cache.lua
@@ -41,6 +41,10 @@ function _M.rawget(key)
   return cache:get(key)
 end

+function _M.rawadd(key, value, exptime)
+  return cache:add(key, value, exptime)
+end
+
 function _M.get(key)
   local value, flags = _M.rawget(key)
   if value then

From 9370ae502e77fbf8548d9a573ea58dd299fff33d Mon Sep 17 00:00:00 2001
From: thefosk
Date: Wed, 19 Oct 2016 21:40:56 -0700
Subject: [PATCH 2/3] addressing review

---
 kong/core/cluster.lua | 37 ++++++++++++++++---------------------
 kong/core/reports.lua | 30 ++++++++++--------------------
 2 files changed, 26 insertions(+), 41 deletions(-)

diff --git a/kong/core/cluster.lua b/kong/core/cluster.lua
index 36a468ce8854..9059ba092032 100644
--- a/kong/core/cluster.lua
+++ b/kong/core/cluster.lua
@@ -7,12 +7,12 @@ local ASYNC_AUTOJOIN_INTERVAL = 3
 local ASYNC_AUTOJOIN_RETRIES = 20 -- Try for max a minute (3s * 20)
 local ASYNC_AUTOJOIN_KEY = "events:autojoin"

-local function log_error(...)
-  ngx.log(ngx.WARN, "[cluster] ", ...)
-end
+local ngx_log = ngx.log
+local ERR = ngx.ERR
+local DEBUG = ngx.DEBUG

-local function log_debug(...)
-  ngx.log(ngx.DEBUG, "[cluster] ", ...)
+local function log(lvl, ...)
+  ngx_log(lvl, "[cluster] ", ...)
 end

 local function get_lock(key, interval)
@@ -20,17 +20,13 @@ local function get_lock(key, interval)
   -- worker processes from sending the test request simultaneously.
   -- here we subtract 1ms from the lock expiration time to prevent
   -- a race condition with the next timer event.
-  local ok, err = cache.rawadd(key, true, interval - 0.001)
-  if not ok then
-    return nil, err
-  end
-  return true
+  return cache.rawadd(key, true, interval - 0.001)
 end

 local function create_timer(at, cb)
   local ok, err = ngx.timer.at(at, cb)
   if not ok then
-    log_error("failed to create timer: ", err)
+    log(ERR, "failed to create timer: ", err)
   end
 end

@@ -42,21 +38,21 @@ local function async_autojoin(premature)
   -- information into the datastore yet. When the second node starts up, there is nothing to join yet.
   local ok, err = get_lock(ASYNC_AUTOJOIN_KEY, ASYNC_AUTOJOIN_INTERVAL)
   if ok then
-    log_debug("auto-joining")
+    log(DEBUG, "auto-joining")
     -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
     -- the DAO, then try to join them
     local count, err = singletons.dao.nodes:count()
     if err then
-      log_error(tostring(err))
+      log(ERR, err)
     elseif count > 1 then
       local members, err = singletons.serf:members()
       if err then
-        log_error(tostring(err))
+        log(ERR, err)
       elseif #members < 2 then
         -- Trigger auto-join
         local _, err = singletons.serf:autojoin()
         if err then
-          log_error(tostring(err))
+          log(ERR, err)
         end
       else
         return -- The node is already in the cluster and no need to continue
@@ -73,7 +69,7 @@ local function async_autojoin(premature)
       create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin)
     end
   elseif err ~= "exists" then
-    log_error(err)
+    log(ERR, err)
   end
 end

@@ -82,13 +78,12 @@ local function send_keepalive(premature)

   local ok, err = get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL)
   if ok then
-    log_debug("sending keepalive")
-    -- Send keepalive
+    log(DEBUG, "sending keepalive")
     local nodes, err = singletons.dao.nodes:find_all {
       name = singletons.serf.node_name
     }
     if err then
-      log_error(tostring(err))
+      log(ERR, err)
     elseif #nodes == 1 then
       local node = nodes[1]
       local _, err = singletons.dao.nodes:update(node, node, {
@@ -96,11 +91,11 @@ local function send_keepalive(premature)
         quiet = true
       })
       if err then
-        log_error(tostring(err))
+        log(ERR, err)
       end
     end
   elseif err ~= "exists" then
-    log_error(err)
+    log(ERR, err)
   end

   create_timer(KEEPALIVE_INTERVAL, send_keepalive)
diff --git a/kong/core/reports.lua b/kong/core/reports.lua
index 0d6226a31897..9ec38a0213f6 100644
--- a/kong/core/reports.lua
+++ b/kong/core/reports.lua
@@ -16,16 +16,11 @@ local unique_str = utils.random_string()
 local PING_INTERVAL = 3600
 local KEY = "events:reports"

---------
--- utils
---------
+local ngx_log = ngx.log
+local ERR = ngx.ERR

-local function log_error(...)
-  ngx.log(ngx.WARN, "[reports] ", ...)
-end
-
-local function log_debug(...)
-  ngx.log(ngx.DEBUG, "[reports] ", ...)
+local function log(lvl, ...)
+  ngx_log(lvl, "[reports] ", ...)
 end

 local function get_system_infos()
@@ -81,7 +76,7 @@ local function send(t, host, port)
   local sock = udp_sock()
   local ok, err = sock:setpeername(host, port)
   if not ok then
-    log_error("could not set peer name for UDP socket: ", err)
+    log(ERR, "could not set peer name for UDP socket: ", err)
     return
   end

@@ -89,12 +84,12 @@

   ok, err = sock:send("<14>"..msg) -- syslog facility code 'log alert'
   if not ok then
-    log_error("could not send data: ", err)
+    log(ERR, "could not send data: ", err)
   end

   ok, err = sock:close()
   if not ok then
-    log_error("could not close socket: ", err)
+    log(ERR, "could not close socket: ", err)
   end
 end

@@ -107,17 +102,13 @@ local function get_lock()
   -- the lock is held for the whole interval to prevent multiple
   -- worker processes from sending the test request simultaneously.
   -- here we subtract 1ms from the lock expiration time to prevent
   -- a race condition with the next timer event.
-  local ok, err = cache.rawadd(KEY, true, PING_INTERVAL - 0.001)
-  if not ok then
-    return nil, err
-  end
-  return true
+  return cache.rawadd(KEY, true, PING_INTERVAL - 0.001)
 end

 local function create_ping_timer()
   local ok, err = ngx.timer.at(PING_INTERVAL, ping_handler)
   if not ok then
-    log_error("failed to create ping timer: ", err)
+    log(ERR, "failed to create ping timer: ", err)
   end
 end

@@ -126,7 +117,6 @@ ping_handler = function(premature)

   local ok, err = get_lock()
   if ok then
-    log_debug("flushing")
     local requests = cache.get(cache.requests_key()) or 0
     send {
       signal = "ping",
@@ -136,7 +126,7 @@
     }
     cache.rawset(cache.requests_key(), -requests)
   elseif err ~= "exists" then
-    log_error(err)
+    log(ERR, err)
   end

   create_ping_timer()

From 6b705c91f681d18ad76844720e701b32eaf3f570 Mon Sep 17 00:00:00 2001
From: Thibault Charbonnier
Date: Fri, 28 Oct 2016 12:32:20 -0700
Subject: [PATCH 3/3] refactor(events) use kong shm instead of db cache

---
 kong/core/cluster.lua         | 162 ++++++++++++++++++++--------------
 kong/tools/database_cache.lua |   5 --
 2 files changed, 95 insertions(+), 72 deletions(-)

diff --git a/kong/core/cluster.lua b/kong/core/cluster.lua
index 9059ba092032..e681cb383616 100644
--- a/kong/core/cluster.lua
+++ b/kong/core/cluster.lua
@@ -1,109 +1,137 @@
-local cache = require "kong.tools.database_cache"
 local singletons = require "kong.singletons"

-local KEEPALIVE_INTERVAL = 30
-local KEEPALIVE_KEY = "events:keepalive"
-local ASYNC_AUTOJOIN_INTERVAL = 3
-local ASYNC_AUTOJOIN_RETRIES = 20 -- Try for max a minute (3s * 20)
-local ASYNC_AUTOJOIN_KEY = "events:autojoin"
+local timer_at = ngx.timer.at

 local ngx_log = ngx.log
 local ERR = ngx.ERR
 local DEBUG = ngx.DEBUG
+
+local KEEPALIVE_INTERVAL = 30
+local KEEPALIVE_KEY = "events:keepalive"
+local AUTOJOIN_INTERVAL = 3
+local AUTOJOIN_KEY = "events:autojoin"
+local AUTOJOIN_MAX_RETRIES = 20 -- Try for max a minute (3s * 20)
+local AUTOJOIN_MAX_RETRIES_KEY = "autojoin_retries"
+
+
+local kong_dict = ngx.shared.kong
+
+
 local function log(lvl, ...)
   ngx_log(lvl, "[cluster] ", ...)
 end

-local function get_lock(key, interval)
-  -- the lock is held for the whole interval to prevent multiple
-  -- worker processes from sending the test request simultaneously.
-  -- here we subtract 1ms from the lock expiration time to prevent
-  -- a race condition with the next timer event.
-  return cache.rawadd(key, true, interval - 0.001)
+
+-- Hold a lock for the whole interval (exptime) to prevent multiple
+-- worker processes from sending the test request simultaneously.
+-- Other workers do not need to wait until this lock is released,
+-- and can ignore the event, knowing another worker is handling it.
+-- We subtract 1ms from the exp time to prevent a race condition
+-- with the next timer event.
+local function get_lock(key, exptime)
+  local ok, err = kong_dict:safe_add(key, true, exptime - 0.001)
+  if not ok and err ~= "exists" then
+    log(ERR, "could not get lock from 'kong' shm: ", err)
+  end
+
+  return ok
 end

-local function create_timer(at, cb)
-  local ok, err = ngx.timer.at(at, cb)
+
+local function create_timer(...)
+  local ok, err = timer_at(...)
   if not ok then
-    log(ERR, "failed to create timer: ", err)
+    log(ERR, "could not create timer: ", err)
   end
 end

-local function async_autojoin(premature)
-  if premature then return end
+
+local function autojoin_handler(premature)
+  if premature then
+    return
+  end
+
+  if not get_lock(AUTOJOIN_KEY, AUTOJOIN_INTERVAL) then
+    return
+  end

   -- If this node is the only node in the cluster, but other nodes are present, then try to join them
   -- This usually happens when two nodes are started very fast, and the first node didn't write his
   -- information into the datastore yet. When the second node starts up, there is nothing to join yet.
-  local ok, err = get_lock(ASYNC_AUTOJOIN_KEY, ASYNC_AUTOJOIN_INTERVAL)
-  if ok then
-    log(DEBUG, "auto-joining")
-    -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
-    -- the DAO, then try to join them
-    local count, err = singletons.dao.nodes:count()
+  log(DEBUG, "auto-joining")
+
+  -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
+  -- the DAO, then try to join them
+  local count, err = singletons.dao.nodes:count()
+  if err then
+    log(ERR, err)
+
+  elseif count > 1 then
+    local members, err = singletons.serf:members()
     if err then
       log(ERR, err)
-    elseif count > 1 then
-      local members, err = singletons.serf:members()
+
+    elseif #members < 2 then
+      -- Trigger auto-join
+      local _, err = singletons.serf:autojoin()
       if err then
         log(ERR, err)
-      elseif #members < 2 then
-        -- Trigger auto-join
-        local _, err = singletons.serf:autojoin()
-        if err then
-          log(ERR, err)
-        end
-      else
-        return -- The node is already in the cluster and no need to continue
       end
-    end

-    -- Create retries counter key if it doesn't exist
-    if not cache.get(cache.autojoin_retries_key()) then
-      cache.rawset(cache.autojoin_retries_key(), 0)
+    else
+      return -- The node is already in the cluster and no need to continue
     end
+  end

-    local autojoin_retries = cache.incr(cache.autojoin_retries_key(), 1) -- Increment retries counter
-    if (autojoin_retries < ASYNC_AUTOJOIN_RETRIES) then
-      create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin)
-    end
-  elseif err ~= "exists" then
-    log(ERR, err)
+  local n_retries, err = kong_dict:incr(AUTOJOIN_MAX_RETRIES_KEY, 1, 0)
+  if err then
+    log(ERR, "could not increment number of auto-join retries in 'kong' ",
+             "shm: ", err)
+    return
+  end
+
+  if n_retries < AUTOJOIN_MAX_RETRIES then
+    create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
   end
 end

-local function send_keepalive(premature)
-  if premature then return end

-  local ok, err = get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL)
-  if ok then
-    log(DEBUG, "sending keepalive")
-    local nodes, err = singletons.dao.nodes:find_all {
-      name = singletons.serf.node_name
-    }
+local function keepalive_handler(premature)
+  if premature then
+    return
+  end
+
+  if not get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL) then
+    return
+  end
+
+  log(DEBUG, "sending keepalive event to datastore")
+
+  local nodes, err = singletons.dao.nodes:find_all {
+    name = singletons.serf.node_name
+  }
+  if err then
+    log(ERR, "could not retrieve nodes from datastore: ", err)
+
+  elseif #nodes == 1 then
+    local node = nodes[1]
+    local _, err = singletons.dao.nodes:update(node, node, {
+      ttl = singletons.configuration.cluster_ttl_on_failure,
+      quiet = true
- }) - if err then - log(ERR, err) - end + log(ERR, "could not update node in datastore:", err) end - elseif err ~= "exists" then - log(ERR, err) end - create_timer(KEEPALIVE_INTERVAL, send_keepalive) + create_timer(KEEPALIVE_INTERVAL, keepalive_handler) end + return { init_worker = function() - create_timer(KEEPALIVE_INTERVAL, send_keepalive) - create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin) -- Only execute one time + create_timer(KEEPALIVE_INTERVAL, keepalive_handler) + create_timer(AUTOJOIN_INTERVAL, autojoin_handler) end -} \ No newline at end of file +} diff --git a/kong/tools/database_cache.lua b/kong/tools/database_cache.lua index 2489206c6a89..005d7cab9261 100644 --- a/kong/tools/database_cache.lua +++ b/kong/tools/database_cache.lua @@ -16,7 +16,6 @@ local CACHE_KEYS = { ACLS = "acls", SSL = "ssl", REQUESTS = "requests", - AUTOJOIN_RETRIES = "autojoin_retries", TIMERS = "timers", ALL_APIS_BY_DIC = "ALL_APIS_BY_DIC", LDAP_CREDENTIAL = "ldap_credentials", @@ -70,10 +69,6 @@ function _M.requests_key() return CACHE_KEYS.REQUESTS end -function _M.autojoin_retries_key() - return CACHE_KEYS.AUTOJOIN_RETRIES -end - function _M.api_key(host) return CACHE_KEYS.APIS..":"..host end