Skip to content

Commit

Permalink
fix: small refactor: env_vars to env and ProgramStats to ProgramRunInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
nampereira committed Feb 6, 2024
1 parent 01e5404 commit f52c157
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 45 deletions.
18 changes: 10 additions & 8 deletions arena/arena_mqtt.py
Expand Up @@ -12,15 +12,16 @@

from .auth import ArenaAuth
from .event_loop import *
from .utils import ProgramStats
from .utils import ProgramRunInfo

from .env_vars import (
from .env import (
MQTTH,
REALM,
ARENA_USERNAME,
ARENA_PASSWORD,
NAMESPACE
)

class ArenaMQTT(object):
"""
Wrapper around Paho MQTT client and EventLoop.
Expand Down Expand Up @@ -152,8 +153,8 @@ def __init__(
self.mqttc.on_disconnect = self.on_disconnect
self.mqttc.on_publish = self.on_publish

# setup program stats collection
self.stats = ProgramStats(self.event_loop, update_callback=self.stats_update)
# setup program run info to collect stats
self.run_info = ProgramRunInfo(self.event_loop, update_callback=self.run_info_update)

# add main message processing + callbacks loop to tasks
self.run_async(self.process_message)
Expand Down Expand Up @@ -313,7 +314,7 @@ def on_message(self, client, userdata, msg):
# ignore own messages
if mqtt.topic_matches_sub(self.ignore_topic, msg.topic):
return
self.stats.msg_rcv()
self.run_info.msg_rcv()
self.msg_queue.put_nowait(msg)

async def process_message(self):
Expand All @@ -333,7 +334,7 @@ def disconnect(self):
self.mqttc.disconnect()

def on_publish(self, client, userdata, mid):
self.stats.msg_publish()
self.run_info.msg_publish()

def message_callback_add(self, sub, callback):
"""Subscribes to new topic and adds callback"""
Expand All @@ -345,5 +346,6 @@ def message_callback_remove(self, sub):
self.mqttc.unsubscribe(sub)
self.mqttc.message_callback_remove(sub)

def stats_update(self):
raise NotImplementedError("Must override stats_update")
def run_info_update(self, stats):
"""Callbak when program info/stats are updated; publish program object update"""
raise NotImplementedError("Must override run_info_update")
2 changes: 1 addition & 1 deletion arena/device.py
Expand Up @@ -5,7 +5,7 @@

from .arena_mqtt import ArenaMQTT

from .env_vars import (
from .env import (
DEVICE,
)

Expand Down
26 changes: 24 additions & 2 deletions arena/env_vars.py → arena/env.py
@@ -1,6 +1,8 @@
# Environment variables definitions
# When applicable, cariable defaults are defined by ENV_DEFAULTS
# When applicable, variable defaults are defined by ENV_DEFAULTS
#
import sys
import os

MQTTH = "MQTTH"
"""
Expand Down Expand Up @@ -117,9 +119,29 @@
Default: "info".
"""

PROGRAM_STATS_UPDATE_INTERVAL_MS = "PROGRAM_STATS_UPDATE_INTERVAL_MS"
"""
.. envvar:: PROGRAM_STATS_UPDATE_INTERVAL_MS
The :envvar:`PROGRAM_STATS_UPDATE_INTERVAL_MS` environment variable defines how often program
stats are published
Default: 5000.
"""

# env variables defaults
ENV_DEFAULTS = {
ENABLE_INTERPRETER: 'false',
OTLP_ENDPOINT: 'http://localhost:4317',
OTEL_LOG_LEVEL: 'info',
}
PROGRAM_STATS_UPDATE_INTERVAL_MS: 5000
}

def _get_env(all=False):
skip = ('os', 'sys')
env = {}
# get variables defined in this module; skip credentials, private data and imports
if not all: skip = ( ARENA_PASSWORD, ARENA_USERNAME, 'os', 'sys' )
for key in [ v for v in dir(sys.modules[__name__]) if not v.startswith('_') and v not in skip]:
env[key] = os.environ.get(key)
return env
5 changes: 3 additions & 2 deletions arena/objects/arena_object.py
Expand Up @@ -198,8 +198,9 @@ def add(cls, obj):
def remove(cls, obj):
object_id = obj.object_id
del Object.all_objects[object_id]
for task in obj.delayed_prop_tasks.values(): # Cancel all pending tasks
task.cancel()
if (hasattr(obj, "delayed_prop_tasks")):
for task in obj.delayed_prop_tasks.values(): # Cancel all pending tasks
task.cancel()

@classmethod
def exists(cls, object_id):
Expand Down
29 changes: 6 additions & 23 deletions arena/objects/program.py
Expand Up @@ -4,7 +4,7 @@
import uuid
import os

from ..env_vars import (
from ..env import (
PROGRAM_OBJECT_ID,
)

Expand All @@ -21,21 +21,13 @@ class Program(BaseObject):
:param str[] args: Command-line arguments (passed in argv); e.g. [ "arg=value" ].
:param str[] env: Environment variables; e.g. [ "SCENE=ascene" ].
Uses env variable PROGRAM_OBJECT_ID to identify persist program object that represents the running program;
"""

type = "program"
object_type = "program"

# save the program object the represents the running program instance
running_instance = None

def __init__(self, object_id=str(uuid.uuid4()), persist=False, **kwargs):

if os.environ.get(PROGRAM_OBJECT_ID) == object_id:
Program.running_instance = self

def __init__(self, object_id=str(uuid.uuid4()), persist=False, objects_list_add=True, **kwargs):

# remove timestamp, if exists
if "timestamp" in kwargs: del kwargs["timestamp"]

Expand All @@ -44,8 +36,8 @@ def __init__(self, object_id=str(uuid.uuid4()), persist=False, **kwargs):

# remove "action", if exists
if "action" in kwargs: del kwargs["action"]

# print warning if object is being created with the same id as an existing object
# update program object or print warning if existing object
if Object.exists(object_id):
if not Object.get(object_id).persist:
print("[WARNING]", f"An object with object_id of {object_id} was already created. The previous object will be overwritten.")
Expand All @@ -62,7 +54,7 @@ def __init__(self, object_id=str(uuid.uuid4()), persist=False, **kwargs):
)

