In [None]:
!pip install datasets transformers
!pip install --upgrade datasets
!pip install scikit-learn
!pip install imbalanced-learn
!pip install accelerate>=0.26.0
!pip install seaborn
!pip install matplotlib
!pip install tqdm
!pip install sentencepiece
!pip install sacremoses
!pip install nltk

# 1. Library Imports

In [None]:
import pandas as pd
import re
import torch
import torch.nn as nn
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report,
    roc_curve,
    auc
)
from transformers import (
    AutoTokenizer,
    BertTokenizer,
    BertForSequenceClassification,
    TrainingArguments,
    Trainer,
    AdamW,
    EarlyStoppingCallback,
    BertConfig,
    M2M100Tokenizer,
    M2M100ForConditionalGeneration,
    DataCollatorWithPadding,
    get_scheduler
)
from imblearn.over_sampling import SMOTE
from datasets import Dataset, concatenate_datasets
from torch.utils.data import DataLoader
from torch.nn import CrossEntropyLoss, BCEWithLogitsLoss
import torch.nn.functional as F
from tqdm import tqdm
import sentencepiece
import os

# 2. Reading train.csv and Division by Source

In [None]:
# Load the challenge training set and list the corpora it contains.
df = pd.read_csv('/kaggle/input/i2c-challenge-dataset/train.csv')
source_column = df['source']
print(source_column.unique())

# One frame per source corpus, selected with a boolean mask on `source`.
detests = df.loc[source_column == 'detests']
stereohoax = df.loc[source_column == 'stereohoax']

# Label balance of each corpus.
for corpus_frame in (detests, stereohoax):
    print(corpus_frame['stereotype'].value_counts())

# 3. Text Preprocessing

In [None]:
def preprocess_text(text):
    """Normalize a raw text before tokenization.

    Steps, in order: drop URLs, @mentions and #hashtags; drop the placeholder
    words ``URL`` / ``user`` (any casing); drop every character that is not a
    word character or whitespace; collapse whitespace runs and trim.

    Args:
        text: The raw text. ``None`` and float NaN (what pandas yields for an
            empty CSV cell) are treated as an empty text; any other non-string
            value is converted with ``str``.

    Returns:
        The cleaned string (possibly empty).
    """
    # Missing cells would otherwise make `re.sub` raise TypeError.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if not isinstance(text, str):
        text = str(text)

    # Web links.
    text = re.sub(r'http\S+|www\S+', '', text)
    # User mentions.
    text = re.sub(r'@\w+', '', text)
    # Hashtags.
    text = re.sub(r'#\w+', '', text)
    # Anonymisation placeholders (URL, user, ...).
    text = re.sub(r'\b(URL|user|url|USER)\b', '', text, flags=re.IGNORECASE)
    # Everything that is not a letter, digit, underscore or whitespace. This
    # already removes '!', '@' and '"', so no separate handling of those
    # characters is needed.
    text = re.sub(r'[^\w\sáéíóúÁÉÍÓÚñÑ0-9]', '', text)
    # Collapse whitespace runs left behind by the removals.
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# Clean the text column of both corpora. `assign` returns a new frame, so the
# slices taken from `df` are never written to in place.
detests = detests.assign(text=detests['text'].apply(preprocess_text))
stereohoax = stereohoax.assign(text=stereohoax['text'].apply(preprocess_text))


# 4. Tokenizer

In [None]:
# Tokenizer of the Spanish cased BERT checkpoint.
# NOTE(review): the name `tokenizer` is rebound further down (to the M2M100
# tokenizer in the back-translation cell, and again in the ensemble cell);
# the tokenization cells use `bert_tokenizer` instead.
tokenizer = BertTokenizer.from_pretrained('dccuchile/bert-base-spanish-wwm-cased')

# 5. Splitting Datasets into Training, Validation, and Test Subsets

In [None]:
def _holdout_split(dataset, test_size):
    """Split `dataset` with `train_test_split` using the fixed seed 42."""
    return dataset.train_test_split(test_size=test_size, seed=42)


# The target column is called `labels` from here on.
detests = detests.rename(columns={'stereotype': 'labels'})
stereohoax = stereohoax.rename(columns={'stereotype': 'labels'})

detests_dataset = Dataset.from_pandas(detests)
stereohoax_dataset = Dataset.from_pandas(stereohoax)

# detests: hold out 20% for test, then 30% of the remainder for validation.
train_test_split_detests = _holdout_split(detests_dataset, 0.2)
train_data_detests, test_data_detests = (
    train_test_split_detests['train'],
    train_test_split_detests['test'],
)
train_valid_split_detests = _holdout_split(train_data_detests, 0.3)
train_data_detests, valid_data_detests = (
    train_valid_split_detests['train'],
    train_valid_split_detests['test'],
)

