In [None]:
import numpy as np
import math

np.random.seed(42)

class Neural_Network(): 
    def __init__(self, input_size, hidden_size, output_size, l_rate, activation='sigmoid', optimizer ='sgd'):
        # initialize data_size, l_rate, and optimizer
        self.input_size   = input_size
        self.hidden_size  = hidden_size
        self.output_size  = output_size
        self.l_rate       = l_rate
        self.optimizer    = optimizer

        # all results in forward-backward are stored here! 
        self.storage = {}
        self.grads   = {}
        
        # initialize momentum_optimizer
        if self.optimizer == 'momentum': 
            self.momentum_opt = self.initialize_momentum_optimizer()
        
        # initialize activation function
        if activation == 'sigmoid': 
            self.activation = self.sigmoid
        elif activation == 'relu': 
            self.activation = self.relu
        else: 
            raise ValueError("Please choose 'sigmoid' or 'relu' for activation function")
        
        # intialize weight and bias parameter
        self.params = self.initialize()
        
    def apply_dropout(self, x, dropout_prob=0.05):
        # dropout is only used if it is activated and run in training 
        epsilon = 1e-8  # prevent division by zero or small denominators

        self.dropout_mask = np.random.binomial(1, 1-dropout_prob, size = x.shape)

        # make sure that dropout_mask's data type is the same with x's
        self.dropout_mask = self.dropout_mask.astype(x.dtype)
        
        # add epsilon, just in case 1-self.dropout_prob < epsilon, we go with epsilon
        # this is added to avoid denominator being to small.
        x = (x * self.dropout_mask) / max((1 - dropout_prob, epsilon))
        return x
    
    def sigmoid(self, x, derivative=False):
        sig = 1 / (1 + np.exp(-x))
        if derivative:
            return sig * (1 - sig)
        return sig
    
    def relu(self, x, derivative=False): 
        if derivative:
            x = np.where(x < 0, 0, x)
            x = np.where(x >= 0, 1, x)
            return x 
        return np.maximum(0, x)

    def softmax(self, x): 
        exps = np.exp(x - x.max())
        return exps / np.sum(exps, axis = 0) 

    def initialize(self):
        # if sigmoid, we use Xavier (1.0), else HE (2.0)
        factor = 1.0 if self.activation == self.sigmoid else 2.0
        
        scaling_1 = np.sqrt(factor/self.input_size)
        scaling_2 = np.sqrt(factor/self.hidden_size[0])
        scaling_3 = np.sqrt(factor/self.hidden_size[1])
        scaling_4 = np.sqrt(factor/self.hidden_size[2])

        # create value of each parameter randomly and scale the values using the choosen factor
        params = {
        'w1':np.random.randn(self.hidden_size[0], self.input_size) * scaling_1,
        'b1':np.zeros((self.hidden_size[0], 1)) * scaling_1,

        'w2':np.random.randn(self.hidden_size[1], self.hidden_size[0]) * scaling_2,
        'b2':np.zeros((self.hidden_size[1], 1)) * scaling_2,
        
        'w3':np.random.randn(self.hidden_size[2], self.hidden_size[1]) * scaling_3,
        'b3':np.zeros((self.hidden_size[2], 1)) * scaling_3, 

        'w4':np.random.randn(self.output_size, self.hidden_size[2]) * scaling_4,
        'b4':np.zeros((self.output_size, 1)) * scaling_4   
        }
        
        return params        

    def forward(self, x, training=True, dropout=True):
        self.storage['x'] = x
         
        # The multiplication happens first, and then the neuron is deactivated by dropout
        # input layer -> hidden layer 1
        self.storage['z1'] = np.matmul(self.params['w1'], self.storage['x'].T) + self.params['b1']

        self.storage['a1'] = self.activation(self.storage['z1'], derivative=False)

        # drop some neuron in hidden layer 1
        if training and dropout:
            self.storage['a1'] = self.apply_dropout(self.storage['a1']) 

        # hidden layer 1 -> hidden layer 2
        self.storage['z2'] = np.matmul(self.params['w2'], self.storage['a1']) + self.params['b2']
        self.storage['a2'] = self.activation(self.storage['z2'], derivative=False)

        # drop some neuron in hidden layer 2
        if training and dropout:
            self.storage['a2'] = self.apply_dropout(self.storage['a2']) 

        # hidden layer 2 -> hidden layer 3
        self.storage['z3'] = np.matmul(self.params['w3'], self.storage['a2']) + self.params['b3']
        self.storage['a3'] = self.activation(self.storage['z3'], derivative=False)

        # drop some neuron in hidden layer 3
        if training and dropout:
            self.storage['a3'] = self.apply_dropout(self.storage['a3']) 

        # hidden layer 3 -> output layer
        self.storage['z4']     = np.matmul(self.params['w4'], self.storage['a3']) + self.params['b4']
        self.storage['output'] = self.softmax(self.storage['z4'])

        return self.storage['output']
        
    def cross_entropy_loss(self, output, y, lasso=False, ridge=False, lambda_l=1e-5, lambda_r=1e-5):
        # add clipping to avoid log 0 problems
        epsilon    = 1e-15
        output     = np.clip(output, epsilon, 1.0)
        
        # computing cross-entropy loss
        l_sum = np.sum(np.multiply(y.T, np.log(output)))
        m     = y.shape[0]
        avg_loss = -(1./m) * l_sum 
        
        # if lasso is implemented
        if lasso:
            l_reg  = sum(np.abs(values).sum() for key, values in self.params.items() if 'w' in key)
            avg_loss += lambda_l * l_reg
        
        # if ridge is implemented
        if ridge:
            r_reg  = sum(np.square(values).sum() for key, values in self.params.items() if 'w' in key)
            avg_loss += lambda_r * r_reg
        
        return avg_loss
    
    def accuracy(self, y, output):
        return np.mean(np.argmax(y, axis = 1) == np.argmax(output.T, axis = 1))

    def backward(self, output, y, lasso=False, ridge=False, lambda_l=1e-5, lambda_r=1e-5): 
        current_batch_size = y.shape[0]
        output = self.storage['output'].copy()

        # going back: output layer -> hidden layer 3
        dz4 = output - y.T
        self.grads['w4'] = (1./current_batch_size) * (dz4 @ self.storage['a3'].T)
        self.grads['b4'] = (1./current_batch_size) * np.sum(dz4, axis=1, keepdims=True)

        # going back: hidden layer 3 -> hidden layer 2
        da3 = self.params['w4'].T @ dz4 
        dz3 = da3 * self.sigmoid(self.storage['z3'], derivative = True)
        self.grads['w3'] = (1./current_batch_size) * (dz3 @ self.storage['a2'].T)
        self.grads['b3'] = (1./current_batch_size) * np.sum(dz3, axis=1, keepdims=True)

        # going back: hidden layer 2 -> hidden layer 1
        da2 = self.params['w3'].T @ dz3 
        dz2 = da2 * self.sigmoid(self.storage['z2'], derivative = True)
        self.grads['w2'] = (1./current_batch_size) * (dz2 @ self.storage['a1'].T) 
        self.grads['b2'] = (1./current_batch_size) * np.sum(dz2, axis=1, keepdims=True)

        # going back: hidden layer 1 -> input layer
        da1 = self.params['w2'].T @ dz2 
        dz1 = da1 * self.sigmoid(self.storage['z1'], derivative = True)
        self.grads['w1'] = (1./current_batch_size) * (dz1 @ self.storage['x'])
        self.grads['b1'] = (1./current_batch_size) * np.sum(dz1, axis=1, keepdims=True)
        
        # add lasso and ridge penalty to the weights
        for key in [k for k in self.grads if 'w' in k]: 
            if lasso: 
                self.grads[key] += lambda_l * np.sign(self.params[key])
            if ridge: 
                self.grads[key] += 2 * lambda_r * self.params[key]
                    
        return self.grads 
                        
    def apply_gradient_clipping(self, clip_value):
        # apply clipping to avoid gradient exploding
        for key in [k for k in self.grads if 'w' in k]: 
            grad_norm = np.linalg.norm(self.grads[key])
            if grad_norm > clip_value: 
                    self.grads[key] *= (clip_value/grad_norm)
        return self.grads

    def initialize_momentum_optimizer(self):
        # initialize momentum 
        momentum_opt = {
            'w1': np.zeros_like(self.params['w1']),
            'b1': np.zeros_like(self.params['b1']),
            'w2': np.zeros_like(self.params['w2']),
            'b2': np.zeros_like(self.params['b2']), 
            'w3': np.zeros_like(self.params['w3']),
            'b3': np.zeros_like(self.params['b3']),
            'w4': np.zeros_like(self.params['w4']),
            'b4': np.zeros_like(self.params['b4']), 
        }
        return momentum_opt 

    def optimize(self, beta_momentum=0.9):
        # if sgd technique is used
        if self.optimizer == 'sgd':
            for key in self.params:
                self.params[key] -= self.l_rate * self.grads[key]

        elif self.optimizer == 'momentum':
        # if momentum technique is used
            for key in self.params:
                self.momentum_opt[key] = (beta_momentum  * self.momentum_opt[key] + (1. - beta_momentum) * self.grads[key])
                self.params[key]      -= self.l_rate * self.momentum_opt[key]
        else:
            raise ValueError("We only have 'sgd' and 'momentum'. Please choose one!")
        
        return self.params


