# Neural Network as function

In [2]:
# Author: H. Benhabiles
# Version date: January 2022

""" MLP or fully connected neural network with dynamic Structure developped from scratch
This code example is provided in the frame of the Advanced Machine Learning JUNIA-M1 course.
It demonstrates how to build an MLP and code it from scratch. The code permits to solve logic functions but
can be easily adapted to sovle other non linear functions. It offers the possibility to build different MLP structures by
changing dynamically the depth of the MLP (number of hidden layers) and the size of each layer (number of neurones).
It offers implementation of several activation functions, optimizers and kernel weights initializers.
"""

import numpy as np
import math

def sigmoid(x):
    return 1/(1+math.exp(-x))

def relu(x):
    if x <0:
        return 0
    else:
        return 1

def tanh(x):
    return math.tanh(x)

sigmoid_v = np.vectorize(sigmoid)
relu_v = np.vectorize(relu)
tanh_v = np.vectorize(tanh)
sqrt_v = np.vectorize(math.sqrt)
log_v = np.vectorize(math.log)

def activation_fct (x, name):
    if name == 'sigmoid':
        return sigmoid_v(x)
    elif name == 'relu':
        return relu_v(x)
    elif name == 'tanh':
        return tanh_v(x)

def der_activation_fct (x, name):
    if name == 'sigmoid':
        return x*(1-x)
    elif name == 'relu':
        return relu_v(x) #np.full(x.shape,1)
    elif name == 'tanh':
        return 1-(x*x)

def entropy_fct(c,o, eps=1e-8):
    loss = 0
    for i in range (c.shape[0]):
        if c[i] == 0:
            loss = loss + abs(math.log((1-o[i]) + eps)) # add epsilon to avoid undefined values
        else:
            loss = loss + abs(math.log(o[i] + eps))

    return loss / c.shape[0]

# change the order of appearance of the 4 examples
def toshuffle(X,C, shuffle):
    if shuffle:
        # shuffle X and C same way
        randomize = np.arange(len(C))
        np.random.shuffle(randomize)
        X = X[randomize]
        C = C[randomize]

    return X,C

def model_setinput(inputdim, dense, activation, initializer):
    model = []
    # first hidden layer (weights and baias)
    #W = np.random.uniform(-w_range,w_range,[inputdim,dense])
    if initializer == 'normal':
        W = np.random.normal(0,1,[inputdim,dense])
        B = np.zeros([dense,1])
    elif initializer == 'constant':
        W = np.full((inputdim,dense), 0.05)
        B = np.full((dense,1),0.05)
    elif initializer == 'uniform':
        W = np.random.uniform(-0.05,0.05,[inputdim,dense])
        B = np.random.uniform(-0.05,0.05,[dense,1])
    #elif initializer == 'glorot': #### to do
    # associate weights to the dense layer
    dense_weight =[]
    dense_weight.append(W)
    dense_weight.append(B)
    dense_weight.append(activation)
    model.append(dense_weight)
    return model

def model_addlayer(model, dense, activation, initializer):
    dense_previous = model[-1][1].shape[0] # size of length of last hidden layer
    #hidden layer (weights and baias)
    #W = np.random.uniform(-w_range,w_range,[dense_previous,dense])
    if initializer == 'normal':
        W = np.random.normal(0,1,[dense_previous,dense])
        B = np.zeros([dense,1])
    elif initializer == 'constant':
        W = np.full((dense_previous,dense),0.5)
        B = np.full((dense,1),0.5)
    elif initializer == 'uniform':
        W = np.random.uniform(-0.5,0.5,[dense_previous,dense])
        B = np.random.uniform(-0.5,0.5,[dense,1])
    # associate weights to the dense layer
    dense_weight =[]
    dense_weight.append(W)
    dense_weight.append(B)
    dense_weight.append(activation)
    model.append(dense_weight)
    return model

