In [1]:
import pandas as analytics
import numpy as maths
from numba import cuda , jit
import warnings 
warnings.filterwarnings("ignore")

In [2]:
df_data = analytics.read_csv('data1.csv',header = None , names = ['x0','x1','y'])
df_data

Unnamed: 0,x0,x1,y
0,22.930,-20.9020,1
1,12.410,2.0033,1
2,-20.686,-33.3030,1
3,46.974,-13.5550,1
4,41.965,20.3680,1
...,...,...,...
195,-29.232,28.4410,-1
196,-10.159,7.3065,-1
197,-32.026,14.9380,-1
198,-49.533,-11.1280,-1


In [127]:
class neural_network :
    
    def __init__(self,df_data , number_of_neurons = [2,1], epsilon = 5e-2, alpha = 0.2, hidden_function = 'relu', output_function = 'sigmoid' , initialisation_technique = 'xavier_normal', batch_size = 15) :
        self.number_of_hidden_layers = len(number_of_neurons) - 1
        self.number_of_neurons = number_of_neurons
        self.df_data = df_data
        self.epsilon = epsilon 
        self.N = df_data.shape[0]
        self.alpha = alpha
        self.batch_size = batch_size

        functions_list = {'relu':self.relu , 'sigmoid' : self.sigmoid , 'identity' : self.identity}
        intialisation_list = {'xavier_normal':self.xavier_normal , 'he_normal':self.he_normal, 'xavier_uniform' : self.xavier_uniform, 'he_uniform' : self.he_uniform, 'uniform' : self.uniform }
        loss_function_list = {'sigmoid' : self.cross_entropy , 'identity' : self.rmse }
    
        self.hidden = functions_list[hidden_function]
        self.output = functions_list[output_function]
        self.loss = loss_function_list[output_function]
        self.initialise = intialisation_list[initialisation_technique]

        if self.output == self.sigmoid and self.number_of_neurons[-1] != 1 :
            print("Sigmoid must have only one neuron in the output. Please check the structure !")
            raise ValueError('Sigmoid must have only one neuron in the output. Please check the structure !')

    def relu(self,x) :
        """Hidden Activation Function"""
        return max(0,x)
    
    def sigmoid(self,x):
        """Output Function"""
        return 1 / (1 + maths.exp(-x))

    def identity(self,x):
        return x

    def calculate_derivative(self, data, func):
        if func == self.relu :
            if data > 0 : data = 1
            derivative = func(data)

        if func == self.sigmoid :
            derivative = func(data) * ( 1- func(data))

        if func == self.identity :
            derivative = 1

        if func == self.rmse:
            derivative = data

        if func == self.cross_entropy :
            y = data[0]
            fx = data[1]
            x = data[2]
            derivative = (y-fx)*x

        return derivative

    def xavier_normal(self,fan_in, fan_out):
        mu = 0 
        sigma = (2 / (fan_in + fan_out))**0/5

        weight = maths.random.normal(mu, sigma,(fan_out, fan_in))
        bias = maths.random.normal(mu,sigma,(fan_out,1))

        return weight , bias

    def he_normal(self,fan_in, fan_out):
        mu = 0
        sigma = (2/fan_in) ** 0.5
        
        weight = maths.random.normal(0,sigma,(fan_out,fan_in))
        bias = maths.random.normal(0,sigma,(fan_out,1))

        return weight , bias

    def xavier_uniform(self, fan_in, fan_out) :
        weight = maths.random.uniform(-( 6 ** 0.5 / (fan_in + fan_out)) ** 0.5 , (6 / (fan_in + fan_out)) ** 0.5, (fan_out,fan_in))
        bias = maths.random.uniform(-( 6 ** 0.5 / (fan_in + fan_out)) ** 0.5 , (6 / (fan_in + fan_out)) ** 0.5, (fan_out,1))

        return weight, bias
        
    def he_uniform(self, fan_in, fan_out) :
        weight = maths.random.uniform(-( 6 ** 0.5 / fan_in) ** 0.5 , (6 / fan_out) ** 0.5, (fan_out,fan_in))
        bias = maths.random.uniform(-( 6 ** 0.5 / fan_in) ** 0.5 , (6 / fan_out) ** 0.5, (fan_out,1))

        return weight, bias

    
    def uniform(self, fan_in, fan_out) :
        weight = maths.random.uniform(-1/fan_in**0.5 , 1/fan_in**0.5,(fan_out,fan_in))
        bias = maths.random.uniform(-1/fan_in**0.5 , 1/fan_in**0.5,(fan_out,1))

        return weight, bias

    def rmse(y,x):
        return (y-x)**2

    def cross_entropy(y,x):
        return -(y*maths.log(abs(x)) + (1-y)*maths.log(abs(1-x)))
    
    
    def initialisation(self):
        weights = []
        bias = []
        for l in range(self.number_of_hidden_layers + 1):
            if l == 0 : previous_layer = df_data.shape[1] - 1
            else : previous_layer = self.number_of_neurons[l-1]
            present_layer = self.number_of_neurons[l]

            
            weight , bia = self.initialise(previous_layer , present_layer)    
            
            weights.append(weight)
            bias.append(bia)
        
        self.weights = weights
        self.bias = bias


    def forward_propagation(self,x):
        activations = []
        activations.append(x)   # inputdata point is the initial activation
        
        hidden_layers = []
        
        for l in range(self.number_of_hidden_layers):
            neurons = self.weights[l] @ activations[l] + self.bias[l]   # it actually should be activations[l+1] but because indexing in python starts from 0, so it is activations[l]
            activation = maths.matrix([self.hidden(float(neuron)) for neuron in neurons]).reshape(-1,1)
            hidden_layers.append(neurons)
            activations.append(activation)
   
        neurons = self.weights[-1] @ activations[-1] + self.bias[-1]
        outputs = maths.matrix([float(self.output(x)) for x in neurons]).reshape(-1,1)
        hidden_layers.append(neurons)
        activations.append(outputs)
        
        self.hidden_layers = hidden_layers
        self.activations = activations


    def backward_propagation(self,x,y) :
      
        deltas = []
        if self.loss == self.rmse :
            t = self.activations[-1]
            
        if self.loss == self.cross_entropy :
            t = (y, self.activations[-1] , self.hidden_layers[-1])

        
        loss_change_wrt_output = self.calculate_derivative(t,self.loss)

        outputs = maths.matrix([float(self.calculate_derivative(x,self.output)) for x in self.hidden_layers[-1]]).reshape(loss_change_wrt_output.shape)
        delta = maths.multiply(loss_change_wrt_output , outputs)   # change in loss wrt output layer
        deltas.append(delta)

        grad_weights = []
        grad_biases = []

        for l in range(self.number_of_hidden_layers , -1, -1 ): # it is actually running from Output layer to the first hidden layer. Due to indexing convention, loop starts from number_of_hidden_layers and goes till 0
            grad_weight = deltas[0] @ self.activations[l].T
            grad_bias = deltas[0]

            
            info_passed_to_weights = self.weights[l].T @ deltas[0]   # information from present layer passed on to the weights connecting present and previous layers 
            if maths.any(maths.isinf(info_passed_to_weights)) :
                print(deltas[0])
                print(self.weights[l])
                raise ValueError("Infinite values encountered while passing information to weights")
            
            if l > 0 : layer = self.hidden_layers[l-1]
            else : layer = self.activations[0]

            change_in_neurons = maths.matrix([self.calculate_derivative(float(i),self.hidden) for i in layer ]).reshape(-1,1)  # change of neurons of the previous layer

            
            delta =  maths.multiply(info_passed_to_weights , change_in_neurons ).reshape(-1,1)      # changes in loss wrt the previous layer's neurons
            if maths.any(maths.isnan(delta)) :
                raise ValueError("NaN values encountered in delta")
            deltas.insert(0,delta)

            grad_weights.insert(0,grad_weight)
            grad_biases.insert(0,grad_bias)
    
        return grad_weights, grad_biases


    def has_converged(self, prev_weights, prev_bias) :
        self._converged = (maths.linalg.norm(maths.matrix(self.weights[0]) - maths.matrix(prev_weights[0])) < self.epsilon) and (maths.linalg.norm(maths.matrix(self.bias[0]) - maths.matrix(prev_bias[0])) < self.epsilon)


    def update_weights(self):
        
        prev_weights = [w + 1 for w in self.weights]
        prev_bias = [b + 1 for b in self.bias]
        
        self._converged = False
        
        while not self._converged:
            prev_weights = self.weights.copy()
            prev_bias = self.bias.copy()

            number_of_batches = len(self.df_data) // self.batch_size

            for M in range(number_of_batches) :
                batch_grad_weights = []
                batch_grad_bias = []
                for m in range(self.batch_size) :
                    if min(M*self.batch_size + m , self.N) != self.N :
                        x = maths.matrix(self.df_data.iloc[M*self.batch_size + m][:-1]).reshape(-1,1)
                        y = self.df_data.iloc[M*self.batch_size + m][-1]
                        self.forward_propagation(x)
                        grad_weights, grad_bias = self.backward_propagation(x,y)
                        batch_grad_weights.append(grad_weights)
                        batch_grad_bias.append(grad_bias)
                
                self.sum_of_grad_bias = batch_grad_bias[0].copy()
                self.sum_of_grad_weights = batch_grad_weights[0].copy()

                
                for i in range(1,len(batch_grad_bias)):
                    for j in range(len(batch_grad_bias[i])):
                        self.sum_of_grad_bias[j] = self.sum_of_grad_bias[j] + batch_grad_bias[i][j]

                for i in range(1,len(batch_grad_weights)):
                    for j in range(len(batch_grad_weights[i])):
                        self.sum_of_grad_weights[j] = self.sum_of_grad_weights[j] + batch_grad_weights[i][j]
                        
                
                for l in range(self.number_of_hidden_layers + 1) :
                    self.weights[l] = self.weights[l] - float(self.alpha/self.batch_size) * self.sum_of_grad_weights[l]
                    self.bias[l] = self.bias[l] - float(self.alpha/self.batch_size) * self.sum_of_grad_bias[l]
                        
            
            self.has_converged(prev_weights, prev_bias)

