Skip to content

Commit

Permalink
feat(pdk) add rate-limiting to kong.vault.try function
Browse files Browse the repository at this point in the history
### Summary

Adds rate-limiting so that `kong.vault.try` does not call the vault APIs
every time the callback fails. This limits concurrency on vault
credential updates. Waiting threads wait for at most 1 second.
  • Loading branch information
bungle committed Jul 7, 2022
1 parent 6bfb80d commit 123551b
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 66 deletions.
238 changes: 172 additions & 66 deletions kong/pdk/vault.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ local require = require

local constants = require "kong.constants"
local arguments = require "kong.api.arguments"
local semaphore = require "ngx.semaphore"
local lrucache = require "resty.lrucache"
local cjson = require("cjson.safe").new()
local isempty = require "table.isempty"
local buffer = require "string.buffer"
local nkeys = require "table.nkeys"
local clone = require "table.clone"
local cjson = require("cjson.safe").new()


local ngx = ngx
Expand All @@ -23,10 +27,12 @@ local byte = string.byte
local gsub = string.gsub
local type = type
local next = next
local sort = table.sort
local pcall = pcall
local lower = string.lower
local pairs = pairs
local concat = table.concat
local md5_bin = ngx.md5_bin
local tostring = tostring
local tonumber = tonumber
local decode_args = ngx.decode_args
Expand All @@ -40,6 +46,15 @@ local function new(self)
local LRU = lrucache.new(1000)


local KEY_BUFFER = buffer.new(100)


local RETRY_LRU = lrucache.new(1000)
local RETRY_SEMAPHORE = semaphore.new(1)
local RETRY_WAIT = 1
local RETRY_TTL = 10


local STRATEGIES = {}
local SCHEMAS = {}
local CONFIGS = {}
Expand Down Expand Up @@ -117,6 +132,41 @@ local function new(self)
end


--- Fetches a secret value through the given strategy and validates it.
--
-- Three lookup paths exist, chosen by which of `rotation` / `cache` is given:
--   * `rotation` given: the `rotation` table is consulted first; on a miss the
--     strategy is called directly (the cache is not read) and a successful
--     result is remembered in `rotation` and offered to `cache`.
--   * only `cache` given: the value is read through `cache:get`, with
--     `strategy.get` as the loader callback.
--   * neither given: `strategy.get` is called directly.
--
-- @param strategy  table exposing `get(config, resource, version)`
-- @param config    passed through to `strategy.get`
-- @param reference passed through to `validate_value`
-- @param resource  passed to `strategy.get` and used in the cache key
-- @param name      used in the cache key and passed to `validate_value`
-- @param version   passed to `strategy.get` and used in the cache key
-- @param key       passed through to `validate_value`
-- @param cache     optional object with a `get` method
-- @param rotation  optional table keyed by cache key
-- @return whatever `validate_value` returns
local function retrieve_value(strategy, config, reference, resource,
                              name, version, key, cache, rotation)
  -- Without a rotation table: read through the cache if there is one,
  -- otherwise ask the strategy directly.
  if not rotation then
    local secret, fetch_err
    if cache then
      local lookup_key = build_cache_key(name, resource, version)
      secret, fetch_err = cache:get(lookup_key, nil, strategy.get,
                                    config, resource, version)
    else
      secret, fetch_err = strategy.get(config, resource, version)
    end

    return validate_value(secret, fetch_err, name, resource, key, reference)
  end

  -- With a rotation table: it takes precedence over the cache.
  local lookup_key = build_cache_key(name, resource, version)
  local secret = rotation[lookup_key]
  local fetch_err

  if not secret then
    secret, fetch_err = strategy.get(config, resource, version)
    if secret then
      rotation[lookup_key] = secret

      if cache then
        -- Offer the freshly fetched value to the cache as well, in case it
        -- is needed elsewhere.
        -- TODO: should an existing cache entry be cleared first?
        local function warmup()
          return secret, fetch_err
        end

        cache:get(lookup_key, nil, warmup)
      end
    end
  end

  return validate_value(secret, fetch_err, name, resource, key, reference)
end


local function process_secret(reference, opts, rotation)
local name = opts.name
if not VAULT_NAMES[name] then
Expand Down Expand Up @@ -201,34 +251,9 @@ local function new(self)
CONFIGS[name] = config
end

local cache = self and self.core_cache
local resource = opts.resource
local version = opts.version

local cache_key
if cache or rotation then
cache_key = build_cache_key(name, resource, version)
end

