feat(db) implement the db:cluster_mutex() utility #3685

Merged (1 commit, Aug 9, 2018)
153 changes: 153 additions & 0 deletions kong/db/init.lua
@@ -185,4 +185,157 @@ function DB:set_events_handler(events)
end


do
local public = require "kong.tools.public"
local resty_lock = require "resty.lock"


local DEFAULT_TTL = 60 -- seconds
local MAX_LOCK_WAIT_STEP = 2 -- seconds


local function release_rlock_and_ret(rlock, ...)
rlock:unlock()

return ...
end


function DB:cluster_mutex(key, opts, cb)
if type(key) ~= "string" then
error("key must be a string", 2)
end

local owner
local ttl

if opts ~= nil then
if type(opts) ~= "table" then
error("opts must be a table", 2)
end

if opts.ttl and type(opts.ttl) ~= "number" then
error("opts.ttl must be a number", 2)
end

if opts.owner and type(opts.owner) ~= "string" then
error("opts.owner must be a string", 2)
end

owner = opts.owner
ttl = opts.ttl
end

if type(cb) ~= "function" then
local mt = getmetatable(cb)

if not mt or type(mt.__call) ~= "function" then
error("cb must be a function", 2)
end
end

if not owner then
-- generate a random string for this worker (resty-cli or runtime nginx
-- worker)
-- we use the `get_node_id()` public utility, but in the CLI context,
-- this value is ephemeral, so no assumptions should be made about the
-- real owner of a lock
local id, err = public.get_node_id()
if not id then
return nil, "failed to generate lock owner: " .. err
end

owner = id
end

if not ttl then
ttl = DEFAULT_TTL
end

local rlock, err = resty_lock:new("kong_locks", {
exptime = ttl,
timeout = ttl,
})
if not rlock then
return nil, "failed to create worker lock: " .. err
end

-- acquire the worker-level lock first: only a single worker on this node proceeds

local elapsed, err = rlock:lock(key)
if not elapsed then
if err == "timeout" then
return nil, err
end

return nil, "failed to acquire worker lock: " .. err
end

if elapsed ~= 0 then
-- we did not get the worker lock immediately; another worker held it and has
-- since released it, so do not run the callback again from this worker
return false
end

-- worker lock acquired, other workers are waiting on it
-- now acquire cluster lock via strategy-specific connector

-- ensure the locks table exists
local ok, err = self.connector:setup_locks(DEFAULT_TTL)
if not ok then
return nil, "failed to setup locks: " .. err
end

local ok, err = self.connector:insert_lock(key, ttl, owner)
if err then
return release_rlock_and_ret(rlock, nil, "failed to insert cluster lock: "
.. err)
end

if not ok then
-- waiting on cluster lock
local step = 0.1
local cluster_elapsed = 0

while cluster_elapsed < ttl do
ngx.sleep(step)
cluster_elapsed = cluster_elapsed + step

if cluster_elapsed >= ttl then
break
end

local locked, err = self.connector:read_lock(key)
if err then
return release_rlock_and_ret(rlock, nil, "failed to read cluster " ..
"lock: " .. err)
end

if not locked then
-- the cluster lock was released
return release_rlock_and_ret(rlock, false)
end

step = math.min(step * 3, MAX_LOCK_WAIT_STEP)
end

return release_rlock_and_ret(rlock, nil, "timeout")
end

-- cluster lock acquired, run callback

local pok, perr = xpcall(cb, debug.traceback)
if not pok then
self.connector:remove_lock(key, owner)

return release_rlock_and_ret(rlock, nil, "cluster_mutex callback " ..
"threw an error: " .. perr)
end

self.connector:remove_lock(key, owner)

return release_rlock_and_ret(rlock, true)
end
end


return DB
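
For reviewers, a minimal usage sketch of the utility introduced above; the key name, TTL value, and callback body are illustrative assumptions rather than code from this PR, and `db` stands for an already-initialized instance of this DB module:

-- illustrative only: key, ttl and callback are made up for this sketch
local ok, err = db:cluster_mutex("example_task", { ttl = 60 }, function()
  -- only the node/worker that wins both the worker lock and the cluster-wide
  -- lock runs this callback; everyone else waits for it to be released
  ngx.log(ngx.NOTICE, "running the protected task on this node only")
end)

if err then
  ngx.log(ngx.ERR, "cluster_mutex failed: ", err)
elseif ok then
  ngx.log(ngx.NOTICE, "this node ran the callback")
else
  -- cluster_mutex() returned false: another worker or node held the lock and
  -- has released it, so the protected work was handled elsewhere
  ngx.log(ngx.NOTICE, "another node ran the callback")
end
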
83 changes: 83 additions & 0 deletions kong/db/strategies/cassandra/connector.lua
@@ -309,4 +309,87 @@ function CassandraConnector:truncate_table(table_name)
end


function CassandraConnector:setup_locks(default_ttl)
local ok, err = self:connect()
if not ok then
return nil, err
end

local cql = string.format([[
CREATE TABLE IF NOT EXISTS locks(
key text PRIMARY KEY,
owner text
) WITH default_time_to_live = %d
]], default_ttl)
Member: Shouldn't the above be in a migration? Not so much for this instance as for the precedent it sets. Unless the intent is to use this lock during migrations?

Member Author:

> Unless the intent is to use this lock during migrations?

Yes indeed.


local ok, err = self:query(cql)
if not ok then
self:setkeepalive()
return nil, err
end

ok, err = self.cluster:wait_schema_consensus(self.connection)
if not ok then
self:setkeepalive()
return nil, err
end

self:setkeepalive()

return true
end


function CassandraConnector:insert_lock(key, ttl, owner)
local cql = string.format([[
INSERT INTO locks(key, owner)
VALUES(?, ?)
IF NOT EXISTS
USING TTL %d
]], ttl)

local res, err = self:query(cql, { key, owner }, {
consistency = cassandra.consistencies.quorum,
})
if not res then
return nil, err
end

res = res[1]
if not res then
return nil, "unexpected result"
end

return res["[applied]"]
end


function CassandraConnector:read_lock(key)
local res, err = self:query([[
SELECT * FROM locks WHERE key = ?
]], { key }, {
consistency = cassandra.consistencies.serial,
})
if not res then
return nil, err
end

return res[1] ~= nil
end


function CassandraConnector:remove_lock(key, owner)
local res, err = self:query([[
DELETE FROM locks WHERE key = ? IF owner = ?
]], { key, owner }, {
consistency = cassandra.consistencies.quorum,
})
if not res then
return nil, err
end

return true
end


return CassandraConnector
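
A note for reviewers unfamiliar with Cassandra lightweight transactions: the INSERT ... IF NOT EXISTS used by insert_lock() does not fail when the row already exists; it returns a result row whose "[applied]" boolean column says whether the write won, which is why the function returns res["[applied]"]. A sketch of the two result shapes, assuming lua-cassandra's usual row format (not output captured from this PR):

-- the conditional INSERT was applied: nobody held the lock, this owner now does
local res_won  = { { ["[applied]"] = true } }

-- the INSERT was rejected: "[applied]" is false and Cassandra echoes the
-- existing row, so the current owner is visible in the result
local res_lost = { { ["[applied]"] = false, key = "some_key", owner = "another-node" } }

-- insert_lock() ends up returning the "[applied]" value, which
-- DB:cluster_mutex() treats as "run the callback" (true) or
-- "poll read_lock() until released" (false)
print(res_won[1]["[applied]"], res_lost[1]["[applied]"])  --> true   false
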
20 changes: 20 additions & 0 deletions kong/db/strategies/connector.lua
@@ -35,4 +35,24 @@ function Connector:truncate()
end


function Connector:setup_locks()
error(fmt("setup_locks() not implemented for '%s' strategy", self.database))
end


function Connector:insert_lock()
error(fmt("insert_lock() not implemented for '%s' strategy", self.database))
end


function Connector:read_lock()
error(fmt("read_lock() not implemented for '%s' strategy", self.database))
end


function Connector:remove_lock()
error(fmt("remove_lock() not implemented for '%s' strategy", self.database))
end


return Connector
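
To make the contract these stubs define more concrete, here is a minimal in-memory sketch of a connector implementing the four lock methods; it is illustrative only and does not correspond to any real Kong strategy (a real connector persists locks in the datastore, as the Cassandra implementation above does):

local FakeConnector = {
  database = "fake",
  locks = {},
}

-- create whatever storage the locks need (Cassandra creates a table here)
function FakeConnector:setup_locks(default_ttl)
  return true
end

-- return true if this owner acquired the lock, false if it is already held
function FakeConnector:insert_lock(key, ttl, owner)
  local lock = self.locks[key]
  if lock and lock.expires > ngx.now() then
    return false
  end
  self.locks[key] = { owner = owner, expires = ngx.now() + ttl }
  return true
end

-- return true while some owner still holds the lock
function FakeConnector:read_lock(key)
  local lock = self.locks[key]
  return lock ~= nil and lock.expires > ngx.now()
end

-- release the lock, but only if the caller is its owner
function FakeConnector:remove_lock(key, owner)
  local lock = self.locks[key]
  if lock and lock.owner == owner then
    self.locks[key] = nil
  end
  return true
end
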
112 changes: 112 additions & 0 deletions spec/02-integration/000-new-dao/03-db_cluster_mutex_spec.lua
@@ -0,0 +1,112 @@
local helpers = require "spec.helpers"


for _, strategy in helpers.each_strategy("cassandra") do
describe("kong.db [#" .. strategy .. "]", function()
local db


setup(function()
local _
_, db, _ = helpers.get_db_utils(strategy)
end)


describe("db:cluster_mutex()", function()
it("returns 'true' when mutex ran and 'false' otherwise", function()
local t1 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key", nil, function()
ngx.sleep(0.1)
end)
assert.is_nil(err)
assert.equal(true, ok)
end)

local t2 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key", nil, function() end)
assert.is_nil(err)
assert.equal(false, ok)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)
end)


it("mutex ensures only one callback gets called", function()
local cb1 = spy.new(function() end)
local cb2 = spy.new(function() ngx.sleep(0.3) end)

local t1 = ngx.thread.spawn(function()
ngx.sleep(0.2)

local _, err = db:cluster_mutex("my_key_2", { owner = "1" }, cb1)
assert.is_nil(err)
end)

local t2 = ngx.thread.spawn(function()
local _, err = db:cluster_mutex("my_key_2", { owner = "2" }, cb2)
assert.is_nil(err)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)

assert.spy(cb2).was_called()
assert.spy(cb1).was_not_called()
Contributor: since cb2 takes 0.2 to run and t1 waits 0.2 until it acquires, isn't there a chance we get unlucky in this test and t2 releases the lock before t1 tries to acquire? (in other words, shouldn't the sleep in cb2 be a little longer than t1 to be on the safe side, or am I misunderstanding the test?)

end)


it("mutex can be subsequently acquired once released", function()
local cb1 = spy.new(function() end)
local cb2 = spy.new(function() end)

local t1 = ngx.thread.spawn(function()
local _, err = db:cluster_mutex("my_key_3", nil, cb1)
assert.is_nil(err)
end)

local t2 = ngx.thread.spawn(function()
local _, err = db:cluster_mutex("my_key_3", nil, cb2)
assert.is_nil(err)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)

assert.spy(cb1).was_called()
assert.spy(cb2).was_called()
end)


it("mutex cannot be held for longer than opts.ttl across nodes (DB lock)", function()
local cb1 = spy.new(function()
-- remove worker lock
ngx.shared.kong_locks:delete("my_key_5")
-- make DB lock expire
ngx.sleep(1)
end)

local cb2 = spy.new(function() end)

local t1 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key_5", { ttl = 0.5 }, cb1)
assert.is_nil(err)
assert.equal(true, ok)
end)

local t2 = ngx.thread.spawn(function()
local ok, err = db:cluster_mutex("my_key_5", { ttl = 0.5 }, cb2)
assert.is_nil(ok)
assert.equal("timeout", err)
end)

ngx.thread.wait(t1)
ngx.thread.wait(t2)

assert.spy(cb1).was_called()
assert.spy(cb2).was_not_called()
end)
end)
end)
end