In [1]:
# utils
import numpy as np
import json
from xml.dom import minidom
import os
import abc


# mean squared error
def mse(y_true, y_pred, status=''):
    return mse_derivative(y_true, y_pred) if status=='derivative' else np.mean(np.power(y_true-y_pred, 2))


# sigmoid
def sigmoid(input_data, status=''):
    return sigmoid_derivative(input_data) if status =='derivative' else 1/(1 + np.ex(-input_data))


# tg() activation function 
def tanh(input_data, status=''):
    return tanh_derivative(input_data) if status=='derivative' else np.tanh(input_data);


#  relu activation function
def relu(input_data, status=''):
    if status=='derivative':
        return relu_derivative(input_data)
    data = [max(0.05*value, value) for array in input_data for value in array]
    return np.array(data).reshape(input_data.shape)


# derivative of mean squared error
def mse_derivative(y_true, y_pred):
    return 2*(y_pred-y_true)/y_true.size


# derivative of sigmoid
def sigmoid_derivative(input_data):
    return sigmoid(input_data)*(1 - sigmoid(input_data))


# derivative of tag() activation functions
def tanh_derivative(input_data):
    return 1-np.tanh(input_data)**2


# derivative of relu
def relu_derivative(input_data):
    data = [1 if value > 0 else 0.05 for array in input_data for value in array]
    return np.array(data).reshape(input_data.shape)


# calculate gradient
def calculate_gradients(input_data, weights, bias, loss):
    # db==loss
    input_loss = np.dot(loss, weights.T)
    dw = np.dot(input_data.T, loss)

    return input_loss, dw, loss


# get the true value
def get_value(array):
    arr = np.where(array==np.amax(array))
    return int(arr[0]) + 1

In [2]:
class Layer(abc.ABC):
    
    def __init__(self):
        self._input = None
        self._output = None
    
    @abc.abstractmethod
    def forward(self, input_data):
        pass
    
    @abc.abstractmethod
    def backward(self, error, learning_rate):
        pass

In [3]:
class Linear(Layer):
    
    def __init__(self, input_size, output_size):
        self._weights = np.random.rand(input_size, output_size) - 0.5
        self._bias = np.random.rand(1, output_size) - 0.5
        self._m_dw = np.zeros((input_size, output_size))
        self._m_db = np.zeros((1, output_size))
        self._v_dw = np.zeros((input_size, output_size))
        self._v_db = np.zeros((1, output_size))
        
    def forward(self, input_data):
        self._input = input_data
        self._output = np.dot(self._input, self._weights) + self._bias
        
        return self._output

    def backward(self, loss, optimizer):
        self._weights, self._bias, input_loss, self._m_dw, self._m_db, self._v_dw, self._v_db = optimizer.update(self._input, self._weights, self._bias, loss, 
                                                                                                                 self._m_dw, self._m_db, self._v_dw, self._v_db)
        
        return input_loss

In [4]:
class Activation(Layer):
    
    def __init__(self, activation_function):
        self._activation_function = activation_function

    def forward(self, input_data):
        self._input = input_data
        self._output = self._activation_function(self._input)
        
        return self._output

    def backward(self, loss, optimizer):
        return self._activation_function(self._input, 'derivative') * loss

In [5]:
class NeuralNetwork():
    
    def __init__(self):
        self._layers = []
        self._json = {}
        self._root = minidom.Document()
        
        self._xml_results = self._root.createElement('results') 
        self._root.appendChild(self._xml_results)

    def add(self, layer):
        self._layers.append(layer)

    def fit(self, x_train, y_train, epochs, optimizer, loss_func):
        # dictionary for epoch results storing
        d = {}
        
        for epoch in range(epochs):
            loss_display = 0
            for i in range(len(x_train)):
                output = x_train[i]
                
                # forward propagation
                for layer in self._layers:
                    output = layer.forward(output)
                
                # calculating errors (1st to display, 2nd for backpropagataion)
                loss_display += loss_func(y_train[i], output)
                loss = loss_func(y_train[i], output, 'derivative')
                
                # backward propagation
                for layer in reversed(self._layers):
                    loss = layer.backward(loss, optimizer)
            
            loss_epoch = loss_display/len(x_train)
            print('Epoch: {}/{}  Loss: {}'.format(epoch + 1, epochs, loss_epoch))
            
            d['Epoch: {}/{}'.format(epoch + 1, epochs)] = loss_epoch
            self._xml_results.appendChild(self.xml_element('epoch: {}/{}'.format(epoch + 1, epochs), 'loss', str(loss_epoch)))
        
        self._json['Training'] = d
        self._json['Optimizer'] = type(optimizer).__name__
        self._json['Loss Function'] = loss_func.__name__
        
        self._xml_results.appendChild(self.xml_element('optimizer', 'name', str(type(optimizer).__name__)))
        self._xml_results.appendChild(self.xml_element('loss function', 'name', str(loss_func.__name__)))
            
    def predict(self, input_data, true_value):
        predictions = []
        
        # run network over all samples
        for i in range(len(input_data)):
            # forward propagation
            output = input_data[i]
            for layer in self._layers:
                output = layer.forward(output)
                
            output = output.flatten()
            predictions.append(get_value(output)==get_value(true_value[i]))
            
        accuracy = (sum(predictions)/len(predictions))*100
        print('Accuracy: {}'.format(accuracy))
        self._json['Accuracy'] = accuracy
        self._xml_results.appendChild(self.xml_element('accuracy', 'value', str(accuracy)))
        
        return accuracy
    
    def create_json(self, fname):
        json_object = json.dumps(self._json, indent = 4)
        
        with open(fname, "w") as outfile:
            outfile.write(json_object)
    
    def create_xml(self, fname):
        xml_str = self._root.toprettyxml(indent ="\t") 
        
        with open(fname, "w") as f:
            f.write(xml_str) 
    
    def xml_element(self, name_element, name_info, info):
        xml_element = self._root.createElement(name_element)
        xml_element.setAttribute(name_info, info)
        return xml_element

