<a href="https://colab.research.google.com/github/SaiRajesh228/DeepLearningAssignment1/blob/main/DA6401_ASSIGNMENT1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import wandb
from keras.datasets import fashion_mnist
import argparse

# -----------------------
# Activation functions and their derivatives
def relu(z):
    """Return the rectified linear unit of ``z``: element-wise ``max(0, z)``."""
    rectified = np.maximum(0, z)
    return rectified
def relu_deriv(z):
    """Return the ReLU derivative at ``z``: 1.0 where ``z > 0``, otherwise 0.0."""
    positive_mask = z > 0
    return positive_mask.astype(float)
def sigmoid(z):
    """Return the logistic sigmoid ``1 / (1 + exp(-z))`` element-wise.

    The textbook form overflows inside ``np.exp(-z)`` for large negative
    ``z`` and emits a RuntimeWarning. This version only ever exponentiates
    ``-|z|``, which lies in ``(0, 1]``, and picks the algebraically
    equivalent expression for each sign of ``z``.

    Args:
        z: Scalar or array-like of pre-activation values.

    Returns:
        Sigmoid of ``z`` with the same shape as the input (a scalar for
        scalar input).
    """
    z = np.asarray(z)
    decay = np.exp(-np.abs(z))
    # z >= 0: 1 / (1 + e^-z);  z < 0: e^z / (1 + e^z). Both use decay = e^-|z|.
    result = np.where(z >= 0, 1 / (1 + decay), decay / (1 + decay))
    # Indexing with () unwraps a 0-d result to a scalar and leaves arrays as-is.
    return result[()]
def sigmoid_deriv(z):
    """Return the derivative of the sigmoid at ``z``: ``sigmoid(z) * (1 - sigmoid(z))``."""
    activated = sigmoid(z)
    complement = 1 - activated
    return activated * complement
def tanh(z):
    """Return the hyperbolic tangent of ``z`` element-wise (thin wrapper over ``np.tanh``)."""
    squashed = np.tanh(z)
    return squashed
def tanh_deriv(z):
    """Return the derivative of tanh at ``z``: ``1 - tanh(z)**2``."""
    squared = np.tanh(z) ** 2
    return 1 - squared

# Registry of hidden-layer activations: name -> (function, derivative).
activations = dict(
    relu=(relu, relu_deriv),
    sigmoid=(sigmoid, sigmoid_deriv),
    tanh=(tanh, tanh_deriv),
)

