Skip to content

Commit

Permalink
Merge pull request #800 from Alignak-monitoring/closes-#798
Browse files Browse the repository at this point in the history
Closes #798 - detect and run missing daemons
  • Loading branch information
ddurieux committed May 24, 2017
2 parents 943e253 + 733fbe3 commit 3128e52
Show file tree
Hide file tree
Showing 34 changed files with 784 additions and 434 deletions.
24 changes: 18 additions & 6 deletions alignak/daemon.py
Expand Up @@ -223,7 +223,8 @@ class Daemon(object):
IntegerProp(default=8),
}

def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):
def __init__(self, name, config_file, is_daemon, do_replace,
debug, debug_file, port=None, local_log=None):
"""
:param name:
Expand All @@ -242,13 +243,23 @@ def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):
self.debug = debug
self.debug_file = debug_file
self.interrupted = False
self.pidfile = None
self.pidfile = "%s.pid" % self.name

if port:
self.port = int(port)
print("Daemon '%s' is started with an overidden port number: %d"
% (self.name, self.port))

if local_log:
self.local_log = local_log
print("Daemon '%s' is started with an overidden log file: %s"
% (self.name, self.local_log))

if self.debug:
print("Daemon %s is in debug mode" % self.name)
print("Daemon '%s' is in debug mode" % self.name)

if self.is_daemon:
print("Daemon %s is in daemon mode" % self.name)
print("Daemon '%s' is in daemon mode" % self.name)

# Track time
now = time.time()
Expand Down Expand Up @@ -290,7 +301,8 @@ def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):
# Fill the properties
properties = self.__class__.properties
for prop, entry in properties.items():
setattr(self, prop, entry.pythonize(entry.default))
if getattr(self, prop, None) is None:
setattr(self, prop, entry.pythonize(entry.default))

# At least, lose the local log file if needed
def do_stop(self):
Expand Down Expand Up @@ -524,7 +536,7 @@ def check_parallel_run(self): # pragma: no cover, not with unit tests...
pid_var = self.fpid.readline().strip(' \r\n')
if pid_var:
pid = int(pid_var)
logger.info("Found an existing pid: '%s'", pid_var)
logger.info("Found an existing pid (%s): '%s'", self.pidfile, pid_var)
else:
logger.debug("Not found an existing pid: %s", self.pidfile)
return
Expand Down
125 changes: 108 additions & 17 deletions alignak/daemons/arbiterdaemon.py
Expand Up @@ -67,8 +67,11 @@
import cStringIO
import json

import subprocess

from alignak.misc.serialization import unserialize, AlignakClassLookupException
from alignak.objects.config import Config
from alignak.macroresolver import MacroResolver
from alignak.external_command import ExternalCommandManager
from alignak.dispatcher import Dispatcher
from alignak.daemon import Daemon
Expand Down Expand Up @@ -100,16 +103,21 @@ class Arbiter(Daemon): # pylint: disable=R0902
PathProp(default='arbiterd.log'),
})

# pylint: disable=too-many-arguments
def __init__(self, config_file, monitoring_files, is_daemon, do_replace, verify_only, debug,
debug_file, arbiter_name, analyse=None):
debug_file, alignak_name, analyse=None,
port=None, local_log=None, daemon_name=None):
self.daemon_name = 'arbiter'
if daemon_name:
self.daemon_name = daemon_name

super(Arbiter, self).__init__('arbiter', config_file, is_daemon, do_replace,
debug, debug_file)
super(Arbiter, self).__init__(self.daemon_name, config_file, is_daemon, do_replace,
debug, debug_file, port, local_log)

self.config_files = monitoring_files
self.verify_only = verify_only
self.analyse = analyse
self.arbiter_name = arbiter_name
self.arbiter_name = alignak_name
self.alignak_name = None

self.broks = {}
Expand All @@ -128,21 +136,23 @@ def __init__(self, config_file, monitoring_files, is_daemon, do_replace, verify_
self.http_interface = ArbiterInterface(self)
self.conf = Config()

def add(self, b):
def add(self, elt):
"""Generic function to add objects to queues.
Only manage Broks and ExternalCommand
#Todo: does the arbiter still needs to manage external commands
:param b: objects to add
:type b: alignak.brok.Brok | alignak.external_command.ExternalCommand
:param elt: objects to add
:type elt: alignak.brok.Brok | alignak.external_command.ExternalCommand
:return: None
"""
if isinstance(b, Brok):
self.broks[b.uuid] = b
elif isinstance(b, ExternalCommand): # pragma: no cover, useful?
if isinstance(elt, Brok):
self.broks[elt.uuid] = elt
elif isinstance(elt, ExternalCommand): # pragma: no cover, useful?
# todo: does the arbiter will still manage external commands? It is the receiver job!
self.external_commands.append(b)
self.external_commands.append(elt)
else:
logger.warning('Cannot manage object type %s (%s)', type(elt), elt)

def push_broks_to_broker(self):
"""Send all broks from arbiter internal list to broker
Expand Down Expand Up @@ -213,6 +223,7 @@ def get_daemon_links(daemon_type): # pragma: no cover, not used anywhere
# the attribute name to get these differs for schedulers and arbiters
return daemon_type + 's'

