# Entrenamiento K-Folds con parada temprana

Autores:
- Pablo Quito
- Juan Valdiviezo 

In [None]:
!pip install transformers

In [None]:
from transformers import BertModel, AdamW, get_linear_schedule_with_warmup
import torch
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torch.optim import AdamW
import pandas as pd
from textwrap import wrap
from transformers import AutoTokenizer
from transformers import RobertaModel, AutoModel
import torch.nn as nn
from google.colab import drive
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc
import numpy as np

In [None]:
# Notebook-wide configuration
# Location of the training spreadsheet inside the mounted Google Drive.
ROUTE = '/content/drive/My Drive/Intelektubies/Datos/Entrenamiento V5'
FILE_NAME = 'df_entrenamiento_v5.xlsx'
DATASET_PATH = f"{ROUTE}/{FILE_NAME}"

RANDOM_SEED = 25
MAX_LEN = 130  # token length passed to the tokenizer as max_length
BATCH_SIZE = 16  # previously 8
N_SPLITS = 5  # K-Folds
NCLASES = 4  # Positive, Negative, Neutral, Alert

# Seed NumPy and PyTorch with the same value.
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f"Fuente de datos: {DATASET_PATH}")
print(f"Dispositivo: {device}")
drive.mount('/content/drive')

In [None]:
# Load the dataset from the Excel file at DATASET_PATH
df = pd.read_excel(DATASET_PATH)
print(df.shape)
# Print the comment stored under row label 666, wrapped with textwrap, as a quick look at the data
print("\n".join(wrap(df['comentario'][666])))
df.head()

In [None]:
# Map the sentiment labels to the integer class ids used by the model:
# 'Negativa' -> 0, 'Neutral' -> 1, 'Positiva' -> 2, 'Alerta' -> 3.
# NOTE(review): any label outside this dictionary becomes NaN in .map(), so the
# integer cast would then fail — presumably the column only holds these four values.
df['sentimiento'] = df['sentimiento'].map({'Positiva': 2, 'Negativa': 0, 'Neutral': 1,'Alerta': 3}).astype(int)
df

In [None]:
# TOKENIZATION
# Checkpoint name shared by the tokenizer and the model defined further down.
PRE_TRAINED_MODEL = 'pysentimiento/robertuito-base-uncased-emotion'
tokenizer = AutoTokenizer.from_pretrained(PRE_TRAINED_MODEL)  # AutoTokenizer picks the tokenizer class that matches the checkpoint

In [None]:
# Tokenization example: split a short text into tokens and look up their ids
sample_txt = 'hola mundo'
tokens = tokenizer.tokenize(sample_txt)
tokens_id = tokenizer.convert_tokens_to_ids(tokens)
print(tokens)
tokens_id

In [None]:
# Encoding example: turn the sample text into padded PyTorch tensors of length 10
encoding = tokenizer.encode_plus(
    sample_txt,
    max_length = 10,
    add_special_tokens = True, # add the tokenizer's special tokens
    return_token_type_ids = False,
    padding='max_length',
    return_attention_mask = True,
    return_tensors = 'pt'
)

encoding.keys()

In [None]:
# Inspect the encoded sample: tokens (including padding), their ids, and the attention mask
print(tokenizer.convert_ids_to_tokens(encoding['input_ids'][0]))
print(encoding['input_ids'][0])
print(encoding['attention_mask'][0])

