In [461]:
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt

In [None]:
test_data = np.load("test_data.npy")
test_label = np.load("test_label.npy")
train_data = np.load("train_data.npy")
train_label = np.load("train_label.npy")

print(train_data.shape)
print(train_label.shape)
print(test_data.shape)
print(test_label.shape)
print(np.unique(train_label))

# Activation Function

In [463]:
class Activation:
    '''
    An Activation object for the purposes of calling Activation functions and their respective derivatives in the MLP.
    '''
    def __relu(self, x):
        return np.maximum(0, x)
  
    def __relu_deriv(self, a):
        return np.heaviside(a, 0)

    def __gelu(self, x):
        return x * (1.0 / (1.0 + np.exp(-1.702 * x)))
    
    def __gelu_deriv(self, a):
        sigmoid = 1.0 / (1.0 + np.exp(-1.702 * a))
        return sigmoid + a * 1.702 * sigmoid * (1 - sigmoid)

    def __softmax(self, z):
        z = np.atleast_2d(z)
        max_z = np.max(z, axis=1)
        z = [z[i] - max_z[i] for i in range(max_z.shape[0])]
        z = np.array(z)
        return np.divide(np.exp(z).T, np.sum(np.exp(z), axis=1)).T

    def __softmax_deriv(self, y, y_hat):
        return y_hat - y

    def __init__(self, activation='relu'):
        if activation == 'relu': 
            self.f = self.__relu
            self.f_deriv = self.__relu_deriv
        elif activation == "softmax":
            self.f = self.__softmax
            self.f_deriv = self.__softmax_deriv
        elif activation == "gelu":
            self.f = self.__gelu
            self.f_deriv = self.__gelu_deriv
        else:
            raise ValueError(f"Unsupported activation function: {activation}")


# Hidden Layer

In [464]:


