Skip to content

Commit

Permalink
better peer update structure
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Oct 5, 2018
1 parent fdacaef commit 3746e5a
Showing 1 changed file with 91 additions and 44 deletions.
135 changes: 91 additions & 44 deletions src/appier/base.py
Expand Up @@ -4335,52 +4335,14 @@ def _load_supervisor(self):

# runs the initial bind operations for both the update and the
# (new) peer events responsible for the global status consistency
self.bind_bus("update_peers", self.on_discover_peers)
self.bind_bus("peer", self.on_peer)
self.bind_bus("update_peers", self._on_update_peers)
self.bind_bus("peer", self._on_peer)

def _unload_supervisor(self):
self.unbind_bus("update_peers", self.on_discover_peers)
self.unbind_bus("peer", self.on_peer)

def _schedule_peers(self, timeout = 60.0):
if not self.get_bus_d(): return
self._cleanup_peers(timeout = timeout * 2)
self.trigger_bus("update_peers")
self.schedule(
self._schedule_peers,
timeout = timeout,
kwargs = dict(timeout = timeout)
)

def _cleanup_peers(self, timeout = 120.0):
current = time.time()
target = current - timeout
for uid, peer in legacy.items(self._peers):
if peer["ping"] > target: continue
del self._peers[uid]

def on_discover_peers(self):
self.trigger_bus(
"peer",
data = dict(
uid = self.uid,
name = self.name,
name_b = self.name_b,
name_i = self.name_i,
info_dict = self.info_dict()
)
)

def on_peer(self, data = None):
if not data: return
uid = data.get("uid", None)
if not uid: return
if uid == self.uid: return
peer = self._peers.get("uid", None)
if not peer: peer = dict()
peer["data"] = data
peer["ping"] = time.time()
self._peers[uid] = peer
# runs the unbind operation for the global events related with the
# peer updating operations (no longer needed)
self.unbind_bus("update_peers", self._on_update_peers)
self.unbind_bus("peer", self._on_peer)

def _add_handlers(self, logger):
for handler in self.handlers:
Expand Down Expand Up @@ -4602,7 +4564,92 @@ def _set_global(self):
def _set_variables(self):
self.description = self._description()

def _schedule_peers(self, timeout = 60.0):
"""
Runs the scheduling of the peers discovery operation for
the current tick and at the end of its execution schedules
a new one according to the provided timeout.
:type timeout: float
:param timeout: The number of seconds until the next peers
scheduling operation should be performed.
"""

if not self.get_bus_d(): return
self._refresh_peers(timeout = timeout * 2)
self.trigger_bus("update_peers")
self.schedule(
self._schedule_peers,
timeout = timeout,
kwargs = dict(timeout = timeout)
)

def _refresh_peers(self, timeout = 120.0):
"""
Runs the house keeping operation on the peers structure so
that for instance old peers are removed once a certain timeout
threshold is reached.
It should be considered a garbage collection operation.
:type timeout: float
:param timeout: The maximum number of seconds waiting for a "ping"
operation from a peer until it should be considered expired.
"""

current = time.time()
target = current - timeout
for uid, peer in legacy.items(self._peers):
if peer["ping"] > target: continue
del self._peers[uid]

def _on_update_peers(self):
"""
Callback method triggered when a request for an update operation
about the peers on the bus is performed.
Should send the information on the current peer to the bus.
"""

self.trigger_bus(
"peer",
data = dict(
uid = self.uid,
name = self.name,
name_b = self.name_b,
name_i = self.name_i,
info_dict = self.info_dict()
)
)

def _on_peer(self, data = None):
"""
Callback method to be called when the information regarding
a new peer is received on the currently (shared) bus.
:type data: Dictionary
:param data: The map contain the detailed information on the
peer (it should have been send on the "wire").
"""

if not data: return
uid = data.get("uid", None)
if not uid: return
if uid == self.uid: return
peer = self._peers.get("uid", None)
if not peer: peer = dict()
peer["data"] = data
peer["ping"] = time.time()
self._peers[uid] = peer

def _apply_config(self):
"""
Applies a series of configuration related values to the current
instance should be able to normalized many of its values.
Some of the values are constructed by appending multiple values.
"""

self.instance = config.conf("INSTANCE", None)
self.instance = config.conf("PROFILE", self.instance)
self.name = config.conf("NAME", self.name)
Expand Down

0 comments on commit 3746e5a

Please sign in to comment.