In [2]:
import pickle
import os
import pandas as pd
import numpy as np
from numpy import dtype, ndarray


In [3]:
train_file = "extended_mnist_train.pkl"
test_file = "extended_mnist_test.pkl"

with open(train_file, "rb") as fp:
    train = pickle.load(fp)

with open(test_file, "rb") as fp:
    test = pickle.load(fp)

In [4]:
train_data = []
train_labels = []
for image, label in train:
    train_data.append(image.flatten())
    train_labels.append(label)


In [5]:
test_data = []
for image, label in test:
    test_data.append(image.flatten())


In [6]:
# You must use NumPy to implement from scratch
from typing import Tuple, Dict, Any

class MLP:

    def __init__(self, input_dim=784, hidden_dim=100, output_dim=10, weight_decay=1e-4, seed=42):
        """
        Initializes the network's weights and biases.
        """
        rng = np.random.default_rng(seed)

        # Layer 1: Input to Hidden Layer
        # Using He initialization, which is well-suited for ReLU activation functions.
        self.WeightMatrix1 = rng.normal(0.0, np.sqrt(2.0 / input_dim), size=(input_dim, hidden_dim)).astype(np.float32)
        self.BiasVectorInput = np.zeros((hidden_dim,), dtype=np.float32)

        # Layer 2: Hidden Layer to Output Layer
        
        self.WeightMatrix2 = rng.normal(0.0, np.sqrt(2.0 / hidden_dim), size=(hidden_dim, output_dim)).astype(np.float32)
        self.BiasVectorOutput = np.zeros((output_dim,), dtype=np.float32)

        self.weight_decay = float(weight_decay)  # Weight decay coefficient for L2 regularization

    @staticmethod
    def _preprocess(X: np.ndarray) -> np.ndarray:
        """
        Normalizes pixel values from the 0-255 range to the 0.0-1.0 range.
        """
        X_float = np.asarray(X, dtype=np.float32)
        return X_float / 255.0

    # ReLU: f(x) = max(0, x)
    @staticmethod
    def _relu(z: np.ndarray) -> np.ndarray:
        return np.maximum(z, 0.0, dtype=np.float32)

    @staticmethod
    def _relu_grad(z: np.ndarray) -> np.ndarray:
        # The gradient is 1 for inputs > 0, and 0 otherwise.
        grad = np.zeros_like(z, dtype=np.float32)
        grad[z > 0] = 1.0
        return grad

    @staticmethod
    def _softmax(logits: np.ndarray) -> np.ndarray: # converts logits into softmaxProbabilities.
        stabilizederivativeLossLogits = logits - logits.max(axis=1, keepdims=True)
        # Use higher precision for the exponentiation step
        exponentials = np.exp(stabilizederivativeLossLogits, dtype=np.float64)
        # Normalize to get softmaxProbabilities
        softmaxProbabilities = exponentials / exponentials.sum(axis=1, keepdims=True)
        return softmaxProbabilities.astype(np.float32)

    def forward(self, X: np.ndarray, cache: bool = False) -> tuple[ndarray[tuple[Any, ...], dtype[Any]], dict[
        str, ndarray[tuple[Any, ...], dtype[Any]] | ndarray[tuple[Any, ...], dtype[Any]]]] | tuple[
                                                                 ndarray[tuple[Any, ...], dtype[Any]], None]:
        X_normalized = self._preprocess(X)

        z1 = X_normalized @ self.WeightMatrix1 + self.BiasVectorInput
        h1 = self._relu(z1)

        logits = h1 @ self.WeightMatrix2 + self.BiasVectorOutput

        if cache:
            saved_cache = {"X_normalized": X_normalized, "z1": z1, "h1": h1}
            return logits, saved_cache
        return logits, None

    def _loss_and_grads(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, Dict[str, np.ndarray]]:
        # Get the final scores (logits) and the intermediate values
        logits, cache = self.forward(X, cache=True)
        num_samples = y.shape[0]

        # --- Loss Calculation ---
        softmaxProbabilities = self._softmax(logits)

        # log-likelihood loss
        correct_logsoftmaxProbabilities = -np.log(np.clip(softmaxProbabilities[np.arange(num_samples), y], 1e-12, 1.0))
        data_loss = float(correct_logsoftmaxProbabilities.mean())

        # L2
        regularizationLoss = 0.5 * self.weight_decay * (np.sum(self.WeightMatrix1**2) + np.sum(self.WeightMatrix2**2)) # a penalty to prevent overfitting
        total_loss = data_loss + float(regularizationLoss)

        # Backpropagation
        # Gradient of the loss
        derivativeLossLogits = softmaxProbabilities
        derivativeLossLogits[np.arange(num_samples), y] -= 1.0
        derivativeLossLogits /= num_samples

        # Gradients for WeightMatrix2 and BiasVectorOutput (Output Layer)
        d_WeightMatrix2 = cache["h1"].T @ derivativeLossLogits + self.weight_decay * self.WeightMatrix2
        d_BiasVectorOutput = derivativeLossLogits.sum(axis=0)

        # gradient back to the hidden layer
        derivativeHiddenLayer1 = derivativeLossLogits @ self.WeightMatrix2.T
        lossGradientBeforeReLU = derivativeHiddenLayer1 * self._relu_grad(cache["z1"])   # chain rule

        # gradients for WeightMatrix1 and BiasVectorInput (Hidden Layer)
        d_WeightMatrix1 = cache["X_normalized"].T @ lossGradientBeforeReLU + self.weight_decay * self.WeightMatrix1
        d_BiasVectorInput = lossGradientBeforeReLU.sum(axis=0)

        # a dictionary for gradients
        grads = {"WeightMatrix1": d_WeightMatrix1, "BiasVectorInput": d_BiasVectorInput, "WeightMatrix2": d_WeightMatrix2, "BiasVectorOutput": d_BiasVectorOutput}
        return total_loss, grads

    def _accuracy(self, X: np.ndarray, y: np.ndarray, batch_size: int = 1024) -> float:
        predictions = self.predict(X, batch_size=batch_size)
        return float((predictions == y).mean())

    def evaluate(self, X: np.ndarray, y: np.ndarray, batch_size: int = 1024) -> Tuple[float, float]:
        num_samples = X.shape[0]
        total_data_loss = 0.0

        # calculates loss
        for i in range(0, num_samples, batch_size):
            X_batch = X[i:i + batch_size]
            y_batch = y[i:i + batch_size]
            num_batch_samples = y_batch.shape[0]

            logits, _ = self.forward(X_batch, cache=False)
            softmaxProbabilities = self._softmax(logits)
            correct_logsoftmaxProbabilities = -np.log(np.clip(softmaxProbabilities[np.arange(num_batch_samples), y_batch], 1e-12, 1.0))
            total_data_loss += correct_logsoftmaxProbabilities.sum()

        # average data loss + regularization loss
        mean_data_loss = total_data_loss / num_samples
        regularizationLoss = 0.5 * self.weight_decay * (np.sum(self.WeightMatrix1**2) + np.sum(self.WeightMatrix2**2))
        total_loss = mean_data_loss + regularizationLoss

        # Calculate accuracy
        acc = self._accuracy(X, y, batch_size=batch_size)
        return float(total_loss), acc

    def fit(self,
            X_train: np.ndarray,
            y_train: np.ndarray,
            X_val: np.ndarray,
            y_val: np.ndarray,
            epochs: int = 20,
            batch_size: int = 256,
            lr: float = 0.1,
            seed: int = 42) -> Dict[str, list]:

        # Trains the model using mini-batch Stochastic Gradient Descent (SGD).

        rng = np.random.default_rng(seed)
        X_train = np.asarray(X_train)
        y_train = np.asarray(y_train, dtype=np.int64)
        X_val = np.asarray(X_val)
        y_val = np.asarray(y_val, dtype=np.int64)

        history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

        for epoch in range(1, epochs + 1):
            # Shuffle the training data at the beginning of each epoch
            shuffled_indices = rng.permutation(X_train.shape[0])
            X_train_shuffled = X_train[shuffled_indices]
            y_train_shuffled = y_train[shuffled_indices]

            # Process the data in mini-batches
            for i in range(0, X_train.shape[0], batch_size):
                X_batch = X_train_shuffled[i:i + batch_size]
                y_batch = y_train_shuffled[i:i + batch_size]

                loss, grads = self._loss_and_grads(X_batch, y_batch)

                # Update parameters using SGD
                self.WeightMatrix1 -= lr * grads["WeightMatrix1"]
                self.BiasVectorInput -= lr * grads["BiasVectorInput"]
                self.WeightMatrix2 -= lr * grads["WeightMatrix2"]
                self.BiasVectorOutput -= lr * grads["BiasVectorOutput"]

            # Evaluate and record metrics at the end of the epoch
            tr_loss, tr_acc = self.evaluate(X_train, y_train)
            va_loss, va_acc = self.evaluate(X_val, y_val)
            history["train_loss"].append(tr_loss)
            history["train_acc"].append(tr_acc)
            history["val_loss"].append(va_loss)
            history["val_acc"].append(va_acc)
            print(f"epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.4f} | val loss {va_loss:.4f} acc {va_acc:.4f}")

        return history

    def predict(self, X: np.ndarray, batch_size: int = 1024) -> np.ndarray:
        """
        Makes predictions for a given dataset.
        """
        X = np.asarray(X)
        num_samples = X.shape[0]
        predictions = np.empty((num_samples,), dtype=np.int64)

        # Process in batches to conserve memory
        for i in range(0, num_samples, batch_size):
            X_batch = X[i:i + batch_size]
            logits, _ = self.forward(X_batch, cache=False)
            predictions[i:i + batch_size] = np.argmax(logits, axis=1)

        return predictions

