# 2D Simple Kalman Filter

**Overall steps:**
1. Calculate the predicted state
2. Calculate the state covariance
3. Calculate the Kalman Gain
4. Calculate the new observation
5. Update the state
6. Update the process covariance matrix

In [1]:
# installing the required dependencies
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
#defining the control variables

g = -9.81    # m/s^2

ctrl = {
    'a_x': 0,
    'a_y': g,
}

ctrl_matrix = np.matrix([[ctrl['a_x']],[ctrl['a_y']]])
# defining the noise matrix - for now we will set it to a zero matrix
noise_matrix = np.zeros((4,1))
z_matrix = np.zeros((4,1))
measurement_noise_matrix = np.matrix(np.diag([625,0,36,0]))

In [3]:
# defining our time steps
dt = 1    #seconds

For 2D, the state vector will be of the form $ \vec{X} = [x , y , \dot{x} , \dot{y}] ^ T $

When calculating the predicted state, we will use $ \vec{X_{k_p}} = A \vec{x_{k-1}} + B \vec{u_{k}} + \vec{w_k} $, 

where $ A = \begin{pmatrix} 1 & 0 & \Delta T &0\\0&1&0&\Delta T\\0&0&1&0\\0&0&0&1\end{pmatrix} $ and $ B = \begin{pmatrix} \frac{1}{2} (\Delta T)^2 & 0 \\ 0 & \frac{1}{2} (\Delta T)^2 \\ \Delta T & 0 \\ 0 & \Delta T\end{pmatrix} $

In [4]:
# calculating the predicted state

def calculate_predicted_state(state_vector, control_variable_vector, dt, noise_matrix):
    
    # defining the A matrix
    A = np.matrix([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]])
    
    #defining the B matrix
    B = np.matrix([[0.5*(dt**2), 0],[0,0.5*(dt**2)],[dt,0],[0,dt]])
    
    #calculating the predicted state
    predicted_x = A * state_vector + B * control_variable_vector + noise_matrix
    
    return predicted_x
    

We will calculate the state covariance matrix by doing the following:

$ P_{k_p} = A P_{k-1} A^T + Q $, where $A$ is the same matrix as above and $Q$ s the noise process covariance matrix

In [5]:
def calculate_new_covariance_matrix(previous_covariance_matrix, dt ,noise_covariance_matrix=np.zeros((4,1))):
    # defining the A matrix
    A = np.matrix([ [1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]])
    
    #defining the Q matrix - for now we will set this to 0
    Q = np.zeros((4,4))
    
    #calculating the new P matrix
    p_new = A * previous_covariance_matrix * A.transpose() + Q
    
    #for now, we will just get the diagonal terms from the matrix
    p_new_diag = np.diag(np.diag(p_new))
    
    return p_new_diag

We will now calculate the Kalman Gain matrix: $ Kalman Gain = \frac{P_{k_p} H^T}{H P_{k_p} H^T + R} $

where $R$ is the measurement covariance matrix, and $H$ is to resize the matrix into the correct form

In [6]:
def calculate_kalman_gain(state_covariance_matrix, measurement_noise_covariance_matrix):
    #defining the H matrix
    H = np.identity(4)
    
    # calculating the kalman gain
    kalman_gain_numerator = state_covariance_matrix * H.transpose()
    kalman_gain_denominator = H * state_covariance_matrix * H.transpose() + measurement_noise_covariance_matrix
    
    # based on the assumption that the numerator and denominators are diagonal matrices, we can extract
    # the diagonal elements into a list and divide them and then put back into a matrix
    kalman_numerator_list = np.diag(kalman_gain_numerator)
    kalman_denominator_list = np.diag(kalman_gain_denominator)
    
    #getting the kalman gain matrix
    kalman_gain_matrix = np.matrix(np.diag(np.divide(kalman_numerator_list,kalman_denominator_list)))
    
    return kalman_gain_matrix

