# Fine-Tuning RoBERTa to Detect Machine-Generated Text





## Setup

### Env & Packages

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; the checkpoint is copied there and loaded back from it later in the notebook.
drive.mount('/content/drive')

In [None]:
# Install the third-party packages imported below; gdown is pinned to 4.6.3.
!pip install transformers
!pip install jsonlines
!pip install gdown==4.6.3

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import svm, datasets
from sklearn.metrics import confusion_matrix
import torch
import seaborn as sns
import transformers
import json
import jsonlines
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaModel, RobertaTokenizer, AutoModel, AutoTokenizer
import logging
logging.basicConfig(level=logging.ERROR)

In [None]:
from torch import cuda
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if cuda.is_available() else 'cpu'

### Load Packages & Dataset

In [None]:
# Download the two SemEval Google Drive folders with gdown
!gdown --folder https://drive.google.com/drive/folders/1CAbb3DjrOPBNm0ozVBfhvrEh9P9rAppc
!gdown --folder https://drive.google.com/drive/folders/10DKtClzkwIIAatzHBWXZXuQNID-DNGSG

# Monolingual Subtask A train and dev files
train_data_path = "/content/SubtaskA/subtaskA_train_monolingual.jsonl"
dev_data_path = "/content/SubtaskA/subtaskA_dev_monolingual.jsonl"

# Test file, only used when producing a submission
submission_data_path = "/content/SemEval2024-Task8-test/subtaskA_monolingual.jsonl"

In [None]:
# Data loader
def load_data(path):
  """Read a JSON Lines file and return its records as a DataFrame.

  Args:
    path: Location of the ``.jsonl`` file to read.

  Returns:
    The frame produced by ``pd.json_normalize`` over every parsed line.
  """
  with jsonlines.open(path) as reader:
    records = [record for record in reader.iter()]

  return pd.json_normalize(records, meta=['id', 'label', 'text', 'model', 'source'])

In [None]:
# Parse the train and dev JSONL files into DataFrames.
train_data = load_data(train_data_path)
eval_data = load_data(dev_data_path)
# The following is only for submission
submission_data = load_data(submission_data_path)

In [None]:
# Shapes of the three raw frames.
print("Train:", train_data.shape)
print("Eval:", eval_data.shape)
print("Test:", submission_data.shape)
print("-"*200)
print("Summary:")
print(train_data.head())
print(eval_data.head())
print(submission_data.head())
print("-"*200)
print("Labels:")
# A bare expression is only echoed when it is the last statement of a cell,
# so the label sets have to be printed explicitly to show up here.
print(train_data['label'].unique())
print(eval_data['label'].unique())
print("-"*200)
print("Statistics:")
print(train_data.describe())
print(eval_data.describe())
print(submission_data.describe())

In [None]:
# Keep only the columns used downstream; the submission frame keeps just the text column.
train_df = train_data[['text', 'label']]
eval_df = eval_data[['text', 'label']]
submission_df = submission_data[['text']]

<a id='section03'></a>
### Preparing Model Parameters

In [None]:
MAX_LEN = 512  # handed to ClassifyData, which passes it to the tokenizer as max_length
TRAIN_BATCH_SIZE = 10
VALID_BATCH_SIZE = 10  # shared by the test and eval loaders
# EPOCHS is set in the training cell further down.
LEARNING_RATE = 4e-05  # learning rate given to AdamW
# base checkpoint is roberta-base
CHECKPOINT = "roberta-base" #@param {type:"string"}

<a id='section04'></a>
### Creating the Neural Network

#### The Neural Network
The network is defined by the `RobertaClass` module: the pretrained RoBERTa language model with a classification head on top. The encoder output for the first token passes through a `Linear` layer with a ReLU activation, then `dropout`, and finally a `Linear` layer that produces the two output logits.

In [None]:
from transformers import AutoModel, AutoConfig

