Skip to content

Commit

Permalink
Backend notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
mohierf committed Apr 23, 2018
1 parent b0da56d commit 76ca24c
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 57 deletions.
38 changes: 18 additions & 20 deletions alignak/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,10 +848,8 @@ def request_stop(self, message='', exit_code=0):
if message:
logger.info(message)

_ts = time.time()
self.unlink()
self.do_stop()
statsmgr.timer('stop-delay', time.time() - _ts)

logger.info("Stopped %s.", self.name)
sys.exit(exit_code)
Expand Down Expand Up @@ -926,7 +924,6 @@ def daemon_connection_init(self, s_link, set_wait_new_conf=False):

# Get the connection running identifier - first client / server communication
logger.debug("[%s] Getting running identifier for '%s'", self.name, s_link.name)
_t0 = time.time()
# Assume the daemon should be alive and reachable
# because we are initializing the connection...
s_link.alive = True
Expand All @@ -940,7 +937,6 @@ def daemon_connection_init(self, s_link, set_wait_new_conf=False):
s_link.wait_new_conf()
break
time.sleep(0.3)
statsmgr.timer('con-initialization.%s' % s_link.name, time.time() - _t0)

return got_a_running_id

Expand Down Expand Up @@ -1034,7 +1030,7 @@ def do_main_loop(self): # pylint: disable=too-many-branches, too-many-statement
# Daemon load
# TODO this measurement needs to be made reliable (check and adapt if needed)
self.load_1_min.update_load(self.maximum_loop_duration - elapsed_time)
statsmgr.gauge('load_1_min', self.load_1_min.get_load())
statsmgr.gauge('load.1_min', self.load_1_min.get_load())
if self.log_loop:
logger.debug("+++ %d, load: %s", self.loop_count, self.load_1_min.load)

Expand All @@ -1043,29 +1039,29 @@ def do_main_loop(self): # pylint: disable=too-many-branches, too-many-statement
my_process = psutil.Process()
with my_process.oneshot():
perfdatas.append("num_threads=%d" % my_process.num_threads())
statsmgr.counter("num_threads", my_process.num_threads())
statsmgr.counter("system.num_threads", my_process.num_threads())
# perfdatas.append("num_ctx_switches=%d" % my_process.num_ctx_switches())
perfdatas.append("num_fds=%d" % my_process.num_fds())
statsmgr.counter("num_fds", my_process.num_fds())
statsmgr.counter("system.num_fds", my_process.num_fds())
# perfdatas.append("num_handles=%d" % my_process.num_handles())
perfdatas.append("create_time=%d" % my_process.create_time())
perfdatas.append("cpu_num=%d" % my_process.cpu_num())
statsmgr.counter("cpu_num", my_process.cpu_num())
statsmgr.counter("system.cpu_num", my_process.cpu_num())
perfdatas.append("cpu_usable=%d" % len(my_process.cpu_affinity()))
statsmgr.counter("cpu_usable", len(my_process.cpu_affinity()))
statsmgr.counter("system.cpu_usable", len(my_process.cpu_affinity()))
perfdatas.append("cpu_percent=%.2f%%" % my_process.cpu_percent())
statsmgr.counter("cpu_percent", my_process.cpu_percent())
statsmgr.counter("system.cpu_percent", my_process.cpu_percent())

cpu_times_percent = my_process.cpu_times()
for key in cpu_times_percent._fields:
perfdatas.append("cpu_%s_time=%.2fs" % (key,
getattr(cpu_times_percent, key)))
statsmgr.counter("cpu_%s_time" % key, getattr(cpu_times_percent, key))
statsmgr.counter("system.cpu_%s_time" % key, getattr(cpu_times_percent, key))

memory = my_process.memory_full_info()
for key in memory._fields:
perfdatas.append("mem_%s=%db" % (key, getattr(memory, key)))
statsmgr.counter("mem_%s" % key, getattr(memory, key))
statsmgr.counter("system.mem_%s" % key, getattr(memory, key))