# stereohoax: same two-stage split.
train_test_split_stereohoax = _holdout_split(stereohoax_dataset, 0.2)
train_data_stereohoax, test_data_stereohoax = (
    train_test_split_stereohoax['train'],
    train_test_split_stereohoax['test'],
)
train_valid_split_stereohoax = _holdout_split(train_data_stereohoax, 0.3)
train_data_stereohoax, valid_data_stereohoax = (
    train_valid_split_stereohoax['train'],
    train_valid_split_stereohoax['test'],
)

# Report the size of every partition.
for corpus_label, train_part, valid_part, test_part in (
    ("Detests", train_data_detests, valid_data_detests, test_data_detests),
    ("Stereohoax", train_data_stereohoax, valid_data_stereohoax, test_data_stereohoax),
):
    print(f"{corpus_label}: train={len(train_part)}, validation={len(valid_part)}, test={len(test_part)}")

# 6. Applying Back Translation to Balance Classes

In [None]:
# Multilingual M2M100 checkpoint used for back-translation.
# Note: this rebinds the notebook-level names `tokenizer` and `model`.
model_name = "facebook/m2m100_418M"
tokenizer = M2M100Tokenizer.from_pretrained(model_name)
model = M2M100ForConditionalGeneration.from_pretrained(model_name)

# Prefer the GPU when one is visible to torch.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")
print(f"Usando dispositivo: {device}")

model = model.to(device)

def back_translate_es_de_es(text, src_lang, mid_lang, num_beams, temperature, top_k):
    """Paraphrase `text` by translating it to a pivot language and back.

    Despite the name, the language pair is not fixed: the text goes from
    `src_lang` to `mid_lang` and then back to `src_lang`. Both legs use the
    notebook-level `tokenizer`, `model` and `device`, and generate with
    sampling enabled (`do_sample=True`) combined with beam search.

    Args:
        text: Text to paraphrase.
        src_lang: Language code of `text` and of the returned text.
        mid_lang: Language code of the pivot language.
        num_beams: Number of beams passed to `model.generate`.
        temperature: Sampling temperature passed to `model.generate`.
        top_k: Top-k cutoff passed to `model.generate`.

    Returns:
        The decoded text after the round trip (first generated sequence).
    """
    def _translate(sentence, from_lang, to_lang):
        # The tokenizer needs to know the source language before encoding.
        tokenizer.src_lang = from_lang
        batch = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True).to(device)
        generated = model.generate(
            **batch,
            # Forces the decoder to start in the target language.
            forced_bos_token_id=tokenizer.lang_code_to_id[to_lang],
            do_sample=True,
            temperature=temperature,
            top_k=top_k,
            num_beams=num_beams
        )
        return tokenizer.decode(generated[0], skip_special_tokens=True)

    pivot_text = _translate(text, src_lang, mid_lang)
    return _translate(pivot_text, mid_lang, src_lang)

