# Clase MLP

In [None]:
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Función de activación sigmoide
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Derivada de la sigmoide
def sigmoid_derivative(x):
    return x * (1 - x)

# Establece la semilla para la generación de números aleatorios
def seed(random_state=33):
    np.random.seed(random_state)

# Inicialización de javier
def xavier_initialization(input_size, output_size):
    # ¿En el parametro size es output, input?
    return np.random.normal(scale=np.sqrt(2 / (input_size + output_size)), size=(output_size, input_size))

# Inicialización normal
def normal_initialization(input_size, output_size):
    return np.random.randn(output_size, input_size) * 0.1

# Preprocesado de datos
def preprocesar(ruta):
    datos = pd.read_csv(ruta, header=0)
    datos_crudos = datos.to_numpy()

    x = datos_crudos[:, :-1]
    y = datos_crudos[:, -1:]

    return x, y

# Normalizar los datos
def normalizar_datos(X):
    scaler = StandardScaler()
    return scaler.fit_transform(X)

# Crear mini lotes
def create_minibatches(X, y, batch_size):
    """
    Genera los lotes de datos (batchs) de acuerdo al parámetro batch_size de forma aleatoria para el procesamiento. 
    """
    n_samples = X.shape[0]
    indices = np.random.permutation(n_samples)  # Mezcla los índices aleatoriamente
    X_shuffled, y_shuffled = X[indices], y[indices]  # Reordena X e y según los índices aleatorios
    
    # Divide los datos en minibatches
    for X_batch, y_batch in zip(np.array_split(X_shuffled, np.ceil(n_samples / batch_size)), 
                                np.array_split(y_shuffled, np.ceil(n_samples / batch_size))):
        yield X_batch, y_batch

# Probar modelo
def evaluar_modelo_prueba(modelo, ruta_prueba, normalizar):
    x_test_crudo, Y_test_crudo = preprocesar(ruta_prueba)

    if normalizar:
        x_test = normalizar_datos(x_test_crudo)
    else:
        x_test = x_test_crudo
    
    metricas = modelo.evaluar(x_test, Y_test_crudo)

    return metricas


