Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] dao prepare #146

Merged
merged 6 commits into from
Apr 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions kong/cli/db.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ if args.command == "seed" then
cutils.logger:error_exit(err)
end

local err = dao_factory:prepare()
if err then
cutils.logger:error(err)
end

local faker = Faker(dao_factory)
faker:seed(args.random and args.number or nil)
cutils.logger:success("Populated")
Expand Down
107 changes: 58 additions & 49 deletions kong/dao/cassandra/base_dao.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local cassandra = require "cassandra"
local timestamp = require "kong.tools.timestamp"
local validate = require("kong.dao.schemas").validate
local DaoError = require "kong.dao.error"
local stringy = require "stringy"
local Object = require "classic"
local utils = require "kong.tools.utils"
local uuid = require "uuid"
Expand All @@ -17,8 +18,7 @@ uuid.seed()

function BaseDao:new(properties)
self._properties = properties
self._statements = {} -- Mirror of _queries but with prepared statements instead of strings
self._statements_cache = {} -- Prepared statements of SELECTS generated with find_by_keys
self._statements_cache = {}
end

-------------
Expand Down Expand Up @@ -87,7 +87,7 @@ end
function BaseDao:_check_unique(statement, t, is_updating)
local results, err = self:_execute(statement, t)
if err then
return false, "Error during UNIQUE check: "..err
return false, "Error during UNIQUE check: "..err.message
elseif results and #results > 0 then
if not is_updating then
return false
Expand Down Expand Up @@ -118,7 +118,7 @@ end
function BaseDao:_check_foreign(statement, t)
local results, err = self:_execute(statement, t)
if err then
return false, "Error during FOREIGN check: "..err
return false, "Error during FOREIGN check: "..err.message
elseif not results or #results == 0 then
return false
else
Expand All @@ -133,10 +133,10 @@ end
-- @return {table|nil} Error if any during execution
-- @return {table|nil} A table with the list of not existing foreign entities
function BaseDao:_check_all_foreign(t)
if not self._statements.__foreign then return true end
if not self._queries.__foreign then return true end

local errors
for k, statement in pairs(self._statements.__foreign) do
for k, statement in pairs(self._queries.__foreign) do
if t[k] and t[k] ~= constants.DATABASE_NULL_ID then
local exists, err = self:_check_foreign(statement, t)
if err then
Expand All @@ -158,10 +158,10 @@ end
-- @return {table|nil} Error if any during execution
-- @return {table|nil} A table with the list of already existing entities
function BaseDao:_check_all_unique(t, is_updating)
if not self._statements.__unique then return true end
if not self._queries.__unique then return true end

local errors
for k, statement in pairs(self._statements.__unique) do
for k, statement in pairs(self._queries.__unique) do
if t[k] or k == "self" then
local unique, err = self:_check_unique(statement, t, is_updating)
if err then
Expand Down Expand Up @@ -219,9 +219,11 @@ end

-- Execute an operation statement.
-- The operation can be one of the following:
-- * _statements (which contains .query and .param for ordered binding of parameters)
-- * _queries (which contains .query and .param for ordered binding of parameters) and
-- will be prepared on the go if not already in the statements cache
-- * a lua-resty-cassandra BatchStatement (see ratelimiting_metrics.lua)
-- * a lua-resty-cassandra prepared statement
-- * a plain query (string)
--
-- @param {table} operation The operation to execute
-- @param {table} values_to_bind Raw values to bind
Expand All @@ -232,33 +234,39 @@ end
-- Boolean if type of results is VOID
-- @return {table|nil} Cassandra error if any
function BaseDao:_execute(operation, values_to_bind, options)
local statement
local statement = operation

-- Determine kind of operation
if operation.is_kong_statement then
statement = operation.query
-- Retrieve the prepared statement from cache or prepare and cache
local cache_key
if operation.query then
cache_key = operation.query
elseif type(operation) == "string" then
cache_key = operation
end

if operation.params and values_to_bind then
local errors
values_to_bind, errors = encode_cassandra_values(self._schema, values_to_bind, operation.params)
if errors then
return nil, DaoError(errors, error_types.INVALID_TYPE)
end
if cache_key then
if not self._statements_cache[cache_key] then
statement = self:prepare_kong_statement(cache_key, operation.params)
else
statement = self._statements_cache[cache_key].statement
end
end

-- Bind parameters if operation has some
if operation.params and values_to_bind then
local errors
values_to_bind, errors = encode_cassandra_values(self._schema, values_to_bind, operation.params)
if errors then
return nil, DaoError(errors, error_types.INVALID_TYPE)
end
elseif operation.is_batch_statement then
statement = operation
values_to_bind = nil
options = nil
else
statement = operation
end

