In [14]:
import numpy as np
import pickle
import struct
import seaborn as sns
from sklearn.metrics import f1_score, confusion_matrix
import matplotlib.pyplot as plt
import os

In [15]:
class AdamOptimizer:
    """Adam optimizer that tracks the moment estimates of ONE parameter array.

    Each instance keeps its own first/second moment buffers and step counter,
    so a separate optimizer must be created for every array being trained.

    Args:
        learning_rate: Step size applied to the bias-corrected update.
        beta1: Exponential decay rate of the first-moment (mean) estimate.
        beta2: Exponential decay rate of the second-moment (uncentred
            variance) estimate.
        epsilon: Small constant added to the denominator for numerical
            stability. Defaults to the previously hard-coded ``1e-8``.
    """

    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        # Moment buffers are allocated lazily on the first update so that they
        # take the shape/dtype of the parameter they are paired with.
        self.m = None
        self.v = None
        # Number of updates performed so far; drives the bias correction.
        self.t = 0

    def update(self, param, grad):
        """Apply one Adam step to ``param`` IN PLACE.

        Args:
            param: NumPy array to update. It is modified via ``-=``, so the
                caller's array changes; nothing is returned.
            grad: Gradient of the loss with respect to ``param`` (must
                broadcast against ``param``).
        """
        if self.m is None:
            self.m = np.zeros_like(param)
            self.v = np.zeros_like(param)

        self.t += 1
        self.m = self.beta1 * self.m + (1 - self.beta1) * grad
        self.v = self.beta2 * self.v + (1 - self.beta2) * grad ** 2
        # Both buffers start at zero, so early estimates are biased towards
        # zero; dividing by (1 - beta^t) removes that bias.
        m_hat = self.m / (1 - self.beta1 ** self.t)
        v_hat = self.v / (1 - self.beta2 ** self.t)
        param -= self.learning_rate * m_hat / (np.sqrt(v_hat) + self.epsilon)

