In [1]:
import numpy as np
from torchvision import datasets, transforms
from matplotlib import pyplot as plt
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
import seaborn as sn
import pandas as pd
import pickle
np.random.seed(1)

Process data

In [2]:
def preprocess(dataset):
    images = []
    labels = []
    for data in dataset:
        image = np.array(data[0].numpy().squeeze().flatten())
        label = data[1]

        images.append(image)
        labels.append(label)
    
    return images, labels

In [3]:
def encode(Y):
    Y = np.eye(10)[Y]
    return Y

In [4]:
def load_data():
    transform = transforms.ToTensor()

    train_dataset = datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)

    test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)

    tv_images, tv_labels = preprocess(train_dataset)
    X_train, X_val, y_train, y_val = train_test_split(tv_images, tv_labels, train_size=0.8, random_state=42)

    X_test, y_test = preprocess(test_dataset)

    y_train = encode(y_train)
    y_val = encode(y_val)
    y_test = encode(y_test)

    X_train = np.array(X_train)
    X_val = np.array(X_val)
    y_train = np.array(y_train)
    y_val = np.array(y_val)
    X_test = np.array(X_test)
    y_test = np.array(y_test)

    return X_train, X_val, y_train, y_val, X_test, y_test

Optimization

In [5]:
class AdamOptimizer:
    def __init__(self, params, lr=0.005, beta1=0.9, beta2=0.999, eps=1e-6):
        self.params = params
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.m = [np.zeros_like(p) for p in params]
        self.v = [np.zeros_like(p) for p in params]
        self.t = 0
    
    def update(self, grads, lr):
        self.lr = lr
        self.t += 1
        updated_params = []
        for i, (param, grad) in enumerate(zip(self.params, grads)):
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * np.square(grad)
            
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            
            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
            updated_params.append(param)

        return updated_params


Normalization

In [6]:
class BatchNorm:
    def __init__(self, outputs, eps=1e-6):
        self.outputs = outputs
        self.gamma = np.ones((1, outputs))
        self.beta = np.zeros((1, outputs))
        self.eps = eps
        self.optimizer = AdamOptimizer([self.gamma, self.beta])
        self.mean = 0 
        self.variance = 0

    def reset(self):
        self.gamma = np.ones((1, self.outputs))
        self.beta = np.zeros((1, self.outputs))
        self.optimizer = AdamOptimizer([self.gamma, self.beta])
        self.mean = 0 
        self.variance = 0

    def clear(self):
        self.z_norm = None
        self.z_out = None

    def forward(self, z, training=True):
        if training:
            self.mean = np.mean(z, axis=0)
            self.variance = np.var(z, axis=0)
        self.z_norm = (z - self.mean) / np.sqrt(self.variance + self.eps)
        self.z_out = self.gamma * self.z_norm + self.beta
        return self.z_out

    def backward(self, dz, lr):
        dga = np.sum(dz * self.z_norm, axis=0)
        dbe = np.sum(dz, axis=0)
        N = dz.shape[0]
        dxstd = dz * self.gamma
        var_sqrt_i = 1.0/(np.sqrt(self.variance + self.eps))
        dx = (1.0 / N) * var_sqrt_i * (N * dxstd - np.sum(dxstd, axis=0) - self.z_norm * np.sum(dxstd * self.z_norm, axis=0))

        self.gamma, self.beta = self.optimizer.update([dga, dbe], lr)

        return dx

Activation

In [7]:
class Relu:
    def __init__(self):
        self.A = None

    def reset(self):
        self.A = None

    def clear(self):
        self.A = None
        
    def forward(self, z, training):
        self.A = np.maximum(0, z)
        return self.A
    
    def backward(self, dA, lr):
        return np.where(self.A > 0, dA, 0)

Regularization

In [8]:
class Dropout:
    def __init__(self, prob):
        self.prob = prob
        self.mask = None

    def reset(self):
        self.mask = None

    def clear(self):
        self.mask = None

    def forward(self, A, training=True):
        if training:
            self.mask = (np.random.rand(*A.shape) < self.prob).astype(float)
            A_dropped = A * self.mask / self.prob
            return A_dropped
        return A
    
    def backward(self, dA, lr):
        return dA * self.mask
    
    def dropout(self, A):
        mask = np.random.rand(*A.shape) < self.prob
        return A * mask/self.prob, mask

