En este notebook veremos como implementar Mini Batch, tanto para el uso en redes densas como convolucionales, agregar a lo ultimo la funcion de MaxPooling y AveragePooling para reducir la dimensionalidad de cada una.

Lo primero sera hacer el modelo de Network, con su funcion de activacion, de capas y otras metricas necesarias para el funcionamiento optimo

In [1]:
class Layer:
    def __init__(self):
        self.input = None
        self.output = None
    def forward_propagation(self, input):
        raise NotImplementedError
    def backward_propagation(self, output_error, learning_rate):
        raise NotImplementedError

class FCLayer(Layer):
    def __init__(self, input_size, output_size, lambda_reg=0):
        #np.random.seed(1234)
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5
        self.lambda_reg = lambda_reg
    # Forward propagation
    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = np.dot(self.input, self.weights) + self.bias
        return self.output

    # Backward propagation
    def backward_propagation(self, output_error, learning_rate):
        # Compute the gradient with respect to weights and biases
        input_error = np.dot(output_error, self.weights.T)
        weights_error = np.dot(self.input.T, output_error)

        # Update parameters
        self.weights -= learning_rate * weights_error

        # Sum the output_error over the batch dimension to match bias shape
        self.bias -= learning_rate * np.sum(output_error, axis=0, keepdims=True)

        return input_error

class ActivationLayer:
    def __init__(self, activation_function, activation_prime_function):
        self.activation_function = activation_function
        self.activation_prime_function = activation_prime_function

    def forward_propagation(self, input_data):
        self.input = input_data
        return self.activation_function(self.input)

    def backward_propagation(self, output_error, learning_rate):
        return output_error * self.activation_prime_function(self.input)

