From f52c157c4d869326459cc6b52f08ff172efeaf0e Mon Sep 17 00:00:00 2001 From: Nuno Pereira Date: Tue, 9 Jan 2024 11:38:39 +0000 Subject: [PATCH] fix: small refactor: env_vars to env and ProgramStats to ProgramRunInfo --- arena/arena_mqtt.py | 18 +++++---- arena/device.py | 2 +- arena/{env_vars.py => env.py} | 26 ++++++++++++- arena/objects/arena_object.py | 5 ++- arena/objects/program.py | 29 +++----------- arena/scene.py | 35 ++++++++++++----- arena/utils/program_info.py | 72 +++++++++++++++++++++++++++++++++++ 7 files changed, 142 insertions(+), 45 deletions(-) rename arena/{env_vars.py => env.py} (82%) create mode 100644 arena/utils/program_info.py diff --git a/arena/arena_mqtt.py b/arena/arena_mqtt.py index c00ab648..19958da9 100644 --- a/arena/arena_mqtt.py +++ b/arena/arena_mqtt.py @@ -12,15 +12,16 @@ from .auth import ArenaAuth from .event_loop import * -from .utils import ProgramStats +from .utils import ProgramRunInfo -from .env_vars import ( +from .env import ( MQTTH, REALM, ARENA_USERNAME, ARENA_PASSWORD, NAMESPACE ) + class ArenaMQTT(object): """ Wrapper around Paho MQTT client and EventLoop. @@ -152,8 +153,8 @@ def __init__( self.mqttc.on_disconnect = self.on_disconnect self.mqttc.on_publish = self.on_publish - # setup program stats collection - self.stats = ProgramStats(self.event_loop, update_callback=self.stats_update) + # setup program run info to collect stats + self.run_info = ProgramRunInfo(self.event_loop, update_callback=self.run_info_update) # add main message processing + callbacks loop to tasks self.run_async(self.process_message) @@ -313,7 +314,7 @@ def on_message(self, client, userdata, msg): # ignore own messages if mqtt.topic_matches_sub(self.ignore_topic, msg.topic): return - self.stats.msg_rcv() + self.run_info.msg_rcv() self.msg_queue.put_nowait(msg) async def process_message(self): @@ -333,7 +334,7 @@ def disconnect(self): self.mqttc.disconnect() def on_publish(self, client, userdata, mid): - self.stats.msg_publish() + self.run_info.msg_publish() def message_callback_add(self, sub, callback): """Subscribes to new topic and adds callback""" @@ -345,5 +346,6 @@ def message_callback_remove(self, sub): self.mqttc.unsubscribe(sub) self.mqttc.message_callback_remove(sub) - def stats_update(self): - raise NotImplementedError("Must override stats_update") + def run_info_update(self, stats): + """Callbak when program info/stats are updated; publish program object update""" + raise NotImplementedError("Must override run_info_update") diff --git a/arena/device.py b/arena/device.py index 5776d75a..dcf842bf 100644 --- a/arena/device.py +++ b/arena/device.py @@ -5,7 +5,7 @@ from .arena_mqtt import ArenaMQTT -from .env_vars import ( +from .env import ( DEVICE, ) diff --git a/arena/env_vars.py b/arena/env.py similarity index 82% rename from arena/env_vars.py rename to arena/env.py index db66b04c..4d5ba4cf 100644 --- a/arena/env_vars.py +++ b/arena/env.py @@ -1,6 +1,8 @@ # Environment variables definitions -# When applicable, cariable defaults are defined by ENV_DEFAULTS +# When applicable, variable defaults are defined by ENV_DEFAULTS # +import sys +import os MQTTH = "MQTTH" """ @@ -117,9 +119,29 @@ Default: "info". """ +PROGRAM_STATS_UPDATE_INTERVAL_MS = "PROGRAM_STATS_UPDATE_INTERVAL_MS" +""" +.. envvar:: PROGRAM_STATS_UPDATE_INTERVAL_MS + +The :envvar:`PROGRAM_STATS_UPDATE_INTERVAL_MS` environment variable defines how often program +stats are published + +Default: 5000. +""" + # env variables defaults ENV_DEFAULTS = { ENABLE_INTERPRETER: 'false', OTLP_ENDPOINT: 'http://localhost:4317', OTEL_LOG_LEVEL: 'info', -} \ No newline at end of file + PROGRAM_STATS_UPDATE_INTERVAL_MS: 5000 +} + +def _get_env(all=False): + skip = ('os', 'sys') + env = {} + # get variables defined in this module; skip credentials, private data and imports + if not all: skip = ( ARENA_PASSWORD, ARENA_USERNAME, 'os', 'sys' ) + for key in [ v for v in dir(sys.modules[__name__]) if not v.startswith('_') and v not in skip]: + env[key] = os.environ.get(key) + return env diff --git a/arena/objects/arena_object.py b/arena/objects/arena_object.py index 0e8134a9..9bc92227 100644 --- a/arena/objects/arena_object.py +++ b/arena/objects/arena_object.py @@ -198,8 +198,9 @@ def add(cls, obj): def remove(cls, obj): object_id = obj.object_id del Object.all_objects[object_id] - for task in obj.delayed_prop_tasks.values(): # Cancel all pending tasks - task.cancel() + if (hasattr(obj, "delayed_prop_tasks")): + for task in obj.delayed_prop_tasks.values(): # Cancel all pending tasks + task.cancel() @classmethod def exists(cls, object_id): diff --git a/arena/objects/program.py b/arena/objects/program.py index f3feb656..cae4590a 100644 --- a/arena/objects/program.py +++ b/arena/objects/program.py @@ -4,7 +4,7 @@ import uuid import os -from ..env_vars import ( +from ..env import ( PROGRAM_OBJECT_ID, ) @@ -21,21 +21,13 @@ class Program(BaseObject): :param str[] args: Command-line arguments (passed in argv); e.g. [ "arg=value" ]. :param str[] env: Environment variables; e.g. [ "SCENE=ascene" ]. - - Uses env variable PROGRAM_OBJECT_ID to identify persist program object that represents the running program; """ type = "program" object_type = "program" - - # save the program object the represents the running program instance - running_instance = None - def __init__(self, object_id=str(uuid.uuid4()), persist=False, **kwargs): - - if os.environ.get(PROGRAM_OBJECT_ID) == object_id: - Program.running_instance = self - + def __init__(self, object_id=str(uuid.uuid4()), persist=False, objects_list_add=True, **kwargs): + # remove timestamp, if exists if "timestamp" in kwargs: del kwargs["timestamp"] @@ -44,8 +36,8 @@ def __init__(self, object_id=str(uuid.uuid4()), persist=False, **kwargs): # remove "action", if exists if "action" in kwargs: del kwargs["action"] - - # print warning if object is being created with the same id as an existing object + + # update program object or print warning if existing object if Object.exists(object_id): if not Object.get(object_id).persist: print("[WARNING]", f"An object with object_id of {object_id} was already created. The previous object will be overwritten.") @@ -62,7 +54,7 @@ def __init__(self, object_id=str(uuid.uuid4()), persist=False, **kwargs): ) # add current object to all_objects dict - Object.add(self) + if objects_list_add: Object.add(self) def update_attributes(self, evt_handler=None, update_handler=None, **kwargs): if "data" not in self: @@ -86,12 +78,3 @@ def json(self, **kwargs): json_payload["data"] = data return self.json_encode(json_payload) - - @classmethod - def running_instance_stats(cls, stats): - if Program.running_instance: - Program.running_instance.data['info'] = stats - Program.running_instance.persist = True - return Program.running_instance - return None - \ No newline at end of file diff --git a/arena/scene.py b/arena/scene.py index b8029650..a97d9c00 100644 --- a/arena/scene.py +++ b/arena/scene.py @@ -13,9 +13,11 @@ from .objects import * from .utils import Utils, ArenaTelemetry, ArenaCmdInterpreter -from .env_vars import ( +from .env import ( SCENE, + PROGRAM_OBJECT_ID ) + class Scene(ArenaMQTT): """ Gives access to an ARENA scene. @@ -45,7 +47,7 @@ def __init__( ): # init telemetry - self.telemetry = ArenaTelemetry() + self.telemetry = ArenaTelemetry() # setup event to let others wait on connection self.connected_evt = threading.Event() @@ -90,6 +92,15 @@ def __init__( with self.telemetry.start_span(f"init {self.namespace}/{self.scene}") as span: # 'init' span will track the remainder of the initialization + + # create a program object to describe this program + # PROGRAM_OBJECT_ID allows to match the object id of persisted program object + # when a program object with PROGRAM_OBJECT_ID is loaded from persist, it will replace this one + self.program = Program(object_id=os.environ.get(PROGRAM_OBJECT_ID), + name=f"{self.namespace}/{self.scene}", + filename=sys.argv[0], + filetype="PY") + self.persist_host = self.config_data["ARENADefaults"]["persistHost"] self.persist_path = self.config_data["ARENADefaults"]["persistPath"] @@ -523,7 +534,8 @@ def get_persisted_objs(self): object_type = data.get("object_type") # special case for Program type - if obj.get("type") == Program.object_type: object_type = Program.object_type + if obj.get("type") == Program.object_type: + object_type = Program.object_type if object_id != None: if object_id in self.all_objects: @@ -535,6 +547,11 @@ def get_persisted_objs(self): obj_class = OBJECT_TYPE_MAP.get(object_type, Object) persisted_obj = obj_class(object_id=object_id, data=data) persisted_obj.persist = True + + # replace program object, if matches our program id + if object_type == Program.object_type: + if os.environ.get(PROGRAM_OBJECT_ID) == object_id: + self.program = persisted_obj objs[object_id] = persisted_obj @@ -558,12 +575,12 @@ def get_user_list(self): """Returns a list of users""" return self.users.values() - def stats_update(self): - """Callbak when program stats are updated""" - obj = Program.running_instance_stats(self.stats.get_stats()) - if obj: - # publish program object update - self._publish(obj, "update") + def run_info_update(self, run_info): + """Callbak when program stats are updated; publish program object update""" + # Add run info to program data object and publish program object update + run_info.add_program_info(self.program.data) + self.program.persist = True + self._publish(self.program, "update") class Arena(Scene): """ diff --git a/arena/utils/program_info.py b/arena/utils/program_info.py new file mode 100644 index 00000000..9ae134cc --- /dev/null +++ b/arena/utils/program_info.py @@ -0,0 +1,72 @@ +from datetime import datetime +import os +import sys + +from ..event_loop import PersistentWorker +from ..base_object import BaseObject +from ..env import ( + ENV_DEFAULTS, + PROGRAM_STATS_UPDATE_INTERVAL_MS, + _get_env +) + +class ProgramRunInfo(BaseObject): + """ + Program Run Info + """ + + object_type = "run_info" + + def __init__(self, evt_loop=None, update_callback=None, update_interval_ms=os.environ.get(PROGRAM_STATS_UPDATE_INTERVAL_MS, ENV_DEFAULTS[PROGRAM_STATS_UPDATE_INTERVAL_MS])): + # program args, env + self.filename=sys.argv[0] + self.args=str(sys.argv[1:]) + self.env=_get_env() + + # run stats + self.create_time = datetime.utcnow().isoformat()[:-3]+"Z" + self.last_active_time = datetime.utcnow().isoformat()[:-3]+"Z" + self.last_rcv_time = None + self.last_pub_time = None + self.rcv_msgs = 0 + self.pub_msgs= 0 + self.rcv_msgs_per_sec = 0.0 + self.pub_msgs_per_sec = 0.0 + + self._msg_rate_time_start = datetime.now() + self._update_callback = update_callback + + if evt_loop: + # update stats periodically + t = PersistentWorker(evt_loop, self._update_stats, interval=update_interval_ms) + evt_loop.add_task(t) + + def _update_stats(self): + """Update stats; Execute callback if defined """ + elapsed = datetime.now() - self._msg_rate_time_start + if elapsed.seconds > 0: + self.rcv_msgs_per_sec = round(self.rcv_msgs / elapsed.seconds, 2) + self.pub_msgs_per_sec = round(self.pub_msgs / elapsed.seconds, 2) + if self._update_callback: self._update_callback(self) + + def msg_rcv(self): + self.last_rcv_time = datetime.utcnow().isoformat()[:-3]+"Z" + self.rcv_msgs = self.rcv_msgs + 1 + self.last_active_time = self.last_rcv_time + + def msg_publish(self): + self.last_pub_time = datetime.utcnow().isoformat()[:-3]+"Z" + self.pub_msgs = self.pub_msgs + 1 + self.last_active_time = self.last_pub_time + + def get_info(self, **kwargs): + """ Return run info dictionary to publish; public members only """ + obj = {k: v for k, v in vars(self).items() if k.startswith("_") == False} + obj.update(kwargs) + return obj + + def add_program_info(self, adict): + """ Add program info to another dictionary """ + adict[ProgramRunInfo.object_type] = self.get_info() + return adict + \ No newline at end of file