In [None]:
# !pip install transformers torch scikit-learn unidecode datasets

In [None]:
# Importar las librerías
import pandas as pd
import torch
import torch.nn as nn
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from torch.optim import SGD, Adam, AdamW
from transformers import RobertaForSequenceClassification, RobertaTokenizer, AutoTokenizer,  AutoModelForSequenceClassification


In [None]:
# Pick the compute device: the GPU when CUDA is available, the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Show which device was selected as this cell's output.
device

In [None]:
# --- Experiment configuration ------------------------------------------------
# Language code (selects the CSV file) and emotion column used as the label.
emotion = "surprise"
lang = "ukr"

# Pretrained checkpoint and the fixed sequence length passed to the tokenizer.
model_name = "google-bert/bert-base-multilingual-uncased"
max_length = 71

# Optimisation settings.
adam_lr = 2e-5
num_epochs = 3

# When True, the last cell writes the model and tokenizer to `output_dir`.
save_model = True

# CSV splits stored on Google Drive.
# NOTE(review): test_path points at the same dev CSV as val_path, so the
# "test" evaluation further down runs on the validation data.
train_path = f'/content/drive/MyDrive/Proyectos/semeval/data/newest/train/{lang}.csv'
val_path = f'/content/drive/MyDrive/Proyectos/semeval/data/newest/dev/{lang}.csv'
test_path = f'/content/drive/MyDrive/Proyectos/semeval/data/newest/dev/{lang}.csv'

# Destination folder for the saved model.
output_dir = f'/content/drive/MyDrive/Proyectos/semeval/models/{lang}/{emotion}_3_level/'

In [None]:

# Load the three CSV splits into DataFrames, in train / val / test order.
df_train, df_val, df_test = (
    pd.read_csv(csv_path) for csv_path in (train_path, val_path, test_path)
)

In [None]:
# Fold the dev split into the training data.
# NOTE(review): df_val is still passed to the Trainer as eval_dataset and
# test_path points at the same dev CSV, so every metric reported below is
# computed on examples the model was trained on. Re-running this cell also
# appends df_val to df_train again.
df_train = pd.concat([df_train, df_val], ignore_index=True)

In [None]:
def _text_with_nonzero_emotion(frame):
    """Keep only the "text" and `emotion` columns and the rows where `emotion` != 0."""
    narrowed = frame[["text", emotion]]
    return narrowed[narrowed[emotion] != 0]


# Apply the same column selection and zero-filter to every split.
df_train = _text_with_nonzero_emotion(df_train)
df_val = _text_with_nonzero_emotion(df_val)
df_test = _text_with_nonzero_emotion(df_test)

In [None]:
# Rename the two remaining columns of every split to the generic
# "text" / "label" names used by the rest of the notebook.
for split_frame in (df_train, df_val, df_test):
    split_frame.columns = ["text", "label"]

In [None]:
# Display the filtered training frame (columns "text" and "label") for a quick visual check.
df_train

In [None]:
# Map the raw label values 1 / 2 / 3 onto the 0-based class ids 0 / 1 / 2.
label_mapping = {1: 0, 2: 1, 3: 2}

# Series.map() silently produces NaN for any value that is not a key of
# label_mapping (for example missing labels, which pass the earlier `!= 0`
# filter, or labels that were already encoded by a previous run of this cell).
# Fail loudly instead of handing float/NaN labels to the model.
for split_name, split_frame in (("train", df_train), ("val", df_val), ("test", df_test)):
    encoded = split_frame['label'].map(label_mapping)
    unmapped = encoded.isna()
    if unmapped.any():
        unexpected = split_frame.loc[unmapped, 'label'].unique().tolist()
        raise ValueError(
            f"Unexpected label values in {split_name} split: {unexpected}; "
            f"expected only {sorted(label_mapping)}. "
            "If this cell was already run, rebuild the DataFrames first."
        )
    # Every value mapped, so the column can safely be stored as integers.
    split_frame['label'] = encoded.astype('int64')


In [None]:
# Wrap each split's 'text' / 'label' columns in a Hugging Face Dataset.
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(split_frame[['text', 'label']])
    for split_frame in (df_train, df_val, df_test)
)


In [None]:
# Tokenizer: loaded from the same checkpoint name (`model_name`) as the model.
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
# Tokenization
def tokenize_function(examples):
    """Tokenize ``examples['text']``, padding and truncating to ``max_length``.

    Reads the module-level ``tokenizer`` and ``max_length``; returns whatever
    the tokenizer call returns.
    """
    return tokenizer(
        examples['text'],
        max_length=max_length,
        truncation=True,
        padding='max_length',
    )


In [None]:
# Run the tokenizer over each split in batched mode.
train_dataset, val_dataset, test_dataset = (
    split.map(tokenize_function, batched=True)
    for split in (train_dataset, val_dataset, test_dataset)
)


