Skip to content

Commit

Permalink
modules: http, graphite, policy, daf support map()
Browse files Browse the repository at this point in the history
all relevant modules now support running in
forked mode and polling workers for information.
for example graphite module can poll stats from
all workers and then aggregate before sending,
or HTTP module can run on the process group leader
only and then poll workers for information.
  • Loading branch information
vavrusa committed Jul 6, 2016
1 parent cfe0bbd commit d02ce8e
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 120 deletions.
6 changes: 3 additions & 3 deletions modules/daf/daf.js
Expand Up @@ -271,20 +271,20 @@ $(function() {
}
/* Rule builder submit */
$('#daf-add').click(function () {
const form = $('#daf-builder-form');
const form = $('#daf-builder-form').parent();
if (dafBuilder.items.length == 0 || form.hasClass('has-error')) {
return;
}
/* Clear previous errors and resubmit. */
form.find('.alert').remove();
form.parent().find('.alert').remove();
$.post('daf', dafBuilder.items.join(' '))
.done(function (data) {
dafBuilder.clear();
loadRule(data, $('#daf-rules'));
})
.fail(function (data) {
const reason = data.responseText.length > 0 ? data.responseText : 'internal error';
form.append(
form.after(
'<div class="alert alert-danger" role="alert">'+
'Couldn\'t add rule (code: '+data.status+', reason: '+reason+').'+
'</div>'
Expand Down
88 changes: 61 additions & 27 deletions modules/daf/daf.lua
@@ -1,5 +1,3 @@
local cqueues = require('cqueues')

-- Load dependent modules
if not view then modules.load('view') end
if not policy then modules.load('policy') end
Expand Down Expand Up @@ -60,7 +58,8 @@ local filters = {
end,
}

local function parse_filter(tok, g)
local function parse_filter(tok, g, prev)
if not tok then error(string.format('expected filter after "%s"', prev)) end
local filter = filters[tok:lower()]
if not filter then error(string.format('invalid filter "%s"', tok)) end
return filter(g)
Expand All @@ -77,11 +76,11 @@ local function parse_rule(g)
-- or terminate filter chain and return
tok = g()
while tok do
if tok == 'AND' then
local fa, fb = f, parse_filter(g(), g)
if tok:lower() == 'and' then
local fa, fb = f, parse_filter(g(), g, tok)
f = function (req, qry) return fa(req, qry) and fb(req, qry) end
elseif tok == 'OR' then
local fa, fb = f, parse_filter(g(), g)
elseif tok:lower() == 'or' then
local fa, fb = f, parse_filter(g(), g, tok)
f = function (req, qry) return fa(req, qry) or fb(req, qry) end
else
break
Expand Down Expand Up @@ -131,7 +130,7 @@ local M = {

-- @function Cleanup module
function M.deinit()
if http then
if http and http.endpoints then
http.endpoints['/daf'] = nil
http.endpoints['/daf.js'] = nil
http.snippets['/daf'] = nil
Expand All @@ -140,6 +139,10 @@ end

-- @function Add rule
function M.add(rule)
-- Ignore duplicates
for _, r in ipairs(M.rules) do
if r.info == rule then return r end
end
local id, action, filter = compile(rule)
if not id then error(action) end
-- Combine filter and action into policy
Expand Down Expand Up @@ -202,6 +205,15 @@ function M.enable(id, val)
return M.toggle(id, true)
end

local function consensus(op, ...)
local ret = true
local results = map(string.format(op, ...))
for _, r in ipairs(results) do
ret = ret and r
end
return ret
end

-- @function Public-facing API
local function api(h, stream)
local m = h:get(':method')
Expand All @@ -227,7 +239,7 @@ local function api(h, stream)
local path = h:get(':path')
local id = tonumber(path:match '/([^/]*)$')
if id then
if M.del(id) then
if consensus('daf.del "%s"', id) then
return tojson(true)
end
return 404, '"No such rule"' -- Not found
Expand All @@ -237,8 +249,10 @@ local function api(h, stream)
elseif m == 'POST' then
local query = stream:get_body_as_string()
if query then
local ok, r, err = pcall(M.add, query)
if not ok then return 500, string.format('"%s"', r) end
local ok, r = pcall(M.add, query)
if not ok then return 500, string.format('"%s"', r:match('/([^/]+)$')) end
-- Dispatch to all other workers
consensus('daf.add "%s"', query)
return rule_info(r)
end
return 400
Expand All @@ -252,7 +266,7 @@ local function api(h, stream)
end
-- We do not support more actions
if action == 'active' then
if M.toggle(id, val == 'true') then
if consensus('daf.toggle(%d, %s)', id, val == 'true' or 'false') then
return tojson(true)
else
return 404, '"No such rule"'
Expand All @@ -263,23 +277,39 @@ local function api(h, stream)
end
end

local function getmatches()
local update = {}
for _, rules in ipairs(map 'daf.rules') do
for _, r in ipairs(rules) do
local id = tostring(r.rule.id)
-- Must have string keys for JSON object and not an array
update[id] = (update[id] or 0) + r.rule.count
end
end
return update
end

-- @function Publish DAF statistics
local function publish(h, ws)
local ok, counters = true, {}
local cqueues = require('cqueues')
local ok, last = true, nil
while ok do
-- Check if we have new rule matches
local update = {}
for _, r in ipairs(M.rules) do
local id = r.rule.id
if counters[id] ~= r.rule.count then
-- Must have string keys for JSON object and not an array
update[tostring(id)] = r.rule.count
counters[id] = r.rule.count
local diff = {}
local has_update, update = pcall(getmatches)
if has_update then
if last then
for id, count in pairs(update) do
if not last[id] or last[id] < count then
diff[id] = count
end
end
end
last = update
end
-- Update counters when there is a new data
if next(update) ~= nil then
ok = ws:send(tojson(update))
if next(diff) ~= nil then
ok = ws:send(tojson(diff))
else
ok = ws:send_ping()
end
Expand All @@ -289,7 +319,7 @@ end

-- @function Configure module
function M.config(conf)
if not http then error('"http" module is not loaded, cannot load DAF') end
if not http or not http.endpoints then return end
-- Export API and data publisher
http.endpoints['/daf.js'] = http.page('daf.js', 'daf')
http.endpoints['/daf'] = {'application/json', api, publish}
Expand All @@ -301,13 +331,17 @@ function M.config(conf)
<div class="col-md-11">
<input type="text" id="daf-builder" class="form-control" aria-label="..." />
</div>
<button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
<div class="col-md-1">
<button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
</div>
</form>
</div>
<div class="row">
<table id="daf-rules" class="table table-striped table-responsive">
<th><td>No rules here yet.</td></th>
</table>
<div class="col-md-12">
<table id="daf-rules" class="table table-striped table-responsive">
<th><td>No rules here yet.</td></th>
</table>
</div>
</div>
]]}
end
Expand Down
81 changes: 44 additions & 37 deletions modules/graphite/graphite.lua
@@ -1,5 +1,9 @@
--- @module graphite
local graphite = {}
-- Load dependent modules
if not stats then modules.load('stats') end

-- This is leader-only module
if worker.id > 0 then return {} end
local M = {}
local socket = require('socket')

-- Create connected UDP socket
Expand Down Expand Up @@ -38,80 +42,83 @@ local function make_tcp(host, port)
return s
end

local function merge(results)
local t = {}
for _, result in ipairs(results) do
for k, v in pairs(result) do
t[k] = (t[k] or 0) + v
end
end
return t
end

-- Send the metrics in a table to multiple Graphite consumers
local function publish_table(metrics, prefix, now)
for key,val in pairs(metrics) do
local msg = key..' '..val..' '..now..'\n'
if prefix then
msg = prefix..'.'..msg
end
for i in ipairs(graphite.cli) do
local ok, err = graphite.cli[i]:send(msg)
for i in ipairs(M.cli) do
local ok, err = M.cli[i]:send(msg)
if not ok then
-- Best-effort reconnect once per two tries
local tcp = graphite.cli[i]['connect'] ~= nil
local host = graphite.info[i]
if tcp and host.seen + 2 * graphite.interval / 1000 <= now then
local tcp = M.cli[i]['connect'] ~= nil
local host = M.info[i]
if tcp and host.seen + 2 * M.interval / 1000 <= now then
print(string.format('[graphite] reconnecting: %s#%d reason: %s',
host.addr, host.port, err))
graphite.cli[i] = make_tcp(host.addr, host.port)
M.cli[i] = make_tcp(host.addr, host.port)
host.seen = now
end
end
end
end
end

function graphite.init(module)
graphite.ev = nil
graphite.cli = {}
graphite.info = {}
graphite.interval = 5 * sec
graphite.prefix = 'kresd.' .. hostname()
function M.init(module)
M.ev = nil
M.cli = {}
M.info = {}
M.interval = 5 * sec
M.prefix = 'kresd.' .. hostname()
return 0
end

function graphite.deinit(module)
if graphite.ev then event.cancel(graphite.ev) end
function M.deinit(module)
if M.ev then event.cancel(M.ev) end
return 0
end

-- @function Publish results to the Graphite server(s)
function graphite.publish()
function M.publish()
local now = os.time()
-- Publish built-in statistics
if not graphite.cli then error("no graphite server configured") end
publish_table(cache.stats(), graphite.prefix..'.cache', now)
publish_table(worker.stats(), graphite.prefix..'.worker', now)
if not M.cli then error("no graphite server configured") end
publish_table(merge(map 'cache.stats()'), M.prefix..'.cache', now)
publish_table(merge(map 'worker.stats()'), M.prefix..'.worker', now)
-- Publish extended statistics if available
if not stats then
return 0
end
local now_metrics = stats.list()
if type(now_metrics) ~= 'table' then
return 0 -- No metrics to watch
end
publish_table(now_metrics, graphite.prefix, now)
publish_table(merge(map 'stats.list()'), M.prefix, now)
return 0
end

-- @function Make connection to Graphite server.
function graphite.add_server(graphite, host, port, tcp)
function M.add_server(graphite, host, port, tcp)
local s, err = tcp and make_tcp(host, port) or make_udp(host, port)
if not s then
error(err)
end
table.insert(graphite.cli, s)
table.insert(graphite.info, {addr = host, port = port, seen = 0})
table.insert(M.cli, s)
table.insert(M.info, {addr = host, port = port, seen = 0})
return 0
end

function graphite.config(conf)
function M.config(conf)
-- config defaults
if not conf then return 0 end
if not conf.port then conf.port = 2003 end
if conf.interval then graphite.interval = conf.interval end
if conf.prefix then graphite.prefix = conf.prefix end
if conf.interval then M.interval = conf.interval end
if conf.prefix then M.prefix = conf.prefix end
-- connect to host(s)
if type(conf.host) == 'table' then
for key, val in pairs(conf.host) do
Expand All @@ -121,9 +128,9 @@ function graphite.config(conf)
graphite:add_server(conf.host, conf.port, conf.tcp)
end
-- start publishing stats
if graphite.ev then event.cancel(graphite.ev) end
graphite.ev = event.recurrent(graphite.interval, graphite.publish)
if M.ev then event.cancel(M.ev) end
M.ev = event.recurrent(M.interval, M.publish)
return 0
end

return graphite
return M

0 comments on commit d02ce8e

Please sign in to comment.