In [None]:
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
from IPython.core.debugger import set_trace
import warnings
warnings.filterwarnings('ignore')
import os
from typing import List
from tqdm import tqdm
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split

# --- Data loading -----------------------------------------------------------
# Read the train and test splits of the sign-MNIST CSV files from ./archive.
train_df = pd.read_csv('archive/sign_mnist_train.csv')
test_df = pd.read_csv('archive/sign_mnist_test.csv')

# Integer class ids are taken from the 'label' column.
y_train = train_df['label'].values
y_test = test_df['label'].values

# Every column after the first is treated as a pixel and scaled by 1/255
# (presumably 8-bit intensities mapped into [0, 1] -- confirm against the CSVs).
x_train = train_df.iloc[:, 1:].values / 255.0
x_test = test_df.iloc[:, 1:].values / 255.0

# Flat float32 matrices with 28*28 = 784 features per row, used by the MLPs.
x_train_mlp = x_train.reshape(-1, 784).astype(np.float32)
x_test_mlp = x_test.reshape(-1, 784).astype(np.float32)

# One-hot targets: indexing the identity matrix with label k selects the row
# that is 1 at position k and 0 elsewhere.
num_classes = 26
y_train_encoded = np.eye(num_classes)[y_train]
y_test_encoded = np.eye(num_classes)[y_test]


In [None]:
class NeuralNetLayer:
    """Common interface shared by every layer of the network.

    Attributes:
        gradient: Starts as ``None``; subclasses store whatever they need
            for (or produce during) the backward pass here.
        parameters: Starts as ``None``; subclasses with trainable values
            replace it with a list of arrays.
    """

    def __init__(self):
        self.gradient, self.parameters = None, None

    def forward(self, x):
        """Compute the layer output for ``x``; must be overridden."""
        raise NotImplementedError()

    def backward(self, gradient):
        """Propagate ``gradient`` back through the layer; must be overridden."""
        raise NotImplementedError()
    
class LinearLayer(NeuralNetLayer):
    """Fully connected affine layer computing ``x @ w.T + b``.

    Attributes:
        ni, no: The input and output sizes passed to the constructor.
        w: Weight matrix of shape ``(output_size, input_size)``.
        b: Bias vector of shape ``(output_size,)``.
        cur_input: The most recent argument given to ``forward``.
        parameters: ``[w, b]`` -- the very same array objects as ``self.w``
            and ``self.b``.
        gradient: ``[dw, db]`` after ``backward`` has run.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.ni, self.no = input_size, output_size
        # He initialization: standard-normal draws scaled by sqrt(2 / fan_in).
        he_scale = np.sqrt(2. / input_size)
        self.w = he_scale * np.random.randn(output_size, input_size)
        # Biases are plain, unscaled standard-normal draws.
        self.b = np.random.randn(output_size)
        self.cur_input = None
        self.parameters = [self.w, self.b]

    def forward(self, x):
        """Cache ``x`` for the backward pass and return ``x @ w.T + b``."""
        self.cur_input = x
        projected = np.matmul(x, self.w.T)
        return projected + self.b

    def backward(self, gradient):
        """Store parameter gradients and return the gradient w.r.t. the input.

        Args:
            gradient: Gradient w.r.t. this layer's output, one row per sample.

        Returns:
            ``gradient @ w``, the gradient w.r.t. the cached input.
        """
        assert self.cur_input is not None, "Must call forward before backward"
        # Both parameter gradients are sums over the batch axis (axis 0);
        # no division by the batch size happens here.
        grad_w = np.matmul(gradient.T, self.cur_input)
        grad_b = np.sum(gradient, axis=0)
        self.gradient = [grad_w, grad_b]
        return np.matmul(gradient, self.w)
    
class ReLULayer(NeuralNetLayer):
    """Element-wise rectifier: keeps positive entries and zeroes the rest."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``max(0, x)`` and remember where ``x`` was positive."""
        is_positive = x > 0
        # The local derivative is 1.0 where the input was positive, else 0.0.
        self.gradient = np.where(is_positive, 1.0, 0.0)
        return np.maximum(0, x)

    def backward(self, gradient):
        """Mask the incoming gradient with the derivative saved by ``forward``."""
        assert self.gradient is not None, "Must call forward before backward"
        return np.multiply(gradient, self.gradient)
    

