Skip to content

Commit

Permalink
Improve Alignak dispatcher - redispatch a configuration to a reloaded…
Browse files Browse the repository at this point in the history
… daemon
  • Loading branch information
mohierf committed Apr 4, 2018
1 parent a4fb2e2 commit f6fadf6
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .pylintrc

Large diffs are not rendered by default.

124 changes: 68 additions & 56 deletions alignak/daemons/arbiterdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,8 @@ def daemons_check(self):
except psutil.AccessDenied:
# Probably stopping...
if not self.will_stop and proc == daemon['process']:
logger.warning("Daemon %s/%s is not running!",
daemon['satellite'].type, daemon['satellite'].name)
logger.debug("Access denied - Process %s is %s", proc.name(), proc.status())
if not self.start_daemon(daemon['satellite']):
# Set my satellite as dead :(
Expand All @@ -1001,6 +1003,8 @@ def daemons_check(self):
else:
logger.info("I restarted %s/%s",
daemon['satellite'].type, daemon['satellite'].name)
logger.info("Pausing %.2f seconds...", 0.5)
time.sleep(0.5)
else:
logger.info("Child process %s is %s", proc.name(), proc.status())

Expand Down Expand Up @@ -1080,10 +1084,9 @@ def daemons_reachability_check(self):
return True

logger.debug("Alignak daemons reachability check")
result = True

_t0 = time.time()
self.dispatcher.check_reachable()
result = self.dispatcher.check_reachable()
statsmgr.timer('dispatcher.check-alive', time.time() - _t0)

# Set the last check as now
Expand Down Expand Up @@ -1320,61 +1323,62 @@ def manage_signal(self, sig, frame):
else:
Daemon.manage_signal(self, sig, frame)

def configuration_dispatch(self):
def configuration_dispatch(self, not_configured=None):
"""Monitored configuration preparation and dispatch
:return: None
"""
self.dispatcher = Dispatcher(self.conf, self.link_to_myself)
# I set my own dispatched configuration as the provided one...
# because I will not push a configuration to myself :)
self.cur_conf = self.conf

# Loop for the first configuration dispatching, if the first dispatch fails, bail out!
# Without a correct configuration, Alignak daemons will not run correctly
first_connection_try_count = 0
logger.info("Connecting to my satellites...")
while True:
first_connection_try_count += 1

# Initialize connection with all our satellites
self.all_connected = True
for satellite in self.dispatcher.all_daemons_links:
if satellite == self.link_to_myself:
continue
if not satellite.active:
continue
connected = self.daemon_connection_init(satellite, set_wait_new_conf=True)
logger.debug(" %s is %s", satellite, connected)
self.all_connected = self.all_connected and connected
if not not_configured:
self.dispatcher = Dispatcher(self.conf, self.link_to_myself)
# I set my own dispatched configuration as the provided one...
# because I will not push a configuration to myself :)
self.cur_conf = self.conf

# Loop for the first configuration dispatching, if the first dispatch fails, bail out!
# Without a correct configuration, Alignak daemons will not run correctly
first_connection_try_count = 0
logger.info("Connecting to my satellites...")
while True:
first_connection_try_count += 1

if self.all_connected:
logger.info("- satellites connection #%s is ok", first_connection_try_count)
break
else:
logger.warning("- satellites connection #%s is not correct; "
"let's give another chance after %d seconds...",
first_connection_try_count,
self.link_to_myself.polling_interval)
time.sleep(self.link_to_myself.polling_interval)
if first_connection_try_count >= 3:
self.request_stop("All the daemons connections could not be established "
"despite %d tries! "
"Sorry, I bail out!" % first_connection_try_count,
exit_code=4)
# Initialize connection with all our satellites
self.all_connected = True
for satellite in self.dispatcher.all_daemons_links:
if satellite == self.link_to_myself:
continue
if not satellite.active:
continue
connected = self.daemon_connection_init(satellite, set_wait_new_conf=True)
logger.debug(" %s is %s", satellite, connected)
self.all_connected = self.all_connected and connected

