In [None]:
#!/usr/bin/env python
# coding: utf-8
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import openpyxl
from scipy.stats import zscore
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
import math
import torch.nn.functional as F

# Load the dataset.
# SFEW_rename_title_line.xlsx is a lightly modified copy of the original
# dataset in which only the header (first) row has been renamed.
data = pd.read_excel('SFEW_rename_title_line.xlsx', engine='openpyxl')

# Column 1 is the label column shared by both feature sets.
data_Y_both = data.iloc[:, 1]

# LPQ set: label column followed by columns 2..6, incomplete rows removed.
data_LPQ = data.iloc[:, 1:7].dropna()

# PHOG set: label column followed by every column from 7 onwards,
# incomplete rows removed.
data_PHOG = data.iloc[:, [1, *range(7, data.shape[1])]].dropna()

# Combined set: remove incomplete rows, then discard column 0 so that the
# label becomes the first column.
data = data.dropna().iloc[:, 1:]



####################### CasCor model ###############################

class TensorDataset(torch.utils.data.Dataset):
    '''
    Index-aligned pairing of features and labels.

    Lets a DataLoader draw (feature, label) mini-batches for the backprop
    step in CasCor.
    '''

    def __init__(self, train_features, train_labels):
        self.X, self.Y = train_features, train_labels

    def __len__(self):
        # The label container determines how many samples there are.
        return len(self.Y)

    def __getitem__(self, index):
        sample = self.X[index]
        label = self.Y[index]
        return sample, label

def backprop(x_, y_, weights, loss_func=torch.nn.CrossEntropyLoss(),
             learning_rate=0.001, epochs=500, is_cov=False, batch_size=128):
    '''
    Train a single linear layer with Adam on shuffled mini-batches.

    Used for training input weights to a candidate unit (is_cov=True) and
    output weights to the output units (is_cov=False).

    Parameters:
        x_            : 2-D array-like of inputs, one row per sample.
        y_            : targets. Cast to integer class indices unless is_cov
                        is True, in which case they are passed to loss_func
                        unchanged.
        weights       : 2-D array of shape (n_inputs, n_outputs). It sizes the
                        layer and provides the starting values of its weight
                        matrix.
        loss_func     : callable(prediction, target) returning a scalar tensor.
        learning_rate : Adam learning rate.
        epochs        : number of passes over the data.
        is_cov        : True when training a candidate unit against residual
                        errors.
        batch_size    : mini-batch size.

    Returns:
        (weight, prediction, loss, net) where weight is the trained layer's
        weight matrix as a NumPy array of shape (n_outputs, n_inputs) (the
        bias is not included), prediction is the layer output for all of x_
        as a NumPy array, loss is loss_func on the full data as a float, and
        net is the trained torch.nn.Linear module.
    '''
    # as_tensor avoids the copy-construct warning when a tensor is passed in.
    x = torch.as_tensor(x_).float()
    y = torch.as_tensor(y_)
    if not is_cov:
        y = y.long()

    n_inputs, n_outputs = weights.shape
    net = torch.nn.Linear(n_inputs, n_outputs)
    # Start from the weights the caller supplies instead of discarding them;
    # Linear stores its matrix as (n_outputs, n_inputs), hence the transpose.
    with torch.no_grad():
        net.weight.copy_(torch.as_tensor(weights, dtype=net.weight.dtype).t())

    # Convert once up front so the batches need no per-step casting.
    train_loader = torch.utils.data.DataLoader(TensorDataset(x, y), batch_size, shuffle=True)
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

    for _ in range(epochs):
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            loss = loss_func(net(batch_x), batch_y)
            loss.backward()
            optimizer.step()

    # Final evaluation on the whole set; no gradients are needed here.
    with torch.no_grad():
        final_pred = net(x)
        final_loss = loss_func(final_pred, y)

    return net.weight.detach().numpy(), final_pred.numpy(), final_loss.item(), net


