# Imports

In [None]:
import math

from IPython.display import HTML
import numpy as np 
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import scipy.stats as stats

from tools.simulators import CarSimulator
import tools.plot_tools as plot_tools
%matplotlib inline

# Kalman Filter Class

In [None]:
class KalmanFilter:
    '''Kalman filter class'''
    def __init__(self, dim_state=1, 
                       dim_meas=1):

        self.x = np.zeros((dim_state, 1)) # state
        self.P = np.eye(dim_state) # uncertainty covariance
        self.Q = np.eye(dim_state) # process noise covariance
        self.F = np.eye(dim_state) # system matrix
        self.H = np.zeros((dim_meas, dim_state)) # measurement matrix
        self.R = np.eye(dim_meas)               # measurement uncertainty
        
    
    def predict(self, F = None, Q=None):
        
        if F is None:
            F = self.F
        if Q is None:
            Q = self.Q
        
        x = self.x
        P = self.P

        self.x = F@x # state prediction
        self.P = F@P@F.transpose() + Q # covariance prediction
        return self.x, self.P

    def update(self, z, R=None, H=None):
        # update state and covariance with associated measurement
        if H is None:
            H = self.H
        if R is None:
            R = self.R
        
        x = self.x
        P = self.P
        I = np.eye(x.shape[0])

        gamma = z - H@x # residual
        S = H@P@H.transpose() + R # covariance of residual
        K = P@H.transpose()@np.linalg.inv(S) # Kalman gain
        self.x = x + K@gamma # state update
        self.P = (I - K@H) @ P # covariance update
        return self.x, self.P     
        

# Example in one dimension

In [None]:
np.random.seed(13)
dt = 1.0

x0 = np.array([[0.0],
               [1.0]])

kf = KalmanFilter(dim_state=2,dim_meas=1)

kf.x = x0

kf.P = np.array([[400, 0],
              [0, 0]])

kf.F = np.array([[1, dt],
              [0, 1]])

kf.Q = np.array([[2, 0],
              [0, 0]])

kf.H = np.array([[1, 0]])

kf.R = np.array([[4.5]])

process_std = math.sqrt(2)
sensor_std = math.sqrt(4.5)
mean = 0
std = 20

N = 25

car = CarSimulator(x0 = x0[0,0], vel = x0[1,0] , meas_std=sensor_std, process_std =process_std)
gts, zs = car.simulate_steps(N)

xs, priors = np.zeros((N, 2)), np.zeros((N, 2))

for i, z in enumerate(zs):
    prior, P = kf.predict()
    priors[i,0] = prior[0,0]
    priors[i,1] = P[0,0]
    posterior, P = kf.update(z)  
    xs[i,0] = posterior[0,0]
    xs[i,1] = P[0,0]

plot_tools.plot_measurements(zs)
plot_tools.plot_measurements(gts, color="green", label="GT")
plot_tools.plot_filter(xs[:, 0], var=priors[:, 1])
plot_tools.plot_predictions(priors[:, 0])
plot_tools.show_legend()
plt.xlim()

## Interactive example

In [None]:
import ipywidgets
from ipywidgets import interact
def plot_kalman_filter(start_pos, 
                       sensor_noise, 
                       velocity,
                       process_noise):
    N = 25
    np.random.seed(303)
    car = CarSimulator(start_pos, velocity, math.sqrt(sensor_noise), math.sqrt(process_noise))
    
    kf = KalmanFilter(2,1)
    kf.x = np.array([[start_pos],
               [velocity]])
    kf.P = np.array([[1000, 0],
                [0, 0]])
    kf.F = np.array([[1, dt],
                [0, 1]])
    kf.Q = np.array([[process_noise, 0],
                [0, 0]])
    kf.H = np.array([[1, 0]])
    kf.R = np.array([[sensor_noise]])


    gts, meas = car.simulate_steps(N)
    ps = []
    for z in meas:    
        prior, P = kf.predict()
        pos, P = kf.update(z) 

        ps.append(pos[0,0])

    plt.figure()
    plt.plot(meas, c='k', marker='o', linestyle='', label='measurement')
    plt.plot(gts, c='green', marker='x', linestyle='', label='ground truth')
    plt.plot(ps, c='#004080', alpha=0.7, label='filter')
    plt.legend(loc=4)
    plt.show()

