**To Do List**

1. Implementation of the equation 5 for the variable `weight_classes_dataset`
2. Evaluation methods (e.g., Quadratic Weighted Kappa (QWK))

In [None]:
# !pip install -U transformers

In [None]:
# !pip install gdown

In [None]:
import os
import re
import random
import time
import json
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, BertForSequenceClassification, GPT2Tokenizer, GPT2LMHeadModel

# Set random seed for reproducibility
manualSeed = 0
print("Random Seed: ", manualSeed)
random.seed(manualSeed)
torch.manual_seed(manualSeed)

## Inputs




In [None]:
# Number of training epochs
num_epochs = 10

# Learning rate for optimizers
lr = 0.0002

# number of classes in the dataset
n_class_dataset = 6

# name of each class in the dataset
label2class = {0: "human", 1: "chatGPT", 2: "cohere", 3: "davinci", 4: "bloomz", 5: "dolly"}

# batch size
batch_size = 20

# Output dimension of G1 network
d_out = 768

# Dropout parameter
p_dropout = 0.5

# beta1 and beta2 for the ADAM optimizers
betas_ADAM = (0.9, 0.999) # Note: no values reported in the paper

# Decide which device we want to run on
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# lambda values in the loss functions
lambda_score = 1
lambda_feature_matching = 1

## Data




In [None]:
if not os.path.exists("/content/SubtaskB"):
    # https://github.com/mbzuai-nlp/SemEval2024-task8
    url = "https://drive.google.com/drive/folders/11YeloR2eTXcTzdwI04Z-M2QVvIeQAU6-"
    !gdown --folder $url

In [None]:
class SubtaskBDataset(Dataset):
    def __init__(self, file_path):
        self.data = []
        self.models = []
        self.sources = []
        with open(file_path, 'r') as file:
            for line in file:
                sample = json.loads(line)
                model = sample["model"]
                source = sample["source"]
                text = sample["text"]
                y_label = sample["label"]
                input_prompt = "Model: {0}".format(model)
                item = (text, y_label, input_prompt)
                self.data.append(item)
                self.models.append(model)
                self.sources.append(source)
        self.models = np.unique(self.models)
        self.sources = np.unique(self.sources)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
          return self.data[idx]

In [None]:
datasetTrain = SubtaskBDataset("/content/SubtaskB/subtaskB_train.jsonl")
dataloaderTrain =  DataLoader(datasetTrain, batch_size=batch_size, shuffle=True)

print(datasetTrain.__len__())
print(datasetTrain.models)
print(datasetTrain.sources)
print(datasetTrain.__getitem__(0))

In [None]:
datasetVal = SubtaskBDataset("/content/SubtaskB/subtaskB_dev.jsonl")
dataloaderVal =  DataLoader(datasetVal, batch_size=batch_size, shuffle=False)

print(datasetVal.__len__())
print(datasetVal.models)
print(datasetVal.sources)
print(datasetVal.__getitem__(0))

# The fine-tuned models/tokenizers

In [None]:
def loadGPT2(freeze=True, fine_tuned_model_path="alinourian/GPT2-SemEval2023"):
    # Load the fine-tuned model and tokenizer
    modelGPT2 = GPT2LMHeadModel.from_pretrained(fine_tuned_model_path)
    tokenizerGPT2 = GPT2Tokenizer.from_pretrained(fine_tuned_model_path)
    if freeze:
        for param in modelGPT2.parameters():
            param.requires_grad = False
    return tokenizerGPT2, modelGPT2

In [None]:
def loadBERT(freeze, fine_tuned_model_path="mohammadhossein/SemEvalTask8_SubTaskB"):
    # Load the fine-tuned model and tokenizer
    modelBERT = BertForSequenceClassification.from_pretrained(fine_tuned_model_path)
    tokenizerBERT = AutoTokenizer.from_pretrained(fine_tuned_model_path)
    if freeze:
        for param in modelBERT.parameters():
            param.requires_grad = False
    return tokenizerBERT, modelBERT

In [None]:
def blockGPT2(input_prompt, tokenizerGPT2, modelGPT2):
    # Example of an input text : ["Model: chatGPT"]
    encoding = tokenizerGPT2(input_prompt, padding=True, return_tensors="pt").to(device)
    # https://stackoverflow.com/questions/69609401/suppress-huggingface-logging-warning-setting-pad-token-id-to-eos-token-id
    output = modelGPT2.generate(
    **encoding,
    max_length=400,
    num_beams=1,
    temperature=0.8,
    do_sample=True,
    top_k=50,
    top_p=0.95,
    pad_token_id=tokenizerGPT2.eos_token_id
    )
    # A batch of generated texts
    generated_text = tokenizerGPT2.batch_decode(output, skip_special_tokens=True)
    # Remove the model names from the generated texts
    generated_text = [re.sub(r'Model:.+Text: ','', text, flags=re.IGNORECASE) for text in generated_text]

    return generated_text

In [None]:
def blockBERT(input_text, tokenizerBERT, modelBERT):
    # https://discuss.huggingface.co/t/how-to-get-cls-embeddings-from-bertfortokenclassification-model/9276/2
    inputs = tokenizerBERT(input_text, return_tensors="pt", truncation=True, padding=True).to(device)
    outputs = modelBERT(**inputs, output_hidden_states=True)
    last_hidden_states = outputs.hidden_states[-1]
    CLS_hidden_states = last_hidden_states[:, 0, :] # (bs, 768)
    return CLS_hidden_states

## Generator