logger.debug("Daemon %s (%s), pid=%s, ppid=%s, status=%s, cpu/memory|%s",
self.name, my_process.name(), my_process.pid, my_process.ppid(),
Expand Down Expand Up @@ -1110,9 +1106,8 @@ def do_main_loop(self): # pylint: disable=too-many-branches, too-many-statement
if self.log_loop:
logger.debug("Elapsed time, current loop: %.2f, from start: %.2f (%d loops)",
loop_duration, elapsed_time, self.loop_count)
statsmgr.gauge('loop.count', self.loop_count)
statsmgr.timer('loop.duration', loop_duration)
statsmgr.timer('run.duration', elapsed_time)
statsmgr.gauge('loop-count', self.loop_count)
statsmgr.timer('run-duration', elapsed_time)

# Maybe someone said we will stop...
if self.will_stop:
Expand Down Expand Up @@ -1161,7 +1156,8 @@ def do_load_modules(self, modules):
if self.modules_manager.configuration_warnings: # pragma: no cover, not tested
for msg in self.modules_manager.configuration_warning:
logger.warning(msg)
statsmgr.timer('modules-loading', time.time() - _ts)
statsmgr.gauge('modules.count', len(modules))
statsmgr.timer('modules.load-time', time.time() - _ts)

def add(self, elt):
""" Abstract method for adding brok
Expand Down Expand Up @@ -1840,7 +1836,7 @@ def wait_for_initial_conf(self, timeout=1.0):

if not self.interrupted:
logger.info("Got initial configuration, waited for: %.2f", time.time() - _ts)
statsmgr.timer('initial-configuration', time.time() - _ts)
statsmgr.timer('configuration.initial', time.time() - _ts)
else:
logger.info("Interrupted before getting the initial configuration")

Expand All @@ -1861,7 +1857,7 @@ def wait_for_new_conf(self, timeout=1.0):

if not self.interrupted:
logger.info("Got the new configuration, waited for: %.2f", time.time() - _ts)
statsmgr.timer('new-configuration', time.time() - _ts)
statsmgr.timer('configuration.new', time.time() - _ts)
else:
logger.info("Interrupted before getting the new configuration")

Expand Down Expand Up @@ -1903,7 +1899,7 @@ def hook_point(self, hook_name):
logger.exception('Exception %s', exp)
self.modules_manager.set_to_restart(module)
else:
statsmgr.timer('core.hook.%s.%s' % (module.name, hook_name), time.time() - _ts)
statsmgr.timer('hook.%s.%s' % (module.name, hook_name), time.time() - _ts)

def get_retention_data(self): # pylint: disable=R0201
"""Basic function to get retention data,
Expand Down Expand Up @@ -2043,6 +2039,7 @@ def get_objects_from_from_queues(self):
:return: True if we got something in the queue, False otherwise.
:rtype: bool
"""
_t0 = time.time()
had_some_objects = False
for module in self.modules_manager.get_external_instances():
queue = module.from_q
Expand All @@ -2051,7 +2048,7 @@ def get_objects_from_from_queues(self):
while True:
queue_size = queue.qsize()
if queue_size:
statsmgr.gauge('queue.from.%s.size' % module.get_name(), queue_size)
statsmgr.gauge('queues.from.%s.count' % module.get_name(), queue_size)
try:
obj = queue.get_nowait()
except Full:
Expand All @@ -2066,6 +2063,7 @@ def get_objects_from_from_queues(self):
else:
had_some_objects = True
self.add(obj)
statsmgr.timer('queues.time', time.time() - _t0)

return had_some_objects

Expand Down
11 changes: 6 additions & 5 deletions alignak/daemons/arbiterdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ def add(self, elt):
"""
if isinstance(elt, Brok):
self.broks[elt.uuid] = elt
statsmgr.counter('broks.got', 1)
statsmgr.counter('broks.added', 1)
elif isinstance(elt, ExternalCommand):
self.external_commands.append(elt)
statsmgr.counter('external-commands.got', 1)
statsmgr.counter('external-commands.added', 1)
else: # pragma: no cover, simple dev alerting
logger.error('Do not manage object type %s (%s)', type(elt), elt)

Expand Down Expand Up @@ -818,7 +818,7 @@ def load_modules_alignak_configuration(self): # pragma: no cover, not yet with
logger.error("Back trace of this remove: %s", output.getvalue())
output.close()
continue
statsmgr.timer('core.hook.get_alignak_configuration', time.time() - _t0)
statsmgr.timer('hook.get_alignak_configuration', time.time() - _t0)

params = []
if alignak_cfg:
Expand Down Expand Up @@ -1532,7 +1532,7 @@ def do_loop_turn(self): # pylint: disable=too-many-branches, too-many-statement
# # Maybe our satellites raised new broks. Reap them...
# _t0 = time.time()
# self.get_broks_from_satellites()
# statsmgr.timer('broks.got', time.time() - _t0)
# statsmgr.timer('broks.got.time', time.time() - _t0)
#
# # Maybe our satellites raised new external commands. Reap them...
# _t0 = time.time()
Expand All @@ -1542,7 +1542,7 @@ def do_loop_turn(self): # pylint: disable=too-many-branches, too-many-statement
# # One broker is responsible for our broks, we give him our broks
# _t0 = time.time()
# self.push_broks_to_broker()
# statsmgr.timer('broks.pushed', time.time() - _t0)
# statsmgr.timer('broks.pushed.time', time.time() - _t0)
#
# # We push our external commands to our schedulers...
# _t0 = time.time()
Expand Down Expand Up @@ -1675,6 +1675,7 @@ def get_daemon_stats(self, details=False): # pylint: disable=too-many-branches
'brokers', 'receivers', 'pollers'):
counters["dispatcher.%s" % sat_type] = len(getattr(self.dispatcher, sat_type))

