Building A GAN-Based AI Text Detector

In [8]:
import copy
import random
import string
from pathlib import Path

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

from transformers import BertTokenizer, BertForSequenceClassification
from transformers import BertConfig
from transformers.models.bert.modeling_bert import BertEncoder
from sklearn.metrics import roc_auc_score

#device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [9]:
# Folder holding the competition CSV files. Edit this single constant to point
# the notebook at another copy of the data.
DATA_DIR = Path('/Users/patash/PSTB/Week_6_LLM/day_4/llm-detect-ai-generated-text')

train = pd.read_csv(DATA_DIR / 'train_essays.csv')
test = pd.read_csv(DATA_DIR / 'test_essays.csv')
prompt = pd.read_csv(DATA_DIR / 'train_prompts.csv')

# Quick sanity check of the (rows, columns) of each table.
print(train.shape)
print(test.shape)
print(prompt.shape)

(1378, 4)
(3, 3)
(2, 4)


In [None]:
print(train.head())
# prompt_id: prompt used to generate the text (0 or 1)
# text: the essay text, either AI-generated or human-written
# generated: 1 = generated by AI, 0 = human

         id  prompt_id                                               text  \
0  0059830c          0  Cars. Cars have been around since they became ...   
1  005db917          0  Transportation is a large necessity in most co...   
2  008f63e3          0  "America's love affair with it's vehicles seem...   
3  00940276          0  How often do you ride in a car? Do you drive a...   
4  00c39458          0  Cars are a wonderful thing. They are perhaps o...   

   generated  
0          0  
1          0  
2          0  
3          0  
4          0  


In [11]:
# Preview the first rows of the test essays table.
print(test.head())

         id  prompt_id          text
0  0000aaaa          2  Aaa bbb ccc.
1  1111bbbb          3  Bbb ccc ddd.
2  2222cccc          4  CCC ddd eee.


In [12]:
# Preview the first rows of the prompts table.
print(prompt.head())

   prompt_id                       prompt_name  \
0          0                   Car-free cities   
1          1  Does the electoral college work?   

                                        instructions  \
0  Write an explanatory essay to inform fellow ci...   
1  Write a letter to your state senator in which ...   

                                         source_text  
0  # In German Suburb, Life Goes On Without Cars ...  
1  # What Is the Electoral College? by the Office...  


In [13]:
# DataFrame.info() writes its report to stdout and returns None, so it is
# called directly: wrapping it in print() appended a stray "None" after each
# report.
train.info()
test.info()
prompt.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1378 entries, 0 to 1377
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   id         1378 non-null   object
 1   prompt_id  1378 non-null   int64 
 2   text       1378 non-null   object
 3   generated  1378 non-null   int64 
dtypes: int64(2), object(2)
memory usage: 43.2+ KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   id         3 non-null      object
 1   prompt_id  3 non-null      int64 
 2   text       3 non-null      object
dtypes: int64(1), object(2)
memory usage: 204.0+ bytes
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2 entries, 0 to 1
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   prompt_id     2 non-null      int64 
 1   prompt_name   2 no

In [14]:
# Model preparation

model = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model)
pretrained_model = BertForSequenceClassification.from_pretrained(model, num_labels=2)
# Only BERT's embedding layer is used to turn token ids into vectors. Calling it
# returns a plain tensor, which is what the training and inference code expects
# when it does `embeded.to(device)`. The full classification model returns an
# output object with logits instead, which that code cannot consume.
embedding_model = pretrained_model.bert.embeddings

# Hyperparameters

