In [None]:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

In [None]:
# Seed NumPy's global RNG once so every later np.random call in the notebook
# repeats across Restart & Run All.
import numpy as np

SEED = 1337
np.random.seed(SEED)

In [None]:
# Load the FashionMNIST dataset
from torchvision import datasets, transforms

# Keep the transform under its own name so the imported `transforms` module
# is not shadowed by an instance.
to_tensor = transforms.ToTensor()

# Load the training data (downloaded into ./data when not already present)
train_dataset = datasets.FashionMNIST('./data', download=True, train=True, transform=to_tensor)

# Load the test data
test_dataset = datasets.FashionMNIST('./data', download=True, train=False, transform=to_tensor)

In [None]:
# visualize the data
import matplotlib.pyplot as plt


# Show the first ten training images in a 2 x 5 grid, titled with their class names.
fig, axes = plt.subplots(nrows=2, ncols=5, sharex=True, sharey=True, figsize=(20, 5))
for idx, ax in enumerate(axes.flat):
    # axes.flat walks the grid row by row, matching (idx // 5, idx % 5)
    ax.imshow(train_dataset.data[idx], cmap='gray')
    label = train_dataset.targets[idx]
    ax.set_title(train_dataset.classes[label])
plt.tight_layout()
plt.show()

In [None]:
# we need to calculate accuracy, precision, recall, and F1 score
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
class Layer:
    """Abstract base for everything placed in the network's layer list.

    Concrete layers must override both ``forward`` and ``backward``.
    """

    def __init__(self):
        # Slots for the most recent forward-pass input and output.
        self.input, self.output = None, None

    def forward(self, input):
        """Compute the layer output for ``input``; must be overridden."""
        raise NotImplementedError

    def backward(self, output_grad, learning_rate):
        """Map the gradient w.r.t. the output to the gradient w.r.t. the input; must be overridden."""
        raise NotImplementedError
    

class Activation(Layer):
    """Base class for activation layers; concrete activations override both passes."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Apply the activation to ``input``; must be overridden."""
        raise NotImplementedError

    def backward(self, output_grad, learning_rate):
        """Back-propagate ``output_grad`` through the activation; must be overridden."""
        raise NotImplementedError
    

class Regularization(Layer):
    """Base class for regularisation layers, which expose no trainable parameters by default."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Apply the regulariser to ``input``; must be overridden."""
        raise NotImplementedError

    def backward(self, output_grad, learning_rate):
        """Back-propagate ``output_grad`` through the regulariser; must be overridden."""
        raise NotImplementedError

    def parameters(self):
        """Return a fresh empty list: there is nothing to train here."""
        return list()
    

class Optimizer:
    """Abstract base for parameter-update rules."""

    def __init__(self):
        """Nothing to set up at this level."""

    def step(self):
        """Apply one parameter update; must be overridden."""
        raise NotImplementedError
    

class Normalization(Layer):
    """Base class for normalisation layers; concrete layers override both passes."""

    def __init__(self):
        super().__init__()

    def forward(self, input):
        """Normalise ``input``; must be overridden."""
        raise NotImplementedError

    def backward(self, output_grad, learning_rate):
        """Back-propagate ``output_grad`` through the normalisation; must be overridden."""
        raise NotImplementedError
    

class Loss:
    """Abstract base for loss functions."""

    def __init__(self):
        """Nothing to set up at this level."""

    def forward(self, input, target):
        """Return the loss of ``input`` against ``target``; must be overridden."""
        raise NotImplementedError

    def backward(self):
        """Return the gradient that starts back-propagation; must be overridden."""
        raise NotImplementedError

# Dense Layer

