-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
init.lua
354 lines (284 loc) · 9.14 KB
/
init.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
local declarative_config = require "kong.db.schema.others.declarative_config"
local workspaces = require "kong.workspaces"
local lmdb = require("resty.lmdb")
local marshaller = require("kong.db.declarative.marshaller")
local yield = require("kong.tools.utils").yield
local unique_field_key = require("kong.db.declarative").unique_field_key
local kong = kong
local fmt = string.format
local type = type
local next = next
local sort = table.sort
local pairs = pairs
local match = string.match
local assert = assert
local tostring = tostring
local tonumber = tonumber
local encode_base64 = ngx.encode_base64
local decode_base64 = ngx.decode_base64
local null = ngx.null
local unmarshall = marshaller.unmarshall
local lmdb_get = lmdb.get
local get_workspace_id = workspaces.get_workspace_id
-- Options passed as the last argument of every schema:process_auto_fields()
-- call made by this strategy's read paths.
local PROCESS_AUTO_FIELDS_OPTS = {
  show_ws_id = true,
  no_defaults = true,
}


-- `off` is the module table returned at the end of the file.
-- `_mt` is the metatable shared by every strategy instance; it indexes itself
-- so that functions stored on it are reachable as instance methods.
local off, _mt = {}, {}
_mt.__index = _mt
--- Resolve the workspace segment used when building cache keys.
-- @tparam table schema entity schema; only `workspaceable` is read
-- @tparam table|nil options may carry a `workspace` override
-- @treturn string "" for non-workspaceable schemas, "*" when
--   `options.workspace` is `ngx.null`, the explicit `options.workspace` when
--   one is given, otherwise whatever `get_workspace_id()` returns
local function ws(schema, options)
  if not schema.workspaceable then
    return ""
  end

  local requested = options and options.workspace

  if requested == null then
    return "*"
  end

  if requested then
    return requested
  end

  return get_workspace_id()
end
--- Convert the stored `ttl` of an entity into the time it has left.
-- The stored value is compared against `ngx.time()`: when the difference is
-- positive it replaces `entity.ttl` in place, otherwise the entity is treated
-- as expired and dropped.
-- @tparam table|nil entity entity to inspect (mutated in place)
-- @treturn table|nil the same entity, or `nil` when it has expired; entities
--   without a `ttl` (or with `ttl == ngx.null`) are returned untouched
local function process_ttl_field(entity)
  if not entity then
    return entity
  end

  local stored_ttl = entity.ttl
  if not stored_ttl or stored_ttl == null then
    return entity
  end

  local remaining = stored_ttl - ngx.time()
  if remaining > 0 then
    entity.ttl = remaining
    return entity
  end

  return nil -- do not return the expired entity
end
-- Returns a sorted array with the entries tagged according to the given criteria.
-- For every tag, the list stored under `taggings:<tag_name>|<key>` is loaded;
-- the lists are then merged (union for "or", intersection for "and").
-- Currently only the following kinds of keys are supported:
-- * A key like `services|<ws_id>|@list` will only return service keys
-- @tparam string key the key to be used when filtering
-- @tparam table tag_names an array of tag names (strings)
-- @tparam string|nil tags_cond either "or", "and". `nil` means "or"
-- @treturn table|nil a sorted array without duplicates (possibly empty),
--   or `nil` plus an error when a list could not be loaded
local function get_entity_ids_tagged(key, tag_names, tags_cond)
  local intersect = tags_cond == "and"
  local dict = {} -- keys are entity_ids, values are true

  for i = 1, #tag_names do
    local list, err = unmarshall(lmdb_get("taggings:" .. tag_names[i] .. "|" .. key))
    if err then
      return nil, err
    end

    yield(true)

    list = list or {}
    local list_len = #list

    -- optimization: with "and", a single tag without entities makes the whole
    -- intersection empty, so stop before looking up the remaining tags
    -- (this also covers the first tag)
    if intersect and list_len == 0 then
      return {}
    end

    if intersect and i > 1 then
      -- keep only the ids that were already present for every previous tag
      local and_dict = {}
      for j = 1, list_len do
        local id = list[j]
        and_dict[id] = dict[id] -- either true or nil
      end
      dict = and_dict

      -- optimization: exit early when the intersection became empty
      if not next(dict) then
        return {}
      end

    else -- "or", or the first iteration of "and": put all ids into dict
      for j = 1, list_len do
        dict[list[j]] = true
      end
    end
  end

  local arr = {}
  local len = 0
  for entity_id in pairs(dict) do
    len = len + 1
    arr[len] = entity_id
  end
  sort(arr) -- consistency when paginating results

  return arr