In [None]:
# # Generator G1, see figure 2a
# class GeneratorG1(nn.Module):
#     def __init__(self):
#         super(GeneratorG1, self).__init__()
#         self.main = nn.Sequential(
#             # input: Z
#             nn.Linear(d_in, d_out),
#             nn.LeakyReLU(0.2),
#             nn.Dropout(p=p_dropout),
#             nn.Linear(d_out, d_out)
#             # output: v_G
#         )

#     def forward(self, input):
#         return self.main(input)

In [None]:
# Generator G3, see figure 2c
class GeneratorG3(nn.Module):
    def __init__(self):
        super(GeneratorG3, self).__init__()
        tokenizerGPT2, modelGPT2 = loadGPT2(freeze=True)
        tokenizerBERT, modelBERT = loadBERT(freeze=False)
        self.tokenizerGPT2 = tokenizerGPT2
        self.tokenizerBERT = tokenizerBERT
        self.modelGPT2 = modelGPT2
        self.modelBERT = modelBERT

    def forward(self, input_prompt):
        generated_text = blockGPT2(input_prompt, self.tokenizerGPT2, self.modelGPT2)
        v_G = blockBERT(generated_text, self.tokenizerBERT, self.modelBERT)
        return generated_text, v_G

In [None]:
# Create the generator
netG = GeneratorG3().to(device)

# Print the model
print(netG)

## Discriminator




In [None]:
# Discriminator D, see figure 2d
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.seq1 = nn.Sequential(
            # input: v_G or v_B
            nn.Dropout(p=p_dropout),
            nn.Linear(d_out, d_out))
        self.seq2 = nn.Sequential(
            nn.LeakyReLU(0.2),
            nn.Dropout(p=p_dropout),
            nn.Linear(d_out, 1 + n_class_dataset), # +1 for the probability of this sample being fake/real.
            # output: logits, format: [fake score, dataset classes]
        )
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, input):
        features = self.seq1(input) # required for the feature matching loss
        logits = self.seq2(features)
        probs = self.softmax(logits)
        return features, logits, probs

In [None]:
# Create the Discriminator
netD = Discriminator().to(device)

# Print the model
print(netD)

## BERT (red block)

In [None]:
# BERT block for processing the real dataset, pictured in red in figure 1
tokenizerBERT_red, modelBERT_red = loadBERT(freeze=True)
modelBERT_red.to(device)

## Training




In [None]:
criterionGAN = nn.BCEWithLogitsLoss()
criterionScore = nn.CrossEntropyLoss()

# Setup Adam optimizers for both G and D
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=betas_ADAM)
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=betas_ADAM)

# Establish convention for real and fake labels during training
real_label = 0.
fake_label = 1.

In [None]:
# Training Loop

# Lists to keep track of progress
D_losses = []
G_losses = []
iters = 0

time_start = time.time()
print("Starting Training Loop ...")
# For each epoch
for epoch in range(num_epochs):
    # For each batch in the dataloader
    for i, data in enumerate(dataloaderTrain, 0):

        # data is a list of [text, y_label, input_prompt]
        text = data[0] # text samples
        y_label = data[1].to(device) # true class labels
        input_prompt = data[2] # used for GPT2
        # output of the BERT module for real samples (CLS hidden state)
        v_B = blockBERT(text, tokenizerBERT_red, modelBERT_red)

        ############################
        # (1) Update D network
        ############################
        ## Train with all-real batch
        netD.zero_grad()

        labelGAN = torch.full((batch_size,), real_label, dtype=torch.float, device=device)
        _, logits_real, _ = netD(v_B)
        loss_D_real = criterionGAN(logits_real[:, 0], labelGAN)
        loss_D_score = criterionScore(logits_real[:, 1:], y_label)
        loss_D_real_and_score = loss_D_real + lambda_score*loss_D_score
        loss_D_real_and_score.backward() # Calculate gradients for D

        ## Train with all-fake batch
        _, v_G = netG(input_prompt)
        labelGAN.fill_(fake_label)
        _, logits_fake, _ = netD(v_G.detach())
        loss_D_fake = criterionGAN(logits_fake[:, 0], labelGAN)
        loss_D_fake.backward() # Calculate gradients for D

        loss_D_total = loss_D_real + loss_D_fake + lambda_score*loss_D_score
        optimizerD.step() # Update D

        ############################
        # (2) Update G network
        ############################
        netG.zero_grad()

        labelGAN.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass
        features_real, _, _ = netD(v_B)
        features_fake, logits_fake, _ = netD(v_G)
        loss_G_caught = criterionGAN(logits_fake[:, 0], labelGAN)
        loss_G_feature_matching = torch.mean(torch.square(torch.mean(features_real, dim=0) - torch.mean(features_fake, dim=0)))

        loss_G_total = loss_G_caught + lambda_feature_matching*loss_G_feature_matching
        loss_G_total.backward() # Calculate gradients for G
        optimizerG.step() # Update G

        ############################
        # Training statistics
        ############################
        if i % 10 == 0:
            elapsed_time = time.time() - time_start
            print('[%3d/%3d][%3d/%3d]\tLoss_D: %.4f\tLoss_G: %.4f\tElapsedTime: %.1f s'
                  % (epoch, num_epochs, i, len(dataloaderTrain), loss_D_total.item(), loss_G_total.item(), elapsed_time))

        # Save Losses for plotting later
        D_losses.append(loss_D_total.item())
        G_losses.append(loss_G_total.item())

        iters += 1
print("--------------------------------------------------------------------------------------------")

Training Loss


In [None]:
plt.figure(figsize=(6, 4))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses, label="G")
plt.plot(D_losses, label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()