def model_sgd(model, gradient_list, activation_list, lr, x):
    # need to update first layer of weights alone since linked to input data.
    gradient     = gradient_list[0].T*x.T
    model[0][0]  = model[0][0]  - lr*gradient  # weights
    model[0][1] = model[0][1] - lr*gradient_list[0] # baias
    for layer_index in range (1,len(model)):
        gradient     = gradient_list[layer_index].T*activation_list[layer_index-1]
        model[layer_index][0]  = model[layer_index][0]  - lr*gradient # weights
        model[layer_index][1] = model[layer_index][1] - lr*gradient_list[layer_index] # baias
    return model

def model_adam(model, gradient_list, m, v, lr):
    beta_1 = 0.9
    beta_2 = 0.999
    epsilon = 1/pow(10,8)
    if not m and not v:
        # define m and v with same shape as gradient_list and set their units to 0
        for i in range (len(gradient_list)):
            m.append(np.zeros(gradient_list[i].shape))
            v.append(np.zeros(gradient_list[i].shape))
    # calculate moments and update weights
    for layer_index in range (len(gradient_list)):
        m[layer_index] = beta_1*m[layer_index] + (1-beta_1)*gradient_list[layer_index]
        v[layer_index] = beta_2*v[layer_index] + ((1-beta_2)*gradient_list[layer_index]*gradient_list[layer_index])
        adam_gradient = (lr*m[layer_index])/(sqrt_v(v[layer_index])+epsilon)
        model[layer_index][0] = model[layer_index][0] - adam_gradient.T # weights
        model[layer_index][1] = model[layer_index][1] - adam_gradient # baias
    return model, m, v

def model_train(name, model, X,C, optimizer, lr, loss_fct, num_epochs, shuffle):
    solved = False
    #O = np.empty(C.shape[0])
    # set initially to positive infinity, we suppose that the best error at this stage is too high
    best_error = float('inf')
    if optimizer == 'adam':
        m = []
        v = []
    for epochs in range(num_epochs):
        # We may shuffle the data if required
        X,C = toshuffle(X,C,shuffle)
        for example_index in range (len(C)):
            x = X[example_index].reshape(1,2)
            # setup activation and gradient lists.
            activation_list = []
            gradient_list = []
            ######## propagation #############
            W = model[0][0]
            B = model[0][1]
            transfer = x.dot(W).T+B # first hidden layer
            activation =  activation_fct(transfer, model[0][2])
            activation_list.append(activation)
            for layer_index in range (1,len(model)): # forward pass
                # calculate activation function
                W = model[layer_index][0]
                B = model[layer_index][1]
                transfer = activation_list[-1].T.dot(W).T+B # next hidden layer
                activation = activation_fct(transfer, model[layer_index][2])
                activation_list.append(activation)
            output = activation_list[-1]
            if loss_fct == 'mse':
                loss  = abs(C[example_index].reshape(1,1) - output)
            elif loss_fct == 'entropy':
                if C[example_index] == 0:
                    loss = abs(math.log(1-output+0.00000001))
                else:
                    loss = abs(math.log(output+0.00000001))
            gradient_output  = der_activation_fct(output, model[-1][2]).dot(loss)
            gradient_list.append(gradient_output)
            ######## backpropagation #############
            for layer_index in range (len(model)-1, 0, -1):
                W = model[layer_index][0]
                gradient = der_activation_fct(activation_list[layer_index-1], model[layer_index-1][2])*(W.dot(gradient_list[-1]))
                gradient_list.append(gradient)
            ######## update weights #############
            # gradient_list.reverse() to align with layers orders
            gradient_list.reverse()
            if optimizer == 'sgd':
                model = model_sgd(model, gradient_list, activation_list, lr, x)
            elif optimizer == 'adam':
                model, m, v = model_adam(model, gradient_list, m, v, lr)
        # prepare saving the best model
        O = model_predict(model, X, False)
        if loss_fct == 'mse':
            # calculate MSE the Mean Squared Error
            error = np.average(pow((C-O),2))
        elif loss_fct == 'entropy':
            error = entropy_fct(C,O)
        if error < best_error:
            best_error = error
            # save model (weights and baias)
            best_model = model
        if error > 1:
            break
        print("epoch, ", loss_fct, ": ", epochs, error)
        if np.average(pow((C-np.round(O)),2)) == 0:
            print(name, "solved!")
            print(epochs+1, " epoch(s)")
            print ("C,O ", C, O)
            solved = True
            break
    return solved, best_model

