<h1 align="center">Lab 2:  Sexism Identification in Twitter</h1>
<h2 align="center">Session 4. Transformers and Explainability</h2>
<h3 style="display:block; margin-top:5px;" align="center">Natural Language and Information Retrieval</h3>
<h3 style="display:block; margin-top:5px;" align="center">Degree in Data Science</h3>
<h3 style="display:block; margin-top:5px;" align="center">2024-2025</h3>    
<h3 style="display:block; margin-top:5px;" align="center">ETSInf. Universitat Politècnica de València</h3>
<br>

### Put your names here

- Jaime Ballester Solá
- Marc Romeu Ferras

### CONSTANTS

In [5]:
# Notebook-wide switches read by later cells.
COLAB = True # Set to True when running on Google Colab (a later cell then mounts Google Drive)
PIP = True # Set to True to run the pip install cell below

**If you have trouble installing ferret-xai or jsonnet on Linux, try installing these packages first:**

- sudo apt install cmake   
- sudo apt install g++     
- sudo apt install make

**Or run this after installing ferret-xai:**

- conda install -c conda-forge libstdcxx-ng --update-deps


## Some libraries

In [10]:
# Install or upgrade the third-party packages used by this notebook (only when PIP is True).
# NOTE(review): the last line repeats the transformers install from four lines above, without --upgrade.
# NOTE(review): upgrading packages inside a running kernel presumably requires a runtime restart
# before importing them — confirm against the import error recorded below.
if PIP:
    !pip install pip --upgrade
    !pip install transformers --upgrade
    !pip  install datasets accelerate --upgrade
    !pip install ferret-xai --upgrade
    !pip install tqdm
    !pip install transformers

Collecting pip
  Downloading pip-25.1.1-py3-none-any.whl.metadata (3.6 kB)
Downloading pip-25.1.1-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m33.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-25.1.1
Collecting datasets
  Using cached datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Using cached datasets-3.6.0-py3-none-any.whl (491 kB)
Installing collected packages: datasets
  Attempting uninstall: datasets
    Found existing installation: datasets 2.21.0
    Uninstalling datasets-2.21.0:
      Successfully uninstalled datasets-2.21.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ferret-xai 0.4.2 req

In [11]:
import os
import sys
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, classification_report, accuracy_score
from sklearn.preprocessing import LabelEncoder
import pandas as pd
from typing import List, Tuple
import tqdm

ModuleNotFoundError: Could not import module 'AutoTokenizer'. Are this object's requirements defined correctly?

## Import readerEXIST2025 library, and read the dataset

In [None]:
from pathlib import Path

# Resolve the project root as a Path in BOTH environments so that later cells
# can treat `base_path` uniformly (it used to be a str on Colab and a Path locally).
if COLAB:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    base_path = Path("/content/drive/MyDrive/EXISTS2025_TweetBusters")
else:
    base_path = Path.cwd().parent

# Folder that holds the local readerEXIST2025 module.
library_path = base_path / "Functions"

# Put the folder first on sys.path so the import below resolves to it.
sys.path.insert(0, str(library_path))
from readerEXIST2025 import EXISTReader

In [None]:
# Locate the training and dev JSON files inside the (doubly nested) dataset folder.
dataset_path = os.path.join(base_path, "EXIST_2025_Dataset_V0.2/EXIST_2025_Dataset_V0.2/")
file_train = os.path.join(dataset_path, "EXIST2025_training.json")
file_dev = os.path.join(dataset_path, "EXIST2025_dev.json")

# One reader per split.
reader_train, reader_dev = EXISTReader(file_train), EXISTReader(file_dev)

# Subtask 1 data for English ...
EnTrainTask1 = reader_train.get(lang="EN", subtask="1")
EnDevTask1 = reader_dev.get(lang="EN", subtask="1")
# ... and for Spanish.
SpTrainTask1 = reader_train.get(lang="ES", subtask="1")
SpDevTask1 = reader_dev.get(lang="ES", subtask="1")

## Dataset class

