In [None]:
!pip install control

In [None]:
# This mounts your Google Drive to the Colab VM.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
 
# Enter the foldername in your Drive where you have saved the unzipped
# assignment folder, e.g. 'cs231n/assignments/assignment1/'
FOLDERNAME = 'Semesterproject 1/Code/KalmanNet_control/'
assert FOLDERNAME is not None, "[!] Enter the foldername."
 
# Now that we've mounted your Drive, this ensures that
# the Python interpreter of the Colab VM can load
# python files from within it.
import sys
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME))

Mounted at /content/drive


In [None]:
import torch
from torch.distributions.multivariate_normal import MultivariateNormal
from system_model_lqr import SystemModelLQR
from systems import LinearSystem
from math import log10, cos, sin, pi

Running on the CPU


In [None]:
#############
### Model ###
#############
dtype_ = torch.float32
q2 = 1.0  # process noise variance
r2 = 1.0   # observation noise variance
T = 100    # trajectory length / time horizon
q2_dB = 10*log10(q2)
r2_dB = 10*log10(r2)

# State and observation dynamics
F = torch.tensor([[1, 1], [0,  1]], dtype=dtype_)
G = torch.tensor([[0],[1]], dtype=dtype_)
H = torch.tensor([[1, 0],[0,1]], dtype=dtype_)
h_name = 'H_identity/'

m = F.size()[0] # state dim
p = G.size()[1] # input dim
n = H.size()[0] # output dim

# Process and measurement noise
Q = q2 * torch.eye(m, dtype=dtype_)
R = r2 *  torch.eye(n, dtype=dtype_)  

# Initial state
m1_0 = x0 = torch.tensor([10, 0], dtype=dtype_)
m2_0 = 0.*torch.eye(m) # covariance of initial state

# LQR stuff: cost matrices
scale = 1
qT = 1
qx = 1
qu = 0.1
QT = qT * torch.eye(m, dtype=dtype_) * scale
Qx = qx * torch.eye(m, dtype=dtype_) * scale
Qu = qu * torch.eye(p, dtype=dtype_) * scale

# Prior covariances for KalmanNet architecture 2
prior_Q = 1 * torch.eye(m, dtype=dtype_)
prior_Sigma = 1 * torch.eye(m, dtype=dtype_)
prior_S = 1 * torch.eye(n, dtype=dtype_)

###################
### True System ###
###################
a_deg = 20
a = a_deg / 180 * pi
Rot = torch.tensor([[cos(a), -sin(a)], [sin(a),  cos(a)]])
trueF = torch.matmul(Rot, F)
trueG = G
trueH = H

# Create system
true_sys = LinearSystem(trueF, trueG, trueH, Q, R)

# Create model
sys_model = SystemModelLQR(F, G, q2, H, r2, T, T, true_sys, prior_Q, prior_Sigma, prior_S)
sys_model.InitCostMatrices(QN, Qx, Qu)
sys_model.InitSequence(m1_0, m2_0)

model_data = (F,G,H,QN,Qx,Qu,trueF,trueG,trueH)

In [None]:
#################################
# Generate noise trajectories ###
#################################

N_train = 10000
N_val = 2000
N_test = 2000
m = sys_model.m
n = sys_model.n
p = sys_model.p
T = sys_model.T
T_test = sys_model.T_test


# # Allocate storage for data
# training_Q = torch.empty(N_train, m, T)
# training_R = torch.empty(N_train, n, T)
# validation_Q = torch.empty(N_val, m, T)
# validation_R = torch.empty(N_val, n, T)
# test_Q = torch.empty(N_test, m, T_test)
# test_R = torch.empty(N_test, n, T_test)

# # Distributions to sample from 
# distrib_Q = MultivariateNormal(loc=torch.zeros([m]), covariance_matrix=Q)
# distrib_R = MultivariateNormal(loc=torch.zeros([n]), covariance_matrix=R)

# for k in range(N_train):
#     for t in range(T):
#         q_sample = distrib_Q.rsample()
#         r_sample = distrib_R.rsample()
#         training_Q[k,:,t] = q_sample
#         training_R[k,:,t] = r_sample

# for k in range(N_val):
#     for t in range(T):
#         q_sample = distrib_Q.rsample()
#         r_sample = distrib_R.rsample()
#         validation_Q[k,:,t] = q_sample
#         validation_R[k,:,t] = r_sample

# for k in range(N_test):
#     for t in range(T_test):
#         q_sample = distrib_Q.rsample()
#         r_sample = distrib_R.rsample()
#         test_Q[k,:,t] = q_sample
#         test_R[k,:,t] = r_sample

# training_noise = (training_Q, training_R)
# validation_noise = (validation_Q, validation_R)
# test_noise = (test_Q, test_R)

# training_noise = (Qnoise[0], training_R)
# validation_noise = (Qnoise[1], validation_R)
# test_noise = (Qnoise[2], test_R)

