Skip to content

Commit

Permalink
Add type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
sea-bass committed Nov 2, 2023
1 parent e4eb914 commit b434598
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion, ros_loader
from rosbridge_library.protocol import Protocol


class ActionFeedback(Capability):
Expand All @@ -42,14 +43,14 @@ class ActionFeedback(Capability):
(False, "values", dict),
]

def __init__(self, protocol):
def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
protocol.register_operation("action_feedback", self.action_feedback)

def action_feedback(self, message):
def action_feedback(self, message: dict) -> None:
# Typecheck the args
self.basic_type_check(message, self.action_feedback_msg_fields)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion, ros_loader
from rosbridge_library.protocol import Protocol


class ActionResult(Capability):
Expand All @@ -43,14 +44,14 @@ class ActionResult(Capability):
(True, "result", bool),
]

def __init__(self, protocol):
def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
protocol.register_operation("action_result", self.action_result)

def action_result(self, message):
def action_result(self, message: dict) -> None:
# Typecheck the args
self.basic_type_check(message, self.action_result_msg_fields)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@
# POSSIBILITY OF SUCH DAMAGE.

import fnmatch
from typing import Any

import rclpy
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion
from rosbridge_library.internal.ros_loader import get_action_class
from rosbridge_library.protocol import Protocol


class AdvertisedActionHandler:

id_counter = 1

def __init__(self, action_name, action_type, protocol, sleep_time=0.001):
def __init__(
self, action_name: str, action_type: str, protocol: Protocol, sleep_time: float = 0.001
) -> None:
self.goal_futures = {}
self.goal_handles = {}

Expand All @@ -61,12 +65,12 @@ def __init__(self, action_name, action_type, protocol, sleep_time=0.001):
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
)

def next_id(self):
def next_id(self) -> int:
id = self.id_counter
self.id_counter += 1
return id

async def execute_callback(self, goal):
async def execute_callback(self, goal: Any) -> Any:
# generate a unique ID
goal_id = f"action_goal:{self.action_name}:{self.next_id()}"

Expand All @@ -92,7 +96,7 @@ async def execute_callback(self, goal):
del self.goal_futures[goal_id]
del self.goal_handles[goal_id]

def handle_feedback(self, goal_id, feedback):
def handle_feedback(self, goal_id: str, feedback: Any) -> None:
"""
Called by the ActionFeedback capability to handle action feedback from the external client.
"""
Expand All @@ -101,16 +105,16 @@ def handle_feedback(self, goal_id, feedback):
else:
self.protocol.log("warning", f"Received action feedback for unrecognized id: {goal_id}")

def handle_result(self, goal_id, res):
def handle_result(self, goal_id: str, result: Any) -> None:
"""
Called by the ActionResult capability to handle an action result from the external client.
"""
if goal_id in self.goal_futures:
self.goal_futures[goal_id].set_result(res)
self.goal_futures[goal_id].set_result(result)
else:
self.protocol.log("warning", f"Received action result for unrecognized id: {goal_id}")

def graceful_shutdown(self):
def graceful_shutdown(self) -> None:
"""
Signal the AdvertisedActionHandler to shutdown.
"""
Expand All @@ -132,14 +136,14 @@ class AdvertiseAction(Capability):

advertise_action_msg_fields = [(True, "action", str), (True, "type", str)]

def __init__(self, protocol):
def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
protocol.register_operation("advertise_action", self.advertise_action)

def advertise_action(self, message):
def advertise_action(self, message: dict) -> None:
# Typecheck the args
self.basic_type_check(message, self.advertise_action_msg_fields)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from rosbridge_library.capability import Capability
from rosbridge_library.internal.actions import ActionClientHandler
from rosbridge_library.protocol import Protocol


class SendActionGoal(Capability):
Expand All @@ -51,7 +52,7 @@ class SendActionGoal(Capability):
actions_glob = None
client_handler_list = {}

def __init__(self, protocol):
def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

Expand All @@ -75,7 +76,7 @@ def __init__(self, protocol):

protocol.register_operation("cancel_action_goal", self.cancel_action_goal)

def send_action_goal(self, message):
def send_action_goal(self, message: dict) -> None:
# Pull out the ID
cid = message.get("id", None)

Expand Down Expand Up @@ -128,7 +129,7 @@ def send_action_goal(self, message):
client_handler.run()
del self.client_handler_list[cid]

def cancel_action_goal(self, message):
def cancel_action_goal(self, message: dict) -> None:
# Extract the args
cid = message.get("id", None)
action = message["action"]
Expand All @@ -146,7 +147,9 @@ def cancel_action_goal(self, message):
if client_handler.send_goal_helper is not None:
client_handler.send_goal_helper.cancel_goal()

