## AAE 666 Project
Author: Vishnu Vijay

Description: AAE 666 Project Files and Implementation

Date: Created - 4/21/24

In [1]:
#   IMPORT: Public Library
import numpy as np
import scipy as sp
import cvxpy as cp
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

#   IMPORT: Personal Library
from dubins2d_agent import Dubins2D

In [2]:
##  CLASS: EDMD Loss Class for State Network
class BilinearEDMD_Loss(nn.Module):
    def __init__(self, num_lifted, num_states, model, X, Xn, U, w1=0.5, w2=0.5):
        super(BilinearEDMD_Loss, self).__init__()
        
        self.num_lifted = num_lifted
        self.num_states = num_states
        self.model = model

        self.X = torch.t(torch.from_numpy(X).to(torch.float32))
        self.Xn = torch.t(torch.from_numpy(Xn).to(torch.float32))
        self.U = (torch.from_numpy(U).to(torch.float32))

        self.num_data = np.prod(self.U.size())

        self.w1 = w1
        self.w2 = w2

        self.A = np.zeros((num_lifted, num_lifted))
        self.B = np.zeros((num_lifted, num_lifted))
        self.C = torch.ones((self.num_states, self.num_lifted))


    def forward(self, model_output, X_states):
        # State Reconstruction Loss
        loss1 = torch.linalg.matrix_norm(self.C@model_output - X_states)
        print(loss1)

        # Prediction Loss for Zn = D*Z + B*Z*U
        Z = torch.t(self.model(self.X))
        Zn = torch.t(self.model(self.Xn))
        D_cp = cp.Variable((self.num_lifted, self.num_lifted))
        B_cp = cp.Variable((self.num_lifted, self.num_lifted))
        err = 0

        for i in range(self.X.shape[1]):
            z = Z[:, i].detach().numpy()
            zn = Zn[:, i].detach().numpy()
            u = self.U[:, i].detach().numpy()

            err += cp.norm(zn - (D_cp @ z + B_cp @ z * u))

        objective = cp.Minimize( err )
        problem = cp.Problem(objective, [])
        problem.solve()
        assert problem.status == cp.OPTIMAL
        print("D: \n", D_cp.value)
        print("B: \n", B_cp.value)
        self.D = torch.from_numpy(D_cp.value).to(torch.float32)
        self.B = torch.from_numpy(B_cp.value).to(torch.float32)
        
        loss2 = torch.linalg.matrix_norm(Zn - self.D @ Z - self.B @ Z * self.U)

        # Total Loss
        self.total_loss = (self.w1*loss1 + self.w2*loss2)
        self.avg_loss = torch.div(self.total_loss, self.num_data)
        return self.total_loss

In [3]:
##  CLASS: Custom Deep Neural Network
class CustomNN(nn.Module):
    def __init__(self, input_dim, output_dim, num_hidden_layers):
        super(CustomNN, self).__init__()

        self.input_layer = nn.Linear(input_dim, 128)
        self.hidden_layers = [] * num_hidden_layers
        for i, layer in enumerate(self.hidden_layers):
            self.hidden_layers[i] = nn.Linear(128, 128)
        self.output_layer = nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.relu(self.input_layer(x))
        for i, layer in enumerate(self.hidden_layers):
            x = F.relu(layer(x))
        x = F.relu(self.output_layer(x))
        return x

In [4]:
#   INITIALIZE: Simulation Parameters
n = 3
m = 1
dt = 0.2
sim_len = 2000
traj_num = 1000
num_data = sim_len * traj_num


#   INITIALIZE: Bilinear EDMD Parameter
num_lifted = 6
hidden_layers = 2
hidden_nodes = 128
num_epochs = 5

In [5]:
#   INITIALIZE: Random Walk Car Controller
def random_walk(x, u):
    du = np.random.randint(low=-1, high=2)
    u += du
    return u
speed = 1


#   SIMULATE: Generate Trajectories for System Identification
X = np.zeros((n, sim_len*traj_num))
Xn = np.zeros((n, sim_len*traj_num))
U = np.zeros((1, sim_len*traj_num))

step = 0
for i in range(traj_num):
    theta0 = np.pi * (np.random.random() - 0.5)
    y0 = 1_000 * (np.random.random() - 0.5)
    z0 = 1_000 * (np.random.random() - 0.5)
    x = np.array([[y0, z0, theta0]]).T

    car_rand = Dubins2D(0, speed, x, random_walk, dt)
    for i in range(sim_len):
        X[:, step] = x.flatten()
        x = car_rand.iterate_single()
        U[:, step] = car_rand.input
        Xn[:, step] = x.flatten()
        step += 1


#   SPLIT: Split the data into training and testing data
training_split = round(num_data * 0.8)
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X)
# Xn_scaled = scaler.fit_transform(Xn)