In [None]:
class SexismDataset(Dataset):
    """Torch dataset that tokenizes one text per item and carries its label and id.

    Parameters:
    - texts: the input texts. Objects exposing ``tolist()`` (e.g. pandas Series,
      numpy arrays) are converted with it; any other iterable is copied with ``list()``.
    - labels: indexable collection of integer labels, aligned with ``texts``.
    - ids: indexable collection of integer sample ids, aligned with ``texts``.
    - tokenizer: callable tokenizer; it is invoked as ``tokenizer(text, ...)``.
    - max_len: value forwarded as ``max_length``.
    - pad: value forwarded as ``padding``.
    - trunc: value forwarded as ``truncation``.
    - rt: value forwarded as ``return_tensors``.
    """

    def __init__(self, texts, labels, ids, tokenizer, max_len=128, pad="max_length", trunc=True,rt='pt'):
        # Accept plain Python sequences as well as Series/arrays.
        self.texts = texts.tolist() if hasattr(texts, "tolist") else list(texts)
        self.labels = labels
        self.ids = ids
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.pad = pad
        self.trunc = trunc
        self.rt = rt

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize the text at ``idx`` and return a dict with keys
        'input_ids', 'attention_mask', 'labels' and 'id'."""
        text = str(self.texts[idx])
        # Call the tokenizer directly instead of the deprecated encode_plus().
        inputs = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding=self.pad,
            truncation=self.trunc,
            return_tensors=self.rt
        )

        return {
            # flatten() drops the leading batch dimension added by return_tensors.
            'input_ids': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long),
            'id': torch.tensor(self.ids[idx], dtype=torch.long)
        }

## Auxiliary functions

In [None]:
from collections import Counter # Counter counts the number of occurrences of each item
from itertools import tee, count

def uniquify(seq, suffs=None):
    """Make all the items unique, in place, by adding a suffix (1, 2, etc).

    `seq` is a mutable sequence of strings; only items that occur more than
    once are modified, each occurrence receiving the next suffix.
    `suffs` is an optional alternative suffix iterable. When omitted, a fresh
    ``count(1)`` is created on every call (a shared default iterator would keep
    advancing from one call to the next).

    Returns None. Generated names are not checked against items already in `seq`.
    """
    if suffs is None:
        suffs = count(1)
    not_unique = [k for k, v in Counter(seq).items() if v > 1]  # e.g. ['name', 'zip']
    # One independent suffix iterator per duplicated value,
    # e.g. {'name': <iterator>, 'zip': <iterator>}
    suff_gens = dict(zip(not_unique, tee(suffs, len(not_unique))))
    for idx, s in enumerate(seq):
        gen = suff_gens.get(s)
        if gen is None:
            # s was unique
            continue
        seq[idx] += str(next(gen))

def deduplicate(explanations):
    """Make the tokens of every explanation unique by appending `_1`, `_2`, ... to repeated tokens.

    `explanations` is an indexable collection of objects with a mutable
    `tokens` list attribute. Each token list is rewritten in place and the
    same collection is returned.
    """
    for explanation in explanations:
        tokens = explanation.tokens
        # Unbounded suffix stream: the former range(1, 100) ran out (StopIteration)
        # when a token was repeated more than 99 times.
        uniquify(tokens, (f'_{x!s}' for x in count(1)))
        explanation.tokens = tokens
    return explanations

# Two options to predict

### The simplest

In [None]:
import torch
import numpy as np
from transformers import Trainer, TrainingArguments


def predict_op1(model, dataset, args={}):
    """Predict with a ``Trainer`` and return the predicted classes and their probabilities.

    Parameters:
    - model: model handed to ``Trainer``.
    - dataset: dataset handed to ``Trainer.predict``.
    - args: optional dict of settings:
        * "output_dir" (default "./output")
        * "per_device_eval_batch_size" (default 16)
        * "return_logits" (default False): also return the raw logits.

    Returns:
    - (pred_classes, pred_probs), or (pred_classes, pred_probs, logits) when
      "return_logits" is truthy. ``pred_probs`` holds, for each sample, the
      softmax probability of the class that was predicted.
    """
    # Inference-only Trainer configuration
    eval_config = TrainingArguments(
        output_dir=args.get("output_dir", "./output"),
        per_device_eval_batch_size=args.get("per_device_eval_batch_size", 16),
        do_train=False,
        do_eval=False,
    )
    raw_logits = Trainer(model=model, args=eval_config).predict(dataset).predictions

    # Softmax over the class axis
    class_probs = torch.nn.functional.softmax(torch.tensor(raw_logits), dim=-1).numpy()
    # Predicted class = arg-max logit
    pred_classes = np.argmax(raw_logits, axis=-1)
    # Pick, per row, the probability of the predicted class
    row_index = np.arange(len(class_probs))
    pred_probs = class_probs[row_index, pred_classes]

    if not args.get("return_logits", False):
        return pred_classes, pred_probs
    return pred_classes, pred_probs, raw_logits

### The coolest, if you know what you're doing

In [None]:
import torch
import numpy as np
from torch.utils.data import DataLoader
import tqdm

def predict_op2(model, dataset, args=None):
    """Predict with a manual ``DataLoader`` loop and return classes and probabilities.

    Parameters:
    - model: callable module; it is called as ``model(**batch)`` and must return
      an object with a ``logits`` attribute.
    - dataset: dataset whose items are dicts of tensors.
    - args: optional dict of settings:
        * "device": device to run on (default: cuda, then mps, then cpu).
        * "per_device_eval_batch_size" (default 16).
        * "id_key" (default "id"): batch key removed before calling the model.
        * "return_logits" (default False): also return the raw logits.

    Returns:
    - (pred_classes, pred_probs), or (pred_classes, pred_probs, logits) when
      "return_logits" is truthy. ``pred_probs`` holds, for each sample, the
      softmax probability of the class that was predicted.
    """
    if args is None:
        args = {}

    # Import the progress bar explicitly: a bare `import tqdm` does not guarantee
    # that the `tqdm.notebook` submodule is loaded. Fall back to no progress bar
    # when tqdm is not installed.
    try:
        from tqdm.auto import tqdm as progress_bar
    except ImportError:
        def progress_bar(iterable, **_kwargs):
            return iterable

    # Device selection
    device = args.get("device", None)
    if device is None:
        mps_backend = getattr(torch.backends, "mps", None)
        if torch.cuda.is_available():
            device = torch.device("cuda")
        elif mps_backend is not None and mps_backend.is_available():
            device = torch.device("mps")
        else:
            device = torch.device("cpu")
    model.to(device)
    model.eval()

    # Batch size
    batch_size = args.get("per_device_eval_batch_size", 16)
    dataloader = DataLoader(dataset, batch_size=batch_size)
    id_key = args.get("id_key", "id")

    logits_list = []
    with torch.no_grad():
        for batch in progress_bar(dataloader, desc="Predicting"):
            # The sample id is not a model input; drop it when present
            batch.pop(id_key, None)
            # Move the tensors to the target device
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            logits_list.append(outputs.logits.cpu())

    # Concatenate the per-batch logits
    logits = torch.cat(logits_list, dim=0)
    # Class probabilities
    probs = torch.nn.functional.softmax(logits, dim=-1).numpy()
    # Predicted classes
    pred_classes = np.argmax(probs, axis=-1)
    # Probability of the predicted class
    pred_probs = probs[np.arange(len(pred_classes)), pred_classes]

    if args.get("return_logits", False):
        return pred_classes, pred_probs, logits.numpy()
    return pred_classes, pred_probs

### Predictions from the best Spanish model

In [None]:
from peft import LoraConfig, get_peft_model, TaskType
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback
)
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
import numpy as np
import pandas as pd

def compute_metrics_1(pred):
    """Compute accuracy, F1, precision and recall (binary average) for a Trainer prediction.

    `pred` must expose `label_ids` (true labels) and `predictions` (per-class
    scores; the arg-max over the last axis is taken as the predicted class).
    Returns a dict with keys 'accuracy', 'f1', 'precision' and 'recall'.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)
    # zero_division=0: report 0 instead of warning when a metric is undefined
    scores = precision_recall_fscore_support(
        y_true, y_hat, average='binary', zero_division=0
    )
    return {
        'accuracy': accuracy_score(y_true, y_hat),
        'f1': scores[2],
        'precision': scores[0],
        'recall': scores[1]
    }

