**Implement a backpropagation algorithm to train a DNN with at least 2 hidden layers.**

In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from keras.activations import sigmoid
from sklearn.model_selection import train_test_split

Backpropogation

In [None]:
class Neuron(object):
    '''
    params: links: array of weights connected to the neuron from previous layer
            is_input: (boolean) tells whether 'this' neuron is an input neuron or not.
            activation_type: Type of activation function
    '''
    def __init__(self, is_input=False, activation_type='sigmoid'):
        self.activation = 0.0
        self.is_input = is_input
        self.error_term = 0.0 # Delta for each neuron

        if activation_type == 'sigmoid':
            self.activation_fn = sigmoid

    def __str__(self):
        output_str = "<< Activation value: {}, Error term: {} >>".format(self.activation, self.error_term)
        return output_str

    def get_activation(self):
        return self.activation

    def set_error_term(self, value):
        self.error_term = value

    def get_error_term(self):
        return self.error_term

    # prev_inputs: 'x', weights: 'w'. Take dot product of these two.
    def activate_neuron(self, prev_inputs, weights, bias_value):
        # sum_value = np.dot(prev_inputs, self.links)
        if weights is None:
            # input layer. 'prev_inputs' is now the actual activation value (from the input_vector)
            self.activation = prev_inputs
        else:
            sum_value = np.dot(prev_inputs, weights) + bias_value
            activation_val = self.activation_fn(sum_value)
            self.activation = activation_val


class Layer(object):

    def __init__(self, layer_properties, prev_layer_neurons):
        self.num_units = layer_properties['neuron_count']
        self.layer_type = layer_properties['layer_type']
        self.layer_ID = layer_properties['layer_ID']
        self.activation_type = layer_properties['activation_type']
        self.layer_number = layer_properties['layer_number']
        self.prev_layer_neurons = prev_layer_neurons


        # Initialize Neurons in the layer and,
        # Initialize the weight matrix with random weights
        # The weight matrix is weights between 'this' and previous layer

        # For hidden and output layers
        if self.layer_type == 'h' or self.layer_type == 'o':
            self.weight_matrix = np.random.uniform(low=-0.5, high=0.5, size=(self.num_units, prev_layer_neurons))
            self.biases = np.random.uniform(low=-0.5, high=0.5, size=(self.num_units,))
            self.neurons = [Neuron() for i in range(self.num_units)]
        # For input layer
        elif self.layer_type == 'i':
            self.weight_matrix = None
            self.biases = None
            self.neurons = [Neuron(is_input=True, activation_type='sigmoid') for i in range(self.num_units)]

    def get_weights(self):
        return self.weight_matrix

    def get_layer_ID(self):
        return self.layer_ID

    def get_layer_type(self):
        return self.layer_type

    def get_weights_for_neuron(self, index):
        # return weight vector for neuron at index 'index'
        return self.weight_matrix[index]

    def get_bias_value(self, index):
        return self.biases[index]

    def get_neuron_activations(self):
        return np.array([n.get_activation() for n in self.neurons])
        # returns Array of activations. shape = (n,)

    def get_neuron_count(self):
        return len(self.neurons)

    def get_neuron_error_terms(self):
        return np.array([n.get_error_term() for n in self.neurons])

    # This following function 'calculate_error_terms()' works in 2 types:
    # 1) if 'is_output' == True, That means error term calculation will be according to output layer formula. 'resource' is target_output_vector here
    # 2) if 'is_output  == False, That means error term calculation is for hidden layer. 'resource' here is tuple of (weights, error_vec) between 'this' layer and 'next' layer.
    # read the README file for better understanding.
    def calculate_error_terms(self, is_output, resource):
        if is_output == True:
            # 'resource' is target output vector
            for n in range(len(self.neurons)):
                # delta = o * (1 - o) * (t - o)
                error_value = self.neurons[n].get_activation() * (1 - self.neurons[n].get_activation()) * (resource[n] - self.neurons[n].get_activation())
                self.neurons[n].set_error_term(error_value)
        else:
            # 'resource' is now weight matrix!
            (weight_matrix, error_vector) = resource
            for n in range(len(self.neurons)):
                temp_sum = np.dot(weight_matrix.T[n], error_vector)
                error_value = self.neurons[n].get_activation() * (1 - self.neurons[n].get_activation()) * temp_sum
                self.neurons[n].set_error_term(error_value)

    # X is previous layer input, DELTA is vector of error terms
    def update_weights(self, X, DELTA, learning_rate):
        # Call this function only in backward pass.
        if self.layer_type != 'i':
            del_W = np.zeros(shape=self.weight_matrix.shape)
            for i in range(len(X)):
                for j in range(len(del_W)):
                    del_W[j,i] = learning_rate * X[i] * DELTA[j]
            self.weight_matrix = self.weight_matrix + del_W
            '''
            Eliminate these Python double loops!!

            '''


    def print_layer_properties(self):
        print('**************')
        print('Layer Number: {}\nLayer ID: {}\nLayer Type: {}\nNeuron count: {}\nActivation type: {}'.format(
            self.layer_number,
            self.layer_ID,
            self.layer_type,
            self.num_units,
            self.activation_type
        ))
        print('**************')

    def print_layer_neurons(self):
        print('Layer {}'.format(self.layer_number))
        temp = []
        for i in range(self.num_units):
            temp.append(self.neurons[i].get_activation())
        print(temp)
        print('==============================')
        #     print('{})'.format(i+1))
        #     print(self.neurons[i])
        #     print('****************')

