Skip to content

Commit

Permalink
Aggregation must operate considering the namespace (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
ekho committed Jun 8, 2016
1 parent db43319 commit 0460009
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 27 deletions.
53 changes: 34 additions & 19 deletions nginx-metrix/scheduler.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local logger = require 'nginx-metrix.logger'
local dict = require 'nginx-metrix.storage.dict'
local namespaces = require 'nginx-metrix.storage.namespaces'

local collectors = {}

Expand All @@ -21,37 +22,51 @@ local setup_lock = function()
return (dict.add(lock_key, lock_key, lock_timeout))
end

local process = function(collectors_list, worker_id)
if length(collectors_list) == 0 then
local process = function(collectors_list, namespaces_list, worker_id)
local col_count = length(collectors_list)
local ns_count = length(namespaces_list)

if col_count == 0 then
logger.debug(("[scheduler #%s] collectors list is empty, skipping"):format(worker_id))
return
end

if ns_count == 0 then
logger.debug(("[scheduler #%s] namespaces list is empty, skipping"):format(worker_id))
return
end

if not setup_lock() then
logger.debug(("[scheduler #%s] lock still exists, skipping"):format(worker_id))
return
end

logger.debug(("[scheduler #%s] lock obtained, aggregating"):format(worker_id))
iter(collectors_list):map(
function(collector)
logger.debug(("[scheduler #%s] spawned thread for Collector<%s>"):format(worker_id, collector.name))
return collector, ngx.thread.spawn(function()
logger.debug(("[scheduler #%s] thread processing started for Collector<%s>"):format(worker_id, collector.name))

local col_ns_thread_iter = take(
col_count * ns_count,
tabulate(function(x)
local collector = collectors_list[operator.intdiv(x, ns_count) + 1]
local namespace = namespaces_list[operator.mod(x, ns_count) + 1]
local thread = ngx.thread.spawn(function()
logger.debug(("[scheduler #%s] thread processing started for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace))
namespaces.activate(namespace)
collector:aggregate()
logger.debug(("[scheduler #%s] thread processing finished for Collector<%s>"):format(worker_id, collector.name))
logger.debug(("[scheduler #%s] thread processing finished for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace))
end)
end
):each(
function(collector, thread)
local ok, res = ngx.thread.wait(thread)
if not ok then
logger.error(("[scheduler #%s] failed to run Collector<%s>:aggregate()"):format(worker_id, collector.name), res)
else
logger.debug(("[scheduler #%s] thread finished for Collector<%s>"):format(worker_id, collector.name))
end
end
logger.debug(("[scheduler #%s] spawned thread for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace))
return collector, namespace, thread
end)
)

col_ns_thread_iter:each(function(collector, namespace, thread)
local ok, res = ngx.thread.wait(thread)
if not ok then
logger.error(("[scheduler #%s] failed to run Collector<%s>:aggregate() on namespace '%s'"):format(worker_id, collector.name, namespace), res)
else
logger.debug(("[scheduler #%s] thread finished for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace))
end
end)
end

local handler
Expand All @@ -63,7 +78,7 @@ handler = function(premature, collectors_list, worker_id)

local ok, err

process(collectors_list, worker_id)
process(collectors_list, namespaces.list(), worker_id)

ok, err = ngx.timer.at(delay, handler, collectors_list, worker_id)
if not ok then
Expand Down
39 changes: 31 additions & 8 deletions tests/scheduler_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require('tests.bootstrap')(assert)
describe('scheduler', function()
local scheduler
local dict_mock
local namespaces
local logger
local match

Expand All @@ -11,6 +12,7 @@ describe('scheduler', function()
match = require 'luassert.match'

dict_mock = require 'nginx-metrix.storage.dict'
namespaces = require 'nginx-metrix.storage.namespaces'

_G.ngx = {
timer = {
Expand All @@ -26,6 +28,8 @@ describe('scheduler', function()

teardown(function()
mock.revert(dict_mock)
mock.revert(namespaces)
package.loaded['nginx-metrix.storage.namespaces'] = nil
package.loaded['nginx-metrix.storage.dict'] = nil
package.loaded['nginx-metrix.logger'] = nil
_G.ngx = nil
Expand All @@ -35,6 +39,7 @@ describe('scheduler', function()
mock(_G.ngx)

mock(dict_mock, true)
mock(namespaces, true)
package.loaded['nginx-metrix.storage.dict'] = dict_mock

scheduler = require 'nginx-metrix.scheduler'
Expand Down Expand Up @@ -151,11 +156,12 @@ describe('scheduler', function()
local process_stub = mock({process = function() end}).process
scheduler.__private__._process(process_stub)

namespaces.list.on_call_with().returns({})
stub.new(_G.ngx.timer, 'at').on_call_with(1, match.is_function(), {}, worker_id).returns(false, 'test error')

scheduler.__private__.handler(false, {}, worker_id)

assert.spy(process_stub).was.called_with({}, worker_id)
assert.spy(process_stub).was.called_with({}, {}, worker_id)
assert.spy(process_stub).was.called(1)

assert.spy(logger.error).was.called_with('[scheduler #' .. worker_id .. '] Failed to continue the scheduler - failed to create the timer', 'test error')
Expand All @@ -165,19 +171,29 @@ describe('scheduler', function()
it('process without collectors', function()
local worker_id = 13

scheduler.__private__.process({}, worker_id)
scheduler.__private__.process({}, {}, worker_id)

assert.spy(dict_mock.add).was_not.called()
assert.spy(logger.error).was_not.called()
assert.spy(logger.debug).was.called_with('[scheduler #' .. worker_id .. '] collectors list is empty, skipping')
end)

it('process without namespaces', function()
local worker_id = 13

scheduler.__private__.process({[[collector]]}, {}, worker_id)

assert.spy(dict_mock.add).was_not.called()
assert.spy(logger.error).was_not.called()
assert.spy(logger.debug).was.called_with('[scheduler #' .. worker_id .. '] namespaces list is empty, skipping')
end)

it('process skips if can not setup lock', function()
local worker_id = 13

dict_mock.add.on_call_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout()).returns(false, 'exists', false)

scheduler.__private__.process({[[collector]]}, worker_id)
scheduler.__private__.process({[[collector]]}, {'example.com'}, worker_id)

assert.spy(dict_mock.add).was.called_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout())
assert.spy(dict_mock.add).was.called(1)
Expand All @@ -196,17 +212,19 @@ describe('scheduler', function()

stub.new(_G.ngx.thread, 'spawn').on_call_with(match.is_function()).returns(thread)
stub.new(_G.ngx.thread, 'wait').on_call_with(thread).returns(false, 'test error')
stub.new(_G.ngx.timer, 'at').on_call_with(1, match.is_function(), { test_collector }, worker_id).returns(true, nil)
stub.new(_G.ngx.timer, 'at').on_call_with(1, match.is_function(), { test_collector }, {'example.com'} , worker_id).returns(true, nil)

scheduler.__private__.process({ test_collector }, worker_id)
-- logger.error.on_call_with(match._, match._).invokes(function(...) print(require 'inspect'({...})) end)

scheduler.__private__.process({ test_collector }, {'example.com'}, worker_id)

assert.spy(dict_mock.add).was.called_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout())
assert.spy(dict_mock.add).was.called(1)

assert.spy(_G.ngx.thread.spawn).was.called_with(match.is_function())
assert.spy(_G.ngx.thread.wait).was.called_with(thread)

assert.spy(logger.error).was.called_with('[scheduler #' .. worker_id .. '] failed to run Collector<test>:aggregate()', 'test error')
assert.spy(logger.error).was.called_with("[scheduler #" .. worker_id .. "] failed to run Collector<test>:aggregate() on namespace 'example.com'", 'test error')
assert.spy(logger.error).was.called(1)
end)

Expand All @@ -224,14 +242,19 @@ describe('scheduler', function()

logger.error.on_call_with(match._, match._).invokes(function(...) print(require 'inspect'({...})) end)

scheduler.__private__.process({ test_collector }, worker_id)
scheduler.__private__.process({ test_collector }, {'first.com', 'second.org'}, worker_id)

assert.spy(dict_mock.add).was.called_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout())
assert.spy(dict_mock.add).was.called(1)

assert.spy(_G.ngx.thread.spawn).was.called_with(match.is_function())
assert.spy(_G.ngx.thread.wait).was.called_with(thread)
assert.spy(test_collector.aggregate).was.called(1)
assert.spy(test_collector.aggregate).was.called(2)

assert.spy(namespaces.list).was.called(1)
assert.spy(namespaces.activate).was.called_with('first.com')
assert.spy(namespaces.activate).was.called_with('second.org')
assert.spy(namespaces.activate).was.called(2)

assert.spy(logger.error).was_not.called()
end)
Expand Down

0 comments on commit 0460009

Please sign in to comment.