In [142]:
import pickle
import os
import pandas as pd
import numpy as np
import torch

In [144]:
import numpy as np
from scipy.ndimage import rotate, shift, zoom

def augment_mnist_image(
    img,
    rotate_range=15,
    shift_range=2,
    zoom_range=0.1,
    noise_std=0.05,
    flip_horizontal=False,
    flip_vertical=False
):
    """
    Apply random augmentations to a single MNIST image (28x28).

    Parameters
    ----------
    img : np.ndarray
        Input 28×28 image (float32 or uint8)
    rotate_range : float
        Max rotation (± degrees)
    shift_range : int
        Max translation in pixels (for x and y)
    zoom_range : float
        Max zoom percentage (0.1 means between 0.9× and 1.1×)
    noise_std : float
        Standard deviation of Gaussian noise
    flip_horizontal : bool
        Random horizontal flip
    flip_vertical : bool
        Random vertical flip

    Returns
    -------
    np.ndarray
        Augmented 28×28 image
    """

    img = img.astype(np.float32)

    # 1. random rotation
    angle = np.random.uniform(-rotate_range, rotate_range)
    img = rotate(img, angle, reshape=False, mode='nearest')

    # 2. random shift
    shift_x = np.random.uniform(-shift_range, shift_range)
    shift_y = np.random.uniform(-shift_range, shift_range)
    img = shift(img, shift=(shift_x, shift_y), mode='nearest')

    # 3. random zoom
    z = np.random.uniform(1 - zoom_range, 1 + zoom_range)
    zoomed = zoom(img, z)

    # crop or pad to restore 28×28
    if zoomed.shape[0] > 28:
        start = (zoomed.shape[0] - 28) // 2
        img = zoomed[start:start+28, start:start+28]
    else:
        pad = (28 - zoomed.shape[0]) // 2
        img = np.pad(zoomed,
                     ((pad, 28 - zoomed.shape[0] - pad),
                      (pad, 28 - zoomed.shape[0] - pad)),
                     mode='constant')

    # 4. random flips
    if flip_horizontal and np.random.rand() < 0.5:
        img = np.fliplr(img)

    if flip_vertical and np.random.rand() < 0.5:
        img = np.flipud(img)

    # 5. Gaussian noise
    img += np.random.normal(0, noise_std, img.shape)

    # clip to valid range
    img = np.clip(img, 0, 255)

    return img


In [145]:
train_file = "fii-nn-2025-homework-2/extended_mnist_train.pkl"
test_file = "fii-nn-2025-homework-2/extended_mnist_test.pkl"

with open(train_file, "rb") as fp:
    train = pickle.load(fp)

with open(test_file, "rb") as fp:
    test = pickle.load(fp)

train_data = []
train_labels = []
for image, label in train:
    train_data.append(image.flatten())
    train_labels.append(label)

test_data = []
for image, label in test:
    test_data.append(image.flatten())


In [146]:
augmented_train_data = []
augmented_train_labels = []

for img, label in zip(train_data, train_labels):
    img_2d = img.reshape(28, 28)

    # create 3 augmented versions
    for _ in range(3):
        aug = augment_mnist_image(img_2d)
        augmented_train_data.append(aug.flatten())
        augmented_train_labels.append(label)


In [147]:

shuffle_idx = np.random.permutation(len(train_data))
train_data = np.array(train_data)[shuffle_idx]
train_labels = np.array(train_labels)[shuffle_idx]

test_data = np.array(test_data)

print("Train samples: ", len(train_data))
print("Test samples: ", len(test_data))
print("Image shape: ", train_data[1].shape)

Train samples:  60000
Test samples:  10000
Image shape:  (784,)


In [148]:
def normalize(data):
    return data / 255.0

train_data = np.array(normalize(train_data)).astype(np.float64)
test_data = np.array(normalize(test_data)).astype(np.float64)

