Skip to content

Commit

Permalink
feat(db) manage composite cache_key values internally
Browse files Browse the repository at this point in the history
The Plugins entity has a particular case of a composite uniqueness constraint
involving nullable values, which isn't natively supported by either Postgres
or Cassandra. The old DAO performed filtered searches on these composite-unique
entries in two places: when using the cache_key values to validate the
uniqueness of a plugin and in the plugins iterator. This had two problems:

1. To perform the composite uniqueness check, the old schema library gave full
   DAO access to the self-check function, which then performed a filtered search.
2. The old-DAO semantics of the filtered search were suboptimal, because it did
   an overly general search and then filtered out the undesired values in a loop.
   This added extra computation to the plugin iterator.

The new approach generates the unique cache_key string as an internal field in
the database, which can be unique-tested for in the usual way (solving problem 1)
and also be searched for precisely (solving problem 2).

For the uniqueness check, Postgres uses a native UNIQUE constraint. For Cassandra,
the implementation is essentially the same as for the other uniqueness checks.
The management of a hidden column in those strategies is similar to that of
the ttl field for Postgres.

There is one interesting gotcha, which is how to keep it up-to-date on partial
updates. There is no other alternative than doing read-before-write, but we
need to do this for partial updates of Plugin config field too, so this uses
the general mechanism between schema and generic DAO to determine "do I need a
read-before-write on this update" (currently, the two situations that will
trigger a read-before-write at the generic DAO level are updates of nested
records and updates of composite cache-key fields).

At this point, this mechanism will only be useful for the Plugins entity and
we don't see it getting more widespread use in the future. We expect that if
this composite-uniqueness restriction is dropped from Plugins, then this
composite-cache-key handling mechanism will be able to be dropped from the DAO
as well.
  • Loading branch information
hishamhm committed Sep 11, 2018
1 parent 3ccff83 commit 4c53d38
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 98 deletions.
140 changes: 90 additions & 50 deletions kong/db/dao/init.lua
Expand Up @@ -229,6 +229,50 @@ local function row_iterator(self, pager, size, options)
end


-- Shared pre-update processing for DAO:update() and the generated
-- update_by_<field>() methods.
--
-- Runs auto-field processing on the partial entity, performs a
-- read-before-write when the schema flags one as required (merging the
-- stored row into the update), validates the resulting entity and the
-- options, and recomputes the composite cache_key when the schema
-- declares one.
--
-- @param self the DAO instance
-- @param key primary key table, or a unique field value when `name` is given
-- @param entity partial entity with the values to update
-- @param options optional table of options (validated when non-nil)
-- @param name optional unique field name; selects the lookup strategy
-- @return the processed entity on success, or nil + error string + error table
local function check_update(self, key, entity, options, name)
  local to_update, process_err, needs_read =
    self.schema:process_auto_fields(entity, "update")
  if not to_update then
    local err_t = self.errors:schema_violation(process_err)
    return nil, tostring(err_t), err_t
  end

  if needs_read then
    -- Fetch the stored row so partially-given values (nested records,
    -- composite cache-key fields) can be completed before validation.
    local stored, err, err_t
    if name ~= nil then
      stored, err, err_t = self.strategy:select_by_field(name, key, options)
    else
      stored, err, err_t = self.strategy:select(key, options)
    end
    if not stored then
      return nil, err, err_t
    end
    to_update = self.schema:merge_values(to_update, stored)
  end

  local ok, check_errors = self.schema:validate_update(to_update)
  if not ok then
    local err_t = self.errors:schema_violation(check_errors)
    return nil, tostring(err_t), err_t
  end

  if options ~= nil then
    local opts_ok, opts_errors = validate_options_value(options, self.schema, "update")
    if not opts_ok then
      local err_t = self.errors:invalid_options(opts_errors)
      return nil, tostring(err_t), err_t
    end
  end

  -- Composite cache keys are stored as an internal column; keep the
  -- stored value in sync with the (possibly merged) field values.
  local ck = self.schema.cache_key
  if ck and #ck > 1 then
    to_update.cache_key = self:cache_key(to_update)
  end

  return to_update
end


local function generate_foreign_key_methods(schema)
local methods = {}

Expand Down Expand Up @@ -287,7 +331,7 @@ local function generate_foreign_key_methods(schema)
end

local entities, err
entities, err, err_t = self:rows_to_entities(rows)
entities, err, err_t = self:rows_to_entities(rows, options)
if err then
return nil, err, err_t
end
Expand Down Expand Up @@ -381,29 +425,10 @@ local function generate_foreign_key_methods(schema)
return nil, tostring(err_t), err_t
end

if options ~= nil and type(options) ~= "table" then
error("options must be a table", 2)
end

local entity_to_update, err = self.schema:process_auto_fields(entity, "update")
local entity_to_update, err, err_t = check_update(self, unique_value,
entity, options, name)
if not entity_to_update then
local err_t = self.errors:schema_violation(err)
return nil, tostring(err_t), err_t
end

local errors
ok, errors = self.schema:validate_update(entity_to_update)
if not ok then
local err_t = self.errors:schema_violation(errors)
return nil, tostring(err_t), err_t
end

if options ~= nil then
ok, errors = validate_options_value(options, schema, "update")
if not ok then
local err_t = self.errors:invalid_options(errors)
return nil, tostring(err_t), err_t
end
return nil, err, err_t
end