In [16]:
class Layers:
    """Fully connected layer with a fused activation, trained in place with Adam.

    Two flavours are supported, selected by ``activation``:

    * ``'relu'``    -- affine -> batch normalisation -> ReLU -> optional dropout
    * ``'softmax'`` -- affine -> softmax (no batch norm, no dropout)

    ``backward`` both propagates the gradient AND applies the optimizer step,
    so every call to it changes ``w``/``b`` (and ``gamma``/``beta`` for ReLU
    layers).

    Args:
        numNodesIn: Number of input features.
        numNodesOut: Number of output units.
        drop_out_prob: Probability of dropping a unit during training, or
            ``None`` to disable dropout. Only used by ``'relu'`` layers.
        activation: ``'relu'`` or ``'softmax'``.
        learning_rate, beta1, beta2: Forwarded to every ``AdamOptimizer``
            owned by this layer.

    Raises:
        ValueError: If ``activation`` is unsupported or ``drop_out_prob`` is
            outside ``[0, 1)``.
    """

    # Batch-norm constants (previously repeated as literals in several places).
    BN_EPS = 1e-8
    BN_MOMENTUM = 0.9
    SUPPORTED_ACTIVATIONS = ('relu', 'softmax')

    def __init__(self, numNodesIn, numNodesOut, drop_out_prob = None, activation='relu',  learning_rate=0.0001, beta1=0.9, beta2=0.999):
        if activation not in self.SUPPORTED_ACTIVATIONS:
            # Without this check an unknown activation silently produced None outputs.
            raise ValueError(f'Unsupported activation {activation!r}; expected one of {self.SUPPORTED_ACTIVATIONS}')
        if drop_out_prob is not None and not 0 <= drop_out_prob < 1:
            # p == 1 would divide by zero when rescaling the kept units.
            raise ValueError(f'drop_out_prob must be in [0, 1), got {drop_out_prob}')

        self.activation = activation
        # He-style initialisation: standard normal scaled by sqrt(2 / fan_in).
        self.w = np.random.randn(numNodesIn, numNodesOut) * np.sqrt(2. / numNodesIn)
        self.b = np.zeros((1, numNodesOut))
        self.drop_out_prob = drop_out_prob
        self.dropout_mask = None
        self.prev_input = None
        self.z = None
        self.a = None
        self.delta = None
        # Batch-norm scale and shift.
        self.gamma = np.ones((1, numNodesOut))
        self.beta = np.zeros((1, numNodesOut))
        self.n = None
        self.x_hat = None
        self.mean = None
        self.variance = None
        self.out = None
        self.cache = None
        self.training = True
        self.running_mean = np.zeros((1, numNodesOut))
        # Start at unit variance: with zeros, an eval-mode forward pass before
        # any training step would divide by sqrt(eps) and blow the activations up.
        self.running_var = np.ones((1, numNodesOut))

        # One optimizer per trainable array (Adam state is per-parameter).
        self.w_optimizer = AdamOptimizer(learning_rate, beta1, beta2)
        self.b_optimizer = AdamOptimizer(learning_rate, beta1, beta2)
        self.gamma_optimizer = AdamOptimizer(learning_rate, beta1, beta2)
        self.beta_optimizer = AdamOptimizer(learning_rate, beta1, beta2)

    def relu(self, x):
        """Element-wise ``max(0, x)``."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """Return 1 where ``x > 0`` and 0 elsewhere."""
        return np.where(x > 0, 1, 0)

    def softmax(self, x):
        """Row-wise softmax; the row maximum is subtracted first for stability."""
        exps = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exps / np.sum(exps, axis=1, keepdims=True)

    def set_training_mode(self, training=True):
        """Switch between training behaviour (batch stats, dropout) and inference."""
        self.training = training

    def batchnorm_forward(self, z):
        """Normalise ``z`` per feature, then scale by ``gamma`` and shift by ``beta``.

        In training mode the batch mean/variance are used, the running
        statistics are updated, and the intermediates needed by
        ``batchnorm_backward`` are cached. In inference mode the running
        statistics are used and nothing is cached.
        """
        eps = self.BN_EPS
        if self.training:
            mu = np.mean(z, axis=0, keepdims=True)
            var = np.var(z, axis=0, keepdims=True)
            std = np.sqrt(var + eps)
            xhat = (z - mu) / std
            self.out = self.gamma * xhat + self.beta
            momentum = self.BN_MOMENTUM
            self.running_mean = momentum * self.running_mean + (1 - momentum) * mu
            self.running_var = momentum * self.running_var + (1 - momentum) * var
            # (xhat, centred input, 1/std, std, var) -- unpacked in batchnorm_backward.
            self.cache = (xhat, z - mu, 1. / std, std, var)
        else:
            xhat = (z - self.running_mean) / np.sqrt(self.running_var + eps)
            self.out = self.gamma * xhat + self.beta
        return self.out

    def dropout(self, x):
        """Apply inverted dropout to ``x`` when training and dropout is enabled.

        Every element gets its own keep/drop draw, and the kept elements are
        scaled by ``1 / (1 - p)``. The mask is stored for ``backward``.
        Outside training, or with dropout disabled, ``x`` is returned as is.
        """
        if self.drop_out_prob is not None and self.training:
            # Mask has the full shape of x: a (1, D) mask would drop the same
            # units for every sample in the batch.
            self.dropout_mask = (np.random.rand(*x.shape) > self.drop_out_prob).astype(float)
            return x * self.dropout_mask / (1 - self.drop_out_prob)
        else:
            return x

    def forward(self, input):
        """Run the layer on a batch ``input`` (rows are samples) and return the activation.

        The input and intermediates are stored on the instance for ``backward``.
        """
        self.prev_input = input
        self.z = np.dot(input, self.w) + self.b
        if self.activation == 'relu':
            self.n = self.batchnorm_forward(self.z)
            self.a = self.dropout(self.relu(self.n))
        elif self.activation == 'softmax':
            self.a = self.softmax(self.z)
        return self.a

    def batchnorm_backward(self, dout):
        """Back-propagate ``dout`` through the batch-norm step of the last training forward pass.

        Returns:
            ``(dx, dgamma, dbeta)`` where ``dx`` is the gradient with respect
            to the pre-normalisation input and ``dgamma``/``dbeta`` are summed
            (not averaged) over the batch.
        """
        xhat, xmu, ivar, sqrtvar, var = self.cache
        N, D = dout.shape

        dbeta = np.sum(dout, axis=0, keepdims=True)
        dgamma = np.sum(dout * xhat, axis=0, keepdims=True)
        dxhat = dout * self.gamma
        dvar = np.sum(dxhat * xmu * -0.5 * (var + self.BN_EPS) ** (-1.5), axis=0, keepdims=True)
        dmu = np.sum(dxhat * -ivar, axis=0, keepdims=True) + dvar * np.mean(-2. * xmu, axis=0, keepdims=True)
        dx = dxhat * ivar + dvar * 2 * xmu / N + dmu / N

        return dx, dgamma, dbeta

    def backward(self, upstream_grad):
        """Back-propagate through the layer and apply one Adam step to its parameters.

        Args:
            upstream_grad: Gradient of the loss with respect to this layer's
                output. For a ``'softmax'`` layer it is used directly as the
                gradient with respect to the pre-softmax scores ``z``.

        Returns:
            Gradient of the loss with respect to this layer's input, computed
            with the weights that were used in the forward pass.
        """
        batch = self.prev_input.shape[0]

        if self.activation == 'relu':
            grad_a = upstream_grad
            if self.drop_out_prob is not None and self.training:
                # Mirror the forward pass exactly: mask AND 1/(1-p) rescale.
                grad_a = grad_a * self.dropout_mask / (1 - self.drop_out_prob)

            self.delta = grad_a * self.relu_derivative(self.n)
            dz, dgamma, dbeta = self.batchnorm_backward(self.delta)

            gradient_w = np.dot(self.prev_input.T, dz) / batch
            gradient_b = np.sum(dz, axis=0, keepdims=True) / batch

            # Must be computed BEFORE the optimizer mutates self.w in place,
            # otherwise earlier layers receive a gradient based on new weights.
            grad_input = np.dot(dz, self.w.T)

            self.w_optimizer.update(self.w, gradient_w)
            self.b_optimizer.update(self.b, gradient_b)
            self.gamma_optimizer.update(self.gamma, dgamma / batch)
            self.beta_optimizer.update(self.beta, dbeta / batch)

            return grad_input

        elif self.activation == 'softmax':
            self.delta = upstream_grad

            gradient_w = np.dot(self.prev_input.T, self.delta) / batch
            gradient_b = np.sum(self.delta, axis=0, keepdims=True) / batch

            # Same ordering rule as above: use the pre-update weights.
            grad_input = np.dot(self.delta, self.w.T)

            self.w_optimizer.update(self.w, gradient_w)
            self.b_optimizer.update(self.b, gradient_b)

            return grad_input

In [17]:
class Network:
    """Feed-forward stack of ``Layers`` with a softmax/cross-entropy style output.

    Args:
        layers: Sequence of layer widths, e.g. ``[784, 128, 64, 10]``. One
            ``Layers`` object is built per consecutive pair.
        drop_out: Dropout probability applied to the HIDDEN layers, or
            ``None`` for no dropout. The output layer never uses dropout.
        hidden_layer_activation: Activation name for every hidden layer.
        output_layer_activation: Activation name for the final layer.
        loss_type: Stored on the instance; not consulted by any method here.
        learning_rate, beta1, beta2: Forwarded to each layer's optimizers.
    """

    def __init__(self, layers, drop_out = None, hidden_layer_activation='relu', output_layer_activation='softmax', loss_type='cross_entropy', learning_rate=0.0001, beta1 = 0.9, beta2 = 0.999):
        self.loss_type = loss_type
        self.layers = []
        output_index = len(layers) - 2
        for i in range(len(layers) - 1):
            if i == output_index:
                # Output layer: no dropout. (It used to be the only layer that
                # received drop_out, which made the argument a no-op because
                # the softmax path never applies dropout.)
                self.layers.append(Layers(layers[i], layers[i + 1], activation=output_layer_activation, learning_rate=learning_rate, beta1=beta1, beta2=beta2))
            else:
                self.layers.append(Layers(layers[i], layers[i + 1], drop_out_prob=drop_out, activation=hidden_layer_activation, learning_rate=learning_rate, beta1=beta1, beta2=beta2))
        self.output = None

    def forward(self, input, training=True):
        """Run ``input`` through every layer and return (and store) the final output.

        ``training`` is pushed to each layer first so batch norm and dropout
        behave accordingly.
        """
        for layer in self.layers:
            if hasattr(layer, 'set_training_mode'):
                layer.set_training_mode(training)
            input = layer.forward(input)
        self.output = input
        return self.output

    def classify(self, input):
        """Return the arg-max class index per row, using an inference-mode forward pass.

        Side effect: ``self.output`` is overwritten with this pass's output.
        """
        self.output = self.forward(input, training=False)
        return np.argmax(self.output, axis=1)

    def calculateCost(self, target):
        """Mean cross-entropy between the LAST stored output and one-hot ``target``.

        Uses ``self.output`` from the most recent ``forward``/``classify``
        call, clipped away from 0 and 1 so ``log`` stays finite.
        """
        output = self.output
        epsilon = 1e-12
        output = np.clip(output, epsilon, 1. - epsilon)
        return -np.sum(target * np.log(output)) / target.shape[0]

    def backward(self, target):
        """Back-propagate from the last forward pass and update every layer.

        The starting gradient ``output - target`` is the derivative of
        softmax + cross-entropy with respect to the pre-softmax scores.
        """
        grad = self.output - target
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

    def save_parameters(self, file_path):
        """Pickle each layer's weights, batch-norm parameters and running statistics to ``file_path``."""
        parameters = {}
        for idx, layer in enumerate(self.layers):
            parameters[f'layer_{idx}'] = {
                'w': layer.w,
                'b': layer.b,
                'gamma': getattr(layer, 'gamma', None),
                'beta': getattr(layer, 'beta', None),
                'running_mean': layer.running_mean,
                'running_var': layer.running_var
            }
        with open(file_path, 'wb') as f:
            pickle.dump(parameters, f)

    def load_parameters(self, file_path):
        """Restore parameters written by ``save_parameters``.

        Entries that are missing (or stored as ``None``) leave the layer's
        current value untouched. No shape validation is performed.

        SECURITY: ``pickle.load`` can execute arbitrary code; only load files
        from a trusted source.
        """
        with open(file_path, 'rb') as f:
            parameters = pickle.load(f)

        fields = ('w', 'b', 'gamma', 'beta', 'running_mean', 'running_var')
        for idx, layer in enumerate(self.layers):
            layer_params = parameters.get(f'layer_{idx}', {})
            for field in fields:
                value = layer_params.get(field)
                if value is not None:
                    setattr(layer, field, value)

