# Kalman Filter
In a nutshell, a Kalman filter is a method to estimate the current state of a noisy system based on the previous state and measurements used to correct. 

Historically, it was built to monitor the position and speed of an orbiting vehicle. However, KF can be applied to non physical systems (e.g., [economic system](https://arxiv.org/abs/1811.11618))

## Process State
First, we need a **state**. A state is a representation of all the parameters needed to describe the current system and perform its prediciton. In general, the state is a **vector**, denoted ***x***.

## Prediction (using a model and a control input)

Second, we need a **model**. The model describes how the system behaves. It may be an ideal representation or a simplified version of our system. It may be justified by some physical laws (e.g., in robotics) or by some empirical analysis (e.g., finance).

The model is expressed as follows:

$$
x_k = A x_{k-1} + B u_k + w
$$

where:
1. **A** represents the state-transition model (i.e., how does **x** change without considering any input to the system). Of course, the column and row size must be equal to the size of **x**. Sometimes in other writtings **A** is named **F**.
2. **$u_k$** is the **control signal** applied to the system. **$u_k$** does not necessarily need to be of same size as ***x*** (e.g., control input of turtlebot might be the angle rotated by both wheels, while its state is [x, y, $\theta$]). 
3. **$B$** describes how the system reacts to the **control signal**. The number of rows must be equal to the size of ***$u_k$***, and the columns equal to the size of ***x***.
4. **$w$** adds noise to our model, representing the noise of $u_k$ but also anything else that we do not measure in our model. Usually we don't know the actual value of the noise (or at least some part of it), and we make assumtions about it. Noise is assumed to be zero mean Gaussian white noise with covariance $W$ : $w \sim \mathcal{N}(0, W)$


## Update (using measurements)

Third, we need to use **measurements** to improve our model. When new data arrives, we want to change our state estimate to reflect our improved understanding of the current situation. Of course, what we measure does not have to be exactly the state ***x***, as long as it can be related.
In general, the measurement should be a vector, denoted by ***z***. Measurements are modeled to be noisy to reflect that we do not measure perfectly. Hence, the equation to model measurements would be:

$$
z = H x_k + v_k
$$

Where:

1. $v_k$ is the measurement noise, assumed to be zero mean Gaussian white noise with covariance $R_k$ : $v_k \sim \mathcal{N}(0, R_k)$. $v_k$ is a vector of same size as $z$.
2. ***H*** is the observation model, which maps the true state ***x*** to the observation. It's a matrix with rows equal to the number of measurement variables and columns equal to the number of state variables.

When we get some measurements $z_k$, we can use the measurement model to compute what we expect those measurements to be, based on our current state. The **expected measurements** are defined by:

$$
\bar{z}_k = H \bar{x}_k
$$

Intuitively, what me measure ($z_k$) and the expected measurements ($\bar{z}_k$) will be slightly different from 0, due to errors in the measurements but also in the belief state ($\bar{x}_k$). We will call this difference $y_k = z_k - \bar{z}_k$ the **innovation**. It represents the bias of our measurement estimation. Obviously, if everything were perfect, the innovation should be zero. 

Since the innovation depends on gaussian variables, $y$ is also gaussian. Its associated covariance matrix is given by:

$$
S_k = Cov(H\bar{x}_k + v_k) = H P_k H^T + R_k
$$



Now, we want to incorporate the innovation to our model (i.e., update it). But we cannot add it blindly in the state equation. We should multiply this innovation by a matrix factor that reflects somehow the correlation between our measurement bias and our state bias. Hence, we will compute a *correction*, or *update* to our state equation as follows:

$$
x_k = \bar{x}_k + K_k y_k
$$

with its associated covariance: 

$$
P_k = (I - K_k H) P_k
$$

where the matrix $K_t$ is called the **Kalman gain**.  Intuetively, we can easily understand the meaning of K by the following rules of thumb:

1. The more noisy our measurement is, the less precis it is. Hence the innovation that represents the bias in our measurement may not be real innovation, but rather an artifact of the measurement noise. Thus, the larger the measurement noise, the lower the Kalman gain should be. If $K_k$ is close to 0, then  $x_k = \bar{x}_k$ and $P_k = P_k$, which is the same as doing no update.
2. The more noisy our process state ($x_k$) is, the more important the innovation should be taken into account. Hence, the larger the process state variance, the larger the Kalman gain should be.

By intuition, we would expect the Kalman gain to be something like:

$$
K_k \sim \tfrac{Process~Noise}{Measurement~Noise}
$$

And as expected, the real equation for the Kalman gain is closely related to our intuition, and given by:

$$
K_k = P_k H^T S_k^{-1}
$$


## Algorithm

Now, all equations in the Kalman filter have been presented. Summarizing, the algorithm looks like: 

#### Predict

1. Predicted (a priori) state estimate: $\hat{x}_k = A \hat{x}_{k-1} + B u_k$
2. Predicted (a priori) covariance estimate: $P_k = A P_{k-1} A^T + Q$

#### Update

3. Innovation residual: $\bar{y}_k = z_k - H \bar{x}_k$
4. Innovation covariance: $S_k = H P_k H^T + R_k$
5. Optimal Kalman Gain: $K_k = P_k H^T S_k^{-1}$
6. Updated (a posteriori) state estimate: $\hat{x}_k = \hat{x}_k + K_k \bar{y}_k$
7. Updated (a posteriori) estimate covariance: $P_k = (I - K_k H) P_k$ (*)

#### Alternative Covariance Update Equation (*)

The Updated (a posteriori) estimate covariance, is usually computed in the following alternative form:

$$
P_k = (I - K_k H_k)\bar{P}_k{(I - K_k H_k)}^T + K_k R_k K_k^T
$$

known as the *Joseph form*, which is known to be less sensitive to round-off numerical errors preserving the symmetry of $P_k$.

# Extended Kalman Filter

In this notebook, we are going to revist the Extended Kalman Filter (EKF). The EKF handles nonlinearity by linearizing the system at the point of the current estimate, and then the linear Kalman filter is used to filter this linearized system. It was one of the very first techniques used for nonlinear problems, and it remains the most common technique. 


## Linearizing the Kalman Filter

The Kalman filter uses linear equations, so it does not work with nonlinear problems. Problems can be nonlinear in two ways. First, the process model might be nonlinear. An object falling through the atmosphere encounters drag which reduces its acceleration. The drag coefficient varies based on the velocity the object. The resulting behavior is nonlinear - it cannot be modeled with linear equations. Second, the measurements could be nonlinear. For example, a radar gives a range and bearing to a target. We use trigonometry, which is nonlinear, to compute the position of the target.

For the linear filter we have these equations for the process and measurement models:

$$
x_k = A x_{k-1} + B u_k + w
$$
$$
z = H x_k + v_k
$$

For the nonlinear model the linear expression $x_k = A x_{k-1} + B u_k + w$ is replaced by a nonlinear function $x_k = f(\mathbf x_k, \mathbf u, w)$, and the linear expression $z = \mathbf{Hx}$ is replaced by a nonlinear function $z = h(\mathbf x)$:

$$
x_k = f(x_{k-1}, u_{k-1}) + w
$$
$$ 
z_k = h(x_k) + v_k
$$

You might imagine that we could proceed by just using these non-linear equations in the Kalman Filter formulation. But if you remember lectures about **Nonlinear Filtering**, passing a Gaussian through a nonlinear function results in a probability distribution that is no longer Gaussian. So this will not work.

The EKF does not alter the Kalman filter's linear equations. Instead, it linearizes the nonlinear equations at the point of the current estimate, and uses this linearization in the linear Kalman filter.

We linearize $f(\mathbf x, \mathbf u)$, and $h(\mathbf x)$ by taking the partial derivatives of each to evaluate $\mathbf F$ and $\mathbf H$ at the point $\mathbf x_t$ and $\mathbf u_t$. We call the partial derivative of a matrix the [*Jacobian*](https://en.wikipedia.org/wiki/Jacobian_matrix_and_determinant). This gives us the the discrete state transition matrix and measurement model matrix:

$$
\begin{aligned}
\mathbf F 
&= {\frac{\partial{f(\mathbf x_k, \mathbf u_k, w_k)}}{\partial{\mathbf x}}}\biggr|_{{\mathbf x_k},{\mathbf u_k}} \\
\mathbf H &= \frac{\partial{h(\bar{\mathbf x}_k)}}{\partial{\bar{\mathbf x}}}\biggr|_{\bar{\mathbf x}_k} 
\end{aligned}
$$

## Algorithm

Now, all equations in the Extended Kalman filter have been presented. Summarizing, the algorithm looks like: 

#### Predict

1. Predicted (a priori) state estimate: $\hat{x}_k = f(\hat{x}_{k-1}, u_k)$
2. Predicted (a priori) covariance estimate: $P_k = F P_{k-1} F^T + W Q W^T$

#### Update

3. Innovation residual: $\bar{y}_k = z_k - h(\bar{x}_k)$
4. Innovation covariance: $S_k = H P_k H^T + R_k$
5. Optimal Kalman Gain: $K_k = P_k H^T S_k^{-1}$
6. Updated (a posteriori) state estimate: $\hat{x}_k = \hat{x}_k + K_k \bar{y}_k$
7. Updated (a posteriori) estimate covariance: $P_k = (I - K_k H) P_k$ (*)


# Implementation

Now, we will implement a few examples of EKF for tracking a robot pose using different data measurements. We will use synthetic data from CSV files. These CSVs have been generated so the same line repesents the same time stamp for each file. As you will see, this is done to simplify csv processing. If in a specific time there is no measurement data, the row will be filled with **None** (or similar). 

In [1]:
#Necessary to plot animations in jupyter (Remove it if you convert the notebook to pure python)
%config Completer.use_jedi = False
%matplotlib notebook 

import matplotlib.pyplot as plt
import matplotlib.animation as anim
import matplotlib.patches as patches
from matplotlib import transforms as trans

import csv
import numpy as np
import math
import copy

## Odometry readings
An **Odom** class is defined to encapsulate a odometry measurement.
Then, we define a function to read the whole csv file and return an ordered list of Odom objects.

In [2]:
class Odom:
    def __init__(self, time, u, Q, dt):
        self.time = time
        self.u = u
        self.Q = Q
        self.dt = dt

def ReadOdomCSV(csv_file):
    data = []
    with open(csv_file, 'r') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader) #Skip header
        for row in csv_reader:
            time = float(row[0])
            dt = float(row[1])
            u = np.array(row[2:4]).astype(float)
            Q = np.array([row[4:6],row[6:8]]).astype(float)
            data.append(Odom(time, u, Q, dt))
    print("data len", len(data))

    return data

## Ground Truth Readings

The ground truth CSV file provides us the real trajectory of the robot (we know it since data is generated synthetically). We will read it from the csv and save it as a list of numpy arrays (containing $[x, y, \theta]$) which will be used for visualization.

In [3]:
def ReadGroundTruthCSV(csv_file):
    gt_data = []
    gt_time = []
    with open(csv_file, 'r') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader) #Skip header
        for row in csv_reader:
            time = float(row[0])
            gt_data.append(np.array(row[1:4]).astype(float))
            gt_time.append(time)
    return gt_data, gt_time

## The Extended Kalman Filter Base Class

Now, we will develop a base class to implement the EKF. This class will store a **state** and its **covariance**.

It implements by default:
- prediction
- update

and requires inherited classes to implement:
- compute_f (i.e., implement $x = f(x,u)$)
- compute_Jhx (i.e., implement ${\frac{\partial{f(\mathbf x_k, \mathbf u_k, w_k)}}{\partial{\mathbf x}}}$)
- compute_Jhv (i.e., implement ${\frac{\partial{f(\mathbf x_k, \mathbf u_k, w_k)}}{\partial{\mathbf w_k}}}$)


In [4]:
class ExtendedKalmanFilter:
    "Generic Extended Kalman filter class"

    def __init__(self, x_init, P_init) -> None:
        self.x = copy.copy(x_init)
        self.P = copy.copy(P_init)

    def prediction(self, u_k, Q_k, dt): 
        assert isinstance(u_k, np.ndarray)
        assert isinstance(Q_k, np.ndarray)
        
        x_k_ = self.calculate_f(u_k, dt)
        A_k = self.calculate_Jfx(u_k, dt)
        W_k = self.calculate_Jfw(u_k, dt)
        P_k_ = A_k @ self.P @ A_k.transpose() + W_k @ Q_k @ W_k.transpose()
        
        return [x_k_, P_k_]

    
    def update(self, z_k, R_k, hx, H_k, V_k): 
        assert isinstance(z_k, np.ndarray)
        assert isinstance(R_k, np.ndarray)
        assert isinstance(H_k, np.ndarray)
        assert isinstance(V_k, np.ndarray)

        I = np.eye(self.P.shape[0], self.P.shape[1]) # Identity Matrix
 
        # Compute innovation
        y_k = z_k - hx

        S_k = H_k @ self.P @ H_k.transpose() + V_k @ R_k @ V_k.transpose()
        K_k = self.P @ H_k.transpose() @ np.linalg.inv(S_k)
        x_k = self.x + K_k @ y_k

        # P_k = (I - K_k @ H_k) @ self.P @ (I - K_k @ H_k).transpose()
        Aux = (I - K_k @ H_k)
        P_k = Aux @ self.P @ Aux.transpose()
        
        return [x_k, P_k]
        
    '''
    Implements x = f(x, u)
    Input:
    - u: numpy.array of shape (m,) --> the input control
    - dt: float 
    return: numpy.array of shape (n,)
    '''
    def calculate_f(self, u, dt = None) -> np.ndarray:
        print("Calculate F NOT IMPLEMENTED!!")
        pass

    '''
    Implements the Jacobian of f(x, u, w) with respect to x, at point x
    Input:
    - u: numpy.array of shape (m,) --> the input control
    - dt: float 
    return: numpy matrix of shape (n,n)
    '''
    def calculate_Jfx(self, u, dt = None) -> np.ndarray:
        print("Calculate JFX NOT IMPLEMENTED!!")
        pass

    '''
    Implements the Jacobian of f(x, u, w) with respect to w, at point x
    Input:
    - u: numpy.array of shape (m,) --> the input control
    - dt: float 
    return: numpy matrix of shape (n,a), where a is the length of the modeled noise
    '''
    def calculate_Jfw(self, u, dt = None) -> np.ndarray:
        print("Calculate JFW NOT IMPLEMENTED!!")
        pass



## Implementing the Prediction step for a differential drive robot using a odometry model

First, we need to be able to predict the robot state according to a given model. Implement the prediction equations to estimate $x = [p_x, p_y, p_{\theta}]$ based on a odometry model ($u = [v^T, w^T]^T$), where $p_x$ and $p_y$ are the vehicle positions in $x$ and $y$, $p_{\theta}$ is the vehicle orientation, and $v$ and $w$ are the vehicle's linear velocity and angular velocity. For now, we will assume we have no other measurements to perform updates. 

The model will be encapsulated in a class **DiffDriveOdomEKF** which inherits from **ExtendedKalmanFilter**. Here, you are requested to fill the methods:

- calculate_f
- calculate_Jfx
- calculate_Jfw

In [5]:
class DiffDriveOdomEKF(ExtendedKalmanFilter):
    def __init__(self, x_init, P_init) -> None:
        super().__init__(x_init, P_init)

    def calculate_f(self, u, dt) -> np.ndarray:   
        x_ = self.x[0] + np.cos(self.x[2])* u[0]*dt 
        y_ = self.x[1] + np.sin(self.x[2])* u[0]*dt 
        theta_ = self.x[2] + u[1]*dt
        return np.array([x_,y_,theta_])

    def calculate_Jfx(self, u, dt) -> np.ndarray:
        return np.array([[1, 0, -np.sin(self.x[2])*u[0]*dt],
                         [0, 1, np.cos(self.x[2])*u[0]*dt],
                         [0, 0, 1]])
        
    def calculate_Jfw(self, u, dt = None) -> np.ndarray:
        return np.array([[np.cos(self.x[2])*dt, 0],
                         [np.sin(self.x[2])*dt, 0],
                         [0                   , dt]])
    
    

## Dead Reckoning localization 

Now, we can use the prediction step to perform dead reckoning localization. Everything necessary is already implemented, so its all about calling the **prediction** method of **DiffDriveOdomEKF** with data from the odometry csv file. It is also necessary to plot what is happening, so we will create animations using matplotlib, similar to what you have done in the first lab of Hands On Manipulation.

We will define a class **DeadReckoningAnimation** which inherits from **DiffDriveOdomEKF** (so it has all the methods from **ExtendedKalmanFilter** implemented!). This class will in charge to maintain the data and update the plot to perform the animation. 


In [6]:
class DeadReckoningAnimation(DiffDriveOdomEKF):
    def __init__(self, x_init, P_init, odom_data, gt_path) -> None:
        super().__init__(x_init, P_init)
    

        self.traj_x = []
        self.traj_y = []
        
        self.odom_data = odom_data
        self.gt_path = gt_path
        
        
        # Drawing preparation (figure with one subplot)
        self.fig = plt.figure() 
        
        self.ax = self.fig.add_subplot(111, autoscale_on=False, xlim=(-0.5, 3), ylim=(-0.5, 3))
        self.ax.set_title('Dead Reckoning')
        self.ax.set_aspect('equal')
        self.ax.set(xlabel='x[m]', ylabel='y[m]')
        self.ax.grid()

        self.anim_path, = self.ax.plot([], [], 'r-', lw=1) # Line for displaying the path
        self.anim_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='r') 
        self.anim_robot_cov = patches.Ellipse((0.0, 0.0), 0.9, 0.5, 0, edgecolor='r', lw=1, facecolor='none')
        self.ax.add_patch(self.anim_robot_cov)
        self.ax.add_patch(self.anim_robot)
        
       
        gt_x = [x[0] for x in self.gt_path]
        gt_y = [x[1] for x in self.gt_path]
        self.anim_gt_path, = self.ax.plot(gt_x, gt_y, 'b-', lw=1) # Line for displaying the path

        self.anim_gt_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='b')
        self.ax.add_patch(self.anim_gt_robot)
        
        


    def iterate_anim(self, idx):
        # Do prediction
        odom = self.odom_data[idx]
        [self.x, self.P] = self.prediction(odom.u, odom.Q, odom.dt)

        # Update trajectory path
        self.traj_x.append(self.x[0])
        self.traj_y.append(self.x[1])

        # Update the drawing
        self.anim_path.set_data(self.traj_x, self.traj_y)
        ## Only way to rotate rectangle is to use set_transform
        self.anim_robot.set_transform(trans.Affine2D().rotate(self.x[2]) + trans.Affine2D().translate(self.x[0],self.x[1])+ self.ax.transData )
        
        #Calculate ellipsoide!
        v, w = np.linalg.eigh(self.P[0:2,0:2])
        v = 2.0 * np.sqrt(5.991) * np.sqrt(v) # 5.991 is the chi-square value for 95% confidence in 2DOF
        u = w[0] / np.linalg.norm(w[0])
        ell_angle = np.arctan2(u[1] , u[0])
        self.anim_robot_cov.width = v[0]
        self.anim_robot_cov.height = v[1]
        self.anim_robot_cov.set_transform( trans.Affine2D().rotate(ell_angle) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        self.anim_gt_robot.set_transform(trans.Affine2D().rotate(self.gt_path[idx][2]) + trans.Affine2D().translate(self.gt_path[idx][0],self.gt_path[idx][1])+  self.ax.transData )
        return self.anim_path, self.anim_gt_robot, self.anim_robot_cov, self.anim_robot



Now, we just initialize the animated filter and call matplotlib *FuncAnimation*. If your implementation is correct, you should see an animation of a dead-reckoning localization.

In [11]:
x_init = np.array([0.0,0.0,0.0])
P_init = np.eye(3)*0.02
odom_data = ReadOdomCSV("DataSet1/odom.csv")

[gt_data,gt_time] = ReadGroundTruthCSV("DataSet1/ground_truth.csv")
robot = DeadReckoningAnimation(x_init, P_init, odom_data, gt_data)


animation = anim.FuncAnimation(robot.fig, robot.iterate_anim, range(0,len(odom_data)), interval=2 , blit=True, repeat=False)
plt.show()

data len 4799


<IPython.core.display.Javascript object>

## EKF using GPS measurements

Now, its time to use measurements to update (and improve) our state estimate. In this exercise, we will use GPS measurements.

First, we need to read the CSV file containing the GPS data. As before, we start by defining a class **GPSMeasurement** to encapsulate the measurement data. Furthermore, a mesurement class will require to implement the following methods:

- expected_measurement (i.e., z = h(x))
- Jhx (i.e., ${\frac{\partial{h(\mathbf x_k, v_k)}}{\partial{\mathbf x}}}$)
- Jhv (i.e ${\frac{\partial{h(\mathbf x_k, v_k)}}{\partial{\mathbf v_k}}}$)

You are requested to implement these 3 methods.

Defining these methods here will allow us to implement easier all our EKF exercises.

Then, we implement a function **ReadGPSCSV** to process the csv file and return a list of GPS Measurements.

In [12]:
 class GPSMeasurement: #GPS Measurement of a Differential Drive Static Model
    def __init__(self, time, z, R):
        self.time = time
        self.z = z
        self.R = R

    '''
    Implements z = h(x)
    z is a vector of length k
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    return: numpy.array of shape (k,) --> the expected measurement
    '''
    def expected_measurement(self, x):
        return x[0:2]
        
    '''
    Implements the Jacobian of h(x,v) with respect to x, at point x
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    return: numpy matrix of shape (k, n)
    '''
    def Jhx(self, x = None) -> np.ndarray:
        return np.array([[1,0,0],[0,1,0]])
    
    '''
    Implements the Jacobian of h(x,v) with respect to v, at point x
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    return: numpy matrix of shape (k, b), where
    '''
    def Jhv(self, x = None) -> np.ndarray:
        return np.array([[1,0],[0,1]])
    
def ReadGPSCSV(csv_file):
    data = []
    with open(csv_file, 'r') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader) #Skip header
        for row in csv_reader:
            if len(row) > 2:
                time = float(row[0])
                z = np.array(row[1:3]).astype(float)
                R = np.array([row[3:5],row[5:7]]).astype(float)
                data.append(GPSMeasurement(time, z, R))
            else:
                data.append(None)

    return data
    

