In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
import time

# **Available activation functions**

In [3]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_derivative(x):
    return x * (1 - x)

def linear(x):
    return x

def linear_derivative(x):
    return np.ones_like(x)

def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x > 0, 1, 0)

def leaky_relu(x, alpha=0.01):
    return np.where(x > 0, x, alpha * x)

def leaky_relu_derivative(x, alpha=0.01):
    return np.where(x > 0, 1, alpha)

def tanh(x):
    return np.tanh(x)

def tanh_derivative(x):
    return 1 - x**2

def softmax(x):
    return np.exp(x) / np.sum(np.exp(x), axis=1, keepdims=True)

# **Single layer definition**

In [4]:
class Layer:
    
    def __init__(self, input_dim, output_dim, activation='sigmoid', weights_init='uniform'):
        """
        Initializes a single layer in the neural network.

        Parameters:
        - input_dim (int): Number of input neurons (features).
        - output_dim (int): Number of output neurons.
        - activation (str, optional): Activation function to use ('sigmoid' by default). 
          Supported: 'sigmoid', 'linear', 'relu', 'tanh', 'softmax'.
        - weights_init (str, optional): Weight initialization method ('uniform' by default).
          Available methods:
            - 'uniform' : Weights initialized from a uniform distribution.
            - 'normal'  : Weights initialized from a normal distribution.
            - 'xavier'  : Xavier/Glorot initialization (good for tanh).
            - 'he'      : He initialization (good for ReLU).

        Raises:
        - ValueError: If an unknown weight initialization method or activation function is provided.
        """
        
        weight_initializers = {
            'uniform': lambda: np.random.uniform(0, 1, size=(input_dim, output_dim)),
            'normal': lambda: np.random.randn(input_dim, output_dim),  
            'xavier': lambda: np.random.randn(input_dim, output_dim) * np.sqrt(2 / (input_dim + output_dim)),
            'he': lambda: np.random.randn(input_dim, output_dim) * np.sqrt(2 / input_dim) 
        }

        if weights_init not in weight_initializers:
            raise ValueError(f"Unknown initialization method: {weights_init}. Available: {list(weight_initializers.keys())}")

        self.weights = weight_initializers[weights_init]()
        self.biases = np.zeros((1, output_dim))
        
        activation_functions = {
            'sigmoid' : sigmoid,
            'linear' : linear,
            'relu' : relu,
            'tanh' : tanh,
            'softmax' : softmax,
            'leaky_relu' : leaky_relu
        }
        
        activation_derivatives = {  
            'sigmoid': sigmoid_derivative,
            'linear': linear_derivative,
            'relu' : relu_derivative,
            'tanh' : tanh_derivative,
            'leaky_relu' : leaky_relu_derivative
        }

        self.activation = activation_functions.get(activation)
        self.activation_derivative = activation_derivatives.get(activation)
        
        if self.activation is None:
            raise ValueError(f"Unknown activation function: {activation}. Available: {list(activation_functions.keys())}")

        
    def forward(self, X):
        """
        Performs forward propagation through the layer.

        Parameters:
        - X (numpy array): Input data (shape: number of samples × input features).

        Returns:
        - numpy array: Activated output of the layer.
        """
        return self.activation(np.dot(X, self.weights) + self.biases)


# **MLP**

