<a href="https://colab.research.google.com/github/Sharanya-Parimanoharan/AI-Generated-Text-Detection/blob/main/BERT_CustomClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import transformers
from transformers import AutoModel, BertTokenizerFast
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, classification_report
import os
import matplotlib.pyplot as plt

In [None]:
##### Set random seeds for reproducibility
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
   torch.cuda.manual_seed_all(seed)

In [None]:
# specify GPU
##device = torch.device("cuda")
# Check if CUDA (GPU) is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"CUDA is available. Using GPU: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    print("CUDA is not available. Using CPU.")



In [None]:

df = pd.read_csv("drive/MyDrive/data_2.csv")
print(df.head())

In [None]:

# import BERT-base pretrained model
bert = AutoModel.from_pretrained('bert-base-uncased')

#Load bert tokenizer
tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")

label_mapping = {'"AI"': 1, '"Human"': 0}



In [None]:

# split train dataset into train, validation and test sets
train_text, temp_text, train_labels, temp_labels = train_test_split(df['text'], df['label'],
                                                                    random_state=2018,
                                                                    test_size=0.3,
                                                                    stratify=df['label'])


val_text, test_text, val_labels, test_labels = train_test_split(temp_text, temp_labels,
                                                                random_state=2018,
                                                                test_size=0.5,
                                                                stratify=temp_labels)



In [None]:

# get length of all the messages in the train set
seq_len = [len(i.split()) for i in train_text]

##pd.Series(seq_len).hist(bins = 30)

# tokenize and encode sequences in the training set
#breaks into single words and returns tokenid(integers corresponding to words)
tokens_train = tokenizer.batch_encode_plus(
    train_text.tolist(),
    max_length = 25,
    padding='max_length',
    truncation=True
)

# tokenize and encode sequences in the validation set
tokens_val = tokenizer.batch_encode_plus(
    val_text.tolist(),
    max_length = 25,
    padding='max_length',
    truncation=True
)

# tokenize and encode sequences in the test set
tokens_test = tokenizer.batch_encode_plus(
    test_text.tolist(),
    max_length = 25,
    padding='max_length',
    truncation=True
)

In [None]:

## convert lists to tensors
label_mapping = {'"AI"': 1, '"Human"': 0}

train_seq = torch.tensor(tokens_train['input_ids'])
train_mask = torch.tensor(tokens_train['attention_mask'])
train_labels_encoded = train_labels.map(label_mapping).tolist()
print(train_labels_encoded)
train_y = torch.tensor(train_labels_encoded,dtype=torch.long)

val_seq = torch.tensor(tokens_val['input_ids'])
val_mask = torch.tensor(tokens_val['attention_mask'])
val_labels_encoded = val_labels.map(label_mapping).tolist()
val_y = torch.tensor(val_labels_encoded,dtype=torch.long)

test_seq = torch.tensor(tokens_test['input_ids'])
test_mask = torch.tensor(tokens_test['attention_mask'])
test_labels_encoded = test_labels.map(label_mapping)
test_y = torch.tensor(test_labels_encoded.tolist(),dtype=torch.long)


In [None]:

from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

#define a batch size
batch_size = 32

# wrap tensors
train_data = TensorDataset(train_seq, train_mask, train_y)

# sampler for sampling the data during training
train_sampler = RandomSampler(train_data)

# dataLoader for train set
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

# wrap tensors
val_data = TensorDataset(val_seq, val_mask, val_y)

# sampler for sampling the data during training
val_sampler = SequentialSampler(val_data)

# dataLoader for validation set
val_dataloader = DataLoader(val_data, sampler = val_sampler, batch_size=batch_size)



# freeze all the parameters
for param in bert.parameters():
    param.requires_grad = False


In [None]:

class BERT_Arch(nn.Module):

    ##Constructor method for BERT_Arch class
    def __init__(self, bert):

      super(BERT_Arch, self).__init__()

      self.bert = bert

      # dropout layer-prevents overfitting
      self.dropout = nn.Dropout(0.1)

      # relu activation function
      self.relu =  nn.ReLU()

      # dense layer 1
      self.fc1 = nn.Linear(768,512)

      # dense layer 2 (Output layer)
      self.fc2 = nn.Linear(512,2)

      #softmax activation function is used to compute class probabilities
      self.softmax = nn.LogSoftmax(dim=1)

    #define the forward pass
    def forward(self, sent_id, mask):

      #pass the inputs to the model
      ##      _, cls_hs = self.bert(sent_id, attention_mask=mask)

     # Pass the inputs to the BERT model
      outputs = self.bert(sent_id, attention_mask=mask)

    # Extract the 'last_hidden_state' from the BERT model's outputs
      hidden_states = outputs.last_hidden_state

    # Apply pooling or any other operations on the hidden_states if needed
    # For example, to get a fixed-size representation (CLS token):
      cls_token = hidden_states[:, 0, :]
      x=self.dropout(cls_token)

      x = self.fc1(x)

      x = self.relu(x)

      x = self.dropout(x)

      # output layer
      x = self.fc2(x)

      # apply softmax activation
      x = self.softmax(x)

      return x

