# Basics of Mobile Robotics - Main Notebook


In [None]:
# Import libraries
import numpy as np
import cv2 as cv
from tdmclient import ClientAsync
from Local_Nav import Local_Nav as LN
from Global_Nav import Global_Nav as GN
# import RepeatedTimer


# Team Members 

Catarina 
Dimitri Hollosi : MA3 Mechanical Engineering student, specialising in automation and control with a minor in space technologies
Gonçalo 
Richard

# General Considerations

Here the main code structure and functionalities aim to be detailed so as to gain an understanding in the underlying structure behind the Thymio implementations. Generally speaking, the code can be split into 3 main parts, namely Camera Vision, Global Navigation and Local Navigation. The latter includes all aspects related to obstacle avoidance, which was iteratively tuned to obtain desired performance. 
One important element to consider prior to delving into the implemented functionalities within the code would be the chosen environment setup for the final project implementation. Indeed, it was decided to use of of the team member's lower cupboard portion so as to benefit from a relatively "stable" environment, which could further simplifiy the calibration, lighting and initial setup process. This environment does however come with its limitations, as this implies that the Thymio must navigate in a restricted and confined space, namely implying that the local navigation must take that into account (i.e white walls and corners must be accounted for) and yield a relatively high response rate without interfering with the global navigation tasks. 

# Code Structure

As previously mentioned, ... 
### Wait until final main to talk about this in detail

# Local Navigation

The Local Navigation for this project was implemented as a class in order to be able to benefit from conciceness and structure. Local_Nav takes as an input the asynchronous client, the awaited node and finally the motor speeds. The constructor then defines the "self" variables which are to be used throughout the different methods, which proves to be particularly useful as it is readily able to update the values which are to be sent to the thymio (through the node). 

The main class method is obstacle_avoidance and is structured as follows:

The get_sensor_data(self) method gathers horizontal proximity sensor readings at each timestep, which is "fed" into the analyse_data(self) method. The latter returns a flag = 1 if any of the read values were higher than the lower threshold (iteratively deduced to be worth 2800), at which point the obstacle avoidance manoeuvres are returned. From there, depending on which sensors were triggered, the resulting motor speeds are adjusted by means of a weighted gain and directly updated through the ''' self.node.send_set_variables(adjust_speed) ''' command. It ought to be mentioned that three gains (ranging from low to high) were implemented depending on the intensity of the sensed values. Furthermore, the gains are obtained through and average of the extremity sensors and the "mid-extremity" sensors (the ones in between the front middle and each extremity). This allows to single handedly capture most of the real-case scenarios, as testing showed that the Thymio could be as likely to encounter any of the three following scenarios : 

- Only the extremity sensors being triggered
- Only the mid-extremity sensors being triggered
- A combination of both

Therefore, this implementation of the gains allows some flexbility in the manoeuvering should an obstacle be encountered. Regarding the lower threshold of 2800, this was initially set to be of lower value (i.e 1500) but unfortunately proved to be too  sensitive to the walls in the confined environment that the Thymio operated in. To amend to this, the walls were covered in relatively dark material so as to decrease the sensitivity of the sensors.

Finally, the local navigation is easily implemented in the "main" module as : 

local_nav = LN(client, node, cruising, cruising)

flag_obs = local_nav.analyse_data()

    if flag_obs == 1:
        task = local_nav.obstacle_avoidance2() #trigger obstacle avoidance task depending on if flag detected 
    else:
    ...

In [1]:
import math
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d

class Local_Nav:

    def __init__(self,client,node, motor_speed_left,motor_speed_right):
        self.motor_speed_left = motor_speed_left
        self.motor_speed_right = motor_speed_right
        self.client = client
        self.node = node 
        self.sat_speed = 500
        self.lower_threshold = 2800 #lower threshold for proximity sensor values
        self.middle_threshold = 3500
        self.upper_threshold = 4000
        
        #The measures were obtained empirically...
        self.sensor_distances = np.array([15, 9.6, 9, 8.5, 7.5, 7, 6, 5, 4, 3, 2.5, 2, 1.5, 0.5, 0.2, 0])
        self.sensor_measurements = np.array(
            [0, 1186, 1420, 1510, 1715, 1830, 2069, 2250, 2468, 2850, 3155, 3468, 3940, 4355, 4432, 4500])
        self.center_offset = np.array([5.5, 5.5])
        self.sensor_pos_from_center = np.array(
            [[0.9, 9.4], [3.1, 10.5], [5.5, 11.0], [8.0, 10.4], [10.2, 9.3], [2.5, 0], [8.5, 0]]) - self.center_offset
        self.sensor_angles = np.array([120, 105, 90, 75, 60, -90, -90]) * math.pi / 180
        self.thymio_coords = np.array([[0, 0], [11, 0], [11, 8.5], [10.2, 9.3],
                                  [8, 10.4], [5.5, 11], [3.1, 10.5],
                                  [0.9, 9.4], [0, 8.5], [0, 0]]) - self.center_offset