In [5]:
class MLP:
    
    def __init__(self, layers, weights_init='uniform', classification=False):
        """
        Initializes the multi-layer perceptron (MLP).
        
        Parameters:
        - layers (list of dicts): Each dictionary defines a Layer object with keys:
            - 'input_dim' (int): Number of input neurons.
            - 'output_dim' (int): Number of output neurons.
            - 'activation' (str): Activation function name.
        - weights_init (str, optional): Method for weight initialization ('uniform' by default).
        """
        self.layers = [Layer(layer['input_dim'], layer['output_dim'], layer['activation'], weights_init) for layer in layers]
        self.classification = classification
        self.weights_history = []


    def feedforward(self, X):
        """
        Performs forward propagation through the network.
        
        Parameters:
        - X (numpy array): Input data (shape: number of samples × input features).

        Returns:
        - activations (list of numpy arrays): Activations of each layer including input and output.
        """
        activations = [X]
        for i, layer in enumerate(self.layers):
            activations.append(layer.forward(activations[i]))
        return activations


    def predict(self, X):
        """
        Computes the network's output for given input data.
        
        Parameters:
        - X (numpy array): Input data.

        Returns:
        - numpy array: The final network output (predicted values).
        """
        if self.classification:
            preds = np.array(self.feedforward(X)[-1])
            return np.argmax(preds, axis=1).flatten()
        return np.array(self.feedforward(X)[-1]).reshape(-1,)


    def train(self, 
        X_train, 
        y_train, 
        epochs=500, 
        learning_rate=0.1, 
        batch_size=None,
        optimizer=None, 
        beta=0.9,  
        epsilon=1e-8 
        ):
        """
        Trains the neural network using mini-batch gradient descent and backpropagation.

        Parameters:
        - X_train (numpy array): Training input data.
        - y_train (numpy array): Training target values.
        - epochs (int, optional): Number of training iterations (default is 500).
        - learning_rate (float, optional): Step size for weight updates (default is 0.1).
        - batch_size (int, optional): Number of samples per batch. If None, full-batch is used.
        - optimizer (str, optional): 'momentum' or 'rmsprop' to apply these optimizers. Default is None.
        - beta (float, optional): Momentum and RMSProp smoothing constant. Default is 0.9.
        - epsilon (float, optional): Small constant for numerical stability in RMSProp. Default is 1e-8.

        Returns:
        - None (prints training progress and plots weight evolution).
        """
        X_train = np.array(X_train)
        y_train = np.array(y_train)
        loss = []
        self.weights_history = []

        if batch_size is None:
            batch_size = len(X_train)
            
        if self.classification:
            y_train_oh = self.one_hot_encode(y_train)

        if optimizer == 'momentum':
            velocity_w = [np.zeros_like(layer.weights) for layer in self.layers]
            velocity_b = [np.zeros_like(layer.biases) for layer in self.layers]
        elif optimizer == 'rmsprop':
            caches_w = [np.zeros_like(layer.weights) for layer in self.layers]
            caches_b = [np.zeros_like(layer.biases) for layer in self.layers]

        for epoch in range(epochs):
            indices = np.arange(len(X_train))
            np.random.shuffle(indices)

            for i in range(0, len(X_train), batch_size):
                batch_indices = indices[i:i + batch_size]
                X_batch = X_train[batch_indices]
                y_batch = y_train[batch_indices].reshape(-1, 1) if not self.classification else y_train_oh[batch_indices]

                weights_error = [np.zeros_like(layer.weights) for layer in self.layers]
                biases_error = [np.zeros_like(layer.biases) for layer in self.layers]

                activations = self.feedforward(X_batch)

                weights_error, biases_error = self.backpropagate(activations, y_batch)

                for j, layer in enumerate(self.layers):
                    if optimizer == 'momentum':
                        velocity_w[j] = beta * velocity_w[j] + learning_rate * weights_error[j]
                        layer.weights -= velocity_w[j]
                        velocity_b[j] = beta * velocity_b[j] + learning_rate * biases_error[j]
                        layer.biases -= velocity_b[j]
                    elif optimizer == 'rmsprop':
                        caches_w[j] = beta * caches_w[j] + (1 - beta) * np.square(weights_error[j])
                        layer.weights -= learning_rate * weights_error[j] / (np.sqrt(caches_w[j]) + epsilon)
                        caches_b[j] = beta * caches_b[j] + (1 - beta) * np.square(biases_error[j])
                        layer.biases -= learning_rate * biases_error[j] / (np.sqrt(caches_b[j]) + epsilon)
                    else:
                        layer.weights -= learning_rate * weights_error[j]
                        layer.biases -= learning_rate * biases_error[j]

            self.weights_history.append([layer.weights.copy() for layer in self.layers])

            if not self.classification:
                y_pred = self.predict(X_train)
                tmp_loss = self.mse(y_pred, y_train)   
                print(f"\nEpoch {epoch + 1}/{epochs}. Loss = {tmp_loss}.")
            else:
                y_pred_oh = self.feedforward(X_train)[-1]
                y_pred = np.argmax(y_pred_oh, axis=1).flatten() 
                tmp_loss = self.cross_entropy(y_train_oh, y_pred_oh)
                tmp_f1 = self.f1_score(y_train, y_pred)
                print(f"\nEpoch {epoch + 1}/{epochs}. Loss = {tmp_loss}. F1 Score = {tmp_f1}.")          
            if epoch >= 10:
                loss.append(tmp_loss)
            else:
                loss.append(None)

        plt.figure(figsize=(14, 3))
        plt.plot(range(epochs), loss, 'o')
        plt.title('Loss over epochs')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.show()
        #self.plot_weights_evolution()


    def backpropagate(self, activations, y_batch):
        """
        Performs backpropagation to compute gradients for weight and bias updates.

        Parameters:
        - activations (list of numpy arrays): Activations from feedforward pass.
        - y_batch (numpy array): Expected output.

        Returns:
        - Tuple (weight_gradients, bias_gradients) where:
            - weight_gradients (list of numpy arrays): Gradients for each layer’s weights.
            - bias_gradients (list of numpy arrays): Gradients for each layer’s biases.
        """
        
        batch_size = y_batch.shape[0]

        if self.classification:
            y_pred = activations[-1]  
        else:
            y_pred = activations[-1].reshape(-1, 1)
        
        if self.classification:
            deltas = [(y_pred - y_batch).reshape(-1, y_pred.shape[1])]
        else:
            error = (y_pred - y_batch).reshape(-1, 1)
            deltas = [error * self.layers[-1].activation_derivative(y_pred)]

        for i in range(len(self.layers) - 2, -1, -1):
            layer = self.layers[i]
            next_layer = self.layers[i + 1]
            
            error = deltas[-1] @ next_layer.weights.T  
            deltas.append(error * layer.activation_derivative(activations[i + 1]))

        deltas.reverse() 

        weight_gradients = [(activations[i].T @ deltas[i]) / batch_size for i in range(len(self.layers))]
        bias_gradients = [np.mean(deltas[i], axis=0, keepdims=True) for i in range(len(self.layers))]

        return weight_gradients, bias_gradients
    
   
    def plot_weights_evolution(self):
        """
        Plots the evolution of weights over epochs for each layer.

        Returns:
        - None (displays plots).
        """
        epochs = len(self.weights_history)
        for layer_idx in range(len(self.layers)):
            weights_per_epoch = np.array([epoch[layer_idx] for epoch in self.weights_history])  
            
            num_weights = weights_per_epoch[0].size  
            weights_per_epoch = weights_per_epoch.reshape(epochs, num_weights)

            plt.figure(figsize=(8, 5))
            for w in range(num_weights):
                plt.plot(range(epochs), weights_per_epoch[:, w], alpha=0.7)

            plt.title(f'Weight Evolution in Layer {layer_idx + 1}')
            plt.xlabel('Epoch')
            plt.ylabel('Weight Value')
            plt.legend()
            plt.show()       
            
    def one_hot_encode(self, y, num_classes=None):
        """
        Converts class labels to one-hot encoding.

        Parameters:
        - y (numpy array): Class labels.

        Returns:
        - numpy array: One-hot encoded labels.
        """
        if num_classes is None:
            num_classes = len(np.unique(y))
            
        y = y.flatten()
        y = y.astype(int)
        y_oh = np.zeros((len(y), num_classes))
        y_oh[np.arange(len(y)), y] = 1
        return y_oh            

   
    def set_weights_and_biases(self, layer_idx, W, b):
        """
        Sets custom weights and biases for a specific layer.

        Parameters:
        - layer_idx (int): Index of the layer (0-based).
        - W (numpy array): New weight matrix for the layer.
        - b (numpy array): New bias vector for the layer.

        Returns:
        - None
        """
        self.layers[layer_idx].weights = W
        self.layers[layer_idx].biases = b


    def mse(self, y_true, y_pred):
        """
        Computes the Mean Squared Error (MSE) loss.

        Parameters:
        - y_true (numpy array): True target values.
        - y_pred (numpy array): Predicted values from the network.

        Returns:
        - float: Computed MSE loss.
        """
        return np.mean((y_true - y_pred) ** 2)
    
    
    def f1_score(self, y_true, y_pred):
        """
        Computes the F1-score for multi-class classification using macro-averaging.
        
        :param y_true: List or array of true class labels
        :param y_pred: List or array of predicted class labels
        :return: Macro-averaged F1-score value
        """
        classes = set(y_true) | set(y_pred)
        total_f1 = 0
        
        for cls in classes:
            tp = sum((yt == cls and yp == cls) for yt, yp in zip(y_true, y_pred))
            fp = sum((yt != cls and yp == cls) for yt, yp in zip(y_true, y_pred))
            fn = sum((yt == cls and yp != cls) for yt, yp in zip(y_true, y_pred))
            
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            
            f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            total_f1 += f1
        
        return total_f1 / len(classes) if classes else 0

    def cross_entropy(self, y_true, y_pred):
        """
        Computes the cross-entropy loss for multi-class classification.

        Parameters:
        - y_true (numpy array): True target values (one-hot encoded).
        - y_pred (numpy array): Predicted probabilities from the network.

        Returns:
        - float: Computed cross-entropy loss.
        """
        epsilon = 1e-15  
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  
        return -np.mean(np.sum(y_true * np.log(y_pred), axis=1))

