Implementación Base


In [227]:
import numpy as np

class Layer:
    def __init__(self):
        self.input = None
        self.output = None

    def forward_propagation(self, input):
        raise NotImplementedError

    def backward_propagation(self, output_error, learning_rate):
        raise NotImplementedError

class FCLayer(Layer):
    def __init__(self, input_size, output_size, lambda_reg=0):
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5
        self.lambda_reg = lambda_reg

    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = np.dot(self.input, self.weights) + self.bias
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        input_error = np.dot(output_error, self.weights.T)
        weights_error = np.dot(self.input.T, output_error)
        weights_error += self.lambda_reg * self.weights
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * output_error
        return input_error

class ActivationLayer(Layer):
    def __init__(self, activation, activation_prime):
        self.activation = activation
        self.activation_prime = activation_prime

    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = self.activation(self.input)
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        return self.activation_prime(self.input) * output_error

class Network:
    def __init__(self):
        self.layers = []
        self.loss = None
        self.loss_prime = None

    def add(self, layer):
        self.layers.append(layer)

    def use(self, loss, loss_prime):
        self.loss = loss
        self.loss_prime = loss_prime

    def predict(self, input_data):
        if input_data.ndim == 1:
            input_data = np.array([[x] for x in input_data])
        samples = len(input_data)
        result = []
        for i in range(samples):
            output = input_data[i]
            for layer in self.layers:
                output = layer.forward_propagation(output)
            result.append(output)
        return result

    def fit(self, x_train, y_train, epochs, learning_rate):
        if x_train[0].ndim == 1:
            x_train = np.array([[x] for x in x_train])
        samples = len(x_train)
        for i in range(epochs):
            err = 0
            for j in range(samples):
                output = x_train[j]
                for layer in self.layers:
                    output = layer.forward_propagation(output)
                err += self.loss(y_train[j], output)
                error = self.loss_prime(y_train[j], output)
                for layer in reversed(self.layers):
                    error = layer.backward_propagation(error, learning_rate)
            err /= samples
            err = np.mean(err)
            print('epoch %d/%d   error=%f' % (i+1, epochs, err))

def tanh(x):
    return np.tanh(x)

def tanh_prime(x):
    return 1 - np.tanh(x)**2

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_prime(x):
    sig = sigmoid(x)
    return sig * (1 - sig)

def relu(x):
    return np.maximum(0, x)

def relu_prime(x):
    return np.where(x > 0, 1, 0)

def leaky_relu(x, alpha=0.01):
    return np.where(x > 0, x, alpha * x)

def leaky_relu_prime(x, alpha=0.01):
    return np.where(x > 0, 1, alpha)

def mse(y_true, y_hat):
    return (y_true - y_hat)**2

def mse_prime(y_true, y_hat):
    return 2 * (y_hat - y_true)

def bce(y_true, y_hat):
    epsilon = 1e-15
    y_hat = np.clip(y_hat, epsilon, 1 - epsilon)
    return -(y_true * np.log(y_hat) + (1 - y_true) * np.log(1 - y_hat))

def bce_prime(y_true, y_hat):
    epsilon = 1e-15
    y_hat = np.clip(y_hat, epsilon, 1 - epsilon)
    return -(y_true / y_hat) + (1 - y_true) / (1 - y_hat)


Capa de Convolución 

In [228]:
import numpy as np

