# Zadanie 6


Celem ćwiczenia jest implementacja perceptronu wielowarstwowego oraz wybranego algorytmu optymalizacji gradientowej z algorytmem propagacji wstecznej.

Następnie należy wytrenować perceptron wielowarstwowy do klasyfikacji zbioru danych [MNIST](http://yann.lecun.com/exdb/mnist/). Zbiór MNIST dostępny jest w pakiecie `scikit-learn`.

Punktacja:
1. Implementacja propagacji do przodu (`forward`) [1 pkt]
2. Implementacja wstecznej propagacji (zademonstrowana na bramce XOR) (`backward`) [2 pkt]
3. Przeprowadzenie eksperymentów na zbiorze MNIST, w tym:
    1. Porównanie co najmniej dwóch architektur sieci [1 pkt]
    2. Przetestowanie każdej architektury na co najmniej 3 ziarnach [1 pkt]
    3. Wnioski [1.5 pkt]
4. Jakość kodu [0.5 pkt]

Polecane źródła - teoria + intuicja:
1. [Karpathy, CS231n Winter 2016: Lecture 4: Backpropagation, Neural Networks 1](https://www.youtube.com/watch?v=i94OvYb6noo&ab_channel=AndrejKarpathy)
2. [3Blue1Brown, Backpropagation calculus | Chapter 4, Deep learning](https://www.youtube.com/watch?v=tIeHLnjs5U8&t=4s&ab_channel=3Blue1Brown)


In [2]:
from abc import abstractmethod, ABC
from typing import List
import numpy as np


In [61]:
class Layer(ABC):
    """Abstract base class for every layer that can be stacked into a network.

    Subclasses provide ``forward`` and ``backward``. The base class only keeps
    the learning rate (0.01 until changed) and validates newly assigned values.
    """

    def __init__(self) -> None:
        self._learning_rate = 0.01

    @abstractmethod
    def forward(self, x: np.ndarray) -> np.ndarray:
        """Propagate the input ``x`` forward through this layer."""
        raise NotImplementedError

    @abstractmethod
    def backward(self, output_error_derivative) -> np.ndarray:
        """Propagate ``output_error_derivative`` backward through this layer."""
        raise NotImplementedError

    @property
    def learning_rate(self):
        """Step size used by the layer; always strictly between 0 and 1."""
        return self._learning_rate

    @learning_rate.setter
    def learning_rate(self, learning_rate):
        # The upper bound is checked first, then the lower bound; the stored
        # value is replaced only when both checks pass.
        assert learning_rate < 1, f"Given learning_rate={learning_rate} is larger than 1"
        assert learning_rate > 0, f"Given learning_rate={learning_rate} is smaller than 0"
        self._learning_rate = learning_rate


class FullyConnected(Layer):
    """Dense (affine) layer computing ``x @ weights + biases``.

    ``weights`` has shape ``(input_size, output_size)`` and is drawn uniformly
    from ``-1/sqrt(input_size)`` to ``1/sqrt(input_size)``; ``biases`` has
    shape ``(1, output_size)`` and is drawn from a standard normal
    distribution.
    """

    def __init__(self, input_size: int, output_size: int) -> None:
        super().__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.weights = np.random.uniform(
            low=-1 / np.sqrt(input_size),
            high=1 / np.sqrt(input_size),
            size=(input_size, output_size),
        )
        self.biases = np.random.randn(1, output_size)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Return ``x @ weights + biases`` and cache ``x`` for ``backward``."""
        self.X = x
        self.Y = self.X @ self.weights + self.biases
        return self.Y

    def backward(self, output_error_derivative) -> np.ndarray:
        """Apply one gradient-descent step and return the input gradient.

        ``output_error_derivative`` is the loss gradient with respect to this
        layer's output for the input cached by the last ``forward`` call.
        The weights and biases are updated in place using ``learning_rate``.

        Returns the loss gradient with respect to the layer input, computed
        with the weights as they were before the update.
        """
        # Must be computed before the weights are modified below.
        input_error = output_error_derivative @ self.weights.T
        weights_error = self.X.T @ output_error_derivative
        # The bias is shared by every row of a batch, so its gradient is the
        # sum over rows. For a single (1, output_size) row this equals the row
        # itself; for larger batches it keeps the in-place update on the
        # (1, output_size) biases shape-compatible.
        biases_error = np.sum(
            np.atleast_2d(output_error_derivative), axis=0, keepdims=True
        )

        # Gradient-descent update of the parameters.
        self.weights -= self.learning_rate * weights_error
        self.biases -= self.learning_rate * biases_error
        return input_error


class Tanh(Layer):
    """Element-wise hyperbolic-tangent activation layer."""

    def __init__(self) -> None:
        super().__init__()

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Cache the input as ``X`` and return ``tanh(x)`` (also kept as ``Y``)."""
        self.X = x
        activation = np.tanh(x)
        self.Y = activation
        return activation

    def backward(self, output_error_derivative) -> np.ndarray:
        """Multiply the incoming gradient by the tanh derivative at the cached input."""
        squashed = np.tanh(self.X)
        # d/dx tanh(x) = 1 - tanh(x)^2
        local_slope = 1 - squashed ** 2
        return local_slope * output_error_derivative


class Loss:
    """Bundle a loss function together with its derivative.

    Both callables are invoked as ``f(x, y)`` and their results are returned
    unchanged.
    """

    def __init__(
        self, loss_function: callable, loss_function_derivative: callable
    ) -> None:
        self.loss_function, self.loss_function_derivative = (
            loss_function,
            loss_function_derivative,
        )

    def loss(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Evaluate the wrapped loss function on ``x`` and ``y``."""
        evaluate = self.loss_function
        return evaluate(x, y)

    def loss_derivative(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Evaluate the wrapped loss derivative on ``x`` and ``y``."""
        differentiate = self.loss_function_derivative
        return differentiate(x, y)


class Network:
    """Sequential stack of layers trained one sample at a time.

    ``layers`` are applied in order by ``__call__`` and in reverse order
    during back-propagation. A loss must be attached with ``compile`` before
    ``fit`` is called.
    """

    def __init__(self, layers: "List[Layer]", learning_rate: float = 0.01) -> None:
        self.layers = layers
        self.learning_rate = learning_rate
        # 0.01 means "leave the layers' own rates untouched"; any other value
        # is pushed down to every layer.
        if learning_rate != 0.01:
            self._apply_learning_rate(learning_rate)

    def _apply_learning_rate(self, learning_rate: float) -> None:
        """Set ``learning_rate`` on every layer and remember it on the network."""
        for layer in self.layers:
            layer.learning_rate = learning_rate
        # Recorded only after every layer accepted the value, so the network
        # never reports a rate that a layer rejected.
        self.learning_rate = learning_rate

    def compile(self, loss: "Loss") -> None:
        """Define the loss function and loss function derivative"""
        self.loss = loss

    def __call__(self, x: np.ndarray) -> np.ndarray:
        """Forward propagation of x through all layers"""
        output = x
        for layer in self.layers:
            output = layer.forward(output)
        return output

    def fit(
        self,
        x_train: np.ndarray,
        y_train: np.ndarray,
        epochs: int,
        learning_rate: float = 0.01,
        verbose: int = 0,
    ) -> None:
        """Fit the network to the training data.

        Every epoch visits the samples in order; for each sample the network
        output is computed, the loss derivative is evaluated and propagated
        backward through the layers, which update their own parameters.

        ``learning_rate`` replaces the network's current rate when it differs
        from it. Note that its default is 0.01, so omitting it resets a
        network that was constructed with another rate.

        ``verbose``: when positive, the mean loss of the epoch is printed
        every ``verbose`` epochs; 0 disables printing.
        """
        number_of_samples = len(x_train)
        if learning_rate != self.learning_rate:
            # Also updates self.learning_rate; otherwise a later call asking
            # for the previous rate would be skipped by the comparison above
            # and the layers would keep training with a stale rate.
            self._apply_learning_rate(learning_rate)
        for i in range(epochs):
            err = 0
            for j in range(number_of_samples):
                output = self(x_train[j])
                err += np.mean(self.loss.loss(output, y_train[j]))
                error = self.loss.loss_derivative(output, y_train[j])
                for layer in reversed(self.layers):
                    error = layer.backward(error)
            if verbose > 0 and (i + 1) % verbose == 0:
                err /= number_of_samples
                print(f"episode number: {i+1}   error={err}")

In [59]:
# XOR truth table: four samples, each input a (1, 2) row and each target a (1, 1) array
x_train = np.array([[0, 0], [0, 1], [1, 0], [1, 1]]).reshape(4, 1, 2)
y_train = np.array([0, 1, 1, 0]).reshape(4, 1, 1)

def mse(x:np.ndarray, y:np.ndarray):
    """Return the element-wise squared difference between ``y`` and ``x``.

    No averaging happens here; the result has the broadcast shape of the
    inputs.
    """
    residual = y - x
    return np.power(residual, 2)
    
def mse_derivative(x:np.ndarray, y:np.ndarray):
    """Return ``2 * (x - y)`` divided by the number of elements in ``y``."""
    doubled_residual = 2 * (x - y)
    return doubled_residual / y.size
# Build a 2-3-1 perceptron with tanh after each dense layer for the XOR problem
net = Network(
    [
        FullyConnected(2, 3),
        Tanh(),
        FullyConnected(3, 1),
        Tanh(),
    ],
    learning_rate=0.01,
)

# Attach the squared-error loss, then train; progress is printed every 100 epochs
net.compile(Loss(mse, mse_derivative))
net.fit(
    x_train,
    y_train,
    epochs=1000,
    learning_rate=0.1,
    verbose=100,
)

# Print the network output for each of the training inputs
for x in x_train:
    out = net(x)
    print(out)



episode number: 100   error=0.23723424308807692
episode number: 200   error=0.009953493899367387
episode number: 300   error=0.0018409468986533004
episode number: 400   error=0.0009443000579354027
episode number: 500   error=0.0006222579087431951
episode number: 600   error=0.0004596501074596519
episode number: 700   error=0.00036246412832132243
episode number: 800   error=0.00029817412288108666
episode number: 900   error=0.0002526537730309262
episode number: 1000   error=0.00021881263756939147
[[0.00092376]]
[[0.97917778]]
[[0.97906637]]
[[-0.00038621]]


(1, 2)

In [116]:
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

def mse(x:np.ndarray, y:np.ndarray):
    """Return the element-wise squared difference between ``y`` and ``x``.

    No averaging happens here; the result has the broadcast shape of the
    inputs.
    """
    residual = y - x
    return np.power(residual, 2)
    
def mse_derivative(x:np.ndarray, y:np.ndarray):
    """Return ``2 * (x - y)`` divided by the number of elements in ``y``."""
    doubled_residual = 2 * (x - y)
    return doubled_residual / y.size

# Load the scikit-learn digits dataset; each sample is fed to the network as
# 64 features and each label selects one of 10 output columns.
msint = load_digits()
x = np.array(msint.data)
y = np.array(msint.target)

# One-hot encode the labels: row i of the identity matrix is the target
# vector for class i.
y = np.eye(10)[y]

x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.1, random_state=123
)

# 64-50-30-10 perceptron with tanh after the two hidden layers and a linear
# output layer.
net = Network(
    [
        FullyConnected(64, 50),
        Tanh(),
        FullyConnected(50, 30),
        Tanh(),
        FullyConnected(30, 10),
    ],
    learning_rate=0.1,
)

net.compile(Loss(mse, mse_derivative))
# fit() consumes one sample at a time, so every sample is shaped as a (1, 64) row.
net.fit(
    x_train.reshape((x_train.shape[0], 1, 64)),
    y_train,
    epochs=300,
    learning_rate=0.1,
)

# Accumulate the per-class squared error over the test set. Dedicated loop
# names are used so the dataset arrays `x` and `y` are not overwritten.
squared_error = 0
for sample, target in zip(x_test, y_test):
    prediction = net(sample)
    squared_error += (target - prediction) ** 2
# Summed squared error over the classes, averaged over the test samples.
print(sum(squared_error[0]) / len(y_test))

0.07193758565961478


In [91]:
# Reload the digits data and split it with the same test_size and random_state
# as before, this time keeping the labels exactly as the dataset provides them
# (no one-hot encoding).
msint = load_digits()
x = np.array(msint.data)
y = msint.target
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.1, random_state=123
)
# Bare expression: the notebook displays the label of training sample 70.
y_train[70]


6

# Eksperymenty

# Wnioski