# **Mobile Robotics Projet**

##### **Team 24 :**
- Yuheng He
- Alvaro Martinez-Vizmanos
- Yo-Shiun Cheng
- Lucas Jacques Louis Milesi

## **Presentation of the Project**
We had to carry out a project in which we used the five robotics components studied during the semester:

- Vision
- Global Navigation
- Motion Control
- Local Navigation
- Filtering

In this project, we had to guide our Thymio robot from a starting point to a finishing point. It needed to follow an optimal path and avoid obstacles. Additionally, it had to be capable of dodging dynamic obstacles (those introduced along its path). To achieve this, we used a camera positioned above the environment where the Thymio robot operates.

#### **Vision Part**
This camera provided all the vision data necessary to detect the environment's boundaries, determine the robot's position and angle of rotation, and identify obstacles.

#### **Global Navigation Part**
Using the vision data, we extracted a binary image of the environment in which obstacles and clear paths were clearly identified. Based on this image, we applied the A* Dijkstra algorithm to find the optimal path between the starting and finishing points, as introduced in Lesson 5 of the course.

#### **Motion Control Part**
To control the robot, we implemented a Differential Drive Motion Control system as studied in Lesson 1 of the course. Specifically, we decided to use an Astolfi Controller, which we deemed suitable for our project.

#### **Local Navigation Part**
To enable the robot to dodge dynamic obstacles, we implemented a neural network connecting the laser sensors to the motors, as covered in Lesson 3 of the course. Local obstacle avoidance was used in parallel with motion control.

#### **Filtering Part**
To locate our robot precisely, we used an Extended Kalman Filter (EKF). This approach helps reduce noise and inaccuracies in sensor measurements, providing a more reliable estimate of the robot's position and orientation over time. Additionally, the EKF corrects cumulative errors, such as those caused by wheel slippage or sensor drift, which could otherwise lead to deviations in the robot's trajectory.

<br>
To complete the project in the time available, we divided up the various tasks between the different members of the group. It was necessary to have good communication between the different parts, as they are all interconnected.


## **Environment**

Our robot will evolve on a map with a white background on which Aruco markers will be placed to delimit the space in which the robot will evolve as well as to calibrate the camera and detect the start and goal points.

We created an example map that we will follow throughout this report :
<br>

<img src="images/Start_Goal.jpg"/>

<br>

### **Libraries required for programs to work properly**

In [None]:
# Import Libraries

import asyncio
import cv2
import numpy as np
from heapq import heappush, heappop
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import time

## **Vision Part**

#### **Constants used throughout the vision part**

In [3]:
# Size to be adjusted according to the size of the environment
Size_img_x = 600 # Size image height
Size_img_y = 1000 # Size image width

Camera = 0 # Camera used

**WARNING** It is important to press the 'L' key (lower case) if the code uses vision capture as there is a risk of killing the kernel. 

Initially, we decided to base our vision system on color markers with different shapes. However, the problem was that the detection reliability was not optimal and was highly sensitive to the room's lighting conditions. Therefore, we chose to switch to using Aruco markers, which are much easier to detect.

#### **Creation Aruco marks**

We decided to use the Aruco marks to detect the limitations of our environment, the start and finish points and the real-time information from our robot (position and angle of rotation). Initially, we did all the detection based on the detection of coloured dots, but we noticed that this type of detection was much less reliable than detection with aruco markers.

In [None]:
def generate_aruco_marker(marker_id, side_pixels, output_filename):
    """
    Function to create aruco markers 
    
    Input : marker_id -> Marker ID
          : side_pixels -> Pixel size of the marker
          : output_filename -> Where the image will be saved
    Output : An image file containing the aruco marker created
    """
    # Load the Aruco dictionary to identify Aruco markers
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    # Creation of the image
    marker_image = np.zeros((side_pixels, side_pixels), dtype=np.uint8)
    # Generation of the image corresponding to the desired id
    cv2.aruco.generateImageMarker(aruco_dict, marker_id, side_pixels, marker_image, 1)

    # Save Image
    cv2.imwrite(output_filename, marker_image)
    print(f"Aruco marker (ID={marker_id}) saved in {output_filename}")

# Choose the id, marker size and image name
generate_aruco_marker(marker_id=6, side_pixels=200, output_filename="aruco_marker_6.png")

### **Orthogonal Projection :**

Before starting the vision for our robot, we need to define the environment in which it will navigate. Our camera needs to be able to detect the boundaries of the environment, and be able to see the map perfectly from above, to minimize errors. So we decided to make an orthogonal projection using homography to get a map seen from above.

We have defined the Aruco marker to delimit our environment: 
- ID 3: Top left corner
- ID 4 : Top right corner
- ID 5 : Bottom left corner
- ID 6 : Bottom right corner

We defined the environment in a image with these coordinates :
<br>

<img src="images/Image_definition.jpg"/>

<br>

In [None]:
def detect_aruco_corners(image):
    """
    Function to detect the limits of the environment in which the robot will navigate by 4 Aruco markers from ID 3 to 6 
    (ID 3 : top left corner, ID 4 : top right corner, ID 5 : bottom left corner, ID 6 : bottom right corner).

    Input : image -> Input image (from camera)
    Output : Corner dictionary for specified IDs
    """
    # Load the Aruco dictionary and detector
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    parameters = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, parameters)

    # Detect markers
    corners, ids, _ = detector.detectMarkers(image)

    # Filter for markers with IDs 3, 4, 5, and 6
    required_ids = {3, 4, 5, 6}
    if ids is not None:
        # Map detected IDs to their corners if the ID is in the required list
        id_to_corners = {id[0]: corner[0] for id, corner in zip(ids, corners) if id[0] in required_ids}
        if id_to_corners:
            return id_to_corners
        else:
            print("None of the required markers (3, 4, 5, 6) were detected.")
    else:
        print("No Aruco marker detected.")

    return None

def calculate_homography(id_to_corners, output_size=(Size_img_y, Size_img_x)):
    """
    Function to calculate the homography matrix, estimating missing corners if some markers are occluded.

    Input: id_to_corners -> Dictionary of detected marker corners
           output_size -> Size of the projected image
    Output: homography_matrix -> Homography matrix
    """
    # Initialize corner points (corners of the environment)
    corner_points = np.zeros((4, 2), dtype=np.float32)
    detected_ids = list(id_to_corners.keys())
    
    # Estimate missing corners
    if 3 in detected_ids:  # ID 3 : top left corner
        corner_points[0] = id_to_corners[3][0]
    else:
        # Estimate : Opposite corner to the bottom-right corner (ID 6)
        if 6 in detected_ids and 4 in detected_ids:
            corner_points[0] = id_to_corners[4][1] - (id_to_corners[6][2] - id_to_corners[4][1])
        else:
            raise ValueError("Unable to estimate the top-left corner")
    
    if 4 in detected_ids:  # ID 4 : top right corner
        corner_points[1] = id_to_corners[4][1]
    else:
        # Estimate : Opposite corner to the bottom-left corner (ID 5)
        if 3 in detected_ids and 5 in detected_ids:
            corner_points[1] = id_to_corners[3][0] + (id_to_corners[5][3] - id_to_corners[3][0])
        else:
            raise ValueError("Unable to estimate the top-right corner")
    
    if 6 in detected_ids:  # ID 6 : bottom right corner
        corner_points[2] = id_to_corners[6][2]
    else:
        # Estimate : Opposite corner to the top-left corner (ID 3)
        if 5 in detected_ids and 4 in detected_ids:
            corner_points[2] = id_to_corners[5][3] + (id_to_corners[4][1] - id_to_corners[5][3])
        else:
            raise ValueError("Unable to estimate the bottom-right corner")
    
    if 5 in detected_ids:  # ID 5 : bottom left corner
        corner_points[3] = id_to_corners[5][3]
    else:
        # Estimate : Opposite corner to the top-right corner (ID 4)
        if 3 in detected_ids and 6 in detected_ids:
            corner_points[3] = id_to_corners[3][0] + (id_to_corners[6][2] - id_to_corners[3][0])
        else:
            raise ValueError("Unable to estimate the bottom-left corner")
    
    # Target points (rectangle in the projected space)
    dst_points = np.array([
        [0, 0],  # Top-left of the output
        [output_size[0] - 1, 0],  # Top-right of the output
        [output_size[0] - 1, output_size[1] - 1],  # Bottom-right of the output
        [0, output_size[1] - 1]  # Bottom-left of the output
    ], dtype=np.float32)
    
    # Calculate the homography matrix
    homography_matrix, _ = cv2.findHomography(corner_points, dst_points)
    return homography_matrix