class MLP_TODO:
    def __init__(self, num_entradas, num_neuronas_ocultas, num_salidas, epochs, batch_size=128, learning_rate=0.2, random_state=42, initialization="xavier"):

        # Construcción
        seed(random_state)
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.batch_size = batch_size
        
        self.error_mse = []
        self.accuracy_epoca = []
        
        # definir las capas
        if initialization == 'xavier':
            init_fun = xavier_initialization
        else : 
            init_fun = normal_initialization

        self.W1 = init_fun(num_entradas, num_neuronas_ocultas)
        self.b1 = np.zeros((1, num_neuronas_ocultas))
        self.W2 = init_fun(num_neuronas_ocultas, num_salidas)
        self.b2 = np.zeros((1, num_salidas))

    def forward(self, X):
        #----------------------------------------------
        # 1. Propagación hacia adelante (Forward pass)
        #----------------------------------------------
        self.X = X
        self.z_c1 = X @ self.W1.T + self.b1
        self.a_c1 = sigmoid(self.z_c1)
        self.z_c2 = self.a_c1 @ self.W2.T + self.b2
        y_pred = sigmoid(self.z_c2)  # Y^
        return y_pred

    def loss_function_MSE(self, y_pred, y):
        #----------------------------------------------
        # 2. Cálculo del error con MSE
        #----------------------------------------------
        self.y_pred = y_pred
        self.y = y
        error = 0.5 * np.mean((y_pred - y) ** 2)
        return error
    
    def backward(self):
        #----------------------------------------------
        # 3. Propagación hacia atrás (Backward pass)
        #----------------------------------------------
        
        #----------------------------------------------
        # Gradiente de la salida
        #----------------------------------------------
        dE_dy_pred = (self.y_pred - self.y) / self.y.shape[0] # Derivada del error respecto a la predicción con  N ejemplos
        d_y_pred_d_zc2 = sigmoid_derivative(self.y_pred)
        delta_c2 = dE_dy_pred * d_y_pred_d_zc2

        #----------------------------------------------
        # Gradiente en la capa oculta
        #----------------------------------------------
        # calcular la derivada de las suma ponderada respecto a las activaciones de la capa 1
        delta_c1 = (delta_c2 @ self.W2) * sigmoid_derivative(self.a_c1)

        #calcula el gradiente de pesos y bias
        self.dE_dW2 = delta_c2.T @ self.a_c1
        self.dE_db2 = np.sum(delta_c2, axis=0, keepdims=True)
        self.dE_dW1 = delta_c1.T @ self.X
        self.dE_db1 = np.sum(delta_c1, axis=0, keepdims=True)

    def update(self):  # Ejecución de la actualización de paramámetros
        #----------------------------------------------
        # Actualización de pesos de la capa de salida
        #---------------------------------------------- 
        
        self.W2 = self.W2 - self.learning_rate * self.dE_dW2 # Ojito con la T
        self.b2 = self.b2 - self.learning_rate * self.dE_db2

        #----------------------------------------------
        # Actuailzación de pesos de la capa oculta
        #----------------------------------------------
        #calcula el gradiente de la función de error respecto a los pesos de la capa 1
        self.W1 = self.W1 - self.learning_rate * self.dE_dW1
        self.b1 = self.b1 - self.learning_rate * self.dE_db1

    def predict(self, X):  # Predecir la categoría para datos nuevos
        y_pred = self.forward(X)
        # Obtener la clase para el clasificador binario
        y_pred = np.where(y_pred >= 0.5, 1, 0)
        return y_pred

    def train(self, X, Y):
        for epoch in range(self.epochs):

            num_batch = 0
            epoch_error  = 0

            # Procesamiento por lotes
            for X_batch, y_batch in create_minibatches(X, Y, self.batch_size):
                y_pred = self.forward(X_batch)
                error = self.loss_function_MSE(y_pred, y_batch)
                
                # if np.all(y_pred == Y) : aciertos += 1
                # self.accuracy_epoca.append(aciertos/epoch)

                epoch_error += error
                self.backward() # cálculo de los gradientes
                self.update() # actualización de los pesos y bias
                num_batch += 1
                # Imprimir el error cada N épocas
            
            # Almacena el error promedio por época
            self.error_mse.append(epoch_error/num_batch)

            # Obtener predicciones binarias para todo el conjunto de entrenamiento
            y_pred_total = self.predict(X)

            # Calcular la exactitud
            exactitud = self.calcular_accuracy(y_pred_total, Y) 
            
            # Almacenar la exactitud de la época
            self.accuracy_epoca.append(exactitud)

            #if epoch % 100 == 0: print(f"Época {epoch:05d} | MSE: {epoch_error/num_batch:.6f} | Exactitud: {exactitud:.4f}")

    def graficar(self, graficar_exactitud=True, guardar=True, nombre="grafica"):
        """ 
        Para MSE siempre se muestra 
        """
        # Preparar datos
        mse = np.arange(len(self.error_mse))

        # Crear tabla
        plt.figure(figsize=(10,6))

        #Graficar MSE
        plt.plot(mse, self.error_mse, label="MSE", color="green", linewidth=1)


        """ 
        Para la exactitud 
        """
        if graficar_exactitud and len(self.accuracy_epoca) > 0:
            accuracy = np.arange(len(self.accuracy_epoca))
            plt.plot(accuracy, self.accuracy_epoca, label="Exactitud", color="green", linewidth=1)
            plt.ylabel("MSE / Exactitud")
            titulo = "Evolución del Error (MSE) y Exactitud durante el entrenamiento"
        else:
            plt.ylabel("Error Cuadrático Medio (MSE)")
            titulo = "Evolución del Error (MSE) durante el entrenamiento"

        plt.title(titulo)
        plt.xlabel("Época")
        plt.legend()
        plt.grid(True, alpha=0.3)

        if guardar:
            plt.savefig(f'./graficas/{nombre}.svg')
        plt.show()

    def calcular_accuracy(self, y_pred, y_verdadera):
        return np.mean(y_verdadera.flatten() == y_pred.flatten())

    def analizar(self, X, y):
        # Gráficar
        self.graficar(guardar=True, graficar_exactitud=False)

        # Valores reales y predicción
        y_pred = self.predict(X)
        print(f"valores reales: {y.flatten()}")
        print(f"Predicciones  : {y_pred.flatten()}")

        # Calcular exactitud
        exactitud = self.calcular_accuracy(y_pred, y)
        print(f"Exactitud: {exactitud}")

    def evaluar(self, x_test, y_test):
        y_gorrito = self.predict(x_test)

        accuracy = self.calcular_accuracy(y_gorrito, y_test)

        probabilidad = self.forward(x_test)
        mse = self.loss_function_MSE(probabilidad, y_test)

        metricas = {
            "Exactitud": accuracy,
            "mse": mse
        }

        return metricas