class HiddenLayer(object):
    def __init__(self, 
                 n_in, 
                 n_out, 
                 activation_last_layer='relu',
                 activation='relu',
                 W=None,
                 b=None,
                 last_hidden_layer=False):
        self.last_hidden_layer = last_hidden_layer
        self.input = None
        self.activation = Activation(activation).f
        self.activation_deriv = None

        if activation_last_layer:
            self.activation_deriv = Activation(activation_last_layer).f_deriv

        # Initialize weights and biases
        self.W = np.random.uniform(low=-np.sqrt(6. / (n_in + n_out)),
                                   high=np.sqrt(6. / (n_in + n_out)),
                                   size=(n_in, n_out))
        self.b = np.zeros(n_out,)
        self.grad_W = np.zeros(self.W.shape)
        self.grad_b = np.zeros(self.b.shape)
        # Adam-specific moment estimates for weights and biases
        self.m_W = np.zeros_like(self.W)
        self.v_W = np.zeros_like(self.W)
        self.m_b = np.zeros_like(self.b)
        self.v_b = np.zeros_like(self.b)
        # Momentum SGD velocity terms
        self.v_W_sgd = np.zeros_like(self.W)
        self.v_b_sgd = np.zeros_like(self.b)
        self.binomial_array = np.zeros(n_out)

        # Batch normalization parameters (only for hidden layers, not output)
        self.use_batch_norm = False  # Will be set by MLP
        self.bn_momentum = 0.9
        self.bn_epsilon = 1e-5
        self.gamma = np.ones(n_out)  # Scale parameter
        self.beta = np.zeros(n_out)  # Shift parameter
        self.running_mean = np.zeros(n_out)
        self.running_var = np.ones(n_out)
        self.grad_gamma = np.zeros_like(self.gamma)
        self.grad_beta = np.zeros_like(self.beta)
        # Adam-specific moment estimates for batch norm parameters
        self.m_gamma = np.zeros_like(self.gamma)
        self.v_gamma = np.zeros_like(self.gamma)
        self.m_beta = np.zeros_like(self.beta)
        self.v_beta = np.zeros_like(self.beta)
        # Cache for batch normalization
        self.bn_input = None
        self.bn_normalized = None
        self.bn_mean = None
        self.bn_var = None

    def batch_norm_forward(self, x, training=True):
        if not self.use_batch_norm or self.last_hidden_layer:
            return x
        if training:
            # Compute batch mean and variance
            self.bn_mean = np.mean(x, axis=0)
            self.bn_var = np.var(x, axis=0)
            # Update running statistics
            self.running_mean = self.bn_momentum * self.running_mean + (1 - self.bn_momentum) * self.bn_mean
            self.running_var = self.bn_momentum * self.running_var + (1 - self.bn_momentum) * self.bn_var
            # Normalize
            self.bn_normalized = (x - self.bn_mean) / np.sqrt(self.bn_var + self.bn_epsilon)
        else:
            # Use running statistics for inference
            self.bn_normalized = (x - self.running_mean) / np.sqrt(self.running_var + self.bn_epsilon)
        # Scale and shift
        out = self.gamma * self.bn_normalized + self.beta
        self.bn_input = x
        return out

    def batch_norm_backward(self, delta):
        if not self.use_batch_norm or self.last_hidden_layer:
            return delta
        batch_size = self.bn_input.shape[0]
        # Gradients for gamma and beta
        self.grad_gamma = np.sum(delta * self.bn_normalized, axis=0)
        self.grad_beta = np.sum(delta, axis=0)
        # Gradient of the normalized input
        d_normalized = delta * self.gamma
        d_var = np.sum(d_normalized * (self.bn_input - self.bn_mean) * -0.5 * (self.bn_var + self.bn_epsilon) ** (-1.5), axis=0)
        d_mean = np.sum(d_normalized * -1 / np.sqrt(self.bn_var + self.bn_epsilon), axis=0) + d_var * np.sum(-2 * (self.bn_input - self.bn_mean), axis=0) / batch_size
        delta = d_normalized / np.sqrt(self.bn_var + self.bn_epsilon) + d_var * 2 * (self.bn_input - self.bn_mean) / batch_size + d_mean / batch_size
        return delta

    @staticmethod
    def dropout_forward(X, p_dropout):
        u = np.random.binomial(1, 1 - p_dropout, size=X.shape) 
        out = X * u
        binomial_array = u
        return out, binomial_array
    
    @staticmethod
    def dropout_backward(delta, binomial_array, layer_num):
        delta *= nn.layers[layer_num - 1].binomial_array
        return delta
    
    def forward(self, input, training=True):
        self.input = input
        lin_output = np.dot(input, self.W) + self.b
        # Apply batch normalization before activation
        bn_output = self.batch_norm_forward(lin_output, training)
        self.output = (
            bn_output if self.activation is None
            else self.activation(bn_output)
        ) 
        if not self.last_hidden_layer and training:
            self.output, self.binomial_array = self.dropout_forward(self.output, DROPOUT_PROB)
        return self.output

    def backward(self, delta, layer_num, output_layer=False):
        if self.activation_deriv and not output_layer:
            delta = delta * self.activation_deriv(self.output)
        if not output_layer:
            delta = self.batch_norm_backward(delta)
        self.grad_W = np.atleast_2d(self.input).T.dot(np.atleast_2d(delta))
        self.grad_b = np.average(delta, axis=0)
        if layer_num != 0 and self.activation_deriv:
            delta = delta.dot(self.W.T)
        if layer_num != 0:
            delta = self.dropout_backward(delta, self.binomial_array, layer_num)
        return delta

# MLP

