# Implementar un MLP con PyTorch para clasificación basado en el dataset de agresividad

1. **Definir los preprocesamientos para el texto**:  
   - convertir a minúsculas
   - normalizar el texto: borrar símbolos, puntuación, caracteres duplicados, etc.

2. **Separar los datos para entrenamiento y prueba**:  
   - Crear los dataset de entrenamiento y test con al función train_test_split 

3. **Construir la matriz de Documento-Término**:  
   - Definir los parámetros para usar unigramas
   - Usar la clase TfidfVectorizer para construir la matriz con los datos de entrenamiento
   
4. **Preparar los lotes de datos (minibatches) para el entrenamiento de la red**:  
   - Definir los minibatches con la matriz TFIDF construida

5. **Definir la arquitectura de la red**:  
   - Definir una red de 2 capas, con funciones PReLU en las capas ocultas y una capa de salida

6. **Entrenar el modelo**:  
   - Definir los parámetros de las red como: número de épocas, learning_rate, número de neuronas para las capas ocultas, etc.
   
7. **Evaluar el modelo**:  
   - Después del entrenamiento, probar la red con las entradas del conjunto de test y evaluar el desempeño con las métricas: Precisión, Recall, F1-score o F1-Measure y Accuracy.
   


# Definición de los datos y minibatches

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk import word_tokenize
from sklearn.model_selection import train_test_split


# colocar la semilla para la generación de números aleatorios para la reproducibilidad de experimentos

random_state = 42
torch.manual_seed(random_state)
np.random.seed(random_state)

#cargar los datos
dataset = pd.read_json("./data_aggressiveness_es.json", lines=True)
#conteo de clases
print("Total de ejemplos de entrenamiento")
print(dataset.klass.value_counts())
# Extracción de los textos en arreglos de numpy
X = dataset['text'].to_numpy()
# Extracción de las etiquetas o clases de entrenamiento
Y = dataset['klass'].to_numpy()



# TODO: Definir las funciones de preprocesamiento de texto vinculadas al proceso de creación de la matriz 
# Documeno-Término creada con TfidfVectorizer.

_STOPWORDS = stopwords.words("spanish")  # agregar más palabras a esta lista si es necesario

# Normalización del texto

import unicodedata
import re
PUNCTUACTION = ";:,.\\-\"'/"
SYMBOLS = "()[]¿?¡!{}~<>|"
NUMBERS= "0123456789"
SKIP_SYMBOLS = set(PUNCTUACTION + SYMBOLS)
SKIP_SYMBOLS_AND_SPACES = set(PUNCTUACTION + SYMBOLS + '\t\n\r ')

def normaliza_texto(input_str,
                    punct=False,
                    accents=False,
                    num=False,
                    max_dup=2):
    """
        punct=False (elimina la puntuación, True deja intacta la puntuación)
        accents=False (elimina los acentos, True deja intactos los acentos)
        num= False (elimina los números, True deja intactos los acentos)
        max_dup=2 (número máximo de símbolos duplicados de forma consecutiva, rrrrr => rr)
    """
    
    nfkd_f = unicodedata.normalize('NFKD', input_str)
    n_str = []
    c_prev = ''
    cc_prev = 0
    for c in nfkd_f:
        if not num:
            if c in NUMBERS:
                continue
        if not punct:
            if c in SKIP_SYMBOLS:
                continue
        if not accents and unicodedata.combining(c):
            continue
        if c_prev == c:
            cc_prev += 1
            if cc_prev >= max_dup:
                continue
        else:
            cc_prev = 0
        n_str.append(c)
        c_prev = c
    texto = unicodedata.normalize('NFKD', "".join(n_str))
    texto = re.sub(r'(\s)+', r' ', texto.strip(), flags=re.IGNORECASE)
    return texto


# Preprocesamiento personalizado 
def mi_preprocesamiento(texto):
    #convierte a minúsculas el texto antes de normalizar
    tokens = word_tokenize(texto.lower())
    texto = " ".join(tokens)
    texto = normaliza_texto(texto)
    return texto
    
# Tokenizador personalizado 
def mi_tokenizador(texto):
    # Elimina stopwords: palabras que no se consideran de contenido y que no agregan valor semántico al texto
    #print("antes: ", texto)
    texto = [t for t in texto.split() if t not in _STOPWORDS]
    #print("después:",texto)
    return texto

# TODO: Codificar las etiquetas de los datos a una forma categórica numérica: LabelEncoder.

le = LabelEncoder()
# Normalizar las etiquetas a una codificación ordinal para entrada del clasificador
Y_encoded= le.fit_transform(Y)
print("Clases:")
print(le.classes_)
print("Clases codificadas:")
print(le.transform(le.classes_))

# TODO: Dividir el conjunto de datos en conjunto de entrenamiento (80%) y conjunto de pruebas (20%)


X_train, X_test, Y_train, Y_test =  train_test_split(X, Y_encoded, test_size=0.2, stratify=Y_encoded, random_state=42)

# Divide el conjunto de entrenamiento en:  entrenamiento (90%) y validación (10%)
X_train, X_val, Y_train, Y_val =  train_test_split(X_train, Y_train, test_size=0.1, stratify=Y_train, random_state=42)



