4. Modelos Contrastivos (CLIP-like)
Entrena un modelo basado en contraste, donde el objetivo es minimizar la distancia entre embeddings de texto humano y maximizar la distancia entre humano y generado por IA.

Ventajas:

Permite aprender representaciones robustas.
Se puede usar junto con un clasificador simple para la predicción final.
Ejemplo de entrenamiento contrastivo:

# Parameters

In [None]:
EPOCHS = 5
LEARNING_RATE = 0.0001
BATCH_SIZE = 256
LAYERS_TO_TRAIN = 1

In [None]:
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
ai_generated_path = "pan24-generative-authorship-news/machines"
human_path = "pan24-generative-authorship-news/human.jsonl"

In [None]:
import warnings
import logging

warnings.filterwarnings("ignore", message=".*overflowing tokens.*")
logging.disable(logging.WARNING)

## Libraries

In [None]:
import os
import json
import pandas as pd

## Import data

In [None]:
model, id, text = [], [], []

# Loop through every file in the directory
for filename in os.listdir(ai_generated_path):
    # Check if the file is a JSONL file
    if filename.endswith('.jsonl'):
        filepath = os.path.join(ai_generated_path, filename)
        with open(filepath, 'r', encoding='utf-8') as jsonl_file:
            for line in jsonl_file:
                # Each line is a separate JSON object
                data = json.loads(line)
                model.append(filename)
                id.append(data['id'])
                text.append(data['text'])

df_generated = pd.DataFrame({'model': model, 'id': id, 'text': text, 'ai_generated': 1})
df_generated

In [None]:
id, text = [], []

with open(human_path, 'r', encoding='utf-8') as jsonl_file:
    for line in jsonl_file:
        # Each line is a separate JSON object
        data = json.loads(line)
        id.append(data['id'])
        text.append(data['text'])

df_human = pd.DataFrame({'model': 'Human', 'id': id, 'text': text, 'ai_generated': 0})
df_human

In [None]:
df = pd.concat([df_generated, df_human])[['text', 'ai_generated', 'id']]
df

# Process Data - Combinaciones únicamente del mismo id

In [None]:
from sklearn.model_selection import train_test_split
import pandas as pd

test_size = 0.25
val_size = 0.125
_adjusted_val_size = val_size / (1 - test_size)

# Extraer el segundo y tercer segmento de los IDs
df['base_id'] = df['id'].apply(lambda x: '/'.join(x.split('/')[1:]))  # Coger los ids sin la parte que identifica al autor del fragmento de texto.

# Paso 1: Dividir los datos según los `base_id`
base_ids = df['base_id'].unique()
train_base_ids, test_base_ids = train_test_split(base_ids, test_size=test_size, random_state=1337)
train_base_ids, val_base_ids = train_test_split(train_base_ids, test_size=_adjusted_val_size, random_state=1337) 

# Crear DataFrames por conjunto
train = df[df['base_id'].isin(train_base_ids)]
val = df[df['base_id'].isin(val_base_ids)]
test = df[df['base_id'].isin(test_base_ids)]

train.reset_index(drop=True, inplace=True)
val.reset_index(drop=True, inplace=True)
test.reset_index(drop=True, inplace=True)

print(f"train shape: {train.shape}")
print(f"val shape: {val.shape}")
print(f"test shape: {test.shape}")

