# Extended Kalman Filter

In [1]:
! pip install h5py
! pip install numpy-quaternion


Looking in indexes: http://192.168.16.2:8080/root/pypi/+simple/, https://pypi.ngc.nvidia.com
You should consider upgrading via the '/usr/bin/python -m pip install --upgrade pip' command.[0m
Looking in indexes: http://192.168.16.2:8080/root/pypi/+simple/, https://pypi.ngc.nvidia.com
You should consider upgrading via the '/usr/bin/python -m pip install --upgrade pip' command.[0m


In [4]:
from pathlib import Path
from scipy.spatial.transform import Rotation
import pandas as pd
import os
from os import path as osp
import numpy as np
from kalman_filter import ExtendedKalmanFilter
from metric import compute_ate_rte
import matplotlib.pyplot as plt
import h5py
import json
import quaternion
from data_utils import select_orientation_source

In [5]:
# Just need to pass in bias calibrated acc data, gyro data, gt position (with noise) and gt orientations (with noise) AS PD DATAFRAME

def test_kalman_filter(sample_data, gt_noise_std = 2, ori_noise_std = 0.05, std_dev = 0.01):

    time_arr = np.linspace(0, len(sample_data)*0.01, num = len(sample_data))
    
    quaternion = sample_data[['rotation.x', 'rotation.y', 'rotation.z', 'rotation.w',]].values

    # Create a rotation object from the quaternion
    rotation = Rotation.from_quat(quaternion)

    # Convert the quaternion to Euler angles
    euler_orientations = rotation.as_euler('xyz', degrees= False)

    # Convert acceleration into m/s^2
    acc = sample_data[['user_acc_x(G)', 'user_acc_y(G)', 'user_acc_z(G)']].values
    acc = acc * 9.81

    gyro = sample_data[['rotation_rate_x(radians/s)', 'rotation_rate_y(radians/s)', 'rotation_rate_z(radians/s)']].values

    N = len(sample_data)

    xy_obs_noise = np.random.normal(0.0, gt_noise_std, (N, 2))  # gen gaussian noise
    gt_trajectory_xy = sample_data[['translation.x','translation.y']].values
    obs_trajectory_xy = gt_trajectory_xy.copy()

    print(obs_trajectory_xy.shape)
    obs_trajectory_xy += xy_obs_noise  # add the noise to ground-truth positions
#     fig, ax = plt.subplots(1, 1, figsize=(12, 9))

#     xs, ys = gt_trajectory_xy[:, 0], gt_trajectory_xy[:, 1]
#     ax.plot(xs, ys, lw=2, label='ground-truth trajectory')

#     xs, ys = obs_trajectory_xy[:, 0], obs_trajectory_xy[:, 1]
#     ax.plot(xs, ys, lw=0, marker='.', markersize=2, alpha=0.4, label='noisy trajectory')

#     ax.set_xlabel('X [m]')
#     ax.set_ylabel('Y [m]')
#     ax.legend()
#     ax.grid()

    N = len(sample_data)

    orientation_obs_noise = np.random.normal(0.0, ori_noise_std, (N, 3))  # gen gaussian noise
    gt_orientation = euler_orientations
    obs_orientation = euler_orientations.copy()

    obs_orientation += orientation_obs_noise  # add the noise to ground-truth positions
#     fig, ax = plt.subplots(1, 1, figsize=(12, 9))

#     roll, pitch, yaw = gt_orientation[:, 0], gt_orientation[:, 1], gt_orientation[:,2]
#     ax.plot(roll, lw=2, label='ground-truth roll')
#     ax.plot(pitch, lw=2, label= 'ground truth pitch')
#     ax.plot(yaw, lw=2, label= 'ground truth yaw')

#     roll, pitch, yaw = obs_orientation[:, 0], obs_orientation[:, 1], obs_orientation[:,2]
#     ax.plot(roll, lw=0, marker='.', markersize=2, alpha=0.4, label='noisy roll')
#     ax.plot(pitch, lw=0, marker='.', markersize=2, alpha=0.4,label= 'noisy pitch')
#     ax.plot(yaw, lw=0, marker='.', markersize=2, alpha=0.4,label= 'noisy yaw')

#     ax.set_xlabel('X [m]')
#     ax.set_ylabel('Y [m]')
#     ax.legend()
#     ax.grid()

    # initial state x_0
    # State: [x position, y position, roll, pitch, yaw, velocity_x, velocity_y]; Assume velocity start at 0

    initial_list = [obs_trajectory_xy[0, 0], obs_trajectory_xy[0, 1], obs_orientation[0, 0], obs_orientation[0, 1], obs_orientation[0, 2], 0, 0]

    x = np.array(initial_list)

    # P covariance for initial state estimation error

    P = np.array([
        [gt_noise_std ** 2, 0, 0, 0, 0, 0, 0],
        [0, gt_noise_std ** 2, 0, 0, 0, 0, 0],
        [0, 0, ori_noise_std ** 2, 0, 0, 0, 0],
        [0, 0, 0,ori_noise_std ** 2 , 0, 0, 0],
        [0, 0, 0, 0, ori_noise_std ** 2, 0, 0],
        [0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0]
    ])

    # R (numpy.array): Measurement noise covariance matrix
    R = np.array([
        [gt_noise_std ** 2, 0, 0, 0, 0],
        [0, gt_noise_std ** 2, 0, 0, 0],
        [0, 0, ori_noise_std ** 2, 0, 0],
        [0, 0, 0,ori_noise_std ** 2 , 0],
        [0, 0, 0, 0, ori_noise_std ** 2]
    ])

    # State transition noise covariance will depend on how noisy is my acceleration and gyroscope data. For now lets assume a normal std of 0.01
    # Q (numpy.array): Process noise covariance matrix
    Q = np.eye(6) * (std_dev ** 2)

    # Initialize Kalman filter
    kf = ExtendedKalmanFilter(x, P)

    # Arrays to store estimated 2D pose [x, y, roll, pitch, yaw]
    mu_x = [x[0]]
    mu_y = [x[1]]
    mu_roll = [x[2]]
    mu_pitch = [x[3]]
    mu_yaw = [x[4]]

    # Arrays to store estimated error variance of 2D pose
    var_x = [P[0, 0]]
    var_y = [P[1, 1]]
    var_roll = [P[2, 2]]
    var_pitch = [P[3, 3]]
    var_yaw = [P[4, 4]]

    t_last = time_arr[0]

    for t_idx in range(1, N):
        t = time_arr[t_idx]
        dt = t - t_last
        
        # Get control input `u = [acceleration_x, acceleration_y, acceleration_z, roll_rate, pitch_rate, yaw_rate]`
        u = np.array([
            acc[t_idx, 0],
            acc[t_idx, 1],
            acc[t_idx, 2],
            gyro[t_idx, 0],
            gyro[t_idx, 1],
            gyro[t_idx, 2]
        ])
        
        # Because the acceleration and angular rates are already in the global frame,
        # we can directly use them in the propagation step
        
        # Propagate!
        kf.propagate(u, dt, Q)
        
        # Get measurement `z = [position_x, position_y, roll, pitch, yaw]`
        z = np.array([
            obs_trajectory_xy[t_idx, 0],
            obs_trajectory_xy[t_idx, 1],
            obs_orientation[t_idx, 0],
            obs_orientation[t_idx, 1],
            obs_orientation[t_idx, 2]
        ])
        
        # Update!
        kf.update(z, R)
        
        # Save estimated state to analyze later
        mu_x.append(kf.x[0])
        mu_y.append(kf.x[1])
        mu_roll.append(kf.x[2])
        mu_pitch.append(kf.x[3])
        mu_yaw.append(kf.x[4])
        
        # Save estimated variance to analyze later
        var_x.append(kf.P[0, 0])
        var_y.append(kf.P[1, 1])
        var_roll.append(kf.P[2, 2])
        var_pitch.append(kf.P[3, 3])
        var_yaw.append(kf.P[4, 4])
        
        t_last = t

    mu_x = np.array(mu_x)
    mu_y = np.array(mu_y)
    mu_roll = np.array(mu_roll)
    mu_pitch = np.array(mu_pitch)
    mu_yaw = np.array(mu_yaw)

    var_x = np.array(var_x)
    var_y = np.array(var_y)
    var_roll = np.array(var_roll)
    var_pitch = np.array(var_pitch)
    var_yaw = np.array(var_yaw)

    fig, ax = plt.subplots(1, 1, figsize=(12, 9))

    xs, ys = gt_trajectory_xy[:, 0], gt_trajectory_xy[:, 1]
    ax.plot(xs, ys, lw=2, label='ground-truth trajectory')

    xs, ys = obs_trajectory_xy[:, 0], obs_trajectory_xy[:, 1]
    ax.plot(xs, ys, lw=0, marker='.', markersize=2, alpha=0.4, label='noisy trajectory')

    ax.plot(mu_x, mu_y, lw=1, label='estimated trajectory', color='r')

    ax.set_xlabel('X [m]')
    ax.set_ylabel('Y [m]')
    ax.legend()
    ax.grid()

    pos_pred = np.zeros((N,2))
    pos_pred[:,0] = mu_x
    pos_pred[:,1] = mu_y


    ate, rte = compute_ate_rte(pos_pred, gt_trajectory_xy, 1)
    print('ATE: ', ate)
    print('RTE: ', rte)
    
    return ate, rte



In [72]:
dataset_folder = osp.join(Path(os.getcwd()).parents[2], 'localisation_datasets')
data_path = dataset_folder + '/Ronin_datasets/unseen_subjects_test_set/a006_2'

with open(osp.join(data_path, 'info.json')) as f:
    info = json.load(f)

info['ori_source'], ori, info['source_ori_error'] = select_orientation_source(data_path, 
max_ori_error=20.0, grv_only=True)
    
with h5py.File(osp.join(data_path, 'data.hdf5')) as f:
    gyro_uncalib = f['synced/gyro_uncalib']
    acce_uncalib = f['synced/acce']
    gyro = gyro_uncalib - np.array(info['imu_init_gyro_bias'])
    acce = np.array(info['imu_acce_scale']) * (acce_uncalib - np.array(info['imu_acce_bias']))
    ts = np.copy(f['synced/time'])
    tango_pos = np.copy(f['pose/tango_pos'])
    
    # from tango frame to gt coord frame(?)
    init_tango_ori = quaternion.quaternion(*f['pose/tango_ori'][0])

# Compute the IMU orientation in the Tango coordinate frame.
ori_q = quaternion.from_float_array(ori) # ori is in global coord frame (?)

# rotation vector to get imu frame to tango frame
rot_imu_to_tango = quaternion.quaternion(*info['start_calibration'])

init_rotor = init_tango_ori * rot_imu_to_tango * ori_q[0].conj()

#we assume thais orientation is correct for now to rotate accelerations from the device coord frame to tango/global
ori_q = init_rotor * ori_q 

gyro_q = quaternion.from_float_array(np.concatenate([np.zeros([gyro.shape[0], 1]), gyro], axis=1))
acce_q = quaternion.from_float_array(np.concatenate([np.zeros([acce.shape[0], 1]), acce], axis=1))
glob_gyro = quaternion.as_float_array(ori_q * gyro_q * ori_q.conj())[:, 1:]
glob_acce = quaternion.as_float_array(ori_q * acce_q * ori_q.conj())[:, 1:]

start_frame = info.get('start_frame', 0)
ts = ts[start_frame:]
features = np.concatenate([glob_gyro, glob_acce], axis=1)[start_frame:]
orientations = quaternion.as_float_array(ori_q)[start_frame:]
gt_pos = tango_pos[start_frame:]

In [73]:
print(acce[0])
print(glob_acce[0])

[-9.78617015  0.14254558 -1.18405454]
[0.19562306 0.04113948 9.8565504 ]


In [81]:
dataset_folder = osp.join(Path(os.getcwd()).parents[2], 'localisation_datasets')
data_path = dataset_folder + '/Ronin_datasets/unseen_subjects_test_set/a006_2'

with open(osp.join(data_path, 'info.json')) as f:
    info = json.load(f)

info['ori_source'], ori, info['source_ori_error'] = select_orientation_source(data_path, max_ori_error=20.0, grv_only=True)
    
with h5py.File(osp.join(data_path, 'data.hdf5')) as f:
    gyro_uncalib = f['synced/gyro_uncalib']
    acce_uncalib = f['synced/acce']
    gyro = gyro_uncalib - np.array(info['imu_init_gyro_bias'])
    acce = np.array(info['imu_acce_scale']) * (acce_uncalib - np.array(info['imu_acce_bias']))
    ts = np.copy(f['synced/time'])
    tango_pos = np.copy(f['pose/tango_pos'])
    
    # from tango frame to gt coord frame(?)
    init_tango_ori = quaternion.quaternion(*f['pose/tango_ori'][0])

# Compute the IMU orientation in the Tango coordinate frame.
ori_q = quaternion.from_float_array(ori) # ori is in global coord frame (?)

# rotation vector to get imu frame to tango frame
rot_imu_to_tango = quaternion.quaternion(*info['start_calibration'])

init_rotor = init_tango_ori * rot_imu_to_tango * ori_q[0].conj()

#we assume thais orientation is correct for now to rotate accelerations from the device coord frame to tango/global
ori_q = init_rotor * ori_q 
print(ori_q[0])


# convert ori to [x, y, z, w] from [w, x, y, z]
ori_q = quaternion.as_float_array(ori_q)
temp = ori_q[:, 0].copy()
ori_q[:,0] = ori_q[:,3]
ori_q[:,3] = temp

rotation = Rotation.from_quat(ori_q[0,:])
print(ori_q[0,:])
print(rotation.apply(acce[0]))
glob_acce = rotation.apply(acce)

start_frame = info.get('start_frame', 0)
ts = ts[start_frame:]
features = np.concatenate([glob_gyro, glob_acce], axis=1)[start_frame:]
orientations = quaternion.as_float_array(ori_q)[start_frame:]
gt_pos = tango_pos[start_frame:]

quaternion(-0.423513592429391, 0.558918555508853, -0.4982519392916, -0.50989399605611)
[-0.509894    0.55891856 -0.49825194 -0.42351359]
[ 1.00447288  2.61632969 -9.45184023]


In [75]:
print(acce[0])
print(glob_acce[0])

[-9.78617015  0.14254558 -1.18405454]
[ 1.00447288  2.61632969 -9.45184023]