class RobertaClass(torch.nn.Module):
    """RoBERTa encoder followed by a small two-class classification head.

    The encoder is loaded from the notebook-level ``CHECKPOINT``. The head is
    ``Linear -> ReLU -> Dropout(0.5) -> Linear`` and emits two logits per
    input sequence.
    """

    def __init__(self):
        super().__init__()
        self.l1 = RobertaModel.from_pretrained(CHECKPOINT)
        # Take the width from the loaded checkpoint instead of hard-coding
        # 768, so checkpoints with a different hidden size also work.
        hidden_size = self.l1.config.hidden_size
        self.pre_classifier = torch.nn.Linear(hidden_size, hidden_size)
        self.dropout = torch.nn.Dropout(0.5)
        self.classifier = torch.nn.Linear(hidden_size, 2)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return the two class logits for a batch of tokenized inputs.

        Args:
            input_ids: Token ids for the batch.
            attention_mask: Attention mask matching ``input_ids``.
            token_type_ids: Token type ids matching ``input_ids``.

        Returns:
            Output of the final linear layer, one row of two logits per
            input sequence.
        """
        encoder_output = self.l1(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        hidden_state = encoder_output[0]
        # Use the hidden state at the first token position as the summary
        # of the whole sequence.
        pooled = hidden_state[:, 0]
        pooled = self.pre_classifier(pooled)
        pooled = torch.relu(pooled)
        pooled = self.dropout(pooled)
        return self.classifier(pooled)

In [None]:
# The tokenizer and the classifier's encoder are both loaded from CHECKPOINT.
# NOTE(review): RoBERTa's vocabulary is presumably case-sensitive, so `do_lower_case=True`
# (and `truncation=True` at load time) may have no effect here — confirm and drop if so.
tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT, truncation=True, do_lower_case=True)
model = RobertaClass()
model.to(device)

<a id='section03'></a>
### Preparing the Dataset and the Dataloader

In [None]:
class ClassifyData(Dataset):
    """Torch dataset that tokenizes one row of a text DataFrame on demand.

    Args:
        dataframe: Frame with a ``text`` column and, optionally, a ``label``
            column.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_len: Length every encoded sequence is padded or truncated to.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe.text
        self.max_len = max_len
        if 'label' in self.data:
            self.targets = self.data.label

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        """Return the encoded tensors (and the target, if any) for one row.

        Args:
            index: Zero-based position of the row, as supplied by a
                ``DataLoader``.

        Returns:
            Dict with ``ids``, ``mask`` and ``token_type_ids`` long tensors,
            plus a float ``targets`` tensor when the frame has labels.
        """
        # Positional lookup: a DataLoader hands out 0..len-1, which only
        # matches the frame's index labels if the index was reset.
        text = str(self.text.iloc[index])
        # Collapse any run of whitespace into a single space.
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            # Explicit replacements for the deprecated `pad_to_max_length`
            # flag and for the previously implicit truncation.
            padding='max_length',
            truncation=True,
            return_token_type_ids=True,
        )

        items = {
            'ids': torch.tensor(inputs['input_ids'], dtype=torch.long),
            'mask': torch.tensor(inputs['attention_mask'], dtype=torch.long),
            'token_type_ids': torch.tensor(inputs['token_type_ids'], dtype=torch.long),
        }
        if 'label' in self.data:
            items['targets'] = torch.tensor(self.targets.iloc[index], dtype=torch.float)

        return items

In [None]:
# 90/10 split of the training frame into train and held-out test parts.
train_size = 0.9
train_data = train_df.sample(frac=train_size, random_state=200)
test_data = train_df.drop(train_data.index).reset_index(drop=True)
train_data = train_data.reset_index(drop=True)
# Cap the sample at the frame size so a smaller dev file does not raise, and
# reset the index like the other two frames: the DataLoader addresses rows
# as 0..len-1.
eval_data = eval_df.sample(min(5000, len(eval_df)), random_state=200).reset_index(drop=True)

print("FULL Dataset: {}".format(train_df.shape))
print("TRAIN Dataset: {}".format(train_data.shape))
print("TEST Dataset: {}".format(test_data.shape))
print("EVAL Dataset: {}".format(eval_data.shape))

training_set = ClassifyData(train_data, tokenizer, MAX_LEN)
testing_set = ClassifyData(test_data, tokenizer, MAX_LEN)
eval_set = ClassifyData(eval_data, tokenizer, MAX_LEN)

print("Dataset number by each class:")
print(train_data['label'].value_counts())
print(test_data['label'].value_counts())
print(eval_data['label'].value_counts())

In [None]:
# Only the training loader shuffles. Evaluation metrics do not depend on the
# batch order, so the test and eval loaders keep a fixed order for
# reproducible progress output.
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

test_params = {'batch_size': VALID_BATCH_SIZE,
                'shuffle': False,
                'num_workers': 0
                }