In [6]:
def create_plots(X_train, y_train, X_test, y_test, y_train_pred, y_test_pred, classification=False):
    if not classification:
        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        plt.plot(X_train, y_train, 'o', label='Actual')
        plt.plot(X_train, y_train_pred, 'o', label='Prediction')
        plt.title('Training Set')
        plt.xlabel('X_train')
        plt.ylabel('y_train')
        plt.legend()
        plt.grid(True)

        plt.subplot(1, 2, 2)
        plt.plot(X_test, y_test, 'o', label='Actual')
        plt.plot(X_test, y_test_pred, 'o', label='Prediction')
        plt.title('Test Set')
        plt.xlabel('X_test')
        plt.ylabel('y_test')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()
    else:
        plt.figure(figsize=(8, 3))

        plt.subplot(1, 2, 1)
        plt.scatter(X_train.iloc[:, 0], X_train.iloc[:, 1], c=y_train)
        plt.title("True Labels - TRAIN")

        plt.subplot(1, 2, 2)
        plt.scatter(X_train.iloc[:, 0], X_train.iloc[:, 1], c=y_train_pred)
        plt.title("Predicted Labels - TRAIN")

        plt.tight_layout()
        plt.show()

        plt.figure(figsize=(8, 3))

        plt.subplot(1, 2, 1)
        plt.scatter(X_test.iloc[:, 0], X_test.iloc[:, 1], c=y_test)
        plt.title("True Labels - TEST")

        plt.subplot(1, 2, 2)
        plt.scatter(X_test.iloc[:, 0], X_test.iloc[:, 1], c=y_test_pred)
        plt.title("Predicted Labels - TEST")

        plt.tight_layout()
        plt.show()

# **Tests**

In [None]:
X_train = pd.read_csv('data/feature_engineering/processed_x_train.csv')
X_test = pd.read_csv('data/feature_engineering/processed_x_valid.csv')

y_train_orig = pd.read_csv('data/y_train.csv')
y_train = y_train_orig.copy().drop(columns=['clicked'])
y_train

y_test_orig = pd.read_csv('data/y_valid.csv')
y_test = y_test_orig.copy().drop(columns=['clicked'])
y_test

NN = MLP(layers=[
    {'input_dim': , 'output_dim': 10, 'activation': 'relu'},
    {'input_dim': 10, 'output_dim': 5, 'activation': 'relu'},
    {'input_dim': 5, 'output_dim': 2, 'activation': 'softmax'}
], 
         classification=True)

Unnamed: 0,label
0,8
1,1
2,6
3,7
4,3
...,...
2781,2
2782,7
2783,4
2784,7
