# Tuning A Neural Network Using SIF vs KF (Classification Task)

In [1]:
# Importing global modules
from pprint import pformat
from sklearn import datasets
import pandas as pd
import numpy as np
import keras
import matplotlib.pyplot as plt
from filterpy.kalman import (
    KalmanFilter,
    UnscentedKalmanFilter,
    MerweScaledSigmaPoints,
    unscented_transform,
)
from keras.models import Sequential
from keras.layers import Dense, Dropout
import math
import os
import time
import logging
from sklearn.metrics import accuracy_score
from keras.callbacks import Callback

# import matlab.engine
from io import StringIO
import pdb
import tensorflow as tf
import random

In [2]:
# Tracking of weight records of every epochs
class EpochInfoTracker(Callback):
    def __init__(self):
        self.weights_history = []  # Tracking the weights in each epochs

    def on_epoch_end(self, epoch, logs=None):
        weights_vec = get_weights_vector(self.model)
        self.weights_history.append(weights_vec)


# Class for storing the necessary parameters
class Params:
    pass

## Loading Iris Dataset

In [3]:
iris = datasets.load_iris()  # Load iris dataset

# Create X and y of dataframe
X = iris.data[:, :4]                         # X dataset
y = np.asarray(pd.get_dummies(iris.target))  # y dataset

print("X dataset shape:", X.shape)
print("y dataset shape:", y.shape)

X dataset shape: (150, 4)
y dataset shape: (150, 3)


In [4]:
from sklearn.model_selection import train_test_split

# Prepare training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=5)
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)
print("X_test shape :", X_test.shape)
print("y_test shape :", y_test.shape)

X_train shape: (105, 4)
y_train shape: (105, 3)
X_test shape : (45, 4)
y_test shape : (45, 3)


## Initialize Essential Functions for the Algorithm

In [5]:
# --------------------Initialization of necessary functions-----------------
# Create a simple feedforward neural network
def create_neural_net(M):
    """
    M: input dimension of the neural network
    """

    # Build a simple neural network
    ann = Sequential()
    ann.add(Dense(1, input_dim=M, activation="relu"))
    ann.add(Dense(3, activation="softmax"))
    ann.compile(optimizer="sgd", loss="categorical_crossentropy", metrics="accuracy")

    # Print out the summary of the model
    ann.summary()

    return ann


# Get weights of the neural network model
def get_weights_vector(model):
    weights = model.get_weights()
    # logging.info(weights)
    weights_vec = []
    for w_mat in weights:
        weights_vec.extend(w_mat.reshape(w_mat.size))

    weights_vec = np.array(weights_vec)
    return weights_vec


# Set weights of the neural network model
def set_weights(model, weights_vec):
    prev_weights = model.get_weights()
    # logging.info(prev_weights)
    new_weights = []
    start = 0

    for prev_w_mat in prev_weights:
        end = start + prev_w_mat.size
        new_w_mat = np.array(weights_vec[start:end]).reshape(prev_w_mat.shape)
        new_weights.append(new_w_mat)
        start = end

    model.set_weights(new_weights)

## SIF (Step by step)

Initialization of neural networks

In [7]:
n_samples = X_train.shape[1]                    # Setting training set length

# Create ANN, get its initial weights
sif_ann = create_neural_net(X_train.shape[1])   # Create a neural net model
w_init = get_weights_vector(sif_ann)            # Get weights from neural nets
num_weights = w_init.shape[0]                   # Number of weights inside the neural network

z_true_series = y_train                         # Set the labels as your true series

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 1)                 5         
_________________________________________________________________
dense_1 (Dense)              (None, 3)                 6         
Total params: 11
Trainable params: 11
Non-trainable params: 0
_________________________________________________________________


Getting and setting initial SIF parameters

In [8]:
# -----------Initalize & Pre-allocate SIF Variables--------------
x = w_init                                                                  # Set the initial NN weights
n = x.shape[0]                                                              # Number of States (11 parameters)
m = z_true_series.shape[0]                                                  # Number of measurements (105 training samples)

delta = np.random.uniform(low=0.0009, high=0.9, size=(X_train.shape[0]))    # Assign delta values for each data point (105,1)
sat = np.zeros((m, m))                                                      # Pre-allocation of saturation matrix
C = np.ones((X_train.shape[0], n))                                          # Pre-allocation of measurement matrix
                                                     #
w = np.zeros((x.shape))
x = get_weights_vector(sif_ann)

## Prediction Stage  

Eq.(3.3):  $$ \hat{z}_{k+1|k}  =  z_{k+1|k}  - h  (\hat{x}_{k+1|k}) $$ 

<br></br>

