# Extended Kalman Filter

Use uncertain information (sensor data) and knowledge of how the system behaves (educated guess) to understand the state of a system which is continuously changing

eg, combine
- an intermittent and imprecise GPS signal
- prediction of what will _probably_ happen when you enter a certain input, such as 'forward'
- noisey but high frequency sensor data from an IMU

to get a very accurate estimate of the current position and velocity of a system

KF assumes that the variables are random and Gaussian distributed with a mean value $\mu$ and variance/uncertainty $\sigma^2$. However, KF relies on the fact that the variables involved are related in some way - eg. position and velocity

Following a guide from https://www.bzarg.com/p/how-a-kalman-filter-works-in-pictures/ . Equation numbers correspond with this guide

## The State Estimate

$$ x_k = \begin{bmatrix} \theta \\ \dot{\theta} \\ \ddot{\theta} \end{bmatrix} $$

where $\theta$ is the absolute angle to the object

## The Covariance Matrix

$$ P = \begin{bmatrix} \Sigma_{pp} & \Sigma_{pv} & \Sigma_{pa} \\ 
                       \Sigma_{vp} & \Sigma_{vv} & \Sigma_{va} \\
                       \Sigma_{ap} & \Sigma_{av} & \Sigma_{aa} \end{bmatrix}$$

where $ \Sigma_{ij} $ is the degree of variance between the _i'th_ and _j'th_ state variable.

## The Prediction Matrix, Control Matrix and Control Vector

Indicates how we move from one state to the next

$$ \begin{split}
   p_k &= p_{k-1} + \Delta t \cdot v_{k-1} \\
   v_k &= v_{k-1} + \Delta t \cdot a_{k-1} \\
   a_k &= \beta a_{k-1}
   \end{split}
$$

where $\beta$ represents constant acceleration ($\beta = 1$), no acceleration ($\beta = 0$) or a decaying acceleration ($0 \lt \beta \lt 1$)

Thus,

$$ \begin{split} \hat{x}_k &= \begin{bmatrix} 1 & \Delta t & 0 \\
                                              0 & 1 & \Delta t \\
                                              0 & 0 & \beta \end{bmatrix}\hat{x}_{k-1} \\ \\
                           &= F_k\ \hat{x}_{k-1} \end{split} $$

where $F_k$ is the prediction matrix

In other words, the new best estimate is a prediction made from the previous best estimate

## Factoring in Uncertainty

Eg. uncertainty from modelling, noise, etc

Update the prediction matrix as follows,

$$ P_k = F_k\ P_{k-1}\ F_k^T + Q_k$$

In other words, the new uncertainty is predicted from the old uncertainty, with some additional uncertainty from the environment

## The Sensor Uncertainty Matrix

$$ \mu_{expected} = H_k\ \hat{x}_k $$
$$ \Sigma_{expected} = H_k\ P_k\ H_k^T $$
$$ y = z - h(x) $$

where $H_k$ is a matrix which models the sensors

In [None]:
import numpy as np