-- Execute operation
local session, err = self:_open_session()
if err then
return nil, err
end

-- Execute operation
local results, err = session:execute(statement, values_to_bind, options)
if err then
err = DaoError(err, error_types.DATABASE)
Expand All @@ -271,11 +279,12 @@ function BaseDao:_execute(operation, values_to_bind, options)

-- Parse result
if results and results.type == "ROWS" then
-- do we have more pages to fetch?
-- do we have more pages to fetch? if so, alias the paging_state
if results.meta.has_more_pages then
results.next_page = results.meta.paging_state
end

-- only the DAO needs those, it should be transparant in the application
results.meta = nil
results.type = nil

Expand All @@ -285,7 +294,7 @@ function BaseDao:_execute(operation, values_to_bind, options)

return results, err
elseif results and results.type == "VOID" then
-- return boolean
-- result is not a set of rows, let's return a boolean to indicate success
return err == nil, err
else
return results, err
Expand All @@ -304,15 +313,19 @@ end
--
-- @param {string} query A CQL query to prepare
-- @param {table} params An array of parameters (ordered) matching the query placeholders order
-- @return {table|nil} A "kong statement" to be used by _execute
-- @return {table|nil} A "kong statement" with a prepared statement and parameters to be used by _execute
-- @return {table|nil} Error if any
function BaseDao:prepare_kong_statement(query, params)
-- handle SELECT queries with %s for dynamic select by keys
local query_to_prepare = string.format(query, "")
query_to_prepare = stringy.strip(query_to_prepare)

local session, err = self:_open_session()
if err then
return nil, err
end

local prepared_stmt, prepare_err = session:prepare(query)
local prepared_stmt, prepare_err = session:prepare(query_to_prepare)