# La capa de convolución hereda de la clase base, madre de todas las capas
class ConvLayer(Layer):
    def __init__(self, input_shape, kernel_shape, num_kernels, lambda_reg=0):
        self.input_shape = input_shape  # Tamaño de la entrada (alto, ancho, profundidad)
        self.input_depth = input_shape[2]  # Profundidad de la entrada (número de canales)
        self.kernel_shape = kernel_shape  # Tamaño del kernel (alto, ancho)
        self.num_kernels = num_kernels  # Número de kernels (cada uno genera una salida)
        self.lambda_reg = lambda_reg  # Factor de regularización L2
        # La salida tendrá el mismo tamaño de entrada
        # esto no tendría por qué ser así, pero nos ahorra problemas para manejar tamaños entre capas
        self.output_shape = (input_shape[0], input_shape[1], num_kernels) 
        # Inicializamos los pesos de forma aleatoria con rango entre -0.5 y 0.5
        self.weights = np.random.rand(kernel_shape[0], kernel_shape[1], self.input_depth, num_kernels) - 0.5
        self.bias = np.random.rand(num_kernels) - 0.5

    # Función para agregar padding a la entrada
    # es necesario para mantener el tamaño de la matriz
    def pad_input(self, input_matrix, pad):
        # Agregamos padding de ceros alrededor de la matriz de entrada
        return np.pad(input_matrix, ((pad, pad), (pad, pad)), mode='constant', constant_values=0)

    # Correlación 2D
    # la parte previa a la convolución, que además sirve de convolución en fordward prop
    def correlate(self, input_matrix, kernel):
        # Obtenemos las dimensiones de entrada y del kernel
        input_height, input_width = input_matrix.shape
        kernel_height, kernel_width = kernel.shape
        # Calculamos las dimensiones de la salida tras la correlación
        output_height = input_height - kernel_height + 1
        output_width = input_width - kernel_width + 1
        # Inicializamos la salida como una matriz de ceros
        output = np.zeros((output_height, output_width))

        # Recorremos la entrada y aplicamos la correlación
        for i in range(output_height):
            for j in range(output_width):
                # Las dos iteraciones controlan el movimiento de una ventana que recorre
                # a lo ancho y a lo alto la matriz original, extayendo submatrices
                sub_matrix = input_matrix[i:i+kernel_height, j:j+kernel_width]  # Seleccionamos la submatriz de la entrada
                output[i, j] = np.sum(sub_matrix * kernel)  # Producto punto entre la submatriz y el kernel

        return output

    # Convolución 2D (correlación con kernel volteado)
    # usaremos la convolución en la etapa de backward prop
    def convolve(self, input_matrix, kernel):
        # Volteamos el kernel en ambas direcciones (filas y columnas)
        flipped_kernel = np.flipud(np.fliplr(kernel))
        # Aplicamos la correlación con el kernel volteado
        return self.correlate(input_matrix, flipped_kernel)

    def forward_propagation(self, input_data):
        # Guardamos la entrada para usarla en backward_propagation
        self.input = input_data
        # Inicializamos la salida con ceros
        self.output = np.zeros(self.output_shape)

        # Calculamos el padding en función del tamaño del kernel
        pad_h = self.kernel_shape[0] // 2
        pad_w = self.kernel_shape[1] // 2

        # Recorremos cada filtro en la profundidad de la capa
        for k in range(self.num_kernels):
            for d in range(self.input_depth):
                # Agregamos padding a la entrada actual (canal d)
                padded_input = self.pad_input(self.input[:,:,d], pad_h)
                # Realizamos la correlación y obtenemos la salida parcial
                correlated_output = self.correlate(padded_input, self.weights[:,:,d,k])
                # Sumamos el resultado de la correlación al acumulador de la salida
                self.output[:,:,k] += correlated_output[:self.output_shape[0], :self.output_shape[1]]  
            # Añadimos el bias al filtro actual
            self.output[:,:,k] += self.bias[k]

        return self.output

    def backward_propagation(self, output_error, learning_rate):
        # Inicializamos los errores de entrada y de pesos como matrices de ceros
        in_error = np.zeros(self.input_shape)
        weights_error = np.zeros((self.kernel_shape[0], self.kernel_shape[1], self.input_depth, self.num_kernels))
        dBias = np.zeros(self.num_kernels)

        # Calculamos el padding basado en el tamaño del kernel
        pad_h = self.kernel_shape[0] // 2
        pad_w = self.kernel_shape[1] // 2

        # Con cada kernel recorreremos cada capa de la entrada. 
        for k in range(self.num_kernels):
            for d in range(self.input_depth):
                # Aplicamos padding al error de salida antes de convolucionarlo con los pesos
                padded_output_error = self.pad_input(output_error[:,:,k], pad_h)
                # Realizamos la convolución inversa y la sumamos al error de entrada
                convolved_error = self.convolve(padded_output_error, self.weights[:,:,d,k])
                in_error[:,:,d] += convolved_error[:self.input_shape[0], :self.input_shape[1]]  
                # Calculamos el error de los pesos usando correlación entre entrada y error
                weights_error[:,:,d,k] = self.correlate(self.input[:,:,d], output_error[:,:,k])
            # Sumamos el error del bias en la salida actual
            dBias[k] = np.sum(output_error[:,:,k])  

        # Aplicamos la regularización L2 sobre los errores de los pesos
        weights_error += self.lambda_reg * self.weights

        # Actualizamos los pesos y los sesgos usando el gradiente descendente
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * dBias

        # Retornamos el error de la entrada para la siguiente capa
        return in_error


In [229]:
class FlattenLayer(Layer):
    
    def forward_propagation(self, input_data):
        self.input = input_data
        # Aplanamos la entrada
        self.output = input_data.flatten().reshape((1,-1))
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        # Remodelamos el error al tamaño original de la entrada
        return output_error.reshape(self.input.shape)


Aplicación de Mini Batch

In [230]:
class MiniBatchNetwork(Network):
    def fit(self, x_train, y_train, epochs, learning_rate, batch_size):
        if x_train[0].ndim == 1:
            x_train = np.array([[x] for x in x_train])
        
        samples = len(x_train)
        for i in range(epochs):
            err = 0
            indices = np.arange(samples)
            np.random.shuffle(indices)
            x_train = x_train[indices]
            y_train = y_train[indices]
            
            for start in range(0, samples, batch_size):
                end = min(start + batch_size, samples)
                x_batch = x_train[start:end]
                y_batch = y_train[start:end]
                
                batch_err = 0
                
                for j in range(len(x_batch)):
                    output = x_batch[j]
                    for layer in self.layers:
                        output = layer.forward_propagation(output)
                    
                    batch_err += self.loss(y_batch[j], output)
                    error = self.loss_prime(y_batch[j], output)
                    
                    for layer in reversed(self.layers):
                        error = layer.backward_propagation(error, learning_rate)
    
                batch_err /= len(x_batch)
                err += batch_err
    
            err /= (samples / batch_size)
            err = np.mean(err)
          

            print('epoch %d/%d   error=%f' % (i+1, epochs, err))


Prueba con MNIST 

In [231]:
from keras.datasets import mnist
import random
from sklearn.preprocessing import MinMaxScaler
import numpy as np

# No necesitamos tantos datos.
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X, y = zip(*random.sample(list(zip(X_train, y_train)), 2000))

# Sí necesitamos que la forma de X sea la de un vector, en lugar de una matriz. 
X, y = np.array(X, dtype='float64'), np.array(y, dtype='float64')
X = np.reshape(X, (X.shape[0], -1))

# Normalizamos Min-Max
X= MinMaxScaler().fit_transform(X)

# Dividimos la muestra en dos, una para entrenar y otra para testing, como tenemos 
# muestra de sobra nos damos el lujo de testear con la misma cantidad que entrenamos.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=123)