In [None]:
class KalmanFilter():
    """
    Inputs:
        Ts      Sampling time of the model (used in `predict()` method)
        Q, R    Uncertainty matrices.
                Higher Q = more uncertainty in model.
                Higher R = more uncertainty in sensor = slower response to measurements
        a       Acceleration change from one instant to the next. `a = 0` for no acceleration, or `a = 1` for constant
        
    Keeps track of:
        x       the state estimate, θ_pos, θ_vel and θ_acc. Measured in degrees
        P       the covariance matrix
    """
    def __init__(self, Ts, Q, R, a=1):
        self.Ts = Ts
        self.x = np.matrix([[0],  # position
                            [0],  # velocity
                            [0]]) # acceleration
        self.P = np.matrix(np.eye(3))
        self.F = np.matrix([[1, Ts,  0],
                            [0,  1, Ts],
                            [0,  0,  a]])
        self.Q = np.matrix([[Q*(Ts**2),    0, 0],
                            [        0, Q*Ts, 0],
                            [        0,    0, Q]])
        self.R = R
        
        
    def predict(self):
        """
            get from x[k-1] to x[k] by using knowledge of how the system behaves 
            x = F*x
            P = F*P*F' + Q
        """
        self.x = self.F * self.x
        self.P = (self.F * self.P * self.F.T) + self.Q

    
    def update(self, z):
        """
            get from x[k] to the best possible estimate of the system by using sensor data
            y = z - h(x)
            K = P*H' * inv(H*P*H' + R)
            x = x + K*(z - H*x)
            P = P - K*H*P
            
            where z = the pixel position, scaled from -1 (pixel 0) to 1 (end pixel)
        """
        y = z - self.h(self.x)
        H = self.h_dash(self.x)
        
        K = (self.P * H.T) / (H * self.P * H.T + self.R)
        self.x = self.x  +  K * y
        self.P = (np.eye(3) - K * H) * self.P
    
    def h(self, x): return x.item(0)
    def h_dash(self, x): return np.matrix([1, 0, 0])

    def get_pos(self): return self.x.item(0)
    def get_vel(self): return self.x.item(1)
    def get_acc(self): return self.x.item(2)
    
    def predict_ahead(self, T):
        """ predict T seconds ahead using the constant acceleration model """
        x = np.matrix(np.copy(self.x))
        F = np.matrix(np.copy(self.F))

        F[0,1], F[1,2] = T/5, T/5
        
        for _ in range(5): # run the loop 5 times for slightly more accurate integration
            x = F * x
        
        return x.item(0)

In [None]:
class ExtendedKalmanFilter(KalmanFilter):
    """
    Inputs:
        Ts                  Sampling time of the model (used in `predict()` method)
        Q, R                Uncertainty matrices
                            Higher Q = more uncertainty in model.
                            Higher R = more uncertainty in sensor = slower response to measurements
        a                   Acceleration change from one instant to the next
                            `a = 0` for no acceleration, or `a = 1` for constant
        camera_FOV_deg      The angular width of the camera. picam width, height = 62.2ᵒ, 48.8ᵒ
        
    Keeps track of:
        x       the state estimate, θ_pos, θ_vel and θ_acc. Measured in degrees
        P       the covariance matrix
    """
    def __init__(self, Ts, Q, R, a=1, camera_FOV_deg=62.2):
        super().__init__(Ts, Q=Q, R=R, a=a)  # calling init method in KalmanFilter
        self.camera_FOV_rad = np.deg2rad(camera_FOV_deg)

    def update(self, z):
        """
            get from x[k] to the best possible estimate of the system by using sensor data
            y = z - h(x)
            K = P*H' * inv(H*P*H' + R)
            x = x + K*(z - H*x)
            P = P - K*H*P
            
            where z = the pixel position, scaled from -1 (pixel 0) to 1 (end pixel)
        """
        y = self.tmp(z) - self.h(self.x)
        H = self.h_dash(self.x)

        K = (self.P * H.T) / (H * self.P * H.T + self.R)
        self.x = self.x  +  K * y
        self.P = (np.eye(3) - K * H) * self.P

    def get_pos(self): return np.rad2deg(self.x.item(0))
    def get_vel(self): return np.rad2deg(self.x.item(1))
    def get_acc(self): return np.rad2deg(self.x.item(2))

    def h(self, x):  # overriding method in KalmanFilter
        """ convert x, an angle, into a pixel """
        return np.tan(x.item(0)) / np.tan(self.camera_FOV_rad/2)
    
    def tmp(self, x):
        return np.tan(np.deg2rad(x)) / np.tan(self.camera_FOV_rad/2)
    
    def h_dash(self, x):  # overriding method in KalmanFilter
        _h_dash = (1 + np.power(np.tan(x.item(0)),2)) / np.tan(self.camera_FOV_rad/2)
        return np.matrix([_h_dash, 0, 0])

## Debugging