def sexism_classification_pipeline_task1_LoRA(
    trainInfo, devInfo, testInfo=None,
    model_name='pysentimiento/robertuito-base-uncased',
    nlabels=2,
    ptype="single_label_classification",
    **args
):
    """Fine-tune a sequence classifier with LoRA, merge the adapters and save the result.

    Parameters:
    - trainInfo, devInfo: indexable triples used as [0] sample ids (convertible
      to int), [1] texts and [2] labels.
    - testInfo: optional; same layout, only [0] and [1] are read. When given,
      predictions are written to 'sexism_predictions_task1.csv'.
    - model_name: checkpoint name passed to ``from_pretrained``.
    - nlabels: number of labels of the classification head.
    - ptype: value passed as ``problem_type``.
    - **args: optional overrides: "output_dir", "fp16", "task_type",
      "target_modules", "rank", "lora_alpha", "lora_dropout", "bias",
      "num_train_epochs", "learning_rate", "per_device_train_batch_size",
      "per_device_eval_batch_size", "logging_dir", "logging_steps",
      "early_stopping_patience".

    Returns:
    - (merged_model, predictions_dataframe) when ``testInfo`` is given,
      otherwise (merged_model, eval_results).
    """
    # Directory where checkpoints and the final model are written
    # (overridable through args, like in the English pipeline).
    output_dir = args.get("output_dir", "/content/drive/MyDrive/PRACT4/modelo_final")

    # 1) tokenizer + base model
    labelEnc = LabelEncoder()
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=nlabels,
        problem_type=ptype
    )

    # 2) LoRA config
    lora_config = LoraConfig(
        task_type = args.get("task_type", TaskType.SEQ_CLS),
        target_modules = args.get("target_modules", ["query","value"]),
        r = args.get("rank", 32),
        lora_alpha = args.get("lora_alpha", 16),
        lora_dropout = args.get("lora_dropout", 0.1),
        bias = args.get("bias", "none")
    )
    peft_model = get_peft_model(model, lora_config)

    # 3) datasets (the label encoder is fitted on the training labels only)
    train_dataset = SexismDataset(
        trainInfo[1],
        labelEnc.fit_transform(trainInfo[2]),
        [int(x) for x in trainInfo[0]],
        tokenizer
    )
    val_dataset = SexismDataset(
        devInfo[1],
        labelEnc.transform(devInfo[2]),
        [int(x) for x in devInfo[0]],
        tokenizer
    )

    # 4) TrainingArguments
    training_args = TrainingArguments(
        report_to="none",
        output_dir=output_dir,
        num_train_epochs=args.get('num_train_epochs', 3),
        learning_rate=args.get('learning_rate', 2e-5),
        per_device_train_batch_size=args.get('per_device_train_batch_size', 32),
        per_device_eval_batch_size=args.get('per_device_eval_batch_size', 32),
        # Mixed precision needs a CUDA device; enabling it unconditionally
        # makes TrainingArguments fail on CPU-only machines.
        fp16=args.get('fp16', torch.cuda.is_available()),
        gradient_checkpointing=True,
        logging_dir=args.get('logging_dir', './logs'),
        logging_steps=args.get('logging_steps', 100),
        eval_strategy='epoch',
        save_strategy='epoch',
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model='f1'
    )

    trainer = Trainer(
        model=peft_model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics_1,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=args.get("early_stopping_patience", 2))]
    )

    # 5) Train and evaluate
    trainer.train()
    eval_results = trainer.evaluate()
    print("Validation Results:", eval_results)

    # 6) Save the trained PEFT model, then the merged model and the tokenizer.
    # NOTE(review): adapter files and merged weights end up in the same folder;
    # confirm that loading this folder with from_pretrained picks up the merged weights.
    trainer.save_model(output_dir)                        # saves peft_model
    mixModel = peft_model.merge_and_unload()
    mixModel.save_pretrained(output_dir)                  # saves the merged model
    tokenizer.save_pretrained(output_dir)                 # keeps the folder self-contained

    # 7) If testInfo is given, predict and save a csv
    if testInfo is not None:
        test_dataset = SexismDataset(
            testInfo[1],
            [0] * len(testInfo[1]),                       # placeholder labels
            [int(x) for x in testInfo[0]],
            tokenizer
        )
        predictions = trainer.predict(test_dataset)
        preds = np.argmax(predictions.predictions, axis=1)
        df = pd.DataFrame({
            'id': testInfo[0],
            'label': labelEnc.inverse_transform(preds),
            "test_case": ["EXIST2025"] * len(preds)
        })
        df.to_csv('sexism_predictions_task1.csv', index=False)
        print("Prediction for TASK 1 completed. Results saved to sexism_predictions_task1.csv")
        return mixModel, df

    return mixModel, eval_results

