# Package Import

In [None]:
import numpy as np
import numpy.linalg as la
from sklearn.preprocessing import Normalizer
import matplotlib.pyplot as plt

# Utility Functions

In [None]:
def unpickle(file):
    """Load and return the object stored in the pickle file at ``file``.

    The file is read in binary mode and unpickled with ``encoding='bytes'``,
    which is why callers in this file index the result with ``bytes`` keys
    such as ``b'data'``.

    NOTE: unpickling can execute arbitrary code; only use on trusted files.
    """
    import pickle

    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')


def merge_batches(
    num_to_load=1,
    data_dir="/home/amir/Documents/Machine Vision/HW#4/cifar-10-python/cifar-10-batches-py",
):
    """Load the first ``num_to_load`` training batches and stack them.

    Batch files are read from ``<data_dir>/data_batch_<k>`` for
    ``k = 1 .. num_to_load`` using ``unpickle``.

    Args:
        num_to_load: Number of consecutive batch files to read (at least 1).
        data_dir: Directory containing the ``data_batch_*`` files. Defaults
            to the location originally hard-coded in this notebook.

    Returns:
        A ``(features, labels)`` tuple: the ``b'data'`` arrays of all batches
        concatenated along axis 0, and the ``b'labels'`` entries concatenated
        into one array.

    Raises:
        ValueError: If ``num_to_load`` is smaller than 1.
    """
    if num_to_load < 1:
        # Previously this fell through to the return statement and failed
        # with an opaque UnboundLocalError.
        raise ValueError("num_to_load must be at least 1, got %r" % (num_to_load,))

    feature_parts = []
    label_parts = []
    for batch_no in range(1, num_to_load + 1):
        batch = unpickle("%s/data_batch_%d" % (data_dir, batch_no))
        feature_parts.append(batch[b'data'])
        label_parts.append(np.array(batch[b'labels']))

    # Concatenate once at the end instead of re-copying the accumulated
    # array on every iteration with np.append.
    features = np.concatenate(feature_parts, axis=0)
    labels = np.concatenate(label_parts, axis=0)
    return features, labels


class Utility:
    """Stateless numeric helpers (activations, losses, batching, encoding).

    Every helper is a classmethod, so it is called as ``Utility.name(...)``
    without creating an instance.
    """

    @classmethod
    def sigmoid(cls, arr):
        """Return the element-wise logistic sigmoid ``1 / (1 + exp(-arr))``."""
        return 1 / (1 + np.exp(-arr))

    @classmethod
    def sigmoid_d(cls, arr):
        """Return the element-wise sigmoid derivative evaluated at ``arr``."""
        # Evaluate the sigmoid once and reuse it: s'(x) = s(x) * (1 - s(x)).
        activated = cls.sigmoid(arr)
        return activated * (1 - activated)

    @classmethod
    def softmax(cls, arr):
        """Return the softmax of ``arr`` taken along its last axis.

        The maximum along the last axis is subtracted before exponentiating;
        this leaves the result unchanged mathematically but keeps ``exp``
        from overflowing for large inputs.
        """
        shifted = arr - np.max(arr, axis=-1, keepdims=True)
        exps = np.exp(shifted)
        return exps / np.sum(exps, axis=-1, keepdims=True)

    @classmethod
    def mean_square_loss(cls, y, y_hat):
        """Return ``||y - y_hat||^2`` divided by the number of rows of ``y``."""
        return (la.norm(y - y_hat) ** 2) / y.shape[0]

    @classmethod
    def get_batch(cls, X, y, idx, batch_size=100):
        """Return the ``idx``-th consecutive slice of ``X`` and ``y``.

        The slice covers rows ``idx * batch_size`` up to (but excluding)
        ``(idx + 1) * batch_size``; normal slicing rules apply, so a slice
        past the end is simply shorter (or empty).
        """
        start = idx * batch_size
        stop = start + batch_size
        return X[start:stop], y[start:stop]

    @classmethod
    def SigmoidCrossEntropyLoss(cls, a, y):
        """Return the averaged binary cross-entropy between ``softmax(a)`` and ``y``.

        ``a`` is first passed through ``softmax``; the element-wise
        cross-entropy terms are summed and divided by ``10 * y.shape[0]``.
        The factor 10 is a hard-coded constant kept from the original code.
        """
        a = cls.softmax(a)
        # nan_to_num maps NaN terms (0 * log(0)) to 0 and infinite terms to
        # large finite values so the sum stays finite.
        terms = np.nan_to_num(-y * np.log(a) - (1 - y) * np.log(1 - a))
        return np.sum(terms) / (10 * y.shape[0])

    @classmethod
    def one_hot_encode(cls, data, size=10):
        """Return a ``(len(data), size)`` float array with a 1 at column ``data[i]`` of row ``i``."""
        one_hot = np.zeros((data.shape[0], size))
        one_hot[np.arange(data.shape[0]), data] = 1
        return one_hot

