# Zadanie 5


Celem ćwiczenia jest implementacja perceptronu wielowarstwowego oraz wybranego algorytmu optymalizacji gradientowej z algorytmem propagacji wstecznej.

Następnie należy wytrenować perceptron wielowarstwowy do klasyfikacji zbioru danych [MNIST](http://yann.lecun.com/exdb/mnist/). Zbiór MNIST dostępny jest w pakiecie `scikit-learn`.

Punktacja:
1. Implementacja propagacji do przodu (`forward`) [1 pkt]
2. Implementacja wstecznej propagacji (zademonstrowana na bramce XOR) (`backward`) [2 pkt]
3. Przeprowadzenie eksperymentów na zbiorze MNIST, w tym:
    1. Porównanie co najmniej dwóch architektur sieci [1 pkt]
    2. Przetestowanie każdej architektury na conajmniej 3 ziarnach [1 pkt]
    3. Wnioski 1.[5 pkt]
4. Jakość kodu 0.[5 pkt]

Polecane źródła - teoria + intuicja:
1. [Karpathy, CS231n Winter 2016: Lecture 4: Backpropagation, Neural Networks 1](https://www.youtube.com/watch?v=i94OvYb6noo&ab_channel=AndrejKarpathy)
2. [3 Blude one Brown, Backpropagation calculus | Chapter 4, Deep learning
](https://www.youtube.com/watch?v=tIeHLnjs5U8&t=4s&ab_channel=3Blue1Brown)


In [22]:
from abc import abstractmethod, ABC
from typing import List
import numpy as np
import sklearn.metrics

In [11]:
def relu(Z: np.ndarray)->np.ndarray:
    A = np.maximum(0.0, Z)
    return A, Z

def relu_prime(Z: np.ndarray)->np.ndarray:
    return np.where(Z > 0, 1, 0)

def tanh(Z:np.ndarray)->np.ndarray:
    A = np.tanh(Z)
    return A

def tanh_prime(Z:np.ndarray)->np.ndarray:
    P = 1 - np.tanh(Z) ** 2
    return P


#a = np.arange(5)
#print(a)
#print(reLU(a))
#print(reLU_prime(a))

In [56]:
###loss function

def loss_function(x, y):
    return np.mean(pow((x - y), 2))

def loss_function_derivative(x, y):
    return 2 * (x - y) / y.size

#'''def loss_function(x, y):
#    return - np.sum(y * np.log(x))

#def loss_function_derivative(x, y):
#    return '''

In [58]:
class Layer(ABC):
    """Basic building block of the Neural Network"""

    def __init__(self) -> None:
        self._learning_rate = 0.01
        self.input = None
        self.output = None

    @abstractmethod
    def forward(self, x:np.ndarray)->np.ndarray: #
        """Forward propagation of x through layer"""
        pass

    @abstractmethod
    def backward(self, output_error_derivative) ->np.ndarray:
        """Backward propagation of output_error_derivative through layer"""
        pass

    @property
    def learning_rate(self):
        return self._learning_rate

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        assert learning_rate < 1, f"Given learning_rate={learning_rate} is larger than 1"
        assert learning_rate > 0, f"Given learning_rate={learning_rate} is smaller than 0"
        self._learning_rate = learning_rate

class FullyConnected(Layer):
    def __init__(self, input_size:int, output_size:int) -> None:
        super().__init__()
        self.input_size = input_size #size of previous array
        self.output_size = output_size #size of current array

        self.weights = None
        self.biases = None
        self.generate_biases()
        self.generate_weigths()

    def generate_weigths(self):
        self.weights = np.random.randn(self.input_size, self.output_size) / np.sqrt(self.input_size)
    
    def generate_biases(self):
        self.biases = np.random.randn(1, self.output_size)

    def forward(self, input:np.ndarray)->np.ndarray:
        self.input = input
        self.output = np.dot(self.input, self.weights) + self.biases
        return self.output


    def backward(self, output_error_derivative)->np.ndarray:
        input_error = np.dot(output_error_derivative, self.weights.T)
        weights_error = np.dot(self.input.T, output_error_derivative)

        #gradient descent
        self.weights -= self.learning_rate * weights_error
        self.biases -= self.learning_rate * output_error_derivative

        return input_error

class Tanh(Layer):
    def __init__(self) -> None:
        super().__init__()

    def forward(self, input:np.ndarray)->np.ndarray:
        self.input = input
        return tanh(self.input)

    def backward(self, output_error_derivative)->np.ndarray:
        return tanh_prime(self.input) * output_error_derivative

class reLu(Layer):
    def __init__(self) -> None:
        super().__init__()

    def forward(self, input:np.ndarray)->np.ndarray:
        self.input = input
        return relu(self.input)

    def backward(self, output_error_derivative)->np.ndarray:
        return relu_prime(self.input) * output_error_derivative #lack of argument of tanh_prime

class Loss:
    def __init__(self, loss_function:callable, loss_function_derivative:callable)->None:
        self.loss_function = loss_function
        self.loss_function_derivative = loss_function_derivative

    def loss(self, input:np.ndarray, y:np.ndarray)->np.ndarray:
        """Loss function for a particular input"""
        return self.loss_function(input, y)
        """sum = 0
        for x, dis in zip(input, y):
            sum += self.loss_function(x, dis)
        return sum"""


    def loss_derivative(self, input:np.ndarray, y:np.ndarray)->np.ndarray:
        """Loss function derivative for a particular x and y"""
        """sum = 0
        for x, dis in zip(input, y):
            sum += self.loss_derivative(x, dis)"""
        return self.loss_function_derivative(input, y)

class Network:
    def __init__(self, layers:List[Layer], learning_rate:float)->None:
        self.layers = layers
        self.learning_rate = learning_rate

    def add(self, layer):
        self.layers.append(layer)

    def compile(self, loss:Loss, input:np.ndarray, y:np.ndarray)->None: #funkcja straty i jej pochodną
        """Define the loss function and loss function derivative"""
        return loss.loss(input, y), loss.loss_derivative(input, y)

    def __call__(self, input:np.ndarray) -> np.ndarray: #funkcje forward, zwraca funkcje wyjścia
        """Forward propagation of input through all layers"""
        for layer in self.layers:
            input = layer.forward(input)
        return input

    def predict(self, input:np.ndarray):
        samples = len(input)
        result = []

        for i in range(samples):
            output = self.__call__(input[i])
            result.append(output)
        return result

    def fit(self,
            x_train:np.ndarray,
            y_train:np.ndarray,
            epochs:int,
            learning_rate:float,
            loss_function:Loss,
            verbose:int=0)->None:
        """Fit the network to the training data"""
        samples = x_train.shape[0]

        for i in range(epochs):
            loss = 0

            for j in range(samples):
                input = x_train[j]
                output = self.__call__(input)

                loss += self.compile(loss_function, output, y_train[j])[0]

                #backward propagation
                loss_d = self.compile(loss_function, output, y_train[j])[1]

                for layer in reversed(self.layers):
                    loss_d = layer.backward(loss_d)

            loss /= samples
            if i == 0 or i == epochs - 1:
                print('epoch %d/%d   error=%f' % (i+1, epochs, loss))


# Solve XOR

In [59]:
x_train = np.array([[[0,0]], [[0,1]], [[1,0]], [[1,1]]])
y_train = np.array([[[0]], [[1]], [[1]], [[0]]])

y_get = [0, 1, 1, 0]

# network
loss = Loss(loss_function, loss_function_derivative)

net = Network([], 0.01)
net.add(FullyConnected(2, 3))

net.add(Tanh())
net.add(FullyConnected(3, 1))
net.add(Tanh())

net.fit(x_train, y_train, 1000, 0.1, loss)


out = net.predict(x_train)

predicted_values = []
for el in out:
    if el >= 0.5:
        predicted_values.append(1)
    else:
        predicted_values.append(0)
print(out)
print(predicted_values)

score_pred = sklearn.metrics.accuracy_score(y_get, predicted_values)
print(f"Accuracy: {score_pred}")

epoch 1/1000   error=0.428239
epoch 1000/1000   error=0.011713
[array([[0.03286256]]), array([[0.851166]]), array([[0.84956767]]), array([[0.02208933]])]
[0, 1, 1, 0]
Accuracy: 1.0


# Eksperymenty

In [65]:
from keras.datasets import mnist
from keras.utils import np_utils

# load MNIST from server
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# training data : 60000 samples
# reshape and normalize input data
x_train = x_train.reshape(x_train.shape[0], 1, 28*28)
x_train = x_train.astype('float32')
x_train /= 255
# encode output which is a number in range [0,9] into a vector of size 10
# e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
y_train = np_utils.to_categorical(y_train)

# same for test data : 10000 samples
x_test = x_test.reshape(x_test.shape[0], 1, 28*28)
x_test = x_test.astype('float32')
x_test /= 255
y_test = np_utils.to_categorical(y_test)

loss = Loss(loss_function, loss_function_derivative)

net = Network([], 0.01)
net.add(FullyConnected(28*28, 100))

net.add(Tanh())
net.add(FullyConnected(100, 50))

net.add(Tanh())
net.add(FullyConnected(50, 10))

net.add(Tanh())

#net.fit(x_train[0:1000], y_train[0:1000], 35, 0.01, loss)

# test on samples
out = net.predict(x_test)

predicted_values = []
for sample in out:
    for num in sample:
        for el in num:
            if el >= 0.5:
                predicted_values.append(1)
            else:
                predicted_values.append(0)

y_values = []
for sample in y_test:
    for el in sample:
        if el == 1:
            y_values.append(1)
        else:
            y_values.append(0)

score_pred = sklearn.metrics.accuracy_score(y_values, predicted_values)
print(f"Accuracy: {score_pred}")


Accuracy: 0.73812


# Compare reLU with tanh

In [66]:

from keras.datasets import mnist
from keras.utils import np_utils

# load MNIST from server
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# training data : 60000 samples
# reshape and normalize input data
x_train = x_train.reshape(x_train.shape[0], 1, 28*28)
x_train = x_train.astype('float32')
x_train /= 255
# encode output which is a number in range [0,9] into a vector of size 10
# e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
y_train = np_utils.to_categorical(y_train)

# same for test data : 10000 samples
x_test = x_test.reshape(x_test.shape[0], 1, 28*28)
x_test = x_test.astype('float32')
x_test /= 255
y_test = np_utils.to_categorical(y_test)

loss = Loss(loss_function, loss_function_derivative)

#netReLU
netReLU = Network([], 0.01)
netReLU.add(FullyConnected(28*28, 100))

netReLU.add(Tanh())
netReLU.add(FullyConnected(100, 50))

netReLU.add(Tanh())
netReLU.add(FullyConnected(50, 10))

netReLU.add(Tanh())

#netTanh
netTanh = Network([], 0.01)
netTanh.add(FullyConnected(28*28, 100))

netTanh.add(Tanh())
netTanh.add(FullyConnected(100, 50))

netTanh.add(Tanh())
netTanh.add(FullyConnected(50, 10))

netTanh.add(Tanh())

print("reLU: ")
netReLU.fit(x_train[0:1000], y_train[0:1000], 35, 0.01, loss)

# test on samples
out = net.predict(x_test)

predicted_values = []
for sample in out:
    for num in sample:
        for el in num:
            if el >= 0.5:
                predicted_values.append(1)
            else:
                predicted_values.append(0)

y_values = []
for sample in y_test:
    for el in sample:
        if el == 1:
            y_values.append(1)
        else:
            y_values.append(0)

score_pred = sklearn.metrics.accuracy_score(y_values, predicted_values)
print(f"Accuracy: {score_pred}")

print("tanh: ")
netTanh.fit(x_train[0:1000], y_train[0:1000], 35, 0.01, loss)

# test on samples
out = net.predict(x_test)

predicted_values = []
for sample in out:
    for num in sample:
        for el in num:
            if el >= 0.5:
                predicted_values.append(1)
            else:
                predicted_values.append(0)

y_values = []
for sample in y_test:
    for el in sample:
        if el == 1:
            y_values.append(1)
        else:
            y_values.append(0)

score_pred = sklearn.metrics.accuracy_score(y_values, predicted_values)
print(f"Accuracy: {score_pred}")


reLU: 
epoch 1/35   error=0.107785
epoch 35/35   error=0.020956
Accuracy: 0.73812
tanh: 
epoch 1/35   error=0.111987
epoch 35/35   error=0.020459
Accuracy: 0.73812


# 2 small layers

In [67]:
from keras.datasets import mnist
from keras.utils import np_utils

# load MNIST from server
(x_train, y_train), (x_test, y_test) = mnist.load_data()


# training data : 60000 samples
# reshape and normalize input data
x_train = x_train.reshape(x_train.shape[0], 1, 28*28)
x_train = x_train.astype('float32')
x_train /= 255
# encode output which is a number in range [0,9] into a vector of size 10
# e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
y_train = np_utils.to_categorical(y_train)

# same for test data : 10000 samples
x_test = x_test.reshape(x_test.shape[0], 1, 28*28)
x_test = x_test.astype('float32')
x_test /= 255

y_test = np_utils.to_categorical(y_test)

loss = Loss(loss_function, loss_function_derivative)

net = Network([], 0.01)
net.add(FullyConnected(28*28, 20)) #input layer

net.add(Tanh())
net.add(FullyConnected(20, 20)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(20, 10)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(10, 10)) #output layer

net.add(Tanh())

net.fit(x_train[0:1000], y_train[0:1000], 35, 0.01, loss)

# test on samples
out = net.predict(x_test)

predicted_values = []
for sample in out:
    for num in sample:
        for el in num:
            if el >= 0.5:
                predicted_values.append(1)
            else:
                predicted_values.append(0)

y_values = []
for sample in y_test:
    for el in sample:
        if el == 1:
            y_values.append(1)
        else:
            y_values.append(0)

score_pred = sklearn.metrics.accuracy_score(y_values, predicted_values)
print(f"Accuracy: {score_pred}")

epoch 1/35   error=0.223152
epoch 35/35   error=0.035549
Accuracy: 0.95282


# 2 extensive layers

In [68]:
from keras.datasets import mnist
from keras.utils import np_utils

# load MNIST from server
(x_train, y_train), (x_test, y_test) = mnist.load_data()


# training data : 60000 samples
# reshape and normalize input data
x_train = x_train.reshape(x_train.shape[0], 1, 28*28)
x_train = x_train.astype('float32')
x_train /= 255
# encode output which is a number in range [0,9] into a vector of size 10
# e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
y_train = np_utils.to_categorical(y_train)

# same for test data : 10000 samples
x_test = x_test.reshape(x_test.shape[0], 1, 28*28)
x_test = x_test.astype('float32')
x_test /= 255

y_test = np_utils.to_categorical(y_test)

loss = Loss(loss_function, loss_function_derivative)

net = Network([], 0.01)
net.add(FullyConnected(28*28, 500)) #input layer

net.add(Tanh())
net.add(FullyConnected(500, 100)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(100, 50)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(50, 10)) #output layer

net.add(Tanh())

net.fit(x_train[0:1000], y_train[0:1000], 35, 0.01, loss)

# test on samples
out = net.predict(x_test)

predicted_values = []
for sample in out:
    for num in sample:
        for el in num:
            if el >= 0.5:
                predicted_values.append(1)
            else:
                predicted_values.append(0)

y_values = []
for sample in y_test:
    for el in sample:
        if el == 1:
            y_values.append(1)
        else:
            y_values.append(0)

score_pred = sklearn.metrics.accuracy_score(y_values, predicted_values)
print(f"Accuracy: {score_pred}")

epoch 1/35   error=0.100090
epoch 35/35   error=0.020416
Accuracy: 0.97016


# 10 extensive layers

In [69]:
from keras.datasets import mnist
from keras.utils import np_utils

# load MNIST from server
(x_train, y_train), (x_test, y_test) = mnist.load_data()


# training data : 60000 samples
# reshape and normalize input data
x_train = x_train.reshape(x_train.shape[0], 1, 28*28)
x_train = x_train.astype('float32')
x_train /= 255
# encode output which is a number in range [0,9] into a vector of size 10
# e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
y_train = np_utils.to_categorical(y_train)

# same for test data : 10000 samples
x_test = x_test.reshape(x_test.shape[0], 1, 28*28)
x_test = x_test.astype('float32')
x_test /= 255

y_test = np_utils.to_categorical(y_test)

loss = Loss(loss_function, loss_function_derivative)

net = Network([], 0.01)
net.add(FullyConnected(28*28, 500)) #input layer

net.add(Tanh())
net.add(FullyConnected(500, 500)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(500, 500)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(500, 500)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(500, 500)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(500, 500)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(500, 250)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(250, 100)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(100, 50)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(50, 50)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(50, 50)) #hidden layer

net.add(Tanh())
net.add(FullyConnected(50, 10)) #output layer

net.add(Tanh())

net.fit(x_train[0:1000], y_train[0:1000], 35, 0.01, loss)

# test on samples
out = net.predict(x_test)

predicted_values = []
for sample in out:
    for num in sample:
        for el in num:
            if el >= 0.5:
                predicted_values.append(1)
            else:
                predicted_values.append(0)

y_values = []
for sample in y_test:
    for el in sample:
        if el == 1:
            y_values.append(1)
        else:
            y_values.append(0)

score_pred = sklearn.metrics.accuracy_score(y_values, predicted_values)
print(f"Accuracy: {score_pred}")

epoch 1/35   error=0.101556
epoch 35/35   error=0.041005
Accuracy: 0.92151


# Wnioski

1. Zwiększanie liczby neuronów nie zapewnia poprawy wyników, dobór najlepszej ilości neuronów w warstwie ukrytej dokonywany jest w sposób empiryczny.

2. Dobór wartości współczynnika uczenia w sposób znaczący wpływa na działanie sieci, 
zbyt duży współczynnik może spowodować że wyniki będą oscylować wokół oczekiwanej wartości, natomiast zbyt mała wartość spowalnia proces uczenia, wg przeprowadzonych
eksperymentów proponowaną przez nas wartością współczynnika uczenia jest 0.01.

3. W przypadku kiedy ilośc itearcji epochs jest zbyt mała dokładność nie jest zadowalająca, sieć działa poprawnie ( błąd jest znikomy) 
dla epoches w przedziale od 35 do 50 iteracji.

4. Głębokość sieci oraz ilość wejść do neuronów znacznie wpływają na czas działania algorytmu, program działa w akceptowalnym czasie dla wejść nie 
przekraczających 500 wejść w pierwszej warstwie sieci.