# TODO: Crear la matriz Documento-Término con el dataset de entrenamiento: tfidfVectorizer


vec_tfidf = TfidfVectorizer(analyzer="word", preprocessor=mi_preprocesamiento, tokenizer=mi_tokenizador,  ngram_range=(1,1))
X_train_tfidf = vec_tfidf.fit_transform(X_train)

NUM_CLASSES = 2

# Codificación de la salida onehot

Y_train_one_hot = nn.functional.one_hot(torch.from_numpy(Y_train), num_classes=NUM_CLASSES).float()
Y_test_one_hot = nn.functional.one_hot(torch.from_numpy(Y_test), num_classes=NUM_CLASSES).float()
Y_val_one_hot = nn.functional.one_hot(torch.from_numpy(Y_val), num_classes=NUM_CLASSES).float()


# Convertir a matriz densa de tipo de dato float32 (tipo de dato por default en Pytorch)
X_train_tfidf = X_train_tfidf.toarray().astype(np.float32)

# Tranforma los datos de validación al espacio de representación del entrenamiento
X_val_tfidf = vec_tfidf.transform(X_val)

# Convertir a matriz densa de tipo de dato float32 (tipo de dato por default en Pytorch)
X_val_tfidf = X_val_tfidf.toarray().astype(np.float32)



# Crear minibatches en PyTorch usando DataLoader
def create_minibatches(X, Y, batch_size):
    # Recibe los documentos en X y las etiquetas en Y
    dataset = TensorDataset(X, Y) # Cargar los datos en un dataset de tensores
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    # loader = DataLoader(dataset, batch_size=batch_size)
    return loader


FileNotFoundError: File ./data_aggressiveness_es.json does not exist

In [3]:
X_train.shape, Y_train.shape

((3694,), (3694,))

In [4]:
Y_train_one_hot.shape, Y_test_one_hot.shape,  Y_val_one_hot.shape

(torch.Size([3694, 2]), torch.Size([1027, 2]), torch.Size([411, 2]))

# Definición de la arquitectura de la red

In [5]:

# Definir la red neuronal en PyTorch heredando de la clase base de Redes Neuronales: Module
class MLP(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        # Definición de capas, funciones de activación e inicialización de pesos
        input_size_h1 = 128
        input_size_h2 = 8 
        self.fc1 = nn.Linear(input_size, input_size_h1)
        # PReLU tiene parámetros aprendibles: Se recomienda una función de activación independiente por capa
        self.act1= nn.PReLU()

        self.fc2 = nn.Linear(input_size_h1, input_size_h2)
        # PReLU tiene parámetros aprendibles: Se recomienda una función de activación independiente por capa
        self.act2= nn.PReLU()

        self.output = nn.Linear(input_size_h2, output_size)
        
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)
        nn.init.xavier_uniform_(self.output.weight)

        if self.fc1.bias is not None:
            nn.init.zeros_(self.fc1.bias)
        if self.fc2.bias is not None:
            nn.init.zeros_(self.fc2.bias)        
        if self.output.bias is not None:
            nn.init.zeros_(self.output.bias)        

    
    def forward(self, X):
        # Definición del orden de conexión de las capas y aplición de las funciones de activación
        x = self.fc1(X)
        x = self.act1(x)
        x = self.fc2(x)
        x = self.act2(x)
        x = self.output(x)
        # Nota la última capa de salida 'output' no se activa debido a que CrossEntropyLoss usa LogSoftmax internamente. 
        return x

# Entrenamiento de la red

In [12]:
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Establecer los parámetros de la red

# Parámetros de la red
input_size =  X_train_tfidf.shape[1]

output_size = 2   # 2 clases

epochs = 100 # variar el número de épocas, para probar que funciona la programación 
                 # solo usar 2 épocas, para entrenamiento total usar por ejemplo 1000 épocas
learning_rate = 0.01 # Generalmente se usan learning rate pequeños (0.001), 

# Se recomiendan tamaños de batch_size potencias de 2: 16, 32, 64, 128, 256
# Entre mayor el número más cantidad de memoria se requiere para el procesamiento
batch_size = 128 # definir el tamaño del lote de procesamiento 


# TODO: Convertir los datos de entrenamiento y etiquetas a tensores  de PyTorch

X_train_t = torch.from_numpy(X_train_tfidf)
Y_train_t = Y_train_one_hot

X_val_t = torch.from_numpy(X_val_tfidf)

# Crear la red
model = MLP(input_size, output_size)

# Definir la función de pérdida
# Mean Square Error (MSE)
# criterion = nn.MSELoss()
# criterion = nn.BCELoss() 
criterion = nn.CrossEntropyLoss() 

# Definir el optimizador
#Parámetros del optimizador: parámetros del modelo y learning rate 
# Stochastic Gradient Descent (SGD)
# optimizer = optim.SGD(model.parameters(), lr=learning_rate)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Entrenamiento
print("Iniciando entrenamiento en PyTorch")


