# Google Drive Setup

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
base_dir = '/content/drive/MyDrive/AI_Detection/'

# Import Libraries

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import AdamW
import numpy as np
import pandas as pd
from datasets import concatenate_datasets, Dataset
from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Dataset Import

In [None]:
# dataset_original = pd.read_csv(base_dir + 'aiDetect.csv')
# dataset_original.to_pickle(base_dir + 'aiDetect_pickle.pkl')

In [None]:
df_original = pd.read_pickle(base_dir + 'aiDetect_pickle.pkl')
df_original.rename(columns={"text": "sentence", "label": "labels"}, inplace=True)

In [None]:
# short_length = 10000
# df_0 = df_original[df_original['labels'] == 0].sample(int(short_length / 2), random_state=1)
# df_1 = df_original[df_original['labels'] == 1].sample(int(short_length / 2), random_state=1)
# df_short = pd.concat([df_0, df_1], axis=0).reset_index(drop=True)

# Model

In [None]:
model_id = 'bert-base-uncased'

# Loss Functions

## Classification Losses - Single sentence column and sentiment label

### Cross-Entropy Loss

In [None]:
def cross_entropy_loss(logits, labels):
    return nn.CrossEntropyLoss()(logits, labels)

### Label Smoothing Cross-Entropy Loss

In [None]:
def label_smoothing_cross_entropy_loss(logits, labels, smoothing=0.1):
    confidence = 1.0 - smoothing
    log_probs = F.log_softmax(logits, dim=-1)

    # Initializing true distribution with smoothing value for all classes...
    true_dist = torch.full_like(log_probs, smoothing / (log_probs.size(1) - 1))
    # Setting the true label confidence in the correct class...
    true_dist.scatter_(1, labels.unsqueeze(1), confidence)

    loss = torch.mean(torch.sum(-true_dist * log_probs, dim=-1))
    return loss

## Embedding Losses

### Pairs - 2 sentence columns and label

#### Pair Generation

In [None]:
def generate_pairs(embeddings, labels):
    embedding1_list = []
    embedding2_list = []
    similarity_labels = []

    for i in range(len(labels)):
        for j in range(i + 1, len(labels)):
            embedding1 = embeddings[i]
            embedding2 = embeddings[j]

            # If the labels are the same, labeling the pair as 1 (similar)...
            if labels[i] == labels[j]:
                similarity_labels.append(1)
            else:
                # If the labels are different, labeling the pair as 0 (dissimilar)...
                similarity_labels.append(0)

            embedding1_list.append(embedding1)
            embedding2_list.append(embedding2)

    embedding1_tensor = torch.stack(embedding1_list)
    embedding2_tensor = torch.stack(embedding2_list)
    labels_tensor = torch.tensor(similarity_labels).to(labels.device)

    return embedding1_tensor, embedding2_tensor, labels_tensor

#### CoSENT Loss

In [None]:
def cosent_loss(embeddings, labels, tau=20.0):
    embedding1, embedding2, labels = generate_pairs(embeddings, labels)

    # Input preparation...
    labels = (labels[:, None] < labels[None, :]).float()

    # Normalization of Logits...
    embedding1 = F.normalize(embedding1, p=2, dim=1)
    embedding2 = F.normalize(embedding2, p=2, dim=1)

    # Cosine Similarity Calculation...
    # The dot product of these pairs gives the cosine similarity, scaled by a factor of tau to control the sharpness of similarity scores...
    y_pred = torch.sum(embedding1 * embedding2, dim=1) * tau

    # Pairwise cosine similarity difference calculation...
    y_pred = y_pred[:, None] - y_pred[None, :]

    y_pred = (y_pred - (1 - labels) * 1e12).view(-1)

    zero = torch.Tensor([0]).to(y_pred.device)
    y_pred = torch.concat((zero, y_pred), dim=0)
    return torch.logsumexp(y_pred, dim=0)

#### In-Batch Negatives Loss

In [None]:
def categorical_crossentropy(y_true: torch.Tensor, y_pred: torch.Tensor) -> torch.Tensor:
    return -(F.log_softmax(y_pred, dim=1) * y_true).sum(dim=1)