Now, similar to in the DeadReckoning exercise, we will create an animation to test our equations. Now, in the **iterate_anim** the update step is added:

In [13]:
class GPSEKFAnimation(DiffDriveOdomEKF):
    def __init__(self, x_init, P_init, odom_data, gt_path, gps_data) -> None:
        super().__init__(x_init, P_init)

        self.traj_x = []
        self.traj_y = []
        
        self.odom_data = odom_data
        self.gt_path = gt_path
        self.gps_data = gps_data
        # Drawing preparation (figure with one subplot)
        self.fig = plt.figure() 
        self.ax = self.fig.add_subplot(111, autoscale_on=False, xlim=(-0.5, 3), ylim=(-0.5, 3))
        self.ax.set_title('EKF Using GPS updates')
        self.ax.set_aspect('equal')
        self.ax.set(xlabel='x[m]', ylabel='y[m]')
        self.ax.grid()

        self.anim_path, = self.ax.plot([], [], 'r-', lw=1) # Line for displaying the path
        self.anim_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='r')
        self.ax.add_patch(self.anim_robot)

       
        gt_x = [x[0] for x in self.gt_path]
        gt_y = [x[1] for x in self.gt_path]
        self.anim_gt_path, = self.ax.plot(gt_x, gt_y, 'b-', lw=1) # Line for displaying the path

        self.anim_gt_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='b')
        self.anim_robot_cov = patches.Ellipse((0.0, 0.0), 0.9, 0.5, 0, edgecolor='r', lw=1, facecolor='none')
        self.ax.add_patch(self.anim_robot_cov)
        self.ax.add_patch(self.anim_gt_robot)
        
        # Extra drawings
        self.gps_readings = self.ax.scatter([], [], c = 'green', marker = "x",s=150,zorder=5) # GPS readings

        

    def iterate_anim(self, idx):
        ## EKF
        # predictio n
        odom = self.odom_data[idx]
        [self.x, self.P] = self.prediction(odom.u, odom.Q, odom.dt)
        # update
        gps = self.gps_data[idx]
        if gps is not None:
            z = gps.z
            R = gps.R
            hx = gps.expected_measurement(self.x)
            Jhx = gps.Jhx()
            Jhv = gps.Jhv()
            [self.x, self.P] = self.update(z, R, hx, Jhx, Jhv)
            
            self.gps_readings.set_offsets(z)

        ## PLOT
        # Update trajectory path
        self.traj_x.append(self.x[0])
        self.traj_y.append(self.x[1])

        # Update the drawing
        # Update robot believe mean
        self.anim_path.set_data(self.traj_x, self.traj_y)
        self.anim_robot.set_transform(trans.Affine2D().rotate(self.x[2]) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        #Update robot believe ellipse (Uncertainty)
        v, w = np.linalg.eigh(self.P[0:2,0:2])
        v = 2.0 * np.sqrt(5.991) * np.sqrt(v) # 5.991 is the chi-square value for 95% confidence in 2DOF
        u = w[0] / np.linalg.norm(w[0])
        ell_angle = np.arctan2(u[1] , u[0])
        self.anim_robot_cov.width = v[0]
        self.anim_robot_cov.height = v[1]
        self.anim_robot_cov.set_transform( trans.Affine2D().rotate(ell_angle) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        # Update robot ground truth position
        self.anim_gt_robot.set_transform(trans.Affine2D().rotate(self.gt_path[idx][2]) + trans.Affine2D().translate(self.gt_path[idx][0],self.gt_path[idx][1])+  self.ax.transData )
     
        return self.anim_path, self.anim_gt_robot, self.anim_robot_cov, self.anim_robot, self.gps_readings

In [14]:
plt.plot()
x_init = np.array([0.0,0.0,0.0])
P_init = np.eye(3)*0.02
odom_data = ReadOdomCSV("DataSet1/odom.csv")
[gt_data,gt_time] = ReadGroundTruthCSV("DataSet1/ground_truth.csv")
gps_data = ReadGPSCSV("DataSet1/gps.csv")
robot2 = GPSEKFAnimation(x_init, P_init, odom_data, gt_data, gps_data)

gps_anim = anim.FuncAnimation(robot2.fig, robot2.iterate_anim, range(0,len(odom_data)), interval=2 , blit=True, repeat=False)
plt.show()

data len 4799


<IPython.core.display.Javascript object>

## EKF using Range measurements

In this exercise, we will localize our Differential Drive robot using range-only measurements from two pingers located at a known position. We assume we know the id of the pinger we measure (i.e., the data association, or correspondance problem is solved)

Similarly as before, we start by defining a Measurement. You are requested to implement:

- expected_measurement
- Jhx
- Jhv

And the CSV reader is provided. Note that the pingers known poses (i.e., map) is written inside the file (2nd row)

In [15]:
class PingerWithIDMeasurement:
    def __init__(self, time, z, R, id):
        self.time = time
        self.z = z
        self.R = R
        self.id = id

    '''
    Implements z = h(x)
    z is a vector of length k
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    - map: a list of np.array of shape (2,) representing the positions [x,y] of the pingers, 
           where the id of a pingers is its position in the list
    return: numpy.array of shape (k,) --> the expected measurement
    '''
    def expected_measurement(self, x, map):
        mfeat = map[self.id]
        return np.linalg.norm(x[0:2] - mfeat)

    '''
    Implements the Jacobian of h(x,v) with respect to x, at point x
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    - map: a list of np.array of shape (2,) representing the positions [x,y] of the pingers, 
           where the id of a pingers is its position in the list
    return: numpy matrix of shape (k, n)
    '''
    def Jhx(self, x, map) -> np.ndarray:
        mfeat = map[self.id]
        return np.append((x[0:2] - mfeat)/np.linalg.norm(x[0:2] - mfeat),0)
    
    '''
    Implements the Jacobian of h(x,v) with respect to v, at point x
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    return: numpy matrix of shape (k, b), where
    '''
    def Jhv(self, x = None) -> np.ndarray:
        return np.array([1])
    
    
def ReadPingsCSV(csv_file):
    data = []
    pingers_map = []
    with open(csv_file, 'r') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader) #Skip header
        row = next(csv_reader) #Map
        num_pingers = int(row[0])
        for i in range(num_pingers):
            pingers_map.append(np.array([row[1 + 2 * i],row[2 + 2*i]]).astype(float))
        for row in csv_reader:
            time = float(row[0])
            num_pings = int(row[1])
            pings = []
            for i in range(num_pings):
                z = np.array([row[2 + i * 3]]).astype(float)
                R = np.array([[row[3 + i * 3]]]).astype(float)
                pid = int(row[4 + i*3])
                pings.append(PingerWithIDMeasurement(time, z, R, pid))
            data.append(pings)
           
    return data, pingers_map

### Implementation
After you have implemented the Measurement class, we can proceed to run our new filter. Its all about preparing the **z**, **R**, **hx**, **Jhx**, and **Jhv** from a set of Range measurements that happened at the same time. You are requested to build these vectors and matrices.

You will need these 3 methods:

- np.concatenate : it concatenates two numpy vectors 
- np.vstack : it vertically stacks two numpy matrices
- block_diag (from scipy.linalg): it diagonally stacks two matrices

See example in the next cell

In [16]:
import numpy as np
from scipy.linalg import block_diag

a = np.array([1,2,3])
b = np.array([4,5])
print("a: ", a)
print("b: ", b)
print( "np.concatenate([a,b]): ", np.concatenate([a,b]))

c = np.array([[1,2], [3,4]])
d = np.array([[5,6], [7, 8], [9,0]])

print("c: \n", c)
print("d: \n", d)
print("np.vstack((c,d)): \n", np.vstack((c,d)))

print("block_diag(c,d): \n", block_diag(c,d))


a:  [1 2 3]
b:  [4 5]
np.concatenate([a,b]):  [1 2 3 4 5]
c: 
 [[1 2]
 [3 4]]
d: 
 [[5 6]
 [7 8]
 [9 0]]
np.vstack((c,d)): 
 [[1 2]
 [3 4]
 [5 6]
 [7 8]
 [9 0]]
block_diag(c,d): 
 [[1 2 0 0]
 [3 4 0 0]
 [0 0 5 6]
 [0 0 7 8]
 [0 0 9 0]]


In [17]:
class PingsEKFAnimation(DiffDriveOdomEKF):
    def __init__(self, x_init, P_init, odom_data, gt_path, pings_data, pingers_map) -> None:
        super().__init__(x_init, P_init)

        self.traj_x = []
        self.traj_y = []
        
        self.odom_data = odom_data
        self.gt_path = gt_path
        self.pings_data = pings_data
        self.pingers_map = pingers_map
        # Drawing preparation (figure with one subplot)
        self.fig = plt.figure() 
        self.ax = self.fig.add_subplot(111, autoscale_on=False, xlim=(-0.5, 3), ylim=(-0.5, 3))
        self.ax.set_title('EKF using distance-only pingers')
        self.ax.set_aspect('equal')
        self.ax.set(xlabel='x[m]', ylabel='y[m]')
        self.ax.grid()

        self.anim_path, = self.ax.plot([], [], 'r-', lw=1) # Line for displaying the path
        self.anim_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='r')
        self.ax.add_patch(self.anim_robot)

       
        gt_x = [x[0] for x in self.gt_path]
        gt_y = [x[1] for x in self.gt_path]
        self.anim_gt_path, = self.ax.plot(gt_x, gt_y, 'b-', lw=1) # Line for displaying the path

        self.anim_gt_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='b')
        self.anim_robot_cov = patches.Ellipse((0.0, 0.0), 0.9, 0.5, 0, edgecolor='r', lw=1, facecolor='none')
        self.ax.add_patch(self.anim_robot_cov)
        self.ax.add_patch(self.anim_gt_robot)
        
        # Extra drawings
        self.pingers_map_pos = self.ax.scatter([pingers_map[0][0],pingers_map[1][0]], [pingers_map[0][1],pingers_map[1][1]], c = 'black', marker = "h",zorder=5) # Pingers Locations
        self.ax.annotate("1", (pingers_map[0][0]+0.05, pingers_map[0][1]+0.05))
        self.ax.annotate("2", (pingers_map[1][0]-0.1, pingers_map[1][1]-0.1))
        self.pinger1_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 1
        self.pinger2_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 2
        

    def iterate_anim(self, idx):
        ## EKF
        # prediction
        odom = self.odom_data[idx]
        [self.x, self.P] = self.prediction(odom.u, odom.Q, odom.dt)
        # update
        z   = np.empty((0,))
        R   = np.empty((0, 0))
        hx  = np.empty((0,))
        Jhx = np.empty((0, self.x.shape[0]))
        Jhv = np.empty((0, 0))
        
        pings = self.pings_data[idx]
        
        if len(pings) > 0:
            for p in pings:                  # p is a PingerWithIDMeasurement
                z = np.concatenate([z, p.z]) #... p.z ...
                R = block_diag(R,p.R)        #... p.R ...
                hx = np.append(hx, p.expected_measurement(self.x, self.pingers_map))  #... p.expected_measurement(self.x, self.pingers_map) ...
                Jhx = np.vstack((Jhx, p.Jhx(self.x, self.pingers_map)))               #... p.Jhx(self.x, self.pingers_map) ...
                Jhv = block_diag(Jhv, p.Jhv())                                        #... p.Jhv() ...

            [self.x, self.P] = self.update(z, R, hx, Jhx, Jhv)
            self.pinger1_distance.set_data([pingers_map[0][0],self.x[0]], [pingers_map[0][1],self.x[1]])
            self.pinger2_distance.set_data([pingers_map[1][0],self.x[0]], [pingers_map[1][1],self.x[1]])

        ## PLOT
        # Update trajectory path
        self.traj_x.append(self.x[0])
        self.traj_y.append(self.x[1])

        # Update the drawing
        # Update robot believe mean
        self.anim_path.set_data(self.traj_x, self.traj_y)
        self.anim_robot.set_transform(trans.Affine2D().rotate(self.x[2]) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        #Update robot believe ellipse (Uncertainty)
        v, w = np.linalg.eigh(self.P[0:2,0:2])
        v = 2.0 * np.sqrt(5.991) * np.sqrt(v) # 5.991 is the chi-square value for 95% confidence in 2DOF
        u = w[0] / np.linalg.norm(w[0])
        ell_angle = np.arctan2(u[1] , u[0])
        self.anim_robot_cov.width = v[0]
        self.anim_robot_cov.height = v[1]
        self.anim_robot_cov.set_transform( trans.Affine2D().rotate(ell_angle) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        # Update robot ground truth position
        self.anim_gt_robot.set_transform(trans.Affine2D().rotate(self.gt_path[idx][2]) + trans.Affine2D().translate(self.gt_path[idx][0],self.gt_path[idx][1])+  self.ax.transData )
     
        return self.anim_path, self.anim_gt_robot, self.anim_robot_cov, self.anim_robot, self.pinger1_distance, self.pinger2_distance

In [18]:
plt.plot()
x_init = np.array([0.0,0.0,0.0])
P_init = np.eye(3)*0.02
odom_data = ReadOdomCSV("DataSet1/odom.csv")
[gt_data,gt_time] = ReadGroundTruthCSV("DataSet1/ground_truth.csv")
[pings_data, pingers_map] = ReadPingsCSV("DataSet1/pings.csv")
robot3 = PingsEKFAnimation(x_init, P_init, odom_data, gt_data, pings_data, pingers_map)

pings_anim = anim.FuncAnimation(robot3.fig, robot3.iterate_anim, range(0,len(odom_data)), interval=2 , blit=True, repeat=False)
plt.show()

data len 4799


<IPython.core.display.Javascript object>

# Exercices

1. Combine both GPS and Range measurements into the same EKF class, and execute the animation.
2. Improve visualization by: 
  1. Plotting the GPS measurements
  1. Plotting the pingers position
  3. Plotting the pingers measurements

In [19]:

class FusionEKFAnimation(DiffDriveOdomEKF):
    def __init__(self, x_init, P_init, odom_data, gt_path, pings_data, pingers_map, gps_data) -> None:
        super().__init__(x_init, P_init)

        self.traj_x = []
        self.traj_y = []
        
        self.odom_data = odom_data
        self.gt_path = gt_path
        self.pings_data = pings_data    # Pinger data
        self.pingers_map = pingers_map  # Pingers map
        self.gps_data = gps_data        # GPS data

        # Drawing preparation (figure with one subplot)
        self.fig = plt.figure() 
        self.ax = self.fig.add_subplot(111, autoscale_on=False, xlim=(-0.5, 3), ylim=(-0.5, 3))
        self.ax.set_title('EKF using Sensor Fusion')
        self.ax.set_aspect('equal')
        self.ax.set(xlabel='x[m]', ylabel='y[m]')
        self.ax.grid()

        self.anim_path, = self.ax.plot([], [], 'r-', lw=1) # Line for displaying the path
        self.anim_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='r',zorder=4)
        self.ax.add_patch(self.anim_robot)

       
        gt_x = [x[0] for x in self.gt_path]
        gt_y = [x[1] for x in self.gt_path]
        self.anim_gt_path, = self.ax.plot(gt_x, gt_y, 'b-', lw=1) # Line for displaying the path

        self.anim_gt_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='b')
        self.anim_robot_cov = patches.Ellipse((0.0, 0.0), 0.9, 0.5, 0, edgecolor='r', lw=1, facecolor='none')
        self.ax.add_patch(self.anim_robot_cov)
        self.ax.add_patch(self.anim_gt_robot)

        # Extra drawings

        self.pingers_map_pos = self.ax.scatter([pingers_map[0][0],pingers_map[1][0]], [pingers_map[0][1],pingers_map[1][1]], c = 'black', marker = "h",zorder=5) # Pingers Locations
        self.ax.annotate("1", (pingers_map[0][0]+0.05, pingers_map[0][1]+0.05))
        self.ax.annotate("2", (pingers_map[1][0]-0.1, pingers_map[1][1]-0.1))
        self.gps_readings = self.ax.scatter([], [], c = 'green', marker = "x",s=150,zorder=5) # GPS readings
        self.pinger1_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 1
        self.pinger2_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 2


    def iterate_anim(self, idx):
        ## EKF
        # prediction
        odom = self.odom_data[idx]
        [self.x, self.P] = self.prediction(odom.u, odom.Q, odom.dt)
        
        # GPS update
        gps = self.gps_data[idx]

        if gps is not None:
            z = gps.z
            R = gps.R
            hx = gps.expected_measurement(self.x)
            Jhx = gps.Jhx()
            Jhv = gps.Jhv()
            [self.x, self.P] = self.update(z, R, hx, Jhx, Jhv)
            print("GPS Update")

            self.gps_readings.set_offsets(z)


        # Pingers update
        z   = np.empty((0,))
        R   = np.empty((0, 0))
        hx  = np.empty((0,))
        Jhx = np.empty((0, self.x.shape[0]))
        Jhv = np.empty((0, 0))
        
        pings = self.pings_data[idx]
        
        if len(pings) > 0:
            for p in pings:                  # p is a PingerWithIDMeasurement
                z = np.concatenate([z, p.z]) #... p.z ...
                R = block_diag(R,p.R)        #... p.R ...
                hx = np.append(hx, p.expected_measurement(self.x, self.pingers_map))  #... p.expected_measurement(self.x, self.pingers_map) ...
                Jhx = np.vstack((Jhx, p.Jhx(self.x, self.pingers_map)))               #... p.Jhx(self.x, self.pingers_map) ...
                Jhv = block_diag(Jhv, p.Jhv())                                        #... p.Jhv() ...

            [self.x, self.P] = self.update(z, R, hx, Jhx, Jhv)
            self.pinger1_distance.set_data([pingers_map[0][0],self.x[0]], [pingers_map[0][1],self.x[1]])
            self.pinger2_distance.set_data([pingers_map[1][0],self.x[0]], [pingers_map[1][1],self.x[1]])
            print("Pingers Update")

        ## PLOT
        # Update trajectory path
        self.traj_x.append(self.x[0])
        self.traj_y.append(self.x[1])

        # Update the drawing
        # Update robot believe mean
        self.anim_path.set_data(self.traj_x, self.traj_y)
        self.anim_robot.set_transform(trans.Affine2D().rotate(self.x[2]) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        

        #Update robot believe ellipse (Uncertainty)
        v, w = np.linalg.eigh(self.P[0:2,0:2])
        v = 2.0 * np.sqrt(5.991) * np.sqrt(v) # 5.991 is the chi-square value for 95% confidence in 2DOF
        u = w[0] / np.linalg.norm(w[0])
        ell_angle = np.arctan2(u[1] , u[0])
        self.anim_robot_cov.width = v[0]
        self.anim_robot_cov.height = v[1]
        self.anim_robot_cov.set_transform( trans.Affine2D().rotate(ell_angle) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        # Update robot ground truth position
        self.anim_gt_robot.set_transform(trans.Affine2D().rotate(self.gt_path[idx][2]) + trans.Affine2D().translate(self.gt_path[idx][0],self.gt_path[idx][1])+  self.ax.transData )
     
        return self.anim_path, self.anim_gt_robot, self.anim_robot_cov, self.anim_robot, self.gps_readings, self.pinger1_distance, self.pinger2_distance



In [20]:
plt.plot()
x_init = np.array([0.0,0.0,0.0])
P_init = np.eye(3)*0.02
odom_data = ReadOdomCSV("DataSet1/odom.csv")
[gt_data,gt_time] = ReadGroundTruthCSV("DataSet1/ground_truth.csv")
gps_data = ReadGPSCSV("DataSet1/gps.csv")
[pings_data, pingers_map] = ReadPingsCSV("DataSet1/pings.csv")
robot4 = FusionEKFAnimation(x_init, P_init, odom_data, gt_data, pings_data, pingers_map, gps_data)

pings_anim = anim.FuncAnimation(robot4.fig, robot4.iterate_anim, range(0,len(odom_data)), interval=2 , blit=True, repeat=False)
plt.show()

data len 4799


<IPython.core.display.Javascript object>

# PART 2 : Data Association

So far, we have implemented an EKF localization algorithm using GPS (direct state measurements) and beacons (map measurements). In these two cases, the data association was solved: The GPS is known to measure the x and y position in world coordinates, and the pingers measurements contained the distance and an identifier. However, in most of the cases, measurements from the environment will not provide a unique identifier. So in order to perform an update, we need to solve **which map feature corresponds to each observation**. This is called the **Data Association** problem.

In the next exercise, we will use a DataSet where 4 pingers which provide distance-only measurements are used. Pingers positions in the world are known. The diference now is that the measurement does not provide the identifier of the pinger that we are measuring! Furthermore, the sensor is simulated to have a limited range (i.e., we will not get the 4 measurements always).

In this exercise, you are requested to adapt the previous implementation of EKF to incorporate data association. We will implement ICNN (Individual Compatibility Nearest Neighbour), following the pseudocode of Pere's slides.

First, we define a new class to encapsulate our measurements. Note that the methods to implement are not **class methods** (they do not use self). This are **static methods** which do not require an object to be instanciated in order to use them. This makes sense, since no object data or methods (i.e., anything using self) are necessary.

Now, the **expected_measurement**, **Jhx** and **Jhv** don't get the map from parameters, which was used in the class methods to get the associated feature based on the object id). Now, these methods will require the feature pose from parameters.

As always, the CSV reader is already provided.


In [21]:
class PingerMeasurement:
    def __init__(self, time, z, R):
        self.time = time
        self.z = z
        self.R = R

    '''
    Implements z = h(x)
    z is a vector of length k
    Input:
    - x_r: numpy.array of shape (n,) --> the robot state
    - x_f: a np.array of shape (2,) representing the positions [x,y] of the pinger
    return: numpy.array of shape (k,) --> the expected measurement
    '''
    def expected_measurement(self, x_r, x_f):
        return np.linalg.norm(x_r[0:2] - x_f) 

    '''
    Implements the Jacobian of h(x,v) with respect to x, at point x
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    - x_f: a np.array of shape (2,) representing the positions [x,y] of the pinger
    return: numpy matrix of shape (k, n)
    '''
    def Jhx(self, x_r, x_f) -> np.ndarray:
        return np.append((x_r[0:2] - x_f)/np.linalg.norm(x_r[0:2] - x_f),0)
    
    '''
    Implements the Jacobian of h(x,v) with respect to v, at point x
    Input:
    - x: numpy.array of shape (n,) --> the robot state
    return: numpy matrix of shape (k, b), where
    '''
    def Jhv(self, x_r = None) -> np.ndarray:
        return np.array([1])
    
    
def ReadPingsNoIDCSV(csv_file):
    data = []
    pingers_map = []
    with open(csv_file, 'r') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader) #Skip header
        row = next(csv_reader) #Map
        num_pingers = int(row[0])
        for i in range(num_pingers):
            pingers_map.append(np.array([row[1 + 2 * i],row[2 + 2*i]]).astype(float))
        for row in csv_reader:
            time = float(row[0])
            num_pings = int(row[1])
            pings = []
            for i in range(num_pings):
                z = np.array([row[2 + i * 2]]).astype(float)
                R = np.array([[row[3 + i * 2]]]).astype(float)
                pings.append(PingerMeasurement(time, z, R))
            data.append(pings)
           
    return data, pingers_map


The following class encapsulates the animation loop for the EKF with data association exercise. Two blocks are required to be completed:

First, the method **data_association**. This method is the key of the exercise, and is supposed to get a list of pinger measurements and a map and return a hypotesis, which will be a list of integers. For any measurement with position **i** in the pinger measurements list, the hypotesis at position **i** has to be the position of the associated pinger in the map list. 

Second, rebuild the update step with the new methods (should be very similar!)

In [22]:
import scipy.stats

class PingsDAEKFAnimation(DiffDriveOdomEKF):
    def __init__(self, x_init, P_init, odom_data, gt_path, pings_data, pingers_map) -> None:
        super().__init__(x_init, P_init)

        self.traj_x = []
        self.traj_y = []
        
        self.odom_data = odom_data
        self.gt_path = gt_path
        self.pings_data = pings_data
        self.pingers_map = pingers_map
        # Drawing preparation (figure with one subplot)
        self.fig = plt.figure() 
        self.ax = self.fig.add_subplot(111, autoscale_on=False, xlim=(-0.5, 3), ylim=(-0.5, 3))
        self.ax.set_title('EKF using distance-only pingers and data association')
        self.ax.set_aspect('equal')
        self.ax.set(xlabel='x[m]', ylabel='y[m]')
        self.ax.grid()

        self.anim_path, = self.ax.plot([], [], 'r-', lw=1) # Line for displaying the path
        self.anim_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='r')
        self.ax.add_patch(self.anim_robot)

       
        gt_x = [x[0] for x in self.gt_path]
        gt_y = [x[1] for x in self.gt_path]
        self.anim_gt_path, = self.ax.plot(gt_x, gt_y, 'b-', lw=1) # Line for displaying the path

        self.anim_gt_robot = patches.Rectangle((-0.1,-0.05), 0.2, 0.1, angle=0, color='b')
        self.anim_robot_cov = patches.Ellipse((0.0, 0.0), 0.9, 0.5, 0, edgecolor='r', lw=1, facecolor='none')
        self.ax.add_patch(self.anim_robot_cov)
        self.ax.add_patch(self.anim_gt_robot)

        # Extra drawings
        self.pingers_map_pos = self.ax.scatter([pingers_map[0][0],pingers_map[1][0], pingers_map[2][0], pingers_map[3][0]], [pingers_map[0][1],pingers_map[1][1], pingers_map[2][1], pingers_map[3][1]], c = 'black', marker = "h",zorder=5) # Pingers Locations
        
        self.ax.annotate("1", (pingers_map[0][0]+0.05, pingers_map[0][1]+0.05))
        self.ax.annotate("2", (pingers_map[1][0]-0.1, pingers_map[1][1]-0.1))
        self.ax.annotate("3", (pingers_map[2][0]+0.05, pingers_map[2][1]+0.05))
        self.ax.annotate("4", (pingers_map[3][0]+0.05, pingers_map[3][1]+0.05))
        
        self.pinger1_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 1
        self.pinger2_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 2
        self.pinger3_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 3
        self.pinger4_distance, = self.ax.plot([], [], '--', lw=1) # Distance measurements from pinger 4

        



    '''
    Perform Data Association! 
    Input:
    - pings: list of PingerWithIDMeasurement objects, which do not have the field id filled (i.e., id = None)
    - map: list of pinger/beacon positions in the world (i.e., the map)
    - alpha: the confidence level to perform the chi-square test.
    Return: No return. The list of pingers is updated so the id of those measurements with a map correspondance is updated
    with the index of the feature in the map
    '''
    def data_association(self, pings, map_f, alpha = 0.1):
        hyp = [-1 for p in pings]
             
        for i in range(len(pings)):
            Dmin = math.inf
            nearest = -1 
            for j in range(len(map_f)):
                Jhxj = pings[i].Jhx(self.x, map_f[j])  #... This variable is the jacobian of h() in the current robot believe and map feature j
                hfj =  pings[i].expected_measurement(self.x,map_f[j]) #... The expected measurement if the associated map feature is j
                Pfj =  Jhxj@self.P@Jhxj.transpose() #... The expected measurement uncertainty if the associated map feature is j
        
                #Mahalanobis

                vij = pings[i].z - hfj
                sij = Pfj + pings[i].R
                
                Dij = vij.transpose()@np.linalg.inv(sij)@vij #... This variable is the mahalanobis distance
                
                if Dij < Dmin:
                    nearest = j
                    Dmin = Dij 
            hyp[i] = nearest

            ## Individually Compatible Check
            #chi_val = scipy.stats.chi2.ppf(1 - confidence, df=1) #This should not be here..
            #if Dmin < chi_val:
            #    hyp[i] = nearest

        return hyp

        
    def iterate_anim(self, idx):

        
        ## EKF
        # prediction
        odom = self.odom_data[idx]
        [self.x, self.P] = self.prediction(odom.u, odom.Q, odom.dt)
        # update
        z   = np.empty((0,))
        R   = np.empty((0, 0))
        hx  = np.empty((0,))
        Jhx = np.empty((0, self.x.shape[0]))
        Jhv = np.empty((0, 0))
        
        pings = self.pings_data[idx]
        hyp = self.data_association(pings, self.pingers_map)
        
        # Do update given measurements and hypotesis! 
       
        
        if len(pings) > 0:
            for i in range(len(hyp)):
                z = np.concatenate([z, pings[i].z]) #... p.z ...
                R = block_diag(R,pings[i].R)        #... p.R ...
                hx = np.append(hx, pings[i].expected_measurement(self.x, self.pingers_map[hyp[i]]))  #... p.expected_measurement(self.x, self.pingers_map) ...
                Jhx = np.vstack((Jhx, pings[i].Jhx(self.x, self.pingers_map[hyp[i]])))               #... p.Jhx(self.x, self.pingers_map) ...
                Jhv = block_diag(Jhv, pings[i].Jhv())                                                #... p.Jhv() ...

        # Plotting the range measurements to the respective landmarks
        for i in hyp:
            if i == 0:
                self.pinger1_distance.set_data([self.pingers_map[0][0],self.x[0]], [self.pingers_map[0][1],self.x[1]])
            if 0 not in hyp:
                self.pinger1_distance.set_data([],[])
            if i == 1:
                self.pinger2_distance.set_data([self.pingers_map[1][0],self.x[0]], [self.pingers_map[1][1],self.x[1]])
            if 1 not in hyp:
                self.pinger2_distance.set_data([],[])
            if i == 2:
                self.pinger3_distance.set_data([self.pingers_map[2][0],self.x[0]], [self.pingers_map[2][1],self.x[1]])
            if 2 not in hyp:
                self.pinger3_distance.set_data([],[])
            if i == 3:
                self.pinger4_distance.set_data([self.pingers_map[3][0],self.x[0]], [self.pingers_map[3][1],self.x[1]])
            if 3 not in hyp:
                self.pinger4_distance.set_data([],[])
        
        print("AssociatedPingers Update")

        [self.x, self.P] = self.update(z, R, hx, Jhx, Jhv)

        ## PLOT
        # Update trajectory path
        self.traj_x.append(self.x[0])
        self.traj_y.append(self.x[1])

        # Update the drawing
        # Update robot believe mean
        self.anim_path.set_data(self.traj_x, self.traj_y)
        self.anim_robot.set_transform(trans.Affine2D().rotate(self.x[2]) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        #Update robot believe ellipse (Uncertainty)
        v, w = np.linalg.eigh(self.P[0:2,0:2])
        v = 2.0 * np.sqrt(5.991) * np.sqrt(v) # 5.991 is the chi-square value for 95% confidence in 2DOF
        u = w[0] / np.linalg.norm(w[0])
        ell_angle = np.arctan2(u[1] , u[0])
        self.anim_robot_cov.width = v[0]
        self.anim_robot_cov.height = v[1]
        self.anim_robot_cov.set_transform( trans.Affine2D().rotate(ell_angle) + trans.Affine2D().translate(self.x[0],self.x[1])+  self.ax.transData )
        
        # Update robot ground truth position
        self.anim_gt_robot.set_transform(trans.Affine2D().rotate(self.gt_path[idx][2]) + trans.Affine2D().translate(self.gt_path[idx][0],self.gt_path[idx][1])+  self.ax.transData )
     
        return self.anim_path, self.anim_gt_robot, self.anim_robot_cov, self.anim_robot, self.pinger1_distance, self.pinger2_distance, self.pinger3_distance, self.pinger4_distance

In [23]:
plt.plot()
x_init = np.array([0.0,0.0,0.0])
P_init = np.eye(3)*0.02
odom_data = ReadOdomCSV("DataSet2/odom.csv")
[gt_data,gt_time] = ReadGroundTruthCSV("DataSet2/ground_truth.csv")
[pings_data, pingers_map] = ReadPingsNoIDCSV("DataSet2/pings_no_id.csv")

robot5 = PingsDAEKFAnimation(x_init, P_init, odom_data, gt_data, pings_data, pingers_map)

pings_anim = anim.FuncAnimation(robot5.fig, robot5.iterate_anim, range(0,len(odom_data)), interval=2 , blit=True, repeat=False)
plt.show()

data len 4799


<IPython.core.display.Javascript object>