## The Notebook on Controlling the Thymio

For this project and for the control of the Thymio I had the vision of a wrapper for the node and the client that would take care of everything in a very modular way. I the opted to create another more semantic wraper of AsyncClientInterface called ThymioRobot, with AsyncClientInterface serving as a lower level class

### AsyncClientInterface

In [None]:
from typing import Callable, Any
import dependencies.constants_robot as cst
from tdmclient import ClientAsync, aw


class InvalidArgumentError(Exception):
    pass


class AsyncClientInterface:
    """
    This is a class aimed to make the handling of the node easier and more readable
    """
    LEFT_MOTOR = "motor.left.target"
    RIGHT_MOTOR = "motor.right.target"
    RIGHT_SPEED = "motor.right.speed"
    LEFT_SPEED = "motor.left.speed"
    PROX_HORIZONTAL_VALUES = "prox.horizontal"
    LEDS_BOTTOM_LEFT = "leds.bottom.left"
    LEDS_BOTTOM_RIGHT = "leds.bottom.right"
    PROX_GROUND_DELTA = "prox.ground.delta"
    PROX_GROUND_REFLECTED = "prox.ground.reflected"
    MIC_INTENSITY = "mic.intensity"
    LEDS_TOP = "leds.top"
    __right_motor_value: int = 0
    __left_motor_value: int = 0
    _delta_calib = 1
    _refl_calib = 1

    def __init__(self, delta_calib: float = cst.DELTA_ROBOT, refl_calib: float = cst.REFL_ROBOT):
        """
        :param delta_calib: input known calibration for me it is 1.35
        :param refl_calib: input known calibration for me it is also about 1.35
        :param time_to_turn_const: time in seconds to turn 360° with motor speed = 150
        :type time_to_turn_const: float
        """
        self.client = ClientAsync()
        self.node = aw(self.client.wait_for_node())
        aw(self.node.lock())
        # Code to desactivate leds on start
        aw(self.node.compile("""call leds.prox.v(0, 0)\ncall leds.prox.h(0, 0, 0, 0, 0, 0, 0, 0)\ncall leds.temperature(0, 0)\ncall leds.top(0, 0, 0)\ncall leds.circle(0, 0, 0, 0, 0, 0, 0, 0)\ncall leds.buttons(0, 0, 0, 0)\ncall leds.sound(0)\ncall leds.rc(0)"""))
        self._refl_calib = refl_calib
        self._delta_calib = delta_calib


    def sleep(self, seconds):
        aw(self.client.sleep(seconds))

    def get_refl_calib(self) -> float:
        return self._refl_calib

    def get_del_calib(self) -> float:
        return self._delta_calib

    def set_motors(self, *args, **kwargs) -> None:
        """
        A flexible function that lets you input the speed of the motors

        :param args: left_motor then right_motor
        :type args: int
        :param kwargs: can put in right_motor or left_motor value in directly or put in array with length 2
        :type kwargs: int or list[int, int]

        :return: None
        """
        if len(args) == 2:
            aw(self.node.set_variables({
                self.LEFT_MOTOR: [args[0]],
                self.RIGHT_MOTOR: [args[1]]
            }))

        elif len(args) == 1:
            aw(self.node.set_variables({
                self.LEFT_MOTOR: [args[0]]
            }))

        if 'left_motor' in kwargs:
            if isinstance(kwargs['left_motor'], int):
                aw(self.node.set_variables({
                    self.LEFT_MOTOR: [kwargs['left_motor']]
                }))
            else:
                raise TypeError("value attributed to left_motor was not int")

        if 'right_motor' in kwargs:
            if isinstance(kwargs['right_motor'], int):
                aw(self.node.set_variables({
                    self.RIGHT_MOTOR: [kwargs['right_motor']]
                }))
            else:
                raise TypeError("value attributed to right_motor was not int")
        if 'motor' in kwargs:
            if isinstance(kwargs['motor'], list) and len(kwargs['motor']) == 2:
                aw(self.node.set_variables({
                    self.LEFT_MOTOR: [int(kwargs['motor'][0])],
                    self.RIGHT_MOTOR: [int(kwargs['motor'][1])]
                }))
            else:
                raise TypeError("either array too long or too short or did not respect type list")

    def calibrate_sensor_prox_ref(self, delta: bool, set_val: bool = True, preset_Val: float = None) -> float | bool:
        """
            A function that returns an int (sensor_1/sensor_2) used to calibrate the two sensors of
            prox_ground_reflected

            :param set_val: defines whether you would like to set the internal calib val
            :type set_val: bool

            :param delta: if function should be used for delta or reflected
            :type delta: bool

            :returns: ref_sensor_1/ref_sensor_2
            :rtype: float or bool
        """

        ref_value = self.get_sensor("prox_del") if delta else self.get_sensor("prox_ref")
        calibration = ref_value[0] / ref_value[1]
        if preset_Val:
            if delta:
                self._delta_calib = preset_Val
            else:
                self._refl_calib = preset_Val
            return preset_Val
        elif ref_value:
            if set_val:
                if delta:
                    self._delta_calib = calibration
                else:
                    self._refl_calib = calibration
            return calibration
        else:
            return False

    def set_led(self, sensor: str, value: int | list[int]) -> None:
        """
        A function that enables you to set sensors through the node.set_variables method the leds expect values between 0 and 32
        :param sensor: input your sensor
            - 'leds_top': expects three rgb values

        :param value: a list or an array containing the values you want to output
        """

        if sensor == "leds_top":
            if isinstance(value, list) and len(value) == 3:
                aw(self.node.wait_for_variables({"leds.top"}))
                for index, rgb in enumerate(value):
                    self.node.v.leds.top[index] = rgb
            else:
                raise TypeError("either did not input list or list was not of right length")

        self.node.flush()


    def get_sensor(self, sensor: str, calibrated: bool = False) -> list[int] | int | bool:
        """
        This is a function that returns the value of the sensor it accepts only a string and uses a switch case method


        :param sensor : Accepts  a limited number of strings\
            - 'prox_del' : this presents the array of prox ground delta with the option of calibration\
            - 'prox_ref' : this presents the array of prox ground reflected with the option of calibration\
            - self.CONSTANT: this is presented in the example in the description

        :type sensor: str

        :return: returns the value or array of a sensor, or returns False if invalid string is passed as argument
        :rtype: list[int] or int or bool or list[bool]

        Example:

        >>>Client = AsyncClientInterface(1.35,1.35)

        >>>print(Client.get_sensor(Client.PROX_GROUND_DELTA))
        """
        if sensor == "prox_del":
            aw(self.node.wait_for_variables({self.PROX_GROUND_DELTA}))
            if calibrated:
                calibrated_sensor_2 = int(list(self.node.v.prox.ground.delta)[1]) * self._delta_calib
                return [int(list(self.node.v.prox.ground.delta)[0]), int(calibrated_sensor_2)].copy()
            else:
                return list(self.node.v.prox.ground.delta).copy()

        elif sensor == "prox_ref":
            aw(self.node.wait_for_variables({self.PROX_GROUND_REFLECTED}))
            if calibrated:
                calibrated_sensor_2 = int(list(self.node.v.prox.ground.reflected)[1]) * self._refl_calib
                return [int(list(self.node.v.prox.ground.reflected)[0]), int(calibrated_sensor_2)].copy()
            else:
                return list(self.node.v.prox.ground.reflected).copy()

        else:
            aw(self.node.wait_for_variables({sensor}))
            attributes = sensor.split(".")
            node_with_attr = getattr(self.node, "v")
            for attribute in attributes:
                node_with_attr = getattr(node_with_attr, attribute)

            try:
                if not type(node_with_attr) == int:
                    return list(node_with_attr)
                else:
                    return node_with_attr
            except AttributeError:
                raise InvalidArgumentError("attribute was not found due to wrong input being put in")



    def __del__(self):
        aw(self.node.unlock())