In [18]:

def load_images(file_path):
    """Read an IDX image file and return the pixels as a float32 matrix.

    The 16-byte big-endian header holds the magic number (must be 2051), the
    image count, and the row/column sizes. Every image is flattened to one row
    of ``rows * cols`` values scaled from 0-255 into 0.0-1.0.

    Raises:
        ValueError: If the magic number is not 2051.
    """
    with open(file_path, 'rb') as f:
        header = f.read(16)
        magic, num, rows, cols = struct.unpack('>IIII', header)
        if magic != 2051:
            raise ValueError(f'Invalid magic number {magic} in image file: {file_path}')
        raw_pixels = np.frombuffer(f.read(), dtype=np.uint8)

    flattened = raw_pixels.reshape(num, rows * cols)
    return flattened.astype(np.float32) / 255.0

def load_labels(file_path):
    """Read an IDX label file and return the labels as a uint8 vector.

    The 8-byte big-endian header holds the magic number (must be 2049) and the
    label count; the count is parsed but not otherwise used. Everything after
    the header is returned as one byte per label.

    Raises:
        ValueError: If the magic number is not 2049.
    """
    with open(file_path, 'rb') as f:
        header = f.read(8)
        magic, _count = struct.unpack('>II', header)
        if magic != 2049:
            raise ValueError(f'Invalid magic number {magic} in label file: {file_path}')
        payload = f.read()
    return np.frombuffer(payload, dtype=np.uint8)

