In [1]:
from math import floor

from typing import List
import warnings

from tqdm.notebook import tqdm

import logging
from io import StringIO

import numpy as np
import pandas as pd

### Define signature of a generic activation function class.

In [2]:
from abc import ABC, abstractmethod
class ActivationFunction(ABC):
    def __init__(self):
        pass
    @abstractmethod
    def function(self):
        pass
    @abstractmethod
    def vectorized_function(self):
        pass
    @abstractmethod
    def derivative(self):
        pass
    @abstractmethod
    def __call__(self):
        pass

### Define all activation functions

In [3]:
class Sigmoid(ActivationFunction):
    def __call__(self, x):
        return self.vectorized_function(x)
    def function(self, x):
        x = np.clip(x, -700, 700)
        return 1/(1+np.exp(-x))
    def vectorized_function(self, x):
        return np.vectorize(self.function)(x)
    def derivative(self, x):
        return self.vectorized_function(x)*(1-self.vectorized_function(x))
    def vectorized_derivative(self, x):
        return self.vectorized_function(x)*(1-self.vectorized_function(x))

class Tanh(ActivationFunction):
    def __call__(self, x):
        return self.vectorized_function(x)
    def function(self, x):
        x = np.clip(x, -100, 100)
        # print(x)
        expr1, expr2 = np.exp(x), np.exp(-x)
        return (expr1-expr2)/(expr1+expr2)
        
    def vectorized_function(self, x):
        return np.vectorize(self.function)(x)
    def derivative(self, x):
        expr1, expr2 = np.exp(x), np.exp(-x)
        return 4/((expr1+expr2)**2)
    def vectorized_derivative(self, x):
        return self.derivative(x)

class ReLU(ActivationFunction):
    def __call__(self, x):
        return self.vectorized_function(x)
    def function(self, x):
        return max(0, x)
    def derivative(self, x):
        if max(0,x) == 0:
            return 0
        return 1
    def vectorized_function(self, x):
        return np.vectorize(self.function)(x)
    def vectorized_derivative(self, x):
        return np.vectorize(self.derivative)(x)

class LeakyReLU(ActivationFunction):
    def __call__(self, x):
        return self.vectorized_function(x)
    def function(self, x):
        return max(0.3*x, x)
    def derivative(self, x):
        if max(0.3*x, x) == x:
            return 1
        return 0.3
    def vectorized_function(self, x):
        return np.vectorize(self.function)(x)
    def vectorized_derivative(self, x):
        return np.vectorize(self.derivative)(x)

class Linear:
    def __call__(self, x):
        return x
    def vectorized_derivative(self, x):
        return np.ones_like(x)

### Import existing loss functions and code new ones

In [4]:
def huberLoss(y_true, y_pred, delta=10):
    err = y_true - y_pred
    n_samples = y_true.shape[0]
    abs_err = np.abs(err)
    delta_sq = 0.5*(delta ** 2)
    huber_loss_vectorized = np.vectorize(lambda x: (x**2)*0.5 if x <= delta else delta*x - delta)
    huber_loss_vec = huber_loss_vectorized(abs_err)
    return np.sum(huber_loss_vec)/n_samples
    # return np.sum(huber_loss_vectorized(abs_err))

In [5]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, log_loss

class Crossentropy:
    def __call__(self, y_true, y_pred):
        return self.function(y_true, y_pred)
    def function(self, y_true, y_pred):
        return log_loss(y_true, y_pred, labels = np.arange(y_pred.shape[1]))
    def derivative(self, y_true, y_pred):
        # return an n x 1 matrix
        epsilon = 1e-6
        n_samples = y_true.shape[0]
        y_pred_capped = np.clip(y_pred, epsilon, 1-epsilon) # cap predicted probabilities to avoid floating point issues after taking reciprocal
        y_pred_inv = 1/y_pred_capped # 1/y_hat
        n_classes = y_pred.shape[1]
        y_true_proba = np.eye(n_classes)[y_true] # for ease of computing the derivative, basically a one-hot encoding
        derivative_loss_arr = -np.multiply(y_true_proba, y_pred_inv)# -[y/y_hat, (1-y)/(1-y_hat)]
        derivative_loss_arr = np.sum(derivative_loss_arr, axis=1) # sum up, i.e., y/y_hat + (1-y)/(1-y_hat)  
        return derivative_loss_arr/n_samples