# -----------------------
# FeedForward Neural Network Class
class FeedForwardNN:
    """Fully connected feed-forward classifier with a softmax output layer.

    Hidden layers all use the same activation, looked up by name in the
    module-level ``activations`` mapping. Parameters are stored as parallel
    lists ``weights`` (shape ``(fan_in, fan_out)``) and ``biases`` (shape
    ``(1, fan_out)``), one entry per layer.
    """

    def __init__(self, input_size, hidden_sizes, output_size, activation="relu", initializer="Xavier"):
        """Create the layer parameters.

        Args:
            input_size: Number of input features.
            hidden_sizes: Sequence with the width of each hidden layer
                (may be empty for a single softmax layer).
            output_size: Number of output classes.
            activation: Key into ``activations`` used for every hidden layer.
            initializer: ``"Xavier"`` for Glorot-uniform weights; any other
                value draws weights from ``N(0, 1) * 0.01``.
        """
        self.num_layers = len(hidden_sizes) + 1  # hidden layers + output layer
        self.activation = activation
        self.weights = []
        self.biases = []

        # Unpacking (instead of list concatenation) accepts tuples as well as lists.
        layer_sizes = [input_size, *hidden_sizes, output_size]
        for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:]):
            if initializer == "Xavier":
                limit = np.sqrt(6 / (fan_in + fan_out))
                W = np.random.uniform(-limit, limit, (fan_in, fan_out))
            else:
                W = np.random.randn(fan_in, fan_out) * 0.01
            self.weights.append(W)
            self.biases.append(np.zeros((1, fan_out)))

    @staticmethod
    def _softmax(Z):
        """Row-wise softmax; the row maximum is subtracted before exponentiating."""
        exp_scores = np.exp(Z - np.max(Z, axis=1, keepdims=True))
        return exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    def forward(self, X):
        """Run a forward pass and cache the intermediates needed by ``backprop``.

        Args:
            X: Input batch, one sample per row.

        Returns:
            Softmax class probabilities, one row per sample.

        Side effects:
            Overwrites ``self.z_values`` (pre-activations per layer) and
            ``self.a_values`` (``X`` followed by each layer's activation).
        """
        activ_func, _ = activations[self.activation]
        self.z_values = []
        self.a_values = [X]
        A = X
        last = self.num_layers - 1
        for i, (W, b) in enumerate(zip(self.weights, self.biases)):
            Z = A @ W + b
            self.z_values.append(Z)
            # Softmax on the output layer, the configured activation elsewhere.
            A = self._softmax(Z) if i == last else activ_func(Z)
            self.a_values.append(A)
        return A

    def compute_loss(self, Y_pred, Y_true, loss_type="cross_entropy"):
        """Return the mean loss over the batch.

        Args:
            Y_pred: Predicted probabilities, one row per sample.
            Y_true: One-hot targets with the same shape as ``Y_pred``.
            loss_type: ``"cross_entropy"`` or ``"mean_squared_error"``.

        Returns:
            Scalar loss averaged over the rows of ``Y_true``.

        Raises:
            ValueError: If ``loss_type`` is not one of the supported names.
        """
        m = Y_true.shape[0]
        if loss_type == "cross_entropy":
            # 1e-8 keeps log() finite when a predicted probability is exactly 0.
            return -np.sum(Y_true * np.log(Y_pred + 1e-8)) / m
        if loss_type == "mean_squared_error":
            return np.sum((Y_true - Y_pred) ** 2) / (2 * m)
        raise ValueError(f"Unsupported loss_type: {loss_type!r}")

    def backprop(self, X, Y, loss_type="cross_entropy"):
        """Compute parameter gradients from the values cached by ``forward``.

        ``forward`` must have been called on the same batch beforehand.

        Args:
            X: Input batch (only its row count is used here).
            Y: One-hot targets matching the cached output activations.
            loss_type: ``"cross_entropy"`` or ``"mean_squared_error"``.

        Returns:
            ``(grads_W, grads_b)``: lists of per-layer gradients of the
            batch-mean loss, shaped like ``self.weights`` / ``self.biases``.

        Raises:
            ValueError: If ``loss_type`` is not one of the supported names.
        """
        m = X.shape[0]
        grads_W = [None] * self.num_layers
        grads_b = [None] * self.num_layers

        A_final = self.a_values[-1]
        if loss_type == "cross_entropy":
            # Softmax + cross-entropy collapses to this simple difference.
            dZ = A_final - Y
        elif loss_type == "mean_squared_error":
            # The output is still softmax, so dL/dA must be pushed through the
            # softmax Jacobian: dZ_j = A_j * (dA_j - sum_k dA_k * A_k).
            dA = A_final - Y
            dZ = A_final * (dA - np.sum(dA * A_final, axis=1, keepdims=True))
        else:
            raise ValueError(f"Unsupported loss_type: {loss_type!r}")

        for i in reversed(range(self.num_layers)):
            if i < self.num_layers - 1:
                # Hidden layer: chain through the activation derivative.
                _, deriv_func = activations[self.activation]
                dZ = dA * deriv_func(self.z_values[i])
            grads_W[i] = self.a_values[i].T @ dZ / m
            grads_b[i] = np.sum(dZ, axis=0, keepdims=True) / m
            if i > 0:
                dA = dZ @ self.weights[i].T
        return grads_W, grads_b

    def _optimizer_state(self, caches, name, slots, with_step=False):
        """Return ``caches[name]``, creating zero-filled buffers on first use.

        For every slot ``s`` the state holds ``f"{s}_W"`` and ``f"{s}_b"``
        lists shaped like the weights and biases; ``with_step`` adds a step
        counter under ``"t"``.
        """
        if name not in caches:
            state = {}
            for slot in slots:
                state[f"{slot}_W"] = [np.zeros_like(w) for w in self.weights]
                state[f"{slot}_b"] = [np.zeros_like(b) for b in self.biases]
            if with_step:
                state["t"] = 0
            caches[name] = state
        return caches[name]

    def update_parameters(self, grads_W, grads_b, optimizer, config, caches):
        """Apply one optimizer step to the weights and biases in place.

        Args:
            grads_W: Per-layer weight gradients (as returned by ``backprop``).
            grads_b: Per-layer bias gradients.
            optimizer: One of ``"sgd"``, ``"mbgd"``, ``"nesterov"``,
                ``"rmsprop"``, ``"adam"``, ``"nadam"``. ``"sgd"`` and
                ``"mbgd"`` apply the same plain gradient step.
            config: Object exposing the step size as ``config.learning_rate``
                (attribute access, not item access).
            caches: Dict holding optimizer state between calls; it is mutated
                and keyed by optimizer name.

        Returns:
            The same ``caches`` dict, for call sites that rebind it.

        Raises:
            ValueError: If ``optimizer`` is not a supported name (previously
                this silently skipped the update).
        """
        lr = config.learning_rate
        # Weights and biases follow identical rules, so each rule is written
        # once and applied to both groups; the tag selects the state buffers.
        groups = ((self.weights, grads_W, "W"), (self.biases, grads_b, "b"))

        if optimizer in ("sgd", "mbgd"):
            for params, grads, _ in groups:
                for i, grad in enumerate(grads):
                    params[i] -= lr * grad

        elif optimizer == "nesterov":
            state = self._optimizer_state(caches, "nesterov", ("v",))
            mu = 0.9  # momentum coefficient
            for params, grads, tag in groups:
                velocity = state[f"v_{tag}"]
                for i, grad in enumerate(grads):
                    v_prev = velocity[i]
                    velocity[i] = mu * v_prev - lr * grad
                    # Look-ahead form of the Nesterov update.
                    params[i] += -mu * v_prev + (1 + mu) * velocity[i]

        elif optimizer == "rmsprop":
            state = self._optimizer_state(caches, "rmsprop", ("cache",))
            decay_rate = 0.9
            epsilon = 1e-8
            for params, grads, tag in groups:
                sq_avg = state[f"cache_{tag}"]
                for i, grad in enumerate(grads):
                    sq_avg[i] = decay_rate * sq_avg[i] + (1 - decay_rate) * (grad**2)
                    params[i] -= lr * grad / (np.sqrt(sq_avg[i]) + epsilon)

        elif optimizer in ("adam", "nadam"):
            state = self._optimizer_state(caches, optimizer, ("m", "v"), with_step=True)
            beta1 = 0.9
            beta2 = 0.999
            epsilon = 1e-8

            # One time step per call, shared by every layer and both groups.
            state["t"] += 1
            t = state["t"]

            for params, grads, tag in groups:
                first = state[f"m_{tag}"]
                second = state[f"v_{tag}"]
                for i, grad in enumerate(grads):
                    first[i] = beta1 * first[i] + (1 - beta1) * grad
                    second[i] = beta2 * second[i] + (1 - beta2) * (grad**2)

                    # Bias-corrected moment estimates.
                    m_hat = first[i] / (1 - beta1**t)
                    v_hat = second[i] / (1 - beta2**t)

                    if optimizer == "nadam":
                        # Nesterov look-ahead on the first moment.
                        m_hat = beta1 * m_hat + (1 - beta1) * grad / (1 - beta1**t)

                    params[i] -= lr * m_hat / (np.sqrt(v_hat) + epsilon)

        else:
            raise ValueError(f"Unsupported optimizer: {optimizer!r}")

        return caches