end
--- Return one page of entities out of the list stored under `key`.
--
-- Without `options.tags`, `key` names a cache entry holding an array; with
-- `options.tags` the array is computed by get_entity_ids_tagged() instead.
-- For the "tags" schema each array entry is parsed in place; for every other
-- schema each entry is itself a cache key that is fetched and unmarshalled.
--
-- @tparam table self strategy instance (`connector`, `schema` and `errors` are read)
-- @tparam string key cache key of the list to paginate
-- @tparam number|nil size page size; defaults to `connector:get_page_size(options)`
-- @tparam string|nil offset opaque token returned by a previous call (base64
--   of a 1-based index into the list); `nil` starts at the first entry
-- @tparam table|nil options may carry `tags` and `tags_cond`
-- @treturn table|nil array of entities, or `nil` on error
-- @treturn nil|string|table the error, if any
-- @treturn string|nil token for the next page; `nil` when there is nothing left
local function page_for_key(self, key, size, offset, options)
  if not size then
    size = self.connector:get_page_size(options)
  end

  if offset then
    local token = decode_base64(offset)
    if not token then
      return nil, self.errors:invalid_offset(offset, "bad base64 encoding")
    end

    local number = tonumber(token)
    -- The token must be a 1-based list index. Anything else that happens to
    -- parse as a number (0, negatives, fractions, NaN, inf) can never address
    -- a list entry, so report it instead of silently returning an empty page.
    if not number or number < 1 or number % 1 ~= 0 then
      return nil, self.errors:invalid_offset(offset, "invalid offset")
    end

    offset = number

  else
    offset = 1
  end

  local list, err
  if options and options.tags then
    list, err = get_entity_ids_tagged(key, options.tags, options.tags_cond)
    if err then
      return nil, err
    end

  else
    list, err = unmarshall(lmdb_get(key))
    if err then
      return nil, err
    end

    list = list or {}
  end

  yield()

  local ret = {}
  local ret_idx = 1
  local schema = self.schema
  local schema_name = schema.name

  local item
  for i = offset, offset + size - 1 do
    item = list[i]
    if not item then
      -- ran past the end of the list: there is no next page
      offset = nil
      break
    end

    -- Tags are stored in the cache entries "tags||@list" and "tags:<tagname>|@list"
    -- The contents of both of these entries is an array of strings
    -- Each of these strings has the form "<tag>|<entity_name>|<entity_id>"
    -- For example "admin|services|<a service uuid>"
    -- This loop transforms each individual string into tables.
    if schema_name == "tags" then
      local tag_name, entity_name, uuid = match(item, "^([^|]+)|([^|]+)|(.+)$")
      if not tag_name then
        return nil, "Could not parse tag from cache: " .. tostring(item)
      end

      item = { tag = tag_name, entity_name = entity_name, entity_id = uuid }

    -- The rest of entities' lists (i.e. "services|<ws_id>|@list") only contain ids, so in order to
    -- get the entities we must do an additional cache access per entry
    else
      item, err = unmarshall(lmdb_get(item))
      if err then
        return nil, err
      end
    end

    if not item then
      return nil, "stale data detected while paginating"
    end

    if schema.ttl then
      -- expired entities come back as nil and are skipped below
      item = process_ttl_field(item)
    end

    if item then
      ret[ret_idx] = schema:process_auto_fields(item, "select", true, PROCESS_AUTO_FIELDS_OPTS)
      ret_idx = ret_idx + 1
    end
  end

  -- Only hand out a continuation token when the list has an entry right after
  -- this page; otherwise the caller would be sent to fetch an empty page.
  if offset and list[offset + size] then
    return ret, nil, encode_base64(tostring(offset + size), true)
  end

  return ret
end
--- Fetch a single entity stored under `key`.
-- @tparam table schema entity schema (`ttl` and `process_auto_fields` are used)
-- @tparam string key cache key of the entity
-- @treturn table|nil the entity after `process_auto_fields`, or `nil` when it
--   is missing or its ttl has run out
-- @treturn nil|string|table error reported while loading the entry, if any
local function select_by_key(schema, key)
  local stored, err = unmarshall(lmdb_get(key))
  if not stored then
    return nil, err
  end

  local current = stored
  if schema.ttl then
    -- yields nil once the entity has expired
    current = process_ttl_field(stored)
  end

  if not current then
    return nil
  end

  local result = schema:process_auto_fields(current, "select", true, PROCESS_AUTO_FIELDS_OPTS)
  return result
end
--- Return one page of this strategy's entities.
-- Builds the "<schema name>|<workspace>|@list" key and hands the actual
-- pagination over to page_for_key().
-- @tparam table self strategy instance
-- @tparam number|nil size page size
-- @tparam string|nil offset opaque token from a previous page
-- @tparam table|nil options forwarded as is; also used to resolve the workspace
-- @return whatever page_for_key() returns
local function page(self, size, offset, options)
  local entity_schema = self.schema
  local workspace = ws(entity_schema, options)
  local list_key = entity_schema.name .. "|" .. workspace .. "|@list"

  return page_for_key(self, list_key, size, offset, options)