class NeuralNetwork(object):

    def __init__(self, network_file_path):
        # Get network settings, from network config file
        with open(network_file_path, "r") as net:
            self.network_properties = json.load(net)

        print("Loaded network file: {}".format(self.network_properties["network_name"]))
        self.layers = [] # Array of layers
        prev_layer_units = None
        for i in range(self.network_properties['n_layers']+1):
            current_layer = self.network_properties['layers'][i]
            self.layers.append(Layer(current_layer, prev_layer_units))
            prev_layer_units = current_layer['neuron_count']
        # Load dataset here.
        df = pd.read_csv(self.network_properties['dataset_path'])
        data = np.array(df)
        data_block = data[:, 1:]
        self.train, self.test = train_test_split(data_block, train_size=0.8, random_state=42)
        trainX = self.train[:, :self.network_properties['input_size']]
        trainY = self.train[:, self.network_properties['input_size']:]
        size = len(self.train)
        self.training_data = (trainX, trainY, size)
        print("Initially layers are:")
        self.print_layers()

    def print_training_data(self):
        print("data: ")
        print(self.training_data)

    def print_layers(self):
        print('printing all the layers:')
        for i in range(self.network_properties['n_layers']+1):
            self.layers[i].print_layer_neurons()
            print('#################')

    def forward_pass(self, input_vector, return_value=False):

        # print("Forward pass: input='{}'".format(input_vector))
        # Input vector should be of same length as the number of neurons in input layer.
        assert len(input_vector)==self.layers[0].get_neuron_count()

        # For every layer after input layer,
        previous_layer_input = input_vector
        for layer in self.layers:
            if layer.get_layer_type() == 'i':
                for n in np.arange(layer.get_neuron_count()):
                    layer.neurons[n].activate_neuron(input_vector[n], None, None)
            else:
                temp = []
                # Calculate the weighted sum for every neuron
                for n in np.arange(layer.get_neuron_count()):
                    # pass the required vectors (x, w) to the activate_neuron() method
                    layer.neurons[n].activate_neuron(previous_layer_input, layer.get_weights_for_neuron(n), layer.get_bias_value(n))
                    # accumulate the current activation values
                    temp.append(layer.neurons[n].get_activation())

                # Update the previous layer input, which will be fed to the next layer
                previous_layer_input = np.array(temp)
                temp.clear()
        if return_value == True:
            return previous_layer_input

    def backward_pass(self, target_output_vector):
        error_vector = None
        weight_matrix = None
        # Calculate Error terms for all layers (end to start):
        for current_layer in self.layers[::-1]:
            if current_layer.get_layer_type() == 'o':
                current_layer.calculate_error_terms(True, target_output_vector)
            elif current_layer.get_layer_type() == 'h':
                current_layer.calculate_error_terms(False, (weight_matrix, error_vector))
            else:
                # If current layer is 'input layer' then stop processing
                break
            error_vector = current_layer.get_neuron_error_terms()
            weight_matrix = current_layer.get_weights()

        # Update every weight now:
        previous_layer_neuron_activations = None
        for layer in self.layers:
            if layer.get_layer_type() != 'i':
                layer.update_weights(previous_layer_neuron_activations, layer.get_neuron_error_terms(), float(self.network_properties['learning_rate']))
            previous_layer_neuron_activations = layer.get_neuron_activations()

    def calculate_error(self):
        pass

    # Train the network using back propagation algorithm
    def train_network(self):
        X_train, y_train, size = self.training_data

        for epoch in range(self.network_properties["epochs"]):
            print("Epoch {}".format(epoch+1))
            for i in tqdm(range(size)):
                # print('Training example {}'.format(i+1))
                # print('train_X: {}\ntrain_y: {}'.format(X_train[i], y_train[i]))
                self.forward_pass(X_train[i])
                self.backward_pass(y_train[i])
                # print("iteration completed. Result:")
                # self.print_layers()


    def predict_answer(self):
        testX, testY = self.test[:, :self.network_properties['input_size']], self.test[:, self.network_properties['input_size']:]
        for i in range(len(self.test)):
            prediction_vector = self.forward_pass(testX[i], return_value=True)
            # print(prediction_vector)
            print("Prediction: {} Actual: {}".format(np.argmax(prediction_vector), np.argmax(testY[i])))

