diff --git a/nginx-metrix/scheduler.lua b/nginx-metrix/scheduler.lua index 48f7f61..ecf77fc 100644 --- a/nginx-metrix/scheduler.lua +++ b/nginx-metrix/scheduler.lua @@ -1,5 +1,6 @@ local logger = require 'nginx-metrix.logger' local dict = require 'nginx-metrix.storage.dict' +local namespaces = require 'nginx-metrix.storage.namespaces' local collectors = {} @@ -21,37 +22,51 @@ local setup_lock = function() return (dict.add(lock_key, lock_key, lock_timeout)) end -local process = function(collectors_list, worker_id) - if length(collectors_list) == 0 then +local process = function(collectors_list, namespaces_list, worker_id) + local col_count = length(collectors_list) + local ns_count = length(namespaces_list) + + if col_count == 0 then logger.debug(("[scheduler #%s] collectors list is empty, skipping"):format(worker_id)) return end + if ns_count == 0 then + logger.debug(("[scheduler #%s] namespaces list is empty, skipping"):format(worker_id)) + return + end + if not setup_lock() then logger.debug(("[scheduler #%s] lock still exists, skipping"):format(worker_id)) return end logger.debug(("[scheduler #%s] lock obtained, aggregating"):format(worker_id)) - iter(collectors_list):map( - function(collector) - logger.debug(("[scheduler #%s] spawned thread for Collector<%s>"):format(worker_id, collector.name)) - return collector, ngx.thread.spawn(function() - logger.debug(("[scheduler #%s] thread processing started for Collector<%s>"):format(worker_id, collector.name)) + + local col_ns_thread_iter = take( + col_count * ns_count, + tabulate(function(x) + local collector = collectors_list[operator.intdiv(x, ns_count) + 1] + local namespace = namespaces_list[operator.mod(x, ns_count) + 1] + local thread = ngx.thread.spawn(function() + logger.debug(("[scheduler #%s] thread processing started for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace)) + namespaces.activate(namespace) collector:aggregate() - logger.debug(("[scheduler #%s] thread processing finished for Collector<%s>"):format(worker_id, collector.name)) + logger.debug(("[scheduler #%s] thread processing finished for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace)) end) - end - ):each( - function(collector, thread) - local ok, res = ngx.thread.wait(thread) - if not ok then - logger.error(("[scheduler #%s] failed to run Collector<%s>:aggregate()"):format(worker_id, collector.name), res) - else - logger.debug(("[scheduler #%s] thread finished for Collector<%s>"):format(worker_id, collector.name)) - end - end + logger.debug(("[scheduler #%s] spawned thread for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace)) + return collector, namespace, thread + end) ) + + col_ns_thread_iter:each(function(collector, namespace, thread) + local ok, res = ngx.thread.wait(thread) + if not ok then + logger.error(("[scheduler #%s] failed to run Collector<%s>:aggregate() on namespace '%s'"):format(worker_id, collector.name, namespace), res) + else + logger.debug(("[scheduler #%s] thread finished for Collector<%s> on namespace '%s'"):format(worker_id, collector.name, namespace)) + end + end) end local handler @@ -63,7 +78,7 @@ handler = function(premature, collectors_list, worker_id) local ok, err - process(collectors_list, worker_id) + process(collectors_list, namespaces.list(), worker_id) ok, err = ngx.timer.at(delay, handler, collectors_list, worker_id) if not ok then diff --git a/tests/scheduler_spec.lua b/tests/scheduler_spec.lua index 1a863cc..7a93c58 100644 --- a/tests/scheduler_spec.lua +++ b/tests/scheduler_spec.lua @@ -3,6 +3,7 @@ require('tests.bootstrap')(assert) describe('scheduler', function() local scheduler local dict_mock + local namespaces local logger local match @@ -11,6 +12,7 @@ describe('scheduler', function() match = require 'luassert.match' dict_mock = require 'nginx-metrix.storage.dict' + namespaces = require 'nginx-metrix.storage.namespaces' _G.ngx = { timer = { @@ -26,6 +28,8 @@ describe('scheduler', function() teardown(function() mock.revert(dict_mock) + mock.revert(namespaces) + package.loaded['nginx-metrix.storage.namespaces'] = nil package.loaded['nginx-metrix.storage.dict'] = nil package.loaded['nginx-metrix.logger'] = nil _G.ngx = nil @@ -35,6 +39,7 @@ describe('scheduler', function() mock(_G.ngx) mock(dict_mock, true) + mock(namespaces, true) package.loaded['nginx-metrix.storage.dict'] = dict_mock scheduler = require 'nginx-metrix.scheduler' @@ -151,11 +156,12 @@ describe('scheduler', function() local process_stub = mock({process = function() end}).process scheduler.__private__._process(process_stub) + namespaces.list.on_call_with().returns({}) stub.new(_G.ngx.timer, 'at').on_call_with(1, match.is_function(), {}, worker_id).returns(false, 'test error') scheduler.__private__.handler(false, {}, worker_id) - assert.spy(process_stub).was.called_with({}, worker_id) + assert.spy(process_stub).was.called_with({}, {}, worker_id) assert.spy(process_stub).was.called(1) assert.spy(logger.error).was.called_with('[scheduler #' .. worker_id .. '] Failed to continue the scheduler - failed to create the timer', 'test error') @@ -165,19 +171,29 @@ describe('scheduler', function() it('process without collectors', function() local worker_id = 13 - scheduler.__private__.process({}, worker_id) + scheduler.__private__.process({}, {}, worker_id) assert.spy(dict_mock.add).was_not.called() assert.spy(logger.error).was_not.called() assert.spy(logger.debug).was.called_with('[scheduler #' .. worker_id .. '] collectors list is empty, skipping') end) + it('process without namespaces', function() + local worker_id = 13 + + scheduler.__private__.process({[[collector]]}, {}, worker_id) + + assert.spy(dict_mock.add).was_not.called() + assert.spy(logger.error).was_not.called() + assert.spy(logger.debug).was.called_with('[scheduler #' .. worker_id .. '] namespaces list is empty, skipping') + end) + it('process skips if can not setup lock', function() local worker_id = 13 dict_mock.add.on_call_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout()).returns(false, 'exists', false) - scheduler.__private__.process({[[collector]]}, worker_id) + scheduler.__private__.process({[[collector]]}, {'example.com'}, worker_id) assert.spy(dict_mock.add).was.called_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout()) assert.spy(dict_mock.add).was.called(1) @@ -196,9 +212,11 @@ describe('scheduler', function() stub.new(_G.ngx.thread, 'spawn').on_call_with(match.is_function()).returns(thread) stub.new(_G.ngx.thread, 'wait').on_call_with(thread).returns(false, 'test error') - stub.new(_G.ngx.timer, 'at').on_call_with(1, match.is_function(), { test_collector }, worker_id).returns(true, nil) + stub.new(_G.ngx.timer, 'at').on_call_with(1, match.is_function(), { test_collector }, {'example.com'} , worker_id).returns(true, nil) - scheduler.__private__.process({ test_collector }, worker_id) +-- logger.error.on_call_with(match._, match._).invokes(function(...) print(require 'inspect'({...})) end) + + scheduler.__private__.process({ test_collector }, {'example.com'}, worker_id) assert.spy(dict_mock.add).was.called_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout()) assert.spy(dict_mock.add).was.called(1) @@ -206,7 +224,7 @@ describe('scheduler', function() assert.spy(_G.ngx.thread.spawn).was.called_with(match.is_function()) assert.spy(_G.ngx.thread.wait).was.called_with(thread) - assert.spy(logger.error).was.called_with('[scheduler #' .. worker_id .. '] failed to run Collector:aggregate()', 'test error') + assert.spy(logger.error).was.called_with("[scheduler #" .. worker_id .. "] failed to run Collector:aggregate() on namespace 'example.com'", 'test error') assert.spy(logger.error).was.called(1) end) @@ -224,14 +242,19 @@ describe('scheduler', function() logger.error.on_call_with(match._, match._).invokes(function(...) print(require 'inspect'({...})) end) - scheduler.__private__.process({ test_collector }, worker_id) + scheduler.__private__.process({ test_collector }, {'first.com', 'second.org'}, worker_id) assert.spy(dict_mock.add).was.called_with(scheduler.__private__.lock_key(), scheduler.__private__.lock_key(), scheduler.__private__.lock_timeout()) assert.spy(dict_mock.add).was.called(1) assert.spy(_G.ngx.thread.spawn).was.called_with(match.is_function()) assert.spy(_G.ngx.thread.wait).was.called_with(thread) - assert.spy(test_collector.aggregate).was.called(1) + assert.spy(test_collector.aggregate).was.called(2) + + assert.spy(namespaces.list).was.called(1) + assert.spy(namespaces.activate).was.called_with('first.com') + assert.spy(namespaces.activate).was.called_with('second.org') + assert.spy(namespaces.activate).was.called(2) assert.spy(logger.error).was_not.called() end)