In [None]:

# pass the pre-trained BERT to our define architecture
model = BERT_Arch(bert)

# push the model to GPU
model = model.to(device)


# define the optimizer
optimizer = optim.AdamW(model.parameters(),
                  lr = 1e-5)          # learning rate

from sklearn.utils.class_weight import compute_class_weight

#compute the class weights
class_weights = compute_class_weight('balanced',classes=np.unique(train_labels),y=train_labels)



# converting list of class weights to a tensor
weights= torch.tensor(class_weights,dtype=torch.float)

# push to GPU
weights = weights.to(device)

# define the loss function
cross_entropy  = nn.CrossEntropyLoss(weight=weights)

# number of training epochs
epochs = 10


In [None]:
# empty lists to store training and validation loss of each epoch
train_losses=[]
valid_losses=[]

accuracies_per_epoch = []

num_runs = 5
accuracies = []

for run in range(num_runs):
    print(f"Run {run + 1}/{num_runs}")

    # Your training and evaluation code...

    # function to train the model
    def train():

      #set the model to training mode
      model.train()

        #to keep the track of entire training dataset
      total_loss, total_accuracy = 0, 0

      # empty list to save model predictions
      total_preds=[]

      # iterate over batches
      for step,batch in enumerate(train_dataloader):

        # progress update after every 50 batches.
        if step % 50 == 0 and not step == 0:
          print('  Batch {:>5,}  of  {:>5,}.'.format(step, len(train_dataloader)))

        # push the batch to gpu
        batch = [r.to(device) for r in batch]

        sent_id, mask, labels = batch

        # clear previously calculated gradients
        model.zero_grad()

        # get model predictions for the current batch
        preds = model(sent_id, mask)
        ##    preds = preds.long()  # Cast predictions to Long data type


        # compute the loss between actual and predicted values
        loss = cross_entropy(preds, labels)

        # add on to the total loss
        total_loss = total_loss + loss.item()

        # backward pass to calculate the gradients
        loss.backward()

        # clip the the gradients to 1.0. It helps in preventing the exploding gradient problem
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # update parameters
        optimizer.step()

        # model predictions are stored on GPU. So, push it to CPU
        preds=preds.detach().cpu().numpy()

        # append the model predictions
        total_preds.append(preds)

      # compute the training loss of the epoch
      avg_loss = total_loss / len(train_dataloader)

      # predictions are in the form of (no. of batches, size of batch, no. of classes).
      # reshape the predictions in form of (number of samples, no. of classes)
      total_preds  = np.concatenate(total_preds, axis=0)

      #returns the loss and predictions
      return avg_loss, total_preds


    # function for evaluating the model
    def evaluate():

      print("\nEvaluating...")

      # deactivate dropout layers
      model.eval()

      total_loss, total_accuracy = 0, 0

      # empty list to save the model predictions
      total_preds = []

      # iterate over batches
      for step,batch in enumerate(val_dataloader):

        # Progress update every 50 batches.
        if step % 50 == 0 and not step == 0:

          # Calculate elapsed time in minutes.
          elapsed = format_time(time.time() - t0)

          # Report progress.
          print('  Batch {:>5,}  of  {:>5,}.'.format(step, len(val_dataloader)))

        # push the batch to gpu
        batch = [t.to(device) for t in batch]

        sent_id, mask, labels = batch

        # deactivate autograd
        with torch.no_grad():

          # model predictions
          preds = model(sent_id, mask)

          # compute the validation loss between actual and predicted values
          loss = cross_entropy(preds,labels)

          total_loss = total_loss + loss.item()

          preds = preds.detach().cpu().numpy()

          total_preds.append(preds)

      # compute the validation loss of the epoch
      avg_loss = total_loss / len(val_dataloader)

      # reshape the predictions in form of (number of samples, no. of classes)
      total_preds  = np.concatenate(total_preds, axis=0)

      return avg_loss, total_preds


    # set initial loss to infinite
    best_valid_loss = float('inf')



    #for each epoch
    for epoch in range(epochs):

        print('\n Epoch {:} / {:}'.format(epoch + 1, epochs))

        #train model
        train_loss, _ = train()

        #evaluate model
        valid_loss, _ = evaluate()

        #save the best model
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), 'saved_weights.pt')

        # append training and validation loss
        train_losses.append(train_loss)
        valid_losses.append(valid_loss)

        print(f'\nTraining Loss: {train_loss:.3f}')
        print(f'Validation Loss: {valid_loss:.3f}')