def in_batch_negative_loss(embeddings,
                           labels,
                           tau: float = 20.0,
                           negative_weights: float = 0.0) -> torch.Tensor:
    device = labels.device
    embedding1, embedding2, labels = generate_pairs(embeddings, labels)

    y_pred = torch.empty((2 * embedding1.shape[0], embedding1.shape[1]), device=device)
    y_pred[0::2] = embedding1
    y_pred[1::2] = embedding2
    y_true = labels.repeat_interleave(2).unsqueeze(1)

    def make_target_matrix(y_true: torch.Tensor):
        idxs = torch.arange(0, y_pred.shape[0]).int().to(device)
        y_true = y_true.int()
        idxs_1 = idxs[None, :]
        idxs_2 = (idxs + 1 - idxs % 2 * 2)[:, None]

        idxs_1 *= y_true.T
        idxs_1 += (y_true.T == 0).int() * -2

        idxs_2 *= y_true
        idxs_2 += (y_true == 0).int() * -1

        y_true = (idxs_1 == idxs_2).float()
        return y_true

    neg_mask = make_target_matrix(y_true == 0)

    y_true = make_target_matrix(y_true)

    y_pred = F.normalize(y_pred, dim=1, p=2)
    similarities = y_pred @ y_pred.T
    similarities = similarities - torch.eye(y_pred.shape[0]).to(device) * 1e12
    similarities = similarities * tau

    if negative_weights > 0:
        similarities += neg_mask * negative_weights

    return categorical_crossentropy(y_true, similarities).mean()

#### Angle Loss

In [None]:
def angle_loss(embeddings, labels, tau=1.0):
    embedding1, embedding2, labels = generate_pairs(embeddings, labels)

    # Input preparation...
    labels = (labels[:, None] < labels[None, :]).float()

    # Chunking into real and imaginary parts...
    y_pred_re1, y_pred_im1 = torch.chunk(embedding1, 2, dim=1)
    y_pred_re2, y_pred_im2 = torch.chunk(embedding2, 2, dim=1)

    a = y_pred_re1
    b = y_pred_im1
    c = y_pred_re2
    d = y_pred_im2

    z = torch.sum(c**2 + d**2, dim=1, keepdim=True)
    re = (a * c + b * d) / z
    im = (b * c - a * d) / z

    dz = torch.sum(a**2 + b**2, dim=1, keepdim=True)**0.5
    dw = torch.sum(c**2 + d**2, dim=1, keepdim=True)**0.5
    re /= (dz / dw)
    im /= (dz / dw)

    y_pred = torch.concat((re, im), dim=1)
    y_pred = torch.abs(torch.sum(y_pred, dim=1)) * tau
    y_pred = y_pred[:, None] - y_pred[None, :]
    y_pred = (y_pred - (1 - labels) * 1e12).view(-1)
    zero = torch.Tensor([0]).to(y_pred.device)
    y_pred = torch.concat((zero, y_pred), dim=0)
    return torch.logsumexp(y_pred, dim=0)

#### Combination of CoSENT, In-Batch Negatives and Angle Losses

In [None]:
def cosent_ibn_angle(embeddings, labels, tau_cosent=20.0, tau_ibn=20.0, tau_angle=1.0):
    return cosent_loss(embeddings, labels, tau_cosent) + in_batch_negative_loss(embeddings, labels, tau_ibn) + angle_loss(embeddings, labels, tau_angle)

## Loss List

In [None]:
losses = [
    {'loss_name': 'without_ft', 'model_type': 'emb', 'loss_kwargs': {}},
    {'loss_name': 'cross_entropy_loss', 'model_type': 'clf', 'loss_kwargs': {}},
    # {'loss_name': 'cosent_loss', 'model_type': 'emb', 'loss_kwargs': {'tau': 20.0}},
    # {'loss_name': 'in_batch_negative_loss', 'model_type': 'emb', 'loss_kwargs': {'tau': 20.0}},
    # {'loss_name': 'angle_loss', 'model_type': 'emb', 'loss_kwargs': {'tau': 1.0}},
    {'loss_name': 'cosent_ibn_angle', 'model_type': 'emb', 'loss_kwargs': {'tau_cosent': 20.0, 'tau_ibn': 20.0, 'tau_angle': 1.0}}
]

# Training

### Training Preparation

#### Device Setting

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

#### Dataset Preparation

In [None]:
def prepare_dataset(df, columns):
    # Dataset Import...
    ds = Dataset.from_pandas(df[columns])

    # Random Split...
    train_test_split = ds.train_test_split(test_size=0.30)
    train_dataset = train_test_split['train']
    test_dataset = train_test_split['test']
    return train_dataset, test_dataset

#### Model and Tokenizer Preparation

In [None]:
def get_model_tokenizer(model_id, type='clf'):
    if type == 'clf':
        model = AutoModelForSequenceClassification.from_pretrained(model_id)
    else:
        model = AutoModel.from_pretrained(model_id)
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model.to(device)
    return model, tokenizer

#### Dataset Tokenization and Batch Processing

In [None]:
def tokenize_dataset_batch(train_dataset, test_dataset, tokenizer, batch_size):
    def tokenize_function(examples):
        return tokenizer(examples['sentence'], padding='max_length', truncation=True, max_length=128)

    train_dataset = train_dataset.map(tokenize_function, batched=True)
    test_dataset = test_dataset.map(tokenize_function, batched=True)

    # Torch format setting...
    train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
    test_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

    # Batching using DataLoader...
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    return train_loader, test_loader