class MSE:
    def __call__(self, y_true, y_pred):
        return self.function(y_true, y_pred)
    def function(self, y_true, y_pred):
        return mean_squared_error(y_true, y_pred)
    def derivative(self, y_true, y_pred):
        n_samples = y_true.shape[0]
        y_true_reshaped = y_true.copy()
        y_true_reshaped = y_true_reshaped.reshape(-1, 1)
        return (-2*(y_true_reshaped - y_pred))/n_samples

class MAE:
    def __call__(self, y_true, y_pred):
        return self.function(y_true, y_pred)
    def function(self, y_true, y_pred):
        return mean_absolute_error(y_true, y_pred)
    def derivative(self, y_true, y_pred):
        n_samples = y_true.shape[0]
        y_true_reshaped = y_true.copy()
        y_true_reshaped = y_true_reshaped.reshape(-1, 1)
        return np.where(y_true > y_pred, 1, -1)/n_samples

class HuberLoss:
    def __init__(self, delta=10):
        self.delta=delta
    def __call__(self, y_true, y_pred):
        return self.function(y_true, y_pred)
    def function(self, y_true, y_pred):
        return huberLoss(y_true, y_pred, self.delta)
    def derivative(self, y_true, y_pred):
        n_samples = y_true.shape[0]
        y_true_reshaped = y_true.copy()
        y_true_reshaped = y_true_reshaped.reshape(-1, 1)
        err = y_true_reshaped - y_pred
        huber_loss_derivative_vectorized = np.vectorize(lambda x: x if np.abs(x) <= self.delta else -self.delta if x < 0 else self.delta)
        return huber_loss_derivative_vectorized(err)/n_samples



#### Test on binary classification

In [None]:
y_true, y_pred = np.random.randint(0, 2, size=(3,)), np.random.uniform(0,1,(3,1))
print(y_true,"\n\n", y_pred)
y_pred_proba = np.hstack([y_pred, 1-y_pred])
print(y_pred_proba)

In [None]:
ce = Crossentropy()
print(f"Loss = {round(ce(y_true, y_pred_proba), 2)}, Derivative = {ce.derivative(y_true, y_pred_proba)}.")

#### Test on Multi-class classification

In [None]:
y_true_multiclass, y_pred_multiclass_preprocessed = np.random.randint(0, 3, size=(5,)), np.random.uniform(0,1,(5, 3))
y_pred_multiclass = y_pred_multiclass_preprocessed / y_pred_multiclass_preprocessed.sum(axis=1, keepdims=True)
print(f"{y_true_multiclass}\n\n{y_pred_multiclass}\n")

In [None]:
print(f"Loss = {round(ce(y_true_multiclass, y_pred_multiclass), 2)}, Derivative = {ce.derivative(y_true_multiclass, y_pred_multiclass), 2}.")

Rest assured, the return value is a numpy.ndarray

#### Test on Regression using Mean Squared Error

In [None]:
yt, yp = np.random.randint(0,10,(4,)), np.random.randint(0,10,(4,))
print(f"{yt}\n\n{yp}")

In [None]:
mse_loss_fn = MSE()
print(f"Loss = {mse_loss_fn(yt, yp)}\nderivative = \n{mse_loss_fn.derivative(yt, yp)}\n")

#### Test on Regression using Mean Absolute Error

In [None]:
mae_loss_fn = MAE()
print(f"Loss = {mae_loss_fn(yt, yp)}\nderivative = \n{mae_loss_fn.derivative(yt, yp)}\n")

#### Test on Regression using Huber Loss