In [None]:
# Define the model: pretrained encoder plus a 3-way classification head.
# The dropout rate has to be supplied at load time. The dropout layers are
# created from the config while the model is being constructed, so assigning
# model.config.hidden_dropout_prob afterwards only edits the config object and
# leaves the already-built layers at the checkpoint's original rate.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=3,
    hidden_dropout_prob=0.3,  # 30% dropout, applied as a config override
)


In [None]:
# Move the model to the device selected earlier (`device`).
model.to(device)


In [None]:
class FocalLoss(nn.Module):
    """Focal loss computed from the per-sample cross-entropy.

    Each sample's cross-entropy ``ce`` is rescaled to
    ``alpha * (1 - exp(-ce)) ** gamma * ce``.

    Args:
        alpha: Constant multiplier applied to every sample's loss.
        gamma: Exponent of the ``(1 - pt)`` modulating factor.
        reduction: ``'mean'`` or ``'sum'`` to aggregate over samples; any
            other value returns the unreduced per-sample tensor.
    """

    def __init__(self, alpha=1, gamma=2, reduction='sum'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, logits, labels):
        """Return the focal loss of ``logits`` against ``labels``."""
        per_sample_ce = nn.CrossEntropyLoss(reduction='none')(logits, labels)
        # exp(-CE) is the probability the model assigned to the true class.
        true_class_prob = torch.exp(-per_sample_ce)
        per_sample_focal = self.alpha * (1 - true_class_prob) ** self.gamma * per_sample_ce

        if self.reduction == 'mean':
            return per_sample_focal.mean()
        if self.reduction == 'sum':
            return per_sample_focal.sum()
        # Any other reduction value: hand back the per-sample losses.
        return per_sample_focal

In [None]:
class CustomLoss(nn.Module):
    """Equal-weight blend of three losses over the same logits and labels.

    The terms are a focal loss, a class-weighted cross-entropy and a
    class-weighted cross-entropy with label smoothing of 0.1.

    Args:
        class_weights: Per-class weight tensor passed as ``weight`` to both
            cross-entropy terms.
        focal_alpha: ``alpha`` forwarded to ``FocalLoss``.
        focal_gamma: ``gamma`` forwarded to ``FocalLoss``.
    """

    def __init__(self, class_weights, focal_alpha=0.25, focal_gamma=2.0):
        super().__init__()
        # Class-weighted cross-entropy.
        self.weighted_loss = nn.CrossEntropyLoss(weight=class_weights)
        # Class-weighted cross-entropy with label smoothing.
        self.smooth_loss = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)
        # Custom focal loss.
        # NOTE(review): FocalLoss is created with its default reduction
        # ('sum'), whereas the two CrossEntropyLoss terms use their default
        # reduction — confirm the three terms are meant to be on different
        # scales before averaging them.
        self.focal_loss = FocalLoss(alpha=focal_alpha, gamma=focal_gamma)

    def forward(self, logits, labels):
        """Return the arithmetic mean of the three loss terms."""
        focal_term = self.focal_loss(logits, labels)
        weighted_term = self.weighted_loss(logits, labels)
        smoothed_term = self.smooth_loss(logits, labels)

        combined = focal_term + weighted_term + smoothed_term
        return combined / 3

In [None]:
# Evaluation metrics
def compute_metrics(pred):
    """Compute accuracy and macro-averaged precision / recall / F1.

    Args:
        pred: Object exposing ``label_ids`` (gold labels) and ``predictions``
            (per-class scores; the arg-max over axis 1 is the predicted class).

    Returns:
        dict with the keys ``'accuracy'``, ``'precision'``, ``'recall'`` and ``'f1'``.
    """
    labels = pred.label_ids
    preds = np.argmax(pred.predictions, axis=1)
    # zero_division=0 yields the same scores as the default ("warn" also
    # scores an undefined metric as 0) but does not emit an
    # UndefinedMetricWarning whenever a class is never predicted.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='macro', zero_division=0
    )
    acc = accuracy_score(labels, preds)
    return {'accuracy': acc, 'precision': precision, 'recall': recall, 'f1': f1}


In [None]:
# Training arguments
# NOTE(review): recent transformers releases appear to rename
# `evaluation_strategy` to `eval_strategy` — confirm against the installed version.
# NOTE(review): the Trainer below is given its own AdamW instance, so
# `weight_decay` here presumably does not reach that optimizer — verify.
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",  # Evaluate on the validation set at every epoch
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=num_epochs,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    report_to="none"
)