for epoch in range(epochs):
# Poner el modelo en modo de entrenamiento
    model.train()  
    lossTotal = 0
    #definir el batch_size
    dataloader = create_minibatches(X_train_t, Y_train_t, batch_size=batch_size)
    for X_tr, y_tr in dataloader:
        # inicializar los gradientes en cero para cada época
        optimizer.zero_grad()
        
        # Propagación hacia adelante
        y_pred = model(X_tr)  #invoca al método forward de la clase MLP
        # Calcular el error MSE
        loss = criterion(y_pred, y_tr)
        #Acumular el error 
        lossTotal += loss.item()
        
        # Propagación hacia atrás: cálculo de los gradientes de los pesos y bias
        loss.backward()
        
        # actualización de los pesos: regla de actualización basado en el gradiente:
        #  W = W - learning_rate * dE/dW
        optimizer.step()
        if np.random.random() < 0.1:
            print(f"Batch Error : {loss.item()}")

    print(f"Época {epoch+1}/{epochs}, Pérdida: {lossTotal/len(dataloader)}")
    
    # Evalúa el modelo con el conjunto de validación
    model.eval()  # Establecer el modo del modelo a "evaluación"
    with torch.no_grad():  # No  calcular gradientes 
        y_pred = model(X_val_t)
        # Aplica softmax para obtener las probabilidades en la evaluación
        y_pred = torch.softmax(y_pred, dim=1)
        # Obtiene una única clase, la más probable
        y_pred = torch.argmax(y_pred, dim=1)        
        print(f"Época {epoch+1}/{epochs}")
        print("P=", precision_score(Y_val, y_pred, average='macro'))
        print("R=", recall_score(Y_val, y_pred, average='macro'))
        print("F1=", f1_score(Y_val, y_pred, average='macro'))
        print("Acc=", accuracy_score(Y_val, y_pred))




Iniciando entrenamiento en PyTorch
Batch Error : 0.5418707728385925
Batch Error : 0.5832555890083313
Batch Error : 0.5617039799690247
Batch Error : 0.4275079369544983
Batch Error : 0.4394717812538147
Batch Error : 0.4274562895298004
Época 1/100, Pérdida: 0.5183003082357603
Época 1/100
P= 0.7067865981343994
R= 0.6885087059640191
F1= 0.695877378435518
Acc= 0.7615571776155717
Batch Error : 0.10893426090478897
Batch Error : 0.12142549455165863
Batch Error : 0.10180370509624481
Época 2/100, Pérdida: 0.09554767370994749
Época 2/100
P= 0.7070299771912676
R= 0.7205269855961127
F1= 0.7125874125874125
Acc= 0.7566909975669099
Batch Error : 0.0131586454808712
Batch Error : 0.003791796276345849
Época 3/100, Pérdida: 0.012999055561898598
Época 3/100
P= 0.7228848991637973
R= 0.7096951466419853
F1= 0.7154438860971524
Acc= 0.7737226277372263
Batch Error : 0.0019520187051966786
Batch Error : 0.028602613136172295
Época 4/100, Pérdida: 0.003343283452679692
Época 4/100
P= 0.7133286713286713
R= 0.7205848325

### Modo para predicción de datos

In [13]:
# TODO: Transformar el dataset de test con los mismos preprocesamientos y al  espacio de 
# representación vectorial que el modelo entrenado, es decir, al espacio de la matriz TFIDF

# Convertir los datos de prueba a tensores de PyTorch

X_test_tfidf = vec_tfidf.transform(X_test)

# Convertir a matriz densa de tipo de dato float32 (tipo de dato por default en Pytorch)
X_test_tfidf = X_test_tfidf.toarray().astype(np.float32)
X_t = torch.from_numpy(X_test_tfidf)

# Desactivar el comportamiento de modo de  entrenamiento: por ejemplo, capas como Dropout
model.eval()  # Establecer el modo del modelo a "evaluación"

with torch.no_grad():  # No  calcular gradientes 
    y_pred_test= model(X_t)

# y_test_pred contiene las predicciones

# Obtener la clase real
y_pred_test = torch.argmax(y_pred_test, dim=1)

print(y_pred_test)


tensor([0, 1, 1,  ..., 0, 1, 0])


### Evaluación

In [14]:
# TODO: Evaluar el modelo con las predicciones obtenidas y las etiquetas esperadas: 
# classification_report y  matriz de confusión (métricas Precisión, Recall, F1-measaure, Accuracy)

from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix


print(confusion_matrix(Y_test, y_pred_test))
print(classification_report(Y_test, y_pred_test, digits=4, zero_division='warn'))


[[159 137]
 [ 88 643]]
              precision    recall  f1-score   support

           0     0.6437    0.5372    0.5856       296
           1     0.8244    0.8796    0.8511       731

    accuracy                         0.7809      1027
   macro avg     0.7340    0.7084    0.7184      1027
weighted avg     0.7723    0.7809    0.7746      1027



### Evaluación de datos nuevos

In [15]:

x_new_data = ["Ese perro me robo mis cosas", "ese hdp se llevo el dinero", "mi app de calendario no sirve"]
x_new_data_tfidf = vec_tfidf.transform(x_new_data)
# Convertir a matriz densa de tipo de dato float32 (tipo de dato por default en Pytorch)
x_new_data_tfidf = x_new_data_tfidf.toarray().astype(np.float32)
X_new_t = torch.from_numpy(x_new_data_tfidf)


model.eval()  # Establecer el modo del modelo a "evaluación"
with torch.no_grad():  # No  calcular gradientes 
    y_pred = model(X_new_t)
    y_pred = torch.argmax(y_pred, dim=1)
    print(le.inverse_transform(y_pred))

['aggressive' 'aggressive' 'nonaggressive']


# Ejercicio.  Modificar la red neuronal MLP con las siguientes características:
- ## Arquitectura:
    - ### 4 capas ocultas
    - ### 2 salidas
    - ### Funciones de activación en capas ocultas ELU
    - ### Número de neuronas por capa oculta a su consideración
- ## Prepocesamiento:
    - ### Normalización
    - ### Repesentación de características: unigramas sin STOPWORDS y con stemming
    - ### Pesado TF-IDF

- ## Evaluación del rendimiento del modelo: 
    - ### 1. Paticiones train (80%), test (20%), validación (10% del train)
    - ### 2. Validación cruzada k-folds = 5


In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score
import pandas as pd
from nltk.corpus import stopwords
from nltk import word_tokenize
import unicodedata
import re

# ============================================================================
# CONFIGURACIÓN INICIAL Y CARGA DE DATOS
# ============================================================================

# Semilla para reproducibilidad - asegura que los resultados sean los mismos en cada ejecución
random_state = 42
torch.manual_seed(random_state)
np.random.seed(random_state)

# Cargar datos desde archivo JSON
dataset = pd.read_json("./data_aggressiveness_es.json", lines=True)
print("Total de ejemplos de entrenamiento")
print(dataset.klass.value_counts())  # Muestra la distribución de clases


Total de ejemplos de entrenamiento
klass
nonaggressive    3655
aggressive       1477
Name: count, dtype: int64


In [18]:

# Extraer textos y etiquetas
X = dataset['text'].to_numpy()  # Textos como array de numpy
Y = dataset['klass'].to_numpy()  # Etiquetas como array de numpy


In [19]:


# ============================================================================
# PREPROCESAMIENTO DE TEXTO
# ============================================================================

# Configuración de recursos para NLP
_STOPWORDS = stopwords.words("spanish")  # Lista de palabras vacías en español
PUNCTUACTION = ";:,.\\-\"'/"
SYMBOLS = "()[]¿?¡!{}~<>|"
NUMBERS = "0123456789"
SKIP_SYMBOLS = set(PUNCTUACTION + SYMBOLS)  # Símbolos a eliminar

def normaliza_texto(input_str, punct=False, accents=False, num=False, max_dup=2):
    """
    Normaliza el texto eliminando puntuación, acentos, números y duplicados excesivos
    """
    # Normalizar caracteres Unicode
    nfkd_f = unicodedata.normalize('NFKD', input_str)
    n_str = []
    c_prev = ''
    cc_prev = 0
    
    # Procesar cada carácter del texto
    for c in nfkd_f:
        # Eliminar números si num=False
        if not num and c in NUMBERS:
            continue
        # Eliminar puntuación si punct=False
        if not punct and c in SKIP_SYMBOLS:
            continue
        # Eliminar acentos si accents=False
        if not accents and unicodedata.combining(c):
            continue
        # Controlar caracteres duplicados consecutivos
        if c_prev == c:
            cc_prev += 1
            if cc_prev >= max_dup:
                continue
        else:
            cc_prev = 0
        n_str.append(c)
        c_prev = c
    
    # Reconstruir texto y normalizar espacios
    texto = unicodedata.normalize('NFKD', "".join(n_str))
    texto = re.sub(r'(\s)+', r' ', texto.strip(), flags=re.IGNORECASE)
    return texto

def mi_preprocesamiento(texto):
    """
    Preprocesamiento principal: tokenización, minúsculas y normalización
    """
    # Convertir a minúsculas y tokenizar
    tokens = word_tokenize(texto.lower())
    texto = " ".join(tokens)  # Reunir tokens en texto
    texto = normaliza_texto(texto)  # Aplicar normalización
    return texto

def mi_tokenizador(texto):
    """
    Tokenizador personalizado que elimina stopwords
    """
    texto = [t for t in texto.split() if t not in _STOPWORDS]
    return texto


In [20]:

# ============================================================================
# PREPARACIÓN DE ETIQUETAS Y PARTICIONES
# ============================================================================

# Codificar etiquetas de texto a números (ej: "agresivo" -> 0, "no_agresivo" -> 1)
le = LabelEncoder()
Y_encoded = le.fit_transform(Y)# Codificación ordinal de etiquetas
print("Clases:", le.classes_)#  Mostrar las clases originales
print("Clases codificadas:", le.transform(le.classes_))

# 1. PRIMERA PARTICION: 80% entrenamiento+validación, 20% test
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y_encoded, test_size=0.2, stratify=Y_encoded, random_state=42
)