Regression

In [9]:
class Softmax:
    def __init__(self):
        pass

    def reset(self):
        pass

    def clear(self):
        self.A = None

    def forward(self, z, training):
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        self.A = exp_z / np.sum(exp_z, axis=1, keepdims=True)
        return self.A
    
    def backward(self, dz, lr):
        return self.A - dz
    
    def compute_loss(self, A, Y):
        m = Y.shape[0]
        log_ll = -np.log(A[range(m), Y])
        loss = np.sum(log_ll) / m
        return loss

Dense Layer

In [10]:
class DenseLayer:
    def __init__(self, inputs, outputs):
        self.inputs = inputs
        self.outputs = outputs
        
        std = np.sqrt(2 / (inputs + outputs))
        self.biases = np.zeros(outputs)
        self.weights = np.random.normal(0, std, (inputs, outputs))
        #self.weights = np.random.randn(inputs, outputs)*np.sqrt(2.0/inputs)
        self.optimizer = AdamOptimizer([self.weights, self.biases])

    def reset(self):
        std = np.sqrt(2 / (self.inputs + self.outputs))
        self.biases = np.zeros(self.outputs)
        self.weights = np.random.normal(0, std, (self.inputs, self.outputs))
        self.optimizer = AdamOptimizer([self.weights, self.biases])

    def clear(self):
        self.X = None
        

    def forward(self, X, training):
        self.X = X
        z = np.dot(X, self.weights) + self.biases
        return z
    
    def backward(self, dz, lr):
        m = dz.shape[1]
        dw = np.dot(self.X.T, dz) / m
        db = np.mean(dz, axis=0)

        self.weights, self.biases = self.optimizer.update([dw, db], lr)

        return np.dot(dz, self.weights.T)/m

Prediction for testing

In [11]:
def predict(fnn, input):
    output = input
    for layer in fnn:
        output = layer.forward(output, training=False)
    return output

Compute Loss

In [12]:
def compute_loss(y_pred, y_true):
    m = y_true.shape[0]
    eps = 1e-7
    y_pred = np.clip(y_pred, eps, 1. - eps)
    #log_ll = -np.log(y_pred[range(m), y_true])
    loss = -np.sum(y_true * np.log(y_pred))/m
    return loss

Train Model