# Ejemplo de ejecución:
# Train the Spanish model: checkpoint name plus the hyper-parameters forwarded to the pipeline.
modelname = "pysentimiento/robertuito-base-uncased"
params = dict(
    num_train_epochs=3,
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    logging_steps=100,
    rank=32,
    lora_alpha=16,
    lora_dropout=0.1,
)

# No test split is passed, so the second returned value is the evaluation results.
mejor_modelo, resultados = sexism_classification_pipeline_task1_LoRA(
    SpTrainTask1,
    SpDevTask1,
    testInfo=None,
    model_name=modelname,
    nlabels=2,
    ptype="single_label_classification",
    **params
)

In [None]:
import os
# Sets the WANDB_DISABLED environment variable to "true"
# (presumably to switch off Weights & Biases logging — TODO confirm).
os.environ["WANDB_DISABLED"] = "true"

import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.preprocessing import LabelEncoder

# Base checkpoint (for the tokenizer) and the folder on Drive where the model was saved
base_model = "pysentimiento/robertuito-base-uncased"
model_path = "/content/drive/MyDrive/PRACT4/modelo_final"

# Load the tokenizer and the model
tokenizer = AutoTokenizer.from_pretrained(base_model)
model = AutoModelForSequenceClassification.from_pretrained(model_path)

# Build the validation dataset (the label encoder is fitted on the dev labels here)
label_enc = LabelEncoder()
val_dataset = SexismDataset(
    SpDevTask1[1],
    label_enc.fit_transform(SpDevTask1[2]),
    list(map(int, SpDevTask1[0])),
    tokenizer
)

# Predict with both functions
y_pred1, y_prob1 = predict_op1(model, val_dataset, args=dict(per_device_eval_batch_size=32))
y_pred2, y_prob2 = predict_op2(model, val_dataset, args=dict(per_device_eval_batch_size=32))

# Check that both outputs agree
print("¿Las predicciones coinciden?", not any(int(a) != int(b) for a, b in zip(y_pred1, y_pred2)))

### Evaluation of the results

In [None]:
# Use the classes and predicted-class probabilities returned by predict_op1 for the Spanish analysis.
preds_es = y_pred1
probs_es = y_prob1

def compute_metrics(y_true, y_pred):
    """Compute accuracy, F1, precision and recall (binary average).

    Parameters:
    - y_true: true labels.
    - y_pred: predicted labels.

    Returns a dict with keys 'accuracy', 'f1', 'precision' and 'recall'.
    """
    # zero_division=0 matches the other metric helpers in this notebook: an
    # undefined metric is reported as 0 without emitting a warning.
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='binary', zero_division=0
    )
    acc = accuracy_score(y_true, y_pred)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall,
    }

# Compute the metrics
metrics_es = compute_metrics(val_dataset.labels, preds_es)
print("Métricas (modelo español):", metrics_es)

# Confusion matrix
cm_es = confusion_matrix(val_dataset.labels, preds_es)
print("\nMatriz de confusión (modelo español):", cm_es, sep="\n")

# Classification report, labelled with the original class names
# recovered from the LabelEncoder
target_names_es = label_enc.inverse_transform([0, 1]).tolist()
report_es = classification_report(val_dataset.labels, preds_es, target_names=target_names_es, digits=4)
print("\nReporte de clasificación (modelo español):", report_es, sep="\n")


### Plot confusion matrix


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Heatmap of the Spanish confusion matrix; rows are true classes, columns are predictions.
plt.figure(figsize=(6, 4))
heatmap_options = dict(
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=target_names_es,
    yticklabels=target_names_es,
)
ax = sns.heatmap(cm_es, **heatmap_options)
ax.set_title("Matriz de confusión (modelo español)")
ax.set_xlabel("Predicción")
ax.set_ylabel("Actual")
plt.show()


### Text and probability of False positive and False negative

