## Download Vehicle MPG Dataset

In [1]:
from ucimlrepo import fetch_ucirepo
import pandas as pd

vehicle_mpg = fetch_ucirepo(id=9)

X = vehicle_mpg.data.features
y = vehicle_mpg.data.targets

data = pd.concat([X, y], axis=1)

cleaned_data = data.dropna()

X = cleaned_data.iloc[:, :-1]
y = cleaned_data.iloc[:, -1]

rows_removed = data.shape[0] - cleaned_data.shape[0]
print(f"Rows removed: {rows_removed}")

Rows removed: 6


## Data Splitting and Standardization

In [2]:
from sklearn.model_selection import train_test_split
X_train, X_leftover, y_train, y_leftover = train_test_split(X, y, test_size=0.3, random_state=42, shuffle = True)
X_val, X_test, y_val, y_test = train_test_split(X_leftover, y_leftover, test_size=0.5, random_state=42, shuffle = True)

X_mean = X_train.mean(axis=0)
X_std = X_train.std(axis=0)

X_train = ((X_train - X_mean) / X_std).to_numpy()
X_val = ((X_val - X_mean) / X_std).to_numpy()
X_test = ((X_test - X_mean) / X_std).to_numpy()

y_mean = y_train.mean()
y_std = y_train.std()

y_train = ((y_train - y_mean) / y_std).to_numpy().reshape(-1, 1)
y_val = ((y_val - y_mean) / y_std).to_numpy().reshape(-1, 1)
y_test = ((y_test - y_mean) / y_std).to_numpy().reshape(-1, 1)

In [3]:
X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape

((274, 7), (274, 1), (59, 7), (59, 1), (59, 7), (59, 1))

## Load MNIST

In [4]:
# #
# # This code comes from: https://www.kaggle.com/code/hojjatk/read-mnist-dataset
# #
# import matplotlib
# matplotlib.use('TkAgg')
# import numpy as np  # linear algebra
# import struct
# from array import array
# from os.path import join
# import random
# import matplotlib.pyplot as plt


# #
# # MNIST Data Loader Class
# #
# class MnistDataloader(object):
#     def __init__(self, training_images_filepath, training_labels_filepath,
#                  test_images_filepath, test_labels_filepath):
#         self.training_images_filepath = training_images_filepath
#         self.training_labels_filepath = training_labels_filepath
#         self.test_images_filepath = test_images_filepath
#         self.test_labels_filepath = test_labels_filepath

#     def read_images_labels(self, images_filepath, labels_filepath):
#         labels = []
#         with open(labels_filepath, 'rb') as file:
#             magic, size = struct.unpack(">II", file.read(8))
#             if magic != 2049:
#                 raise ValueError('Magic number mismatch, expected 2049, got {}'.format(magic))
#             labels = array("B", file.read())

#         with open(images_filepath, 'rb') as file:
#             magic, size, rows, cols = struct.unpack(">IIII", file.read(16))
#             if magic != 2051:
#                 raise ValueError('Magic number mismatch, expected 2051, got {}'.format(magic))
#             image_data = array("B", file.read())
#         images = []
#         for i in range(size):
#             images.append([0] * rows * cols)
#         for i in range(size):
#             img = np.array(image_data[i * rows * cols:(i + 1) * rows * cols])
#             img = img.reshape(28, 28)
#             images[i][:] = img

#         return images, labels

#     def load_data(self):
#         x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
#         x_test, y_test = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
#         return (np.array(x_train), np.array(y_train)),(np.array(x_test), np.array(y_test))

# #
# # Set file paths based on added MNIST Datasets
# #
# input_path = './data'
# training_images_filepath = join(input_path, 'train-images.idx3-ubyte')
# training_labels_filepath = join(input_path, 'train-labels-idx1-ubyte')
# test_images_filepath = join(input_path, 't10k-images.idx3-ubyte')
# test_labels_filepath = join(input_path, 't10k-labels.idx1-ubyte')

# #
# # Helper function to show a list of images with their relating titles
# #
# def show_images(images, title_texts):
#     cols = 5
#     rows = int(len(images)/cols) + 1
#     plt.figure(figsize=(30,20))
#     index = 1
#     for x in zip(images, title_texts):
#         image = x[0]
#         title_text = x[1]
#         plt.subplot(rows, cols, index)
#         plt.imshow(image, cmap=plt.cm.gray)
#         if (title_text != ''):
#             plt.title(title_text, fontsize=15)
#         index += 1
#     plt.show()