In [13]:
def train(fnn, X_train, y_train, X_val, y_val, epochs=10, lr=0.005, batch=1000):
    train_loss, validation_loss, train_acc, val_acc, val_f1 = [], [], [], [], []
    best_val_f1 = 0
    best_val_pred = None
    best_model = None
    n = len(X_train)
    for epoch in range(epochs):
        shuffle_indices = np.random.permutation(n)
        X_train = X_train[shuffle_indices]
        y_train = y_train[shuffle_indices]
        acc = 0.0
        correct_train = 0
        correct_val = 0
        total = 0
        training_loss = 0.0
        val_loss = 0.0

        batch_losses = []
        for i in range(0, n, batch):
            X_batch = X_train[i: i+batch]
            y_batch = y_train[i: i+batch]
            output = X_batch
            

            for layer in fnn:
                output = layer.forward(output, training=True)

            loss = compute_loss(output, y_batch)
            training_loss += loss.sum()

            correct_train += np.sum(np.argmax(output, axis=1) == np.argmax(y_batch, axis=1))
            total += len(y_batch)

            dA = y_batch
            for layer in reversed(fnn):
                dA = layer.backward(dA, lr)

        train_loss.append(training_loss)
        train_acc.append(correct_train/total)
        
        output = X_val

        for layer in fnn:
            output = layer.forward(output, training=False)
        
        loss = compute_loss(output, y_val)
        val_loss = loss.sum()
        validation_loss.append(val_loss)

        correct_val += np.sum(np.argmax(output, axis=1) == np.argmax(y_val, axis=1))
        acc = correct_val / len(X_val)
        val_acc.append(acc)

        f1 = f1_score(np.argmax(y_val, axis=1), np.argmax(output, axis=1), average='macro')
        val_f1.append(f1)

        if f1 > best_val_f1:
            best_val_f1 = f1
            best_val_pred = output
            best_model = fnn

            #epoch_loss = np.mean(batch_losses)
        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {training_loss}, Validation Loss: {val_loss}")
        print(f"Training Accuracy: {correct_train / total * 100}, Validation Accuracy: {correct_val / len(y_val) * 100}", f"F1 Score: {f1}")

    plt.plot(train_loss, label='Train Loss')
    plt.plot(validation_loss, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    plt.plot(train_acc, label='Train Accuracy')
    plt.plot(val_acc, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

    plt.plot(val_f1, label='Validation F1 Score (macro)')
    plt.xlabel('Epoch')
    plt.ylabel('F1 Score')
    plt.legend()
    plt.show()
    
    return best_val_f1, best_val_pred, best_model

Find model performances

In [14]:
def fashionmnist():
    X_train, X_val, y_train, y_val, X_test, y_test = load_data()

    fnns = [
        [
            DenseLayer(28*28, 512),
            Relu(),
            DenseLayer(512, 256),
            BatchNorm(256),
            Relu(),
            Dropout(0.8),
            DenseLayer(256, 10),
            Softmax()
        ],
        [
            DenseLayer(28*28, 512),
            BatchNorm(512),
            Relu(),
            Dropout(0.8),
            DenseLayer(512, 256),
            BatchNorm(256),
            Relu(),
            Dropout(0.8),
            DenseLayer(256, 10),
            Softmax()
        ],
        [
            DenseLayer(28*28, 512),
            BatchNorm(512),
            Relu(),
            Dropout(0.8),
            DenseLayer(512, 256),
            BatchNorm(256),
            Relu(),
            Dropout(0.8),
            DenseLayer(256, 128),
            BatchNorm(128),
            Relu(),
            Dropout(0.8),
            DenseLayer(128, 10),
            Softmax()
        ]
    ]

    lrs = [5e-3, 1e-3, 5e-4, 1e-4]
    best_f1 = 0
    best_model = None
    best_lr = 0
    best_model_no = 0

    for lr in lrs:
        print(f'Learning rate: {lr}')
        for i, fnn in enumerate(fnns):
            for layer in fnn:
                layer.reset()
            print(f'Model {i+1}')

            f1, pred, model = train(fnn, X_train, y_train, X_val, y_val, epochs=20, lr=lr)
            print(f'Best f1-score: {f1}')

            if f1 > best_f1:
                best_f1 = f1
                best_model = model
                best_lr = lr
                best_model_no = i+1
                
            cm = confusion_matrix(np.argmax(y_val, axis=1), np.argmax(pred, axis=1))
            labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
                        'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
            disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
            
            disp.plot(cmap=plt.cm.Blues)
            plt.xticks(rotation=90)
            plt.title("Confusion Matrix")
            plt.show()

    print(f'Best f-1: {best_f1}, Best LR: {best_lr}, Best Model no.: {best_model_no}')
    return best_model
  

Choose best model and pickle dump

In [None]:
best_model = fashionmnist()

with open('model_1905014.pkl', 'wb') as f:
    for layer in best_model:
        layer.clear()
    pickle.dump(best_model, f)

Testing Block

In [16]:
with open('model_1905014.pkl', 'rb') as f:
    model = pickle.load(f)

#test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)
with open('a1.pkl', 'rb') as a1:
  test_dataset = pickle.load(a1)
X_test, y_test = preprocess(test_dataset)

y_test = encode(y_test)

X_test = np.array(X_test)
y_test = np.array(y_test)

prediction = predict(model, X_test)
cm = confusion_matrix(np.argmax(y_test, axis=1), np.argmax(prediction, axis=1))

accuracy = np.mean(np.argmax(prediction, axis=1) == np.argmax(y_test, axis=1))
print(f"Test accuracy: " + str(accuracy*100))
print(f"Test f1 score: " + str(f1_score(np.argmax(y_test, axis=1), np.argmax(prediction, axis=1), average='macro')))

# labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
#             'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
# disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)

# disp.plot(cmap=plt.cm.Blues)
# plt.xticks(rotation=90)
# plt.title("Confusion Matrix")
# plt.show()


Test accuracy: 24.14843533647189
Test f1 score: 0.09156366998567561