Let us start by analysing the Initialisation

The constructor and destructor takes the seven lines of code below and automates them

In [None]:
from tdmclient import ClientAsync

client = ClientAsync()
node = aw(client.wait_for_node())
aw(node.lock())
# Code to desactivate leds on start
aw(node.compile("""call leds.prox.v(0, 0)\ncall leds.prox.h(0, 0, 0, 0, 0, 0, 0, 0)\ncall leds.temperature(0, 0)\ncall leds.top(0, 0, 0)\ncall leds.circle(0, 0, 0, 0, 0, 0, 0, 0)\ncall leds.buttons(0, 0, 0, 0)\ncall leds.sound(0)\ncall leds.rc(0)"""))

aw(node.unlock())
        

giving us this code instead

In [None]:
from dependencies.AsyncClientInterface import AsyncClientInterface

AsyncClient = AsyncClientInterface()

Let us now move on to the function that are important the project the first being set_motors

Through this class I aimed to control the motors through a very permissive method. I had to work around the problem of their being no possibility to override functions like in C++ in python I there had to use args, kwargs and some logic. This gives us a very permissive function

In [None]:
def set_motors(self, *args, **kwargs) -> None:
        """
        A flexible function that lets you input the speed of the motors

        :param args: left_motor then right_motor
        :type args: int
        :param kwargs: can put in right_motor or left_motor value in directly or put in array with length 2
        :type kwargs: int or list[int, int]

        :return: None
        """
        if len(args) == 2:
            aw(self.node.set_variables({
                self.LEFT_MOTOR: [args[0]],
                self.RIGHT_MOTOR: [args[1]]
            }))

        elif len(args) == 1:
            aw(self.node.set_variables({
                self.LEFT_MOTOR: [args[0]]
            }))

        if 'left_motor' in kwargs:
            if isinstance(kwargs['left_motor'], int):
                aw(self.node.set_variables({
                    self.LEFT_MOTOR: [kwargs['left_motor']]
                }))
            else:
                raise TypeError("value attributed to left_motor was not int")

        if 'right_motor' in kwargs:
            if isinstance(kwargs['right_motor'], int):
                aw(self.node.set_variables({
                    self.RIGHT_MOTOR: [kwargs['right_motor']]
                }))
            else:
                raise TypeError("value attributed to right_motor was not int")
        if 'motor' in kwargs:
            if isinstance(kwargs['motor'], list) and len(kwargs['motor']) == 2:
                aw(self.node.set_variables({
                    self.LEFT_MOTOR: [int(kwargs['motor'][0])],
                    self.RIGHT_MOTOR: [int(kwargs['motor'][1])]
                }))
            else:
                raise TypeError("either array too long or too short or did not respect type list")


