In [1]:
import numpy as np
import os
from pykalman import KalmanFilter
from KF_params_MLE import tune_KF_params_MLE

In [2]:
# Number of consecutive time steps used for fitting and for evaluation.
T_train = 1000
T_test = 1000

# data.txt, as parsed below: a header line "T o_dim s_dim", followed by
# T space-separated rows that fill o_matrix and then T rows that fill s_matrix.
with open('data.txt') as f:
    header = f.readline().strip('\n').split(' ')
    T = int(header[0])
    o_dim = int(header[1])
    s_dim = int(header[2])

    # One row per time step, o_dim columns.
    # Fix: the column loop used to run over s_dim, which only matched the
    # array width when o_dim == s_dim (IndexError or unfilled columns otherwise).
    o_matrix = np.zeros((T, o_dim), np.float32)
    for row in range(T):
        tokens = f.readline().strip('\n').split(' ')
        o_matrix[row, :] = [float(tokens[col]) for col in range(o_dim)]

    # One row per time step, s_dim columns (same fix: previously looped over o_dim).
    s_matrix = np.zeros((T, s_dim), np.float32)
    for row in range(T):
        tokens = f.readline().strip('\n').split(' ')
        s_matrix[row, :] = [float(tokens[col]) for col in range(s_dim)]

In [3]:
# Row i of each table is the triple [s_matrix[t], s_matrix[t+1], o_matrix[t+1]]
# laid side by side, with t = i for training and t = T_train + i for testing.
# The fixed width 3*s_dim means the observation rows must also be s_dim wide.
T_train = 1000
train_data = np.zeros((T_train, 3*s_dim), np.float32)
train_data[:, :] = np.concatenate(
    (s_matrix[:T_train],
     s_matrix[1:T_train + 1],
     o_matrix[1:T_train + 1]),
    axis=1)

T_test = 1000
test_data = np.zeros((T_test, 3*s_dim), np.float32)
test_data[:, :] = np.concatenate(
    (s_matrix[T_train:T_train + T_test],
     s_matrix[T_train + 1:T_train + 1 + T_test],
     o_matrix[T_train + 1:T_train + 1 + T_test]),
    axis=1)

In [4]:
# Fit the model parameters from the three column groups of train_data.
# Below, (A_2, b_2, Sig_2) are handed to the filter as transition
# matrix / offset / covariance and (A_3, b_3, Sig_3) as observation
# matrix / offset / covariance.
A_2, b_2, Sig_2, A_3, b_3, Sig_3 = tune_KF_params_MLE(
    train_data[:, :s_dim],
    train_data[:, s_dim:2*s_dim],
    train_data[:, 2*s_dim:3*s_dim])

# The filter starts from the first test row's leading state block pushed
# once through the fitted transition (A_2 x + b_2), with covariance Sig_2.
kf_orig = KalmanFilter(
    initial_state_mean=np.matmul(A_2, test_data[0, :s_dim]) + b_2.reshape(-1),
    initial_state_covariance=Sig_2,
    transition_matrices=A_2,
    transition_covariance=Sig_2,
    transition_offsets=np.transpose(b_2.reshape(-1)),
    observation_matrices=A_3,
    observation_covariance=Sig_3,
    observation_offsets=np.transpose(b_3.reshape(-1)))

# Filter the observation columns of the test set; the covariances are discarded.
measurements = test_data[:, 2*s_dim:]
(est_s_t_onOrig, _) = kf_orig.filter(measurements)

# Report: mean per-step error norm / mean norm of the reference state,
# then the mean norm of the step-to-step change of the reference state.
true_next_state = test_data[:, s_dim:2*s_dim]
print(np.mean(np.linalg.norm(est_s_t_onOrig - true_next_state, axis=1)),
      ' / ',
      np.mean(np.linalg.norm(true_next_state, axis=1)),
      sep='')
print('mean consecutive diff: ',
      np.mean(np.linalg.norm(true_next_state[1:] - true_next_state[:-1], axis=1)),
      sep='')

1.97230050789 / 157.143
mean consecutive diff: 2.92586


In [5]:
# Same layout as parsed below: a header line "T o_p_dim s_p_dim", followed by
# T space-separated rows that fill o_p_matrix and then T rows that fill s_p_matrix.
# NOTE(review): the path is spelled 'axiliary.txt'; confirm it matches the file on disk.
with open('axiliary.txt') as f:
    header = f.readline().strip('\n').split(' ')
    T = int(header[0])
    o_p_dim = int(header[1])
    s_p_dim = int(header[2])

    # One row per time step, o_p_dim columns.
    # Fix: the column loop used to run over s_p_dim, which only matched the
    # array width when o_p_dim == s_p_dim (IndexError or unfilled columns otherwise).
    o_p_matrix = np.zeros((T, o_p_dim), np.float32)
    for row in range(T):
        tokens = f.readline().strip('\n').split(' ')
        o_p_matrix[row, :] = [float(tokens[col]) for col in range(o_p_dim)]

    # One row per time step, s_p_dim columns (same fix: previously looped over o_p_dim).
    s_p_matrix = np.zeros((T, s_p_dim), np.float32)
    for row in range(T):
        tokens = f.readline().strip('\n').split(' ')
        s_p_matrix[row, :] = [float(tokens[col]) for col in range(s_p_dim)]

