In [None]:
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

## Backpropagation algorithm
<font size="3">
Feedforward (inferencia):
$$W \cdot x = y_{pred}$$

Backpropagation algorithm (entrenamiento):
1. Inicializo los pesos ($W$) de manera aleatoria.
2. Obtengo una primera predicción (pobre, ya que los pesos son aleatorios). Cómo de "pobre" es la predicción lo mide la función de pérdida: $$\displaystyle\sum_{k=1}^n(y_{pred} - y_{train})^2$$
3. En el entrenamiento, como $x$ (la entrada) e $y_{train}$ (la salida) son fijas, solo se pueden modificar los pesos ($W$) para ajustar la red neuronal.
4. Para ajustar $W$ calculamos el gradiente de la función de pérdida respecto a $W$, ya que el gradiente describe la dirección en la que crece la pérdida al aumentar cada uno de los valores de $W$: $$\nabla_W\displaystyle\sum_{k=1}^n(y_{pred} - y_{train})^2$$ $$\nabla_W\displaystyle\sum_{k=1}^n(W \cdot x - y_{train})^2$$
5. Muevo todos los parámetros de $W$ en la dirección opuesta al gradiente: $$W \leftarrow W - learningRate \cdot grad$$
</font>

In [None]:
import logging


class NeuralNetwork:
    """Small fully connected network: input -> ReLU hidden layer -> sigmoid output layer.

    The model consists of two weight matrices and has no bias terms. ``train``
    repeatedly calls ``backpropagation`` on the complete training set it is given.
    """

    def __init__(self, input_layer_size, hidden_layer_size, output_layer_size, seed=None):
        """Store the layer sizes and draw the initial weights.

        Args:
            input_layer_size: Number of input features per sample.
            hidden_layer_size: Number of units in the hidden layer.
            output_layer_size: Number of output units.
            seed: Optional seed for a private random generator, for reproducible
                initial weights. When None (the default) the weights are drawn
                from NumPy's global random state.
        """
        self.input_layer_size = input_layer_size
        self.hidden_layer_size = hidden_layer_size
        self.output_layer_size = output_layer_size

        # Both np.random and RandomState expose randn, so one code path serves both cases.
        rng = np.random if seed is None else np.random.RandomState(seed)

        # Standard-normal draws scaled by sqrt(2 / fan_in).
        self.weights_1 = rng.randn(self.input_layer_size, self.hidden_layer_size) * np.sqrt(2 / self.input_layer_size)
        self.weights_2 = rng.randn(self.hidden_layer_size, self.output_layer_size) * np.sqrt(2 / self.hidden_layer_size)

    @staticmethod
    def loss_function(result: np.ndarray, expected: np.ndarray):
        """Return the mean of the squared element-wise differences."""
        return np.mean((result - expected)**2)

    @staticmethod
    def sigmoid(x: np.ndarray) -> np.ndarray:
        """Return the logistic function 1 / (1 + exp(-x)) element-wise.

        Evaluated in a form that cannot overflow for inputs of large magnitude.
        """
        x = np.asarray(x, dtype=float)
        # exp(-|x|) always lies in [0, 1], so neither branch below can overflow.
        decay = np.exp(-np.abs(x))
        return np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))

    @staticmethod
    def sigmoid_derivative(x: np.ndarray) -> np.ndarray:
        """Return the sigmoid derivative expressed through the sigmoid OUTPUT.

        ``x`` must already be a sigmoid activation ``s``; the result is ``s * (1 - s)``.
        """
        return x * (1 - x)

    @staticmethod
    def relu(x: np.ndarray) -> np.ndarray:
        """Return max(0, x) element-wise."""
        return np.maximum(0, x)

    @staticmethod
    def relu_derivative(x: np.ndarray) -> np.ndarray:
        """Return 1.0 where ``x`` is strictly positive and 0.0 elsewhere."""
        return (x > 0).astype(float)

    def feedforward(self, input_data: np.ndarray) -> "tuple[np.ndarray, np.ndarray]":
        """Run a forward pass.

        Args:
            input_data: Samples to propagate; multiplied on the right by ``weights_1``.

        Returns:
            A pair ``(a2, a1)``: the sigmoid output activations and the ReLU
            hidden activations.
        """
        h1 = np.dot(input_data, self.weights_1)
        a1 = self.relu(h1)

        # Lazy %-style arguments: the message is only built if the record is emitted.
        logging.info("Input data shape: %s", input_data.shape)
        logging.info("Weights hidden layer (W1): %s", self.weights_1.shape)
        logging.info("Activation hidden layer (a1): %s", a1.shape)

        h2 = np.dot(a1, self.weights_2)
        a2 = self.sigmoid(h2)

        logging.info("Weights output layer (W2): %s", self.weights_2.shape)
        logging.info("Activation output layer (a2): %s", a2.shape)

        return a2, a1

    def backpropagation(self, x_train: np.ndarray, y_train: np.ndarray, learning_rate):
        """Perform one gradient step on both weight matrices.

        Args:
            x_train: Input samples, one per row.
            y_train: Target values. If they hold the same number of elements as
                the predictions but in a different shape (for example a flat
                vector for a single-output network), they are reshaped to the
                prediction shape instead of being broadcast.
            learning_rate: Step size applied to both updates.

        Returns:
            The loss of the predictions made BEFORE the weights were updated.
        """
        # Perform forward pass and get predictions and hidden layer activations
        pred_output, hidden_activations = self.feedforward(input_data=x_train)

        # Align targets with predictions so the subtraction below is element-wise.
        y_train = np.asarray(y_train)
        if y_train.shape != pred_output.shape and y_train.size == pred_output.size:
            y_train = y_train.reshape(pred_output.shape)

        # Compute output layer error and delta.
        # The error is target minus prediction, which is why the updates below ADD the step.
        output_error = y_train - pred_output

        # Diagnostics go through logging at DEBUG level instead of being printed on every call.
        logging.debug("y_train: %s", y_train)
        logging.debug("pred_output: %s", pred_output)
        logging.debug("output_error: %s", output_error)

        output_delta = output_error * self.sigmoid_derivative(pred_output)

        logging.info("Output error shape: %s", output_error.shape)
        logging.info("Output derivative shape: %s", output_delta.shape)

        # Compute hidden layer error and delta
        hidden_error = np.dot(output_delta, self.weights_2.T)
        # The post-ReLU activations are positive exactly where the pre-activations were.
        hidden_delta = hidden_error * self.relu_derivative(hidden_activations)

        logging.info("Hidden error shape: %s", hidden_error.shape)
        logging.info("Hidden derivative shape: %s", hidden_delta.shape)

        # Update weights (contributions are summed over all samples in the batch)
        self.weights_2 += learning_rate * np.dot(hidden_activations.T, output_delta)
        self.weights_1 += learning_rate * np.dot(x_train.T, hidden_delta)

        return self.loss_function(pred_output, y_train)

    def train(self, x_train: np.ndarray, y_train: np.ndarray, epochs, learning_rate, print_every: int = 100):
        """Run ``epochs`` gradient steps, each on the full ``x_train`` / ``y_train``.

        Prints the loss every ``print_every`` epochs.
        """
        for epoch in range(epochs):
            loss = self.backpropagation(x_train, y_train, learning_rate=learning_rate)
            if (epoch + 1) % print_every == 0:
                print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss}")

    def predict(self, x_test):
        """Return the output-layer activations for ``x_test``."""
        predictions, _ = self.feedforward(x_test)
        return predictions