if rotation then
local value = rotation[cache_key]
if value then
return validate_value(value, nil, name, resource, opts.key, reference)
end
end

local value, err
if cache then
value, err = cache:get(cache_key, nil, strategy.get, config, resource, version)
else
value, err = strategy.get(config, resource, version)
end

if rotation and value then
rotation[cache_key] = value
end

return validate_value(value, err, name, resource, opts.key, reference)
return retrieve_value(strategy, config, reference, opts.resource, name,
opts.version, opts.key, self and self.core_cache,
rotation)
end


Expand Down Expand Up @@ -287,34 +312,8 @@ local function new(self)
config = vault.config
end

local resource = opts.resource
local version = opts.version

local cache_key
if cache or rotation then
cache_key = build_cache_key(prefix, resource, version)
end

if rotation then
local value = rotation[cache_key]
if value then
return validate_value(value, nil, prefix, resource, opts.key, reference)
end
end

local value
if cache then
local cache_key = build_cache_key(prefix, resource, version)
value, err = cache:get(cache_key, nil, strategy.get, config, resource, version)
else
value, err = strategy.get(config, resource, version)
end

if rotation and value then
rotation[cache_key] = value
end

return validate_value(value, err, prefix, resource, opts.key, reference)
return retrieve_value(strategy, config, reference, opts.resource, prefix,
opts.version, opts.key, cache, rotation)
end


Expand Down Expand Up @@ -426,37 +425,144 @@ local function new(self)


local function try(callback, options)
-- store current values early on to avoid race conditions
local previous
local refs
local refs_empty
if options then
refs = options["$refs"]
if refs then
refs_empty = isempty(refs)
if not refs_empty then
previous = {}
for name in pairs(refs) do
previous[name] = options[name]
end
end
end
end

-- try with already resolved credentials
local res, err = callback(options)
if res then
return res
end

if not options then
self.log.notice("cannot automatically rotate secrets in absence of options")
return nil, err
end

local refs = options["$refs"]
if not refs then
self.log.notice('cannot automatically rotate secrets in absence of options["$refs"]')
return nil, err
end

if not next(refs) then
if refs_empty then
self.log.notice('cannot automatically rotate secrets with empty options["$refs"]')
return nil, err
end

-- generate an LRU key
local count = nkeys(refs)
local keys = self.table.new(count, 0)
local i = 0
for k in pairs(refs) do
i = i + 1
keys[i] = k
end

sort(keys)

KEY_BUFFER:reset()

for i = 1, count do
local key = keys[i]
local val = refs[key]
KEY_BUFFER:putf("%s=%s;", key, val)
end

local key = md5_bin(KEY_BUFFER:tostring())
local updated

-- are there already values with RETRY_TTL seconds ttl?
local values = RETRY_LRU:get(key)
if values then
for name, value in pairs(values) do
updated = previous[name] ~= value
if updated then
break
end
end

if not updated then
return nil, err
end

for name, value in pairs(values) do
options[name] = value
end

-- try with updated credentials
return callback(options)
end

-- grab a semaphore to limit concurrent updates to reduce calls to vaults
local wait_ok, wait_err = RETRY_SEMAPHORE:wait(RETRY_WAIT)
if not wait_ok then
self.log.notice("waiting for semaphore failed: ", wait_err or "unknown")
end

-- do we now have values with RETRY_TTL seconds ttl?
values = RETRY_LRU:get(key)
if values then
if wait_ok then
-- release a resource
RETRY_SEMAPHORE:post()
end

for name, value in pairs(values) do
updated = previous[name] ~= value
if updated then
break
end
end

if not updated then
return nil, err
end

for name, value in pairs(values) do
options[name] = value
end

-- try with updated credentials
return callback(options)
end

-- resolve references without read-cache
local rotation = {}
local values = {}
for name, ref in pairs(refs) do
local value, err = get(ref, rotation)
for i = 1, count do
local name = keys[i]
local value, get_err = get(refs[name], rotation)
if not value then
return nil, err
end
if not updated and value ~= options[name] then
updated = true
self.log.notice("resolving reference ", refs[name], " failed: ", get_err or "unknown")

else
values[name] = value
if updated == nil and previous[name] ~= value then
updated = true
end
end
values[name] = value
end

-- set the values in LRU
RETRY_LRU:set(key, values, RETRY_TTL)

if wait_ok then
-- release a resource
RETRY_SEMAPHORE:post()
end

if not updated then
Expand Down

0 comments on commit 123551b

Please sign in to comment.