In [2]:
from datetime import datetime
from torcheval.metrics.functional import multiclass_accuracy
from torcheval.metrics.functional import multiclass_f1_score
import gzip
import pickle
import numpy as np
import torch


class DatasetWrapper:
    """
    Plain container pairing a dataset's samples with their labels.

    Both arguments are stored as given; nothing is copied, converted or validated.
    """

    def __init__(self, train_set_elements, train_set_labels):
        # elements: the samples, labels: the target for each sample
        self.elements, self.labels = train_set_elements, train_set_labels


def get_datasets(path="mnist.pkl.gz"):
    """
    Load the gzipped, pickled dataset and build the training and validation sets.

    The pickle must unpack into three ``(inputs, labels)`` pairs, in the order
    train, validation, test. The train and test pairs are concatenated into a
    single training set; the validation pair is used as is.
    :param path: Location of the gzipped pickle. Defaults to "mnist.pkl.gz" in the
        current working directory.
    :return: A ``(train, validation)`` tuple of DatasetWrapper objects holding float32
        inputs and int64 (long) labels.
    """
    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only point `path` at an archive obtained from a trusted source.
    with gzip.open(path, "rb") as fd:
        initial_train_set, valid_set, initial_test_set = pickle.load(fd, encoding="latin")

    # The test split is merged into the training data, so no held-out test set
    # remains; the validation split is the only data the model is never trained on.
    train_set_x = torch.tensor(np.concatenate([initial_train_set[0], initial_test_set[0]]), dtype=torch.float32)
    train_set_y = torch.tensor(np.concatenate([initial_train_set[1], initial_test_set[1]]), dtype=torch.long)

    valid_set_x = torch.tensor(valid_set[0], dtype=torch.float32)
    valid_set_y = torch.tensor(valid_set[1], dtype=torch.long)

    print(f'Loaded training dataset: {train_set_x.shape}')
    print(f'Loaded validation dataset: {valid_set_x.shape}')

    return DatasetWrapper(train_set_x, train_set_y), DatasetWrapper(valid_set_x, valid_set_y)


def init_kaiming_uniform(tensor, fan_in):
    """
    Fill ``tensor`` in place with Kaiming (He) uniform values suited to ReLU layers.

    Values are drawn uniformly from ``[-bound, bound]`` where
    ``bound = sqrt(3) * sqrt(2) / sqrt(fan_in)``.
    https://ai.stackexchange.com/questions/32247/what-is-the-analytical-formula-for-kaiming-he-probability-density-function
    :param tensor: The weight tensor to overwrite.
    :param fan_in: Number of input units feeding the layer.
    :return: None; ``tensor`` is modified in place.
    """
    fan_in_root = torch.sqrt(torch.tensor(fan_in, dtype=torch.float32))
    # Gain for ReLU is sqrt(2)
    relu_gain = torch.sqrt(torch.tensor(2.0))
    std = relu_gain / fan_in_root
    # A uniform distribution on [-a, a] has standard deviation a / sqrt(3)
    limit = torch.sqrt(torch.tensor(3.0)) * std
    with torch.no_grad():
        tensor.uniform_(-limit, limit)


def relu(x):
    """Element-wise rectified linear unit: the larger of each entry of ``x`` and 0."""
    zero = torch.tensor(0.0)
    return torch.maximum(x, zero)


def softmax(x, dim=1):
    """
    Numerically stable softmax along ``dim``.

    The maximum along ``dim`` is subtracted before exponentiating so that the
    largest exponent is exp(0) = 1 and large inputs do not overflow.
    :param x: Input tensor of scores.
    :param dim: Dimension along which the result sums to 1.
    :return: Tensor of the same shape as ``x``.
    """
    peak = x.max(dim=dim, keepdim=True).values
    unnormalized = (x - peak).exp()
    normalizer = unnormalized.sum(dim=dim, keepdim=True)
    return unnormalized / normalizer