# To be refactored
metrics = res['metrics']
metrics.append('%s.%s.external-commands.queue %d %d'
% (self.type, self.name, len(self.external_commands), now))
Expand Down
8 changes: 5 additions & 3 deletions alignak/daemons/brokerdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def do_loop_turn(self):
_t0 = time.time()
try:
my_initial_broks = satellite.get_initial_broks(self.name)
statsmgr.timer('initial-broks.%s' % satellite.name, time.time() - _t0)
statsmgr.timer('broks.initial.%s.time' % satellite.name, time.time() - _t0)
if not my_initial_broks:
logger.info("No initial broks were raised, "
"my scheduler is not yet ready...")
Expand All @@ -444,6 +444,7 @@ def do_loop_turn(self):
self.got_initial_broks = True
logger.info("Got %d initial broks from '%s'",
my_initial_broks, satellite.name)
statsmgr.gauge('broks.initial.%s.count' % satellite.name, my_initial_broks)
except LinkError as exp:
logger.warning("Scheduler connection failed, I could not get initial broks!")

Expand Down Expand Up @@ -515,8 +516,8 @@ def do_loop_turn(self):

# Maybe our external modules raised 'objects', so get them
if self.get_objects_from_from_queues():
statsmgr.gauge('got.external-commands', len(self.external_commands))
statsmgr.gauge('got.broks', len(self.external_broks))
statsmgr.gauge('external-commands.got.count', len(self.external_commands))
statsmgr.gauge('broks.got.count', len(self.external_broks))

def get_daemon_stats(self, details=False):
"""Increase the stats provided by the Daemon base class
Expand All @@ -538,6 +539,7 @@ def get_daemon_stats(self, details=False):
counters['reactionners'] = len(self.reactionners)
counters['receivers'] = len(self.receivers)

# To be refactored
metrics = res['metrics']
metrics.append('%s.%s.external-broks.queue %d %d'
% (self.type, self.name, len(self.external_broks), now))
Expand Down
8 changes: 3 additions & 5 deletions alignak/daemons/receiverdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,12 @@ def do_loop_turn(self):
# Get external commands from the arbiters...
_t0 = time.time()
self.get_external_commands_from_arbiters()
statsmgr.timer('broks.got', time.time() - _t0)

statsmgr.gauge('got.external-commands', len(self.unprocessed_external_commands))
statsmgr.gauge('got.broks', len(self.broks))
statsmgr.timer('external-commands.got.time', time.time() - _t0)
statsmgr.gauge('external-commands.got.count', len(self.unprocessed_external_commands))

_t0 = time.time()
self.push_external_commands_to_schedulers()
statsmgr.timer('external-commands.pushed', time.time() - _t0)
statsmgr.timer('external-commands.pushed.time', time.time() - _t0)

# Say to modules it's a new tick :)
self.hook_point('tick')
Expand Down
2 changes: 1 addition & 1 deletion alignak/http/arbiter_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def command(self):
command_line = '%s;%s' % (command_line, parameters)

# Add a command to get managed
logger.debug("Got an external command: %s", command_line)
logger.warning("Got an external command: %s", command_line)
self.app.add(ExternalCommand(command_line))

return {'_status': 'OK', '_message': "Got command: %s" % command_line}
Expand Down
1 change: 0 additions & 1 deletion alignak/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ def set_log_level(log_level=logging.INFO):
:param log_level: log level
:return: n/a
"""
"""Set the test logger at DEBUG level - useful for some tests that check debug log"""
# Change the collector logger log level
print("set_debug_log")
logger_ = logging.getLogger(ALIGNAK_LOGGER_NAME)
Expand Down
15 changes: 6 additions & 9 deletions alignak/satellite.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def get_new_actions(self):
try:
_t0 = time.time()
self.do_get_new_actions()
statsmgr.timer('core.get-new-actions', time.time() - _t0)
statsmgr.timer('actions.got.time', time.time() - _t0)
except RuntimeError:
logger.error("Exception like issue #1007")

