Skip to content

Commit

Permalink
fix: failed animation object access as obj/dict
Browse files Browse the repository at this point in the history
  • Loading branch information
mwfarb committed Feb 21, 2024
1 parent 8d5785b commit 1c78cd1
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions arena/scene.py
Expand Up @@ -47,18 +47,18 @@ def __init__(
cli_args = False,
**kwargs
):

# init telemetry
self.telemetry = ArenaTelemetry()
self.telemetry = ArenaTelemetry()

# setup event to let others wait on connection
self.connected_evt = threading.Event()

# start the command interpreter (if enabled by env variable)
self.cmd_interpreter = ArenaCmdInterpreter(self, show_attrs=('config_data', 'scene', 'users', 'all_objects', 'run_info'),
get_callables=('persisted_objs', 'persisted_scene_option', 'writable_scenes', 'user_list'),
start_cmd_event=self.connected_evt)

if cli_args:
self.args = self.parse_cli()
if self.args["host"]:
Expand All @@ -81,7 +81,7 @@ def __init__(

if re.search("/", self.scene):
self.exit("Scene cannot include '/', aborting...")

super().__init__(
host,
realm,
Expand All @@ -99,11 +99,11 @@ def __init__(
# create a program object to describe this program
# PROGRAM_OBJECT_ID allows to match the object id of persisted program object
# when a program object with PROGRAM_OBJECT_ID is loaded from persist, it will replace this one
self.program = Program(object_id=_get_env(PROGRAM_OBJECT_ID, super().client_id()),
name=f"{self.namespace}/{self.scene}",
filename=sys.argv[0],
self.program = Program(object_id=_get_env(PROGRAM_OBJECT_ID, super().client_id()),
name=f"{self.namespace}/{self.scene}",
filename=sys.argv[0],
filetype="PY")

self.persist_host = self.config_data["ARENADefaults"]["persistHost"]
self.persist_path = self.config_data["ARENADefaults"]["persistPath"]

Expand All @@ -122,8 +122,8 @@ def __init__(

# setup program run info to collect stats
self.run_info = ProgramRunInfo(self.event_loop,
queue_len_callable=lambda: self.get_rcv_pub_queue_len(),
update_callback=self.run_info_update,
queue_len_callable=lambda: self.get_rcv_pub_queue_len(),
update_callback=self.run_info_update,
web_host=self.web_host,
namespace=self.namespace,
scene=self.scene,
Expand All @@ -132,7 +132,7 @@ def __init__(

# Always use the the hostname specified by the user, or defaults.
print(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}")

span.add_event(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}")

def exit(self, arg=0):
Expand All @@ -142,15 +142,15 @@ def exit(self, arg=0):
error_msg = f"Exiting with sys.exit('{arg}')"
self.telemetry.exit(error_msg)
os._exit(arg)

def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
# set event
self.connected_evt.set()

# create arena-py Objects from persist server
# no need to return anything here
# no need to return anything here
self.get_persisted_objs()

def on_message(self, client, userdata, msg):
Expand All @@ -159,13 +159,13 @@ def on_message(self, client, userdata, msg):

def on_publish(self, client, userdata, mid):
self.run_info.msg_publish()

async def process_message(self):
while True:
try:
msg = await self.msg_queue.get()
except RuntimeError as e:
self.telemetry.add_event(f"Ignoring error: {e}")
self.telemetry.add_event(f"Ignoring error: {e}")
return

# extract payload
Expand All @@ -179,7 +179,7 @@ async def process_message(self):
object_id = payload.get("object_id", None)
action = payload.get("action", None)

with self.telemetry.start_process_msg_span(object_id, action) as span:
with self.telemetry.start_process_msg_span(object_id, action) as span:
try:
# update object attributes, if possible
if object_id:
Expand Down Expand Up @@ -239,7 +239,7 @@ async def process_message(self):
else: # create/update
obj.update_attributes(**payload)
span.add_event("Object attributes update.")

else:
self.telemetry.set_error("No message action!", span)

Expand Down Expand Up @@ -287,12 +287,12 @@ async def process_message(self):
self.callback_wrapper(self.new_obj_callback, obj, payload)
self.unspecified_object_ids.add(object_id)
span.add_event("New Object.")

span.add_event("Handle Msg Done.")
else:
self.telemetry.set_error("No object id!", span)

except Exception as e:
except Exception as e:
self.telemetry.set_error(f"Something went wrong, ignoring: {payload}. {e}")

def callback_wrapper(self, func, arg, msg):
Expand Down Expand Up @@ -488,7 +488,7 @@ async def _delayed_task():

def _publish(self, obj, action, custom_payload=False):
"""Publishes to mqtt broker with "action":action"""
with self.telemetry.start_publish_span(obj.object_id, action, obj.type) as span:
with self.telemetry.start_publish_span(obj["object_id"], action, obj["type"]) as span:
if not self.can_publish:
self.telemetry.set_error(f"ERROR: Publish failed! You do not have permission to publish to topic {self.root_topic} on {self.web_host}", span)

Expand All @@ -504,13 +504,13 @@ def _publish(self, obj, action, custom_payload=False):
payload = obj.json(action=action, timestamp=d)

self.mqttc.publish(topic, payload, qos=0)
if self.debug:
if self.debug:
self.telemetry.add_event(f"[publish] {topic} {payload}")
return payload

def get_persisted_obj(self, object_id):
"""Returns a dictionary for a persisted object.
If object is known by arena-py, return local object, not persisted
"""
obj = None
Expand All @@ -535,12 +535,12 @@ def get_persisted_obj(self, object_id):
obj_class = OBJECT_TYPE_MAP.get(object_type, Object)
obj = obj_class(object_id=obj_id, data=data)
obj.persist = True

return obj

def get_persisted_objs(self):
"""Returns a dictionary of persisted objects.
If object is known by arena-py, return our local object, not persisted
Silently fails/skip objects without object_id and object_type (except programs)
Instanciates generic Object if object_type is given but unknown to arena-py
Expand All @@ -554,11 +554,11 @@ def get_persisted_objs(self):
object_id = obj.get("object_id")
data = obj.get("attributes", {})
object_type = data.get("object_type")

# special case for Program type
if obj.get("type") == Program.object_type:
if obj.get("type") == Program.object_type:
object_type = Program.object_type

if object_id != None:
if object_id in self.all_objects:
# note: object from our list (not from persist)
Expand All @@ -569,7 +569,7 @@ def get_persisted_objs(self):
obj_class = OBJECT_TYPE_MAP.get(object_type, Object)
persisted_obj = obj_class(object_id=object_id, data=data)
persisted_obj.persist = True

# replace program object, if matches our program id
if object_type == Program.object_type:
if os.environ.get(PROGRAM_OBJECT_ID) == object_id:
Expand All @@ -578,8 +578,8 @@ def get_persisted_objs(self):
else:
# dont persist program objet if not ours
persisted_obj.persist = False


objs[object_id] = persisted_obj

return objs
Expand All @@ -605,13 +605,13 @@ def get_user_list(self):
def get_rcv_pub_queue_len(self):
"""Return QueueStats object with receive and publish queue length"""
return QueueStats(super().rcv_queue_len(), super().pub_queue_len())

def run_info_update(self, run_info):
"""Callbak when program stats are updated; publish program object update"""
# Add run info to program data object and publish program object update
run_info.add_program_info(self.program.data)
self._publish(self.program, "update")

class Arena(Scene):
"""
Another name for Scene.
Expand Down

0 comments on commit 1c78cd1

Please sign in to comment.