# Load the training and test splits from the raw IDX files under
# data/FashionMNIST/raw (paths are relative to the working directory).
train_images, train_labels = (
    load_images('data/FashionMNIST/raw/train-images-idx3-ubyte'),
    load_labels('data/FashionMNIST/raw/train-labels-idx1-ubyte'),
)
test_images, test_labels = (
    load_images('data/FashionMNIST/raw/t10k-images-idx3-ubyte'),
    load_labels('data/FashionMNIST/raw/t10k-labels-idx1-ubyte'),
)


def create_batches(images, labels, batch_size=64, shuffle=True):
    """Yield ``(images, labels)`` mini-batches covering the data exactly once.

    Args:
        images: Array whose first axis indexes samples.
        labels: Array indexed in parallel with ``images``.
        batch_size: Maximum samples per batch; the final batch may be smaller.
        shuffle: When true, the sample order is permuted with NumPy's global
            random state before batching.

    Yields:
        Tuples ``(images[idx], labels[idx])`` for consecutive index chunks.
    """
    total = images.shape[0]
    order = np.arange(total)

    if shuffle:
        np.random.shuffle(order)

    for lo in range(0, total, batch_size):
        # Slicing past the end simply truncates, giving the short last batch.
        picked = order[lo:lo + batch_size]
        yield images[picked], labels[picked]