In [None]:
huber_loss_fn = HuberLoss(delta=8)
print(f"Delta = {huber_loss_fn.delta}, Loss = {huber_loss_fn(yt, yp)}\nderivative = \n{huber_loss_fn.derivative(yt, yp)}\n")

### Define Layer and Sequential Model

In [41]:
class ListHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.log = []

    def emit(self, record):
        self.log.append(self.format(record))

class Layer():
    
    __valid_activations = {'sigmoid': Sigmoid, 'tanh': Tanh, 'relu': ReLU, 'leaky_relu': LeakyReLU, 'linear': Linear}
    
    def __init__(self, activation, in_dim, out_dim, learning_rate=0.01):
        if activation.lower() not in list(Layer.__valid_activations.keys()):
            raise Exception(f"Valid activations are {Layer.__valid_activations}.")
        self.activation = Layer.__valid_activations[activation.lower()]()
        self.learning_rate = learning_rate
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.weights = np.random.uniform(-1,1, size=(in_dim, out_dim))
        self.batch_size = None

        logger_name = f'MyClass_{id(self)}'  # Generate a unique name for the logger
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.DEBUG)
        self.log_handler = ListHandler()
        self.log_handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.log_handler)

        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - \n%(message)s')
        self.log_handler.setFormatter(formatter)

    def forward_compute(self, X, compute_gradient=False, print_logs=False):
        '''
        compute_gradient: bool , is True if updates are to be performed. is False if only a prediction is to be made in a single forward pass,
        '''
        output_prime = X.dot(self.weights) # of order n x out_dim
        output_val = self.activation(output_prime) # of order n x out_dim

        if compute_gradient:
            # computing stuff for eventual backpropagation
            self._activation_gradient = self.activation.vectorized_derivative(output_prime) # of order n x out_dim
            self._input = X.copy()
        if print_logs:
            self.logger.info(f"\tFORWARD COMPUTE: Input to this layer = \n\t{X[:5]}\n")
            self.logger.info(f"\tFORWARD COMPUTE: Weights for this layer = \n\t{self.weights}\n")
            self.logger.info(f"\tFORWARD COMPUTE: Output from this layer = \n\t{output_val[:5]}\n")
        
        return output_val

    def backprop_compute(self, prev_grad_multipliers, print_logs=False):
        gradient_mat = 1
        
        # To Do: find a way of multiplying pre v_grad_multipliers with this layer's gradient multiplier matrix.
        '''
        Needs: 
         1. current-layer's activation gradient matrix(as a function of this layer's input, i.e. prev layers output)
         2. current-layer's input matrix
        '''
        output_prime = self._input.dot(self.weights) # of order n x out_dim
        output_val = self.activation(output_prime) # of order n x out_dim
        
        activ_prev_layer_output_element_wise_product = np.multiply(prev_grad_multipliers, self._activation_gradient) # of order n x out_dim
        weights_gradient = self._input.T.dot(activ_prev_layer_output_element_wise_product)
        update_qty = self.learning_rate * weights_gradient
        old_weights = self.weights.copy()
        self.weights = self.weights - update_qty

        if print_logs:
            self.logger.info(f"\tBACKPROP: Weights before update of this layer = \n\t{old_weights}\n")
            self.logger.info(f"\tBACKPROP: Weights-gradient for this layer = \n\t{weights_gradient}\n")
            self.logger.info(f"\tBACKPROP: Weight-update quantity for this layer = \n\t{update_qty}\n")
            self.logger.info(f"\tBACKPROP: Weights post update for this layer = \n\t{self.weights}\n")
        

        send_mat_to_prev_layer = activ_prev_layer_output_element_wise_product.dot(self.weights.T) # of order n x in_dim, 
                                                                      # which is basically n x out_dim for the previous layer.
        return send_mat_to_prev_layer

    def print_logs(self):
        for log_record in self.log_handler.log:
            print(log_record)