def apply_homography(image, homography_matrix, output_size):
    """
    Function to apply the homography matrix to obtain an orthogonal projection of the environment

    Input : image -> Input image (from camera)
          : homography_matrix -> Homography matrix
          : output_size -> Size of projected image
    Output : projected_image -> Projected image
    """
    projected_image = cv2.warpPerspective(image, homography_matrix, output_size)
    return projected_image

cap = cv2.VideoCapture(Camera)

while True:
    success, frame = cap.read()
    if not success:
        break

    # Detection of Aruco markers for map delimitation
    id_to_corners = detect_aruco_corners(frame)

    # If we detected Aruco markers for map delimitation
    if id_to_corners :
        try:
            # Environment projection operation
            homography_matrix = calculate_homography(id_to_corners)
            output_size = (Size_img_y, Size_img_x)
            projected_frame = apply_homography(frame, homography_matrix, output_size)
            # Afficher l'image
            cv2.imshow("Calibration with Aruco", projected_frame)

        except ValueError as e:
            print(f"Error with projected image : {e}")

    # Stop video capture by pressing l
    if cv2.waitKey(1) & 0xFF == ord('l'):
        break

cap.release()
cv2.destroyAllWindows()

# We saved the homography matrix for all the others programs

We have this result (we've masked the elements pasted on our map): 
<br>

<img src="images/Homography.jpg"/>

<br>

### **Calibration camera :**

Now we need to calibrate our camera so that we know the ratio between a pixel and the actual distance in the environment, in order to guide our robot as effectively as possible.

We therefore used an aruco marker (ID = 2) of known size (90 mm square) to perform our calibration.

In [None]:
def calibrate_camera_with_aruco(img, marker_size_mm=90):
    """
    Function to calibrate camera (pixel/mm). We calibrate the camera using a 9cm aruco marker to find out the actual size of the environment.
    
    Input : img -> Input image (projected image) 
          : marker_size_mm = 90mm -> Size of Aruco marker
    Output : pixel_to_mm -> Distance from the environment (mm) for a pixel in the image
    """
    # Loading the Aruco marker library
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    parameters = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, parameters)

    # Detecting the Aruco marker
    corners, ids, _ = detector.detectMarkers(img)

    if ids is not None:
        for corner, marker_id in zip(corners, ids):
            if marker_id[0] == 2: # Only the Id = 2 marker is used for calibration
                # Extraction of marker coordinates
                points = corner[0]

                # Calculate dimensions in pixels
                width_px = np.linalg.norm(points[1] - points[2])  # Size y
                height_px = np.linalg.norm(points[0] - points[1])  # Size x

                # Average dimensions
                avg_size_px = (width_px + height_px) / 2

                # Calculation of the actual distance from the environment (mm) for a pixel in the image
                pixel_to_mm = marker_size_mm / avg_size_px

                return pixel_to_mm

    return None

# Video Capture
cap = cv2.VideoCapture(Camera)

delay_between_calibrations = 5 # Time beetween 2 measurements (in seconds)
last_calibration_time = 0 # Initial time (in seconds)

while True:
    success, frame = cap.read()
    if not success:
        break

    current_time = time.time() 
    output_size = (Size_img_y, Size_img_x)
    projected_frame = apply_homography(frame, homography_matrix, output_size)
    # Calculation of the actual distance from the environment (mm) for a pixel in the image
    # Measurements are taken every 5 seconds to avoid saturating the output
    if current_time - last_calibration_time >= delay_between_calibrations:
        pixel_to_mm = calibrate_camera_with_aruco(projected_frame, marker_size_mm=90)
        last_calibration_time = current_time
        # If we have a pixel/mm value
        if pixel_to_mm:
            print(f"Calibration : mm/pixel : {pixel_to_mm:.4f}")
    # Afficher l'image
    cv2.imshow("Calibration with Aruco", projected_frame)

    # Stop video capture by pressing l
    if cv2.waitKey(1) & 0xFF == ord('l'):
        break

cap.release()
cv2.destroyAllWindows()

<br>

<img src="images/Calibration.jpg"/>

<br>

### **Obstacle detection :** 

Now we need to detect obstacles in our environment to create areas where the robot cannot go. We assume that : 
- the obstacles are black
- the aruco markers that delimit the map will be considered as obstacles

Next, we will perform morphological transformations on our image (dilation, erosion) in order to remove potential noise and to enlarge the obstacles by at least half the width of the robot so that the robot does not collide with them.

In [58]:
def obstacles_detection(img, threshold, pixel_to_mm):
    """
    Function to detect obstacles in the environment so that they can be taken into account in the matrix supplied to the A* Dijkstra algorithm.
    
    Input : img -> Input image (projected image) 
          : threshold -> threshold to detect more or less bright objects in the image
          : pixel_to_mm -> Distance from the environment (mm) for a pixel in the image
    Output : erode_img -> Image projected after the various morphological transformations
           : matrix -> Matrix of the image to be supplied to the A* dijkstra algorithm
    """
    # Conversion Grayscale
    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Threshold to detect dark obstacles (we can adjust the threshold to detect more or less obstacles -> it depends on the darker or lighter colour of the obstacles)
    _, binary_image = cv2.threshold(gray_img, threshold, 255, cv2.THRESH_BINARY)
    # We're adding pixels to the edges of the image to indicate obstacles so that my robot doesn't leave the environment
    binary_image = cv2.copyMakeBorder(binary_image, 1, 1, 1, 1, cv2.BORDER_CONSTANT, value=0)

    # We apply a structuring element which is a circle with a diameter of 5 pixels to remove noise
    denoising_kernel_size = 5
    denoising_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (denoising_kernel_size, denoising_kernel_size))
    # We apply a structuring element which is a circle with a diameter of half width of robot transform in pixels to enlarge obstacles to the width of the robot
    kernel_size = int(85 / pixel_to_mm)  # (65 mm (half width of robot) + 20 mm (margin)) / Ratio (mm/pixel)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))

    # Apply closing (to remove noise) and erosion (to adapt ostacles to width of robot)
    closing_img = cv2.morphologyEx(binary_image, cv2.MORPH_CLOSE, denoising_kernel)
    erode_img = cv2.erode(closing_img, kernel, iterations=1)
    # We remove the pixels added for obstacles on the edges of the image
    erode_img = erode_img[1:-1, 1:-1]

    # Creation of matrix to find the optimal path (for the A* Djikstra Algorithm)
    matrix_init = (erode_img / 255).astype(int) 
    # We replace the 0‘s with -1’s (to detect obstacles) and the 1‘s with 0’s (to detect the free environment).
    matrix = np.where(matrix_init == 0, -1, 0)

    return erode_img, matrix

# Video Capture
cap = cv2.VideoCapture(Camera)

# Capturing an image
success, frame = cap.read()

output_size = (Size_img_y, Size_img_x)
projected_frame = apply_homography(frame, homography_matrix, output_size)

# Obstacles detection
erode_img, matrix = obstacles_detection(projected_frame, 60, pixel_to_mm)
cv2.imshow("Binary image", erode_img)