In [None]:
from typing import List, Tuple

def false_positive_false_negative(
    y_true: List[int],
    y_pred: List[int],
    pred_probs: List[float],
    texts: List[str],
) -> Tuple[List[Tuple[float, str]], List[Tuple[float, str]]]:
    """
    Collect the misclassified samples of a binary classifier.

    Parameters:
    - y_true: true labels (0 or 1).
    - y_pred: labels predicted by the model (0 or 1).
    - pred_probs: probability associated with the predicted class.
    - texts: text of each sample.

    Returns:
    - false_positives: (probability, text) tuples with true 0 / predicted 1, sorted by descending probability.
    - false_negatives: (probability, text) tuples with true 1 / predicted 0, sorted by descending probability.
    """
    def _misclassified(actual, predicted):
        # Samples whose (true, predicted) pair matches, most confident first
        matches = [
            (prob, text)
            for true, pred, prob, text in zip(y_true, y_pred, pred_probs, texts)
            if true == actual and pred == predicted
        ]
        return sorted(matches, key=lambda pair: pair[0], reverse=True)

    false_positives = _misclassified(0, 1)
    false_negatives = _misclassified(1, 0)
    return false_positives, false_negatives

In [None]:
# Misclassified Spanish dev samples, most confident first.
fpositive, fnegative = false_positive_false_negative(
    val_dataset.labels, preds_es, probs_es, val_dataset.texts
)

# Print the count of each group followed by every (probability, text) pair.
for header, group in (("False positive:", fpositive), ("\n\nFalse negative:", fnegative)):
    print(header, len(group))
    for s in group:
        print('**', s)


### Select some samples to analyze

You can select the samples with more confidence, those with less confidence, a mix of both, or simply the ones you consider more interesting.



**Try not to choose the same samples that I’ve selected, and justify your decision.**


In [None]:
# Make your own selection

# Texts picked by their position in the sorted false-positive list ...
fpositive_samples = [fpositive[position][1] for position in (3, 10, 43)]
# ... and in the sorted false-negative list.
fnegative_samples = [fnegative[position][1] for position in (1, 2, 3)]

In [None]:
# NOTE(review): ferret-xai is also installed by the PIP-guarded install cell near the top
# of the notebook; this line only matters when that cell was skipped.
!pip install ferret-xai

In [None]:
from ferret import Benchmark
from IPython.display import display_html

# selecting the benchmark
# Build the ferret benchmark for the currently loaded model and tokenizer.
bench = Benchmark(model, tokenizer)#, explainers=[s,l])
# If this benchmark raises runtime errors, try restricting it to the two explainers below (SHAP and LIME):
    #from ferret.explainers.shap import SHAPExplainer
    #from ferret.explainers.lime import LIMEExplainer
    #lexp = LIMEExplainer(model, tokenizer)
    #sexp = SHAPExplainer(model, tokenizer)
    #bench = Benchmark(model, tokenizer, explainers=[sexp,lexp])

def explain_this(benchmark, sentence, target):
    """Explain `sentence` for class `target` and display the explanation and evaluation tables.

    `benchmark` must provide `explain`, `evaluate_explanations`, `show_table`
    and `show_evaluation_table`. Repeated tokens are made unique with
    `deduplicate` before evaluating. Nothing is returned.
    """
    unique_explanations = deduplicate(benchmark.explain(sentence, target=target))
    evaluations = benchmark.evaluate_explanations(unique_explanations, target=target)

    print("Sentence:", sentence)
    print("Class:", target)

    explanation_table = benchmark.show_table(unique_explanations)
    evaluation_table = benchmark.show_evaluation_table(evaluations)
    # Render both tables as raw HTML, explanations first
    for table in (explanation_table, evaluation_table):
        display_html(table.to_html(), raw=True)

### Show explanations

In [None]:
# Explain false positives against class 1 and false negatives against class 0,
# i.e. against the class the model wrongly predicted.
for heading, target_class, samples in (
    ("False Positive:", 1, fpositive_samples),
    ("False Negative:", 0, fnegative_samples),
):
    for sample in samples:
        print(heading)
        explain_this(bench, sample, target_class)
        print("\n\n")

# DO IT IN ENGLISH

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 0) Montar Google Drive (si quieres persistir ahí)
# ─────────────────────────────────────────────────────────────────────────────
# Mount Google Drive only when running on Colab (see the COLAB switch at the
# top of the notebook); google.colab cannot be imported anywhere else.
if COLAB:
    from google.colab import drive
    drive.mount('/content/drive')

# ─────────────────────────────────────────────────────────────────────────────
# 1) Imports y configuración
# ─────────────────────────────────────────────────────────────────────────────
import os
from pathlib import Path

import torch
from torch.utils.data import Dataset
import torch.nn.functional as F

import numpy as np
import pandas as pd

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback
)
from peft import LoraConfig, get_peft_model, TaskType
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from collections import Counter

# Sets the WANDB_DISABLED environment variable to "true"
# (presumably to switch off Weights & Biases logging — TODO confirm).
os.environ["WANDB_DISABLED"] = "true"