In [137]:
neurons_in_each_layer = [2,4]
epsilon = 1e-3
alpha = 0.4
hidden_function = 'relu'
output_function = 'identity'
initialise_through = 'xavier_normal'


ann = neural_network(df_data ,
                     number_of_neurons = neurons_in_each_layer,
                     epsilon = epsilon,
                     alpha = alpha,
                     hidden_function = hidden_function,
                     output_function = output_function,
                     initialisation_technique = initialise_through, 
                     batch_size = 1
                    )

ann.initialisation()
ann.update_weights()

for i in range(len(neurons_in_each_layer)):
    print("LAYER",i+1)
    print("Weights")
    print(ann.weights[i])
    print("Bias")
    print(ann.bias[i].T)
    print()

[[-inf]
 [-inf]
 [ inf]
 [ inf]]
[[-4.36741620e+162 -2.88960446e+157]
 [-7.87552844e+161 -5.21066943e+156]
 [ 2.25457414e+162  1.49168918e+157]
 [ 1.72867921e+162  1.14374241e+157]]


ValueError: Infinite values encountered while passing information to weights

In [100]:
ann.activations

[matrix([[22.005],
         [10.409]]),
 matrix([[3.77041072e+95],
         [0.00000000e+00]]),
 matrix([[-1.69933318e+191],
         [ 8.89120575e+190]])]