eval_params = {'batch_size': VALID_BATCH_SIZE,
                'shuffle': False,
                'num_workers': 0
                }

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)
eval_loader = DataLoader(eval_set, **eval_params)

## Train & Evaluation

<a id='section05'></a>
### Fine Tuning the Model
The dataloader passes data to the model in batches; the model output is compared with the actual labels to calculate the loss. Every 5000 steps the running loss and accuracy are printed to the console.

In [None]:
# Cross-entropy over the model's two output logits; the training and validation loops cast targets to long before calling it.
loss_function = torch.nn.CrossEntropyLoss()
# AdamW over all parameters of the model (encoder and classification head) at LEARNING_RATE.
optimizer = torch.optim.AdamW(params =  model.parameters(), lr=LEARNING_RATE)

In [None]:
def calculate_accuracy(preds, targets):
    """Count the positions at which ``preds`` equals ``targets``.

    Despite the name, the result is the raw number of matches, not a ratio;
    callers divide by the number of examples themselves.
    """
    matches = preds == targets
    return matches.sum().item()

In [None]:
def train(epoch):
    """Run one training epoch over ``training_loader`` and return its mean loss.

    Relies on the notebook-level ``model``, ``optimizer``, ``loss_function``,
    ``device`` and ``STEPS`` (the logging interval in steps).

    Args:
        epoch: Epoch number, used only in the printed summary.

    Returns:
        Mean of the per-batch losses over the epoch.
    """
    tr_loss = 0
    n_correct = 0
    nb_tr_steps = 0
    nb_tr_examples = 0
    model.train()
    # enumerate() hides the loader's length, so pass it explicitly; otherwise
    # tqdm cannot show a progress bar or an ETA.
    for step, data in tqdm(enumerate(training_loader), total=len(training_loader)):
        ids = data['ids'].to(device, dtype=torch.long)
        mask = data['mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        # The loss expects integer class indices.
        targets = data['targets'].to(device, dtype=torch.long)

        optimizer.zero_grad()
        outputs = model(ids, mask, token_type_ids)
        loss = loss_function(outputs, targets)
        tr_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        n_correct += calculate_accuracy(predicted, targets)

        nb_tr_steps += 1
        nb_tr_examples += targets.size(0)

        if step % STEPS == 0:
            # Running averages since the start of the epoch.
            loss_step = tr_loss / nb_tr_steps
            accu_step = (n_correct * 100) / nb_tr_examples
            print(f"Training Loss per {STEPS} steps: {loss_step}")
            print(f"Training Accuracy per {STEPS} steps: {accu_step}")

        loss.backward()
        optimizer.step()

    epoch_loss = tr_loss / nb_tr_steps
    epoch_accu = (n_correct * 100) / nb_tr_examples
    print(f'The Total Accuracy for Epoch {epoch}: {epoch_accu}')
    print(f"Training Loss Epoch: {epoch_loss}")
    print(f"Training Accuracy Epoch: {epoch_accu}")

    return epoch_loss

In [None]:
EPOCHS = 2
# Stop early once the mean training loss of an epoch falls below this value.
TOL = 0.35
# Logging interval, in steps, read by train(). The previous value was the
# total step count over all epochs, so the running metrics were only printed
# at step 0.
STEPS = 5000

for epoch in range(EPOCHS):
    print("-" * 200, f"\n Epoch {epoch}:")
    training_loss = train(epoch)
    # No try/except here: a failure in this comparison should surface
    # instead of being silently ignored.
    if training_loss < TOL:  # Early stop
        print(f"Early stop: training loss {training_loss} is below {TOL}")
        break

<a id='section06'></a>
### Validating the Model

In [None]:
def valid(model, data_loader, loss_function, log_every=5000):
    """Evaluate ``model`` on a labelled loader without updating weights.

    Args:
        model: Module called as ``model(ids, mask, token_type_ids)`` that
            returns one row of class logits per example.
        data_loader: Loader yielding dicts with ``ids``, ``mask``,
            ``token_type_ids`` and ``targets``.
        loss_function: Callable applied to ``(outputs, targets)``.
        log_every: Number of steps between running-metric printouts.

    Returns:
        Tuple ``(accuracy_percent, mean_loss)`` over the whole loader.
    """
    model.eval()
    epoch_loss = 0
    n_correct = 0
    nb_tr_steps = 0
    nb_tr_examples = 0

    with torch.no_grad():
        for step, data in tqdm(enumerate(data_loader, 0), total=len(data_loader)):
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.long)

            # No squeeze(): it would drop the batch dimension of a final
            # batch holding a single example and break the loss and the
            # argmax below.
            outputs = model(ids, mask, token_type_ids)
            loss = loss_function(outputs, targets)

            epoch_loss += loss.item()

            predicted = outputs.argmax(dim=1)
            n_correct += calculate_accuracy(predicted, targets)

            nb_tr_steps += 1
            nb_tr_examples += targets.size(0)

            if step % log_every == 0:
                # Running averages since the start of the pass.
                loss_step = epoch_loss / nb_tr_steps
                accu_step = (n_correct * 100) / nb_tr_examples
                print(f"Validation Loss per {log_every} steps: {loss_step}")
                print(f"Validation Accuracy per {log_every} steps: {accu_step}")

    epoch_loss /= nb_tr_steps
    epoch_accu = (n_correct * 100) / nb_tr_examples
    print(f"Validation Loss Epoch: {epoch_loss}")
    print(f"Validation Accuracy Epoch: {epoch_accu}")

    return epoch_accu, epoch_loss