def covariance_loss(pred, target, eps=1e-12):
    '''
    Reciprocal of the covariance magnitude S between pred and target.

    S sums, over the output columns, the absolute value of the per-column sum
    of (pred - mean(pred)) * (target - mean(target)) taken over the samples.
    No normalisation by the standard deviations is applied. Minimising 1/S
    maximises S.

    Parameters:
        pred   : tensor of unit outputs, samples along dim 0.
        target : tensor of residual errors, broadcastable against pred.
        eps    : lower bound applied to S before taking the reciprocal.

    Returns:
        Scalar tensor 1 / max(S, eps).
    '''
    pred_centered = pred - torch.mean(pred, dim=0)
    target_centered = target - torch.mean(target, dim=0)

    per_output = torch.sum(pred_centered * target_centered, dim=0)
    S = torch.sum(torch.abs(per_output), dim=0)

    # S is exactly 0 for a single-sample batch (and for constant predictions);
    # an unguarded 1/S is then inf and its gradient NaN, which would corrupt
    # the weights on the next optimizer step.
    return 1 / torch.clamp(S, min=eps)


def train_hidden(x, y, predicted, pool_size=3):
    '''
    Train a pool of candidate units and return the one with maximum S.

    Each candidate is trained with backprop() using covariance_loss against
    the residual error of the current network, starting from random input
    weights.

    Parameters:
        x         : 2-D array of inputs, one row per sample.
        y         : integer class label per sample.
        predicted : 2-D array of current network outputs (logits), one column
                    per class.
        pool_size : number of candidates to train; must be at least 1.

    Returns:
        ((weights, unit_output), net) for the best candidate: its trained
        input weights, its output on x, and the trained module that produced
        them.

    Raises:
        ValueError: if pool_size is smaller than 1.
    '''
    if pool_size < 1:
        raise ValueError("pool_size must be at least 1")

    n_features = x.shape[1]

    # Probability the current network assigns to the correct label.
    # Direct indexing is used rather than np.choose, which has a fixed upper
    # limit on the number of choices.
    probs = F.softmax(torch.tensor(predicted).float(), dim=1).detach().numpy()
    labels = np.asarray(y, dtype=np.int64)
    correct_prob = probs[np.arange(len(labels)), labels].reshape(-1, 1)

    # Residual error used as the correlation target. It is computed once so
    # every candidate is trained against the same residual.
    err = torch.tensor(correct_prob - np.ones(correct_prob.shape))

    best = None
    best_inv_S = math.inf
    for _ in range(pool_size):
        init_weights = np.random.randn(n_features, 1)
        weights, unit_output, inv_S, net = backprop(x, err, init_weights,
                                                    loss_func=covariance_loss, is_cov=True)

        # Keep the module together with its outputs so the caller applies the
        # same unit to the validation and test sets.
        candidate = ((weights, unit_output), net)
        if best is None:
            best = candidate  # first candidate is the fallback choice
        # select the minimum 1/S == select the maximum S
        if inv_S < best_inv_S:
            best_inv_S = inv_S
            best = candidate

    return best