#### Embedding Extraction

In [None]:
def extract_embeddings(model, device, dataloader):
    all_embeddings = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Extracting embeddings", leave=False):
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(input_ids=batch['input_ids'], attention_mask=batch['attention_mask'])

            # [CLS] token embeddings...
            embeddings = outputs.last_hidden_state[:, 0, :]
            all_embeddings.append(embeddings.cpu())
            all_labels.append(batch['labels'].cpu())

    return torch.cat(all_embeddings), torch.cat(all_labels)

#### Train and Evaluation Drivers

In [None]:
def train(model, train_loader, model_type='clf', epochs=10, loss_name='cross_entropy_loss', **loss_kwargs):
    # Optimizer setting...
    optimizer = AdamW(model.parameters(), lr=5e-5)

    # Training loop...
    num_epochs = epochs
    model.train()
    for epoch in range(num_epochs):
        # print(f"Epoch {epoch + 1}/{num_epochs}")
        for batch in tqdm(train_loader, desc="Training", leave=False):
            batch = {k: v.to(device) for k, v in batch.items()}

            if model_type == 'clf':
                # Cross-Entropy Losses...
                outputs = model(**batch)
                logits = outputs.logits
                loss = globals()[loss_name](logits, batch['labels'], **loss_kwargs)
            else:
                # Embedding Loss...
                outputs = model(input_ids=batch['input_ids'], attention_mask=batch['attention_mask'])

                # [CLS] token embedding...
                embeddings = outputs.last_hidden_state[:, 0, :]
                loss = globals()[loss_name](embeddings, batch['labels'], **loss_kwargs)
                if loss == 0.0:
                    continue

            # Backpropagation...
            loss.backward()

            # Updating weights...
            optimizer.step()
            optimizer.zero_grad()
    return model

In [None]:
def evaluate_clf(model, test_loader):
    model.eval()
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for batch in test_loader:
            batch = {k: v.to(device) for k, v in batch.items()}

            # Forward pass...
            outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            total_correct += (predictions == batch['labels']).sum().item()
            total_samples += batch['labels'].size(0)

    accuracy = total_correct / total_samples
    return accuracy

In [None]:
def evaluate_emb(model, train_loader, test_loader):
    model.eval()

    # Generating embeddings of the train and test sentences...
    train_embeddings, train_labels = extract_embeddings(model, device, train_loader)
    test_embeddings, test_labels = extract_embeddings(model, device, test_loader)

    train_embeddings_np = train_embeddings.numpy()
    test_embeddings_np = test_embeddings.numpy()
    train_labels_np = train_labels.numpy()
    test_labels_np = test_labels.numpy()

    # Training a Logistic Regression classifier on the training embeddings...
    lr_clf = LogisticRegression(max_iter=10000)
    lr_clf.fit(train_embeddings_np, train_labels_np)

    # Predicting the labels for the test set...
    test_predictions = lr_clf.predict(test_embeddings_np)
    accuracy = accuracy_score(test_labels_np, test_predictions)
    return accuracy

### Loop

In [None]:
total_runs = 3
batch_size = 60
accuracy_list = []
for loss in losses:
    loss_name = loss['loss_name']
    model_type = loss['model_type']
    loss_kwargs = loss['loss_kwargs']

    total_accuracy = 0.
    for loop_count in range(0, total_runs):
        # Dataset Preparation...
        train_dataset, test_dataset = prepare_dataset(df_original, ['sentence', 'labels'])

        # Model Preparation...
        model, tokenizer = get_model_tokenizer(model_id, model_type)

        # Tokenize Batch...
        train_loader, test_loader = tokenize_dataset_batch(train_dataset, test_dataset, tokenizer, batch_size=batch_size)

        # Training Loop...
        if loss_name != 'without_ft':
            model = train(model, train_loader, model_type, epochs=10, loss_name=loss_name, **loss_kwargs)

        # Evaluation loop...
        if model_type == 'clf':
            accuracy = evaluate_clf(model, test_loader)
        else:
            accuracy = evaluate_emb(model, train_loader, test_loader)
        total_accuracy += accuracy
    accuracy_list.append({'loss': loss_name, 'accuracy': total_accuracy / total_runs})



Map:   0%|          | 0/3500 [00:00<?, ? examples/s]

Map:   0%|          | 0/1500 [00:00<?, ? examples/s]



In [None]:
accuracy_list

[{'loss': 'without_ft', 'accuracy': 0.8846666666666667},
 {'loss': 'cross_entropy_loss', 'accuracy': 0.9216827392819182},
 {'loss': 'cosent_ibn_angle', 'accuracy': 0.9799182733627181}]