Skip to content

Commit

Permalink
Merge pull request #33 from hivesolutions/hypervisor
Browse files Browse the repository at this point in the history
Supervisor
  • Loading branch information
joamag committed Oct 6, 2018
2 parents 4b27c27 + 0504c9e commit 2181f65
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 7 deletions.
5 changes: 5 additions & 0 deletions doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ a federated environment and its orchestration using an event driven approach (eg
* `SCHEDULER_TIMEOUT` (`float`) - Determines the number of seconds between tick operation loops (default: `60.0`)
* `SCHEDULER_DAEMON` (`bool`) - Defines if the scheduler thread should be run as a daemon (default: `True`)

#### Supervisor

* `SUPERVISOR_INTERVAL` (`float`) - The number of seconds in between peer checking, this is also going to be used to
control the timeout on the peer health check operation (default: `60.0`)

#### Debug

* `EXTENDED_PATH` (`bool`) - If the file path URL should be set for every traceback line (default: `True`)
Expand Down
132 changes: 130 additions & 2 deletions src/appier/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ def __init__(
self.touch_time = None
self.sort_headers = False
self.secure_headers = True
self.uid = str(uuid.uuid4())
self.random = str(uuid.uuid4())
self.secret = self.random
self.cache = datetime.timedelta(seconds = cache_s)
Expand Down Expand Up @@ -426,6 +427,7 @@ def __init__(
self._user_routes = None
self._core_routes = None
self._own = self
self._peers = {}
self.__routes = []
self.load(level = level, handlers = handlers)

Expand Down Expand Up @@ -616,6 +618,7 @@ def norm_route(
return [method, re.compile(expression, re.UNICODE), function, context, opts]

def load(self, *args, **kwargs):
if self._loaded: return
level = kwargs.get("level", None)
handlers = kwargs.get("handlers", None)
self._set_global()
Expand All @@ -642,11 +645,14 @@ def load(self, *args, **kwargs):
self._load_parts()
self._load_libraries()
self._load_patches()
self._load_supervisor()
self._set_config()
self._set_variables()
self._loaded = True

def unload(self, *args, **kwargs):
if not self._loaded: return
self._unload_supervisor()
self._unload_parts()
self._unload_models()
self._unload_execution()
Expand All @@ -664,6 +670,7 @@ def start(self, refresh = True):
self.start_date = datetime.datetime.utcnow()
self.touch_time = "t=%d" % self.start_time
self._start_models()
self._start_supervisor()
if refresh: self.refresh()
if self.manager: self.manager.start()
self.status = RUNNING
Expand All @@ -674,6 +681,7 @@ def stop(self, refresh = True):
self._print_bye()
self.tid = None
self._stop_models()
self._stop_supervisor()
if refresh: self.refresh()
self.status = STOPPED
self.trigger("stop")
Expand Down Expand Up @@ -1982,8 +1990,11 @@ def callable_t():
if timeout == 0: return callable()

# creates the thread to be used for the callable calling and
# starts it for asynchronous calling of the callable
# starts it for asynchronous calling of the callable, notice
# that that the thread is marked as daemon (avoiding problems
# with the exist of the current process)
thread = threading.Thread(target = callable_t)
thread.daemon = True
thread.start()

def chunks(self, data, size = 32768):
Expand Down Expand Up @@ -2949,7 +2960,7 @@ def trigger_bus(self, name, *args, **kwargs):

def get_uptime(self):
current_date = datetime.datetime.utcnow()
delta = current_date - self.start_date
delta = current_date - (self.start_date if self.start_time else current_date)
return delta

def get_uptime_s(self, count = 2):
Expand Down Expand Up @@ -4317,6 +4328,18 @@ def _load_patches(self):
output_charset = "utf-8"
)

def _load_supervisor(self):
# runs the initial bind operations for both the update and the
# (new) peer events responsible for the global status consistency
self.bind_bus("update_peers", self._on_update_peers)
self.bind_bus("peer", self._on_peer)

def _unload_supervisor(self):
# runs the unbind operation for the global events related with the
# peer updating operations (no longer needed)
self.unbind_bus("update_peers", self._on_update_peers)
self.unbind_bus("peer", self._on_peer)

def _add_handlers(self, logger):
for handler in self.handlers:
if not handler: continue
Expand Down Expand Up @@ -4484,6 +4507,16 @@ def _stop_models(self):
for model in self.models_l:
model.unregister(lazy = self.lazy)

def _start_supervisor(self):
# retrieves the global supervisor interval value and uses
# it as the base timeout on the supervisor by starting the
# first update operation
interval = config.conf("SUPERVISOR_INTERVAL", 60.0, cast = float)
self._schedule_peers(timeout = interval)

def _stop_supervisor(self):
pass

def _add_route(self, *args, **kwargs):
self.__routes.append(args)
self.clear_routes()
Expand Down Expand Up @@ -4538,6 +4571,13 @@ def _set_variables(self):
self.description = self._description()

def _apply_config(self):
"""
Applies a series of configuration related values to the current
instance should be able to normalized many of its values.
Some of the values are constructed by appending multiple values.
"""

self.instance = config.conf("INSTANCE", None)
self.instance = config.conf("PROFILE", self.instance)
self.name = config.conf("NAME", self.name)
Expand Down Expand Up @@ -4642,6 +4682,94 @@ def _set_url(self):
self.local_url = prefix + "localhost"
if not default_port: self.local_url += ":%d" % port

def _schedule_peers(self, timeout = 60.0):
"""
Runs the scheduling of the peers discovery operation for
the current tick and at the end of its execution schedules
a new one according to the provided timeout.
:type timeout: float
:param timeout: The number of seconds until the next peers
scheduling operation should be performed.
"""

if not self.get_bus_d(): return
self._refresh_peers(timeout = timeout * 2)
self.trigger_bus("update_peers")
self.schedule(
self._schedule_peers,
timeout = timeout,
kwargs = dict(timeout = timeout)
)

def _refresh_peers(self, timeout = 120.0):
"""
Runs the house keeping operation on the peers structure so
that for instance old peers are removed once a certain timeout
threshold is reached.
It should be considered a garbage collection operation.
:type timeout: float
:param timeout: The maximum number of seconds waiting for a "ping"
operation from a peer until it should be considered expired.
"""

current = time.time()
target = current - timeout
for uid, peer in legacy.items(self._peers):
if peer["ping"] > target: continue
del self._peers[uid]

def _send_peer(self):
"""
"Sends" the information on the current peer (instance) to the
shared bus, so that the other peers in the mesh are notified
about the existence of the current instance/process.
"""

if not self.get_bus_d(): return
self.trigger_bus(
"peer",
data = dict(
uid = self.uid,
name = self.name,
name_b = self.name_b,
name_i = self.name_i,
info_dict = self.info_dict()
)
)

def _on_update_peers(self):
"""
Callback method triggered when a request for an update operation
about the peers on the bus is performed.
Should send the information on the current peer to the bus.
"""

self._send_peer()

def _on_peer(self, data = None):
"""
Callback method to be called when the information regarding
a new peer is received on the currently (shared) bus.
:type data: Dictionary
:param data: The map contain the detailed information on the
peer (it should have been send on the "wire").
"""

if not data: return
uid = data.get("uid", None)
if not uid: return
if uid == self.uid: return
peer = self._peers.get("uid", None)
if not peer: peer = dict()
peer["data"] = data
peer["ping"] = time.time()
self._peers[uid] = peer

def _base_locale(self, fallback = "en_us"):
"""
Retrieves the locale considered to to be the base one for
Expand Down
10 changes: 6 additions & 4 deletions src/appier/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def bind(self, name, method):
methods.append(method)
self._events[name] = methods
channel = self._to_channel(name)
self._pubsub.subscribe(self._name + channel)
self._pubsub.subscribe(channel)

def unbind(self, name, method = None):
methods = self._events.get(name, [])
Expand All @@ -123,7 +123,7 @@ def trigger(self, name, *args, **kwargs):

def _load(self, *args, **kwargs):
Bus._load(self, *args, **kwargs)
self._name = config.conf("BUS_NAME", "global")
self._name = config.conf("BUS_NAME", self.owner.name_i)
self._name = config.conf("BUS_SCOPE", self._name)
self._name = kwargs.pop("name", self._name)
self._serializer = kwargs.pop("serializer", self.__class__.SERIALIZER)
Expand All @@ -138,10 +138,11 @@ def _unload(self, *args, **kwargs):

def _open(self):
cls = self.__class__
channel = self._to_channel(self._global_channel)
self._redis = redisdb.get_connection()
self._redis.ping()
self._pubsub = self._redis.pubsub()
self._pubsub.subscribe(self._name + self._global_channel)
self._pubsub.subscribe(channel)
self._listener = RedisListener(self)
self._listener.start()

Expand Down Expand Up @@ -177,12 +178,13 @@ def _loop(self, safe = True):
)

def _to_channel(self, name):
return self.owner.name_i + ":" + name
return self._name + ":" + name

class RedisListener(threading.Thread):

def __init__(self, bus):
threading.Thread.__init__(self)
self.daemon = True
self._bus = bus

def run(self):
Expand Down
2 changes: 1 addition & 1 deletion src/appier/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ def set_alias(self):
self.args["sid"] = self.args[alias]

def set_session(self, create = False):
# tries to retrieves the session id (sid) from all the
# tries to retrieves the session id (SID) from all the
# possible sources so that something may be used in the
# identification of the current request
sid = self.cookies.get("sid", None)
Expand Down

0 comments on commit 2181f65

Please sign in to comment.