def _success(self, cid, action, fragment_size, compression, message):
def _success(
self, cid: str, action: str, fragment_size: int, compression: bool, message: dict
) -> None:
outgoing_message = {
"op": "action_result",
"action": action,
Expand All @@ -158,8 +161,8 @@ def _success(self, cid, action, fragment_size, compression, message):
# TODO: fragmentation, compression
self.protocol.send(outgoing_message)

def _failure(self, cid, action, exc):
self.protocol.log("error", "send_action_goal %s: %s" % (type(exc).__name__, str(exc)), cid)
def _failure(self, cid: str, action: str, exc: Exception) -> None:
self.protocol.log("error", f"send_action_goal {type(exc).__name__}: {cid}")
# send response with result: false
outgoing_message = {
"op": "action_result",
Expand All @@ -171,7 +174,7 @@ def _failure(self, cid, action, exc):
outgoing_message["id"] = cid
self.protocol.send(outgoing_message)

def _feedback(self, cid, action, message):
def _feedback(self, cid: str, action: str, message: dict) -> None:
outgoing_message = {
"op": "action_feedback",
"action": action,
Expand All @@ -183,13 +186,13 @@ def _feedback(self, cid, action, message):
self.protocol.send(outgoing_message)


def trim_action_name(action):
def trim_action_name(action: str) -> str:
if "#" in action:
return action[: action.find("#")]
return action


def extract_id(action, cid):
def extract_id(action: str, cid: str) -> str:
if cid is not None:
return cid
elif "#" in action:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@
import fnmatch

from rosbridge_library.capability import Capability
from rosbridge_library.protocol import Protocol


class UnadvertiseAction(Capability):

actions_glob = None

def __init__(self, protocol):
def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
protocol.register_operation("unadvertise_action", self.unadvertise_action)

def unadvertise_action(self, message):
def unadvertise_action(self, message: dict) -> None:
# parse the message
action_name = message["action"]

Expand Down
42 changes: 26 additions & 16 deletions rosbridge_library/src/rosbridge_library/internal/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@

import time
from threading import Thread
from typing import Any, Callable, Optional, Union

from rclpy.action import ActionClient
from rclpy.expand_topic_name import expand_topic_name
from rclpy.node import Node
from rclpy.task import Future
from rosbridge_library.internal.message_conversion import (
extract_values,
populate_instance,
Expand All @@ -46,21 +49,21 @@


class InvalidActionException(Exception):
def __init__(self, action_name):
def __init__(self, action_name) -> None:
Exception.__init__(self, f"Action {action_name} does not exist")


class ActionClientHandler(Thread):
def __init__(
self,
action,
action_type,
args,
success_callback,
error_callback,
feedback_callback,
node_handle,
):
action: str,
action_type: str,
args: dict,
success_callback: Callable[[str, str, int, bool, dict], None],
error_callback: Callable[[str, str, Exception], None],
feedback_callback: Callable[[str, str, dict], None],
node_handle: Node,
) -> None:
"""
Create a client handler for the specified action.
Use start() to start in a separate thread or run() to run in this thread.
Expand Down Expand Up @@ -89,7 +92,7 @@ def __init__(
self.node_handle = node_handle
self.send_goal_helper = None

def run(self):
def run(self) -> None:
try:
# Call the service and pass the result to the success handler
self.send_goal_helper = SendGoal()
Expand All @@ -107,7 +110,7 @@ def run(self):
self.error(e)


def args_to_action_goal_instance(action, inst, args):
def args_to_action_goal_instance(action: str, inst: Any, args: Union[list, dict]) -> Any:
""" "
Populate an action goal instance with the provided args
Expand All @@ -128,23 +131,30 @@ def args_to_action_goal_instance(action, inst, args):
class SendGoal:
"""Helper class to send action goals."""

def __init__(self, server_timeout_time=1.0, sleep_time=0.001):
def __init__(self, server_timeout_time: float = 1.0, sleep_time: float = 0.001) -> None:
self.server_timeout_time = server_timeout_time
self.sleep_time = sleep_time
self.goal_handle = None
self.goal_canceled = False

def get_result_cb(self, future):
def get_result_cb(self, future: Future) -> None:
self.result = future.result()

def goal_response_cb(self, future):
def goal_response_cb(self, future: Future) -> None:
self.goal_handle = future.result()
if not self.goal_handle.accepted:
raise Exception("Action goal was rejected")
result_future = self.goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_cb)

def send_goal(self, node_handle, action, action_type, args=None, feedback_cb=None):
def send_goal(
self,
node_handle: Node,
action: str,
action_type: str,
args: Optional[dict] = None,
feedback_cb: Optional[Callable[[str, str, dict], None]] = None,
) -> dict:
# Given the action name and type, fetch a request instance
action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace())
action_class = get_action_class(action_type)
Expand All @@ -171,7 +181,7 @@ def send_goal(self, node_handle, action, action_type, args=None, feedback_cb=Non

return json_response

def cancel_goal(self):
def cancel_goal(self) -> None:
if self.goal_handle is None:
return

Expand Down
Loading

0 comments on commit b434598

Please sign in to comment.