We can notice that the internal function code uses variables such as self.LEFT_MOTOR and self.RIGHT_MOTOR which are set to motor.left.target and motor.right.target respectively. These are put under the form of variables to avoid magic strings and make code more readable

This finally means the function can be called in different ways

In [None]:
AsyncClient.set_motors(50, 50)
AsyncClient.set_motors(left_motor=100, right_motor=100)
AsyncClientInterface.set_motors(motor=[100, 100])

I will skip over the functions that have not been used for the project I decided to keep them because they are a nice addition and could have potentially been useful

Now onto get_sensor which is a polyvalent function that enables us to get the value of any sensor

In [None]:
    def get_sensor(self, sensor: str, calibrated: bool = False) -> list[int] | int | bool:
        """
        This is a function that returns the value of the sensor it accepts only a string and uses a switch case method


        :param sensor : Accepts  a limited number of strings\
            - 'prox_del' : this presents the array of prox ground delta with the option of calibration\
            - 'prox_ref' : this presents the array of prox ground reflected with the option of calibration\
            - self.CONSTANT: this is presented in the example in the description

        :type sensor: str

        :return: returns the value or array of a sensor, or returns False if invalid string is passed as argument
        :rtype: list[int] or int or bool or list[bool]

        Example:

        >>>Client = AsyncClientInterface(1.35,1.35)

        >>>print(Client.get_sensor(Client.PROX_GROUND_DELTA))
        """
        if sensor == "prox_del":
            aw(self.node.wait_for_variables({self.PROX_GROUND_DELTA}))
            if calibrated:
                calibrated_sensor_2 = int(list(self.node.v.prox.ground.delta)[1]) * self._delta_calib
                return [int(list(self.node.v.prox.ground.delta)[0]), int(calibrated_sensor_2)].copy()
            else:
                return list(self.node.v.prox.ground.delta).copy()

        elif sensor == "prox_ref":
            aw(self.node.wait_for_variables({self.PROX_GROUND_REFLECTED}))
            if calibrated:
                calibrated_sensor_2 = int(list(self.node.v.prox.ground.reflected)[1]) * self._refl_calib
                return [int(list(self.node.v.prox.ground.reflected)[0]), int(calibrated_sensor_2)].copy()
            else:
                return list(self.node.v.prox.ground.reflected).copy()

        else:
            aw(self.node.wait_for_variables({sensor}))
            attributes = sensor.split(".")
            node_with_attr = getattr(self.node, "v")
            for attribute in attributes:
                node_with_attr = getattr(node_with_attr, attribute)

            try:
                if not type(node_with_attr) == int:
                    return list(node_with_attr)
                else:
                    return node_with_attr
            except AttributeError:
                raise InvalidArgumentError("attribute was not found due to wrong input being put in")


