Skip to content

Commit

Permalink
window aggregation (#42)
Browse files Browse the repository at this point in the history
* use ngx.worker.id instead of shared dict (#38)

* fixed readme and updated tests

* Window implementation

* use window for cyclic metrics
  • Loading branch information
ekho committed Jul 12, 2016
1 parent c56cbb5 commit 68a6ba9
Show file tree
Hide file tree
Showing 15 changed files with 1,059 additions and 338 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ lua_shared_dict metrix 16m; # creating metrix storage in nginx shared memory. Us
init_by_lua_block {
metrix = require 'nginx-metrix.main'({
shared_dict = 'metrix',
vhosts = {'mydomain1', 'mydomain2', ...}
vhosts = {'mydomain1', 'mydomain2', ...},
window_size = 10,
})
}
Expand Down
194 changes: 95 additions & 99 deletions nginx-metrix/collectors.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ local storage_collector_wrapper_factory = require 'nginx-metrix.storage.collecto

local exports = {}

local collectors = {{name = 'dummy'}} -- dummy item for correct work iter function
local collectors = { { name = 'dummy' } } -- dummy item for correct work iter function
local collectors_iter = iter(collectors)
table.remove(collectors, 1) -- removing dummy item

Expand All @@ -12,137 +12,133 @@ table.remove(collectors, 1) -- removing dummy item
-- @return bool
---
local collector_exists = function(collector)
return index(collector.name, collectors_iter:map(function(c) return c.name end)) ~= nil
return index(collector.name, collectors_iter:map(function(c) return c.name end)) ~= nil
end

---
-- @param collector table
---
local collector_validate = function(collector)
assert(
type(collector) == 'table',
('Collector MUST be a table, got %s: %s'):format(type(collector), inspect(collector))
)

assert(
type(collector.name) == 'string',
('Collector must have string property "name", got %s: %s'):format(type(collector.name), inspect(collector))
)

-- collector exists
assert(
not collector_exists(collector),
('Collector<%s> already exists'):format(collector.name)
)

assert(
type(collector.ngx_phases) == 'table',
('Collector<%s>.ngx_phases must be an array, given: %s'):format(collector.name, type(collector.ngx_phases))
)

assert(
all(function(phase) return type(phase) == 'string' end, collector.ngx_phases),
('Collector<%s>.ngx_phases must be an array of strings, given: %s'):format(collector.name, inspect(collector.ngx_phases))
)

assert(
is.callable(collector.on_phase),
('Collector<%s>:on_phase must be a function or callable table, given: %s'):format(collector.name, type(collector.on_phase))
)

assert(
type(collector.fields) == 'table',
('Collector<%s>.fields must be a table, given: %s'):format(collector.name, type(collector.fields))
)

assert(
all(function(field, params) return type(field) == 'string' and type(params) == 'table' end, collector.fields),
('Collector<%s>.fields must be an table[string, table], given: %s'):format(collector.name, inspect(collector.fields))
)
assert(
type(collector) == 'table',
('Collector MUST be a table, got %s: %s'):format(type(collector), inspect(collector))
)

assert(
type(collector.name) == 'string',
('Collector must have string property "name", got %s: %s'):format(type(collector.name), inspect(collector))
)

-- collector exists
assert(
not collector_exists(collector),
('Collector<%s> already exists'):format(collector.name)
)

assert(
type(collector.ngx_phases) == 'table',
('Collector<%s>.ngx_phases must be an array, given: %s'):format(collector.name, type(collector.ngx_phases))
)

assert(
all(function(phase) return type(phase) == 'string' end, collector.ngx_phases),
('Collector<%s>.ngx_phases must be an array of strings, given: %s'):format(collector.name, inspect(collector.ngx_phases))
)

assert(
is.callable(collector.on_phase),
('Collector<%s>:on_phase must be a function or callable table, given: %s'):format(collector.name, type(collector.on_phase))
)

assert(
type(collector.fields) == 'table',
('Collector<%s>.fields must be a table, given: %s'):format(collector.name, type(collector.fields))
)

assert(
all(function(field, params) return type(field) == 'string' and type(params) == 'table' end, collector.fields),
('Collector<%s>.fields must be an table[string, table], given: %s'):format(collector.name, inspect(collector.fields))
)
end

---
-- @param collector table
-- @return table
---
local collector_extend = function(collector)
local _metatable = {
init = function(self, storage)
self.storage = storage
end,

handle_ngx_phase = function(self, phase)
self:on_phase(phase)
end,

aggregate = function(self)
iter(self.fields):each(function(field, params)
if params.mean then
self.storage:mean_flush(field)
elseif params.cyclic then
self.storage:cyclic_flush(field)
end
end)
end,

get_raw_stats = function(self)
return iter(self.fields):map(function(k, _)
return k, (self.storage:get(k) or 0)
end)
end,

get_text_stats = function(self, output_helper)
return output_helper.render_stats(self)
end,

get_html_stats = function(self, output_helper)
return output_helper.render_stats(self)
local _metatable = {
init = function(self, storage)
self.storage = storage
end,
handle_ngx_phase = function(self, phase)
self:on_phase(phase)
end,
aggregate = function(self)
iter(self.fields):each(function(field, params)
if params.mean then
self.storage:mean_flush(field)
elseif params.cyclic then
self.storage:cyclic_flush(field, params.window)
end
}
_metatable.__index = _metatable

setmetatable(collector, _metatable)

return collector
end)
end,
get_raw_stats = function(self)
return iter(self.fields):map(function(k, _)
return k, (self.storage:get(k) or 0)
end)
end,
get_text_stats = function(self, output_helper)
return output_helper.render_stats(self)
end,
get_html_stats = function(self, output_helper)
return output_helper.render_stats(self)
end
}
_metatable.__index = _metatable

setmetatable(collector, _metatable)

return collector
end

---
-- @param collector table
-- @return table
---
local collector_register = function(collector)
collector_validate(collector)
collector_validate(collector)

collector = collector_extend(collector)
collector = collector_extend(collector)

local storage = storage_collector_wrapper_factory.create(collector)
collector:init(storage)
local storage = storage_collector_wrapper_factory.create(collector)
collector:init(storage)

table.insert(collectors, collector)
table.insert(collectors, collector)

return collector
return collector
end

--------------------------------------------------------------------------------
-- EXPORTS
--------------------------------------------------------------------------------
exports.register = collector_register
exports.all = collectors_iter
exports.all = collectors_iter
exports.set_window_size = storage_collector_wrapper_factory.set_window_size

if __TEST__ then
exports.__private__ = {
collectors = function(value)
if value ~= nil then
local count = length(collectors)
while count > 0 do table.remove(collectors); count = count - 1 end
iter(value):each(function(collector) table.insert(collectors, collector) end)
end
return collectors
end,
collector_exists = collector_exists,
collector_extend = collector_extend,
collector_validate = collector_validate,
}
exports.__private__ = {
collectors = function(value)
if value ~= nil then
local count = length(collectors)
while count > 0 do table.remove(collectors); count = count - 1 end
iter(value):each(function(collector) table.insert(collectors, collector) end)
end
return collectors
end,
collector_exists = collector_exists,
collector_extend = collector_extend,
collector_validate = collector_validate,
}
end

return exports
32 changes: 16 additions & 16 deletions nginx-metrix/collectors/request.lua
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
local collector = {
name = 'request',
ngx_phases = { [[log]] },
fields = {
rps = { format = '%d', cyclic = true, },
internal_rps = { format = '%d', cyclic = true, },
https_rps = { format = '%d', cyclic = true, },
time_ps = { format = '%0.3f', mean = true, },
length_ps = { format = '%0.3f', mean = true, },
}
name = 'request',
ngx_phases = { [[log]] },
fields = {
rps = { format = '%d', cyclic = true, window = true, },
internal_rps = { format = '%d', cyclic = true, window = true, },
https_rps = { format = '%d', cyclic = true, window = true, },
time_ps = { format = '%0.3f', mean = true, },
length_ps = { format = '%0.3f', mean = true, },
}
}

function collector:on_phase(phase)
if phase == 'log' then
self.storage:cyclic_incr('rps')
if tonumber(ngx.var.request_time) ~= nil then self.storage:mean_add('time_ps', ngx.var.request_time) end
if ngx.req.is_internal() then self.storage:cyclic_incr('internal_rps') end
if ngx.var.https == 'on' then self.storage:cyclic_incr('https_rps') end
if tonumber(ngx.var.request_length) ~= nil then self.storage:mean_add('length_ps', ngx.var.request_length) end
end
if phase == 'log' then
self.storage:cyclic_incr('rps')
if tonumber(ngx.var.request_time) ~= nil then self.storage:mean_add('time_ps', ngx.var.request_time) end
if ngx.req.is_internal() then self.storage:cyclic_incr('internal_rps') end
if ngx.var.https == 'on' then self.storage:cyclic_incr('https_rps') end
if tonumber(ngx.var.request_length) ~= nil then self.storage:mean_add('length_ps', ngx.var.request_length) end
end
end

return collector
46 changes: 23 additions & 23 deletions nginx-metrix/collectors/status.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@
-- 426, 428, 429, 431, 451, 499,
-- 500, 501, 502, 503, 504, 505, 506, 507, 508, 510, 511, 599,

local field_params = { format = '%d', cyclic = true, }
local field_params = { format = '%d', cyclic = true, window = true, }

local collector = {
name = 'status',
fields = {
['200'] = field_params,
['301'] = field_params,
['302'] = field_params,
['304'] = field_params,
['403'] = field_params,
['404'] = field_params,
['500'] = field_params,
['502'] = field_params,
['503'] = field_params,
['504'] = field_params,
},
ngx_phases = {[[log]]},
on_phase = function(self, phase)
if phase == 'log' and ngx.status ~= nil then
if self.fields[tostring(ngx.status)] == nil then
self.fields[tostring(ngx.status)] = field_params
end
self.storage:cyclic_incr(ngx.status)
end
end,
name = 'status',
fields = {
['200'] = field_params,
['301'] = field_params,
['302'] = field_params,
['304'] = field_params,
['403'] = field_params,
['404'] = field_params,
['500'] = field_params,
['502'] = field_params,
['503'] = field_params,
['504'] = field_params,
},
ngx_phases = { [[log]] },
on_phase = function(self, phase)
if phase == 'log' and ngx.status ~= nil then
if self.fields[tostring(ngx.status)] == nil then
self.fields[tostring(ngx.status)] = field_params
end
self.storage:cyclic_incr(ngx.status)
end
end,
}

return collector
28 changes: 14 additions & 14 deletions nginx-metrix/collectors/upstream.lua
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
local collector = {
name = 'upstream',
ngx_phases = { [[log]] },
fields = {
rps = { format = '%d', cyclic = true, },
connect_time = { format = '%0.3f', mean = true, },
header_time = { format = '%0.3f', mean = true, },
response_time = { format = '%0.3f', mean = true, },
}
name = 'upstream',
ngx_phases = { [[log]] },
fields = {
rps = { format = '%d', cyclic = true, window = true, },
connect_time = { format = '%0.3f', mean = true, },
header_time = { format = '%0.3f', mean = true, },
response_time = { format = '%0.3f', mean = true, },
}
}

function collector:on_phase(phase)
if phase == 'log' and ngx.var.upstream_addr ~= nil then
self.storage:cyclic_incr('rps')
if tonumber(ngx.var.upstream_connect_time) ~= nil then self.storage:mean_add('connect_time', ngx.var.upstream_connect_time) end
if tonumber(ngx.var.upstream_header_time) ~= nil then self.storage:mean_add('header_time', ngx.var.upstream_header_time) end
if tonumber(ngx.var.upstream_response_time) ~= nil then self.storage:mean_add('response_time', ngx.var.upstream_response_time) end
end
if phase == 'log' and ngx.var.upstream_addr ~= nil then
self.storage:cyclic_incr('rps')
if tonumber(ngx.var.upstream_connect_time) ~= nil then self.storage:mean_add('connect_time', ngx.var.upstream_connect_time) end
if tonumber(ngx.var.upstream_header_time) ~= nil then self.storage:mean_add('header_time', ngx.var.upstream_header_time) end
if tonumber(ngx.var.upstream_response_time) ~= nil then self.storage:mean_add('response_time', ngx.var.upstream_response_time) end
end
end

return collector
4 changes: 3 additions & 1 deletion nginx-metrix/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ setmetatable(exports, {
namespaces.init({namespaces=options.vhosts})
end

collectors.set_window_size(options.window_size)

if not options.skip_register_builtin_collectors then
self:register_builtin_collectors()
end
Expand All @@ -115,4 +117,4 @@ setmetatable(exports, {
end,
})

return exports
return exports

0 comments on commit 68a6ba9

Please sign in to comment.