# assert Qnoise[0].shape[0] == training_R.shape[0]

drive_knet = '/content/drive/MyDrive/Semesterproject 1/Code/KalmanNet_control/'
noise_path = drive_knet + 'final_experiments/toy_model/' + h_name 
data_name = f'noise_N_{N_train}_{N_val}_{N_test}_T_{T}_q2_{int(q2_dB)}dB_r2_{int(r2_dB)}dB.pt'
# torch.save((training_noise, validation_noise, test_noise), noise_path+data_name)

In [None]:
#########################
### Generate LQR Data ###
#########################

noise_data = torch.load(noise_path+data_name)

training_Q, training_R = noise_data[0]
validation_Q, validation_R = noise_data[1]
test_Q, test_R = noise_data[2]

N_train, _, T = training_Q.shape
N_val = validation_Q.shape[0]
N_test, _, T_test = test_Q.size()

X = torch.empty_like(training_Q)
Y = torch.empty_like(training_R)
U = torch.empty(N_train, p, T)
X0_train = torch.empty(N_train, m)
XT_train = torch.empty(N_train, m)

X_val = torch.empty_like(validation_Q)
Y_val = torch.empty_like(validation_R)
U_val = torch.empty(N_val, p, T)
X0_val = torch.empty(N_val, m)
XT_val = torch.empty(N_val, m)

X_test = torch.empty_like(test_Q)
Y_test = torch.empty_like(test_R)
U_test = torch.empty(N_test, p, T_test)
X0_test = torch.empty(N_test, m)
XT_test = torch.empty(N_test, m)

x0 = torch.tensor([100, 0], dtype=torch.float32)
xT = torch.tensor([0, 0], dtype=torch.float32)

# start = 200

for k in range(N_train):
    # x0_1 = torch.randint(-start, start, (1,)).item()
    # xT_1 = torch.randint(-start, start, (1,)).item()
    # x0 = torch.tensor([x0_1, 0], dtype=torch.float32)
    # xT = torch.tensor([x0_1 + xT_1, 0], dtype=torch.float32)
    sys_model.GenerateSequence_with_my_noise(xT, T, training_Q[k], training_R[k], x0=x0, model=True, steady_state=False)
    Y[k] = sys_model.y
    U[k] = sys_model.u
    X[k] = sys_model.x
    X0_train[k] = x0
    XT_train[k] = xT


for k in range(N_val):
    # x0_1 = torch.randint(-start, start, (1,)).item()
    # xT_1 = torch.randint(-start, start, (1,)).item()
    # x0 = torch.tensor([x0_1, 0], dtype=torch.float32)
    # xT = torch.tensor([x0_1 + xT_1, 0], dtype=torch.float32)
    sys_model.GenerateSequence_with_my_noise(xT, T, validation_Q[k], validation_R[k], x0=x0, model=True, steady_state=False)
    Y_val[k] = sys_model.y
    U_val[k] = sys_model.u
    X_val[k] = sys_model.x
    X0_val[k] = x0
    XT_val[k] = xT


for k in range(N_test):
    # x0_1 = torch.randint(-start, start, (1,)).item()
    # xT_1 = torch.randint(-start, start, (1,)).item()
    # x0 = torch.tensor([x0_1, 0], dtype=torch.float32)
    # xT = torch.tensor([x0_1 + xT_1, 0], dtype=torch.float32)
    sys_model.GenerateSequence_with_my_noise(xT, T, test_Q[k], test_R[k], x0=x0, model=True, steady_state=False)
    Y_test[k] = sys_model.y
    U_test[k] = sys_model.u
    X_test[k] = sys_model.x
    X0_test[k] = x0
    XT_test[k] = xT


training_data = (Y, U, X, X0_train, XT_train)
validation_data = (Y_val, U_val, X_val, X0_val, XT_val)
test_data = (Y_test, U_test, X_test, X0_test, XT_test)

# Y_U_X_path = drive_knet + 'Experiments/double_integrator/' + h_name + f'model_mismatch/F_1_2_deg/'
# Y_U_X_name = f'y_u_x_x0_100_N_{N_train}_{N_val}_{N_test}_T_{T}_q2_{int(q2_dB)}dB_r2_{int(r2_dB)}dB.pt'
# torch.save((training_data, validation_data, test_data, model_data), Y_U_X_path + Y_U_X_name)

In [None]:
#####################################
### Generate Data with no control ###
#####################################

noise_data = torch.load(noise_path+data_name)

training_Q, training_R = noise_data[0]
validation_Q, validation_R = noise_data[1]
test_Q, test_R = noise_data[2]

N_train, _, T = training_Q.shape
N_val = validation_Q.shape[0]
N_test, _, T_test = test_Q.size()

# N_train = 20000
# N_val = 2000
# N_test = 2000
T = 30
T_test = T

# X = torch.empty_like(training_Q)
# Y = torch.empty_like(training_R)
X = torch.empty(N_train, m, T)
Y = torch.empty(N_train, n, T)
U = torch.zeros(N_train, p, T) #torch.empty(N_train, p, T)
X0_train = torch.empty(N_train, m)