there are two sensor which needed calibration on the thymio so I added them as a special condition which can be found with "prox_del" and "prox_refl" so they follow textbook code to get the values of sensors from the node. They were not used for the project so I will not look at that section

Now the most interesting part of the code is this snippet where I split the incoming string use getattr function in order to get the values of the sensors 

In the upcoming section of code I split the code at full stops present in the incoming string and then use getattr function imported from python in order to emulate, if we call the function with "motor.left.speed" the code fetching node.v.motor.left.speed  

In [None]:
else:
    aw(self.node.wait_for_variables({sensor}))
    attributes = sensor.split(".")
    node_with_attr = getattr(self.node, "v")
    for attribute in attributes:
        node_with_attr = getattr(node_with_attr, attribute)

    try:
        if not type(node_with_attr) == int:
            return list(node_with_attr)
        else:
            return node_with_attr
    except AttributeError:
        raise InvalidArgumentError("attribute was not found due to wrong input being put in")


this enables us to write lean code such as

In [None]:
AsyncClient.get_sensor(AsyncClient.LEFT_SPEED)

Then I put in set_led enables to set the leds and checks for 3 values.

In [None]:
    def set_led(self, sensor: str, value: int | list[int]) -> None:
        """
        A function that enables you to set sensors through the node.set_variables method the leds expect values between 0 and 32
        :param sensor: input your sensor
            - 'leds_top': expects three rgb values

        :param value: a list or an array containing the values you want to output
        """

        if sensor == "leds_top":
            if isinstance(value, list) and len(value) == 3:
                aw(self.node.wait_for_variables({"leds.top"}))
                for index, rgb in enumerate(value):
                    self.node.v.leds.top[index] = rgb
            else:
                raise TypeError("either did not input list or list was not of right length")

        self.node.flush()


We can then set the upper leds to blue with this bit of code

In [None]:
AsyncClient.set_leds("leds_top",[0,0, 32])

### The ThymioRobot class

it is important to note that for this class we get values for pos and goal that use the main convention of the class which we then had to convert into the astolfi conventions. Therefore we inherit a position which is an array with $[x,y,\theta]$ and we have to convert it into $[x,y,\theta+\pi]$ to then be able to apply the controller.

In [None]:
import numpy as np

import dependencies.constants_robot as constants_robot
from dependencies.AsyncClientInterface import AsyncClientInterface
from dependencies.helper_functions import convert_angle