# pylint: disable=too-many-branches
def load_monitoring_config_file(self): # pylint: disable=R0915
"""Load main configuration file (alignak.cfg)::
Expand Down Expand Up @@ -396,6 +407,16 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
# Maybe some elements were not wrong, so we must clean if possible
self.conf.clean()

# Dump Alignak macros
macro_resolver = MacroResolver()
macro_resolver.init(self.conf)

logger.info("Alignak global macros:")
for macro_name in sorted(self.conf.macros):
macro_value = macro_resolver.resolve_simple_macros_in_string("$%s$" % macro_name, [],
None, None)
logger.info("- $%s$ = %s", macro_name, macro_value)

# If the conf is not correct, we must get out now (do not try to split the configuration)
if not self.conf.conf_is_correct: # pragma: no cover, not with unit tests.
err = "Configuration is incorrect, sorry, I bail out"
Expand All @@ -417,14 +438,17 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
self.conf.show_errors()
sys.exit(err)

logger.info('Things look okay - No serious problems were detected '
'during the pre-flight check')

# Clean objects of temporary/unnecessary attributes for live work:
self.conf.clean()

logger.info("Things look okay - "
"No serious problems were detected during the pre-flight check")

# Exit if we are just here for config checking
if self.verify_only:
if self.conf.missing_daemons:
logger.warning("Some missing daemons were detected in the parsed configuration.")

logger.info("Arbiter checked the configuration")
# Display found warnings and errors
self.conf.show_errors()
Expand All @@ -434,6 +458,17 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
self.launch_analyse()
sys.exit(0)

# Some errors like a realm with hosts and no schedulers for it may imply to run new daemons
if self.conf.missing_daemons:
logger.info("Trying to handle the missing daemons...")
if not self.manage_missing_daemons():
err = "Some detected as missing daemons did not started correctly. " \
"Sorry, I bail out"
logger.error(err)
# Display found warnings and errors
self.conf.show_errors()
sys.exit(err)

# Some properties need to be "flatten" (put in strings)
# before being sent, like realms for hosts for example
# BEWARE: after the cutting part, because we stringify some properties
Expand All @@ -455,7 +490,7 @@ def load_monitoring_config_file(self): # pylint: disable=R0915

# Still a last configuration check because some things may have changed when
# we prepared the configuration for sending
if not self.conf.conf_is_correct: # pragma: no cover, not with unit tests.
if not self.conf.conf_is_correct:
err = "Configuration is incorrect, sorry, I bail out"
logger.error(err)
# Display found warnings and errors
Expand All @@ -465,6 +500,62 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
# Display found warnings and errors
self.conf.show_errors()

def manage_missing_daemons(self):
    """Try to launch every daemon that the configuration reported as missing.

    For each entry of `self.conf.missing_daemons` that is a scheduler, a poller or a
    broker, and that was not already launched during this call:
    - prepare the daemon start arguments (name, port and log file)
    - start the daemon as a child process
    - wait one second and check that the process did not exit

    The launched processes are stored in `self.my_satellites`, indexed by daemon name.

    :return: True if all the launched daemons are still running, else False
    :rtype: bool
    """
    # Local import: only needed here to split the extra arguments string
    import shlex

    result = True
    self.my_satellites = {}

    # These settings are the same for every daemon, read them only once
    daemon_log_folder = getattr(self.conf, 'daemons_log_folder', '/tmp')
    daemon_arguments = getattr(self.conf, 'daemons_arguments', '')

    # Parse the list of the missing daemons and try to run the corresponding processes
    for satellites_list in [self.conf.schedulers, self.conf.pollers, self.conf.brokers]:
        daemons_class = satellites_list.inner_class
        for daemon in self.conf.missing_daemons:
            if daemon.__class__ != daemons_class:
                continue

            daemon_type = getattr(daemon, 'my_type', None)
            daemon_name = daemon.get_name()

            if daemon_name in self.my_satellites:
                logger.info("Daemon '%s' is still running.", daemon_name)
                continue

            args = ["alignak-%s" % daemon_type, "--name", daemon_name,
                    "--port", str(daemon.port),
                    "--local_log", "%s/%s.log" % (daemon_log_folder, daemon_name)]
            if daemon_arguments:
                # The extra arguments are a single string (assumed from the '' default):
                # split it the shell way so that each option is its own argv item
                args.extend(shlex.split(daemon_arguments))
            logger.info("Trying to launch daemon: %s...", daemon_name)
            logger.info("... with arguments: %s", args)
            process = subprocess.Popen(args)
            self.my_satellites[daemon_name] = process
            logger.info("%s launched (pid=%d)", daemon_name, process.pid)

            # Wait at least one second for a correct start...
            time.sleep(1)

            if process.poll() is None:
                logger.info("%s running (pid=%d)", daemon_name, process.pid)
                continue

            logger.error("*** %s exited on start!", daemon_name)
            # The process is started without pipes, so its stdout / stderr attributes
            # are None (the child writes to our own streams): only dump a stream
            # when it really exists, else we would crash instead of returning False
            for stream in (process.stdout, process.stderr):
                if stream is None:
                    continue
                for line in iter(stream.readline, b''):
                    logger.error(">>> %s", line.rstrip())
            result = False
    return result

def load_modules_configuration_objects(self, raw_objects): # pragma: no cover,
# not yet with unit tests.
"""Load configuration objects from arbiter modules
Expand Down Expand Up @@ -498,7 +589,7 @@ def load_modules_configuration_objects(self, raw_objects): # pragma: no cover,
for type_c in types_creations:
(_, _, prop, dummy) = types_creations[type_c]
if prop not in objs:
logger.warning("Got unmanaged %s objects from module %s", prop, inst.get_name())
logger.warning("Did not get '%s' objects from module %s", prop, inst.get_name())
continue
for obj in objs[prop]:
# test if raw_objects[k] are already set - if not, add empty array
Expand Down Expand Up @@ -743,7 +834,7 @@ def push_external_commands_to_schedulers(self):
# Now for all alive schedulers, send the commands
for scheduler in self.conf.schedulers:
cmds = scheduler.external_commands
if len(cmds) > 0 and scheduler.alive:
if cmds and scheduler.alive:
logger.debug("Sending %d commands to scheduler %s", len(cmds), scheduler.get_name())
scheduler.run_external_commands(cmds)
# clean them
Expand Down
12 changes: 8 additions & 4 deletions alignak/daemons/brokerdaemon.py
Expand Up @@ -98,10 +98,14 @@ class Broker(BaseSatellite):
PathProp(default='brokerd.log'),
})