batch_size = 64

# Materialise both generators into lists so they can be iterated repeatedly
# and measured with len(). The training order is therefore shuffled once,
# here, and stays fixed for every later pass over train_loader.
train_loader = [*create_batches(train_images, train_labels, batch_size=batch_size, shuffle=True)]
test_loader = [*create_batches(test_images, test_labels, batch_size=batch_size, shuffle=False)]

def count_batches(generator):
    """Return how many items ``generator`` yields.

    The iterable is consumed in the process, so a one-shot generator is
    exhausted afterwards.
    """
    return sum(1 for _ in generator)

## Training

In [19]:
# layers = [784, 128, 64, 10] 
# network = Network(layers, drop_out=0.3, learning_rate=0.0001)

# save_path = '1905080.pkl'

# train_costs = []
# train_accuracies = []

# infer_cost = []
# infer_accuracies = []

# val_labels = []
# val_predictions = []

# f1_scores = []

# for epoch in range(25):
#     correct_train = 0
#     total_train = 0
#     epoch_cost_train = 0

#     val_labels = []
#     val_predictions = []

#     for images, labels in train_loader:
#         labels_one_hot = np.eye(10)[labels]
#         network.forward(images)
#         cost = network.calculateCost(labels_one_hot)
#         network.backward(labels_one_hot)
#         predictions = network.classify(images)
#         correct_train += np.sum(predictions == labels)
#         total_train += len(labels)
#         epoch_cost_train += cost

#     avg_cost_train = 100 * epoch_cost_train /len(train_loader)
#     train_costs.append(avg_cost_train)

