In [1]:
import numpy as np
import math


def sigmoid(x):
    # applying the sigmoid function
    return 1 / (1 + np.exp(-x))


def sigmoid_derivative(x):
    # computing derivative to the Sigmoid function
    return x * (1 - x)


def relu(x):
    # compute relu
    return np.maximum(0, x)


def relu_derivative(x):
    return np.where(x < 0, 0, 1)


def linear(x):
    return x


def linear_derivative(x):
    return np.full(x.shape, 1)


def softmax(x):
    ret = np.zeros(x.shape)
    for i in range(x.shape[1]):
        ex = np.exp(x[:, i])
        x[:, i] = ex / np.sum(ex)
    return ret


def softmax_derivative(x):
    ret = np.zeros(x.shape);
    for i in range(x.shape[1]):
        j = np.sum(softmax_derivative_util(x[:, i]), axis=1)
        ret[:, i] = j
    return ret


def softmax_derivative_util(x):
    rx = x.reshape(-1, 1)
    return np.diagflat(rx) - np.dot(rx, rx.T)


def sum_of_squared_error(t, o):
    sub = t - o
    return 0.5 * np.sum(sub**2)


def cross_entropy(t, o):
    ret = 0
    for i in range(o.shape[1]):
        j = np.argmax(o[:, i])
        ct = clip_scalar(t[j, i])
        ret += -np.log2(ct)
    return ret


def cross_entropy_derivative(t, o):
    ret = o
    for i in range(o.shape[1]):
        j = np.argmax(o[:, i])
        ret[j, i] = -(1-ret[j, i])
    return ret


clip_upper_threshold = 5
clip_lower_threshold = 0.5
def clip(x):
    ret = x
    norm = np.sum(x * x)
    if norm > clip_upper_threshold ** 2:
        ret = ret * (clip_upper_threshold / np.sqrt(norm))
#     if norm < clip_lower_threshold ** 2:
#         ret = ret * (clip_lower_threshold / np.sqrt(norm))
    return ret


activation_functions = {
    # activation_functions
    "relu": relu,
    "sigmoid": sigmoid,
    "softmax": softmax,
    "linear": linear,
}

activation_functions_derivative = {
    # activation_functions_derivative
    "relu": relu_derivative,
    "sigmoid": sigmoid_derivative,
    "softmax": softmax_derivative,
    "linear": linear_derivative
}

error_functions = {
    # error_functions
    "relu": sum_of_squared_error,
    "sigmoid": sum_of_squared_error,
    "softmax": cross_entropy,
    "linear": sum_of_squared_error,
}

class Layer:
    def __init__(self, activation, input, output):
        self.activation_name = activation
        self.activation = activation_functions[activation]
        self.cost_function = error_functions[activation]
        self.activation_derivative = activation_functions_derivative[activation]
        self.W = np.random.randn(output, input)
        self.b = np.random.randn(output, 1)
        
        self.reset_delta(output)
        self.reset_delta_weight()
        self.reset_delta_bias()
        self.output = np.zeros(output)
        self.net = np.zeros(output)
    
    def set_delta(self, delta):
        self.delta = delta
    
    def set_weight(self, weight):
        self.W = weight

    def set_bias(self, bias):
        self.b = bias

    def add_delta_weight(self, delta):
        self.delta_weight += delta

    def add_delta_bias(self, delta_bias):
        self.delta_bias += delta_bias
    
    def reset_delta(self, output):
        self.delta = np.zeros(output)

    def reset_delta_weight(self):
        self.delta_weight = np.zeros(self.W.shape)

    def reset_delta_bias(self):
        self.delta_bias = np.zeros(self.b.shape)

    def set_output(self, output):
        self.output = output

    def set_net(self, net):
        self.net = net