cv2.waitKey(0)
cv2.destroyAllWindows()

Before Morphological Transfomation :
<br>

<img src="images/Map_with_obstacles.jpg"/>

<br>
After Morphological Transfomation : 
<br>

<img src="images/Binary_map.jpg"/>

<br>

### **Start and Goal Point Detection :** 

We now need to detect the robot's start and end points. To do this, we'll use the aruco markers again, and more specifically the ID = 0 marker for the start point and the ID = 1 marker for the end point.

In [None]:
def detect_start_and_goal_aruco(img):
    """
    Function to detect start and goal point in the environment
    
    Input : img -> Input image (projected image) 
    Output : img -> Image with markers indications
           : start_position -> Coordinates of start point (pixels)
           : goal_position -> Coordinates of goal point (pixels)
    """
    # Load the Aruco dictionary to identify Aruco markers
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    parameters = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, parameters)
    
    # Detection Aruco Marks
    corners, ids, _ = detector.detectMarkers(img)
    
    start_position = None
    goal_position = None
    
    # Detection all Aruco Marks
    if ids is not None:
        for i, marker_id in enumerate(ids.flatten()):
            x_center = int(corners[i][0][:, 0].mean())
            y_center = int(corners[i][0][:, 1].mean())

            # Detection Start point (ID 0)
            if marker_id == 0:
                start_position = (y_center, x_center)
                cv2.circle(img, (x_center, y_center), 5, (255, 0, 0), -1)
                cv2.putText(img, "Start", (x_center - 20, y_center - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
            
            # Detection Goal point (ID 1)
            elif marker_id == 1:
                goal_position = (y_center, x_center)
                cv2.circle(img, (x_center, y_center), 5, (0, 255, 0), -1)
                cv2.putText(img, "Goal", (x_center - 20, y_center - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    
    return img, start_position, goal_position

# Video capture
cap = cv2.VideoCapture(Camera)

delay_between_measurements = 5 # Time beetween 2 measurements (in seconds)
last_measurement_time = 0 # Initial time (in seconds)

while True:
    success, frame = cap.read()
    if not success:
        break
    
    # Detection of Aruco markers for map delimitation
    current_time = time.time() 
        
    output_size = (Size_img_y, Size_img_x) 
    projected_frame = apply_homography(frame, homography_matrix, output_size)

    # Detection start and goal Aruco marks
    processed_img, start_position, goal_position = detect_start_and_goal_aruco(projected_frame)
    
    if current_time - last_measurement_time >= delay_between_measurements:
        last_measurement_time = current_time
        # Print position
        print("Start position :", start_position)
        print("Goal position :", goal_position)

    # Print image
    cv2.imshow("Detection ArUco Start and Goal", processed_img)
    
    # Stop video capture by pressing l
    if cv2.waitKey(1) & 0xFF == ord('l'):
        break

    
cap.release()
cv2.destroyAllWindows()

<br>

<img src="images/Start_Goal.jpg"/>

<br>

### **Detection position and angle of rotation of the robot :** 

Now we need to detect the robot's position and angle of rotation in real time in order to provide information to the Kalman filter. Using the aruco marker ID = 2, we can extract this information in real time

In [60]:
def wrap_angle(angle):
    """
    Function to normalize an angle to the interval [-π, π].
    
    Input : angle -> Angle to be normalized (in radians)
    Output : Normalized angle within the range [-π, π] (in radians)
    """
    return (angle + np.pi) % (2 * np.pi) - np.pi

def detect_aruco_marker(img, target_id=2):
    """
    Function to detect the position and angle of rotation of the Aruco ID = 2 marker positioned on the robot
    
    Input : img -> Input image (projected image) 
          : target_id=2 -> Id number of the Aruco marker
    Output : image -> Image with marker indications
           : (center_x, center_y) -> Coordinates of the robot (pixels)
           : angle -> Angle of rotation of the robot (rad)
    """

    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    parameters = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, parameters)

    corners, ids, _ = detector.detectMarkers(img)
    if ids is not None:
        for corner, marker_id in zip(corners, ids):
            if marker_id[0] == target_id:
                points = corner[0]
                center_x = int(points[:, 0].mean())
                center_y = int(points[:, 1].mean())
                # Compute angle then normalization between [-π, π]
                vector = points[1] - points[0]
                angle = np.atan2(vector[1], vector[0])
                return img, (center_x, center_y), wrap_angle(angle)
    return img, None, None

Our vision is now initialized and we have a clear knowledge of the environment in which the robot will evolve.

## **Global Navigation Part**

### **Find the Optimal Path : A * Dijkstra Algorithm**

To find the optimal path, we decided to re-use the algorithm we had seen in class and in the exercise: the A* dijkstra algorithm. We modified it and the heuristic function so that it could take diagonal displacements into account.

In [None]:
def heuristic(a, b):
    """
    Function to calculate the Euclidean distance between two points for the heuristic function.
    
    Input : a -> Coordinates of the first point 
          : b -> Coordinates of the second point 
    Output : Euclidean distance between the two points 
    """
    # Euclidean distance
    return np.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2)

def a_star_search(map_grid, start, goal):
    """
    Function to perform A* search algorithm to find the optimal path from a start point to a goal point.
    
    Input : map_grid -> 2D array representing the environment (-1 for obstacles, 0 for free space)
          : start -> Starting coordinates from vision
          : goal -> Goal coordinates from vision
    Output : path -> List of coordinates representing the optimal path 
           : explored -> Set of nodes that were explored during the search 
           : operation_count -> Number of operations performed during the search 
    """
    # Initialize the open set as a priority queue and add the start node
    open_set = []
    heappush(open_set, (heuristic(start, goal), 0, start))  # (f_cost, g_cost, position)

    # Initialize the came_from dictionary
    came_from = {}
    # Initialize g_costs dictionary with default value of infinity and set g_costs[start] = 0
    g_costs = {start: 0}
    # Initialize the explored set
    explored = set()
    operation_count = 0

    while open_set:
        # Pop the node with the lowest f_cost from the open set
        current_f_cost, current_g_cost, current_pos = heappop(open_set)

        # Add the current node to the explored set
        explored.add(current_pos)

        # For directly reconstruct path
        if current_pos == goal:
            break

        # Get the neighbors of the current node (up, down, left, right)
        neighbors = [
            (current_pos[0] - 1, current_pos[1]),  # Up
            (current_pos[0] + 1, current_pos[1]),  # Down
            (current_pos[0], current_pos[1] - 1),  # Left
            (current_pos[0], current_pos[1] + 1),  # Right
            (current_pos[0]-1, current_pos[1]-1),  # Up-left
            (current_pos[0]-1, current_pos[1]+1),  # Up-right
            (current_pos[0]+1, current_pos[1]+1),  # Down-right
            (current_pos[0]+1, current_pos[1]-1)   # Down-left
        ]

        for neighbor in neighbors:
            # Check if neighbor is within bounds and not an obstacle
            if (0 <= neighbor[0] < map_grid.shape[0]) and (0 <= neighbor[1] < map_grid.shape[1]):
                if map_grid[neighbor[0], neighbor[1]] != -1 and neighbor not in explored:
                    # Calculate tentative_g_cost
                    # if diagonal --> cost = sqrt(2)
                    if (neighbor==[current_pos[0]-1, current_pos[1]-1] or neighbor==[current_pos[0]-1, current_pos[1]+1] or neighbor==[current_pos[0]+1, current_pos[1]+1] or neighbor==[current_pos[0]+1, current_pos[1]-1]):
                        tentative_g_cost = current_g_cost + np.sqrt(2) + map_grid[neighbor[0], neighbor[1]]
                    # else --> cost = 1
                    else :
                        tentative_g_cost = current_g_cost + 1 + map_grid[neighbor[0], neighbor[1]]

                    # If this path to neighbor is better than any previous one
                    if neighbor not in g_costs or tentative_g_cost < g_costs[neighbor]:
                        # Update came_from, g_costs, and f_cost
                        came_from[neighbor] = current_pos
                        g_costs[neighbor] = tentative_g_cost
                        f_cost = tentative_g_cost + heuristic(neighbor, goal)

                        # Add neighbor to open set
                        heappush(open_set, (f_cost, tentative_g_cost, neighbor))
                        operation_count += 1

    # Reconstruct path
    if current_pos == goal:
        path = []
        while current_pos in came_from:
            path.append(current_pos)
            current_pos = came_from[current_pos]
        path.append(start)
        return path[::-1], explored,operation_count
    else:
        # If we reach here, no path was found
        return None, explored,operation_count

def display_map(map_grid, path, start, goal, explored):
    """
    Function to visualize the map grid, the explored nodes, and the optimal path.
    
    Input : map_grid -> 2D array representing the environment (-1 for obstacles, 0 for free space)
          : path -> List of coordinates representing the optimal path 
          : start -> Starting coordinates from vision 
          : goal -> Goal coordinates from vision
          : explored -> Set of nodes explored during the search 
    """
    cmap = ListedColormap(['white', 'black', 'blue', 'green', 'red', 'grey'])
    map_display = np.zeros_like(map_grid, dtype=object)

    # Assign colors based on the map grid values
    map_display[map_grid == -1] = 'black'  # Obstacles
    map_display[map_grid == 0] = 'white'   # Free space

    for position in explored:
        if map_display[tuple(position)] == 'white':
            map_display[tuple(position)] = 'grey'  # Explored cells

    # Visualize the path
    for position in path:
        if map_display[position[0], position[1]] in ['white', 'grey']:
            map_display[position[0], position[1]] = 'blue'  # Path

    map_display[start[0], start[1]] = 'green'  # Start
    map_display[goal[0], goal[1]] = 'red'      # Goal

    # Convert color names to numbers for plotting
    color_mapping = {'white': 0, 'black': 1, 'blue': 2, 'green': 3, 'red': 4, 'grey': 5}
    map_numeric_display = np.vectorize(color_mapping.get)(map_display)
    fig, ax = plt.subplots(figsize=(72, 128))
    ax.imshow(map_numeric_display, cmap=cmap)
    ax.set_xticks(np.arange(-0.5, map_grid.shape[1], 1), minor=True)
    ax.set_yticks(np.arange(-0.5, map_grid.shape[0], 1), minor=True)
    ax.grid(which='minor', color='gray', linestyle='-', linewidth=0.5)
    ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False)
    ax.set_title('a_star_search Visualization')
    plt.show()


def classify_segments_as_vectors(path):
    """
    Segments the path into continuous vectors based on direction (horizontal, vertical, diagonal).
    
    Input : path -> List of coordinates representing the optimal path 
    Output : vectors -> List of tuples containing the direction, start, and end points of each segment [(direction, start_point, end_point), ...]
    """
    if len(path) < 2:
        return []

    vectors = [] 
    start_point = path[0] 

    # Identify direction beween 2 points
    def get_direction(p1, p2):
        dx = p2[0] - p1[0]
        dy = p2[1] - p1[1]
        if dx == 0:  # Horizontal
            return "horizontal"
        elif dy == 0:  # Vertical
            return "vertical"
        elif abs(dx) == abs(dy):  # Diagonal
            return "diagonal"
        else:
            return "other"

    # Identify continous segment
    current_direction = get_direction(path[0], path[1])

    for i in range(1, len(path)):
        direction = get_direction(path[i - 1], path[i])
        if direction != current_direction:
            # End of the actual segment
            vectors.append((current_direction, start_point, path[i - 1]))
            # New segment
            start_point = path[i - 1]
            current_direction = direction

    # Add the last segment
    vectors.append((current_direction, start_point, path[-1]))

    return vectors

def A_Dijkstra(Map, SearchStart, SearchGoal):
    """
    Function to extract the data needed to control the robot to reach the goal point
    
    Input : Map -> Matrix of our environment (-1 for obstacles and 0 for clear path)
          : SearchStart -> Start point coordinates
          :  SearchGoal -> Goal point coordinates
    Output : start_points -> Start points for each vector in the trajectory
           : goal_points -> Goal points for each vector in the trajectory
           : trajectory -> Set of trajectory vectors
    """
    path, explored, operation_count = a_star_search(Map, SearchStart, SearchGoal)
    # Display the result
    if path:
        print("Path found:")
        print(f"Number of operations: {operation_count}")
        display_map(Map, path, SearchStart, SearchGoal, explored)
    else:
        print("No path found.")

    vectors = classify_segments_as_vectors(path)
    start_points = []
    goal_points = []
    trajectory = []

    # Print results
    for direction, start, end in vectors:
        print(f"Direction : {direction}, Début (pixels) : {start}, Fin (pixels) : {end} ")
        print("Début (mm): (" + str(start[0]*pixel_to_mm)+";" + str(start[1]*pixel_to_mm) + "), Fin (mm) : (" + str(end[0]*pixel_to_mm) + ";" + str(end[1]*pixel_to_mm) + ")")
        print("Vector (pixels) : (" + str(end[0]-start[0]) + ", " + str(end[1]-start[1]) + ")")
        print("Vector (mm) : (" + str((end[0]-start[0])*pixel_to_mm) + ", " + str((end[1]-start[1])*pixel_to_mm) + ")")
        
        # Table containing the values of start points of each vector (according to x and y) which allow the robot to be guided to the global goal point
        start_points = start_points + [[start[0]*pixel_to_mm, start[1]*pixel_to_mm]]
        # Table containing the values of goal points of each vector (according to y and x)
        goal_points = goal_points + [[end[1]*pixel_to_mm, end[0]*pixel_to_mm]]
        # Table containing the values of vectors according to x and y
        trajectory = trajectory + [[(end[0]-start[0])*pixel_to_mm, (end[1]-start[1])*pixel_to_mm]]

    return start_points, goal_points, trajectory

# Application of the A* Dijkstra algorithm
start_points, goal_points, trajectory = A_Dijkstra(matrix, start_position, goal_position)



<br>

<img src="images/Dijkstra.png"/>

<br>

We now have our Global Navigation and our vision ready.

## **Motion Control Part**

To control our robot and ensure it reaches the goal point, we implemented Astolfi's Differential Drive Motion Control system. This approach computes velocity commands based on the robot's position and orientation relative to the target. Below is a breakdown of the theory and implementation.

### Kinematic Model
The kinematic model relates the robot's velocity ($v$, $\omega$) to its position ($\rho$) and angles ($\alpha$, $\beta$) over time.

<div style="display: flex; justify-content: center; align-items: center; gap: 15px;">
    <figure style="text-align: center;">
        <img src="images/Motion_1.jpg" alt="Kinematic Model" style="max-width: 40%;"/>
        <figcaption>Kinematic Model: Relation between velocity, position, and angles.</figcaption>
    </figure>
    <figure style="text-align: center;">
        <img src="images/Motion_2.jpg" alt="Control Law" style="max-width: 100%;"/>
        <figcaption>Control Law: Computes velocity commands.</figcaption>
    </figure>
</div>

### Control Law
This law computes the robot’s linear ($v$) and angular ($\omega$) velocities based on feedback control gains and the robot's state.

<div style="display: flex; justify-content: center;">
    <figure style="text-align: center;">
        <img src="images/Motion_3.jpg" alt="Closed-Loop System" style="max-width: 80%;"/>
        <figcaption>Closed-Loop System: Ensures convergence to the goal.</figcaption>
    </figure>
</div>

### Stability Conditions
The system is exponentially stable under the following conditions (proven using Lyapunov theory):
- $k_\rho > 0$
- $k_\alpha < 0$
- $k_\beta < 0$
- $k_\rho > |k_\alpha \cdot k_\beta|$

<div style="display: flex; justify-content: center;">
    <figure style="text-align: center;">
        <img src="images/Motion_4.jpg" alt="Stability Conditions" style="max-width: 60%;"/>
        <figcaption>Stability Conditions: Exponential stability of the system.</figcaption>
    </figure>
</div>

### Final System Overview

<div style="display: flex; justify-content: center;">
    <figure style="text-align: center;">
        <img src="images/Motion_5.jpg" alt="Final System" style="max-width: 85%;"/>
        <figcaption>Final System: Integrated control setup overview.</figcaption>
    </figure>
</div>

### Implementation in Code
The astolfi_control function calculates motor speed commands using the kinematic model. It computes:
- *$\rho$*: Distance to the target.  
- *$\alpha$*: Angle difference between the robot's heading and the goal.  
- *$\beta$*: Remaining orientation difference to align with the goal.

With tuned control gains ($k_\rho = 0.5$, $k_\alpha = 3$, $k_\beta = -0.5$), we calculate the robot’s velocities and convert them into motor commands for the left and right wheels.

In [62]:
def astolfi_control(robot_position, robot_orientation, goal_position):
    """
    Implements Astolfi's controller to compute the speed commands for the robot to move towards a goal point.
    
    Input : robot_position -> Current position of the robot as (x, y) coordinates 
          : robot_orientation -> Current orientation of the robot (in radians)
          : goal_position -> Desired goal position as (x, y) coordinates 
    Output : spLeft -> Speed command for the left motor 
           : spRight -> Speed command for the right motor 
    """
    k_rho, k_alpha, k_beta = 0.3, 1.5, -0.5
    delta_x = goal_position[0] - robot_position[0]
    delta_y = goal_position[1] - robot_position[1]

    rho = np.sqrt(delta_x**2 + delta_y**2)
    alpha = (wrap_angle(np.atan2(delta_y, delta_x) - robot_orientation))*(-1)
    beta = wrap_angle(-alpha - robot_orientation)

    v = k_rho * rho
    omega = k_alpha * alpha + k_beta * beta
    spLeft = int(v - omega * 10)
    spRight = int(v + omega * 10)
    return spLeft, spRight

## **Local Navigation Part**

The robot's navigation system is designed to handle local obstacles introduced dynamically. Using weighted proximity sensor readings, the system adjusts motor speeds to avoid collisions. Symmetrical sensor weights ensure balanced reactions to obstacles on either side, while the central sensor slows the robot for frontal obstacles. This approach enables smooth navigation in the presence of unexpected objects.

In [78]:
obstSpeedGain = [4, 3, -1, -3, -4]  # Obstacle detection weights

def obstacle_avoidance(prox_values):
    """
    Function to compute corrections to motor speeds to avoid obstacles based on proximity sensor readings (prox.horizontal).
    
    Input : prox_values -> List of proximity sensor values 
    Output : spLeft -> Updated speed command for the left motor 
           : spRight -> Updated speed command for the right motor 
    """
    global spLeft, spRight
    for i in range(5):
        spLeft += prox_values[i] * obstSpeedGain[i] // 100
        spRight += prox_values[i] * obstSpeedGain[4 - i] // 100
    return spLeft, spRight

## **Filtering Part**

The filtering part explains how the Thymio robot identifies itself on the map. Since our application requires a smooth and contunuous state estimation, and Extended Kalman Filter (EKF) is an algorithm with relativlely high computational efficiency that a more reliable pose estimation by fusing the results from our model and sensor. We utilize EKF to compute the pose of our Thymio robot as our system is non-linear and the noise is approximately Gaussian. The EKF algorithm is defined as following:

### **EKF Algortihm**
#### Parameters for EKF

1. **Wheel Base ($ b $)**: Distance between the wheels.
   $$
   b = 95 \ \text{mm}
   $$

2. **Time Step ($ \Delta t $)**: The interval between each prediction.
   $$
   \Delta t = 0.1 \ \text{seconds}
   $$

3. **Process Noise Covariance ($ Q $)**: (To be derived in the next section)

   $$
   Q = \begin{bmatrix}
   \sigma_{\text{x position, odometry}}^2 & 0 & 0 & 0 \\
   0 & \sigma_{\text{y position, odometry}}^2 & 0 & 0 \\
   0 & 0 & \sigma_{\theta\text{, odometry}}^2 & 0 \\
   0 & 0 & 0 & \sigma_{\text{velocity, odometry}}^2 \\
   \end{bmatrix}
   $$


4. **Measurement Noise Covariance ($ R $)**: (To be derived in the next section)
   $$
   Q = \begin{bmatrix}
   \sigma_{\text{x position, camera}}^2 & 0 & 0 & 0 \\
   0 & \sigma_{\text{y position, camera}}^2 & 0 & 0 \\
   0 & 0 & \sigma_{\theta\text{, camera}}^2 & 0 \\
   0 & 0 & 0 & \sigma_{\text{velocity, camera}}^2 \\
   \end{bmatrix}
   $$
   
5. **Initial State Estimate ($ \hat{X}_0 $)**:
   $$
   \hat{X}_0 = \begin{bmatrix}
   x_0 \\
   y_0 \\
   \theta_0 \\
   v_0 \\
   \end{bmatrix} = \begin{bmatrix}
   0 \\
   0 \\
   0 \\
   0 \\
   \end{bmatrix}
   $$

6. **Initial Covariance ($ P_0 $)**:
   $$
   P_0 = I_4 = \begin{bmatrix}
   1 & 0 & 0 & 0 \\
   0 & 1 & 0 & 0 \\
   0 & 0 & 1 & 0 \\
   0 & 0 & 0 & 1 \\
   \end{bmatrix}
   $$

---

#### EKF Prediction Step

##### Inputs:

- Previous state estimate: $ \hat{X}_{k-1} = \begin{bmatrix} x_{k-1} \\ y_{k-1} \\ \theta_{k-1} \\ v_{k-1} \end{bmatrix} $
- Previous covariance: $ P_{k-1} $
- Left wheel velocity: $ v_l $
- Right wheel velocity: $ v_r $

##### Calculations:

1. **Compute Linear and Angular Velocities**:

   - **Linear Velocity ($ v_k $)**:
     $$
     v_k = \frac{v_r + v_l}{2}
     $$

   - **Angular Velocity ($ \omega_k $)**:
     $$
     \omega_k = \frac{v_r - v_l}{b}
     $$

2. **Compute the Jacobian of the Motion Model ($ F_k $)**:

   $$
   F_k = \begin{bmatrix}
   1 & 0 & -v_k \Delta t \sin(\theta_k) & \Delta t \cos(\theta_k) \\
   0 & 1 & v_k \Delta t \cos(\theta_k) & \Delta t \sin(\theta_k) \\
   0 & 0 & 1 & \Delta t \\
   0 & 0 & 0 & 1 \\
   \end{bmatrix}
   $$

   Where $ \theta_k = \theta_{k-1} $.

3. **Predict the Next State ($ \hat{X}_{k|k-1} $)**:

   $$
   \hat{X}_{k|k-1} = \begin{bmatrix}
   x_{k|k-1} \\
   y_{k|k-1} \\
   \theta_{k|k-1} \\
   v_{k|k-1} \\
   \end{bmatrix} = \begin{bmatrix}
   x_{k-1} + v_k \Delta t \cos(\theta_k) \\
   y_{k-1} + v_k \Delta t \sin(\theta_k) \\
   \theta_{k-1} + \omega_k \Delta t \\
   v_k \\
   \end{bmatrix}
   $$


4. **Predict the Next Covariance ($ P_{k|k-1} $)**:

   $$
   P_{k|k-1} = F_k P_{k-1} F_k^\top + Q
   $$

---

#### EKF Update Step

##### Inputs:

- Predicted state estimate: $ \hat{X}_{k|k-1} $
- Predicted covariance: $ P_{k|k-1} $
- Measurement vector: $ Z_k $

##### Calculations:

1. **Measurement Matrix ($ H_k $)**:

   Since the measurements directly observe the state variables, $ H_k $ is an identity matrix:

   $$
   H_k = I_4 = \begin{bmatrix}
   1 & 0 & 0 & 0 \\
   0 & 1 & 0 & 0 \\
   0 & 0 & 1 & 0 \\
   0 & 0 & 0 & 1 \\
   \end{bmatrix}
   $$

2. **Compute the Kalman Gain ($ K_k $)**:

   $$
   K_k = P_{k|k-1} H_k^\top \left( H_k P_{k|k-1} H_k^\top + R \right)^{-1}
   $$

3. **Update the State Estimate ($ \hat{X}_k $)**:

   $$
   \hat{X}_k = \hat{X}_{k|k-1} + K_k \left( Z_k - H_k \hat{X}_{k|k-1} \right)
   $$

4. **Update the Covariance Estimate ($ P_k $)**:

   $$
   P_k = \left( I_4 - K_k H_k \right) P_{k|k-1}
   $$

---

#### Summary of the EKF Process

##### **Prediction Step**:

- **State Prediction**:
  $$
  \hat{X}_{k|k-1} = f(\hat{X}_{k-1}, u_k)
  $$
  Where $ u_k = [v_l, v_r]^\top $ are the control inputs.

- **Covariance Prediction**:
  $$
  P_{k|k-1} = F_k P_{k-1} F_k^\top + Q
  $$

##### **Update Step**:

- **Kalman Gain**:
  $$
  K_k = P_{k|k-1} H_k^\top \left( H_k P_{k|k-1} H_k^\top + R \right)^{-1}
  $$

- **State Update**:
  $$
  \hat{X}_k = \hat{X}_{k|k-1} + K_k \left( Z_k - H_k \hat{X}_{k|k-1} \right)
  $$

- **Covariance Update**:
  $$
  P_k = \left( I_4 - K_k H_k \right) P_{k|k-1}
  $$

---

#### Detailed Explanation

##### **1. State Variables**

- $ x $: Position along the x-axis.
- $ y $: Position along the y-axis.
- $ \theta $: Orientation angle of the robot.
- $ v $: Linear velocity of the robot.

##### **2. Motion Model**

The robot's motion model predicts its new position and orientation based on its current state and control inputs (wheel velocities).

- **State Transition Function ($ f $)**:
  $$
  \begin{cases}
  x_{k|k-1} = x_{k-1} + v_k \Delta t \cos(\theta_{k-1}) \\
  y_{k|k-1} = y_{k-1} + v_k \Delta t \sin(\theta_{k-1}) \\
  \theta_{k|k-1} = \theta_{k-1} + \omega_k \Delta t \\
  v_{k|k-1} = v_k \\
  \end{cases}
  $$

- **Control Inputs**:
  - **Linear Velocity ($ v_k $)**:
    $$
    v_k = \frac{v_r + v_l}{2}
    $$
  - **Angular Velocity ($ \omega_k $)**:
    $$
    \omega_k = \frac{v_r - v_l}{b}
    $$

##### **3. Jacobian Matrices**

- **Jacobian of the Motion Model ($ F_k $)**:
  This matrix linearizes the motion model around the current estimate.

  $$
  F_k = \begin{bmatrix}
  1 & 0 & -v_k \Delta t \sin(\theta_{k-1}) & \Delta t \cos(\theta_{k-1}) \\
  0 & 1 & v_k \Delta t \cos(\theta_{k-1}) & \Delta t \sin(\theta_{k-1}) \\
  0 & 0 & 1 & \Delta t \\
  0 & 0 & 0 & 1 \\
  \end{bmatrix}
  $$

- **Measurement Matrix ($ H_k $)**:
  Since measurements directly observe the state, $ H_k $ is the identity matrix.

  $$
  H_k = I_4
  $$

##### **4. Noise Covariances**

- **Process Noise Covariance ($ Q $)**: Represents the uncertainty in the motion model.
- **Measurement Noise Covariance ($ R $)**: Represents the uncertainty in the sensor measurements.

##### **5. Kalman Gain ($ K_k $)**

The Kalman Gain determines how much the measurements influence the updated estimate.

$$
K_k = P_{k|k-1} H_k^\top \left( H_k P_{k|k-1} H_k^\top + R \right)^{-1}
$$

##### **6. Measurement Update**

The difference between the actual measurement and the predicted measurement (residual):

$$
\text{Residual} = Z_k - H_k \hat{X}_{k|k-1}
$$

Updated state estimate:

$$
\hat{X}_k = \hat{X}_{k|k-1} + K_k \left( Z_k - H_k \hat{X}_{k|k-1} \right)
$$

Updated covariance estimate:

$$
P_k = \left( I_4 - K_k H_k \right) P_{k|k-1}
$$

---

#### **Notes**
- **Units**: All distances are in millimeters (mm), angles in radians, and time in seconds.
- **Assumptions**:
  - The robot moves on a plane with differential drive kinematics.
  - Process and measurement noises are zero-mean Gaussian.
  - The system is mildly nonlinear, suitable for EKF.

### **Process Noise and Measurement Noise Covariance Matrices Derivation**

This section demonstrates the derivation of the covariance matrix for process noise and measurement noise in a Thymio robotic system. The key objectives include:

- Collecting data for positional, angular, and velocity measurements.
- Computing variances and covariances for these variables.
- Formulating a diagonalized covariance matrix for use in Extended Kalman Filter (EKF) implementations.


#### **Process Noise Covariance Matrix Derivation**
In our system, we see the odometry obtained by the motor speed sensors of Thymio robot as the "ground truth". The process noise covariance matrix Q should reflect the uncertainty in the odometry data. we can determine Q by gathering data from the wheel speed sensor and analyzing the noise in the odometry outputs.

In [64]:
from tdmclient import ClientAsync
client = ClientAsync()
node = await client.wait_for_node()
await node.lock()

In [65]:
from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False


In [66]:
acquire_data = True
Ts = 0.1
thymio_data = []

def motors(left, right):
    return {
        "motor.left.target": [left],
        "motor.right.target": [right],
    }

def get_data():
    thymio_data.append({"ground":list(node["prox.ground.reflected"]), 
                        "sensor":list(node["prox.ground.reflected"]),
                        "left_speed":node["motor.left.speed"],
                        "right_speed":node["motor.right.speed"]})
    

if acquire_data:
    await node.wait_for_variables() # wait for Thymio variables values
    rt = RepeatedTimer(Ts, get_data) # it auto-starts, no need of rt.start()

    try:
        # time.sleep would not work here, use asynchronous client.sleep method instead
        await client.sleep(1)
        node.send_set_variables(motors(50, 50))
        await client.sleep(10) 
    finally:
        rt.stop() # better in a try/finally block to make sure the program ends!
        node.send_set_variables(motors(0, 0))
else:
    thymio_data = []

In [67]:
wheel_base = 95 # The wheel base of Thymio robot measured by a ruler (mm)

# Extract data for motor speed
v_l = [x["left_speed"] for x in thymio_data]
v_r = [x["right_speed"] for x in thymio_data]

# Convert the data into mm/s (conversion factor "2688.170234" is derived by 
# putting a pencil in Thymio robot and measure the trajectory plotted by it with a ruler)
v_l = np.array(v_l) / 2.688170243
v_r = np.array(v_r) / 2.688170243
vel_hat = [0]
omg = [0]
x_hat = [0]
y_hat= [0]
theta_hat = [0]

for i in range(len(v_l) - 1):
    # Update the states
    vel_hat.append((v_r[i] + v_l[i]) / 2)
    omg.append((v_l[i] - v_r[i]) / wheel_base)
    theta_hat.append(theta_hat[-1] + omg[i] * Ts)
    x_hat.append(x_hat[-1] + Ts * vel_hat[i] * np.sin(theta_hat[i]))
    y_hat.append(y_hat[-1] + Ts * vel_hat[i] * np.cos(theta_hat[i]))


<span style="color: #2980B9 ;">
<blockquote>
    We calculate the variances from 25 because befor 25 time steps it is transient and we don't want the transient data to effect the variances of the steady state.

In [None]:
var_theta = np.var(theta_hat[25:])
var_vel = np.var(vel_hat[25:])
print("The velocity variance in m^2/s^2 is {}".format(var_vel))
print("The theta variance in rad^2 is {}".format(var_theta))

##### **X and Y Variances Derivation**
In the following cells, we ask thymio to remain still for 10 seconds and collect the data from wheel speed sensors. We then convert the speed of the left, right motors into the states we need via the dynamic equations of our system, and we calculate the variances for x position and y position.

In [69]:
acquire_data = True
Ts = 0.1
thymio_data = []

def motors(left, right):
    return {
        "motor.left.target": [left],
        "motor.right.target": [right],
    }

def get_data():
    thymio_data.append({"ground":list(node["prox.ground.reflected"]), 
                        "sensor":list(node["prox.ground.reflected"]),
                        "left_speed":node["motor.left.speed"],
                        "right_speed":node["motor.right.speed"]})
    

if acquire_data:
    await node.wait_for_variables() # wait for Thymio variables values
    rt = RepeatedTimer(Ts, get_data) # it auto-starts, no need of rt.start()

    try:
        # time.sleep would not work here, use asynchronous client.sleep method instead
        await client.sleep(1)
        node.send_set_variables(motors(0, 0)) # Thymio remains still for 10 sec
        await client.sleep(10) 
    finally:
        rt.stop() # better in a try/finally block to make sure the program ends!
        node.send_set_variables(motors(0, 0))
else:
    thymio_data = []

In [70]:
# Extract data for motor speed
v_l0 = [x["left_speed"] for x in thymio_data]
v_r0 = [x["right_speed"] for x in thymio_data]
v_l0 = np.array(v_l) / 2.688170243 # in mm/s
v_r0 = np.array(v_r) / 2.688170243
vel0_hat = [0]
omg0 = [0]
x_hat0 = [0]
y_hat0= [0]
theta_hat0 = [0]

for i in range(len(v_l) - 1):
    # Update the states
    vel0_hat.append((v_r0[i] + v_l0[i]) / 2)
    omg0.append((v_l0[i] - v_r0[i]) / wheel_base)
    theta_hat0.append(theta_hat0[-1] + omg0[i] * Ts)
    x_hat0.append(x_hat0[-1] + Ts * vel0_hat[i] * np.sin(theta_hat0[i]))
    y_hat0.append(y_hat0[-1] + Ts * vel0_hat[i] * np.cos(theta_hat0[i]))


In [None]:
var_x = np.var(x_hat0[25:])
var_y = np.var(y_hat0[25:])
print("The x variance in mm^2 is {}".format(var_x))
print("The y variance in mm^2 is {}".format(var_y))

##### **Process Noise Covariance Matrix Q**
We then combine the obtained variances into a diagonal covariance matrix.

In [None]:
Q = np.diag([var_x, var_y, var_theta, var_vel]) # Process noise covariance
print("The process noise covariance matrix Q is: ")
print(Q)

#### **Measurement Noise Covaraince Matrix Derivation**
The measurement noise covariance matrix R should reflect the uncertainty in the camera-based pose estimation (from the ArUco marker).
##### **Measurement Noise Covariance Matrix Derivation**
In the following cell, we detect the position, yaw angle, and velocity via computer vision, and thus gather the data and calculate the variance of each state. Last, we combine them into a diagonal matrix which is the measurement noise covariance matrix R.

In [None]:
def detect_aruco_marker(image, target_id=2):
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    parameters = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, parameters)

    corners, ids, _ = detector.detectMarkers(image)

    if ids is not None:
        for corner, marker_id in zip(corners, ids):
            if marker_id[0] == target_id:  # Only process the marker with ID = 2
                points = corner[0]
                center_y = int(points[:, 0].mean())
                center_x = int(points[:, 1].mean())

                # Compute angle
                vector = points[1] - points[0]
                angle = -np.arctan2(vector[1], vector[0])

                # Draw the marker and annotate
                cv2.polylines(image, [corner.astype(int)], True, (0, 255, 0), 2)
                return image, (center_x, center_y), angle

    return image, None, None