def FloatSlider(value, **kwargs):
    """
    Creates an ipwidgets FloatSlider with continuous update
    turned off
    """
    return ipywidgets.FloatSlider(value, continuous_update=False, **kwargs)

interact(plot_kalman_filter,
         start_pos=(-10, 10), 
         sensor_noise=FloatSlider(value=5, min=0., max=100), 
         velocity=FloatSlider(value=1, min=-2., max=2.), 
         process_noise=FloatSlider(value=.1, min=0, max=40));

## Animation

In [None]:

def plot_video_graph(x_values, priors, vars_prior, posteriors, vars_pos, gts=None, meas=None, frames_per_second=10):
    # Create a figure and axis object
    fig, ax = plt.subplots(2,1)

    # Initialize the line object
    line,  = ax[0].plot([],[], lw=2)
    gt_scat,  = ax[0].plot([],[], c='green', marker='x', linestyle='')
    meas_scat, = ax[0].plot([],[],c='black', marker='o', linestyle='')
    text_up = ax[0].text(0.5, 0.95, 'My Animation', ha='center', va='top', transform=ax[0].transAxes)
    
    process_pdf_line, = ax[1].plot([],[],c='green', lw=2)
    meas_pdf_line, = ax[1].plot([],[], c='red', lw=2)
    filter_pdf_line, = ax[1].plot([],[], c='blue', lw=2)
    text_bot = ax[1].text(0.5, 0.95, 'My Animation', ha='center', va='top', transform=ax[1].transAxes)

    xs_pdf = np.arange(np.min(posteriors)-3, np.max(posteriors)+5, (np.max(posteriors)+5 - np.min(posteriors)+3) / 1000.)
    # Set the x and y limits of the plot
    ax[0].set_xlim(np.min(x_values), np.max(x_values))
    ax[0].set_ylim(np.min(posteriors)-5, np.max(posteriors)+5)
    ax[1].set_xlim(np.min(posteriors)-3, np.max(posteriors)+5)
    ax[1].set_ylim(0, 1)
    # Set the x and y labels of the plot
    ax[0].set_xlabel("time")
    ax[0].set_ylabel("pos")
    plt.close()
    # Define the update function for the animation
    def update(frame):
        state = frame % 4
        frame = frame // 4
        if state == 0:
            gt_scat.set_data(x_values[:frame], gts[:frame])
            text_up.set_text('Move')
            text_bot.set_text('Move')
        elif state == 1:
            n = stats.norm(priors[frame], math.sqrt(vars_prior[frame]))
            process_pdf_line.set_data(xs_pdf, n.pdf(xs_pdf))
            text_up.set_text('Predict')
            text_bot.set_text('Predict')
        elif state == 2:
            n = stats.norm(meas[frame], math.sqrt(4.5))
            meas_pdf_line.set_data(xs_pdf, n.pdf(xs_pdf))
            meas_scat.set_data(x_values[:frame], meas[:frame])
            text_up.set_text('Measure')
            text_bot.set_text('Measure')
        elif state == 3:
            n = stats.norm(posteriors[frame], math.sqrt(vars_pos[frame]))
            filter_pdf_line.set_data(xs_pdf, n.pdf(xs_pdf))
            line.set_data(x_values[:frame], posteriors[:frame])
            meas_pdf_line.set_data([], [])
            process_pdf_line.set_data([], [])
            text_up.set_text('Correct')
            text_bot.set_text('Correct')
        return line,
    # Create the animation object
    
    animation = FuncAnimation(fig, update, frames=4*len(posteriors), interval=1000/frames_per_second, blit=True)

    # Convert the animation to HTML5 video
    html_video = HTML(animation.to_jshtml())

    # Display the video in Jupyter
    display(html_video)