In [79]:
class ModelSequential():
    __valid_loss_functions = {
        'crossentropy': Crossentropy, 
        'mse': MSE,
        'mae': MAE,
        'huber': HuberLoss
    }

    __valid_gd_types = ['batch', 'mini_batch', 'stochastic']

    # def __init__(self, layers_arr: List, metrics_to_track, early_stopping=False):
    def __init__(self, layers_arr: List[Layer]):
        self._layers = layers_arr
        self.loss_arr = []

    def compile(self, loss_function, n_iter = 100):
        if loss_function.lower() not in ModelSequential.__valid_loss_functions:
            raise Exception(f"Loss functions should be any of {ModelSequential.__valid_loss_functions.keys()}.")
        
        self.loss_function = ModelSequential.__valid_loss_functions[loss_function.lower()]()
        self.n_iter = n_iter

    def reweight_layers(self):
        n_layers = len(self._layers)
        for i in range(n_layers):
            in_dim, out_dim = self._layers[i].in_dim, self._layers[i].out_dim
            self._layers[i].weights = np.random.uniform(-1,1, size=(in_dim, out_dim))
            print(f"Weights for layer {i} set to \n{self._layers[i].weights}\n\n")

    def batch_fit(self, X, y, print_logs=False):
        '''
        for a given samples of X(features),y(target) , fit the model for n_iter iterations.
        X: features, all numeric columns, n x m matrix
        y: target variable, all numeric columns, n x c matrix , c: no. of classes/no. of numerical variables.
        '''

        self.loss_arr.append(self.loss_function(y_true=y, y_pred=self.predict(X)))        
        
        n_layers = len(self._layers)
        input_for_next_layer = X.copy()

        # forward compute
        for i in range(n_layers):
            try:
                input_for_next_layer = self._layers[i].forward_compute(input_for_next_layer, compute_gradient=True)
            except ValueError as ve:
                raise Exception(f"Forward compute failed at layer {i} with the following exception:\n{ve}")
            except RuntimeWarning as rw:
                raise Exception(f"Forward compute failed at layer {i} with the following exception:\n {rw}")

        y_pred = self.predict(X)
        input_for_next_layer = self.loss_function.derivative(y_true=y, y_pred=y_pred)

    
        # backprop
        for i in range(n_layers-1, -1, -1):
            with warnings.catch_warnings():
                warnings.filterwarnings("error")
                try:
                    input_for_next_layer = self._layers[i].backprop_compute(input_for_next_layer, print_logs)
                except ValueError as ve:
                    return Exception(f"Backpropagation failed at layer {i} with the following exception:\n{ve}")
                except RuntimeWarning as rw:
                    raise Exception(f"Backprop failed at layer {i} with the following exception:\n {rw}")
        
        self.loss_arr.append(self.loss_function(y_true=y, y_pred=self.predict(X)))
    
    def fit(self, X, y, gd_type='mini_batch', batch_size=None):
        if gd_type.lower() not in list(ModelSequential.__valid_gd_types):
            raise Exception(f"Valid Gradient descent types are {ModelSequential.__valid_gd_types}.")

        n_samples = X.shape[0]
        
        if gd_type.lower() == 'batch':
            batch_size = n_samples
        elif gd_type.lower() == 'mini_batch':
            if type(batch_size) != 'int' or type(batch_size) != 'int64' or batch_size >= n_samples:
                batch_size=64
        else:
            batch_size = 1
        print(f"Using batch size as {batch_size}...\n\n")
        n_batches = floor(n_samples/batch_size)
        
        # iterate over all epochs
        for iter_ in tqdm(range(self.n_iter), desc='Iterations...'):
            # iterate over each batch_size sized batch
            for batch_iter in tqdm(range(n_batches), desc='Batches...->'):
                X_new, y_new = X[batch_iter * batch_size:(batch_iter + 1) * batch_size, :].copy(), y[batch_iter * batch_size:(batch_iter + 1) * batch_size].copy()
                if iter_ == 0 and batch_iter == 0:
                    self.batch_fit(X_new, y_new, print_logs=True)
                else:
                    self.batch_fit(X_new, y_new)

    def print_layer_logs(self):
        for i in range(n_layers):
            print(f"Handling layer {i}...")
            self._layers[i].print_logs()
    
    def predict(self, X):
        n_layers = len(self._layers)
        input_for_next_layer = X.copy()

        # forward compute
        for i in range(n_layers):
            with warnings.catch_warnings():
                try:
                    input_for_next_layer = self._layers[i].forward_compute(input_for_next_layer)
                except ValueError as ve:
                    raise Exception(f"Forward compute failed at layer {i} with the following exception:\n{ve}")
                except RuntimeWarning as rw:
                    raise Exception(f"Forward compute failed at layer {i} with the following exception:\n {rw}")
        return input_for_next_layer