# #
# # Load MINST dataset
# #
# mnist_dataloader = MnistDataloader(training_images_filepath, training_labels_filepath, test_images_filepath, test_labels_filepath)
# (x_train, y_train), (x_test, y_test) = mnist_dataloader.load_data()


# np.save('./data/mnist-train-x.npy', x_train.reshape(len(x_train), 784))
# np.save('./data/mnist-train-y.npy', y_train)
# np.save('./data/mnist-test-x.npy', x_test.reshape(len(x_test), 784))
# np.save('./data/mnist-test-y.npy', y_test)

# #
# # Show some random training and test images
# #
# images_2_show = []
# titles_2_show = []
# for i in range(0, 10):
#     r = random.randint(1, 60000)
#     images_2_show.append(x_train[r])
#     titles_2_show.append('training image [' + str(r) + '] = ' + str(y_train[r]))

# for i in range(0, 5):
#     r = random.randint(1, 10000)
#     images_2_show.append(x_test[r])
#     titles_2_show.append('test image [' + str(r) + '] = ' + str(y_test[r]))

# show_images(images_2_show, titles_2_show)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from typing import Tuple
import math

def batch_generator(train_x, train_y, batch_size):
    """
    Generator that yields batches of train_x and train_y.

    :param train_x (np.ndarray): Input features of shape (n, f).
    :param train_y (np.ndarray): Target values of shape (n, q).
    :param batch_size (int): The size of each batch.

    :return tuple: (batch_x, batch_y) where batch_x has shape (B, f) and batch_y has shape (B, q). The last batch may be smaller.
    """
    assert len(train_x) == len(train_y), "Number of samples in X and y do not match."

    batch_x = np.array_split(train_x, math.ceil(len(train_x) / batch_size), axis=0)
    batch_y = np.array_split(train_y, math.ceil(len(train_y) / batch_size), axis=0)
    return batch_x, batch_y


class ActivationFunction(ABC):
    @abstractmethod
    def forward(self, x: np.ndarray) -> np.ndarray:
        """
        Computes the output of the activation function, evaluated on x

        Input args may differ in the case of softmax

        :param x (np.ndarray): input
        :return: output of the activation function
        """
        pass

    @abstractmethod
    def derivative(self, x: np.ndarray) -> np.ndarray:
        """
        Computes the derivative of the activation function, evaluated on x
        :param x (np.ndarray): input
        :return: activation function's derivative at x
        """
        pass


class Sigmoid(ActivationFunction):

    def forward(self, x):
        sigma_x = 1 / (1 + np.exp(-x))
        return sigma_x
    
    def derivative(self, x):
        sigma_x = self.forward(x)
        return sigma_x * (1-sigma_x)           

class Tanh(ActivationFunction):
    
    def forward(self, x):
        tanh_x = (np.exp(x) - np.exp(-x))/(np.exp(x) + np.exp(-x))
        return tanh_x
    
    def derivative(self, x):
        tanh_x = self.forward(x)
        return 1 - (tanh_x)**2

class Relu(ActivationFunction):

    def forward(self, x):
        return np.max(0, x)
    
    def derivative(self, x):
        return np.where(x > 0, 1, 0)
    

class Softmax(ActivationFunction): 

    def forward(self, x):
        shift_x = x - np.max(x, axis=1, keepdims=True)
        exps = np.exp(shift_x)
        return exps / np.sum(exps, axis=1, keepdims=True)
    
    def derivative(self):
        raise NotImplementedError("Softmax derivative is not implemented")

class Linear(ActivationFunction):

    def forward(self, x):
        return x
    
    def derivative(self, x):
        return np.ones_like(x)

class Softplus(ActivationFunction):

    def forward(self, x):
        return np.log1p(np.exp(x))
    
    def derivative(self, x):
        return 1 / (1 + np.exp(-x))
    