class ThymioRobot():
    """
    :param np.array self.position: describes the initial position and the angle in the clockwise direction
    """

    def __init__(self, init_position=None,
                 kappa_alpha: float = constants_robot.DEFAULT_KAPPA_ALPHA,
                 kappa_rho: float = constants_robot.DEFAULT_KAPPA_RHO,
                 kappa_beta: float = constants_robot.DEFAULT_KAPPA_BETA,
                 threshold: float = constants_robot.DEFAULT_THRESHOLD):
        if init_position is None:
            self._position = np.array([0, 0, 0])
        else:
            self._position = init_position
        self.__THRESHOLD = threshold
        self.__KAPPA_ALPHA = kappa_alpha
        self.__KAPPA_BETA = kappa_beta
        self.__KAPPA_RHO = kappa_rho
        self.movement_thread = None
        self.goal = None
        self.__WHEEL_DISTANCE = 0.09  # in m
        self.AsyncClient = AsyncClientInterface()
        self.on_objective: bool = False
        self.is_not_happy = True

    def set_new_goal(self, new_goal: list[int, int, int] | np.ndarray):
        self.goal = np.array(new_goal)

    def set_new_position(self, position: np.array):
        self._position = position

    def be_happy(self):
        self.not_angry()
        self.is_not_happy = False
        self.AsyncClient.set_motors(0, 0)

    def stop(self):
        self.AsyncClient.set_motors(left_motor=0, right_motor=0)

    def apply_local_nav(self):
        """
        Applies the local navigation with a simple ANN model
        """
        # first row left motor second right
        W = np.array([[1, 1, 1, -1, -1], [-1, -1, -1, 1, 1]]) * constants_robot.LOCAL_NAV_FACTOR
        lr_motor = W @ self.get_sensors().T + constants_robot.LOCAL_NAV_FORWARD_SPEED
        self.AsyncClient.set_motors(left_motor=int(lr_motor[0]), right_motor=int(lr_motor[1]))

    def get_sensors(self, sensor: str = "horizontal_sensor") -> list | np.ndarray | int:
        """
        :param sensor: either "horizontal_sensor" one gets classical horizontal sensors or "wheels"
        :type sensor: str
        then right wheel speed "wheels_target" the value stored in aseba.
        :rtype np.ndarray:
        :return: left then right wheel or horizontal array of sensors
        """
        if sensor == "horizontal_sensor":
            return np.array(self.AsyncClient.get_sensor(self.AsyncClient.PROX_HORIZONTAL_VALUES))[0:5]
        elif sensor == "wheels":
            return np.array([self.AsyncClient.get_sensor(self.AsyncClient.LEFT_SPEED),
                             self.AsyncClient.get_sensor(self.AsyncClient.RIGHT_SPEED)])

    def not_angry(self):
        """
        set the top leds to off
        """
        self.AsyncClient.set_led("leds_top", [0, 0, 0])

    def angry(self):
        """
        set the leds to red
        """
        self.AsyncClient.set_led("leds_top", [32, 0, 0])

    def apply_motor_command(self):
        """
        Applies the astolfi command in accordance to the goal and the position we have given to
        """
        movement_vector = self.goal - self._position
        rho = np.sqrt(movement_vector[1] ** 2 + movement_vector[0] ** 2)
        if rho < self.__THRESHOLD:
            self.on_objective = True
            return
        # we add np.pi to position[2] in order to adapt to alstofi referential as there is a difference in referential
        alpha = convert_angle(-self._position[2] + np.pi + np.arctan2(-movement_vector[1], movement_vector[0]))
        beta = convert_angle(- self._position[2] - alpha + np.pi)
        forward_speed = self.__KAPPA_RHO * rho
        turning_velocity = -(self.__WHEEL_DISTANCE / 2) * (self.__KAPPA_ALPHA * alpha + self.__KAPPA_BETA * beta)
        if forward_speed > 150:
            turning_velocity *= 150 / forward_speed
            forward_speed = 150

        movement_array = [-turning_velocity + forward_speed, turning_velocity + forward_speed]
        self.AsyncClient.set_motors(left_motor=int(np.floor(movement_array[1])),
                                    right_motor=int(np.floor(movement_array[0])))
        self.on_objective = False
        


