# **Mobile robotics project - Group 29**

##### **Group members**:

&nbsp;&nbsp;&nbsp;&nbsp; Brown Andrew Michael   
&nbsp;&nbsp;&nbsp;&nbsp; Elmaleh Daniel Abraham   
&nbsp;&nbsp;&nbsp;&nbsp; Evangelisti Chiara  
&nbsp;&nbsp;&nbsp;&nbsp; Ziegler Mathieu   


### **Table of Contents**
- [1. Introduction](#Introduction)
  - [1.1 Abstract](#Abstract)
  - [1.2 Environment setup](#Environment-setup)
- [2. Vision](#Vision)
  - [2.1 ](#Section-2.1)
- [3. Global Navigation](#Global-navigation)
  - [3.1](#Section-3.1)
- [4.Local Navigation](#Local-navigation)
  - [4.1 ](#Section-4.1)
- [5. Extended Kalman Filter](#Extended-Kalman-filter)
  - [5.1](#Section-5.1)


## Introduction

### Abstract


### Environment setup

## Vision

In [None]:
import numpy as np
import math as mt
import cv2


if __name__ == "__main__":
    _test=True
else:_test=True



_obs_min_area=250



class Vision:
    def __init__(self, camID, tagID, areaPX=(1080, 720), areaCM=(118, 90), calibration_filename="cal.npy"):
        self._camID=camID
        self._thymio_tagID=tagID
        self._cal_file=calibration_filename


        #self._cap=cv2.VideoCapture('http://192.168.0.56:8080/video')
        self._cap=cv2.VideoCapture(camID)
        self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        self._areaPX=areaPX
        self._areaCM=areaCM


        if _test:
            cv2.namedWindow('Video Feed', cv2.WINDOW_NORMAL)
            cv2.setMouseCallback('Video Feed',self._get_hsv_value)

        
        # Load the predefined dictionary of ArUco markers
        arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_ARUCO_ORIGINAL)
        arucoParams = cv2.aruco.DetectorParameters()

        # Detect ArUco markers in the image
        self._detector = cv2.aruco.ArucoDetector(arucoDict, arucoParams)
        self._frame=None

        self._hsv_margin=None
        self._hsv_buffer_size=30
        self._hsv_buffer = np.full((self._hsv_buffer_size, 3), np.nan)
        try:
            self._obstacles_hsv = np.load(self._cal_file)
        except FileNotFoundError:
            self._obstacles_hsv = None

        self.obstacles=None
        self.thymio_pos=None
        self.target=np.array([0,0])
        self.direction=None
        self.usable_thymio_pos=False
        self.usable_obstacles=False
        
    def get_thymio_pos(self):
        if  not self.usable_thymio_pos: return None
        pos = self.thymio_pos
        factor=self._areaCM[0]/self._areaPX[0]
        pos = [pos[0]*factor,pos[1]*factor,pos[2]]
        return pos
    
    def get_obstacles(self):
        if not self.usable_obstacles: return None
        factor=self._areaCM[0]/self._areaPX[0]
        transformed_polygons = []
        for polygon in self.obstacles:
            transformed_polygon = [([i[0][0] * factor,i[0][1] * (factor)]) for i in polygon]
            transformed_polygons.append(transformed_polygon)
        return transformed_polygons
    
    def get_target(self):
        pos = self.target
        factor=self._areaCM[0]/self._areaPX[0]
        return [pos[0]*factor,pos[1]*factor]
    
    def update(self):
        self.usable_thymio_pos=False
        self.usable_obstacles=False

        ret, self._frame = self._cap.read()
        if not ret:
            raise ValueError("Camera Missing")

        if self._warp():
            self.usable_obstacles=self._update_obstacles()
            self.usable_thymio_pos=self._update_thymio()
            if _test:
                cv2.circle(self._frame, (tuple(self.target[:2])), 5, [0,0,255],-1)
                if self.usable_obstacles: self._draw_contours()
                if self.usable_thymio_pos: self._draw_robot()

        if _test: cv2.imshow('Video Feed', self._frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            pass

        self.get_thymio_pos()

    def _get_mask(self, image, colors):
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv, colors[0], colors[1])
        return mask

    def _warp(self):
        # Convert the frame to grayscale
        #gray = cv2.cvtColor(self._frame, cv2.COLOR_BGR2GRAY)

        (corners, ids, rejected) = self._detector.detectMarkers(self._frame)

        # Check if there are four markers detected

        centers=[0,0,0,0]
        # Assuming the ArUco markers are placed at the four corners of the desired area
        # Sort the corners if necessary. The order should be: top-left, top-right, bottom-right, bottom-left
        id_seen=[]
        if ids is None: return False

        for corner, id in zip(corners, ids):
            if id[0] in [0,6,2,3]:
            # Calculate the center for each marker
                center = np.mean(corner[0], axis=0)
                val=id[0]
                if val==6: val=1
                centers[val] = center
                id_seen.append(id)
            if id == 5:
                center = np.mean(corner[0], axis=0)
                pos = center+ corner[0,1]-corner[0,2]
                self._update_hsv_obs(pos)
        if sorted(id_seen) != [0,2,3,6]:
            return False
        # Flatten the corner points and convert to float32
        pts1 = np.float32([c for c in centers])
        # Define the points for the transformed image
        _width=self._areaPX[0]
        _height=self._areaPX[1]
        pts2 = np.float32([[0, 0], [_width, 0], [_width, _height], [0, _height]])

        # Compute the perspective transform matrix and apply it
        matrix = cv2.getPerspectiveTransform(pts1, pts2)
        self._frame = cv2.warpPerspective(self._frame, matrix, (_width, _height))

        return True
    
    def _update_obstacles(self):
        self.obstacles=[]
        if self._obstacles_hsv is None:
            return False
        obs_mask = self._get_mask(self._frame, self._obstacles_hsv)
        temp_contours, dump = cv2.findContours(obs_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        obs=[]
        for i in temp_contours:
            if cv2.contourArea(i)> _obs_min_area:
                obs.append(i)


        epsilon_factor = 0.02  # This factor controls the approximation precision
        for cnt in obs:
            epsilon = epsilon_factor * cv2.arcLength(cnt, True)
            approx = cv2.approxPolyDP(cnt, epsilon, True)
            if len(approx) > 2:  # Filter out small shapes, adjust as needed
                self.obstacles.append(approx)
        return True

    def _update_thymio(self):
        gray = cv2.cvtColor(self._frame, cv2.COLOR_BGR2GRAY)
        (corners, ids, rejected) = self._detector.detectMarkers(self._frame)
        center=np.array([0,0])
        time_seen=0
        direction=None
        if ids is None: return
        for corner, id in zip(corners, ids):
            if id==self._thymio_tagID:
                center = np.mean(corner[0], axis=0)
                time_seen+=1
                direction=corner[0][0]-corner[0][1]
            if id == 7:
                self.target=np.mean(corner[0], axis=0).astype(np.int64)
        if time_seen != 1: return False
        self.direction=np.array([-direction[1], direction[0]])
        angle=mt.atan2(*self.direction[::-1])
        self.thymio_pos=[int(center[0]), int(center[1]), angle]


        return True

    def _draw_contours(self):
        for cnt in self.obstacles:
            epsilon = 0.02 * cv2.arcLength(cnt, True)
            approx = cv2.approxPolyDP(cnt, epsilon, True)
            cv2.drawContours(self._frame, [approx], -1, (0, 255, 0), 3)

    def _draw_robot(self):
        cv2.circle(self._frame, (tuple(self.thymio_pos[:2])), 5, [0,0,255],-1)
        start=tuple(self.thymio_pos[:2])
        stop =tuple((self.thymio_pos[:2] + self.direction).astype(int))
        cv2.line(self._frame,start,stop,(255,0,0),5)

    def _get_hsv_value(self, event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            # Get the HSV value at the clicked point
            hsv_frame = cv2.cvtColor(self._frame, cv2.COLOR_BGR2HSV)
            hsv_value = hsv_frame[y, x]
            print("HSV Value at ({}, {}): {}".format(x, y, hsv_value))
    
    def _update_hsv_obs(self, pos):
        hsv_frame = cv2.cvtColor(self._frame, cv2.COLOR_BGR2HSV)
        pos=pos.astype(np.int64)
        if pos[0]>self._areaPX[0]: return
        if pos[1]>self._areaPX[1]: return
        if _test: cv2.circle(self._frame,pos, 5, [0,255,0],-1 )
        middle_point = hsv_frame[*pos[::-1]]
        self._hsv_buffer[0]=middle_point
        self._hsv_buffer=np.roll(self._hsv_buffer, 1, axis=0)
        hsv_bounds = np.array([[0, 0, 0], [180, 255, 220]])  # Assuming Hue is in 0-180
        mean=np.nanmean(self._hsv_buffer, axis=0 )
        margin=[10, 50, 50]

        # Calculate lower and upper bounds
        lower_bound = np.clip(mean - margin, hsv_bounds[0], hsv_bounds[1])
        upper_bound = np.clip(mean + margin, hsv_bounds[0], hsv_bounds[1])

        # HSV margin array
        self._obstacles_hsv = np.array([lower_bound, upper_bound])
        np.save(self._cal_file, self._obstacles_hsv)


if __name__ == '__main__': 
    vis=Vision(1, 4)
    while True:
        vis.update()


The Vision class is responsible for 3 things:
- Detection of the limits of the playground
- Localisation of the Tymio and target
- Localisation of the obstacles

Some design choice were made early:
- Use of aruco markes to detect the limits, Tymio and target
- Use of a know rectangular playground

### Detection of the limits
The first part of the update loop is to detect the aruco markers which are the limits of the playground (ID 0,1,6,3, repectively Top Left, TR, BR, BL). Once the limits are detected the image is "warped" in order to have rectangular shape. 
#### Aruco Detector
To do this, we use the aruco detector class of the opencv python library  (which is an API for the c opencv library). The documentation can be found here https://docs.opencv.org/4.x/d5/dae/tutorial_aruco_detection.html. The first step of the detection of an aruco detection is thresholding, the goal being to get a black and white image of the scene, with hopefully the black and white of the aruco each in a different color. The aruco detector class (of the C++ library) uses adaptive thresholding to reach this goal (for more details https://docs.opencv.org/3.4/d7/d4d/tutorial_py_thresholding.html), it requires transforming the image into a grayscale. Then it finds a threshold with the method "ADAPTIVE_THRESH_MEAN_C" (see https://github.com/opencv/opencv/blob/4.x/modules/objdetect/src/aruco/aruco_detector.cpp line 121). This defines the threshold for a position as the mean of square block around the position (the size of the square being defined by opencv) minux a constant C (all of the constant being optimised in opencv).

The second step is to find the contours of the aruco tag. This is done with the suzuki algorithm. This works by first getting the edge points, probably using the canny algorithm. The canny algorithms works by getting the module of the gradient of the image. This is then fed into suzuki's algorithms which builds the actual contours. The contours are then simplified using the douglas-peucker algorithm (https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm#Algorithm nothing can explain it better that this gif). This gives a  polygon with 4 corners

Once a credible polygon is found we can hop into last step, appyling a grid onto the found polygon. The number of white and black is counted in each grid cell and the bit coresponding to the square is the the one carried by the majority of pixels. This is then used to reorder the corners (in order to be able to use the orientation of the aruco)
### Localisation of the Tymio and target
Both Tymio and target have aruco tags (Tymio is 4 and target is 7). The position is found by the mean of the corners of the aruco detected on the warped image. The direction (for thymio) is found with the order of the aruco tag corners

### Localisation of the obstacles

The localisation of obstacles is done in 2 steps:
- Thresholding, this is done by bounding thresholding the warped image in the HSV space. To have a precise HSV value, we have a calibration function and calibration aruco tag (which we can pause next to the color we want to threshold, its id is 5)
- Finding contours, which done the same way as for aruco tags. It also passes through the DP algorithm because we want to limit the number of points to speed up the djikstra algorithm

## Global navigation

### Section 3.1

## Local navigation

### Section 4.1


## Kalman Filter

In [None]:
import numpy as np
import time

class KalmanFilter:
    def __init__(self, wheel_base=9.5):
        # Wheel base of the robot
        self.L = wheel_base

        # Process noise covariance
        q = 0.1 ** 2
        self.Q = q * np.eye(3)

        # Measurement noise covariance
        self.R =np.diag([0.02**2,0.02**2, np.deg2rad(5)**2])

        # Initial state estimate
        self.x_hat = np.zeros(3)

        # Initial covariance estimate
        self.P = np.eye(3)

        # Last time update was called
        self.last_time = time.time()

    def predict(self, u):
        # Calculate the current time and dt
        current_time = time.time()
        dt = current_time - self.last_time
        self.last_time = current_time

        # Extract the state for readability
        x, y, theta = self.x_hat

        # Calculate the average speed
        v = (u[0] + u[1]) / 2

        # Update state
        x += dt * v * np.cos(theta)
        y += dt * v * np.sin(theta)
        theta += dt * (u[0] - u[1]) / self.L

        # Update state estimate
        self.x_hat = np.array([x, y, theta])

        # Jacobian of the motion model
        F = np.array([[1, 0, -dt * v * np.sin(theta)],
                      [0, 1, dt * v * np.cos(theta)],
                      [0, 0, 1]])

        # Predict covariance
        self.P = np.dot(np.dot(F, self.P), F.T) + self.Q

    def update(self, z, u):
        # If the measurement is None, skip the update step
        self.predict(u)
        if z is None:
            return

        # Measurement model
        H = np.eye(3)

        # Kalman gain
        K = np.dot(np.dot(self.P, H.T), np.linalg.inv(np.dot(np.dot(H, self.P), H.T) + self.R))

        # Update state estimate
        self.x_hat += np.dot(K, (z - self.x_hat))

        # Update covariance estimate
        self.P = self.P - np.dot(np.dot(K, H), self.P)

    def get_state(self):
        return self.x_hat

The Kalman filter is responsible of keeping track of the robot's position when the camera feed breaks for whatever reason. To do this the Kalman filter uses 2 informations:
- The wheel speeds
- The Tymio position and orientation from the Vision part

The Kalman filter works in 2 steps:
- Predict step
- Update step

#### Predict step
This step determines where the robot is from the time passed since last prediction. Here the predicted position is  calculated by adding: $position = last\_position + dt \times speed$. The difference in angle comes from the difference in wheel speed divided by the wheel distance. The next state covariance (self.P) matrix is then calculated for the last one, the jacobian (F) and the process noise covariance (self.Q)

#### Update step
Here we compare the measurments we did with Vision with the predicted value. If we don't get any measurments from the camera, this part is skipped and self.P (state covariance) will keep growing until we get the camera back. If on the other hand we get the data from vision, we can calculate the Kalman gain matrix (K) which is then used to weight the measurment against the prediction (thus a prediction that has not been updated since a lot of time will have a very small weight against the camera).

## Controller

In [None]:
import math as mt
import numpy as np

class PDController:
    def __init__(self, kp=200, kd=0, default_speed=100, max_speed=200):
        self.max_speed=max_speed
        self.kp = kp  # Proportional gain
        self.kd = kd  # Derivative gain
        self.default_speed = default_speed
        self.previous_error = 0
        self.is_target_behind=False
        self.follow_angle=False

    def update(self, current_position, path):
        if path is None: return (0,0)
        self.follow_angle=(len(path)==1)
        spd=self.default_speed
        if self.follow_angle:
            dist=0
            control_signal = self._pd_control(current_position, path[0])
            spd=50
        else:
            dist=np.linalg.norm(np.array(current_position[0:2])-np.array(path[-1]))
            if dist<5:
                return (0,0)
            control_signal = self._pd_control(current_position, path[1])

        



        if self.is_target_behind:
            spd = 0  # 
        return self._calculate_wheel_speeds(control_signal, spd)
    
    def _calculate_error(self, current_position, target_position):
        # Calculate the Euclidean distance as error
        if not self.follow_angle:
            current_position=np.array(current_position)
            target_position=np.array(target_position)
            vector=target_position-current_position[:2]
            target_angle=mt.atan2(*vector[::-1])
        else: 
            target_angle=target_position
            print("PD c t", current_position[2], target_angle)
        angle_difference = target_angle-current_position[2]
        angle_difference = (angle_difference + np.pi) % (2 * np.pi) - np.pi
        self.is_target_behind = np.abs(angle_difference) > np.pi / 2
        print(angle_difference)
        return angle_difference

    def _pd_control(self, current_position, target_position):
        error = self._calculate_error(current_position, target_position)
        derivative = error + self.previous_error
        control_signal = self.kp * error - self.kd * derivative
        self.previous_error = error
        print("control_signal", control_signal)
        return control_signal
    

    def _calculate_wheel_speeds(self, control_signal, spd):
        # Implement the logic to calculate wheel speeds based on your robot's design
        # This is a placeholder for your specific implementation
        left_wheel_speed = np.clip(spd + control_signal, -self.max_speed, self.max_speed)
        right_wheel_speed = np.clip(spd - control_signal, -self.max_speed, self.max_speed)
        return int(left_wheel_speed), int(right_wheel_speed)
    def bound_value(value, min_value, max_value):
        return max(min_value, min(value, max_value))



if __name__=="__main__":
    def motors(left, right):
        return {
            "motor.left.target": [left],
            "motor.right.target": [right],
        }
    import vision
    vis = vision.Vision(0,4)
    pd  = PDController()
    from tdmclient import ClientAsync, aw # ClientAsync.aw allows us to run asychronous functions sychronously
    while True:
        vis.update()
        if vis.get_thymio_pos() is not None:
            break
    client = ClientAsync()                 # Create client object
    client.process_waiting_messages()      # Ensure connection to TDM (Thymio Device Manager)
    node = aw(client.wait_for_node()) # Ensure connection with Thymio robot (node)
    aw(node.lock())     
    while True:
        vis.update()
        pos=vis.get_thymio_pos()
        if pos is None: continue
        speeds=pd.update(pos, vis.get_target())
        node.send_set_variables(motors(*speeds))


            


The controller is a simple P controller (designed as PD) but since the kd was not necessary it was set as 0 (the robot has a very low inertia compared to the wheels torque).

It has 2 working modes:
- Follow point, this is used for global navigation, basically it calulates the angle to target and follows this angle
- Follow angle, used in local navigation, it is used to follow the direction of the inverse of the gradient of the potential field.

It is also responsible for stoping the robot one the final target is reached.