From 2138887a97331f88e0da36eafdb5580e9a0bc18b Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Tue, 27 Jan 2026 17:01:28 -0800 Subject: [PATCH 1/9] Implement Tron Odom Provider to fetch and process robot odometry data via Zenoh. --- src/providers/tron_odom_provider.py | 349 ++++++++++++++++++++++++++++ 1 file changed, 349 insertions(+) create mode 100644 src/providers/tron_odom_provider.py diff --git a/src/providers/tron_odom_provider.py b/src/providers/tron_odom_provider.py new file mode 100644 index 000000000..04201ca35 --- /dev/null +++ b/src/providers/tron_odom_provider.py @@ -0,0 +1,349 @@ +import logging +import math +import multiprocessing as mp +import threading +import time +from enum import Enum +from typing import Optional + +import zenoh + +from runtime.logging import LoggingConfig, get_logging_config, setup_logging + +from zenoh_msgs import ( + Odometry, + PoseWithCovarianceStamped, + nav_msgs, + open_zenoh_session, +) + +from .singleton import singleton + +rad_to_deg = 57.2958 + + +class RobotState(Enum): + """ + Enumeration for robot states. + """ + + STANDING = "standing" + SITTING = "sitting" + + +def tron_odom_processor( + topic: str, + data_queue: mp.Queue, + logging_config: Optional[LoggingConfig] = None, +) -> None: + """ + Process function for the Tron Odom Provider. + This function runs in a separate process to periodically retrieve the odometry + data from the robot via Zenoh and put it into a multiprocessing queue. + + Parameters + ---------- + topic : str + The Zenoh topic to subscribe to for odometry data. + data_queue : mp.Queue + Queue for sending the retrieved odometry and pose data. + logging_config : LoggingConfig, optional + Optional logging configuration. If provided, it will override the default logging settings. + """ + setup_logging("tron_odom_processor", logging_config=logging_config) + + def zenoh_odom_handler(data: zenoh.Sample): + """ + Zenoh handler for odometry data. + + Parameters + ---------- + data : zenoh.Sample + The Zenoh sample containing the odometry data. + """ + odom: Odometry = nav_msgs.Odometry.deserialize(data.payload.to_bytes()) + logging.debug(f"Tron Zenoh odom handler: {odom}") + + data_queue.put( + PoseWithCovarianceStamped(header=odom.header, pose=odom.pose) # type: ignore + ) + + try: + session = open_zenoh_session() + logging.info(f"Tron Zenoh odom provider opened session: {session}") + logging.info(f"Tron odom listener subscribing to topic: {topic}") + session.declare_subscriber(topic, zenoh_odom_handler) + except Exception as e: + logging.error(f"Error opening Zenoh client for Tron odom: {e}") + return None + + while True: + time.sleep(0.1) + + +@singleton +class TronOdomProvider: + """ + Tron Odom Provider. + + This class implements a singleton pattern to manage: + * Odom data from the Tron robot using Zenoh + + Parameters + ---------- + topic : str + The Zenoh topic to subscribe to for odometry data. + Defaults to "odom". + """ + + def __init__(self, topic: str = "odom"): + """ + Initialize the Tron Odom Provider with Zenoh configuration. + + Parameters + ---------- + topic : str + The Zenoh topic to subscribe to for odometry data. + Defaults to "odom". + """ + logging.info("Booting Tron Odom Provider") + + self.topic = topic + + self.data_queue: mp.Queue[PoseWithCovarianceStamped] = mp.Queue() + self._odom_reader_thread: Optional[mp.Process] = None + self._odom_processor_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + self.body_height_cm = 0 + self.body_attitude: Optional[RobotState] = None + + self.moving: bool = False + self.previous_x = 0 + self.previous_y = 0 + self.previous_z = 0 + self.move_history = 0 + + self._odom: Optional[Odometry] = None + + self.x = 0.0 + self.y = 0.0 + self.z = 0.0 + self.odom_yaw_0_360 = 0.0 + self.odom_yaw_m180_p180 = 0.0 + self.odom_rockchip_ts = 0.0 + self.odom_subscriber_ts = 0.0 + + self.start() + + def start(self) -> None: + """ + Start the Tron Odom Provider. + """ + if self._odom_reader_thread and self._odom_reader_thread.is_alive(): + logging.warning("Tron Odom Provider is already running.") + return + else: + if not self.topic: + logging.error("Topic must be specified to start the Tron Odom Provider.") + return + + logging.info( + f"Starting Tron Odom Provider on Zenoh topic: {self.topic}" + ) + + self._odom_reader_thread = mp.Process( + target=tron_odom_processor, + args=( + self.topic, + self.data_queue, + get_logging_config(), + ), + daemon=True, + ) + self._odom_reader_thread.start() + + if self._odom_processor_thread and self._odom_processor_thread.is_alive(): + logging.warning("Tron Odom processor thread is already running.") + return + else: + logging.info("Starting Tron Odom processor thread") + self._odom_processor_thread = threading.Thread( + target=self.process_odom, daemon=True + ) + self._odom_processor_thread.start() + + def euler_from_quaternion(self, x: float, y: float, z: float, w: float) -> tuple: + """ + https://automaticaddison.com/how-to-convert-a-quaternion-into-euler-angles-in-python/ + Convert a quaternion into euler angles (roll, pitch, yaw) + roll is rotation around x in radians (counterclockwise) + pitch is rotation around y in radians (counterclockwise) + yaw is rotation around z in radians (counterclockwise). + + Parameters + ---------- + x : float + The x component of the quaternion. + y : float + The y component of the quaternion. + z : float + The z component of the quaternion. + w : float + The w component of the quaternion. + + Returns + ------- + tuple + A tuple containing the roll, pitch, and yaw angles in radians. + """ + t0 = +2.0 * (w * x + y * z) + t1 = +1.0 - 2.0 * (x * x + y * y) + roll_x = math.atan2(t0, t1) + + t2 = +2.0 * (w * y - z * x) + t2 = +1.0 if t2 > +1.0 else t2 + t2 = -1.0 if t2 < -1.0 else t2 + pitch_y = math.asin(t2) + + t3 = +2.0 * (w * z + x * y) + t4 = +1.0 - 2.0 * (y * y + z * z) + yaw_z = math.atan2(t3, t4) + + return roll_x, pitch_y, yaw_z # in radians + + def process_odom(self): + """ + Process the odom data and update the internal state. + + Parameters + ---------- + pose : PoseWithCovariance + The pose data containing position and orientation. + """ + while not self._stop_event.is_set(): + try: + pose_data = self.data_queue.get() + except Exception as e: + logging.error(f"Error getting pose from queue: {e}") + time.sleep(1) + continue + + pose = pose_data.pose.pose + header = pose_data.header + + # this is the time according to the robot. It may be off by several seconds from + # UTC + self.odom_rockchip_ts = header.stamp.sec + header.stamp.nanosec * 1e-9 + + # The local timestamp + self.odom_subscriber_ts = time.time() + + # Body height detection for Tron robot + # Based on observed data: + # - Sitting: z ≈ 0.55m (55cm) + # - Standing: z ≈ 0.71m (71cm) + self.body_height_cm = round(pose.position.z * 100.0) + if self.body_height_cm > 60: + self.body_attitude = RobotState.STANDING + elif self.body_height_cm > 3: + self.body_attitude = RobotState.SITTING + + x = pose.orientation.x + y = pose.orientation.y + z = pose.orientation.z + w = pose.orientation.w + + dx = (pose.position.x - self.previous_x) ** 2 + dy = (pose.position.y - self.previous_y) ** 2 + dz = (pose.position.z - self.previous_z) ** 2 + + self.previous_x = pose.position.x + self.previous_y = pose.position.y + self.previous_z = pose.position.z + + delta = math.sqrt(dx + dy + dz) + + # moving? Use a decay kernel + self.move_history = 0.7 * delta + 0.3 * self.move_history + + if delta > 0.01 or self.move_history > 0.01: + self.moving = True + logging.info( + f"delta moving (m): {round(delta,3)} {round(self.move_history,3)}" + ) + else: + # logging.info( + # f"delta moving (m): {round(delta,3)} {round(self.move_history,3)}" + # ) + self.moving = False + + angles = self.euler_from_quaternion(x, y, z, w) + + # this is in the standard robot convention + # yaw increases when you turn LEFT + # (counter-clockwise rotation about the vertical axis + self.odom_yaw_m180_p180 = round(angles[2] * rad_to_deg, 4) + + # we also provide a second data product, where + # * yaw increases when you turn RIGHT (CW), and + # * the range runs from 0 to 360 Deg + flip = -1.0 * self.odom_yaw_m180_p180 + if flip < 0.0: + flip = flip + 360.0 + + self.odom_yaw_0_360 = round(flip, 4) + + # current position in world frame + self.x = round(pose.position.x, 4) + self.y = round(pose.position.y, 4) + logging.debug( + f"tron odom: X:{self.x} Y:{self.y} W:{self.odom_yaw_m180_p180} H:{self.odom_yaw_0_360} T:{self.odom_rockchip_ts}" + ) + + @property + def position(self) -> dict: + """ + Get the current robot position in world frame. + Returns a dictionary with x, y, and odom_yaw_0_360. + + Returns + ------- + dict + A dictionary containing the current position and orientation of the robot. + Keys include: + - x: The x coordinate of the robot in the world frame. + - y: The y coordinate of the robot in the world frame. + - moving: A boolean indicating if the robot is currently moving. + - odom_yaw_0_360: The yaw angle of the robot in degrees, ranging from 0 to 360. + - body_height_cm: The height of the robot's body in centimeters. + - body_attitude: The current attitude of the robot (e.g., sitting or standing). + - odom_rockchip_ts: The unix timestamp of the last odometry update. Provided by the Zenoh publisher. + - odom_subscriber_ts: The unix timestamp of the last odometry update according to the subscriber. + """ + return { + "odom_x": self.x, + "odom_y": self.y, + "moving": self.moving, + "odom_yaw_0_360": self.odom_yaw_0_360, + "odom_yaw_m180_p180": self.odom_yaw_m180_p180, + "body_height_cm": self.body_height_cm, + "body_attitude": self.body_attitude, + "odom_rockchip_ts": self.odom_rockchip_ts, + "odom_subscriber_ts": self.odom_subscriber_ts, + } + + def stop(self): + """ + Stop the TronOdomProvider and clean up resources. + """ + self._stop_event.set() + + if self._odom_reader_thread: + self._odom_reader_thread.terminate() + self._odom_reader_thread.join() + logging.info("TronOdomProvider reader thread stopped.") + + if self._odom_processor_thread: + self._odom_processor_thread.join() + logging.info("TronOdomProvider processor thread stopped.") From e89dec3a44d5e76d99b818ad2f64b2b2bc28dbc9 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Tue, 27 Jan 2026 17:33:59 -0800 Subject: [PATCH 2/9] Add Tron Odom input processor for odometry data handling via Zenoh --- src/inputs/plugins/tron_odom.py | 167 ++++++++++++++++++++++++++++ src/providers/tron_odom_provider.py | 9 +- 2 files changed, 171 insertions(+), 5 deletions(-) create mode 100644 src/inputs/plugins/tron_odom.py diff --git a/src/inputs/plugins/tron_odom.py b/src/inputs/plugins/tron_odom.py new file mode 100644 index 000000000..c4d152a0c --- /dev/null +++ b/src/inputs/plugins/tron_odom.py @@ -0,0 +1,167 @@ +import asyncio +import logging +import time +from queue import Empty, Queue +from typing import List, Optional + +from pydantic import Field + +from inputs.base import Message, SensorConfig +from inputs.base.loop import FuserInput +from providers.io_provider import IOProvider +from providers.tron_odom_provider import RobotState, TronOdomProvider + + +class TronOdomConfig(SensorConfig): + """ + Configuration for Tron Odom Sensor. + + Parameters + ---------- + topic : str + The Zenoh topic to subscribe to for odometry data. + """ + + topic: str = Field(default="odom", description="Zenoh topic for Tron odometry data") + + +class TronOdom(FuserInput[TronOdomConfig, Optional[dict]]): + """ + Odometry input processor for Tron robot position and movement state tracking. + Uses Zenoh to receive odometry data from limx-sdk. + """ + + def __init__(self, config: TronOdomConfig): + """ + Initialize the Tron Odom input processor. + + Parameters + ---------- + config : TronOdomConfig + Configuration for the Tron Odom sensor. + """ + super().__init__(config) + + # Track IO + self.io_provider = IOProvider() + + # Buffer for storing the final output + self.messages: List[Message] = [] + + # Buffer for storing messages + self.message_buffer: Queue[str] = Queue() + + logging.info(f"TronOdom Config: {self.config}") + + topic = self.config.topic + logging.info(f"TronOdom using Zenoh topic: {topic}") + + self.odom = TronOdomProvider(topic) + self.descriptor_for_LLM = "Information about your location and body pose, to help plan your movements." + + async def _poll(self) -> Optional[dict]: + """ + Poll for new messages from the Tron Odom Provider. + + Checks the message buffer for new messages with a brief delay + to prevent excessive CPU usage. + + Returns + ------- + Optional[dict] + The next message from the buffer if available, None otherwise + """ + await asyncio.sleep(0.1) + + try: + return self.odom.position + except Empty: + return None + + async def _raw_to_text(self, raw_input: Optional[dict]) -> Optional[Message]: + """ + Process raw input to generate a timestamped message. + + Creates a Message object from the raw input, adding + the current timestamp. + + Parameters + ---------- + raw_input : Optional[dict] + Raw input to be processed + + Returns + ------- + Optional[Message] + A timestamped message containing the processed input + """ + logging.debug(f"tron odom: {raw_input}") + + if raw_input is None: + return None + + res = "" + moving = raw_input["moving"] + attitude = raw_input["body_attitude"] + + if attitude is RobotState.SITTING: + res = "You are sitting down - do not generate new movement commands. " + elif moving: + # already moving + res = "You are moving - do not generate new movement commands. " + else: + res = "You are standing still - you can move if you want to. " + + return Message(timestamp=time.time(), message=res) + + async def raw_to_text(self, raw_input: Optional[dict]): + """ + Convert raw input to text and update message buffer. + + Processes the raw input if present and adds the resulting + message to the internal message buffer. + + Parameters + ---------- + raw_input : Optional[dict] + Raw input to be processed, or None if no input is available + """ + if raw_input is None: + return + + pending_message = await self._raw_to_text(raw_input) + + if pending_message is not None: + self.messages.append(pending_message) + + def formatted_latest_buffer(self) -> Optional[str]: + """ + Format and clear the latest buffer contents. + + Retrieves the most recent message from the buffer, formats it + with timestamp and class name, adds it to the IO provider, + and clears the buffer. + + Returns + ------- + Optional[str] + Formatted string containing the latest message and metadata, + or None if the buffer is empty + + """ + if len(self.messages) == 0: + return None + + latest_message = self.messages[-1] + + result = ( + f"\nINPUT: {self.descriptor_for_LLM}\n// START\n" + f"{latest_message.message}\n// END\n" + ) + + self.io_provider.add_input( + self.descriptor_for_LLM, latest_message.message, latest_message.timestamp + ) + self.messages = [] + + return result diff --git a/src/providers/tron_odom_provider.py b/src/providers/tron_odom_provider.py index 04201ca35..cf731c312 100644 --- a/src/providers/tron_odom_provider.py +++ b/src/providers/tron_odom_provider.py @@ -9,7 +9,6 @@ import zenoh from runtime.logging import LoggingConfig, get_logging_config, setup_logging - from zenoh_msgs import ( Odometry, PoseWithCovarianceStamped, @@ -145,12 +144,12 @@ def start(self) -> None: return else: if not self.topic: - logging.error("Topic must be specified to start the Tron Odom Provider.") + logging.error( + "Topic must be specified to start the Tron Odom Provider." + ) return - logging.info( - f"Starting Tron Odom Provider on Zenoh topic: {self.topic}" - ) + logging.info(f"Starting Tron Odom Provider on Zenoh topic: {self.topic}") self._odom_reader_thread = mp.Process( target=tron_odom_processor, From 3968e192c87bc6b1675e2e722213d8f5212b4713 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Wed, 28 Jan 2026 10:39:30 -0800 Subject: [PATCH 3/9] Implement `move_tron_autonomy` action with Zenoh connector for robot movement. --- .../move_tron_autonomy/connector/limx_sdk.py | 455 ++++++++++++++++++ src/actions/move_tron_autonomy/interface.py | 36 ++ 2 files changed, 491 insertions(+) create mode 100644 src/actions/move_tron_autonomy/connector/limx_sdk.py create mode 100644 src/actions/move_tron_autonomy/interface.py diff --git a/src/actions/move_tron_autonomy/connector/limx_sdk.py b/src/actions/move_tron_autonomy/connector/limx_sdk.py new file mode 100644 index 000000000..f070f69fb --- /dev/null +++ b/src/actions/move_tron_autonomy/connector/limx_sdk.py @@ -0,0 +1,455 @@ +import logging +import math +import random +from queue import Queue +from typing import List, Optional + +import zenoh +from pydantic import Field + +from actions.base import ActionConfig, ActionConnector, MoveCommand +from actions.move_tron_autonomy.interface import MoveInput +from providers.rplidar_provider import RPLidarProvider +from providers.tron_odom_provider import RobotState, TronOdomProvider +from zenoh_msgs import geometry_msgs, open_zenoh_session + + +class MoveTronZenohConfig(ActionConfig): + """ + Configuration for Tron Zenoh connector. + + Parameters + ---------- + odom_topic : str + Zenoh topic for odometry data. + cmd_vel_topic : str + Zenoh topic for velocity commands. + """ + + odom_topic: str = Field( + default="odom", + description="Zenoh topic for odometry data.", + ) + cmd_vel_topic: str = Field( + default="cmd_vel", + description="Zenoh topic for velocity commands.", + ) + + +class MoveTronZenohConnector(ActionConnector[MoveTronZenohConfig, MoveInput]): + """ + Zenoh connector for the Move Tron autonomy action. + Uses Zenoh to publish cmd_vel commands and receive odom data from limx-sdk. + """ + + def __init__(self, config: MoveTronZenohConfig): + """ + Initialize the Zenoh connector for Tron robot. + + Parameters + ---------- + config : MoveTronZenohConfig + The configuration for the action connector. + """ + super().__init__(config) + + # Movement parameters + self.turn_speed = 0.8 + self.angle_tolerance = 5.0 # degrees + self.distance_tolerance = 0.05 # meters + self.pending_movements: Queue[Optional[MoveCommand]] = Queue() + self.movement_attempts = 0 + self.movement_attempt_limit = 15 + self.gap_previous = 0 + + self.session = None + + odom_topic = self.config.odom_topic + self.cmd_vel_topic = self.config.cmd_vel_topic + + try: + self.session = open_zenoh_session() + logging.info(f"Tron Zenoh move client opened {self.session}") + except Exception as e: + logging.error(f"Error opening Zenoh client for Tron: {e}") + + self.lidar = RPLidarProvider() + self.odom = TronOdomProvider(topic=odom_topic) + + logging.info(f"Tron Autonomy Odom Provider: {self.odom}") + logging.info(f"Tron Autonomy cmd_vel topic: {self.cmd_vel_topic}") + + def _move_robot(self, vx: float, vy: float, vyaw: float) -> None: + """ + Generate movement commands via Zenoh cmd_vel topic. + + Parameters + ---------- + vx : float + Linear velocity in the x direction (m/s). + vy : float + Linear velocity in the y direction (m/s). + vyaw : float + Angular velocity around the z axis (rad/s). + """ + logging.debug(f"move: vx={vx}, vy={vy}, vyaw={vyaw}") + + if self.session is None: + logging.info("No open Zenoh session, returning") + return + + if self.odom.position["body_attitude"] != RobotState.STANDING: + logging.info("Cannot move - robot is not standing") + return + + logging.debug(f"Pub twist: vx={vx}, vy={vy}, vyaw={vyaw}") + t = geometry_msgs.Twist( + linear=geometry_msgs.Vector3(x=float(vx), y=float(vy), z=0.0), + angular=geometry_msgs.Vector3(x=0.0, y=0.0, z=float(vyaw)), + ) + self.session.put(self.cmd_vel_topic, t.serialize()) + + async def connect(self, output_interface: MoveInput) -> None: + """ + Connect to the output interface and process the AI movement command. + + Parameters + ---------- + output_interface : MoveInput + The output interface containing the AI movement command. + """ + logging.info(f"Tron AI command.connect: {output_interface.action}") + + if self.odom.position["moving"]: + logging.info( + "Disregard new AI movement command - robot is already moving" + ) + return + + if self.pending_movements.qsize() > 0: + logging.info("Movement in progress: disregarding new AI command") + return + + if self.odom.position["odom_x"] == 0.0: + # this value is never precisely zero EXCEPT while + # booting and waiting for data to arrive + logging.info("Waiting for location data") + return + + # Process movement commands with lidar safety checks + movement_map = { + "turn left": self._process_turn_left, + "turn right": self._process_turn_right, + "move forwards": self._process_move_forward, + "move back": self._process_move_back, + "stand still": lambda: logging.info("AI movement command: stand still"), + } + + handler = movement_map.get(output_interface.action) + if handler: + handler() + else: + logging.info(f"AI movement command unknown: {output_interface.action}") + + def clean_abort(self) -> None: + """ + Cleanly abort current movement and reset state. + """ + self.movement_attempts = 0 + if not self.pending_movements.empty(): + self.pending_movements.get() + + def tick(self) -> None: + """ + Process the AI motion tick. + """ + logging.debug("Tron AI Motion Tick") + + if self.odom is None: + logging.info("Waiting for odom data = self.odom is None") + self.sleep(0.5) + return + + if self.odom.position["odom_x"] == 0.0: + # this value is never precisely zero except while + # booting and waiting for data to arrive + logging.info("Waiting for odom data, x == 0.0") + self.sleep(0.5) + return + + if self.odom.position["body_attitude"] != RobotState.STANDING: + logging.info("Cannot move - robot is not standing") + self.sleep(0.5) + return + + # if we got to this point, we have good data and we are able to + # safely proceed + target: List[MoveCommand] = list(self.pending_movements.queue) + + if len(target) > 0: + + current_target = target[0] + + logging.info( + f"Target: {current_target} current yaw: {self.odom.position['odom_yaw_m180_p180']}" + ) + + if self.movement_attempts > self.movement_attempt_limit: + # abort - we are not converging + self.clean_abort() + logging.info( + f"TIMEOUT - not converging after {self.movement_attempt_limit} attempts - StopMove()" + ) + return + + goal_dx = current_target.dx + goal_yaw = current_target.yaw + + # Phase 1: Turn to face the target direction + if not current_target.turn_complete: + gap = self._calculate_angle_gap( + -1 * self.odom.position["odom_yaw_m180_p180"], goal_yaw + ) + logging.info(f"Phase 1 - Turning remaining GAP: {gap}DEG") + + progress = round(abs(self.gap_previous - gap), 2) + self.gap_previous = gap + if self.movement_attempts > 0: + logging.info(f"Phase 1 - Turn GAP delta: {progress}DEG") + + if abs(gap) > 10.0: + logging.debug("Phase 1 - Gap is big, using large displacements") + self.movement_attempts += 1 + if not self._execute_turn(gap): + self.clean_abort() + return + elif abs(gap) > self.angle_tolerance and abs(gap) <= 10.0: + logging.debug("Phase 1 - Gap is decreasing, using smaller steps") + self.movement_attempts += 1 + # rotate only because we are so close + # no need to check barriers because we are just performing small rotations + if gap > 0: + self._move_robot(0, 0, 0.2) + elif gap < 0: + self._move_robot(0, 0, -0.2) + elif abs(gap) <= self.angle_tolerance: + logging.info("Phase 1 - Turn completed, starting movement") + current_target.turn_complete = True + self.gap_previous = 0 + + else: + # Phase 2: Move towards the target position, if needed + if goal_dx == 0: + logging.info("No movement required, processing next AI command") + self.clean_abort() + return + + s_x = current_target.start_x + s_y = current_target.start_y + speed = current_target.speed + + distance_traveled = math.sqrt( + (self.odom.position["odom_x"] - s_x) ** 2 + + (self.odom.position["odom_y"] - s_y) ** 2 + ) + gap = round(abs(goal_dx - distance_traveled), 2) + progress = round(abs(self.gap_previous - gap), 2) + self.gap_previous = gap + + if self.movement_attempts > 0: + logging.info(f"Phase 2 - Forward/retreat GAP delta: {progress}m") + + fb = 0 + if goal_dx > 0: + if 4 not in self.lidar.advance: + logging.warning("Cannot advance due to barrier") + self.clean_abort() + return + fb = 1 + + if goal_dx < 0: + if not self.lidar.retreat: + logging.warning("Cannot retreat due to barrier") + self.clean_abort() + return + fb = -1 + + if gap > self.distance_tolerance: + self.movement_attempts += 1 + if distance_traveled < abs(goal_dx): + logging.info(f"Phase 2 - Keep moving. Remaining: {gap}m ") + self._move_robot(fb * speed, 0.0, 0.0) + elif distance_traveled > abs(goal_dx): + logging.debug( + f"Phase 2 - OVERSHOOT: move other way. Remaining: {gap}m" + ) + self._move_robot(-1 * fb * 0.2, 0.0, 0.0) + else: + logging.info( + "Phase 2 - Movement completed normally, processing next AI command" + ) + self.clean_abort() + + self.sleep(0.1) + + def _process_turn_left(self): + """ + Process turn left command with safety check. + """ + if not self.lidar.turn_left: + logging.warning("Cannot turn left due to barrier") + return + + path = random.choice(self.lidar.turn_left) + path_angle = self.lidar.path_angles[path] + + target_yaw = self._normalize_angle( + -1 * self.odom.position["odom_yaw_m180_p180"] + path_angle + ) + self.pending_movements.put( + MoveCommand( + dx=0.5, + yaw=round(target_yaw, 2), + start_x=round(self.odom.position["odom_x"], 2), + start_y=round(self.odom.position["odom_y"], 2), + turn_complete=False, + ) + ) + + def _process_turn_right(self): + """ + Process turn right command with safety check. + """ + if not self.lidar.turn_right: + logging.warning("Cannot turn right due to barrier") + return + + path = random.choice(self.lidar.turn_right) + path_angle = self.lidar.path_angles[path] + + target_yaw = self._normalize_angle( + -1 * self.odom.position["odom_yaw_m180_p180"] + path_angle + ) + self.pending_movements.put( + MoveCommand( + dx=0.5, + yaw=round(target_yaw, 2), + start_x=round(self.odom.position["odom_x"], 2), + start_y=round(self.odom.position["odom_y"], 2), + turn_complete=False, + ) + ) + + def _process_move_forward(self): + """ + Process move forward command with safety check. + """ + if not self.lidar.advance: + logging.warning("Cannot advance due to barrier") + return + + path = random.choice(self.lidar.advance) + path_angle = self.lidar.path_angles[path] + + target_yaw = self._normalize_angle( + -1 * self.odom.position["odom_yaw_m180_p180"] + path_angle + ) + self.pending_movements.put( + MoveCommand( + dx=0.5, + yaw=target_yaw, + start_x=round(self.odom.position["odom_x"], 2), + start_y=round(self.odom.position["odom_y"], 2), + turn_complete=True if path_angle == 0 else False, + ) + ) + + def _process_move_back(self): + """ + Process move back command with safety check. + """ + if not self.lidar.retreat: + logging.warning("Cannot retreat due to barrier") + return + + self.pending_movements.put( + MoveCommand( + dx=-0.5, + yaw=0.0, + start_x=round(self.odom.position["odom_x"], 2), + start_y=round(self.odom.position["odom_y"], 2), + turn_complete=True, + speed=0.3, + ) + ) + + def _normalize_angle(self, angle: float) -> float: + """ + Normalize angle to [-180, 180] range. + + Parameters + ---------- + angle : float + Angle in degrees to normalize. + + Returns + ------- + float + Normalized angle in degrees within the range [-180, 180]. + """ + if angle < -180: + angle += 360.0 + elif angle > 180: + angle -= 360.0 + return angle + + def _calculate_angle_gap(self, current: float, target: float) -> float: + """ + Calculate shortest angular distance between two angles. + + Parameters + ---------- + current : float + Current angle in degrees. + target : float + Target angle in degrees. + + Returns + ------- + float + Shortest angular distance in degrees, rounded to 2 decimal places. + """ + gap = current - target + if gap > 180.0: + gap -= 360.0 + elif gap < -180.0: + gap += 360.0 + return round(gap, 2) + + def _execute_turn(self, gap: float) -> bool: + """ + Execute turn based on gap direction and lidar constraints. + + Parameters + ---------- + gap : float + The angle gap in degrees to turn. + + Returns + ------- + bool + True if the turn was executed successfully, False if blocked by a barrier. + """ + if gap > 0: # Turn left + if not self.lidar.turn_left: + logging.warning("Cannot turn left due to barrier") + return False + sharpness = min(self.lidar.turn_left) + self._move_robot(sharpness * 0.15, 0, self.turn_speed) + else: # Turn right + if not self.lidar.turn_right: + logging.warning("Cannot turn right due to barrier") + return False + sharpness = 8 - max(self.lidar.turn_right) + self._move_robot(sharpness * 0.15, 0, -self.turn_speed) + return True diff --git a/src/actions/move_tron_autonomy/interface.py b/src/actions/move_tron_autonomy/interface.py new file mode 100644 index 000000000..15907665f --- /dev/null +++ b/src/actions/move_tron_autonomy/interface.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from enum import Enum + +from actions.base import Interface + + +class MovementAction(str, Enum): + """ + Enumeration of possible movement actions for Tron robot. + """ + + TURN_LEFT = "turn left" + TURN_RIGHT = "turn right" + MOVE_FORWARDS = "move forwards" + MOVE_BACK = "move back" + STAND_STILL = "stand still" + DO_NOTHING = "stand still" + + +@dataclass +class MoveInput: + """ + Input interface for the Move action. + """ + + action: MovementAction + + +@dataclass +class Move(Interface[MoveInput, MoveInput]): + """ + This action allows you to move. Important: pick only safe values. + """ + + input: MoveInput + output: MoveInput From fe000223089bb2f53f43759af6d385b88e644f57 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Wed, 28 Jan 2026 10:41:55 -0800 Subject: [PATCH 4/9] Remove unused zenoh import and simplify logging message in MoveTronZenohConnector --- src/actions/move_tron_autonomy/connector/limx_sdk.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/actions/move_tron_autonomy/connector/limx_sdk.py b/src/actions/move_tron_autonomy/connector/limx_sdk.py index f070f69fb..05effc55a 100644 --- a/src/actions/move_tron_autonomy/connector/limx_sdk.py +++ b/src/actions/move_tron_autonomy/connector/limx_sdk.py @@ -4,7 +4,6 @@ from queue import Queue from typing import List, Optional -import zenoh from pydantic import Field from actions.base import ActionConfig, ActionConnector, MoveCommand @@ -121,9 +120,7 @@ async def connect(self, output_interface: MoveInput) -> None: logging.info(f"Tron AI command.connect: {output_interface.action}") if self.odom.position["moving"]: - logging.info( - "Disregard new AI movement command - robot is already moving" - ) + logging.info("Disregard new AI movement command - robot is already moving") return if self.pending_movements.qsize() > 0: From 96c4226e1712ee98090f05068616bd96e0419deb Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Thu, 29 Jan 2026 13:09:22 -0800 Subject: [PATCH 5/9] Introduce Tron autonomy agent configuration and integrate SimplePathsProvider with refined movement parameters. --- config/limx_tron_autonomy.json5 | 60 +++++++++++++++++++ .../move_tron_autonomy/connector/limx_sdk.py | 43 ++++++------- 2 files changed, 82 insertions(+), 21 deletions(-) create mode 100644 config/limx_tron_autonomy.json5 diff --git a/config/limx_tron_autonomy.json5 b/config/limx_tron_autonomy.json5 new file mode 100644 index 000000000..0fd3c1b9e --- /dev/null +++ b/config/limx_tron_autonomy.json5 @@ -0,0 +1,60 @@ +{ + version: "v1.0.0", + hertz: 1, + name: "tron", + api_key: "openmind_free", + system_prompt_base: "You are a smart, curious, and friendly bipedal robot. Your name is Tron. When you hear something, react naturally, with movements and expressions. When speaking, use straightforward language that conveys excitement or enthusiasm. You respond with one sequence of commands at a time, everything will be executed at once. Return precisely one command for each type of command. Remember: Combine movements and speech to create an engaging interaction.", + system_governance: "Here are the laws that govern your actions. Do not violate these laws.\nFirst Law: A robot cannot harm a human or allow a human to come to harm.\nSecond Law: A robot must obey orders from humans, unless those orders conflict with the First Law.\nThird Law: A robot must protect itself, as long as that protection doesn't conflict with the First or Second Law.\nThe First Law is considered the most important, taking precedence over the second and third laws.", + system_prompt_examples: "Here are some examples of interactions you might encounter:\n\n \ + 1. If a person says 'Hello!', you might:\n Move: 'stand still'\n Speak: {{'Hello! Nice to meet you!'}}\n \ + 2. If a person says 'Stop!' you might:\n Move: 'stand still'\n Speak: {{'Ok, I stopped.'}}\n \ + 3. If you see something interesting or beautiful, go explore. You might:\n Move: 'move forwards'\n Speak: {{'I\\'m going to go explore over there'}}\n \ + 4. If you sense something in front of you, you might:\n Move: 'turn left'\n Speak: {{'I\\'m turning to avoid the obstacle in front of me'}}\n \ + 5. If you sense something on your left, you might:\n Move: 'turn right'\n Speak: {{'I\\'m turning right to avoid the obstacle on my left'}}\n \ + 6. If you sense something on your right, you might:\n Move: 'turn left'\n Speak: {{'I\\'m turning left to avoid the obstacle on my right'}}", + agent_inputs: [ + { + type: "TronOdom", + config: { + topic: "odom", + }, + }, + { + type: "GoogleASRInput", + }, + { + type: "SimplePaths", + }, + { + type: "VLMVila", + }, + ], + cortex_llm: { + type: "OpenAILLM", + config: { + agent_name: "Tron", + history_length: 2, + }, + }, + agent_actions: [ + { + name: "move_tron_autonomy", + llm_label: "move", + connector: "limx_sdk", + config: { + odom_topic: "odom", + cmd_vel_topic: "cmd_vel", + }, + }, + { + name: "speak", + llm_label: "speak", + connector: "elevenlabs_tts", + config: { + voice_id: "TbMNBJ27fH2U0VgpSNko", + silence_rate: 6, + }, + }, + ], + backgrounds: [], +} diff --git a/src/actions/move_tron_autonomy/connector/limx_sdk.py b/src/actions/move_tron_autonomy/connector/limx_sdk.py index 05effc55a..ebef76cfb 100644 --- a/src/actions/move_tron_autonomy/connector/limx_sdk.py +++ b/src/actions/move_tron_autonomy/connector/limx_sdk.py @@ -8,7 +8,7 @@ from actions.base import ActionConfig, ActionConnector, MoveCommand from actions.move_tron_autonomy.interface import MoveInput -from providers.rplidar_provider import RPLidarProvider +from providers.simple_paths_provider import SimplePathsProvider from providers.tron_odom_provider import RobotState, TronOdomProvider from zenoh_msgs import geometry_msgs, open_zenoh_session @@ -53,7 +53,8 @@ def __init__(self, config: MoveTronZenohConfig): super().__init__(config) # Movement parameters - self.turn_speed = 0.8 + self.move_speed = 0.25 + self.turn_speed = 0.35 self.angle_tolerance = 5.0 # degrees self.distance_tolerance = 0.05 # meters self.pending_movements: Queue[Optional[MoveCommand]] = Queue() @@ -72,7 +73,7 @@ def __init__(self, config: MoveTronZenohConfig): except Exception as e: logging.error(f"Error opening Zenoh client for Tron: {e}") - self.lidar = RPLidarProvider() + self.path_provider = SimplePathsProvider() self.odom = TronOdomProvider(topic=odom_topic) logging.info(f"Tron Autonomy Odom Provider: {self.odom}") @@ -258,14 +259,14 @@ def tick(self) -> None: fb = 0 if goal_dx > 0: - if 4 not in self.lidar.advance: + if 4 not in self.path_provider.advance: logging.warning("Cannot advance due to barrier") self.clean_abort() return fb = 1 if goal_dx < 0: - if not self.lidar.retreat: + if not self.path_provider.retreat: logging.warning("Cannot retreat due to barrier") self.clean_abort() return @@ -280,7 +281,7 @@ def tick(self) -> None: logging.debug( f"Phase 2 - OVERSHOOT: move other way. Remaining: {gap}m" ) - self._move_robot(-1 * fb * 0.2, 0.0, 0.0) + self._move_robot(-1 * fb * 0.15, 0.0, 0.0) else: logging.info( "Phase 2 - Movement completed normally, processing next AI command" @@ -293,12 +294,12 @@ def _process_turn_left(self): """ Process turn left command with safety check. """ - if not self.lidar.turn_left: + if not self.path_provider.turn_left: logging.warning("Cannot turn left due to barrier") return - path = random.choice(self.lidar.turn_left) - path_angle = self.lidar.path_angles[path] + path = random.choice(self.path_provider.turn_left) + path_angle = self.path_provider.path_angles[path] target_yaw = self._normalize_angle( -1 * self.odom.position["odom_yaw_m180_p180"] + path_angle @@ -317,12 +318,12 @@ def _process_turn_right(self): """ Process turn right command with safety check. """ - if not self.lidar.turn_right: + if not self.path_provider.turn_right: logging.warning("Cannot turn right due to barrier") return - path = random.choice(self.lidar.turn_right) - path_angle = self.lidar.path_angles[path] + path = random.choice(self.path_provider.turn_right) + path_angle = self.path_provider.path_angles[path] target_yaw = self._normalize_angle( -1 * self.odom.position["odom_yaw_m180_p180"] + path_angle @@ -341,12 +342,12 @@ def _process_move_forward(self): """ Process move forward command with safety check. """ - if not self.lidar.advance: + if not self.path_provider.advance: logging.warning("Cannot advance due to barrier") return - path = random.choice(self.lidar.advance) - path_angle = self.lidar.path_angles[path] + path = random.choice(self.path_provider.advance) + path_angle = self.path_provider.path_angles[path] target_yaw = self._normalize_angle( -1 * self.odom.position["odom_yaw_m180_p180"] + path_angle @@ -365,7 +366,7 @@ def _process_move_back(self): """ Process move back command with safety check. """ - if not self.lidar.retreat: + if not self.path_provider.retreat: logging.warning("Cannot retreat due to barrier") return @@ -376,7 +377,7 @@ def _process_move_back(self): start_x=round(self.odom.position["odom_x"], 2), start_y=round(self.odom.position["odom_y"], 2), turn_complete=True, - speed=0.3, + speed=0.25, ) ) @@ -438,15 +439,15 @@ def _execute_turn(self, gap: float) -> bool: True if the turn was executed successfully, False if blocked by a barrier. """ if gap > 0: # Turn left - if not self.lidar.turn_left: + if not self.path_provider.turn_left: logging.warning("Cannot turn left due to barrier") return False - sharpness = min(self.lidar.turn_left) + sharpness = min(self.path_provider.turn_left) self._move_robot(sharpness * 0.15, 0, self.turn_speed) else: # Turn right - if not self.lidar.turn_right: + if not self.path_provider.turn_right: logging.warning("Cannot turn right due to barrier") return False - sharpness = 8 - max(self.lidar.turn_right) + sharpness = 8 - max(self.path_provider.turn_right) self._move_robot(sharpness * 0.15, 0, -self.turn_speed) return True From 3139fbbd655d10a22f9e9d5c4b6e572f99ff3688 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Thu, 29 Jan 2026 15:17:41 -0800 Subject: [PATCH 6/9] Remove explicit topic configurations for TronOdom input and move_tron_autonomy action. --- config/limx_tron_autonomy.json5 | 7 ------- 1 file changed, 7 deletions(-) diff --git a/config/limx_tron_autonomy.json5 b/config/limx_tron_autonomy.json5 index 0fd3c1b9e..4ddd5c5b9 100644 --- a/config/limx_tron_autonomy.json5 +++ b/config/limx_tron_autonomy.json5 @@ -15,9 +15,6 @@ agent_inputs: [ { type: "TronOdom", - config: { - topic: "odom", - }, }, { type: "GoogleASRInput", @@ -41,10 +38,6 @@ name: "move_tron_autonomy", llm_label: "move", connector: "limx_sdk", - config: { - odom_topic: "odom", - cmd_vel_topic: "cmd_vel", - }, }, { name: "speak", From 75e9eca6352966d8461049dfef238fdcd10f2064 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Thu, 29 Jan 2026 15:38:19 -0800 Subject: [PATCH 7/9] Add unit tests for Tron odometry provider and autonomous movement Zenoh connector. --- .../move_tron_autonomy/test_limx_sdk.py | 575 ++++++++++++++++++ tests/inputs/plugins/test_tron_odom.py | 188 ++++++ tests/providers/test_tron_odom_provider.py | 226 +++++++ 3 files changed, 989 insertions(+) create mode 100644 tests/actions/move_tron_autonomy/test_limx_sdk.py create mode 100644 tests/inputs/plugins/test_tron_odom.py create mode 100644 tests/providers/test_tron_odom_provider.py diff --git a/tests/actions/move_tron_autonomy/test_limx_sdk.py b/tests/actions/move_tron_autonomy/test_limx_sdk.py new file mode 100644 index 000000000..797f55c4f --- /dev/null +++ b/tests/actions/move_tron_autonomy/test_limx_sdk.py @@ -0,0 +1,575 @@ +from queue import Queue +from unittest.mock import Mock, patch + +import pytest + +from actions.base import MoveCommand +from actions.move_tron_autonomy.connector.limx_sdk import ( + MoveTronZenohConfig, + MoveTronZenohConnector, +) +from actions.move_tron_autonomy.interface import MoveInput +from providers.tron_odom_provider import RobotState + + +@pytest.fixture +def mock_dependencies(): + """Mock all external dependencies.""" + with ( + patch( + "actions.move_tron_autonomy.connector.limx_sdk.SimplePathsProvider" + ) as mock_paths, + patch( + "actions.move_tron_autonomy.connector.limx_sdk.TronOdomProvider" + ) as mock_odom, + patch( + "actions.move_tron_autonomy.connector.limx_sdk.open_zenoh_session" + ) as mock_zenoh, + ): + # Setup mock instances + mock_paths_instance = Mock() + mock_paths_instance.advance = [4] + mock_paths_instance.retreat = [1] + mock_paths_instance.turn_left = [2] + mock_paths_instance.turn_right = [6] + mock_paths_instance.path_angles = {1: 0, 2: 45, 4: 0, 6: -45} + mock_paths.return_value = mock_paths_instance + + mock_odom_instance = Mock() + mock_odom_instance.position = { + "moving": False, + "odom_x": 1.0, + "odom_y": 0.0, + "odom_yaw_m180_p180": 0.0, + "body_attitude": RobotState.STANDING, + } + mock_odom.return_value = mock_odom_instance + + mock_session = Mock() + mock_session.put = Mock() + mock_zenoh.return_value = mock_session + + yield { + "paths": mock_paths_instance, + "odom": mock_odom_instance, + "session": mock_session, + } + + +@pytest.fixture +def connector(mock_dependencies): + """Create a MoveTronZenohConnector instance with mocked dependencies.""" + config = MoveTronZenohConfig() + connector = MoveTronZenohConnector(config) + return connector + + +class TestMoveTronZenohConfig: + """Test MoveTronZenohConfig configuration.""" + + def test_default_config(self): + """Test default configuration values.""" + config = MoveTronZenohConfig() + assert config.odom_topic == "odom" + assert config.cmd_vel_topic == "cmd_vel" + + def test_custom_config(self): + """Test custom configuration values.""" + config = MoveTronZenohConfig( + odom_topic="custom_odom", cmd_vel_topic="custom_cmd_vel" + ) + assert config.odom_topic == "custom_odom" + assert config.cmd_vel_topic == "custom_cmd_vel" + + +class TestMoveTronZenohConnectorInit: + """Test MoveTronZenohConnector initialization.""" + + def test_initialization(self, connector, mock_dependencies): + """Test successful initialization.""" + assert connector.move_speed == 0.25 + assert connector.turn_speed == 0.35 + assert connector.angle_tolerance == 5.0 + assert connector.distance_tolerance == 0.05 + assert isinstance(connector.pending_movements, Queue) + assert connector.movement_attempts == 0 + assert connector.movement_attempt_limit == 15 + assert connector.gap_previous == 0 + + # Verify providers are initialized + assert connector.path_provider == mock_dependencies["paths"] + assert connector.odom == mock_dependencies["odom"] + assert connector.session == mock_dependencies["session"] + + def test_initialization_zenoh_error(self): + """Test initialization when Zenoh session fails.""" + with ( + patch( + "actions.move_tron_autonomy.connector.limx_sdk.SimplePathsProvider" + ), + patch("actions.move_tron_autonomy.connector.limx_sdk.TronOdomProvider"), + patch( + "actions.move_tron_autonomy.connector.limx_sdk.open_zenoh_session" + ) as mock_zenoh, + patch( + "actions.move_tron_autonomy.connector.limx_sdk.logging" + ) as mock_logging, + ): + mock_zenoh.side_effect = Exception("Connection failed") + + config = MoveTronZenohConfig() + connector = MoveTronZenohConnector(config) + + assert connector.session is None + mock_logging.error.assert_called() + + +class TestConnect: + """Test the connect method.""" + + @pytest.mark.asyncio + async def test_connect_robot_already_moving(self, connector, mock_dependencies): + """Test connect when robot is already moving.""" + mock_dependencies["odom"].position["moving"] = True + move_input = MoveInput(action="move forwards") + + await connector.connect(move_input) + + # Should return early without processing + assert connector.pending_movements.qsize() == 0 + + @pytest.mark.asyncio + async def test_connect_movement_already_pending(self, connector, mock_dependencies): + """Test connect when movement is already pending.""" + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=False) + ) + move_input = MoveInput(action="move forwards") + + await connector.connect(move_input) + + # Should still have only one movement + assert connector.pending_movements.qsize() == 1 + + @pytest.mark.asyncio + async def test_connect_waiting_for_location_data( + self, connector, mock_dependencies + ): + """Test connect when waiting for location data.""" + mock_dependencies["odom"].position["odom_x"] = 0.0 + move_input = MoveInput(action="move forwards") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 0 + + @pytest.mark.asyncio + async def test_connect_turn_left(self, connector, mock_dependencies): + """Test connect with turn left command.""" + move_input = MoveInput(action="turn left") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 1 + command = connector.pending_movements.get() + assert command.dx == 0.5 + assert command.turn_complete is False + + @pytest.mark.asyncio + async def test_connect_turn_right(self, connector, mock_dependencies): + """Test connect with turn right command.""" + move_input = MoveInput(action="turn right") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 1 + command = connector.pending_movements.get() + assert command.dx == 0.5 + assert command.turn_complete is False + + @pytest.mark.asyncio + async def test_connect_move_forwards(self, connector, mock_dependencies): + """Test connect with move forwards command.""" + move_input = MoveInput(action="move forwards") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 1 + command = connector.pending_movements.get() + assert command.dx == 0.5 + + @pytest.mark.asyncio + async def test_connect_move_back(self, connector, mock_dependencies): + """Test connect with move back command.""" + move_input = MoveInput(action="move back") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 1 + command = connector.pending_movements.get() + assert command.dx == -0.5 + assert command.turn_complete is True + assert command.speed == 0.25 + + @pytest.mark.asyncio + async def test_connect_stand_still(self, connector, mock_dependencies): + """Test connect with stand still command.""" + move_input = MoveInput(action="stand still") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 0 + + @pytest.mark.asyncio + async def test_connect_unknown_action(self, connector, mock_dependencies): + """Test connect with unknown action.""" + move_input = MoveInput(action="unknown action") + + await connector.connect(move_input) + + assert connector.pending_movements.qsize() == 0 + + +class TestMovementProcessing: + """Test movement processing methods.""" + + def test_process_turn_left_with_barrier(self, connector, mock_dependencies): + """Test turn left when blocked by barrier.""" + mock_dependencies["paths"].turn_left = [] + + connector._process_turn_left() + + assert connector.pending_movements.qsize() == 0 + + def test_process_turn_left_success(self, connector, mock_dependencies): + """Test successful turn left.""" + connector._process_turn_left() + + assert connector.pending_movements.qsize() == 1 + + def test_process_turn_right_with_barrier(self, connector, mock_dependencies): + """Test turn right when blocked by barrier.""" + mock_dependencies["paths"].turn_right = [] + + connector._process_turn_right() + + assert connector.pending_movements.qsize() == 0 + + def test_process_turn_right_success(self, connector, mock_dependencies): + """Test successful turn right.""" + connector._process_turn_right() + + assert connector.pending_movements.qsize() == 1 + + def test_process_move_forward_with_barrier(self, connector, mock_dependencies): + """Test move forward when blocked by barrier.""" + mock_dependencies["paths"].advance = [] + + connector._process_move_forward() + + assert connector.pending_movements.qsize() == 0 + + def test_process_move_forward_success(self, connector, mock_dependencies): + """Test successful move forward.""" + connector._process_move_forward() + + assert connector.pending_movements.qsize() == 1 + + def test_process_move_back_with_barrier(self, connector, mock_dependencies): + """Test move back when blocked by barrier.""" + mock_dependencies["paths"].retreat = [] + + connector._process_move_back() + + assert connector.pending_movements.qsize() == 0 + + def test_process_move_back_success(self, connector, mock_dependencies): + """Test successful move back.""" + connector._process_move_back() + + assert connector.pending_movements.qsize() == 1 + command = connector.pending_movements.get() + assert command.dx == -0.5 + + +class TestMoveRobot: + """Test _move_robot method.""" + + def test_move_robot_no_session(self, connector, mock_dependencies): + """Test move robot when session is None.""" + connector.session = None + + connector._move_robot(0.5, 0.0, 0.0) + + # Should not crash + + def test_move_robot_not_standing(self, connector, mock_dependencies): + """Test move robot when robot is not standing.""" + mock_dependencies["odom"].position["body_attitude"] = RobotState.SITTING + + connector._move_robot(0.5, 0.0, 0.0) + + mock_dependencies["session"].put.assert_not_called() + + def test_move_robot_success(self, connector, mock_dependencies): + """Test successful robot movement.""" + connector._move_robot(0.5, 0.0, 0.3) + + mock_dependencies["session"].put.assert_called_once() + call_args = mock_dependencies["session"].put.call_args + assert call_args[0][0] == "cmd_vel" + + +class TestCleanAbort: + """Test clean_abort method.""" + + def test_clean_abort_with_pending_movement(self, connector, mock_dependencies): + """Test clean abort with pending movement.""" + connector.movement_attempts = 5 + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=False) + ) + + connector.clean_abort() + + assert connector.movement_attempts == 0 + assert connector.pending_movements.qsize() == 0 + + def test_clean_abort_empty_queue(self, connector, mock_dependencies): + """Test clean abort with empty queue.""" + connector.movement_attempts = 5 + + connector.clean_abort() + + assert connector.movement_attempts == 0 + assert connector.pending_movements.qsize() == 0 + + +class TestAngleCalculations: + """Test angle calculation methods.""" + + def test_normalize_angle_positive_overflow(self, connector, mock_dependencies): + """Test normalize angle with positive overflow.""" + result = connector._normalize_angle(270.0) + assert result == -90.0 + + def test_normalize_angle_negative_overflow(self, connector, mock_dependencies): + """Test normalize angle with negative overflow.""" + result = connector._normalize_angle(-270.0) + assert result == 90.0 + + def test_normalize_angle_within_range(self, connector, mock_dependencies): + """Test normalize angle within range.""" + result = connector._normalize_angle(45.0) + assert result == 45.0 + + def test_calculate_angle_gap_simple(self, connector, mock_dependencies): + """Test calculate angle gap with simple case.""" + result = connector._calculate_angle_gap(10.0, 5.0) + assert result == 5.0 + + def test_calculate_angle_gap_wrap_positive(self, connector, mock_dependencies): + """Test calculate angle gap with positive wrap.""" + result = connector._calculate_angle_gap(170.0, -170.0) + assert result == -20.0 + + def test_calculate_angle_gap_wrap_negative(self, connector, mock_dependencies): + """Test calculate angle gap with negative wrap.""" + result = connector._calculate_angle_gap(-170.0, 170.0) + assert result == 20.0 + + +class TestExecuteTurn: + """Test _execute_turn method.""" + + def test_execute_turn_left_blocked(self, connector, mock_dependencies): + """Test execute turn left when blocked.""" + mock_dependencies["paths"].turn_left = [] + + result = connector._execute_turn(10.0) + + assert result is False + + def test_execute_turn_left_success(self, connector, mock_dependencies): + """Test successful turn left.""" + mock_dependencies["paths"].turn_left = [2, 3] + + result = connector._execute_turn(10.0) + + assert result is True + mock_dependencies["session"].put.assert_called() + + def test_execute_turn_right_blocked(self, connector, mock_dependencies): + """Test execute turn right when blocked.""" + mock_dependencies["paths"].turn_right = [] + + result = connector._execute_turn(-10.0) + + assert result is False + + def test_execute_turn_right_success(self, connector, mock_dependencies): + """Test successful turn right.""" + mock_dependencies["paths"].turn_right = [5, 6] + + result = connector._execute_turn(-10.0) + + assert result is True + mock_dependencies["session"].put.assert_called() + + +class TestTick: + """Test tick method.""" + + def test_tick_odom_none(self, connector, mock_dependencies): + """Test tick when odom is None.""" + connector.odom = None + + with patch.object(connector, "sleep") as mock_sleep: + connector.tick() + mock_sleep.assert_called_once_with(0.5) + + def test_tick_waiting_for_odom_data(self, connector, mock_dependencies): + """Test tick when waiting for odom data.""" + mock_dependencies["odom"].position["odom_x"] = 0.0 + + with patch.object(connector, "sleep") as mock_sleep: + connector.tick() + mock_sleep.assert_called_once_with(0.5) + + def test_tick_robot_sitting(self, connector, mock_dependencies): + """Test tick when robot is sitting.""" + mock_dependencies["odom"].position["body_attitude"] = RobotState.SITTING + + with patch.object(connector, "sleep") as mock_sleep: + connector.tick() + mock_sleep.assert_called_once_with(0.5) + + def test_tick_no_pending_movements(self, connector, mock_dependencies): + """Test tick with no pending movements.""" + with patch.object(connector, "sleep") as mock_sleep: + connector.tick() + mock_sleep.assert_called_once_with(0.1) + + def test_tick_movement_timeout(self, connector, mock_dependencies): + """Test tick when movement times out.""" + connector.movement_attempts = 20 + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=False) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.movement_attempts == 0 + assert connector.pending_movements.qsize() == 0 + + def test_tick_turn_phase_large_gap(self, connector, mock_dependencies): + """Test tick during turn phase with large gap.""" + mock_dependencies["odom"].position["odom_yaw_m180_p180"] = 0.0 + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=45.0, start_x=0.0, start_y=0.0, turn_complete=False) + ) + + with patch.object(connector, "_execute_turn", return_value=True): + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.movement_attempts == 1 + + def test_tick_turn_phase_small_gap(self, connector, mock_dependencies): + """Test tick during turn phase with small gap (between 5 and 10 degrees).""" + # Gap = current - target = -38 - (-45) = 7 degrees (small gap) + mock_dependencies["odom"].position["odom_yaw_m180_p180"] = -38.0 + connector.pending_movements.put( + MoveCommand( + dx=0.5, yaw=-45.0, start_x=1.0, start_y=0.0, turn_complete=False + ) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.movement_attempts == 1 + mock_dependencies["session"].put.assert_called() + + def test_tick_turn_complete(self, connector, mock_dependencies): + """Test tick when turn is complete.""" + mock_dependencies["odom"].position["odom_yaw_m180_p180"] = -43.0 + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=45.0, start_x=0.0, start_y=0.0, turn_complete=False) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + command = list(connector.pending_movements.queue)[0] + assert command.turn_complete is True + + def test_tick_movement_phase_no_distance(self, connector, mock_dependencies): + """Test tick during movement phase with no distance required.""" + connector.pending_movements.put( + MoveCommand(dx=0, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=True) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.pending_movements.qsize() == 0 + + def test_tick_movement_phase_forward_blocked(self, connector, mock_dependencies): + """Test tick during movement phase when forward is blocked.""" + mock_dependencies["paths"].advance = [] + mock_dependencies["odom"].position["odom_x"] = 1.0 + mock_dependencies["odom"].position["odom_y"] = 0.0 + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=0.0, start_x=1.0, start_y=0.0, turn_complete=True) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.pending_movements.qsize() == 0 + + def test_tick_movement_phase_retreat_blocked(self, connector, mock_dependencies): + """Test tick during movement phase when retreat is blocked.""" + mock_dependencies["paths"].retreat = [] + mock_dependencies["odom"].position["odom_x"] = 1.0 + mock_dependencies["odom"].position["odom_y"] = 0.0 + connector.pending_movements.put( + MoveCommand(dx=-0.5, yaw=0.0, start_x=1.0, start_y=0.0, turn_complete=True) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.pending_movements.qsize() == 0 + + def test_tick_movement_phase_continue_moving(self, connector, mock_dependencies): + """Test tick during movement phase continuing to move.""" + mock_dependencies["odom"].position["odom_x"] = 0.2 + mock_dependencies["odom"].position["odom_y"] = 0.0 + connector.pending_movements.put( + MoveCommand(dx=0.5, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=True) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.movement_attempts == 1 + mock_dependencies["session"].put.assert_called() + + def test_tick_movement_phase_complete(self, connector, mock_dependencies): + """Test tick when movement is complete.""" + mock_dependencies["odom"].position["odom_x"] = 0.5 + mock_dependencies["odom"].position["odom_y"] = 0.0 + connector.pending_movements.put( + MoveCommand( + dx=0.5, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=True, speed=0.5 + ) + ) + + with patch.object(connector, "sleep"): + connector.tick() + + assert connector.pending_movements.qsize() == 0 + assert connector.movement_attempts == 0 diff --git a/tests/inputs/plugins/test_tron_odom.py b/tests/inputs/plugins/test_tron_odom.py new file mode 100644 index 000000000..f4941f718 --- /dev/null +++ b/tests/inputs/plugins/test_tron_odom.py @@ -0,0 +1,188 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from inputs.base import Message +from inputs.plugins.tron_odom import TronOdom, TronOdomConfig +from providers.tron_odom_provider import RobotState + + +def test_initialization(): + """Test basic initialization with default config.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + + assert sensor.messages == [] + assert ( + "location" in sensor.descriptor_for_LLM.lower() + or "pose" in sensor.descriptor_for_LLM.lower() + ) + + +def test_initialization_with_custom_topic(): + """Test initialization with custom topic.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider") as mock_provider, + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig(topic="custom_odom") + sensor = TronOdom(config=config) + + assert sensor.config.topic == "custom_odom" + mock_provider.assert_called_once_with("custom_odom") + + +@pytest.mark.asyncio +async def test_poll_with_position_data(): + """Test _poll with position data available.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider") as mock_provider_class, + patch("inputs.plugins.tron_odom.IOProvider"), + ): + mock_provider = MagicMock() + mock_provider.position = {"odom_x": 1.0, "odom_y": 2.0, "moving": False} + mock_provider_class.return_value = mock_provider + + config = TronOdomConfig() + sensor = TronOdom(config=config) + + with patch("inputs.plugins.tron_odom.asyncio.sleep", new=AsyncMock()): + result = await sensor._poll() + + assert result == {"odom_x": 1.0, "odom_y": 2.0, "moving": False} + + +@pytest.mark.asyncio +async def test_poll_with_no_data(): + """Test _poll when no position data available.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider") as mock_provider_class, + patch("inputs.plugins.tron_odom.IOProvider"), + ): + mock_provider = MagicMock() + mock_provider.position = None + mock_provider_class.return_value = mock_provider + + config = TronOdomConfig() + sensor = TronOdom(config=config) + + with patch("inputs.plugins.tron_odom.asyncio.sleep", new=AsyncMock()): + result = await sensor._poll() + + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_sitting(): + """Test _raw_to_text when robot is sitting.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + + position_data = {"moving": False, "body_attitude": RobotState.SITTING} + + with patch("inputs.plugins.tron_odom.time.time", return_value=1234.0): + result = await sensor._raw_to_text(position_data) + + assert result is not None + assert result.timestamp == 1234.0 + assert "sitting" in result.message.lower() + + +@pytest.mark.asyncio +async def test_raw_to_text_moving(): + """Test _raw_to_text when robot is moving.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + + position_data = {"moving": True, "body_attitude": RobotState.STANDING} + + with patch("inputs.plugins.tron_odom.time.time", return_value=1234.0): + result = await sensor._raw_to_text(position_data) + + assert result is not None + assert result.timestamp == 1234.0 + assert "moving" in result.message.lower() + + +@pytest.mark.asyncio +async def test_raw_to_text_standing_still(): + """Test _raw_to_text when robot is standing still.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + + position_data = {"moving": False, "body_attitude": RobotState.STANDING} + + with patch("inputs.plugins.tron_odom.time.time", return_value=1234.0): + result = await sensor._raw_to_text(position_data) + + assert result is not None + assert result.timestamp == 1234.0 + assert ( + "standing still" in result.message.lower() + or "can move" in result.message.lower() + ) + + +@pytest.mark.asyncio +async def test_raw_to_text_with_none(): + """Test _raw_to_text with None input.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + + result = await sensor._raw_to_text(None) + assert result is None + + +def test_formatted_latest_buffer_with_messages(): + """Test formatted_latest_buffer with messages.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + sensor.io_provider = MagicMock() + + sensor.messages = [ + Message(timestamp=1000.0, message="You are standing still"), + ] + + result = sensor.formatted_latest_buffer() + + assert result is not None + assert "standing" in result.lower() or "INPUT" in result + sensor.io_provider.add_input.assert_called_once() + assert len(sensor.messages) == 0 + + +def test_formatted_latest_buffer_empty(): + """Test formatted_latest_buffer with empty buffer.""" + with ( + patch("inputs.plugins.tron_odom.TronOdomProvider"), + patch("inputs.plugins.tron_odom.IOProvider"), + ): + config = TronOdomConfig() + sensor = TronOdom(config=config) + + result = sensor.formatted_latest_buffer() + assert result is None diff --git a/tests/providers/test_tron_odom_provider.py b/tests/providers/test_tron_odom_provider.py new file mode 100644 index 000000000..a01b31a18 --- /dev/null +++ b/tests/providers/test_tron_odom_provider.py @@ -0,0 +1,226 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from providers.tron_odom_provider import RobotState, TronOdomProvider + + +@pytest.fixture(autouse=True) +def reset_singleton(): + """Reset singleton instances between tests.""" + TronOdomProvider.reset() # type: ignore + yield + TronOdomProvider.reset() # type: ignore + + +@pytest.fixture +def mock_multiprocessing(): + with ( + patch("providers.tron_odom_provider.mp.Queue") as mock_queue, + patch("providers.tron_odom_provider.mp.Process") as mock_process, + patch("providers.tron_odom_provider.threading.Thread") as mock_thread, + patch("providers.tron_odom_provider.threading.Event") as mock_event, + ): + mock_queue_instance = MagicMock() + mock_process_instance = MagicMock() + mock_thread_instance = MagicMock() + mock_event_instance = MagicMock() + + mock_queue.return_value = mock_queue_instance + mock_process.return_value = mock_process_instance + mock_thread.return_value = mock_thread_instance + mock_event.return_value = mock_event_instance + + mock_process_instance.is_alive.return_value = False + mock_thread_instance.is_alive.return_value = False + mock_event_instance.is_set.return_value = False + + yield ( + mock_queue, + mock_queue_instance, + mock_process, + mock_process_instance, + mock_thread, + mock_thread_instance, + ) + + +def test_initialization_default_topic(mock_multiprocessing): + """Test initialization with default topic.""" + mock_queue, _, _, _, _, _ = mock_multiprocessing + + provider = TronOdomProvider() + + assert provider.topic == "odom" + mock_queue.assert_called_once() + + +def test_initialization_custom_topic(mock_multiprocessing): + """Test initialization with custom topic.""" + mock_queue, _, _, _, _, _ = mock_multiprocessing + + TronOdomProvider.reset() # type: ignore + provider = TronOdomProvider(topic="custom_odom") + + assert provider.topic == "custom_odom" + mock_queue.assert_called_once() + + +def test_singleton_pattern(mock_multiprocessing): + """Test singleton pattern - same instance returned.""" + provider1 = TronOdomProvider(topic="odom") + provider2 = TronOdomProvider(topic="other_topic") + assert provider1 is provider2 + + +def test_start(mock_multiprocessing): + """Test start() launches process and thread.""" + _, _, _, mock_process_instance, _, mock_thread_instance = mock_multiprocessing + + TronOdomProvider(topic="odom") + + assert mock_process_instance.start.call_count >= 1 + assert mock_thread_instance.start.call_count >= 1 + + +def test_start_already_running(mock_multiprocessing): + """Test start() does not restart when already running.""" + _, _, _, mock_process_instance, _, mock_thread_instance = mock_multiprocessing + + provider = TronOdomProvider(topic="odom") + + mock_process_instance.is_alive.return_value = True + mock_thread_instance.is_alive.return_value = True + + mock_process_instance.start.reset_mock() + mock_thread_instance.start.reset_mock() + + provider.start() + + mock_process_instance.start.assert_not_called() + mock_thread_instance.start.assert_not_called() + + +def test_start_no_topic(mock_multiprocessing): + """Test start() returns early when topic is empty.""" + _, _, _, mock_process_instance, _, _ = mock_multiprocessing + + provider = TronOdomProvider(topic="odom") + mock_process_instance.is_alive.return_value = False + provider.topic = "" + + mock_process_instance.start.reset_mock() + provider.start() + + # Should not start because topic is empty + mock_process_instance.start.assert_not_called() + + +def test_stop(mock_multiprocessing): + """Test stop() terminates process and joins thread.""" + _, _, _, mock_process_instance, _, mock_thread_instance = mock_multiprocessing + + provider = TronOdomProvider(topic="odom") + provider.stop() + + assert provider._stop_event.set.called # type: ignore + mock_process_instance.terminate.assert_called_once() + mock_process_instance.join.assert_called_once() + mock_thread_instance.join.assert_called_once() + + +def test_robot_state_enum(): + """Test RobotState enum values.""" + assert RobotState.STANDING.value == "standing" + assert RobotState.SITTING.value == "sitting" + + +def test_position_property(mock_multiprocessing): + """Test position property returns expected dictionary structure.""" + provider = TronOdomProvider(topic="odom") + + position = provider.position + + assert "odom_x" in position + assert "odom_y" in position + assert "moving" in position + assert "odom_yaw_0_360" in position + assert "odom_yaw_m180_p180" in position + assert "body_height_cm" in position + assert "body_attitude" in position + assert "odom_rockchip_ts" in position + assert "odom_subscriber_ts" in position + + assert position["odom_x"] == 0.0 + assert position["odom_y"] == 0.0 + assert position["moving"] is False + + +def test_euler_from_quaternion(mock_multiprocessing): + """Test quaternion to Euler angle conversion.""" + provider = TronOdomProvider(topic="odom") + + # Identity quaternion (no rotation) + roll, pitch, yaw = provider.euler_from_quaternion(0.0, 0.0, 0.0, 1.0) + assert abs(roll) < 0.001 + assert abs(pitch) < 0.001 + assert abs(yaw) < 0.001 + + +def test_euler_from_quaternion_90_degree_yaw(mock_multiprocessing): + """Test quaternion representing 90 degree yaw rotation.""" + import math + + provider = TronOdomProvider(topic="odom") + + # 90 degree rotation about z-axis + # quaternion: (0, 0, sin(45°), cos(45°)) = (0, 0, 0.7071, 0.7071) + roll, pitch, yaw = provider.euler_from_quaternion(0.0, 0.0, 0.7071, 0.7071) + + # yaw should be approximately 90 degrees (pi/2 radians) + assert abs(yaw - math.pi / 2) < 0.01 + + +def test_body_attitude_standing(mock_multiprocessing): + """Test body attitude is set to STANDING when height > 60cm.""" + provider = TronOdomProvider(topic="odom") + + # Manually set body height to simulate standing + provider.body_height_cm = 70 + + # This happens in process_odom, but we can test the threshold logic + if provider.body_height_cm > 60: + provider.body_attitude = RobotState.STANDING + + assert provider.body_attitude == RobotState.STANDING + + +def test_body_attitude_sitting(mock_multiprocessing): + """Test body attitude is set to SITTING when height is between 3-60cm.""" + provider = TronOdomProvider(topic="odom") + + # Manually set body height to simulate sitting + provider.body_height_cm = 55 + + if provider.body_height_cm > 60: + provider.body_attitude = RobotState.STANDING + elif provider.body_height_cm > 3: + provider.body_attitude = RobotState.SITTING + + assert provider.body_attitude == RobotState.SITTING + + +def test_initial_values(mock_multiprocessing): + """Test initial values are set correctly.""" + provider = TronOdomProvider(topic="odom") + + assert provider.x == 0.0 + assert provider.y == 0.0 + assert provider.z == 0.0 + assert provider.odom_yaw_0_360 == 0.0 + assert provider.odom_yaw_m180_p180 == 0.0 + assert provider.moving is False + assert provider.previous_x == 0 + assert provider.previous_y == 0 + assert provider.previous_z == 0 + assert provider.move_history == 0 From 420d51dfc6729b5eb3a7993491375a57d9c9c545 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Thu, 29 Jan 2026 15:40:57 -0800 Subject: [PATCH 8/9] fix lint --- tests/actions/move_tron_autonomy/test_limx_sdk.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/actions/move_tron_autonomy/test_limx_sdk.py b/tests/actions/move_tron_autonomy/test_limx_sdk.py index 797f55c4f..999c50dad 100644 --- a/tests/actions/move_tron_autonomy/test_limx_sdk.py +++ b/tests/actions/move_tron_autonomy/test_limx_sdk.py @@ -104,9 +104,7 @@ def test_initialization(self, connector, mock_dependencies): def test_initialization_zenoh_error(self): """Test initialization when Zenoh session fails.""" with ( - patch( - "actions.move_tron_autonomy.connector.limx_sdk.SimplePathsProvider" - ), + patch("actions.move_tron_autonomy.connector.limx_sdk.SimplePathsProvider"), patch("actions.move_tron_autonomy.connector.limx_sdk.TronOdomProvider"), patch( "actions.move_tron_autonomy.connector.limx_sdk.open_zenoh_session" From d3f9bd3268bdbd859c688b537ceb7db152f42773 Mon Sep 17 00:00:00 2001 From: jerinpeter Date: Thu, 29 Jan 2026 15:46:27 -0800 Subject: [PATCH 9/9] Refactor MoveInput actions to use MovementAction enum for consistency --- .../move_tron_autonomy/test_limx_sdk.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/actions/move_tron_autonomy/test_limx_sdk.py b/tests/actions/move_tron_autonomy/test_limx_sdk.py index 999c50dad..03b0f57f2 100644 --- a/tests/actions/move_tron_autonomy/test_limx_sdk.py +++ b/tests/actions/move_tron_autonomy/test_limx_sdk.py @@ -8,7 +8,7 @@ MoveTronZenohConfig, MoveTronZenohConnector, ) -from actions.move_tron_autonomy.interface import MoveInput +from actions.move_tron_autonomy.interface import MoveInput, MovementAction from providers.tron_odom_provider import RobotState @@ -129,7 +129,7 @@ class TestConnect: async def test_connect_robot_already_moving(self, connector, mock_dependencies): """Test connect when robot is already moving.""" mock_dependencies["odom"].position["moving"] = True - move_input = MoveInput(action="move forwards") + move_input = MoveInput(action=MovementAction.MOVE_FORWARDS) await connector.connect(move_input) @@ -142,7 +142,7 @@ async def test_connect_movement_already_pending(self, connector, mock_dependenci connector.pending_movements.put( MoveCommand(dx=0.5, yaw=0.0, start_x=0.0, start_y=0.0, turn_complete=False) ) - move_input = MoveInput(action="move forwards") + move_input = MoveInput(action=MovementAction.MOVE_FORWARDS) await connector.connect(move_input) @@ -155,7 +155,7 @@ async def test_connect_waiting_for_location_data( ): """Test connect when waiting for location data.""" mock_dependencies["odom"].position["odom_x"] = 0.0 - move_input = MoveInput(action="move forwards") + move_input = MoveInput(action=MovementAction.MOVE_FORWARDS) await connector.connect(move_input) @@ -164,7 +164,7 @@ async def test_connect_waiting_for_location_data( @pytest.mark.asyncio async def test_connect_turn_left(self, connector, mock_dependencies): """Test connect with turn left command.""" - move_input = MoveInput(action="turn left") + move_input = MoveInput(action=MovementAction.TURN_LEFT) await connector.connect(move_input) @@ -176,7 +176,7 @@ async def test_connect_turn_left(self, connector, mock_dependencies): @pytest.mark.asyncio async def test_connect_turn_right(self, connector, mock_dependencies): """Test connect with turn right command.""" - move_input = MoveInput(action="turn right") + move_input = MoveInput(action=MovementAction.TURN_RIGHT) await connector.connect(move_input) @@ -188,7 +188,7 @@ async def test_connect_turn_right(self, connector, mock_dependencies): @pytest.mark.asyncio async def test_connect_move_forwards(self, connector, mock_dependencies): """Test connect with move forwards command.""" - move_input = MoveInput(action="move forwards") + move_input = MoveInput(action=MovementAction.MOVE_FORWARDS) await connector.connect(move_input) @@ -199,7 +199,7 @@ async def test_connect_move_forwards(self, connector, mock_dependencies): @pytest.mark.asyncio async def test_connect_move_back(self, connector, mock_dependencies): """Test connect with move back command.""" - move_input = MoveInput(action="move back") + move_input = MoveInput(action=MovementAction.MOVE_BACK) await connector.connect(move_input) @@ -212,7 +212,7 @@ async def test_connect_move_back(self, connector, mock_dependencies): @pytest.mark.asyncio async def test_connect_stand_still(self, connector, mock_dependencies): """Test connect with stand still command.""" - move_input = MoveInput(action="stand still") + move_input = MoveInput(action=MovementAction.STAND_STILL) await connector.connect(move_input) @@ -221,7 +221,7 @@ async def test_connect_stand_still(self, connector, mock_dependencies): @pytest.mark.asyncio async def test_connect_unknown_action(self, connector, mock_dependencies): """Test connect with unknown action.""" - move_input = MoveInput(action="unknown action") + move_input = MoveInput(action="unknown action") # type: ignore[arg-type] await connector.connect(move_input)