feat(db): implement the db:cluster_mutex() utility
This function can be called to take a cluster-wide lock (via the
underlying DB, currently C* only) and execute some code inside a
"cluster-wide mutex".

Example usage in the tests.
thibaultcha committed Aug 8, 2018
1 parent 1997445 commit ab1f678
Showing 4 changed files with 368 additions and 0 deletions.
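
For orientation, here is a minimal usage sketch of the new API, assuming a `db` instance as exposed by `kong.db`; the lock key, the ttl value, and the `seed_initial_data()` callback are hypothetical placeholders, not part of this commit:

-- hypothetical example: run a task at most once across the whole cluster
local ok, err = db:cluster_mutex("my_plugin:seed_data", { ttl = 30 }, function()
  -- only the worker that wins both the worker-level and the DB-level lock
  -- executes this callback
  seed_initial_data()
end)

if err then
  ngx.log(ngx.ERR, "could not acquire cluster mutex: ", err)

elseif ok == false then
  -- another worker or node ran (or is still running) the callback
  ngx.log(ngx.DEBUG, "cluster mutex held elsewhere, skipping")
end

A `true` return means the callback ran here; `false` means the lock was held elsewhere, which callers typically treat as "already done".
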
kong/db/init.lua (153 additions, 0 deletions)
@@ -185,4 +185,157 @@ function DB:set_events_handler(events)
end


do
local public = require "kong.tools.public"
local resty_lock = require "resty.lock"


local DEFAULT_TTL = 60 -- seconds
local MAX_LOCK_WAIT_STEP = 2 -- seconds


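-- release the worker-level (shared dict) lock and forward the given return
-- values unchanged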
local function release_rlock_and_ret(rlock, ...)
rlock:unlock()

return ...
end


function DB:cluster_mutex(key, opts, cb)
if type(key) ~= "string" then
error("key must be a string", 2)
end

local owner
local ttl

if opts ~= nil then
if type(opts) ~= "table" then
error("opts must be a table", 2)
end

if opts.ttl and type(opts.ttl) ~= "number" then
error("opts.ttl must be a number", 2)
end

if opts.owner and type(opts.owner) ~= "string" then
error("opts.owner must be a string", 2)
end

owner = opts.owner
ttl = opts.ttl
end

if type(cb) ~= "function" then
local mt = getmetatable(cb)

if not mt or type(mt.__call) ~= "function" then
error("cb must be a function", 2)
end
end

if not owner then
-- generate a random string for this worker (resty-cli or runtime nginx
-- worker)
-- we use the `get_node_id()` public utility, but in the CLI context,
-- this value is ephemeral, so no assumptions should be made about the
-- real owner of a lock
local id, err = public.get_node_id()
if not id then
return nil, "failed to generate lock owner: " .. err
end

owner = id
end

if not ttl then
ttl = DEFAULT_TTL
end

local rlock, err = resty_lock:new("kong_locks", {
exptime = ttl,
timeout = ttl,
})
if not rlock then
return nil, "failed to create worker lock: " .. err
end

-- acquire a node-local (worker) lock first, so that only a single worker per
-- node attempts to take the cluster-wide lock

local elapsed, err = rlock:lock(key)
if not elapsed then
if err == "timeout" then
return nil, err
end

return nil, "failed to acquire worker lock: " .. err
end

if elapsed ~= 0 then
-- another worker of this node held the lock and has since released it:
-- the protected section already ran, so report that we did not run it
return false
end

-- worker lock acquired, other workers are waiting on it
-- now acquire cluster lock via strategy-specific connector

-- ensure the locks table exists
local ok, err = self.connector:setup_locks(DEFAULT_TTL)
if not ok then
return release_rlock_and_ret(rlock, nil, "failed to setup locks: " .. err)
end

local ok, err = self.connector:insert_lock(key, ttl, owner)
if err then
return release_rlock_and_ret(rlock, nil, "failed to insert cluster lock: "
.. err)
end

if not ok then
-- waiting on cluster lock
local step = 0.1
local cluster_elapsed = 0

while cluster_elapsed < ttl do
ngx.sleep(step)
cluster_elapsed = cluster_elapsed + step

if cluster_elapsed >= ttl then
break
end

local locked, err = self.connector:read_lock(key)
if err then
return release_rlock_and_ret(rlock, nil, "failed to read cluster " ..
"lock: " .. err)
end

if not locked then
-- the cluster lock was released
return release_rlock_and_ret(rlock, false)
end

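-- back off exponentially between polls, capped at MAX_LOCK_WAIT_STEP seconds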
step = math.min(step * 3, MAX_LOCK_WAIT_STEP)
end

return release_rlock_and_ret(rlock, nil, "timeout")
end

-- cluster lock acquired, run callback

local pok, perr = xpcall(cb, debug.traceback)
if not pok then
self.connector:remove_lock(key, owner)

return release_rlock_and_ret(rlock, nil, "cluster_mutex callback " ..
"threw an error: " .. perr)
end

self.connector:remove_lock(key, owner)

return release_rlock_and_ret(rlock, true)
end
end


return DB
kong/db/strategies/cassandra/connector.lua (83 additions, 0 deletions)
@@ -309,4 +309,87 @@ function CassandraConnector:truncate_table(table_name)
end


function CassandraConnector:setup_locks(default_ttl)
local ok, err = self:connect()
if not ok then
return nil, err
end

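-- rows in this table expire automatically after default_ttl seconds, via the
-- table-level default_time_to_live option (insert_lock overrides it per query)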
local cql = string.format([[
CREATE TABLE IF NOT EXISTS locks(
key text PRIMARY KEY,
owner text
) WITH default_time_to_live = %d
]], default_ttl)

local ok, err = self:query(cql)
if not ok then
self:setkeepalive()
return nil, err
end

ok, err = self.cluster:wait_schema_consensus(self.connection)
if not ok then
self:setkeepalive()
return nil, err
end

self:setkeepalive()

return true
end


function CassandraConnector:insert_lock(key, ttl, owner)
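-- INSERT ... IF NOT EXISTS is a Cassandra lightweight transaction: only one
-- contender can create the lock row, and USING TTL bounds how long it lives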
local cql = string.format([[
INSERT INTO locks(key, owner)
VALUES(?, ?)
IF NOT EXISTS
USING TTL %d
]], ttl)

local res, err = self:query(cql, { key, owner }, {
consistency = cassandra.consistencies.quorum,
})
if not res then
return nil, err
end

res = res[1]
if not res then
return nil, "unexpected result"
end

return res["[applied]"]
end


function CassandraConnector:read_lock(key)
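-- a SERIAL read observes the outcome of lightweight transactions, so waiters
-- get an up-to-date view of whether the lock row still exists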
local res, err = self:query([[
SELECT * FROM locks WHERE key = ?
]], { key }, {
consistency = cassandra.consistencies.serial,
})
if not res then
return nil, err
end

return res[1] ~= nil
end


function CassandraConnector:remove_lock(key, owner)
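-- conditional delete (IF owner = ?): only the owner recorded in the row can
-- release the lock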
local res, err = self:query([[
DELETE FROM locks WHERE key = ? IF owner = ?
]], { key, owner }, {
consistency = cassandra.consistencies.quorum,
})
if not res then
return nil, err
end

return true
end


return CassandraConnector
kong/db/strategies/connector.lua (20 additions, 0 deletions)
@@ -35,4 +35,24 @@ function Connector:truncate()
end


function Connector:setup_locks()
error(fmt("setup_locks() not implemented for '%s' strategy", self.database))
end


function Connector:insert_lock()
error(fmt("insert_lock() not implemented for '%s' strategy", self.database))
end


function Connector:read_lock()
error(fmt("read_lock() not implemented for '%s' strategy", self.database))
end


function Connector:remove_lock()
error(fmt("remove_lock() not implemented for '%s' strategy", self.database))
end


return Connector
spec/02-integration/000-new-dao/03-db_cluster_mutex_spec.lua (112 additions, 0 deletions)
@@ -0,0 +1,112 @@
local helpers = require "spec.helpers"


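-- only the Cassandra strategy implements the locking connector methods, so
-- restrict the test matrix to it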
for _, strategy in helpers.each_strategy("cassandra") do
describe("kong.db [#" .. strategy .. "]", function()
local db


setup(function()
local _
_, db, _ = helpers.get_db_utils(strategy)
end)


describe("db:cluster_mutex()", function()
it("returns 'true' when mutex ran and 'false' otherwise", function()
local t1 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key", nil, function()
ngx.sleep(0.1)
end)
assert.is_nil(err)
assert.equal(true, ok)
end)

local t2 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key", nil, function() end)
assert.is_nil(err)
assert.equal(false, ok)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)
end)


it("mutex ensures only one callback gets called", function()
local cb1 = spy.new(function() end)
local cb2 = spy.new(function() ngx.sleep(0.3) end)

local t1 = ngx.thread.spawn(function()
ngx.sleep(0.2)

local _, err = db:cluster_mutex("my_key_2", { owner = "1" }, cb1)
assert.is_nil(err)
end)

local t2 = ngx.thread.spawn(function()
local _, err = db:cluster_mutex("my_key_2", { owner = "2" }, cb2)
assert.is_nil(err)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)

assert.spy(cb2).was_called()
assert.spy(cb1).was_not_called()
end)


it("mutex can be subsequently acquired once released", function()
local cb1 = spy.new(function() end)
local cb2 = spy.new(function() end)

local t1 = ngx.thread.spawn(function()
local _, err = db:cluster_mutex("my_key_3", nil, cb1)
assert.is_nil(err)
end)

local t2 = ngx.thread.spawn(function()
local _, err = db:cluster_mutex("my_key_3", nil, cb2)
assert.is_nil(err)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)

assert.spy(cb1).was_called()
assert.spy(cb2).was_called()
end)


it("mutex cannot be held for longer than opts.ttl across nodes (DB lock)", function()
local cb1 = spy.new(function()
-- remove worker lock
ngx.shared.kong_locks:delete("my_key_5")
-- make DB lock expire
ngx.sleep(1)
end)

local cb2 = spy.new(function() end)

local t1 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key_5", { ttl = 0.5 }, cb1)
assert.is_nil(err)
assert.equal(true, ok)
end)

local t2 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key_5", { ttl = 0.5 }, cb2)
assert.is_nil(ok)
assert.equal("timeout", err)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)

assert.spy(cb1).was_called()
assert.spy(cb2).was_not_called()
end)
end)
end)
end