class CasCorNet(object):
    '''
    Classifier that grows by appending trained candidate units as new inputs.

    The output layer is retrained with backprop(); after each round a new unit
    from train_hidden() is evaluated on the training, validation and test
    inputs and appended to them as an extra column.

    Attributes:
        input_size      : current number of input columns (grows by one per unit).
        output_size     : number of output classes.
        weights         : output weights most recently produced or prepared.
        no_hidden_units : number of units added so far.
        loss_func       : cross-entropy loss used for training and evaluation.
        history         : dict of per-round 'train_loss', 'train_acc',
                          'val_loss' and 'val_acc' lists from the last train().
    '''

    def __init__(self, input_size, output_size):
        self.input_size = input_size
        self.output_size = output_size
        self.weights = self.init_weights()  # weights of inputs to the outputs and hidden units to the outputs
        self.no_hidden_units = 0
        self.loss_func = torch.nn.CrossEntropyLoss()
        self.history = self._empty_history()

    @staticmethod
    def _empty_history():
        '''Return a fresh container for the per-round learning curves.'''
        return {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    def set_data(self, X_train, y_train, X_val, y_val, X_test, y_test):
        '''Store the training, validation and test arrays used by train().'''
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.X_test = X_test
        self.y_test = y_test

    def init_weights(self):
        '''
        Return random normal weights of shape (input_size, output_size).
        '''
        return np.random.randn(self.input_size, self.output_size)

    def accuracy(self, y_pred, Y):
        '''Return the percentage of rows of y_pred whose argmax equals Y.'''
        predicted = np.argmax(y_pred, 1)
        correct = np.count_nonzero(predicted == Y)
        return 100 * correct / len(y_pred)

    def test_model(self, net):
        '''
        Use the reserved test set to evaluate the final model.

        Returns (test_loss, test_accuracy), or None when net is None.
        '''
        # Explicit None check: module truthiness is not a reliable signal.
        if net is None:
            return None

        with torch.no_grad():
            logits = net(torch.tensor(self.X_test).float())
            test_loss = self.loss_func(logits, torch.tensor(self.y_test).long()).item()
        test_acc = self.accuracy(logits.numpy(), self.y_test)

        print("#" * 10)
        print("Final test loss: ", test_loss)
        print("Final test accuracy:", test_acc)

        return test_loss, test_acc

    def train(self):
        '''
        Alternate output-layer training with adding units until done.

        Stops when the validation loss drops below 0.1 or after 32 units have
        been added, then evaluates on the test set.

        Returns the result of test_model() for the last trained output layer.
        The per-round curves are kept in self.history.
        '''
        self.history = self._empty_history()

        iteration = 0
        acceptable_loss = 0.1
        max_iterations = 32

        while True:
            new_weights, predict, loss, net = backprop(self.X_train, self.y_train, self.weights,
                                                       loss_func=self.loss_func)

            # update output units' weights
            self.weights = new_weights

            # calculate train accuracy and loss
            train_acc = self.accuracy(predict, self.y_train)
            self.history['train_loss'].append(loss)
            self.history['train_acc'].append(train_acc)
            print("training loss: {:.2f}".format(loss))
            print("training accuracy: {:.2f}".format(train_acc))

            # calculate validation accuracy and loss; store plain floats so no
            # autograd graph is kept alive by the history
            with torch.no_grad():
                y_pred_val = net(torch.tensor(self.X_val).float())
                val_loss = self.loss_func(y_pred_val, torch.tensor(self.y_val).long()).item()
            val_acc = self.accuracy(y_pred_val.numpy(), self.y_val)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)
            print("validation loss: {:.2f}".format(val_loss))
            print("validation accuracy: {:.2f}%".format(val_acc))
            print()

            # termination criterion
            if val_loss < acceptable_loss or iteration == max_iterations:
                break

            # otherwise add a new hidden unit
            self.X_train = self.add_hidden_unit(self.X_train, self.y_train, predict)
            iteration += 1

        # use the test set to evaluate the model
        return self.test_model(net)

    def augment_input(self, x, v):
        '''
        Return a copy of x with v appended as a new last column.
        '''
        new_x = np.zeros((x.shape[0], x.shape[1] + 1))
        new_x[:, :-1] = x
        new_x[:, -1] = v

        return new_x

    def add_hidden_unit(self, X, Y, predict):
        '''
        Train a new unit and append its output to every data split.

        X, Y and predict are the training inputs, labels and current outputs.
        The validation and test inputs stored on the instance are extended in
        place; the extended training inputs are returned.

        Expects self.weights in the (output_size, input_size) layout returned
        by backprop(), i.e. train() has already run one round.
        '''
        # NOTE(review): the unit is the plain Linear layer built in backprop(),
        # so the new column is a linear function of the existing inputs.
        (_, neuron_value), net = train_hidden(X, Y, predict)
        new_X = self.augment_input(X, neuron_value.reshape(-1))

        # Evaluate the same unit on the validation and test inputs.
        with torch.no_grad():
            val_neuron_value = net(torch.tensor(self.X_val).float()).numpy()
            test_neuron_value = net(torch.tensor(self.X_test).float()).numpy()
        self.X_val = self.augment_input(self.X_val, val_neuron_value.reshape(-1))
        self.X_test = self.augment_input(self.X_test, test_neuron_value.reshape(-1))

        self.input_size += 1

        # Carry the existing output weights over and leave a random row for
        # the connections from the new unit.
        new_weights = self.init_weights()
        new_weights[:-1, :] = self.weights.T
        self.weights = new_weights
        self.no_hidden_units += 1
        print("number of hidden units:   ", self.no_hidden_units)

        return new_X

def add_bias(X):
    '''
    Return X with a constant column of ones appended as the last column.
    '''
    n_rows, n_cols = X.shape[0], X.shape[1]
    with_bias = np.empty((n_rows, n_cols + 1))
    with_bias[:, -1] = 1.0   # bias column
    with_bias[:, :-1] = X    # original features
    return with_bias



