# Zadanie 5


Celem ćwiczenia jest implementacja perceptronu wielowarstwowego oraz wybranego algorytmu optymalizacji gradientowej z algorytmem propagacji wstecznej.

Następnie należy wytrenować perceptron wielowarstwowy do klasyfikacji zbioru danych [MNIST](http://yann.lecun.com/exdb/mnist/). Zbiór MNIST dostępny jest w pakiecie `scikit-learn`.

Punktacja:
1. Implementacja propagacji do przodu (`forward`) [1 pkt]
2. Implementacja wstecznej propagacji (zademonstrowana na bramce XOR) (`backward`) [2 pkt]
3. Przeprowadzenie eksperymentów na zbiorze MNIST, w tym:
    1. Porównanie co najmniej dwóch architektur sieci [1 pkt]
    2. Przetestowanie każdej architektury na conajmniej 3 ziarnach [1 pkt]
    3. Wnioski 1.[5 pkt]
4. Jakość kodu 0.[5 pkt]

Polecane źródła - teoria + intuicja:
1. [Karpathy, CS231n Winter 2016: Lecture 4: Backpropagation, Neural layersworks 1](https://www.youtube.com/watch?v=i94OvYb6noo&ab_channel=AndrejKarpathy)
2. [3 Blude one Brown, Backpropagation calculus | Chapter 4, Deep learning
](https://www.youtube.com/watch?v=tIeHLnjs5U8&t=4s&ab_channel=3Blue1Brown)


In [2]:
from abc import abstractmethod, ABC
from typing import List
import numpy as np
from sklearn import datasets 
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_openml

In [4]:
class Layer(ABC):
    """Basic building block of the Neural layerswork"""

    def __init__(self) -> None:
        self._learning_rate = 0.01

    @abstractmethod
    def forward(self, x:np.ndarray)->np.ndarray:
        """Forward propagation of x through layer"""
        pass

    @abstractmethod
    def backward(self, output_error_derivative, learning_rate) ->np.ndarray:
        """Backward propagation of output_error_derivative through layer"""
        pass

    @property
    def learning_rate(self):
        return self._learning_rate

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        assert learning_rate < 1, f"Given learning_rate={learning_rate} is larger than 1"
        assert learning_rate > 0, f"Given learning_rate={learning_rate} is smaller than 0"
        self._learning_rate = learning_rate

class FullyConnected(Layer):
    def __init__(self, input_size:int, output_size:int) -> None:
        super().__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.input = None
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def forward(self, x:np.ndarray)->np.ndarray:
        self.input = x
        return np.dot(x, self.weights) + self.bias

    def backward(self, output_error_derivative, learning_rate)->np.ndarray:
        # dE/dx
        error_of_input = np.dot(output_error_derivative, self.weights.T)
        weights_error = np.dot(self.input.T, output_error_derivative)
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * output_error_derivative
        # will be used as dE/dy by previous layer
        return error_of_input

class Tanh(Layer):
    def __init__(self) -> None:
        super().__init__()
        self.input = None

    def forward(self, x:np.ndarray)->np.ndarray:
        self.input = x
        return np.tanh(x)

    def backward(self, output_error_derivative, learning_rate=None)->np.ndarray:
        return  (1-np.tanh(self.input)**2) * output_error_derivative

class Loss:
    def __init__(self, loss_function:callable, loss_function_derivative:callable)->None:
        self.loss_function = loss_function
        self.loss_function_derivative = loss_function_derivative

    def loss(self, result_y, actual_y):
        """Loss function for a particular result_y and actual_y"""
        return self.loss_function(result_y, actual_y)

    def loss_derivative(self, result_y:np.ndarray, actual_y:np.ndarray)->np.ndarray:
        """Loss function derivative for a particular result_y and actual_y"""
        derivative = self.loss_function_derivative(result_y, actual_y)
        return derivative

class Network:
    def __init__(self, layers:List[Layer], learning_rate:float)->None:
        self.layers = layers
        self.learning_rate = learning_rate
        self.loss = None

    def compile(self, loss:Loss)->None:
        """Define the loss function and loss function derivative"""
        self.loss = loss

    def __call__(self, x:np.ndarray) -> np.ndarray:
        """Forward propagation of x through all layers"""
        input_for_next_layer = x
        for layer in self.layers:
            input_for_next_layer = layer.forward(input_for_next_layer)
        return input_for_next_layer

    def fit(self,
            x_train:np.ndarray,
            y_train:np.ndarray,
            epochs:int,
            verbose:int=0)->None:
        """Fit the layerswork to the training data"""
        for epoch in range(epochs):
            error = 0
            for i in range(len(x_train)):
                x = x_train[i]
                #forward prop
                result = self.__call__(x)
                #for stats output only
                for result_y, actual_y in zip(result, y_train[i]):
                    error += self.loss.loss(result_y, actual_y)
                output_error = self.loss.loss_derivative(result, y_train[i])
                #backward prop
                for layer in reversed(self.layers):
                    output_error = layer.backward(output_error, self.learning_rate)

            error /= len(x_train)
            if verbose: print('epoch %d/%d   error=%f' % (epoch+1, epochs, error))
    


In [5]:
def mse(result_y, actual_y):
    # calculate mean squared error
    return np.mean(np.power(actual_y - result_y, 2))

def mse_derivative(result_y, actual_y):
    derivative = 2*(result_y-actual_y)/len(actual_y)
    return derivative

# XOR

In [3]:
x_train = np.array([[[0,0]], [[0,1]], [[1,1]], [[1,0]]])
y_train = np.array([[[0]], [[1]], [[0]], [[1]]])
layers = [FullyConnected(2, 3), Tanh(), FullyConnected(3, 1), Tanh()]
xor_net = Network(layers, 0.1)
xor_net.compile(Loss(mse, mse_derivative))
xor_net.fit(x_train, y_train, epochs=1000, verbose=1)

NameError: name 'FullyConnected' is not defined

# Eksperymenty

In [6]:
digits = fetch_openml('mnist_784', version=1, as_frame=False)

In [7]:
def number_as_vector(n):
    return [1 if x==n else 0 for x in range(10)]

def change_numbers_to_vectors(numbers):
    new_numbers = []
    for i in range(len(numbers)):
        new_numbers.append(number_as_vector(numbers[i]))
    return new_numbers

def calculate_accuracy(net: Network, x_test, y_test):
    samples = len(y_test)
    correct = 0
    for i in range(samples):
        result = net.__call__(x_test[i])
        max_probability = np.amax(result)
        predicted_digit = (result.reshape((10))).tolist().index(max_probability)
        correct += 1 if predicted_digit == y_test[i].reshape((10)).tolist().index(1) else 0
    acc = correct/samples
    return acc

def prepare_data(x:np.array, y:np.array):
    x = x.reshape(x.shape[0], 1, 28*28)
    y = y.astype(np.int16)
    vector_y = change_numbers_to_vectors(y)
    y = np.reshape(vector_y, (y.shape[0], 1, 10))
    return (x, y)

Trenowanie + testowanie modelu 1. Architektura: (784, 50) -> (50, 50) -> (50,10)

In [21]:
lr_rates = [0.001, 0.005, 0.01]
for i in range(len(lr_rates)):
    x_train, x_test, y_train, y_test = train_test_split(
        digits.data, digits.target, test_size=0.8
    )
    x_train, y_train = prepare_data(x=x_train, y=y_train)
    layers = [FullyConnected(28*28, 50), Tanh(), FullyConnected(50, 50), Tanh(), FullyConnected(50, 10), Tanh()]
    first_net = Network(layers, lr_rates[i])
    first_net.compile(Loss(mse, mse_derivative))
    first_net.fit(x_train, y_train, epochs=80, verbose=0)
    # test
    x_test, y_test = prepare_data(x_test, y_test)
    acc = calculate_accuracy(first_net, x_test, y_test)
    print(f"Acc: {acc}")


Acc: 0.7899642857142857
Acc: 0.38648214285714283
Acc: 0.34014285714285714


In [8]:
lr_rates = [0.0005, 0.001, 0.002]
for i in range(len(lr_rates)):
    x_train, x_test, y_train, y_test = train_test_split(
        digits.data, digits.target, test_size=0.8
    )
    x_train, y_train = prepare_data(x=x_train, y=y_train)
    layers = [FullyConnected(28*28, 50), Tanh(), FullyConnected(50, 50), Tanh(), FullyConnected(50, 10), Tanh()]
    first_net = Network(layers, lr_rates[i])
    first_net.compile(Loss(mse, mse_derivative))
    first_net.fit(x_train, y_train, epochs=100, verbose=0)
    # test
    x_test, y_test = prepare_data(x_test, y_test)
    acc = calculate_accuracy(first_net, x_test, y_test)
    print(f"Acc: {acc}")


Acc: 0.8385714285714285
Acc: 0.8118392857142858
Acc: 0.6892678571428571


Trenowanie + testowanie modelu 2. Architektura: (784, 100) -> (100, 100) -> (100,10)

In [9]:
lr_rates_for_wide = [0.0005, 0.001, 0.002]
for learning_rate in lr_rates_for_wide:
    x_train, x_test, y_train, y_test = train_test_split(
        digits.data, digits.target, test_size=0.8
    )
    x_train, y_train = prepare_data(x=x_train, y=y_train)
    layers = [FullyConnected(28*28, 100), Tanh(), FullyConnected(100, 100), Tanh(), FullyConnected(100, 10), Tanh()]
    second_net = Network(layers, learning_rate)
    second_net.compile(Loss(mse, mse_derivative))
    second_net.fit(x_train, y_train, epochs=50)
    # test
    x_test, y_test = prepare_data(x_test, y_test)
    acc = calculate_accuracy(second_net, x_test, y_test)
    print(f"Acc={acc}")

Acc=0.7789285714285714
Acc=0.715
Acc=0.7285178571428571


Trenowanie + testowanie modelu 3. Architektura: (784, 50) -> (50, 50) -> (50, 50) -> (50, 50)

In [11]:
lr_rates_for_long = [0.0005, 0.001, 0.002]
for learning_rate in lr_rates_for_long:
    x_train, x_test, y_train, y_test = train_test_split(
        digits.data, digits.target, test_size=0.8
    )
    x_train, y_train = prepare_data(x=x_train, y=y_train)
    layers = [FullyConnected(28*28, 50), Tanh(), FullyConnected(50, 50), Tanh(), FullyConnected(50, 50), Tanh(), FullyConnected(50, 10), Tanh()]
    third_net = Network(layers, learning_rate)
    third_net.compile(Loss(mse, mse_derivative))
    third_net.fit(x_train, y_train, epochs=70)
    # test
    x_test, y_test = prepare_data(x_test, y_test)
    acc = calculate_accuracy(third_net, x_test, y_test)
    print(f"Acc={acc}")

Acc=0.8368214285714286
Acc=0.7799821428571428
Acc=0.5724464285714286


# Wnioski

- Ilość epok pozytywnie wpływa na jakość sieci ale przy małych learning_rate, przy większych learning_rate będzie oscylować wokół jakiejś wartości.
- Więcej neuronów nie znaczy lepsze wyniki
- Więcej warstw zawsze pozytywnie wpływa na jakość sieci, ale potrzebują więcej epok dla wytrenowania. Przez to mogą dawać gorsze wyniki niż mniejsze sieci przy taką samej ilości epok
- Dla sieci neuronowych hiperparametrami są ilość epok, learning_rate, glębokość i szerokość sieci.