In [None]:
valid_acc, valid_loss = valid(model, eval_loader, loss_function)
# eval_loader wraps the dev split. The loss is a mean cross-entropy value,
# not a percentage, so it is printed without the "%" sign.
print("Accuracy on eval data = %0.2f%%" % valid_acc)
print("Loss on eval data = %0.4f" % valid_loss)

<a id='section07'></a>
### Saving the Trained Model Weights for Inference
A checkpoint containing the model, the optimizer state, and the training and validation losses is written to disk and then copied to Google Drive.

In [None]:
checkpoint_save_name = 'model_checkpoint.pt'

In [None]:
# File name of the checkpoint that is written with torch.save and then copied to Google Drive.
checkpoint_save_name = "model_checkpoint.pt"

In [None]:
checkpoint = {
  'epoch': 1, # Change this to the number of epochs it took to train the model
  # Store the state dict, not the module object: the inference cell passes
  # this entry straight to model.load_state_dict().
  'model_state_dict': model.state_dict(),
  'optimizer_state_dict': optimizer.state_dict(),
  'loss': training_loss,
  'val_loss': valid_loss
}
torch.save(checkpoint, checkpoint_save_name)

In [None]:
!cp $checkpoint_save_name drive/MyDrive/SemEval-Task8

## Inference

### Setup the Model

In [None]:
model_path = "/content/drive/MyDrive/SemEval-Task8/model_checkpoint.pt"
# Remap tensors only on a CPU-only runtime; otherwise leave map_location unset.
map_location = device if device == "cpu" else None
checkpoint = torch.load(model_path, map_location)
# Restore the trained weights, move the model to the active device and switch to eval mode.
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()

In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm

def inference(model, data_loader, is_labeled, return_probabilities=False):
    """Predict a class for every example in ``data_loader``.

    Args:
        model: Module called as ``model(ids, mask, token_type_ids)`` that
            returns one row of class logits per example.
        data_loader: Loader yielding dicts with ``ids``, ``mask`` and
            ``token_type_ids`` (plus ``targets`` when ``is_labeled``).
        is_labeled: When true, targets are collected and the accuracy is
            printed.
        return_probabilities: When true, the softmax probability of class 1
            for every example is appended to the returned value.

    Returns:
        ``predictions`` when ``is_labeled`` is false, otherwise
        ``(predictions, targets_list)``. With ``return_probabilities`` the
        probability list is added as a final tuple element.
    """
    model.eval()
    predictions = []
    predicted_probabilities = []
    targets_list = []

    with torch.no_grad():
        # Every batch is processed; a leftover `break` used to stop the
        # loop after the first one.
        for data in tqdm(data_loader, total=len(data_loader)):
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)

            # No squeeze(): a final batch of one example must keep its
            # batch dimension for the per-row argmax below.
            outputs = model(ids, mask, token_type_ids)

            predictions.extend(outputs.argmax(dim=1).cpu().numpy())
            if return_probabilities:
                # Softmax (not sigmoid) turns the two logits of a
                # cross-entropy-trained head into probabilities.
                class_one = torch.softmax(outputs, dim=1)[:, 1]
                predicted_probabilities.extend(class_one.cpu().numpy())
            if is_labeled:
                targets = data['targets'].to(dtype=torch.long)
                targets_list.extend(targets.cpu().numpy())

    if is_labeled:
        total_samples = len(targets_list)
        correct_predictions = sum(
            int(p == t) for p, t in zip(predictions, targets_list)
        )
        # An empty loader reports 0% instead of dividing by zero.
        accuracy = correct_predictions / total_samples * 100.0 if total_samples else 0.0

        print(f'Accuracy: {accuracy:.2f}%')

        if return_probabilities:
            return predictions, targets_list, predicted_probabilities
        return predictions, targets_list

    if return_probabilities:
        return predictions, predicted_probabilities
    return predictions