In [None]:
# Weight the classes by inverse frequency in the training data.
# value_counts() leaves out classes that never occur, which would make the
# weight tensor shorter than the number of classes (or raise a KeyError when a
# middle class is missing). Reindex over every class id so there is always
# exactly one weight per class.
class_counts = df_train['label'].value_counts().reindex(range(len(label_mapping)), fill_value=0)
class_weights = torch.tensor(
    # A class with no training examples gets weight 0 instead of dividing by zero.
    [1.0 / count if count > 0 else 0.0 for count in class_counts],
    dtype=torch.float32,
).to(device)


In [None]:
# Build the combined loss with the class weights computed above.
# CustomTrainer.compute_loss reads this module-level object.
loss_fn = CustomLoss(class_weights=class_weights)

In [None]:
class CustomTrainer(Trainer):
    """Trainer that scores batches with the notebook-level ``loss_fn``."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the custom loss for one batch.

        Args:
            model: Model (possibly wrapped by the Trainer) to run the batch through.
            inputs: Batch dict; its ``"labels"`` entry is removed before the
                forward pass so the model receives only the remaining inputs.
            return_outputs: When True, also return the model outputs.
            **kwargs: Extra arguments passed by the Trainer; ignored here.

        Returns:
            ``loss`` or ``(loss, outputs)`` depending on ``return_outputs``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)  # Forward pass without labels
        logits = outputs.logits
        # Put the labels on the logits' device rather than on model.device:
        # wrapper modules such as nn.DataParallel do not expose a .device
        # attribute, while the logits always carry the right device.
        loss = loss_fn(logits, labels.to(logits.device))
        return (loss, outputs) if return_outputs else loss

In [None]:
# Trainer
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,  # Validation set used for the per-epoch evaluation
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    # Explicit AdamW optimizer using adam_lr; the second tuple slot is left as None.
    optimizers=(AdamW(model.parameters(), lr=adam_lr), None)
    # Alternative optimizer kept for experiments:
    #optimizers=(SGD(model.parameters(), lr=0.01, momentum=0.9), None)

)


In [None]:
# Train the model
trainer.train()

# Saving and evaluation

In [None]:
# Switch the model to evaluation mode before running inference.
model.eval()

In [None]:
from torch.utils.data import DataLoader
import torch

# Collate function: turn a list of examples into batched tensors
def collate_fn(batch):
    """Stack a list of tokenized examples into a dict of tensors.

    Each item of ``batch`` is read through the keys ``'input_ids'``,
    ``'attention_mask'`` and ``'label'``; the result exposes them as
    ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` respectively.
    """
    def _stack(field):
        # One tensor per field, with one row (or scalar) per example.
        return torch.tensor([example[field] for example in batch])

    return {
        'input_ids': _stack('input_ids'),
        'attention_mask': _stack('attention_mask'),
        'labels': _stack('label'),
    }

# Build a DataLoader over the test split that batches with collate_fn.
# shuffle=False keeps the examples in dataset order.
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)


In [None]:
# Run inference over the test set, collecting predictions and gold labels.
all_preds = []
all_labels = []

# Set evaluation mode here as well, so this cell gives the same result even
# when it is run without the separate model.eval() cell.
model.eval()

with torch.no_grad():  # no gradients are needed for inference
    for batch in test_loader:
        # Only the model inputs have to live on the model's device.
        input_ids = batch['input_ids'].to(model.device)
        attention_mask = batch['attention_mask'].to(model.device)

        logits = model(input_ids=input_ids, attention_mask=attention_mask).logits

        all_preds.extend(torch.argmax(logits, dim=-1).cpu().tolist())
        # The gold labels only feed the metrics below, so they are read
        # straight from the batch without a device round trip.
        all_labels.extend(batch['labels'].tolist())


In [None]:
# Class ids that occur in the gold labels or in the predictions, sorted.
# Naming classes from their actual ids (instead of from a count of distinct
# values) keeps the names correct when a class is absent, and keeps the list
# long enough when labels and predictions cover different classes.
present_classes = sorted(set(all_labels) | set(all_preds))

# Display name = original level, i.e. class id + 1 (inverse of label_mapping).
target_names = [str(class_id + 1) for class_id in present_classes]

In [None]:
from sklearn.metrics import classification_report

# Per-class and aggregate metrics for the test predictions.
report = classification_report(
    y_true=all_labels,
    y_pred=all_preds,
    target_names=target_names,
)
print(report)


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Compute and draw the confusion matrix.
# Pin the class ids so the matrix is always 3x3 and lines up with the three
# tick labels, even when a class never appears in all_labels / all_preds.
class_ids = [0, 1, 2]
level_names = ['1', '2', '3']
cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=level_names, yticklabels=level_names)
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.title('Matriz de Confusión')
plt.show()


In [None]:
# Save the model and then the tokenizer to `output_dir` on Google Drive,
# but only when `save_model` is enabled in the configuration cell.
if save_model:
    for artifact in (model, tokenizer):
        artifact.save_pretrained(output_dir)