class DigitClassifier:
    """
    Fully connected 784 -> 100 -> 10 network with hand-written forward and backward passes.

    The hidden layer uses ReLU and the output layer uses softmax. Weights are
    initialised with Kaiming uniform values, biases with zeros. Gradients are
    computed manually in ``backward`` and stored on each parameter's ``.grad``.
    """

    def __init__(self):
        # Caches written by forward() and read by backward()
        self.output = None              # output-layer pre-softmax values
        self.hidden_activation = None   # hidden layer after ReLU
        self.hidden_output = None       # hidden layer before ReLU

        # Input (784 units) -> hidden (100 units)
        self.hidden_weight = torch.zeros(784, 100)
        self.hidden_bias = torch.zeros(100)

        # Hidden (100 units) -> output (10 units)
        self.output_weight = torch.zeros(100, 10)
        self.output_bias = torch.zeros(10)

        # Biases stay at zero; only the weight matrices are randomised
        init_kaiming_uniform(self.hidden_weight, 784)
        init_kaiming_uniform(self.output_weight, 100)

    def forward(self, x):
        """
        Run the network on a batch and cache the intermediate values.
        :param x: Batch of inputs, one sample per row.
        :return: Softmax over the output layer, taken along dim 1.
        """
        # Hidden layer: affine transform followed by ReLU
        self.hidden_output = x @ self.hidden_weight + self.hidden_bias
        self.hidden_activation = relu(self.hidden_output)
        # Output layer: affine transform followed by softmax
        logits = self.hidden_activation @ self.output_weight + self.output_bias
        self.output = logits
        return softmax(logits, dim=1)

    def backward(self, x, y, output):
        """
        Compute the gradients of the mean cross-entropy loss for one batch.

        Relies on the activations cached by the preceding ``forward`` call.
        :param x: The batch of inputs that was passed to ``forward``.
        :param y: Class indices, one per sample.
        :param output: The softmax probabilities returned by ``forward``.
        :return: None; results are stored on the ``.grad`` of each parameter.
        """
        n_samples = y.size(0)

        # Softmax combined with cross-entropy: gradient is (probabilities - one_hot),
        # averaged over the batch
        grad_logits = output.clone()
        grad_logits[range(n_samples), y] -= 1
        grad_logits /= n_samples

        # Output layer parameters
        grad_w_out = self.hidden_activation.T @ grad_logits
        grad_b_out = grad_logits.sum(dim=0)

        # Propagate into the hidden layer; the ReLU derivative is 0 where the
        # pre-activation was <= 0 and 1 elsewhere
        grad_hidden = grad_logits @ self.output_weight.T
        grad_hidden = grad_hidden.masked_fill(self.hidden_output <= 0, 0)

        # Hidden layer parameters
        grad_w_hidden = x.T @ grad_hidden
        grad_b_hidden = grad_hidden.sum(dim=0)

        # Attach the manually computed gradients to their parameters
        computed = (
            (self.hidden_weight, grad_w_hidden),
            (self.hidden_bias, grad_b_hidden),
            (self.output_weight, grad_w_out),
            (self.output_bias, grad_b_out),
        )
        for parameter, gradient in computed:
            parameter.grad = gradient
        
def write_data_set_scores(prediction, file, dataset_wrapper: DatasetWrapper):
    """
    Write an accuracy / F1 report for one dataset to an open text file.

    The report holds the overall accuracy and F1 score, followed by the
    per-class value of each metric for the 10 classes.
    :param prediction: The model's predictions on the dataset after training.
    :param file: Writable text file object that receives the report.
    :param dataset_wrapper: The dataset whose ``labels`` are the ground truth.
    :return: None
    """
    labels = dataset_wrapper.labels

    # Overall scores
    file.write(f"Accuracy: {multiclass_accuracy(prediction, labels)}\n")
    file.write(f"F1-Score: {multiclass_f1_score(prediction, labels)}\n")

    # Per-class breakdown, one section per metric
    sections = (
        ("All Classes Accuracy: \n", multiclass_accuracy),
        ("All Classes F1-Score: \n", multiclass_f1_score),
    )
    for heading, metric in sections:
        file.write(heading)
        per_class = metric(prediction, labels, average=None, num_classes=10)
        for cls, value in enumerate(per_class):
            file.write(f"     {cls}: {value}\n")