#######################################################################################################################
########################################   Local Navigation Section  ##################################################
#######################################################################################################################

    def get_sensor_data(self):
        self.client.aw(self.node.wait_for_variables({"prox.horizontal"}))
        prox_sens = self.node.v.prox.horizontal
        return  list(prox_sens)

    def analyse_data(self):
        prox_sens_data = self.get_sensor_data()
        check = [i for i in prox_sens_data if i >= self.lower_threshold]
        if any(check):
            flag = 1
            return flag
        else :
            flag = 0
            return flag

    def obstacle_avoidance(self):
        ''' If a flag is detected, then obstacle avoidance will be activated. The flag returns 1 if any of the sensors 
        measured something higher that the self.lower_threshold value. The latter was iteratively tuned to adjust to the
        constraining environment conditions, and could be made smaller if not in a confined space (with white walls...)
        for better results.'''
        
        flag = self.analyse_data()
        obstSpeedGain = [6, 4, 2]
        prox_sens_data = self.get_sensor_data()
        extermity_sens = [prox_sens_data[0], prox_sens_data[4]]
        mid_extermity_sens = [prox_sens_data[1], prox_sens_data[3]]
        middle_sensor = prox_sens_data[2]
        gain_low = [(mid_extermity_sens[0] + extermity_sens[0]) * obstSpeedGain[2] // 200,
                    (mid_extermity_sens[1] + extermity_sens[1]) * obstSpeedGain[2] // 200]
        gain_mid = [(mid_extermity_sens[0] + extermity_sens[0]) * obstSpeedGain[1] // 200,
                (mid_extermity_sens[1] + extermity_sens[1]) * obstSpeedGain[1] // 200]
        gain_high = [(mid_extermity_sens[0] + extermity_sens[0]) * obstSpeedGain[0] // 200,
                (mid_extermity_sens[1] + extermity_sens[1]) * obstSpeedGain[0] // 200]
        if flag == 1:
            if extermity_sens[0] or mid_extermity_sens[0] >= self.lower_threshold:
                adjust_speed = self.motors(self.motor_speed_left + gain_low[0],
                                           -self.motor_speed_right - gain_low[1])
                self.node.send_set_variables(adjust_speed)

            elif extermity_sens[1] or mid_extermity_sens[1] >= self.lower_threshold:
                adjust_speed = self.motors(-self.motor_speed_left - gain_low[0],
                                           +self.motor_speed_right + gain_low[1])
                self.node.send_set_variables(adjust_speed)

            elif self.lower_threshold <= middle_sensor <= self.middle_threshold:
                if mid_extermity_sens[0] <= mid_extermity_sens[1]:
                    adjust_speed = self.motors(-self.motor_speed_left - gain_mid[0],
                                               self.motor_speed_right + gain_mid[1])
                else:
                    adjust_speed = self.motors(self.motor_speed_left + gain_mid[0],
                                                -self.motor_speed_right - gain_mid[1])
                self.node.send_set_variables(adjust_speed)

            elif middle_sensor >= self.lower_threshold and mid_extermity_sens[0] >= self.lower_threshold:
                    adjust_speed = self.motors(-self.motor_speed_left - gain_mid[0],
                                               -self.motor_speed_right - gain_mid[1])
                    if mid_extermity_sens[0] >= self.upper_threshold :
                        adjust_speed = self.motors(-self.motor_speed_left - gain_high[0],
                                                   -self.motor_speed_right - gain_high[1])
                    self.node.send_set_variables(adjust_speed)

            elif middle_sensor >= self.lower_threshold and mid_extermity_sens[1] >= self.lower_threshold:
                    adjust_speed = self.motors(-self.motor_speed_left - gain_mid[0],
                                               -self.motor_speed_right - gain_mid[1])
                    if mid_extermity_sens[0] >= self.upper_threshold :
                        adjust_speed = self.motors(-self.motor_speed_left - gain_high[0],
                                                   - self.motor_speed_right -  gain_high[1])
                    self.node.send_set_variables(adjust_speed)

    def motors(self, motor_speed_left, motor_speed_right):
        if motor_speed_left >= 500:
            return {
                "motor.left.target": [self.sat_speed],
                "motor.right.target": [motor_speed_right],
            }
        elif motor_speed_right >= 500:
            return {
                "motor.left.target": [motor_speed_left],
                "motor.right.target": [self.sat_speed],
            }
        elif motor_speed_left <= -500:
            return {
                "motor.left.target": [-self.sat_speed],
                "motor.right.target": [motor_speed_right],
            }
        elif motor_speed_right <= -500:
            return {
                "motor.left.target": [motor_speed_left],
                "motor.right.target": [-self.sat_speed],
            }
        else:
            return {
                "motor.left.target": [motor_speed_left],
                "motor.right.target": [motor_speed_right],
            }

#######################################################################################################################
####################################   Local Occupancy Graph Generation ###############################################
#######################################################################################################################

    def sensor_val_to_cm_dist(self, sens_val):
        """
        Returns the distance corresponding to the sensor value based
        on the sensor characteristics
        :param val: the sensor value that you want to convert to a distance
        :return: corresponding distance in cm
        """
        if sens_val == 0:
            return np.inf

        f = interp1d(self.sensor_measurements, self.sensor_distances)
        return f(sens_val).item()

    def obstacles_pos_from_sensor_vals(self, sensor_vals):
        """
        Returns a list containing the position of the obstacles
        w.r.t the center of the Thymio robot.
        :param sensor_vals: sensor values provided clockwise starting from the top left sensor.
        :return: numpy.array() that contains the position of the different obstacles
        """
        print(sensor_vals)
        dist_to_sensor = [self.sensor_val_to_cm_dist(x) for x in sensor_vals]
        dx_from_sensor = [d * math.cos(alpha) for (d, alpha) in zip(dist_to_sensor, self.sensor_angles)]
        dy_from_sensor = [d * math.sin(alpha) for (d, alpha) in zip(dist_to_sensor, self.sensor_angles)]
        obstacles_pos = [[x[0] + dx, x[1] + dy] for (x, dx, dy) in
                         zip(self.sensor_pos_from_center, dx_from_sensor, dy_from_sensor)]
        return np.array(obstacles_pos)

    def rotate(self, angle, coords):
        """
        Rotates the coordinates of a matrix by the desired angle
        :param angle: angle in radians by which we want to rotate
        :return: numpy.array() that contains rotated coordinates
        """
        R = np.array(((np.cos(angle), -np.sin(angle)),
                      (np.sin(angle), np.cos(angle))))
        return R.dot(coords.transpose()).transpose()

    def local_occupancy(self, abs_pos, sensor_vals):
        '''
        abs_pos: List of lists that will contain the absolute x,y,theta coordinates of the robot
        sensor_vals:  List of lists that will contain the 7 sensor values
        '''

        abs_pos = np.array(abs_pos)

        local_occupancy_grids = [self.obstacles_pos_from_sensor_vals(x) for x in sensor_vals]

        global_map, overall_thymio_coords = [], []

        for (local_grid, pos) in zip(local_occupancy_grids, abs_pos):
            # Rotate the local occupancy grid
            rotated_grid = self.rotate(pos[2] - math.pi / 2, local_grid)
            rotated_thymio_coords = self.rotate(pos[2] - math.pi / 2, self.thymio_coords)

            # Translate the grid at the position of the Thymio
            obstacles_pos = rotated_grid + np.array([pos[0], pos[1]])
            abs_Thymio_coords = rotated_thymio_coords + np.array([pos[0], pos[1]])

            # Store position of the obstacles and Thymio in the global map
            global_map.append(obstacles_pos)
            overall_thymio_coords.append(abs_Thymio_coords)

        global_map = np.array(np.vstack(global_map))

        plt.figure(figsize=(10, 10))
        plt.plot(abs_pos[:, 0], abs_pos[:, 1])
        plt.scatter(global_map[:, 0], global_map[:, 1], color="r", s=10)

        plt.plot(np.array(abs_pos)[:, 0],
                 np.array(abs_pos)[:, 1], color="r", marker="o")

        for coords in overall_thymio_coords:
            plt.plot(coords[:, 0], coords[:, 1], color="g")

        plt.axis("equal");
        plt.show()


# Thymio Parameters

In [1]:
r = 2 # radius of the wheel (cm)
L = 9.5 # distance from wheel to wheel (cm)
v_max = 20 # cm/s
cruising = 150 
motor_speed_max = 500
omega_to_motor = (motor_speed_max*r)/v_max
pixel_to_cm = 1/6

# Controller Parameters

In [2]:
Kp = 10   # Proporcional gain
Ki = 0.1  # Integral gain
Kd = 1    # Derivative gain
error_sum = 0       # Integral error
previous_error = 0  # Derivative error

# Kalman Filter Parameters

In [None]:
ts = 0.1  # time step
A = np.array([[1, ts, 0, 0, 0, 0],  # state transition matrix
              [0, 1, 0, 0, 0, 0],
              [0, 0, 1, ts, 0, 0],
              [0, 0, 0, 1, 0, 0],
              [0, 0, 0, 0, 1, ts],
              [0, 0, 0, 0, 0, 1]])

P0 = GN.covariance(20, 5, 20, 5, 20, 5)  # state covariance matrix

Q = GN.covariance(0.01, 0.1, 0.01, 0.1, 0.001, 0.01)  # process noise covariance matrix

R = GN.covariance(1, 0.5, 1, 0.5, 0.1, 0.01)  # measurement covariance matrix

In [None]:
client = ClientAsync()
node = client.aw(client.wait_for_node())
client.aw(node.lock_node())

# Define global_nav object with class attributes
global_nav = GN(client, node, cruising, ts, Kp, Ki, Kd, error_sum, previous_error, r, L, omega_to_motor, R, Q, A, P0)


In [None]:
#---------------------------------------------------------------------------
#                                 Find shortest Path
#---------------------------------------------------------------------------


# Initial states
X = np.array([      [0],    # x_dot0
        [thymio_pose[0]],    # x0
                     [0],    # y_dot0
        [thymio_pose[1]],    # y0
                     [0],    # phi_dot0
        [thymio_pose[2]]])   # phi0 

path_cm = path*pixel_to_cm
vid = cv.VideoCapture(1+ cv.CAP_DSHOW)  
while(True):
      
    # Capture the video frame
    # by frame
    ret, img = vid.read()
    next_point = 0
    while (np.abs(X[1,0]-path_cm[-1][1]) > 1) and (np.abs(X[3,0]-path_cm[-1][0]) > 1):
        while (np.abs(X[3,0]-path_cm[next_point][1]) > 1) and (np.abs(X[1,0]-path_cm[next_point][0]) > 1):
            node = client.aw(client.wait_for_node())
            # Calculate desired angle
            phi_d = np.arctan((path[next_point][1]-X[3,0])/(path[next_point].x-X[1,0]))
            # Control action to make the Thymio follow the reference
            vr, vl = GN.PIDcontroller(phi_d, X[5,0])
            motor_left = vl*omega_to_motor
            motor_right = vr*omega_to_motor            
            # Saturation
            if motor_left > 500:
                motor_left = 500
            elif motor_right > 500:
                motor_right = 500
            elif motor_right < -500:
                motor_right = -500
            elif motor_left < -500:
                motor_left = -500            
            # Activate Obstacle avoidance if necessary
            # Define local_nav object with class attributes
            local_nav = LN(client, node, motor_left, motor_right)
            flag = local_nav.analyse_data()
            if flag == 1:
                task = local_nav.obstacle_avoidance() #trigger task depending on if flag detected
            else:
                adjust_speed = GN.motors(motor_left, motor_right)
                node.send_set_variables(adjust_speed)
                node.flush()
            # Update states
            motor_sensor = GN.get_sensor_data()
            speed_sensor = motor_sensor/GN.omega_to_motor      # cm/s
            thymio_xy_pix,thymio_pose,_ = GN.find_thymio(img, pixel_to_cm)
            if thymio_pose != 0: # if the camera can find the thymio
                X[1,0] = thymio_pose[0]                                          # x
                X[3,0] = thymio_pose[1]                                          # y
                X[5,0] = thymio_pose[2]                                          # phi
                X[4,0] = (r/L)*(speed_sensor[1]-speed_sensor[0])                 # phi_dot
                X[0,0] = (r/2)*(speed_sensor[0]+speed_sensor[1])*np.cos(X[5,0])  # x_dot
                X[2,0] = (r/2)*(speed_sensor[0]+speed_sensor[1])*np.sin(X[5,0])  # y_dot
                # Draw a point on the frame
                img = cv.circle(img, (thymio_xy_pix[0], thymio_xy_pix[1]), radius=10, color=(0, 0, 255), thickness=-1)
            else:
                X[4,0] = (r/L)*(speed_sensor[1]-speed_sensor[0])                 # phi_dot
                X[5,0] += X[4,0]*ts                                              # phi
                X[0,0] = (r/2)*(speed_sensor[0]+speed_sensor[1])*np.cos(X[5,0])  # x_dot
                X[2,0] = (r/2)*(speed_sensor[0]+speed_sensor[1])*np.sin(X[5,0])  # y_dot
                X[1,0] += X[0,0]*ts                                              # x
                X[3,0] += X[3,0]*ts                                              # y
            X = GN.kalman_filter(X)
            cv.imshow("Video", img)
            # the 'q' button is set as the
            # quitting button you may use any
            # desired button of your choice
            if cv.waitKey(1) & 0xFF == ord('q'):
                break
        next_point += 1
        cv.imshow("Video", img)
        # the 'q' button is set as the
        # quitting button you may use any
        # desired button of your choice
        if cv.waitKey(1) & 0xFF == ord('q'):
            break
    motor_left = 0
    motor_right = 0
    adjust_speed = GN.motors(motor_left, motor_right)
    node.send_set_variables(adjust_speed)
    node.flush()
    cv.imshow("Video", img)
    # the 'q' button is set as the
    # quitting button you may use any
    # desired button of your choice
    if cv.waitKey(1) & 0xFF == ord('q'):
        break







In [3]:
# from tdmclient import ClientAsync
# import Local_Nav as LN
# import Global_Nav as GN

# client = ClientAsync()
# node = client.aw(client.wait_for_node())

# # Define all necessary variables for what comes after

# # Define global_nav object with class attributes
# global_nav = GN.Global_Nav(client, node, cruising, ts, Kp, Ki, Kd, ...
#                            error_sum, previous_error, r, L, omega_to_motor, R, Q, A, X0, P0)

# # Define local_nav object with class attributes
# local_nav = LN.Local_Nav(client, node, motor_speed_left, motor_speed_right)


SyntaxError: invalid syntax (<ipython-input-3-feacd1b74e29>, line 9)

In [4]:
# #----------------------------------------------------
# #          FINAL MAIN STRUCTURE TRYOUT
# #---------------------------------------------------


# # #----------------------------------------------------
# # Computer Vision part and getting the shortest path
# #---------------------------------------------------
# ''' From there we should obtain a "control sequence" of where and when Thymio should turn. This shall later be compared to
# the current position of the Thymio (obtained by tracking) and we want to align the directional vector with the shortest path'''

# #--------------------
# # Global Navigation
# #--------------------

# # pos_goal = ...
# # path_vector = ... #generate_path_vector

# client.aw(node.lock_node()) #allows to update thymio variables as we loop later on
# colour_switch = local_nav.colour(3) #Make Thymio blue when in standby, because it's cool
# node.flush()

# While pos_goal != |current_thymio_pos - 5| #5cm error ?

#     node = client.aw(client.wait_for_node()) # Update states and sensor values at each iteration
#     flag = local_nav.analyse_data() # store a flag=1 if obstacle detected (red LEDS)
#     global_nav.update_states()
    
#     controlled_speed = global_nav.actuation(next_point)
#     node.send_set_variables(controlled_speed) #
    
#     #--------------------
#     # Local Navigation
#     #--------------------
 
#     if flag == 1:
#         task = local_nav.obstacle_avoidance() #trigger task depending on if flag detected
#     else:
#         pass
    

# node.flush()

SyntaxError: invalid syntax (<ipython-input-4-a2476ea36da0>, line 15)