In [465]:
class MLP:
    def __init__(self, layers, activation=[None, 'gelu', 'relu', 'softmax'], weight_decay=1.0, use_batch_norm=False):
        self.layers = []
        self.activation = activation
        self.weight_decay = weight_decay
        self.use_batch_norm = use_batch_norm
        self.t = 0  # Time step for Adam
        
        for i in range(len(layers)-1):
            last_hidden_layer = (i == len(layers) - 2)
            layer = HiddenLayer(layers[i], 
                                layers[i+1], 
                                activation[i], 
                                activation[i+1],
                                last_hidden_layer=last_hidden_layer)
            layer.use_batch_norm = self.use_batch_norm
            self.layers.append(layer)
            
    def forward(self, input, training=True):
        for layer in self.layers: 
            output = layer.forward(input, training)
            input = output 
        return output

    def CE_loss(self, y, y_hat):
        loss = - np.nansum(y * np.log(y_hat + 1e-15))
        loss = loss / y.shape[0] 
        loss = loss * self.weight_decay
        delta = Activation(self.activation[-1]).f_deriv(y, y_hat)
        return loss, delta

    def backward(self, delta):
        delta = self.layers[-1].backward(delta, len(self.layers) - 1, output_layer=True)
        for layer_num, layer in reversed(list(enumerate(self.layers[:-1]))):
            delta = layer.backward(delta, layer_num)
    
    def update(self, lr, optim_params, optim_type='adam'):
        '''
        Update parameters using either Adam or Momentum SGD.
        '''
        if optim_type == 'adam':
            self.t += 1
            beta1, beta2, epsilon = optim_params['beta1'], optim_params['beta2'], optim_params['epsilon']
            for layer in self.layers:
                # Update weights
                layer.m_W = beta1 * layer.m_W + (1 - beta1) * layer.grad_W
                layer.v_W = beta2 * layer.v_W + (1 - beta2) * (layer.grad_W ** 2)
                m_hat_W = layer.m_W / (1 - beta1 ** self.t)
                v_hat_W = layer.v_W / (1 - beta2 ** self.t)
                layer.W -= lr * m_hat_W / (np.sqrt(v_hat_W) + epsilon)
                # Update biases
                layer.m_b = beta1 * layer.m_b + (1 - beta1) * layer.grad_b
                layer.v_b = beta2 * layer.v_b + (1 - beta2) * (layer.grad_b ** 2)
                m_hat_b = layer.m_b / (1 - beta1 ** self.t)
                v_hat_b = layer.v_b / (1 - beta2 ** self.t)
                layer.b -= lr * m_hat_b / (np.sqrt(v_hat_b) + epsilon)
                # Update batch norm parameters if enabled
                if layer.use_batch_norm and not layer.last_hidden_layer:
                    layer.m_gamma = beta1 * layer.m_gamma + (1 - beta1) * layer.grad_gamma
                    layer.v_gamma = beta2 * layer.v_gamma + (1 - beta2) * (layer.grad_gamma ** 2)
                    m_hat_gamma = layer.m_gamma / (1 - beta1 ** self.t)
                    v_hat_gamma = layer.v_gamma / (1 - beta2 ** self.t)
                    layer.gamma -= lr * m_hat_gamma / (np.sqrt(v_hat_gamma) + epsilon)
                    layer.m_beta = beta1 * layer.m_beta + (1 - beta1) * layer.grad_beta
                    layer.v_beta = beta2 * layer.v_beta + (1 - beta2) * (layer.grad_beta ** 2)
                    m_hat_beta = layer.m_beta / (1 - beta1 ** self.t)
                    v_hat_beta = layer.v_beta / (1 - beta2 ** self.t)
                    layer.beta -= lr * m_hat_beta / (np.sqrt(v_hat_beta) + epsilon)
        elif optim_type == 'sgd_momentum':
            momentum = optim_params['momentum']
            for layer in self.layers:
                layer.v_W_sgd = momentum * layer.v_W_sgd + lr * layer.grad_W
                layer.W -= layer.v_W_sgd
                layer.v_b_sgd = momentum * layer.v_b_sgd + lr * layer.grad_b
                layer.b -= layer.v_b_sgd
                if layer.use_batch_norm and not layer.last_hidden_layer:
                    layer.gamma -= lr * layer.grad_gamma
                    layer.beta -= lr * layer.grad_beta
        else:
            raise ValueError(f"Unsupported optimizer type: {optim_type}")
              
    def fit(self, X, y, learning_rate=0.001, epochs=100, optim_params=None, optim_type='adam', batch_size=1):
        X = np.array(X)
        y = np.array(y)
        training_loss = []
        training_accuracy = []
        testing_accuracy = []

        num_batches = int(np.ceil(X.shape[0] / batch_size))
            
        for k in range(epochs):
            loss = np.zeros(num_batches) 
            current_idx = 0 
            X, y = Utils.shuffle(X, y)

            for batch_idx in range(num_batches):
                y_hat = self.forward(X[current_idx : current_idx + batch_size, :], training=True)
                loss[batch_idx], delta = self.CE_loss(y[current_idx : current_idx + batch_size], y_hat)
                self.backward(delta)
                self.update(learning_rate, optim_params, optim_type)

                if (current_idx + batch_size) > X.shape[0]:
                    batch_size = X.shape[0] - current_idx
                current_idx += batch_size

            test_predict = self.predict(test_df.X)
            train_predict = self.predict(train_df.X)
            test_predict = test_df.decode(test_predict)
            train_predict = train_df.decode(train_predict)
            test_accuracy = np.sum(test_predict == test_label[:, 0]) / test_predict.shape[0]
            train_accuracy = np.sum(train_predict == train_label[:, 0]) / train_predict.shape[0]

            training_loss.append(np.mean(loss))
            training_accuracy.append(train_accuracy)
            testing_accuracy.append(test_accuracy)

            output_dict = {'Training Loss': training_loss, 'Training Accuracy': training_accuracy, 'Testing Accuracy': testing_accuracy}

            print(f'Epoch {k+1}/{epochs} has been trained with Train Loss: {str(round(training_loss[-1], 4))}, Training Accuracy: {str(round(training_accuracy[-1] * 100, 4))}% and Testing Accuracy: {str(round(testing_accuracy[-1] * 100, 4))}%.')
         
        return output_dict
            
    def predict(self, x):
        x = np.array(x)
        output = [self.forward(x[i:i+1, :], training=False) for i in range(x.shape[0])]
        output = np.array(output).squeeze()
        return output

