In [None]:
!pip install -U -q mlflow datasets>=2.14.5 nlp 2>/dev/null

In [None]:
# Presumably a placeholder for a Weights & Biases API key (left empty); it is not
# referenced anywhere else in the visible notebook.
# NOTE(review): if it is ever needed, read it from an environment variable instead
# of hardcoding it in the notebook.
wandb_key = ''

In [None]:
import pandas as pd
import gc
import re
import numpy as np

import warnings
warnings.filterwarnings("ignore")

import torch
from transformers import AutoModel, AutoTokenizer
from transformers import TextDataset, LineByLineTextDataset, DataCollatorForLanguageModeling, \
pipeline, Trainer, TrainingArguments, DataCollatorWithPadding
from transformers import AutoModelForSequenceClassification

# from nlp import Dataset
from imblearn.over_sampling import RandomOverSampler
import datasets
from datasets import Dataset, Image, ClassLabel
from transformers import pipeline
from bs4 import BeautifulSoup

import matplotlib.pyplot as plt
import itertools
from sklearn.metrics import (
    accuracy_score,
    roc_auc_score,
    confusion_matrix,
    classification_report,
    f1_score
)

from datasets import load_metric

from tqdm import tqdm
tqdm.pandas()

In [None]:
# Tunable settings for the whole run.
train_fraction = 0.8  # share of rows kept for training; the remainder becomes the test split
num_train_epochs = 10  # forwarded to TrainingArguments
learning_rate = 1e-7  # forwarded to TrainingArguments; NOTE(review): very small for fine-tuning — confirm intended
train_batch_size = 8  # per-device batch size while training
eval_batch_size = 64  # per-device batch size while evaluating
warmup_steps = 50  # forwarded to TrainingArguments
weight_decay = 0.02  # forwarded to TrainingArguments
BERT_MODEL = "distilbert-base-cased"  # checkpoint name used for the tokenizer and the base model
output_dir = "ai-generated-essay-detection-distilbert"  # where the Trainer writes; also loaded by the inference pipeline

In [None]:
# Read the labelled essays (latin-1 encoded CSV) and drop exact duplicate rows.
df = pd.read_csv("Training_Essay_Data.csv", encoding='latin-1')

# Row counts before and after de-duplication; only the commented-out report below uses them.
item0 = df.shape[0]
df = df.drop_duplicates()
item1 = df.shape[0]
# print(f"There are {item0-item1} duplicates found in the dataset")

In [None]:
# Preview the first rows of the de-duplicated raw data.
df.head()

In [None]:
# Rename the raw columns to the names used downstream: `generated` -> `label`, `text` -> `title`.
# Note that `title` therefore holds the essay text, not a headline.
df = df.rename(columns={'generated': 'label', 'text': 'title'})

def change_label(x):
    """Translate the dataset's `generated` flag into a readable class name.

    Any truthy value yields 'AI-generated'; any falsy value yields
    'Not AI-generated'.
    """
    return 'AI-generated' if x else 'Not AI-generated'
# Keep only the two columns used downstream and remove rows with a missing label
# or text BEFORE mapping the labels to strings. change_label() treats NaN as
# truthy, so mapping first would turn a missing label into 'AI-generated' and
# make a later null check on `label` a no-op.
df = (
    df[['label', 'title']]
    .dropna(subset=['label', 'title'])
    .assign(label=lambda frame: frame['label'].apply(change_label))
)

print(df.shape)
df.sample(5).T

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# Distinct label values (np.unique returns them sorted).
classes = np.unique(df[['label']])

print(classes)

# One weight per class using sklearn's 'balanced' heuristic, then keyed by class name
# so the weights can be looked up regardless of ordering.
weights = compute_class_weight(class_weight='balanced',classes=classes, y=df['label']) 
class_weights = dict(zip(classes, weights))

print(class_weights)

In [None]:
# Number of rows labelled 'AI-generated'.
len(df[df['label']=='AI-generated'])

In [None]:
# Number of rows labelled 'Not AI-generated'.
len(df[df['label']=='Not AI-generated'])

In [None]:
# Build the two-way mapping between class names and integer ids. Ids follow the
# position of each name in `labels_list`.
labels_list = list(df['label'].unique())
label2id = {name: idx for idx, name in enumerate(labels_list)}
id2label = {idx: name for idx, name in enumerate(labels_list)}

print("Mapping of IDs to Labels:", id2label, '\n')
print("Mapping of Labels to IDs:", label2id)

In [None]:
# Class weights arranged in integer-id order (the iteration order of id2label), so that
# position k holds the weight of class id k when passed to CrossEntropyLoss later on.
ordered_weigths = [class_weights[x] for x in id2label.values()]
ordered_weigths