# --- Training and evaluation (place this cell before the existing 'predictions = mlp.predict(test_data)' cell) ---

# Use the prepared lists from the notebook: train_data, train_labels
X = np.asarray(train_data, dtype=np.float32)
y = np.asarray(train_labels, dtype=np.int64)

# Train/validation split
rng = np.random.default_rng(123)
perm = rng.permutation(X.shape[0])
val_ratio = 0.1
val_size = int(len(perm) * val_ratio)
val_idx = perm[:val_size]
train_idx = perm[val_size:]

X_tr, y_tr = X[train_idx], y[train_idx]
X_va, y_va = X[val_idx], y[val_idx]

# Model, train, and report
# Using the new, slower, more readable model with 100 hidden neurons
mlp = MLP(input_dim=784, hidden_dim=100, output_dim=10, weight_decay=5e-4, seed=1)
history = mlp.fit(X_tr, y_tr, X_va, y_va, epochs=80, batch_size=256, lr=0.1, seed=1)

# Final metrics
final_train_loss, final_train_acc = mlp.evaluate(X_tr, y_tr)
final_val_loss, final_val_acc = mlp.evaluate(X_va, y_va)
print(f"final train: loss={final_train_loss:.4f}, acc={final_train_acc:.4f}")
print(f"final val:   loss={final_val_loss:.4f}, acc={final_val_acc:.4f}")