# 2. SEGUNDA PARTICION: Del 80% anterior, 90% entrenamiento, 10% validación
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train, Y_train, test_size=0.1, stratify=Y_train, random_state=42
)

print(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")


Clases: ['aggressive' 'nonaggressive']
Clases codificadas: [0 1]
Train: 3694 | Val: 411 | Test: 1027


In [None]:


# ============================================================================
# TRANSFORMACIÓN TF-IDF (REPRESENTACIÓN NUMÉRICA DE TEXTOS)
# ============================================================================

# Crear vectorizador TF-IDF para convertir textos a vectores numéricos
vec_tfidf = TfidfVectorizer(
    analyzer="word", 
    preprocessor=mi_preprocesamiento,  # Nuestra función de preprocesamiento
    tokenizer=mi_tokenizador,          # Nuestro tokenizador sin stopwords
    ngram_range=(1,1)                  # Usar solo unigramas (palabras individuales)
)

# Transformar textos a matriz TF-IDF (entrenar con train, aplicar a val y test)
X_train_tfidf = vec_tfidf.fit_transform(X_train).toarray().astype(np.float32)# Aqui se usa fit_transform porque es el conjunto de entrenamiento
X_val_tfidf = vec_tfidf.transform(X_val).toarray().astype(np.float32)#Aqui se usa transform porque es el conjunto de validación
X_test_tfidf = vec_tfidf.transform(X_test).toarray().astype(np.float32)#Aqui se usa transform porque es el conjunto de prueba

# Convertir arrays numpy a tensores PyTorch
X_train_tensor = torch.FloatTensor(X_train_tfidf)#
X_val_tensor = torch.FloatTensor(X_val_tfidf)
X_test_tensor = torch.FloatTensor(X_test_tfidf)
Y_train_tensor = torch.LongTensor(Y_train)
Y_val_tensor = torch.LongTensor(Y_val)
Y_test_tensor = torch.LongTensor(Y_test)

# ============================================================================
# DEFINICIÓN DEL MODELO MLP
# ============================================================================

class MLP(nn.Module):
    """
    Red Neuronal Multicapa con 4 capas ocultas y activaciones ELU
    """
    def __init__(self, input_size, output_size=2):
        super().__init__()
        # Capas completamente conectadas con arquitectura descendente
        self.fc1 = nn.Linear(input_size, 256)  # Capa entrada -> 256 neuronas
        self.act1 = nn.ELU()                   # Activación ELU
        self.fc2 = nn.Linear(256, 128)         # 256 -> 128 neuronas
        self.act2 = nn.ELU()
        self.fc3 = nn.Linear(128, 64)          # 128 -> 64 neuronas
        self.act3 = nn.ELU()
        self.fc4 = nn.Linear(64, 32)           # 64 -> 32 neuronas
        self.act4 = nn.ELU()
        self.output = nn.Linear(32, output_size)  # Capa salida (2 clases)
        
        # Inicialización de pesos
        self._initialize_weights()
    
    def _initialize_weights(self):
        """Inicializa los pesos de todas las capas con Xavier Uniform"""
        layers = [self.fc1, self.fc2, self.fc3, self.fc4, self.output]
        for layer in layers:
            nn.init.xavier_uniform_(layer.weight)  # Inicialización Xavier
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)         # Bias en cero
    
    def forward(self, X):
        """Propagación hacia adelante a través de la red"""
        x = self.act1(self.fc1(X))  # Capa 1: Linear + ELU
        x = self.act2(self.fc2(x))  # Capa 2: Linear + ELU
        x = self.act3(self.fc3(x))  # Capa 3: Linear + ELU
        x = self.act4(self.fc4(x))  # Capa 4: Linear + ELU
        x = self.output(x)          # Capa salida (sin activación - CrossEntropy lo maneja)
        return x

# ============================================================================
# FUNCIÓN DE ENTRENAMIENTO
# ============================================================================

def entrenar_modelo(modelo, X_train, y_train, X_val, y_val, epochs=100, lr=0.001):
    """
    Entrena el modelo y evalúa en conjunto de validación
    """
    # Definir función de pérdida y optimizador
    criterion = nn.CrossEntropyLoss()  # Pérdida para clasificación
    optimizer = optim.Adam(modelo.parameters(), lr=lr)  # Optimizador Adam
    
    # Ciclo de entrenamiento
    for epoch in range(epochs):
        modelo.train()  # Modo entrenamiento
        optimizer.zero_grad()  # Limpiar gradientes anteriores
        
        # Forward pass
        outputs = modelo(X_train)
        loss = criterion(outputs, y_train)
        
        # Backward pass
        loss.backward()
        optimizer.step()
    
    # Validación del modelo
    modelo.eval()  # Modo evaluación
    with torch.no_grad():  # Desactivar cálculo de gradientes para evaluación
        val_outputs = modelo(X_val)
        _, val_preds = torch.max(val_outputs, 1)  # Obtener predicciones (índice de clase)
        val_acc = accuracy_score(y_val.numpy(), val_preds.numpy())  # Calcular accuracy
    
    return modelo, val_acc

# ============================================================================
# VALIDACIÓN CRUZADA K-FOLD
# ============================================================================