In [None]:
# Wrap the cleaned DataFrame in a Hugging Face `datasets.Dataset`.
# NOTE(review): rows were removed from `df` earlier, so its index is presumably no
# longer a plain range and from_pandas may carry it along as an extra column —
# confirm, or pass preserve_index=False.
dataset = Dataset.from_pandas(df)

In [None]:
# Preview the cleaned frame (string labels, essay text under `title`).
df.head()

In [None]:
# Display the Dataset summary.
dataset

In [None]:
# Feature type for the label column; class names are in the same order as labels_list,
# so the integer ids agree with label2id / id2label.
ClassLabels = ClassLabel(num_classes=len(labels_list), names=labels_list)

def map_label2id(example):
    """Replace the string label(s) in `example` with their integer class id(s).

    Used through `Dataset.map(..., batched=True)`, so `example['label']` is a
    batch of label strings rather than a single string.
    NOTE(review): this relies on `ClassLabel.str2int` accepting an iterable —
    confirm against the installed `datasets` version.
    """
    example['label'] = ClassLabels.str2int(example['label'])
    return example

# Convert labels to integers, then declare the column as a ClassLabel feature.
dataset = dataset.map(map_label2id, batched=True)
dataset = dataset.cast_column('label', ClassLabels)

# dataset = dataset.class_encode_column('label') # This line can also be used instead of
# using all above lines for converting to classlabel

# Shuffled train/test split; stratification is left disabled (see trailing comment).
# NOTE(review): no seed is passed, so the split is presumably different on every run.
dataset = dataset.train_test_split(test_size=1-train_fraction, shuffle=True) #, stratify_by_column="label"

# Despite the `df_` prefix these are `datasets.Dataset` objects, not DataFrames.
df_train = dataset['train']
df_test = dataset['test']

In [None]:
# Peek at the first five integer-encoded labels of the training split.
df_train['label'][:5]

In [None]:
# The pandas frame is not used after this point (the data now lives in `dataset`);
# drop the name and trigger a garbage-collection pass. Any earlier cell that reads
# `df` will fail if re-run after this one without reloading the data.
del df
gc.collect()

In [None]:
# Fast tokenizer for the base checkpoint, with the maximum sequence length set to 512 tokens.
# NOTE(review): `low_cpu_mem_usage` looks like a model-loading option rather than a
# tokenizer option — confirm it has any effect here.
tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL, model_max_length=512, use_fast=True, low_cpu_mem_usage=False)

In [None]:
def preprocess_function(examples):
    """Tokenize the essay text stored under the "title" key of a batch.

    Truncation is enabled; no padding is requested here (batches are padded
    later by the data collator).
    """
    essay_texts = examples["title"]
    encoded = tokenizer(essay_texts, truncation=True)
    return encoded

# Tokenize both splits in batches; the tokenizer outputs are added as new columns.
df_train = df_train.map(preprocess_function, batched=True)
df_test = df_test.map(preprocess_function, batched=True)

In [None]:
# Inspect the column schema after tokenization.
df_train.features

In [None]:
# First ten token ids of the first training example.
df_train['input_ids'][0][:10]

In [None]:
# The raw text is no longer needed once it has been tokenized; drop it from both splits.
df_train = df_train.remove_columns(['title'])
df_test = df_test.remove_columns(['title'])

In [None]:
# Display the training split summary.
df_train

In [None]:
# Collator that pads tokenized examples with this tokenizer when batches are assembled.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Round-trip check: decode the token ids of the second training example back to text.
tokenizer.decode(df_train[1]['input_ids'])

In [None]:
# Sequence-classification model built from the base checkpoint, with one output per label.
# Attention maps and hidden states are not returned.
model = AutoModelForSequenceClassification.from_pretrained(
    BERT_MODEL, num_labels=len(labels_list),
    output_attentions=False,
    output_hidden_states=False
)

# Record the label names in the model config; the inference cells later compare
# pipeline outputs against these strings.
model.config.id2label = id2label
model.config.label2id = label2id

# Trainable parameter count, in millions.
print(model.num_parameters(only_trainable=True) / 1e6)

In [None]:
# Accuracy metric object shared by every evaluation call.
# NOTE(review): `load_metric` is deprecated in newer `datasets` releases in favour
# of the `evaluate` package — confirm it exists in the installed version.
metric = load_metric("accuracy")

def compute_metrics(eval_pred):
    """Compute accuracy for the Trainer.

    `eval_pred` unpacks into (logits, labels). The predicted class is the
    arg-max over the last axis of the logits. Returns whatever
    `metric.compute` returns for those predictions and references.
    """
    logits, labels = eval_pred
    return metric.compute(predictions=np.argmax(logits, axis=-1), references=labels)