Expand Down Expand Up @@ -797,8 +797,7 @@ def do_get_new_actions(self):
self.add_actions(actions, scheduler_link)
logger.debug("Got %d actions from %s in %s",
len(actions), scheduler_link.name, time.time() - _t0)
statsmgr.gauge('get-new-actions-count.%s' % (scheduler_link.name), len(actions))
statsmgr.timer('get-new-actions-time.%s' % (scheduler_link.name), time.time() - _t0)
statsmgr.gauge('actions.added.count.%s' % (scheduler_link.name), len(actions))

def clean_previous_run(self):
"""Clean variables from previous configuration,
Expand Down Expand Up @@ -844,9 +843,9 @@ def do_loop_turn(self): # pylint: disable=too-many-branches
logger.debug("[%s][%s][%s] actions queued: %d, results queued: %d",
sched.name, mod, worker_id, actions_count, results_count)
# Update the statistics
statsmgr.gauge('core.worker-%s.actions-queue-size' % worker_id,
statsmgr.gauge('worker.%s.actions-queue-size' % worker_id,
actions_count)
statsmgr.gauge('core.worker-%s.results-queue-size' % worker_id,
statsmgr.gauge('worker.%s.results-queue-size' % worker_id,
results_count)
except (IOError, EOFError):
pass
Expand Down Expand Up @@ -933,13 +932,11 @@ def do_loop_turn(self): # pylint: disable=too-many-branches
logger.warning("Scheduler connection failed, I could not get new actions!")

# Get objects from our modules that are not Worker based
_t0 = time.time()
if self.log_loop:
logger.debug("[%s] get objects from queues", self.name)
self.get_objects_from_from_queues()
statsmgr.timer('core.get-objects-from-queues', time.time() - _t0)
statsmgr.gauge('got.external-commands', len(self.external_commands))
statsmgr.gauge('got.broks', len(self.broks))
statsmgr.gauge('external-commands.count', len(self.external_commands))
statsmgr.gauge('broks.count', len(self.broks))

def do_post_daemon_init(self):
"""Do this satellite (poller or reactionner) post "daemonize" init
Expand Down
14 changes: 9 additions & 5 deletions alignak/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,15 @@ def run_external_commands(self, cmds):
if not self.external_commands_manager:
return

_t0 = time.time()
logger.debug("Scheduler '%s' got %d commands", self.name, len(cmds))
for command in cmds:
self.external_commands_manager.resolve_command(ExternalCommand(command))
statsmgr.timer('core.run_external_commands', time.time() - _t0)
try:
_t0 = time.time()
logger.debug("Scheduler '%s' got %d commands", self.name, len(cmds))
for command in cmds:
self.external_commands_manager.resolve_command(ExternalCommand(command))
statsmgr.counter('external_commands.got.count', len(cmds))
statsmgr.timer('external_commands.got.time', time.time() - _t0)
except Exception as exp:
logger.warning("External command parsing error: %s", exp)

def add_brok(self, brok, broker_uuid=None):
"""Add a brok into brokers list
Expand Down
7 changes: 4 additions & 3 deletions alignak/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,17 @@ def flush(self):
return True

try:
logger.info("Flushing %d metrics to Graphite/carbon", self.metrics_count)
logger.debug("Flushing %d metrics to Graphite/carbon", self.metrics_count)
if self.carbon.send_data():
self.my_metrics = []
else:
logger.warning("Failed sending metrics to Graphite/carbon. Inner stored metric: %d",
self.metrics_count)
return False
except Exception: # pylint: disable=broad-except
except Exception as exp: # pylint: disable=broad-except
logger.warning("Failed sending metrics to Graphite/carbon. Inner stored metric: %d",
self.metrics_count)
logger.warning("Exception: %s", str(exp))
return False
return True

Expand All @@ -434,7 +435,7 @@ def send_to_graphite(self, metric, value):
"""
# Manage Graphite part
if self.statsd_enabled and self.carbon:
self.my_metrics.append(('.'.join([self.statsd_prefix, metric]),
self.my_metrics.append(('.'.join([self.statsd_prefix, self.name, metric]),
(int(time.time()), value)))
if self.metrics_count >= self.metrics_flush_count:
self.carbon.add_data_list(self.my_metrics)
Expand Down
Loading

0 comments on commit 76ca24c

Please sign in to comment.