# -----------------------
# Utility functions
def one_hot_encode(y, num_classes):
    """Return a ``(len(y), num_classes)`` float matrix with a 1 at each row's label.

    ``y`` is used as a column index, so it must be an array of integer labels.
    """
    n_samples = y.shape[0]
    encoded = np.zeros((n_samples, num_classes))
    row_index = np.arange(n_samples)
    encoded[row_index, y] = 1
    return encoded

def compute_accuracy(Y_pred, Y_true):
    """Return the fraction of rows whose arg-max class agrees between the two inputs.

    Both arguments are per-class score matrices (one row per sample);
    ``Y_true`` is typically one-hot.
    """
    hits = np.argmax(Y_pred, axis=1) == np.argmax(Y_true, axis=1)
    return np.mean(hits)

def plot_confusion_matrix(Y_pred, y_true, class_names=None):
    """Draw and show a confusion-matrix heatmap.

    Args:
        Y_pred: Per-class scores, one row per sample; the arg-max of each row
            is taken as the predicted label.
        y_true: True integer labels (not one-hot).
        class_names: Optional tick labels used on both axes.
    """
    # Imported lazily so the rest of the module works without these packages.
    from sklearn.metrics import confusion_matrix
    import seaborn as sns

    predicted_labels = np.argmax(Y_pred, axis=1)
    counts = confusion_matrix(y_true, predicted_labels)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        counts,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

