# Artificial Neural Network

Felix Limanta (13515065), Holy Lovenia (13515113), Agus Gunawan (13515143)

## Implementation of ANN

An artificial neural network (ANN) is made of interconnected layers of nodes, loosely modeled on the structure and function of neurons in the human brain.

The connections between nodes (neurons) are called edges. Each edge has an associated weight, which is adjusted during learning.

![Artificial neural network](https://icdn5.digitaltrends.com/image/artificial_neural_network_1-791x388.jpg)

Nodes are typically grouped into layers, and different layers perform different transformations on their inputs. The first layer is the input layer and the last is the output layer, which represents the final outputs as predicted classes. Between them, hidden layers may process the inputs by applying an activation function and passing the results on to the output layer.

Nodes with no incoming connections are called bias nodes. They shift the activation function in the desired direction. Below is an example of a network with bias nodes.

![Network with bias](http://documentation.statsoft.com/STATISTICAHelp/SANN/Images/mlpdiagram.jpg)
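
To illustrate how a bias shifts the activation, the minimal sketch below evaluates a sigmoid for one fixed input under several bias values. The weight `w`, the input `x`, and the bias values are arbitrary illustrative numbers, not values taken from this notebook.

```python
import math

def sigmoid(z):
    """Logistic sigmoid, the activation used throughout this notebook."""
    return 1 / (1 + math.exp(-z))

w, x = 1.0, 0.0  # hypothetical weight and input
for bias in (-2.0, 0.0, 2.0):
    # The bias is added to the weighted input before the activation,
    # which moves the sigmoid curve left or right along the input axis.
    print(bias, round(sigmoid(w * x + bias), 3))
```

With the same input, a negative bias pushes the output toward 0 and a positive bias pushes it toward 1.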

In [1]:
from random import random, randint
from sklearn.metrics import confusion_matrix, mean_squared_error
from sklearn.model_selection import train_test_split
from tqdm import tqdm, tnrange

import math
import numpy as np
import pandas as pd

### Layer implementation

In this step, we implement the `LunakDense` class to represent hidden and output layers. Its parameters are listed below.

1. `units`: `int`, the number of nodes in the layer

2. `activation`: `'sigmoid'`, the activation function (only `'sigmoid'` is supported)

3. `input_dim`: `int`, the number of input values fed to each node (e.g. 7 for seven features)

4. `init`: `'uniform'` or `'random'`, the distribution of the initial weight matrix (`'uniform'` draws from [-0.05, 0.05), `'random'` draws from [0, 1))

5. `use_bias`: `boolean`, whether a bias node is present (default=`False`)

6. `seed`: `int`, the random seed (default=`None`)

**Propagation function**

It computes the input $p_j(t)$ to neuron $j$ as the weighted sum of the outputs $o_i(t)$ of its predecessor neurons plus the bias $w_{0j}$.

![Sigma](https://wikimedia.org/api/rest_v1/media/math/render/svg/53a6369d6948c2a582469ed48def95b151953e9d)
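
The sketch below checks, on made-up numbers, that storing the bias as an extra weight column and appending a constant 1 to the input (the approach `calculate_sigma` takes) gives the same result as the explicit "weighted sum plus bias" form. The array sizes and values are assumptions chosen for illustration only.

```python
import numpy as np

rng = np.random.default_rng(0)
weights = rng.uniform(-0.05, 0.05, size=(2, 3))  # 2 units, 3 inputs (made-up sizes)
bias = np.array([0.1, -0.2])                     # one bias weight per unit
inputs = np.array([0.5, 1.0, 0.0])

# Explicit form: weighted sum of the inputs plus the bias, for each unit j.
explicit = weights @ inputs + bias

# Augmented form: bias stored as an extra weight column,
# with a constant 1 appended to the inputs.
augmented_weights = np.hstack((weights, bias.reshape(-1, 1)))
augmented_inputs = np.append(inputs, 1)
augmented = augmented_weights @ augmented_inputs

print(np.allclose(explicit, augmented))
```

A single matrix product also replaces the per-neuron loop, which may be worth considering for larger layers.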

In [2]:
class LunakDense:
    def __init__(self, units, activation, input_dim, init, use_bias=False, seed=None):
        self.units = units
        self.input_dim = input_dim
        
        if activation == 'sigmoid':
            self.activation_function = self.sigmoid
        else:
            # Fail early: continuing would leave activation_function undefined
            raise ValueError('Activation function not supported')
        
        np.random.seed(seed)
        
        if init == 'uniform':
            self.weight_matrix = np.random.uniform(-0.05, 0.05, size=(self.units, input_dim)) 
        elif init == 'random':
            self.weight_matrix = np.random.random(size=(self.units, input_dim))
        else:
            # Fail early: continuing would leave weight_matrix undefined
            raise ValueError('Init function not supported')
        
        self.delta_weight_matrix_before = np.zeros((self.units, input_dim))
        self.delta_weight_matrix = np.zeros((self.units, input_dim))
        
        self.use_bias = use_bias
        if self.use_bias:
            bias = np.zeros((units, 1))
            self.weight_matrix = np.hstack((self.weight_matrix, bias))
            self.delta_weight_matrix_before = np.hstack((self.delta_weight_matrix_before, np.zeros((units, 1))))
            self.delta_weight_matrix = np.hstack((self.delta_weight_matrix, np.zeros((units, 1))))
            
    def calculate_sigma(self, input_list):
        if self.use_bias:
            input_list = np.append(input_list, 1)
        
        result_list = np.array([])
        for weight_neuron in self.weight_matrix:
            result_list = np.append(result_list, np.dot(weight_neuron, input_list))
        return np.array(result_list)
    
    def calculate_output(self, input_list):
        output_list = np.array([])
        for sigma_neuron in self.calculate_sigma(input_list):
            output_list = np.append(output_list, self.activation_function(sigma_neuron))
        self.output_list = output_list
        return output_list.copy()
    
    def calculate_local_gradient_output_layer(self, target_list):
        """
        Use this if the layer is output layer
        """
        result_list = np.array([])
        for index, output in enumerate(self.output_list):
            local_gradient = output * (1 - output) * (target_list[index] - output)
            result_list = np.append(result_list, local_gradient)  
        self.local_gradient = result_list
        return result_list.copy()
    
    def calculate_local_gradient_hidden_layer(self, local_gradient_output_list, output_layer_weight_matrix):
        """
        Use this if the layer is hidden layer
        """
        result_list = np.array([])
        for index, output in enumerate(self.output_list):
            sigma_local_gradient_output = 0
            for unit_number, local_gradient in enumerate(local_gradient_output_list):
                sigma_local_gradient_output += output_layer_weight_matrix[unit_number][index] * local_gradient
            error_hidden = output * (1 - output) * sigma_local_gradient_output
            result_list = np.append(result_list, error_hidden)
        self.local_gradient = result_list
        return result_list.copy()
    
    def update_delta_weight(self, lr, input_list, momentum=None):
        """
        Function to update delta weight
        """
        if self.use_bias:
            input_list = np.append(input_list, 1)
        if momentum is None:
            for j, unit in enumerate(self.weight_matrix): #j  
                for i, source in enumerate(unit): #i
                    delta_weight = lr * self.local_gradient[j] * input_list[i]
                    self.delta_weight_matrix[j][i] = delta_weight.copy()
        else:
            for j, unit in enumerate(self.weight_matrix): #j  
                for i, source in enumerate(unit): #i
                    delta_weight = lr * self.local_gradient[j] * input_list[i] + momentum * self.delta_weight_matrix_before[j][i]
                    
                    # Update Delta Weight
                    self.delta_weight_matrix_before[j][i] = delta_weight.copy()
            
            # Copy Last Update of Weight Matrix Before (Equal to Last Weight Matrix)
            for j, unit in enumerate(self.delta_weight_matrix_before):
                for i, source in enumerate(unit):
                    self.delta_weight_matrix[j][i] = self.delta_weight_matrix_before[j][i].copy()
            
    def update_weight(self):
        """
        Function to update weight
        """
        for j, unit in enumerate(self.delta_weight_matrix_before):
            for i, source in enumerate(unit):
                self.weight_matrix[j][i] += self.delta_weight_matrix[j][i]
    
    def sigmoid(self, x):
        return 1 / (1 + math.exp(-x))
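
One caveat about the `sigmoid` method above: `math.exp(-x)` raises `OverflowError` for large negative `x`. With the small initial weights used here this is unlikely to occur, but a numerically safer variant could look like the sketch below. The name `stable_sigmoid` is hypothetical and not part of `LunakDense`.

```python
import math

def naive_sigmoid(x):
    # Same formula as LunakDense.sigmoid
    return 1 / (1 + math.exp(-x))

def stable_sigmoid(x):
    """Sigmoid that only ever exponentiates a non-positive number."""
    if x >= 0:
        return 1 / (1 + math.exp(-x))
    z = math.exp(x)  # x < 0, so z is in (0, 1) and cannot overflow
    return z / (1 + z)

try:
    naive_sigmoid(-1000)
except OverflowError:
    print('naive form overflows for x = -1000')

print(stable_sigmoid(-1000), stable_sigmoid(0), stable_sigmoid(1000))
```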

### Model implementation

The ANN model is trained with stochastic gradient descent (SGD) as the learning rule. Gradient descent is a general strategy for searching through a large or infinite hypothesis space.

Training an ANN typically consists of two steps: feed-forward and backpropagation.

![ANN steps](https://www.researchgate.net/profile/Morteza_Esfandyari/publication/241741756/figure/fig2/AS:298577172680729@1448197753779/Back-propagation-multilayer-ANN-with-one-hidden-layer.png)

Feed-forward computes a prediction for the given inputs using the network's current weights.

![Feed-forward](https://d18rbf1v22mj88.cloudfront.net/wp-content/uploads/sites/3/2018/03/13094759/neural_networks_fully_connected_layers_gumgum1.gif)

Backpropagation calculates the error at each node and updates the weights based on that error.

![Backpropagation](https://raw.githubusercontent.com/mtoto/mtoto.github.io/master/data/2017-11-08-net/result.gif)

The feed-forward and backpropagation algorithm is based on page 98 of [Machine Learning by Tom Mitchell](https://www.cs.ubbcluj.ro/~gabis/ml/ml-books/McGrawHill%20-%20Machine%20Learning%20-Tom%20Mitchell.pdf).
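
As a rough illustration of the two steps, the sketch below runs one feed-forward pass and one backpropagation update on a single made-up instance, for a network with one hidden layer and sigmoid units. It is a simplified vectorized sketch under stated assumptions (no bias nodes, arbitrary input, target, and learning rate), not the notebook's implementation.

```python
import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

rng = np.random.default_rng(5)
x = np.array([0.0, 1.0, 0.5])                     # made-up input instance
t = np.array([1.0])                               # made-up target
W_hidden = rng.uniform(-0.05, 0.05, size=(2, 3))  # 2 hidden units
W_output = rng.uniform(-0.05, 0.05, size=(1, 2))  # 1 output unit
lr = 0.5                                          # illustrative learning rate

def forward(instance):
    """Feed-forward: propagate the instance through both layers."""
    h = sigmoid(W_hidden @ instance)
    o = sigmoid(W_output @ h)
    return h, o

h, o = forward(x)
error_before = float(np.sum((t - o) ** 2))

# Backpropagation: error terms for the output and hidden units.
delta_o = o * (1 - o) * (t - o)
delta_h = h * (1 - h) * (W_output.T @ delta_o)

# Weight update: each weight moves by lr * error term * incoming value.
W_output += lr * np.outer(delta_o, h)
W_hidden += lr * np.outer(delta_h, x)

_, o_after = forward(x)
error_after = float(np.sum((t - o_after) ** 2))
print(error_before, error_after)
```

After the update, the squared error on this instance should be slightly lower than before.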

The `fit` method takes the following training parameters.

1. `X`: `data`, training data

2. `y`: `data`, labels for the training data

3. `epochs`: `int`, the number of epochs to run

4. `lr`: `float`, learning rate

5. `momentum`: `float`, momentum coefficient, which helps the search move past local minima (default=`None`, no momentum)

6. `batch_size`: `int`, the number of instances processed before each weight update; 1 gives incremental learning (default=`len(X)`)

7. `val_data`: `(X_val, y_val)`, validation data (default=`None`, in which case 10% of `X` and `y` is held out)

8. `val_size`: `float`, the fraction of `X` and `y` to hold out for validation (default=0; the current implementation always uses 0.1 when `val_data` is `None`)
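
To make `batch_size` and `momentum` more concrete, the sketch below shows how the data is stepped through in strides of `batch_size`, and how the momentum term adds a fraction of the previous delta to the current one. The momentum value and the gradient terms are hypothetical numbers chosen to make the effect visible.

```python
# Batching: fit() steps through the data in strides of batch_size.
n_samples, batch_size = 12, 2     # sizes matching the experiment later in this notebook
batch_starts = list(range(0, n_samples, batch_size))
print(len(batch_starts))          # number of weight updates per epoch

# Momentum: each delta adds a fraction of the previous delta.
lr, momentum = 0.01, 0.9          # hypothetical values
gradient_terms = [1.0, 1.0, 1.0]  # hypothetical local_gradient * input values
previous_delta = 0.0
deltas = []
for term in gradient_terms:
    delta = lr * term + momentum * previous_delta
    deltas.append(delta)
    previous_delta = delta
print(deltas)
```

When successive gradient terms point the same way, the deltas grow from step to step, which is what lets the search keep moving across flat regions.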

In [3]:
class LunakArtificialNeuralNetwork:
    def __init__(self, loss='root_mean_squared', optimizer='sgd'):
        assert loss == 'root_mean_squared', 'loss function not supported'
        assert optimizer == 'sgd', 'optimizer not supported'
        self.layers = []
    
    def add(self, layer):
        self.layers.append(layer)
        
    def feed_forward(self, X_instance):
        # Calculate output with the first hidden layer
        output_list = self.layers[0].calculate_output(X_instance)
        # Calculate output with the second until the last layer
        for layer in self.layers[1:]:
            next_output_list = layer.calculate_output(output_list)
            output_list = next_output_list
        return output_list.copy()
            
    def backpropagation(self, y_instance):
        # Calculate local gradient for output layer
        next_local_gradient_list = self.layers[-1].calculate_local_gradient_output_layer([y_instance])
        next_layer_weight_matrix = self.layers[-1].weight_matrix

        # Calculate local gradient for hidden layer(s)
        for layer_idx, layer in enumerate(reversed(self.layers[0:-1])):
            next_local_gradient_list = layer.calculate_local_gradient_hidden_layer(next_local_gradient_list, next_layer_weight_matrix)
            next_layer_weight_matrix = layer.weight_matrix
            
    def calculate_delta_weight(self, X_instance, lr, momentum):
        # Update delta weight for first hidden layer
        self.layers[0].update_delta_weight(lr, X_instance, momentum)
        
        # Update delta weight for other layers
        for layer_idx, layer in enumerate(self.layers[1:]):
            layer.update_delta_weight(lr, self.layers[layer_idx].output_list, momentum)
    
    def fit(self, X, y, epochs, lr, momentum=None, batch_size=None, val_data=None, val_size=0):
        assert X.shape[1] == self.layers[0].input_dim, 'Input dimension must be same with the column'
        self.classes_ = np.unique(y)
        
        if batch_size is None:
            batch_size = len(X)
            
        if val_data is None:
            val_size = 0.1
            X, X_val, y, y_val = train_test_split(X, y, test_size=val_size)
        else:
            X_val = val_data[0]
            y_val = val_data[1]
            
        print('Train on {} samples, validate on {} samples'.format(len(X), len(X_val)))
        
        if val_data is not None and val_size != 0:
            print('Validation data will be used instead of val_size.')
            
        for epoch in range(epochs):
            delta = batch_size
            
            with tnrange(0, len(X), delta, desc='Epoch {}'.format(epoch + 1)) as pbar:
                for start in pbar:
                    X_batch = X[start:start+delta]
                    y_batch = y[start:start+delta]

                    for idx, X_instance in enumerate(X_batch):
                        self.feed_forward(X_instance)
                        self.backpropagation(y_batch[idx][0])
                        self.calculate_delta_weight(X_instance, lr, momentum)

                    for layer in self.layers:
                        layer.update_weight()

                    pred = self.predict(X)
                    pred_val = self.predict(X_val)
                    
                    pred_proba = self.predict_proba(X)
                    pred_proba_val = self.predict_proba(X_val)

                    acc = self.calculate_accuracy(y, pred)
                    val_acc = self.calculate_accuracy(y_val, pred_val)
                    loss = mean_squared_error(y, pred_proba)
                    val_loss = mean_squared_error(y_val, pred_proba_val)

                    postfix = {
                        'loss': loss,
                        'acc': acc,
                        'val_loss': val_loss,
                        'val_acc': val_acc
                    }
                    pbar.set_postfix(postfix, refresh=True)
    
    def predict_proba(self, X):
        predictions = []
        for idx, X_instance in enumerate(X):
            X_pred = self.feed_forward(X_instance)
            predictions.append([np.mean(X_pred.copy())])
        return predictions
    
    def predict(self, X):
        predictions = []
        for idx, X_instance in enumerate(X):
            X_pred_proba = self.feed_forward(X_instance)
            X_pred = min(self.classes_, key=lambda pred_class:abs(pred_class - np.mean(X_pred_proba)))
            predictions.append([X_pred])
        return predictions
    
    def calculate_accuracy(self, y_true, y_pred):
        if len(confusion_matrix(y_true, y_pred).ravel()) > 1:
            tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        else:
            tp = confusion_matrix(y_true, y_pred).ravel()[0]
            fp = 0
            fn = 0
            tn = 0
        return (tp + tn) / (tp + tn + fp + fn)
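
The `predict` method maps each network output to the known class value closest to it. The standalone sketch below isolates that rule. The helper name `nearest_class` and the sample scores are illustrative assumptions.

```python
classes = [0.0, 1.0]  # class values, as stored in classes_ for a binary label

def nearest_class(score, classes):
    """Return the class value with the smallest absolute distance to score."""
    return min(classes, key=lambda pred_class: abs(pred_class - score))

for score in (0.49, 0.51, 0.9):
    print(score, nearest_class(score, classes))
```

For two classes coded as 0 and 1, this amounts to thresholding the output at 0.5.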

## Data Preparation

In [4]:
from scipy.io.arff import loadarff
import pandas as pd

### Read weather data

Dataset is obtained from [Weka Data Sets](http://storm.cis.fordham.edu/~gweiss/data-mining/weka-data/weather.arff).

In [5]:
raw_data = loadarff('dataset/weather.arff')

In [6]:
data = pd.DataFrame(raw_data[0])

In [7]:
data.head()

| | outlook | temperature | humidity | windy | play |
| --- | --- | --- | --- | --- | --- |
| 0 | b'sunny' | 85.0 | 85.0 | b'FALSE' | b'no' |
| 1 | b'sunny' | 80.0 | 90.0 | b'TRUE' | b'no' |
| 2 | b'overcast' | 83.0 | 86.0 | b'FALSE' | b'yes' |
| 3 | b'rainy' | 70.0 | 96.0 | b'FALSE' | b'yes' |
| 4 | b'rainy' | 68.0 | 80.0 | b'FALSE' | b'yes' |


### Preprocessing

As shown above, the data must be preprocessed, because byte strings and nominal values cannot be fed to the ANN directly.

In [8]:
def convert_to_binary_vector(data):
    return pd.get_dummies(data)

First, decode the byte-string columns (`outlook`, `windy`, and `play`) as UTF-8.

In [9]:
for column in ['outlook', 'windy', 'play']:
    data[column] = data[column].str.decode('utf-8')

Then, convert the nominal columns (`outlook` and `windy`) to binary (one-hot) vectors.

In [10]:
bv_outlook = convert_to_binary_vector(data['outlook'])
bv_outlook.head()

| | overcast | rainy | sunny |
| --- | --- | --- | --- |
| 0 | 0 | 0 | 1 |
| 1 | 0 | 0 | 1 |
| 2 | 1 | 0 | 0 |
| 3 | 0 | 1 | 0 |
| 4 | 0 | 1 | 0 |


In [11]:
bv_windy = convert_to_binary_vector(data['windy'])
bv_windy.head()

| | FALSE | TRUE |
| --- | --- | --- |
| 0 | 1 | 0 |
| 1 | 0 | 1 |
| 2 | 1 | 0 |
| 3 | 1 | 0 |
| 4 | 1 | 0 |


Drop the original `outlook` and `windy` columns.

In [12]:
# Name the columns explicitly; a positional axis argument is rejected by recent pandas
preproc_data = data.drop(columns=['outlook', 'windy'])

Treat the `play` column as categorical.

In [13]:
preproc_data['play'] = preproc_data['play'].astype('category')

Concatenate the data with the binary vectors.

In [14]:
preprocessed_data = pd.concat([bv_outlook, bv_windy, preproc_data], axis=1)
preprocessed_data.head()

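| | overcast | rainy | sunny | FALSE | TRUE | temperature | humidity | play |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 0 | 0 | 1 | 1 | 0 | 85.0 | 85.0 | no |
| 1 | 0 | 0 | 1 | 0 | 1 | 80.0 | 90.0 | no |
| 2 | 1 | 0 | 0 | 1 | 0 | 83.0 | 86.0 | yes |
| 3 | 0 | 1 | 0 | 1 | 0 | 70.0 | 96.0 | yes |
| 4 | 0 | 1 | 0 | 1 | 0 | 68.0 | 80.0 | yes |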


Convert the `play` column to numerical codes.

In [15]:
y = pd.DataFrame({'play': preprocessed_data['play'].cat.codes})
y.head()

| | play |
| --- | --- |
| 0 | 0 |
| 1 | 0 |
| 2 | 1 |
| 3 | 1 |
| 4 | 1 |


Remove the label column from `X`, and cast `X` to float.

In [16]:
X = preprocessed_data.drop(columns='play')

In [17]:
for column in X.columns:
    X[column] = X[column].astype('float')

Normalize the real-valued columns (`temperature` and `humidity`) to the 0-1 range.

In [18]:
for column in ['temperature', 'humidity']:
    X[column] = (X[column] - min(X[column])) / (max(X[column]) - min(X[column]))
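
As a quick check of the min-max formula, the sketch below rescales the five `temperature` values shown earlier. It assumes the column minimum is 64, which is consistent with the normalized values in this notebook but is not shown in the first five rows.

```python
import numpy as np

# First five temperature values from the dataset, plus the assumed minimum of 64
temperature = np.array([85.0, 80.0, 83.0, 70.0, 68.0, 64.0])

# Min-max scaling: the smallest value maps to 0 and the largest to 1
scaled = (temperature - temperature.min()) / (temperature.max() - temperature.min())
print(np.round(scaled, 6))
```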

In [19]:
X.head()

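| | overcast | rainy | sunny | FALSE | TRUE | temperature | humidity |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 0.0 | 0.0 | 1.0 | 1.0 | 0.0 | 1.0 | 0.645161 |
| 1 | 0.0 | 0.0 | 1.0 | 0.0 | 1.0 | 0.761905 | 0.806452 |
| 2 | 1.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.904762 | 0.677419 |
| 3 | 0.0 | 1.0 | 0.0 | 1.0 | 0.0 | 0.285714 | 1.0 |
| 4 | 0.0 | 1.0 | 0.0 | 1.0 | 0.0 | 0.190476 | 0.483871 |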


Cast `y` (the labels) to float.

In [20]:
y = y.astype('float')

Convert `X` and `y` to NumPy arrays.

In [21]:
X = np.array(X)
y = np.array(y)

Final result of preprocessed `X` and `y`.

In [22]:
X

array([[0.        , 0.        , 1.        , 1.        , 0.        ,
        1.        , 0.64516129],
       [0.        , 0.        , 1.        , 0.        , 1.        ,
        0.76190476, 0.80645161],
       [1.        , 0.        , 0.        , 1.        , 0.        ,
        0.9047619 , 0.67741935],
       [0.        , 1.        , 0.        , 1.        , 0.        ,
        0.28571429, 1.        ],
       [0.        , 1.        , 0.        , 1.        , 0.        ,
        0.19047619, 0.48387097],
       [0.        , 1.        , 0.        , 0.        , 1.        ,
        0.04761905, 0.16129032],
       [1.        , 0.        , 0.        , 0.        , 1.        ,
        0.        , 0.        ],
       [0.        , 0.        , 1.        , 1.        , 0.        ,
        0.38095238, 0.96774194],
       [0.        , 0.        , 1.        , 1.        , 0.        ,
        0.23809524, 0.16129032],
       [0.        , 1.        , 0.        , 1.        , 0.        ,
        0.52380952, 0.4

In [23]:
y

array([[0.],
       [0.],
       [1.],
       [1.],
       [1.],
       [0.],
       [1.],
       [0.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [0.]])

### Hold-out split

Split the data: 90% for training data and 10% for validation data.

In [24]:
from sklearn.model_selection import train_test_split

In [25]:
X, X_val, y, y_val = train_test_split(X, y, test_size=0.1)
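
The split above is random, so the training and validation sets change between runs. If reproducibility matters, one option is to pass `random_state`, as in the sketch below. The arrays `X_demo` and `y_demo` are stand-ins with the same number of rows as the weather data, and the seed 42 is an arbitrary choice.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Stand-in data with 14 rows, like the weather dataset
X_demo = np.arange(28, dtype=float).reshape(14, 2)
y_demo = (np.arange(14) % 2).astype(float).reshape(-1, 1)

# The same random_state yields the same split on every call
split_a = train_test_split(X_demo, y_demo, test_size=0.1, random_state=42)
split_b = train_test_split(X_demo, y_demo, test_size=0.1, random_state=42)

X_train, X_holdout, y_train, y_holdout = split_a
print(len(X_train), len(X_holdout))
print(all(np.array_equal(a, b) for a, b in zip(split_a, split_b)))
```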

## Artificial Neural Network Experiment

Let's train the network on the data and predict the labels.

### Lunak

Initialize Lunak ANN.

In [66]:
lunak_ann = LunakArtificialNeuralNetwork()

Add a hidden layer with two units and an output layer with one unit.

In [67]:
lunak_ann.add(LunakDense(2, 'sigmoid', 7, 'uniform', use_bias=True, seed=5))
lunak_ann.add(LunakDense(1, 'sigmoid', 2, 'uniform', use_bias=True, seed=5))

Train Lunak ANN using preprocessed `X` and `y`.

In [68]:
lunak_ann.fit(X, y, epochs=10, momentum=0.001, lr=0.01, batch_size=2, val_data=(X_val, y_val))

Train on 12 samples, validate on 2 samples


Training yields the following prediction probabilities.

In [69]:
pred_proba = lunak_ann.predict_proba(X)

In [70]:
pred_proba

[[0.5100260265353183],
 [0.5098863792792119],
 [0.5099781754308265],
 [0.5098956803318473],
 [0.5101922973441301],
 [0.5098273720026467],
 [0.5098801583917586],
 [0.510178020640108],
 [0.5098564077998636],
 [0.5102136998191488],
 [0.509962999390932],
 [0.5101034248789686]]

The predicted classes are as follows.

In [71]:
predictions = lunak_ann.predict(X)

In [72]:
predictions

[[1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0],
 [1.0]]

### Keras

In [73]:
from keras.models import Sequential
from keras.layers import Dense
from keras.initializers import RandomUniform
import keras
import pandas as pd
import tensorflow as tf

Let's initialize an ANN using Keras.

In [74]:
keras_ann = Sequential()

Create a random uniform initializer for the initial weight matrix.

In [75]:
initializer = RandomUniform(minval=-0.05, maxval=0.05, seed=5)

Add a hidden layer with 2 units and an output layer with 1 unit.

In [76]:
keras_ann.add(Dense(2, activation='sigmoid', input_dim=7, use_bias=True, kernel_initializer=initializer))
keras_ann.add(Dense(1, activation='sigmoid', use_bias=True, kernel_initializer=initializer))

Use stochastic gradient descent as the optimizer.

In [77]:
optimizer_ = keras.optimizers.SGD(momentum=0.001, lr=0.01)

In [78]:
keras_ann.compile(loss='mean_squared_error', optimizer=optimizer_, metrics=['accuracy'])

Train the model using Keras.

In [79]:
keras_ann.fit(X, y, epochs=10, batch_size=2, validation_data=(X_val, y_val))

Train on 12 samples, validate on 2 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f9988257518>

Predict using Keras.

In [80]:
keras_ann.predict(X)

array([[0.51846087],
       [0.51847076],
       [0.5184803 ],
       [0.5184764 ],
       [0.5183501 ],
       [0.5185053 ],
       [0.5184343 ],
       [0.51839614],
       [0.51846385],
       [0.51843315],
       [0.51841855],
       [0.5183843 ]], dtype=float32)

## Result Analysis

Keep your spirits up, Fel.

## Responsibilities

**Felix Limanta (13515065)**: Layer and model implementation, debugging, Keras exploration, writing report

**Holy Lovenia (13515113)**: Layer and model implementation, debugging, Keras exploration, writing report

**Agus Gunawan (13515143)**: Layer and model implementation, debugging, Keras exploration, writing report