class SoftmaxOutputLayer(NeuralNetLayer):
    """Softmax over the last axis, used as the final layer of the network.

    Attributes:
        cur_probs: Probabilities produced by the most recent ``forward`` call.
    """

    def __init__(self):
        super().__init__()
        self.cur_probs = None

    def forward(self, x):
        """Return softmax probabilities of ``x`` along the last axis."""
        # Subtracting the per-row maximum leaves the result unchanged but
        # keeps the exponentials from overflowing.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exps = np.exp(shifted)
        self.cur_probs = exps / exps.sum(axis=-1, keepdims=True)
        return self.cur_probs

    def backward(self, target):
        """Return ``cur_probs - target``.

        Unlike the other layers, the argument is the target array rather than
        an upstream gradient.  The difference is not divided by the batch size.
        """
        assert self.cur_probs is not None, "Must call forward before backward"
        residual = self.cur_probs - target
        return residual
    
class MLP:
    """Sequential container that chains layers for forward and backward passes.

    Layers are applied in the order given to the constructor; ``backward``
    walks them in reverse, feeding each layer's return value to the previous
    one.  The last layer's ``backward`` receives the target array.
    """

    def __init__(self, *args: "NeuralNetLayer"):
        # Each positional argument is one layer (the annotation describes a
        # single element of *args, not the whole tuple).
        self.layers = args

    def forward(self, x):
        """Run ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, target):
        """Back-propagate from the last layer to the first, starting from ``target``."""
        for layer in reversed(self.layers):
            target = layer.backward(target)

    def fit(self, x, y, x_val, y_val, learning_rate, num_iterations, batch_size=26, decay_rate=0.95):
        """Train with mini-batch gradient descent and exponential LR decay.

        Relies on the module-level ``GradientDescentOptimizer`` and
        ``compute_loss``.

        Args:
            x: Training inputs, indexable by an integer index array.
            y: One-hot training targets (``argmax`` over axis 1 is taken).
            x_val: Validation inputs.
            y_val: One-hot validation targets.
            learning_rate: Step size used during the first epoch.
            num_iterations: Number of epochs (full passes over ``x``).
            batch_size: Number of samples per mini-batch.
            decay_rate: Per-epoch multiplicative learning-rate decay; epoch
                ``e`` (0-based) uses ``learning_rate * decay_rate ** e``.

        Returns:
            Tuple ``(final_training_accuracy, training_acc_history,
            training_loss_history, val_acc_history)``.  The loss recorded per
            epoch is the sum of the per-batch losses.  When
            ``num_iterations`` is 0 the accuracy is ``None`` and the
            histories are empty.
        """
        optimizer = GradientDescentOptimizer(self, learning_rate)
        sample_count = len(x)
        training_acc_history = []
        training_loss_history = []
        val_acc_history = []
        # Defined up front so a zero-epoch call returns instead of raising.
        training_accuracy = None

        for epoch in range(num_iterations):
            # Set the decayed rate before any step of this epoch so that every
            # batch of the epoch, including the first one, uses the same rate.
            optimizer.lr = learning_rate * (decay_rate ** epoch)

            iter_loss = 0.0
            predictions = []

            # Fresh shuffle each epoch.
            indices = np.random.permutation(sample_count)
            random_x = x[indices]
            random_y = y[indices]

            for start in range(0, sample_count, batch_size):
                # Slicing clamps at the end, so the last batch may be shorter.
                x_batch = random_x[start:start + batch_size]
                y_batch = random_y[start:start + batch_size]

                prediction = self.forward(x_batch)
                iter_loss += compute_loss(prediction, y_batch)

                self.backward(y_batch)
                optimizer.step()

                predictions.extend(np.argmax(prediction, axis=-1))

            # Predictions were collected in shuffled order, so compare them
            # against the labels in that same order.
            training_accuracy = self.evaluate_acc(np.array(predictions), np.argmax(random_y, axis=1))
            training_acc_history.append(training_accuracy)
            training_loss_history.append(iter_loss)

            # Calculate validation accuracy
            y_val_pred = self.predict(x_val)
            val_acc = self.evaluate_acc(y_val_pred, np.argmax(y_val, axis=1))
            val_acc_history.append(val_acc)

            print(f"Validation Accuracy: {val_acc*100}%")
            print(f"Epoch {epoch+1}/{num_iterations} Loss: {iter_loss} Training Accuracy: {training_accuracy*100}%")

        return training_accuracy, training_acc_history, training_loss_history, val_acc_history

    def evaluate_acc(self, y_pred, y_true):
        """Return the fraction of positions where ``y_pred`` equals ``y_true``."""
        return np.mean(y_pred == y_true)

    def predict(self, X):
        """Return the index of the largest network output for each row of ``X``."""
        predictions = self.forward(X)
        return np.argmax(predictions, axis=1)



def compute_loss(y_pred, y_true):
    """Return the cross-entropy of ``y_pred`` against ``y_true``.

    The element-wise products ``y_true * log(y_pred + 1e-8)`` are summed,
    negated and divided by ``len(y_true)``.
    """
    eps = 1e-8  # keeps log() finite when a predicted probability is exactly 0
    log_probs = np.log(y_pred + eps)
    weighted_total = np.sum(y_true * log_probs)
    return -weighted_total / len(y_true)

class Optimizer:
    """Base class that applies an update rule to every trainable layer of a net."""

    def __init__(self, net: MLP):
        self.net = net

    def step(self):
        """Call ``update`` for each layer that has parameters, last layer first."""
        for layer in reversed(self.net.layers):
            if layer.parameters is None:
                # Nothing to train in this layer.
                continue
            self.update(layer.parameters, layer.gradient)

    def update(self, params, gradient):
        """Apply one update to ``params`` given ``gradient``; must be overridden."""
        raise NotImplementedError()

class GradientDescentOptimizer(Optimizer):
    """Plain gradient descent with an L2 penalty term added to every gradient.

    Attributes:
        lr: Step size; callers may reassign it between steps.
        l2_lambda: Coefficient of the L2 term.
    """

    def __init__(self, net: MLP, lr: float, l2_lambda=0.001):
        super().__init__(net)
        self.lr, self.l2_lambda = lr, l2_lambda

    def update(self, params, gradient):
        """Update a ``[weights, biases]`` pair in place.

        Each array ``p`` with gradient ``g`` becomes
        ``p - lr * (g + l2_lambda * p)``; the L2 term is applied to the
        biases as well as the weights.
        """
        grad_weights, grad_biases = gradient
        weights, biases = params

        # ``-=`` mutates the arrays themselves, so the layer that owns them
        # sees the new values.
        for values, grad in ((weights, grad_weights), (biases, grad_biases)):
            values -= self.lr * (grad + self.l2_lambda * values)

# class GradientDescentOptimizer(Optimizer):
#     def __init__(self, net: MLP, lr: float):
#         super().__init__(net)
#         self.lr = lr

#     def update(self, params, gradient):
#         for (p, g) in zip(params, gradient):
#             p -= self.lr * g.mean(axis=0)

# Widths of the first and second hidden layers.
hidden_layer1, hidden_layer2 = 128, 64

# Three networks of increasing depth.  Each maps the flattened input features
# to `num_classes` outputs and ends in a softmax layer.

# Linear map straight from inputs to classes.
mlp_no_hidden_layer = MLP(*[
    LinearLayer(x_train_mlp.shape[-1], num_classes),
    SoftmaxOutputLayer(),
])

# One ReLU hidden layer of width `hidden_layer1`.
mlp_one_hidden_layer = MLP(*[
    LinearLayer(x_train_mlp.shape[-1], hidden_layer1),
    ReLULayer(),
    LinearLayer(hidden_layer1, num_classes),
    SoftmaxOutputLayer(),
])

# Two ReLU hidden layers of widths `hidden_layer1` and `hidden_layer2`.
mlp_two_hidden_layer = MLP(*[
    LinearLayer(x_train_mlp.shape[-1], hidden_layer1),
    ReLULayer(),
    LinearLayer(hidden_layer1, hidden_layer2),
    ReLULayer(),
    LinearLayer(hidden_layer2, num_classes),
    SoftmaxOutputLayer(),
])

def train_and_plot(mlp_model, model_name, x_train, y_train, x_test, y_test, num_iterations=50, learning_rate=0.01):
    """Train ``mlp_model``, report its test accuracy and plot the training curves.

    A random 10% of the training data is held out as a validation set.  The
    plot shows the per-epoch training loss on the left axis and the training
    and validation accuracy on the right axis, with a legend identifying the
    three curves.

    Args:
        mlp_model: Model exposing ``fit``, ``predict`` and ``evaluate_acc``.
        model_name: Label used in the console output and the plot title.
        x_train: Training inputs.
        y_train: One-hot training targets.
        x_test: Test inputs.
        y_test: One-hot test targets (``argmax`` over axis 1 is taken).
        num_iterations: Number of training epochs.
        learning_rate: Initial learning rate passed to ``fit``.

    Returns:
        The accuracy of the trained model on the test set.
    """
    print(f"Training {model_name}...")
    x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1)

    # Sanity output: for one-hot targets the shape should be
    # (number_of_validation_samples, num_classes) and the values [0 1].
    print(f"Shape of y_val: {y_val.shape}")
    unique_values = np.unique(y_val)
    print(f"Unique values in y_val: {unique_values}")

    _, train_acc_history, loss_history, val_acc_history = mlp_model.fit(
        x_train, y_train, x_val, y_val, learning_rate, num_iterations)

    y_pred = mlp_model.predict(x_test)
    y_test_indices = np.argmax(y_test, axis=1)
    acc = mlp_model.evaluate_acc(y_pred, y_test_indices)
    print(f"Test Accuracy: {acc*100}%")

    # Plotting
    epochs = range(1, num_iterations + 1)
    fig, ax1 = plt.subplots()

    color = 'tab:red'
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss', color=color)
    loss_line, = ax1.plot(epochs, loss_history, color=color, label='Training Loss')
    ax1.tick_params(axis='y', labelcolor=color)

    ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

    color = 'tab:blue'
    ax2.set_ylabel('Accuracy', color=color)  # we already handled the x-label with ax1
    train_line, = ax2.plot(epochs, train_acc_history, color=color, label='Training Accuracy')
    val_line, = ax2.plot(epochs, val_acc_history, color='tab:green', label='Validation Accuracy')
    ax2.tick_params(axis='y', labelcolor=color)

    # One legend for the curves of both axes; without it the two accuracy
    # lines cannot be told apart.
    ax2.legend(handles=[loss_line, train_line, val_line], loc='center right')

    plt.title(f'Training Loss and Accuracy - {model_name}')
    fig.tight_layout()  # otherwise the right y-label is slightly clipped
    plt.show()

    return acc

# Train and plot for each model.
# The pairs live in a list called `mlp_experiments` rather than a zip object
# called `models`: the next cell imports `tensorflow.keras.models` under the
# name `models`, so re-running this cell would otherwise clobber that module,
# and a zip iterator is empty after its first pass.
mlp_experiments = [
    (mlp_no_hidden_layer, "No Hidden Layer"),
    (mlp_one_hidden_layer, "One Hidden Layer"),
    (mlp_two_hidden_layer, "Two Hidden Layers"),
]

#train_and_plot(mlp_two_hidden_layer,"Two Hidden Layers" , x_train_mlp, y_train_encoded, x_test_mlp, y_test_encoded, num_iterations=500, learning_rate=0.1)
for mlp_model, model_name in mlp_experiments:
    train_and_plot(mlp_model, model_name, x_train_mlp, y_train_encoded, x_test_mlp, y_test_encoded, num_iterations=30, learning_rate=0.001)



In [None]:
# Experiment 3
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt

#(x_train, y_train), (x_test, y_test) = mnist.load_data()
#x_train = x_train.reshape((-1, 28, 28, 1)).astype('float32') / 255
#y_train = to_categorical(y_train)
#y_test = to_categorical(y_test)
# Reshape the pixel rows into 28x28 images with a trailing channel axis of
# size 1 and cast to float32 for the convolutional model.
# NOTE(review): relies on `x_train` / `x_test` still being the scaled pixel
# arrays from the first cell -- confirm nothing rebinds them in between.
x_train_cnn = x_train.reshape((-1, 28, 28, 1)).astype(np.float32)
x_test_cnn = x_test.reshape((-1, 28, 28, 1)).astype(np.float32)

def build_and_train_model(hidden_units):
    """Build, train and evaluate a small CNN on the sign-language images.

    The network has three Conv2D + MaxPooling2D stages, a dense hidden layer
    of ``hidden_units`` ReLU units and a softmax output with ``num_classes``
    units.  It trains on the module-level ``x_train_cnn`` / ``y_train_encoded``
    and is evaluated on ``x_test_cnn`` / ``y_test_encoded``.

    Args:
        hidden_units: Width of the dense layer that precedes the output layer.

    Returns:
        Tuple ``(history, test_acc)``: the object returned by ``model.fit``
        and the accuracy reported by ``model.evaluate`` on the test set.
    """
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(hidden_units, activation='relu'),
        # One output per class of this dataset (the previous hard-coded 10
        # was a leftover from the MNIST-digits version of this experiment).
        layers.Dense(num_classes, activation='softmax'),
    ])

    # categorical_crossentropy expects one-hot targets, so the encoded label
    # arrays are used instead of the raw integer labels.
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    history = model.fit(x_train_cnn, y_train_encoded, epochs=5, batch_size=64, validation_split=0.2)
    test_loss, test_acc = model.evaluate(x_test_cnn, y_test_encoded)
    return history, test_acc

# Dense-layer widths to compare, the test accuracy obtained for each width,
# and the training history of each run (kept in the same order as the widths).
hidden_units_options = [32, 64, 128, 256]
performances = {}
histories = []

for units in hidden_units_options:
    print(f"Training model with {units} hidden units")
    history, test_acc = build_and_train_model(units)
    histories.append(history)
    performances[units] = test_acc
    print(f"Accuracy with {units} hidden units: {test_acc}")

# Test accuracy as a function of the dense-layer width.
accuracies = list(map(performances.__getitem__, hidden_units_options))
plt.figure(figsize=(10, 6))
plt.plot(hidden_units_options, accuracies, marker='o', linestyle='-', color='b')
plt.title('Model Performance by Number of Hidden Units')
plt.xlabel('Number of Hidden Units')
plt.ylabel('Accuracy')
plt.xticks(hidden_units_options)
plt.grid(True)
plt.show()

# One panel per width with its training and validation loss curves; the
# panels share a y-axis so they can be compared directly.
fig, axs = plt.subplots(1, len(hidden_units_options), figsize=(20, 5), sharey=True)

for panel, width, run in zip(axs, hidden_units_options, histories):
    panel.plot(run.history['loss'], label='Train Loss')
    panel.plot(run.history['val_loss'], label='Validation Loss')
    panel.set_title(f'{width} Units')
    panel.set_xlabel('Epoch')
    panel.set_ylabel('Loss')
    panel.legend()

plt.tight_layout()
plt.show()

# Width with the highest test accuracy (the first such width on ties).
best_units = max(performances.items(), key=lambda item: item[1])[0]
print(f"Best performing model used {best_units} hidden units with an accuracy of {performances[best_units]}")