def cascor_train(data_train_X, data_train_Y, data_val_X, data_val_Y, data_test_X, data_test_Y):
    '''
    Add bias to the input, then start training.

    Each argument is a pandas object: the *_X arguments hold the features and
    the *_Y arguments the zero-based integer class labels of the training,
    validation and test splits.

    Returns whatever CasCorNet.train() returns.
    '''
    data_train_X = add_bias(data_train_X.to_numpy())
    data_train_Y = data_train_Y.to_numpy()
    data_val_X = add_bias(data_val_X.to_numpy())
    data_val_Y = data_val_Y.to_numpy()
    data_test_X = add_bias(data_test_X.to_numpy())
    data_test_Y = data_test_Y.to_numpy()

    # Size the output layer from the labels of every split: a class that is
    # absent from the random training split would otherwise make the layer
    # too small for the validation or test targets.
    all_labels = np.concatenate([data_train_Y, data_val_Y, data_test_Y])
    n_classes = int(all_labels.max()) + 1

    net = CasCorNet(data_train_X.shape[1], n_classes)
    net.set_data(data_train_X, data_train_Y, data_val_X, data_val_Y, data_test_X, data_test_Y)
    return net.train()

def cascor_test_train_split(data):
    '''
    Randomly split data into train/validation/test sets and train on them.

    About 5% of the rows are held out for testing and about 10% of the rest
    for validation. Column 0 is the label (shifted down by one); the other
    columns are the features.
    '''
    def features_and_labels(frame):
        return frame.iloc[:, 1:], frame.iloc[:, 0] - 1

    # first draw: test hold-out
    keep = np.random.rand(len(data)) < 0.95
    data_test = data[~keep]
    remaining = data[keep]

    # second draw: validation hold-out from what is left
    keep = np.random.rand(len(remaining)) < 0.9
    data_valid = remaining[~keep]
    data_train = remaining[keep]

    train_X, train_Y = features_and_labels(data_train)
    valid_X, valid_Y = features_and_labels(data_valid)
    test_X, test_Y = features_and_labels(data_test)

    return cascor_train(train_X, train_Y, valid_X, valid_Y, test_X, test_Y)
    
def cascor_starter(data_LPQ, data_PHOG):
    '''
    Train LPQ and PHOG separately and display the results.

    Each data set is split and trained by cascor_test_train_split(); the
    resulting test loss and accuracy are printed.
    '''
    LPQ_loss, LPQ_acc = cascor_test_train_split(data_LPQ)
    # Bind the accuracy to PHOG_acc: the print below reads that name.
    PHOG_loss, PHOG_acc = cascor_test_train_split(data_PHOG)

    print("LPQ loss acc: ", LPQ_loss, LPQ_acc)
    print("PHOG loss acc: ", PHOG_loss, PHOG_acc)

def cascor_combined_starter(data):
    '''
    Split and train on the combined data set, then print the test loss and
    accuracy.
    '''
    loss, acc = cascor_test_train_split(data)
    for label, value in (("Combined loss: ", loss), ("Combined acc: ", acc)):
        print(label, value)



# Alternative run: train the LPQ and PHOG feature sets separately.
# cascor_starter(data_LPQ, data_PHOG)
# Default run: split, train and evaluate on the combined data set.
cascor_combined_starter(data)

####################### End of the CasCor model ###############################



  warn(msg)


training loss: 1.93
training accuracy: 20.00
validation loss: 1.94
validation accuracy: 11.59%





number of hidden units:    1
training loss: 1.93
training accuracy: 20.71
validation loss: 1.94
validation accuracy: 10.14%

number of hidden units:    2
training loss: 1.93
training accuracy: 20.88
validation loss: 1.94
validation accuracy: 11.59%

number of hidden units:    3
training loss: 1.93
training accuracy: 20.18
validation loss: 1.94
validation accuracy: 13.04%

number of hidden units:    4
training loss: 1.93
training accuracy: 20.88
validation loss: 1.94
validation accuracy: 10.14%

number of hidden units:    5
training loss: 1.93
training accuracy: 21.95
validation loss: 1.94
validation accuracy: 13.04%

number of hidden units:    6
training loss: 1.93
training accuracy: 20.18
validation loss: 1.94
validation accuracy: 8.70%

number of hidden units:    7
training loss: 1.93
training accuracy: 20.71
validation loss: 1.94
validation accuracy: 11.59%

number of hidden units:    8
training loss: 1.93
training accuracy: 21.24
validation loss: 1.94
validation accuracy: 14.49%

n