fix(dns): fix retry and timeout handling #11386

Merged
merged 1 commit on Aug 11, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -118,6 +118,9 @@
[#11306](https://github.com/Kong/kong/pull/11306)
- Fix an issue where cluster_cert or cluster_ca_cert is inserted into lua_ssl_trusted_certificate before being base64 decoded.
[#11385](https://github.com/Kong/kong/pull/11385)
- Update the DNS client to follow configured timeouts in a more predictable manner. Also fix a corner case in its
behavior that could cause it to resolve incorrectly during transient network and DNS server failures.
[#11386](https://github.com/Kong/kong/pull/11386)

#### Admin API

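For context on the changelog entry above, here is a minimal standalone sketch (not part of the diff) of how the client's overall wait budget follows from the configured resolver options. The numeric values are assumed examples; the formula matches the resolve_max_wait computation in kong/resty/dns/client.lua below.

-- illustrative resolver options, as they would be passed to _M.init()
local options = {
  timeout = 2000,  -- per-query timeout in milliseconds (example value)
  retrans = 5,     -- retransmission count of the underlying resolver (example value)
}

-- maximum time (in seconds) a caller waits for the resolver to exhaust its own timeouts
local resolve_max_wait = options.timeout / 1000 * options.retrans  -- 2000 / 1000 * 5 = 10 seconds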
99 changes: 42 additions & 57 deletions kong/resty/dns/client.lua
@@ -365,9 +365,9 @@ end
-- @param self the try_list to add to
-- @param status string with current status, added to the list for the current try
-- @return the try_list
local function try_status(self, status)
local status_list = self[#self].msg
status_list[#status_list + 1] = status
local function add_status_to_try_list(self, status)
local try_list = self[#self].msg
try_list[#try_list + 1] = status
return self
end

@@ -388,8 +388,7 @@ end
-- @section resolving


local poolMaxWait
local poolMaxRetry
local resolve_max_wait

--- Initialize the client. Can be called multiple times. When called again it
-- will clear the cache.
@@ -643,8 +642,7 @@ _M.init = function(options)

config = options -- store it in our module level global

poolMaxRetry = 1 -- do one retry, dns resolver is already doing 'retrans' number of retries on top
poolMaxWait = options.timeout / 1000 * options.retrans -- default is to wait for the dns resolver to hit its timeouts
resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts

return true
end
@@ -677,7 +675,7 @@ local function parseAnswer(qname, qtype, answers, try_list)

if (answer.type ~= qtype) or (answer.name ~= check_qname) then
local key = answer.type..":"..answer.name
try_status(try_list, key .. " removed")
add_status_to_try_list(try_list, key .. " removed")
local lst = others[key]
if not lst then
lst = {}
@@ -714,7 +712,7 @@ local function individualQuery(qname, r_opts, try_list)
return r, "failed to create a resolver: " .. err, try_list
end

try_status(try_list, "querying")
add_status_to_try_list(try_list, "querying")

local result
result, err = r:query(qname, r_opts)
@@ -747,7 +745,7 @@ local function executeQuery(premature, item)
--[[
log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item))
--]]
try_status(item.try_list, "querying")
add_status_to_try_list(item.try_list, "querying")
item.result, item.err = r:query(item.qname, item.r_opts)
if item.result then
--[[
@@ -772,7 +770,9 @@ local function executeQuery(premature, item)
item.semaphore = nil
ngx.sleep(0)
-- 3) destroy the resolver -- ditto in individualQuery
r:destroy()
if r then
r:destroy()
end
end


@@ -791,7 +791,7 @@ local function asyncQuery(qname, r_opts, try_list)
--[[
log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item))
--]]
try_status(try_list, "in progress (async)")
add_status_to_try_list(try_list, "in progress (async)")
return item -- already in progress, return existing query
end

@@ -813,41 +813,32 @@ local function asyncQuery(qname, r_opts, try_list)
--[[
log(DEBUG, PREFIX, "Query async (scheduled): ", key, " ", fquery(item))
--]]
try_status(try_list, "scheduled")
add_status_to_try_list(try_list, "scheduled")

return item
end


-- schedules a sync query.
-- This will be synchronized, so multiple calls (sync or async) might result in 1 query.
-- The `poolMaxWait` is how long a thread waits for another to complete the query.
-- The `poolMaxRetry` is how often we wait for another query to complete.
-- The maximum delay would be `poolMaxWait * poolMaxRetry`.
-- The maximum delay would be `options.timeout * options.retrans`.
-- @param qname the name to query for
-- @param r_opts a table with the query options
-- @param try_list the try_list object to add to
-- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors
local function syncQuery(qname, r_opts, try_list, count)
local function syncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue[key]
count = count or 1

-- if nothing is in progress, we start a new async query
if not item then
local err
item, err = asyncQuery(qname, r_opts, try_list)
--[[
log(DEBUG, PREFIX, "Query sync (new): ", key, " ", fquery(item)," count=", count)
--]]
if not item then
return item, err, try_list
end
else
--[[
log(DEBUG, PREFIX, "Query sync (exists): ", key, " ", fquery(item)," count=", count)
--]]
try_status(try_list, "in progress (sync)")
add_status_to_try_list(try_list, "in progress (sync)")
end

local supported_semaphore_wait_phases = {
@@ -872,7 +863,7 @@ local function syncQuery(qname, r_opts, try_list, count)
end

-- block and wait for the async query to complete
local ok, err = item.semaphore:wait(poolMaxWait)
local ok, err = item.semaphore:wait(resolve_max_wait)
if ok and item.result then
-- we were released, and have a query result from the
-- other thread, so all is well, return it
@@ -883,25 +874,16 @@ local function syncQuery(qname, r_opts, try_list, count)
return item.result, item.err, try_list
end

-- there was an error, either a semaphore timeout, or a lookup error
-- go retry
try_status(try_list, "try "..count.." error: "..(item.err or err or "unknown"))
if count > poolMaxRetry then
--[[
log(DEBUG, PREFIX, "Query sync (fail): ", key, " ", fquery(item)," retries exceeded. count=", count)
--]]
return nil, "dns lookup pool exceeded retries (" ..
tostring(poolMaxRetry) .. "): "..tostring(item.err or err),
try_list
end
err = err or item.err or "unknown"
add_status_to_try_list(try_list, "error: "..err)

-- don't block on the same thread again, so remove it from the queue
if queue[key] == item then queue[key] = nil end
if queue[key] == item then
queue[key] = nil
end

--[[
log(DEBUG, PREFIX, "Query sync (fail): ", key, " ", fquery(item)," retrying. count=", count)
--]]
return syncQuery(qname, r_opts, try_list, count + 1)
-- there was an error, either a semaphore timeout, or a lookup error
return nil, err
end
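As a rough standalone sketch (not part of the diff), the post-change wait logic in syncQuery: callers share one in-flight query per name/type through a semaphore, block for at most resolve_max_wait, and on a semaphore timeout or lookup error return that error to the caller instead of re-entering a retry pool.

-- hedged sketch of the waiter's behavior; `item` stands for the shared in-flight
-- query entry and `resolve_max_wait` for the budget computed in _M.init()
local function wait_for_shared_query(item, try_list)
  local ok, err = item.semaphore:wait(resolve_max_wait)
  if ok and item.result then
    -- the thread performing the query finished; reuse its result
    return item.result, item.err, try_list
  end
  -- semaphore timeout or lookup error: surface it rather than retrying here
  err = err or item.err or "unknown"
  return nil, err, try_list
end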

-- will lookup a name in the cache, or alternatively query the nameservers.
@@ -944,7 +926,7 @@ local function lookup(qname, r_opts, dnsCacheOnly, try_list)
try_list = try_add(try_list, qname, r_opts.qtype, "cache-hit")
if entry.expired then
-- the cached record is stale but usable, so we do a refresh query in the background
try_status(try_list, "stale")
add_status_to_try_list(try_list, "stale")
asyncQuery(qname, r_opts, try_list)
end

@@ -962,7 +944,7 @@ local function check_ipv6(qname, qtype, try_list)

local record = cachelookup(qname, qtype)
if record then
try_status(try_list, "cached")
add_status_to_try_list(try_list, "cached")
return record, nil, try_list
end

@@ -982,7 +964,7 @@ local function check_ipv6(qname, qtype, try_list)
end
if qtype == _M.TYPE_AAAA and
check:match("^%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?$") then
try_status(try_list, "validated")
add_status_to_try_list(try_list, "validated")
record = {{
address = qname,
type = _M.TYPE_AAAA,
@@ -994,7 +976,7 @@ local function check_ipv6(qname, qtype, try_list)
else
-- not a valid IPv6 address, or a bad type (non ipv6)
-- return a "server error"
try_status(try_list, "bad IPv6")
add_status_to_try_list(try_list, "bad IPv6")
record = {
errcode = 103,
errstr = clientErrors[103],
@@ -1015,12 +997,12 @@ local function check_ipv4(qname, qtype, try_list)

local record = cachelookup(qname, qtype)
if record then
try_status(try_list, "cached")
add_status_to_try_list(try_list, "cached")
return record, nil, try_list
end

if qtype == _M.TYPE_A then
try_status(try_list, "validated")
add_status_to_try_list(try_list, "validated")
record = {{
address = qname,
type = _M.TYPE_A,
@@ -1032,7 +1014,7 @@
else
-- bad query type for this ipv4 address
-- return a "server error"
try_status(try_list, "bad IPv4")
add_status_to_try_list(try_list, "bad IPv4")
record = {
errcode = 102,
errstr = clientErrors[102],
@@ -1178,7 +1160,7 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list)
records = nil
-- luacheck: pop
err = "recursion detected"
try_status(try_list, err)
add_status_to_try_list(try_list, err)
return nil, err, try_list
end
end
@@ -1190,14 +1172,14 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list)
-- luacheck: push no unused
records = nil
-- luacheck: pop
try_list = try_status(try_list, "stale")
try_list = add_status_to_try_list(try_list, "stale")

else
-- a valid non-stale record
-- check for CNAME records, and dereferencing the CNAME
if (records[1] or EMPTY).type == _M.TYPE_CNAME and qtype ~= _M.TYPE_CNAME then
opts.qtype = nil
try_status(try_list, "dereferencing CNAME")
add_status_to_try_list(try_list, "dereferencing CNAME")
return resolve(records[1].cname, opts, dnsCacheOnly, try_list)
end

@@ -1245,8 +1227,10 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list)
end

if not records then -- luacheck: ignore
-- nothing to do, an error
-- fall through to the next entry in our search sequence
-- An error has occurred, terminate the lookup process. We don't want to try other record types because
-- that would potentially cause us to respond with wrong answers (i.e. the contents of an A record if the
-- query for the SRV record failed due to a network error).
goto failed

elseif records.errcode then
-- dns error: fall through to the next entry in our search sequence
@@ -1305,7 +1289,7 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list)
if records[1].type == _M.TYPE_CNAME and qtype ~= _M.TYPE_CNAME then
-- dereference CNAME
opts.qtype = nil
try_status(try_list, "dereferencing CNAME")
add_status_to_try_list(try_list, "dereferencing CNAME")
return resolve(records[1].cname, opts, dnsCacheOnly, try_list)
end

@@ -1314,8 +1298,9 @@ end
end

-- we had some error, record it in the status list
try_status(try_list, err)
add_status_to_try_list(try_list, err)
end
::failed::

-- we failed, clear cache and return last error
if not dnsCacheOnly then
@@ -1525,7 +1510,7 @@ local function toip(qname, port, dnsCacheOnly, try_list)
local entry = rec[roundRobinW(rec)]
-- our SRV entry might still contain a hostname, so recurse, with found port number
local srvport = (entry.port ~= 0 and entry.port) or port -- discard port if it is 0
try_status(try_list, "dereferencing SRV")
add_status_to_try_list(try_list, "dereferencing SRV")
return toip(entry.target, srvport, dnsCacheOnly, try_list)
else
-- must be A or AAAA
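Finally, a hedged sketch (not part of the diff) of the behavioral change in resolve()'s search sequence: a transport-level failure now terminates the lookup via the failed label, while a DNS-level error code still falls through to the next candidate, so a failed SRV query can no longer be answered with a record of another type. The helper names below are illustrative, not actual functions in client.lua.

local function search_sequence_sketch(candidates)
  local last_err
  for _, candidate in ipairs(candidates) do
    local records, err = lookup_or_query(candidate)  -- lookup_or_query is an assumed helper
    if not records then
      -- network/timeout failure: stop the whole lookup instead of trying other record types
      return nil, err
    elseif records.errcode then
      -- DNS-level error (e.g. name error) for this candidate: try the next entry
      last_err = records.errcode
    else
      return records
    end
  end
  return nil, last_err
end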