# Now I have a connection with all the daemons I need to contact them,
# check they are alive and ready to run
_t0 = time.time()
self.all_connected = self.dispatcher.check_reachable()
statsmgr.timer('dispatcher.check-alive', time.time() - _t0)
if self.all_connected:
logger.info("- satellites connection #%s is ok", first_connection_try_count)
break
else:
logger.warning("- satellites connection #%s is not correct; "
"let's give another chance after %d seconds...",
first_connection_try_count,
self.link_to_myself.polling_interval)
time.sleep(self.link_to_myself.polling_interval)
if first_connection_try_count >= 3:
self.request_stop("All the daemons connections could not be established "
"despite %d tries! "
"Sorry, I bail out!" % first_connection_try_count,
exit_code=4)

# Now I have a connection with all the daemons I need to contact them,
# check they are alive and ready to run
_t0 = time.time()
self.all_connected = self.dispatcher.check_reachable()
statsmgr.timer('dispatcher.check-alive', time.time() - _t0)

_t0 = time.time()
# Preparing the configuration for dispatching
logger.info("Preparing the configuration for dispatching...")
self.dispatcher.prepare_dispatch()
statsmgr.timer('dispatcher.prepare-dispatch', time.time() - _t0)
logger.info("- configuration is ready to dispatch")
_t0 = time.time()
# Preparing the configuration for dispatching
logger.info("Preparing the configuration for dispatching...")
self.dispatcher.prepare_dispatch()
statsmgr.timer('dispatcher.prepare-dispatch', time.time() - _t0)
logger.info("- configuration is ready to dispatch")

# Loop for the first configuration dispatching, if the first dispatch fails, bail out!
# Without a correct configuration, Alignak daemons will not run correctly
Expand Down Expand Up @@ -1437,7 +1441,7 @@ def do_before_loop(self):
# Make a pause to let our started daemons get ready...
pause = max(self.conf.daemons_start_timeout, len(self.my_daemons) * 0.5)
if pause:
logger.info("Pausing %d seconds...", pause)
logger.info("Pausing %.2f seconds...", pause)
time.sleep(pause)

# Prepare and dispatch the monitored configuration
Expand Down Expand Up @@ -1505,11 +1509,19 @@ def do_loop_turn(self): # pylint: disable=too-many-branches, too-many-statement

# Check that my daemons are alive
if not self.daemons_check():
self.request_stop(message="Some Alignak daemons cannot be checked.",
exit_code=4)
if self.conf.daemons_failure_kill:
self.request_stop(message="Some Alignak daemons cannot be checked.",
exit_code=4)
else:
logger.warning("Should have killed my children if "
"'daemons_failure_kill' were set!")

# Now the dispatcher job - check if all daemons are reachable and have a configuration
if not self.daemons_reachability_check():
logger.warning("A new configuration dispatch is required!")

# Now the dispatcher job
self.daemons_reachability_check()
# Prepare and dispatch the monitored configuration
self.configuration_dispatch(self.dispatcher.not_configured)

# Now get things from our module instances
_t0 = time.time()
Expand Down Expand Up @@ -1923,7 +1935,7 @@ def main(self):
# Make a pause to let our satellites get ready...
pause = self.conf.daemons_new_conf_timeout
if pause:
logger.info("Pausing %d seconds...", pause)
logger.info("Pausing %.2f seconds...", pause)
time.sleep(pause)

except Exception as exp: # pragma: no cover, this should never happen indeed ;)
Expand Down
30 changes: 24 additions & 6 deletions alignak/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ def __init__(self, conf, arbiter_link):
self.brokers = []
self.receivers = []

# List the satellites that are not configured
self.not_configured = []