# Abir un archivo

In [None]:
import pandas as pd

# Definir las rutas de los datasets
rutas = {"spanish": {
    "train": "./Recursos/hateval_es_train.json",
    "test": "./Recursos/hateval_es_test.json",
    "all": "./Recursos/hateval_es_all.json"
},
"english": {
    "train": "./Recursos/hateval_en_train.json",
    "test": "./Recursos/hateval_en_test.json",
    "all": "./Recursos/hateval_en_all.json"
    }
}

# Aquí se almacenan todos los datasets
datos = {}

# Cargar todos los datasets
for idioma, archivos in rutas.items():
    datos[idioma] = {}
    for tipo, ruta in archivos.items():
        datos[idioma][tipo] = pd.read_json(ruta, lines=True)
        print(f"Cargando {idioma}_{tipo}, {len(datos[idioma][tipo])} datos")

print(datos["spanish"]["test"].head(10))

# Normalización de textos

In [None]:
from nltk.stem import SnowballStemmer

# Procesar un dataset aplicando noramalización y stemming
def preprocesar_dataset(df, idioma="spanish"):
    # Segun el idioma
    stemmer = SnowballStemmer(idioma)

    def aplicar_stemming(texto):
        if pd.isna(texto): return texto
        palabras = str(texto).split()
        return ' '.join([stemmer.stem(palabra) for palabra in palabras])
    
    df["text"] = df["text"].apply(normaliza_texto)
    df["text"] = df["text"].apply(aplicar_stemming)

    return df

In [None]:
# Normalización del texto

import unicodedata
import re
PUNCTUACTION = ";:,.\\-\"'/"
SYMBOLS = "()[]¿?¡!{}~<>|"
NUMBERS= "0123456789"
SKIP_SYMBOLS = set(PUNCTUACTION + SYMBOLS)
SKIP_SYMBOLS_AND_SPACES = set(PUNCTUACTION + SYMBOLS + '\t\n\r ')

def normaliza_texto(input_str,
                    punct=False,
                    accents=False,
                    num=False,
                    max_dup=2):
    """
        punct=False (elimina la puntuación, True deja intacta la puntuación)
        accents=False (elimina los acentos, True deja intactos los acentos)
        num= False (elimina los números, True deja intactos los acentos)
        max_dup=2 (número máximo de símbolos duplicados de forma consecutiva, rrrrr => rr)
    """
    
    nfkd_f = unicodedata.normalize('NFKD', input_str)
    n_str = []
    c_prev = ''
    cc_prev = 0
    for c in nfkd_f:
        if not num:
            if c in NUMBERS:
                continue
        if not punct:
            if c in SKIP_SYMBOLS:
                continue
        if not accents and unicodedata.combining(c):
            continue
        if c_prev == c:
            cc_prev += 1
            if cc_prev >= max_dup:
                continue
        else:
            cc_prev = 0
        n_str.append(c)
        c_prev = c
    texto = unicodedata.normalize('NFKD', "".join(n_str))
    texto = re.sub(r'(\s)+', r' ', texto.strip(), flags=re.IGNORECASE)
    return texto