def validacion_cruzada_kfold(X, y, k_folds=5, epochs=50):
    """
    Realiza validación cruzada con k folds para evaluar robustez del modelo
    """
    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    fold_accuracies = []  # Almacenar accuracy de cada fold
    
    # Iterar sobre cada fold
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Fold {fold+1}/{k_folds}")
        
        # Preparar datos del fold actual
        X_train_fold = torch.FloatTensor(X[train_idx])
        X_val_fold = torch.FloatTensor(X[val_idx])
        y_train_fold = torch.LongTensor(y[train_idx])
        y_val_fold = torch.LongTensor(y[val_idx])
        
        # Crear y entrenar modelo para este fold
        modelo = MLP(input_size=X_train_fold.shape[1])
        modelo, val_acc = entrenar_modelo(modelo, X_train_fold, y_train_fold, 
                                        X_val_fold, y_val_fold, epochs)
        
        fold_accuracies.append(val_acc)
        print(f"Accuracy Fold {fold+1}: {val_acc:.4f}")
    
    # Estadísticas de la validación cruzada
    print(f"\nK-Fold Accuracy: {np.mean(fold_accuracies):.4f} (+/- {np.std(fold_accuracies):.4f})")
    return fold_accuracies

# ============================================================================
# EVALUACIÓN COMPLETA DEL MODELO
# ============================================================================

print("\n=== ENTRENAMIENTO CON PARTICIÓN 80-20-10 ===")
# Crear y entrenar modelo final con la partición principal
modelo_final = MLP(input_size=X_train_tensor.shape[1])
modelo_entrenado, val_acc = entrenar_modelo(
    modelo_final, X_train_tensor, Y_train_tensor, X_val_tensor, Y_val_tensor
)

# Evaluar modelo en conjunto de test
modelo_entrenado.eval()
with torch.no_grad():
    test_outputs = modelo_entrenado(X_test_tensor)
    _, test_preds = torch.max(test_outputs, 1)
    test_acc = accuracy_score(Y_test_tensor.numpy(), test_preds.numpy())

print(f"Accuracy Validación: {val_acc:.4f}")
print(f"Accuracy Test: {test_acc:.4f}")

print("\n=== VALIDACIÓN CRUZADA K-FOLD (k=5) ===")
# Preparar datos completos para K-Fold (usando todo el dataset)
X_completo_tfidf = vec_tfidf.transform(X).toarray().astype(np.float32)
kfold_accuracies = validacion_cruzada_kfold(X_completo_tfidf, Y_encoded)

# ============================================================================
# RESULTADOS FINALES
# ============================================================================

print("\n=== RESULTADOS FINALES ===")
print(f"Test Accuracy: {test_acc:.4f}")
print(f"K-Fold Mean Accuracy: {np.mean(kfold_accuracies):.4f}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report, confusion_matrix
import pandas as pd
from nltk.corpus import stopwords
from nltk import word_tokenize
from nltk.stem import SnowballStemmer
import matplotlib.pyplot as plt
import seaborn as sns

# Configuración de semilla para reproducibilidad
random_state = 42
torch.manual_seed(random_state)
np.random.seed(random_state)

# Cargar los datos
dataset = pd.read_json("./data_aggressiveness_es.json", lines=True)
print("Total de ejemplos de entrenamiento")
print(dataset.klass.value_counts())

X = dataset['text'].to_numpy()
Y = dataset['klass'].to_numpy()

# Configuración para preprocesamiento en español
_STOPWORDS = stopwords.words("spanish")
stemmer = SnowballStemmer('spanish')

# Normalización del texto
import unicodedata
import re

PUNCTUACTION = ";:,.\\-\"'/"
SYMBOLS = "()[]¿?¡!{}~<>|"
NUMBERS = "0123456789"
SKIP_SYMBOLS = set(PUNCTUACTION + SYMBOLS)
SKIP_SYMBOLS_AND_SPACES = set(PUNCTUACTION + SYMBOLS + '\t\n\r ')

def normaliza_texto(input_str, punct=False, accents=False, num=False, max_dup=2):
    nfkd_f = unicodedata.normalize('NFKD', input_str)
    n_str = []
    c_prev = ''
    cc_prev = 0
    for c in nfkd_f:
        if not num:
            if c in NUMBERS:
                continue
        if not punct:
            if c in SKIP_SYMBOLS:
                continue
        if not accents and unicodedata.combining(c):
            continue
        if c_prev == c:
            cc_prev += 1
            if cc_prev >= max_dup:
                continue
        else:
            cc_prev = 0
        n_str.append(c)
        c_prev = c
    texto = unicodedata.normalize('NFKD', "".join(n_str))
    texto = re.sub(r'(\s)+', r' ', texto.strip(), flags=re.IGNORECASE)
    return texto

# Preprocesamiento personalizado con stemming
def mi_preprocesamiento(texto):
    tokens = word_tokenize(texto.lower())
    texto = " ".join(tokens)
    texto = normaliza_texto(texto)
    return texto

# Tokenizador personalizado con stemming y eliminación de stopwords
def mi_tokenizador(texto):
    tokens = texto.split()
    # Filtrar stopwords y aplicar stemming
    tokens_filtrados = []
    for t in tokens:
        if t not in _STOPWORDS:
            token_stemmed = stemmer.stem(t)
            tokens_filtrados.append(token_stemmed)
    return tokens_filtrados

# Codificar las etiquetas
le = LabelEncoder()
Y_encoded = le.fit_transform(Y)
print("Clases:")
print(le.classes_)
print("Clases codificadas:")
print(le.transform(le.classes_))

# Dividir el conjunto de datos (80% train, 20% test)
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y_encoded, test_size=0.2, stratify=Y_encoded, random_state=random_state
)