The setters and getters such as get_sensors, set_new_goal and set_new_position are simple functions which are essential to the proper functionning of the class.

The more interesting functions are the apply_motor_command and the local_nav command

In [None]:
def apply_motor_command(self):
        """
        Applies the astolfi command in accordance to the goal and the position we have given to
        """
        movement_vector = self.goal - self._position
        rho = np.sqrt(movement_vector[1] ** 2 + movement_vector[0] ** 2)
        if rho < self.__THRESHOLD:
            self.on_objective = True
            return
        # we add np.pi to position[2] in order to adapt to the different astolfi referential
        alpha = convert_angle(-self._position[2] + np.pi + np.arctan2(-movement_vector[1], movement_vector[0]))
        beta = convert_angle(- self._position[2] - alpha + np.pi)
        forward_speed = self.__KAPPA_RHO * rho
        turning_velocity = (self.__WHEEL_DISTANCE / 2) * (self.__KAPPA_ALPHA * alpha + self.__KAPPA_BETA * beta)
        if forward_speed > 150:
            turning_velocity *= 150 / forward_speed
            forward_speed = 150

        movement_array = [-turning_velocity + forward_speed, turning_velocity + forward_speed]
        self.AsyncClient.set_motors(left_motor=int(np.floor(movement_array[0])),
                                    right_motor=int(np.floor(movement_array[1])))
        self.on_objective = False
     

Which apply the astolfy command

Indeed in this function we first extract the movement vector($[\Delta x, \Delta y, \Delta \theta]$) to be able to set rho, alpha and beta through $\Delta x$ and $\Delta y$

From this we can calculate a $\rho=||\mathbf{[x,y]}||=\sqrt{x^2+y^2}$ which then enables us to check whether we are in the threshhold in which case we just return as we want to keep the previous motor target profile. Therefore we just exit the function as there is no need for further execution

We can then calculate $\alpha=-(\theta \pm \pi)+ \arctan(\frac{-\Delta y}{\Delta x})$

We use $-\Delta y$ due to the fact that we are in a different convention in main

we finally calculate $\beta = -(\theta \pm \pi) -\alpha$

We then apply the force profile accordingly to the course which we divide $\nu = \kappa_\rho \rho$ and $\omega =\kappa_\alpha \alpha+\kappa_\beta \beta$ which we convert into a force profile

In [None]:
if forward_speed > 150:
    turning_velocity *= 150 / forward_speed
    forward_speed = 150


we modulate the controller in order for the thymio to be more insensitive to the size of the terrain which can in extreme cases greatly increase the rho and therefore forward speed drowning out the rotation speed completely e.g left_motor= 510; right_motor=490$\equiv$left_motor = 500; right_motor=500 seeing as how the motors cannot reach those speeds. This is due to the fact that $\rho\in[0;\infty)$ is not bounded whereas $\theta\in[-\pi;\pi)$ is.

We then apply these force profiles to the wheels

Let us now look at the apply_local_nav function

In [None]:
def apply_local_nav(self):
        """
        Applies the local navigation with a simple ANN model
        """
        # first row left motor second right
        W = np.array([[1, 1, 1, -1, -1], [-1, -1, -1, 1, 1]]) * constants_robot.LOCAL_NAV_FACTOR
        lr_motor = W @ self.get_sensors().T + constants_robot.LOCAL_NAV_FORWARD_SPEED
        self.AsyncClient.set_motors(left_motor=int(lr_motor[0]), right_motor=int(lr_motor[1]))


Which, in a way very similar to the exercises, applies a tensor that takes in all the sensor values multiplies them by a matrix to give us the values for the wheels. For instance if the values of the left sensor are positive we want to take them and add them to the result so that the left motor speed increase when an obstacle is detected on the left. We also want to set the weights of the right sensors to be negative for the first row in order to put the left wheels in reverse when the thymio sees an obstacle to its right.