end
--- Fetch one entity by primary key.
-- The cache key has the shape "<schema name>:<pk string>:::::<workspace>".
-- Note that this local shadows Lua's global `select` for the rest of the file.
-- @tparam table self strategy instance
-- @tparam table pk primary key, serialized with `declarative_config.pk_string`
-- @tparam table|nil options used to resolve the workspace
-- @return whatever select_by_key() returns
local function select(self, pk, options)
  local entity_schema = self.schema
  local workspace = ws(entity_schema, options)
  local pk_string = declarative_config.pk_string(entity_schema, pk)
  local entity_key = entity_schema.name .. ":" .. pk_string .. ":::::" .. workspace

  return select_by_key(entity_schema, entity_key)
end
--- Fetch one entity through a unique field (or directly by cache key).
-- @tparam table self strategy instance
-- @tparam string field field name; "cache_key" means `value` already is the
--   key to look up
-- @tparam any value value to match; when a table is given, the value of the
--   first pair returned by `next` is used
-- @tparam table|nil options used to resolve the workspace
-- @return whatever select_by_key() returns
local function select_by_field(self, field, value, options)
  if type(value) == "table" then
    local _unused_key
    _unused_key, value = next(value)
  end

  local entity_schema = self.schema
  local workspace = ws(entity_schema, options)

  if field == "cache_key" then
    -- if select_by_cache_key, use the provided cache_key as key directly
    return select_by_key(entity_schema, value)
  end

  local across_ws = entity_schema.fields[field].unique_across_ws

  -- only accept global query by field if field is unique across workspaces
  assert(not options or options.workspace ~= null or across_ws)

  local lookup_key = unique_field_key(entity_schema.name, workspace, field, value, across_ws)
  return select_by_key(entity_schema, lookup_key)
end
-- Populate the metatable shared by all strategy instances: read operations
-- map to the functions above, write operations are stubs that report an
-- "operation unsupported" error.
do
  -- Build a stub for a write operation addressed by primary key.
  -- The stub returns `nil` plus the error built by
  -- `self.errors:operation_unsupported`.
  local function unsupported(operation)
    return function(self)
      local err = fmt("cannot %s '%s' entities when not using a database",
                      operation, self.schema.name)
      return nil, self.errors:operation_unsupported(err)
    end
  end


  -- Same as above for the `*_by_field` variants; the message also names the
  -- field. The message is formatted in a single pass so that the operation
  -- and schema name are never re-read as format directives.
  local function unsupported_by(operation)
    return function(self, field_name)
      local err = fmt("cannot %s '%s' entities by '%s' when not using a database",
                      operation, self.schema.name, field_name)
      return nil, self.errors:operation_unsupported(err)
    end
  end


  _mt.select = select
  _mt.page = page
  _mt.select_by_field = select_by_field
  _mt.insert = unsupported("create")
  _mt.update = unsupported("update")
  _mt.upsert = unsupported("create or update")
  _mt.delete = unsupported("remove")
  _mt.update_by_field = unsupported_by("update")
  _mt.upsert_by_field = unsupported_by("create or update")
  _mt.delete_by_field = unsupported_by("remove")

  -- truncating is a no-op here that always reports success
  _mt.truncate = function() return true end


  -- off-strategy specific methods:
  _mt.page_for_key = page_for_key
end
--- Create a strategy instance for one schema.
-- Besides the methods inherited from the shared metatable, the instance gets
-- one `page_for_<field name>` method per foreign field of the schema; each one
-- paginates the list stored under
-- "<schema name>|<workspace>|<referenced entity>|<foreign_key.id>|@list".
-- @tparam table connector instance of kong.db.strategies.off.connector
-- @tparam table schema entity schema
-- @tparam table errors error factory used by the strategy methods
-- @treturn table the new strategy instance
function off.new(connector, schema, errors)
  if not kong.default_workspace then
    -- This is not the id for the default workspace in DB-less.
    -- This is a sentinel value for the init() phase before
    -- the declarative config is actually loaded.
    kong.default_workspace = "00000000-0000-0000-0000-000000000000"
  end

  local strategy = {
    connector = connector,
    schema = schema,
    errors = errors,
  }

  local schema_name = schema.name

  for field_name, field in schema:each_field() do
    if field.type == "foreign" then
      local referenced = field.reference

      -- the receiver argument is ignored: the closure always paginates on
      -- behalf of the instance being built here
      strategy["page_for_" .. field_name] = function(_, foreign_key, size, offset, options)
        local workspace = ws(schema, options)
        local list_key = schema_name .. "|" .. workspace .. "|" .. referenced
                         .. "|" .. foreign_key.id .. "|@list"

        return page_for_key(strategy, list_key, size, offset, options)
      end
    end
  end

  return setmetatable(strategy, _mt)
end
return off