X_train = X[:, 0:training_split]
Xn_train = Xn[:, 0:training_split]
U_train = U[:, 0:training_split]

X_test = X[:, training_split:]
Xn_test = Xn[:, training_split:]
U_test = U[:, training_split:]

In [6]:
#   IDENTIFY: System ID with DMDc
# A = None
# B = None
# A_cp = cp.Variable((n, n))
# B_cp = cp.Variable((n, m))
# err = 0
# for i in range(traj_num*sim_len):
#     x = X[:, i]
#     xn = Xn[:, i]
#     u = U[:, i]
#     err += cp.norm(xn - (A_cp @ x + B_cp @ u))
# objective = cp.Minimize( err )
# problem = cp.Problem(objective, [])
# problem.solve()
# if problem.status == cp.OPTIMAL:
#     print("SOLVED")
#     A = A_cp.value
#     B = B_cp.value

In [7]:
def get_accuracy(Z, Zn, U, criterion):
    # Extract Criterion variables
    D = criterion.D
    B = criterion.B
    w1 = criterion.w1
    w2 = criterion.w2

    C = np.zeros((n, num_lifted))

    # State Reconstruction Loss
    loss1 = np.linalg.norm(C@Z - X)

    # Prediction Loss for Zn = D*Z + B*Z*U
    loss2 = np.linalg.norm(Zn - D @ Z - B @ Z * U)

    # Total Loss
    total_loss = w1*loss1 + w2*loss2
    return total_loss

In [8]:
#   IDENTIFY: System ID step with EDMD with DNN
w1 = 0.5
w2 = 0.5
learning_rate = 1000

model = CustomNN(n, num_lifted, hidden_layers)
criterion = BilinearEDMD_Loss(num_lifted, n, model, X_train, Xn_train, U_train, w1, w2)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

train_loss=[]
train_accuracy=[]
test_accuracy=[]
X_train_tens = torch.t(torch.from_numpy(X_train).to(torch.float32))
Xn_train_tens = torch.t(torch.from_numpy(Xn_train).to(torch.float32))
X_test_tens = torch.t(torch.from_numpy(X_test).to(torch.float32))
Xn_test_tens = torch.t(torch.from_numpy(Xn_test).to(torch.float32))

for epoch in range(num_epochs):

    #forward feed
    Z_train = torch.t(model(X_train_tens))
    Zn_train = torch.t(model(Xn_train_tens))
    
    this_train_accuracy = 0 # get_accuracy(Z_train, Zn_train, U_train, criterion)
    train_accuracy.append(this_train_accuracy)

    #calculate the loss
    loss = criterion(Z_train, torch.t(X_train_tens))
    train_loss.append(loss.item())

    #clear out the gradients from the last step loss.backward()
    optimizer.zero_grad()

    #backward propagation: calculate gradients
    loss.backward()

    #update the weights
    optimizer.step()

    with torch.no_grad():
        Z_test = torch.t(model(X_test_tens))
        Zn_test = torch.t(model(Xn_test_tens))
        this_test_accuracy = 0 # get_accuracy(Z_test, Zn_test, U_train, criterion.D, criterion.B)
        test_accuracy.append(this_test_accuracy)

    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {loss.item():.4f},",
                f"Train Accuracy: {sum(train_accuracy)/len(train_accuracy):.2f},", 
                f"Test Accuracy: {sum(test_accuracy)/len(test_accuracy):.2f}")

print(criterion.D)
print(criterion.B)

tensor(4613123.5000, grad_fn=<LinalgVectorNormBackward0>)
D: 
 [[-4.95839165 -0.         -0.         -0.          3.85785498  2.71034332]
 [-0.         -0.         -0.         -0.         -0.         -0.        ]
 [-0.         -0.         -0.         -0.         -0.         -0.        ]
 [-0.         -0.         -0.         -0.         -0.         -0.        ]
 [-4.46988201 -0.         -0.         -0.          3.89430491  2.03283886]
 [ 3.71653945 -0.         -0.         -0.         -2.40480426 -0.6922566 ]]
B: 
 [[-0. -0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0. -0.]
 [-0. -0. -0. -0. -0. -0.]]
tensor(7.0124e+14, grad_fn=<LinalgVectorNormBackward0>)
D: 
 [[-0.00000000e+00 -0.00000000e+00 -0.00000000e+00 -0.00000000e+00
  -0.00000000e+00 -0.00000000e+00]
 [-0.00000000e+00 -0.00000000e+00 -0.00000000e+00 -0.00000000e+00
  -0.00000000e+00 -0.00000000e+00]
 [-0.00000000e+00 -0.00000000e+00 -0.00000000e+00 -0.00

In [9]:
#   INITIALIZE: Reference Trajectory
ref = lambda k : np.array([[-k*np.sin(3*k/100)],
                           [-k*np.cos(3*k/100)],
                           [3*k/100]])

#   