In [26]:
from keras.datasets import mnist
import random
from sklearn.preprocessing import MinMaxScaler
import numpy as np

# No necesitamos tantos datos.
(X_train, y_train), (X_test, y_test) = mnist.load_data()
#random.seed(123) # Vamos a controlar la aleatoriedad en adelante. 
X, y = zip(*random.sample(list(zip(X_train, y_train)), 2000))

# Sí necesitamos que la forma de X sea la de un vector, en lugar de una matriz. 
X, y = np.array(X, dtype='float64'), np.array(y, dtype='float64')
X = np.reshape(X, (X.shape[0], -1))

# Normalizamos Min-Max
X= MinMaxScaler().fit_transform(X)

# Dividimos la muestra en dos, una para entrenar y otra para testing, como tenemos 
# muestra de sobra nos damos el lujo de testear con la misma cantidad que entrenamos.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=123)

# Necesitamos que y_train sea un valor categórico, en lugar de un dígito entero.
y_train_value = y_train # Guardaremos y_train como valor para un observación más abajo.
from keras.utils import to_categorical
y_train = to_categorical(y_train)

<h1>Early Stopping</h1>

In [27]:
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience  # Número de épocas sin mejoría para detener el entrenamiento
        self.min_delta = min_delta  # Mínima diferencia para considerar una mejoría
        self.best_loss = None  # Para registrar la mejor pérdida observada
        self.wait = 0  # Contador de épocas sin mejoría

    def should_stop(self, val_loss):
        # Si no se ha establecido mejor pérdida, tomamos la primera
        if self.best_loss is None:
            self.best_loss = val_loss
            return False

        # Si la pérdida mejora por más que el delta mínimo, actualizamos mejor pérdida
        if self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.wait = 0  # Reiniciamos el contador de épocas sin mejoría
        else:
            self.wait += 1  # Incrementamos el contador

        # Si hemos alcanzado el número de épocas sin mejoría, detenemos el entrenamiento
        return self.wait >= self.patience