def apply_back_translation(dataset, src_lang, mid_langs, num_beams, temperature, top_k, balance_ratio, max_iterations=3):
    """Oversample the minority class of `dataset` with back-translated texts.

    The minority class (smallest count in the `labels` column) is grown towards
    the majority class size. Each pass walks over the minority texts, rotating
    through `mid_langs` as pivot language, and stops as soon as enough new
    texts exist. Translation errors are reported and skipped.

    Args:
        dataset: Object exposing `to_pandas()` with `text` and `labels` columns.
        src_lang: Language code of the texts.
        mid_langs: Pivot language codes, used round-robin by text position.
        num_beams, temperature, top_k: Forwarded to `back_translate_es_de_es`.
        balance_ratio: Fraction of the minority/majority gap to close
            (1 = fully balanced).
        max_iterations: Maximum number of passes over the minority texts.

    Returns:
        A shuffled (random_state=42) `Dataset` holding the original rows plus
        the generated ones. Generated rows only carry `text` and `labels`; any
        other column of the original frame is left missing for them.
    """
    df = dataset.to_pandas()
    class_counts = df["labels"].value_counts()
    minority_class = class_counts.idxmin()
    minority_count = int(class_counts.min())
    majority_count = int(class_counts.max())
    target_size = int(minority_count + (majority_count - minority_count) * balance_ratio)

    print(f"Aumentando la clase minoritaria de {minority_count} a {target_size} instancias")

    new_translated_texts = []
    new_labels = []
    needed_instances = target_size - minority_count
    instances_to_generate = df.loc[df["labels"] == minority_class, "text"].tolist()

    with tqdm(total=needed_instances, desc="Progreso de la traducción", unit="instancia") as pbar:
        # The iteration cap bounds the loop even if translations keep failing.
        for iteration_count in range(1, max_iterations + 1):
            if len(new_translated_texts) >= needed_instances:
                break
            print(f"Iteración {iteration_count} de {max_iterations}")

            torch.cuda.empty_cache()  # Release cached GPU memory between passes
            generated_before = len(new_translated_texts)

            for i, text in enumerate(instances_to_generate):
                if len(new_translated_texts) >= needed_instances:
                    break

                # Rotate through the pivot languages.
                mid_lang = mid_langs[i % len(mid_langs)]

                try:
                    back_translated_text = back_translate_es_de_es(
                        text, src_lang, mid_lang, num_beams, temperature, top_k
                    )
                except Exception as e:
                    # Best effort: report the failure and move on to the next text.
                    print(f"Error en la traducción: {e}")
                    continue

                new_translated_texts.append(back_translated_text)
                new_labels.append(minority_class)
                pbar.update(1)

            # A pass that produced nothing will not do better when repeated.
            if len(new_translated_texts) == generated_before:
                print("No se generaron nuevas instancias en esta iteración; se detiene el bucle")
                break

            if iteration_count == max_iterations and len(new_translated_texts) < needed_instances:
                print("Se alcanzó el número máximo de iteraciones")

    torch.cuda.empty_cache()  # Release cached GPU memory once more

    # Report a shortfall instead of failing.
    if len(new_translated_texts) < needed_instances:
        print(f"Advertencia: Solo se generaron {len(new_translated_texts)} de {needed_instances} instancias necesarias")

    # Append the generated rows to the original data.
    augmented_df = pd.DataFrame({
        "text": new_translated_texts,
        "labels": new_labels
    })

    df = pd.concat([df, augmented_df], ignore_index=True)

    # Shuffle the combined data deterministically.
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)

    return Dataset.from_pandas(df)


# Back-translation settings.
src_lang = "es"
mid_langs = ["de", "fr", "pl"]  # Pivot languages, used round-robin
num_beams = 5
temperature = 0.8
top_k = 50
balance_ratio = 1

# The same settings are applied to every split.
back_translation_settings = dict(
    src_lang=src_lang,
    mid_langs=mid_langs,
    num_beams=num_beams,
    temperature=temperature,
    top_k=top_k,
    balance_ratio=balance_ratio,
)

# NOTE(review): the synonym-replacement cell further down assigns these same
# four `*_resampled` names, so running both cells keeps only its results.
train_detests_resampled = apply_back_translation(train_data_detests, **back_translation_settings)
train_stereohoax_resampled = apply_back_translation(train_data_stereohoax, **back_translation_settings)
valid_data_detests_resampled = apply_back_translation(valid_data_detests, **back_translation_settings)
valid_data_stereohoax_resampled = apply_back_translation(valid_data_stereohoax, **back_translation_settings)


In [None]:
import random
import nltk
from nltk.corpus import wordnet
from tqdm import tqdm
import pandas as pd
from datasets import Dataset

# Fetch WordNet and the Open Multilingual WordNet (the latter includes Spanish).
for nltk_resource in ('wordnet', 'omw-1.4'):
    nltk.download(nltk_resource)

def get_synonyms(word):
    """Return the Spanish WordNet lemma names that share a synset with `word`.

    Args:
        word: The word to look up (queried with ``lang="spa"``).

    Returns:
        A list of distinct lemma names in sorted order; empty when WordNet has
        no synset for `word`. The list is sorted because iteration order of a
        set of strings varies between interpreter runs, which would make a
        seeded `random.choice` over the result irreproducible.
    """
    synonyms = {
        lemma.name()
        for syn in wordnet.synsets(word, lang="spa")
        for lemma in syn.lemmas(lang="spa")
    }
    return sorted(synonyms)

def replace_with_synonyms(text, synonym_prob=0.3):
    """Return `text` with some words swapped for a random synonym.

    The text is split on whitespace. Each word is, with probability
    `synonym_prob`, replaced by a random entry of `get_synonyms(word)`; words
    without synonyms are kept. The words are re-joined with single spaces.
    """
    def _maybe_swap(token):
        # One random draw per word decides whether a swap is attempted.
        if not random.random() < synonym_prob:
            return token
        candidates = get_synonyms(token)
        return random.choice(candidates) if candidates else token

    return " ".join(_maybe_swap(token) for token in text.split())