# ─────────────────────────────────────────────────────────────────────────────
# 2) Clase de Dataset
# ─────────────────────────────────────────────────────────────────────────────
class SexismDataset(Dataset):
    """Torch dataset that tokenizes one text per item and attaches its label.

    Note: this definition replaces the earlier `SexismDataset` of this notebook,
    which also took an `ids` argument.

    Parameters:
    - texts: iterable of input texts (copied into a list).
    - labels: iterable of integer labels (copied into a list).
    - tokenizer: callable invoked as ``tokenizer(text, truncation=..., ...)``.
    - max_length: value forwarded as ``max_length``.
    """

    def __init__(self, texts, labels, tokenizer, max_length=256):
        self.texts = [*texts]
        self.labels = [*labels]
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenizer outputs for sample `idx` (leading batch axis removed) plus 'labels'."""
        encoded = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        sample = {}
        for name, tensor in encoded.items():
            # return_tensors='pt' adds a batch axis of size 1; drop it
            sample[name] = tensor.squeeze(0)
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# ─────────────────────────────────────────────────────────────────────────────
# 3) Función de métricas
# ─────────────────────────────────────────────────────────────────────────────
def compute_metrics_1(pred):
    """Return accuracy, F1, precision and recall (binary average) for a Trainer prediction.

    `pred.label_ids` holds the true labels; the predicted class is the arg-max
    of `pred.predictions` over its last axis. Undefined metrics are reported as 0.
    """
    y_true, y_hat = pred.label_ids, pred.predictions.argmax(-1)
    prec, rec, f_score, _support = precision_recall_fscore_support(
        y_true, y_hat, average='binary', zero_division=0
    )
    result = {'accuracy': accuracy_score(y_true, y_hat)}
    result['f1'] = f_score
    result['precision'] = prec
    result['recall'] = rec
    return result

# ─────────────────────────────────────────────────────────────────────────────
# 4) Pipeline con LoRA y weighted loss
# ─────────────────────────────────────────────────────────────────────────────
class WeightedTrainer(Trainer):
    """Trainer whose loss is a cross-entropy with optional per-class weights.

    Parameters:
    - class_weights: tensor of per-class weights, or None (the default) for an
      unweighted cross-entropy.
    - All other positional and keyword arguments are forwarded to ``Trainer``.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    # **kwargs absorbs extra keyword arguments such as num_items_in_batch
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the (weighted) cross-entropy between the model logits and ``inputs["labels"]``.

        "labels" is removed from ``inputs`` before the model is called.
        Returns ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        # The weights must live on the same device as the logits; None means unweighted
        # (calling .to() on the default None used to raise AttributeError).
        weight = None if self.class_weights is None else self.class_weights.to(logits.device)
        loss_fct = torch.nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits, labels)
        return (loss, outputs) if return_outputs else loss

def sexism_classification_pipeline_task1_LoRA(
    trainInfo, devInfo, testInfo=None,
    model_name='cardiffnlp/twitter-roberta-base-2022-154m',
    nlabels=2,
    ptype="single_label_classification",
    **args
):
    """Fine-tune a sequence classifier with LoRA and a class-weighted loss, then save the merged model.

    Parameters:
    - trainInfo, devInfo: triples unpacked as (ids, texts, labels); the ids are not used.
    - testInfo: accepted for signature compatibility but currently NOT used.
    - model_name: checkpoint name passed to ``from_pretrained``.
    - nlabels: number of labels of the classification head (also the number of class weights).
    - ptype: value passed as ``problem_type``.
    - **args: optional overrides: "output_dir", "fp16", "max_length", "task_type",
      "target_modules", "rank", "lora_alpha", "lora_dropout", "bias",
      "num_train_epochs", "learning_rate", "per_device_train_batch_size",
      "per_device_eval_batch_size", "logging_dir", "logging_steps",
      "early_stopping_patience".

    Returns:
    - (merged_model, eval_results)
    """
    # 1) Output folder, tokenizer and base model
    output_dir = args.get("output_dir", "/content/drive/MyDrive/PRACT4/BEST_MODEL_EN")
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    base_model = AutoModelForSequenceClassification.from_pretrained(
        model_name, num_labels=nlabels, problem_type=ptype
    )

    # 2) LoRA configuration
    lora_config = LoraConfig(
        task_type      = args.get("task_type", TaskType.SEQ_CLS),
        target_modules = args.get("target_modules", ["query", "key", "value"]),
        r              = args.get("rank", 16),
        lora_alpha     = args.get("lora_alpha", 32),
        lora_dropout   = args.get("lora_dropout", 0.05),
        bias           = args.get("bias", "none")
    )
    peft_model = get_peft_model(base_model, lora_config)

    # 3) Data and class weights (the sample ids are not needed here)
    _, train_texts, train_labels = trainInfo
    _, dev_texts,   dev_labels   = devInfo

    le = LabelEncoder().fit(train_labels)
    y_train = le.transform(train_labels)
    y_dev   = le.transform(dev_labels)

    # Inverse-frequency weights, normalised to sum to 1
    freq = Counter(y_train)
    weights = torch.tensor([1.0/freq[i] for i in range(nlabels)], dtype=torch.float)
    weights = weights / weights.sum()

    max_length = args.get("max_length", 256)
    train_ds = SexismDataset(train_texts, y_train, tokenizer, max_length=max_length)
    dev_ds   = SexismDataset(dev_texts,   y_dev,   tokenizer, max_length=max_length)

    # 4) Training arguments
    training_args = TrainingArguments(
        report_to="none",
        output_dir=output_dir,
        num_train_epochs=args.get('num_train_epochs', 8),
        learning_rate=args.get('learning_rate', 2e-5),
        per_device_train_batch_size=args.get('per_device_train_batch_size', 16),
        per_device_eval_batch_size=args.get('per_device_eval_batch_size', 16),
        # Mixed precision needs a CUDA device; enabling it unconditionally
        # makes TrainingArguments fail on CPU-only machines.
        fp16=args.get('fp16', torch.cuda.is_available()),
        gradient_checkpointing=True,
        logging_dir=args.get('logging_dir', './logs'),
        logging_steps=args.get('logging_steps', 50),
        eval_strategy='epoch',
        save_strategy='epoch',
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model='f1'
    )

    trainer = WeightedTrainer(
        model=peft_model,
        args=training_args,
        train_dataset=train_ds,
        eval_dataset=dev_ds,
        compute_metrics=compute_metrics_1,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=args.get("early_stopping_patience",3))],
        class_weights=weights
    )

    # 5) Train and evaluate
    trainer.train()
    eval_results = trainer.evaluate()
    print("Validation Results:", eval_results)

    # 6) Merge and save into output_dir
    trainer.save_model(output_dir)              # LoRA weights
    merged = peft_model.merge_and_unload()
    merged.save_pretrained(output_dir)          # merged model
    tokenizer.save_pretrained(output_dir)       # tokenizer

    print(f"✔ Modelo final guardado en {output_dir}")
    return merged, eval_results

