diff --git a/rosbridge_library/src/rosbridge_library/capabilities/action_feedback.py b/rosbridge_library/src/rosbridge_library/capabilities/action_feedback.py index 2d3d9d5f..f94c3b15 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/action_feedback.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/action_feedback.py @@ -32,6 +32,7 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion, ros_loader +from rosbridge_library.protocol import Protocol class ActionFeedback(Capability): @@ -42,14 +43,14 @@ class ActionFeedback(Capability): (False, "values", dict), ] - def __init__(self, protocol): + def __init__(self, protocol: Protocol) -> None: # Call superclass constructor Capability.__init__(self, protocol) # Register the operations that this capability provides protocol.register_operation("action_feedback", self.action_feedback) - def action_feedback(self, message): + def action_feedback(self, message: dict) -> None: # Typecheck the args self.basic_type_check(message, self.action_feedback_msg_fields) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/action_result.py b/rosbridge_library/src/rosbridge_library/capabilities/action_result.py index cd8a6116..bb508996 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/action_result.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/action_result.py @@ -32,6 +32,7 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion, ros_loader +from rosbridge_library.protocol import Protocol class ActionResult(Capability): @@ -43,14 +44,14 @@ class ActionResult(Capability): (True, "result", bool), ] - def __init__(self, protocol): + def __init__(self, protocol: Protocol) -> None: # Call superclass constructor Capability.__init__(self, protocol) # Register the operations that this capability provides protocol.register_operation("action_result", self.action_result) - def action_result(self, message): + def action_result(self, message: dict) -> None: # Typecheck the args self.basic_type_check(message, self.action_result_msg_fields) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py index 98af016d..518290e4 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py @@ -31,6 +31,7 @@ # POSSIBILITY OF SUCH DAMAGE. import fnmatch +from typing import Any import rclpy from rclpy.action import ActionServer @@ -38,13 +39,16 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion from rosbridge_library.internal.ros_loader import get_action_class +from rosbridge_library.protocol import Protocol class AdvertisedActionHandler: id_counter = 1 - def __init__(self, action_name, action_type, protocol, sleep_time=0.001): + def __init__( + self, action_name: str, action_type: str, protocol: Protocol, sleep_time: float = 0.001 + ) -> None: self.goal_futures = {} self.goal_handles = {} @@ -61,12 +65,12 @@ def __init__(self, action_name, action_type, protocol, sleep_time=0.001): callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 ) - def next_id(self): + def next_id(self) -> int: id = self.id_counter self.id_counter += 1 return id - async def execute_callback(self, goal): + async def execute_callback(self, goal: Any) -> Any: # generate a unique ID goal_id = f"action_goal:{self.action_name}:{self.next_id()}" @@ -92,7 +96,7 @@ async def execute_callback(self, goal): del self.goal_futures[goal_id] del self.goal_handles[goal_id] - def handle_feedback(self, goal_id, feedback): + def handle_feedback(self, goal_id: str, feedback: Any) -> None: """ Called by the ActionFeedback capability to handle action feedback from the external client. """ @@ -101,16 +105,16 @@ def handle_feedback(self, goal_id, feedback): else: self.protocol.log("warning", f"Received action feedback for unrecognized id: {goal_id}") - def handle_result(self, goal_id, res): + def handle_result(self, goal_id: str, result: Any) -> None: """ Called by the ActionResult capability to handle an action result from the external client. """ if goal_id in self.goal_futures: - self.goal_futures[goal_id].set_result(res) + self.goal_futures[goal_id].set_result(result) else: self.protocol.log("warning", f"Received action result for unrecognized id: {goal_id}") - def graceful_shutdown(self): + def graceful_shutdown(self) -> None: """ Signal the AdvertisedActionHandler to shutdown. """ @@ -132,14 +136,14 @@ class AdvertiseAction(Capability): advertise_action_msg_fields = [(True, "action", str), (True, "type", str)] - def __init__(self, protocol): + def __init__(self, protocol: Protocol) -> None: # Call superclass constructor Capability.__init__(self, protocol) # Register the operations that this capability provides protocol.register_operation("advertise_action", self.advertise_action) - def advertise_action(self, message): + def advertise_action(self, message: dict) -> None: # Typecheck the args self.basic_type_check(message, self.advertise_action_msg_fields) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py b/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py index 27a7dd89..4f8d8975 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py @@ -36,6 +36,7 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal.actions import ActionClientHandler +from rosbridge_library.protocol import Protocol class SendActionGoal(Capability): @@ -51,7 +52,7 @@ class SendActionGoal(Capability): actions_glob = None client_handler_list = {} - def __init__(self, protocol): + def __init__(self, protocol: Protocol) -> None: # Call superclass constructor Capability.__init__(self, protocol) @@ -75,7 +76,7 @@ def __init__(self, protocol): protocol.register_operation("cancel_action_goal", self.cancel_action_goal) - def send_action_goal(self, message): + def send_action_goal(self, message: dict) -> None: # Pull out the ID cid = message.get("id", None) @@ -128,7 +129,7 @@ def send_action_goal(self, message): client_handler.run() del self.client_handler_list[cid] - def cancel_action_goal(self, message): + def cancel_action_goal(self, message: dict) -> None: # Extract the args cid = message.get("id", None) action = message["action"] @@ -146,7 +147,9 @@ def cancel_action_goal(self, message): if client_handler.send_goal_helper is not None: client_handler.send_goal_helper.cancel_goal() - def _success(self, cid, action, fragment_size, compression, message): + def _success( + self, cid: str, action: str, fragment_size: int, compression: bool, message: dict + ) -> None: outgoing_message = { "op": "action_result", "action": action, @@ -158,8 +161,8 @@ def _success(self, cid, action, fragment_size, compression, message): # TODO: fragmentation, compression self.protocol.send(outgoing_message) - def _failure(self, cid, action, exc): - self.protocol.log("error", "send_action_goal %s: %s" % (type(exc).__name__, str(exc)), cid) + def _failure(self, cid: str, action: str, exc: Exception) -> None: + self.protocol.log("error", f"send_action_goal {type(exc).__name__}: {cid}") # send response with result: false outgoing_message = { "op": "action_result", @@ -171,7 +174,7 @@ def _failure(self, cid, action, exc): outgoing_message["id"] = cid self.protocol.send(outgoing_message) - def _feedback(self, cid, action, message): + def _feedback(self, cid: str, action: str, message: dict) -> None: outgoing_message = { "op": "action_feedback", "action": action, @@ -183,13 +186,13 @@ def _feedback(self, cid, action, message): self.protocol.send(outgoing_message) -def trim_action_name(action): +def trim_action_name(action: str) -> str: if "#" in action: return action[: action.find("#")] return action -def extract_id(action, cid): +def extract_id(action: str, cid: str) -> str: if cid is not None: return cid elif "#" in action: diff --git a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_action.py index d7ad5da0..703b8aca 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_action.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_action.py @@ -33,20 +33,21 @@ import fnmatch from rosbridge_library.capability import Capability +from rosbridge_library.protocol import Protocol class UnadvertiseAction(Capability): actions_glob = None - def __init__(self, protocol): + def __init__(self, protocol: Protocol) -> None: # Call superclass constructor Capability.__init__(self, protocol) # Register the operations that this capability provides protocol.register_operation("unadvertise_action", self.unadvertise_action) - def unadvertise_action(self, message): + def unadvertise_action(self, message: dict) -> None: # parse the message action_name = message["action"] diff --git a/rosbridge_library/src/rosbridge_library/internal/actions.py b/rosbridge_library/src/rosbridge_library/internal/actions.py index 3cd57af2..0421c31e 100644 --- a/rosbridge_library/src/rosbridge_library/internal/actions.py +++ b/rosbridge_library/src/rosbridge_library/internal/actions.py @@ -32,9 +32,12 @@ import time from threading import Thread +from typing import Any, Callable, Optional, Union from rclpy.action import ActionClient from rclpy.expand_topic_name import expand_topic_name +from rclpy.node import Node +from rclpy.task import Future from rosbridge_library.internal.message_conversion import ( extract_values, populate_instance, @@ -46,21 +49,21 @@ class InvalidActionException(Exception): - def __init__(self, action_name): + def __init__(self, action_name) -> None: Exception.__init__(self, f"Action {action_name} does not exist") class ActionClientHandler(Thread): def __init__( self, - action, - action_type, - args, - success_callback, - error_callback, - feedback_callback, - node_handle, - ): + action: str, + action_type: str, + args: dict, + success_callback: Callable[[str, str, int, bool, dict], None], + error_callback: Callable[[str, str, Exception], None], + feedback_callback: Callable[[str, str, dict], None], + node_handle: Node, + ) -> None: """ Create a client handler for the specified action. Use start() to start in a separate thread or run() to run in this thread. @@ -89,7 +92,7 @@ def __init__( self.node_handle = node_handle self.send_goal_helper = None - def run(self): + def run(self) -> None: try: # Call the service and pass the result to the success handler self.send_goal_helper = SendGoal() @@ -107,7 +110,7 @@ def run(self): self.error(e) -def args_to_action_goal_instance(action, inst, args): +def args_to_action_goal_instance(action: str, inst: Any, args: Union[list, dict]) -> Any: """ " Populate an action goal instance with the provided args @@ -128,23 +131,30 @@ def args_to_action_goal_instance(action, inst, args): class SendGoal: """Helper class to send action goals.""" - def __init__(self, server_timeout_time=1.0, sleep_time=0.001): + def __init__(self, server_timeout_time: float = 1.0, sleep_time: float = 0.001) -> None: self.server_timeout_time = server_timeout_time self.sleep_time = sleep_time self.goal_handle = None self.goal_canceled = False - def get_result_cb(self, future): + def get_result_cb(self, future: Future) -> None: self.result = future.result() - def goal_response_cb(self, future): + def goal_response_cb(self, future: Future) -> None: self.goal_handle = future.result() if not self.goal_handle.accepted: raise Exception("Action goal was rejected") result_future = self.goal_handle.get_result_async() result_future.add_done_callback(self.get_result_cb) - def send_goal(self, node_handle, action, action_type, args=None, feedback_cb=None): + def send_goal( + self, + node_handle: Node, + action: str, + action_type: str, + args: Optional[dict] = None, + feedback_cb: Optional[Callable[[str, str, dict], None]] = None, + ) -> dict: # Given the action name and type, fetch a request instance action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace()) action_class = get_action_class(action_type) @@ -171,7 +181,7 @@ def send_goal(self, node_handle, action, action_type, args=None, feedback_cb=Non return json_response - def cancel_goal(self): + def cancel_goal(self) -> None: if self.goal_handle is None: return diff --git a/rosbridge_library/src/rosbridge_library/internal/ros_loader.py b/rosbridge_library/src/rosbridge_library/internal/ros_loader.py index 6695a2f1..3651727a 100644 --- a/rosbridge_library/src/rosbridge_library/internal/ros_loader.py +++ b/rosbridge_library/src/rosbridge_library/internal/ros_loader.py @@ -33,6 +33,7 @@ import importlib from threading import Lock +from typing import Any """ ros_loader contains methods for dynamically loading ROS message classes at runtime. It's achieved by using roslib to load the manifest files for the @@ -51,12 +52,12 @@ class InvalidTypeStringException(Exception): - def __init__(self, typestring): - Exception.__init__(self, "%s is not a valid type string" % typestring) + def __init__(self, typestring: str) -> None: + Exception.__init__(self, f"{typestring} is not a valid type string") class InvalidModuleException(Exception): - def __init__(self, modname, subname, original_exception): + def __init__(self, modname: str, subname: str, original_exception: Exception) -> None: Exception.__init__( self, "Unable to import %s.%s from package %s. Caused by: %s" @@ -65,7 +66,9 @@ def __init__(self, modname, subname, original_exception): class InvalidClassException(Exception): - def __init__(self, modname, subname, classname, original_exception): + def __init__( + self, modname: str, subname: str, classname: str, original_exception: Exception + ) -> None: Exception.__init__( self, "Unable to import %s class %s from package %s. Caused by %s" @@ -73,59 +76,61 @@ def __init__(self, modname, subname, classname, original_exception): ) -def get_message_class(typestring): +def get_message_class(typestring: str) -> Any: """Loads the message type specified. Returns the loaded class, or throws exceptions on failure""" return _get_interface_class(typestring, "msg", _loaded_msgs, _msgs_lock) -def get_service_class(typestring): +def get_service_class(typestring: str) -> Any: """Loads the service type specified. Returns the loaded class, or None on failure""" return _get_interface_class(typestring, "srv", _loaded_srvs, _srvs_lock) -def get_action_class(typestring): +def get_action_class(typestring: str) -> Any: """Loads the action type specified. Returns the loaded class, or throws exceptions on failure""" return _get_interface_class(typestring, "action", _loaded_actions, _actions_lock) -def get_message_instance(typestring): +def get_message_instance(typestring: str) -> Any: """If not loaded, loads the specified type. Then returns an instance of it, or None.""" cls = get_message_class(typestring) return cls() -def get_service_request_instance(typestring): +def get_service_request_instance(typestring: str) -> Any: cls = get_service_class(typestring) return cls.Request() -def get_service_response_instance(typestring): +def get_service_response_instance(typestring: str) -> Any: cls = get_service_class(typestring) return cls.Response() -def get_action_goal_instance(typestring): +def get_action_goal_instance(typestring: str) -> Any: cls = get_action_class(typestring) return cls.Goal() -def get_action_feedback_instance(typestring): +def get_action_feedback_instance(typestring: str) -> Any: cls = get_action_class(typestring) return cls.Feedback() -def get_action_result_instance(typestring): +def get_action_result_instance(typestring: str) -> Any: cls = get_action_class(typestring) return cls.Result() -def _get_interface_class(typestring, intf_type, loaded_intfs, intf_lock): +def _get_interface_class( + typestring: str, intf_type: str, loaded_intfs: dict[str, Any], intf_lock: Lock +) -> Any: """ If not loaded, loads the specified ROS interface class then returns an instance of it. @@ -147,13 +152,14 @@ def _get_interface_class(typestring, intf_type, loaded_intfs, intf_lock): return _get_class(typestring, intf_type, loaded_intfs, intf_lock) -def _get_class(typestring, subname, cache, lock): +def _get_class(typestring: str, subname: str, cache: dict[str, Any], lock: Lock) -> Any: """If not loaded, loads the specified class then returns an instance of it. Loaded classes are cached in the provided cache dict - Throws various exceptions if loading the msg class fails""" + Throws various exceptions if loading the msg class fails. + """ # First, see if we have this type string cached cls = _get_from_cache(cache, lock, typestring) @@ -179,7 +185,7 @@ def _get_class(typestring, subname, cache, lock): return cls -def _load_class(modname, subname, classname): +def _load_class(modname: str, subname: str, classname: str) -> None: """Loads the manifest and imports the module that contains the specified type. @@ -200,7 +206,7 @@ def _load_class(modname, subname, classname): raise InvalidClassException(modname, subname, classname, exc) -def _splittype(typestring): +def _splittype(typestring: str) -> tuple[str, str]: """Split the string the / delimiter and strip out empty strings Performs similar logic to roslib.names.package_resource_name but is a bit @@ -214,13 +220,13 @@ def _splittype(typestring): raise InvalidTypeStringException(typestring) -def _add_to_cache(cache, lock, key, value): +def _add_to_cache(cache: dict[str, Any], lock: Lock, key: str, value: any) -> None: lock.acquire() cache[key] = value lock.release() -def _get_from_cache(cache, lock, key): +def _get_from_cache(cache: dict[str, Any], lock: Lock, key: str) -> Any: """Returns the value for the specified key from the cache. Locks the lock before doing anything. Returns None if key not in cache""" lock.acquire() diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index 99f7626d..074ddef9 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -32,10 +32,12 @@ import time from threading import Thread +from typing import Any, Callable, Optional, Union import rclpy from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.expand_topic_name import expand_topic_name +from rclpy.node import Node from rosbridge_library.internal.message_conversion import ( extract_values, populate_instance, @@ -47,12 +49,19 @@ class InvalidServiceException(Exception): - def __init__(self, service_name): + def __init__(self, service_name) -> None: Exception.__init__(self, f"Service {service_name} does not exist") class ServiceCaller(Thread): - def __init__(self, service, args, success_callback, error_callback, node_handle): + def __init__( + self, + service: str, + args: dict, + success_callback: Callable[[str, str, int, bool, Any], None], + error_callback: Callable[[str, str, Exception], None], + node_handle: Node, + ) -> None: """Create a service caller for the specified service. Use start() to start in a separate thread or run() to run in this thread. @@ -76,7 +85,7 @@ def __init__(self, service, args, success_callback, error_callback, node_handle) self.error = error_callback self.node_handle = node_handle - def run(self): + def run(self) -> None: try: # Call the service and pass the result to the success handler self.success(call_service(self.node_handle, self.service, args=self.args)) @@ -85,7 +94,7 @@ def run(self): self.error(e) -def args_to_service_request_instance(service, inst, args): +def args_to_service_request_instance(service: str, inst: Any, args: dict) -> Any: """Populate a service request instance with the provided args args can be a dictionary of values, or a list, or None @@ -101,7 +110,13 @@ def args_to_service_request_instance(service, inst, args): populate_instance(msg, inst) -def call_service(node_handle, service, args=None, server_timeout_time=1.0, sleep_time=0.001): +def call_service( + node_handle: Node, + service: str, + args: Optional[dict] = None, + server_timeout_time: float = 1.0, + sleep_time: float = 0.001, +) -> dict: # Given the service name, fetch the type and class of the service, # and a request instance service = expand_topic_name(service, node_handle.get_name(), node_handle.get_namespace())