np.random.seed(14)
dt = 1.0
process_var = 2
sensor_var = 4.5
mean = 0
std = 20
car = CarSimulator(x0 = x0[0,0], vel = x0[1,0] , meas_std=math.sqrt(sensor_var), process_std =math.sqrt(process_std))
kf = KalmanFilter(2,1)

kf.x = np.array([[mean],
               [1]])
kf.P = np.array([[std**2, 0],
              [0, 0]])

kf.F = np.array([[1, dt],
              [0, 1]])

kf.Q = np.array([[2, 0],
              [0, 0]])

kf.H = np.array([[1, 0]])

kf.R = np.array([[4.5]])



N = 25
ts = np.linspace(0,N*dt,N)


gts, meas = car.simulate_steps(N)

priors = []
vars_priors = []
posteriors = []
vars_pos = []

for z in meas:    
    prior, prior_P = kf.predict()
    pos, pos_P = kf.update(z) 

    priors.append(prior[0,0])
    vars_priors.append(prior_P[0,0])
    posteriors.append(pos[0,0])
    vars_pos.append(pos_P[0,0])

plot_video_graph(ts, priors,vars_pos, posteriors,vars_pos, gts=gts, meas=meas, frames_per_second=5)


## Kalman filter in 2 Dimensions

In [None]:
np.random.seed(5)
N = 25
q = 1
dt = 1

pos_x = 0.0
pos_y = 0.0
vx = 1.0
vy = 1.0

sigma_x = q
sigma_y = q

meas_sig_x = 2
meas_sig_y = 2

x0 = np.array([[pos_x],
               [pos_y],
               [vx],
               [vy]])

process_var = np.array([[sigma_x**2],
                        [sigma_y**2]])

sensor_var = np.array([[meas_sig_x**2],
                       [meas_sig_y**2]])

process_std = np.sqrt(process_var)
sensor_std = np.sqrt(sensor_var)

car = CarSimulator(x0=x0[:2], vel=np.array([[vx], [vy]]), process_std=process_std, meas_std=sensor_std)

kf = KalmanFilter(dim_state=4, dim_meas=2)

kf.x = x0
kf.F = np.array([[1, 0, dt, 0],
                 [0, 1, 0, dt],
                 [0, 0, 1, 0],
                 [0, 0, 0, 1]])

q1 = ((dt**3)/3) * q 
q2 = ((dt**2)/2) * q 
q3 = dt * q 
kf.Q = np.array([[q1, 0, q2, 0],
                    [0, q1, 0, q2],
                    [q2, 0, q3, 0],
                    [0, q2, 0,  q3]])


kf.P = np.array([[sigma_x**2, 0, 0, 0],
                 [0, sigma_y**2, 0, 0],
                 [0, 0, 2**2, 0],
                 [0, 0, 0, 2**2]])

kf.H = np.array([[1,0,0,0],
                 [0,1,0,0]])

kf.R = np.array([[meas_sig_x**2, 0], # measurement noise covariance matrix
                 [0, meas_sig_y**2]])

gts, meas = car.simulate_steps(N)

priors = np.zeros((N,2,1))
var_priors = np.zeros((N,2,2))

posteriors = np.zeros((N,2,1))
var_pos = np.zeros((N,2,2))

for i ,(gt, z) in enumerate(zip(gts, meas)):  

    # predict
    prior, prior_P = kf.predict()

    # update
    pos, pos_P = kf.update(z[:,np.newaxis])
    priors[i,:,:] = prior[:2,:] 
    var_priors[i,:,:] = prior_P[:2,:2]

    posteriors[i,:,:] = pos[:2,:] 
    var_pos[i,:,:] = pos_P[:2,:2]

print(posteriors.shape)


## Animation

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scipy.stats import multivariate_normal