# Preprocessing

In [466]:
class Preprocessing:
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.predictions = None

    def normalize(self):     
        norm_data = (self.X - np.min(self.X))/(np.max(self.X) - np.min(self.X))
        self.X = norm_data

    def standardize(self):
        self.X = (self.X - np.mean(self.X)) / np.std(self.X)

    @staticmethod
    def label_encode(label_vector):
        num_classes = np.unique(label_vector).size
        encoded_label_vector = []
        for label in label_vector:
            encoded_label = np.zeros(num_classes)
            encoded_label[int(label)] = 1
            encoded_label_vector.append(encoded_label)
        encoded_label_vector = np.array(encoded_label_vector) 
        return encoded_label_vector
    
    @staticmethod
    def decode(prediction_matrix):
        decoded_predictions = np.zeros(prediction_matrix.shape[0])
        for prediction_idx, prediction_vector in enumerate(prediction_matrix):
            decoded_predictions[prediction_idx] = int(np.argmax(prediction_vector))
        return decoded_predictions

# Utils

In [467]:
class Utils:
    @staticmethod
    def shuffle(X, y):
        shuffled_idx = np.arange(X.shape[0])
        np.random.shuffle(shuffled_idx)
        X = X[shuffled_idx]
        y = y[shuffled_idx]
        return X, y
    
    @staticmethod
    def create_confusion_mat(df):
        confusion_mat = pd.DataFrame(0, index=np.unique(df.y), columns=np.unique(df.y))
        for i in range(0, len(df.y)):
            confusion_mat[int(df.y[i])].iloc[int(df.predictions[i])] += 1
        return confusion_mat
    
    @staticmethod
    def confusion_mat_measures(confusion_matrix):
        scores_df = pd.DataFrame(0, index=confusion_matrix.index, columns=['Precision', 'Recall', 'F1'])
        for i in confusion_matrix.index:
            TP = confusion_matrix[i][i]
            FN = np.array(confusion_matrix[i].iloc[0:i].values.tolist() + confusion_matrix[i].iloc[i+1:].values.tolist()).sum()
            FP = np.array(confusion_matrix.iloc[i][0:i].values.tolist() + confusion_matrix.iloc[i][i + 1:].values.tolist()).sum()
            TN = confusion_matrix.sum().sum() - TP - FN - FP
            Precision = TP / (TP + FP) if (TP + FP) != 0 else 0
            Recall = TP / (TP + FN) if (TP + FN) != 0 else 0
            F1 = (2 * Precision * Recall) / (Precision + Recall) if (Precision + Recall) != 0 else 0
            scores_df.loc[i, 'Precision'] = Precision
            scores_df.loc[i, 'Recall'] = Recall
            scores_df.loc[i, 'F1'] = F1
        scores_df.index.name = 'Label'
        return scores_df