### Test

In [80]:
from sklearn.datasets import load_diabetes

X, y = load_diabetes(return_X_y=True)
print(X.shape, y.shape)

(442, 10) (442,)


In [81]:
l1, l2, l3 = Layer('sigmoid', X.shape[1], 3, learning_rate=0.05), Layer('leaky_relu', 3, 2, learning_rate=0.05), Layer('linear', 2, 1, learning_rate=0.05)

In [82]:
# Load the weights_list from the file using NumPy
loaded_weights = np.load('model_weights.npy', allow_pickle=True)

# Separate the loaded weights into individual arrays
loaded_layer1_weights = loaded_weights[0][0].copy()
loaded_layer2_weights = loaded_weights[1][0].copy()
loaded_layer3_weights = loaded_weights[2][0].copy()

# Now you have loaded the weights into separate NumPy arrays
# loaded_layer1_weights, loaded_layer2_weights, loaded_layer3_weights
l1.weights = loaded_layer1_weights
l2.weights = loaded_layer2_weights
l3.weights = loaded_layer3_weights

In [83]:
print(loaded_layer3_weights)

[[0.65107906]
 [0.5028274 ]]


In [84]:
print(loaded_layer2_weights)

[[-0.3344916   0.68226814]
 [-0.31428093  0.6829982 ]
 [-1.0913304  -0.75725216]]


In [85]:
model = ModelSequential([l1, l2, l3])

In [86]:
# model.compile(loss_function='huber', n_iter=10)
model.compile(loss_function='mse', n_iter=10)

In [87]:
init_pred = model.predict(X)
print(model.loss_function(y_true=y, y_pred=init_pred))

29079.742540893883


In [88]:
# model.reweight_layers()

n_layers = len(model._layers)
for i in range(n_layers):
    print(f"Layer {i}\n{model._layers[i].weights}\n\n")


model.fit(X, y, batch_size=10)

Layer 0
[[ 0.38481784 -0.2873984   0.2569083 ]
 [ 0.63190603  0.0207603  -0.01701885]
 [-0.14716172 -0.3139873   0.13053173]
 [-0.5871992   0.58255076 -0.06661969]
 [-0.22049424 -0.00403428 -0.14160568]
 [-0.3763931  -0.22384077 -0.27747136]
 [-0.04296637 -0.07706636  0.31943142]
 [-0.02129191 -0.07783592 -0.5941539 ]
 [-0.17432415 -0.03620058  0.60362446]
 [ 0.40245163  0.31902552 -0.0473246 ]]


Layer 1
[[-0.3344916   0.68226814]
 [-0.31428093  0.6829982 ]
 [-1.0913304  -0.75725216]]


Layer 2
[[0.65107906]
 [0.5028274 ]]


Using batch size as 64...




Iterations...:   0%|          | 0/10 [00:00<?, ?it/s]

Batches...->:   0%|          | 0/6 [00:00<?, ?it/s]

Exception: Backprop failed at layer 0 with the following exception:
 invalid value encountered in multiply

In [89]:
model._layers[2].print_logs()