# ─────────────────────────────────────────────────────────────────────────────
# 5) Ejecutar pipeline
# ─────────────────────────────────────────────────────────────────────────────
# Hyper-parameters forwarded to the English pipeline.
params = dict(
    num_train_epochs=8,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_steps=50,
    rank=16,
    lora_alpha=32,
    lora_dropout=0.05,
    early_stopping_patience=3,
    target_modules=["query", "key", "value"],
    output_dir="/content/drive/MyDrive/PRACT4/BEST_MODEL_EN",
    max_length=256,
)

# Train on the English split; the call returns the merged model and the dev metrics.
best_model, results = sexism_classification_pipeline_task1_LoRA(
    EnTrainTask1,
    EnDevTask1,
    None,
    "cardiffnlp/twitter-roberta-base-2022-154m",
    2,
    "single_label_classification",
    **params
)

print("Pipeline completado. Métricas:", results)

In [None]:
# 1) Monta tu Drive (si no lo has hecho aún)

# 2) Define dónde está tu modelo merged
# Folder that holds the merged English model
model_dir = "/content/drive/MyDrive/PRACT4/BEST_MODEL_EN"

# Load tokenizer, config and model
from transformers import AutoTokenizer, AutoConfig, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained(model_dir)
config    = AutoConfig.from_pretrained(model_dir)
model     = AutoModelForSequenceClassification.from_pretrained(model_dir, config=config)

# Build the validation dataset (label encoder fitted on the training labels)
from sklearn.preprocessing import LabelEncoder

label_enc = LabelEncoder().fit(EnTrainTask1[2])

val_dataset = SexismDataset(
    texts     = EnDevTask1[1],
    labels    = label_enc.transform(EnDevTask1[2]),
    tokenizer = tokenizer,
    max_length=256
)

# Run the prediction once with each function
preds, probs = predict_op1(model, val_dataset)

# Check that both functions agree. The predict_op1 result is reused instead of
# running a second, identical inference pass over the whole dev set.
y_pred1 = preds
y_pred2, _ = predict_op2(model, val_dataset)
print("¿Predicciones idénticas?", all(x1 == x2 for x1, x2 in zip(y_pred1, y_pred2)))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    precision_recall_fscore_support,
    accuracy_score,
    confusion_matrix,
    classification_report
)

# 1) True labels, encoded with `label_enc`
#    (assumes `label_enc` was already fitted on EnTrainTask1[2])
y_true = label_enc.transform(EnDevTask1[2])

# 2) Predictions (from predict_op1)
#    `preds` is expected to be an array of integers — TODO confirm
y_pred = preds

# 3) Función de métricas
def compute_metrics_adj(y_true, y_pred):
    """Return accuracy, precision, recall and F1 (binary average) as a dict.

    Undefined metrics are reported as 0 (``zero_division=0``).
    """
    prf = precision_recall_fscore_support(
        y_true, y_pred, average='binary', zero_division=0
    )
    accuracy = accuracy_score(y_true, y_pred)
    # prf is (precision, recall, f-score, support)
    return dict(zip(('accuracy', 'precision', 'recall', 'f1'), (accuracy, prf[0], prf[1], prf[2])))

# Overall metrics, one per line with four decimals
metrics = compute_metrics_adj(y_true, y_pred)
print("Overall metrics:", *(f"  {k}: {v:.4f}" for k, v in metrics.items()), sep="\n")

# 4) Confusion matrix
cm = confusion_matrix(y_true, y_pred)
print("\nConfusion matrix:", cm, sep="\n")

# 5) Classification report, labelled with the original class names
report = classification_report(
    y_true,
    y_pred,
    target_names=label_enc.inverse_transform([0, 1]),
    digits=4,
)
print("\nClassification report:", report, sep="\n")