# Dividir el conjunto de entrenamiento en train (90%) y validación (10%)
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train, Y_train, test_size=0.1, stratify=Y_train, random_state=random_state
)

# Crear la matriz Documento-Término con TF-IDF (unigramas)
vec_tfidf = TfidfVectorizer(
    analyzer="word", 
    preprocessor=mi_preprocesamiento, 
    tokenizer=mi_tokenizador,  
    ngram_range=(1, 1)  # Unigramas
)
X_train_tfidf = vec_tfidf.fit_transform(X_train)

# Aplicar normalización a los datos TF-IDF
scaler = StandardScaler(with_mean=False)  # with_mean=False para matrices sparse
X_train_tfidf = scaler.fit_transform(X_train_tfidf)

NUM_CLASSES = 2

# Codificación one-hot
Y_train_one_hot = nn.functional.one_hot(torch.from_numpy(Y_train), num_classes=NUM_CLASSES).float()
Y_test_one_hot = nn.functional.one_hot(torch.from_numpy(Y_test), num_classes=NUM_CLASSES).float()
Y_val_one_hot = nn.functional.one_hot(torch.from_numpy(Y_val), num_classes=NUM_CLASSES).float()

# Convertir a matrices densas
X_train_tfidf = X_train_tfidf.toarray().astype(np.float32)
X_val_tfidf = vec_tfidf.transform(X_val)
X_val_tfidf = scaler.transform(X_val_tfidf)
X_val_tfidf = X_val_tfidf.toarray().astype(np.float32)

# Definir la nueva arquitectura MLP con 4 capas ocultas y ELU
class MLP(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        # 4 capas ocultas con diferentes tamaños
        self.fc1 = nn.Linear(input_size, 512)
        self.act1 = nn.ELU()
        
        self.fc2 = nn.Linear(512, 256)
        self.act2 = nn.ELU()
        
        self.fc3 = nn.Linear(256, 128)
        self.act3 = nn.ELU()
        
        self.fc4 = nn.Linear(128, 64)
        self.act4 = nn.ELU()
        
        self.output = nn.Linear(64, output_size)
        
        # Inicialización de pesos
        self._initialize_weights()
    
    def _initialize_weights(self):
        for layer in [self.fc1, self.fc2, self.fc3, self.fc4, self.output]:
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)
    
    def forward(self, X):
        x = self.act1(self.fc1(X))
        x = self.act2(self.fc2(x))
        x = self.act3(self.fc3(x))
        x = self.act4(self.fc4(x))
        x = self.output(x)
        return x

# Función para crear minibatches
def create_minibatches(X, Y, batch_size):
    dataset = TensorDataset(X, Y)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    return loader