In [None]:
class Dense(Layer):
    """Fully connected layer computing ``np.dot(X, weight) + bias``.

    Shapes: ``weight`` is (input_dim, output_dim) and ``bias`` is (1, output_dim).
    ``backward`` only stores the gradients on the instance; applying them is
    left to ``update_params`` or to an external optimizer.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()

        # Xavier-style init: standard-normal draws scaled by sqrt(1 / input_dim).
        scale = np.sqrt(1 / input_dim)
        self.weight = np.random.randn(input_dim, output_dim) * scale
        self.bias = np.zeros((1, output_dim))

        # Gradient buffers mirror the shapes of their parameters.
        self.weight_grad = np.zeros_like(self.weight)
        self.bias_grad = np.zeros_like(self.bias)

    def forward(self, X):
        """Map X of shape (batch_size, input_dim) to (batch_size, output_dim)."""
        # Keep the input around: backward needs it for the weight gradient.
        self.input = X
        self.output = np.dot(X, self.weight) + self.bias
        return self.output

    def update_params(self, lr):
        """Take one plain gradient-descent step of size ``lr`` using the stored gradients."""
        for param, grad in ((self.weight, self.weight_grad), (self.bias, self.bias_grad)):
            param -= lr * grad

    def backward(self, grad_in, lr):
        """Store parameter gradients and return the gradient w.r.t. the layer input.

        grad_in has shape (batch_size, output_dim); the returned array has
        shape (batch_size, input_dim). ``lr`` is accepted for interface
        compatibility but not used here.
        """
        # (input_dim, batch_size) x (batch_size, output_dim) -> (input_dim, output_dim)
        self.weight_grad = np.dot(self.input.T, grad_in)

        # Collapse the batch axis, keeping the (1, output_dim) shape of the bias.
        self.bias_grad = grad_in.sum(axis=0, keepdims=True)

        # (batch_size, output_dim) x (output_dim, input_dim) -> (batch_size, input_dim)
        return np.dot(grad_in, self.weight.T)

# ReLU Activation

In [None]:
class ReLU(Activation):
    """Rectified linear unit: passes positive values through and zeroes the rest."""

    def __init__(self):
        super().__init__()

    def forward(self, X):
        """Return the element-wise maximum of 0 and X, remembering X for backward."""
        self.input = X
        return np.maximum(0, X)

    def backward(self, grad_in, lr):
        """Let the gradient through only where the stored input was strictly positive."""
        passes = self.input > 0
        return grad_in * passes

# SoftMax Activation

In [None]:
class SoftMax(Activation):
    """Softmax over the last axis of the input.

    For the (batch_size, num_classes) arrays used in this notebook the last
    axis is the class axis; 1-D inputs are accepted as well.
    """

    def __init__(self):
        # The base initialiser already sets self.input / self.output to None.
        super(SoftMax, self).__init__()

    def forward(self, X):
        """Return the softmax of X computed along its last axis."""
        self.input = X

        # Subtract the per-row maximum so np.exp cannot overflow; softmax is
        # unchanged by a constant shift along the normalised axis.
        shifted = X - np.max(X, axis=-1, keepdims=True)
        exps = np.exp(shifted)

        # The small constant in the denominator is kept so outputs stay
        # numerically identical to previously trained/saved models.
        self.output = exps / (np.sum(exps, axis=-1, keepdims=True) + 1e-10)
        return self.output

    def backward(self, grad_in, lr):
        """Return ``grad_in`` unchanged.

        No softmax Jacobian is applied here. This is only correct when the
        incoming gradient is already taken with respect to the softmax input,
        as CategoricalCrossEntropyLoss.backward provides (its gradient has the
        fused ``pred - labels`` form).
        """
        return grad_in
        


# Dropout Regularization

In [None]:
class Dropout(Regularization):
    """Inverted dropout.

    While ``self.train`` is True each activation is zeroed with probability
    ``dropout_rate`` and the survivors are scaled by ``1 / (1 - dropout_rate)``.
    While it is False the layer is the identity.
    """

    def __init__(self, dropout_rate):
        """
        Args:
            dropout_rate: probability of dropping an activation, in [0, 1).

        Raises:
            ValueError: if ``dropout_rate`` is outside [0, 1); a rate of 1
                would divide by zero when rescaling the survivors.
        """
        super(Dropout, self).__init__()
        if not 0 <= dropout_rate < 1:
            raise ValueError(f"dropout_rate must be in [0, 1), got {dropout_rate}")
        self.dropout_rate = dropout_rate
        self.mask = None
        self.train = True

    def forward(self, X):
        """Apply a fresh random mask in training mode; return X untouched otherwise."""
        if not self.train:
            # Forget any mask from an earlier training pass so that backward
            # always matches the most recent forward.
            self.mask = None
            return X

        keep_prob = 1 - self.dropout_rate
        self.mask = np.random.binomial(1, keep_prob, size=X.shape) / keep_prob
        return X * self.mask

    def backward(self, grad_in, lr):
        """Route the gradient through the mask used by the last forward pass.

        When the last forward pass was the identity (evaluation mode, or no
        forward pass yet) there is no mask and the gradient passes unchanged.
        """
        if self.mask is None:
            return grad_in
        return grad_in * self.mask

# Batch Normalization

In [None]:
class Batchnorm(Normalization):
    """Batch normalisation over the batch axis with a learnable scale and shift.

    ``weight`` plays the role of gamma and ``bias`` of beta; both have shape
    (1, input_dim).

    NOTE(review): mean and variance are always taken from the current batch.
    No running statistics are kept, so outputs at inference time depend on
    which samples share a batch.
    """

    def __init__(self, input_dim):
        # Run the base initialiser like every other layer in this file
        # (it sets self.input and self.output to None).
        super(Batchnorm, self).__init__()
        self.input_dim = input_dim

        # gamma, shape (1, input_dim): standard-normal draws scaled by sqrt(1 / input_dim)
        self.weight = np.random.randn(1, input_dim) * np.sqrt(1 / input_dim)

        # beta, shape (1, input_dim)
        self.bias = np.zeros((1, input_dim))

        # Gradient buffers mirror the shapes of gamma and beta.
        self.weight_grad = np.zeros_like(self.weight)
        self.bias_grad = np.zeros_like(self.bias)

        # Forward-pass cache consumed by backward.
        self.normalized_input = None
        self.miu = None
        self.variance = None

        # Added to the variance before the square root to avoid division by zero.
        self.epsilon = 1e-7

    def forward(self, X):
        """Normalise X of shape (batch_size, input_dim) per feature, then scale and shift."""
        self.input = X

        # Per-feature batch statistics, shape (1, input_dim)
        self.miu = np.mean(self.input, axis=0, keepdims=True)
        self.variance = np.var(self.input, axis=0, keepdims=True)

        # Zero-mean / unit-variance activations, shape (batch_size, input_dim)
        self.normalized_input = (self.input - self.miu) / np.sqrt(self.variance + self.epsilon)

        # gamma * x_hat + beta, shape (batch_size, input_dim)
        self.output = self.weight * self.normalized_input + self.bias

        return self.output

    def backward(self, grad_in, lr):
        """Store the gamma/beta gradients and return the gradient w.r.t. the layer input.

        grad_in and the returned array both have shape (batch_size, input_dim).
        ``lr`` is accepted for interface compatibility but not used here.
        """
        batch_size = self.input.shape[0]

        # Quantities the chain rule reuses several times; computed once.
        centered = self.input - self.miu
        std = np.sqrt(self.variance + self.epsilon)

        # beta gradient, shape (1, input_dim)
        dbias = np.sum(grad_in, axis=0, keepdims=True)

        # gamma gradient, shape (1, input_dim)
        dweights = np.sum(grad_in * self.normalized_input, axis=0, keepdims=True)

        # gradient w.r.t. the normalised activations, shape (batch_size, input_dim)
        dnormalized_input = grad_in * self.weight

        # gradient w.r.t. the batch variance, shape (1, input_dim)
        dvariance = np.sum(
            dnormalized_input * centered * -0.5 * np.power(self.variance + self.epsilon, -1.5),
            axis=0,
            keepdims=True,
        )

        # gradient w.r.t. the batch mean, shape (1, input_dim)
        dmiu = (
            np.sum(dnormalized_input * -1 / std, axis=0, keepdims=True)
            + dvariance * np.mean(-2 * centered, axis=0, keepdims=True)
        )

        # Sum of the three paths from the input to the output:
        # directly, through the variance, and through the mean.
        grad_out = (
            dnormalized_input / std
            + dvariance * 2 * centered / batch_size
            + dmiu / batch_size
        )

        self.weight_grad = dweights
        self.bias_grad = dbias

        return grad_out


# Adam Optimizer

In [None]:
class Adam(Optimizer):
    """Adam optimizer over every layer that exposes ``weight`` and/or ``bias``.

    Gradients are read from each layer's ``weight_grad`` / ``bias_grad``
    attributes, and the parameters are updated in place.
    """

    def __init__(self, layers, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        """
        Args:
            layers: list of layers to optimise; layers without ``weight`` /
                ``bias`` attributes are skipped.
            lr: step size.
            beta1: decay rate of the first-moment (mean) estimate.
            beta2: decay rate of the second-moment (uncentred variance) estimate.
            epsilon: constant added to the denominator of the update.
        """
        super(Adam, self).__init__()
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.layers = layers

        # One slot per layer; a slot stays None when the layer lacks that parameter.
        self.m_t_weight = [None] * len(self.layers)
        self.v_t_weight = [None] * len(self.layers)
        self.m_t_bias = [None] * len(self.layers)
        self.v_t_bias = [None] * len(self.layers)

        for i, layer in enumerate(self.layers):
            if hasattr(layer, 'weight'):
                self.m_t_weight[i] = np.zeros_like(layer.weight)
                self.v_t_weight[i] = np.zeros_like(layer.weight)
            if hasattr(layer, 'bias'):
                self.m_t_bias[i] = np.zeros_like(layer.bias)
                self.v_t_bias[i] = np.zeros_like(layer.bias)

        # Number of steps taken so far; drives the bias correction.
        self.t = 0

    def _update(self, param, grad, m_t, v_t):
        """Apply one Adam update to ``param`` in place and return the new (m_t, v_t)."""
        m_t = self.beta1 * m_t + (1 - self.beta1) * grad
        v_t = self.beta2 * v_t + (1 - self.beta2) * grad ** 2

        # Bias-corrected moment estimates.
        m_hat = m_t / (1 - self.beta1 ** self.t)
        v_hat = v_t / (1 - self.beta2 ** self.t)

        param -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon)
        return m_t, v_t

    def step(self):
        """Advance the step counter and update every weight and bias once."""
        self.t += 1
        for i, layer in enumerate(self.layers):
            if hasattr(layer, 'weight'):
                self.m_t_weight[i], self.v_t_weight[i] = self._update(
                    layer.weight, layer.weight_grad, self.m_t_weight[i], self.v_t_weight[i]
                )
            if hasattr(layer, 'bias'):
                self.m_t_bias[i], self.v_t_bias[i] = self._update(
                    layer.bias, layer.bias_grad, self.m_t_bias[i], self.v_t_bias[i]
                )

    def save_state_dict(self, state_dict):
        """Write the optimizer state into ``state_dict`` (mutated in place) and return it."""
        state_dict['m_t_weight'] = self.m_t_weight
        state_dict['v_t_weight'] = self.v_t_weight
        state_dict['m_t_bias'] = self.m_t_bias
        state_dict['v_t_bias'] = self.v_t_bias
        state_dict['t'] = self.t
        state_dict['lr'] = self.lr
        state_dict['beta1'] = self.beta1
        state_dict['beta2'] = self.beta2
        state_dict['epsilon'] = self.epsilon

        return state_dict

    def load_state_dict(self, state_dict):
        """Restore the state written by ``save_state_dict`` and return ``self``.

        Raises:
            KeyError: if ``state_dict`` lacks any of the optimizer entries.
        """
        self.m_t_weight = state_dict['m_t_weight']
        self.v_t_weight = state_dict['v_t_weight']
        self.m_t_bias = state_dict['m_t_bias']
        self.v_t_bias = state_dict['v_t_bias']
        self.t = state_dict['t']
        self.lr = state_dict['lr']
        self.beta1 = state_dict['beta1']
        self.beta2 = state_dict['beta2']
        self.epsilon = state_dict['epsilon']

        return self
            

# Cross Entropy Loss

In [None]:
class CategoricalCrossEntropyLoss(Loss):
    """Mean categorical cross-entropy between predicted probabilities and integer labels."""

    def __init__(self):
        super(CategoricalCrossEntropyLoss, self).__init__()
        self.loss = None
        self.labels = None   # one-hot labels of the last forward call
        self.pred = None     # predictions of the last forward call

    def forward(self, pred, labels):
        """Return the cross-entropy averaged over the batch.

        Args:
            pred: probabilities of shape (batch_size, num_classes).
            labels: integer class indices, one per sample.
        """
        # One-hot encode by picking rows of the identity matrix.
        self.labels = np.eye(pred.shape[1])[np.asarray(labels)]
        self.pred = pred

        # The small constant keeps log() finite when a probability is exactly 0.
        self.loss = -np.sum(self.labels * np.log(pred + 1e-10)) / pred.shape[0]
        return self.loss

    def backward(self):
        """Return the gradient of the mean loss w.r.t. the softmax input (logits).

        ``pred - labels`` is the fused softmax + cross-entropy gradient, which
        is why SoftMax.backward passes gradients through unchanged. Dividing by
        the batch size matches the averaging done in ``forward``, so this is
        the gradient of the value ``forward`` actually reports.
        """
        return (self.pred - self.labels) / self.pred.shape[0]

In [None]:
import pickle

In [None]:
class MyNeuralNetwork:
    """Sequential network: an ordered list of layers plus a loss and an optimizer."""

    def __init__(self, layers, loss_fn, optimizer):
        self.layers = layers
        self.loss_fn = loss_fn
        self.optimizer = optimizer

    def forward(self, X):
        """Feed X through every layer in order and return the final output."""
        for layer in self.layers:
            X = layer.forward(X)
        return X

    def backward(self, grad_in, lr):
        """Back-propagate ``grad_in`` from the last layer to the first."""
        for layer in reversed(self.layers):
            grad_in = layer.backward(grad_in, lr)

    def train(self, X, labels, lr):
        """Run one optimisation step on a batch and return its loss.

        X is flattened to (batch_size, -1) before the forward pass.
        """
        X = X.reshape(X.shape[0], -1)

        output = self.forward(X)
        loss = self.loss_fn.forward(output, labels)

        # Backward pass: each layer stores its own gradients ...
        grad = self.loss_fn.backward()
        self.backward(grad, lr)

        # ... which the optimizer then applies.
        self.optimizer.step()

        return loss

    def predict(self, X):
        """Return the network output for X (flattened to (batch_size, -1)); no update is made."""
        X = X.reshape(X.shape[0], -1)
        return self.forward(X)

    def _set_dropout_training(self, flag):
        """Set the ``train`` flag of every Dropout layer to ``flag``."""
        for layer in self.layers:
            if isinstance(layer, Dropout):
                layer.train = flag

    def eval_mode(self):
        """Disable dropout for evaluation."""
        self._set_dropout_training(False)

    def train_mode(self):
        """Enable dropout for training."""
        self._set_dropout_training(True)

    def save_model(self, path):
        """Pickle the layer parameters and the optimizer state to ``path``.

        Missing parent directories of ``path`` are created first, so a long
        training run cannot fail at the very end because the target folder
        does not exist.
        """
        import os

        state_dict = {}
        for i, layer in enumerate(self.layers):
            if hasattr(layer, 'weight'):
                state_dict[f'weight_{i}'] = layer.weight
            if hasattr(layer, 'bias'):
                state_dict[f'bias_{i}'] = layer.bias

        # The optimizer adds its own entries to the same dictionary.
        state_dict = self.optimizer.save_state_dict(state_dict)

        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with open(path, 'wb') as f:
            pickle.dump(state_dict, f)

    @staticmethod
    def load_model(path, layers, loss_fn, optimizer):
        """Build a network from ``layers`` and restore the parameters saved at ``path``.

        ``layers`` must have the same architecture, in the same order, as the
        network that was saved, because parameters are matched by position.

        SECURITY: ``pickle.load`` can execute arbitrary code; only load files
        that this notebook (or another trusted source) produced.
        """
        with open(path, 'rb') as f:
            state_dict = pickle.load(f)

        model = MyNeuralNetwork(layers, loss_fn, optimizer)
        for i, layer in enumerate(model.layers):
            if hasattr(layer, 'weight'):
                layer.weight = state_dict[f'weight_{i}']
            if hasattr(layer, 'bias'):
                layer.bias = state_dict[f'bias_{i}']

        # Restore the optimizer state and point it at the restored layers.
        model.optimizer = model.optimizer.load_state_dict(state_dict)
        model.optimizer.layers = model.layers

        return model
        
        
        

       

In [None]:
# import tqdm
from tqdm import tqdm

In [None]:
def preprocess_data(train_dataset, test_dataset):
    """Turn the two datasets into NumPy arrays ready for training.

    The training set is split 80/20 into train and validation parts, every
    image array is divided by 255.0, and each of the three splits is shuffled.

    Returns:
        (train_data, val_data, test_data, train_labels, val_labels, test_labels)
    """

    def _shuffled(data, labels):
        # Apply one shared random permutation so images and labels stay aligned.
        order = np.random.permutation(len(data))
        return data[order], labels[order]

    # Hold out 20% of the training set for validation.
    train_data, val_data, train_labels, val_labels = train_test_split(
        train_dataset.data.numpy(),
        train_dataset.targets.numpy(),
        test_size=0.2,
    )
    test_data, test_labels = test_dataset.data.numpy(), test_dataset.targets.numpy()

    # Scale the pixel values.
    train_data, val_data, test_data = (
        split / 255.0 for split in (train_data, val_data, test_data)
    )

    # Shuffle train, validation and test, in that order.
    train_data, train_labels = _shuffled(train_data, train_labels)
    val_data, val_labels = _shuffled(val_data, val_labels)
    test_data, test_labels = _shuffled(test_data, test_labels)

    return train_data, val_data, test_data, train_labels, val_labels, test_labels

    

In [None]:
def _evaluate(model, data, labels, batch_size, class_ids):
    """Score ``model`` on (data, labels) without changing its parameters.

    Every sample is used, including a final batch smaller than ``batch_size``.

    Returns:
        (mean loss, accuracy, macro F1, confusion matrix as a float array)

    Raises:
        ValueError: if ``data`` is empty.
    """
    if len(data) == 0:
        raise ValueError("cannot evaluate on an empty dataset")

    loss_sum = 0.0
    batch_preds = []
    for start in range(0, len(data), batch_size):
        X = data[start:start + batch_size]
        y = labels[start:start + batch_size]
        probs = model.predict(X)  # forward pass only: no backward, no optimizer step
        # The loss is a per-batch mean; weight it by the batch length so a
        # short last batch does not skew the overall average.
        loss_sum += model.loss_fn.forward(probs, y) * len(y)
        batch_preds.append(np.argmax(probs, axis=1))

    preds = np.concatenate(batch_preds)
    mean_loss = loss_sum / len(data)
    accuracy = np.mean(preds == labels)
    # Metrics over the whole set at once: a single batch may miss some
    # classes, which distorts a per-batch macro average.
    macro_f1 = f1_score(labels, preds, average='macro')
    matrix = confusion_matrix(labels, preds, labels=class_ids).astype(float)
    return mean_loss, accuracy, macro_f1, matrix


def trainer(layers ,train_data, train_labels, val_data, val_labels ,learning_rate, nepochs, batch_size,
            model_path='model/model.pkl'):
    """Train ``layers`` with Adam and cross-entropy, validating after every epoch.

    Args:
        layers: list of layer objects forming the network (updated in place).
        train_data, train_labels: training samples and integer labels.
        val_data, val_labels: validation samples and integer labels; used for
            metrics only, never for parameter updates.
        learning_rate: initial Adam step size; halved whenever the validation
            loss fails to improve on the previous epoch.
        nepochs: maximum number of epochs.
        batch_size: mini-batch size. Training drops a final partial batch;
            validation uses every sample.
        model_path: where the trained model is pickled; missing parent
            directories are created.

    Returns:
        (train_loss, val_loss, train_acc, val_acc, val_macro_f1, conf_matrix),
        where the first five are per-epoch lists and ``conf_matrix`` is the
        validation confusion matrix of the last completed epoch (all zeros if
        no epoch ran).

    Raises:
        ValueError: if the training set is smaller than ``batch_size`` or the
            validation set is empty.
    """
    import os

    class_ids = np.unique(train_labels)
    output_dim = len(class_ids)

    no_of_iterations = len(train_data) // batch_size
    if no_of_iterations == 0:
        raise ValueError("batch_size is larger than the training set")

    loss_fn = CategoricalCrossEntropyLoss()
    optimizer = Adam(layers, lr=learning_rate)
    model = MyNeuralNetwork(layers, loss_fn, optimizer)

    train_loss = []
    val_loss = []
    train_acc = []
    val_acc = []
    val_macro_f1 = []
    # Defined up front so the return value exists even when nepochs == 0.
    conf_matrix = np.zeros((output_dim, output_dim))

    for epoch in tqdm(range(nepochs)):
        # ---- training ----
        model.train_mode()
        epoch_loss = 0
        correct = 0
        total = 0

        # Reshuffle so every epoch sees different mini-batches.
        idx = np.random.permutation(len(train_data))
        train_data, train_labels = train_data[idx], train_labels[idx]

        for i in range(no_of_iterations):
            X = train_data[i * batch_size: (i + 1) * batch_size]
            labels = train_labels[i * batch_size: (i + 1) * batch_size]
            epoch_loss += model.train(X, labels, learning_rate)
            # The loss object keeps the probabilities of the forward pass it
            # just scored, so accuracy needs no second forward pass.
            pred = np.argmax(loss_fn.pred, axis=1)
            correct += np.sum(pred == labels)
            total += len(labels)
        train_loss.append(epoch_loss / no_of_iterations)
        train_acc.append(correct / total)

        # ---- validation (forward passes only) ----
        model.eval_mode()
        epoch_val_loss, epoch_val_acc, epoch_val_f1, conf_matrix = _evaluate(
            model, val_data, val_labels, batch_size, class_ids
        )
        val_loss.append(epoch_val_loss)
        val_acc.append(epoch_val_acc)
        val_macro_f1.append(epoch_val_f1)
        print(f"Epoch: {epoch + 1}, Train Loss: {train_loss[-1]:.4f}, Val Loss: {val_loss[-1]:.4f}, Train Acc: {train_acc[-1]:.4f}, Val Acc: {val_acc[-1]:.4f}, Val Macro F1: {val_macro_f1[-1]:.4f}")

        # Learning-rate schedule: halve the step size when the validation
        # loss did not improve on the previous epoch.
        if epoch > 0:
            if val_loss[-1] >= val_loss[-2]:
                learning_rate = learning_rate * 0.5
                optimizer.lr = learning_rate
                print(f"Learning rate reduced to {learning_rate}")

        # Early stopping: after the first 11 epochs, stop once the validation
        # loss has failed to improve three epochs in a row.
        if epoch > 10:
            if val_loss[-1] >= val_loss[-2] and val_loss[-2] >= val_loss[-3] and val_loss[-3] >= val_loss[-4]:
                print("Early stopping")
                break

    # Save the model, creating the target folder if needed so the run does
    # not fail after training has finished.
    model_dir = os.path.dirname(model_path)
    if model_dir:
        os.makedirs(model_dir, exist_ok=True)
    model.save_model(model_path)
    return train_loss, val_loss, train_acc, val_acc, val_macro_f1, conf_matrix


In [None]:
def tester(layers, path, test_data, test_labels, batch_size):
    """Load the model saved at ``path`` into ``layers`` and score it on the test set.

    Every test sample is evaluated, including a final batch smaller than
    ``batch_size``.

    Args:
        layers: freshly built layers with the same architecture as the saved model.
        path: pickle file written by ``MyNeuralNetwork.save_model``.
        test_data, test_labels: test samples and their integer labels.
        batch_size: number of samples per forward pass.

    Returns:
        (accuracy, macro precision, macro recall, macro F1, confusion matrix)

    Raises:
        ValueError: if ``test_data`` is empty.
    """
    if len(test_data) == 0:
        raise ValueError("cannot test on an empty dataset")

    # The optimizer is only needed to satisfy load_model; its hyper-parameters
    # (including lr) are overwritten by the saved optimizer state.
    model = MyNeuralNetwork.load_model(path, layers, CategoricalCrossEntropyLoss(), Adam(layers, lr=0.001))

    model.eval_mode()

    predictions = []
    targets = []

    # Step through the data by start offset so the trailing partial batch is
    # evaluated too instead of being silently skipped.
    for start in range(0, len(test_data), batch_size):
        X = test_data[start:start + batch_size]
        labels = test_labels[start:start + batch_size]
        pred = np.argmax(model.predict(X), axis=1)
        predictions.extend(pred.tolist())
        targets.extend(labels.tolist())

    accuracy = np.mean(np.asarray(predictions) == np.asarray(targets))
    precision = precision_score(targets, predictions, average='macro')
    recall = recall_score(targets, predictions, average='macro')
    f1 = f1_score(targets, predictions, average='macro')
    conf_matrix = confusion_matrix(targets, predictions)

    print(f'Test Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}')

    return accuracy, precision, recall, f1, conf_matrix

# Preprocess and load the data

In [None]:
# preprocess the data
train_data, val_data, test_data, train_labels, val_labels, test_labels = preprocess_data(train_dataset, test_dataset)

# Number of features after flattening one sample: the product of all
# per-sample dimensions (height * width). Squaring shape[1] would only be
# right for square images.
input_dim = int(np.prod(train_data.shape[1:]))
output_dim = len(np.unique(train_labels))

# Train

In [None]:
# Build the MLP: every hidden stage is Dense -> Batchnorm -> ReLU -> Dropout,
# followed by a Dense + SoftMax classification head.
layers = []
fan_in = input_dim
for width in (512, 256):
    layers.extend([Dense(fan_in, width), Batchnorm(width), ReLU(), Dropout(0.5)])
    fan_in = width
layers.extend([Dense(fan_in, output_dim), SoftMax()])

# train the model
training_options = dict(learning_rate=0.0005, nepochs=60, batch_size=64)
(train_loss, val_loss, train_acc, val_acc, val_macro_f1, val_conf_matrix) = trainer(
    layers=layers,
    train_data=train_data,
    train_labels=train_labels,
    val_data=val_data,
    val_labels=val_labels,
    **training_options,
)

In [None]:
# Plot loss, accuracy and macro F1 per epoch, one panel each.
fig, axes = plt.subplots(1, 3, figsize=(20, 5))
panels = [
    ('Loss', [(train_loss, 'Train Loss'), (val_loss, 'Val Loss')]),
    ('Accuracy', [(train_acc, 'Train Accuracy'), (val_acc, 'Val Accuracy')]),
    ('F1 Score', [(val_macro_f1, 'Val Macro F1')]),
]
for ax, (ylabel, curves) in zip(axes, panels):
    for values, name in curves:
        ax.plot(values, label=name)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(ylabel)
    ax.legend()
plt.show()



In [None]:
# plot the validation confusion matrix
import seaborn as sns
# Validation confusion matrix as an annotated heatmap (rows: targets, columns: predictions).
fig, ax = plt.subplots(figsize=(10, 7))
class_names = train_dataset.classes
sns.heatmap(
    val_conf_matrix,
    annot=True,
    fmt='g',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predictions')
ax.set_ylabel('Targets')
plt.show()


# Load and test the saved model

In [None]:

# Rebuild the training architecture; the random initial parameters are
# replaced by the saved ones when the model is loaded.
layers = []
fan_in = input_dim
for width in (512, 256):
    layers.extend([Dense(fan_in, width), Batchnorm(width), ReLU(), Dropout(0.5)])
    fan_in = width
layers.extend([Dense(fan_in, output_dim), SoftMax()])

# test the model
test_metrics = tester(
    layers=layers,
    path='model/model.pkl',
    test_data=test_data,
    test_labels=test_labels,
    batch_size=64,
)
accuracy, precision, recall, f1, conf_matrix = test_metrics

# plot the confusion matrix
import seaborn as sns
# Test confusion matrix as an annotated heatmap (rows: actual, columns: predicted).
fig, ax = plt.subplots(figsize=(10, 7))
class_names = test_dataset.classes
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='g',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')

plt.show()