Activation function

In [None]:
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def relu(x):
    return 0 if x <= 0 else x

def tanh(x):
    return np.tanh(x)

def leaky_relu(x):
    return 0.1*x if x <= 0 else x

def threshold(x, t=0):
    return 0 if x <= t else 1

Dataset creator

In [None]:
import numpy as np
import pandas as pd
from os.path import join

def dataset_creator():
    rows = int(input("Enter no. of training examples you want: "))
    train_size = int(input("Enter size of input vector: "))
    test_size = int(input("Enter size of target output vector: "))
    cols = train_size + test_size
    data_block = np.random.choice([0, 1], size=(rows, cols), p=[0.5, 0.5])
    print("Your data block has been created, first {} columns are training examples, last {} columns are target outputs".format(train_size, test_size))
    done = False
    file_name = None
    while done != True:
        file_name = input("Enter name of the file you want to save it as: ")
        if len(file_name) != 0:
            done = True
        else:
            print("Try again, enter valid path!")

    df = pd.DataFrame(data_block)
    df.to_csv(join('Input', file_name+".csv"))
    print("File name {}.csv is saved in 'Input' directory".format(file_name))

if __name__ == "__main__":
    dataset_creator()

Enter no. of training examples you want: 5
Enter size of input vector: 5
Enter size of target output vector: 5
Your data block has been created, first 5 columns are training examples, last 5 columns are target outputs
Enter name of the file you want to save it as: iris


OSError: ignored

Iris dataset making

In [None]:
import pandas as pd
import numpy as np
import os

DATA_DIR = os.path.join(os.getcwd(), 'Input')

df = pd.read_csv(os.path.join(DATA_DIR, "/content/iris_dataset.csv"))

print(df.head())

columns = df.columns

unique_species = df['Species'].unique()

categorical_vectors = {unique_species[0]: [1,0,0], unique_species[1]: [0,1,0], unique_species[2]: [0,0,1]}

print("loading data...")
data = df[['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']]

print("Making categorical (One hot) vectors...")
c = []
for i in df['Species'].values:
    c.append(categorical_vectors[i])

c = np.array(c)
print("Creating final data block...")
final_data = np.hstack([data, c])

print("Saving final data block as 'iris_dataset_final.csv' in {}".format(DATA_DIR))
final_iris_data = pd.DataFrame(final_data)
final_iris_data.to_csv(os.path.join(DATA_DIR, "iris_dataset_final.csv"))
print("The dataset is saved as '{}'".format(os.path.join(DATA_DIR, "iris_dataset_final.csv")))

   Id  SepalLengthCm  SepalWidthCm  PetalLengthCm  PetalWidthCm      Species
0   1            5.1           3.5            1.4           0.2  Iris-setosa
1   2            4.9           3.0            1.4           0.2  Iris-setosa
2   3            4.7           3.2            1.3           0.2  Iris-setosa
3   4            4.6           3.1            1.5           0.2  Iris-setosa
4   5            5.0           3.6            1.4           0.2  Iris-setosa
loading data...
Making categorical (One hot) vectors...
Creating final data block...
Saving final data block as 'iris_dataset_final.csv' in /content/Input


OSError: ignored

Runner

In [None]:
import os
import sys
import numpy as np

from BackProp import NeuralNetwork, Layer, Neuron
from dataset_creator import dataset_creator

NETWORK_DIRECTORY = os.path.join(os.getcwd(), 'Network_structures')

print("Welcome to BackProp simulation")
print("The network will be loaded from a JSON file, which you can provide")
print("Some sample testing JSON files are given, refer those to make your own custom network")

network = NeuralNetwork(os.path.join(NETWORK_DIRECTORY, 'network_2.json'))

network.train_network()
network.predict_answer()

ModuleNotFoundError: ignored