2023-11-29 09:59:58,709 - MyClass_6054062064 - INFO - 
	BACKPROP: Weights before update of this layer = 
	[[0.65107906]
 [0.5028274 ]]

2023-11-29 09:59:58,709 - MyClass_6054062064 - INFO - 
	BACKPROP: Weights-gradient for this layer = 
	[[ 71.88931727]
 [-82.75701192]]

2023-11-29 09:59:58,709 - MyClass_6054062064 - INFO - 
	BACKPROP: Weight-update quantity for this layer = 
	[[ 3.59446586]
 [-4.1378506 ]]

2023-11-29 09:59:58,709 - MyClass_6054062064 - INFO - 
	BACKPROP: Weights post update for this layer = 
	[[-2.9433868]
 [ 4.640678 ]]



In [90]:
model._layers[1].print_logs()

2023-11-29 09:59:58,710 - MyClass_6054063552 - INFO - 
	BACKPROP: Weights before update of this layer = 
	[[-0.3344916   0.68226814]
 [-0.31428093  0.6829982 ]
 [-1.0913304  -0.75725216]]

2023-11-29 09:59:58,710 - MyClass_6054063552 - INFO - 
	BACKPROP: Weights-gradient for this layer = 
	[[ 121.17662885 -636.84201588]
 [ 121.07777436 -636.32248752]
 [ 121.881573   -640.54683959]]

2023-11-29 09:59:58,710 - MyClass_6054063552 - INFO - 
	BACKPROP: Weight-update quantity for this layer = 
	[[  6.05883144 -31.84210079]
 [  6.05388872 -31.81612438]
 [  6.09407865 -32.02734198]]

2023-11-29 09:59:58,710 - MyClass_6054063552 - INFO - 
	BACKPROP: Weights post update for this layer = 
	[[-6.39332305 32.52436894]
 [-6.36816965 32.49912256]
 [-7.18540906 31.27008982]]



In [91]:
model._layers[0].print_logs()

2023-11-29 09:59:58,713 - MyClass_6051467120 - INFO - 
	BACKPROP: Weights before update of this layer = 
	[[ 0.38481784 -0.2873984   0.2569083 ]
 [ 0.63190603  0.0207603  -0.01701885]
 [-0.14716172 -0.3139873   0.13053173]
 [-0.5871992   0.58255076 -0.06661969]
 [-0.22049424 -0.00403428 -0.14160568]
 [-0.3763931  -0.22384077 -0.27747136]
 [-0.04296637 -0.07706636  0.31943142]
 [-0.02129191 -0.07783592 -0.5941539 ]
 [-0.17432415 -0.03620058  0.60362446]
 [ 0.40245163  0.31902552 -0.0473246 ]]

2023-11-29 09:59:58,713 - MyClass_6051467120 - INFO - 
	BACKPROP: Weights-gradient for this layer = 
	[[ 72.71771589  72.6645262   70.46615276]
 [ 11.06609168  10.97005852  10.68479269]
 [-44.60487937 -44.66964115 -43.23590991]
 [ 29.57770543  29.43229677  28.66354577]
 [118.88668169 118.66370185 114.90403887]
 [161.59461344 161.37765908 156.10981706]
 [-25.56196236 -25.53104693 -24.49743414]
 [ 66.97099186  66.80690729  64.60558725]
 [-48.84420494 -48.94586891 -47.2241913 ]
 [114.0332815  113.921

In [None]:
y_pred = model.predict(X)
model.loss_function(y_true=y, y_pred=y_pred)

In [None]:
n_layers = len(model._layers)
for i in range(n_layers):
    print(f"Layer {i}\n{model._layers[i].weights}\n\n")

In [None]:
print(model.loss_arr)

### Print all layer's weights

In [None]:
n_layers = len(model._layers)
for i in range(n_layers):
    print(f"Layer {i}\n{model._layers[i].weights}\n\n")

In [None]:
print(dir(model))

In [None]:
import sys
print(sys.getsizeof(model))