# Main loop
cap = cv2.VideoCapture(Camera)
Position_x_before = 0
Position_y_before = 0
previous_time = time.time()

# Storage for measurements
measurements = []  # To store [x (mm), y (mm), theta (deg), v (mm/s)]

while True:
    ret, frame = cap.read()
    if not ret:
        break

    processed_image, position, angle = detect_aruco_marker(frame, target_id=2)

    if position:
        # Convert to millimeters
        x_mm = position[0] * pixel_to_mm
        y_mm = position[1] * pixel_to_mm
        
        # Calculate velocity
        current_time = time.time()
        delta_time = current_time - previous_time
        dx = x_mm - Position_x_before
        dy = y_mm - Position_y_before
        velocity_mm_s = np.sqrt(dx**2 + dy**2) / delta_time
        Position_x_before = x_mm
        Position_y_before = y_mm
        previous_time = current_time

        # Append to measurements
        measurements.append([x_mm, y_mm, angle, velocity_mm_s])

        # Display results on the video feed
        cv2.putText(processed_image, f"x: {x_mm:.4f} m", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.putText(processed_image, f"y: {y_mm:.4f} m", (10, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.putText(processed_image, f"theta: {angle:.4f} rad", (10, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        cv2.putText(processed_image, f"v: {velocity_mm_s:.4f} m/s", (10, 90),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Show the processed frame
    cv2.imshow("Aruco Marker Detection", processed_image)

    # Quit with "l"
    if cv2.waitKey(1) & 0xFF == ord('l'):
        break

cap.release()
cv2.destroyAllWindows()

# Calculate covariance matrix
measurements = np.array(measurements)
R = np.cov(measurements, rowvar=False)  # rowvar=False ensures columns are treated as variables

# Simplify to diagonal matrix
R_diagonalized = np.diag(np.diag(R))

# Print diagonalized covariance matrix
print("the measuement noise covariance matrix R is: ")
print(R_diagonalized)



In [None]:
# Parameters for EKF
wheel_base = 95 # Distance between wheels (in mm)
Ts = 0.1  # Time step
R = R_diagonalized # Measurement noise covariance
X_hat = np.array([0, 0, 0, 0])  # Initial state: [x, y, theta, v]
P = np.eye(4)  # Initial covariance

def ekf_predict(X_hat, P, v_l, v_r):
    """EKF Prediction step."""
    v_k = (v_r + v_l) / 2
    omega_k = (v_r - v_l) / wheel_base
    theta_k = X_hat[2]

    F = np.array([
        [1, 0, -v_k * Ts * np.sin(theta_k), Ts * np.cos(theta_k)],
        [0, 1,  v_k * Ts * np.cos(theta_k), Ts * np.sin(theta_k)],
        [0, 0, 1,                          Ts],
        [0, 0, 0,                           1]
    ])

    X_hat = np.array([
        X_hat[0] + Ts * v_k * np.cos(theta_k),
        X_hat[1] + Ts * v_k * np.sin(theta_k),
        wrap_angle(X_hat[2] + Ts * omega_k),
        v_k
    ])

    P = F @ P @ F.T + Q
    return X_hat, P

def ekf_update(X_hat, P, z_k):
    """EKF Update step."""
    H = np.eye(4)
    K = P @ H.T @ np.linalg.inv(H @ P @ H.T + R)
    X_hat = X_hat + K @ (z_k - X_hat)
    P = (np.eye(4) - K @ H) @ P
    return X_hat, P

## **Execution of all parts in a main program**

In [None]:
import tdmclient.notebook
await tdmclient.notebook.start()

### Global Parameters
PIXEL_TO_MM = pixel_to_mm  # Conversion factor from pixels to millimeters
OBSTACLE_THRESHOLD = 2000  # Threshold for obstacle detection (can be adjusted)
goal_threshold_mm = 45  # To adjust distance to the goal to switch to the next segment 
kidnap = 0 # State of "kidnap" situation

### Trajectory
start_points, goal_points, trajectory = A_Dijkstra(matrix, start_position, goal_position)
current_segment = 0   # Tracking the current segment
robot_position = [start_points[0][0], start_points[0][1]] # Initial position of the robot
robot_orientation = 0
spLeft = 0
spRight = 0
acc_values = [0,0,0]

def detect_aruco_marker(img, target_id=2):
    """
    Function to detect the position and angle of rotation of the Aruco ID = 2 marker positioned on the robot
    
    Input : img -> Input image (projected image) 
          : target_id=2 -> Id number of the Aruco marker
    Output : image -> Image with marker indications
           : (center_x, center_y) -> Coordinates of the robot (pixels)
           : angle -> Angle of rotation of the robot (rad)
    """

    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
    parameters = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, parameters)

    corners, ids, _ = detector.detectMarkers(img)
    if ids is not None:
        for corner, marker_id in zip(corners, ids):
            if marker_id[0] == target_id:
                points = corner[0]
                center_x = int(points[:, 0].mean())
                center_y = int(points[:, 1].mean())
                # Compute angle then normalization between [-π, π]
                vector = points[1] - points[0]
                angle = np.atan2(vector[1], vector[0])
                return img, (center_x, center_y), wrap_angle(angle)
    return img, None, None

def euclidean_distance(point1, point2):
    """Calculates the distance between two points."""
    return np.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)


@tdmclient.notebook.sync_var
def motors(l_speed, r_speed):
    global motor_left_target, motor_right_target
    
    motor_left_target = l_speed
    motor_right_target = r_speed

@tdmclient.notebook.sync_var
def prox():
    global prox_horizontal
    # get the prox values   
    prox_vals = np.array(prox_horizontal)
    return prox_vals

@tdmclient.notebook.sync_var
def accelerometer():
    global acc
    # get the prox values   
    acc_vals = np.array(acc)
    return acc_vals


### Main program
cap = cv2.VideoCapture(Camera)

while current_segment < len(goal_points):
    success, frame = cap.read()
    if not success:
        print("Error with the camera")
        break

    if kidnap == 0 :
        ## EKF Prediction 
        # Simulated real inputs for v_l and v_r (left and right wheel speeds)
        v_l, v_r = spLeft, spRight 
        X_hat, P = ekf_predict(X_hat, P, v_l, v_r)
        print("Prediction :")
        print(X_hat)

        ## Aruco Marker Detection
        output_size = (Size_img_y, Size_img_x) 
        projected_frame = apply_homography(frame, homography_matrix, output_size)
        processed_frame, detected_position, detected_orientation = detect_aruco_marker(projected_frame, target_id=2)

        if detected_position:
            # Update the measurement
            z_k = [detected_position[0] * PIXEL_TO_MM,
                detected_position[1] * PIXEL_TO_MM,
                detected_orientation,
                0]  # No velocity measurement from the camera
            
            ## EKF Update
            X_hat, P = ekf_update(X_hat, P, np.array(z_k))
            print("States estimated of the robot :")
            print(X_hat)

            ## Check Distance to Current Goal
            distance_to_goal = euclidean_distance(X_hat[:2], goal_points[current_segment])
            print(goal_points[current_segment])
            if distance_to_goal < goal_threshold_mm:
                print(f"Segment {current_segment + 1} atteint.")
                current_segment += 1
                continue

            ## Astolfi Control
            spLeft, spRight = astolfi_control(X_hat[:2], X_hat[2], goal_points[current_segment])

            ## Obstacle Avoidance 
            prox_values = prox()  
            # If the robot detects an obstacle
            if max(prox_values) > OBSTACLE_THRESHOLD:
                spLeft = 0
                spRight = 0
                spLeft, spRight = obstacle_avoidance(prox_values)

            ## Motor Commands
            motors([spLeft], [spRight])

    ### Kidnapping situation
        acc_values = accelerometer()  
        # If abnormal acceleration is detected along the robot's y axis
        if abs(acc_values[1])>5:
            # We stop the robot
            motors(0, 0)
            current_segment = 0 
            spLeft = 0
            spRight = 0
            # The robot is in a kidnap situation
            kidnap = 1

    else :
        acc_values = accelerometer()  
        if abs(acc_values[1])<3:
            ## Aruco Marker Detection
            projected_frame = apply_homography(frame, homography_matrix, output_size)
            processed_frame, detected_position, detected_orientation = detect_aruco_marker(projected_frame, target_id=2)
            # If we redetect the robot marker
            if detected_position != None:
                detected_position = detected_position[::-1]
                # We apply the A* Dijkstra algorithm again, taking the robot's current position as the starting point
                start_points, goal_points, trajectory = A_Dijkstra(matrix, detected_position, goal_position)
                robot_position = [start_points[0][0], start_points[0][1]]
                robot_orientation = detected_orientation
                # The robot is no longer in a kidnap situation
                kidnap = 0


    ## Video Display
    cv2.putText(processed_frame, f"Goal: ({goal_points[current_segment][0]:.1f}, {goal_points[current_segment][1]:.1f})", 
                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
    cv2.putText(processed_frame, f"Robot: ({X_hat[0]:.1f}, {X_hat[1]:.1f})", 
                (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
    cv2.putText(processed_frame, f"Orientation: {X_hat[2]:.2f}", 
                (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
    cv2.imshow("Robot Navigation", processed_frame)

    if cv2.waitKey(1) & 0xFF == ord('l'):
        break

        
# Stop motors and release resources
motors(0, 0)
cap.release()
cv2.destroyAllWindows()

## **Videos**

### Test Global and Local Navigation
<video controls>
  <source src="videos/Global_Local_nav.mp4" type="video/mp4">
</video>

### Test obstruction camera and kidnapping situation
<video controls>
  <source src="videos/Kidnap.mp4" type="video/mp4">
</video>

In [20]:
# Stop motors
motors(0, 0)

In [34]:
# Stop connection with Thymio
await tdmclient.notebook.stop()

## **Conclusion**

We successfully completed this project, meeting all the requirements specified in the brief. With more time, we could have improved the video display or implemented a more efficient optimal path method (such as Visibility Graphs or other techniques). The key to the success of this project was the communication between the different parties, which allowed us to make quicker progress on certain aspects.

This has been a very intense and highly interesting project, as it allowed us to explore all the topics covered in class through a concrete application. We would like to thank everyone who assisted us during this project.

## **References used for this project**

- Computer Vision Tutorials (from moodle week 10)
- Morphological Transformation (Image Processing I course)
- A* Dijkstra : Documents and codes from exercices session 5
- Astolfi Controller : Documents from lesson 1
- Local Avoidance with ANN : Documents and codes from exercices session 3
- Kalman Filter : https://youtu.be/wLlG66W1uQI?si=UYZdBWXx42qzs7Yx, https://youtube.com/playlist?list=PLn8PRpmsu08rSg2j8fBt4JsnBkMfiIhLI&si=mb7H4dLKVYNNxl0A