In [None]:
class WeightedTrainer(Trainer):
    """Trainer whose loss is cross-entropy weighted per class by `ordered_weigths`."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the class-weighted cross-entropy loss for one batch.

        Parameters:
            model: The model to run; may or may not be wrapped (e.g. DataParallel).
            inputs (dict): Batch of model inputs. The "labels" entry is removed
                from it before the forward pass.
            return_outputs (bool): If True, return `(loss, outputs)` instead of
                just the loss.
            num_items_in_batch: Accepted so the override stays callable by
                Trainer versions that pass this keyword; unused here.

        Returns:
            The loss tensor, or a `(loss, outputs)` tuple when `return_outputs`
            is True.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Take the device from the logits rather than from `model.module`: the
        # `.module` attribute only exists on wrapped models, so the previous
        # lookup raised AttributeError on a plain single-device model.
        class_weight_tensor = torch.tensor(ordered_weigths, dtype=torch.float, device=logits.device)
        loss_fct = torch.nn.CrossEntropyLoss(weight=class_weight_tensor)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [None]:
# Training configuration. Evaluation and checkpointing both happen once per epoch,
# and the best checkpoint is reloaded when training finishes. Only one checkpoint
# is kept on disk (save_total_limit=1). Metrics are logged at every step.
# NOTE(review): `eval_steps=1` presumably has no effect while evaluation_strategy
# is 'epoch' — confirm and remove if so.
training_args = TrainingArguments(
    output_dir=output_dir,
    logging_dir='./logs',
    num_train_epochs=num_train_epochs,
    per_device_train_batch_size=train_batch_size,
    per_device_eval_batch_size=eval_batch_size,
    logging_strategy='steps',
    logging_first_step=True,
    load_best_model_at_end=True,
    logging_steps=1,
    learning_rate=learning_rate,
    evaluation_strategy='epoch',
    warmup_steps=warmup_steps,
    weight_decay=weight_decay,
    eval_steps=1,
    save_strategy='epoch',
    save_total_limit=1,
#     report_to="mlflow",
)

# Trainer with the class-weighted loss; the test split doubles as the evaluation set.
trainer = WeightedTrainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=df_train,
    eval_dataset=df_test,
    data_collator=data_collator
)

In [None]:
# Baseline: evaluate on the test split before any fine-tuning.
trainer.evaluate()

In [None]:
# Fine-tune the model with the settings above.
trainer.train()

In [None]:
# Evaluate again after training, for comparison with the pre-training baseline.
trainer.evaluate()

In [None]:
# Display the test split summary.
df_test

In [None]:
# Run the trained model over the test split; `outputs` is read below for its
# predictions, label ids and metrics.
outputs = trainer.predict(df_test)
print(outputs.metrics)

In [None]:
# Display the raw prediction output object.
outputs

In [None]:
# Ground-truth class ids and predicted class ids (arg-max over axis 1 of the predictions).
y_true = outputs.label_ids
y_pred = outputs.predictions.argmax(1)

def plot_confusion_matrix(cm, classes, title='Confusion Matrix', cmap=plt.cm.Blues, figsize=(10, 8), is_norm=True):
    """
    Draw a confusion matrix as an annotated heat map and show it.

    Parameters:
        cm (array-like): Confusion matrix as returned by sklearn.metrics.confusion_matrix.
        classes (list): Class names used as tick labels on both axes.
        title (str): Title for the plot.
        cmap (matplotlib colormap): Colormap for the plot.
        figsize (tuple): Figure size passed to plt.figure.
        is_norm (bool): If True, cell values are printed with three decimals;
            otherwise with no decimals.
    """
    plt.figure(figsize=figsize)
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    # Same tick positions on both axes, one per class.
    positions = np.arange(len(classes))
    plt.xticks(positions, classes, rotation=90)
    plt.yticks(positions, classes)

    cell_format = '.3f' if is_norm else '.0f'

    # Annotate every cell with its value; cells above half the maximum get
    # white text, the rest black.
    half_max = cm.max() / 2.0
    for row, col in np.ndindex(cm.shape[0], cm.shape[1]):
        value = cm[row, col]
        text_color = "white" if value > half_max else "black"
        plt.text(col, row, format(value, cell_format), horizontalalignment="center", color=text_color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    plt.tight_layout()
    plt.show()

# Overall accuracy and macro-averaged F1 (every class counts equally, regardless of its size)
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='macro')

# Display accuracy and F1 score
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")

# Only plot the confusion matrix when the number of labels is small enough to be readable
if len(labels_list) <= 120:
    # Confusion matrix normalised over the true labels (normalize='true'),
    # which matches the plot helper's default is_norm=True formatting
    cm = confusion_matrix(y_true, y_pred, normalize='true')

    # Plot the confusion matrix using the defined function
    plot_confusion_matrix(cm, labels_list, figsize=(8, 6))

# Finally, display the per-class classification report, using the label names from labels_list
print()
print("Classification report:")
print()
print(classification_report(y_true, y_pred, target_names=labels_list, digits=4))

In [None]:
# Save the trained model; no path is given, so the Trainer's configured output directory is presumably used.
trainer.save_model()

In [None]:
# Write the tokenizer vocabulary next to the saved model.
# NOTE(review): this presumably saves only the vocabulary file(s), not the full
# tokenizer configuration. The inference pipeline in this notebook loads its
# tokenizer from BERT_MODEL rather than from this directory.
tokenizer.save_vocabulary(save_directory=f"./{output_dir}")

In [None]:
# Inference pipeline: model weights come from the saved output directory, the
# tokenizer from the base checkpoint name.
pipe = pipeline("text-classification", output_dir, tokenizer=BERT_MODEL)
# Smoke test on a short sentence; top_k=10 asks for up to ten labels.
sample_title = '''Elon Musk buys Twitter'''
pipe(sample_title, top_k=10)

In [None]:
# Essay files used for inference below; both are read for their `text` column, and `test` also for `id`.
train = pd.read_csv('train_essays.csv')
test = pd.read_csv('test_essays.csv')

In [None]:
# Classify the first 1000 characters of the first essay in `train`.
pipe(train.loc[0, 'text'][:1000])

In [None]:
# Score the first 50 essays of `train` in 1000-character chunks. Each chunk's
# pipeline output is converted to "probability of AI-generated" and collected in
# `cum_score` (only the commented-out diagnostic below uses it).
for i in tqdm(range(50)): #train.shape[0]
    text = train.loc[i, 'text']
    cum_score = []
    # Iterate over chunk start offsets. Counting chunks as int(len/1000)+1 would
    # add an extra empty chunk whenever the length is an exact multiple of 1000.
    # max(..., 1) keeps a single (empty) chunk for an empty essay.
    for start in range(0, max(len(text), 1), 1000):
        sample_text = text[start:start + 1000]
        out = pipe(sample_text)[0]
        if out['label'] == 'Not AI-generated':
            # The pipeline score is the confidence in the returned label; flip
            # it so every score means "AI-generated".
            score = 1 - out['score']
        else:
            score = out['score']
        cum_score.append(score)

#     if np.average(cum_score)>0.50 and any([x<0.45 for x in cum_score]):
#         print(i, np.average(cum_score), cum_score)

In [None]:
# Predict "probability of AI-generated" for every test essay: score each
# 1000-character chunk, then average the chunk scores per essay.
preds_test = []
for i in tqdm(range(test.shape[0])): #test.shape[0]
    text = test.loc[i, 'text']
    cum_score = []
    # Iterate over chunk start offsets. Counting chunks as int(len/1000)+1 would
    # add an extra empty chunk whenever the length is an exact multiple of 1000.
    # max(..., 1) keeps a single (empty) chunk for an empty essay, so the
    # average below never sees an empty list.
    for start in range(0, max(len(text), 1), 1000):
        sample_text = text[start:start + 1000]
        out = pipe(sample_text)[0]
        # The model's labels are 'AI-generated' and 'Not AI-generated'. Comparing
        # against any other string (e.g. 'Human') never matches, and the
        # human-class confidence would be reported as the AI probability.
        if out['label'] == 'Not AI-generated':
            score = 1 - out['score']
        else:
            score = out['score']
        cum_score.append(score)

    preds_test.append(np.average(cum_score))

In [None]:
from sklearn import preprocessing
import numpy as np

# Rescale the prediction vector with sklearn's `normalize` (treated as a single row).
# NOTE(review): with sklearn's defaults this presumably scales the vector to unit
# L2 norm rather than to a [0, 1] range — confirm that is what is wanted.
# `norm_preds_test` is not used by the submission cell in the visible notebook,
# which writes the raw `preds_test`.
x_array = np.array(preds_test)

normalized_arr = preprocessing.normalize([x_array])
norm_preds_test = normalized_arr[0]

In [None]:
# Write the submission file: one row per test essay with its id and the raw
# (un-normalised) averaged score from `preds_test`.
pd.DataFrame({'id':test["id"],'generated':preds_test}).to_csv('submission.csv', index=False)