From 4c53d38fa21f85e6966fc32f6bfd8de26545c01c Mon Sep 17 00:00:00 2001
From: Hisham Muhammad
Date: Mon, 27 Aug 2018 18:32:10 -0300
Subject: [PATCH] feat(db) manage composite cache_key values internally

The Plugins entity has a particular case of a composite uniqueness
constraint involving nullable values, which isn't natively supported
by either Postgres or Cassandra.

The old DAO performed filtered searches on these composite-unique
entries in two places: when using the cache_key values to validate
the uniqueness of a plugin, and in the plugins iterator. This had two
problems:

1. To perform the composite uniqueness check, the old schema library
   gave full DAO access to the self-check function, which then
   performed a filtered search.

2. The old-DAO semantics of the filtered search were suboptimal: it
   performed an overly general search and then filtered out the
   undesired values in a loop, which added extra computation to the
   plugin iterator.

The new approach generates the unique cache_key string as an internal
field in the database, which can be unique-tested in the usual way
(solving problem 1) and searched for precisely (solving problem 2).

For the uniqueness check, Postgres relies on its native UNIQUE
support. For Cassandra, the implementation is essentially the same as
the other uniqueness checks. The management of a hidden column in
these strategies is similar to that of the ttl field for Postgres.

There is one interesting gotcha: keeping the cache_key value up to
date on partial updates. There is no alternative to doing a
read-before-write, but we already need one for partial updates of the
Plugin config field, so this reuses the general mechanism between the
schema and the generic DAO for deciding "do I need a read-before-write
on this update" (currently, the two situations that trigger a
read-before-write at the generic DAO level are updates of nested
records and updates of composite cache-key fields).

At this point this mechanism is only useful for the Plugins entity,
and we don't expect it to see wider use in the future. If the
composite-uniqueness restriction is ever dropped from Plugins, this
composite-cache-key handling mechanism should be droppable from the
DAO as well.
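For illustration, the internal cache_key column holds a single string
derived from the schema's cache_key field list. A minimal sketch of
the idea follows; build_cache_key is a hypothetical stand-in for the
real DAO:cache_key, which differs in detail (e.g. in how it flattens
foreign keys):

    -- Sketch only (hypothetical helper, not part of this patch).
    -- Components are appended in declaration order; nil/null values
    -- become empty strings, so two rows collide exactly when every
    -- component matches.
    local function build_cache_key(schema, entity)
      local parts = { schema.name }
      for _, field_name in ipairs(schema.cache_key) do
        local value = entity[field_name]
        if value == nil or value == ngx.null then
          value = ""
        elseif type(value) == "table" and value.id then
          value = value.id  -- flatten a foreign key to its id
        end
        parts[#parts + 1] = tostring(value)
      end
      return table.concat(parts, ":")  -- e.g. "plugins:key-auth:<route_id>::"
    end

Because the whole composite key collapses into one column, both the
uniqueness test and the lookup become single-column operations.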
---
 kong/db/dao/init.lua                  | 140 +++++++++++++++++---------
 kong/db/schema/init.lua               |  34 +++++--
 kong/db/strategies/cassandra/init.lua | 137 ++++++++++++++++++-------
 kong/db/strategies/postgres/init.lua  |  68 ++++++++++++-
 4 files changed, 281 insertions(+), 98 deletions(-)

diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua
index b2855daece4..2045c90eb41 100644
--- a/kong/db/dao/init.lua
+++ b/kong/db/dao/init.lua
@@ -229,6 +229,50 @@ local function row_iterator(self, pager, size, options)
 end
 
 
+local function check_update(self, key, entity, options, name)
+  local entity_to_update, err, read_before_write =
+    self.schema:process_auto_fields(entity, "update")
+  if not entity_to_update then
+    local err_t = self.errors:schema_violation(err)
+    return nil, tostring(err_t), err_t
+  end
+
+  if read_before_write then
+    local rbw_entity, err, err_t
+    if name then
+      rbw_entity, err, err_t = self.strategy:select_by_field(name, key, options)
+    else
+      rbw_entity, err, err_t = self.strategy:select(key, options)
+    end
+    if not rbw_entity then
+      return nil, err, err_t
+    end
+
+    entity_to_update = self.schema:merge_values(entity_to_update, rbw_entity)
+  end
+
+  local ok, errors = self.schema:validate_update(entity_to_update)
+  if not ok then
+    local err_t = self.errors:schema_violation(errors)
+    return nil, tostring(err_t), err_t
+  end
+
+  if options ~= nil then
+    ok, errors = validate_options_value(options, self.schema, "update")
+    if not ok then
+      local err_t = self.errors:invalid_options(errors)
+      return nil, tostring(err_t), err_t
+    end
+  end
+
+  if self.schema.cache_key and #self.schema.cache_key > 1 then
+    entity_to_update.cache_key = self:cache_key(entity_to_update)
+  end
+
+  return entity_to_update
+end
+
+
 local function generate_foreign_key_methods(schema)
   local methods = {}
 
@@ -287,7 +331,7 @@ local function generate_foreign_key_methods(schema)
       end
 
       local entities, err
-      entities, err, err_t = self:rows_to_entities(rows)
+      entities, err, err_t = self:rows_to_entities(rows, options)
      if err then
        return nil, err, err_t
      end
 
@@ -381,29 +425,10 @@ local function generate_foreign_key_methods(schema)
        return nil, tostring(err_t), err_t
      end
 
-      if options ~= nil and type(options) ~= "table" then
-        error("options must be a table", 2)
-      end
-
-      local entity_to_update, err = self.schema:process_auto_fields(entity, "update")
+      local entity_to_update, err, err_t = check_update(self, unique_value,
+                                                        entity, options, name)
      if not entity_to_update then
-        local err_t = self.errors:schema_violation(err)
-        return nil, tostring(err_t), err_t
-      end
-
-      local errors
-      ok, errors = self.schema:validate_update(entity_to_update)
-      if not ok then
-        local err_t = self.errors:schema_violation(errors)
-        return nil, tostring(err_t), err_t
-      end
-
-      if options ~= nil then
-        ok, errors = validate_options_value(options, schema, "update")
-        if not ok then
-          local err_t = self.errors:invalid_options(errors)
-          return nil, tostring(err_t), err_t
-        end
+        return nil, err, err_t
      end
 
      local row, err_t = self.strategy:update_by_field(name, unique_value,
@@ -463,6 +488,10 @@ local function generate_foreign_key_methods(schema)
        end
      end
 
+      if self.schema.cache_key and #self.schema.cache_key > 1 then
+        entity_to_upsert.cache_key = self:cache_key(entity)
+      end
+
      local row, err_t = self.strategy:upsert_by_field(name, unique_value,
                                                       entity_to_upsert, options)
      if not row then
@@ -707,6 +736,10 @@ function DAO:insert(entity, options)
    end
  end
 
+  if self.schema.cache_key and #self.schema.cache_key > 1 then
+    entity_to_insert.cache_key = self:cache_key(entity)
+  end
+
   local row, err_t = self.strategy:insert(entity_to_insert, options)
   if not row then
     return nil, tostring(err_t), err_t
@@ -741,34 +774,10 @@ function DAO:update(primary_key, entity, options)
     return nil, tostring(err_t), err_t
   end
 
-  local entity_to_update, err, read_before_write =
-    self.schema:process_auto_fields(entity, "update")
+  local entity_to_update, err, err_t = check_update(self, primary_key, entity,
+                                                    options)
   if not entity_to_update then
-    local err_t = self.errors:schema_violation(err)
-    return nil, tostring(err_t), err_t
-  end
-
-  if read_before_write then
-    local rbw_entity, err, err_t = self:select(primary_key)
-    if not rbw_entity then
-      return nil, err, err_t
-    end
-
-    entity_to_update = self.schema:merge_values(entity_to_update, rbw_entity)
-  end
-
-  ok, errors = self.schema:validate_update(entity_to_update)
-  if not ok then
-    local err_t = self.errors:schema_violation(errors)
-    return nil, tostring(err_t), err_t
-  end
-
-  if options ~= nil then
-    ok, errors = validate_options_value(options, self.schema, "update")
-    if not ok then
-      local err_t = self.errors:invalid_options(errors)
-      return nil, tostring(err_t), err_t
-    end
+    return nil, err, err_t
   end
 
   local row, err_t = self.strategy:update(primary_key, entity_to_update, options)
@@ -825,6 +834,10 @@ function DAO:upsert(primary_key, entity, options)
    end
  end
 
+  if self.schema.cache_key and #self.schema.cache_key > 1 then
+    entity_to_upsert.cache_key = self:cache_key(entity)
+  end
+
   local row, err_t = self.strategy:upsert(primary_key, entity_to_upsert, options)
   if not row then
     return nil, tostring(err_t), err_t
@@ -883,6 +896,33 @@ function DAO:delete(primary_key, options)
 end
 
 
+function DAO:select_by_cache_key(cache_key)
+  local ck_definition = self.schema.cache_key
+  if not ck_definition then
+    error("entity does not have a cache_key defined", 2)
+  end
+
+  if type(cache_key) ~= "string" then
+    cache_key = self:cache_key(cache_key)
+  end
+
+  if #ck_definition == 1 then
+    return self["select_by_" .. ck_definition[1]](self, cache_key)
+  end
+
+  local row, err_t = self.strategy:select_by_field("cache_key", cache_key)
+  if err_t then
+    return nil, tostring(err_t), err_t
+  end
+
+  if not row then
+    return nil
+  end
+
+  return self:row_to_entity(row)
+end
+
+
 function DAO:rows_to_entities(rows, options)
   local count = #rows
   if count == 0 then
diff --git a/kong/db/schema/init.lua b/kong/db/schema/init.lua
index 1d7bacbe8f3..e653080a660 100644
--- a/kong/db/schema/init.lua
+++ b/kong/db/schema/init.lua
@@ -1163,6 +1163,8 @@ function Schema:process_auto_fields(input, context, nulls)
   local now_s  = ngx_time()
   local now_ms = ngx_now()
   local read_before_write = false
+  local setting_cache_key = 0
+  local cache_key_len = self.cache_key and #self.cache_key
 
   for key, field in self:each_field(input) do
 
@@ -1189,6 +1191,11 @@ function Schema:process_auto_fields(input, context, nulls)
     local field_value = output[key]
 
     if field_value ~= nil then
+
+      if cache_key_len and self.cache_key_set[key] then
+        setting_cache_key = setting_cache_key + 1
+      end
+
       local field_type = field.type
       if field_type == "array" then
         output[key] = make_array(field_value)
@@ -1213,13 +1220,19 @@ function Schema:process_auto_fields(input, context, nulls)
    end
  end
 
-  -- If a partial update does not provide the subschema key,
-  -- we need to do a read-before-write to get it and be
-  -- able to properly validate the entity.
- if context == "update" - and self.subschema_key - and input[self.subschema_key] == nil then - read_before_write = true + if context == "update" then + -- If a partial update does not provide the subschema key, + -- we need to do a read-before-write to get it and be + -- able to properly validate the entity. + if self.subschema_key and input[self.subschema_key] == nil then + read_before_write = true + + -- If we're partially resetting the value of a composite cache key, + -- we to do a read-before-write to get the rest of the cache key + -- and be able to properly update it. + elseif setting_cache_key > 0 and cache_key_len ~= setting_cache_key then + read_before_write = true + end end return output, nil, read_before_write @@ -1521,6 +1534,13 @@ function Schema.new(definition) local self = copy(definition) setmetatable(self, Schema) + if self.cache_key then + self.cache_key_set = {} + for _, name in ipairs(self.cache_key) do + self.cache_key_set[name] = true + end + end + -- Also give access to fields by name for key, field in self:each_field() do self.fields[key] = field diff --git a/kong/db/strategies/cassandra/init.lua b/kong/db/strategies/cassandra/init.lua index 6f645f3932d..cb7ee0ca134 100644 --- a/kong/db/strategies/cassandra/init.lua +++ b/kong/db/strategies/cassandra/init.lua @@ -34,6 +34,9 @@ end local APPLIED_COLUMN = "[applied]" +local cache_key_field = { type = "string" } + + local _constraints = {} @@ -85,19 +88,28 @@ local function build_queries(self) local schema = self.schema local n_fields = #schema.fields local n_pk = #schema.primary_key + local composite_cache_key = schema.cache_key and #schema.cache_key > 1 - local insert_columns = new_tab(n_fields, 0) + local select_columns = new_tab(n_fields, 0) for field_name, field in schema:each_field() do if field.type == "foreign" then local db_columns = self.foreign_keys_db_columns[field_name] for i = 1, #db_columns do - insert(insert_columns, db_columns[i].col_name) + insert(select_columns, db_columns[i].col_name) end else - insert(insert_columns, field_name) + insert(select_columns, field_name) end end - insert_columns = concat(insert_columns, ", ") + select_columns = concat(select_columns, ", ") + local insert_columns = select_columns + + local insert_bind_args = rep("?, ", n_fields):sub(1, -3) + + if composite_cache_key then + insert_columns = select_columns .. ", cache_key" + insert_bind_args = insert_bind_args .. ", ?" 
+  end
 
   local select_bind_args = new_tab(n_pk, 0)
   for _, field_name in self.each_pk_field() do
@@ -105,8 +117,6 @@ local function build_queries(self)
   end
   select_bind_args = concat(select_bind_args, " AND ")
 
-  local insert_bind_args = rep("?, ", n_fields):sub(1, -3)
-
   local partitioned, err = is_partitioned(self)
   if err then
     return nil, err
@@ -124,15 +134,15 @@ local function build_queries(self)
 
       select = fmt([[
        SELECT %s FROM %s WHERE partition = '%s' AND %s
-      ]], insert_columns, schema.name, schema.name, select_bind_args),
+      ]], select_columns, schema.name, schema.name, select_bind_args),
 
      select_page = fmt([[
        SELECT %s FROM %s WHERE partition = '%s'
-      ]], insert_columns, schema.name, schema.name),
+      ]], select_columns, schema.name, schema.name),
 
      select_with_filter = fmt([[
        SELECT %s FROM %s WHERE partition = '%s' AND %s
-      ]], insert_columns, schema.name, schema.name, "%s"),
+      ]], select_columns, schema.name, schema.name, "%s"),
 
      update = fmt([[
        UPDATE %s SET %s WHERE partition = '%s' AND %s IF EXISTS
@@ -168,17 +178,17 @@ local function build_queries(self)
     -- might raise a "you must enable ALLOW FILTERING" error
     select = fmt([[
      SELECT %s FROM %s WHERE %s
-    ]], insert_columns, schema.name, select_bind_args),
+    ]], select_columns, schema.name, select_bind_args),
 
    -- might raise a "you must enable ALLOW FILTERING" error
    select_page = fmt([[
      SELECT %s FROM %s
-    ]], insert_columns, schema.name),
+    ]], select_columns, schema.name),
 
    -- might raise a "you must enable ALLOW FILTERING" error
    select_with_filter = fmt([[
      SELECT %s FROM %s WHERE %s
-    ]], insert_columns, schema.name, "%s"),
+    ]], select_columns, schema.name, "%s"),
 
    update = fmt([[
      UPDATE %s SET %s WHERE %s IF EXISTS
@@ -514,10 +524,52 @@ local function _select(self, cql, args)
 end
 
 
+local function check_unique(self, primary_key, entity, field_name)
+  -- a UNIQUE constraint is set on this field.
+  -- We unfortunately follow a read-before-write pattern in this case,
+  -- but this is made necessary for Kong to behave in a
+  -- database-agnostic fashion between its supported RDBMs and
+  -- Cassandra.
+  local row, err_t = self:select_by_field(field_name, entity[field_name])
+  if err_t then
+    return nil, err_t
+  end
+
+  if row then
+    for _, pk_field_name in self.each_pk_field() do
+      if primary_key[pk_field_name] ~= row[pk_field_name] then
+        -- already exists
+        if field_name == "cache_key" then
+          local keys = {}
+          local schema = self.schema
+          for _, k in ipairs(schema.cache_key) do
+            local field = schema.fields[k]
+            if field.type == "foreign" and entity[k] ~= ngx.null then
+              keys[k] = field.schema:extract_pk_values(entity[k])
+            else
+              keys[k] = entity[k]
+            end
+          end
+          return nil, self.errors:unique_violation(keys)
+        end
+
+        return nil, self.errors:unique_violation {
+          [field_name] = entity[field_name],
+        }
+      end
+    end
+  end
+
+  return true
+end
+
+
 function _mt:insert(entity, options)
   local schema = self.schema
   local args = new_tab(#schema.fields, 0)
   local ttl = schema.ttl and options and options.ttl
+  local composite_cache_key = schema.cache_key and #schema.cache_key > 1
+  local primary_key
 
   local cql, err
   if ttl then
@@ -561,23 +613,27 @@ function _mt:insert(entity, options)
         -- We unfortunately follow a read-before-write pattern in this case,
         -- but this is made necessary for Kong to behave in a database-agnostic
         -- fashion between its supported RDBMs and Cassandra.
-        local row, err_t = self:select_by_field(field_name, entity[field_name])
+        primary_key = primary_key or schema:extract_pk_values(entity)
+        local _, err_t = check_unique(self, primary_key, entity, field_name)
         if err_t then
           return nil, err_t
         end
-
-        if row then
-          -- already exists
-          return nil, self.errors:unique_violation {
-            [field_name] = entity[field_name],
-          }
-        end
      end
 
      insert(args, serialize_arg(field, entity[field_name]))
    end
  end
 
+  if composite_cache_key then
+    primary_key = primary_key or schema:extract_pk_values(entity)
+    local _, err_t = check_unique(self, primary_key, entity, "cache_key")
+    if err_t then
+      return nil, err_t
+    end
+
+    insert(args, serialize_arg(cache_key_field, entity["cache_key"]))
+  end
+
   -- execute query
 
   local res, err = self.connector:query(cql, args, nil, "write")
@@ -591,9 +647,9 @@ function _mt:insert(entity, options)
   if res[APPLIED_COLUMN] == false then
     -- lightweight transaction (IF NOT EXISTS) failed,
     -- retrieve PK values for the PK violation error
-    local pk_values = schema:extract_pk_values(entity)
+    primary_key = primary_key or schema:extract_pk_values(entity)
 
-    return nil, self.errors:primary_key_violation(pk_values)
+    return nil, self.errors:primary_key_violation(primary_key)
   end
 
   -- return foreign key as if they were fetched from :select()
@@ -649,6 +705,11 @@ function _mt:select_by_field(field_name, field_value, options)
   local select_cql = fmt(cql, field_name .. " = ?")
   local bind_args = new_tab(1, 0)
   local field = self.schema.fields[field_name]
+
+  if field_name == "cache_key" then
+    field = cache_key_field
+  end
+
   bind_args[1] = serialize_arg(field, field_value)
 
   return _select(self, select_cql, bind_args)
@@ -726,6 +787,7 @@ do
   local function update(self, primary_key, entity, mode, options)
     local schema = self.schema
     local ttl = schema.ttl and options and options.ttl
+    local composite_cache_key = schema.cache_key and #schema.cache_key > 1
 
     local query_name
     if ttl then
@@ -762,26 +824,10 @@ do
       else
 
         if field.unique and entity[field_name] ~= ngx.null then
-          -- a UNIQUE constaint is set on this field.
-          -- We unfortunately follow a read-before-write pattern in this case,
-          -- but this is made necessary for Kong to behave in a
-          -- database-agnostic fashion between its supported RDBMs and
-          -- Cassandra.
-          local row, err_t = self:select_by_field(field_name, entity[field_name])
+          local _, err_t = check_unique(self, primary_key, entity, field_name)
          if err_t then
            return nil, err_t
          end
-
-          if row then
-            for _, pk_field_name in self.each_pk_field() do
-              if primary_key[pk_field_name] ~= row[pk_field_name] then
-                -- already exists
-                return nil, self.errors:unique_violation {
-                  [field_name] = entity[field_name],
-                }
-              end
-            end
-          end
        end
 
        insert(args, serialize_arg(field, entity[field_name]))
@@ -790,6 +836,15 @@ do
      end
    end
 
+    if composite_cache_key then
+      local _, err_t = check_unique(self, primary_key, entity, "cache_key")
+      if err_t then
+        return nil, err_t
+      end
+
+      insert(args, serialize_arg(cache_key_field, entity["cache_key"]))
+    end
+
     -- serialize WHERE clause args
 
     for i, field_name, field in self.each_pk_field() do
@@ -805,6 +860,10 @@
      update_columns_binds[i] = args_names[i] .. " = ?"
    end
 
+    if composite_cache_key then
+      insert(update_columns_binds, "cache_key = ?")
+    end
+
     if ttl then
       cql = fmt(cql, ttl, concat(update_columns_binds, ", "))
     else
diff --git a/kong/db/strategies/postgres/init.lua b/kong/db/strategies/postgres/init.lua
index 000c6393ed6..6996698c73d 100644
--- a/kong/db/strategies/postgres/init.lua
+++ b/kong/db/strategies/postgres/init.lua
@@ -386,6 +386,19 @@ local function toerror(strategy, err, primary_key, entity)
   if find(err, "violates unique constraint", 1, true) then
     log(NOTICE, err)
 
+    if find(err, "cache_key", 1, true) then
+      local keys = {}
+      for _, k in ipairs(schema.cache_key) do
+        local field = schema.fields[k]
+        if field.type == "foreign" and entity[k] ~= null then
+          keys[k] = field.schema:extract_pk_values(entity[k])
+        else
+          keys[k] = entity[k]
+        end
+      end
+      return nil, errors:unique_violation(keys)
+    end
+
     for field_name, field in schema:each_field() do
       if field.unique then
         if find(err, field_name, 1, true) then
@@ -539,7 +552,6 @@ local function execute(strategy, statement_name, attributes, options)
   end
 
   local sql = statement.make(argv)
-
   return connector:query(sql)
 end
 
@@ -859,6 +871,7 @@ function _M.new(connector, schema, errors)
   end
 
   local ttl = schema.ttl == true
+  local composite_cache_key = schema.cache_key and #schema.cache_key > 1
   local max_name_length = ttl and 3 or 1
   local max_type_length = ttl and 24 or 1
   local fields = {}
@@ -1170,6 +1183,28 @@ function _M.new(connector, schema, errors)
   end
 
   local create_count = fields_count + 1
+
+  local cache_key_escaped
+  local cache_key_index
+  if composite_cache_key then
+    cache_key_escaped = escape_identifier(connector, "cache_key")
+    cache_key_index = escape_identifier(connector, table_name .. "_" .. "cache_key_idx")
+    update_fields_count = update_fields_count + 1
+    update_names[update_fields_count] = "cache_key"
+    update_args_names[update_fields_count] = "cache_key"
+    update_expressions[update_fields_count] = cache_key_escaped .. " = $" .. update_fields_count
+    upsert_expressions[update_fields_count] = cache_key_escaped .. " = " .. "EXCLUDED." .. cache_key_escaped
+
+    local create_expression = {
+      cache_key_escaped,
+      rep(" ", max_name_length - #cache_key_escaped + 2),
+      field_type_to_postgres_type({ type = "string" }),
+    }
+
+    create_expressions[create_count] = concat(create_expression)
+    create_count = create_count + 1
+  end
+
   local ttl_escaped
   local ttl_index
   if ttl then
@@ -1234,6 +1269,16 @@ function _M.new(connector, schema, errors)
   local count_statement
   local drop_statement
 
+  if composite_cache_key then
+    fields_hash.cache_key = { type = "string" }
+
+    insert_count = fields_count + 1
+    insert_names[insert_count] = "cache_key"
+    insert_expressions[insert_count] = "$" .. insert_count
+    insert_columns[insert_count] = cache_key_escaped
+    fields_count = fields_count + 1
+  end
+
   if ttl then
     fields_hash.ttl = { timestamp = true }
 
@@ -1406,6 +1451,13 @@ function _M.new(connector, schema, errors)
    end
  end
 
+  if composite_cache_key then
+    create_statement = concat { create_statement,
+      "CREATE INDEX IF NOT EXISTS ", cache_key_index,
+      " ON ", table_name_escaped, " (", cache_key_escaped, ");"
+    }
+  end
+
   local truncate_statement = concat {
     "TRUNCATE ", table_name_escaped, " RESTART IDENTITY CASCADE;"
   }
@@ -1574,6 +1626,14 @@ function _M.new(connector, schema, errors)
    end
  end
 
+  if composite_cache_key then
+    unique_fields_count = unique_fields_count + 1
+    insert(unique_fields, {
+      name = "cache_key",
+      name_escaped = escape_identifier(connector, "cache_key"),
+    })
+  end
+
   if unique_fields_count > 0 then
     local update_by_args_count = update_fields_count + 1
     local update_by_args = new_tab(update_by_args_count, 0)
@@ -1648,10 +1708,14 @@ function _M.new(connector, schema, errors)
      }
 
      local upsert_by_statement_name = "upsert_by_" .. unique_name
+      local conflict_key = unique_escaped
+      if composite_cache_key then
+        conflict_key = escape_identifier(connector, "cache_key")
+      end
      local upsert_by_statement = concat {
        "INSERT INTO ", table_name_escaped, " (", insert_columns, ")\n",
        "     VALUES (", insert_expressions, ")\n",
-        "ON CONFLICT (", unique_escaped, ") DO UPDATE\n",
+        "ON CONFLICT (", conflict_key, ") DO UPDATE\n",
        "        SET ", concat(upsert_expressions, ", "), "\n",
        "  RETURNING ", select_expressions, ";",
      }
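
Usage note (illustrative, not part of the patch): with a composite
cache_key declared on a schema, a lookup can now hit the internal
column directly instead of going through a filtered scan. A
hypothetical call shape, assuming a db handle exposing the Plugins DAO
and the Plugins cache_key components:

    -- Sketch only: the db handle and the field values are assumptions.
    -- A table argument is converted to the cache_key string via
    -- DAO:cache_key(), as DAO:select_by_cache_key above does.
    local plugin, err, err_t = db.plugins:select_by_cache_key({
      name     = "key-auth",
      route    = { id = "52cc8a6f-5046-46a3-b24b-4d5c9c925db7" },
      service  = ngx.null,  -- unset components participate as null
      consumer = ngx.null,
    })
    if err then
      ngx.log(ngx.ERR, "plugin lookup failed: ", err)
    end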