epoch 01 | train loss 0.4188 acc 0.9012 | val loss 0.4344 acc 0.8977
epoch 02 | train loss 0.3562 acc 0.9165 | val loss 0.3752 acc 0.9127
epoch 03 | train loss 0.3214 acc 0.9276 | val loss 0.3447 acc 0.9225
epoch 04 | train loss 0.2966 acc 0.9345 | val loss 0.3226 acc 0.9262
epoch 05 | train loss 0.2786 acc 0.9398 | val loss 0.3067 acc 0.9322
epoch 06 | train loss 0.2617 acc 0.9445 | val loss 0.2898 acc 0.9375
epoch 07 | train loss 0.2512 acc 0.9487 | val loss 0.2818 acc 0.9405
epoch 08 | train loss 0.2417 acc 0.9501 | val loss 0.2736 acc 0.9420
epoch 09 | train loss 0.2283 acc 0.9551 | val loss 0.2608 acc 0.9465
epoch 10 | train loss 0.2216 acc 0.9568 | val loss 0.2546 acc 0.9452
epoch 11 | train loss 0.2133 acc 0.9599 | val loss 0.2467 acc 0.9480
epoch 12 | train loss 0.2069 acc 0.9617 | val loss 0.2413 acc 0.9513
epoch 13 | train loss 0.2006 acc 0.9630 | val loss 0.2362 acc 0.9500
epoch 14 | train loss 0.1950 acc 0.9646 | val loss 0.2325 acc 0.9513
epoch 15 | train loss 0.1921 acc 0

In [25]:
predictions = mlp.predict(test_data)


In [26]:
# This is how you prepare a submission for the competition
predictions_csv = {
    "ID": [],
    "target": [],
}

for i, label in enumerate(predictions):
    predictions_csv["ID"].append(i)
    predictions_csv["target"].append(label)

df = pd.DataFrame(predictions_csv)
df.to_csv("submission.csv", index=False)