# Multi-layer Perceptron Network

In [None]:
class Network:
    """Two-layer perceptron (input -> sigmoid hidden -> sigmoid output).

    Weights are drawn from a standard normal distribution and biases start
    at zero. Training uses mini-batch gradient descent on a squared-error
    loss with an exponential moving average of the gradients.
    """

    def __init__(self, x, y, hs=50, os=10):
        """Store the training data and initialise the parameters.

        Args:
            x: Training inputs; ``x.shape[1]`` sets the input width.
            y: Training targets, one row per input row (one-hot rows are
                what ``evaluate`` expects).
            hs: Number of hidden units.
            os: Number of output units.
        """
        self.hidden_size = hs
        self.output_size = os
        self.input = x
        self.label = y
        # Placeholders kept from the original code; no method of this class
        # reads them.
        self.h = np.zeros(hs)
        self.output = np.zeros(self.output_size)
        # Input -> hidden weights and bias.
        self.W = np.random.randn(self.input.shape[1], self.hidden_size)
        self.W_b = np.zeros((1, self.hidden_size))
        # Hidden -> output weights and bias.
        self.U = np.random.randn(self.hidden_size, self.output_size)
        self.U_b = np.zeros((1, self.output_size))

    def feed_forward(self, x):
        """Run a forward pass.

        Returns:
            ``(l1, a1, l2, a2)``: hidden pre-activation, hidden activation,
            output pre-activation and output activation.
        """
        l1 = np.dot(x, self.W) + self.W_b
        a1 = Utility.sigmoid(l1)
        l2 = np.dot(a1, self.U) + self.U_b
        a2 = Utility.sigmoid(l2)

        return l1, a1, l2, a2

    def backpropagate(self, x, y, batch_size):
        """Compute the loss and parameter gradients for one batch.

        Args:
            x: Batch inputs.
            y: Batch targets with the same shape as the network output.
            batch_size: Kept for backward compatibility; the bias gradients
                are summed over the rows actually present in ``x``.

        Returns:
            ``(loss, du, dbu, dw, dbw)``: the mean squared-error loss and the
            gradients for ``U``, ``U_b``, ``W`` and ``W_b``.
        """
        r, h, z, o = self.feed_forward(x)
        # loss = Utility.SigmoidCrossEntropyLoss(o, y)
        loss = Utility.mean_square_loss(o, y)
        # The constant 2/n factor of the squared-error derivative is left
        # out, so it is effectively absorbed into the learning rate.
        loss_deriv = o - y

        # Use the pre-activations returned by feed_forward (which include
        # the biases) so the sigmoid derivative is evaluated at the same
        # point as the forward pass.
        delta_out = loss_deriv * Utility.sigmoid_d(z)
        delta_hidden = np.dot(delta_out, self.U.T) * Utility.sigmoid_d(r)

        du = np.dot(h.T, delta_out)
        dbu = np.sum(delta_out, axis=0, keepdims=True)
        dw = np.dot(x.T, delta_hidden)
        dbw = np.sum(delta_hidden, axis=0, keepdims=True)

        return loss, du, dbu, dw, dbw

    def train(self, gamma=0.9, alpha=0.1, batch_size=100, epoch=50):
        """Train on the stored data and plot loss and accuracy per epoch.

        Each epoch trains on every full batch except the last one, which is
        only used to measure accuracy after the epoch. The smoothed gradients
        restart from zero at the beginning of every epoch and the learning
        rate is multiplied by 0.99 after each epoch.

        Args:
            gamma: Smoothing factor of the gradient moving average.
            alpha: Initial learning rate.
            batch_size: Number of rows per batch.
            epoch: Number of passes over the data.

        Raises:
            ValueError: If the data holds fewer than two full batches, so
                that no batch would be left for training.
        """
        losses = []
        accs = []
        iteration_num = len(self.input) // batch_size
        print("iteration number: ", iteration_num)
        if iteration_num < 2:
            # With fewer than two batches the training loop never runs and
            # the loss would be undefined.
            raise ValueError(
                "need at least two full batches of size %d, got %d"
                % (batch_size, iteration_num)
            )
        for e in range(epoch):
            dw_prev, dbw_prev, du_prev, dbu_prev = 0, 0, 0, 0
            for b in range(iteration_num - 1):
                batch_x, batch_y = Utility.get_batch(self.input, self.label, idx=b, batch_size=batch_size)
                loss, du, dbu, dw, dbw = self.backpropagate(batch_x, batch_y, batch_size)
                # Exponential moving average of each gradient.
                dw_prev = gamma * dw_prev + (1 - gamma) * dw
                dbw_prev = gamma * dbw_prev + (1 - gamma) * dbw
                du_prev = gamma * du_prev + (1 - gamma) * du
                dbu_prev = gamma * dbu_prev + (1 - gamma) * dbu
                self.W -= alpha * dw_prev
                self.W_b -= alpha * dbw_prev
                self.U -= alpha * du_prev
                self.U_b -= alpha * dbu_prev

            # The last batch is never trained on; use it to track accuracy.
            eval_batch_x, eval_batch_y = Utility.get_batch(
                self.input, self.label, idx=(iteration_num - 1), batch_size=batch_size
            )
            accs.append(self.evaluate(eval_batch_x, eval_batch_y))
            # Only the loss of the final training batch of the epoch is kept.
            losses.append(loss)
            alpha *= 0.99
            print("Epoch %d complete\tLoss: %f\n" % (e, loss))

        epochs = list(range(epoch))
        plt.plot(epochs, losses, c='red')
        plt.xlabel('epoch')
        plt.ylabel('loss')
        plt.show()
        plt.plot(epochs, accs, c='green')
        plt.xlabel('epoch')
        plt.ylabel('training accuracy')
        plt.show()

    def evaluate(self, x, y):
        """Print and return the classification accuracy on ``(x, y)`` in percent.

        A row counts as correct when the index of the largest network output
        equals the index of the largest entry of the matching row of ``y``.
        """
        r, h, z, o = self.feed_forward(x)
        predicted = np.argmax(o, axis=1)
        expected = np.argmax(y, axis=1)
        count = int(np.sum(predicted == expected))
        accuracy = float(count) / x.shape[0] * 100
        print("Accuracy: %f" % accuracy)
        return accuracy

# Data Load & Prep

In [None]:
# Load the five training batches and one-hot encode their labels
# (one_hot_encode defaults to 10 columns).
data, label = merge_batches(5)
one_hot_label = Utility.one_hot_encode(label)
# NOTE(review): per the scikit-learn docs, Normalizer rescales each sample
# (row) independently, so using a fresh instance for the test set below
# should not leak statistics between the splits — confirm.
data_prep = Normalizer().fit_transform(data)

# Load the test batch and apply the same preprocessing and label encoding.
test_batch = unpickle('/home/amir/Documents/Machine Vision/HW#4/cifar-10-python/cifar-10-batches-py/test_batch')
test_data = test_batch[b'data']
test_data_prep = Normalizer().fit_transform(test_data)
test_label = np.array(test_batch[b'labels'])
one_hot_test_label = Utility.one_hot_encode(test_label)

# Train Network

In [None]:
# Build a network with 50 hidden units on the preprocessed training data and
# train it with train()'s default hyper-parameters.
network = Network(x=data_prep, y=one_hot_label, hs=50)
network.train()

# Evaluate

In [None]:
# Measure accuracy on the preprocessed test batch; evaluate() prints the
# percentage and also returns it.
network.evaluate(test_data_prep, one_hot_test_label)