In [6]:

# Row i of each table is [s_p_matrix[t], s_p_matrix[t+1], o_p_matrix[t+1]]
# side by side, with t = i for training and t = T_train + i for testing.
# The fixed width 3*s_p_dim means the o_p rows must also be s_p_dim wide.
hidden_train_data = np.zeros((T_train, 3*s_p_dim), np.float32)
hidden_train_data[:, :] = np.concatenate(
    (s_p_matrix[:T_train],
     s_p_matrix[1:T_train + 1],
     o_p_matrix[1:T_train + 1]),
    axis=1)

hidden_test_data = np.zeros((T_test, 3*s_p_dim), np.float32)
hidden_test_data[:, :] = np.concatenate(
    (s_p_matrix[T_train:T_train + T_test],
     s_p_matrix[T_train + 1:T_train + 1 + T_test],
     o_p_matrix[T_train + 1:T_train + 1 + T_test]),
    axis=1)

In [7]:
# Re-fit the model parameters on the auxiliary ("hidden") series.
# This rebinds A_2 ... Sig_3, replacing any values assigned to them earlier.
A_2, b_2, Sig_2, A_3, b_3, Sig_3 = tune_KF_params_MLE(
    s_p_matrix[:T_train, :s_p_dim],
    hidden_train_data[:, s_p_dim:2*s_p_dim],
    hidden_train_data[:, 2*s_p_dim:3*s_p_dim])

# The filter starts from the first test row's leading state block pushed
# once through the fitted transition (A_2 x + b_2), with covariance Sig_2.
kf_hidden = KalmanFilter(
    initial_state_mean=np.matmul(A_2, hidden_test_data[0, :s_p_dim]) + b_2.reshape(-1),
    initial_state_covariance=Sig_2,
    transition_matrices=A_2,
    transition_covariance=Sig_2,
    transition_offsets=np.transpose(b_2.reshape(-1)),
    observation_matrices=A_3,
    observation_covariance=Sig_3,
    observation_offsets=np.transpose(b_3.reshape(-1)))

# Filter the observation columns of the hidden test set; covariances are discarded.
measurements = hidden_test_data[:, 2*s_p_dim:]
(est_s_t_p, _) = kf_hidden.filter(measurements)

# Report: mean per-step error norm / mean norm of the reference state,
# then the mean norm of the step-to-step change of the reference state.
true_next_hidden = hidden_test_data[:, s_p_dim:2*s_p_dim]
print(np.mean(np.linalg.norm(est_s_t_p - true_next_hidden, axis=1)),
      ' / ',
      np.mean(np.linalg.norm(true_next_hidden, axis=1)),
      sep='')
print('mean consecutive diff: ',
      np.mean(np.linalg.norm(true_next_hidden[1:] - true_next_hidden[:-1], axis=1)),
      sep='')

0.584746824366 / 81.383
mean consecutive diff: 1.01887


In [8]:
# Only the D1 matrix (s_dim rows, s_p_dim columns) is read from params.txt;
# every line before it is consumed and ignored.
with open('params.txt') as f:
    # Lines preceding D1: two leading lines, then blocks of
    # s_p_dim, s_p_dim, o_p_dim, s_p_dim, o_p_dim, o_dim, o_dim, o_dim lines.
    # NOTE(review): what each skipped block contains is not visible here.
    lines_before_D1 = 2 + 3*s_p_dim + 2*o_p_dim + 3*o_dim
    for skipped in range(lines_before_D1):
        ln = f.readline()

    D1 = np.zeros((s_dim, s_p_dim), np.float32)
    for row in range(s_dim):
        tokens = f.readline().strip('\n').split(' ')
        D1[row, :] = [float(tokens[col]) for col in range(s_p_dim)]

In [9]:
# Map the filtered hidden-state estimates into the original state space:
# x -> 20 * tanh((x / 20) @ D1^T), applied row-wise.
hidden_estimate_scaled = est_s_t_p / 20
est_s_t_onHidden = 20 * np.tanh(np.matmul(hidden_estimate_scaled, np.transpose(D1)))

# Same report as for the direct filter: mean error norm / mean reference norm,
# then the mean norm of the step-to-step change of the reference state.
reference_state = test_data[:, s_dim:2*s_dim]
print(np.mean(np.linalg.norm(est_s_t_onHidden - reference_state, axis=1)),
      ' / ',
      np.mean(np.linalg.norm(reference_state, axis=1)),
      sep='')
print('mean consecutive diff: ',
      np.mean(np.linalg.norm(reference_state[1:] - reference_state[:-1], axis=1)),
      sep='')

1.79577745463 / 157.143
mean consecutive diff: 2.92586


In [10]:
# Sanity check of the mapping itself: apply 20 * tanh((x / 20) @ D1^T) to the
# reference hidden states (not the filtered ones) and print the mean distance
# to the reference original-space states.
reference_hidden_scaled = hidden_test_data[:, s_p_dim:2*s_p_dim] / 20
validate_est_s_t_onHidden = 20 * np.tanh(np.matmul(reference_hidden_scaled, np.transpose(D1)))
mapping_error = validate_est_s_t_onHidden - test_data[:, s_dim:2*s_dim]
print(np.mean(np.linalg.norm(mapping_error, axis=1)))

1.52566e-05