def generate_synthetic_data(dataset, balance_ratio, synonym_prob=0.3):
    """Oversample the minority class of `dataset` via synonym replacement.

    The minority texts are cycled through in order; each visit produces one
    new text with `replace_with_synonyms`, until the class gap has been closed
    by the fraction `balance_ratio`. The generated rows (only `text` and
    `labels`) are appended to the original rows and the result is shuffled
    with `random_state=42`.

    Returns:
        A `Dataset` built from the combined, shuffled frame.
    """
    df = dataset.to_pandas()
    class_counts = df["labels"].value_counts()
    minority_class = class_counts.idxmin()
    majority_class = class_counts.idxmax()

    is_minority = df["labels"] == minority_class
    minority_count = int(is_minority.sum())
    majority_count = int((df["labels"] == majority_class).sum())
    target_size = int(minority_count + (majority_count - minority_count) * balance_ratio)
    print(f"Aumentando la clase minoritaria de {minority_count} a {target_size} instancias")

    needed_instances = target_size - minority_count
    seed_texts = df.loc[is_minority, "text"].tolist()

    new_texts = []
    with tqdm(total=needed_instances, desc="Generando instancias", unit="instancia") as pbar:
        for position in range(needed_instances):
            # Wrap around the minority texts as many times as required.
            source_text = seed_texts[position % len(seed_texts)]
            new_texts.append(replace_with_synonyms(source_text, synonym_prob=synonym_prob))
            pbar.update(1)
    new_labels = [minority_class] * len(new_texts)

    # Original rows followed by the generated ones.
    generated = pd.DataFrame({
        "text": new_texts,
        "labels": new_labels
    })
    df = pd.concat([df, generated], ignore_index=True)

    # Deterministic shuffle of the combined data.
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)

    return Dataset.from_pandas(df)

# Settings
synonym_prob = 0.8  # Probability of attempting a synonym swap for each word
balance_ratio = 1
synonym_settings = dict(balance_ratio=balance_ratio, synonym_prob=synonym_prob)

# Build the balanced splits.
# NOTE(review): these four names are also assigned by the back-translation
# cell above; whichever cell runs last determines what gets saved.
train_detests_resampled = generate_synthetic_data(train_data_detests, **synonym_settings)
train_stereohoax_resampled = generate_synthetic_data(train_data_stereohoax, **synonym_settings)
valid_data_detests_resampled = generate_synthetic_data(valid_data_detests, **synonym_settings)
valid_data_stereohoax_resampled = generate_synthetic_data(valid_data_stereohoax, **synonym_settings)

# 7. Saving Balanced Datasets

In [None]:
# Persist every balanced split next to the notebook.
for resampled_split, csv_path in (
    (train_detests_resampled, "train_detests_resampledBERTO.csv"),
    (train_stereohoax_resampled, "train_stereohoax_resampledBERTO.csv"),
    (valid_data_detests_resampled, "valid_data_detests_resampledBERTO.csv"),
    (valid_data_stereohoax_resampled, "valid_data_stereohoax_resampledBERTO.csv"),
):
    resampled_split.to_csv(csv_path, index=False)


# 8. Loading Balanced Datasets

In [None]:
# Read each saved CSV back and wrap it as a Dataset in one step.
train_detests_resampled = Dataset.from_pandas(
    pd.read_csv("train_detests_resampledBERTO.csv")
)
train_stereohoax_resampled = Dataset.from_pandas(
    pd.read_csv("train_stereohoax_resampledBERTO.csv")
)

valid_data_detests_resampled = Dataset.from_pandas(
    pd.read_csv("valid_data_detests_resampledBERTO.csv")
)
valid_data_stereohoax_resampled = Dataset.from_pandas(
    pd.read_csv("valid_data_stereohoax_resampledBERTO.csv")
)

# 9. Tokenization of Balanced Subsets

In [None]:
bert_tokenizer = BertTokenizer.from_pretrained("dccuchile/bert-base-spanish-wwm-cased")


def _tokenize_for_bert(split):
    """Tokenize the `text` column of `split` and expose it as torch tensors.

    Texts are truncated to 256 tokens and padded per mapped batch; only
    `input_ids`, `attention_mask` and `labels` are exposed in torch format.
    """
    encoded = split.map(
        lambda rows: bert_tokenizer(rows['text'], padding=True, truncation=True, max_length=256),
        batched=True
    )
    return encoded.with_format(
        "torch", columns=["input_ids", "attention_mask", "labels"]
    )