class NeuralNetwork:
    def __init__(self, learning_rate=0.05, max_iter=500, error_threshold=0.01, batch_size=5, verbose=False):
        # seeding for random number generation
        np.random.seed(1)
        self.layers = []
        self.depth = 0
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.batch_size = batch_size
        self.verbose = verbose
        self.error_threshold = error_threshold

    def add(self, layer):
        self.layers.append(layer)
        self.depth += 1

    def set_params(self, **kwargs):
        self.__dict__.update(kwargs)

    def init_layers(self, layer_description):
        for a in layer_description:
            layer = self.Layer(a.activation_type,
                               a.previous_neuron, a.current_neuron)
            self.add(layer)

    def load_model(self, filename):
        f = open(filename, "r")

        self.depth = int(f.readline())

        for i in range(self.depth):

            n_neuron = int(f.readline())
            activation_type = f.readline()[:-1]
            weight = []

            n_neuron_prev = -1
            for j in range(n_neuron):
                temp = list(map(float, f.readline().split()))
                weight.append(temp)
                if (n_neuron_prev == -1):
                    n_neuron_prev = len(temp)

            layer = self.Layer(activation_type, n_neuron_prev, n_neuron)
            layer.set_weight(np.array(weight))
            bias = np.array(
                list(map(lambda x: [float(x)],
                         f.readline().split())))
            layer.set_bias(bias)

            self.layers.append(layer)

    def save_model(self, filename):
        f = open(filename, "w")

        f.write(str(self.depth) + "\n")
        for layer in self.layers:
            n_neuron = len(layer.b)
            f.write(str(n_neuron) + "\n")
            f.write(layer.activation_name + "\n")
            for i in range(n_neuron):
                f.write(" ".join(list(map(str, layer.W[i]))) + "\n")
            f.write(" ".join(list(map(lambda x: str(x[0]), layer.b))) + "\n")

    def forward_propagate(self, x_inputs):
        a = np.array(x_inputs).T
        for layer in self.layers:
            z = np.dot(layer.W, a) + layer.b
            layer.set_net(z)
            a = layer.activation(z)
            layer.set_output(a)
        return a

    def backward_propagate(self, X_train, y_train, prediction):
        grad = {}

        num_layers = len(self.layers)
        
        for i in reversed(range(num_layers)):
            layer = self.layers[i]
            
            # if output layer
            if i == num_layers - 1:
                # use squared error derivative if not softmax
                if self.layers[-1].activation != 'softmax':
                    layer.delta = clip((prediction - y_train) * layer.activation_derivative(layer.net))
                else:
                    dA = prediction
                    layer.delta = cross_entropy_derivative(prediction, y_train) * layer.activation_derivative(layer.net)
            else:
                next_layer = self.layers[i + 1]
                error = np.dot(next_layer.W.T, next_layer.delta)
                layer.delta = clip(error * layer.activation_derivative(layer.net))
            
        for i in range(num_layers):
            layer = self.layers[i]
            input_activation = np.atleast_2d(X_train if i == 0 else self.layers[i - 1].output)
            grad["dW" + str(i)] = clip(np.dot(layer.delta, input_activation.T) * self.learning_rate)
            grad["db" + str(i)] = clip(layer.delta * self.learning_rate)
            
        return grad

    def shuffle(self, x_train, y_train):
        sz = len(y_train)
        ids = [i for i in range(sz)]
        np.random.shuffle(ids)
        ret_x, ret_y = [], []
        for i in ids:
            ret_x.append(list(x_train[i]))
            ret_y.append(list(y_train[i]))
        return (ret_x), (ret_y)

    def split_batch(self, x_train, y_train):
        batches_x = []
        batches_y = []

        length = len(x_train)

        for i in range((length // self.batch_size)):
            x_batch = x_train[i * self.batch_size: (i + 1) * self.batch_size]
            y_batch = y_train[i * self.batch_size: (i + 1) * self.batch_size]
            batches_x.append(np.array(x_batch))
            batches_y.append(np.array(y_batch))
        if length % self.batch_size != 0:
            i = length // self.batch_size
            x_batch = x_train[i * self.batch_size:]
            y_batch = y_train[i * self.batch_size:]
            batches_x.append(np.array(x_batch))
            batches_y.append(np.array(y_batch))

        return (batches_x), (batches_y)

    def fit(self, x_train, y_train):
        for iteration in range(self.max_iter):

            x_train, y_train = self.shuffle(x_train, y_train)

            batches_x, batches_y = self.split_batch(x_train, y_train)

            cost_function = 0
            for i in range(len(batches_x)):
                x_input = batches_x[i]
                y_output = batches_y[i]

                prediction = self.forward_propagate(x_input)
                cost_function += self.layers[-1].cost_function(
                    prediction, y_output.T)
                gradients = self.backward_propagate(
                    x_input.T, y_output.T, prediction)
                
                # update delta phase
                for j, layer in enumerate(self.layers):
                    layer.add_delta_weight(gradients["dW" + str(j)])
                    grad_bias = gradients["db" + str(j)]
                    layer.add_delta_bias(np.sum(grad_bias, axis=1).reshape(len(grad_bias), 1))
                
                # update weights phase
                for j, layer in enumerate(self.layers):
                    layer.W += layer.delta_weight  # gradients["dW" + str(j)]
                    layer.b += layer.delta_bias  # gradients["db" + str(j)]
                
                for j, layer in enumerate(self.layers):
                    layer.reset_delta_weight()
                    layer.reset_delta_bias()
            
            cost_function /= len(x_train)
            if iteration % 50 == 0:
                print(f"Iteration {iteration}: ", cost_function)

            if cost_function < self.error_threshold:
                break

    def predict(self, x_test):
        prediction = self.forward_propagate(x_test)
        return prediction

    def __str__(self):
        index = 1
        res = ""
        for layer in self.layers:
            res += "{}-th layer\n".format(index)
            res += f"Activation: {layer.activation_name}\n"
            res += "Weight matrix:\n"
            res += layer.W.__str__() + "\n"
            res += "Bias\n"
            res += layer.b.__str__() + "\n\n"
            index += 1
        return res

In [2]:
from sklearn.datasets import load_iris
from sklearn.preprocessing import OneHotEncoder

enc = OneHotEncoder(handle_unknown='ignore')

data = load_iris()
X = data.data
y = data.target
y = y.reshape(-1,1)
enc.fit(y)
y = enc.transform(y).toarray()

In [141]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=42)

In [142]:
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

(75, 4)
(75, 4)
(75, 3)
(75, 3)


In [143]:
model = NeuralNetwork(learning_rate=0.001, max_iter=2000, verbose=False)

#input of first layer must match the num of feature in the training data
model.add(Layer("relu", 4, 10))

model.add(Layer("relu", 10, 10))
model.add(Layer("linear", 10, 5))

#output of the layer must match the num of feature in the target classes
model.add(Layer("sigmoid", 5, 3))

# print(model)

model.fit(X_train, y_train)

Iteration 0:  0.4452613371787705
Iteration 50:  0.09023412173680989
Iteration 100:  0.02048628036388502
Iteration 150:  0.049321013558100125
Iteration 200:  0.048293066030894435
Iteration 250:  0.04810742466047692
Iteration 300:  0.02511146602542075
Iteration 350:  0.04721208683808682
Iteration 400:  0.04279774092466287
Iteration 450:  0.02002480909667598
Iteration 500:  0.02515268539908227
Iteration 550:  0.03242371214767572
Iteration 600:  0.045147067438041304
Iteration 650:  0.0309900241390309
Iteration 700:  0.054883093526883954
Iteration 750:  0.015166838629085437
Iteration 800:  0.03224650502348537
Iteration 850:  0.03351787582241902


In [144]:
prediction = model.predict(X_test)

In [145]:
label_pred = []
for i in range(prediction.shape[1]):
    label_pred.append(np.argmax(prediction[:, i]))

y_test_label = []
for i in range(y_test.shape[0]):
    y_test_label.append(np.argmax(y_test[i, :]))

In [146]:
print(label_pred)
print(y_test_label)

[1, 0, 2, 1, 1, 0, 1, 2, 2, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 0, 1, 2, 2, 1, 2, 1, 2, 1, 0, 2, 1, 0, 0, 0, 1, 2, 0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 0, 2, 2, 1]
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 0, 1, 2, 2, 1, 2, 1, 2, 1, 0, 2, 1, 0, 0, 0, 1, 2, 0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 0, 2, 2, 1]