# Función para entrenar y evaluar el modelo
def train_and_evaluate_model(X_train_t, Y_train_t, X_val_t, Y_val, input_size, output_size=2):
    # Parámetros de entrenamiento
    epochs = 100
    learning_rate = 0.001
    batch_size = 128
    
    # Crear modelo
    model = MLP(input_size, output_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Listas para almacenar métricas
    train_losses = []
    val_metrics = []
    
    for epoch in range(epochs):
        model.train()
        loss_total = 0
        dataloader = create_minibatches(X_train_t, Y_train_t, batch_size=batch_size)
        
        for X_tr, y_tr in dataloader:
            optimizer.zero_grad()
            y_pred = model(X_tr)
            loss = criterion(y_pred, y_tr)
            loss_total += loss.item()
            loss.backward()
            optimizer.step()
        
        avg_loss = loss_total / len(dataloader)
        train_losses.append(avg_loss)
        
        # Evaluación en validación
        model.eval()
        with torch.no_grad():
            y_pred_val = model(X_val_t)
            y_pred_val = torch.softmax(y_pred_val, dim=1)
            y_pred_classes = torch.argmax(y_pred_val, dim=1)
            
            precision = precision_score(Y_val, y_pred_classes, average='macro')
            recall = recall_score(Y_val, y_pred_classes, average='macro')
            f1 = f1_score(Y_val, y_pred_classes, average='macro')
            acc = accuracy_score(Y_val, y_pred_classes)
            
            val_metrics.append({
                'epoch': epoch + 1,
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'accuracy': acc
            })
        
        if (epoch + 1) % 10 == 0:
            print(f"Época {epoch+1}/{epochs}, Pérdida: {avg_loss:.4f}")
            print(f"  P={precision:.4f}, R={recall:.4f}, F1={f1:.4f}, Acc={acc:.4f}")
    
    return model, train_losses, val_metrics

# Entrenamiento inicial con división train/val
print("=== ENTRENAMIENTO INICIAL ===")
input_size = X_train_tfidf.shape[1]
X_train_t = torch.from_numpy(X_train_tfidf)
X_val_t = torch.from_numpy(X_val_tfidf)

model, train_losses, val_metrics = train_and_evaluate_model(
    X_train_t, Y_train_one_hot, X_val_t, Y_val, input_size
)

# Validación cruzada k-folds (k=5)
print("\n=== VALIDACIÓN CRUZADA (5-FOLDS) ===")
kf = KFold(n_splits=5, shuffle=True, random_state=random_state)
fold_metrics = []

# Combinar train y validation para k-fold
X_train_full = np.concatenate([X_train, X_val])
Y_train_full = np.concatenate([Y_train, Y_val])

for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_full)):
    print(f"\n--- Fold {fold + 1}/5 ---")
    
    # Preprocesamiento para este fold
    X_fold_train, X_fold_val = X_train_full[train_idx], X_train_full[val_idx]
    Y_fold_train, Y_fold_val = Y_train_full[train_idx], Y_train_full[val_idx]
    
    # TF-IDF para el fold
    X_fold_train_tfidf = vec_tfidf.transform(X_fold_train)
    X_fold_train_tfidf = scaler.transform(X_fold_train_tfidf)
    X_fold_train_tfidf = X_fold_train_tfidf.toarray().astype(np.float32)
    
    X_fold_val_tfidf = vec_tfidf.transform(X_fold_val)
    X_fold_val_tfidf = scaler.transform(X_fold_val_tfidf)
    X_fold_val_tfidf = X_fold_val_tfidf.toarray().astype(np.float32)
    
    # Codificación one-hot
    Y_fold_train_one_hot = nn.functional.one_hot(torch.from_numpy(Y_fold_train), num_classes=NUM_CLASSES).float()
    
    # Entrenar modelo para este fold
    X_fold_train_t = torch.from_numpy(X_fold_train_tfidf)
    X_fold_val_t = torch.from_numpy(X_fold_val_tfidf)
    
    fold_model, _, fold_val_metrics = train_and_evaluate_model(
        X_fold_train_t, Y_fold_train_one_hot, X_fold_val_t, Y_fold_val, input_size
    )
    
    # Guardar métricas del último epoch
    best_metrics = fold_val_metrics[-1]
    fold_metrics.append(best_metrics)
    print(f"Fold {fold + 1} - P: {best_metrics['precision']:.4f}, R: {best_metrics['recall']:.4f}, F1: {best_metrics['f1']:.4f}")

# Métricas promedio de validación cruzada
avg_precision = np.mean([m['precision'] for m in fold_metrics])
avg_recall = np.mean([m['recall'] for m in fold_metrics])
avg_f1 = np.mean([m['f1'] for m in fold_metrics])
avg_accuracy = np.mean([m['accuracy'] for m in fold_metrics])

print(f"\n=== RESULTADOS VALIDACIÓN CRUZADA ===")
print(f"Precisión promedio: {avg_precision:.4f}")
print(f"Recall promedio: {avg_recall:.4f}")
print(f"F1-score promedio: {avg_f1:.4f}")
print(f"Accuracy promedio: {avg_accuracy:.4f}")

# Evaluación final en conjunto de test
print("\n=== EVALUACIÓN EN CONJUNTO DE TEST ===")
X_test_tfidf = vec_tfidf.transform(X_test)
X_test_tfidf = scaler.transform(X_test_tfidf)
X_test_tfidf = X_test_tfidf.toarray().astype(np.float32)
X_test_t = torch.from_numpy(X_test_tfidf)

model.eval()
with torch.no_grad():
    y_pred_test = model(X_test_t)
    y_pred_test_classes = torch.argmax(y_pred_test, dim=1)

print("Matriz de confusión:")
print(confusion_matrix(Y_test, y_pred_test_classes))
print("\nReporte de clasificación:")
print(classification_report(Y_test, y_pred_test_classes, digits=4))

# Prueba con nuevos datos
print("\n=== PREDICCIÓN CON NUEVOS DATOS ===")
x_new_data = [
    "Ese perro me robo mis cosas", 
    "ese hdp se llevo el dinero", 
    "mi app de calendario no sirve",
    "gracias por tu ayuda",
    "eres un idiota completo"
]
x_new_data_tfidf = vec_tfidf.transform(x_new_data)
x_new_data_tfidf = scaler.transform(x_new_data_tfidf)
x_new_data_tfidf = x_new_data_tfidf.toarray().astype(np.float32)
X_new_t = torch.from_numpy(x_new_data_tfidf)

model.eval()
with torch.no_grad():
    y_pred_new = model(X_new_t)
    y_pred_new_classes = torch.argmax(y_pred_new, dim=1)
    print("Predicciones para nuevos datos:")
    for i, text in enumerate(x_new_data):
        print(f"  '{text}' -> {le.inverse_transform([y_pred_new_classes[i]])[0]}")