# Training Loop

In [468]:
# Instantiating our data and pre-processing it as required
train_df = Preprocessing(train_data, train_label)
test_df = Preprocessing(test_data, test_label)

# Standardize X matrix (features)
train_df.standardize()
test_df.standardize()

# Perform one-hot encoding for our label vector (ONLY ON TRAIN)
train_df.y = train_df.label_encode(train_df.y)

# Hyperparameters
LAYER_NEURONS = [128, 128, 64, 10]
LAYER_ACTIVATION_FUNCS = [None, 'relu', 'gelu', 'softmax']
LEARNING_RATE = 0.001  # Suitable for Adam; adjust for SGD if used
EPOCHS = 300
DROPOUT_PROB = 0.3
ADAM_PARAMS = {'beta1': 0.9, 'beta2': 0.999, 'epsilon': 1e-8}
SGD_OPTIM = {'momentum': 0.9}  # Momentum parameter for SGD
OPTIM_TYPE = 'adam'  # Can be 'adam' or 'sgd_momentum'
BATCH_SIZE = 32
WEIGHT_DECAY = 0.98
USE_BATCH_NORM = True

# Select optimizer parameters based on type
OPTIM_PARAMS = ADAM_PARAMS if OPTIM_TYPE == 'adam' else SGD_OPTIM

# Instantiate the multi-layer neural network
nn = MLP(LAYER_NEURONS, LAYER_ACTIVATION_FUNCS, weight_decay=WEIGHT_DECAY, use_batch_norm=USE_BATCH_NORM)

# Perform fitting using the training dataset
t0 = time.time()
trial1 = nn.fit(train_df.X, train_df.y, learning_rate=LEARNING_RATE, epochs=EPOCHS, optim_params=OPTIM_PARAMS, optim_type=OPTIM_TYPE, batch_size=BATCH_SIZE)
t1 = time.time()
print(f"============= Model Build Done =============")
print(f"Time taken to build model: {round(t1 - t0, 4)} seconds.")

Epoch 37/500 has been trained with Train Loss: 1.632, Training Accuracy: 48.448% and Testing Accuracy: 47.66%.


KeyboardInterrupt: 

In [None]:
fig, ax = plt.subplots(2, 1, figsize = (20, 10))
ax[0].plot(trial1['Training Loss'])
ax[0].title.set_text("Cross-Entropy Loss over Epoch")
ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('Loss')
ax[1].plot(trial1['Training Accuracy'], label = "Training Accuracy")
ax[1].plot(trial1['Testing Accuracy'], label = "Testing Accuracy")
ax[1].title.set_text("Training & Testing Accuracy over Epoch")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("Accuracy")
ax[1].legend()
plt.show()

In [457]:
# Perform prediction of the test dataset
test_df.predictions = nn.predict(test_df.X)
train_df.predictions = nn.predict(train_df.X)
test_df.predictions = test_df.decode(test_df.predictions)
train_df.predictions = train_df.decode(train_df.predictions)

In [None]:

# Confusion Matrix
CM = Utils.create_confusion_mat(test_df)
CM  

In [None]:
#Performance Metrics/Measures
measures = Utils.confusion_mat_measures(CM)
measures

In [None]:
# Accuracy & Performance Metrics
test_accuracy = np.sum(test_df.predictions == test_df.y[:, 0]) / test_df.predictions.shape[0]
train_accuracy = np.sum(train_df.predictions == train_label[:, 0]) / train_df.predictions.shape[0]
F1_avg = measures['F1'].mean()
CELoss = trial1['Training Loss'][-1]
print(f'Final Cross-Entropy Training Loss: {round(CELoss, 4)}.')
print(f'Final Train accuracy: {round(train_accuracy * 100, 4)}%.')
print(f'Final Test accuracy: {round(test_accuracy * 100, 4)}%.')
print(f'Final Average F1 Score: {round(F1_avg, 4)}.')