In [None]:
def create_combinations_within_id(df):
    # Lista para almacenar las combinaciones de cada `base_id`
    combinations = []
    
    # Iterar sobre cada `base_id`
    for base_id, group in df.groupby('base_id'):
        # Filtrar textos humanos e IA dentro del grupo
        df_human = group[group['ai_generated'] == 0][['text']].reset_index(drop=True)
        df_ia = group[group['ai_generated'] == 1][['text']].reset_index(drop=True)
        
        # Producto cartesiano dentro del `base_id`
        cartesian_df = df_human.merge(df_ia, how='cross', suffixes=('_human', '_ia'))
        cartesian_df = cartesian_df.sample(frac=1).reset_index(drop=True)
        
        # Crear las dos disposiciones
        total_combinations = len(cartesian_df)
        
        half_1 = cartesian_df.iloc[:total_combinations // 2].copy()
        half_1['comment_text_1'] = half_1['text_human']
        half_1['comment_text_2'] = half_1['text_ia']
        half_1['list'] = 0  # Etiqueta 0
        
        half_2 = cartesian_df.iloc[total_combinations // 2:].copy()
        half_2['comment_text_1'] = half_2['text_ia']
        half_2['comment_text_2'] = half_2['text_human']
        half_2['list'] = 1  # Etiqueta 1
        
        # Combinar y agregar al resultado final
        balanced_df = pd.concat([half_1, half_2], ignore_index=True)
        combinations.append(balanced_df)
    
    # Concatenar todas las combinaciones y barajar
    return pd.concat(combinations, ignore_index=True).sample(frac=1).reset_index(drop=True)


In [None]:
# Generar combinaciones restringidas por `id` para cada conjunto
train = create_combinations_within_id(train)
val = create_combinations_within_id(val)
test = create_combinations_within_id(test)

In [None]:
# Print the dimensions
print(f"train shape: {train.shape} / Text on comment_text_1 is human-generated: {train['list'].value_counts()[0]} - Text on comment_text_2 is human-generated: {train['list'].value_counts()[1]}")
print(f"val shape: {val.shape} / Text on comment_text_1 is human-generated: {val['list'].value_counts()[0]} - Text on comment_text_2 is human-generated: {val['list'].value_counts()[1]}")
print(f"test shape: {test.shape} / Text on comment_text_1 is human-generated: {test['list'].value_counts()[0]} - Text on comment_text_2 is human-generated: {test['list'].value_counts()[1]}")

# Model

In [None]:
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
import torch
from transformers import BertTokenizer, BertModel

# Tokenizador y modelo
model_name = "Lau123/distilbert-base-uncased-detect_ai_generated_text"
# tokenizer = DistilBertTokenizer.from_pretrained(model_name)
# individual_model = DistilBertForSequenceClassification.from_pretrained(model_name, num_labels=2)
tokenizer = BertTokenizer.from_pretrained(model_name)
individual_model =  BertModel.from_pretrained(model_name).from_pretrained(model_name, num_labels=2)

# Configuración del dispositivo y optimizador
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# individual_model.to(device)

In [None]:
# # Freeze all layers except the classifier layer
# for name, param in individual_model.named_parameters():
#     if name != "classifier.weight" and name != "classifier.bias":
#         param.requires_grad = False

for param in individual_model.parameters():
    param.requires_grad = False
if LAYERS_TO_TRAIN > 0:
    for layer in individual_model.encoder.layer[-LAYERS_TO_TRAIN:]:
        for param in layer.parameters():
            param.requires_grad = True

for param in individual_model.pooler.dense.parameters():
    param.requires_grad = True

# Verify that only the classifier layer is trainable
for name, param in individual_model.named_parameters():
    print(f"{name}: requires_grad = {param.requires_grad}")

In [None]:
from transformers import AdamW

loss_fn = torch.nn.BCELoss()
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

## Contrastivo

In [None]:
from torch.nn.functional import cosine_similarity

def contrastive_loss(embeddings1, embeddings2, labels, margin=0.5):
    sim = cosine_similarity(embeddings1, embeddings2)
    loss = torch.mean(labels * (1 - sim) + (1 - labels) * torch.clamp(sim - margin, min=0))
    return loss

In [None]:
from transformers import BertModel
import torch
import torch.nn.functional as F

class TransformerContrastive(torch.nn.Module):
    def __init__(self, bert_model):
        super(TransformerContrastive, self).__init__()
        self.l1 = bert_model  # Modelo BERT o similar
        self.l2 = torch.nn.Linear(768, 768)  # Proyección del embedding
        self.l3 = torch.nn.Dropout(0.1)

    def forward(self, ids_0, mask_0, token_type_ids_0, ids_1, mask_1, token_type_ids_1):
        # Generar embeddings para ambos textos
        embed_a = self.l1(ids_0, attention_mask=mask_0, token_type_ids=token_type_ids_0).last_hidden_state[:, 0]
        embed_b = self.l1(ids_1, attention_mask=mask_1, token_type_ids=token_type_ids_1).last_hidden_state[:, 0]

        # Proyección a un espacio latente (opcional, pero puede mejorar resultados)
        embed_a = F.gelu(self.l3(self.l2(embed_a)))
        embed_b = F.gelu(self.l3(self.l2(embed_b)))

        return embed_a, embed_b  # Devuelve los embeddings de ambos textos

# Inicializar modelo
model = TransformerContrastive(individual_model)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
from transformers import AdamW

class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, embed_a, embed_b, label):
        # Distancia euclidiana entre embeddings
        distance = torch.norm(embed_a - embed_b, p=2, dim=1)

        # Pérdida contrastiva
        loss = label * distance.pow(2) + (1 - label) * F.relu(self.margin - distance).pow(2)
        return loss.mean()

# loss_fn = torch.nn.BCELoss()
loss_fn = ContrastiveLoss()
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

# Data Generators

In [None]:
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.comment_text_1 = dataframe.comment_text_1
        self.comment_text_2 = dataframe.comment_text_2
        self.targets = self.data.list
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        comment_text_1 = str(self.comment_text_1[index])
        comment_text_1 = " ".join(comment_text_1.split())
        comment_text_2 = str(self.comment_text_2[index])
        comment_text_2 = " ".join(comment_text_2.split())
        inputs0 = self.tokenizer(comment_text_1, 
                                comment_text_2, 
                                max_length=self.max_len,
                                padding="max_length",
                                truncation=True,
                                # truncation="only_second",
                                # truncation="only_first",
                                # truncation="longest_first",
                                return_overflowing_tokens=False,
                                return_token_type_ids=True,)
                                # return_overflowing_tokens=True)
                                # return_overflowing_tokens=False)
        inputs1 = self.tokenizer(comment_text_1,
                                max_length=self.max_len,
                                padding="max_length",
                                truncation=True,
                                return_overflowing_tokens=False,
                                return_token_type_ids=True,)
        return {
            'ids_0': torch.tensor(inputs0.input_ids, dtype=torch.long),
            'mask_0': torch.tensor(inputs0.attention_mask, dtype=torch.long),
            'token_type_ids_0': torch.tensor(inputs0.token_type_ids, dtype=torch.long),
            'ids_1': torch.tensor(inputs1.input_ids, dtype=torch.long),
            'mask_1': torch.tensor(inputs1.attention_mask, dtype=torch.long),
            'token_type_ids_1': torch.tensor(inputs1.token_type_ids, dtype=torch.long),
            'labels': torch.tensor(self.targets[index], dtype=torch.long)
          }


In [None]:
# Instancia el dataset
train_dataset = CustomDataset(dataframe=train, tokenizer=tokenizer, max_len=512)
val_dataset = CustomDataset(dataframe=val, tokenizer=tokenizer, max_len=512)
test_dataset = CustomDataset(dataframe=test, tokenizer=tokenizer, max_len=512)

In [None]:
from torch.utils.data import DataLoader

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Entrenamiento y validación

In [None]:
def c_at_1(targets, preds):
    """
    Calculates the C@1 metric:
    - Non-answers (predictions marked as -1) are given a score of 0.5.
    - Remaining cases are scored based on accuracy.
    
    Parameters:
        targets (np.array): Ground truth labels.
        preds (np.array): Predictions, where -1 indicates a non-answer.
    
    Returns:
        float: C@1 metric.
    """
    correct = (targets == preds)  # Boolean array for correct predictions
    unanswered = preds == -1     # Boolean array for non-answers
    
    num_correct = correct.sum()
    num_total = len(targets)
    num_unanswered = unanswered.sum()
    
    return (num_correct + num_unanswered * 0.5) / num_total

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score, brier_score_loss, fbeta_score
import numpy as np

# Función de entrenamiento
def train_epoch(model, loader, optimizer, loss_fn, device):
    model.train()
    total_loss = 0
    for i, batch in enumerate(loader):
        print(f"Batch {i+1}/{len(loader)}")
        labels = batch['labels'].unsqueeze(1).to(device).float()
        batch = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        embed_a, embed_b = model(**batch)

        loss = loss_fn(embed_a, embed_b, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(loader)

# Función de evaluación
def evaluate(model, loader, prototype_human, device):
    model.eval()
    preds, targets, probabilities = [], [], []

    with torch.no_grad():
        for batch in loader:
            labels = batch['labels'].unsqueeze(1).float()
            batch = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
            embed_a, embed_b = model(**batch)
            similarity_a_human = torch.cosine_similarity(embed_a, prototype_human.unsqueeze(0), dim=1)
            similarity_b_human = torch.cosine_similarity(embed_b, prototype_human.unsqueeze(0), dim=1)
            predictions = (similarity_a_human <= similarity_b_human).int()  # 1 for sim_b > sim_a, else 0
            prob = similarity_b_human - similarity_a_human
            preds.extend(predictions.cpu().numpy())
            targets.extend(labels.cpu().numpy())
            probabilities.extend(prob.cpu().numpy())
    
    targets = np.array(targets).flatten()
    preds = np.array(preds).flatten()
    probabilities = np.array(probabilities).flatten()


    # Calculate metrics
    roc_auc = roc_auc_score(targets, probabilities)
    brier = brier_score_loss(targets, probabilities)
    f1 = f1_score(targets, preds)
    f05u = fbeta_score(targets, preds, beta=0.5)
    c1 = c_at_1(targets, preds)
    mean = np.mean([roc_auc, brier, c1, f1, f05u])
    
    return {
        "accuracy": accuracy_score(targets, preds),
        "roc-auc": roc_auc,
        "brier": brier,
        "c@1": c1,
        "f1": f1,
        "f05u": f05u,
        "mean": mean,
    }

In [None]:
import gc  # Para recolección de basura

human_text_tuples = [(text, text, 0) for text in train['text_human']]
human_df = pd.DataFrame(human_text_tuples, columns=['comment_text_1', 'comment_text_2', 'list'])
human_dataset = CustomDataset(dataframe=human_df, tokenizer=tokenizer, max_len=512)
human_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE)

human_embeddings = []
for i, batch in enumerate(human_loader):
    print(f"Batch {i+1}/{len(human_loader)}")
    labels = batch['labels'].unsqueeze(1).to(device).float()
    batch = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
    embed_a, _ = model(**batch)
    human_embeddings.extend(embed_a.cpu())

stacked = torch.stack(human_embeddings)
prototype_human = stacked.mean(dim=0).to(device)


del human_embeddings, batch, embed_a, labels
torch.cuda.empty_cache()
gc.collect()

In [None]:
history = {
    "train_loss": [],
    "train_metrics": [],
    "val_metrics": []
}

save_path = f"models/models_contrastive/fine_tuned_model_{EPOCHS}_epochs_{LEARNING_RATE}_lr_{LAYERS_TO_TRAIN}_layers_{BATCH_SIZE}_batch_size"

for epoch in range(EPOCHS):
    print(f"Starting Epoch {epoch + 1}/{EPOCHS}")
    print("* Training")
    train_loss = train_epoch(model, train_loader, optimizer, loss_fn, device)

    print("* Saving model")
    _epoch_save_path = f"{save_path}_checkoint_{epoch + 1}.pth"
    torch.save(model, _epoch_save_path)

    print("* Calculating metrics for training")
    train_metrics = evaluate(model, train_loader, prototype_human, device)
    print("* Calculating metrics for validation")
    val_metrics = evaluate(model, val_loader, prototype_human, device)

    history["train_loss"].append(train_loss)
    history["train_metrics"].append(train_metrics)
    history["val_metrics"].append(val_metrics)

    print(f"Epoch {epoch + 1}/{EPOCHS}")
    print(f"Train Loss: {train_loss:.4f}")
    print("Train Metrics:")
    for metric_name, value in train_metrics.items():
        print(f"  {metric_name}: {value:.4f}")
    print("Validation Metrics:")
    for metric_name, value in val_metrics.items():
        print(f"  {metric_name}: {value:.4f}")

In [None]:
# Evaluación final en el conjunto de prueba
test_accuracy = evaluate(model, test_loader, device)
print(f"Test Accuracy: {test_accuracy:.4f}")