refactor(events) using a regular DICT for async events #1748

Closed
wants to merge 3 commits
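In short, the PR replaces the resty.lock-based coordination with plain writes to the shared 'kong' dict: a worker claims a periodic task by adding a key for the whole interval, and an "exists" error simply means another worker already took it. A simplified sketch of the keepalive path (condensed from the diff below, not the literal code):

local KEEPALIVE_INTERVAL = 30

-- Before: a resty.lock backed by a dedicated "cluster_locks" shm decides
-- which worker sends the keepalive for this interval.
local resty_lock = require "resty.lock"
local lock = resty_lock:new("cluster_locks", {
  exptime = KEEPALIVE_INTERVAL - 0.001
})
local elapsed = lock:lock("keepalive")
if elapsed and elapsed == 0 then
  -- send the keepalive
end

-- After: the first worker to add the key to the shared 'kong' dict owns the
-- interval; "exists" means another worker is already handling it.
local ok, err = ngx.shared.kong:safe_add("events:keepalive", true,
                                         KEEPALIVE_INTERVAL - 0.001)
if ok then
  -- send the keepalive
end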
176 changes: 105 additions & 71 deletions kong/core/cluster.lua
@@ -1,103 +1,137 @@
local cache = require "kong.tools.database_cache"
local singletons = require "kong.singletons"


local timer_at = ngx.timer.at
local ngx_log = ngx.log
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG

local resty_lock
local status, res = pcall(require, "resty.lock")
if status then
resty_lock = res
end

local KEEPALIVE_INTERVAL = 30
local ASYNC_AUTOJOIN_INTERVAL = 3
local ASYNC_AUTOJOIN_RETRIES = 20 -- Try for at most a minute (3s * 20)
local KEEPALIVE_KEY = "events:keepalive"
local AUTOJOIN_INTERVAL = 3
local AUTOJOIN_KEY = "events:autojoin"
local AUTOJOIN_MAX_RETRIES = 20 -- Try for at most a minute (3s * 20)
local AUTOJOIN_MAX_RETRIES_KEY = "autojoin_retries"


local kong_dict = ngx.shared.kong


local function log(lvl, ...)
ngx_log(lvl, "[cluster] ", ...)
end
Member commented:
A better pattern (which is already commonly used across the codebase) is:

local ngx_log = ngx.log
local ERR = ngx.ERR
local WARN = ngx.WARN
local DEBUG = ngx.DEBUG

local function log(lvl, ...)
  ngx_log(lvl, "[cluster] ", ...)
end

-- ...

log(ERR, "thing")



-- Hold a lock for the whole interval (exptime) to prevent multiple
-- worker processes from sending the test request simultaneously.
-- Other workers do not need to wait until this lock is released,
-- and can ignore the event, knowing another worker is handling it.
-- We subtract 1ms from the exp time to prevent a race condition
-- with the next timer event.
local function get_lock(key, exptime)
local ok, err = kong_dict:safe_add(key, true, exptime - 0.001)
if not ok and err ~= "exists" then
log(ERR, "could not get lock from 'kong' shm: ", err)
end

local function create_timer(at, cb)
local ok, err = ngx.timer.at(at, cb)
return ok
end


local function create_timer(...)
local ok, err = timer_at(...)
if not ok then
ngx_log(ngx.ERR, "[cluster] failed to create timer: ", err)
log(ERR, "could not create timer: ", err)
end
end

local function async_autojoin(premature)
if premature then return end

local function autojoin_handler(premature)
if premature then
return
end

if not get_lock(AUTOJOIN_KEY, AUTOJOIN_INTERVAL) then
return
end

-- If this node is the only node in the cluster, but other nodes are present, then try to join them
-- This usually happens when two nodes are started very fast, and the first node didn't write its
-- information into the datastore yet. When the second node starts up, there is nothing to join yet.
local lock, err = resty_lock:new("cluster_autojoin_locks", {
exptime = ASYNC_AUTOJOIN_INTERVAL - 0.001
})
if not lock then
ngx_log(ngx.ERR, "could not create lock: ", err)
return
end
local elapsed = lock:lock("async_autojoin")
if elapsed and elapsed == 0 then
-- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
-- the DAO, then try to join them
local count, err = singletons.dao.nodes:count()
log(DEBUG, "auto-joining")

-- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
-- the DAO, then try to join them
local count, err = singletons.dao.nodes:count()
if err then
log(ERR, err)

elseif count > 1 then
local members, err = singletons.serf:members()
if err then
ngx_log(ngx.ERR, tostring(err))
elseif count > 1 then
local members, err = singletons.serf:members()
log(ERR, err)

elseif #members < 2 then
-- Trigger auto-join
local _, err = singletons.serf:autojoin()
if err then
ngx_log(ngx.ERR, tostring(err))
elseif #members < 2 then
-- Trigger auto-join
local _, err = singletons.serf:autojoin()
if err then
ngx_log(ngx.ERR, tostring(err))
end
else
return -- The node is already in the cluster and no need to continue
log(ERR, err)
end
end

-- Create retries counter key if it doesn't exist
if not cache.get(cache.autojoin_retries_key()) then
cache.rawset(cache.autojoin_retries_key(), 0)
else
return -- The node is already in the cluster and no need to continue
end
end

local autojoin_retries = cache.incr(cache.autojoin_retries_key(), 1) -- Increment retries counter
if (autojoin_retries < ASYNC_AUTOJOIN_RETRIES) then
create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin)
end
local n_retries, err = kong_dict:incr(AUTOJOIN_MAX_RETRIES_KEY, 1, 0)
if err then
log(ERR, "could not increment number of auto-join retries in 'kong' ",
"shm: ", err)
return
end

if n_retries < AUTOJOIN_MAX_RETRIES then
create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
end
end

local function send_keepalive(premature)
if premature then return end

local lock = resty_lock:new("cluster_locks", {
exptime = KEEPALIVE_INTERVAL - 0.001
})
local elapsed = lock:lock("keepalive")
if elapsed and elapsed == 0 then
-- Send keepalive
local nodes, err = singletons.dao.nodes:find_all {
name = singletons.serf.node_name
}

local function keepalive_handler(premature)
if premature then
return
end

if not get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL) then
return
end

log(DEBUG, "sending keepalive event to datastore")

local nodes, err = singletons.dao.nodes:find_all {
name = singletons.serf.node_name
}
if err then
log(ERR, "could not retrieve nodes from datastore: ", err)

elseif #nodes == 1 then
local node = nodes[1]
local _, err = singletons.dao.nodes:update(node, node, {
ttl = singletons.configuration.cluster_ttl_on_failure,
quiet = true
})
if err then
ngx_log(ngx.ERR, tostring(err))
elseif #nodes == 1 then
local node = nodes[1]
local _, err = singletons.dao.nodes:update(node, node, {
ttl = singletons.configuration.cluster_ttl_on_failure,
quiet = true
})
if err then
ngx_log(ngx.ERR, tostring(err))
end
log(ERR, "could not update node in datastore:", err)
end
end

create_timer(KEEPALIVE_INTERVAL, send_keepalive)
create_timer(KEEPALIVE_INTERVAL, keepalive_handler)
end


return {
init_worker = function()
create_timer(KEEPALIVE_INTERVAL, send_keepalive)
create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin) -- Only execute one time
create_timer(KEEPALIVE_INTERVAL, keepalive_handler)
create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
end
}
}
53 changes: 27 additions & 26 deletions kong/core/reports.lua
@@ -4,23 +4,23 @@ local cache = require "kong.tools.database_cache"
local utils = require "kong.tools.utils"
local pl_utils = require "pl.utils"
local pl_stringx = require "pl.stringx"
local resty_lock = require "resty.lock"
local singletons = require "kong.singletons"
local constants = require "kong.constants"
local concat = table.concat
local udp_sock = ngx.socket.udp

local ping_handler, system_infos
local enabled = false
local ping_interval = 3600
local unique_str = utils.random_string()

--------
-- utils
--------
local PING_INTERVAL = 3600
local KEY = "events:reports"

local function log_error(...)
ngx.log(ngx.WARN, "[reports] ", ...)
local ngx_log = ngx.log
local ERR = ngx.ERR

local function log(lvl, ...)
ngx_log(lvl, "[reports] ", ...)
end
Member commented:
ditto: better logging utility idiom


local function get_system_infos()
@@ -76,56 +76,57 @@ local function send(t, host, port)
local sock = udp_sock()
local ok, err = sock:setpeername(host, port)
if not ok then
log_error("could not set peer name for UDP socket: ", err)
log(ERR, "could not set peer name for UDP socket: ", err)
return
end

sock:settimeout(1000)

ok, err = sock:send("<14>"..msg) -- syslog facility code 'log alert'
if not ok then
log_error("could not send data: ", err)
log(ERR, "could not send data: ", err)
end

ok, err = sock:close()
if not ok then
log_error("could not close socket: ", err)
log(ERR, "could not close socket: ", err)
end
end

---------------
-- ping handler
---------------

local function get_lock()
-- the lock is held for the whole interval to prevent multiple
-- worker processes from sending the test request simultaneously.
-- here we subtract 1ms from the lock expiration time to prevent
-- a race condition with the next timer event.
return cache.rawadd(KEY, true, PING_INTERVAL - 0.001)
Member commented:
We should use the new ngx.shared.kong dict here. It is meant for long-lived data that should not be evicted in an LRU fashion (unlike the cache dict, whose purpose is exactly that).

(Only write to ngx.shared.kong with the safe variants, safe_set and safe_add: we want "no memory" errors reported to the user so the dict size can be increased.)

end
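For illustration, a minimal sketch of that suggestion (the key name and log wording are placeholders, not part of this PR): acquire the per-interval lock with safe_add on ngx.shared.kong and surface "no memory" errors so the user knows to grow the dict.

local kong_dict = ngx.shared.kong

local function get_lock(key, exptime)
  -- safe_add never evicts other keys; on a full dict it fails with "no memory"
  local ok, err = kong_dict:safe_add(key, true, exptime - 0.001)
  if ok then
    return true
  end
  if err == "exists" then
    return false -- another worker already holds the lock for this interval
  end
  ngx.log(ngx.ERR, "[reports] could not write to the 'kong' shm ",
                   "(consider increasing its size): ", err)
  return false
end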

local function create_ping_timer()
local ok, err = ngx.timer.at(ping_interval, ping_handler)
local ok, err = ngx.timer.at(PING_INTERVAL, ping_handler)
if not ok then
log_error("failed to create ping timer: ", err)
log(ERR, "failed to create ping timer: ", err)
end
end

ping_handler = function(premature)
if premature then return end

local lock, err = resty_lock:new("reports_locks", {
exptime = ping_interval - 0.001
})
if not lock then
log_error("could not create lock: ", err)
return
end

local elapsed, err = lock:lock("ping")
if not elapsed then
log_error("failed to acquire ping lock: ", err)
elseif elapsed == 0 then
local ok, err = get_lock()
if ok then
local requests = cache.get(cache.requests_key()) or 0
send {
signal = "ping",
requests = cache.get(cache.requests_key()) or 0,
requests = requests,
unique_id = unique_str,
database = singletons.configuration.database
}
cache.rawset(cache.requests_key(), 0)
cache.rawset(cache.requests_key(), -requests)
elseif err ~= "exists" then
log(ERR, err)
end

create_ping_timer()
3 changes: 0 additions & 3 deletions kong/templates/nginx_kong.lua
@@ -35,9 +35,6 @@ lua_code_cache ${{LUA_CODE_CACHE}};
lua_max_running_timers 4096;
lua_max_pending_timers 16384;
lua_shared_dict cache ${{MEM_CACHE_SIZE}};
lua_shared_dict reports_locks 100k;
lua_shared_dict cluster_locks 100k;
lua_shared_dict cluster_autojoin_locks 100k;
lua_shared_dict cache_locks 100k;
lua_shared_dict cassandra 1m;
lua_shared_dict cassandra_prepared 5m;
9 changes: 4 additions & 5 deletions kong/tools/database_cache.lua
@@ -16,7 +16,6 @@ local CACHE_KEYS = {
ACLS = "acls",
SSL = "ssl",
REQUESTS = "requests",
AUTOJOIN_RETRIES = "autojoin_retries",
TIMERS = "timers",
ALL_APIS_BY_DIC = "ALL_APIS_BY_DIC",
LDAP_CREDENTIAL = "ldap_credentials",
@@ -41,6 +40,10 @@ function _M.rawget(key)
return cache:get(key)
end

function _M.rawadd(key, value, exptime)
return cache:add(key, value, exptime)
end

function _M.get(key)
local value, flags = _M.rawget(key)
if value then
@@ -66,10 +69,6 @@ function _M.requests_key()
return CACHE_KEYS.REQUESTS
end

function _M.autojoin_retries_key()
return CACHE_KEYS.AUTOJOIN_RETRIES
end

function _M.api_key(host)
return CACHE_KEYS.APIS..":"..host
end