Skip to content

Commit

Permalink
only one scheduler per time (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
ekho committed Jun 3, 2016
1 parent 8107113 commit bae2be1
Show file tree
Hide file tree
Showing 15 changed files with 1,265 additions and 1,042 deletions.
28 changes: 14 additions & 14 deletions .busted
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
return {
_all = {
coverage = true,
ROOT = {"tests"},
['sort-files'] = true,
},
default = {
verbose = true,
output = 'utfTerminal'
},
ci = {
verbose = true,
output = 'junit'
},
}
_all = {
coverage = true,
ROOT = { "tests" },
['sort-files'] = true,
},
default = {
verbose = true,
output = 'utfTerminal'
},
ci = {
verbose = true,
output = 'junit'
},
}
19 changes: 19 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# EditorConfig is awesome: http://EditorConfig.org

# top-most EditorConfig file
root = true

# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true

[{*.lua,.luacheckrc,.luacov,.busted}]
charset = utf-8
indent_style = space
indent_size = 2

[{.travis.yml,.appveyor.yml}]
indent_style = space
indent_size = 2
9 changes: 5 additions & 4 deletions .luacheckrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ stds.is = require 'nginx-metrix.lib.is'

std = 'max+luajit+fun+ngx_lua'

files['tests'] = {std = "+busted"}
files['tests'] = { std = "+busted" }

new_read_globals = {
'__TEST__',
'is',
}
'__TEST__',
'is',
'copy',
}
46 changes: 23 additions & 23 deletions .luacov
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
return {
include = {
'^[.]/nginx[-]metrix/'
},
modules = {
['nginx-metrix.main'] = 'nginx-metrix/main.lua',
include = {
'^[.]/nginx[-]metrix/'
},
modules = {
['nginx-metrix.main'] = 'nginx-metrix/main.lua',

['nginx-metrix.collectors'] = 'nginx-metrix/collectors.lua',
['nginx-metrix.listener'] = 'nginx-metrix/listener.lua',
['nginx-metrix.logger'] = 'nginx-metrix/logger.lua',
['nginx-metrix.scheduler'] = 'nginx-metrix/scheduler.lua',
['nginx-metrix.version'] = 'nginx-metrix/version.lua',
['nginx-metrix.collectors'] = 'nginx-metrix/collectors.lua',
['nginx-metrix.listener'] = 'nginx-metrix/listener.lua',
['nginx-metrix.logger'] = 'nginx-metrix/logger.lua',
['nginx-metrix.scheduler'] = 'nginx-metrix/scheduler.lua',
['nginx-metrix.version'] = 'nginx-metrix/version.lua',

['nginx-metrix.lib.is'] = 'nginx-metrix/lib/is.lua',
['nginx-metrix.lib.json'] = 'nginx-metrix/lib/json.lua',
['nginx-metrix.lib.is'] = 'nginx-metrix/lib/is.lua',
['nginx-metrix.lib.json'] = 'nginx-metrix/lib/json.lua',

['nginx-metrix.collectors.request'] = 'nginx-metrix/collectors/request.lua',
['nginx-metrix.collectors.status'] = 'nginx-metrix/collectors/status.lua',
['nginx-metrix.collectors.upstream'] = 'nginx-metrix/collectors/upstream.lua',
['nginx-metrix.collectors.request'] = 'nginx-metrix/collectors/request.lua',
['nginx-metrix.collectors.status'] = 'nginx-metrix/collectors/status.lua',
['nginx-metrix.collectors.upstream'] = 'nginx-metrix/collectors/upstream.lua',

['nginx-metrix.output.helper'] = 'nginx-metrix/output/helper.lua',
['nginx-metrix.output.renderer'] = 'nginx-metrix/output/renderer.lua',
['nginx-metrix.output.helper'] = 'nginx-metrix/output/helper.lua',
['nginx-metrix.output.renderer'] = 'nginx-metrix/output/renderer.lua',

['nginx-metrix.storage.collector_wrapper_factory'] = 'nginx-metrix/storage/collector_wrapper_factory.lua',
['nginx-metrix.storage.dict'] = 'nginx-metrix/storage/dict.lua',
['nginx-metrix.storage.namespaces'] = 'nginx-metrix/storage/namespaces.lua',
['nginx-metrix.storage.serializer'] = 'nginx-metrix/storage/serializer.lua',
},
}
['nginx-metrix.storage.collector_wrapper_factory'] = 'nginx-metrix/storage/collector_wrapper_factory.lua',
['nginx-metrix.storage.dict'] = 'nginx-metrix/storage/dict.lua',
['nginx-metrix.storage.namespaces'] = 'nginx-metrix/storage/namespaces.lua',
['nginx-metrix.storage.serializer'] = 'nginx-metrix/storage/serializer.lua',
},
}
4 changes: 2 additions & 2 deletions nginx-metrix/collectors.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ local collector_extend = function(collector)
self:on_phase(phase)
end,