def model_predict(model, X, toround):
    O = np.empty(X.shape[0])
    for example_index in range (len(X)):
        x = X[example_index].reshape(1,2)
        # setup activation
        activation_list = []
        ######## propagation #############
        W = model[0][0]
        B = model[0][1]
        transfer = x.dot(W).T+B # first hidden layer
        activation =  activation_fct(transfer, model[0][2])
        activation_list.append(activation)
        for layer_index in range (1,len(model)):
            # calculate activation function
            W = model[layer_index][0]
            B = model[layer_index][1]
            transfer = activation_list[-1].T.dot(W).T+B # second hidden layer
            activation = activation_fct(transfer, model[layer_index][2])
            activation_list.append(activation)
        O[example_index] = activation_list[-1]
    if (toround):
        return np.round(O)
    else:
        return O

In [3]:
############# call MLP train function #################
solved = False
run = 0
X = np.array(([0,0],[0,1],[1,0],[1,1]), dtype=int)#inputs
C = np.array((0,1,1,0),dtype=int)#resultat d'un XOR
lr = 0.1
num_epochs = 50
max_run = 25

i = 0
while (True): # max_run < 25
    i += 1
    print ("====Run {}====".format(max_run))
    # Build a new model
    model = model_setinput(2, 3, 'relu', initializer='normal')
    model = model_addlayer(model, 4,'relu', initializer='normal')
    model = model_addlayer(model, 1,'sigmoid', initializer='normal')
    solved, best_model = model_train("XOR", model, X,C,'sgd',lr, 'mse', num_epochs, shuffle=True)
    #print(model_predict(best_model, X, False))
    #print(model_predict(best_model, X, True) )
    if solved:
        print ("solved :) ---- Congratulation")
        break

    if i == max_run:
        print('Maximum iterations reached, failed to converge !')
        break

====Run 25====


  O[example_index] = activation_list[-1]


epoch,  mse :  0 0.18287103757074152
epoch,  mse :  1 0.22355019277224406
epoch,  mse :  2 0.3385729937788702
epoch,  mse :  3 0.3446458372856569
epoch,  mse :  4 0.35048112131262776
epoch,  mse :  5 0.35603251433605276
epoch,  mse :  6 0.36132649880552503
epoch,  mse :  7 0.36633852717677456
epoch,  mse :  8 0.3711136965072329
epoch,  mse :  9 0.3756430715003496
epoch,  mse :  10 0.37993696925449705
epoch,  mse :  11 0.38400708907368675
epoch,  mse :  12 0.3878650849588289
epoch,  mse :  13 0.39152700356666204
epoch,  mse :  14 0.394995155110473
epoch,  mse :  15 0.398281866071834
epoch,  mse :  16 0.4014047665804128
epoch,  mse :  17 0.40437027797559555
epoch,  mse :  18 0.4071903844691407
epoch,  mse :  19 0.4456992497096297
epoch,  mse :  20 0.4472331583872894
epoch,  mse :  21 0.448687356956879
epoch,  mse :  22 0.45006820147399473
epoch,  mse :  23 0.45138035851273284
epoch,  mse :  24 0.45262991337072755
epoch,  mse :  25 0.4538190605319598
epoch,  mse :  26 0.45495303889325883


# Neural Network as classes

In [4]:
import numpy as np