### Predicting New Instances

In [None]:
inference_dataset = ClassifyData(eval_df, tokenizer, MAX_LEN) # use submission_df for the submission run
is_labeled = True # set to False for the submission run

# Keep the original row order (no shuffling).
inference_dataset_params = {'batch_size': 32,
                'shuffle': False,
                'num_workers': 0
              }
inference_data_loader = DataLoader(inference_dataset, **inference_dataset_params)

pred = inference(model, inference_data_loader, is_labeled)

### Save Predictions

In [None]:
# inference() returns a tuple whose first element is the prediction list for
# labelled data, and the bare prediction list otherwise.
labels_to_save = pred[0] if isinstance(pred, tuple) else pred

# The ids must come from the file the predictions were made on: the dev file
# for the labelled run, the submission file otherwise.
ids_path = dev_data_path if is_labeled else submission_data_path

with jsonlines.open(ids_path) as f:
  ids = [line['id'] for line in f]

if len(ids) != len(labels_to_save):
  raise ValueError(
      f"{ids_path} has {len(ids)} rows but {len(labels_to_save)} predictions were produced"
  )

predictions = [
    {'id': row_id, 'label': int(label)}
    for row_id, label in zip(ids, labels_to_save)
]

with open("pred.json", 'w') as f:
  for item in predictions:
    f.write(json.dumps(item) + "\n")

In [None]:
!wget https://raw.githubusercontent.com/mbzuai-nlp/SemEval2024-task8/main/subtaskA/format_checker/format_checker.py
!wget https://raw.githubusercontent.com/mbzuai-nlp/SemEval2024-task8/main/subtaskA/scorer/scorer.py

In [None]:
!python3 format_checker.py --pred_files_path=pred.json

In [None]:
!python3 scorer.py --gold_file_path=/content/SubtaskA/subtaskA_dev_monolingual.jsonl --pred_file_path=/content/pred.json

### Plot

In [None]:
import matplotlib.pyplot as plt
import numpy as np

pred_labels, true_labels = pred

def plotGraph(y_test, y_pred, regressorName):
    """Scatter-plot true and predicted values against their sample position.

    Args:
        y_test: Sequence of true values, drawn in blue.
        y_pred: Sequence of predicted values, drawn in red.
        regressorName: Text used as the plot title.
    """
    plt.scatter(range(len(y_test)), y_test, color='blue')
    plt.scatter(range(len(y_pred)), y_pred, color='red')
    plt.title(regressorName)
    plt.show()


plotGraph(true_labels, pred_labels, "Classification")

In [None]:
# Requires the labelled run: inference() returns (predictions, targets) only when is_labeled is True.
pred_labels, true_labels = pred

# Counts of every (true label, predicted label) pair.
conf_matrix = confusion_matrix(true_labels, pred_labels)

# Plot confusion matrix using seaborn
plt.figure(figsize=(6, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, precision_recall_curve, auc

# ROC Curve
# NOTE(review): pred_labels holds hard class predictions (argmax indices from inference()),
# not scores, so the curve presumably has a single operating point — consider passing
# class probabilities instead; confirm the intent.
fpr, tpr, thresholds_roc = roc_curve(true_labels, pred_labels)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 8))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC Curve (AUC = {roc_auc:.2f})')
# Diagonal reference line from (0, 0) to (1, 1).
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')
plt.show()

In [None]:
# Precision-Recall Curve
# NOTE(review): as with the ROC cell, pred_labels are hard class predictions rather than
# scores, so this curve is presumably built from a single threshold — confirm the intent.
precision, recall, thresholds_pr = precision_recall_curve(true_labels, pred_labels)
# Area under the precision-recall curve.
pr_auc = auc(recall, precision)

plt.figure(figsize=(8, 8))
plt.plot(recall, precision, color='darkorange', lw=2, label=f'Precision-Recall Curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall (Sensitivity)')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='upper right')
plt.show()