# Aplicar normalización a los datasets

In [None]:
datos_preprocesados = {}

for idioma in datos:
    datos_preprocesados[idioma] = {}

    for tipo in datos[idioma]:
        datos_preprocesados[idioma][tipo] = preprocesar_dataset(datos[idioma][tipo], idioma)

print(datos_preprocesados["spanish"]["test"].head(10))

# Documento-Término

In [None]:
# Construye la matriz Documento-Término (num_documentos x tamaño_vocabulario) por frecuencia de aparición

def documento_termino(documentos, vocabulario):
    num_documentos = len(documentos)
    tamaño_vocabulario = len(vocabulario)
    matriz_frecuencias = np.zeros((num_documentos, tamaño_vocabulario), dtype=int)
    
    # Crear diccionario para búsqueda más cómoda
    indice_terminos = {termino: indice for indice, termino in enumerate(vocabulario)}
    
    terminos_desconocidos = set()
    
    for indice_documento, texto_documento in enumerate(documentos):
        # Dividir el documento en términos individuales
        terminos_documento = texto_documento.lower().split()
        
        for termino_actual in terminos_documento:
            if termino_actual in indice_terminos:
                indice_termino = indice_terminos[termino_actual]
                matriz_frecuencias[indice_documento, indice_termino] += 1
            else:
                terminos_desconocidos.add(termino_actual)
    
    # Reportar términos fuera del vocabulario si los hay
    if terminos_desconocidos:
        print(f"Advertencia: {len(terminos_desconocidos)} términos no encontrados en el vocabulario")
        # Opcional: mostrar algunos ejemplos de términos desconocidos
        # print(f"Ejemplos de términos desconocidos: {list(terminos_desconocidos)[:5]}")
    
    return matriz_frecuencias


# Construir el vocabulario único a partir de los textos procesados
def construir_vocabulario(datos_procesados, idioma, tipo="train"):
    textos = datos_preprocesados[idioma][tipo]["text"]
    
    palabras = []
    for texto in textos:
        palabras.extend(texto.lower().split())
    
    vocabulario = sorted(set(palabras))
    print(f"Vocabulario: {idioma}-{tipo}: {len(vocabulario)} términos únicos")
    return vocabulario

In [103]:
matrices_vsm = {}

for idioma in datos_preprocesados:
    matrices_vsm[idioma] = {}  # CORRECCIÓN: inicializar por idioma
    
    print(f"\nProcesando: {idioma}")
    
    for tipo in datos_preprocesados[idioma]:
        textos = datos_preprocesados[idioma][tipo]["text"].to_list()
        
        vsm = documento_termino(textos, vocabularios[idioma])
        
        matrices_vsm[idioma][tipo] = vsm
        print(f"Matriz {idioma}_{tipo}: {vsm.shape}")


Procesando: spanish
Matriz spanish_train: (4500, 13706)
Advertencia: 1073 términos no encontrados en el vocabulario
Matriz spanish_test: (500, 13706)
Advertencia: 1073 términos no encontrados en el vocabulario
Matriz spanish_all: (5000, 13706)

Procesando: english
Matriz english_train: (9000, 23250)
Advertencia: 1927 términos no encontrados en el vocabulario
Matriz english_test: (1000, 23250)
Advertencia: 1927 términos no encontrados en el vocabulario
Matriz english_all: (10000, 23250)


In [106]:
print(matrices_vsm["spanish"]["train"])

[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]