# Direct pointer to important elements for us
self.arbiter_link = arbiter_link
self.alignak_conf = conf
Expand Down Expand Up @@ -207,7 +210,7 @@ def check_reachable(self, forced=False, test=False):
:return: True if all daemons are reachable
"""
all_ok = True
not_configured = []
self.not_configured = []
for daemon_link in self.all_daemons_links:
if daemon_link == self.arbiter_link:
# I exclude myself from the polling, sure I am reachable ;)
Expand Down Expand Up @@ -245,27 +248,38 @@ def check_reachable(self, forced=False, test=False):
daemon_link.type, daemon_link.name, daemon_link.cfg_managed)
if not self.first_dispatch_done:
# I just (re)started the arbiter
not_configured.append(daemon_link)
self.not_configured.append(daemon_link)
else:
# No managed configuration - a new dispatching is necessary but only
# if we already dispatched a configuration
# Probably a freshly restarted daemon ;)
logger.info("The %s %s do not have a configuration",
daemon_link.type, daemon_link.name)
# the daemon is not yet configured
not_configured.append(daemon_link)
self.not_configured.append(daemon_link)
# # Ask to wait for a new configuration
# daemon_link.wait_new_conf()
daemon_link.configuration_sent = False
else:
# Got a timeout !
not_configured.append(daemon_link)
self.not_configured.append(daemon_link)

if not_configured and self.new_to_dispatch and not self.first_dispatch_done:
if self.not_configured and self.new_to_dispatch and not self.first_dispatch_done:
logger.info("Dispatcher, those daemons are not configured: %s, "
"and a configuration is ready to dispatch, run the dispatching...",
','.join(d.name for d in not_configured))
','.join(d.name for d in self.not_configured))
self.dispatch_ok = False
self.dispatch(test=test)

elif self.not_configured and self.first_dispatch_done:
logger.info("Dispatcher, those daemons are not configured: %s, "
"and a configuration has yet been dispatched dispatch, "
"a new dispatch is required...",
','.join(d.name for d in self.not_configured))
self.dispatch_ok = False
# Avoid exception because dispatch is not accepted!
self.new_to_dispatch = True
self.first_dispatch_done = False
self.dispatch(test=test)

return all_ok
Expand Down Expand Up @@ -794,10 +808,12 @@ def dispatch(self, test=False): # pylint: disable=too-many-branches
:return: None
"""
if not self.new_to_dispatch:
logger.info("1")
raise DispatcherError("Dispatcher cannot dispatch, "
"because no configuration is prepared!")

if self.first_dispatch_done:
logger.info("2")
raise DispatcherError("Dispatcher cannot dispatch, "
"because the configuration is still dispatched!")

Expand Down Expand Up @@ -921,5 +937,7 @@ def stop_request(self, stop_now=False):

all_ok = all_ok and stop_ok

daemon_link.stopping = True

self.stop_request_sent = all_ok
return self.stop_request_sent
4 changes: 4 additions & 0 deletions alignak/objects/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,10 @@ class Config(Item): # pylint: disable=R0904,R0902
'daemons_initial_port':
IntegerProp(default=7800),

# Kill launched daemons on communication failure
'daemons_failure_kill':
BoolProp(default=True),

'daemons_check_period':
IntegerProp(default=5),

Expand Down
28 changes: 22 additions & 6 deletions alignak/objects/satellitelink.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ class SatelliteLink(Item):
'have_conf': # The daemon has received a configuration
BoolProp(default=False, fill_brok=['full_status']),

'stopping': # The daemon is requested to stop
BoolProp(default=False, fill_brok=['full_status']),

'running_id': # The running identifier of my related daemon
FloatProp(default=0, fill_brok=['full_status']),