# Clase EarlyStopping proporcionada
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.wait = 0

    def should_stop(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
            return False
        if self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.wait = 0
        else:
            self.wait += 1
        return self.wait >= self.patience



<h1>Pooling</h1>

In [None]:
# Función para construir un modelo CNN con pooling
def build_cnn_model_with_pooling():
    model = Network()

    # Primera capa convolucional seguida de MaxPooling
    model.add(Conv2D(32, (3, 3), activation=relu))  # Capa convolucional
    model.add(MaxPooling2D((2, 2)))  # Max Pooling con ventana de 2x2

    # Segunda capa convolucional seguida de AveragePooling
    model.add(Conv2D(64, (3, 3), activation=relu))  # Otra capa convolucional
    model.add(AveragePooling2D((2, 2)))  # Average Pooling con ventana de 2x2

    # Tercera capa convolucional seguida de MaxPooling
    model.add(Conv2D(128, (3, 3), activation=relu))  # Otra capa convolucional
    model.add(MaxPooling2D((2, 2)))  # Max Pooling con ventana de 2x2

    # Aplanar para conectarlo a la capa densa
    model.add(Flatten())  # Aplanamos la salida de las capas convolucionales
    model.add(FCLayer(128, 1))  # Capa densa final para clasificación binaria

    return model

In [28]:
import numpy as np

# Clase base para Capa
class Layer:
    def __init__(self):
        self.input = None
        self.output = None

    # computes the output Y of a layer for a given input X
    def forward_propagation(self, input):
        raise NotImplementedError

    # computes dE/dX for a given dE/dY (and update parameters if any)
    def backward_propagation(self, output_error, learning_rate):
        raise NotImplementedError


# Clase para capas densas (fully connected)
class FCLayer(Layer):
    def __init__(self, input_size, output_size):
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = np.dot(self.input, self.weights) + self.bias
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        input_error = np.dot(output_error, self.weights.T)
        weights_error = np.dot(self.input.T, output_error)
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * output_error
        return input_error


# Clase para Capa de Activación. Junto con la capa densa forman perceptrones.
class ActivationLayer(Layer):
    def __init__(self, activation, activation_prime):
        self.activation = activation
        self.activation_prime = activation_prime

    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = self.activation(self.input)
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        return self.activation_prime(self.input) * output_error


# Clases para capas convolucionales y de pooling
class Conv2D(Layer):
    def __init__(self, filters, kernel_size, activation=None):
        self.filters = filters
        self.kernel_size = kernel_size
        self.activation = activation
        self.weights = np.random.rand(kernel_size[0], kernel_size[1], 1, filters) - 0.5
        self.bias = np.random.rand(filters) - 0.5

    def forward_propagation(self, input_data):
        self.input = input_data
        # Aplicar la convolución aquí (simplificada)
        # Se necesita una implementación completa para la convolución
        self.output = np.zeros((input_data.shape[0] - self.kernel_size[0] + 1, input_data.shape[1] - self.kernel_size[1] + 1, self.filters))
        for f in range(self.filters):
            for i in range(self.output.shape[0]):
                for j in range(self.output.shape[1]):
                    self.output[i, j, f] = np.sum(input_data[i:i+self.kernel_size[0], j:j+self.kernel_size[1]] * self.weights[..., f]) + self.bias[f]
        if self.activation:
            return self.activation(self.output)
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        # Se necesita implementar la retropropagación para la convolución
        pass


class MaxPooling2D(Layer):
    def __init__(self, pool_size):
        self.pool_size = pool_size

    def forward_propagation(self, input_data):
        self.input = input_data
        # Max pooling (simplificada)
        output_shape = (input_data.shape[0] // self.pool_size[0], input_data.shape[1] // self.pool_size[1], input_data.shape[2])
        self.output = np.zeros(output_shape)
        for i in range(output_shape[0]):
            for j in range(output_shape[1]):
                self.output[i, j] = np.max(input_data[i*self.pool_size[0]:(i+1)*self.pool_size[0], j*self.pool_size[1]:(j+1)*self.pool_size[1], :], axis=(0, 1))
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        # Se necesita implementar la retropropagación para max pooling
        pass


class AveragePooling2D(MaxPooling2D):
    def forward_propagation(self, input_data):
        self.input = input_data
        output_shape = (input_data.shape[0] // self.pool_size[0], input_data.shape[1] // self.pool_size[1], input_data.shape[2])
        self.output = np.zeros(output_shape)
        for i in range(output_shape[0]):
            for j in range(output_shape[1]):
                self.output[i, j] = np.mean(input_data[i*self.pool_size[0]:(i+1)*self.pool_size[0], j*self.pool_size[1]:(j+1)*self.pool_size[1], :], axis=(0, 1))
        return self.output



# Clase Red, conecta múltiples capas.
class Network:
    def __init__(self):
        self.layers = []
        self.loss = None
        self.loss_prime = None

    # add layer to network
    def add(self, layer):
        self.layers.append(layer)

    # set loss to use
    def use(self, loss, loss_prime):
        self.loss = loss
        self.loss_prime = loss_prime

    # predict output for given input
    def predict(self, input_data):
        if input_data.ndim == 1:  # Ajuste para arreglos unidimensionales
            input_data = np.array([[x] for x in input_data])
        samples = len(input_data)
        result = []

        for i in range(samples):
            output = input_data[i]
            for layer in self.layers:
                output = layer.forward_propagation(output)
            result.append(output)
        return result

    # train the network with Early Stopping
    def fit(self, x_train, y_train, epochs, learning_rate, x_val=None, y_val=None, early_stopping=None):
        if x_train[0].ndim == 1:
            x_train = np.array([[x] for x in x_train])
        samples = len(x_train)

        for i in range(epochs):
            err = 0
            for j in range(samples):
                output = x_train[j]
                for layer in self.layers:
                    output = layer.forward_propagation(output)

                err += self.loss(y_train[j], output)

                error = self.loss_prime(y_train[j], output)
                for layer in reversed(self.layers):
                    error = layer.backward_propagation(error, learning_rate)

            err /= samples
            err = np.mean(err)

            print('epoch %d/%d   error=%f' % (i + 1, epochs, err))

            # Validación y Early Stopping
            if x_val is not None and y_val is not None and early_stopping is not None:
                val_err = 0
                for j in range(len(x_val)):
                    val_output = x_val[j]
                    for layer in self.layers:
                        val_output = layer.forward_propagation(val_output)
                    val_err += self.loss(y_val[j], val_output)
                val_err /= len(x_val)
                val_err = np.mean(val_err)

                print('Validation error: %f' % val_err)

                if early_stopping.should_stop(val_err):
                    print('Early stopping at epoch %d' % (i + 1))
                    break


# Funciones de Activación
def tanh(x):
    return np.tanh(x)

def tanh_prime(x):
    return 1 - np.tanh(x) ** 2

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_prime(x):
    sig = sigmoid(x)
    return sig * (1 - sig)

def relu(x):
    return np.maximum(0, x)

def relu_prime(x):
    return np.where(x > 0, 1, 0)


# Funciones de pérdida
def mse(y_true, y_hat):
    return (y_true - y_hat) ** 2

def mse_prime(y_true, y_hat):
    return 2 * (y_hat - y_true)

def bce(y_true, y_hat):
    epsilon = 1e-15
    y_hat = np.clip(y_hat, epsilon, 1 - epsilon)
    return -(y_true * np.log(y_hat) + (1 - y_true) * np.log(1 - y_hat))

def bce_prime(y_true, y_hat):
    epsilon = 1e-15
    y_hat = np.clip(y_hat, epsilon, 1 - epsilon)
    return -(y_true / y_hat) + (1 - y_true) / (1 - y_hat)


In [29]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score

# Suponiendo que tienes tus datos X y y
# Dividimos los datos en entrenamiento y validación
X_train_full, X_val, y_train_full, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Crear instancia de Network
model = Network()

# Necesitamos identificar cuantos nodos tiene nuestra entrada, y eso depende del tamaño de X.
entrada_dim = len(X_train_full[0])

# Agregamos capas al modelo (sin regularización L2)
model.add(FCLayer(entrada_dim, 128))
model.add(ActivationLayer(relu, relu_prime))
model.add(FCLayer(128, 64))
model.add(ActivationLayer(sigmoid, sigmoid_prime))
model.add(FCLayer(64, 10))
model.add(ActivationLayer(sigmoid, sigmoid_prime))

# Asignamos función de pérdida
model.use(bce, bce_prime)

# Inicializamos EarlyStopping
early_stopping = EarlyStopping(patience=5, min_delta=0.001)

# Entrenamos el modelo con datos de entrenamiento y validación
model.fit(X_train_full, y_train_full, epochs=30, learning_rate=0.1, x_val=X_val, y_val=y_val, early_stopping=early_stopping)

# Usamos el modelo para predecir sobre los datos de prueba (X_test)
y_hat = model.predict(X_test)

# Transformamos la salida en un vector one-hot encoded, es decir, obtenemos la clase predicha
for i in range(len(y_hat)):
    y_hat[i] = np.argmax(y_hat[i])

# Reportamos los resultados del modelo
matriz_conf = confusion_matrix(y_test, y_hat)
print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test, y_hat)))