# Necesitamos que y_train sea un valor categórico, en lugar de un dígito entero.
y_train_value = y_train # Guardaremos y_train como valor para un observación más abajo.
from keras.utils import to_categorical
y_train = to_categorical(y_train)

Red densa

In [232]:
from sklearn.metrics import confusion_matrix, accuracy_score
start_time = time.time()

# Necesitamos identificar cuántos nodos tiene nuestra entrada, y eso depende del tamaño de X.
entrada_dim = len(X_train[0])

# Crear instancia de MiniBatchNetwork
modelMiniBatch = MiniBatchNetwork()

# Agregamos capas al modelo
modelMiniBatch.add(FCLayer(entrada_dim, 128))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))
modelMiniBatch.add(FCLayer(128, 64))
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))
modelMiniBatch.add(FCLayer(64, 10)) 
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))

# Asignamos función de pérdida
modelMiniBatch.use(bce, bce_prime)

# Entrenamos el modelo con datos de entrenamiento
modelMiniBatch.fit(X_train, y_train, epochs=3, learning_rate=0.1, batch_size=16)

# Usamos el modelo para predecir sobre los datos de prueba (validación)
y_hat = modelMiniBatch.predict(X_test)

# Transformamos la salida en un vector one-hot encoded
# Asumimos que y_hat tiene una forma de (num_samples, 10) donde cada fila es la salida de 10 nodos
y_hat = np.array([np.argmax(output) for output in y_hat])  # Obtenemos el índice del nodo con mayor activación

# Reportamos los resultados del modelo
 
print(f"Tiempo total de ejecución: {execution_time:.2f} segundos") 

print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test, y_hat)))


epoch 1/3   error=0.003153
epoch 2/3   error=0.001664
epoch 3/3   error=0.001190
MATRIZ DE CONFUSIÓN para modelo ANN
[[90  0  1  1  0  1  3  1  0  1]
 [ 0 94  4  4  1  2  2  4  0  2]
 [ 3  6 37 18  5  1 25  4  4  6]
 [ 7  5  4 68  2  0  2  5  0  3]
 [ 5  3  1  4 56  2  9  7  4 10]
 [10  4  9 30 10  1 11  4  7 11]
 [ 8  3  5  5  4  0 68  1  1  3]
 [ 3  3  4  1  8  0  1 67  1  3]
 [ 6 10 12 15 15  2  9  5 10 13]
 [ 3  1  5  5 34  0  3 24  3 22]] 

La exactitud de testeo del modelo ANN es: 0.806


Prueba con Convolucional

In [234]:
from keras.datasets import mnist
import random
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import matplotlib.pyplot as plt
import tabulate

# No necesitamos tantos datos.
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X, y = zip(*random.sample(list(zip(X_train, y_train)), 2000))

# Sí necesitamos que la forma de X sea la de un vector, en lugar de una matriz. 
X, y = np.array(X, dtype='float64'), np.array(y, dtype='float64')
#X = np.reshape(X, (X.shape[0], -1)) # YA NO QUEREMOS TRANSFORMAR EN VECTO X
# AHORA NOS SERVIRÁ QUE SEA UNA MATRIZ, LA PROCESAREMOS CON CAPA DE CONVOLUCIÓN.

# APLICAREMOS UN RESHAPE Y NORMALIZACIÓN MÁS AD-HOC PARA ESTE CASO
X = X.reshape(X.shape[0], 28, 28, 1)

# Dividimos la muestra en dos, una para entrenar y otra para testing, como tenemos 
# muestra de sobra nos damos el lujo de testear con la misma cantidad que entrenamos.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=123)

# Necesitamos que y_train sea un valor categórico, en lugar de un dígito entero.
y_train_value = y_train # Guardaremos y_train como valor para un observación más abajo.
from keras.utils import to_categorical
y_train = to_categorical(y_train)

# ESTANDARIZACION MANUAL: Habíamos usado MinMax, pero
# no nos servirá al usar red de convolución, que necesita recibir una grilla.
# Además, preferimos usar estandarización (prob normal) en lugar de simple escalado
# Calculamos la media y la desviación estándar
mean = np.mean(X_train)
std = np.std(X_train)
# Estandarizamos calculando el estadístico Z, que reemplaza a los datos originales.
X_train = (X_train - mean) / std
# La misma transformación aplica a los datos de testeo.
# Notese que la media y desviación son los que calculamos para la
# muestra de entrenamiento. O sea, los datos de prueba son observaciones
# limpias, que no siquiera influyeron en la normalización de los datos. 
X_test = (X_test - mean) / std


In [235]:
import time

#Esto se va a demorar, registremos cuanto. 
inicio = time.time()

# Crear instancia de Network
model = MiniBatchNetwork()

# Cantidad y Tamaño del kernel
kernels = 8
kernel_size = 3

# Tamaño / forma de la entrada
input_shape = X_train[0].shape

# Parámetro regularizados
lambda_reg = 0.001