* Predicted innovation $ \hat{z}_{k+1|k} $: pred_innov

* True values $ z_{k+1|k} $:  z_true_series  

* Measurement matrix $ \hat{x}_{k+1|k} $: prediction

* Nonlinear measuement function $h()$: sif_ann.predict()


In [9]:
preds_softmax = sif_ann.predict(X_train)                                        # NN_model (weights)

masked_output = z_true_series.max(axis=1, keepdims=1) == z_true_series          # Initialize the masking array which returns the true class

predict_output = preds_softmax[masked_output]                                   # Slice the softmax value of the true class
z_output = z_true_series[masked_output]                                         # Slice the true class vlaues

pred_innov = z_output - predict_output                                          # Compute the predicted innovation matrix
print("Predicted innovation's shape: ", pred_innov.shape)                                                         # Print the predicted innovation matrix's shape


accuracy = accuracy_score(z_true_series.argmax(axis=1), preds_softmax.argmax(axis=1)) # Compute the accuracy
print("Accuracy: ", accuracy)

Predicted innovation's shape:  (105,)
Accuracy:  0.3333333333333333


### Computation of the SIF Gain ($K_{k+1}$):
<br></br>
Eq.(3.4) : $$K_{k+1} = C^{+} \overline{sat} (|\hat{z}_{k+1|k}| / \delta)$$

* $\overline{sat}$ refers to the diagonal of the saturation term, sat refers to the saturation of a value (yields a result between 1 and -1)

* Note that $C^{+}$ refers to the pseudoinverse of the measurement matrix

In [10]:
# TODO: Initialize it properly. (Time-variant delta)
delta = np.random.uniform(low=0.0009, high=0.9, size=(X_train.shape[0]))


# neural_net(|Z| ./ delta)

m = 105                                             # Length of the training samples
#------------- saturation computation ----------------
for i in range(1, m):
    if (abs(pred_innov[i]) / delta[i]) >= 1:        # If the value is greater than 1 keep it as 1
        sat[i][i] = 1

    elif (abs(pred_innov[i]) / delta[i]) <= -1:     # If the value is lower than -1 keep it as -1
        sat[i][i] = -1 

    else:
        sat[i][i] = abs(pred_innov[i]) / delta[i]   # If the value is between -1 and 1 keep it as it is
#-----------saturation computation-----------------

pinvC = np.linalg.pinv(C)                           # Pseudo inverse of C (105,11)
K = np.dot(pinvC, sat)                              # Calculation of SIF gain

print("SIF gain shape: ", K.shape)                  # SIF_gain (4,11)       

SIF gain shape:  (11, 105)


In [12]:
print(K.shape)

(11, 105)


Computation of State Estimate ($\hat{x}_{k+1|k+1}$):

Eq. (3.5): $$\hat{x}_{k+1|k+1} = \hat{x}_{k+1|k} +  (K_{k+1})(\hat{z}_{k+1|k})$$



In [13]:
w_init = get_weights_vector(sif_ann)                # Get the initial neuron weights
x = w_init                                          # Set them as our measurements

x = x + np.dot(K, pred_innov)                       # Compute the state estimate

print("Previous weights: \n", w_init)
print("Updated weights: \n", x)

Previous weights: 
 [ 0.04405022 -0.8271221  -0.9231755  -0.9758304   0.          0.70051146
 -0.14064014 -0.26801395  0.          0.          0.        ]
Updated weights: 
 [ 0.10203806 -0.76913424 -0.86518767 -0.91784253  0.05798785  0.7584993
 -0.08265229 -0.21002611  0.05798785  0.05798785  0.05798785]


## Genetic Algorithm (without crossover & mutation)

In [14]:
# -----------------Genetic Algorithm------------
accuracy_GA = []                                    # Accuracy values of each individual
weights_GA = []                                     # Weights of each individual