# add current object to all_objects dict
Object.add(self)
if objects_list_add: Object.add(self)

def update_attributes(self, evt_handler=None, update_handler=None, **kwargs):
if "data" not in self:
Expand All @@ -86,12 +78,3 @@ def json(self, **kwargs):

json_payload["data"] = data
return self.json_encode(json_payload)

@classmethod
def running_instance_stats(cls, stats):
if Program.running_instance:
Program.running_instance.data['info'] = stats
Program.running_instance.persist = True
return Program.running_instance
return None

35 changes: 26 additions & 9 deletions arena/scene.py
Expand Up @@ -13,9 +13,11 @@
from .objects import *
from .utils import Utils, ArenaTelemetry, ArenaCmdInterpreter

from .env_vars import (
from .env import (
SCENE,
PROGRAM_OBJECT_ID
)

class Scene(ArenaMQTT):
"""
Gives access to an ARENA scene.
Expand Down Expand Up @@ -45,7 +47,7 @@ def __init__(
):

# init telemetry
self.telemetry = ArenaTelemetry()
self.telemetry = ArenaTelemetry()

# setup event to let others wait on connection
self.connected_evt = threading.Event()
Expand Down Expand Up @@ -90,6 +92,15 @@ def __init__(

with self.telemetry.start_span(f"init {self.namespace}/{self.scene}") as span:
# 'init' span will track the remainder of the initialization

# create a program object to describe this program
# PROGRAM_OBJECT_ID allows to match the object id of persisted program object
# when a program object with PROGRAM_OBJECT_ID is loaded from persist, it will replace this one
self.program = Program(object_id=os.environ.get(PROGRAM_OBJECT_ID),
name=f"{self.namespace}/{self.scene}",
filename=sys.argv[0],
filetype="PY")

self.persist_host = self.config_data["ARENADefaults"]["persistHost"]
self.persist_path = self.config_data["ARENADefaults"]["persistPath"]

Expand Down Expand Up @@ -523,7 +534,8 @@ def get_persisted_objs(self):
object_type = data.get("object_type")

# special case for Program type
if obj.get("type") == Program.object_type: object_type = Program.object_type
if obj.get("type") == Program.object_type:
object_type = Program.object_type

if object_id != None:
if object_id in self.all_objects:
Expand All @@ -535,6 +547,11 @@ def get_persisted_objs(self):
obj_class = OBJECT_TYPE_MAP.get(object_type, Object)
persisted_obj = obj_class(object_id=object_id, data=data)
persisted_obj.persist = True

# replace program object, if matches our program id
if object_type == Program.object_type:
if os.environ.get(PROGRAM_OBJECT_ID) == object_id:
self.program = persisted_obj

objs[object_id] = persisted_obj

Expand All @@ -558,12 +575,12 @@ def get_user_list(self):
"""Returns a list of users"""
return self.users.values()

def stats_update(self):
"""Callbak when program stats are updated"""
obj = Program.running_instance_stats(self.stats.get_stats())
if obj:
# publish program object update
self._publish(obj, "update")
def run_info_update(self, run_info):
"""Callbak when program stats are updated; publish program object update"""
# Add run info to program data object and publish program object update
run_info.add_program_info(self.program.data)
self.program.persist = True
self._publish(self.program, "update")

class Arena(Scene):
"""
Expand Down
72 changes: 72 additions & 0 deletions arena/utils/program_info.py
@@ -0,0 +1,72 @@
from datetime import datetime
import os
import sys

from ..event_loop import PersistentWorker
from ..base_object import BaseObject
from ..env import (
ENV_DEFAULTS,
PROGRAM_STATS_UPDATE_INTERVAL_MS,
_get_env
)

class ProgramRunInfo(BaseObject):
"""
Program Run Info
"""

object_type = "run_info"

def __init__(self, evt_loop=None, update_callback=None, update_interval_ms=os.environ.get(PROGRAM_STATS_UPDATE_INTERVAL_MS, ENV_DEFAULTS[PROGRAM_STATS_UPDATE_INTERVAL_MS])):
# program args, env
self.filename=sys.argv[0]
self.args=str(sys.argv[1:])
self.env=_get_env()

# run stats
self.create_time = datetime.utcnow().isoformat()[:-3]+"Z"
self.last_active_time = datetime.utcnow().isoformat()[:-3]+"Z"
self.last_rcv_time = None
self.last_pub_time = None
self.rcv_msgs = 0
self.pub_msgs= 0
self.rcv_msgs_per_sec = 0.0
self.pub_msgs_per_sec = 0.0

self._msg_rate_time_start = datetime.now()
self._update_callback = update_callback

if evt_loop:
# update stats periodically
t = PersistentWorker(evt_loop, self._update_stats, interval=update_interval_ms)
evt_loop.add_task(t)

def _update_stats(self):
"""Update stats; Execute callback if defined """
elapsed = datetime.now() - self._msg_rate_time_start
if elapsed.seconds > 0:
self.rcv_msgs_per_sec = round(self.rcv_msgs / elapsed.seconds, 2)
self.pub_msgs_per_sec = round(self.pub_msgs / elapsed.seconds, 2)
if self._update_callback: self._update_callback(self)

def msg_rcv(self):
self.last_rcv_time = datetime.utcnow().isoformat()[:-3]+"Z"
self.rcv_msgs = self.rcv_msgs + 1
self.last_active_time = self.last_rcv_time

def msg_publish(self):
self.last_pub_time = datetime.utcnow().isoformat()[:-3]+"Z"
self.pub_msgs = self.pub_msgs + 1
self.last_active_time = self.last_pub_time

def get_info(self, **kwargs):
""" Return run info dictionary to publish; public members only """
obj = {k: v for k, v in vars(self).items() if k.startswith("_") == False}
obj.update(kwargs)
return obj

def add_program_info(self, adict):
""" Add program info to another dictionary """
adict[ProgramRunInfo.object_type] = self.get_info()
return adict

0 comments on commit f52c157

Please sign in to comment.