train_batch_size = 32
test_batch_size = 32
lr = 0.0002
beta1 = 0.9
nz = 100  # Dimensions of the latent vector
num_epochs = 3
num_hidden_layers = 3
train_ratio = 0.8

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [15]:
# Data preparation
class GANDAIGDataset(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(text, label)`` pairs.

    Args:
        texts: Iterable of essay strings (list, tuple, pandas Series, ...).
        labels: Iterable of labels aligned with ``texts``.

    Both inputs are materialised as lists so that ``dataset[idx]`` is always
    positional. Indexing a pandas Series with ``series[idx]`` is label-based
    and fails (or returns the wrong row) once the index is no longer
    ``0..n-1``, e.g. after shuffling or slicing without ``reset_index``.
    """

    def __init__(self, texts, labels):
        self.texts = list(texts)
        self.labels = list(labels)
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length "
                f"({len(self.texts)} != {len(self.labels)})"
            )

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

# Sizes of the sequential split (first rows -> train, remaining rows -> test).
all_num = train.shape[0]
train_num = int(train_ratio * all_num)
test_num = all_num - train_num

# Train - 80%, test - 20%
train_set = train.iloc[:train_num]  # the first 80% of the rows, used for training
test_set = train.iloc[train_num:].reset_index(drop=True)  # the rest, re-indexed from 0

# Data GAN
train_dataset = GANDAIGDataset(train_set['text'], train_set['generated'])
test_dataset = GANDAIGDataset(test_set['text'], test_set['generated'])

# Loader: only the training batches are shuffled.
train_loader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False)

In [16]:
# Generator
# BERT configuration passed to the BertEncoder instances built in this notebook.
# Only the number of hidden layers is overridden; every other field keeps the
# BertConfig default.
config = BertConfig(num_hidden_layers=num_hidden_layers)

class Generator(nn.Module):
    """Map a latent noise vector to a sequence of BERT-like hidden states.

    Pipeline: linear projection -> reshape to (batch, 256, 128) -> three
    length-preserving 1-D convolutions ending at 768 channels -> permute to
    (batch, 128, 768) -> BertEncoder built from the module-level ``config``.

    Args:
        input_dim: Size of the latent noise vector fed to ``forward``.
    """

    def __init__(self, input_dim):
        super().__init__()
        self.fc = nn.Linear(input_dim, 256 * 128)

        # (in_channels, out_channels) of each convolution stage; kernel 3 with
        # padding 1 keeps the length of 128 unchanged.
        stages = []
        for c_in, c_out in ((256, 512), (512, 768), (768, 768)):
            stages.append(
                nn.Conv1d(in_channels=c_in, out_channels=c_out, kernel_size=3, stride=1, padding=1)
            )
            stages.append(nn.ReLU())
        self.conv_net = nn.Sequential(*stages)

        self.bert_encoder = BertEncoder(config)

    def forward(self, x):
        # Turn the latent noise into an initial representation and reshape it
        # to the (batch, channels, length) layout the CNN expects.
        feature_map = self.fc(x).view(-1, 256, 128)
        # Convolutions, then switch to the (batch_size, seq_len, hidden_dim)
        # layout used by the BERT encoder.
        token_features = self.conv_net(feature_map).permute(0, 2, 1)
        # Return the encoder's last hidden state as a plain tensor.
        return self.bert_encoder(token_features).last_hidden_state


In [17]:
# Discriminator
class SumBertPooler(torch.nn.Module):
    """Pool per-token hidden states into one vector per sequence.

    The pooled vector is the mean of the hidden states over the token axis
    (``dim=1``).

    The previous implementation divided the token sum by the *sum of all
    activations* clamped at ``1e-9``. Whenever that sum was non-positive the
    result was scaled by about ``1e9``, which saturates the sigmoid of the
    downstream classifier. Dividing by the number of tokens gives a
    well-scaled mean.
    """

    def __init__(self):
        super().__init__()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return the token-wise mean of ``hidden_states``.

        Args:
            hidden_states: Tensor whose dimension 1 indexes tokens, e.g.
                ``(batch, seq_len, hidden)``.

        Returns:
            Tensor with dimension 1 reduced, e.g. ``(batch, hidden)``.
        """
        # Guard against an empty sequence so the division never hits zero.
        token_count = max(hidden_states.size(1), 1)
        return hidden_states.sum(dim=1) / token_count


class Discriminator(nn.Module):
    """Score a sequence of embeddings with a value in (0, 1).

    Architecture: a BertEncoder whose layers are initialised from the first six
    encoder layers of ``pretrained_model``, followed by ``SumBertPooler`` and a
    small MLP head ending in a sigmoid.
    """

    def __init__(self):
        super().__init__()
        self.bert_encoder = BertEncoder(config)
        # Deep-copy the pretrained layers instead of reusing the same module
        # objects. Sharing them means that training this discriminator, or
        # calling load_state_dict on it, silently rewrites the weights of
        # `pretrained_model` and of every other Discriminator instance.
        self.bert_encoder.layer = nn.ModuleList([
            copy.deepcopy(layer) for layer in pretrained_model.bert.encoder.layer[:6]
        ])
        self.pooler = SumBertPooler()
        self.classifier = torch.nn.Sequential(
            nn.Linear(config.hidden_size, 256),  # reduce the BERT embedding dimension
            nn.ReLU(),  # non-linear activation
            nn.Dropout(0.1),  # regularisation against over-fitting
            nn.Linear(256, 1)  # final projection to a single output (logit)
        )

    def forward(self, input):
        """Return one probability per sequence.

        Args:
            input: Embedding tensor accepted by the BERT encoder
                (presumably ``(batch, seq_len, hidden_size)`` — confirm against callers).

        Returns:
            1-D tensor of sigmoid outputs, flattened with ``view(-1)``.
        """
        out = self.bert_encoder(input)
        out = self.pooler(out.last_hidden_state)
        out = self.classifier(out)
        return torch.sigmoid(out).view(-1)

In [None]:
# Training
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def eval_auc(model):
    """Compute and print the ROC AUC of ``model`` on ``test_loader``.

    Texts are embedded with ``preparation_embedding`` — the same path the
    training loop uses — and the resulting tensor is moved to ``device`` before
    being scored. The previous version moved the token ids to ``device`` while
    the embedding model stayed where it was created, and read a
    ``.last_hidden_state`` attribute that the training path never relies on.

    Args:
        model: Module mapping an embedding tensor to one score per sample.

    Returns:
        The ROC AUC as returned by ``roc_auc_score``.
    """
    # Remember the current mode so the caller can keep training afterwards
    # with dropout still enabled.
    was_training = model.training
    model.eval()

    predictions = []
    actuals = []
    try:
        with torch.no_grad():
            for batch in test_loader:
                embeded = preparation_embedding(list(batch[0])).to(device)
                label = batch[1].float()

                outputs = model(embeded)
                predictions.extend(outputs.cpu().numpy())
                actuals.extend(label.cpu().numpy())
    finally:
        model.train(was_training)

    auc = roc_auc_score(actuals, predictions)
    print("AUC:", auc)
    return auc

# 1. Encode the text with the tokenizer.
# 2. Pass these encodings through embedding_model.
# 3. Take the model's output and compute the ROC AUC with roc_auc_score.

def get_model_info_dict(model, epoch, auc_score):
    """Build an in-memory checkpoint record for ``model``.

    Args:
        model: Module whose weights are snapshotted.
        epoch: Epoch index stored with the snapshot.
        auc_score: AUC stored with the snapshot.

    Returns:
        dict with keys ``'epoch'``, ``'model_state_dict'`` and ``'auc_score'``.
        The state dict holds independent CPU copies of every tensor.

    The tensors returned by ``state_dict()`` share storage with the live
    parameters. When the model already lives on the CPU, moving it to ``'cpu'``
    and back copies nothing, so every stored "checkpoint" kept tracking the
    weights as training continued. Copying each tensor fixes that and removes
    the need to move the model at all.
    """
    live_state = model.state_dict()
    # type(live_state) keeps the same mapping class that state_dict() returned.
    frozen_state = type(live_state)(
        (name, tensor.detach().to('cpu', copy=True))
        for name, tensor in live_state.items()
    )

    return {
        'epoch': epoch,  # epoch this snapshot was taken at
        'model_state_dict': frozen_state,  # detached CPU copy of weights and buffers
        'auc_score': auc_score,  # AUC of the model for this epoch
    }

def preparation_embedding(texts):
    """Tokenize ``texts`` and run them through ``embedding_model``.

    Args:
        texts: A single string or any iterable of strings (list, tuple,
            pandas Series, ...).

    Returns:
        Whatever ``embedding_model`` returns for the padded, truncated batch,
        located on the device of ``embedding_model``'s parameters.
    """
    # Normalise the input to a list of strings. A bare string becomes a batch
    # of one rather than being split into characters.
    if isinstance(texts, str):
        texts = [texts]
    encodings = tokenizer(list(texts), padding=True, truncation=True, return_tensors="pt")

    # Run the lookup on whichever device the embedding model lives on, so that
    # moving `embedding_model` to a GPU does not cause a device mismatch.
    embed_device = next(embedding_model.parameters()).device
    input_ids = encodings['input_ids'].to(embed_device)
    token_type_ids = encodings['token_type_ids'].to(embed_device)
    embeded = embedding_model(input_ids=input_ids, token_type_ids=token_type_ids)
    return embeded

def GAN_step(optimizerG, optimizerD, netG, netD, real_data, label, epoch, i):
    """Run one discriminator update followed by one generator update.

    Discriminator: trained on ``real_data`` against ``label`` and on generated
    samples against a target of 1. Generator: trained so that the discriminator
    outputs 0 on its samples.

    Args:
        optimizerG, optimizerD: Optimizers of the generator / discriminator.
        netG, netD: Generator and discriminator modules.
        real_data: Batch of real embeddings; dimension 0 is the batch size.
        label: Float targets for ``real_data`` (left unmodified).
        epoch: Current epoch index, used for logging only.
        i: Batch index within the epoch; progress is printed every 50 batches.

    Returns:
        The tuple ``(optimizerG, optimizerD, netG, netD)``, unchanged objects.
    """
    batch_size = real_data.size(0)

    # ---- Discriminator update -------------------------------------------
    netD.zero_grad()
    output = netD(real_data)
    errD_real = criterion(output, label)
    errD_real.backward()
    D_x = output.mean().item()

    noise = torch.randn(batch_size, nz, device=device)
    # Generator.forward already returns the hidden-state tensor; asking it for
    # `.last_hidden_state` again raised AttributeError.
    fake_data = netG(noise)
    # Fresh target tensors instead of label.fill_(): the caller's labels are
    # no longer overwritten in place.
    fake_target = torch.ones_like(label)
    output = netD(fake_data.detach())  # detach: no generator gradients in the D step
    errD_fake = criterion(output, fake_target)
    errD_fake.backward()
    D_G_z1 = output.mean().item()
    errD = errD_real + errD_fake
    optimizerD.step()

    # ---- Generator update -----------------------------------------------
    netG.zero_grad()
    generator_target = torch.zeros_like(label)
    output = netD(fake_data)
    errG = criterion(output, generator_target)
    errG.backward()
    D_G_z2 = output.mean().item()
    optimizerG.step()

    if i % 50 == 0:
        # The format arguments were commented out, so the raw template was printed.
        print('[%d/%d][%d/%d] Loss_D: %.4f Loss_G: %.4f D(x): %.4f D(G(z)): %.4f / %.4f'
              % (epoch, num_epochs, i, len(train_loader),
                 errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
    return optimizerG, optimizerD, netG, netD

netG = Generator(nz).to(device)
netD = Discriminator().to(device)

criterion = nn.BCELoss() 
optimizerD = torch.optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = torch.optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))

# One checkpoint record (epoch, weights, AUC) per epoch
model_infos = []

# Training loop
for epoch in range(num_epochs):
    # Evaluation at the end of an epoch switches the discriminator to eval
    # mode. Re-enable training mode explicitly so dropout stays active in
    # every epoch, not only the first one.
    netG.train()
    netD.train()

    for i, data in enumerate(train_loader, 0):
        # The embeddings are fixed inputs: no gradients are needed for them.
        with torch.no_grad():
            embeded = preparation_embedding(data[0])

        optimizerG, optimizerD, netG, netD = GAN_step(
            optimizerG=optimizerG,
            optimizerD=optimizerD,
            netG=netG,
            netD=netD,
            real_data=embeded.to(device),
            label=data[1].float().to(device),
            epoch=epoch, i=i)

    auc_score = eval_auc(netD)  
    model_infos.append(get_model_info_dict(netD, epoch, auc_score))

print('Train complete！')

NameError: name 'device' is not defined

In [None]:
# Inference
# Load the discriminator with the best parameters: the checkpoint with the
# highest AUC among the per-epoch records collected in `model_infos` during
# training. The previous torch.load() call pointed at a placeholder path that
# nothing in this notebook ever writes.
if not model_infos:
    raise RuntimeError('model_infos is empty: run the training cell before inference')
max_auc_model_info = max(model_infos, key=lambda info: info['auc_score'])
model = Discriminator()
model.load_state_dict(max_auc_model_info['model_state_dict'])
model.to(device)
model.eval()

# Dataset used for inference
class InferenceDataset(torch.utils.data.Dataset):
    """Map-style dataset yielding raw texts, without labels.

    Args:
        texts: Iterable of strings (list, tuple, pandas Series, ...).

    The texts are materialised as a list so that ``dataset[idx]`` is positional.
    Indexing a pandas Series with ``series[idx]`` is label-based and breaks when
    the index is not ``0..n-1``.
    """

    def __init__(self, texts):
        self.texts = list(texts)

    def __getitem__(self, idx):
        return self.texts[idx]

    def __len__(self):
        return len(self.texts)

# Texts to score: the essays of the test file loaded at the top of the
# notebook. Replace this list with any other list of strings to score
# different texts.
texts_to_predict = test['text'].tolist()
sub_dataset = InferenceDataset(texts_to_predict)

# DataLoader for inference (order preserved so predictions line up with the texts)
inference_loader = torch.utils.data.DataLoader(sub_dataset, batch_size=test_batch_size, shuffle=False)

# Collected predictions, one score per text
sub_predictions = []
with torch.no_grad():
    for batch in inference_loader:
        # Tokenize and embed exactly as during training, then move the
        # embeddings to the device the discriminator lives on. Moving the
        # token ids to `device` first mismatched with the embedding model.
        embeded = preparation_embedding(list(batch)).to(device)

        # Score the embeddings with the discriminator
        outputs = model(embeded)

        # Collect the predictions
        sub_predictions.extend(outputs.cpu().numpy())

# Convert the results to a DataFrame
sub_ans_df = pd.DataFrame(sub_predictions, columns=['prediction'])

# Show the results
print(sub_ans_df)