# Genetic Algorithm loop
# In each iteration a random weights are assigned and evaluated based on their accuracy. 
epoch = 0
for jj in range(100):

    weights_GA.append(get_weights_vector(sif_ann))  # Store the weights vector
    accuracy_GA.append(accuracy)                    # Store the accuracy values
    ##################################################

    # SIF Predicition Stage
    preds_softmax = sif_ann.predict(X_train)                                 # NN_model (weights)

    masked_output = z_true_series.max(axis=1, keepdims=1) == z_true_series          # Initialize the masking array which returns the true class

    predict_output = preds_softmax[masked_output]                                   # Slice the softmax value of the true class
    z_output = z_true_series[masked_output]                                         # Slice the true class vlaues

    pred_innov = z_output - predict_output                                          # Compute the predicted innovation matrix


    #-------------saturation computation----------------
    for i in range(1, m):
        if (abs(pred_innov[i]) / delta[i]) >= 1:        # If the value 
            sat[i][i] = 1

        elif (abs(pred_innov[i]) / delta[i]) <= -1:
            sat[i][i] = -1 

        else:
            sat[i][i] = abs(pred_innov[i]) / delta[i]
    #-----------saturation computation-----------------


    pinvC = np.linalg.pinv(C)
    K = np.dot(pinvC, sat)
    x = np.asarray(
        [xx * random.uniform(0.001, 1) for xx in x]
    )  # Randomly initialize weights

    was = np.reshape(np.dot(K, pred_innov), x.shape)
    x = x + was  # NEED TO CHECK THIS LINE

    set_weights(sif_ann, x)

    preds = sif_ann.predict(X_train)
    preds = np.argmax(preds, axis=1)
    accuracy = accuracy_score(z_true_series.argmax(axis=1), preds)
    print(f"\nNN paramaters no:{jj}\naccuracy: {accuracy}", "\nparams :", get_weights_vector(sif_ann))

    accuracy_GA.append(accuracy)
    weights_GA.append(x)
    # print(accuracy_GA)

    # Genetic Algorithm
    if jj == 99:
        # print(accuracy_GA[accuracy_GA.index(min(accuracy_GA))])

        set_weights(sif_ann, weights_GA[accuracy_GA.index(max(accuracy_GA))])
        preds = sif_ann.predict(X_train)
        set_weights(sif_ann, weights_GA[accuracy_GA.index(max(accuracy_GA))])


NN paramaters no:0
accuracy: 0.3333333333333333 
params : [ 0.08839896 -0.15627757  0.03143298 -0.29322362  0.08861101  0.51494217
 -0.01289834 -0.15050404  0.1103749   0.06905197  0.10099892]

NN paramaters no:1
accuracy: 0.3333333333333333 
params : [ 0.0925796  -0.03541391  0.0760443  -0.21700409  0.14131665  0.51293486
  0.05145787 -0.0356743   0.15239458  0.09704424  0.12949026]

NN paramaters no:2
accuracy: 0.3333333333333333 
params : [ 0.06394213  0.05401875  0.11915083 -0.10372134  0.09941566  0.21263243
  0.09689748  0.04335486  0.20355703  0.13945226  0.1654558 ]

NN paramaters no:3
accuracy: 0.3333333333333333 
params : [0.09685257 0.09605999 0.07860827 0.03928357 0.0817469  0.20694198
 0.137129   0.08775541 0.2206449  0.15340357 0.107708  ]

NN paramaters no:4
accuracy: 0.3333333333333333 
params : [0.08373327 0.12666568 0.06151841 0.09096002 0.13833435 0.14499283
 0.06694549 0.14063482 0.24036655 0.17432292 0.11737414]

NN paramaters no:5
accuracy: 0.3333333333333333 
pa

In [15]:
print("First Accuracy: ", accuracy_GA[0])
print("After 100 random weights and SIF approximation...")
print("Best Accuracy: ", np.max(accuracy_GA))

First Accuracy:  0.3333333333333333
After 100 random weights and SIF approximation...
Best Accuracy:  0.5619047619047619


## Main