# Agregamos capas al modelo
model.add(ConvLayer(input_shape, (kernel_size, kernel_size), kernels, lambda_reg=lambda_reg)) # Capa de Convolución con un kernels
model.add(ActivationLayer(relu, relu_prime))
model.add(FlattenLayer())  # Capa de Flatten para aplanar la salida
model.add(FCLayer(input_shape[0]*input_shape[1]*kernels, 128, lambda_reg=lambda_reg))
model.add(ActivationLayer(tanh, tanh_prime))
model.add(FCLayer(128, 10, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(sigmoid, sigmoid_prime))

# Asignamos función de pérdida
model.use(mse, mse_prime)

# Entrenamos el modelo con datos de entrenamiento
model.fit(X_train, y_train, epochs=10, learning_rate=0.01, batch_size=32) 

# Usamos el modelo para predecir sobre los datos de prueba (validación)
y_hat = model.predict(X_test)

# Transformamos la salida en un vector one-hot encoded, es decir 0s y un 1. 
for i in range(len(y_hat)):
    y_hat[i] = np.argmax(y_hat[i][0])

# Reportamos los resultados del modelo
matriz_conf = confusion_matrix(y_test, y_hat)

print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf,'\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test,y_hat)))
fin = time.time() 
duración = fin - inicio
print('Se demoró {:.2f} minutos.'.format(duración/60))

epoch 1/10   error=0.003332
epoch 2/10   error=0.002481
epoch 3/10   error=0.002026
epoch 4/10   error=0.001671
epoch 5/10   error=0.001562
epoch 6/10   error=0.001320
epoch 7/10   error=0.001225
epoch 8/10   error=0.001111
epoch 9/10   error=0.001023
epoch 10/10   error=0.000961
MATRIZ DE CONFUSIÓN para modelo ANN
[[76  0  1  1  0  6  2  0  3  0]
 [ 0 95  5  1  0  0  1  1  0  0]
 [ 3  3 80  7  1  0  3  1  1  1]
 [ 0  1  7 69  0 15  1  3  4  5]
 [ 1  2  3  0 49  0 10  2  3 25]
 [ 0  5 12  8  0 66  4  4  3  1]
 [ 3  2  2  0  1  3 96  0  0  0]
 [ 1  4  2  0  0  0  0 84  1  5]
 [ 0  5  9 13  0  7  2  0 68  7]
 [ 0  2  3  1  3  0  0  4  5 72]] 

La exactitud de testeo del modelo ANN es: 0.755
Se demoró 12.97 minutos.


Capa Max Pooling

In [267]:
class MaxPoolingLayer(Layer):
    def __init__(self, pool_size, stride):
        self.pool_size = pool_size  
        self.stride = stride  

    def forward_propagation(self, input_data):
        """
        Realiza la operación de max pooling sobre la entrada.
        Genera una salida reducida en función del tamaño del pool y el stride.
        """
        self.input = input_data
        input_height, input_width, input_depth = input_data.shape

        output_height = (input_height - self.pool_size[0]) // self.stride + 1
        output_width = (input_width - self.pool_size[1]) // self.stride + 1
        self.output = np.zeros((output_height, output_width, input_depth))

        for d in range(input_depth):
            for i in range(0, output_height):
                for j in range(0, output_width):
                    h_start = i * self.stride
                    h_end = h_start + self.pool_size[0]
                    w_start = j * self.stride
                    w_end = w_start + self.pool_size[1]
                    # Extracción de la ventana y selección de valor máximo
                    self.output[i, j, d] = np.max(input_data[h_start:h_end, w_start:w_end, d])

        return self.output

    def backward_propagation(self, output_error, learning_rate=None):
        """
        Calcula el error en la entrada basado en el error de salida.
        Propaga el error hacia atrás a las posiciones donde se encontraban los máximos.
        """
        input_height, input_width, input_depth = self.input.shape
        in_error = np.zeros(self.input.shape)

        for d in range(input_depth):
            for i in range(0, self.output.shape[0]):
                for j in range(0, self.output.shape[1]):
                    h_start = i * self.stride
                    h_end = h_start + self.pool_size[0]
                    w_start = j * self.stride
                    w_end = w_start + self.pool_size[1]
                    # Selección del valor máximo para la propagación del error
                    max_index = np.unravel_index(np.argmax(self.input[h_start:h_end, w_start:w_end, d]), (self.pool_size[0], self.pool_size[1]))
                    # Propagación del error donde se encontraba el máximo
                    in_error[h_start + max_index[0], w_start + max_index[1], d] += output_error[i, j, d]

        return in_error


Capa de AveragePooling

In [188]:
import numpy as np

class AveragePoolingLayer:
    def __init__(self, pool_size, stride):
        self.pool_size = pool_size  
        self.stride = stride  

    def forward_propagation(self, input_data):
        """
        Realiza la operación de average pooling sobre la entrada.
        Genera una salida reducida en función del tamaño del pool y el stride.
        """
        self.input = input_data  
 
        output_height = (input_data.shape[0] - self.pool_size[0]) // self.stride + 1
        output_width = (input_data.shape[1] - self.pool_size[1]) // self.stride + 1
        output_depth = input_data.shape[2]  
        self.output = np.zeros((output_height, output_width, output_depth))
        
        for h in range(output_height):
            for w in range(output_width):
                for d in range(output_depth):
                    h_start = h * self.stride
                    w_start = w * self.stride
                    # Extraemos la ventana actual del input
                    window = input_data[h_start:h_start + self.pool_size[0], w_start:w_start + self.pool_size[1], d]
                    # Calculamos el promedio de la ventana y lo almacenamos en la salida
                    self.output[h, w, d] = np.mean(window)

        return self.output

    def backward_propagation(self, output_error, learning_rate=None):
        """
        Calcula el error en la entrada basado en el error de salida.
        Propaga el error hacia atrás promediando entre los elementos de la ventana.
        """
        in_error = np.zeros(self.input.shape)

        output_height = output_error.shape[0]
        output_width = output_error.shape[1]
        
        for h in range(output_height):
            for w in range(output_width):
                for d in range(output_error.shape[2]):
                    h_start = h * self.stride
                    w_start = w * self.stride
                    # Distribuimos el error de salida entre los elementos de la ventana
                    for i in range(self.pool_size[0]):
                        for j in range(self.pool_size[1]):
                            in_error[h_start + i, w_start + j, d] += output_error[h, w, d] / (self.pool_size[0] * self.pool_size[1])

        return in_error


Con maxpooling

In [187]:
import time
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score

inicio = time.time()

model = MiniBatchNetwork()

kernels = 8
kernel_size = 3

input_shape = X_train[0].shape

lambda_reg = 0.001

model.add(ConvLayer(input_shape, (kernel_size, kernel_size), kernels, lambda_reg=lambda_reg))  
model.add(ActivationLayer(relu, relu_prime))


#Capa de max pooling 2x2
pool_size = (2, 2)  
stride = 2  
model.add(MaxPoolingLayer(pool_size, stride))  

model.add(FlattenLayer())  
model.add(FCLayer(input_shape[0] * input_shape[1] * kernels // 4, 128, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(tanh, tanh_prime))
model.add(FCLayer(128, 10, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(sigmoid, sigmoid_prime))

model.use(mse, mse_prime)

model.fit(X_train, y_train, epochs=10, learning_rate=0.01, batch_size=32) 

y_hat = model.predict(X_test)

for i in range(len(y_hat)):
    y_hat[i] = np.argmax(y_hat[i][0])

matriz_conf = confusion_matrix(y_test, y_hat)

print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test, y_hat)))
fin = time.time() 
duración = fin - inicio
print('Se demoró {:.2f} minutos.'.format(duración / 60))


epoch 1/10   error=0.003132
epoch 2/10   error=0.002238
epoch 3/10   error=0.001771
epoch 4/10   error=0.001534
epoch 5/10   error=0.001374
epoch 6/10   error=0.001160
epoch 7/10   error=0.000887
epoch 8/10   error=0.000665
epoch 9/10   error=0.000596
epoch 10/10   error=0.000528
MATRIZ DE CONFUSIÓN para modelo ANN
[[ 91   0   0   1   0   4   1   0   0   1]
 [  0 104   1   0   0   3   0   0   3   2]
 [  3   1  85   5   3   1   5   0   3   3]
 [  0   0   2  75   0  13   0   0   4   2]
 [  2   0   0   0  87   2   3   1   2   4]
 [  2   0   0   7   1  79   2   0   5   1]
 [  2   0   1   0   4   3  86   0   1   1]
 [  0   1   2   1   3   1   0  73   0  10]
 [  2   1   0   5   2   9   7   1  64   6]
 [  0   0   0   2   5   5   1   7   1  79]] 

La exactitud de testeo del modelo ANN es: 0.823
Se demoró 14.26 minutos.


Con AveragePooling

In [189]:
import time
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score

inicio = time.time()

model = MiniBatchNetwork()

kernels = 8
kernel_size = 3

input_shape = X_train[0].shape

lambda_reg = 0.001

model.add(ConvLayer(input_shape, (kernel_size, kernel_size), kernels, lambda_reg=lambda_reg))  
model.add(ActivationLayer(relu, relu_prime))


#Capa de average pooling 2x2
pool_size = (2, 2)  
stride = 2  
model.add(AveragePoolingLayer(pool_size, stride))  

model.add(FlattenLayer())  
model.add(FCLayer(input_shape[0] * input_shape[1] * kernels // 4, 128, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(tanh, tanh_prime))
model.add(FCLayer(128, 10, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(sigmoid, sigmoid_prime))

model.use(mse, mse_prime)

model.fit(X_train, y_train, epochs=10, learning_rate=0.01, batch_size=32) 

y_hat = model.predict(X_test)

for i in range(len(y_hat)):
    y_hat[i] = np.argmax(y_hat[i][0])

matriz_conf = confusion_matrix(y_test, y_hat)

print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test, y_hat)))
fin = time.time() 
duración = fin - inicio
print('Se demoró {:.2f} minutos.'.format(duración / 60))


epoch 1/10   error=0.003026
epoch 2/10   error=0.002382
epoch 3/10   error=0.002117
epoch 4/10   error=0.001819
epoch 5/10   error=0.001396
epoch 6/10   error=0.001196
epoch 7/10   error=0.001018
epoch 8/10   error=0.000859
epoch 9/10   error=0.000694
epoch 10/10   error=0.000587
MATRIZ DE CONFUSIÓN para modelo ANN
[[ 94   0   2   0   0   1   0   0   1   0]
 [  0 110   1   0   0   1   1   0   0   0]
 [  0   3  98   1   1   0   3   0   3   0]
 [  4   3   9  56   1  10   1   1   5   6]
 [  2   0   4   0  80   0   2   1   2  10]
 [  2   0   0   4   2  78   3   0   6   2]
 [  3   0   3   0   2   3  83   1   1   2]
 [  1   1   4   0   2   0   0  75   0   8]
 [  2   2   2   1   2   5   7   0  70   6]
 [  2   0   5   0   9   0   0   4   4  76]] 

La exactitud de testeo del modelo ANN es: 0.820
Se demoró 14.33 minutos.


Implementación técnicas de regularizacion

DropOut Layer


In [193]:
import numpy as np

class DropoutLayer:
    def __init__(self, rate):
        self.rate = rate  # Tasa de Dropout (por ejemplo, 0.5 significa 50% de desactivación)

    def forward_propagation(self, input_data, training=True):
        """
        Durante la propagación hacia adelante, si estamos en modo de entrenamiento,
        se desactivan aleatoriamente las neuronas según la tasa de Dropout.
        """
        self.input = input_data
        if training:
            self.mask = np.random.binomial(1, 1 - self.rate, size=input_data.shape)  
            return self.input * self.mask  
        else:
            return self.input 

    def backward_propagation(self, output_error, learning_rate=None):
        """
        Propaga el error de vuelta utilizando la misma máscara que se utilizó en la
        propagación hacia adelante.
        """
        return output_error * self.mask  


In [197]:
inicio = time.time()

model = MiniBatchNetwork()

kernels = 8
kernel_size = 3

input_shape = X_train[0].shape

lambda_reg = 0.001

model.add(ConvLayer(input_shape, (kernel_size, kernel_size), kernels, lambda_reg=lambda_reg))  
model.add(ActivationLayer(relu, relu_prime))

# Capa de average pooling 2x2
pool_size = (2, 2)  
stride = 2  
model.add(AveragePoolingLayer(pool_size, stride))  

model.add(FlattenLayer())

#Capa de DropOut
dropout_rate = 0.5  #Lobotomizando mi red neuronal
model.add(DropoutLayer(dropout_rate))

model.add(FCLayer(input_shape[0] * input_shape[1] * kernels // 4, 128, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(tanh, tanh_prime))
model.add(FCLayer(128, 10, lambda_reg=lambda_reg)) 
model.add(ActivationLayer(sigmoid, sigmoid_prime))

model.use(mse, mse_prime)

# Entrenar el modelo
model.fit(X_train, y_train, epochs=10, learning_rate=0.01, batch_size=32) 

y_hat = model.predict(X_test)

for i in range(len(y_hat)):
    y_hat[i] = np.argmax(y_hat[i][0])

matriz_conf = confusion_matrix(y_test, y_hat)

print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test, y_hat)))
fin = time.time() 
duración = fin - inicio
print('Se demoró {:.2f} minutos.'.format(duración / 60))

