Skip to content

Commit

Permalink
Add test for spare daemons
Browse files Browse the repository at this point in the history
  • Loading branch information
mohierf committed Nov 20, 2016
1 parent d8a48eb commit 2d5b4ef
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 136 deletions.
8 changes: 6 additions & 2 deletions alignak/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,12 @@ class Daemon(object):
BoolProp(default=False),
'daemon_enabled':
BoolProp(default=True),
'spare':
BoolProp(default=False),
# Todo: spare is not present currently in the daemon.ini file
# 'spare':
# BoolProp(default=False),
# Todo: missing daemon.ini port parameter!
'port':
IntegerProp(default=0),
'max_queue_size':
IntegerProp(default=0),
'daemon_thread_pool_size':
Expand Down
86 changes: 52 additions & 34 deletions alignak/daemons/arbiterdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ class Arbiter(Daemon): # pylint: disable=R0902
})

def __init__(self, config_file, monitoring_files, is_daemon, do_replace, verify_only, debug,
debug_file, config_name, analyse=None):
debug_file, arbiter_name, analyse=None):

super(Arbiter, self).__init__('arbiter', config_file, is_daemon, do_replace,
debug, debug_file)

self.config_files = monitoring_files
self.verify_only = verify_only
self.analyse = analyse
self.config_name = config_name
self.arbiter_name = arbiter_name

self.broks = {}
self.is_master = False
Expand Down Expand Up @@ -254,9 +254,11 @@ def load_monitoring_config_file(self): # pylint: disable=R0915

self.conf.early_arbiter_linking()

# Search which Arbiterlink I am
# Search which arbiter I am in the arbiters list
for arb in self.conf.arbiters:
if arb.get_name() in ['Default-Arbiter', self.config_name]:
if arb.get_name() in ['Default-Arbiter', self.arbiter_name]:
logger.info("Found myself in the configuration: %s", arb.get_name())
# Arbiter is master one
arb.need_conf = False
self.myself = arb
self.is_master = not self.myself.spare
Expand All @@ -276,14 +278,16 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
# Set myself as alive ;)
self.myself.alive = True
else: # not me
# Arbiter is not me!
logger.info("Found another arbiter in the configuration: %s", arb.get_name())
arb.need_conf = True

if not self.myself:
sys.exit("Error: I cannot find my own Arbiter object (%s), I bail out. "
"To solve this, please change the arbiter_name parameter in "
"the arbiter configuration file (certainly arbiter-master.cfg) "
"with the value '%s'."
" Thanks." % (self.config_name, socket.gethostname()))
" Thanks." % (self.arbiter_name, socket.gethostname()))

# Ok it's time to load the module manager now!
self.load_modules_manager()
Expand Down Expand Up @@ -410,7 +414,7 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
self.conf.show_errors()
sys.exit(0)

if self.analyse:
if self.analyse: # pragma: no cover, not used currently (see #607)
self.launch_analyse()
sys.exit(0)

Expand All @@ -420,18 +424,18 @@ def load_monitoring_config_file(self): # pylint: disable=R0915
self.conf.prepare_for_sending()

# Ignore daemon configuration parameters (port, log, ...) in the monitoring configuration
# It's better to use daemon default parameters rather than host found in the monitoring
# configuration...
# It's better to use daemon default parameters rather than those found in the monitoring
# configuration (if some are found because they should not be there)...

self.accept_passive_unknown_check_results = BoolProp.pythonize(
getattr(self.myself, 'accept_passive_unknown_check_results', '0')
)

# We need to set self.host & self.port to be used by do_daemon_init_and_start
self.host = self.myself.address
self.port = self.myself.port
# self.host = self.myself.address
# self.port = self.myself.port

logger.info("Configuration Loaded")
logger.info("Configuration loaded and prepared")

# Still a last configuration check because some things may have changed when
# we prepared the configuration for sending
Expand Down Expand Up @@ -492,7 +496,7 @@ def load_modules_configuration_objects(self, raw_objects):
len(objs[prop]), type_c, inst.get_name())

def launch_analyse(self):
"""Print the number of objects we have for each type.
""" Dump the number of objects we have for each type to a JSON formatted file
:return: None
"""
Expand Down Expand Up @@ -556,7 +560,7 @@ def main(self):
else:
self.request_stop()

except SystemExit, exp:
except SystemExit as exp:
# With a 2.4 interpreter the sys.exit() in load_config_file
# ends up here and must be handled.
sys.exit(exp.code)
Expand All @@ -565,27 +569,37 @@ def main(self):
raise

def setup_new_conf(self):
""" Setup a new conf received from a Master arbiter.
""" Setup a new configuration received from a Master arbiter.
Todo: perhaps we should reject the configuration, or raise an error, if we do not
find our own configuration data in it. Thus this should never happen...
:return: None
"""
with self.conf_lock:
conf = self.new_conf
if not conf:
if not self.new_conf:
logger.warning("Should not be here - I already got a configuration")
return

logger.info("I received a new configuration from my master")