def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):

super(Broker, self).__init__('broker', config_file, is_daemon, do_replace, debug,
debug_file)
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file,
port=None, local_log=None, daemon_name=None):
self.daemon_name = 'broker'
if daemon_name:
self.daemon_name = daemon_name

super(Broker, self).__init__(self.daemon_name, config_file, is_daemon, do_replace, debug,
debug_file, port, local_log)

# Our arbiters
self.arbiters = {}
Expand Down
11 changes: 8 additions & 3 deletions alignak/daemons/pollerdaemon.py
Expand Up @@ -70,6 +70,11 @@ class Poller(Satellite):
PathProp(default='pollerd.log'),
})

def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
super(Poller, self).__init__('poller', config_file, is_daemon, do_replace, debug,
debug_file)
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file,
port=None, local_log=None, daemon_name=None):
self.daemon_name = 'poller'
if daemon_name:
self.daemon_name = daemon_name

super(Poller, self).__init__(self.daemon_name, config_file, is_daemon, do_replace,
debug, debug_file, port, local_log)
11 changes: 8 additions & 3 deletions alignak/daemons/reactionnerdaemon.py
Expand Up @@ -83,6 +83,11 @@ class Reactionner(Satellite):
PathProp(default='reactionnerd.log'),
})

def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
super(Reactionner, self).__init__('reactionner', config_file, is_daemon, do_replace, debug,
debug_file)
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file,
port=None, local_log=None, daemon_name=None):
self.daemon_name = 'reactionner'
if daemon_name:
self.daemon_name = daemon_name

super(Reactionner, self).__init__(self.daemon_name, config_file, is_daemon, do_replace,
debug, debug_file, port, local_log)
12 changes: 8 additions & 4 deletions alignak/daemons/receiverdaemon.py
Expand Up @@ -86,10 +86,14 @@ class Receiver(Satellite):
PathProp(default='receiverd.log'),
})

def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):

super(Receiver, self).__init__(
'receiver', config_file, is_daemon, do_replace, debug, debug_file)
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file,
port=None, local_log=None, daemon_name=None):
self.daemon_name = 'receiver'
if daemon_name:
self.daemon_name = daemon_name

super(Receiver, self).__init__(self.daemon_name, config_file, is_daemon, do_replace,
debug, debug_file, port, local_log)

# Our arbiters
self.arbiters = {}
Expand Down
12 changes: 8 additions & 4 deletions alignak/daemons/schedulerdaemon.py
Expand Up @@ -89,10 +89,14 @@ class Alignak(BaseSatellite):
PathProp(default='schedulerd.log'),
})

def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):

BaseSatellite.__init__(self, 'scheduler', config_file, is_daemon, do_replace, debug,
debug_file)
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file,
port=None, local_log=None, daemon_name=None):
self.daemon_name = 'scheduler'
if daemon_name:
self.daemon_name = daemon_name

BaseSatellite.__init__(self, self.daemon_name, config_file, is_daemon, do_replace,
debug, debug_file, port, local_log)

self.http_interface = SchedulerInterface(self)
self.sched = Scheduler(self)
Expand Down

0 comments on commit 3128e52

Please sign in to comment.