In [149]:
def xavier_init(n_inputs, n_neurons):
    return np.random.uniform(-np.sqrt(6 / (n_inputs + n_neurons)), np.sqrt(6 / (n_inputs + n_neurons)), (n_inputs, n_neurons)).astype(np.float64)

In [None]:
#Initialization
n_out_neurons = 10
n_hidden_neurons = 100
learning_rate = 0.035
step_size = 5
batch_size = 32
lambda_l1 = np.float64(0)
lambda_l2 = np.float64(0.0001)
dropout_p = 00
epochs = 30
momentum = 0.9  
n_inputs = train_data.shape[1]
weights = [xavier_init(n_inputs, n_hidden_neurons), xavier_init(n_hidden_neurons, n_out_neurons)]  # xavier initialization
biases = [np.zeros(n_hidden_neurons, dtype=np.float64), np.zeros(n_out_neurons, dtype=np.float64)]
momentum_w = [np.zeros_like(weights[0]), np.zeros_like(weights[1])] 
momentum_b = [np.zeros_like(biases[0]), np.zeros_like(biases[1])] 

print("Weights shape: ", [w.shape for w in weights])
print("Biases shape: ", [b.shape for b in biases])

Weights shape:  [(784, 100), (100, 10)]
Biases shape:  [(100,), (10,)]


In [151]:
def softmax(z):
    exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))  # Numerical stability improvement
    return exp_z / np.sum(exp_z, axis=1, keepdims=True)

def softmax_test(z):
    exp_z = np.exp(z)
    return exp_z / np.sum(exp_z)


def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def relu(x):
    return np.clip(x, min = 0)

def relu_deriv(x):
    return (x > 0).astype(np.float64)


In [152]:
def one_hot(batch, output_size):
    result = np.zeros((len(batch), output_size))
    for i, l in enumerate(batch):
        result[i, l] = 1
    return result

In [153]:
def cross_entropy(target, y):
    return -np.sum(target * np.log(y + 1e-10))


In [154]:
def split(data, batch_size):
    return np.array_split(data, len(data) / batch_size)

In [155]:
def forward(x, w, b, training = True): 
    # (32, 784) @ (748, 100) = (32, 100)
    # (32, 100) @ (100, 10) = (32, 10)
    x_in = x
    layers = len(w) 
    y_list = []
    z_list = []
    y_list.append(x)

    #ReLU on the hidden layers
    for layer in range(layers - 1): 
        z = x_in @ w[layer] + b[layer] 
        z_list.append(z)

        y = relu(z)

        if training:
            mask = np.random.binomial(size = len(y[1]), n = 1, p=(1 - dropout_p))
            y = y * mask * (1 / 1 - dropout_p)  
    
        y_list.append(y) 

        x_in = y 

    # Softmax on the last layer (output layer)    
    z = x_in @ w[-1] + b[-1]
    z_list.append(z) 

    y = softmax(z) 
    y_list.append(y) 

    return (y_list, z_list)

In [156]:
def l1_l2_penalty(grad_w, weights, l1_lambda, l2_lambda):
    grad_w += l1_lambda * np.sign(weights) / batch_size + l2_lambda * weights / batch_size
    return grad_w

In [None]:
def backward(y_list, z_list, target, w, b): 
    global momentum_w, momentum_b, momentum
    L = len(w) 
    # softmax on the final layer 
    delta = y_list[-1] - target 
    gradient_w = y_list[-2].T @ delta 
    gradient_b = np.sum(delta, axis=0) 

    # Momentum update for last layer
    momentum_w[L - 1] = momentum * momentum_w[L - 1] - (gradient_w * learning_rate / batch_size)
    momentum_b[L - 1] = momentum * momentum_b[L - 1] - (gradient_b * learning_rate / batch_size)
    w[L - 1] += momentum_w[L - 1]
    b[L - 1] += momentum_b[L - 1]

    for i in reversed(range(0, L - 1)): 
        delta = (delta @ w[i+1].T) * relu_deriv(z_list[i]) 

        gradient_w = y_list[i].T @ delta 
        gradient_b = np.sum(delta, axis = 0) 

        # L1, L2 reg
        gradient_w += lambda_l1 * np.sign(w[i]) / batch_size + lambda_l2 * w[i] / batch_size
        
        # Momentum 
        momentum_w[i] = momentum * momentum_w[i] - (gradient_w * learning_rate / batch_size)
        momentum_b[i] = momentum * momentum_b[i] - (gradient_b * learning_rate / batch_size)

        # Update
        w[i] += momentum_w[i]
        b[i] += momentum_b[i]