# Training splits.
tokenized_train_detests_resampled = _tokenize_for_bert(train_detests_resampled)
tokenized_train_stereohoax_resampled = _tokenize_for_bert(train_stereohoax_resampled)

# Validation splits.
tokenized_valid_data_detests_resampled = _tokenize_for_bert(valid_data_detests_resampled)
tokenized_valid_data_stereohoax_resampled = _tokenize_for_bert(valid_data_stereohoax_resampled)

# 10. Checking Class Distribution

In [None]:
def check_class_distribution(dataset):
    """Print the per-class counts of `dataset` and the majority/minority ratio.

    Args:
        dataset: Object exposing `to_pandas()` that yields a `labels` column.

    Returns:
        None. Everything is written to standard output.
    """
    class_counts = dataset.to_pandas()['labels'].value_counts()

    print("\nDistribución de clases después de la generación:")
    for class_id, count in zip(class_counts.index, class_counts.tolist()):
        print(f"Clase {class_id}: {count} instancias")

    largest = class_counts.max()
    smallest = class_counts.min()
    # Guard against dividing by an empty class.
    if smallest > 0:
        proportion = largest / smallest
    else:
        proportion = float('inf')

    print(f"\nProporción entre clases: {proportion:.2f} (Mayoritaria vs Minoritaria)")

# Report the class balance of every resampled split (training first, then validation).
for resampled_split in (
    train_detests_resampled,
    train_stereohoax_resampled,
    valid_data_detests_resampled,
    valid_data_stereohoax_resampled,
):
    check_class_distribution(resampled_split)

# 11. Definition of Compute Metrics Function (Added Class Weighting)

In [None]:
# Checkpoint of the Spanish BERT classifier (two output labels).
model_name = 'dccuchile/bert-base-spanish-wwm-cased'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)
model = model.to(device)

# Loss evaluated inside `compute_metrics` (applied to one-hot encoded labels).
loss_fn = BCEWithLogitsLoss()

def compute_metrics(p):
    """Compute weighted precision/recall/F1 and a BCE loss for a prediction set.

    Args:
        p: Object with `predictions` (logits, one column per class) and
            `label_ids` (integer class ids), as arrays or tensors.

    Returns:
        dict with keys `precision`, `recall`, `f1` (weighted averages,
        `zero_division=0`) and `loss` (`loss_fn` applied to the logits and the
        one-hot encoded labels).

    Everything is computed on CPU tensors, so the function works with or
    without a GPU.

    NOTE(review): the `loss` value is computed here with `loss_fn`; it is not
    the loss the model reports. Trainer prefixes metric keys with `eval_`, so
    this presumably surfaces as `eval_loss` — confirm it does not shadow the
    model's own evaluation loss.
    """
    # Always convert to tensors: `one_hot` does not accept NumPy arrays.
    logits = torch.as_tensor(p.predictions).detach().cpu().float()
    labels = torch.as_tensor(p.label_ids).detach().cpu().long()

    preds = logits.argmax(dim=-1)

    labels_one_hot = torch.nn.functional.one_hot(labels, num_classes=2).float()
    loss = loss_fn(logits, labels_one_hot)

    y_true = labels.numpy()
    y_pred = preds.numpy()

    # zero_division=0 keeps a class without predictions from raising warnings.
    precision = precision_score(y_true, y_pred, average="weighted", zero_division=0)
    recall = recall_score(y_true, y_pred, average="weighted", zero_division=0)
    f1 = f1_score(y_true, y_pred, average="weighted", zero_division=0)

    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "loss": loss.item()
    }

# 12. Training and Evaluation on Both Datasets

## 12.1 Training and Evaluation

In [None]:
# Device used for both classifiers.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Usando {'CUDA' if torch.cuda.is_available() else 'CPU'}")

# Checkpoint every classifier is initialised from.
BERT_CHECKPOINT = 'dccuchile/bert-base-spanish-wwm-cased'

# Pad with the tokenizer that produced the input ids. The bare name
# `tokenizer` may have been rebound to the M2M100 tokenizer by the
# back-translation cell, so `bert_tokenizer` is used explicitly.
data_collator = DataCollatorWithPadding(bert_tokenizer, padding=True)


def _new_classifier():
    """Load a new 2-label classifier from the pretrained checkpoint onto `device`."""
    return BertForSequenceClassification.from_pretrained(BERT_CHECKPOINT, num_labels=2).to(device)


def _new_optimizer(classifier):
    """Create the explicit AdamW optimizer (lr=1e-5, weight_decay=0.1) for `classifier`."""
    return torch.optim.AdamW(
        classifier.parameters(),
        lr=1e-5,  # Learning rate
        weight_decay=0.1  # Explicit regularization
    )


