# Machine learning using EKF

In [3]:
from IPython import display
import os
import random
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd
from sklearn.model_selection import train_test_split
import seaborn as sns

# Importing Pytorch libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
from  sklearn.datasets import make_regression
from sklearn.datasets import load_boston

In [4]:
# Select device which you are going to use for training
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print(device)

cpu


### Import Data Sets
Testing using a toy sine data

In [5]:
# Let us generate toy data
np.random.seed(2)
# X,y = make_regression(n_samples=800,n_features=100)
X,y = load_boston(return_X_y=True)
print(X.shape)
print(y.shape)

(506, 13)
(506,)


In [6]:
## Use MNIST data set
# import packages.mnist.mnist_loader as mnist_loader
# training_data, validation_data, test_data = mnist_loader.load_data_wrapper()
# training_data = list(training_data)

## Data Partition


In [7]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=.75)
print(X_train.shape)
print(X_test.shape)
# x_train_scaled = scaler.fit_transform(x_train)
# x_test_scaled = scaler.transform(x_test)

(126, 13)
(380, 13)


## Define Neural network

In [12]:
class MLP(nn.Module):
    def __init__(self, n_inputs, n_hidden_layer, n_outputs,bias=True):
        super(MLP, self).__init__()
        # YOUR CODE HERE
        #raise NotImplementedError()
        self.fc1 = nn.Linear(n_inputs, n_hidden_layer, bias)
        self.fc2 = nn.Linear(n_hidden_layer, n_hidden_layer, bias)
        #self.fc3 = nn.Linear(n_hidden_layer, n_hidden_layer, bias)
        self.fc4 = nn.Linear(n_hidden_layer, n_outputs, bias)
        

    def forward(self, x):
        # YOUR CODE HERE
        #raise NotImplementedError()
        x = torch.tanh(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        #x = torch.tanh(self.fc3(x))
        x = self.fc4(x)
        return x    

In [13]:
## Testing net
n_inputs = X.shape[1]
n_outputs = 1
n_hidden_layer = 100
test_net = MLP(n_inputs, n_hidden_layer, n_outputs)
print(test_net)

MLP(
  (fc1): Linear(in_features=13, out_features=100, bias=True)
  (fc2): Linear(in_features=100, out_features=100, bias=True)
  (fc3): Linear(in_features=100, out_features=100, bias=True)
  (fc4): Linear(in_features=100, out_features=1, bias=True)
)


In [14]:
# Define number of Input and Output layers
torch.set_default_dtype(torch.float64)
n_inputs = X.shape[1]
n_outputs = 1
n_hidden_layer = 100
mlp = MLP(n_inputs,n_hidden_layer, n_outputs)
mpl = mlp.to(device)
optimizer = torch.optim.Adam(mlp.parameters(), lr=0.005)
n_epochs = 100

train_accuracy_history = []
test_accuracy_history = []

x_train = torch.tensor(X_train, device=device, dtype=torch.float64)
y_train = torch.tensor(y_train, device=device, dtype=torch.float64)

ceLoss = nn.MSELoss()



for epoch in range(n_epochs):
    
    #Set gradients as zero
    mlp.zero_grad()
    #Calculate outputs with forward()
    outputs = mlp(x_train)   
    
    #calculate loss
    loss = ceLoss(outputs, y_train)
#     print(f"The output is :{outputs.item()} expected is :{y_train}")
    #calculate gradients - calling backward()
    loss.backward()
    
    #update weights using optimizer
    optimizer.step()

  from ipykernel import kernelapp as app


In [15]:
from sklearn.metrics import mean_absolute_error
with torch.no_grad():
    x_test = torch.tensor(X_test, device=device, dtype=torch.float64)
    y_pred = mlp.forward(x_test)
    y_pred = y_pred.cpu().data.numpy()
    error = mean_absolute_error(y_test,y_pred)
    print(error)

6.833590994795237


## Using EKF for learning 

In [8]:
# #Calculate Weight size
# weight_mat_size = 0
# for i in range(len(layer_list)-1):
#     weight_mat_size = weight_mat_size + (layer_list[i]*layer_list[i+1])
        
def getWeights(net):
    weight_mat = []
    for name,param in net.named_parameters():
    
        #print('Layer',name, 'is', param.data.shape)
        if (len(list(param.data.shape)) == 2):
            weight_mat.append(param.data.flatten())
        
    weight_mat = torch.cat(weight_mat, dim=0)       
    #print('Shape of weight matrix', weight_mat.shape)
    return weight_mat.view(-1, 1)

def getWeightsgrad(net):
    weight_grad_mat = []
    for name,param in net.named_parameters():
        #print('Layer Grads',name, 'is', param.grad.shape)
        if (len(list(param.grad.shape)) == 2):
            weight_grad_mat.append(param.grad.flatten())
    weight_grad_mat = torch.cat(weight_grad_mat)       
    #print('Shape of weight matrix', weight_grad_mat.shape)   
    return weight_grad_mat.view(-1, 1)

def setWeights(net, weight_mat):
    mem_ind = 0;
    for name,param in net.named_parameters():
        if (len(list(param.data.shape)) == 2):
            #print('Layer',name, 'is', param.data.shape)
            #print(torch.numel(param.data))
            #print(weight_mat[mem_ind:mem_ind+torch.numel(param.data)].view(param.data.shape).shape)
            param.data = weight_mat[mem_ind:mem_ind+torch.numel(param.data)].view(param.data.shape)
            mem_ind = torch.numel(param.data)
    


In [17]:
# Define number of Input and Output layers
torch.set_default_dtype(torch.float64)
n_inputs = X_train.shape[1]
n_outputs = 1
n_hidden_layer = 20
mlp_EKF = MLP(n_inputs,n_hidden_layer, n_outputs, bias = False)
mlp_EKF = mlp_EKF.to(device)
n_epochs = 1


# Define EKF covariances
weight_mat = getWeights(mlp_EKF).to(device)
print(f"Shape of W:{weight_mat.shape}")
# System Noise or also known as training  noise  
Q = 1e-6*torch.eye(weight_mat.shape[0],device=device, dtype=torch.float64)
# Measurement noise or noise in targets 
R = 10*torch.eye(n_outputs,device=device, dtype=torch.float64)
#Covariance Matrix
P = 100*torch.eye(weight_mat.shape[0],device=device, dtype=torch.float64)
print(f"Shape of P:{P.shape}")

print(f"network {mlp_EKF}")
print(weight_mat.shape)



Shape of W:torch.Size([2420, 1])
Shape of P:torch.Size([2420, 2420])
network MLP(
  (fc1): Linear(in_features=100, out_features=20, bias=False)
  (fc2): Linear(in_features=20, out_features=20, bias=False)
  (fc4): Linear(in_features=20, out_features=1, bias=False)
)
torch.Size([2420, 1])


In [18]:
x_train = torch.tensor(X_train, device=device, dtype=torch.float64)
y_train = torch.tensor(y_train, device=device, dtype=torch.float64)

ceLoss = nn.MSELoss(reduction='none')

# Plotting before learning 

for epoch in range(n_epochs):
    
    outputs = [] 
    #Calling Backward for each sample
    for i in range((x_train.shape[0])):
        output = mlp_EKF(x_train[i])
        outputs.append(output)
        #print("Update for Batch",i)
        mlp_EKF.zero_grad()
        output.backward(torch.ones_like(output))
        #calculate loss
        loss = (y_train[i]-output).view(1,-1)
        H = getWeightsgrad(mlp_EKF).to(device).view(1,-1)
        #update weights using EKF filter Update
        intermediate = torch.mm(torch.mm(H, P), torch.t(H))
#         print(f"shape of intermediate {intermediate.shape}")
#         print(f"intermediate:{intermediate+R}")
        Ak = torch.inverse(R + intermediate)
#         print(f"shape of Ak:{Ak.shape}")
#         print(f"Ak: {Ak}")
        #Ak = torch.ones(1).view(1,1).to(device)
        Kk = torch.mm(torch.mm(P, torch.t(H)), Ak)
#         print(f"Kk matrix: {Kk}")
        #Kk = torch.ones((120,1)).to(device)
        #print(weight_mat.shape)
        #print("torch.inverse(Shape of Kalman gain",Kk)
        #print("Shape of loss",loss.shape)
        #print("Shape of mul term",torch.mm(Kk, loss).shape)
        #print("Weight Mat Shape",weight_mat)
        weight_mat = getWeights(mlp_EKF).to(device) + torch.mm(Kk, loss)
        #print("Weight Mat Shape",weight_mat.shape)
        P = P + Q - torch.mm(torch.mm(Kk,H),P)
#         print(f"Updated P:{P}")
        setWeights(mlp_EKF,weight_mat)
#     print(f"Epoch loss:{loss}")  
    #print("Weight",weight_mat)
        

    
# display.clear_output(wait=True)

In [11]:
from sklearn.metrics import mean_absolute_error
with torch.no_grad():
    x_test = torch.tensor(X_test, device=device, dtype=torch.float64)
    y_pred = mlp_EKF.forward(x_test)
    y_pred = y_pred.cpu().data.numpy()
    error = mean_absolute_error(y_test,y_pred)
    print(error)

175.0898134299245


# Appendix