In [None]:
import time

def train(model, x_train, y_train, x_test, y_test, epochs, batch_size):
    start_time = time.time()
    for epoch in range(epochs):
        permutation = np.random.permutation(x_train.shape[0])

        # put data train in batch 32 (for example)
        for batch_start in range(0, x_train.shape[0], batch_size):
            indices = permutation[batch_start:min(batch_start+batch_size, x_train.shape[0])]
            batch_x, batch_y = x_train[indices], y_train[indices]

            # forward
            frw_output = model.forward(batch_x)

            # backward
            _ = model.backward(frw_output, batch_y)

            # uncomment it below if clipping is used
            # model.apply_gradient_clipping(clip_value=0.005)

            # update
            model.optimize()

        # Evaluate performance
        # train
        train_out = model.forward(x_train, training=False, dropout=False)
        train_acc = model.accuracy(y_train, train_out)
        train_loss= model.cross_entropy_loss(train_out, y_train)

        # test
        test_out  = model.forward(x_test, training=False, dropout=False)
        test_acc  = model.accuracy(y_test, test_out)
        test_loss = model.cross_entropy_loss(test_out, y_test)

        epoch_time = time.time() - start_time
        print(f"Epoch {epoch+1} - train acc: {train_acc:.4f}, train loss: {train_loss:.4f}, test acc: {test_acc:.4f}, test loss: {test_loss:.4f}, time: {epoch_time:.2f}s")