In [36]:
def SIF_ANN(w_init, max_epoch=1000):
    # -------------------------------------------
    # Setting parameters
    n_samples = X_train.shape[1]                       # Setting training series length

    # Create ANN, get its initial weights
    sif_ann = create_neural_net(X_train.shape[1])      # Create a neural net model
    num_weights = w_init.shape[0]                      # Number of weights inside the neural network

    # ---------------------------Filter Parameters-----------------------
    z_true_series = y_train                              # Set the test set as the training set
    num_iter = max_epoch * len(z_true_series)               # Initialize max_iteration: epochs * dataset_len


    # -----------SIF Initalize Variables--------------
    x = w_init                                                  # Weights of the neural network
    n = x.shape[0]                                              # Number of States: 11
    m = z_true_series.shape[0]                                  # Measurement matrix: 105                                 

    delta = np.random.uniform(low=0.0009, high=0.9, size=(X_train.shape[0]))
    sat = np.zeros((m, m))
    C = np.ones((X_train.shape[0], n))
    w = np.zeros((x.shape))
    x = get_weights_vector(sif_ann)
    # Training loop with UKF
    aRate = 0.05

    # Epochs * len(y_train)
    for i in range(num_iter):
        idx = i % len(z_true_series)

        # Checking the accuracy of the model
        preds_softmax = sif_ann.predict(X_train)                                # Model prediction (softmax format)
        z_true_series_accuracy = np.argmax(z_true_series, axis=1)               # Select the highest probability as the output
        preds_accuracy = np.argmax(preds_softmax, axis=1)                       # Take the highest possibility as output among softmax output
        accuracy = accuracy_score(z_true_series_accuracy, preds_accuracy)       # Calculate the accuracy

        sif_ann_accuracy.append(accuracy)
        print("The accuracy is: ", accuracy)
        if (accuracy >= 0.85) and (i > 1):
            thelast = i
            break

            epoch += 1

        # -----------------Genetic Algorithm------------
        accuracy_GA = []                                    # Accuracy values of each individual
        weights_GA = []                                     # Weights of each individual

        # Genetic Algorithm loop
        # In each iteration a random weights are assigned and evaluated based on their accuracy. 
        epoch = 0
        for jj in range(100):

            weights_GA.append(get_weights_vector(sif_ann))  # Store the weights vector
            accuracy_GA.append(accuracy)                    # Store the accuracy values
            ##################################################

            # SIF Predicition Stage
            preds_softmax = sif_ann.predict(X_train)                                 # NN_model (weights)

            masked_output = z_true_series.max(axis=1, keepdims=1) == z_true_series          # Initialize the masking array which returns the true class

            predict_output = preds_softmax[masked_output]                                   # Slice the softmax value of the true class
            z_output = z_true_series[masked_output]                                         # Slice the true class vlaues

            pred_innov = z_output - predict_output                                          # Compute the predicted innovation matrix


            #-------------saturation computation----------------
            for i in range(1, m):
                if (abs(pred_innov[i]) / delta[i]) >= 1:        # If the value 
                    sat[i][i] = 1

                elif (abs(pred_innov[i]) / delta[i]) <= -1:
                    sat[i][i] = -1 

                else:
                    sat[i][i] = abs(pred_innov[i]) / delta[i]
            #-----------saturation computation-----------------


            pinvC = np.linalg.pinv(C)
            K = np.dot(pinvC, sat)
            x = np.asarray(
                [xx * random.uniform(0.001, 1) for xx in x]
            )  # Randomly initialize weights

            was = np.reshape(np.dot(K, pred_innov), x.shape)
            x = x + was  # NEED TO CHECK THIS LINE

            set_weights(sif_ann, x)

            preds = sif_ann.predict(X_train)
            accuracy = accuracy_score(z_true_series.argmax(axis=1), preds.argmax(axis=1))

            accuracy_GA.append(accuracy)
            weights_GA.append(x)

            # Pick the best individual
            if jj == 99:
                set_weights(sif_ann, weights_GA[accuracy_GA.index(max(accuracy_GA))])
                preds = sif_ann.predict(X_train)
                set_weights(sif_ann, weights_GA[accuracy_GA.index(max(accuracy_GA))])
        """
        time_to_train = time.time() - t0
        logging.info(
            "Training complete. time_to_train = {:.2f} sec, {:.2f} min".format(
                time_to_train, time_to_train / 60
            )
        )
        """


nn = create_neural_net(X_train.shape[1])
w_init = get_weights_vector(nn)
sif_ann_accuracy = []
epoch = 0

SIF_ANN(w_init)

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_4 (Dense)              (None, 1)                 5         
_________________________________________________________________
dense_5 (Dense)              (None, 3)                 6         
Total params: 11
Trainable params: 11
Non-trainable params: 0
_________________________________________________________________
Model: "sequential_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_6 (Dense)              (None, 1)                 5         
_________________________________________________________________
dense_7 (Dense)              (None, 3)                 6         
Total params: 11
Trainable params: 11
Non-trainable params: 0
_________________________________________________________________
The accuracy is:  0.3238095238095238
The

## Result Analysis

In [None]:
#---------------Results analysis--------------

# Visualize evolution of ANN weights

# Visualize error curve (SGD vs UKF)
x_var = range(thelast + 1)                                                          # Get the last epoch's number
hist = history.history["loss"]                                                      # Get the history of the NN with SGD
ukf_train_mse = np.array(sifnn)                     
# utility.plot(x_var, hist, xlabel='Epoch',
#            label='SGD ANN training history (MSE)')
utility.plot(
    x_var, ukf_train_mse, new_figure=False, label="SIF ANN training history (MSE)"
)

# True test series vs. ANN pred vs, UKF pred
logging.info("Evaluating and visualizing neural net predictions")
evaluate_neural_nets(
    sgd_ann, ukf_ann, window, use_train_series=True, train_series=X_series
)
evaluate_neural_nets(sgd_ann, ukf_ann, window)

utility.save_all_figures("output")
plt.show()

print("The Min MSE is ", min(minval), " vs ", hist[-1])
print("Total amount of epochs for SIF: ", epoch)