In [2]:
start_time = time.time()

In [None]:
train_file = "/kaggle/input/fii-nn-2025-homework-3/extended_mnist_train.pkl"
test_file = "/kaggle/input/fii-nn-2025-homework-3/extended_mnist_test.pkl"

with open(train_file, "rb") as fp:
    train = pickle.load(fp)

In [None]:
with open(test_file, "rb") as fp:
    test = pickle.load(fp)

In [4]:
train_data = []
train_labels = []
for image, label in train:
    train_data.append(image.flatten())
    train_labels.append(label)


In [5]:
test_data = []
for image, label in test:
    test_data.append(image.flatten())


In [6]:
# convert to numpy arrays
X_train = np.array(train_data)
y_train = np.array(train_labels)
X_test = np.array(test_data)

# normalize data (255 values for 0-255 black-white images)
X_train = X_train / 255.0
X_test = X_test / 255.0
# shuffle data
shuffle_indexes = np.random.permutation(len(X_train))
X_train = X_train[shuffle_indexes]
y_train = y_train[shuffle_indexes]

# train and validation data split
num_samples = X_train.shape[0]
split_index = int(num_samples * 0.9)
X_val = X_train[split_index:]
y_val = y_train[split_index:]
X_train = X_train[:split_index]
y_train = y_train[:split_index]

# standarizare (z-score normalization)
mean = np.mean(X_train, axis=0)
std = np.std(X_train, axis=0) + 1e-9
X_train = (X_train - mean) / std
X_val = (X_val - mean) / std
X_test = (X_test - mean) / std

def one_hot(y, num_classes):
    # one-hot, meaning:
    # for digit n, we create a vector of size num_classes
    # all values are 0, except the nth position, which is 1
    one_hot_y = np.zeros((y.shape[0], num_classes))
    for row, col in enumerate(y):
        one_hot_y[row, col] = 1 # row = sample, col = digit 0-9
    return one_hot_y