In [None]:
# Dataset definition
class IMDBDataset(Dataset):
    """Dataset that tokenizes one comment per item.

    Args:
        comentarios: indexable collection of comment texts.
        sentimiento: indexable collection of integer class labels, aligned
            position by position with ``comentarios``.
        tokenizer: object exposing ``encode_plus`` (a Hugging Face tokenizer
            in this notebook).
        max_len: length every encoded comment is padded/truncated to.
    """

    def __init__(self, comentarios, sentimiento, tokenizer, max_len):
        self.comentarios = comentarios
        self.sentimiento = sentimiento
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of comments."""
        return len(self.comentarios)

    def __getitem__(self, item):
        """Return the raw text, token ids, attention mask and label of one comment."""
        comentario = str(self.comentarios[item])
        label = self.sentimiento[item]
        # Use the tokenizer handed to this dataset; relying on a module-level
        # `tokenizer` would silently ignore the constructor argument.
        encoding = self.tokenizer.encode_plus(
            comentario,
            max_length=self.max_len,
            add_special_tokens=True,  # add the tokenizer's special tokens
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )
        return {
            'review': comentario,
            # encode_plus returns a leading batch dimension of 1; flatten drops it
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long),
        }

In [None]:
# Data Loader

def data_loader(df, tokenizer, max_len, batch_size):
    """Build a class-balanced DataLoader over the rows of ``df``.

    Args:
        df: DataFrame with a ``comentario`` text column and a ``sentimiento``
            label column.
        tokenizer: tokenizer forwarded to ``IMDBDataset``.
        max_len: padded/truncated token length of every sample.
        batch_size: number of samples per batch.

    Returns:
        A ``DataLoader`` that draws ``len(df)`` samples per epoch, with
        replacement, each row weighted by the inverse frequency of its class.
    """
    dataset = IMDBDataset(
        comentarios=df.comentario.to_numpy(),
        sentimiento=df.sentimiento.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,  # honour the argument instead of the global MAX_LEN
    )

    # Weighted sampling for imbalanced classes: each row gets 1 / (rows in its class).
    # The count is looked up by label value, so the weights stay correct even
    # when some class id does not occur in this particular split.
    label_counts = df['sentimiento'].value_counts()
    sample_weights = (1.0 / df['sentimiento'].map(label_counts)).to_numpy()

    sampler = WeightedRandomSampler(
        sample_weights,
        num_samples=len(sample_weights),
        replacement=True,
    )
    return DataLoader(dataset, batch_size=batch_size, num_workers=2, sampler=sampler)

In [None]:
from sklearn.model_selection import train_test_split

# Hold-out split. Stratifying on the label keeps the class proportions equal
# in both parts, which matters because the classes are imbalanced.
df_train, df_test = train_test_split(
    df,
    test_size=0.2,
    random_state=RANDOM_SEED,
    stratify=df['sentimiento'],
)

# Training batches are class-balanced by the weighted sampler inside data_loader.
train_data_loader = data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)

# Evaluation must visit every held-out row exactly once, in its real class
# proportions. Going through data_loader would resample the test set with
# replacement and rebalance it, making the reported metrics random and biased.
test_data_loader = DataLoader(
    IMDBDataset(
        comentarios=df_test.comentario.to_numpy(),
        sentimiento=df_test.sentimiento.to_numpy(),
        tokenizer=tokenizer,
        max_len=MAX_LEN,
    ),
    batch_size=BATCH_SIZE,
    num_workers=2,
)

In [None]:
# Early stopping on val_loss (not used in the final version)
class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    Each improvement also writes the model's ``state_dict`` to a per-fold file.
    """

    def __init__(self, patience, min_delta, path="best_model.pth"):
        """
        Args:
            patience (int): number of epochs without improvement tolerated
                before training is stopped.
            min_delta (float): minimum decrease in ``val_loss`` that counts
                as an improvement.
            path (str): stored on the instance; ``__call__`` does not read it
                and writes to a fixed per-fold location instead.
        """
        self.path = path
        self.min_delta = min_delta
        self.patience = patience
        self.counter = 0
        self.best_loss = float('inf')

    def __call__(self, val_loss, model, fold):
        """Record one epoch's ``val_loss``; return True when training should stop."""
        improved = val_loss < self.best_loss - self.min_delta
        if not improved:
            self.counter += 1
            print(f"⚠️ No hay mejora en {self.counter}/{self.patience} épocas.")
        else:
            self.best_loss = val_loss
            self.counter = 0
            best_model_path = f"/content/drive/My Drive/Intelektubies/Modelos/RoBERTuito_folds/V5/model_fold_{fold}.pth"
            # Keep the weights of the best model seen so far
            torch.save(model.state_dict(), best_model_path)
            print(f"🔹 Mejor modelo guardado en {best_model_path} (Val Loss: {val_loss:.5f})")

        # Patience not exhausted yet: keep training
        if not (self.counter >= self.patience):
            return False

        print("⏹️ Early Stopping activado. Deteniendo el entrenamiento.")
        return True