In [147]:
from sklearn.metrics import accuracy_score

print(accuracy_score(label_pred, y_test_label))

0.9866666666666667


In [148]:
from sklearn.neural_network import MLPClassifier 

clf = MLPClassifier(random_state=42, max_iter=2000).fit(X_train, y_train)

prediction_sklearn = clf.predict(X_test)
label_pred_sklearn = []
for i in range(prediction_sklearn.shape[0]):
    label_pred_sklearn.append(np.argmax(prediction_sklearn[i, :]))
    
print(label_pred_sklearn)

[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 0, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 0, 1, 1, 2, 1, 2, 1, 2, 1, 0, 2, 1, 0, 0, 0, 1, 2, 0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 0, 1, 2, 1]


In [149]:
print(accuracy_score(label_pred_sklearn, y_test_label))

0.96


In [150]:
print(accuracy_score(label_pred, label_pred_sklearn))

0.9466666666666667


In [68]:
model2 = NeuralNetwork(learning_rate=0.001, max_iter=2000, verbose=False)

#input of first layer must match the num of feature in the training data
model2.add(Layer("relu", 4, 10))

model2.add(Layer("relu", 10, 10))
model2.add(Layer("linear", 10, 5))

#output of the layer must match the num of feature in the target classes
model2.add(Layer("sigmoid", 5, 3))

# print(model)

model2.fit(X, y)

Iteration 0:  0.34405743441544306
Iteration 50:  0.04736218761948282
Iteration 100:  0.02626240241695076
Iteration 150:  0.028019278521715454
Iteration 200:  0.02593208784061147
Iteration 250:  0.019982602242524176
Iteration 300:  0.02657083779051741
Iteration 350:  0.021451467954809043
Iteration 400:  0.02589338842336271
Iteration 450:  0.02642628157034493
Iteration 500:  0.025832439696855444
Iteration 550:  0.03879487350486038
Iteration 600:  0.02505763567028002
Iteration 650:  0.02995436629465716
Iteration 700:  0.02690026254154087
Iteration 750:  0.015413034384120824


In [69]:
model2.save_model("full_training_model.txt")

In [80]:
print(model2)

1-th layer
Activation: relu
Weight matrix:
[[ 1.92924596  0.08208996 -1.8218437  -2.00654359]
 [ 3.62314444 -1.16175077  4.33662595  0.44620839]
 [ 0.31490968 -0.15878208  1.05840844 -2.36278601]
 [-0.59237774 -0.49239878  0.85038436 -1.16354056]
 [-0.17242821 -0.87785842  0.04221375  0.58281521]
 [-1.36839401  0.9165489   0.82232522  0.55544821]
 [ 1.28037678  0.05682586 -1.03457238 -1.61929099]
 [ 1.51704569  2.83801551 -2.80346403 -3.55660795]
 [-0.6871727  -0.84520564 -0.67124613 -0.0126646 ]
 [ 0.00744243  0.04603615  4.29323433  2.39059389]]
Bias
[[ 0.46018384]
 [-0.47171937]
 [-0.48919406]
 [ 1.65136667]
 [ 0.05080775]
 [-0.7356335 ]
 [ 0.72149243]
 [ 4.28316127]
 [ 0.12015895]
 [ 0.12639828]]

2-th layer
Activation: relu
Weight matrix:
[[ 0.94145132 -0.96198531 -1.18256848 -0.44697539 -0.20889423  0.56545594
   1.15518564  2.04265078  0.28558733  0.61806197]
 [ 0.28079504  1.42782049  0.24581805 -0.1814549   0.48851815 -0.3464071
   1.82947651  3.00153225  2.18557541 -2.2269009