def cross_entropy_loss(output, target):
    """
    Compute the mean cross-entropy loss from predicted probabilities.

    Only the probability assigned to each sample's true class contributes:
    ``loss = -(1 / batch_size) * sum_i log(output[i, target[i]])``. This equals the
    one-hot formulation, but avoids multiplying the one-hot zeros by ``log(0) = -inf``,
    which yields NaN as soon as any non-target probability underflows to exactly 0.
    :param output: Predicted probabilities of shape (batch_size, num_classes),
        typically the result of softmax.
    :param target: Integer class indices of shape (batch_size,).
    :return: Scalar tensor with the mean loss. It is ``inf`` if a true class was given
        probability exactly 0.
    """
    batch_size = output.size(0)

    # Pick output[i, target[i]] for every row i
    rows = torch.arange(batch_size, device=output.device)
    target_probs = output[rows, target]

    return -torch.sum(torch.log(target_probs)) / batch_size


def train_network(lr=0.005, epochs=100, batch_size=32):
    """
    Train a DigitClassifier with mini-batch gradient descent and report its scores.

    Loads the datasets, trains for ``epochs`` passes over the training set in its
    stored order, prints the training loss after every epoch, then writes the
    metric reports to "train_set.txt" and "valid_set.txt" and prints the runtime.
    :param lr: Learning rate used for the parameter updates.
    :param epochs: Number of full passes over the training set.
    :param batch_size: Number of samples per mini-batch.
    :return: None
    """
    start = datetime.now()
    train_set, validation_set = get_datasets()

    model = DigitClassifier()
    parameters = [model.hidden_weight, model.hidden_bias, model.output_weight, model.output_bias]

    for epoch in range(epochs):
        for i in range(0, len(train_set.elements), batch_size):
            images = train_set.elements[i: i + batch_size].view(-1, 784)
            labels = train_set.labels[i: i + batch_size]
            output = model.forward(images)

            model.backward(images, labels, output)

            # Manual gradient-descent step; autograd is not involved
            with torch.no_grad():
                for param in parameters:
                    param -= lr * param.grad
                    # Zero the gradient for the next step
                    param.grad.zero_()

        # Loss over the whole training set, evaluated like the final predictions
        with torch.no_grad():
            epoch_loss = cross_entropy_loss(model.forward(train_set.elements.view(-1, 784)), train_set.labels)
        print(f'Epoch {epoch + 1} with loss'
              f' {epoch_loss}')

    # Each report file is opened only once its predictions exist, and the context
    # manager closes it even if writing the scores raises.
    reports = (("train_set.txt", train_set), ("valid_set.txt", validation_set))
    for file_name, dataset in reports:
        with torch.no_grad():
            predictions = model.forward(dataset.elements)
        with open(file_name, "w") as scores_file:
            write_data_set_scores(predictions, scores_file, dataset)

    end = datetime.now()

    print(f'Runtime: {end - start}')


In [3]:
# Run the whole pipeline defined above: load the datasets, train the classifier
# (printing the training loss after each epoch), write the metric reports to
# train_set.txt and valid_set.txt, and print the total runtime.
train_network()

Loaded training dataset: torch.Size([60000, 784])
Loaded validation dataset: torch.Size([10000, 784])
Epoch 1 with loss 0.4783589839935303
Epoch 2 with loss 0.37855955958366394
Epoch 3 with loss 0.33716341853141785
Epoch 4 with loss 0.31142786145210266
Epoch 5 with loss 0.29236891865730286
Epoch 6 with loss 0.27691611647605896
Epoch 7 with loss 0.2636857330799103
Epoch 8 with loss 0.25213858485221863
Epoch 9 with loss 0.2416916936635971
Epoch 10 with loss 0.2321300059556961
Epoch 11 with loss 0.22328853607177734
Epoch 12 with loss 0.2150094211101532
Epoch 13 with loss 0.20727549493312836
Epoch 14 with loss 0.20003801584243774
Epoch 15 with loss 0.19321204721927643
Epoch 16 with loss 0.18687781691551208
Epoch 17 with loss 0.18092043697834015
Epoch 18 with loss 0.1753610223531723
Epoch 19 with loss 0.17017148435115814
Epoch 20 with loss 0.16527734696865082
Epoch 21 with loss 0.1606440395116806
Epoch 22 with loss 0.15630143880844116
Epoch 23 with loss 0.1521693468093872
Epoch 24 with loss

  num_correct = mask.new_zeros(num_classes).scatter_(0, target, mask, reduce="add")
