# Mapping and Perception for an autonomous robot (0510-7951)

### 2024/B

### Written by Roy Orfaig

Exercise 2:
---
**Part A** (50 points)

Localization based on Kalman Filter

**Part B** (50 points)

Localization based on Extended Kalman Filter


-------------------------------------

**Instructions**

1. Go to [**KITTI dataset**](http://www.cvlibs.net/datasets/kitti/) ,and download your specific records (chose "sync" recording)
2. Fill in the code the "TODO" section
2. Answer the question inside the sections
3. **Please copy all the results to the report:**
  - Outputs- Images, tables, scores,etc
  - Performace, analysis and your explanations.
  - Attach the completed notebook to the report package.

Good luck!

In [1]:
import sys

project_path = r"C:\Users\Nadav\Desktop\TAU\מיפוי וחישה למערכות אוטונמיות\Project_2\Project_KF_EKF_python"
sys.path.append(project_path)

import os
import io
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Ellipse
import numpy as np
from PIL import Image
import graphs
from tqdm import tqdm
import random

from data_preparation import build_LLA_GPS_trajectory,build_GPS_trajectory,add_gaussian_noise,add_gaussian_noise_dict, normalize_angle,normalize_angles_array
from utils.misc_tools import error_ellipse
from utils.ellipse import draw_ellipse
from utils.misc_tools import error_ellipse
from utils.ellipse import draw_ellipse
from utils.misc_tools import error_ellipse
from data_loader import DataLoader
from utils.plot_state import plot_state
from data_preparation import normalize_angle, normalize_angles_array



### Student name: Nadav Marciano

### ID: 305165698

### Record id (kitti): 2011_09_30_drive_0020

# Settings

In [3]:
# Please don't change that:
random.seed(1)
np.random.seed(2)

font = {'size': 20}
plt.rc('font', **font)

In [3]:
basedir = "/Users/Nadav/Desktop/TAU"#"TODO" (insert the correct path)
date = '2011_09_30'#"TODO" (insert  the correct folder)
drive = '0020' #"TODO" - (insert your correct record number)
pwd = os.getcwd()
# dat_dir = os.path.join(pwd,"data") 

dataset = DataLoader(basedir, date, drive)

# Utilties

In [4]:
def save_graphs(folder=None, file_name=None):
    """
    Saves or shows the current plot
    Args:
        folder: optional, must be provided with file_name too, the image will be saved to this path with the given file name
        file_name: optional, must be provided with folder too, the image will be saved to this path with the given file name
    """
    if not folder or not file_name:
        plt.show()
    else:
        file_name = "{}/{}.png".format(folder, file_name)
        figure = plt.gcf()  # get current figure
        number_of_subplots_in_figure = len(plt.gcf().get_axes())
        figure.set_size_inches(number_of_subplots_in_figure * 18, 18)
        ram = io.BytesIO()
        plt.savefig(ram, format='png', dpi=100)
        ram.seek(0)
        im = Image.open(ram)
        im2 = im.convert('RGB').convert('P', palette=Image.ADAPTIVE)
        im2.save(file_name, format='PNG')
        plt.close(figure)
    plt.close('all')

# Part A
## Localization based on Kalman Filter

In [5]:
#Complete the #TODO sections within the KalmanFilter class.

In [6]:
class KalmanFilter:
    """
    class for the implementation of Kalman filter
    """

    def __init__(self, enu_noise, times, vl, vf, sigma_xy, sigma_n, sigma_vx, sigma_vy, k, is_dead_reckoning=False):
        """
        Args:
            enu_noise: enu data with noise
            times: elapsed time in seconds from the first timestamp in the sequence
            sigma_xy: sigma in the x and y axis as provided in the question
            sigma_vx: sigma in the x velocity
            sigma_vy: sigma in the y velocity
            sigma_n: hyperparameter used to fine tune the filter
            is_dead_reckoning: should dead reckoning be applied after 5.0 seconds when applying the filter
            k: The factor kk used to enlarge the initial covariance matrix (this is NOT the gain K_t)
        """
        self.enu_noise = enu_noise
        self.times = times
        self.sigma_xy = sigma_xy
        self.sigma_n = sigma_n
        self.sigma_vx = sigma_vx
        self.sigma_vy = sigma_vy
        self.k = k
        self.is_dead_reckoning = is_dead_reckoning
        self.starting_time_dead_reck=5 #[seconds]

    
    def run(self):
        """
        Runs the Kalman filter

        outputs: enu_kf, covs
        """
        # Note: set the type of conavinces as dtype='float32' , use numpy
        B = np.zeros((4, 4),dtype='float32')
        U = np.zeros((B.shape[0]),dtype='float32')
        C = np.array([[1, 0, 0, 0],[0, 0, 1, 0]])

        P0 = np.array([
            [self.k * (self.sigma_xy ** 2), 0, 0, 0],
            [0, self.k * (self.sigma_vy ** 2), 0, 0],
            [0, 0, self.k * (self.sigma_xy ** 2), 0],
            [0, 0, 0, self.k * (self.sigma_vy ** 2)]
        ])

        Q = np.array([[self.sigma_xy ** 2, 0], [0, self.sigma_xy ** 2]])

        # Kalman Filter initialization  #TODO
        
        muo_tO = np.array([[self.enu_noise[0, 0]], [10], [self.enu_noise[0, 1]], [10]])
        sigma_t_minus1 = P0
            
        # Run Kalman filter 
        locations_kf = []
        sigma_kf = []
            
        for idx, (enu_point, curr_timestemp) in enumerate(zip(self.enu_noise,self.times)):
            if idx == 0: # stage 0
               ## Kalman Filter initialization (t=0)       
                muo_t_minus1 = muo_tO.flatten()
                start_timestemp = curr_timestemp
                prev_timestemp = curr_timestemp
                locations_kf.append(muo_t_minus1[0::2]) #extract x and y from the state vector
                sigma_kf.append(sigma_t_minus1)
                continue

            # Extract Noisy Measurement(x,y)
            z_t = np.array([[self.enu_noise[idx, 0]], [self.enu_noise[idx, 1]]])
            delta_t = (curr_timestemp - prev_timestemp).total_seconds()
            time_since_start = (curr_timestemp - start_timestemp).total_seconds()
            
            A = np.array([[1, delta_t, 0, 0], [0, 1, 0, 0], [0, 0, 1, delta_t], [0, 0, 0, 1]])
    
            R = np.array([[0, 0, 0, 0], [0, delta_t, 0, 0], [0, 0, 0, 0], [0, 0, 0, delta_t]]) * (self.sigma_n ** 2)
         
            # Kalman Filter mechanizm: Hint: Use matrix multiplication (@ operator)
            
            muo_t_bar = A @ muo_t_minus1.reshape(-1, 1) + B @ U.reshape(-1, 1)
            sigma_t_bar = A @ sigma_t_minus1 @ A.T + R
            
            K_t = sigma_t_bar @ C.T @ np.linalg.inv(C @ sigma_t_bar @ C.T + Q) 
            
            if time_since_start >= self.starting_time_dead_reck and self.is_dead_reckoning:
                 K_t = np.zeros_like(K_t)
            muo_t = (muo_t_bar + K_t @ (z_t - (C @ muo_t_bar))).flatten()  
            sigma_t = (np.eye(4) - K_t @ C) @ sigma_t_bar 

            muo_t_minus1 = muo_t
            prev_timestemp = curr_timestemp
            sigma_t_minus1 = sigma_t
            locations_kf.append(muo_t[0::2]) # Extract x and y from the state vector as a vector
            sigma_kf.append(sigma_t)

        return np.array(locations_kf),np.array(sigma_kf)


In [8]:
#Complete the #TODO sections within the ProjectQuestions1 class.

In [7]:
class ProjectQuestions1:
    def __init__(self, dataset, display_results, animation_results):
        """
        Given a Loaded Kitti data set with the following ground truth values: tti dataset and adds noise to GT-gps values
        - lat: latitude [deg]
        - lon: longitude [deg]
        - yaw: heading [rad]
        - vf: forward velocity parallel to earth-surface [m/s]
        - wz: angular rate around z axis [rad/s]
        Builds the following np arrays:
        - enu - lla converted to enu data
        - times - for each frame, how much time has elapsed from the previous frame
        - yaw_vf_wz - yaw, forward velocity and angular change rate
        - enu_noise - enu with Gaussian noise (sigma_xy=3 meters)
        """
        self.dataset = dataset
        self.display_results = display_results
        self.animation_results = animation_results
        self.enu, self.times, self.yaw_vf_vl_wz = build_GPS_trajectory(dataset)
        self.location_GT = self.enu[:,0:2]

        # add noise to the trajectory
        self.sigma_xy = 3  # Set value for sigma_xy
        self.sigma_vx = 1.3  # Set value for sigma_vx
        self.sigma_vy = 1.3  # Set value for sigma_vy
        self.sigma_n = 1.5  # Set value for sigma_n
        self.k = 1  # Set value for k

        e_noised = add_gaussian_noise(self.enu[:, 0], self.sigma_xy)
        n_noised = add_gaussian_noise(self.enu[:, 1], self.sigma_xy)
        u_noised = add_gaussian_noise(self.enu[:, 2], self.sigma_xy)
        self.enu_noise = np.stack([e_noised, n_noised, u_noised], axis=-1)



        self.fig_dir_path = os.getcwd() + "/Results_Q1/"
        if not os.path.exists(self.fig_dir_path):
            os.makedirs(self.fig_dir_path)

    @staticmethod
    def calc_RMSE_maxE(X_Y_GT, X_Y_est, starting_point=100):
        """
        This function calculates RMSE and maxE

        Args:
            X_Y_GT (np.ndarray): ground truth values of x and y
            X_Y_est (np.ndarray): estimated values of x and y
            starting_point (int): Starting point index for calculations

        Returns:
            (float, float): RMSE, maxE
        """
        maxE = -1  # Initialize max error variable
        num_of_elements = 0  # Initialize count of elements
        e_squared_list = []  # List to store squared errors
        err_x_arr = []  # List to store errors in x
        err_y_arr = []  # List to store errors in y

        for idx in range(X_Y_GT.shape[0]):
            e_x = X_Y_GT[idx, 0] - X_Y_est[idx, 0]
            e_y = X_Y_GT[idx, 1] - X_Y_est[idx, 1]
            err_x_arr.append(e_x)
            err_y_arr.append(e_y)
            if idx > starting_point:
                e_squared_list.append(e_x ** 2 + e_y ** 2)  # Calculate squared error
                curr_E = e_x + e_y  # Calculate current error
                if curr_E > maxE:  # Update max error
                    maxE = curr_E
                num_of_elements += 1

        RMSE = np.sqrt(np.sum(e_squared_list) / num_of_elements) if num_of_elements > 0 else 0.0  # Calculate RMSE

        return RMSE, maxE, np.array(err_x_arr), np.array(err_y_arr)

    def Q1(self):
        """
        This function runs the code of question 1 of the project.
        Load data from the KITTI dataset, add noise to the ground truth GPS values,
        and apply a Kalman filter to the noisy data (enu).
        """
        locations_GT = self.enu[:, 0:2]

        # b, c
        lla_coordinates = build_LLA_GPS_trajectory(self.dataset)
        if self.display_results:
            graphs.plot_trajectory_and_height(lla_coordinates, "Longitude Latitude world coordinate trajectory",
                                              'longitude[deg]', 'latitude[deg]',
                                              "Altitude per frame in the world coordinate trajectory", 'frame[Number]',
                                              'Altitude[m]')
            save_graphs(self.fig_dir_path, "ground-truth GPS trajectory LLA")

            graphs.plot_trajectory_and_height(self.enu, "X-East Y-North coordinate trajectory", 'X-East[m]',
                                              'Y-North[m]',
                                              "Height per frame in X Y coordinate trajectory", 'frame[Number]',
                                              'Height[m]')
            save_graphs(self.fig_dir_path, "ground-truth GPS trajectory ENU")

            # d
            # Add a Gaussian noise to the ground-truth GPS data
            graphs.plot_trajectory_with_noise(self.enu, np.stack([self.enu_noise[:, 0], self.enu_noise[:, 1]], axis=-1),
                                              'Trajectory in local coordinates(ENU) with and without noise',
                                              'X-East[m]', 'Y-North[m]', 'GT Trajectory(ENU)', 'Noised Trajectory(ENU)')
            save_graphs(self.fig_dir_path, "Comparison of the ENU path with and noise")

        ### e. calibration of Kalman filter ###
        sigma_n_values = np.arange(0, 2, 0.1)
        Rmse_vec = []
        maxE_vec = []
        for sigma_n in sigma_n_values:
            kf = KalmanFilter(self.enu_noise,
                              self.dataset.get_timestamps(),
                              self.yaw_vf_vl_wz[:, 1],
                              self.yaw_vf_vl_wz[:, 2],
                              self.sigma_xy,
                              sigma_n,
                              self.sigma_vx,
                              self.sigma_vy,
                              self.k,
                              False)

            locations_kf, kf_sigma = kf.run()
            RMSE, maxE, err_x_arr, err_y_arr = ProjectQuestions1.calc_RMSE_maxE(locations_GT, locations_kf)
            Rmse_vec.append(RMSE)
            maxE_vec.append(maxE)

        min_idx_rmse = np.argmin(Rmse_vec)
        min_idx_maxE = np.argmin(maxE_vec)

        fig_e, axes_e = plt.subplots(1, 2, figsize=(12, 8))
        axes_e[0].plot(sigma_n_values, Rmse_vec)
        axes_e[0].scatter(sigma_n_values[min_idx_rmse], Rmse_vec[min_idx_rmse])
        axes_e[0].set_title('RMSE vs sigma_n values \n min for sigma_n = ' + str(round(sigma_n_values[min_idx_rmse], 3)))
        axes_e[0].set(xlabel='sigma_n', ylabel='Rmse [m]')
        axes_e[0].grid()
        axes_e[1].plot(sigma_n_values, maxE_vec)
        axes_e[1].scatter(sigma_n_values[min_idx_maxE], maxE_vec[min_idx_maxE])
        axes_e[1].set_title('maxE vs sigma_n values \n min for sigma_n = ' + str(round(sigma_n_values[min_idx_maxE], 3)))
        axes_e[1].set(xlabel='sigma_n', ylabel='maxE [m]')
        axes_e[1].grid()
        fig_e.tight_layout()
        file_name = "{}/{}.png".format(self.fig_dir_path, f"Kalman_filter_calibration")
        fig_e.savefig(file_name)

        # f - Kalman
        # Dead Reckoning OFF
        kalman_filter = KalmanFilter(self.enu_noise,
                                     self.dataset.get_timestamps(),
                                     self.yaw_vf_vl_wz[:, 1],
                                     self.yaw_vf_vl_wz[:, 2],
                                     self.sigma_xy,
                                     self.sigma_n,
                                     self.sigma_vx,
                                     self.sigma_vy,
                                     self.k,
                                     False)

        locations_kf, kf_sigma = kalman_filter.run()
        self.locations_kf = locations_kf

        if self.display_results:
            graphs.plot_trajectory_comparison(locations_GT, locations_kf)
            save_graphs(self.fig_dir_path, f"Comparison of the ground truth trajectory and the filtered trajectory (KF)")

        # RMSE, maxE, err_x_arr, err_y_arr
        RMSE, maxE, err_x_arr, err_y_arr = ProjectQuestions1.calc_RMSE_maxE(locations_GT, locations_kf)

        # Dead Reckoning ON
        kalman_filter = KalmanFilter(self.enu_noise,
                                     self.dataset.get_timestamps(),
                                     self.yaw_vf_vl_wz[:, 1],
                                     self.yaw_vf_vl_wz[:, 2],
                                     self.sigma_xy,
                                     self.sigma_n,
                                     self.sigma_vx,
                                     self.sigma_vy,
                                     self.k,
                                     True)

        locations_kf_dr, kf_sigma_dr = kalman_filter.run()

        if self.display_results:
            print(f'maxE={maxE}, RMSE={RMSE}')
            graphs.plot_trajectory_comparison_dead_reckoning(locations_GT, locations_kf, locations_kf_dr)
            save_graphs(self.fig_dir_path, f"trajectory_comparison_dead_reckoning")
            graphs.plot_error([err_x_arr, np.sqrt(kf_sigma[:, 0, 0])], [err_y_arr, np.sqrt(kf_sigma[:, 2, 2])])
            save_graphs(self.fig_dir_path, f"plot_trajectory_error")

        if self.animation_results:
            print("wait...")
            ani = graphs.build_animation(locations_GT, locations_kf_dr, locations_kf,
                                         kf_sigma[:, ::2, ::2].reshape(kf_sigma.shape[0], -1),
                                         'KF Trajectory estimation - constant velocity with dead reckoning',
                                         'X-East[m]', 'Y-North[m]', 'GT', 'Dead Reckoning', 'KF')
            graphs.save_animation(ani, self.fig_dir_path, "kf_predict_with_dead_reckoning")
            print("Done!")

        plt.close()

    def run(self):
        self.Q1()
        print("Successfully finished. All data saved in {}".format(self.fig_dir_path))


In [14]:
# Run Kalman Filter!

display_results=True
animation_results=True

project = ProjectQuestions1(dataset,display_results,animation_results)
project.run()
locations_kf=project.locations_kf

maxE=4.304851892707902, RMSE=1.6260814977429983
wait...
Creating animation
Saving animation
Animation saved
Done!
Successfully finished. All data saved in C:\Users\Nadav/Results_Q1/


# Part B
## Localization based on Extended Kalman Filter

In [15]:
#Complete the #TODO sections within the ExtendedKalmanFilter class.

In [10]:
def normalize_angle(angle):
    """
    Normalize an angle to the range [-pi, pi).
    
    Args:
        angle (float): angle in radians.
        
    Returns:
        float: normalized angle in radians.
    """
    while angle > np.pi:
        angle -= 2.0 * np.pi
    while angle < -np.pi:
        angle += 2.0 * np.pi
    return angle

In [11]:
class ExtendedKalmanFilter:
    """
    class for the implementation of the extended Kalman filter
    """
    def __init__(self, enu_noise, yaw_vf_wz_noise, times, sigma_xy, sigma_theta, sigma_vf, sigma_wz, k, is_dead_reckoning, dead_reckoning_start_sec=5.0):
        """
        Args:
            enu_noise: enu data with noise
            times: elapsed time in seconds from the first timestamp in the sequence
            sigma_xy: sigma in the x and y axis as provided in the question
            sigma_n: hyperparameter used to fine tune the filter
            yaw_vf_wz: the yaw, forward velocity and angular change rate to be used (either non noisy or noisy, depending on the question)
            sigma_theta: sigma of the heading
            sigma_vf: sigma of the forward velocity
            sigma_wz: sigma of the angular change rate
            k: hyper parameter to fine tune the filter
            is_dead_reckoning: should dead reckoning be applied after 5.0 seconds when applying the filter
            dead_reckoning_start_sec: from what second do we start applying dead reckoning, used for experimentation only
        """
        self.enu_noise = enu_noise
        self.yaw_vf_wz_noise = yaw_vf_wz_noise
        self.times = times
        self.sigma_xy = sigma_xy
        self.sigma_theta = sigma_theta
        self.sigma_vf = sigma_vf
        self.sigma_wz = sigma_wz
        self.k = k
        self.is_dead_reckoning = is_dead_reckoning
        self.dead_reckoning_start_sec = dead_reckoning_start_sec


    @staticmethod
    def calc_RMSE_maxE(X_Y_GT, YAW_GT, X_Y_est, YAW_est):
        """
        That function calculates RMSE and maxE

        Args:
            X_Y_GT (np.ndarray): ground truth values of x and y
            X_Y_est (np.ndarray): estimated values of x and y

        Returns:
            (float, float): RMSE, maxE
        """
        start_point = 100  # TODO Set the starting point for calculations
        maxE = -1   # Initialize max error variable
        num_of_elements = 0  # Initialize count of elements
        e_squared_list = []  # List to store squared errors
        err_x_arr = []  # List to store errors in x
        err_y_arr = []  # List to store errors in y
        err_yaw_arr = []  # List to store errors in yaw

        for idx in range(X_Y_GT.shape[0]):
            e_x = X_Y_GT[idx, 0] - X_Y_est[idx, 0]
            e_y = X_Y_GT[idx, 1] - X_Y_est[idx, 1]
            e_yaw = normalize_angle(YAW_GT[idx] - YAW_est[idx])
            err_x_arr.append(e_x)
            err_y_arr.append(e_y)
            err_yaw_arr.append(e_yaw)
            if idx > start_point:
                e_squared_list.append(e_x ** 2 + e_y ** 2)  # TODO calculate squared error (hint use e_x,e_y)
                curr_E = e_x + e_y  # TODO Calculate current error (hint use e_x,e_y)
                if curr_E > maxE:  # Update max error
                    maxE = curr_E
                num_of_elements += 1

        RMSE = np.sqrt(np.sum(e_squared_list) / num_of_elements) if num_of_elements > 0 else 0.0  # Calculate RMSE
        return RMSE, maxE, np.array(err_x_arr), np.array(err_y_arr), np.array(err_yaw_arr)

    def run(self):
        """
        Runs the Kalman filter

        outputs: enu_kf, covs
        """
        P0 = np.array([
            [self.k * (self.sigma_xy ** 2), 0, 0],
            [0, self.k * (self.sigma_xy * self.sigma_theta), 0],
            [0, 0, self.k * (self.sigma_theta ** 2)]
        ], dtype='float32')

        H = np.array([[1, 0, 0], [0, 1, 0]])

        R = np.array([[self.sigma_vf ** 2, 0], [0, self.sigma_wz ** 2]], dtype='float32')

        R_gal = np.zeros((3, 3), dtype='float32')
        R_gal[0, 0] = 0.00001
        R_gal[1, 1] = 0.00001
        R_gal[2, 2] = 0.00001

        Q = np.array([[self.sigma_xy ** 2, 0], [0, self.sigma_xy ** 2]], dtype='float32')

        muo_tO = np.array([[self.enu_noise[0, 0]], [self.enu_noise[0, 1]], [self.yaw_vf_wz_noise[0][0]]], dtype='float32')
        sigma_t_minus1 = P0

        locations_kf = []
        sigma_kf = []
        for idx, (enu_point, curr_timestemp, yaw_vf_wz_noise) in enumerate(zip(self.enu_noise, self.times, self.yaw_vf_wz_noise)):
            if idx == 0:
                muo_t_minus1 = muo_tO.flatten()
                start_timestemp = curr_timestemp
                prev_timestemp = curr_timestemp
                sigma_t_minus1 = P0
                locations_kf.append(muo_t_minus1)
                sigma_kf.append(sigma_t_minus1)
                continue

            z_t = np.array([enu_point[0], enu_point[1]])
            delta_t = (curr_timestemp - prev_timestemp).total_seconds()
            time_since_start = (curr_timestemp - start_timestemp).total_seconds()

            vf_t = yaw_vf_wz_noise[1]
            w_t = yaw_vf_wz_noise[2]
            ratio = vf_t / w_t
            
            G = np.array([
                [1, 0, float(-ratio * np.cos(muo_t_minus1[2]) + ratio * np.cos(muo_t_minus1[2] + w_t * delta_t))],
                [0, 1, float(-ratio * np.sin(muo_t_minus1[2]) + ratio * np.sin(muo_t_minus1[2] + w_t * delta_t))],
                [0, 0, 1]
            ])

            non_linear_muo = np.array([
                -ratio * np.sin(muo_t_minus1[2]) + ratio * np.sin(muo_t_minus1[2] + w_t * delta_t),
                ratio * np.cos(muo_t_minus1[2]) - ratio * np.cos(muo_t_minus1[2] + w_t * delta_t),
                w_t * delta_t
            ], dtype='float32')

            V = np.array([
                [(-1 / w_t) * np.sin(muo_t_minus1[2]) + (1 / w_t) * np.sin(muo_t_minus1[2] + w_t * delta_t), 
                 (vf_t / (w_t ** 2)) * np.sin(muo_t_minus1[2]) - (vf_t / (w_t ** 2)) * np.sin(muo_t_minus1[2] + w_t * delta_t) + (vf_t / w_t) * np.cos(muo_t_minus1[2] + w_t * delta_t) * delta_t],
                [(1 / w_t) * np.cos(muo_t_minus1[2]) - (1 / w_t) * np.cos(muo_t_minus1[2] + w_t * delta_t), 
                 (-vf_t / (w_t ** 2)) * np.cos(muo_t_minus1[2]) + (vf_t / (w_t ** 2)) * np.cos(muo_t_minus1[2] + w_t * delta_t) + (vf_t / w_t) * np.sin(muo_t_minus1[2] + w_t * delta_t) * delta_t],
                [0, delta_t]
            ], dtype='float32')

            muo_t_bar = muo_t_minus1 + non_linear_muo
            sigma_t_bar = G @ sigma_t_minus1 @ G.T + V @ R @ V.T + R_gal

            K_t = sigma_t_bar @ H.T @ np.linalg.inv(H @ sigma_t_bar @ H.T + Q)
                        
            if time_since_start >= self.dead_reckoning_start_sec and self.is_dead_reckoning:
                
                K_t = np.zeros_like(K_t)
            muo_t = muo_t_bar + K_t @ (z_t - H @ muo_t_bar)
            sigma_t = (np.eye(3) - K_t @ H) @ sigma_t_bar

            muo_t_minus1 = muo_t
            muo_t_minus1[-1] = normalize_angle(muo_t_minus1[-1])
            prev_timestemp = curr_timestemp
            sigma_t_minus1 = sigma_t
            locations_kf.append(muo_t)
            sigma_kf.append(sigma_t)

        return np.array(locations_kf), np.array(sigma_kf)

In [12]:
#Complete the #TODO sections within the ProjectQuestions2 class.

In [36]:
class ProjectQuestions2:
    def __init__(self, dataset,display_results,save_animation,locations_kf):
        """
        Given a Loaded Kitti data set with the following ground truth values: tti dataset and adds noise to GT-gps values
        - lat: latitude [deg]
        - lon: longitude [deg]
        - yaw: heading [rad]
        - vf: forward velocity parallel to earth-surface [m/s]
        - wz: angular rate around z axis [rad/s]
        Builds the following np arrays:
        - enu - lla converted to enu data
        - times - for each frame, how much time has elapsed from the previous frame
        - yaw_vf_wz - yaw, forward velocity and angular change rate
        - enu_noise - enu with Gaussian noise (sigma_xy=3 meters)
        - yaw_vf_wz_noise - yaw_vf_wz with Gaussian noise in vf (sigma 2.0) and wz (sigma 0.2)
        """
        self.dataset = dataset
        self.display_results=display_results
        self.save_animation=save_animation
        self.locations_kf=locations_kf
        self.enu, self.times, self.yaw_vf_vl_wz = build_GPS_trajectory(self.dataset)
        self.location_GT = self.enu[:,0:2]
        # # # add noise to the trajectory
        self.sigma_xy = 3
        
        e_noised = add_gaussian_noise(self.enu[:, 0], self.sigma_xy)
        n_noised = add_gaussian_noise(self.enu[:, 1], self.sigma_xy)
        u_noised = add_gaussian_noise(self.enu[:, 2], self.sigma_xy)
        self.enu_noise = np.stack([e_noised, n_noised, u_noised], axis=-1)

        self.sigma_theta = 0.18  # Set value for sigma_theta
        self.sigma_vf = 2 # Set value for sigma_vf
        self.sigma_wz = 0.2  # Set value for sigma_wz
        self.k =  1
        
        yaw_noised = add_gaussian_noise(self.yaw_vf_vl_wz[:, 0], self.sigma_theta)
        vf_noised = add_gaussian_noise(self.yaw_vf_vl_wz[:, 1], self.sigma_vf)
        wz_noised = add_gaussian_noise(self.yaw_vf_vl_wz[:, 3], self.sigma_wz)
        self.yaw_vf_wz_noise = np.stack([yaw_noised, vf_noised, wz_noised], axis=-1)

        self.fig_dir_path = os.getcwd() + "/Results_Q2/"
        if not os.path.exists(self.fig_dir_path):
            os.makedirs(self.fig_dir_path)

    
    def Q2(self):

        """
        That function runs the code of question 2 of the project.
        Load data from the KITTI dataset, add noise to the ground truth GPS values, yaw rate, and velocities, and apply a Extended Kalman filter to the noisy data.
        """

        if self.display_results:
            # c, d, e, f -          # plot vf and wz with and without noise
            graphs.plot_yaw_yaw_rate_fv(self.yaw_vf_vl_wz[:,0],self.yaw_vf_vl_wz[:,3],self.yaw_vf_vl_wz[:,1])
            save_graphs(self.fig_dir_path,F"Yaw_Yaw_rate_fv")

            graphs.plot_vf_wz_with_and_without_noise(self.yaw_vf_vl_wz[:,[0,1,3]],self.yaw_vf_wz_noise)
            save_graphs(self.fig_dir_path,F"vf_wz_with_and_without_noise")

        ekf = ExtendedKalmanFilter(self.enu_noise,
                                   self.yaw_vf_wz_noise,
                                   self.dataset.get_timestamps(),
                                   self.sigma_xy,
                                   self.sigma_theta,
                                   self.sigma_vf,
                                   self.sigma_wz,
                                   self.k,
                                   False)
        
        locations_ekf_x_y_yaw, sigma_x_xy_yx_y_t = ekf.run()
        locations_ekf = locations_ekf_x_y_yaw[:,0:2]
        self.locations_ekf = locations_ekf
        RMSE, maxE, err_x_arr, err_y_arr, err_yaw_arr = ekf.calc_RMSE_maxE(self.location_GT, self.yaw_vf_vl_wz[:,0], locations_ekf, locations_ekf_x_y_yaw[:,2])

        if self.display_results:
            print(f'maxE{maxE}, RMSE={RMSE}')
            graphs.plot_trajectory_comparison(self.location_GT,locations_ekf)
            save_graphs(self.fig_dir_path,f"plot_trajectory_comparison")

            graphs.plot_error([err_x_arr,np.sqrt(sigma_x_xy_yx_y_t[:,0,0])],[err_y_arr,np.sqrt(sigma_x_xy_yx_y_t[:,1,1])],[err_yaw_arr,np.sqrt(sigma_x_xy_yx_y_t[:,2,2])])
           # save_graphs(self.fig_dir_path,"ground-truth GPS trajectory ENU")
            save_graphs(self.fig_dir_path,f"plot_error_EKF_vs_GT")

        ekf = ExtendedKalmanFilter(self.enu_noise,
                                   self.yaw_vf_wz_noise,
                                   self.dataset.get_timestamps(),
                                   self.sigma_xy,
                                   self.sigma_theta,
                                   self.sigma_vf,
                                   self.sigma_wz,
                                   self.k,
                                   True)

        locations_ekf_dr, ekf_sigma_dr = ekf.run()
        locations_ekf_dr_x_y_yaw, ekf_sigma_dr = ekf.run()
        locations_ekf_dr = locations_ekf_dr_x_y_yaw[:,0:2]

        if self.display_results:
            graphs.plot_trajectory_comparison_dead_reckoning(self.location_GT, locations_ekf, locations_ekf_dr)
            save_graphs(self.fig_dir_path,f"plot_trajectory_comparison_dead_reckoning")
            graphs.plot_trajectory_comparison_kf_ekf(self.location_GT, self.locations_ekf, self.locations_kf)
            save_graphs(self.fig_dir_path,f"trajectory_comparison_kf_ekf")
     

        if self.save_animation:
            print("wait...")
            ani = graphs.build_animation(self.location_GT, locations_ekf_dr, locations_ekf, sigma_x_xy_yx_y_t[:, :2, :2].reshape(sigma_x_xy_yx_y_t.shape[0], -1),'EKF Trajectory estimation - constant velocity with dead reckoning', 'X-East[m]', 'Y-North[m]', 'GT', 'Dead Reckoning', 'EKF') #hint- graphs.build_animation)
            
            graphs.save_animation(ani, self.fig_dir_path, "EKF_predict_with_dead_reckoning")
            print("Done!...")
            plt.close()

    def get_odometry(self, sensor_data):
        """
        Args:
            sensor_data: map from a tuple (frame number, type) where type is either ‘odometry’ or ‘sensor’.
            Odometry data is given as a map containing values for ‘r1’, ‘t’ and ‘r2’ – the first angle, the translation and the second angle in the odometry model respectively.
            Sensor data is given as a map containing:
              - ‘id’ – a list of landmark ids (starting at 1, like in the landmarks structure)
              - ‘range’ – list of ranges, in order corresponding to the ids
              - ‘bearing’ – list of bearing angles in radians, in order corresponding to the ids

        Returns:
            numpy array of of dim [num of frames X 3]
            first two components in each row are the x and y in meters
            the third component is the heading in radians
        """
        num_frames = len(sensor_data) // 2
        state = np.array([[0, 0, 0]], dtype=float).reshape(1, 3)
        for i in range(num_frames):
            curr_odometry = sensor_data[i, 'odometry']
            t = np.array([
                curr_odometry['t'] * np.cos(state[-1, 2] + curr_odometry['r1']),
                curr_odometry['t'] * np.sin(state[-1, 2] + curr_odometry['r1']),
                curr_odometry['r1'] + curr_odometry['r2']
            ]).reshape(3, 1)
            new_pos = state[-1, :].reshape(3, 1) + t
            state = np.concatenate([state, new_pos.reshape(1, 3)], axis=0)
        return state

    
    def run(self):
        self.Q2()
        print("Successfully finished. All data saved in {}".format(self.fig_dir_path))


In [38]:
display_results=True
save_animation=True
project = ProjectQuestions2(dataset,display_results,save_animation,locations_kf)
project.run()

maxE3.5285106239445625, RMSE=1.3169028684098618
wait...
Creating animation
Saving animation
Animation saved
Done!...
Successfully finished. All data saved in C:\Users\Nadav/Results_Q2/