X_val = torch.empty(N_val, m, T)
Y_val = torch.empty(N_val, n, T)
U_val = torch.zeros(N_val, p, T) #torch.empty(N_val, p, T)
X0_val = torch.empty(N_val, m)

X_test = torch.empty(N_test, m, T)
Y_test = torch.empty(N_test, n, T)
U_test = torch.zeros(N_test, p, T) #torch.empty(N_test, p, T_test)
X0_test = torch.empty(N_test, m)

x0 = torch.tensor([0, 0], dtype=torch.float32)

# start = 200

for k in range(N_train):
    sys_model.GenerateSequence_without_lqr(x0, T, U[k], training_Q[k], training_R[k])
    Y[k] = sys_model.y
    # U[k] = sys_model.u
    X[k] = sys_model.x
    X0_train[k] = x0

for k in range(N_val):
    sys_model.GenerateSequence_without_lqr(x0, T, U_val[k], validation_Q[k], validation_R[k])
    Y_val[k] = sys_model.y
    # U_val[k] = sys_model.u
    X_val[k] = sys_model.x
    X0_val[k] = x0


for k in range(N_test):
    sys_model.GenerateSequence_without_lqr(x0, T,  U_test[k], test_Q[k], test_R[k])
    Y_test[k] = sys_model.y
    # U_test[k] = sys_model.u
    X_test[k] = sys_model.x
    X0_test[k] = x0


training_data = (Y, U, X, X0_train)
validation_data = (Y_val, U_val, X_val, X0_val)
test_data = (Y_test, U_test, X_test, X0_test)

# Y_U_X_path = drive_knet + 'final_experiments/toy_model/' + h_name 
# Y_U_X_name = f'y_u_x_no_control_F_20_{N_train}_{N_val}_{N_test}_T_{T}_q2_{int(q2_dB)}dB_r2_{int(r2_dB)}dB.pt'
# torch.save((training_data, validation_data, test_data, model_data), Y_U_X_path + Y_U_X_name)

In [None]:
##############################
### Generate Data with LQG ###
##############################
# used for model mismatch and offline training

noise_data = torch.load(noise_path+data_name)

training_Q, training_R = noise_data[0]
validation_Q, validation_R = noise_data[1]
test_Q, test_R = noise_data[2]

N_train, _, T = training_Q.shape
N_val = validation_Q.shape[0]
N_test, _, T_test = test_Q.size()

# N_train = 20000
# N_val = 2000
# N_test = 2000
T = 100
T_test = T

# X = torch.empty_like(training_Q)
# Y = torch.empty_like(training_R)
X = torch.empty(N_train, m, T)
Y = torch.empty(N_train, n, T)
U = torch.empty(N_train, p, T) #torch.empty(N_train, p, T)
X0_train = torch.empty(N_train, m)

X_val = torch.empty(N_val, m, T)
Y_val = torch.empty(N_val, n, T)
U_val = torch.empty(N_val, p, T) #torch.empty(N_val, p, T)
X0_val = torch.empty(N_val, m)

X_test = torch.empty(N_test, m, T)
Y_test = torch.empty(N_test, n, T)
U_test = torch.empty(N_test, p, T) #torch.empty(N_test, p, T_test)
X0_test = torch.empty(N_test, m)

x0 = torch.tensor([10, 0], dtype=torch.float32)
xT = x0*0.0

# start = 200

for k in range(N_train):
    sys_model.GenerateSequence_with_LQG(x0, T, training_Q[k], training_R[k], xT=xT)
    Y[k] = sys_model.y
    U[k] = sys_model.u
    X[k] = sys_model.x
    X0_train[k] = x0

for k in range(N_val):
    sys_model.GenerateSequence_with_LQG(x0, T, validation_Q[k], validation_R[k], xT=xT)
    Y_val[k] = sys_model.y
    U_val[k] = sys_model.u
    X_val[k] = sys_model.x
    X0_val[k] = x0


for k in range(N_test):
    sys_model.GenerateSequence_with_LQG(x0, T, test_Q[k], test_R[k], xT=xT)
    Y_test[k] = sys_model.y
    U_test[k] = sys_model.u
    X_test[k] = sys_model.x
    X0_test[k] = x0


training_data = (Y, U, X, X0_train)
validation_data = (Y_val, U_val, X_val, X0_val)
test_data = (Y_test, U_test, X_test, X0_test)

# Y_U_X_path = drive_knet + 'final_experiments/toy_model/' + h_name 
# Y_U_X_name = f'y_u_x_LQG_control_F_20_N_{N_train}_{N_val}_{N_test}_T_{T}_q2_{int(q2_dB)}dB_r2_{int(r2_dB)}dB.pt'
# torch.save((training_data, validation_data, test_data, model_data), Y_U_X_path + Y_U_X_name)