Permalink
Browse files

CentralAgent status is now based on Presence

This is way more robust than dealing with a keepalive that can be
delayed if the entity is busy in the xmpp.Process().
  • Loading branch information...
CyrilPeponnet committed Mar 16, 2016
1 parent ac8a04a commit e0c1cc96228b395d6baf5363ebed4e05fc4982bd
@@ -35,7 +35,7 @@
ARCHIPEL_ERROR_CODE_CENTRALAGENT = 123
ARCHIPEL_CENTRAL_AGENT_TIMEOUT = 120
ARCHIPEL_CENTRAL_AGENT_TIMEOUT = 60
class TNTasks(object):
"""Timed jobs tasker"""
@@ -134,16 +134,27 @@ def central_agent_jid(self):
"""
Returns the jid of the central agent. In case we are a VM, query hypervisor.
"""
# this could happen under heavy processing
if self.last_keepalive_heard and (datetime.datetime.now() - self.last_keepalive_heard).seconds > ARCHIPEL_CENTRAL_AGENT_TIMEOUT:
if not self.central_agent_jid_val:
return None
if self.entity.__class__.__name__ == "TNArchipelHypervisor":
# if central agent has a status, it's available
if self.entity.roster.getStatus(self.central_agent_jid_val.getStripped()):
return self.central_agent_jid_val
# If presence is not known check if we hit the keepalive threshold timeout
# This could append when you restart it for example
# Time interval is not an exact science so let's double it for timetout
if self.last_keepalive_heard and (datetime.datetime.now() - self.last_keepalive_heard).total_seconds() > self.keepalive_interval * 2:
self.entity.log.error("CENTRALDB: CentralAgent is down.")
print self.last_keepalive_heard
print (datetime.datetime.now() - self.last_keepalive_heard).total_seconds()
self.central_agent_jid_val = None
return None
else:
return self.entity.hypervisor.get_plugin("centraldb").central_agent_jid_val
self.entity.log.warning("CENTRALDB: CentralAgent looks down. Can't retrieve its presence")
return self.central_agent_jid_val
def handle_central_keepalive_event(self,event):
def handle_central_keepalive_event(self, event):
"""
Called when the central agent sends a keepalive.
@type event: xmpp.Node
@@ -172,9 +183,8 @@ def handle_central_keepalive_event(self,event):
self.delayed_tasks.add((self.hypervisor_timeout_threshold - self.keepalive_interval)*2/3, self.push_statistics_to_centraldb)
if old_central_agent_jid == None:
self.delayed_tasks.add(self.keepalive_interval, self.handle_first_keepalive, {'keepalive_jid':keepalive_jid})
if central_announcement_event.getAttr("force_update") == "true" or keepalive_jid != old_central_agent_jid:
self.delayed_tasks.add(self.keepalive_interval, self.handle_first_keepalive, {'keepalive_jid':keepalive_jid, 'callback': self.push_vms_in_central_db, 'kwargs':{'central_announcement_event':central_announcement_event}})
elif central_announcement_event.getAttr("force_update") == "true" or keepalive_jid != old_central_agent_jid:
self.delayed_tasks.add(self.keepalive_interval, self.push_vms_in_central_db, {'central_announcement_event':central_announcement_event})
def push_statistics_to_centraldb(self):
@@ -195,7 +205,7 @@ def push_statistics_to_centraldb(self):
self.update_hypervisors([stats_results])
def handle_first_keepalive(self, keepalive_jid):
def handle_first_keepalive(self, keepalive_jid, callback=None, kwargs={}):
"""
this is the first keepalive. We query hypervisors that have vm entities somewhere else
then we trigger method manage_persistence to instantiate vm entities.
@@ -218,6 +228,8 @@ def handle_first_keepalive(self, keepalive_jid):
def _get_existing_vms_instances_callback(conn, packed_vms):
existing_vms_entities = self.unpack_entries(packed_vms)
self.entity.manage_persistence(vms_from_local_db, existing_vms_entities)
if callback:
callback(**kwargs)
self.entity.xmppclient.SendAndCallForResponse(iq, _get_existing_vms_instances_callback)
@@ -435,5 +447,3 @@ def plugin_info():
"identifier": plugin_identifier,
"configuration-section": plugin_configuration_section,
"configuration-tokens": plugin_configuration_tokens}
@@ -273,8 +273,8 @@ def update_presence(self, origin=None, user_info=None, parameters=None, presence
else:
status = u"%s (%s) — no VT" % (ARCHIPEL_XMPP_SHOW_ONLINE, minor_info)
if self.get_plugin("centraldb") and self.get_plugin("centraldb").central_agent_jid():
status = u'\u26AD '+ status
if self.get_plugin("centraldb") and self.get_plugin("centraldb").central_agent_jid_val:
status = u'\u26AD ' + status
if status != self.xmppstatus:
self.change_presence(self.xmppstatusshow, status)
@@ -1356,15 +1356,14 @@ def on_xmpp_loop_tick(self):
if self.get_plugin("centraldb"):
if self.get_plugin("centraldb").central_agent_jid():
if not self.seen_central_agent:
self.log.info("HYPERVISOR: Central Agent detected %s, using central database with keep alive set to %ss" % ( self.get_plugin("centraldb").central_agent_jid(), self.get_plugin("centraldb").keepalive_interval))
self.log.info("HYPERVISOR: Central Agent detected %s, using central database with keep alive set to %ss" % ( self.get_plugin("centraldb").central_agent_jid_val, self.get_plugin("centraldb").keepalive_interval))
self.seen_central_agent = True
return
elif self.seen_central_agent:
self.log.error("HYPERVISOR: Central Agent timeout after %s seconds. We didn't receive the keepalive." % self.get_plugin("centraldb").keepalive_interval)
self.update_presence()
self.seen_central_agent = False
waiting_for = (datetime.datetime.now() - self.last_keepalive_from_central_agent).seconds
waiting_for = int((datetime.datetime.now() - self.last_keepalive_from_central_agent).total_seconds())
if waiting_for < self.get_plugin("centraldb").keepalive_interval:
status = "Waiting %ss for central-agent" % (self.get_plugin("centraldb").keepalive_interval - waiting_for)
self.update_presence(presence_msg=status)
@@ -789,10 +789,10 @@ def check_hyps(self):
for row in rows:
jid, last_seen, status = row
last_seen_date = datetime.datetime.strptime(last_seen, "%Y-%m-%d %H:%M:%S.%f")
if (now - last_seen_date).days*86400 + (now - last_seen_date).seconds > self.hypervisor_timeout_threshold and status == "Online":
if (now - last_seen_date).total_seconds() > self.hypervisor_timeout_threshold and status == "Online":
self.log.warning("CENTRALAGENT: Hypervisor %s timed out" % jid)
hypervisor_to_update.append({"jid": jid, "status": "Unreachable"})
elif (now - last_seen_date).days*86400 + (now - last_seen_date).seconds <= self.hypervisor_timeout_threshold and status == "Unreachable":
elif (now - last_seen_date).total_seconds() <= self.hypervisor_timeout_threshold and status == "Unreachable":
self.log.info("CENTRALAGENT: Hypervisor %s is back up Online" % jid)
hypervisor_to_update.append({"jid": jid, "status": "Online"})
@@ -820,10 +820,10 @@ def on_xmpp_loop_tick(self):
if not self.is_central_agent and self.central_agent_mode == "auto":
self.become_central_agent()
elif self.is_central_agent: # we are central agent
if (datetime.datetime.now() - self.last_keepalive_sent).seconds >= self.keepalive_interval:
if (datetime.datetime.now() - self.last_keepalive_sent).total_seconds() >= self.keepalive_interval:
self.central_keepalive_pubsub.add_item(self.keepalive_event_with_date())
self.last_keepalive_sent = datetime.datetime.now()
if self.ping_hypervisors:
if (datetime.datetime.now() - self.last_hyp_check).seconds >= self.hypervisor_check_interval:
if (datetime.datetime.now() - self.last_hyp_check).total_seconds() >= self.hypervisor_check_interval:
self.check_hyps()
@@ -144,12 +144,14 @@ ping_hypervisors = True
keepalive_interval = 10
# Hypervisor timeout threshold in second
# This must be at least three time the keepalive_interval
# This should be at least three time the keepalive_interval
# When a keepalive is received the hypervisor will delay the response as
# (hypervisor_timeout_threshold - keepalive_interval) * 2/3 to avoid iq storm
# in big setup.
hypervisor_timeout_threshold = 120
# The time between two hypervisor check in database.
# if a hypervisor reach the timeout value below it will be marked as offline.
# and it's vm visible from other hypervisors in the parking.
# This must be not more than the half of hypervisor_timeout_threshold
hypervisor_check_interval = 60

0 comments on commit e0c1cc9

Please sign in to comment.