# 6) Heatmap of the confusion matrix
plt.figure(figsize=(6, 5))
ax = sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=label_enc.inverse_transform([0, 1]),
    yticklabels=label_enc.inverse_transform([0, 1]),
)
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
plt.tight_layout()
plt.show()

In [None]:
from typing import List, Tuple

def false_positive_false_negative(
    y_true: List[int],
    y_pred: List[int],
    pred_probs: List[float],
    texts: List[str],
) -> Tuple[List[Tuple[float, str]], List[Tuple[float, str]]]:
    """
    Find the samples a binary classifier got wrong.

    Parameters:
    - y_true: true labels (0 or 1).
    - y_pred: labels predicted by the model (0 or 1).
    - pred_probs: probability associated with the predicted class.
    - texts: text of each sample.

    Returns:
    - false_positives: list of (probability, text) tuples for false positives, highest probability first.
    - false_negatives: list of (probability, text) tuples for false negatives, highest probability first.
    """
    # True label 0 but predicted 1
    false_positives = []
    for true, pred, prob, text in zip(y_true, y_pred, pred_probs, texts):
        if true == 0 and pred == 1:
            false_positives.append((prob, text))

    # True label 1 but predicted 0
    false_negatives = []
    for true, pred, prob, text in zip(y_true, y_pred, pred_probs, texts):
        if true == 1 and pred == 0:
            false_negatives.append((prob, text))

    # Highest probability first
    by_probability = lambda entry: entry[0]
    return (
        sorted(false_positives, key=by_probability, reverse=True),
        sorted(false_negatives, key=by_probability, reverse=True),
    )

In [None]:
# 1) Ejecuta la predicción
# 1) Run the prediction
preds, probs = predict_op1(model, val_dataset)

# 2) Build pred_probs = probability of the PREDICTED class for every sample.
#    predict_op1 already returns that probability, so a scalar is used as is;
#    the earlier `1.0 - p` for pred == 0 reported the probability of the other
#    class and mis-sorted the false negatives.
pred_probs = []
for pred, p in zip(preds, probs):
    if hasattr(p, "__len__") and len(p) > 1:
        # p is a per-class vector: pick the entry of the predicted class
        pred_probs.append(float(p[pred]))
    else:
        pred_probs.append(float(p))

# 3) Collect the misclassified samples
fpositive_en, fnegative_en = false_positive_false_negative(
    y_true     = val_dataset.labels,   # List[int]
    y_pred     = preds.tolist() if isinstance(preds, np.ndarray) else preds,
    pred_probs = pred_probs,
    texts      = val_dataset.texts
)

# 4) Print the results
print("False positive:", len(fpositive_en))
for prob, text in fpositive_en:
    print(f"** [p={prob:.3f}] {text}")

print("\n\nFalse negative:", len(fnegative_en))
for prob, text in fnegative_en:
    print(f"** [p={prob:.3f}] {text}")

In [None]:
from ferret import Benchmark
from IPython.display import display_html

# selecting the benchmark
# Build the ferret benchmark for the currently loaded model and tokenizer.
bench = Benchmark(model, tokenizer)#, explainers=[s,l])
# If this benchmark raises runtime errors, try restricting it to the two explainers below (SHAP and LIME):
    #from ferret.explainers.shap import SHAPExplainer
    #from ferret.explainers.lime import LIMEExplainer
    #lexp = LIMEExplainer(model, tokenizer)
    #sexp = SHAPExplainer(model, tokenizer)
    #bench = Benchmark(model, tokenizer, explainers=[sexp,lexp])

def explain_this(benchmark, sentence, target):
    """Show the explanations of `sentence` for class `target`, together with their evaluation.

    Uses `benchmark.explain`, makes repeated tokens unique via `deduplicate`,
    evaluates the result and renders both tables as HTML. Returns None.
    """
    raw_explanations = benchmark.explain(sentence, target=target)
    deduped = deduplicate(raw_explanations)
    scores = benchmark.evaluate_explanations(deduped, target=target)

    print("Sentence:", sentence)
    print("Class:", target)

    # Build both tables first, then render them in order
    tables = [
        benchmark.show_table(deduped),
        benchmark.show_evaluation_table(scores),
    ]
    for styled in tables:
        display_html(styled.to_html(), raw=True)

In [None]:
# Selecciona solo los ejemplos que quieres explicar
# Positions (in the sorted lists) of the examples to explain
fp_positions = (3, 17, 21)
fn_positions = (5, 8, 14)
fpositive_samples = [fpositive_en[pos][1] for pos in fp_positions]
fnegative_samples = [fnegative_en[pos][1] for pos in fn_positions]

# Explain the selected false positives against class 1
for idx, text in zip(fp_positions, fpositive_samples):
    prob = fpositive_en[idx][0]
    print(f"False Positive sample {idx} [p={prob:.3f}]:")
    explain_this(bench, text, 1)
    print("\n\n")

# Explain the selected false negatives against class 0
for idx, text in zip(fn_positions, fnegative_samples):
    prob = fnegative_en[idx][0]
    print(f"False Negative sample {idx} [p={prob:.3f}]:")
    explain_this(bench, text, 0)
    print("\n\n")