def _build_trainer(classifier, training_args, train_split, valid_split):
    """Assemble a Trainer that owns its own optimizer and early-stopping callback."""
    return Trainer(
        model=classifier,
        args=training_args,
        train_dataset=train_split,
        eval_dataset=valid_split,
        compute_metrics=compute_metrics,
        # New callback per trainer so no early-stopping state is carried over.
        callbacks=[EarlyStoppingCallback(
            early_stopping_patience=2,  # Evaluations without improvement before stopping
            early_stopping_threshold=0.01  # Minimum improvement that counts
        )],
        # The scheduler is left to Trainer (None); the optimizer is explicit.
        optimizers=(_new_optimizer(classifier), None),
        data_collator=data_collator
    )


# Each corpus gets its own model. Sharing one model object between the two
# trainers would make the second run continue from the first run's weights and
# leave both trainers (and both saved checkpoints) with identical parameters.
model_detests = _new_classifier()
model_stereohoax = _new_classifier()

# NOTE(review): `learning_rate` and `weight_decay` below configure the optimizer
# Trainer would create by itself; with an explicit optimizer supplied, the
# optimizer's own lr / weight decay are the ones in effect. Confirm which set
# of values is intended.
training_args_detests = TrainingArguments(
    output_dir='./results_detests',
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    eval_strategy="epoch",
    logging_dir='./logs_detests',
    load_best_model_at_end=True,
    metric_for_best_model='f1',
    save_strategy='epoch',
    weight_decay=0.24766057445487644,
    learning_rate=1.6082616103256584e-05,
    logging_steps=10,
    save_total_limit=3,
    lr_scheduler_type='linear',
    warmup_ratio=0.1
)

trainer_detests = _build_trainer(
    model_detests,
    training_args_detests,
    tokenized_train_detests_resampled,
    tokenized_valid_data_detests_resampled,
)

# Train and evaluate the detests model.
trainer_detests.train()
trainer_detests.evaluate()

training_args_stereohoax = TrainingArguments(
    output_dir='./results_stereohoax',
    num_train_epochs=6,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    eval_strategy="epoch",
    logging_dir='./logs_stereohoax',
    load_best_model_at_end=True,
    metric_for_best_model='f1',
    save_strategy='epoch',
    weight_decay=0.06242226257042495,
    learning_rate=2.103090443160085e-05,
    logging_steps=10,
    save_total_limit=3,
    lr_scheduler_type='linear',
    warmup_ratio=0.1
)

trainer_stereohoax = _build_trainer(
    model_stereohoax,
    training_args_stereohoax,
    tokenized_train_stereohoax_resampled,
    tokenized_valid_data_stereohoax_resampled,
)

# Train and evaluate the stereohoax model.
trainer_stereohoax.train()
trainer_stereohoax.evaluate()


# 13. Saving Models

In [None]:
# Write each trainer's model to its own directory.
for fitted_trainer, model_dir in (
    (trainer_detests, "./model_detests"),
    (trainer_stereohoax, "./model_stereohoax"),
):
    fitted_trainer.save_model(model_dir)

# 14. Testing Models

## 14.1. Tokenization of Test Subset

In [None]:
def _tokenize_test_split(split):
    """Tokenize the `text` column of a held-out split and expose torch tensors.

    Uses `bert_tokenizer` with truncation at 256 tokens and per-batch padding,
    then restricts the torch view to `input_ids`, `attention_mask`, `labels`.
    """
    encoded = split.map(
        lambda rows: bert_tokenizer(rows['text'], padding=True, truncation=True, max_length=256),
        batched=True
    )
    return encoded.with_format(
        "torch", columns=["input_ids", "attention_mask", "labels"]
    )


# Tokenize the held-out test texts of both corpora.
tokenized_test_data_detests = _tokenize_test_split(test_data_detests)
tokenized_test_data_stereohoax = _tokenize_test_split(test_data_stereohoax)

## 14.2. Model Testing

In [None]:
# detests: predicted class = index of the largest logit.
test_results_detests = trainer_detests.predict(tokenized_test_data_detests)
labels_detests = test_results_detests.label_ids
predictions_detests = np.argmax(test_results_detests.predictions, axis=-1)

print(predictions_detests)
print(classification_report(labels_detests, predictions_detests))

# stereohoax: same procedure, plus a sanity check on the lengths.
test_results_stereohoax = trainer_stereohoax.predict(tokenized_test_data_stereohoax)
labels_stereohoax = test_results_stereohoax.label_ids
predictions_stereohoax = np.argmax(test_results_stereohoax.predictions, axis=-1)

