From bb6283833ea2199eabe48f49f36a64ab4a8aafd2 Mon Sep 17 00:00:00 2001 From: Euan M <69640923+Moronibot@users.noreply.github.com> Date: Thu, 7 Oct 2021 16:03:34 +0100 Subject: [PATCH] feature: implement function annotations (#99) --- evasdk/Eva.py | 105 +++++++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/evasdk/Eva.py b/evasdk/Eva.py index 69a7266..d70b84e 100644 --- a/evasdk/Eva.py +++ b/evasdk/Eva.py @@ -1,5 +1,9 @@ +import io import logging +from typing import Union, Dict, List, Any +from requests import Response + from .helpers import (strip_ip) from .eva_http_client import EvaHTTPClient from .eva_locker import EvaWithLocker @@ -22,7 +26,7 @@ def __init__(self, host_ip, token, request_timeout=5, renew_period=60 * 20): self.__eva_locker = EvaWithLocker(self) - def set_request_timeout(self, request_timeout): + def set_request_timeout(self, request_timeout: int) -> None: """Sets default request timeout for API commands. Args: @@ -39,7 +43,7 @@ def set_request_timeout(self, request_timeout): # --------------------------------------------- Websocket ----------------------------------------------- - def websocket(self): + def websocket(self) -> Websocket: """Creates a websocket class to monitor Eva in the background. Note: @@ -52,7 +56,7 @@ def websocket(self): Example: >>> with eva.websocket() as ws: >>> ws.register('state_change', print) - >>> input('press return when you want to stop\n') + >>> input('press return when you want to stop') """ self.__http_client.data_snapshot() host_uri = f'ws://{self.__http_client.host_ip}/api/v1/data/stream' @@ -70,8 +74,9 @@ def __exit__(self, type, value, traceback): # --------------------------------------------- HTTP HANDLERS --------------------------------------------- - def api_call_with_auth(self, method, path, payload=None, headers={}, timeout=None, version='v1'): - """Makes a direct API call to EVA to endpoints that requires authentication. + def api_call_with_auth(self, method: str, path: str, payload: dict = None, headers: dict = {}, timeout: int = None, + version: str = 'v1') -> Response: + """Makes a direct API call to EVA to endpoints that require authentication. Note: This is used within the SDK and will unlikely be used externally. @@ -99,7 +104,8 @@ def api_call_with_auth(self, method, path, payload=None, headers={}, timeout=Non return self.__http_client.api_call_with_auth(method, path, payload=payload, headers=headers, timeout=timeout, version=version) - def api_call_no_auth(self, method, path, payload=None, headers={}, timeout=None, version='v1'): + def api_call_no_auth(self, method: str, path: str, payload: dict = None, headers: dict = {}, timeout: int = None, + version: str = 'v1') -> Response: """Makes a direct API call to EVA to endpoints that do not require authentication. Note: @@ -131,7 +137,7 @@ def api_call_no_auth(self, method, path, payload=None, headers={}, timeout=None, # Global - def versions(self): + def versions(self) -> Dict[str, str]: """Gets the API versions supported by the server and the Choreograph version installed on the robot. Returns: @@ -147,7 +153,7 @@ def versions(self): self.__logger.debug('Eva.versions called') return self.__http_client.api_versions() - def name(self): + def name(self) -> Dict[str, str]: """Gets the name of the robot. Returns: @@ -161,7 +167,7 @@ def name(self): return self.__http_client.name() # Auth - def auth_renew_session(self): + def auth_renew_session(self) -> None: """Renews the authenticated session token. Note: @@ -177,7 +183,7 @@ def auth_renew_session(self): return self.__http_client.auth_renew_session() - def auth_create_session(self): + def auth_create_session(self) -> str: """Creates a session token token. Note: @@ -197,7 +203,7 @@ def auth_create_session(self): return self.__http_client.auth_create_session() - def auth_invalidate_session(self): + def auth_invalidate_session(self) -> None: """Deletes/invalidates the API session token. Raises: @@ -211,7 +217,7 @@ def auth_invalidate_session(self): # Data - def data_snapshot(self): + def data_snapshot(self) -> Dict[str, Any]: """Returns a nested dictionary of the status of the robot. Returns: @@ -224,7 +230,7 @@ def data_snapshot(self): return self.__http_client.data_snapshot() - def data_snapshot_property(self, prop): + def data_snapshot_property(self, prop: str) -> Dict[str, Any]: """Returns a property from the data_snapshot dict. Args: @@ -244,7 +250,7 @@ def data_snapshot_property(self, prop): return self.__http_client.data_snapshot_property(prop) - def data_servo_positions(self): + def data_servo_positions(self) -> List[float]: """Returns a list of current joint angles in radians. Note: @@ -266,7 +272,7 @@ def data_servo_positions(self): # Users - def users_get(self): + def users_get(self) -> Dict[str, list]: """Returns the list within a dictionary of users. Returns: @@ -285,7 +291,7 @@ def users_get(self): # Config - def config_update(self, update): + def config_update(self, update: io.BytesIO) -> None: """Applies choreograph update file to robot. Note: @@ -310,7 +316,7 @@ def config_update(self, update): # GPIO - def gpio_set(self, pin, status): + def gpio_set(self, pin: str, status: Union[bool, str]) -> None: """Changes the state of the output pin. Note: @@ -335,7 +341,7 @@ def gpio_set(self, pin, status): return self.__http_client.gpio_set(pin, status) - def gpio_get(self, pin, pin_type): + def gpio_get(self, pin: str, pin_type: str) -> Union[bool, float]: """Gets the state of output/input pin. Args: @@ -358,7 +364,7 @@ def gpio_get(self, pin, pin_type): self.__logger.debug('Eva.gpio_get called') return self.__http_client.gpio_get(pin, pin_type) - def globals_edit(self, keys, values): + def globals_edit(self, keys: str, values: Union[bool, float, str]) -> dict: """Allows editing of exposed global values. Note: @@ -378,7 +384,7 @@ def globals_edit(self, keys, values): return self.__http_client.globals_edit(keys, values) # Toolpaths - def toolpaths_list(self): + def toolpaths_list(self) -> List[dict]: """Gets a list of saved toolpaths on the robot. Returns: @@ -396,7 +402,7 @@ def toolpaths_list(self): return self.__http_client.toolpaths_list() - def toolpaths_retrieve(self, ID): + def toolpaths_retrieve(self, ID: int) -> Dict[str, Any]: """Retrieves the toolpath using toolpath ID on the robot. Args: @@ -420,7 +426,7 @@ def toolpaths_retrieve(self, ID): return self.__http_client.toolpaths_retrieve(ID) - def toolpaths_save(self, name, toolpath): + def toolpaths_save(self, name: str, toolpath: dict) -> int: """Create a new toolpath or update an existing one (if the name is already used). Args: @@ -437,7 +443,7 @@ def toolpaths_save(self, name, toolpath): return self.__http_client.toolpaths_save(name, toolpath) - def toolpaths_use_saved(self, toolpathId): + def toolpaths_use_saved(self, toolpathId: int) -> None: """Sets the active toolpath from the toolpaths_list which can be homed, run, paused, and stopped. Args: @@ -453,7 +459,7 @@ def toolpaths_use_saved(self, toolpathId): return self.__http_client.toolpaths_use_saved(toolpathId) - def toolpaths_use(self, toolpathRepr): + def toolpaths_use(self, toolpathRepr: dict) -> None: """Sets the toolpath passed to it as the active toolpath which can be homed, run, paused, and stopped. Args: @@ -471,7 +477,7 @@ def toolpaths_use(self, toolpathRepr): self.__logger.debug('Eva.toolpaths_use called') return self.__http_client.toolpaths_use(toolpathRepr) - def toolpaths_delete(self, toolpathId): + def toolpaths_delete(self, toolpathId: int) -> None: """Deletes a toolpath specified from the toolpaths stored on the robot. Args: @@ -485,7 +491,7 @@ def toolpaths_delete(self, toolpathId): # Lock - def lock_status(self): + def lock_status(self) -> Dict[str, str]: """Indicates status and owner of the lock. Raises: @@ -501,8 +507,8 @@ def lock_status(self): self.__logger.debug('Eva.lock_status called') return self.__http_client.lock_status() - - def lock(self, wait=True, timeout=None): + # Todo - Change annotation once support has been deprecated for 3.6 + def lock(self, wait: bool = True, timeout: int = None) -> 'Eva': """Owns the lock/control of the robot. Note: @@ -513,7 +519,7 @@ def lock(self, wait=True, timeout=None): timeout (int): time in seconds to wait for lock availability Returns: - None + Eva: evasdk.Eva object Raises: EvaError: If it is not successful or if another user owns the lock @@ -530,7 +536,7 @@ def lock(self, wait=True, timeout=None): return self - def lock_renew(self): + def lock_renew(self) -> None: """Renews the lock session. Note: @@ -548,7 +554,7 @@ def lock_renew(self): return self.__http_client.lock_renew() - def unlock(self): + def unlock(self) -> None: """Closes the lock session cleanly. Note: @@ -567,7 +573,7 @@ def unlock(self): # Control - def control_wait_for_ready(self): + def control_wait_for_ready(self) -> None: """Waits for the robot to enter the "ready" (RobotState.READY) state. Returns: @@ -580,7 +586,7 @@ def control_wait_for_ready(self): return self.__http_client.control_wait_for_ready() - def control_wait_for(self, goal): + def control_wait_for(self, goal: Union[str, enumerate]) -> None: """Waits for the robot to enter a state specified in RobotState. Note: @@ -603,7 +609,7 @@ def control_wait_for(self, goal): return self.__http_client.control_wait_for(goal) - def control_home(self, wait_for_ready=True): + def control_home(self, wait_for_ready: bool = True) -> None: """Sends robot to home position of the active toolpath. Note: @@ -627,7 +633,7 @@ def control_home(self, wait_for_ready=True): return self.__http_client.control_home(wait_for_ready=wait_for_ready) - def control_run(self, loop=1, wait_for_ready=True, mode='teach'): + def control_run(self, loop: int = 1, wait_for_ready: bool = True, mode: str = 'teach') -> None: """Runs active toolpath for specified amount of loops. Note: @@ -657,7 +663,8 @@ def control_run(self, loop=1, wait_for_ready=True, mode='teach'): return self.__http_client.control_run(loop=loop, wait_for_ready=wait_for_ready, mode=mode) - def control_go_to(self, joints, wait_for_ready=True, max_speed=None, time_sec=None, mode='teach'): + def control_go_to(self, joints: list, wait_for_ready: bool = True, max_speed: float = None, time_sec: int = None, + mode: str = 'teach') -> None: """Move robot to specified joint angles. Note: @@ -691,7 +698,7 @@ def control_go_to(self, joints, wait_for_ready=True, max_speed=None, time_sec=No time_sec=time_sec, mode=mode) - def control_pause(self, wait_for_paused=True): + def control_pause(self, wait_for_paused: bool = True) -> None: """Pauses robot while in operation. Note: @@ -716,7 +723,7 @@ def control_pause(self, wait_for_paused=True): return self.__http_client.control_pause(wait_for_paused=wait_for_paused) - def control_resume(self, wait_for_running=True): + def control_resume(self, wait_for_running: bool = True) -> None: """Continues robot operation from PAUSED state. Note: @@ -740,7 +747,7 @@ def control_resume(self, wait_for_running=True): return self.__http_client.control_resume(wait_for_running=wait_for_running) - def control_cancel(self, wait_for_ready=True): + def control_cancel(self, wait_for_ready: bool = True) -> None: """Cancels PAUSED state, robot will enter READY state after function call. Note: @@ -765,7 +772,7 @@ def control_cancel(self, wait_for_ready=True): return self.__http_client.control_cancel(wait_for_ready=wait_for_ready) - def control_stop_loop(self, wait_for_ready=True): + def control_stop_loop(self, wait_for_ready: bool = True) -> None: """Stops robot once it has reached the end of the current running loop. Note: @@ -789,7 +796,7 @@ def control_stop_loop(self, wait_for_ready=True): return self.__http_client.control_stop_loop(wait_for_ready=wait_for_ready) - def control_reset_errors(self, wait_for_ready=True): + def control_reset_errors(self, wait_for_ready: bool = True) -> None: """Resets state of the robot to READY. Note: @@ -814,7 +821,7 @@ def control_reset_errors(self, wait_for_ready=True): return self.__http_client.control_reset_errors(wait_for_ready=wait_for_ready) # Collision Detection - def control_configure_collision_detection(self, enabled, sensitivity): + def control_configure_collision_detection(self, enabled: bool, sensitivity: str) -> None: """Sets the sensitivity of the collision detection feature. Note: @@ -839,7 +846,7 @@ def control_configure_collision_detection(self, enabled, sensitivity): self.__logger.debug('Eva.collision_detection called') return self.__http_client.control_configure_collision_detection(enabled=enabled, sensitivity=sensitivity) - def control_acknowledge_collision(self, wait_for_ready=True): + def control_acknowledge_collision(self, wait_for_ready: bool = True) -> None: """Acknowledges collision incident and sets the robot to READY state. Note: @@ -863,7 +870,7 @@ def control_acknowledge_collision(self, wait_for_ready=True): return self.__http_client.control_acknowledge_collision(wait_for_ready=wait_for_ready) # Calc - def calc_forward_kinematics(self, joints, fk_type=None, tcp_config=None): + def calc_forward_kinematics(self, joints: list, fk_type: str = None, tcp_config: dict = None) -> Dict[str, Any]: """Gives the position of the robot and orientation of end-effector in 3D space. Args: @@ -886,8 +893,8 @@ def calc_forward_kinematics(self, joints, fk_type=None, tcp_config=None): return self.__http_client.calc_forward_kinematics(joints, fk_type=fk_type, tcp_config=tcp_config) - def calc_inverse_kinematics(self, guess, target_position, target_orientation, tcp_config=None, - orientation_type=None): + def calc_inverse_kinematics(self, guess: list, target_position: dict, target_orientation: dict, + tcp_config: dict = None, orientation_type: str = None) -> List[float]: """Gives a list of joint angles calculated from XYZ and end-effector orientation coordinates. Args: @@ -915,7 +922,7 @@ def calc_inverse_kinematics(self, guess, target_position, target_orientation, tc tcp_config=tcp_config, orientation_type=orientation_type) - def calc_nudge(self, joints, direction, offset, tcp_config=None): + def calc_nudge(self, joints: list, direction: str, offset: float, tcp_config: dict = None) -> List[float]: """Calculates joint angles required to move robot a certain distance in XYZ space. Raises: @@ -941,7 +948,7 @@ def calc_nudge(self, joints, direction, offset, tcp_config=None): return self.__http_client.calc_nudge(joints, direction, offset, tcp_config=tcp_config) - def calc_pose_valid(self, joints, tcp_config=None): + def calc_pose_valid(self, joints: list, tcp_config: dict = None) -> bool: """Checks whether requested joint angles are able to be reached successfully. Args: @@ -965,7 +972,7 @@ def calc_pose_valid(self, joints, tcp_config=None): return self.__http_client.calc_pose_valid(joints, tcp_config=tcp_config) - def calc_rotate(self, joints, axis, offset, tcp_config=None): + def calc_rotate(self, joints: list, axis: str, offset: float, tcp_config: dict = None) -> List[float]: """Calculates joint angles required to rotate end-effector in a given direction. Args: