In [1]:
# Importing
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import sklearn.datasets
from sklearn.metrics import accuracy_score
from random import uniform
from math import exp, log
from collections import OrderedDict
from pprint import pprint

# Plot settings
%matplotlib inline
matplotlib.rcParams['figure.figsize'] = (10.0, 8.0)

In [2]:
# Вспомогательные функции для генерации данных:
def generate_moons(n_samples=200, noise=0.2, visualize=True):
    """Generate a two-class "moons" data set, optionally plotting it.

    Args:
        n_samples: forwarded to ``sklearn.datasets.make_moons``.
        noise: forwarded to ``make_moons`` as its ``noise`` argument.
        visualize: when true, draw a scatter plot of the first two feature
            columns coloured by label.

    Returns:
        The ``(X, y)`` pair produced by ``make_moons``.
    """
    points, labels = sklearn.datasets.make_moons(n_samples, noise=noise)
    if not visualize:
        return points, labels
    plt.scatter(points[:, 0], points[:, 1], s=40, c=labels, cmap=plt.cm.Spectral)
    return points, labels

def get_random_number(a=-1, b=1):
    """Return a random float drawn by ``random.uniform(a, b)``."""
    drawn_value = uniform(a, b)
    return drawn_value

# Функции активации и их производные:
def ReLU(x, return_derivative=False):
    """Rectified linear unit.

    Args:
        x: input value.
        return_derivative: when true, return the derivative at ``x``
            (1 for ``x > 0``, otherwise 0) instead of the activation.

    Returns:
        ``max(0, x)`` or, in derivative mode, ``(x > 0)`` as an integer.
    """
    if return_derivative:
        # Multiplying the comparison result by 1 turns the boolean into 0/1.
        return 1 * (x > 0)
    return max(0, x)

def softplus(x, return_derivative=False, limit=30):
    """Softplus activation ``log(1 + exp(x))``.

    Args:
        x: input value.
        return_derivative: when true, return the derivative, which is
            ``sigmoid(x)``.
        limit: for ``x`` above this threshold the function returns ``x``
            itself instead of evaluating ``exp(x)``.

    Returns:
        The activation value or, in derivative mode, ``sigmoid(x)``.
    """
    if return_derivative:
        return sigmoid(x)
    return x if x > limit else log(1 + exp(x))

def sigmoid(x, return_derivative=False):
    """Logistic sigmoid ``1 / (1 + exp(-x))``.

    Args:
        x: input value.
        return_derivative: when true, return ``sigmoid(x) * (1 - sigmoid(x))``.

    Returns:
        The sigmoid value, or 0 when ``exp(-x)`` overflows; in derivative
        mode the derivative at ``x``.
    """
    if return_derivative:
        value = sigmoid(x)
        return value * (1 - value)
    try:
        return 1 / (1 + exp(-x))
    except OverflowError:
        # exp(-x) is too large to represent, so the quotient is treated as 0.
        return 0

def log_loss(y, p, return_derivative=False):
    """Binary cross-entropy loss for one sample, or its derivative.

    Args:
        y: true label.
        p: predicted probability.
        return_derivative: when true, return the derivative instead of the
            loss value.

    Returns:
        Loss mode: ``-(y*log(p) + (1-y)*log(1-p))``. A logarithm that cannot
        be evaluated (argument <= 0) is replaced by ``-744``.

        Derivative mode: ``p - y``. This equals the loss derivative with
        respect to ``p``, ``-(y - p) / (p * (1 - p))``, multiplied by
        ``p * (1 - p)``; the two factors cancel. Returning the simplified
        form avoids a ``ZeroDivisionError`` when ``p`` is exactly 0 or 1.
        NOTE(review): the ``p * (1 - p)`` factor has the form of a sigmoid
        derivative, so this looks like the gradient with respect to the
        input of a sigmoid output unit - confirm against the caller.
    """
    if return_derivative:
        return p - y

    # Presumably -744 was chosen as roughly log of the smallest positive
    # double (about 5e-324); it stands in for log(0).
    try:
        logarithm_p = log(p)
    except ValueError:
        logarithm_p = -744
    try:
        logarithm_1_p = log(1 - p)
    except ValueError:
        logarithm_1_p = -744
    return -(y * logarithm_p + (1 - y) * logarithm_1_p)

In [3]:
class WeightedConnection:
    """Directed, weighted link between two neurons.

    Attributes:
        weight: current connection weight; drawn with
            ``get_random_number(0, 1)`` when not supplied.
        input_neuron: neuron the signal comes from.
        output_neuron: neuron the signal is delivered to.
        updated_weight: slot for the weight computed during back-propagation;
            initialised to ``None`` here.
        anti_gradient: slot for the gradient value attached to this
            connection during back-propagation; initialised to ``None`` here.
    """

    def __init__(self, weight=None, input_neuron=None, output_neuron=None):
        if weight is None:
            weight = get_random_number(0, 1)
        self.weight = weight
        self.input_neuron = input_neuron
        self.output_neuron = output_neuron

        self.updated_weight = None
        self.anti_gradient = None

    def send_signal_through_connection(self, signal):
        """Deliver ``signal * weight`` to the output neuron.

        Returns:
            ``False`` (after printing a message) when either end of the
            connection is missing; otherwise whatever the output neuron's
            ``receive_new_input_signal_from`` returns. Previously the success
            path fell through and returned ``None``, which callers could not
            tell apart from failure by truthiness.
        """
        if not self.input_neuron or not self.output_neuron:
            print('Cannot send signal, because '
                  'there is no input/output connection')
            return False
        return self.output_neuron.receive_new_input_signal_from(
            self, signal * self.weight
        )

In [4]:
class Neuron:
    """Node of the network graph with an activation function and a bias.

    State kept per neuron:
      * ``activation_function`` - callable applied to the summed input plus
        bias;
      * ``last_value`` - for an input neuron, the raw value assigned from
        outside before activation; after ``activate`` it holds the
        activation output;
      * ``bias`` / ``updated_bias`` - current bias and a slot for its
        replacement (only initialised here);
      * ``input_connections`` - ordered mapping of incoming connection to the
        last weighted signal received through it (``None`` by default until
        a signal arrives);
      * ``received_signals_from`` - incoming connections that delivered a
        signal since the last activation (used as a readiness check);
      * ``output_connections`` - set of outgoing connections.

    The role of a neuron (input / middle / output) is derived only from
    whether it has incoming and/or outgoing connections.
    """
    instance_counter = 0

    def __init__(self, activation_function=ReLU):
        # The running class-level counter doubles as a human-readable name.
        Neuron.instance_counter += 1
        self.name = str(Neuron.instance_counter)
        self.activation_function = activation_function
        self.bias = 0
        self.updated_bias = None
        self.last_value = None
        # connection -> last weighted signal received through it
        self.input_connections = OrderedDict()
        self.received_signals_from = set()
        # outgoing WeightedConnection objects
        self.output_connections = set()

    def create_output_connection_with(self, neuron, weight=None, input_value=None):
        """Link this neuron to ``neuron`` and register the link on both ends."""
        link = WeightedConnection(weight=weight,
                                  input_neuron=self,
                                  output_neuron=neuron)
        self.output_connections.add(link)
        neuron.input_connections[link] = input_value
        return True

    def receive_new_input_signal_from(self, connection, weighted_signal):
        """Store a weighted signal that arrived through ``connection``.

        Returns ``False`` (with a message) for an unknown connection.
        """
        if connection in self.input_connections:
            self.input_connections[connection] = weighted_signal
            self.received_signals_from.add(connection)
            return True
        print('Cannot receive new input signal, because no such input neuron')
        return False

    def has_received_signals_from_all_connections(self):
        """Tell whether the neuron has everything it needs to activate."""
        if self.is_input_neuron():
            return self.last_value is not None
        if self.is_middle_neuron() or self.is_output_neuron():
            return bool(self.input_connections) \
                and bool(self.received_signals_from) \
                and len(self.input_connections) == len(self.received_signals_from)
        print('Cannot determine neuron type')
        return False

    def _activation_refusal_reason(self):
        """Return the message explaining why activation is impossible, or None."""
        if self.is_input_neuron():
            if len(self.output_connections) == 0:
                return ("Cannot activate input neuron, "
                        "because it has no output connecitons")
            return None
        if self.is_output_neuron():
            if len(self.input_connections) == 0 or \
                    not self.has_received_signals_from_all_connections():
                return ("Cannot activate output neuron, "
                        "because it has no input connecitons")
            return None
        if self.is_middle_neuron():
            if None in self.input_connections.values() or \
                    not self.has_received_signals_from_all_connections():
                return ("Cannot activate neuron, "
                        "because it hasn't received all signals "
                        "from each input neuron yet")
            return None
        return 'Cannot determine neuron type'

    def activate(self):
        """Compute the activation and push it through every output connection.

        Returns ``True`` on success and ``False`` (after printing the reason)
        when the neuron is not ready.
        """
        refusal = self._activation_refusal_reason()
        if refusal is not None:
            print(refusal)
            return False

        if self.is_input_neuron():
            if self.last_value is None:
                print("Cannot activate input neuron, "
                      "because its last value is empty")
                return False
            pre_activation = self.last_value
        else:
            # Accumulate the stored weighted signals in insertion order.
            pre_activation = 0
            for weighted_signal in self.input_connections.values():
                pre_activation += weighted_signal
        self.last_value = self.activation_function(pre_activation + self.bias)

        for link in self.output_connections:
            link.send_signal_through_connection(self.last_value)

        # Start collecting signals for the next pass from scratch.
        self.received_signals_from = set()
        return True

    def calculate_anti_gradient(self):
        """Sum the ``anti_gradient`` values of all output connections.

        Returns ``None`` (after printing the reason) for output neurons,
        for neurons of undetermined type, for middle neurons with a missing
        input value, or when any output connection has no anti-gradient yet.
        """
        if self.is_input_neuron():
            if not len(self.output_connections):
                print("Cannot calculate anti-gradient, "
                      "because neuron has no output connecitons")
                return None
        elif self.is_output_neuron():
            print("For calculation of output neuron anti-gradient "
                  "use cost function derivative")
            return None
        elif self.is_middle_neuron():
            if None in self.input_connections.values():
                print("Cannot calculate anti-gradient, "
                      "because neuron hasn't received all signals "
                      "from each input neuron yet")
                return None
        else:
            print('Cannot determine neuron type')
            return None

        total = 0
        for link in self.output_connections:
            contribution = link.anti_gradient
            if contribution is None:
                print("Cannot calculate neuron anti-gradient, "
                      "because there is 'None' in "
                      "one of neurons output connections")
                return None
            total += contribution
        return total

    def is_input_neuron(self):
        """True when the neuron has outgoing but no incoming connections."""
        return len(self.input_connections) == 0 and len(self.output_connections) > 0

    def is_middle_neuron(self):
        """True when the neuron has both incoming and outgoing connections."""
        return len(self.input_connections) > 0 and len(self.output_connections) > 0

    def is_output_neuron(self):
        """True when the neuron has incoming but no outgoing connections."""
        return len(self.input_connections) > 0 and len(self.output_connections) == 0

In [5]:
class FullyConnectedNeuroNetwork:
    """Fully connected feed-forward network built from ``Neuron`` objects.

    Attributes:
        layers: list of tuples of neurons; index 0 is the input layer and the
            last entry is the output layer.
        cost_function: callable ``f(y, p, return_derivative=False)``.
        learning_rate: step size used in weight and bias updates.
        derivative_delta: step of the central-difference derivative check,
            also used as its tolerance.
        ready_to_back_propagate: set by ``forward_calculate`` and cleared by
            ``backward_propagate``.

    Creating a network resets ``Neuron.instance_counter`` to 0, so neuron
    names restart from "1" for every network.
    """

    def __init__(self, cost_function=log_loss, learning_rate=0.01):
        self.layers = []
        self.cost_function = cost_function
        self.learning_rate = learning_rate
        self.derivative_delta = 0.001
        self.ready_to_back_propagate = False
        Neuron.instance_counter = 0

    def add_layer(self, number_of_neurons, activation_function=ReLU):
        """Append a layer: the first call creates the input layer, later calls
        create layers fully connected to the previous one.

        Returns ``True`` on success, ``False`` otherwise.
        """
        if len(self.layers) == 0:
            return bool(self.add_input_layer(number_of_neurons, activation_function))
        return bool(self.add_fully_connected_layer(number_of_neurons,
                                                   activation_function))

    def add_input_layer(self, number_of_neurons, activation_function=ReLU):
        """Create the first layer; fails if the network already has layers."""
        if len(self.layers) > 0:
            print('Cannot add input layer, because it is already exists')
            return False
        new_neuron_layer = tuple(
            Neuron(activation_function=activation_function)
            for _ in range(number_of_neurons)
        )
        self.layers.append(new_neuron_layer)
        return True

    def add_fully_connected_layer(self, number_of_neurons, activation_function=ReLU):
        """Create a layer and connect every neuron of the previous layer to
        every neuron of the new one; fails on an empty network."""
        if len(self.layers) == 0:
            print('Cannot add fully_connected layer, because neural network is empty')
            return False
        new_neuron_layer = tuple(
            Neuron(activation_function=activation_function)
            for _ in range(number_of_neurons)
        )
        previous_layer = self.layers[-1]
        for output_neuron in previous_layer:
            for input_neuron in new_neuron_layer:
                output_neuron.create_output_connection_with(input_neuron)
        self.layers.append(new_neuron_layer)
        return True

    def get_input_layer(self):
        """Return the first layer, or ``None`` (with a message) if empty."""
        if len(self.layers) > 0:
            return self.layers[0]
        print('Cannot get input layer, because neural network is empty')
        return None

    def get_output_layer(self):
        """Return the last layer, or ``None`` (with a message) if empty."""
        if len(self.layers) > 0:
            return self.layers[-1]
        print('Cannot get output layer, because neural network is empty')
        return None

    def is_input_layer(self, layer_index):
        """True for index 0."""
        return layer_index == 0

    def is_middle_layer(self, layer_index):
        """True for indices strictly between the first and the last layer.

        The upper bound used to be ``len(self.layers)``, which classified the
        last (output) layer as a middle layer.
        """
        return 0 < layer_index < len(self.layers) - 1

    def is_output_layer(self, layer_index):
        """True for the index of the last layer, ``len(self.layers) - 1``.

        The comparison used to be against ``len(self.layers)``, an index no
        layer has, so the method could never return True for a real layer.
        """
        return len(self.layers) > 0 and layer_index == len(self.layers) - 1

    def forward_calculate(self, input_values, true_y):
        """Forward pass for one sample.

        Assigns ``input_values`` to the input neurons, activates every neuron
        layer by layer and averages the output neurons' values into the
        prediction.

        Returns:
            ``(true_y, predicted_y, loss)`` on success; ``False`` (after
            printing a message) when the network is empty or too few input
            values were given.
        """
        if len(self.layers) < 1:
            print('Cannot forward calculate, because neural network is empty')
            return False
        if len(input_values) < len(self.layers[0]):
            print('Cannot forward calculate, not enough input values were given')
            return False

        for input_neuron, input_value in zip(self.layers[0], input_values):
            input_neuron.last_value = input_value
        for layer in self.layers:
            for neuron in layer:
                neuron.activate()
        output_neurons_values = [neuron.last_value for neuron in self.layers[-1]]
        predicted_y = \
            sum(output_neurons_values) / len(output_neurons_values)
        self.ready_to_back_propagate = True
        loss = self.cost_function(true_y, predicted_y)
        return true_y, predicted_y, loss

    def backward_propagate(self, true_y, predicted_y):
        """Backward pass for the sample of the preceding forward pass.

        Walks the layers from last to first, computes updated weights and
        biases, applies them and clears the temporary values.

        Returns ``True`` on success, ``False`` (after printing a message)
        otherwise.
        """
        if len(self.layers) < 1:
            print('Cannot backward propagate, because neural network is empty')
            return False
        if not self.ready_to_back_propagate:
            print('Cannot backward propagate, because did not forward calculated yet')
            return False

        output_layer = self.layers[-1]
        for layer in reversed(self.layers):
            anti_gradient = None
            if layer is output_layer:
                # The last layer is seeded with the cost-function derivative;
                # other layers derive their value from output connections.
                anti_gradient = self.cost_function(true_y,
                                                   predicted_y,
                                                   return_derivative=True)
            if not self.backward_propagate_layer(layer, anti_gradient=anti_gradient):
                print('Cannot backward propagate through layer:', self.layers.index(layer))
                return False
        self.ready_to_back_propagate = False
        if not self.update_weights_and_biases():
            print('Cannot update weights')
            return False
        self.reset_anti_gradients_updated_weights_and_last_values()
        return True

    def backward_propagate_layer(self, layer, anti_gradient=None):
        """Back-propagate through every neuron of ``layer``."""
        for neuron in layer:
            if not self.backward_propagate_neuron(neuron, anti_gradient=anti_gradient):
                print('Cannot backward propagate through neuron:', neuron.name)
                return False
        return True

    def backward_propagate_neuron(self, neuron, anti_gradient=None):
        """Compute updated weights of the neuron's input connections and its
        updated bias.

        When ``anti_gradient`` is ``None`` it is taken from
        ``neuron.calculate_anti_gradient()``.

        NOTE(review): the bias update evaluates the activation derivative at
        ``neuron.bias`` rather than at the neuron's summed input. This is not
        what standard back-propagation does and may contribute to the flat
        loss - verify before relying on training results.
        """
        if anti_gradient is None:
            anti_gradient = neuron.calculate_anti_gradient()
        for input_connection in neuron.input_connections:
            if not self.backward_propagate_input_connection(input_connection,
                                                            anti_gradient):
                print('Cannot back propagate connection:', input_connection)
                return False
        if anti_gradient is None:
            # Only reachable for a neuron without input connections; without
            # this guard the multiplication below raised TypeError.
            print("Cannot update bias, because neuron anti-gradient is 'None'")
            return False
        new_anti_gradient = \
            anti_gradient * \
            neuron.activation_function(neuron.bias, return_derivative=True)
        neuron.updated_bias = \
            neuron.bias - self.learning_rate * new_anti_gradient
        return True

    def backward_propagate_input_connection(self, connection, anti_gradient):
        """Compute the updated weight and the anti-gradient of one connection.

        NOTE(review): the activation derivative is evaluated at this single
        connection's weighted signal, not at the receiving neuron's summed
        input. The weight step is also not scaled by the sending neuron's
        output, and the stored anti-gradient is not scaled by the connection
        weight. All three differ from standard back-propagation - verify.
        """
        if anti_gradient is None:
            print("Cannot back propagate, because connection anti-gradient is 'None'")
            return False
        weighted_signal = connection.output_neuron.input_connections.get(connection)
        if weighted_signal is None:
            print("Cannot back propagate, because no signal was send "
                  "through connection:", connection)
            return False
        activation_function = connection.output_neuron.activation_function
        if not self.check_derivative(activation_function, weighted_signal):
            print('Derivative caluclation is wrong, check for errors')
            return False
        new_anti_gradient = \
            anti_gradient * \
            activation_function(weighted_signal, return_derivative=True)
        connection.updated_weight = \
            connection.weight - self.learning_rate * new_anti_gradient
        connection.anti_gradient = new_anti_gradient
        return True

    def check_derivative(self, activation_function, weighted_signal):
        """Compare the analytic derivative with a central difference.

        Returns ``False`` when they differ by more than
        ``self.derivative_delta``.
        """
        formal_derivative = activation_function(weighted_signal, return_derivative=True)
        numeric_derivative = (
            activation_function(weighted_signal + self.derivative_delta)
            - activation_function(weighted_signal - self.derivative_delta)
        ) / (self.derivative_delta * 2)
        return not (abs(formal_derivative - numeric_derivative) > self.derivative_delta)

    def update_weights_and_biases(self):
        """Replace every weight and bias with its pending updated value.

        Returns ``False`` as soon as a connection without an updated weight
        is found; a missing updated bias is only reported.
        """
        for layer in self.layers:
            for neuron in layer:
                for output_connection in neuron.output_connections:
                    if output_connection.updated_weight is None:
                        print("Updated weight is 'None'")
                        return False
                    output_connection.weight = output_connection.updated_weight
                if neuron.updated_bias is None:
                    print("Neuron updated bias is 'None'")
                else:
                    neuron.bias = neuron.updated_bias
        return True

    def reset_anti_gradients_updated_weights_and_last_values(self):
        """Clear last values, pending weights/biases and anti-gradients."""
        for layer in self.layers:
            for neuron in layer:
                neuron.last_value = None
                neuron.updated_bias = None
                for output_connection in neuron.output_connections:
                    output_connection.updated_weight = None
                    output_connection.anti_gradient = None
        return True

---

---

---

Задача: разделить на два класса случайные точки с координатами (x, y), где [0 < x < 1] и [0 < y < 1].
Разделяющая прямая: y = x, т.е. всё что выше – класс '1', всё что ниже – класс '0'.

In [6]:
# Генерируем данные:
def generate_data(count=1000):
    """Build ``count`` labelled random points.

    Each element is a tuple ``(x, y, true_class)`` where ``x`` and ``y`` come
    from ``get_random_number(0, 1)`` and ``true_class`` is 1 when ``y > x``
    and 0 otherwise.
    """
    samples = []
    for _ in range(count):
        # x is drawn before y, one pair per sample.
        point_x, point_y = get_random_number(0, 1), get_random_number(0, 1)
        samples.append((point_x, point_y, int(point_y > point_x)))
    return samples

---

In [7]:
# Build a network with 2 input neurons feeding 1 output neuron, all using the
# sigmoid activation. The value of the last add_layer call is the cell output.
neural_network = FullyConnectedNeuroNetwork()
neural_network.add_layer(2, activation_function=sigmoid)
neural_network.add_layer(1, activation_function=sigmoid)

True

In [8]:
# 20 training rounds: each round draws a fresh random data set, runs one
# forward and one backward pass per sample and prints the mean loss.
for i in range(20):
    data = generate_data()
    counter, sum_loss = 0, 0
    for sample in data:
        input_values, true_class = sample[:-1], sample[-1]
        counter += 1
        true_class, predicted_class, loss = neural_network.forward_calculate(
            input_values, true_class
        )
        sum_loss += loss
        neural_network.backward_propagate(true_class, predicted_class)
    print('Loss: {:.5f}'.format(sum_loss / counter), end=' -->  ')

Loss: 0.69696 -->  Loss: 0.69738 -->  Loss: 0.69668 -->  Loss: 0.69617 -->  Loss: 0.69663 -->  Loss: 0.69664 -->  Loss: 0.69721 -->  Loss: 0.69702 -->  Loss: 0.69721 -->  Loss: 0.69641 -->  Loss: 0.69707 -->  Loss: 0.69727 -->  Loss: 0.69724 -->  Loss: 0.69724 -->  Loss: 0.69620 -->  Loss: 0.69710 -->  Loss: 0.69580 -->  Loss: 0.69784 -->  Loss: 0.69721 -->  Loss: 0.69737 -->  

---

In [18]:
# Build a deeper network with layer sizes 2 -> 4 -> 2 -> 1, all using the
# sigmoid activation. The value of the last add_layer call is the cell output.
neural_network = FullyConnectedNeuroNetwork()
neural_network.add_layer(2, activation_function=sigmoid)
neural_network.add_layer(4, activation_function=sigmoid)
neural_network.add_layer(2, activation_function=sigmoid)
neural_network.add_layer(1, activation_function=sigmoid)

True

In [21]:
# 20 training rounds: each round draws a fresh random data set, runs one
# forward and one backward pass per sample and prints the mean loss.
for i in range(20):
    data = generate_data()
    counter, sum_loss = 0, 0
    for sample in data:
        input_values, true_class = sample[:-1], sample[-1]
        counter += 1
        true_class, predicted_class, loss = neural_network.forward_calculate(
            input_values, true_class
        )
        sum_loss += loss
        neural_network.backward_propagate(true_class, predicted_class)
    print('Loss: {:.5f}'.format(sum_loss / counter), end=' -->  ')

Loss: 0.69332 -->  Loss: 0.69357 -->  Loss: 0.69327 -->  Loss: 0.69432 -->  Loss: 0.69145 -->  Loss: 0.69481 -->  Loss: 0.69283 -->  Loss: 0.69414 -->  Loss: 0.69393 -->  Loss: 0.69347 -->  Loss: 0.69329 -->  Loss: 0.69338 -->  Loss: 0.69399 -->  Loss: 0.69389 -->  Loss: 0.69339 -->  Loss: 0.69399 -->  Loss: 0.69389 -->  Loss: 0.69380 -->  Loss: 0.69382 -->  Loss: 0.69337 -->  

---

In [11]:
# Build a network with layer sizes 2 -> 8 -> 1, all using the sigmoid
# activation. The value of the last add_layer call is the cell output.
neural_network = FullyConnectedNeuroNetwork()
neural_network.add_layer(2, activation_function=sigmoid)
neural_network.add_layer(8, activation_function=sigmoid)
neural_network.add_layer(1, activation_function=sigmoid)

True

In [12]:
# 20 training rounds: each round draws a fresh random data set, runs one
# forward and one backward pass per sample and prints the mean loss.
for i in range(20):
    data = generate_data()
    counter, sum_loss = 0, 0
    for sample in data:
        input_values, true_class = sample[:-1], sample[-1]
        counter += 1
        true_class, predicted_class, loss = neural_network.forward_calculate(
            input_values, true_class
        )
        sum_loss += loss
        neural_network.backward_propagate(true_class, predicted_class)
    print('Loss: {:.5f}'.format(sum_loss / counter), end=' -->  ')

Loss: 0.72006 -->  Loss: 0.69435 -->  Loss: 0.69401 -->  Loss: 0.69372 -->  Loss: 0.69232 -->  Loss: 0.69389 -->  Loss: 0.69323 -->  Loss: 0.69338 -->  Loss: 0.69271 -->  Loss: 0.69314 -->  Loss: 0.69369 -->  Loss: 0.69375 -->  Loss: 0.69338 -->  Loss: 0.69349 -->  Loss: 0.69237 -->  Loss: 0.69309 -->  Loss: 0.69324 -->  Loss: 0.69355 -->  Loss: 0.69269 -->  Loss: 0.69259 -->  

---

---

---

In [13]:
# true_y = []
# predicted_y = []

# data = generate_data(10000)
# for each in data:
#     true_class, predicted_class, loss = \
#         neural_network.forward_calculate(each[:-1], each[-1])
#     neural_network.reset_anti_gradients_updated_weights_and_last_values()
#     true_y.append(each[-1])
#     predicted_class = \
#         1 if predicted_class >= 0.5 else 0
#     predicted_y.append(predicted_class)

In [14]:
# y_true = np.array(true_y)
# y_pred = np.array(predicted_y)

In [15]:
# accuracy_score(y_true, y_pred)