From ab1f67867ed080942025c6f683e73558de1dece7 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Thu, 2 Aug 2018 20:28:17 -0700 Subject: [PATCH] feat(db) implement the db:cluster_mutex() utility This function can be called to set a cluster-wide lock (via the underlying DB, currently C* only) to execute some code in a "cluster-wide mutex". Example usage in the tests. --- kong/db/init.lua | 153 ++++++++++++++++++ kong/db/strategies/cassandra/connector.lua | 83 ++++++++++ kong/db/strategies/connector.lua | 20 +++ .../000-new-dao/03-db_cluster_mutex_spec.lua | 112 +++++++++++++ 4 files changed, 368 insertions(+) create mode 100644 spec/02-integration/000-new-dao/03-db_cluster_mutex_spec.lua diff --git a/kong/db/init.lua b/kong/db/init.lua index a9e3cc1d4e9..bd7dbcd8397 100644 --- a/kong/db/init.lua +++ b/kong/db/init.lua @@ -185,4 +185,157 @@ function DB:set_events_handler(events) end +do + local public = require "kong.tools.public" + local resty_lock = require "resty.lock" + + + local DEFAULT_TTL = 60 -- seconds + local MAX_LOCK_WAIT_STEP = 2 -- seconds + + + local function release_rlock_and_ret(rlock, ...) + rlock:unlock() + + return ... + end + + + function DB:cluster_mutex(key, opts, cb) + if type(key) ~= "string" then + error("key must be a string", 2) + end + + local owner + local ttl + + if opts ~= nil then + if type(opts) ~= "table" then + error("opts must be a table", 2) + end + + if opts.ttl and type(opts.ttl) ~= "number" then + error("opts.ttl must be a number", 2) + end + + if opts.owner and type(opts.owner) ~= "string" then + error("opts.owner must be a string", 2) + end + + owner = opts.owner + ttl = opts.ttl + end + + if type(cb) ~= "function" then + local mt = getmetatable(cb) + + if not mt or type(mt.__call) ~= "function" then + error("cb must be a function", 2) + end + end + + if not owner then + -- generate a random string for this worker (resty-cli or runtime nginx + -- worker) + -- we use the `get_node_id()` public utility, but in the CLI context, + -- this value is ephemeral, so no assumptions should be made about the + -- real owner of a lock + local id, err = public.get_node_id() + if not id then + return nil, "failed to generate lock owner: " .. err + end + + owner = id + end + + if not ttl then + ttl = DEFAULT_TTL + end + + local rlock, err = resty_lock:new("kong_locks", { + exptime = ttl, + timeout = ttl, + }) + if not rlock then + return nil, "failed to create worker lock: " .. err + end + + -- acquire a single worker + + local elapsed, err = rlock:lock(key) + if not elapsed then + if err == "timeout" then + return nil, err + end + + return nil, "failed to acquire worker lock: " .. err + end + + if elapsed ~= 0 then + -- we did not acquire the worker lock, but it was released + return false + end + + -- worker lock acquired, other workers are waiting on it + -- now acquire cluster lock via strategy-specific connector + + -- ensure the locks table exists + local ok, err = self.connector:setup_locks(DEFAULT_TTL) + if not ok then + return nil, "failed to setup locks: " .. err + end + + local ok, err = self.connector:insert_lock(key, ttl, owner) + if err then + return release_rlock_and_ret(rlock, nil, "failed to insert cluster lock: " + .. 
err) + end + + if not ok then + -- waiting on cluster lock + local step = 0.1 + local cluster_elapsed = 0 + + while cluster_elapsed < ttl do + ngx.sleep(step) + cluster_elapsed = cluster_elapsed + step + + if cluster_elapsed >= ttl then + break + end + + local locked, err = self.connector:read_lock(key) + if err then + return release_rlock_and_ret(rlock, nil, "failed to read cluster " .. + "lock: " .. err) + end + + if not locked then + -- the cluster lock was released + return release_rlock_and_ret(rlock, false) + end + + step = math.min(step * 3, MAX_LOCK_WAIT_STEP) + end + + return release_rlock_and_ret(rlock, nil, "timeout") + end + + -- cluster lock acquired, run callback + + local pok, perr = xpcall(cb, debug.traceback) + if not pok then + self.connector:remove_lock(key, owner) + + return release_rlock_and_ret(rlock, nil, "cluster_mutex callback " .. + "threw an error: " .. perr) + end + + self.connector:remove_lock(key, owner) + + return release_rlock_and_ret(rlock, true) + end +end + + return DB diff --git a/kong/db/strategies/cassandra/connector.lua b/kong/db/strategies/cassandra/connector.lua index bd626b17f87..d30bd394c93 100644 --- a/kong/db/strategies/cassandra/connector.lua +++ b/kong/db/strategies/cassandra/connector.lua @@ -309,4 +309,87 @@ function CassandraConnector:truncate_table(table_name) end +function CassandraConnector:setup_locks(default_ttl) + local ok, err = self:connect() + if not ok then + return nil, err + end + + local cql = string.format([[ + CREATE TABLE IF NOT EXISTS locks( + key text PRIMARY KEY, + owner text + ) WITH default_time_to_live = %d + ]], default_ttl) + + local ok, err = self:query(cql) + if not ok then + self:setkeepalive() + return nil, err + end + + ok, err = self.cluster:wait_schema_consensus(self.connection) + if not ok then + self:setkeepalive() + return nil, err + end + + self:setkeepalive() + + return true +end + + +function CassandraConnector:insert_lock(key, ttl, owner) + local cql = string.format([[ + INSERT INTO locks(key, owner) + VALUES(?, ?) + IF NOT EXISTS + USING TTL %d + ]], ttl) + + local res, err = self:query(cql, { key, owner }, { + consistency = cassandra.consistencies.quorum, + }) + if not res then + return nil, err + end + + res = res[1] + if not res then + return nil, "unexpected result" + end + + return res["[applied]"] +end + + +function CassandraConnector:read_lock(key) + local res, err = self:query([[ + SELECT * FROM locks WHERE key = ? + ]], { key }, { + consistency = cassandra.consistencies.serial, + }) + if not res then + return nil, err + end + + return res[1] ~= nil +end + + +function CassandraConnector:remove_lock(key, owner) + local res, err = self:query([[ + DELETE FROM locks WHERE key = ? IF owner = ? 
+ ]], { key, owner }, { + consistency = cassandra.consistencies.quorum, + }) + if not res then + return nil, err + end + + return true +end + + return CassandraConnector diff --git a/kong/db/strategies/connector.lua b/kong/db/strategies/connector.lua index 8171c02783a..e3dd62c70ea 100644 --- a/kong/db/strategies/connector.lua +++ b/kong/db/strategies/connector.lua @@ -35,4 +35,24 @@ function Connector:truncate() end +function Connector:setup_locks() + error(fmt("setup_locks() not implemented for '%s' strategy", self.database)) +end + + +function Connector:insert_lock() + error(fmt("insert_lock() not implemented for '%s' strategy", self.database)) +end + + +function Connector:read_lock() + error(fmt("read_lock() not implemented for '%s' strategy", self.database)) +end + + +function Connector:remove_lock() + error(fmt("remove_lock() not implemented for '%s' strategy", self.database)) +end + + return Connector diff --git a/spec/02-integration/000-new-dao/03-db_cluster_mutex_spec.lua b/spec/02-integration/000-new-dao/03-db_cluster_mutex_spec.lua new file mode 100644 index 00000000000..a1662a07887 --- /dev/null +++ b/spec/02-integration/000-new-dao/03-db_cluster_mutex_spec.lua @@ -0,0 +1,112 @@ +local helpers = require "spec.helpers" + + +for _, strategy in helpers.each_strategy("cassandra") do + describe("kong.db [#" .. strategy .. "]", function() + local db + + + setup(function() + local _ + _, db, _ = helpers.get_db_utils(strategy) + end) + + + describe("db:cluster_mutex()", function() + it("returns 'true' when mutex ran and 'false' otherwise", function() + local t1 = ngx.thread.spawn(function() + local ok, err = db:cluster_mutex("my_key", nil, function() + ngx.sleep(0.1) + end) + assert.is_nil(err) + assert.equal(true, ok) + end) + + local t2 = ngx.thread.spawn(function() + local ok, err = db:cluster_mutex("my_key", nil, function() end) + assert.is_nil(err) + assert.equal(false, ok) + end) + + ngx.thread.wait(t1) + ngx.thread.wait(t2) + end) + + + it("mutex ensures only one callback gets called", function() + local cb1 = spy.new(function() end) + local cb2 = spy.new(function() ngx.sleep(0.3) end) + + local t1 = ngx.thread.spawn(function() + ngx.sleep(0.2) + + local _, err = db:cluster_mutex("my_key_2", { owner = "1" }, cb1) + assert.is_nil(err) + end) + + local t2 = ngx.thread.spawn(function() + local _, err = db:cluster_mutex("my_key_2", { owner = "2" }, cb2) + assert.is_nil(err) + end) + + ngx.thread.wait(t1) + ngx.thread.wait(t2) + + assert.spy(cb2).was_called() + assert.spy(cb1).was_not_called() + end) + + + it("mutex can be subsequently acquired once released", function() + local cb1 = spy.new(function() end) + local cb2 = spy.new(function() end) + + local t1 = ngx.thread.spawn(function() + local _, err = db:cluster_mutex("my_key_3", nil, cb1) + assert.is_nil(err) + end) + + local t2 = ngx.thread.spawn(function() + local _, err = db:cluster_mutex("my_key_3", nil, cb2) + assert.is_nil(err) + end) + + ngx.thread.wait(t1) + ngx.thread.wait(t2) + + assert.spy(cb1).was_called() + assert.spy(cb2).was_called() + end) + + + it("mutex cannot be held for longer than opts.ttl across nodes (DB lock)", function() + local cb1 = spy.new(function() + -- remove worker lock + ngx.shared.kong_locks:delete("my_key_5") + -- make DB lock expire + ngx.sleep(1) + end) + + local cb2 = spy.new(function() end) + + local t1 = ngx.thread.spawn(function() + local ok, err = db:cluster_mutex("my_key_5", { ttl = 0.5 }, cb1) + assert.is_nil(err) + assert.equal(true, ok) + end) + + local t2 = ngx.thread.spawn(function() 
+ local ok, err = db:cluster_mutex("my_key_5", { ttl = 0.5 }, cb2) + assert.is_nil(ok) + assert.equal("timeout", err) + end) + + ngx.thread.wait(t1) + ngx.thread.wait(t2) + + assert.spy(cb1).was_called() + assert.spy(cb2).was_not_called() + end) + end) + end) +end