epoch 1/10   error=0.003831
epoch 2/10   error=0.003219
epoch 3/10   error=0.003112
epoch 4/10   error=0.003035
epoch 5/10   error=0.002902
epoch 6/10   error=0.002797
epoch 7/10   error=0.002607
epoch 8/10   error=0.002487
epoch 9/10   error=0.002325
epoch 10/10   error=0.002290
MATRIZ DE CONFUSIÓN para modelo ANN
[[90  0  1  1  0  1  3  1  0  1]
 [ 0 94  4  4  1  2  2  4  0  2]
 [ 3  6 37 18  5  1 25  4  4  6]
 [ 7  5  4 68  2  0  2  5  0  3]
 [ 5  3  1  4 56  2  9  7  4 10]
 [10  4  9 30 10  1 11  4  7 11]
 [ 8  3  5  5  4  0 68  1  1  3]
 [ 3  3  4  1  8  0  1 67  1  3]
 [ 6 10 12 15 15  2  9  5 10 13]
 [ 3  1  5  5 34  0  3 24  3 22]] 

La exactitud de testeo del modelo ANN es: 0.513
Se demoró 14.28 minutos.


Ruido a los pesos

In [236]:
class MiniBatchNetworkNoise(Network):
    def fit(self, x_train, y_train, epochs, learning_rate, batch_size, weight_noise_std=0.01):
        if x_train[0].ndim == 1:
            x_train = np.array([[x] for x in x_train])
        
        samples = len(x_train)
        for i in range(epochs):
            err = 0
            indices = np.arange(samples)
            np.random.shuffle(indices)
            x_train = x_train[indices]
            y_train = y_train[indices]
            
            for start in range(0, samples, batch_size):
                end = min(start + batch_size, samples)
                x_batch = x_train[start:end]
                y_batch = y_train[start:end]
                
                batch_err = 0
                
                for j in range(len(x_batch)):
                    output = x_batch[j]
                    for layer in self.layers:
                        output = layer.forward_propagation(output)
                    
                    batch_err += self.loss(y_batch[j], output)
                    error = self.loss_prime(y_batch[j], output)
                    
                    for layer in reversed(self.layers):
                        error = layer.backward_propagation(error, learning_rate)

                # Inyección de ruido a los pesos
                for layer in self.layers:
                    if hasattr(layer, 'weights'):
                        noise = np.random.normal(0, weight_noise_std, layer.weights.shape)
                        layer.weights += noise

                batch_err /= len(x_batch)
                err += batch_err
    
            err /= (samples / batch_size)
            err = np.mean(err)
            print('epoch %d/%d   error=%f' % (i+1, epochs, err))



