From 3746e5ae0f68ac0b24ea439bd6bf484bc1de30ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Magalh=C3=A3es?= Date: Fri, 5 Oct 2018 18:41:46 +0100 Subject: [PATCH] better peer update structure --- src/appier/base.py | 135 ++++++++++++++++++++++++++++++--------------- 1 file changed, 91 insertions(+), 44 deletions(-) diff --git a/src/appier/base.py b/src/appier/base.py index 78a3c34b..3d0beb08 100644 --- a/src/appier/base.py +++ b/src/appier/base.py @@ -4335,52 +4335,14 @@ def _load_supervisor(self): # runs the initial bind operations for both the update and the # (new) peer events responsible for the global status consistency - self.bind_bus("update_peers", self.on_discover_peers) - self.bind_bus("peer", self.on_peer) + self.bind_bus("update_peers", self._on_update_peers) + self.bind_bus("peer", self._on_peer) def _unload_supervisor(self): - self.unbind_bus("update_peers", self.on_discover_peers) - self.unbind_bus("peer", self.on_peer) - - def _schedule_peers(self, timeout = 60.0): - if not self.get_bus_d(): return - self._cleanup_peers(timeout = timeout * 2) - self.trigger_bus("update_peers") - self.schedule( - self._schedule_peers, - timeout = timeout, - kwargs = dict(timeout = timeout) - ) - - def _cleanup_peers(self, timeout = 120.0): - current = time.time() - target = current - timeout - for uid, peer in legacy.items(self._peers): - if peer["ping"] > target: continue - del self._peers[uid] - - def on_discover_peers(self): - self.trigger_bus( - "peer", - data = dict( - uid = self.uid, - name = self.name, - name_b = self.name_b, - name_i = self.name_i, - info_dict = self.info_dict() - ) - ) - - def on_peer(self, data = None): - if not data: return - uid = data.get("uid", None) - if not uid: return - if uid == self.uid: return - peer = self._peers.get("uid", None) - if not peer: peer = dict() - peer["data"] = data - peer["ping"] = time.time() - self._peers[uid] = peer + # runs the unbind operation for the global events related with the + # peer updating operations (no longer needed) + self.unbind_bus("update_peers", self._on_update_peers) + self.unbind_bus("peer", self._on_peer) def _add_handlers(self, logger): for handler in self.handlers: @@ -4602,7 +4564,92 @@ def _set_global(self): def _set_variables(self): self.description = self._description() + def _schedule_peers(self, timeout = 60.0): + """ + Runs the scheduling of the peers discovery operation for + the current tick and at the end of its execution schedules + a new one according to the provided timeout. + + :type timeout: float + :param timeout: The number of seconds until the next peers + scheduling operation should be performed. + """ + + if not self.get_bus_d(): return + self._refresh_peers(timeout = timeout * 2) + self.trigger_bus("update_peers") + self.schedule( + self._schedule_peers, + timeout = timeout, + kwargs = dict(timeout = timeout) + ) + + def _refresh_peers(self, timeout = 120.0): + """ + Runs the house keeping operation on the peers structure so + that for instance old peers are removed once a certain timeout + threshold is reached. + + It should be considered a garbage collection operation. + + :type timeout: float + :param timeout: The maximum number of seconds waiting for a "ping" + operation from a peer until it should be considered expired. + """ + + current = time.time() + target = current - timeout + for uid, peer in legacy.items(self._peers): + if peer["ping"] > target: continue + del self._peers[uid] + + def _on_update_peers(self): + """ + Callback method triggered when a request for an update operation + about the peers on the bus is performed. + + Should send the information on the current peer to the bus. + """ + + self.trigger_bus( + "peer", + data = dict( + uid = self.uid, + name = self.name, + name_b = self.name_b, + name_i = self.name_i, + info_dict = self.info_dict() + ) + ) + + def _on_peer(self, data = None): + """ + Callback method to be called when the information regarding + a new peer is received on the currently (shared) bus. + + :type data: Dictionary + :param data: The map contain the detailed information on the + peer (it should have been send on the "wire"). + """ + + if not data: return + uid = data.get("uid", None) + if not uid: return + if uid == self.uid: return + peer = self._peers.get("uid", None) + if not peer: peer = dict() + peer["data"] = data + peer["ping"] = time.time() + self._peers[uid] = peer + def _apply_config(self): + """ + Applies a series of configuration related values to the current + instance should be able to normalized many of its values. + + Some of the values are constructed by appending multiple values. + """ + self.instance = config.conf("INSTANCE", None) self.instance = config.conf("PROFILE", self.instance) self.name = config.conf("NAME", self.name)