In [None]:
class EarlyStoppingF1:
    """Signal when the macro F1 score has stopped improving.

    Each improvement also writes the model's ``state_dict`` to a per-fold file.

    Args:
        patience (int): number of epochs without improvement tolerated
            before training is stopped.
        min_delta (float): minimum increase in macro F1 that counts as an
            improvement.
        path (str): stored on the instance; ``__call__`` does not read it and
            writes to a fixed per-fold location instead.
    """

    def __init__(self, patience=5, min_delta=0.001, path="best_model.pth"):
        self.path = path
        self.min_delta = min_delta
        self.patience = patience
        self.counter = 0
        self.best_f1 = 0.0

    def __call__(self, current_f1, model, fold):
        """Record one epoch's macro F1; return True when training should stop."""
        # Only an increase larger than min_delta counts as progress
        improved = current_f1 > self.best_f1 + self.min_delta
        if not improved:
            self.counter += 1
            print(f"⚠️ No hay mejora en {self.counter}/{self.patience} épocas (Macro F1: {current_f1:.4f}).")
        else:
            self.best_f1 = current_f1
            self.counter = 0
            best_model_path = f"/content/drive/My Drive/Intelektubies/Modelos/RoBERTuito_folds/Early/model_fold_{fold}.pth"
            torch.save(model.state_dict(), best_model_path)
            print(f"🔹 Mejor modelo guardado en {best_model_path} (Macro F1: {current_f1:.4f})")

        # Patience not exhausted yet: keep training
        if not (self.counter >= self.patience):
            return False

        print("⏹️ Early Stopping activado. Deteniendo el entrenamiento.")
        return True