### TRAINING

In [None]:
# Load scikit-learn's bundled handwritten-digits dataset.
digits = datasets.load_digits()
training_data = digits.data # Flattened 8x8-pixel images -> ndarray of shape (1797, 64)
label_data = digits.target # Integer class labels -> 1-D ndarray of shape (1797,)

In [None]:
# Hold out 20% of the samples for testing; the fixed random_state keeps the split reproducible.
x_train, x_test, y_train, y_test = train_test_split(
    training_data,
    label_data,
    test_size=0.2,
    random_state=9,
)

In [None]:
# Target of the first training sample (the labels are one-hot encoded in a later cell).
y_train[0]

In [None]:
# Show the first training image as an 8x8 grayscale grid.
# fignum=1 makes matshow draw into the figure created here; without it matshow opens
# its own new figure, leaving this one empty and the requested figsize unused.
plt.figure(1, figsize=(3, 3))
# 'gray' is the canonical colormap name; the 'grey' alias is missing in older Matplotlib releases.
plt.matshow(x_train[0].reshape(8, 8), cmap='gray', fignum=1)
plt.show()

In [None]:
def encode_digit(digit: int, num_classes: int = 10) -> np.ndarray:
    """Return the one-hot vector for a single class label.

    Args:
        digit: Class index to encode.
        num_classes: Length of the returned vector (defaults to 10).

    Returns:
        A float array of shape ``(num_classes,)`` that is 1 at ``digit`` and 0 elsewhere.

    Raises:
        IndexError: If ``digit`` is outside ``[0, num_classes)``. Negative values
            are rejected explicitly because NumPy would otherwise wrap them
            around to the end of the vector.
    """
    if not 0 <= digit < num_classes:
        raise IndexError(f"label {digit} is out of range for {num_classes} classes")
    one_hot = np.zeros((num_classes,))
    one_hot[digit] = 1
    return one_hot