In [None]:
   torch.save(model.state_dict(), 'drive/MyDrive/saved_weights.pt')


In [None]:

 #load weights of best model
path = 'drive/MyDrive/saved_weights.pt'
model.load_state_dict(torch.load(path))

# get predictions for test data
with torch.no_grad():
   preds = model(test_seq.to(device), test_mask.to(device))
   preds = preds.detach().cpu().numpy()

preds = np.argmax(preds, axis = 1)
#print(classification_report(test_y, preds))


    # Assuming true_labels and predicted_labels are your ground truth and predicted labels, respectively
accuracy = accuracy_score(test_y, preds)
precision = precision_score(test_y, preds, pos_label=1)  # Assuming "AI" is the positive class
recall = recall_score(test_y, preds, pos_label=1)  # Assuming "AI" is the positive class
f1 = f1_score(test_y, preds, pos_label=1)  # Assuming "AI" is the positive class
roc_auc = roc_auc_score(test_y, preds)
#confusion = confusion_matrix(test_y, preds, labels=['"Human"', '"AI"'])
report = classification_report(test_y, preds)


print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1)
print("ROC AUC:`", roc_auc)
print("Classification Report:\n", report)
print(test_y)
print()
print(preds)
accuracy = accuracy_score(test_y, preds)
accuracies.append(accuracy)
accuracies_per_epoch.append(accuracies)

### Compute and print the average accuracy
##average_accuracy = np.mean(accuracies)
##print(f"Average Accuracy: {average_accuracy:.4f}")

#print(len(accuracies_per_epoch))


In [None]:

# plot training loss
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training Loss')
plt.plot(valid_losses, label='Validation Loss')

plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Over Epochs')
plt.legend()
plt.show()

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score

pr, tpr, thresholds = roc_curve(test_y, preds)

    # Calculate AUC
auc_value = roc_auc_score(test_y, preds)

        # Plot ROC curve
plt.figure()
plt.plot(pr, tpr, color='darkorange', lw=2, label=f'AUC = {auc_value:.2f}')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic Curve')
plt.legend(loc='lower right')
plt.show()

In [None]:
# Define the path to the saved weights file
path = 'drive/MyDrive/saved_weights.pt'

# Load the weights into the model
model.load_state_dict(torch.load(path))

# Load your labeled test dataset
test_df = pd.read_csv("drive/MyDrive/test.csv")  # Replace with the actual file path

# Assuming your dataset has 'text' and 'label' columns
test_texts = test_df['text'].tolist()
test_labels = test_df['label'].tolist()

# Tokenize the test texts
tokens_test = tokenizer.batch_encode_plus(
    test_texts,
    max_length=25,
    padding='max_length',
    truncation=True
)

# Convert lists to PyTorch tensors
test_seq = torch.tensor(tokens_test['input_ids'])
test_mask = torch.tensor(tokens_test['attention_mask'])
test_labels_encoded = [label_mapping[label] for label in test_labels]
test_y = torch.tensor(test_labels_encoded, dtype=torch.long)

# Create a DataLoader for the test data
test_dataset = TensorDataset(test_seq, test_mask, test_y)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Set the model to evaluation mode
model.eval()

# Make predictions on the test set
with torch.no_grad():
    test_preds = []

    for batch in test_dataloader:
        batch = [t.to(device) for t in batch]
        sent_id, mask, labels = batch

        preds = model(sent_id, mask)
        preds = torch.argmax(preds, dim=1).cpu().numpy()
        test_preds.extend(preds)

# Convert predictions to NumPy array
test_preds = np.array(test_preds)

# Calculate accuracy
accuracy = accuracy_score(test_y.numpy(), test_preds)
print(test_y.numpy())
print(test_preds)
print("Accuracy:", accuracy)


In [None]:
# Print some text, true labels, and predictions
for i in range(min(25, len(test_y))):  # Print the first 5 samples
    print(f"Text: {test_df['text'].iloc[i]}")
    print(f"True Label: {test_df['label'].iloc[i]}, Predicted Label: {test_preds[i]}")
    print("="*50)

In [None]:
# Define the path to the saved weights file
path = 'drive/MyDrive/saved_weights.pt'

# Load the weights into the model
model.load_state_dict(torch.load(path))

# Load your labeled test dataset
test_df = pd.read_csv("drive/MyDrive/test_new.csv")  # Replace with the actual file path

# Assuming your dataset has 'text' and 'label' columns
test_texts = test_df['text'].tolist()
test_labels = test_df['label'].tolist()