In [6]:
class SGD():
    
    def __init__(self, learning_rate=0.1):
        self._learning_rate = learning_rate
        
    def update(self, input_data, weights, bias, loss, m_dw, m_db, v_dw, v_db):
        input_loss, dw, db = calculate_gradients(input_data, weights, bias, loss)

        weights -= self._learning_rate * dw
        bias -= self._learning_rate * db
        
        return weights, bias, input_loss, m_dw, m_db, v_dw, v_db

In [7]:
class Adam():
    
    def __init__(self, learning_rate=0.01, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self._learning_rate = learning_rate
        self._beta1 = beta1
        self._beta2 = beta2
        self._epsilon = epsilon
        
    def update(self, input_data, weights, bias, loss, m_dw, m_db, v_dw, v_db):
        input_loss, dw, db = calculate_gradients(input_data, weights, bias, loss)
        
        m_dw = self._beta1*m_dw + (1 - self._beta1)*dw
        m_db = self._beta1*m_db + (1 - self._beta1)*db

        v_dw = self._beta2*v_dw + (1 - self._beta2)*(dw**2)
        v_db = self._beta2*v_db + (1 - self._beta2)*(db**2)

        m_dw_corr = m_dw/(1-self._beta1)
        m_db_corr = m_db/(1-self._beta1)
        v_dw_corr = v_dw/(1-self._beta2)
        v_db_corr = v_db/(1-self._beta2)

        weights -= self._learning_rate*(m_dw_corr/(np.sqrt(v_dw_corr) + self._epsilon))
        bias -= self._learning_rate*(m_db_corr/(np.sqrt(v_db_corr) + self._epsilon))
        
        return weights, bias, input_loss, m_dw, m_db, v_dw, v_db

In [28]:
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from keras.utils import np_utils


# load data 
iris = load_iris()
x, y = np.array(iris.data), np.array(iris.target)

# split on training and test data
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)

# preprocess train and test data
x_train = x_train.reshape(x_train.shape[0], 1, 4)
x_train = x_train.astype('float32')
y_train = np_utils.to_categorical(y_train)

x_test = x_test.reshape(x_test.shape[0], 1, 4)
x_test = x_test.astype('float32')
y_test = np_utils.to_categorical(y_test)

# optimizer
sgd = SGD(learning_rate=0.01)
adam = Adam(learning_rate=0.01)

# network
net = NeuralNetwork()
net.add(Linear(4, 20))
net.add(Activation(relu))
net.add(Linear(20, 10))
net.add(Activation(relu))
net.add(Linear(10, 3))
net.add(Activation(relu))

In [29]:
# train
net.fit(x_train, y_train, epochs=100, optimizer=adam, loss_func=mse)

Epoch: 1/100  Loss: 0.18186304938552186
Epoch: 2/100  Loss: 0.05251118388487307
Epoch: 3/100  Loss: 0.043076206663342935
Epoch: 4/100  Loss: 0.02876804423937211
Epoch: 5/100  Loss: 0.025032998570846166
Epoch: 6/100  Loss: 0.02371580694490537
Epoch: 7/100  Loss: 0.02296920560364677
Epoch: 8/100  Loss: 0.022237537983738197
Epoch: 9/100  Loss: 0.021525611737590888
Epoch: 10/100  Loss: 0.020922263255289602
Epoch: 11/100  Loss: 0.02025119987935173
Epoch: 12/100  Loss: 0.019874016040320668
Epoch: 13/100  Loss: 0.019197166589848966
Epoch: 14/100  Loss: 0.019375967236736312
Epoch: 15/100  Loss: 0.01923813551194527
Epoch: 16/100  Loss: 0.01922730466280797
Epoch: 17/100  Loss: 0.019026371930464858
Epoch: 18/100  Loss: 0.01887673327322249
Epoch: 19/100  Loss: 0.01870447854025502
Epoch: 20/100  Loss: 0.0185956130930242
Epoch: 21/100  Loss: 0.018389311442740402
Epoch: 22/100  Loss: 0.018246299526400833
Epoch: 23/100  Loss: 0.018076596412157243
Epoch: 24/100  Loss: 0.017951360158947223
Epoch: 25/100

In [30]:
net.predict(x_test, y_test)

Accuracy: 100.0


100.0

In [31]:
json_name = '1_json'
xml_name = '1_xml'

net.create_json(json_name)
net.create_xml(xml_name)