Now we will write a function to process the next/newest observation: $\vec{Y_k} = C \vec{Y_{k_{measured}}} + \vec{Z_k} $

$C$ is a reshaping matrix and $Z$ is the apparatus error/ adjustment error

In [7]:
def process_next_observation(observation_vector,adjustment_noise= np.zeros((4,1))):
    #defining the C matrix
    C = np.identity(4)
    
    # processing the observation
    y_new = C * observation_vector + adjustment_noise
    
    return y_new

We are now going to calculate a new adjusted state vector of where we believe our object is going to be: $ \vec{X_{k}} = \vec{X_{k_p}} + K \times (\vec{Y_{k}} - H \vec{X_{k_p}}) $

In [8]:
def calculate_updated_state(kalman_gain_matrix,predicted_state_vector,observation_vector):
    # defining the H matrix
    H = np.identity(4)
    
    #calculating the new x vector
    x_updated_vector = predicted_state_vector + kalman_gain_matrix * (observation_vector - predicted_state_vector)
    
    return x_updated_vector

Now we will calculate the updated process covariance matrix: $ P_k = (I - K H) P_{k_{p}} $ 

where $ P_{k_p} $ is the predicted process covariance matrix calculated from before, $I$ is the identity matrix, $K$ is the Kalman Gain matrix, and $H$ is a reshaping matrix

In [9]:
def calculate_updated_process_covariance_matrix(kalman_gain_matrix,process_covariance_matrix):
    #defining the I and H matrices
    I = np.identity(4)
    H = np.identity(4)
    
    # calculating the updated covariance matrix
    p_updated = (I - kalman_gain_matrix * H) * process_covariance_matrix
    
    return p_updated

We have written all the functions above, now we will write a simple loop to filter through

In [29]:
observation_matrix = np.matrix([]) ''' <----- insert the data you want here'''

SyntaxError: invalid syntax (<ipython-input-29-4bf75d13e319>, line 1)

In [25]:
#define the initial state variables
x_initial = np.reshape(observation_matrix[0],(4,1))
p_initial = np.matrix(np.diag([]))  ''' <------- initialise the covariance matrix by writing in values d1, d2, etc..'''

# set up the current variables
x_current = x_initial
p_current = p_initial

n = 10

for k in range(1,3):
    #calculate the predicted state
    x_predicted = calculate_predicted_state(x_current,ctrl_matrix,dt,noise_matrix)

    # calculate the predicted covariance matrix
    p_predicted = calculate_new_covariance_matrix(p_current, dt ,noise_matrix)

    # calculate the Kalman Gain matrix
    kalman_gain_matrix = calculate_kalman_gain(p_predicted, measurement_noise_matrix) 

    # process the newest observation
    observation = np.reshape(observation_matrix[k],(4,1))

    processed_observation = process_next_observation(observation,noise_matrix)

    # calculate the updated state
    x_updated = calculate_updated_state(kalman_gain_matrix,x_predicted,processed_observation)

    # calculating the updated process covariance matrix
    p_updated = calculate_updated_process_covariance_matrix(kalman_gain_matrix,p_predicted)

    print(x_updated)
    print(p_updated)
    print(np.resize(p_updated[2],(4,1)))

[[4271.9047619 ]
 [          nan]
 [ 280.81967213]
 [          nan]]
[[252.97619048   0.           0.           0.        ]
 [         nan          nan          nan          nan]
 [  0.           0.          14.75409836   0.        ]
 [         nan          nan          nan          nan]]
[[ 0.        ]
 [ 0.        ]
 [14.75409836]
 [ 0.        ]]
[[2548.02380952]
 [          nan]
 [ 166.47540984]
 [          nan]]
[[252.97619048   0.           0.           0.        ]
 [         nan          nan          nan          nan]
 [  0.           0.          14.75409836   0.        ]
 [         nan          nan          nan          nan]]
[[ 0.        ]
 [ 0.        ]
 [14.75409836]
 [ 0.        ]]


  from ipykernel import kernelapp as app