In [None]:
if __name__ == '__main__':
    Ts   = 0.050
    T_nn = 0.150
    sim_time = 15
    nn_delay = 10  # nn_delay measured in units of Ts
    FOV_deg  = 62.2  # degrees

    def noise(scale): return (np.random.random() - 0.5)*scale

    def ref_unit_sigmoid(t, sim_time): return 2/(1 + 3**(-t + sim_time/2)) - 1 #+ noise(0.2)
    def ref_unit_step(t, sim_time): return 1 #+ noise(0.2)
    def ref_unit_sin(t, sim_time): return np.sin(t*0.8) #+ noise(0.2)
    
    def pixel_to_angle(norm_pixel, cam_fov_rad):
        return np.rad2deg(np.arctan(norm_pixel * np.tan(cam_fov_rad/2)))

    nn_input_funcs = [ref_unit_sigmoid, ref_unit_step, ref_unit_sin]

In [None]:
if __name__ == '__main__':
    def derivative(f):
        df = [(x - y)/0.1 for x, y in zip(f[1:], f[0:-1])]
        df.append(df[-1])
        return df

    def plot_simulation():
        plt.subplot(311)
        plt.plot(t_arr, nn_input_arr, label='input')
        plt.plot(t_arr, KF_pos, label='KF angle position')
        plt.plot(t_arr, EKF_pos, label='EKF angle position')
        plt.legend(loc='lower right'); plt.grid()

        plt.subplot(312)
        plt.plot(t_arr, derivative(nn_input_arr), label='input velocity')
        plt.plot(t_arr, KF_vel, label='KF angle velocity')
        plt.plot(t_arr, EKF_vel, label='EKF angle velocity')
        plt.legend(loc='lower right'); plt.grid()

        plt.subplot(313)
        plt.plot(t_arr, derivative(derivative(nn_input_arr)), label='input acceleration')
        plt.plot(t_arr, KF_acc, label='KF angle acceleration')
        plt.plot(t_arr, EKF_acc, label='EKF angle acceleration')
        plt.legend(loc='lower right'); plt.grid()

        fig = plt.gcf(); fig.set_size_inches(18.5, 6, forward=True)

        plt.show()

In [None]:
if __name__ == '__main__':
    import matplotlib.pyplot as plt
    
    for nn_input_func in nn_input_funcs:
        KF = KalmanFilter(Ts=Ts, Q=0.5, R=0.05, a=0.94)
        EKF = ExtendedKalmanFilter(Ts=Ts, Q=0.5, R=0.05, a=0.94, camera_FOV_deg=FOV_deg)
        
        KF_pos, KF_vel, KF_acc = [], [], []
        EKF_pos, EKF_vel, EKF_acc = [], [], []
        nn_input_arr, t_arr = [], []
        
        nn_input_delayed = np.zeros(nn_delay)
        
        for t in np.arange(0, sim_time, Ts):
            nn_input = nn_input_func(t, sim_time)  # get input
            nn_input_delayed[0] = nn_input  # put at front of buffer
            nn_input_delayed = np.roll(nn_input_delayed, 1)  # roll buffer

            if (t % T_nn < 0.01) or -0.01 < abs(t % T_nn) - T_nn < 0.01:
                KF.predict();
                KF.update(pixel_to_angle(nn_input_delayed[-1] + noise(0.2), np.deg2rad(FOV_deg)))
                EKF.predict();
                EKF.update(pixel_to_angle(nn_input_delayed[-1] + noise(0.2), np.deg2rad(FOV_deg)))
            else:
                KF.predict()
                EKF.predict()

            nn_input_arr.append(pixel_to_angle(nn_input, np.deg2rad(FOV_deg)))
            KF_pos.append(KF.get_pos()); EKF_pos.append(EKF.get_pos())
            KF_vel.append(KF.get_vel()); EKF_vel.append(EKF.get_vel())
            KF_acc.append(KF.get_acc()); EKF_acc.append(EKF.get_acc())
            t_arr.append(t)
            
        plot_simulation()