In [None]:
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# create and split data
n_samples  = 10000 
n_features = 10
n_informative = 3
n_classes  = 3
n_cluster_per_class = 2
random_state = 25

x, y  = make_classification(n_samples=n_samples, n_features=n_features, n_informative=n_informative, 
                            n_classes=n_classes, n_clusters_per_class=n_cluster_per_class, random_state = random_state)

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, stratify=y, random_state=random_state)


In [4]:
from sklearn import preprocessing

# preprocess the data
x_train_scaled = preprocessing.normalize(x_train)
x_test_scaled  = preprocessing.normalize(x_test)

In [5]:
from sklearn.preprocessing import OneHotEncoder

# apply one hot encoding to y data
encoder = OneHotEncoder(sparse_output=False)
y_train = encoder.fit_transform(y_train.reshape(-1, 1))
y_test  = encoder.transform(y_test.reshape(-1, 1))

In [6]:
# make model
model = Neural_Network(input_size=n_features, hidden_size=[128, 64, 32], output_size=n_classes, l_rate=0.1, activation='sigmoid', optimizer='sgd')


In [7]:
# train and test model
train_model = train(model, x_train_scaled, y_train, x_test_scaled, y_test, epochs=150, batch_size=32)

Epoch 1 - train acc: 0.3351, train loss: 1.0975, test acc: 0.3350, test loss: 1.0975, time: 0.49s
Epoch 2 - train acc: 0.3325, train loss: 1.1053, test acc: 0.3325, test loss: 1.1051, time: 1.04s
Epoch 3 - train acc: 0.4238, train loss: 1.1003, test acc: 0.4275, test loss: 1.1000, time: 1.61s
Epoch 4 - train acc: 0.4284, train loss: 1.0896, test acc: 0.4425, test loss: 1.0889, time: 2.75s
Epoch 5 - train acc: 0.4155, train loss: 1.0731, test acc: 0.4240, test loss: 1.0716, time: 3.75s
Epoch 6 - train acc: 0.4888, train loss: 1.0549, test acc: 0.4985, test loss: 1.0518, time: 4.67s
Epoch 7 - train acc: 0.4716, train loss: 1.0470, test acc: 0.4755, test loss: 1.0421, time: 5.48s
Epoch 8 - train acc: 0.4527, train loss: 1.0290, test acc: 0.4565, test loss: 1.0211, time: 6.34s
Epoch 9 - train acc: 0.4382, train loss: 1.0336, test acc: 0.4510, test loss: 1.0239, time: 6.88s
Epoch 10 - train acc: 0.4685, train loss: 1.0165, test acc: 0.4790, test loss: 1.0075, time: 7.40s
Epoch 11 - train ac

In [8]:
# the codes above was modified from https://github.com/lionelmessi6410/Neural-Networks-from-Scratch/blob/main/model.py