learning_rate = 0.001

def relu(x):
    return np.max((x, 0))

class Neuron:
    def __init__(self, n_inputs, activation=None):

        # weights and bias
        self.W = (np.random.rand(n_inputs + 1) * 2) - 1   # [-1, +1]

        # re-initialize bias
        self.W[-1] = np.random.rand(1) - 0.5   # [-0.5, +0.5]

        self.activation = activation
        self.inputs = None
        self.out = None
        self.gradient = np.zeros(n_inputs + 1)   # gradients of weights and bias

    def forward(self, X):
        X = np.concatenate((X, np.array((1,))))   # we add 1 as input for the bias
        self.inputs = X
        o = np.sum([w * x for w, x in zip(self.W, X)])
        self.out = o

        if self.activation is not None:
            o = self.activation(o)

        return o

    def backward(self, next_layer, loss, idx):
        if loss is not None: # this is last layer
            for i in range(len(self.W)):
                if self.activation is None:
                    self.gradient[i] = loss * self.inputs[i]
                else:
                    self.gradient[i] = loss * self.inputs[i] if self.out > 0 else 0

                self.W[i] = self.W[i] - learning_rate * self.gradient[i]
        else:
            for i in range(len(self.W)):
                grads = 0
                for neuron in next_layer.neurons:
                    if self.activation is None:
                        grads += neuron.W[idx] * neuron.W[idx].gradient * self.inputs[i]
                    else:
                        grads += self.out * neuron.W[idx] * neuron.gradient[idx] * self.inputs[i] if self.out > 0 else 0

                # print(grads)
                self.gradient[i] = grads
                self.W[i] = self.W[i] - learning_rate * self.gradient[i]

class Layer:
    def __init__(self, n_inputs, n_neurons, activation=None):
        self.neurons = [Neuron(n_inputs, activation) for i in range(n_neurons)]

    def forward(self, X):
        o = [neuron.forward(X) for neuron in self.neurons]
        return o

    def backward(self, next_layer, loss=None):
        for idx, neuron in enumerate(self.neurons):
            neuron.backward(next_layer, loss, idx)

class Network:
    def __init__(self, n_inputs, layers, activations=None):
        # layers is list of numbers of neurons in each layer
        # activations is list of activation functions (same length as layers)

        self.layers = []
        inputs_ = n_inputs
        for i in range(len(layers)):
            if activations is not None:
                self.layers.append(Layer(inputs_, layers[i], activations[i]))
            else:
                self.layers.append(Layer(inputs_, layers[i]))

            inputs_ = layers[i]

    def forward(self, X):
        o = X
        for layer in self.layers:
            o = layer.forward(o)

        return o

    def summary(self):
        for i, layer in enumerate(self.layers):
            print('layer {} - {} neurons'.format(i+1, len(layer.neurons)))

    def backward(self, loss):

        # last layer backprop
        self.layers[-1].backward(next_layer=None, loss=loss)

        next_layer = self.layers[-1]
        for layer in self.layers[:-1][::-1]:
            layer.backward(next_layer=next_layer)
            next_layer = layer

In [5]:
dataset = [
    (0, 0, 0),
    (0, 1, 1),
    (1, 0, 1),
    (1, 1, 0),
]

net = Network(n_inputs=2, layers=[3, 4, 1], activations=[relu, relu, relu])
net.summary()

epochs = 1000
for e in range(epochs):
    for a, b, c in dataset:
        o = net.forward((a, b))
        loss = (c - o[0])
        net.backward(loss)

for a, b, c in dataset:
    o = net.forward((a, b))
    print(a, b, c, round(o[0]))

layer 1 - 3 neurons
layer 2 - 4 neurons
layer 3 - 1 neurons


  self.W[-1] = np.random.rand(1) - 0.5   # [-0.5, +0.5]


0 0 0 0
0 1 1 0
1 0 1 0
1 1 0 0