try:
conf = unserialize(conf)
conf = unserialize(self.new_conf)
except AlignakClassLookupException as exp:
logger.error('Cannot un-serialize configuration received from arbiter: %s', exp)
self.new_conf = None
logger.exception('Cannot un-serialize received configuration: %s', exp)
return

logger.info("Got new configuration #%s", conf.magic_hash)

logger.info("I am: %s", self.arbiter_name)
# This is my new configuration now ...
self.cur_conf = conf
self.conf = conf
# Ready to get a new one ...
self.new_conf = None
for arb in self.conf.arbiters:
if (arb.address, arb.port) == (self.host, self.port):
if arb.get_name() in ['Default-Arbiter', self.arbiter_name]:
self.myself = arb
arb.is_me = lambda x: True # we now definitively know who we are, just keep it.
else:
arb.is_me = lambda x: False # and we know who we are not, just keep it.
logger.info("I found myself in the configuration")

def do_loop_turn(self):
"""Loop turn for Arbiter
Expand All @@ -595,8 +609,8 @@ def do_loop_turn(self):
"""
# If I am a spare, I wait for the master arbiter to send me
# true conf.
if self.myself.spare:
logger.debug("I wait for master")
if not self.is_master:
logger.info("Waiting for master...")
self.wait_for_master_death()

if self.must_run:
Expand All @@ -608,33 +622,34 @@ def wait_for_master_death(self):
:return: None
"""
logger.info("Waiting for master death")
timeout = 1.0
self.last_master_speack = time.time()
self.last_master_ping = time.time()

# Look for the master timeout
master_timeout = 300
for arb in self.conf.arbiters:
if not arb.spare:
master_timeout = arb.check_interval * arb.max_check_attempts
logger.info("I'll wait master for %d seconds", master_timeout)
logger.warning("I'll wait master for %d seconds", master_timeout)

while not self.interrupted:
# This is basically sleep(timeout) and returns 0, [], int
# We could paste here only the code that is actually used, but it would be
# harder to maintain.
_, _, tcdiff = self.handle_requests(timeout)
# if there was a system Time Change (tcdiff) then we have to adapt last_master_speak:
# if there was a system Time Change (tcdiff) then we have to adapt last_master_ping:
if tcdiff:
self.last_master_ping += tcdiff

if self.new_conf:
self.setup_new_conf()
if tcdiff:
self.last_master_speack += tcdiff

sys.stdout.write(".")
sys.stdout.flush()

# Now check if master is dead or not
now = time.time()
if now - self.last_master_speack > master_timeout:
if now - self.last_master_ping > master_timeout:
logger.info("Arbiter Master is dead. The arbiter %s take the lead",
self.myself.get_name())
for arb in self.conf.arbiters:
Expand Down Expand Up @@ -685,16 +700,19 @@ def run(self):
# Before running, I must be sure who I am
# The arbiters change, so we must re-discover the new self.myself
for arb in self.conf.arbiters:
if arb.get_name() in ['Default-Arbiter', self.config_name]:
if arb.get_name() in ['Default-Arbiter', self.arbiter_name]:
self.myself = arb
logger.info("I am the arbiter: %s", self.myself.arbiter_name)

logger.info("Begin to dispatch configurations to satellites")
logger.info("Begin to dispatch configuration to the satellites")
logger.warning('Configuration sent to dispatcher: %s / %s', self.conf.uuid, self.conf.magic_hash)
self.dispatcher = Dispatcher(self.conf, self.myself)
self.dispatcher.check_alive()
self.dispatcher.check_dispatch()
# REF: doc/alignak-conf-dispatching.png (3)
self.dispatcher.prepare_dispatch()
self.dispatcher.dispatch()
logger.info("Configuration has been dispatched to the satellites")

# Now we can get all initial broks for our satellites
self.get_initial_broks_from_satellitelinks()
Expand Down
5 changes: 4 additions & 1 deletion alignak/daemons/brokerdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,12 @@ def setup_new_conf(self): # pylint: disable=R0915,R0912
statsd_host=self.statsd_host, statsd_port=self.statsd_port,
statsd_prefix=self.statsd_prefix, statsd_enabled=self.statsd_enabled)

logger.debug("[%s] Sending us configuration %s", self.name, conf)
logger.warning("[%s] Sending us configuration %s", self.name, conf)

# If we've got something in the schedulers, we do not
# want it anymore
# self.schedulers.clear()
logger.warning("[%s] schedulers: %s", self.name, conf['schedulers'])
for sched_id in conf['schedulers']:
# Must check whether we already have it, so that we do not override our broks

Expand Down Expand Up @@ -517,6 +518,7 @@ def setup_new_conf(self): # pylint: disable=R0915,R0912
logger.info(" - %s ", daemon['name'])

# Now get arbiter
logger.warning("[%s] arbiters: %s", self.name, conf['arbiters'])
for arb_id in conf['arbiters']:
# Must look if we already have it
already_got = arb_id in self.arbiters
Expand Down Expand Up @@ -551,6 +553,7 @@ def setup_new_conf(self): # pylint: disable=R0915,R0912
logger.info(" - %s ", daemon['name'])

# Now for pollers
logger.warning("[%s] pollers: %s", self.name, conf['pollers'])
for pol_id in conf['pollers']:
# Must look if we already have it
already_got = pol_id in self.pollers
Expand Down
34 changes: 21 additions & 13 deletions alignak/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(self, conf, arbiter):
self.arbiter = arbiter
# Pointer to the whole conf
self.conf = conf
logger.warning('Dispatcher configuration: %s / %s', self.conf.uuid, self.conf.magic_hash)
self.realms = conf.realms
# Direct pointer to important elements for us

Expand Down Expand Up @@ -172,19 +173,21 @@ def check_dispatch(self):
# Check if the other arbiter has a conf, but only if I am a master
for arb in self.arbiters:
# If not me and I'm a master
if arb != self.arbiter and self.arbiter and not self.arbiter.spare:
if self.arbiter and arb != self.arbiter and not self.arbiter.spare:
logger.info('Configuration to dispatch: #%s (%d bytes)',
self.conf.magic_hash, len(self.conf.whole_conf_pack))
if not arb.have_conf(self.conf.magic_hash):
if not hasattr(self.conf, 'whole_conf_pack'):
logger.error('CRITICAL: the arbiter try to send a configuration but '
'it is not a MASTER one?? Look at your configuration.')
continue
logger.info('Configuration sent to arbiter: %s', arb.get_name())
logger.info('Sending configuration #%s to arbiter: %s',
self.conf.magic_hash, arb.get_name())
test = self.conf.whole_conf_pack.split('magic_hash')
arb.put_conf(self.conf.whole_conf_pack)
# Remind it that WE are the master here!
arb.do_not_run()
logger.info('Configuration sent to arbiter: %s', arb.get_name())
else:
# Ok, it already has the conf. I remember that
# it does not have to run, I'm still alive!
logger.debug("Do not send configuration")
arb.do_not_run()

# We check for confs to be dispatched on alive schedulers. If not dispatched, need
Expand Down Expand Up @@ -224,9 +227,9 @@ def check_dispatch(self):
sched.need_conf = True
sched.conf = None

self.check_disptach_other_satellites()
self.check_dispatch_other_satellites()

def check_disptach_other_satellites(self):
def check_dispatch_other_satellites(self):
"""
Check the dispatch in other satellites: reactionner, poller, broker, receiver
Expand Down Expand Up @@ -531,16 +534,19 @@ def prepare_dispatch_other_satellites(self, sat_type, realm, cfg, arbiters_cfg):
satellite_string = "[%s] Dispatching %s satellites ordered as: " % (
realm.get_name(), sat_type)
for sat in satellites:
satellite_string += '%s (spare:%s), ' % (
sat.get_name(), str(sat.spare))
satellite_string += '%s (spare:%s), ' % (sat.get_name(), str(sat.spare))
logger.info(satellite_string)

