From 1c78cd1c3166047bd37585fa5767d076921caea9 Mon Sep 17 00:00:00 2001 From: mwfarb Date: Wed, 21 Feb 2024 14:51:14 -0500 Subject: [PATCH] fix: failed animation object access as obj/dict --- arena/scene.py | 70 +++++++++++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/arena/scene.py b/arena/scene.py index 4586554c..5a692e7e 100644 --- a/arena/scene.py +++ b/arena/scene.py @@ -47,18 +47,18 @@ def __init__( cli_args = False, **kwargs ): - + # init telemetry - self.telemetry = ArenaTelemetry() - + self.telemetry = ArenaTelemetry() + # setup event to let others wait on connection self.connected_evt = threading.Event() - + # start the command interpreter (if enabled by env variable) self.cmd_interpreter = ArenaCmdInterpreter(self, show_attrs=('config_data', 'scene', 'users', 'all_objects', 'run_info'), get_callables=('persisted_objs', 'persisted_scene_option', 'writable_scenes', 'user_list'), start_cmd_event=self.connected_evt) - + if cli_args: self.args = self.parse_cli() if self.args["host"]: @@ -81,7 +81,7 @@ def __init__( if re.search("/", self.scene): self.exit("Scene cannot include '/', aborting...") - + super().__init__( host, realm, @@ -99,11 +99,11 @@ def __init__( # create a program object to describe this program # PROGRAM_OBJECT_ID allows to match the object id of persisted program object # when a program object with PROGRAM_OBJECT_ID is loaded from persist, it will replace this one - self.program = Program(object_id=_get_env(PROGRAM_OBJECT_ID, super().client_id()), - name=f"{self.namespace}/{self.scene}", - filename=sys.argv[0], + self.program = Program(object_id=_get_env(PROGRAM_OBJECT_ID, super().client_id()), + name=f"{self.namespace}/{self.scene}", + filename=sys.argv[0], filetype="PY") - + self.persist_host = self.config_data["ARENADefaults"]["persistHost"] self.persist_path = self.config_data["ARENADefaults"]["persistPath"] @@ -122,8 +122,8 @@ def __init__( # setup program run info to collect stats self.run_info = ProgramRunInfo(self.event_loop, - queue_len_callable=lambda: self.get_rcv_pub_queue_len(), - update_callback=self.run_info_update, + queue_len_callable=lambda: self.get_rcv_pub_queue_len(), + update_callback=self.run_info_update, web_host=self.web_host, namespace=self.namespace, scene=self.scene, @@ -132,7 +132,7 @@ def __init__( # Always use the the hostname specified by the user, or defaults. print(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}") - + span.add_event(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}") def exit(self, arg=0): @@ -142,7 +142,7 @@ def exit(self, arg=0): error_msg = f"Exiting with sys.exit('{arg}')" self.telemetry.exit(error_msg) os._exit(arg) - + def on_connect(self, client, userdata, flags, rc): super().on_connect(client, userdata, flags, rc) if rc == 0: @@ -150,7 +150,7 @@ def on_connect(self, client, userdata, flags, rc): self.connected_evt.set() # create arena-py Objects from persist server - # no need to return anything here + # no need to return anything here self.get_persisted_objs() def on_message(self, client, userdata, msg): @@ -159,13 +159,13 @@ def on_message(self, client, userdata, msg): def on_publish(self, client, userdata, mid): self.run_info.msg_publish() - + async def process_message(self): while True: try: msg = await self.msg_queue.get() except RuntimeError as e: - self.telemetry.add_event(f"Ignoring error: {e}") + self.telemetry.add_event(f"Ignoring error: {e}") return # extract payload @@ -179,7 +179,7 @@ async def process_message(self): object_id = payload.get("object_id", None) action = payload.get("action", None) - with self.telemetry.start_process_msg_span(object_id, action) as span: + with self.telemetry.start_process_msg_span(object_id, action) as span: try: # update object attributes, if possible if object_id: @@ -239,7 +239,7 @@ async def process_message(self): else: # create/update obj.update_attributes(**payload) span.add_event("Object attributes update.") - + else: self.telemetry.set_error("No message action!", span) @@ -287,12 +287,12 @@ async def process_message(self): self.callback_wrapper(self.new_obj_callback, obj, payload) self.unspecified_object_ids.add(object_id) span.add_event("New Object.") - + span.add_event("Handle Msg Done.") else: self.telemetry.set_error("No object id!", span) - except Exception as e: + except Exception as e: self.telemetry.set_error(f"Something went wrong, ignoring: {payload}. {e}") def callback_wrapper(self, func, arg, msg): @@ -488,7 +488,7 @@ async def _delayed_task(): def _publish(self, obj, action, custom_payload=False): """Publishes to mqtt broker with "action":action""" - with self.telemetry.start_publish_span(obj.object_id, action, obj.type) as span: + with self.telemetry.start_publish_span(obj["object_id"], action, obj["type"]) as span: if not self.can_publish: self.telemetry.set_error(f"ERROR: Publish failed! You do not have permission to publish to topic {self.root_topic} on {self.web_host}", span) @@ -504,13 +504,13 @@ def _publish(self, obj, action, custom_payload=False): payload = obj.json(action=action, timestamp=d) self.mqttc.publish(topic, payload, qos=0) - if self.debug: + if self.debug: self.telemetry.add_event(f"[publish] {topic} {payload}") return payload def get_persisted_obj(self, object_id): """Returns a dictionary for a persisted object. - + If object is known by arena-py, return local object, not persisted """ obj = None @@ -535,12 +535,12 @@ def get_persisted_obj(self, object_id): obj_class = OBJECT_TYPE_MAP.get(object_type, Object) obj = obj_class(object_id=obj_id, data=data) obj.persist = True - + return obj - + def get_persisted_objs(self): """Returns a dictionary of persisted objects. - + If object is known by arena-py, return our local object, not persisted Silently fails/skip objects without object_id and object_type (except programs) Instanciates generic Object if object_type is given but unknown to arena-py @@ -554,11 +554,11 @@ def get_persisted_objs(self): object_id = obj.get("object_id") data = obj.get("attributes", {}) object_type = data.get("object_type") - + # special case for Program type - if obj.get("type") == Program.object_type: + if obj.get("type") == Program.object_type: object_type = Program.object_type - + if object_id != None: if object_id in self.all_objects: # note: object from our list (not from persist) @@ -569,7 +569,7 @@ def get_persisted_objs(self): obj_class = OBJECT_TYPE_MAP.get(object_type, Object) persisted_obj = obj_class(object_id=object_id, data=data) persisted_obj.persist = True - + # replace program object, if matches our program id if object_type == Program.object_type: if os.environ.get(PROGRAM_OBJECT_ID) == object_id: @@ -578,8 +578,8 @@ def get_persisted_objs(self): else: # dont persist program objet if not ours persisted_obj.persist = False - - + + objs[object_id] = persisted_obj return objs @@ -605,13 +605,13 @@ def get_user_list(self): def get_rcv_pub_queue_len(self): """Return QueueStats object with receive and publish queue length""" return QueueStats(super().rcv_queue_len(), super().pub_queue_len()) - + def run_info_update(self, run_info): """Callbak when program stats are updated; publish program object update""" # Add run info to program data object and publish program object update run_info.add_program_info(self.program.data) self._publish(self.program, "update") - + class Arena(Scene): """ Another name for Scene.