In [7]:
class MLPerceptron:
    def __init__(self, n_inputs, n_hidden, n_outputs, learning_rate=0.5):
        # INPUT TO HIDDEN
        # weight initialized with small random values
        self.W1 = np.random.randn(n_inputs, n_hidden) * np.sqrt(2.0 / n_inputs) #  He initialization
        # bias initialized with 0
        self.b1 = np.zeros(n_hidden)
        # HIDDEN TO OUTPUT
        self.W2 = np.random.randn(n_hidden, n_outputs) * np.sqrt(2.0 / n_hidden) #  He initialization
        self.b2 = np.zeros(n_outputs)
        # learning rate
        self.lr = learning_rate

    def _relu(self, z):
        # rectified linear unit
        return np.maximum(0, z)

    def _relu_derivative(self, z):
        # 1 if z > 0, 0 if z <= 0
        return z > 0

    def _softmax(self, z):
        # softmax activation formula (numerically stable version)
        # subtract max for numerical stability to prevent overflow in predict()
        z_2 = z - np.max(z, axis=1, keepdims=True)
        exp_z = np.exp(z_2)
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def forward(self, X):
        ### forward step
        # basically calculate z1 from a0(X) @ w1 + b1, through activation function
        # a1 = reLU(z1). repeat again from the 2nd layer (hidden to output)
        # get z2, then y_predicted = a2 (if more layers existed)

        # LAYER 1
        # @ = matrix multiplication
        # z = XW + b
        z1 = X @ self.W1 + self.b1 # X <=> a0 - previous action
        # a = activation(z)
        a1 = self._relu(z1)
        # LAYER 2
        z2 = a1 @ self.W2 + self.b2 # a1 - previous action before z2
        y_predicted = self._softmax(z2)
        return z1, a1, z2, y_predicted

    def backward(self, X_batch, y_batch, z1, a1, z2, y_pred):
        ### backward step
        # C = cost function (cross-entropy loss)
        num_samples_batch = X_batch.shape[0]

        ### LAYER 2 (OUTPUT)
        # error of output layer
        # derivata compusa din softmax+cross-entropy loss
        error_term_2 = y_batch - y_pred # (T-Y) = -dC/dz2
        # how sensitive cost function is to changes in weight w2
        # grad_W2 prin chain rule:
        # z2 = a1 @ W2, deci dz2/dW2 produce a1^T
        # dC/dW2 = dz2/dW2 Ã— dLoss/dz2 = a1^T @ error_term_2
        grad_W2 = a1.T @ error_term_2
        # sum, bias added to each sample
        grad_b2 = np.sum(error_term_2, axis = 0)

        ### LAYER 1 (HIDDEN) 
        # error of hidden layer
        # how sensitive loss is to z1
        # propagate error to hidden layer & apply chain rule with relu
        # dC/da1 = (Y-T) @ W2.T
        # error_term_1 = dC/dz1 = dc/da1 * da1/dz1 = ((Y-T) @ W2.T) * relu_derivative(z1)
        error_term_1 = (error_term_2 @ self.W2.T) * self._relu_derivative(z1)
        # grad_W1 prin chain rule:
        # C -> z1 -> W1 => dC/dW1 = dC/dz1 * dz1/dW1
        # z1 = X @ W1 => dz1/dW1 = X.T
        # => grad_W1 ( = dC/dW1) = dz1/dW1 @ error_term 1 = X.T @ error_term_1
        grad_W1 = X_batch.T @ error_term_1
        grad_b1 = np.sum(error_term_1, axis = 0)

        # gradient ascent (error term 1 negated) => gradient descent >
        # update weights and biases
        self.W1 += self.lr * (grad_W1 / num_samples_batch)
        self.b1 += self.lr * (grad_b1 / num_samples_batch)
        self.W2 += self.lr * (grad_W2 / num_samples_batch)
        self.b2 += self.lr * (grad_b2 / num_samples_batch)
    
    def fit(self, X_train, y_train, X_val, y_val, epochs=100, batch_size=60):
        # number of samples
        num_samples_train = X_train.shape[0]
        num_samples_val = X_val.shape[0]
        num_classes = self.b2.shape[0] # always 10 (digits 0 to 9)
        
        y_train_one_hot = one_hot(y_train, num_classes)
        y_val_one_hot = one_hot(y_val, num_classes)

        # learning rate scheduler data initialization
        # https://docs.pytorch.org/docs/stable/generated/torch.optim.lr_scheduler.ReduceLROnPlateau.html
        best_val_loss = float('inf')
        patience_cnt = 0
        patience = 15
        decay_factor = 0.25
        min_lr = 1e-4
        # early stopping based on accuracy improvement
        best_val_acc = 0
        early_stop_patience = 30 # Stop after no improvements for 30 epochs
        no_improve_epochs = 0
        
        print("TRAINING: START!")
        # pass through the dataset EPOCHS times
        for epoch in range(epochs):
            
            # shuffle training data
            perm = np.random.permutation(num_samples_train)
            X_shuffled = X_train[perm]
            y_shuffled = y_train_one_hot[perm]

            # iterate over batches
            for i in range(0, num_samples_train, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                y_batch = y_shuffled[i:i+batch_size]

                # forward pass
                z1, a1, z2, y_pred = self.forward(X_batch)
                # backward pass
                self.backward(X_batch, y_batch, z1, a1, z2, y_pred)

            # learning rate scheduler
            _, _, _, y_pred_probs_val = self.forward(X_val)
            val_loss = -np.sum(y_val_one_hot * np.log(y_pred_probs_val + 1e-9)) / num_samples_val
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_cnt = 0
            else:
                patience_cnt += 1
                if patience_cnt > patience:
                    initial_lr = self.lr
                    new_lr = self.lr * decay_factor
                    if new_lr >= min_lr:
                        self.lr = new_lr
                        patience_cnt = 0
                        print(f"(!!!) [EPOCH {epoch}] Learning rate reduced from {initial_lr:.5f} to {new_lr:.5f}")
                    else:
                        print(f"Learning rate already at minimum of {min_lr}, not updating.")
                    
            # early stop
            y_pred_classes_val = np.argmax(y_pred_probs_val, axis=1)
            val_acc = np.mean(y_pred_classes_val == y_val)
            if round(val_acc, 5) > best_val_acc:
                best_val_acc = round(val_acc, 5)
                no_improve_epochs = 0
            else:
                no_improve_epochs += 1
            
            if (epoch + 1) % 10 == 0 or epoch == 0:
                # TRAIN DATA
                _, _, _, y_pred_probs_train = self.forward(X_train)
                # cross-entropy loss
                train_loss = -np.sum(y_train_one_hot * np.log(y_pred_probs_train + 1e-9)) / num_samples_train
                # calculate accuracy: compare predicted classes to true classes
                y_pred_classes_train = np.argmax(y_pred_probs_train, axis=1)
                train_acc = np.mean(y_pred_classes_train == y_train)
                # VALIDATION DATA
                # y_pred_probs_val ^^^
                # cross-entropy loss
                # val_loss ^^^
                # calculate accuracy: compare predicted classes to true classes
                # calculated above ^^^
                
                print(f"--- Epoch {epoch + 1}/{epochs} ---")
                print(f"  Train loss: {train_loss:.5f}, train acc: {train_acc:.5f}")
                print(f"  Val   loss: {val_loss:.5f}, val   acc: {val_acc:.5f}")

            # early stop if no improvement
            if no_improve_epochs >= early_stop_patience:
                print(f"(!!!) Early stop! No validation accuracy improvement for {no_improve_epochs} epochs!")
                break

        print("TRAINING: COMPLETE!")

    def predict(self, X):
        # get predicted probabilities (returns a matrix of 10 columns,
        # one for each digit 0-9) and len(X) rows
        _, _, _, y_probabilities = self.forward(X)
        # return the label with highest probability for each row (sample)
        # np.argmax - index cu val max (predicted)
        return np.argmax(y_probabilities, axis=1)


In [8]:
n_inputs = 784 # 28x28 images flattened
n_hidden = 100
n_outputs = 10 # digits 0-9
learning_rate = 0.1
epochs = 300
batch_size= 60

model = MLPerceptron(n_inputs=n_inputs, n_hidden=n_hidden, n_outputs=n_outputs, learning_rate=learning_rate)
# data, labels, no. epochs
model.fit(X_train, y_train, X_val, y_val, epochs=epochs, batch_size=batch_size)

TRAINING: START!
--- Epoch 1/300 ---
  Train loss: 0.14509, train acc: 0.95961
  Val   loss: 0.20448, val   acc: 0.95017
--- Epoch 10/300 ---
  Train loss: 0.01332, train acc: 0.99798
  Val   loss: 0.13673, val   acc: 0.97117
--- Epoch 20/300 ---
  Train loss: 0.00310, train acc: 0.99998
  Val   loss: 0.14703, val   acc: 0.97300
(!!!) [EPOCH 23] Learning rate reduced from 0.10000 to 0.02500
--- Epoch 30/300 ---
  Train loss: 0.00195, train acc: 1.00000
  Val   loss: 0.15097, val   acc: 0.97300
(!!!) [EPOCH 39] Learning rate reduced from 0.02500 to 0.00625
--- Epoch 40/300 ---
  Train loss: 0.00170, train acc: 1.00000
  Val   loss: 0.15247, val   acc: 0.97383
--- Epoch 50/300 ---
  Train loss: 0.00165, train acc: 1.00000
  Val   loss: 0.15265, val   acc: 0.97383
(!!!) [EPOCH 55] Learning rate reduced from 0.00625 to 0.00156
--- Epoch 60/300 ---
  Train loss: 0.00161, train acc: 1.00000
  Val   loss: 0.15292, val   acc: 0.97383
--- Epoch 70/300 ---
  Train loss: 0.00160, train acc: 1.000

In [9]:
predictions = model.predict(X_test)

In [10]:
predictions_csv = {
    "ID": list(range(len(predictions))),
    "target": predictions,
}

df = pd.DataFrame(predictions_csv)
df.to_csv("submission.csv", index=False)

In [11]:
end_time = time.time()
total_time = end_time - start_time
print(f"TOTAL RUNTIME: {total_time:.2f} sec ({int(total_time/60)} min and {(total_time - int(total_time/60) * 60)} sec)")

TOTAL RUNTIME: 73.83 sec (1 min and 13.834747314453125 sec)