In [237]:
from keras.datasets import mnist
import random
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

(X_train, y_train), (X_test, y_test) = mnist.load_data()
X, y = zip(*random.sample(list(zip(X_train, y_train)), 2000))

X, y = np.array(X, dtype='float64'), np.array(y, dtype='float64')
X = np.reshape(X, (X.shape[0], -1))

X = MinMaxScaler().fit_transform(X)


X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.5, random_state=123)

X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=123)

y_train_value = y_train  
y_train = to_categorical(y_train)
y_val = to_categorical(y_val)
y_test = to_categorical(y_test)



Sin Ruido

In [249]:
from sklearn.metrics import confusion_matrix, accuracy_score

entrada_dim = len(X_train[0])

modelMiniBatch = MiniBatchNetwork()

modelMiniBatch.add(FCLayer(entrada_dim, 128))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))
modelMiniBatch.add(FCLayer(128, 64))
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))
modelMiniBatch.add(FCLayer(64, 10)) 
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))

modelMiniBatch.use(bce, bce_prime)

modelMiniBatch.fit(X_train, y_train, epochs=10, learning_rate=0.1, batch_size=16)

y_val_hat = modelMiniBatch.predict(X_val)


y_val_hat = np.array([np.argmax(output) for output in y_val_hat])  
matriz_conf_val = confusion_matrix(np.argmax(y_val, axis=1), y_val_hat)