# Tokenize the test texts
tokens_test = tokenizer.batch_encode_plus(
    test_texts,
    max_length=25,
    padding='max_length',
    truncation=True
)

# Convert lists to PyTorch tensors
test_seq = torch.tensor(tokens_test['input_ids'])
test_mask = torch.tensor(tokens_test['attention_mask'])
test_labels_encoded = [label_mapping[label] for label in test_labels]
test_y = torch.tensor(test_labels_encoded, dtype=torch.long)

# Create a DataLoader for the test data
test_dataset = TensorDataset(test_seq, test_mask, test_y)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Set the model to evaluation mode
model.eval()

# Make predictions on the test set
with torch.no_grad():
    test_preds = []

    for batch in test_dataloader:
        batch = [t.to(device) for t in batch]
        sent_id, mask, labels = batch

        preds = model(sent_id, mask)
        preds = torch.argmax(preds, dim=1).cpu().numpy()
        test_preds.extend(preds)

# Convert predictions to NumPy array
test_preds = np.array(test_preds)

np.savetxt('/content/drive/MyDrive/pred_BERT.csv', test_preds, delimiter=',',  fmt='%.2f')


# Calculate accuracy
accuracy = accuracy_score(test_y.numpy(), test_preds)
precision_ai = precision_score(test_y, test_preds, pos_label=1)  # Assuming "AI" is the positive class
precision_human = precision_score(test_y, test_preds, pos_label=0)  # Assuming "AI" is the positive class

recall_ai = recall_score(test_y, test_preds, pos_label=1)  # Assuming "AI" is the positive class
recall_human = recall_score(test_y, test_preds, pos_label=0)  # Assuming "AI" is the positive class

f1_ai = f1_score(test_y, test_preds, pos_label=1)  # Assuming "AI" is the positive class
f1_human = f1_score(test_y, test_preds, pos_label=0)  # Assuming "AI" is the positive class
conf_matrix = confusion_matrix(test_y, test_preds)


roc_auc = roc_auc_score(test_y, test_preds)
print(test_y.numpy())
print(test_preds)
print("Accuracy:", accuracy)
print("Precision:", precision_ai)
print("Precision:", precision_human)

print("Recall:", recall_ai)
print("Recall:", recall_human)

print("F1-Score:", f1_ai)
print("F1-Score:", f1_human)

print("ROC AUC:`", roc_auc)

In [None]:

import matplotlib.pyplot as plt
import numpy as np

# Sample data (replace with your actual values)
ai_values = [precision_ai,recall_ai,f1_ai]
human_values = [precision_human,recall_human,f1_human]

# Metrics names
metrics = [ 'Precision', 'Recall', 'F1-Score']

# Number of metrics
num_metrics = len(metrics)

# Create an array of indices for each metric
indices = np.arange(num_metrics)

# Bar width
bar_width = 0.1

# Plotting
fig, ax = plt.subplots(figsize=(6, 4))

# Plot AI values
ax.bar(indices, ai_values, bar_width, label='AI', color='blue',alpha=0.5)

# Plot Human values
ax.bar(indices + bar_width, human_values, bar_width, label='Human', color='orange')

# Customize the plot
ax.tick_params(axis='both', labelsize=15, labelcolor='black')

# Make the axis labels (x and y) bold
ax.xaxis.label.set_fontweight('bold')
ax.yaxis.label.set_fontweight('bold')
ax.set_xticks(indices + bar_width / 2)

ax.set_xticklabels(metrics)
ax.legend()
ax.set_ylabel('Metric Values')
#ax.set_title('Comparison of AI and Human Performance Metrics')


# Show the plot
plt.show()


In [None]:
# Print some text, true labels, and predictions
for i in range(min(25, len(test_y))):  # Print the first 5 samples
    print(f"Text: {test_df['text'].iloc[i]}")
    print(f"True Label: {test_df['label'].iloc[i]}, Predicted Label: {test_preds[i]}")
    print("="*50)

In [None]:
# Extract values from confusion matrix
tn, fp, fn, tp = conf_matrix.ravel()

# Plotting
labels = ['True Negative', 'False Positive', 'False Negative', 'True Positive']
values = [tn, fp, fn, tp]

fig, ax = plt.subplots()
bars = ax.bar(labels, values, color=['pink', 'yellow', 'orange', 'blue'],alpha=0.8,width=0.3)

# Add labels and title
ax.set_ylabel('Count')
ax.set_title('Confusion Matrix')

# Add value annotations on top of the bars
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval, round(yval, 2), ha='center', va='bottom')

ax.tick_params(axis='y', labelsize=20, labelcolor='black')
ax.set_xticklabels(labels, fontweight='bold')
plt.show()