epoch 1/30   error=0.229760
Validation error: 0.192313
epoch 2/30   error=0.115505
Validation error: 0.231704
epoch 3/30   error=0.080549
Validation error: 0.117621
epoch 4/30   error=0.064104
Validation error: 0.129325
epoch 5/30   error=0.044275
Validation error: 0.094506
epoch 6/30   error=0.031006
Validation error: 0.093984
epoch 7/30   error=0.021331
Validation error: 0.099247
epoch 8/30   error=0.010514
Validation error: 0.094320
epoch 9/30   error=0.004902
Validation error: 0.099775
epoch 10/30   error=0.002902
Validation error: 0.097042
Early stopping at epoch 10
MATRIZ DE CONFUSIÓN para modelo ANN
[[ 90   0   1   0   0   1   5   1   0   0]
 [  0 118   0   0   0   0   0   1   0   0]
 [  1   1  70   3   1   0   5   0   0   0]
 [  1   1   4  91   0   3   1   2   4   0]
 [  0   0   1   0  86   0   3   0   2   4]
 [  3   0   1   4   1  80   3   0   6   0]
 [  2   0   0   0   0   1  91   0   2   0]
 [  0   1   0   0   0   0   0 104   2   2]
 [  0   4   1   5   0   1   2   0  96   3]

In [21]:
import numpy as np

# Clase base para Capa
class Layer:
    def __init__(self):
        self.input = None
        self.output = None
    # computes the output Y of a layer for a given input X
    def forward_propagation(self, input):
        raise NotImplementedError
    # computes dE/dX for a given dE/dY (and update parameters if any)
    def backward_propagation(self, output_error, learning_rate):
        raise NotImplementedError
        
# Clase para capas densas (fully connected)
class FCLayer(Layer):
    def __init__(self, input_size, output_size):
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = np.dot(self.input, self.weights) + self.bias
        return self.output

    def backward_propagation(self, output_error, learning_rate):
        input_error = np.dot(output_error, self.weights.T)
        weights_error = np.dot(self.input.T, output_error)

        # Actualizar los parámetros
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * output_error
        return input_error
    
# Clase para Capa de Activación. Junto con la capa densa forman perceptrones. 
class ActivationLayer(Layer):
    def __init__(self, activation, activation_prime):
        self.activation = activation
        self.activation_prime = activation_prime

    # returns the activated input
    def forward_propagation(self, input_data):
        self.input = input_data
        self.output = self.activation(self.input)
        return self.output

    # Returns input_error=dE/dX for a given output_error=dE/dY.
    # learning_rate is not used because there is no "learnable" parameters.
    def backward_propagation(self, output_error, learning_rate):
        return self.activation_prime(self.input) * output_error
    
# Clase Red, conecta múltiples capas.
class Network:
    def __init__(self):
        self.layers = []
        self.loss = None
        self.loss_prime = None

    # add layer to network
    def add(self, layer):
        self.layers.append(layer)

    # set loss to use
    def use(self, loss, loss_prime):
        self.loss = loss
        self.loss_prime = loss_prime

    # predict output for given input
    def predict(self, input_data):
        if input_data.ndim == 1:  # Manejar vectores unidimensionales
            input_data = np.array([[x] for x in input_data])
        samples = len(input_data)
        result = []

        # run network over all samples
        for i in range(samples):
            # forward propagation
            output = input_data[i]
            for layer in self.layers:
                output = layer.forward_propagation(output)
            result.append(output)
        return result

    # train the network with Early Stopping
    def fit(self, x_train, y_train, x_val, y_val, epochs, learning_rate, early_stopping=None):
        if x_train[0].ndim == 1:
            x_train = np.array([[x] for x in x_train])
        samples = len(x_train)
        
        for epoch in range(epochs):
            err = 0
            for j in range(samples):
                # forward propagation
                output = x_train[j]
                for layer in self.layers:
                    output = layer.forward_propagation(output)

                # compute loss
                err += self.loss(y_train[j], output)

                # backward propagation
                error = self.loss_prime(y_train[j], output)
                for layer in reversed(self.layers):
                    error = layer.backward_propagation(error, learning_rate)

            # calculate average error on all samples
            err /= samples
            err = np.mean(err)
            print(f'epoch {epoch+1}/{epochs}   error={err}')
            
            # Validación y Early Stopping
            val_err = 0
            for i in range(len(x_val)):
                val_output = x_val[i]
                for layer in self.layers:
                    val_output = layer.forward_propagation(val_output)
                val_err += self.loss(y_val[i], val_output)

            val_err /= len(x_val)
            val_err = np.mean(val_err)
            print(f'Validation error after epoch {epoch+1}: {val_err}')
            
            # Early Stopping
            if early_stopping and early_stopping.should_stop(val_err):
                print(f"Early stopping at epoch {epoch+1}")
                break

# Implementación de Early Stopping
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.wait = 0

    def should_stop(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
            return False

        if self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.wait = 0
        else:
            self.wait += 1

        return self.wait >= self.patience

# Funciones de Activación y su derivada
def tanh(x):
    return np.tanh(x)

def tanh_prime(x):
    return 1 - np.tanh(x)**2

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_prime(x):
    sig = sigmoid(x)
    return sig * (1 - sig)

def relu(x):
    return np.maximum(0, x)

def relu_prime(x):
    return np.where(x > 0, 1, 0)

# Funciones de pérdida
def mse(y_true, y_hat):
    return (y_true - y_hat) ** 2

def mse_prime(y_true, y_hat):
    return 2 * (y_hat - y_true)

def bce(y_true, y_hat):
    epsilon = 1e-15
    y_hat = np.clip(y_hat, epsilon, 1 - epsilon)
    return -(y_true * np.log(y_hat) + (1 - y_true) * np.log(1 - y_hat))

def bce_prime(y_true, y_hat):
    epsilon = 1e-15
    y_hat = np.clip(y_hat, epsilon, 1 - epsilon)
    return -(y_true / y_hat) + (1 - y_true) / (1 - y_hat)

# Entrenamiento con Early Stopping
early_stopping = EarlyStopping(patience=5, min_delta=0.01)

# Crear instancia de Network
model = Network()

# Agregar capas
entrada_dim = len(X_train[0])
model.add(FCLayer(entrada_dim, 128))
model.add(ActivationLayer(relu, relu_prime))
model.add(FCLayer(128, 64))
model.add(ActivationLayer(sigmoid, sigmoid_prime))
model.add(FCLayer(64, 10))
model.add(ActivationLayer(sigmoid, sigmoid_prime))

# Asignar función de pérdida
model.use(bce, bce_prime)

# Entrenar con datos de entrenamiento y validación, e integrar Early Stopping
model.fit(X_train, y_train, X_val, y_val, epochs=30, learning_rate=0.1, early_stopping=early_stopping)

# Reportamos los resultados del modelo
matriz_conf = confusion_matrix(y_test, y_hat)
print('MATRIZ DE CONFUSIÓN para modelo ANN')
print(matriz_conf, '\n')
print('La exactitud de testeo del modelo ANN es: {:.3f}'.format(accuracy_score(y_test, y_hat)))


epoch 1/30   error=0.19994082851470596
Validation error after epoch 1: 0.09929526961847532
epoch 2/30   error=0.11022641078987586
Validation error after epoch 2: 0.07349405239883439
epoch 3/30   error=0.0804191777324974
Validation error after epoch 3: 0.050170871151870314
epoch 4/30   error=0.05258249069141635
Validation error after epoch 4: 0.034300667803674786
epoch 5/30   error=0.03977050836029096
Validation error after epoch 5: 0.031085400018210047
epoch 6/30   error=0.02438922985602992
Validation error after epoch 6: 0.018035743832928674
epoch 7/30   error=0.01639608310799743
Validation error after epoch 7: 0.005455967107241914
epoch 8/30   error=0.014376399231121056
Validation error after epoch 8: 0.005691513423113552
epoch 9/30   error=0.008352195058858028
Validation error after epoch 9: 0.0028982430478194385
epoch 10/30   error=0.0038091532899831534
Validation error after epoch 10: 0.0014131359653005112
epoch 11/30   error=0.0016096231300527012
Validation error after epoch 11: 