print('La exactitud de validación del modelo ANN es: {:.3f}'.format(accuracy_score(np.argmax(y_val, axis=1), y_val_hat)))

y_test_hat = modelMiniBatch.predict(X_test)

y_test_hat = np.array([np.argmax(output) for output in y_test_hat]) 

matriz_conf_test = confusion_matrix(np.argmax(y_test, axis=1), y_test_hat)


print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(np.argmax(y_test, axis=1), y_test_hat)))


epoch 1/10   error=0.004521
epoch 2/10   error=0.002387
epoch 3/10   error=0.001676
epoch 4/10   error=0.001266
epoch 5/10   error=0.000919
epoch 6/10   error=0.000735
epoch 7/10   error=0.000483
epoch 8/10   error=0.000334
epoch 9/10   error=0.000259
epoch 10/10   error=0.000152
La exactitud de validación del modelo ANN es: 0.855
La exactitud de testeo del modelo ANN es: 0.859


Con Ruido


In [254]:

from sklearn.metrics import confusion_matrix, accuracy_score

# Necesitamos identificar cuántos nodos tiene nuestra entrada, y eso depende del tamaño de X.
entrada_dim = len(X_train[0])

# Crear instancia de MiniBatchNetwork
modelMiniBatch = MiniBatchNetworkNoise()

# Agregamos capas al modelo
modelMiniBatch.add(FCLayer(entrada_dim, 128))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))
modelMiniBatch.add(FCLayer(128, 64))
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))
modelMiniBatch.add(FCLayer(64, 10)) 
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))

modelMiniBatch.use(bce, bce_prime)

modelMiniBatch.fit(X_train, y_train, epochs=10, learning_rate=0.1, batch_size=16)

y_val_hat = modelMiniBatch.predict(X_val)


y_val_hat = np.array([np.argmax(output) for output in y_val_hat])  

matriz_conf_val = confusion_matrix(np.argmax(y_val, axis=1), y_val_hat)


print('La exactitud de validación del modelo ANN es: {:.3f}'.format(accuracy_score(np.argmax(y_val, axis=1), y_val_hat)))

y_test_hat = modelMiniBatch.predict(X_test)

y_test_hat = np.array([np.argmax(output) for output in y_test_hat])  # Obtenemos el índice del nodo con mayor activación

matriz_conf_test = confusion_matrix(np.argmax(y_test, axis=1), y_test_hat)


print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(np.argmax(y_test, axis=1), y_test_hat)))

epoch 1/10   error=0.210534
epoch 2/10   error=0.114835
epoch 3/10   error=0.076533
epoch 4/10   error=0.056343
epoch 5/10   error=0.046522
epoch 6/10   error=0.042060
epoch 7/10   error=0.031181
epoch 8/10   error=0.025354
epoch 9/10   error=0.012372
epoch 10/10   error=0.012762
La exactitud de validación del modelo ANN es: 0.835
La exactitud de testeo del modelo ANN es: 0.847


Comparación con DropOut Layer

In [252]:
from sklearn.metrics import confusion_matrix, accuracy_score

entrada_dim = len(X_train[0])

modelMiniBatch = MiniBatchNetwork()

dropout_rate = 0.5  #Lobotomizando mi red neuronal
model.add(DropoutLayer(dropout_rate))


modelMiniBatch.add(FCLayer(entrada_dim, 128))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))
modelMiniBatch.add(FCLayer(128, 64))
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))
modelMiniBatch.add(FCLayer(64, 10)) 
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))

modelMiniBatch.use(bce, bce_prime)

modelMiniBatch.fit(X_train, y_train, epochs=10, learning_rate=0.1, batch_size=16)

y_val_hat = modelMiniBatch.predict(X_val)


y_val_hat = np.array([np.argmax(output) for output in y_val_hat])  

matriz_conf_val = confusion_matrix(np.argmax(y_val, axis=1), y_val_hat)


print('La exactitud de validación del modelo ANN es: {:.3f}'.format(accuracy_score(np.argmax(y_val, axis=1), y_val_hat)))

y_test_hat = modelMiniBatch.predict(X_test)

y_test_hat = np.array([np.argmax(output) for output in y_test_hat])  

matriz_conf_test = confusion_matrix(np.argmax(y_test, axis=1), y_test_hat)


print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(np.argmax(y_test, axis=1), y_test_hat)))

epoch 1/10   error=0.003938
epoch 2/10   error=0.002183
epoch 3/10   error=0.001462
epoch 4/10   error=0.001055
epoch 5/10   error=0.000757
epoch 6/10   error=0.000527
epoch 7/10   error=0.000321
epoch 8/10   error=0.000188
epoch 9/10   error=0.000086
epoch 10/10   error=0.000046
La exactitud de validación del modelo ANN es: 0.875
La exactitud de testeo del modelo ANN es: 0.864


In [255]:
import numpy as np  
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import MinMaxScaler

# Una vez más cargamos y procesamos el Titanic
file = open("Titanic.csv", "r")
titanic = file.read()
file.close()
titanic = titanic.split("\n")
for j in range(len(titanic)):
    titanic[j] = titanic[j].split(",")

# El nombre tiene ',' así que necesitamos volver a unirlo.
for elemento in titanic[1:]:
    elemento[3] = elemento[3]+elemento[4]
    del elemento[4]