local err = self:_close_session(session)
if err then
Expand All @@ -322,11 +335,16 @@ function BaseDao:prepare_kong_statement(query, params)
if prepare_err then
return nil, DaoError("Failed to prepare statement: \""..query_to_prepare.."\". "..prepare_err, error_types.DATABASE)
else
return {
is_kong_statement = true,
local kong_statement = {
query = query,
params = params,
query = prepared_stmt
statement = prepared_stmt
}

-- cache key is the non-striped/non-formatted query from _queries
self._statements_cache[query] = kong_statement

return prepared_stmt
end
end

Expand Down Expand Up @@ -370,7 +388,7 @@ function BaseDao:insert(t)
return nil, DaoError(errors, error_types.FOREIGN)
end

local _, stmt_err = self:_execute(self._statements.insert, self:_marshall(t))
local _, stmt_err = self:_execute(self._queries.insert, self:_marshall(t))
if stmt_err then
return nil, stmt_err
else
Expand All @@ -392,7 +410,7 @@ function BaseDao:update(t)

-- Check if exists to prevent upsert and manually set UNSET values (pfffff...)
local results
ok, err, results = self:_check_foreign(self._statements.select_one, t)
ok, err, results = self:_check_foreign(self._queries.select_one, t)
if err then
return nil, DaoError(err, error_types.DATABASE)
elseif not ok then
Expand Down Expand Up @@ -430,7 +448,7 @@ function BaseDao:update(t)
return nil, DaoError(errors, error_types.FOREIGN)
end

local _, stmt_err = self:_execute(self._statements.update, self:_marshall(t))
local _, stmt_err = self:_execute(self._queries.update, self:_marshall(t))
if stmt_err then
return nil, stmt_err
else
Expand All @@ -443,7 +461,7 @@ end
-- @param {string} id UUID of element to select
-- @return _execute()
function BaseDao:find_one(id)
local data, err = self:_execute(self._statements.select_one, { id = id })
local data, err = self:_execute(self._queries.select_one, { id = id })

-- Return the 1st and only element of the result set
if data and utils.table_size(data) > 0 then
Expand Down Expand Up @@ -491,16 +509,7 @@ function BaseDao:find_by_keys(t, page_size, paging_state)

local select_query = string.format(self._queries.select.query, where_str)

-- prepare query in a statement cache
if not self._statements_cache[select_query] then
local kong_stmt, err = self:prepare_kong_statement(select_query, keys)
if err then
return nil, DaoError(err, error_types.DATABASE)
end
self._statements_cache[select_query] = kong_stmt
end

return self:_execute(self._statements_cache[select_query], t, {
return self:_execute({ query = select_query, params = keys }, t, {
page_size = page_size,
paging_state = paging_state
})
Expand All @@ -521,14 +530,14 @@ end
-- @return {boolean} True if deleted, false if otherwise or not found
-- @return {table|nil} Error if any
function BaseDao:delete(id)
local exists, err = self:_check_foreign(self._statements.select_one, { id = id })
local exists, err = self:_check_foreign(self._queries.select_one, { id = id })
if err then
return false, DaoError(err, error_types.DATABASE)
elseif not exists then
return false
end

return self:_execute(self._statements.delete, { id = id })
return self:_execute(self._queries.delete, { id = id })
end

return BaseDao
67 changes: 37 additions & 30 deletions kong/dao/cassandra/factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ local stringy = require "stringy"
local Object = require "classic"

local Apis = require "kong.dao.cassandra.apis"
local RateLimitingMetrics = require "kong.dao.cassandra.ratelimiting_metrics"
local PluginsConfigurations = require "kong.dao.cassandra.plugins_configurations"
local Consumers = require "kong.dao.cassandra.consumers"
local PluginsConfigurations = require "kong.dao.cassandra.plugins_configurations"
local BasicAuthCredentials = require "kong.dao.cassandra.basicauth_credentials"
local RateLimitingMetrics = require "kong.dao.cassandra.ratelimiting_metrics"
local KeyAuthCredentials = require "kong.dao.cassandra.keyauth_credentials"

local CassandraFactory = Object:extend()
Expand Down Expand Up @@ -43,54 +43,61 @@ function CassandraFactory:new(properties)
self._properties.hosts = normalize_localhost(self._properties.hosts)

self.apis = Apis(properties)
self.ratelimiting_metrics = RateLimitingMetrics(properties)
self.plugins_configurations = PluginsConfigurations(properties)
self.consumers = Consumers(properties)
self.plugins_configurations = PluginsConfigurations(properties)
self.basicauth_credentials = BasicAuthCredentials(properties)
self.ratelimiting_metrics = RateLimitingMetrics(properties)
self.keyauth_credentials = KeyAuthCredentials(properties)
end

function CassandraFactory:drop()
return self:execute_queries [[
TRUNCATE apis;
TRUNCATE ratelimiting_metrics;
TRUNCATE plugins_configurations;
TRUNCATE consumers;
TRUNCATE plugins_configurations;
TRUNCATE basicauth_credentials;
TRUNCATE keyauth_credentials;
TRUNCATE ratelimiting_metrics;
]]
end

-- Prepare all statements in collection._queries and put them in collection._statements.
-- Should be called with only a collection and will recursively call itself for nested statements.
-- @param collection A collection with a ._queries property
local function prepare_collection(collection, queries, statements)
if not queries then queries = collection._queries end
if not statements then statements = collection._statements end

for stmt_name, query in pairs(queries) do
if type(query) == "table" and query.query == nil then
collection._statements[stmt_name] = {}
prepare_collection(collection, query, collection._statements[stmt_name])
else
local q = stringy.strip(query.query)
q = string.format(q, "")
local kong_stmt, err = collection:prepare_kong_statement(q, query.params)
if err then
error(err)
-- Prepare all statements of collections `._queries` property and put them
-- in a statements cache
--
-- Note:
-- Even if the BaseDAO's :_execute() method support preparation of statements on-the-go,
-- this method should be called when Kong starts in order to detect any failure in advance
-- as well as test the connection to Cassandra.
--
-- @return error if any
function CassandraFactory:prepare()
local function prepare_collection(collection, collection_queries)
if not collection_queries then collection_queries = collection._queries end
for stmt_name, collection_query in pairs(collection_queries) do
if type(collection_query) == "table" and collection_query.query == nil then
-- Nested queries, let's recurse to prepare them too
prepare_collection(collection, collection_query)
else
-- _queries can contain strings or tables with string + keys of parameters to bind
local query_to_prepare
if type(collection_query) == "string" then
query_to_prepare = collection_query
elseif collection_query.query then
query_to_prepare = collection_query.query
end

local _, err = collection:prepare_kong_statement(query_to_prepare, collection_query.params)
if err then
error(err)
end
end
statements[stmt_name] = kong_stmt
end
end
end

-- Prepare all statements of collections
-- @return error if any
function CassandraFactory:prepare()
for _, collection in ipairs({ self.apis,
self.ratelimiting_metrics,
self.plugins_configurations,
self.consumers,
self.plugins_configurations,
self.ratelimiting_metrics,
self.basicauth_credentials,
self.keyauth_credentials }) do
local status, err = pcall(function() prepare_collection(collection) end)
Expand Down
2 changes: 1 addition & 1 deletion kong/dao/cassandra/plugins_configurations.lua
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function PluginsConfigurations:find_distinct()

-- Execute query
local distinct_names = {}
for _, rows, page, err in session:execute(self._statements.select.query, nil, {auto_paging=true}) do
for _, rows, page, err in session:execute(string.format(self._queries.select.query, ""), nil, {auto_paging=true}) do
if err then
return nil, err
end
Expand Down
Loading