#     train_accuracy = 100 * correct_train / total_train
#     train_accuracies.append(train_accuracy)
    
#     correct_infer = 0
#     total_infer = 0
#     epoch_cost_infer = 0

#     for images, labels in test_loader:
#         outputs = network.classify(images)
#         correct_infer += np.sum(outputs == labels)
#         labels_one_hot = np.eye(10)[labels]
#         cost = network.calculateCost(labels_one_hot)
#         total_infer += len(labels)
#         epoch_cost_infer += cost

#         val_labels.extend(labels)
#         val_predictions.extend(outputs)
    
#     avg_cost_test = 100 * epoch_cost_infer / len(test_loader)
#     infer_cost.append(avg_cost_test)

#     infer_accuracy = 100 * correct_infer / total_infer
#     infer_accuracies.append(infer_accuracy)

#     f1 = 100* f1_score(val_labels, val_predictions, average='weighted')
#     f1_scores.append(f1)

# class_names = [f'Class {i}' for i in range(10)]
# cm = confusion_matrix(val_labels, val_predictions)
# sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
#             xticklabels=class_names,
#             yticklabels=class_names)
# plt.xlabel('Predicted Label')
# plt.ylabel('True Label')
# plt.title('[784, 128, 64, 10] - lr: 0.0001')
# plt.show()

# plt.plot(train_costs, label='Training Cost')
# plt.plot(infer_cost, label='Inference Cost')
# plt.xlabel('Epoch')
# plt.ylabel('Cost')
# plt.title('[784, 128, 64, 10] - lr: 0.0001')
# plt.legend()
# plt.show()

# plt.plot(train_accuracies, label='Training Accuracy')
# plt.plot(infer_accuracies, label='Inference Accuracy')
# plt.plot(f1_scores, label='F1 Score')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.title('[784, 128, 64, 10] - lr: 0.0001')
# plt.legend()
# plt.show()


# network.save_parameters(save_path)
# print(f"Parameters saved successfully at '{save_path}'")


## Testing

In [20]:
# Evaluate a previously saved model on the test batches and report
# cost, accuracy and weighted F1.

# Rebuild the network, then overwrite its freshly initialised parameters with
# the pickled ones. load_parameters does no shape checking, so these layer
# sizes presumably have to match the ones used when the file was saved.
layers = [784, 128, 64, 10] 
network = Network(layers, drop_out=0.3, learning_rate=0.0001)
model_path = '1905080.pkl'
network.load_parameters(model_path)

# Running totals across all test batches.
correct_infer = 0
total_infer = 0
epoch_cost_infer = 0

# Flat lists of true / predicted class indices, consumed by f1_score below.
val_labels = []
val_predictions = []

for images, labels in test_loader:
    # classify() runs an inference-mode forward pass and stores the result on
    # network.output; calculateCost() below reads that stored output, so the
    # order of these two calls matters.
    outputs = network.classify(images)
    correct_infer += np.sum(outputs == labels)
    # One-hot encode the integer labels (10 classes).
    labels_one_hot = np.eye(10)[labels]
    cost = network.calculateCost(labels_one_hot)
    total_infer += len(labels)
    epoch_cost_infer += cost

    val_labels.extend(labels)
    val_predictions.extend(outputs)

# Mean of the per-batch mean costs, scaled by 100. Every batch carries equal
# weight here, even if the last batch holds fewer samples.
avg_cost_test = 100 * epoch_cost_infer / len(test_loader)
print(f'Cost: {avg_cost_test}')

# Percentage of test samples classified correctly.
infer_accuracy = 100 * correct_infer / total_infer
print(f'Accuracy: {infer_accuracy}%')

# Support-weighted F1 across classes, as a percentage.
f1 = 100* f1_score(val_labels, val_predictions, average='weighted')
print(f'F1 Score: {f1}%')


Cost: 36.60959061202709
Accuracy: 88.07%
F1 Score: 88.13611184014867%
