Skip to content

Commit

Permalink
fix: minor fixes; add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
nampereira committed Jan 9, 2024
1 parent c4eba98 commit 4e208ac
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 55 deletions.
25 changes: 7 additions & 18 deletions arena/arena_mqtt.py
Expand Up @@ -12,14 +12,14 @@

from .auth import ArenaAuth
from .event_loop import *
from .utils import ProgramRunInfo

from .env import (
MQTTH,
REALM,
ARENA_USERNAME,
ARENA_PASSWORD,
NAMESPACE
NAMESPACE,
_get_env
)

class ArenaMQTT(object):
Expand All @@ -43,7 +43,7 @@ def __init__(
**kwargs
):
if os.environ.get(MQTTH):
self.web_host = os.environ[MQTTH]
self.web_host = _get_env(MQTTH)
print(f"Using Host from 'MQTTH' env variable: {self.web_host}")
elif "host" in kwargs and kwargs["host"]:
self.web_host = kwargs["host"]
Expand All @@ -53,7 +53,7 @@ def __init__(
self.web_host = web_host

if os.environ.get(REALM):
self.realm = os.environ[REALM]
self.realm = _get_env(REALM)
print(f"Using Realm from 'REALM' env variable: {self.realm}")
elif "realm" in kwargs and kwargs["realm"]:
self.realm = kwargs["realm"]
Expand All @@ -72,8 +72,8 @@ def __init__(
self.auth = ArenaAuth()
if os.environ.get(ARENA_USERNAME) and os.environ.get(ARENA_PASSWORD):
# auth 1st: use passed in env var
self.username = os.environ[ARENA_USERNAME]
token = os.environ[ARENA_PASSWORD]
self.username = _get_env(ARENA_USERNAME)
token = _get_env(ARENA_PASSWORD)
self.auth.store_environment_auth(self.username, token)
else:
if self.scene:
Expand All @@ -90,7 +90,7 @@ def __init__(
self.username = self.auth.authenticate_user(self.web_host)

if os.environ.get(NAMESPACE):
self.namespace = os.environ[NAMESPACE]
self.namespace = _get_env(NAMESPACE)
elif "namespace" not in kwargs or ("namespace" in kwargs and kwargs["namespace"] is None):
self.namespace = self.username
else:
Expand Down Expand Up @@ -152,9 +152,6 @@ def __init__(
self.mqttc.on_connect = self.on_connect
self.mqttc.on_disconnect = self.on_disconnect
self.mqttc.on_publish = self.on_publish

# setup program run info to collect stats
self.run_info = ProgramRunInfo(self.event_loop, update_callback=self.run_info_update)

# add main message processing + callbacks loop to tasks
self.run_async(self.process_message)
Expand Down Expand Up @@ -314,7 +311,6 @@ def on_message(self, client, userdata, msg):
# ignore own messages
if mqtt.topic_matches_sub(self.ignore_topic, msg.topic):
return
self.run_info.msg_rcv()
self.msg_queue.put_nowait(msg)

async def process_message(self):
Expand All @@ -333,9 +329,6 @@ def disconnect(self):
self.end_program_callback(self)
self.mqttc.disconnect()

def on_publish(self, client, userdata, mid):
self.run_info.msg_publish()

def message_callback_add(self, sub, callback):
"""Subscribes to new topic and adds callback"""
self.mqttc.subscribe(sub)
Expand All @@ -345,7 +338,3 @@ def message_callback_remove(self, sub):
"""Unsubscribes to topic and removes callback"""
self.mqttc.unsubscribe(sub)
self.mqttc.message_callback_remove(sub)

def run_info_update(self, stats):
"""Callbak when program info/stats are updated; publish program object update"""
raise NotImplementedError("Must override run_info_update")
3 changes: 2 additions & 1 deletion arena/device.py
Expand Up @@ -7,6 +7,7 @@

from .env import (
DEVICE,
_get_env
)

class Device(ArenaMQTT):
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(
debug = self.args["debug"]

if os.environ.get(DEVICE):
self.device = os.environ[DEVICE]
self.device = _get_env(DEVICE)
print(f"Using Device from 'DEVICE' env variable: {self.device}")
elif "device" in kwargs and kwargs["device"]:
if re.search("/", kwargs["device"]):
Expand Down
13 changes: 8 additions & 5 deletions arena/env.py
Expand Up @@ -129,19 +129,22 @@
Default: 5000.
"""

# env variables defaults
""" Environment variables default values """
ENV_DEFAULTS = {
ENABLE_INTERPRETER: 'false',
OTLP_ENDPOINT: 'http://localhost:4317',
OTEL_LOG_LEVEL: 'info',
PROGRAM_STATS_UPDATE_INTERVAL_MS: 5000
}

def _get_env(all=False):
skip = ('os', 'sys')
def _get_env(env_var):
""" Get value of env variable with default defined by ENV_DEFAULTS; None if not defined in ENV_DEFAULTS """
return os.environ.get(env_var, ENV_DEFAULTS.get(env_var))

def _get_arena_env():
"""Get all variables defined in this module; skip credentials, private data and imports"""
env = {}
# get variables defined in this module; skip credentials, private data and imports
if not all: skip = ( ARENA_PASSWORD, ARENA_USERNAME, 'os', 'sys' )
skip = ( ARENA_PASSWORD, ARENA_USERNAME, 'os', 'sys' )
for key in [ v for v in dir(sys.modules[__name__]) if not v.startswith('_') and v not in skip]:
env[key] = os.environ.get(key)
return env
29 changes: 23 additions & 6 deletions arena/scene.py
Expand Up @@ -11,11 +11,12 @@
from .attributes import *
from .events import *
from .objects import *
from .utils import Utils, ArenaTelemetry, ArenaCmdInterpreter
from .utils import Utils, ArenaTelemetry, ArenaCmdInterpreter, ProgramRunInfo

from .env import (
SCENE,
PROGRAM_OBJECT_ID
PROGRAM_OBJECT_ID,
_get_env
)

class Scene(ArenaMQTT):
Expand Down Expand Up @@ -53,7 +54,7 @@ def __init__(
self.connected_evt = threading.Event()

# start the command interpreter (if enabled by env variable)
self.cmd_interpreter = ArenaCmdInterpreter(self, show_attrs=('config_data', 'scene', 'users', 'all_objects', 'stats'),
self.cmd_interpreter = ArenaCmdInterpreter(self, show_attrs=('config_data', 'scene', 'users', 'all_objects', 'run_info'),
get_callables=('persisted_objs', 'persisted_scene_option', 'writable_scenes', 'user_list'),
start_cmd_event=self.connected_evt)

Expand All @@ -69,7 +70,7 @@ def __init__(
debug = self.args["debug"]

if os.environ.get(SCENE):
self.scene = os.environ[SCENE]
self.scene = _get_env(SCENE)
print(f"Using Scene from 'SCENE' env variable: {self.scene}")
elif "scene" in kwargs and kwargs["scene"]:
if re.search("/", kwargs["scene"]):
Expand All @@ -96,7 +97,7 @@ def __init__(
# create a program object to describe this program
# PROGRAM_OBJECT_ID allows to match the object id of persisted program object
# when a program object with PROGRAM_OBJECT_ID is loaded from persist, it will replace this one
self.program = Program(object_id=os.environ.get(PROGRAM_OBJECT_ID),
self.program = Program(object_id=_get_env(PROGRAM_OBJECT_ID),
name=f"{self.namespace}/{self.scene}",
filename=sys.argv[0],
filetype="PY")
Expand All @@ -117,9 +118,18 @@ def __init__(
# have a reference to
self.users = {} # dict of all users

# setup program run info to collect stats
self.run_info = ProgramRunInfo(self.event_loop,
update_callback=self.run_info_update,
web_host=self.web_host,
namespace=self.namespace,
scene=self.scene,
realm=self.realm
)

# Always use the the hostname specified by the user, or defaults.
print(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}")

span.add_event(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}")

def exit(self, arg=0):
Expand All @@ -139,6 +149,13 @@ def on_connect(self, client, userdata, flags, rc):
# create arena-py Objects from persist server
# no need to return anything here
self.get_persisted_objs()

def on_message(self, client, userdata, msg):
super().on_message(client, userdata, msg)
self.run_info.msg_rcv()

def on_publish(self, client, userdata, mid):
self.run_info.msg_publish()

async def process_message(self):
while True:
Expand Down
41 changes: 29 additions & 12 deletions arena/utils/arena_telemetry.py
@@ -1,3 +1,10 @@
"""
The ArenaTelemetry generates traces, metrics, and logs using OpenTelemetry (OTEL).
It can export using OTEL's protocol (OTLP), send JSON OTEL spans to MQTT, or to the console.
The :envvar:`ARENA_TELEMETRY` environment variable enables the telemetry.
The :envvar:`OTLP_ENDPOINT` environment variable defines the OTLP endpoint when OTLP is used.
"""
import atexit
import sys
import os
Expand All @@ -17,15 +24,13 @@
from ..env import (
ARENA_TELEMETRY,
OTLP_ENDPOINT,
ENV_DEFAULTS
_get_env
)

TRACE_TOPIC_DFT = "realm/ns/scene/t/traces"

class MQTTSpanExporter(SpanExporter):
"""Implementation of :class:`SpanExporter` that sends spans to MQTT
"""
"""Implementation of :class:`SpanExporter` that sends spans to MQTT"""

def __init__(
self,
Expand Down Expand Up @@ -59,10 +64,22 @@ def shutdown(self) -> None:
pass

class ArenaTelemetry():
"""Implementation of ARENA telemetry.
According to :envvar:`ARENA_TELEMETRY`, exports using OTLP, send JSON OTEL spans to MQTT, or to the console.
"""

parent_span: Span = None

def __init__(self, name=sys.argv[0], id=None):
"""Return a `ArenaTelemetry` using given service name and id
Provides utility calls that wrap open telemetry functionality to start spans, log events, and other.
Creates a parent span for all the spans related to the program.
Args:
name: name of the service used with the telemetry backend
id: additional id used with the telemetry backend
"""

service_name = f"{name}"
if id: service_name = service_name + "({id})"
Expand All @@ -71,8 +88,8 @@ def __init__(self, name=sys.argv[0], id=None):
SERVICE_NAME: service_name
})

env_telemetry = os.environ.get(ARENA_TELEMETRY, 'None')
otlp_endpoint = os.environ.get(OTLP_ENDPOINT, ENV_DEFAULTS.get(OTLP_ENDPOINT))
env_telemetry = _get_env(ARENA_TELEMETRY)
otlp_endpoint = _get_env(OTLP_ENDPOINT)
tel_exporters = {
'otlp': lambda: OTLPSpanExporter(otlp_endpoint, insecure=True),
'mqtt': lambda: MQTTSpanExporter(),
Expand Down Expand Up @@ -101,8 +118,8 @@ def __init__(self, name=sys.argv[0], id=None):
# make sure we end parent span
atexit.register(self.exit)

# record exit status on error
def exit(self, error_msg=None):
"""Record exit status on error """
if not self.enabled: return
if error_msg: self.parent_span.set_status(Status(StatusCode.ERROR, error_msg))
self.parent_span.end()
Expand All @@ -112,31 +129,31 @@ def exit(self, error_msg=None):
def __del__(self):
if self.parent_span != INVALID_SPAN: self.exit()

# wrapper to otel start_as_current_span; force context to be parent span
def start_span(self, name, **kwargs):
"""Wrapper to otel start_as_current_span; force context to be parent span"""
if 'context' in kwargs: del kwargs['context']
return self.tracer.start_as_current_span(name, context=self.parent_span_ctx, **kwargs)

# wrapper to otel start_as_current_span to start a process message span; force context to be parent span
def start_process_msg_span(self, obj_id, action, **kwargs):
"""Wrapper to otel start_as_current_span to start a process message span; force context to be parent span"""
if 'context' in kwargs: del kwargs['context']
return self.tracer.start_as_current_span(f"{obj_id} {action} process_message", context=self.parent_span_ctx, **kwargs)

# wrapper to otel start_as_current_span to start a process message span; force context to be parent span
def start_publish_span(self, obj_id, action, type, **kwargs):
"""Wrapper to otel start_as_current_span to start a process message span; force context to be parent span"""
if 'context' in kwargs: del kwargs['context']
return self.tracer.start_as_current_span(f"{obj_id} {action} publish_message {type}", context=self.parent_span_ctx, **kwargs)

# add event to given or current span
def add_event(self, name, span=None, print_msg=True, **kwargs):
"""Add event to given or current span"""
if print_msg: print(name)
if not self.enabled: return
if not span: span = trace.get_current_span()
if span == INVALID_SPAN: span = self.parent_span
span.add_event(name, kwargs)

# set error on given or current span
def set_error(self, error_msg, span=None, print_msg=True):
"""Set error on given or current span"""
if print_msg: print(error_msg)
if not self.enabled: return
if not span: span = trace.get_current_span()
Expand Down
23 changes: 21 additions & 2 deletions arena/utils/cmd_interpreter.py
@@ -1,7 +1,26 @@
# A simple command interpreter
"""
The ArenaCmdInterpreter is a simple line-oriented command interpreter that
allows to inspect library/program state. It looks at :envvar:`ENABLE_INTERPRETER`
to enable the interpreter.
The :class:`.ArenaCmdInterpreter` receives a :class:`.Scene` instance and provides commands
to inspect attributes and execute functions (callables) given to the constructor.
The commands available are:
show: displays attributes
info: excutes scene functions that output information
help: displays the commands available
exit: terminates the program
"""

import cmd, os, json, asyncio, threading, time
from datetime import date, datetime
from ..env import (
ENABLE_INTERPRETER,
_get_env
)

class ArenaCmdInterpreter(cmd.Cmd):
intro = 'Welcome to the arena-py console. Type help or ? to list available commands.\n'
prompt = '# '
Expand All @@ -15,7 +34,7 @@ def __serialize_obj(self, obj):
raise TypeError("Type not serializable")

def __init__(self, scene, show_attrs=('config_data', 'scene', 'users', 'all_objects', 'msg_io'), get_callables=('persisted_objs', 'persisted_scene_option', 'writable_scenes', 'user_list'), start_cmd_event=None):
self.enable_interp = os.environ.get("ENABLE_INTERPRETER", 'False').lower() in ('true', '1', 't')
self.enable_interp = _get_env(ENABLE_INTERPRETER).lower() in ('true', '1', 't')
if not self.enable_interp: return
super().__init__(completekey='tab')
self._scene = scene
Expand Down

0 comments on commit 4e208ac

Please sign in to comment.