# -----------------------
# Training function for wandb sweep agent
def train():
    """Train and evaluate one model using hyperparameters from ``wandb.config``.

    Starts a wandb run, loads Fashion-MNIST, flattens and scales the images
    by 1/255, holds out the last 10% of the training set for validation,
    trains a ``FeedForwardNN`` with mini-batches, logs per-epoch loss and
    validation accuracy, then logs test accuracy and shows a confusion matrix.

    Config fields read here: ``hiddennodes``, ``hiddenlayers``,
    ``activation_func``, ``initializer``, ``num_epochs``, ``batch_size``,
    ``loss`` and ``opt`` (``learning_rate`` is read by the model's optimizer).
    """
    wandb.init()
    config = wandb.config

    # Data: flatten each image to a vector and scale pixel values by 1/255.
    (train_x, train_labels), (test_x, test_labels) = fashion_mnist.load_data()
    train_x = train_x.reshape(train_x.shape[0], -1) / 255.
    test_x = test_x.reshape(test_x.shape[0], -1) / 255.
    num_classes = 10
    train_targets = one_hot_encode(train_labels, num_classes)
    test_targets = one_hot_encode(test_labels, num_classes)

    # The trailing 10% of the training data becomes the validation set.
    cut = int(0.9 * train_x.shape[0])
    val_x, val_targets = train_x[cut:], train_targets[cut:]
    train_x, train_targets = train_x[:cut], train_targets[:cut]

    model = FeedForwardNN(
        train_x.shape[1],
        [config.hiddennodes] * config.hiddenlayers,
        num_classes,
        activation=config.activation_func,
        initializer=config.initializer,
    )

    caches = {}  # optimizer state carried across update steps
    for epoch in range(config.num_epochs):
        # Reshuffle the (already shuffled) training data at the start of each epoch.
        order = np.random.permutation(train_x.shape[0])
        train_x = train_x[order]
        train_targets = train_targets[order]

        batch_size = config.batch_size
        # Floor division: a trailing partial batch is not used in this epoch.
        num_batches = train_x.shape[0] // batch_size
        epoch_loss = 0
        for start in range(0, num_batches * batch_size, batch_size):
            stop = start + batch_size
            batch_x = train_x[start:stop]
            batch_targets = train_targets[start:stop]

            batch_pred = model.forward(batch_x)
            epoch_loss += model.compute_loss(batch_pred, batch_targets, loss_type=config.loss)
            grads_W, grads_b = model.backprop(batch_x, batch_targets, loss_type=config.loss)
            caches = model.update_parameters(grads_W, grads_b, config.opt, config, caches)

        avg_loss = epoch_loss / num_batches
        val_acc = compute_accuracy(model.forward(val_x), val_targets)
        wandb.log({"epoch": epoch+1, "loss": avg_loss, "val_accuracy": val_acc})
        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Val Accuracy={val_acc:.4f}")

    # Final evaluation on the held-out test set.
    test_pred = model.forward(test_x)
    test_acc = compute_accuracy(test_pred, test_targets)
    wandb.log({"test_accuracy": test_acc})
    print(f"Test Accuracy: {test_acc:.4f}")
    tick_labels = [str(label) for label in range(num_classes)]
    plot_confusion_matrix(test_pred, test_labels, class_names=tick_labels)

# -----------------------
# Sweep configuration for wandb
# Bayesian search over the grid below, maximizing the logged "val_accuracy".
sweep_config = {
    "name": "karapa-rajesh",
    "method": "bayes",
    "metric": {"name": "val_accuracy", "goal": "maximize"},
    # Each hyperparameter is a discrete choice, wrapped as {"values": [...]}.
    "parameters": {
        option: {"values": choices}
        for option, choices in {
            "hiddenlayers": [3, 4, 5],
            "num_epochs": [5, 10],
            "hiddennodes": [32, 64, 128],
            "learning_rate": [1e-3, 1e-4],
            "initializer": ["random", "Xavier"],
            "batch_size": [16, 32, 64],
            "opt": ["sgd", "mbgd", "nesterov", "rmsprop", "adam", "nadam"],
            "activation_func": ["sigmoid", "tanh", "relu"],
            "loss": ["cross_entropy", "mean_squared_error"],
        }.items()
    },
}

# -----------------------
# Main block to run either a single training run or a sweep agent.
if __name__ == "__main__":
    # Script entry point. Exactly one of the three modes below should be active.

    # 1) Create a sweep (run once and note the printed sweep ID):
    #  sweep_id = wandb.sweep(sweep_config, project="DeepLearning")
    #  print("Sweep ID:", sweep_id)

    # 2) Standalone training run (without sweeps):
    # train()

    # 3) Run as a sweep agent (active). The sweep ID is hard-coded here; replace
    #    "ijfbxx3d" with the ID printed when your own sweep was created.
    #    count=5 is passed to wandb.agent to bound how many runs this agent executes.
    wandb.agent("ijfbxx3d", function=train, count=5)

[34m[1mwandb[0m: Agent Starting Run: 2ov0u2qd with config:
[34m[1mwandb[0m: 	activation_func: relu
[34m[1mwandb[0m: 	batch_size: 16
[34m[1mwandb[0m: 	hiddenlayers: 5
[34m[1mwandb[0m: 	hiddennodes: 128
[34m[1mwandb[0m: 	initializer: Xavier
[34m[1mwandb[0m: 	learning_rate: 0.0001
[34m[1mwandb[0m: 	loss: cross_entropy
[34m[1mwandb[0m: 	num_epochs: 10
[34m[1mwandb[0m: 	opt: adam


Epoch 1: Loss=0.6056, Val Accuracy=0.8395
Epoch 2: Loss=0.4126, Val Accuracy=0.8623