class Mish(ActivationFunction):

    def forward(self, x):
        softplus = Softplus()
        return x * np.tanh(softplus.forward(x))
    
    def derivative(self, x):
        softplus = Softplus()
        tanh_softplus = np.tanh(softplus.forward(x))
        sigmoid = Sigmoid.forward(x)
        return tanh_softplus + x * sigmoid * (1 - tanh_softplus**2)
    

class LossFunction(ABC):
    @abstractmethod
    def loss(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        pass

    @abstractmethod
    def derivative(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        pass

class SquaredError(LossFunction):

    def loss(self, y_true, y_pred):
        return 0.5 * np.mean(np.square(y_pred - y_true))
    
    def derivative(self, y_true, y_pred):
        return (y_pred - y_true) / y_true.shape[0]
        

class CrossEntropy(LossFunction):
    def loss(self, y_true, y_pred):
        # Clip predictions to avoid log(0) issues
        eps = 1e-12
        y_pred = np.clip(y_pred, eps, 1. - eps)
        # Compute cross-entropy loss averaged over the batch.
        return -np.mean(np.sum(y_true * np.log(y_pred), axis=1))
    
    def derivative(self, y_true, y_pred):
        return (y_pred - y_true) / y_true.shape[0]

class Layer:
    def __init__(self, fan_in: int, fan_out: int, activation_function: ActivationFunction, loss_function: LossFunction, dropout_rate: float = 0.0):
        """
        Initializes a layer of neurons

        :param fan_in: number of neurons in previous (presynpatic) layer
        :param fan_out: number of neurons in this layer
        :param activation_function: instance of an ActivationFunction
        """
        self.fan_in = fan_in
        self.fan_out = fan_out
        self.activation_function = activation_function
        self.loss_function = loss_function

        # this will store the activations (forward prop)
        self.activations = None
        # this will store the delta term (dL_dPhi, backward prop)
        self.delta = None
        self.input = None
        self.Z = None
        self.dropout_rate = dropout_rate

        # Initialize weights and biaes
        limit = np.sqrt(6 / (self.fan_in + self.fan_out))
        self.W = np.random.uniform(-limit, limit, (self.fan_in, self.fan_out)) #weights
        # print(self.W.shape)
        self.b = np.zeros((1, self.fan_out))  # biases

    # def predict(self, x: np.ndarray):
    #    return self.W @ x.T + self.b

    def forward(self, h: np.ndarray, training: bool = True) -> np.ndarray:
        """
        Computes the activations for this layer

        :param h: input to layer
        :return: layer activations
        """
        self.input = h
        self.Z = h @ self.W + self.b
        activation = self.activation_function.forward(self.Z)

        if training and self.dropout_rate > 0:
            keep_prob = 1 - self.dropout_rate
            mask = np.random.binomial(1, keep_prob, size=activation.shape) / keep_prob
            activation = activation * mask
        self.activations = activation
        return activation

    def backward(self, delta: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Apply backpropagation to this layer and return the weight and bias gradients

        :param h: input to this layer
        :param delta: delta term from layer above
        :return: (weight gradients, bias gradients)
         """

        act_deriv = self.activation_function.derivative(self.Z)
        self.delta = np.dot(np.multiply(delta, act_deriv), self.W.T)
        dL_dW = np.dot(self.input.T, np.multiply(delta, act_deriv))
        dL_db = np.sum(np.multiply(delta, act_deriv))
        return dL_dW, dL_db


class MultilayerPerceptron:
    def __init__(self, layers: Tuple[Layer]):
        """
        Create a multilayer perceptron (densely connected multilayer neural network)
        :param layers: list or Tuple of layers
        """
        self.layers = layers

    def forward(self, x: np.ndarray) -> np.ndarray:
        """
        This takes the network input and computes the network output (forward propagation)
        :param x: network input
        :return: network output
        """
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, loss_grad: np.ndarray) -> Tuple[list, list]:
        """
        Applies backpropagation to compute the gradients of the weights and biases for all layers in the network
        :param loss_grad: gradient of the loss function
        :param input_data: network's input data
        :return: (List of weight gradients for all layers, List of bias gradients for all layers)
        """
        dl_dw_all = []
        dl_db_all = []
        delta = loss_grad

        for i, layer in enumerate(reversed(self.layers)):
            dl_dw, dl_db = layer.backward(delta)
            dl_dw_all.insert(0, dl_dw)
            dl_db_all.insert(0, dl_db)
            # Only propagate delta if this is not the input layer
            if i < len(self.layers) - 1:
                delta = layer.delta
        return dl_dw_all, dl_db_all

    def train(self, train_x: np.ndarray, train_y: np.ndarray, val_x: np.ndarray, val_y: np.ndarray, loss_func: LossFunction, learning_rate: float=1E-3, batch_size: int=16, epochs: int=32) -> Tuple[np.ndarray, np.ndarray]:
        """
        Train the multilayer perceptron

        :param train_x: full training set input of shape (n x d) n = number of samples, d = number of features
        :param train_y: full training set output of shape (n x q) n = number of samples, q = number of outputs per sample
        :param val_x: full validation set input
        :param val_y: full validation set output
        :param loss_func: instance of a LossFunction
        :param learning_rate: learning rate for parameter updates
        :param batch_size: size of each batch
        :param epochs: number of epochs
        :return:
        """
        x_batches, y_batches = batch_generator(train_x, train_y, batch_size)

        training_losses = []
        validation_losses = []

        for epoch in range(epochs):
            total_loss = 0
            bi = 0
            for bx, by in zip(x_batches, y_batches):
                y_pred = self.forward(bx)
                # print("y_pred shape:", y_pred.shape, "y_true shape:", by.shape)

                loss = loss_func.loss(by, y_pred)
                total_loss += loss
                loss_grad = loss_func.derivative(by, y_pred)
                weight_grad, bias_grad = self.backward(loss_grad)
                for i, layer in enumerate(self.layers):
                    layer.W -= learning_rate * weight_grad[i]
                    layer.b -= learning_rate * bias_grad[i]
                bi += 1
            val_loss = loss_func.loss(val_y, self.forward(val_x))
            train_loss = total_loss / len(x_batches)

            training_losses.append(train_loss)
            validation_losses.append(val_loss)

            print(f"Epoch {epoch+1}  :::  Train Loss={train_loss}  :::  Val Loss={val_loss}")
        return np.array(training_losses), np.array(validation_losses)
    

In [None]:
mlp = mlp = MultilayerPerceptron([
    Layer(fan_in=X_train.shape[1], fan_out=7, activation_function=Linear(), loss_function=SquaredError(), dropout_rate=0.2),
    Layer(fan_in=7, fan_out=7, activation_function=Linear(), loss_function=SquaredError(), dropout_rate=0.2),
    Layer(fan_in=7, fan_out=1, activation_function=Linear(), loss_function=SquaredError())
    # Layer(fan_in=4, fan_out=1, activation_function=Sigmoid(), loss_function=SquaredError())
])
    
training_losses, validation_losses = mlp.train(
    train_x=X_train,
    train_y=y_train,
    val_x=X_val,
    val_y=y_val,
    loss_func=SquaredError(),
    learning_rate=1E-3,
    batch_size=16,
    epochs=32
)

Epoch 1  :::  Train Loss=0.7872392750442238  :::  Val Loss=0.6819663289711323
Epoch 2  :::  Train Loss=0.5478916906533422  :::  Val Loss=0.5360706169806865
Epoch 3  :::  Train Loss=0.4363871830099957  :::  Val Loss=0.45765672789038986
Epoch 4  :::  Train Loss=0.37465248362263187  :::  Val Loss=0.40678656754229764
Epoch 5  :::  Train Loss=0.33409583674235965  :::  Val Loss=0.36870676413051146
Epoch 6  :::  Train Loss=0.30380252011131764  :::  Val Loss=0.3377142206662117
Epoch 7  :::  Train Loss=0.2793605816864061  :::  Val Loss=0.31140948458750783
Epoch 8  :::  Train Loss=0.25881032307886975  :::  Val Loss=0.28862984604646136
Epoch 9  :::  Train Loss=0.24115510163397497  :::  Val Loss=0.2687012844071395
Epoch 10  :::  Train Loss=0.22580386424449334  :::  Val Loss=0.25116381798352694
Epoch 11  :::  Train Loss=0.2123557719787918  :::  Val Loss=0.2356672281879073
Epoch 12  :::  Train Loss=0.2005123804638482  :::  Val Loss=0.22192859494696873
Epoch 13  :::  Train Loss=0.1900386519266676  ::