titanic = np.array(titanic)
titanic = np.delete(titanic, [3,8,10,11], axis=1)

# Codificamos género como 0-1
for linea in titanic[1:]:
    linea[3] = '0' if linea[3] == 'male' else '1'

# Quitamos las observaciones con datos perdidos
borrar = []
for l in range(len(titanic)):
    if '' in titanic[l]:
        borrar.append(l)
titanic = np.delete(titanic, borrar, axis=0)

# Arreglamos vector y matriz como array
y = titanic[:,1][1:]
X = titanic[:,2:][1:]
X, y = np.array(X, dtype='float64'), np.array(y, dtype='float64')

# Aplicamos Normalización Min-Max
X= MinMaxScaler().fit_transform(X)

# Separamos la muestra al azar, 60% para entrenar, 40% para testeo final.
# La proporción para entrenamiento es alta dado que el Perceptrón es sensible
# a datos "adversos", así que estamos tratando de reducir ese riesgo.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4)


Ejemplo aplicando todo al titanic

In [265]:
from sklearn.metrics import confusion_matrix, accuracy_score
import time

entrada_dim = len(X_train[0])

modelMiniBatch = Network()

modelMiniBatch.add(FCLayer(entrada_dim, 128))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))

modelMiniBatch.add(FCLayer(128, 64))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))

modelMiniBatch.add(FCLayer(64, 1))
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))

modelMiniBatch.use(bce, bce_prime)

modelMiniBatch.fit(X_train, y_train, epochs=100, learning_rate=0.1)

y_hat = modelMiniBatch.predict(X_test)

y_hat = np.array([1 if output >= 0.5 else 0 for output in y_hat])

matriz_conf = confusion_matrix(y_test, y_hat)
accuracy = accuracy_score(y_test, y_hat)


print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print(f'La exactitud de testeo del modelo ANN es: {accuracy:.3f}')


epoch 1/100   error=0.539130
epoch 2/100   error=0.486969
epoch 3/100   error=0.468688
epoch 4/100   error=0.461149
epoch 5/100   error=0.451651
epoch 6/100   error=0.446106
epoch 7/100   error=0.436333
epoch 8/100   error=0.435332
epoch 9/100   error=0.421551
epoch 10/100   error=0.419598
epoch 11/100   error=0.417719
epoch 12/100   error=0.416918
epoch 13/100   error=0.409313
epoch 14/100   error=0.410835
epoch 15/100   error=0.405645
epoch 16/100   error=0.408080
epoch 17/100   error=0.400601
epoch 18/100   error=0.399965
epoch 19/100   error=0.396040
epoch 20/100   error=0.395977
epoch 21/100   error=0.394391
epoch 22/100   error=0.391109
epoch 23/100   error=0.395930
epoch 24/100   error=0.386268
epoch 25/100   error=0.386604
epoch 26/100   error=0.381939
epoch 27/100   error=0.385312
epoch 28/100   error=0.385684
epoch 29/100   error=0.383177
epoch 30/100   error=0.389138
epoch 31/100   error=0.382108
epoch 32/100   error=0.378473
epoch 33/100   error=0.380378
epoch 34/100   erro

In [266]:
from sklearn.metrics import confusion_matrix, accuracy_score
import time

entrada_dim = len(X_train[0])

modelMiniBatch = MiniBatchNetwork()

dropout_rate = 0.2  #Lobotomizando mi red neuronal
model.add(DropoutLayer(dropout_rate))


modelMiniBatch.add(FCLayer(entrada_dim, 128))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))

modelMiniBatch.add(FCLayer(128, 64))
modelMiniBatch.add(ActivationLayer(relu, relu_prime))

modelMiniBatch.add(FCLayer(64, 1))
modelMiniBatch.add(ActivationLayer(sigmoid, sigmoid_prime))

modelMiniBatch.use(bce, bce_prime)

modelMiniBatch.fit(X_train, y_train, epochs=100, learning_rate=0.1, batch_size=16)

y_hat = modelMiniBatch.predict(X_test)

y_hat = np.array([1 if output >= 0.5 else 0 for output in y_hat])

matriz_conf = confusion_matrix(y_test, y_hat)
accuracy = accuracy_score(y_test, y_hat)


print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print(f'La exactitud de testeo del modelo ANN es: {accuracy:.3f}')


epoch 1/100   error=0.021046
epoch 2/100   error=0.018957
epoch 3/100   error=0.018826
epoch 4/100   error=0.017646
epoch 5/100   error=0.018024
epoch 6/100   error=0.017376
epoch 7/100   error=0.017389
epoch 8/100   error=0.017160
epoch 9/100   error=0.016621
epoch 10/100   error=0.016926
epoch 11/100   error=0.016804
epoch 12/100   error=0.016831
epoch 13/100   error=0.016465
epoch 14/100   error=0.016228
epoch 15/100   error=0.016230
epoch 16/100   error=0.016069
epoch 17/100   error=0.015620
epoch 18/100   error=0.015906
epoch 19/100   error=0.016378
epoch 20/100   error=0.015868
epoch 21/100   error=0.015535
epoch 22/100   error=0.015811
epoch 23/100   error=0.015943
epoch 24/100   error=0.015813
epoch 25/100   error=0.015497
epoch 26/100   error=0.015861
epoch 27/100   error=0.015684
epoch 28/100   error=0.015260
epoch 29/100   error=0.015491
epoch 30/100   error=0.015248
epoch 31/100   error=0.014870
epoch 32/100   error=0.015599
epoch 33/100   error=0.015265
epoch 34/100   erro