In [None]:
#MODELO
from transformers import RobertaModel, AutoModel
import torch.nn as nn
class RoBERTtuitoSentimentClassifier(nn.Module):
    """Encoder loaded from PRE_TRAINED_MODEL followed by dropout and a linear head.

    Args:
        n_classes: size of the output layer (number of sentiment classes).
    """

    def __init__(self, n_classes):
        super().__init__()
        # The encoder is loaded without its pooling layer; forward() reads the
        # hidden states directly.
        self.roberta = AutoModel.from_pretrained(PRE_TRAINED_MODEL, add_pooling_layer=False)
        self.drop = nn.Dropout(p=0.3)
        self.linear = nn.Linear(self.roberta.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the class logits, one row per input sequence."""
        encoded = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        # Hidden state of the first token stands in for the whole sequence:
        # [batch_size, hidden_size]
        first_token_state = encoded['last_hidden_state'][:, 0, :]
        return self.linear(self.drop(first_token_state))

In [None]:
# Build the classifier with NCLASES outputs and move it to the selected device
model = RoBERTtuitoSentimentClassifier(NCLASES)
model = model.to(device=device)

In [None]:
# Training parameters
EPOCHS = 1000  # upper bound; early stopping can end training sooner
PATIENCE = 7
MIN_DELTA = 0.01

optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.001)

# Linear learning-rate decay without warm-up, spread over the maximum number
# of optimizer steps (batches per epoch times EPOCHS).
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

# Cross-entropy weighted by the inverse frequency of each class, with label smoothing.
class_counts = df['sentimiento'].value_counts().sort_index().values
class_weights = 1.0 / torch.tensor(class_counts, dtype=torch.float)
loss_fn = nn.CrossEntropyLoss(
    weight=class_weights.to(device),
    label_smoothing=0.1,
).to(device)

# The val_loss based EarlyStopping is not used in the final version because it
# did not perform well on validation; stopping is driven by macro F1 instead.
early_stoppingf1 = EarlyStoppingF1(patience=PATIENCE, min_delta=MIN_DELTA)



In [None]:
# Training step definition
def train_model(model,data_loader,loss_fn,optimazer,device,scheduler,n_examples):
    """Run one training epoch over ``data_loader``.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` tensors.
        loss_fn: callable taking ``(outputs, labels)``.
        optimazer: optimizer stepped once per batch.
        device: device the batch tensors are moved to.
        scheduler: learning-rate scheduler stepped once per batch.
        n_examples: denominator used for the accuracy.

    Returns:
        Tuple ``(accuracy, mean_loss)``: correct predictions divided by
        ``n_examples`` (double tensor) and the mean of the per-batch losses.
    """
    model = model.train()
    batch_losses = []
    n_correct = 0
    for batch in data_loader:
        ids = batch['input_ids'].to(device)
        mask = batch['attention_mask'].to(device)
        targets = batch['labels'].to(device)

        logits = model(input_ids=ids, attention_mask=mask)
        predicted = logits.max(dim=1).indices
        batch_loss = loss_fn(logits, targets)

        n_correct += (predicted == targets).sum()
        batch_losses.append(batch_loss.item())

        batch_loss.backward()
        # Clip the global gradient norm to 1.0 before the update
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimazer.step()
        scheduler.step()
        optimazer.zero_grad()  # clear gradients for the next batch

    accuracy = n_correct.double() / n_examples
    return accuracy, np.mean(batch_losses)

def eval_model(model,data_loader,loss_fn,device,n_examples):
    """Evaluate ``model`` over ``data_loader`` without computing gradients.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` tensors.
        loss_fn: callable taking ``(outputs, labels)``.
        device: device the batch tensors are moved to.
        n_examples: denominator used for the accuracy.

    Returns:
        Tuple ``(accuracy, mean_loss)``: correct predictions divided by
        ``n_examples`` (double tensor) and the mean of the per-batch losses.
    """
    model = model.eval()
    batch_losses = []
    n_correct = 0
    # Gradients are disabled, so no parameter is modified here
    with torch.no_grad():
        for batch in data_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            logits = model(input_ids=ids, attention_mask=mask)
            predicted = logits.max(dim=1).indices

            batch_losses.append(loss_fn(logits, targets).item())
            n_correct += (predicted == targets).sum()

    accuracy = n_correct.double() / n_examples
    return accuracy, np.mean(batch_losses)

In [None]:

# Display names of the classes, in class-id order (0..3)
target_names = ['class Negativo', 'class Neutral', 'class Positivo', 'class Alerta']
def eval_model_with_metrics(model, data_loader, loss_fn, device, n_examples, plot_confusion=False):
    """Evaluate ``model`` and print a classification report and confusion matrix.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` tensors.
        loss_fn: callable taking ``(outputs, labels)``.
        device: device the batch tensors are moved to.
        n_examples: denominator used for the accuracy.
        plot_confusion: when True, also draw the confusion matrix with matplotlib.

    Returns:
        Tuple ``(accuracy, avg_loss, macro_f1, all_labels, all_preds)``.
    """
    model = model.eval()
    losses = []
    correct_preds = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():  # gradients disabled: no parameter is modified
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, labels)
            correct_preds += torch.sum(preds == labels)
            losses.append(loss.item())

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    accuracy = correct_preds.double() / n_examples
    avg_loss = np.mean(losses)
    macro_f1 = f1_score(all_labels, all_preds, average='macro')

    # Pin the label set to the ids behind target_names. Without it,
    # classification_report raises when a class is missing from both the labels
    # and the predictions, and the confusion matrix silently changes shape.
    class_ids = list(range(len(target_names)))
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)

    # Evaluation metrics computed with scikit-learn
    print(
        "Classification Report:\n",
        classification_report(all_labels, all_preds, labels=class_ids, target_names=target_names),
    )
    print("Confusion Matrix:\n", cm)
    print("Accuracy:", accuracy.item())
    print(f"Macro F1 Score: {macro_f1:.4f}")

    if plot_confusion:
        # Draw the confusion matrix only when requested
        disp = ConfusionMatrixDisplay(confusion_matrix=cm)
        disp.plot(cmap=plt.cm.Blues)
        plt.title("Confusion Matrix")
        plt.show()

    return accuracy, avg_loss, macro_f1, all_labels, all_preds



In [None]:


def eval_model_with_auc(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` and print ROC-AUC and precision-recall AUC scores.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` tensors.
        loss_fn: callable taking ``(outputs, labels)``.
        device: device the batch tensors are moved to.
        n_examples: accepted for signature parity with the other evaluation
            helpers; not used here.

    Returns:
        The mean of the per-batch loss values. The AUC scores are only printed.
    """
    model = model.eval()
    losses = []
    all_labels = []
    all_preds_probs = []

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)

            # The model outputs logits; softmax turns them into class probabilities
            probs = torch.nn.functional.softmax(outputs, dim=1)
            loss = loss_fn(outputs, labels)
            losses.append(loss.item())

            all_labels.extend(labels.cpu().numpy())
            all_preds_probs.extend(probs.cpu().numpy().tolist())

    avg_loss = np.mean(losses)

    # Convert the collected values to NumPy arrays
    all_preds_probs = np.array(all_preds_probs)
    all_labels = np.array(all_labels)
    n_classes = all_preds_probs.shape[1]

    # Multiclass ROC-AUC averaged over class pairs (One-vs-One). Passing the
    # label set explicitly ties each probability column to its class id, so the
    # score is still computable when a class does not occur in all_labels.
    roc_auc = roc_auc_score(
        all_labels,
        all_preds_probs,
        multi_class='ovo',
        labels=np.arange(n_classes),
    )

    # Precision-recall AUC of every class (one-vs-rest), averaged afterwards
    pr_auc_list = []

    for i in range(n_classes):
        # Binary ground truth for the current class
        binarized_labels = (all_labels == i).astype(int)
        precision, recall, _ = precision_recall_curve(binarized_labels, all_preds_probs[:, i])
        pr_auc = auc(recall, precision)
        pr_auc_list.append(pr_auc)

    avg_pr_auc = np.mean(pr_auc_list)

    print(f'ROC AUC Score: {roc_auc}')
    print(f'Average Precision-Recall AUC: {avg_pr_auc}')

    return avg_loss



In [None]:
# Per-epoch metric history
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []
# Train for at most EPOCHS epochs; the loop ends earlier when early stopping fires.
for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)
    # Number of batches in the training loader
    print(len(train_data_loader))
    train_acc, train_loss = train_model(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(df_train)
    )

    # NOTE(review): the "val" metrics below are computed on test_data_loader,
    # i.e. the same hold-out split drives early stopping and reporting.
    val_acc, val_loss, macro_f1, test_labels, test_preds = eval_model_with_metrics(
        model,
        test_data_loader,
        loss_fn,
        device,
        len(df_test),
        plot_confusion=False  # do not plot during the epochs
    )
    # Store the metrics
    train_losses.append(train_loss)
    test_losses.append(val_loss)
    train_accuracies.append(train_acc.item())  # tensor -> Python float
    test_accuracies.append(val_acc.item())  # tensor -> Python float

    print(f"📉 Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"📈 Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
    print()
    # Check whether training has to stop. The third argument is the fold number
    # used in the checkpoint file name; it is fixed to 5 here.
    if early_stoppingf1(macro_f1, model, 5):
      break  # stop training when macro F1 no longer improves



In [None]:
def classifySentiment(review_text):
    """Predict the sentiment of one text with the notebook-level ``model``.

    Args:
        review_text: the comment to classify.

    Returns:
        The predicted label: 'Negativo', 'Neutral', 'Positivo' or 'Alerta'.
        The text and the prediction are also printed.
    """
    # Encode exactly like the training data (same MAX_LEN, truncation on).
    # A max_length of 10 without truncation neither matched training nor
    # limited long comments, which were passed to the model at full length.
    encoding_review = tokenizer.encode_plus(
        review_text,
        max_length=MAX_LEN,
        add_special_tokens=True,  # add the tokenizer's special tokens
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',  # return PyTorch tensors
    )

    input_ids = encoding_review['input_ids'].to(device)
    attention_mask = encoding_review['attention_mask'].to(device)

    # Inference only: disable dropout and gradient tracking so the prediction
    # is deterministic regardless of the mode the model was left in.
    model.eval()
    with torch.no_grad():
        output = model(input_ids, attention_mask)
    _, prediction = torch.max(output, dim=1)

    # Class id -> label
    sentiment_mapping = {0: 'Negativo', 1: 'Neutral', 2: 'Positivo',3 : 'Alerta'}

    predicted_sentiment = sentiment_mapping[prediction.item()]
    print(f"Texto: {review_text}")
    print(f"Sentimiento predicho: {predicted_sentiment}")

    return predicted_sentiment

In [None]:
# Classify one comment. classifySentiment already prints the text and the
# prediction, so the outer print shows the returned label a second time.
sample_txt = 'Debe mejorar: Muy buen profesor, pero debería ser más exigente porque cualquiera puede pasar'

print(classifySentiment(sample_txt))

In [None]:
# Batch of short example comments, classified one by one below
sample_list = ['Tiene una actitud sumamente correcta que se irradia a los estudiantes',
  'Su conocimiento',
  'respeta los horarios de clases',
  'nada, no sabe enseñar y no sabe la materia practica',
  'Excelente docente, nos guio para poder sobresalir siempre en su materia y nos enseno del futuro de un ingeniero.',
  'es bien organizada y exigente',
  'Buen dominio en la MATERIA',
  'nada, no sabe enseñar y no sabe la materia practica',
  'El interés por fomentar el trabajo en equipo',
  'Buena enseñanza',
  'conocimientos  de la asignatura',
  'Desarrollo de la materia  con claridad',
  'SU CONOCIMIENTO DE LA ASIGNATURA',
  'Su compromiso',
  'Su puntualidad',
  'la puntualidad',
  '-Capacidad de aprendizaje. -Conocimiento de los temas. -CAPACIDAD PARA PROPONER ACTIVIDADES RELACIONADAS CON CADA UNO DE LOS TEMAS DICTADOS EN CLASE.',
  'Conceptos generales',
  'mucho conocimiento de la asignatura',
  'SU compresion de la materia']


# Classify every example; each call prints its own text and prediction
for sample in sample_list:
  print(classifySentiment(sample))

In [None]:
# Classify a longer, multi-sentence comment
sample_txt = 'En una práctica, no respondí una pregunta y la docente asumió que no asistí, dándome cero en el informe.luego en clase, mencionó el incidente avergonzándome frente a todos y no solo ami sino a dos estudiantes mas.no deberia hacer eso'
print(classifySentiment(sample_txt))