Expand Down Expand Up @@ -551,13 +554,22 @@ def add_failed_check_attempt(self, reason=''):
# Don't need to warn again and again if the satellite is already dead
# Only warn when it is alive
if self.alive:
logger.warning("Add failed attempt for %s (%d/%d) - %s",
self.name, self.attempt, self.max_check_attempts, reason)
if not self.stopping:
logger.warning("Add failed attempt for %s (%d/%d) - %s",
self.name, self.attempt, self.max_check_attempts, reason)
else:
logger.info("Stopping... failed attempt for %s (%d/%d) - also probably stopping",
self.name, self.attempt, self.max_check_attempts)

# If we reached the maximum attempts, set the daemon as dead
if self.attempt >= self.max_check_attempts:
logger.warning("Set %s as dead, too much failed attempts (%d), last problem is: %s",
self.name, self.max_check_attempts, reason)
if not self.stopping:
logger.warning("Set %s as dead, too much failed attempts (%d), last problem is: %s",
self.name, self.max_check_attempts, reason)
else:
logger.info("Stopping... set %s as dead, too much failed attempts (%d)",
self.name, self.max_check_attempts)

self.set_dead()

def valid_connection(*outer_args, **outer_kwargs):
Expand Down Expand Up @@ -599,8 +611,12 @@ def decorated(*args, **kwargs): # pylint: disable=missing-docstring
except HTTPClientConnectionException as exp:
# A Connection error is raised when the daemon connection cannot be established
# No way with the configuration parameters!
logger.warning("A daemon (%s/%s) that we must be related with "
"cannot be connected: %s", link.type, link.name, exp)
if not link.stopping:
logger.warning("A daemon (%s/%s) that we must be related with "
"cannot be connected: %s", link.type, link.name, exp)
else:
logger.info("Stopping... daemon (%s/%s) cannot be connected. "
"It is also probably stopping.", link.type, link.name)
link.set_dead()
except (LinkError, HTTPClientTimeoutException) as exp:
link.add_failed_check_attempt("Connection timeout "
Expand Down
3 changes: 2 additions & 1 deletion alignak/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2250,7 +2250,8 @@ def find_item_by_id(self, object_id):
if object_id in items:
return items[object_id]

raise AttributeError("Item with id %s not found" % object_id) # pragma: no cover,
# raise AttributeError("Item with id %s not found" % object_id) # pragma: no cover,
logger.error("Item with id %s not found", object_id) # pragma: no cover,
# simple protection this should never happen

def before_run(self):
Expand Down
2 changes: 2 additions & 0 deletions etc/alignak.ini
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ polling_interval=5
# The arbiter is checking the running daemons every daemons_check_period seconds
# - the checking only concerns the daemons that got started by the arbiter
daemons_check_period=5
# Daemons failure kill all daemons
;daemons_failure_kill=1
# Graceful stop delay
# - beyond this period, the arbiter will force kill the daemons that it launched
daemons_stop_timeout=10
Expand Down
1 change: 1 addition & 0 deletions test/alignak_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def _stop_alignak_daemons(self, arbiter_only=True):
if arbiter_only and name not in ['arbiter-master']:
continue
if proc.pid == self.my_pid:
print("- do not kill myself!")
continue
print("Asking %s (pid=%d) to end..." % (name, proc.pid))
try:
Expand Down
4 changes: 2 additions & 2 deletions test/cfg/run_realms/alignak.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ _dist_LOG=%(_dist)s
config_name=Alignak configuration

#-- Username and group to run (defaults to current user)
;user=alignak
;group=alignak
;;;;;user=alignak
;;;;;group=alignak
# Disabling security means allowing the daemons to run under root account
# idontcareaboutsecurity=0

Expand Down
4 changes: 2 additions & 2 deletions test/cfg/run_realms_manage_sub_realms/alignak.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ _dist_LOG=%(_dist)s
config_name=Alignak configuration

#-- Username and group to run (defaults to current user)
;user=alignak
;group=alignak
;;;user=alignak
;;;group=alignak
# Disabling security means allowing the daemons to run under root account
# idontcareaboutsecurity=0

Expand Down
Loading

0 comments on commit f6fadf6

Please sign in to comment.