def encode_labels(labels_array: np.ndarray, num_classes: int = 10) -> np.ndarray:
    """One-hot encode an array of integer class labels.

    Args:
        labels_array: Integer class indices, one per sample.
        num_classes: Width of each one-hot row (defaults to 10).

    Returns:
        A float array with one one-hot row per label; a 1-D input of length
        ``n`` yields shape ``(n, num_classes)``. Empty input yields shape
        ``(0, num_classes)``.

    Raises:
        IndexError: If any label is outside ``[0, num_classes)`` or the labels
            are not of an integer type.
    """
    labels = np.asarray(labels_array)
    if labels.size == 0:
        # Keep the column dimension so downstream shape checks still hold.
        return np.zeros((0, num_classes))
    if labels.min() < 0 or labels.max() >= num_classes:
        raise IndexError(f"labels must lie in [0, {num_classes}); got range [{labels.min()}, {labels.max()}]")
    # Row i of the identity matrix is the one-hot vector for class i.
    return np.eye(num_classes)[labels]

In [None]:
# 64 inputs (one per pixel of a flattened 8x8 image), 64 hidden units,
# 10 outputs (one per position of the one-hot encoded label).
nn = NeuralNetwork(input_layer_size=64, hidden_layer_size=64, output_layer_size=10)

In [None]:
# One-hot encode the integer training labels.
# The ndim guard keeps this cell idempotent: once y_train is already a 2-D one-hot
# matrix, re-running the cell leaves it untouched instead of failing on encoded rows.
if y_train.ndim == 1:
    y_train = encode_labels(labels_array=y_train)

In [None]:
# Each epoch runs one backpropagation step over the whole of x_train; the loss is printed every 1000 epochs.
# NOTE(review): x_train is passed without any rescaling, and 1,000,000 epochs with
# learning_rate=1 may run for a very long time and may not converge — confirm these hyperparameters.
nn.train(x_train, y_train, epochs=1000000, learning_rate=1, print_every=1000)

In [None]:
# Output activations for the first training sample, passed as a single-row batch.
nn.predict(x_train[0].reshape(1, -1))

In [None]:
# Shape of the training targets; after one-hot encoding this should be (n_samples, 10).
y_train.shape

In [None]:
# First training sample reshaped into a single-row 2-D array (one row, one column per feature).
x_train[0].reshape(1, -1)