local row, err_t = self.strategy:update_by_field(name, unique_value,
Expand Down Expand Up @@ -463,6 +488,10 @@ local function generate_foreign_key_methods(schema)
end
end

if self.schema.cache_key and #self.schema.cache_key > 1 then
entity_to_upsert.cache_key = self:cache_key(entity)
end

local row, err_t = self.strategy:upsert_by_field(name, unique_value,
entity_to_upsert, options)
if not row then
Expand Down Expand Up @@ -707,6 +736,10 @@ function DAO:insert(entity, options)
end
end

if self.schema.cache_key and #self.schema.cache_key > 1 then
entity_to_insert.cache_key = self:cache_key(entity)
end

local row, err_t = self.strategy:insert(entity_to_insert, options)
if not row then
return nil, tostring(err_t), err_t
Expand Down Expand Up @@ -741,34 +774,10 @@ function DAO:update(primary_key, entity, options)
return nil, tostring(err_t), err_t
end

local entity_to_update, err, read_before_write =
self.schema:process_auto_fields(entity, "update")
local entity_to_update, err, err_t = check_update(self, primary_key, entity,
options)
if not entity_to_update then
local err_t = self.errors:schema_violation(err)
return nil, tostring(err_t), err_t
end

if read_before_write then
local rbw_entity, err, err_t = self:select(primary_key)
if not rbw_entity then
return nil, err, err_t
end

entity_to_update = self.schema:merge_values(entity_to_update, rbw_entity)
end

ok, errors = self.schema:validate_update(entity_to_update)
if not ok then
local err_t = self.errors:schema_violation(errors)
return nil, tostring(err_t), err_t
end

if options ~= nil then
ok, errors = validate_options_value(options, self.schema, "update")
if not ok then
local err_t = self.errors:invalid_options(errors)
return nil, tostring(err_t), err_t
end
return nil, err, err_t
end

local row, err_t = self.strategy:update(primary_key, entity_to_update, options)
Expand Down Expand Up @@ -825,6 +834,10 @@ function DAO:upsert(primary_key, entity, options)
end
end

if self.schema.cache_key and #self.schema.cache_key > 1 then
entity_to_upsert.cache_key = self:cache_key(entity)
end

local row, err_t = self.strategy:upsert(primary_key, entity_to_upsert, options)
if not row then
return nil, tostring(err_t), err_t
Expand Down Expand Up @@ -883,6 +896,33 @@ function DAO:delete(primary_key, options)
end


--- Look up a single entity by its cache key.
-- Accepts either a pre-computed cache-key string or an entity-like table,
-- from which the key string is derived via self:cache_key().
-- Single-field cache keys delegate to the generated select_by_<field>()
-- method; composite keys query the internally-managed "cache_key" column.
-- Raises (at the caller's level) when the schema defines no cache_key.
function DAO:select_by_cache_key(cache_key)
  local definition = self.schema.cache_key
  if definition == nil then
    error("entity does not have a cache_key defined", 2)
  end

  local key = cache_key
  if type(key) ~= "string" then
    key = self:cache_key(key)
  end

  -- A one-field cache key maps directly onto that field's unique lookup.
  if #definition == 1 then
    local method = "select_by_" .. definition[1]
    return self[method](self, key)
  end

  local row, err_t = self.strategy:select_by_field("cache_key", key)
  if err_t then
    return nil, tostring(err_t), err_t
  end

  if row == nil then
    return nil
  end

  return self:row_to_entity(row)
end


function DAO:rows_to_entities(rows, options)
local count = #rows
if count == 0 then
Expand Down
34 changes: 27 additions & 7 deletions kong/db/schema/init.lua
Expand Up @@ -1163,6 +1163,8 @@ function Schema:process_auto_fields(input, context, nulls)
local now_s = ngx_time()
local now_ms = ngx_now()
local read_before_write = false
local setting_cache_key = 0
local cache_key_len = self.cache_key and #self.cache_key

for key, field in self:each_field(input) do

Expand All @@ -1189,6 +1191,11 @@ function Schema:process_auto_fields(input, context, nulls)
local field_value = output[key]

if field_value ~= nil then

if cache_key_len and self.cache_key_set[key] then
setting_cache_key = setting_cache_key + 1
end

local field_type = field.type
if field_type == "array" then
output[key] = make_array(field_value)
Expand All @@ -1213,13 +1220,19 @@ function Schema:process_auto_fields(input, context, nulls)
end
end

-- If a partial update does not provide the subschema key,
-- we need to do a read-before-write to get it and be
-- able to properly validate the entity.
if context == "update"
and self.subschema_key
and input[self.subschema_key] == nil then
read_before_write = true
if context == "update" then
-- If a partial update does not provide the subschema key,
-- we need to do a read-before-write to get it and be
-- able to properly validate the entity.
if self.subschema_key and input[self.subschema_key] == nil then
read_before_write = true

-- If we're partially resetting the value of a composite cache key,
            -- we need to do a read-before-write to get the rest of the cache key
-- and be able to properly update it.
elseif setting_cache_key > 0 and cache_key_len ~= setting_cache_key then
read_before_write = true
end
end

return output, nil, read_before_write
Expand Down Expand Up @@ -1521,6 +1534,13 @@ function Schema.new(definition)
local self = copy(definition)
setmetatable(self, Schema)

if self.cache_key then
self.cache_key_set = {}
for _, name in ipairs(self.cache_key) do
self.cache_key_set[name] = true
end
end

-- Also give access to fields by name
for key, field in self:each_field() do
self.fields[key] = field
Expand Down

0 comments on commit 4c53d38

Please sign in to comment.