In [None]:
# Batch Training
batches = split(train_data, batch_size)
label_batches = split(train_labels, batch_size)
label_batches = [one_hot(batch, n_out_neurons) for batch in label_batches]
print("Batch shape: ", batches[0].shape)
print("Batch label shape: ", label_batches[0].shape)
print("Weights shape: ", [w.shape for w in weights])
print("Biases shape: ", [b.shape for b in biases])
print("Starting training...")

for epoch in range(epochs):
    epoch_loss = np.float64(0.0)

    for x, target in zip(batches, label_batches):
        (y_list, z_list) = forward(x, weights, biases)

        loss = torch.nn.functional.cross_entropy(input = torch.tensor(y_list[-1]), target=torch.tensor(target))
        epoch_loss += loss

        backward(y_list, z_list, target, weights, biases)
    
    # Calculate test accuracy
    test_y_list, _ = forward(test_data, weights, biases, training = False)
    test_predictions = np.argmax(test_y_list[-1], axis=1)
    test_labels = np.array([label for _, label in test])
    test_accuracy = np.mean(test_predictions == test_labels)
    
    print(f"Epoch {epoch + 1} completed. Loss: {epoch_loss / len(batches):.4f}, Test Accuracy: {test_accuracy:.4f}")

Batch shape:  (32, 784)
Batch label shape:  (32, 10)
Weights shape:  [(784, 100), (100, 10)]
Biases shape:  [(100,), (10,)]
Starting training...
Epoch 1 completed. Loss: 1.5633, Test Accuracy: 0.9602
Epoch 2 completed. Loss: 1.5081, Test Accuracy: 0.9678
Epoch 3 completed. Loss: 1.4953, Test Accuracy: 0.9684
Epoch 4 completed. Loss: 1.4877, Test Accuracy: 0.9720
Epoch 5 completed. Loss: 1.4827, Test Accuracy: 0.9734
Epoch 6 completed. Loss: 1.4785, Test Accuracy: 0.9764
Epoch 7 completed. Loss: 1.4754, Test Accuracy: 0.9740
Epoch 8 completed. Loss: 1.4736, Test Accuracy: 0.9720
Epoch 9 completed. Loss: 1.4721, Test Accuracy: 0.9737
Epoch 10 completed. Loss: 1.4704, Test Accuracy: 0.9772
Epoch 11 completed. Loss: 1.4693, Test Accuracy: 0.9773
Epoch 12 completed. Loss: 1.4686, Test Accuracy: 0.9760
Epoch 13 completed. Loss: 1.4676, Test Accuracy: 0.9761
Epoch 14 completed. Loss: 1.4678, Test Accuracy: 0.9758
Epoch 15 completed. Loss: 1.4664, Test Accuracy: 0.9771
Epoch 16 completed. Loss

In [160]:
def test_network(test_data, weights, biases):
    preds = []
    for x in test_data:
        # batchify the sample so forward() works correctly
        x = x.reshape(1, -1)

        y_list, _ = forward(x, weights, biases, training=False)
        output = y_list[-1]           # softmax output (1, num_classes)
        preds.append(np.argmax(output))
    
    return preds


In [161]:
predictions_csv = {
    "ID": [],
    "target": [],
}

predictions = test_network(test_data, weights, biases)

for i, label in enumerate(predictions):
    predictions_csv["ID"].append(i)
    predictions_csv["target"].append(label)

df = pd.DataFrame(predictions_csv)
df.to_csv("submission.csv", index=False)