conf_uuid = cfg.uuid
# Now we dispatch cfg to everyone who asks for it
nb_cfg_prepared = 0
for sat in satellites:
if nb_cfg_prepared >= realm.get_nb_of_must_have_satellites(sat_type):
continue
# Todo: Remove this test, because the number of satellites per type in a realm
# does not take the spare daemons into account
# if nb_cfg_prepared >= realm.get_nb_of_must_have_satellites(sat_type):
# logger.warning("Already prepared enough satellites: %d / %s",
# nb_cfg_prepared, sat_type)
# continue
sat.cfg['schedulers'][conf_uuid] = realm.to_satellites[sat_type][conf_uuid]
if sat.manage_arbiters:
sat.cfg['arbiters'] = arbiters_cfg
Expand Down Expand Up @@ -573,6 +579,7 @@ def dispatch(self):
"""
if self.dispatch_ok:
return

self.dispatch_ok = True
for scheduler in self.schedulers:
if scheduler.is_sent:
Expand All @@ -593,7 +600,8 @@ def dispatch(self):
if satellite.get_my_type() == sat_type:
if satellite.is_sent:
continue
logger.info('Sending configuration to %s %s', sat_type, satellite.get_name())
logger.info('Sending configuration to %s %s: %s',
sat_type, satellite.get_name(), satellite.cfg)
is_sent = satellite.put_conf(satellite.cfg)
satellite.is_sent = is_sent
if not is_sent:
Expand Down
5 changes: 5 additions & 0 deletions alignak/http/arbiter_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ def have_conf(self, magic_hash=0):
return self.app.cur_conf and self.app.cur_conf.magic_hash == magic_hash

@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
def put_conf(self, conf=None):
"""HTTP POST to the arbiter with the new conf (the master sends it to the slave)
:param conf: serialized new configuration
:type conf:
:return: None
"""
# if conf is None:
# confs = cherrypy.request.json
# conf = confs['conf']
with self.app.conf_lock:
super(ArbiterInterface, self).put_conf(conf)
self.app.must_run = False
Expand Down
Loading

0 comments on commit 2d5b4ef

Please sign in to comment.