class Network:
    def __init__(self):
        self.layers = []
        self.loss = None
        self.loss_prime = None

    # add layer to network
    def add(self, layer):
        self.layers.append(layer)

    def use(self, loss, loss_prime):
        self.loss = loss
        self.loss_prime = loss_prime

    def predict(self, input_data):
        result = input_data
        for layer in self.layers:
            result = layer.forward_propagation(result)
        return result

    @staticmethod
    def mse(y_real, y_hat):
        return np.mean(np.power(y_real - y_hat, 2))

    @staticmethod
    def sigmoid(x):
        return 1 / (1 + np.exp(-x))

    @staticmethod
    def sigmoid_prime(x):
        sig = Network.sigmoid(x)  # Calculamos sigmoid(x) para cada elemento del vector
        return sig * (1 - sig)

    @staticmethod
    def relu(x):
        return np.maximum(0, x)

    @staticmethod
    def relu_prime(x):
        # Versión aproximada de ReLu', porque ReLu no es redivable en x=0
        # La derivada de ReLU es 1 si x > 0, y 0 si x <= 0
        return np.where(x > 0, 1, 0)

    @staticmethod
    def mse_prime(y_real, y_hat):
        return 2*(y_hat-y_real)

    # train the network with mini-batch gradient descent
    def fit(self, x_train, y_train, epochs, learning_rate, batch_size):
        samples = len(x_train)

        for i in range(epochs):
            err = 0
            indices = np.arange(samples)
            np.random.shuffle(indices)
            x_train = x_train[indices]
            y_train = y_train[indices]

            for j in range(0, samples, batch_size):
                end = j + batch_size
                if end > samples:
                    end = samples
                x_batch = x_train[j:end]
                y_batch = y_train[j:end]

                # # Check the shape before forward propagation
                # print(f"x_batch shape: {x_batch.shape}")  # This should be (32, 28, 28, 1) for ConvLayer

                # Forward pass
                output = x_batch  # Ensure x_batch is not flattened here
                for layer in self.layers:
                    output = layer.forward_propagation(output)

                # Compute loss (ensure it's scalar)
                batch_err = np.mean(self.loss(y_batch, output))  # Reduce the error to a scalar
                err += batch_err

                # Backward pass
                error = self.loss_prime(y_batch, output)
                for layer in reversed(self.layers):
                    error = layer.backward_propagation(error, learning_rate)

            # Print average error per epoch
            err /= (samples // batch_size)  # Average over samples
            print(f'epoch {i + 1}/{epochs}   error={err/(samples // batch_size):.6f}')


Dentro de este codigo, se encuentra lo siguiente:
    Layer: Viene siendo lo que representa la base de una neurona, que viene siendo el forward propagation y backward propagation, para modificacion de pesos y sesgos, entre otras funciones de una neurona.
    FCLayer: Esto es lo mismo que lo anterior, pero para una red densa, donde se inicializa el peso y sesgo
    ActivationLayer: Funcion de activacion, se le aplica a las salidas de la capa anterior.
    Network: Donde se inicializa la red de neuronas, dentro de ella hay funciones de forward propagation y backward propagation.
    Training (fit): Aqui es donde se genera el loop y el gradiente de mini batch es utilizado, donde itera en el dataset multiples veces (esto se representa en los epochs)
    Loop de Fit: Dentro del loop se encuentran secciones apuntadas a minimizar el mini_batch de cada iteracion, con el objetivo de no entregar el dataset entero de una vez
    Backward propagation: Funcion donde se calculan los pesos y sesgos
    Forward propagation: Funcion que mueve la data a traves de la red
    MSE: Funcion para saber la perdida de cada iteracion.

Ya con el codigo hecho, podemos hacer pruebas con una red densa, para ver su funcionamiento y que tan preciso puede llegar a hacer.    

In [2]:
# Se importa las librerias utilizadas
import numpy as np
from keras.datasets import mnist
from sklearn.preprocessing import MinMaxScaler
from keras.utils import to_categorical # No recuerdo si se podia usar keras, pero asumo que debido a que el profesor la utiilizo, deberia poder utilizarse :)
from sklearn.metrics import confusion_matrix, accuracy_score

# Se carga el set de data
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = np.array([x.flatten() for x in X_train], dtype='float64')
X_test = np.array([x.flatten() for x in X_test], dtype='float64')    

# Se normaliza la data, esto debido a que si no tira error.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

# Se define el tamaño de entrada, en este caso 784 debido a que MNIST se compone de una dimension de 28x28, donde da 784 entradas.
entrada_dim = 784

# Se crea la instancia del Network
model = Network()

lambda_reg = 0.001

# Se agregan capas al modelo
model.add(FCLayer(entrada_dim, 256, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(model.sigmoid, model.sigmoid_prime))
model.add(FCLayer(256, 256, lambda_reg=lambda_reg))
model.add(ActivationLayer(model.sigmoid, model.sigmoid_prime))
model.add(FCLayer(256, 10, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(model.sigmoid, model.sigmoid_prime))

# Se define el tamaño del batch, donde 32 suele ser un numero generalmente utilizado debido a que es un buen punto medio entre efectividad y eficiencia.
batch_size = 32

# Se modifica el learning rate debido a que no se consiguio el aprendizaje deseado, bajandolo de 0.1 a 0.01
# El learning rate afecta que tanto se mueven los pesos y sesgo, valores mas bajos permiten que no se sobreajuste.
model.use(model.mse, model.mse_prime)
model.fit(X_train, y_train, epochs=20, learning_rate=0.01, batch_size=batch_size)

# Se predice en la seccion de testeo
y_hat = model.predict(X_test)

# Se pasan las predicciones a una clase (No se porque exactamente pero supongo que se deben pasar a clase para sacar la precision del modelo)
y_hat_labels = np.argmax(y_hat, axis=1) 
y_test_labels = np.argmax(y_test, axis=1) 

# Se hace la matriz de confusion y se mide la precision del modelo. 
matriz_conf = confusion_matrix(y_test_labels, y_hat_labels)

print('\nMATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test_labels, y_hat_labels)))

# Lo mismo pero en el set de entrenamiento.
y_hat_train = model.predict(X_train)

y_hat_train_labels = np.argmax(y_hat_train, axis=1)
y_train_labels = np.argmax(y_train, axis=1)

# Compute confusion matrix for training set
matriz_conf_train = confusion_matrix(y_train_labels, y_hat_train_labels)

print('\nMATRIZ DE CONFUSIÓN para modelo ANN (entrenamiento)')
print(matriz_conf_train, '\n')
print('La exactitud de ENTRENAMIENTO del modelo ANN es: {:.3f}'.format(accuracy_score(y_train_labels, y_hat_train_labels)))


epoch 1/20   error=0.000014
epoch 2/20   error=0.000007
epoch 3/20   error=0.000005
epoch 4/20   error=0.000005
epoch 5/20   error=0.000004
epoch 6/20   error=0.000004
epoch 7/20   error=0.000003
epoch 8/20   error=0.000003
epoch 9/20   error=0.000003
epoch 10/20   error=0.000003
epoch 11/20   error=0.000002
epoch 12/20   error=0.000002
epoch 13/20   error=0.000002
epoch 14/20   error=0.000002
epoch 15/20   error=0.000002
epoch 16/20   error=0.000002
epoch 17/20   error=0.000002
epoch 18/20   error=0.000002
epoch 19/20   error=0.000001
epoch 20/20   error=0.000001

MATRIZ DE CONFUSIÓN para modelo ANN
[[ 972    0    1    0    0    2    2    1    2    0]
 [   0 1124    1    3    0    0    2    1    4    0]
 [   5    0  999    3    1    1    3    6   14    0]
 [   0    0    7  979    0    8    0    3   10    3]
 [   1    0    3    0  959    0    4    0    1   14]
 [   2    0    0    5    1  871    4    1    5    3]
 [   7    2    1    1    5    4  932    0    6    0]
 [   0    8   11    4

Ahora que sabemos que para las redes densas funciona de manera correcta, podemos probar con redes convolucionales.

In [4]:
# Se hace la clase de la capa de convolucion.
class ConvLayer:
    def __init__(self, input_shape, filter_size, num_filters):
        self.input_shape = input_shape # Forma del input, como altura, ancho.
        self.filter_size = filter_size # El tamaño de la capa de filtro, por ejemplo que cada filtro sea 3x3
        self.num_filters = num_filters # Numero de filtros utilizados
        # Pesos (filtros) y sesgo
        self.filters = np.random.randn(num_filters, input_shape[2], filter_size, filter_size) / filter_size**2
        self.biases = np.zeros((num_filters, 1))

    def forward_propagation(self, input_data):
        self.input = input_data
        # Revisa si se respeta la estructura planteada
        if len(input_data.shape) != 4:
            raise ValueError(f"Expected 4D input, but got {input_data.shape}")
        batch_size, h, w, c = input_data.shape  # Se organiza la informacion de la estructura en el shape(batch_size, height, width, channels)
        output_dim = h - self.filter_size + 1

        # Inicializa la salida con 0s
        self.output = np.zeros((batch_size, output_dim, output_dim, self.num_filters))

        for f in range(self.num_filters):
            for i in range(output_dim):
                for j in range(output_dim):
                    region = input_data[:, i:i+self.filter_size, j:j+self.filter_size, :]

                    self.output[:, i, j, f] = np.sum(region * self.filters[f].transpose(1, 2, 0), axis=(1, 2, 3)) + self.biases[f]


        return self.output

    def backward_propagation(self, output_error, learning_rate):
        # Aqui se puede modificar para las necesidades de cada uno, por ahora solo pasaremos.
        # Se configuran los gradientes para los filtros de peso, sesgo y se actualizan.
        pass

Para que funcione correctamente las capas convolucionales, se deben agregar lo que se denomina Pooling, que es la reduccion de dimensionalidad de cada capa de convolucion, esto con el objetivo de reducir los parametros y por consecuencia la capacidad computacional que se requiere para poder obtener un resultado.

Tambien al eliminar parametros, ayudan a evitar el overfitting, que es cuando hay una gran cantidad de datos que se podrian evitar o eliminar.

Existen dos tipos de pooling, MaxPooling y AveragePooling, la diferencia siendo la siguiente:
    MaxPooling: Toma el valor maximo de una ventana de Pooling, por ejemplo si tenemos una entrada de 2x2 [1, 2, 3, 4] se tomara 4, esto con el objetivo de tomar el dato mas prominente.
    AveragePooling: Toma el valor promedio de una ventana de Pooling, utilizando el ejemplo anterior, [1, 2, 3, 4], se suman los valores y se dividen por la cantidad de valores ((1+2+3+4) / 4) = 2.5, este es menos agresivo y puede llevar a mejores resultados si la data utilizada tiene valores extremos muy altos y distintos, lo que podria llevar a errores en la capa. 

In [5]:
# Se crea la capa de pooling, que sirve para reducir la dimensionalidad y se retiene la informacion importante, utilizamos MaxPooling, que toma el valor mayor para la reduccion y AveragePooling, que toma el valor medio en cambio.
class MaxPoolLayer:
    def __init__(self, pool_size=2, stride=2):
        self.pool_size = pool_size
        self.stride = stride

    def forward_propagation(self, input_data):
        self.input = input_data
        batch_size, h, w, c = input_data.shape
        output_dim = h // self.pool_size
        self.output = np.zeros((batch_size, output_dim, output_dim, c))

        for i in range(output_dim):
            for j in range(output_dim):
                region = input_data[:, i*self.pool_size:(i+1)*self.pool_size, j*self.pool_size:(j+1)*self.pool_size, :]
                self.output[:, i, j, :] = np.max(region, axis=(1, 2))

        return self.output

    def backward_propagation(self, output_error, learning_rate):
        # De igual manera, ahora mismo no utilizamos la propagacion pero se puede implementar luego.
        pass

class AveragePoolLayer:
    def __init__(self, pool_size=2, stride=2):
        self.pool_size = pool_size
        self.stride = stride

    def forward_propagation(self, input_data):
        self.input = input_data
        batch_size, h, w, c = input_data.shape
        output_dim = h // self.pool_size
        self.output = np.zeros((batch_size, output_dim, output_dim, c))

        for i in range(output_dim):
            for j in range(output_dim):
                region = input_data[:, i*self.pool_size:(i+1)*self.pool_size, j*self.pool_size:(j+1)*self.pool_size, :]
                self.output[:, i, j, :] = np.mean(region, axis=(1, 2))  

        return self.output

    def backward_propagation(self, output_error, learning_rate):
        
        pass

Tambien necesitamos una capa para aplanar los datos, debido a que las redes convolucionales utilizan dos dimensiones o mas en los datos y el metodo de fitting que utilizamos solo ocupa 1 dimension, debemos pasar las dimensiones utilizadas a una que pueda usar nuestra red neuronal.

In [6]:
class FlattenLayer:
    def forward_propagation(self, input_data):
        self.input_shape = input_data.shape
        batch_size = input_data.shape[0]
        # flattened_size = np.prod(self.input_shape[1:]) # Se calcula el tamaño una vez aplanado, esto es utilizado para debugear errores
        # print(f"Flattened output size: {flattened_size}") # print utilizado para debugear errores.
        return input_data.reshape(batch_size, -1)  # Aplana y se modifica para la data que le sirve a la capa densa.

    def backward_propagation(self, output_error, learning_rate):
        return output_error.reshape(self.input_shape)

In [7]:
# Solo se haran comentarios en los lugares relevantes, donde no se haya comentado anteriormente.
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Esto esta hecho para MNIST especificamente, lo sabemos porque ocupamos nuevamente la dimension de 28x28
X_train = X_train.reshape(-1, 28, 28, 1)
X_test = X_test.reshape(-1, 28, 28, 1)

# Mucha de esta parte es para resolucion de errores que no se entienden, por lo que se soluciono a base de ChatGPT y Fé
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train.reshape(-1, 28*28)).reshape(-1, 28, 28, 1)
X_test = scaler.transform(X_test.reshape(-1, 28*28)).reshape(-1, 28, 28, 1)

y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

# Print utilizado para debugear un error que se encontro anteriormente, que no se hacia la forma de manera correcta.
# print(f"Input shape to ConvLayer: {X_train.shape}")

# Build the model
model = Network()

# Se agregan las capas convolucionales a la red.
model.add(ConvLayer(input_shape=(28, 28, 1), filter_size=3, num_filters=16))
model.add(MaxPoolLayer(pool_size=2, stride=2))  # Pooling
model.add(ConvLayer(input_shape=(13, 13, 16), filter_size=3, num_filters=32)) 
model.add(MaxPoolLayer(pool_size=2, stride=2))  # Pooling

# Se aplana la capa para el funcionamiento correcto anteriormente mencionado.
model.add(FlattenLayer())

# Capas conectadas totalmente, Ocupamos 800 como parametro para las capas
model.add(FCLayer(800, 128)) 
model.add(ActivationLayer(Network.relu, Network.relu_prime))
model.add(FCLayer(128, 64)) 
model.add(ActivationLayer(Network.relu, Network.relu_prime))
model.add(FCLayer(64, 10)) 
model.add(ActivationLayer(Network.sigmoid, Network.sigmoid_prime))


# Se puede reducir los epochs y el tamaño de los batch para que se demore menos, debido a que esta implementacion no ocupa GPU, se va a demorar cerca de 2 horas

print("Se comienza el proceso... Si no da error, dejalo andar")
model.use(Network.mse, Network.mse_prime)
model.fit(X_train, y_train, epochs=20, learning_rate=0.01, batch_size=32)

y_hat = model.predict(X_test)

y_hat_labels = np.argmax(y_hat, axis=1)

y_test_labels = np.argmax(y_test, axis=1)

matriz_conf = confusion_matrix(y_test_labels, y_hat_labels)

print('\nMATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')

accuracy = accuracy_score(y_test_labels, y_hat_labels)
print(f"La exactitud del modelo es: {accuracy:.4f}")

Se comienza el proceso... Si no da error, dejalo andar
epoch 1/20   error=0.000013
epoch 2/20   error=0.000004
epoch 3/20   error=0.000003
epoch 4/20   error=0.000003
epoch 5/20   error=0.000002
epoch 6/20   error=0.000002
epoch 7/20   error=0.000002
epoch 8/20   error=0.000002
epoch 9/20   error=0.000001
epoch 10/20   error=0.000001
epoch 11/20   error=0.000001
epoch 12/20   error=0.000001
epoch 13/20   error=0.000001
epoch 14/20   error=0.000001
epoch 15/20   error=0.000001
epoch 16/20   error=0.000001
epoch 17/20   error=0.000001
epoch 18/20   error=0.000001
epoch 19/20   error=0.000001
epoch 20/20   error=0.000001

MATRIZ DE CONFUSIÓN para modelo ANN
[[ 976    0    1    1    0    0    0    1    1    0]
 [   1 1130    2    2    0    0    0    0    0    0]
 [   3    1 1013    1    3    0    0    8    3    0]
 [   1    0    2  995    0    2    0    3    5    2]
 [   0    0    1    1  972    0    3    0    0    5]
 [   2    0    1   12    0  866    4    1    5    1]
 [   8    2    3   