periodically = function(self)
aggregate = function(self)
iter(self.fields):each(function(field, params)
if params.mean then
self.storage:mean_flush(field)
Expand Down Expand Up @@ -145,4 +145,4 @@ if __TEST__ then
}
end

return exports
return exports
104 changes: 68 additions & 36 deletions nginx-metrix/scheduler.lua
Original file line number Diff line number Diff line change
@@ -1,54 +1,81 @@
local logger = require 'nginx-metrix.logger'
local dict = require 'nginx-metrix.storage.dict'

local collectors = {}

---
-- @param collector table
--
local attach_collector = function(collector)
if is.callable(collector.periodically) then
table.insert(collectors, collector)
end
if is.callable(collector.aggregate) then
table.insert(collectors, collector)
end
end

local lock_key = '--aggregator-lock--'
local lock_timeout = 2

local get_lock = function()
local lock_id = ngx.now()
local success, _, _ = dict.add(lock_key, lock_id, lock_timeout)
if success then
return true, lock_id
end
return false, nil
end

local drop_lock = function(lock_id)
if lock_id == dict.get(lock_key) then
dict.delete(lock_key)
return true
end
return false
end

local delay = 1

local handler
handler = function (premature, collectors_list)
if premature then
return
end
handler = function(premature, collectors_list)
if premature then
return
end

if length(collectors_list) > 0 then
iter(collectors_list):map(
function(collector)
return collector, ngx.thread.spawn(function() collector:periodically() end)
end
):each(
function(collector, thread)
local ok, res = ngx.thread.wait(thread)
if not ok then
logger.error(("failed to run Collector<%s>:periodically(): "):format(collector.name), res)
end
end
)
end
if length(collectors_list) > 0 then
local lock_success, lock_id = get_lock()
if lock_success then
iter(collectors_list):map(
function(collector)
return collector, ngx.thread.spawn(function() collector:aggregate() end)
end
):each(
function(collector, thread)
local ok, res = ngx.thread.wait(thread)
if not ok then
logger.error(("failed to run Collector<%s>:aggregate(): "):format(collector.name), res)
end
end
)

local ok, err = ngx.timer.at(delay, handler, collectors_list)
if not ok then
logger.error("Failed to continue the scheduler - failed to create the timer: ", err)
return
drop_lock(lock_id)
end
end

local ok, err = ngx.timer.at(delay, handler, collectors_list)
if not ok then
logger.error("Failed to continue the scheduler - failed to create the timer: ", err)
return
end
end

---
-- @return bool
local start = function()
local ok, err = ngx.timer.at(delay, handler, collectors)
if not ok then
logger.error("Failed to start the scheduler - failed to create the timer: ", err)
return false
end
return true
local ok, err = ngx.timer.at(delay, handler, collectors)
if not ok then
logger.error("Failed to start the scheduler - failed to create the timer: ", err)
return false
end
return true
end

--------------------------------------------------------------------------------
Expand All @@ -59,10 +86,15 @@ exports.attach_collector = attach_collector
exports.start = start

if __TEST__ then
exports.__private__ = {
get_collectors = function() return collectors end,
handler = handler,
}
exports.__private__ = {
get_collectors = function() return collectors end,
lock_key = function(value) if value == nil then return lock_key else lock_key = value end end,
lock_timeout = function(value) if value == nil then return lock_timeout else lock_timeout = value end end,

handler = handler,
get_lock = get_lock,
drop_lock = drop_lock,
}
end

return exports
return exports
Loading

0 comments on commit bae2be1

Please sign in to comment.