assert len(predictions_stereohoax) == len(labels_stereohoax), "Dimensiones no coinciden."
print(predictions_stereohoax)
print(classification_report(labels_stereohoax, predictions_stereohoax))


# 15. ROC Curves

In [None]:
from sklearn.metrics import roc_curve, auc, precision_recall_curve
import matplotlib.pyplot as plt
import torch

def plot_curves(trainer, eval_dataset, title):
    """Plot the ROC and precision-recall curves of `trainer` on `eval_dataset`.

    The logits returned by `trainer.predict` are turned into probabilities
    with a softmax; column 1 is used as the positive-class score. Both curves
    are drawn side by side, each annotated with its area under the curve.

    Args:
        trainer: Object whose `predict(eval_dataset)` returns `predictions`
            and `label_ids`.
        eval_dataset: Dataset handed to `trainer.predict`.
        title: Suffix used in both subplot titles.
    """
    output = trainer.predict(eval_dataset)
    labels = output.label_ids

    # Softmax over the class axis; keep the positive-class column.
    class_probs = torch.softmax(torch.tensor(output.predictions), dim=-1).numpy()
    positive_probs = class_probs[:, 1]

    # ROC curve and its area.
    fpr, tpr, _ = roc_curve(labels, positive_probs)
    roc_auc = auc(fpr, tpr)

    # Precision-recall curve and its area.
    precision, recall, _ = precision_recall_curve(labels, positive_probs)
    pr_auc = auc(recall, precision)

    fig, (roc_ax, pr_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: ROC, with the diagonal of a random classifier for reference.
    roc_ax.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
    roc_ax.plot([0, 1], [0, 1], color='gray', linestyle='--')
    roc_ax.set_xlabel('False Positive Rate')
    roc_ax.set_ylabel('True Positive Rate')
    roc_ax.set_title(f'ROC Curve - {title}')
    roc_ax.legend(loc="lower right")
    roc_ax.grid(alpha=0.3)

    # Right panel: precision-recall.
    pr_ax.plot(recall, precision, color='green', lw=2, label=f'PR curve (area = {pr_auc:.2f})')
    pr_ax.set_xlabel('Recall')
    pr_ax.set_ylabel('Precision')
    pr_ax.set_title(f'Precision-Recall Curve - {title}')
    pr_ax.legend(loc="lower left")
    pr_ax.grid(alpha=0.3)

    plt.tight_layout()
    plt.show()

# Draw the curves for the detests model, then for the stereohoax model.
for fitted_trainer, held_out_split, corpus_name in (
    (trainer_detests, tokenized_test_data_detests, "detests"),
    (trainer_stereohoax, tokenized_test_data_stereohoax, "stereohoax"),
):
    plot_curves(fitted_trainer, held_out_split, title=corpus_name)


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

def plot_confusion_matrix(trainer, eval_dataset, title):
    """Plot the confusion matrix of `trainer` on `eval_dataset`.

    Logits from `trainer.predict` are converted to probabilities with a
    softmax and the most probable class is taken as the prediction. The
    matrix is displayed with the fixed class labels 0 and 1.

    Args:
        trainer: Object whose `predict(eval_dataset)` returns `predictions`
            and `label_ids`.
        eval_dataset: Dataset handed to `trainer.predict`.
        title: Suffix used in the figure title.
    """
    output = trainer.predict(eval_dataset)
    labels = output.label_ids

    # Probabilities per class, then the index of the most probable one.
    class_probs = torch.nn.functional.softmax(torch.tensor(output.predictions), dim=-1).numpy()
    predicted_classes = class_probs.argmax(axis=-1)

    cm = confusion_matrix(labels, predicted_classes)

    # Adjust `display_labels` if the task is not binary.
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1])
    disp.plot(cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix - {title}')
    plt.show()

# Draw the confusion matrix for the detests model, then for the stereohoax model.
for fitted_trainer, held_out_split, corpus_name in (
    (trainer_detests, tokenized_test_data_detests, "detests"),
    (trainer_stereohoax, tokenized_test_data_stereohoax, "stereohoax"),
):
    plot_confusion_matrix(fitted_trainer, held_out_split, title=corpus_name)


# 16. Creation, Training and Evaluation of the Ensemble

In [None]:
import os
import re
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification

# Crear un nuevo modelo para el ensamble
class EnsembleModel(torch.nn.Module):
    """Average the logits of two sequence classifiers.

    Both wrapped models receive the same inputs; the result is the
    element-wise mean of their `logits`.
    """

    def __init__(self, model1, model2):
        super().__init__()
        self.model1 = model1
        self.model2 = model2

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        """Return the mean of both models' logits for the given batch."""
        shared_kwargs = {"attention_mask": attention_mask, "token_type_ids": token_type_ids}
        first_logits = self.model1(input_ids, **shared_kwargs).logits
        second_logits = self.model2(input_ids, **shared_kwargs).logits
        return (first_logits + second_logits) / 2

# Dataset para el test
class TestDataset(Dataset):
    """Dataset of unlabeled texts, tokenized one item at a time.

    Each item is tokenized with truncation and padding to `max_length`, and
    returned as a dict holding `input_ids` and `attention_mask` with the
    leading batch dimension removed.
    """

    def __init__(self, texts, tokenizer, max_length=512):
        self.texts, self.tokenizer, self.max_length = texts, tokenizer, max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoded = self.tokenizer(
            self.texts[idx],
            max_length=self.max_length,
            truncation=True,
            padding="max_length",
            return_tensors="pt"
        )
        # The tokenizer returns tensors with a batch axis of size one; drop it.
        return {
            field: encoded[field].squeeze(0)
            for field in ("input_ids", "attention_mask")
        }

# Select the device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the two fine-tuned classifiers from disk.
model1 = BertForSequenceClassification.from_pretrained("./model_detests").to(device)
model2 = BertForSequenceClassification.from_pretrained("./model_stereohoax").to(device)

# Wrap them in the logit-averaging ensemble and switch to inference mode.
ensemble_model = EnsembleModel(model1, model2).to(device)
ensemble_model.eval()

# Tokenizer matching the classifiers' base checkpoint.
tokenizer = BertTokenizer.from_pretrained("dccuchile/bert-base-spanish-wwm-cased")

# Load test.csv and apply the same text preprocessing used for training.
test_data = pd.read_csv("/kaggle/input/i2c-challenge-dataset/test.csv")
test_data["text"] = test_data["text"].apply(preprocess_text)
texts = test_data["text"].tolist()
ids = test_data["id"].tolist()

# Batch the test texts.
test_dataset = TestDataset(texts, tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=16)

# Predict the class with the highest averaged logit for every text.
predictions = []
with torch.no_grad():
    for batch in test_dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        logits = ensemble_model(input_ids, attention_mask)
        preds = torch.argmax(logits, dim=1).cpu().numpy()
        predictions.extend(preds)

# Write the submission file; the message reports the path actually written.
output_path = "run_1_berto_sinonimos_08.csv"
results = pd.DataFrame({"id": ids, "stereotype_predicted": predictions})
results.to_csv(output_path, index=False)

print(f"Predicciones guardadas en '{output_path}'")



# 17. Testing the Ensemble with train.csv

In [None]:
# Reload the original training file.
# Note: this file is the one the training/validation/test splits were drawn
# from, so the scores below include examples the models were trained on.
train_data = pd.read_csv("/kaggle/input/i2c-challenge-dataset/train.csv")

# Keep the rows of the two corpora and stack them (detests first).
source_of_row = train_data['source']
detests = train_data[source_of_row == 'detests']
stereohoax = train_data[source_of_row == 'stereohoax']
original_data = pd.concat([detests, stereohoax])

# Same text preprocessing as everywhere else in the notebook.
original_data["text"] = original_data["text"].apply(preprocess_text)

# Preprocessed texts and gold labels.
texts = original_data["text"].tolist()
labels = original_data["stereotype"].tolist()

# Batch the texts with the unlabeled-text dataset wrapper.
original_dataset = TestDataset(texts, tokenizer)
original_dataloader = DataLoader(original_dataset, batch_size=16)

# Ensemble predictions, with a progress bar over the batches.
predictions = []
with torch.no_grad():
    for batch in tqdm(original_dataloader, desc="Procesando batches"):
        batch_ids = batch["input_ids"].to(device)
        batch_mask = batch["attention_mask"].to(device)
        batch_logits = ensemble_model(batch_ids, batch_mask)
        predictions.extend(torch.argmax(batch_logits, dim=1).cpu().numpy())

# Share of predictions that match the gold labels, as a percentage.
total_samples = len(labels)
correct_predictions = sum(predicted == gold for predicted, gold in zip(predictions, labels))
accuracy = (correct_predictions / total_samples) * 100

f1 = f1_score(labels, predictions, average="weighted")

# Report both scores.
print(f"Porcentaje de acierto: {accuracy:.2f}%")
print(f"F1 Score: {f1:.2f}")