In [1]:
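# Prerequisites: model and parameters used throughout this notebook
"""
 Measurement:  z_t = 2*x_t / v_sound
 Control:      u_t = x_t''  (acceleration, i.e. second time-derivative of position)
 Motion:       x_t = x_t0 + u_t * t^2 / 2
 Noise:        eps_t   ~ N(0, R)   (added to the state transition)
               delta_t ~ N(0, Q)   (added to the measurement)
 State:        X = [x_t, x_t']  (position and velocity)

 326 time stamps at an interval of 0.01
 Initial state: x_t(0) = 0, with covariance 1e-4 * I
 Q = 0.01
 var(x) = 0.1^2, var(x') = 0.5^2; R is diagonal
 v_sound = 3000 km/hr
 u(t) = ... (piecewise; see the definition of u(t) in the next cell)
"""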
import math
import os

import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from numpy.random import multivariate_normal
# Root folder for every HTML figure written by this notebook.
plot_dir = "plots_part_1/"
# Constant offset subtracted from every sample time when the time grid is built.
tol = 0

# fig.write_html does not create missing parent folders, so create every
# per-part output directory up front. This lets a fresh checkout survive
# Restart Kernel -> Run All without a FileNotFoundError.
for _part in ("part_ab", "part_c", "part_d", "part_e", "part_f"):
    os.makedirs(os.path.join(plot_dir, _part), exist_ok=True)

In [2]:
# True x,v 
def u(t):
    """Return the control input (acceleration) applied at time t.

    The profile is piecewise constant:
      * 400  for t < 0.25
      * -400 for 3 < t < 3.25 (both bounds exclusive)
      * 0    everywhere else
    """
    if t < 0.25:
        return 400
    if 3 < t < 3.25:
        return -400
    return 0

# def v(t):
#     if t < 0.25: 
#         return u(t)*t 
#     elif t > 3 and t < 3.25:
#         return 100+u(t)*(t-3)
#     else:
#         return 0.25*u(0) 

# def x(t):
#     if t < 0.25:
#         return (1/2)*u(t)*t**2 
#     elif t > 3 and t <= 3.25:
#         return (1/2)*u(0)*(0.25)**2 + (2.75)*v(0.25) + v(0.25)*(t-3) + (1/2)*u(t)*(t-3)**2
#     else:
#         return (1/2)*u(0)*(0.25)**2 + (t-0.25)*v(0.25)


In [3]:
# estimated x,v 


# Baseline standard deviations. The *_0 values are the reference point that the
# noise sweeps in later cells return to.
sig_x_0 = 0.1    # position component of the transition noise
sig_v_0 = 0.5    # velocity component of the transition noise
sig_s_0 = 0.01   # measurement noise
# NOTE(review): the notebook header lists "Q = 0.01"; here 0.01 is used as a
# standard deviation, so Q below is 1e-4 — confirm which one is intended.

sig_x = sig_x_0
sig_v = sig_v_0
sig_s = sig_s_0

time_step = 0.01
N = 326
v_sound = 3000
# Build the grid from time_step and N (rather than repeating the literals) so
# that changing either constant actually changes the simulation.
time = [(time_step*i-tol) for i in range(0,N)]

# Initial belief: zero position and velocity as a (2, 1) column, tiny covariance.
mu_0 = np.array([[0],[0]])
sigma_0 = 1e-4*np.array([[1,0],[0,1]])

# R is the covariance added in the prediction step, Q the covariance of the
# measurement noise (the convention used by kalman_filter below).
R = np.array([[sig_x**2,0],[0,sig_v**2]])
Q = np.array([[sig_s**2]])

# Constant-acceleration kinematics over one time step:
#   x' = x + v*dt + 0.5*u*dt^2,   v' = v + u*dt
A_t = np.array([[1,time_step],[0,1]])
B_t = np.array([[0.5*time_step**2],[time_step]])

# Only position is observed, scaled by 2 / v_sound.
C_t = np.array([[2/v_sound,0]])



def kalman_filter(mu_prev,sigma_prev,u_t,z_t=None,R=R,Q=Q):
    """Run one prediction step and, when a measurement is given, one correction step.

    The model matrices A_t, B_t and C_t are read from the notebook namespace.

    Args:
        mu_prev: previous state mean (column vector).
        sigma_prev: previous state covariance.
        u_t: control input, as an array B_t can be multiplied with.
        z_t: measurement for this step, or None to skip the correction.
        R: covariance added to the predicted state covariance.
        Q: covariance added to the innovation covariance (measurement noise).

    Returns:
        (mu_t, sigma_t, K_t). Without a measurement, mu_t and sigma_t are the
        predicted values. K_t is always computed from the predicted covariance,
        even when z_t is None.
    """
    # Prediction.
    mu_pred = A_t@mu_prev + B_t@u_t
    sigma_pred = A_t@sigma_prev@A_t.T + R

    # Gain from the predicted covariance.
    innovation_cov = C_t@sigma_pred@C_t.T + Q
    K_t = sigma_pred@C_t.T@np.linalg.inv(innovation_cov)

    if z_t is None:
        return mu_pred,sigma_pred,K_t

    # Correction; the covariance uses the Joseph form
    # (I - K C) S (I - K C)^T + K Q K^T.
    residual = z_t - C_t@mu_pred
    mu_post = mu_pred + K_t@residual
    i_minus_kc = np.eye(2) - K_t@C_t
    sigma_post = i_minus_kc@sigma_pred@i_minus_kc.T + K_t@Q@K_t.T
    return mu_post,sigma_post,K_t

def get_state_and_measurements_vs_time(R=R,Q=Q):
    """Simulate the true state and its noisy measurements over the whole time grid.

    The initial state is drawn from N(mu_0, sigma_0). Each later state is
    A_t @ previous + B_t @ u(t) plus a zero-mean Gaussian sample with
    covariance R, where u is evaluated at the time being stepped to. Every
    state is then measured through C_t with added zero-mean noise of
    covariance Q.

    Args:
        R: covariance of the noise added to each state transition.
        Q: covariance of the noise added to each measurement.

    Returns:
        (state_t, z_t): the list of simulated state column vectors and the
        list of measurements, one entry per element of `time`.
    """
    # Initial draw comes back as a (1, 2) row; transpose it into a column.
    initial_draw = multivariate_normal(mu_0.ravel(),sigma_0,1)
    state_t = [np.array(initial_draw).T]

    for t in time[1:]:
        transition_noise = np.array(multivariate_normal(np.array([0,0]),R,1)[0]).reshape(2,1)
        control = np.array([[u(t)]])
        state_t.append(A_t@state_t[-1] + B_t@control + transition_noise)

    z_t = []
    for i in range(len(time)):
        measurement_noise = multivariate_normal([0],Q,1)[0]
        z_t.append(C_t@state_t[i] + measurement_noise)
    return state_t,z_t

def get_estimates_and_kalman_gain_vs_time(z_t,R=R,Q=Q):
    """Run the Kalman filter over the whole time grid.

    Starts from the initial belief (mu_0, sigma_0) at time[0] and performs one
    kalman_filter step for every later time stamp.

    Args:
        z_t: measurements indexed like `time` (z_t[i] was taken at time[i]).
            An entry may be None, in which case that step only predicts.
        R: covariance added in each prediction step.
        Q: measurement-noise covariance.

    Returns:
        (mu_t, sigma_t, K_t): lists of state means and covariances, one per
        element of `time` (entry 0 is the initial belief), and the list of
        Kalman gains. K_t has one entry fewer: K_t[j] belongs to time[j + 1].
    """
    K_t = []
    mu_t = [mu_0]
    sigma_t = [sigma_0]

    # start=1 keeps i aligned with `time`, so the state predicted for time[i]
    # is corrected with z_t[i], the measurement taken at that same instant.
    # (Starting at 0 would pair each prediction with the previous measurement.)
    for i,t in enumerate(time[1:],start=1):
        mu,sigma,K = kalman_filter(mu_t[-1],sigma_t[-1],np.array([[u(t)]]),z_t[i],R,Q)
        K_t.append(K)
        mu_t.append(mu)
        sigma_t.append(sigma)

    return mu_t,sigma_t,K_t
    
def get_trajectories(mu_t,sigma_t,state_t):
    """Flatten filter output and simulated states into plain per-time-step lists.

    Args:
        mu_t: sequence of estimated state column vectors (position, velocity).
        sigma_t: sequence of 2x2 state covariances, indexed like `time`.
        state_t: sequence of true state column vectors, indexed like `time`.

    Returns:
        dict with keys 'x_est', 'v_est' (estimated position / velocity),
        'std_dev_xx', 'std_dev_vv' (square roots of the covariance diagonal)
        and 'x_true', 'v_true' (simulated position / velocity).
    """
    steps = range(len(time))

    x_est = []
    v_est = []
    for mean in mu_t:
        x_est.append(mean[0][0])
        v_est.append(mean[1][0])

    std_dev_xx = [np.sqrt(sigma_t[k][0][0]) for k in steps]
    std_dev_vv = [np.sqrt(sigma_t[k][1][1]) for k in steps]
    x_true = [state_t[k][0][0] for k in steps]
    v_true = [state_t[k][1][0] for k in steps]

    return {
        'x_est': x_est,
        'v_est': v_est,
        'std_dev_xx': std_dev_xx,
        'x_true': x_true,
        'v_true': v_true,
        'std_dev_vv': std_dev_vv,
    }



    


In [4]:
# Part (a) and Part (b): plot the ground-truth and the estimated position and velocity vs time.
# One simulation is run with the baseline noise settings; its results are also
# reused by the joint plots in Part (c).
state_t,z_t = get_state_and_measurements_vs_time()
mu_t,sigma_t,K_t = get_estimates_and_kalman_gain_vs_time(z_t)
simulation = get_trajectories(mu_t,sigma_t,state_t)

x_est = simulation['x_est']
v_est = simulation['v_est']
std_dev_xx = simulation['std_dev_xx']
std_dev_vv= simulation['std_dev_vv']
x_true = simulation['x_true']
v_true = simulation['v_true']

# Axis labels previously read 'Positition' / 'Velocty'; spelled correctly here
# because they appear verbatim on the rendered figures.
fig = px.scatter(x=time, y=x_true,title="Ground Truth Position vs Time",labels={'x':'Time (hour)','y':'Position (km)'})
fig.show()
fig.write_html(f"{plot_dir}part_ab/ground_truth_position_vs_time.html")

fig = px.scatter(x=time,y=v_true,title='Ground Truth Velocity vs Time',labels={'x':'Time (hour)','y':'Velocity (km/h)'})
fig.show()
fig.write_html(f"{plot_dir}part_ab/ground_truth_velocity_vs_time.html")

fig = px.scatter(x=time, y=x_est,title="Estimated Position vs Time",labels={'x':'Time (hour)','y':'Position (km)'})
fig.show()
fig.write_html(f"{plot_dir}part_ab/estimated_position_vs_time.html")

fig = px.scatter(x=time,y=v_est,title='Estimated Velocity vs Time',labels={'x':'Time (hour)','y':'Velocity (km/h)'})
fig.show()
fig.write_html(f"{plot_dir}part_ab/estimated_velocity_vs_time.html")

In [5]:
# Part (c): overlay the ground truth and the filter estimate on shared axes,
# first for position and then for velocity. The error bars on each estimate
# are the filter's own standard deviations.
position_error_bars = dict(type='data', array=std_dev_xx, color='brown', visible=True)
fig = go.Figure(data=[
    go.Scatter(x=time, y=x_true, mode='lines+markers', name='Ground Truth Position'),
    go.Scatter(x=time, y=x_est, mode='lines+markers', name='Estimated Position',
               error_y=position_error_bars),
])
fig.update_layout(
    title="Actual and Estimated Trajectory vs Time",
    xaxis_title="Time (hour)",
    yaxis_title="Position (km)",
    legend_title="Legend",
)
fig.show()
fig.write_html(f"{plot_dir}part_c/actual_and_estimated_trajectory_vs_time.html")

velocity_error_bars = dict(type='data', array=std_dev_vv, color='brown', visible=True)
fig = go.Figure(data=[
    go.Scatter(x=time, y=v_true, mode='lines+markers', name='Ground Truth Velocity'),
    go.Scatter(x=time, y=v_est, mode='lines+markers', name='Estimated Velocity',
               error_y=velocity_error_bars),
])
fig.update_layout(
    title="Actual and Estimated Velocity vs Time",
    xaxis_title="Time (hour)",
    yaxis_title="Velocity (km/h)",
    legend_title="Legend",
)
fig.show()
fig.write_html(f"{plot_dir}part_c/actual_and_estimated_velocity_vs_time.html")

In [6]:
# Part (d): vary the noise in the system and observe the effect on the trajectories.
# Every sweep uses the same five multipliers (0, 40, 80, 120, 160) applied to a
# per-variable base step: 0.01 for position, 0.05 for velocity, 0.01 for the sensor.
_sweep_multipliers = range(0,200,40)
std_x_ranges = [0.01*m for m in _sweep_multipliers]
std_v_ranges = [0.05*m for m in _sweep_multipliers]
std_s_ranges = [0.01*m for m in _sweep_multipliers]


def vary_std_dev(variable,std_dev_ranges,state_param="Position"):
    """Re-run simulation and filter for several values of one noise std-dev and plot them together.

    For each value in std_dev_ranges, one of the three standard deviations is
    replaced while the other two stay at their baseline (sig_x_0, sig_v_0,
    sig_s_0). A fresh trajectory is simulated and filtered, and its ground
    truth and estimate (with error bars) are added to one shared figure. The
    figure is shown and written under {plot_dir}part_d/.

    Args:
        variable: 'x', 'v' or 's' - which standard deviation to vary
            (sig_x, sig_v or sig_s).
        std_dev_ranges: iterable of standard-deviation values to try.
        state_param: "Position" plots position; any other value plots velocity.
            The string is used verbatim in titles and in the output file name.

    Raises:
        ValueError: if variable is not one of 'x', 'v', 's'.
    """
    # Validate first, so a typo fails before any simulation is run.
    if variable == 'x':
        legend_title = f"Legend, std_dev_v={sig_v_0}, std_dev_s={sig_s_0}"
    elif variable == 'v':
        legend_title = f"Legend, std_dev_x={sig_x_0}, std_dev_s={sig_s_0}"
    elif variable == 's':
        legend_title = f"Legend, std_dev_x={sig_x_0}, std_dev_v={sig_v_0}"
    else:
        raise ValueError(f"variable must be 'x', 'v' or 's', got {variable!r}")

    plot_position = state_param == 'Position'
    colors = ['blue', 'red', 'green', 'purple', 'orange', 'cyan', 'magenta']
    fig = go.Figure()

    for i,std_d in enumerate(std_dev_ranges):
        sig_x = sig_x_0
        sig_v = sig_v_0
        sig_s = sig_s_0
        if variable == 'x':
            sig_x = std_d
        elif variable == 'v':
            sig_v = std_d
        else:
            sig_s = std_d
        R = np.array([[sig_x**2,0],[0,sig_v**2]])
        Q = np.array([[sig_s**2]])
        state_t,z_t = get_state_and_measurements_vs_time(R,Q)
        mu_t,sigma_t,_ = get_estimates_and_kalman_gain_vs_time(z_t,R,Q)
        simulation = get_trajectories(mu_t,sigma_t,state_t)

        # Take truth, estimate AND error bars from this run. The velocity error
        # bars used to come from a notebook-level std_dev_vv left over from a
        # different simulation.
        if plot_position:
            truth = simulation['x_true']
            estimate = simulation['x_est']
            spread = simulation['std_dev_xx']
        else:
            truth = simulation['v_true']
            estimate = simulation['v_est']
            spread = simulation['std_dev_vv']

        # Same colour for the truth and estimate of one run, cycling if needed.
        color = colors[i%len(colors)]
        fig.add_trace(go.Scatter(x=time, y=truth, mode='lines+markers', name=f'Ground Truth {state_param} std_dev={std_d}',line=dict(color=color)))
        fig.add_trace(go.Scatter(x=time, y=estimate, mode='lines+markers', line=dict(color=color),name=f'Estimated {state_param} sig_{variable}={std_d}',error_y=dict(
            type='data',
            array=spread,
            color='brown',
            visible=True
        )))

    fig.update_layout(
        title=f"Actual and Estimated {state_param} vs Time",
        xaxis_title="Time (hour)",
        # The axis label follows the plotted quantity (it was always "Position (km)").
        yaxis_title="Position (km)" if plot_position else "Velocity (km/h)",
        legend_title=legend_title
    )
    fig.show()
    fig.write_html(f"{plot_dir}part_d/varying_{variable}_std_dev_{(state_param).lower()}_vs_time.html")

# Produce all six Part (d) figures: the position plots for the x, v and s
# sweeps first, then the velocity plots for the same three sweeps.
for swept_state in ("Position", "Velocity"):
    for swept_variable, swept_values in (('x', std_x_ranges), ('v', std_v_ranges), ('s', std_s_ranges)):
        vary_std_dev(swept_variable, swept_values, swept_state)





In [7]:
# Part (e) Plotting Kalman Gain and its variation with various noise 
def vary_and_plot_kalman_gain(variable,std_dev_ranges,state_param="Position"):
    """Plot one component of the Kalman gain over time for several values of one noise std-dev.

    For each value in std_dev_ranges, one of the three standard deviations is
    replaced while the other two stay at their baseline (sig_x_0, sig_v_0,
    sig_s_0). The simulation and filter are re-run and the chosen gain
    component is added to one shared figure. The figure is shown and written
    under {plot_dir}part_e/.

    Args:
        variable: 'x', 'v' or 's' - which standard deviation to vary
            (sig_x, sig_v or sig_s).
        std_dev_ranges: iterable of standard-deviation values to try.
        state_param: "Position" plots the first gain row (K[0][0]); any other
            value plots the second row (K[1][0]). The string is used verbatim
            in titles and in the output file name.

    Raises:
        ValueError: if variable is not one of 'x', 'v', 's'.
    """
    # Validate first, so a typo fails before any simulation is run.
    if variable == 'x':
        legend_title = f"Legend, std_dev_v={sig_v_0}, std_dev_s={sig_s_0}"
    elif variable == 'v':
        legend_title = f"Legend, std_dev_x={sig_x_0}, std_dev_s={sig_s_0}"
    elif variable == 's':
        legend_title = f"Legend, std_dev_x={sig_x_0}, std_dev_v={sig_v_0}"
    else:
        raise ValueError(f"variable must be 'x', 'v' or 's', got {variable!r}")

    gain_row = 0 if state_param == "Position" else 1
    colors = ['blue', 'red', 'green', 'purple', 'orange', 'cyan', 'magenta']
    # The filter yields one gain per step starting at time[1] (the initial
    # belief has none), so the gains are plotted against time[1:].
    gain_times = time[1:]
    fig = go.Figure()

    for i,std_d in enumerate(std_dev_ranges):
        sig_x = sig_x_0
        sig_v = sig_v_0
        sig_s = sig_s_0
        if variable == 'x':
            sig_x = std_d
        elif variable == 'v':
            sig_v = std_d
        else:
            sig_s = std_d
        R = np.array([[sig_x**2,0],[0,sig_v**2]])
        Q = np.array([[sig_s**2]])
        _,z_t = get_state_and_measurements_vs_time(R,Q)
        _,_,K_t = get_estimates_and_kalman_gain_vs_time(z_t,R,Q)
        gain_series = [K[gain_row][0] for K in K_t]
        color = colors[i%len(colors)]
        fig.add_trace(go.Scatter(x=gain_times,y=gain_series,mode='lines+markers',name=f'Kalman Gain {state_param} sig_{variable}={std_d}',line=dict(color=color)))

    fig.update_layout(
        title=f"Kalman Gain {state_param} vs Time",
        xaxis_title="Time (hour)",
        yaxis_title=f"Kalman Gain {state_param}",
        legend_title=legend_title
    )
    fig.show()
    fig.write_html(f"{plot_dir}part_e/varying_{variable}_std_dev_kalman_gain_{state_param.lower()}_vs_time.html")

# Produce all six Part (e) figures: the position-gain plots for the x, v and s
# sweeps first, then the velocity-gain plots for the same three sweeps.
for gain_component in ("Position", "Velocity"):
    for swept_variable, swept_values in (('x', std_x_ranges), ('v', std_v_ranges), ('s', std_s_ranges)):
        vary_and_plot_kalman_gain(swept_variable, swept_values, gain_component)


In [8]:
# Part (f): missing observations.
# Simulate a fresh run with the baseline noise, blank out every measurement
# whose time stamp lies in [1.5, 2.5], and filter; None makes the filter
# skip the correction for that step.
state_t,z_t = get_state_and_measurements_vs_time(R,Q)
z_t = [z if (t < 1.5 or t > 2.5) else None for t, z in zip(time, z_t)]
mu_t,sigma_t,K_t = get_estimates_and_kalman_gain_vs_time(z_t,R,Q)
simulation = get_trajectories(mu_t,sigma_t,state_t)
x_est, x_true, v_est, v_true, std_dev_xx, std_dev_vv = (
    simulation[key]
    for key in ('x_est', 'x_true', 'v_est', 'v_true', 'std_dev_xx', 'std_dev_vv')
)

# Position: truth vs estimate, with the filter's standard deviation as error bars.
position_error_bars = dict(type='data', array=std_dev_xx, color='brown', visible=True)
fig = go.Figure(data=[
    go.Scatter(x=time, y=x_true, mode='lines+markers', name='Ground Truth Position'),
    go.Scatter(x=time, y=x_est, mode='lines+markers', name='Estimated Position',
               error_y=position_error_bars),
])
fig.update_layout(
    title="Actual and Estimated Trajectory vs Time (missing observations at 1.5 <= t <= 2.5)",
    xaxis_title="Time (hour)",
    yaxis_title="Position (km)",
    legend_title="Legend",
)
fig.show()
fig.write_html(f"{plot_dir}part_f/actual_and_estimated_trajectory_vs_time_with_missing_data.html")

# Velocity: same layout as the position figure.
velocity_error_bars = dict(type='data', array=std_dev_vv, color='brown', visible=True)
fig = go.Figure(data=[
    go.Scatter(x=time, y=v_true, mode='lines+markers', name='Ground Truth Velocity'),
    go.Scatter(x=time, y=v_est, mode='lines+markers', name='Estimated Velocity',
               error_y=velocity_error_bars),
])
fig.update_layout(
    title="Actual and Estimated Velocity vs Time (missing observations at 1.5 <= t <= 2.5)",
    xaxis_title="Time (hour)",
    yaxis_title="Velocity (km/h)",
    legend_title="Legend",
)
fig.show()
fig.write_html(f"{plot_dir}part_f/actual_and_estimated_velocity_vs_time_with_missing_data.html")