def animate_gaussian(priors, prior_vars, posteriors, pos_vars, meas, cov_meas):
    # Set up the figure and axis
    fig, ax = plt.subplots()

    ax.set_xlabel('x [m]')
    ax.set_ylabel('y [m]')
    ax.set_ylim([-30, 30])
    ax.set_xlim([-30, 30])


    # Create the multivariate Gaussian
    def multivariate_gaussian(x, y, mean, cov):
        return multivariate_normal(mean.flatten(), cov).pdf(np.dstack((x, y)))
    
    # Initialize the multivariate Gaussian parameters
    mean_prior = priors[0]
    mean_pos = posteriors[0]
    mean_meas = meas[0]


    # Create the meshgrid for the multivariate Gaussian
    x = np.linspace(-30, 30, 1000)
    y = np.linspace(-30, 30, 1000)
    X, Y = np.meshgrid(x, y)

    # Create the plot
    data = np.zeros((10, 10))
    cs1 = ax.contour(data, cmap='Greens', alpha=0.5, levels=10)
    cs2 = ax.contour(data, cmap='Reds', alpha=0.5, levels=10)
    cs3 = ax.contour(data, cmap='Blues', alpha=0.5, levels=10)

    # texts
    t1 = ax.text(mean_prior[0,0],mean_prior[1,0]+5, "", ha='center', va='top', color="green", fontsize=10)
    t2 = ax.text(mean_meas[0],mean_meas[1], "", ha='center', va='center', color="red", fontsize=10)
    t3 = ax.text(mean_pos[0],mean_pos[1]-5, "", ha='center', va='bottom', color="blue", fontsize=10)

    p = [ cs1, cs2, cs3]

    plt.close()
    # Define the animation function
    def animate(frame):
        def delete_contour(contour):
            for tp in contour.collections:
                tp.remove()

        state = frame % 4
        frame = frame // 4

        mean_prior = priors[frame]
        cov_prior = prior_vars[frame]
        mean_pos = posteriors[frame]
        cov_pos = pos_vars[frame]
        mean_meas = meas[frame]

        Z_prior = multivariate_gaussian(X, Y, mean_prior, cov_prior)
        Z_pos = multivariate_gaussian(X, Y, mean_pos, cov_pos)
        Z_meas = multivariate_gaussian(X, Y, mean_meas, cov_pos)
        

        if state == 0:
            ax.set_title(f'Frame {frame}: Move')
        elif state == 1:
            p[0] = ax.contour(X, Y, Z_prior, cmap='Greens', alpha=0.5, levels=10)
            t1.set_text('Prediction')
            t1.set_position([mean_prior[0], mean_prior[1,0]+5])
            ax.set_title(f'Frame {frame}: Predict')
        elif state == 2:
            p[1] = ax.contour(X, Y, Z_meas, cmap='Reds', alpha=0.5, levels=10)
            t2.set_text('Measurement')
            t2.set_position([mean_meas[0], mean_meas[1]])
            ax.set_title(f'Frame {frame}: Measure')
        elif state == 3:
            delete_contour(p[0])
            delete_contour(p[1])
            delete_contour(p[2])
            t1.set_text('')
            t2.set_text('')

            p[2] = ax.contour(X, Y, Z_pos, cmap='Blues', alpha=0.5, levels=10)
            t3.set_text('Correction')
            t3.set_position([mean_pos[0], mean_pos[1]-5])
            ax.set_title(f'Frame {frame}: Correct')


        return p[0].collections + p[1].collections + p[2].collections
    
    # Create the animation
    anim = animation.FuncAnimation(fig, animate, frames=4*len(priors), interval=50, blit=True)

    # Display the animation
    # Convert the animation to HTML5 video
    html_video = HTML(anim.to_jshtml())

    # Display the video in Jupyter
    display(html_video)
    plt.show()

sensor_var = np.array([[meas_sig_x**2, 0],
                       [0, meas_sig_y**2]])

animate_gaussian(priors, var_priors, posteriors, var_pos, meas, sensor_var)