<a href="https://colab.research.google.com/github/dgambone3/M.S.-Capstone/blob/main/M_S_capstone.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Detecting Student Written vs AI Generated Essays

### Can we detect whether an essay was written by a student or generated by an LLM?

# Setup and import data

In [None]:
! pip install -q kaggle
! pip install pyspellchecker
! pip install umap-learn

In [None]:
from google.colab import files

files.upload()

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
# ! kaggle datasets list

In [None]:
!kaggle datasets download -d thedrcat/daigt-v2-train-dataset

In [None]:
! unzip daigt-v2-train-dataset.zip -d /content

In [None]:
! nvidia-smi

In [None]:
import torch
torch.cuda.is_available()
# torch.cuda.device_count()
# torch.cuda.current_device()
# torch.cuda.device(0)
# torch.cuda.get_device_name(0)

# EDA

In [None]:
import numpy as np
import pandas as pd
import re
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer

import warnings
warnings.filterwarnings("ignore")

import torch

In [None]:
data = pd.read_csv("train_v2_drcat_02.csv")
data.head()

In [None]:
print(data.shape)

In [None]:
label_counts = data['label'].value_counts()
print(label_counts)

In [None]:
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]

In [None]:
def check_unique(label_0_text, label_1_text, type):
  """Print how many tokens appear in only one of the two essay collections.

  Tokens are obtained by splitting each essay on a single space character.

  Args:
    label_0_text: iterable of student essay strings.
    label_1_text: iterable of AI essay strings.
    type: free-text tag inserted into the printed messages.
  """
  # vocabulary of each collection
  student_vocab = {token for essay in label_0_text for token in essay.split(" ")}
  ai_vocab = {token for essay in label_1_text for token in essay.split(" ")}

  # tokens seen exclusively on one side
  student_only = student_vocab.difference(ai_vocab)
  ai_only = ai_vocab.difference(student_vocab)

  print(f"Set unique student words {type}: ", len(student_only))
  print(f"Set unique ai words {type}: ", len(ai_only))



check_unique(label_0['text'], label_1['text'], 'not preprocessed')

## Basic text cleaning

In [None]:
# basic text preprocessing
# Replace separators / special characters with plain equivalents. regex=False
# makes each pattern a literal string regardless of the installed pandas
# version (the default of `regex` changed between pandas releases).
for old, new in (('\xa0', ' '), ('&', 'and'), ('-', ' '), ('\n', ' '), ('\r', ' ')):
    data['text'] = data['text'].str.replace(old, new, regex=False)

# Drop every character that is not an ASCII letter or whitespace, then
# lowercase. Vectorized .str calls replace the per-row re.sub apply.
data['text'] = data['text'].str.replace(r'[^a-zA-Z\s]', '', regex=True).str.lower()
# data.head()

In [None]:
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]

check_unique(label_0['text'], label_1['text'], 'preprocessed')

**Student essays have more unique words and larger vocabulary (10x)**

In [None]:
data_temp = pd.read_csv("train_v2_drcat_02.csv")

# value_counts() orders rows by frequency, not by label. Sorting by label
# guarantees bar 0 is label 0 (student) and bar 1 is label 1 (AI), which is
# what the colors and tick labels below assume.
ax = data_temp.label.value_counts().sort_index().plot(kind='bar')

ax.patches[0].set_facecolor('blue')  # Set color for label 0
ax.patches[1].set_facecolor('green') # Set color for label 1

plt.title("Label Distribution - Unbalanced")
plt.xlabel('Label')
plt.ylabel('Counts')
ax.set_xticklabels(['Student', 'AI'])
plt.xticks(rotation=0)

plt.show()

## boxplots


In [None]:
def calculate_length(text):
    """Return the number of pieces `text` splits into on single spaces.

    An empty string counts as 1 and consecutive spaces produce empty pieces
    that are counted too.
    """
    # splitting on a separator always yields (number of separators + 1) pieces
    return text.count(" ") + 1
data['len'] = data['text'].apply(lambda x: calculate_length(x))
data.len.describe()

In [None]:
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]
data_to_plot_0 = label_0['len']
data_to_plot_1 = label_1['len']

# boxplot for label student
plt.boxplot(data_to_plot_0, positions=[1], widths=0.6, patch_artist=True, boxprops=dict(facecolor='blue'))

# boxplot for ai
plt.boxplot(data_to_plot_1, positions=[2], widths=0.6, patch_artist=True, boxprops=dict(facecolor='green'))

plt.xlabel('Label')
plt.ylabel('Length')
plt.title('Boxplots Before Removing Outliers')
plt.xticks([1, 2], ['Student', 'AI'])
plt.show()

### remove outlier lengths

In [None]:
data2 = data[data['len'] <= 500]
data2 = data2[data2['len'] >= 200]
data2.shape

In [None]:
data=data2

In [None]:
data.shape

In [None]:
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]
data_to_plot_0 = label_0['len']
data_to_plot_1 = label_1['len']

# boxplot for label student
plt.boxplot(data_to_plot_0, positions=[1], widths=0.6, patch_artist=True, boxprops=dict(facecolor='blue'))

# boxplot for ai
plt.boxplot(data_to_plot_1, positions=[2], widths=0.6, patch_artist=True, boxprops=dict(facecolor='green'))

plt.xlabel('Label')
plt.ylabel('Length')
plt.title('Boxplots After Removing Outliers')
plt.xticks([1, 2], ['Student', 'AI'])
plt.show()

## label distribution

In [None]:
# Plotting
# value_counts() orders rows by frequency; sort by label so bar 0 is label 0
# (student) and bar 1 is label 1 (AI), matching the colors and tick labels.
ax = data.label.value_counts().sort_index().plot(kind='bar')

# Customizing bar colors
ax.patches[0].set_facecolor('blue')  # Set color for label 0
ax.patches[1].set_facecolor('green') # Set color for label 1

# Adding labels and title
plt.title("Label Distribution - Unbalanced")
plt.xlabel('Label')
plt.ylabel('Counts')
ax.set_xticklabels(['Student', 'AI'])
plt.xticks(rotation=0)

# Show plot
plt.show()


### Downsample student essays to balance dataset

In [None]:
# separate into 2 dfs based on label (cleaned)
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]

count_label_0 = len(label_0)
count_label_1 = len(label_1)

# Downsample whichever class is larger to the size of the smaller one.
# (Previously only label 0 was ever downsampled, so the data stayed
# unbalanced whenever label 1 was the majority.)
minority_size = min(count_label_0, count_label_1)
label_0_downsampled = (label_0.sample(n=minority_size, random_state=42)
                       if count_label_0 > minority_size else label_0)
label_1_downsampled = (label_1.sample(n=minority_size, random_state=42)
                       if count_label_1 > minority_size else label_1)
df_balanced = pd.concat([label_0_downsampled, label_1_downsampled], axis=0)

# shuffle so the two classes are interleaved, then rebuild a clean index
data = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

In [None]:
print('student essays: ', count_label_0)
print('ai essays: ', count_label_1)

In [None]:
# After balancing both counts are equal, so the frequency order returned by
# value_counts() is arbitrary; sort by label so bar 0 is label 0 (student)
# and bar 1 is label 1 (AI).
ax = data.label.value_counts().sort_index().plot(kind='bar')

ax.patches[0].set_facecolor('blue')  # Set color for label 0
ax.patches[1].set_facecolor('green') # Set color for label 1

plt.title("Label Distribution - Balanced")
plt.xlabel('Label')
plt.ylabel('Counts')
ax.set_xticklabels(['Student', 'AI'])
plt.xticks(rotation=0)

plt.show()


In [None]:
# double check balanced
label_counts = data['label'].value_counts()
print(label_counts)

## Calculate average essay length per label



In [None]:
# `len` was added to `data` before it was split into label_0 / label_1 and the
# text has not changed since, so the slices already carry the right values.
# Reading the column avoids assigning into a DataFrame slice (chained assignment).
print('avg len student essays: ', round(label_0['len'].mean(), 2))
print('avg len ai essays: ', round(label_1['len'].mean(), 2))

**Student essays are longer on average**

In [None]:
# group by 'source' and count occurrences of each 'label'
grouped_data = data.groupby(['source', 'label']).size().unstack(fill_value=0)

fig, ax = plt.subplots(figsize=(20, 6))
grouped_data.plot(kind='bar', ax=ax, color=['blue', 'green'])

plt.xlabel('Source')
plt.ylabel('Label Counts')
plt.title('Label Counts by Source')
ax.set_xticklabels(grouped_data.index, rotation=45)

# adjust legend labels
legend_labels = {0: 'Student', 1: 'AI'}
plt.legend(title='Label', labels=[legend_labels[label] for label in grouped_data.columns])
plt.show()


## UMAP

In [None]:
data['prompt_name'].unique()

### Student essays

In [None]:
import umap
# filter only student essays
student_data = data[data['label'] == 0]

# tfidf matrix
student_vectorizer = TfidfVectorizer(stop_words='english',
                                  max_features=100,
                                  ngram_range=(1, 1))
tfidf_features_student = student_vectorizer.fit_transform(student_data['text'])

In [None]:
# umap to reduce dimensionality
umap_reducer_student = umap.UMAP(random_state=42)
umap_embedding_student = umap_reducer_student.fit_transform(tfidf_features_student)

In [None]:
# plot umap - prompt
for prompt_name in student_data['prompt_name'].unique():
    plt.scatter(umap_embedding_student[student_data['prompt_name'] == prompt_name][:, 0],
                umap_embedding_student[student_data['prompt_name'] == prompt_name][:, 1],
                label=prompt_name, marker='.', linewidths=0.0000000000001)

plt.title('UMAP of student essays by prompt name')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.legend(title='Prompt Name', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

In [None]:
# plot umap - label
for label in student_data['label'].unique():
    # boolean row mask, computed once and reused for both coordinates
    label_mask = (student_data['label'] == label).to_numpy()
    plt.scatter(umap_embedding_student[label_mask, 0],
                umap_embedding_student[label_mask, 1],
                label=label, marker='.', linewidths=0.0000000000001)

plt.title('UMAP of student essays by label')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
# points are coloured by label here, so the legend title must say so
plt.legend(title='Label', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

### ai essays

In [None]:
# filter for only ai essays
ai_data = data[data['label'] == 1]

# tf-idf
ai_vectorizer = TfidfVectorizer(stop_words='english',
                                      max_features=100,
                                      ngram_range=(1, 1))
tfidf_features_ai = ai_vectorizer.fit_transform(ai_data['text'])

In [None]:
# umap to reduce dimensionality
umap_reducer_ai = umap.UMAP(random_state=42)
umap_embedding_ai = umap_reducer_ai.fit_transform(tfidf_features_ai)

In [None]:
# plot umap - prompt
for prompt_name in ai_data['prompt_name'].unique():
    plt.scatter(umap_embedding_ai[ai_data['prompt_name'] == prompt_name][:, 0],
                umap_embedding_ai[ai_data['prompt_name'] == prompt_name][:, 1],
                label=prompt_name, marker='.', linewidths=0.0000000000001)

plt.title('UMAP of ai essays by prompt name')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.legend(title='Prompt Name', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

In [None]:
# plot umap - label
for label in ai_data['label'].unique():
    # boolean row mask, computed once and reused for both coordinates
    label_mask = (ai_data['label'] == label).to_numpy()
    plt.scatter(umap_embedding_ai[label_mask, 0],
                umap_embedding_ai[label_mask, 1],
                label=label, marker='.', linewidths=0.0000000000001)

plt.title('UMAP of ai essays by label')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
# points are coloured by label here, so the legend title must say so
plt.legend(title='Label', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

### whole dataset

In [None]:
# tfidf
vectorizer_all = TfidfVectorizer(stop_words='english',
                                  max_features=100,
                                  ngram_range=(1, 1))
tfidf_features_all = vectorizer_all.fit_transform(data['text'])

In [None]:
# umap to reduce dimensionality
umap_reducer_all = umap.UMAP(random_state=42)
umap_embedding_all = umap_reducer_all.fit_transform(tfidf_features_all)

In [None]:
# plot umap - prompt
for prompt_name in data['prompt_name'].unique():
    plt.scatter(umap_embedding_all[data['prompt_name'] == prompt_name][:, 0],
                umap_embedding_all[data['prompt_name'] == prompt_name][:, 1],
                label=prompt_name, marker='.', linewidths=0.0000000000001)

plt.title('UMAP Visualization all essays by prompt')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

In [None]:
# plot umap - label
for label in data['label'].unique():
    plt.scatter(umap_embedding_all[data['label'] == label][:, 0],
                umap_embedding_all[data['label'] == label][:, 1],
                label=label, marker='.', linewidths=0.000000000000000000001)

plt.title('UMAP Visualization all essays by Label')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

## Apply spell checker

In [None]:
get_vocab = CountVectorizer(stop_words='english',
                            lowercase=True,
                            strip_accents="ascii"
                            )

get_vocab.fit_transform(data['text'])
vocab_dict = get_vocab.vocabulary_

In [None]:
len(vocab_dict)

In [None]:
vocab = list(vocab_dict.keys())
len(vocab)

In [None]:
from spellchecker import SpellChecker
from tqdm import tqdm
tqdm.pandas()

spell = SpellChecker()
# correction_counts = {}

def spell_check(vocabulary):
  """Build a {typo: correction} mapping for the words in `vocabulary`.

  Each word is passed to the module-level `spell` checker. A correction is
  recorded when the checker returns a different word that contains no
  apostrophe. Keys and values are padded with one space on each side
  (" word ") so they can later be substituted as whole words inside text.

  Args:
    vocabulary: iterable of words to check.

  Returns:
    dict mapping " typo " -> " correction ".
  """
  # Local result dict: the function no longer depends on a global `typos`
  # existing before the first call.
  typos = {}
  # Wrapping the iterable advances the bar for every word, including skipped
  # ones, and sizes it from the argument rather than the global `vocab`.
  for word in tqdm(vocabulary, total=len(vocabulary), desc="Spell Checking"):
    padded_word = " " + word + " "
    if padded_word in typos:
      continue
    checked = spell.correction(word)
    if checked is None or checked == word or "'" in checked:
      continue
    # Skip a correction whose target has itself been recorded as a typo.
    # Keys are stored padded, so the lookup must be padded too.
    if " " + checked + " " in typos:
      continue
    typos[padded_word] = " " + checked + " "
  return typos

In [None]:
# add words to be recognized by spell checker based on initial spell check results
spell.word_frequency.load_words(['microsoft', 'apple', 'google', 'theres', 'california', 'nasa', 'theyre','ohio',
                                 'todays', 'thats', 'im', 'american','theyre', 'venus', 'texting','europe','ive',
                                 'wasnt','clinton','thomas','wyoming','donald', 'youre', 'paris', 'america', 'bogota',
                                 'americans', 'mona', 'lisa','richard','hillary', 'luke', 'apps', 'obama', 'whos', 'florida',
                                 'romney','nasas', 'texas', 'george', 'americas', 'partys', 'itll', 'carlos', 'nixon',
                                 'andrew','andrews', 'lifes', 'extracurriculars'])

In [None]:
# create dictionary with {typo: correction} for every vocabulary word the
# spell checker changes (the vocabulary above is not capped with max_features)
typos = {}  # start from an empty mapping so this cell also runs on a fresh kernel
typos = spell_check(vocab)

In [None]:
def apply_spell_check(text, typos, counts=None):
  """Replace every known typo in `text` with its correction.

  Matching is plain substring replacement, so the space padding on the
  `typos` keys is what restricts matches to whole words.

  Args:
    text: essay text to correct.
    typos: dict mapping " typo " -> " correction ".
    counts: optional dict that tallies, per correction, the number of texts in
      which it was applied (one increment per call, not per occurrence).
      Defaults to the module-level `typo_count` for backward compatibility.

  Returns:
    The corrected text.
  """
  for typo, correction in typos.items():
    if typo in text:
      text = text.replace(typo, correction)
      # resolve the tally lazily so the global is only needed on a match
      tally = typo_count if counts is None else counts
      tally[correction] = tally.get(correction, 0) + 1
  return text

# temp = 'testing becuse is spell checked'
# print(apply_spell_check(temp, typos))
# print(typo_count)

In [None]:
# replace typos identified in top 25,000 words with their corrected value from dict
typo_count = {}
data['text_spell_checked'] = data['text'].progress_apply(lambda x : apply_spell_check(x, typos))

In [None]:
# create dictionary of top words corrected
top_typos = {k: abs(v) for k, v in sorted(typo_count.items(), key=lambda item: abs(item[1]), reverse=True)}
top_typos = dict(list(top_typos.items()))
len(top_typos)

In [None]:
def get_key_by_value(dictionary, search_value):
    """Return the first key whose value equals `search_value`, or 0 if none does."""
    matching_keys = (key for key, value in dictionary.items() if value == search_value)
    # 0 is the sentinel for "no key maps to this value"
    return next(matching_keys, 0)

In [None]:
top_corrected_dict = {}
for key in top_typos.keys():
  top_corrected_dict[key] = get_key_by_value(typos, key)

# top_corrected_dict # correct : typo

In [None]:
# Exclude these corrected words from the typo summary. pop(..., None) is used
# instead of `del` so the cell does not raise KeyError when one of the words
# never appeared as a correction. Both dicts are trimmed together so they
# stay aligned.
for excluded_word in (' do ', ' their ', ' were ', ' else ', ' this ', ' the '):
  top_typos.pop(excluded_word, None)
  top_corrected_dict.pop(excluded_word, None)

In [None]:
typo_df = pd.DataFrame({'corrected word': top_corrected_dict.keys(),
                        'misspelled': top_corrected_dict.values(),
                        'times corrected': top_typos.values()})

In [None]:
# combined = [str(key.strip()) + " : " + str(value.strip()) for key, value in top_corrected_dict.items()]
# combined = dict(zip(combined, top_typos.values()))

In [None]:
# plt.figure(figsize=(20, 6))
# plt.bar(list(combined.keys())[:30], list(combined.values())[:30])
# plt.xlabel('Typos and their Corrections')
# plt.ylabel('times corrected')
# plt.xticks(rotation=75)
# plt.show()

In [None]:
# check unique words after spell checking
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]

check_unique(label_0['text'], label_1['text'], 'not spell checked')
print('........................................')
check_unique(label_0['text_spell_checked'], label_1['text_spell_checked'], 'spell checked')

## Visualize top (most common) words


In [None]:
essays_text = data['text']
essays_check = data['text_spell_checked']
labels = data['label']

In [None]:
def visualize_top_words(essays, classes, type):
  """Count words over all essays and plot the 50 most frequent ones.

  Args:
    essays: iterable of essay strings.
    classes: labels aligned with `essays`; returned unchanged.
    type: tag appended to the plot title.

  Returns:
    (X, y, feature_names, top_50): the document-term count matrix, the labels
    as passed in, the vectorizer's feature names, and a dict of the 50 most
    frequent words mapped to their total counts.
  """
  vectorizer = CountVectorizer(stop_words='english',
                               lowercase=True,
                               strip_accents="ascii",
                               max_features=3000
                               )
  count_matrix = vectorizer.fit_transform(essays)
  feature_names = vectorizer.get_feature_names_out()

  # total occurrences of each word across all essays, most frequent first
  totals = count_matrix.sum(axis=0).A1
  ranked = sorted(zip(feature_names, totals), key=lambda pair: pair[1], reverse=True)

  # extract the top 50 items
  top_50 = dict(ranked[:50])

  plt.figure(figsize=(20, 6))
  plt.bar(top_50.keys(), top_50.values())
  plt.xlabel('Words')
  plt.ylabel('Count')
  plt.title(f'Top 50 Words by Count - {type}')
  plt.xticks(rotation=45)
  plt.yticks(range(0, 80001, 10000))
  plt.show()

  return count_matrix, classes, feature_names, top_50


X_main, y_main, feature_names_count, top_50_words = visualize_top_words(essays_text, labels, 'not spell checked')
X_check, y_check, feature_names_count_check, top_50_words_check = visualize_top_words(essays_check, labels, 'spell checked')

In [None]:
pd.DataFrame(list(top_50_words.items()), columns=['word', 'count'])

In [None]:
pd.DataFrame(list(top_50_words_check.items()), columns=['word', 'count'])

## Create dataframe to hold unique words / vocabulary to visualize word distributions by label (student or AI)

In [None]:
def create_word_df(X, y, feature_names):
  """Build a per-word table of summed feature values for each label.

  Args:
    X: document-term matrix with one row per essay and one column per word.
    y: labels aligned with the rows of `X`; rows with label 0 and 1 are used.
    feature_names: word for each column of `X`.

  Returns:
    DataFrame with columns `word`, `score_label_0`, `score_label_1` and
    `totals` (the sum of the two scores), sorted by `totals` descending.
  """
  # column sums per label, flattened to 1-D (a sparse sum returns a 1xN matrix)
  per_label_scores = [np.asarray(X[y == label].sum(axis=0)).ravel() for label in [0, 1]]

  # creating a dataframe with words and their scores
  word_df = pd.DataFrame({'word': feature_names,
                          'score_label_0': per_label_scores[0],
                          'score_label_1': per_label_scores[1]})

  # vectorized column sum instead of a Python loop over every row
  word_df['totals'] = word_df['score_label_0'] + word_df['score_label_1']
  return word_df.sort_values(by='totals', ascending=False)



word_df_main = create_word_df(X_main, y_main, feature_names_count)
word_df_check = create_word_df(X_check, y_check, feature_names_count_check)

### Plot top words split by label, spell checked and not spell checked

In [None]:
def plot_words_split_label(word_df, type):
  """Plot a stacked bar chart of per-label counts for the first 50 rows of `word_df`.

  Args:
    word_df: DataFrame with `word`, `score_label_0`, `score_label_1` and `totals` columns.
    type: tag appended to the plot title.
  """
  top_words = word_df[:50]
  ordered = top_words.sort_values(by='totals', ascending=False)
  words = ordered['word']
  student_counts = ordered['score_label_0']
  ai_counts = ordered['score_label_1']

  plt.figure(figsize=(20, 6))
  # student bars at the bottom, ai bars stacked on top of them
  plt.bar(words, student_counts, label='student', color='blue')
  plt.bar(words, ai_counts, bottom=student_counts, label='ai', color='green')

  plt.xlabel('Word')
  plt.ylabel('Counts')
  plt.title(f'Top {len(top_words)} Most Common Words for Both Labels - {type}')
  plt.legend()
  plt.xticks(rotation=45)
  plt.show()

plot_words_split_label(word_df_main, 'not spell checked')
# word_df_check is built from the spell-checked text, so title it accordingly
plot_words_split_label(word_df_check, 'spell checked')

# ML Models

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import BernoulliNB
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, roc_auc_score, ConfusionMatrixDisplay
from sklearn.metrics import mean_squared_error as MSE

## Output result methods

In [None]:
def get_learning_curve(model, X, y):
  """Score an already-fitted model on growing prefixes of (X, y).

  For each percentage 1..100 the first n% of rows are predicted with `model`
  and the mean squared error against the matching labels is recorded. The
  model is not refitted here.

  Returns:
    DataFrame with columns `percent` and `scores`.
  """
  percents = list(range(1, 101))
  errors = []
  for pct in percents:
    fraction = pct / 100
    subset_X = X[0:int(X.shape[0] * fraction)]  # df
    subset_y = y[0:int(len(y) * fraction)]  # list
    subset_pred = model.predict(subset_X)
    errors.append(MSE(y_true=subset_y, y_pred=subset_pred))
  return pd.DataFrame({'percent': percents, 'scores': errors})

In [None]:
def plot_learning_curves(model, name, X_train, y_train, X_test, y_test):
  """Fit `model`, plot its train/test error curves and return test predictions.

  Args:
    model: estimator exposing `fit` and `predict`.
    name: model name used in the plot title.
    X_train, y_train: training features and labels; the model is fitted on these.
    X_test, y_test: held-out features and labels.

  Returns:
    Predictions of the fitted model on `X_test`.
  """
  model.fit(X_train, y_train)

  pred = model.predict(X_test)

  # error on growing prefixes of each split, computed with the fitted model
  # (the previously computed but never used overall test error was removed)
  train_scores = get_learning_curve(model, X_train, y_train)
  test_scores = get_learning_curve(model, X_test, y_test)

  # plot train
  plt.plot(train_scores['percent'],
          train_scores['scores'],
          label = "train")

  # plot test
  plt.plot(test_scores['percent'],
          test_scores['scores'],
          label = "test")

  plt.xlabel('Sample Size (%)')
  plt.ylabel('Error')
  plt.title(f'Learning Curve for {name}')
  plt.legend()
  plt.show()
  return pred

In [None]:
class_labels = {0 : 'student', 1 : "ai"}
def plot_confusion_matrix(model, y_true, y_pred):
  """Display the confusion matrix of `y_pred` against `y_true`.

  Args:
    model: fitted classifier; its `classes_` defines the row/column order.
    y_true: ground-truth labels.
    y_pred: predicted labels.
  """
  # Pass labels explicitly so the matrix rows/columns follow the same order as
  # the display labels, even if a class is missing from y_true / y_pred.
  cm = confusion_matrix(y_true, y_pred, labels=model.classes_)
  disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=[class_labels[i] for i in model.classes_])
  disp.plot()
  plt.show()

## Train models

In [None]:
# data[['text', 'text_spell_checked','label']]

In [None]:
essays_text = data['text']
essays_check = data['text_spell_checked']
labels = data['label']

In [None]:
def models(essays, label, type):
  """Train and evaluate Naive Bayes and Logistic Regression on TF-IDF features.

  The essays are split 80/20 (random_state=42) before vectorizing, so the
  TF-IDF vocabulary and IDF weights are learned from the training essays only.
  For each model the learning curve, confusion matrix, classification report,
  accuracy and ROC AUC are shown.

  Args:
    essays: essay strings.
    label: class labels aligned with `essays` (0 = student, 1 = ai).
    type: tag used in plot titles (e.g. 'spell checked').

  Returns:
    (coefficients, feature_names): the logistic-regression coefficients
    (`coef_[0]`) and the matching TF-IDF feature names as a numpy array.
  """
  # split first: fitting TF-IDF on all essays would leak test-set statistics
  essays_train, essays_test, y_train, y_test = train_test_split(
      essays, label, test_size=0.2, random_state=42)

  tfidf = TfidfVectorizer(stop_words='english',
                        lowercase=True,
                        # max_features=5000,
                        strip_accents="ascii"
                        )
  X_train = tfidf.fit_transform(essays_train)
  X_test = tfidf.transform(essays_test)

  nb = BernoulliNB()
  nb_pred = plot_learning_curves(nb, f"Naive Bayes - {type}", X_train, y_train, X_test, y_test)
  plot_confusion_matrix(nb, y_test, nb_pred)

  print(classification_report(y_test, nb_pred))
  print(f'Accuracy: {np.round(accuracy_score(y_test, nb_pred) * 100, 2)}%')
  # roc_auc_score expects (y_true, y_score); score with the class-1 probability
  # rather than hard 0/1 predictions
  nb_scores = nb.predict_proba(X_test)[:, 1]
  print(f'Area under ROC curve: {np.round(roc_auc_score(y_test, nb_scores) * 100, 2)}%')


  # 'sag' is a stochastic solver; seed it so the fitted weights are reproducible
  logreg = LogisticRegression(solver='sag', random_state=42)
  logreg_pred = plot_learning_curves(logreg, f"Logistic Regression - {type}", X_train, y_train, X_test, y_test)
  plot_confusion_matrix(logreg, y_test, logreg_pred)
  print(f'Accuracy: {np.round(accuracy_score(y_test, logreg_pred) * 100, 2)}%')
  logreg_scores = logreg.predict_proba(X_test)[:, 1]
  print(f'Area under ROC curve: {np.round(roc_auc_score(y_test, logreg_scores) * 100, 2)}%')
  print(classification_report(y_test, logreg_pred))

  coefficients = logreg.coef_[0]

  # get feature names (words)
  feature_names = np.array(tfidf.get_feature_names_out())

  return coefficients, feature_names


coeff, feature_names_tfidf = models(essays_text, labels, 'not spell checked')
coeff_check, feature_names_tfidf_check = models(essays_check, labels, 'spell checked')

### Visualize model weights for Logistic Regression to analyze what features may be impacting the model most or have the highest weights

In [None]:
def viz_weights(feat_name, coeffs, type):
  """Plot the 30 largest-magnitude coefficients and return them.

  Args:
    feat_name: feature names (words), aligned with `coeffs`.
    coeffs: signed model coefficients.
    type: tag appended to the plot title.

  Returns:
    (weighted_words, abs_weights): the top 30 words by |coefficient| and the
    absolute values of their coefficients, largest first.
  """
  # pair each word with its coefficient, then rank by magnitude
  weight_by_word = dict(zip(feat_name, coeffs))
  ranked = sorted(weight_by_word.items(), key=lambda pair: abs(pair[1]), reverse=True)
  top_30 = ranked[:30]

  weighted_words = [word for word, _ in top_30]
  # coefficients are signed; only the ranking used abs(), so take it here too
  abs_weights = [abs(weight) for _, weight in top_30]

  plt.figure(figsize=(15,4))
  plt.plot(weighted_words, abs_weights)
  plt.xticks(rotation=45)
  plt.xlabel('Word')
  plt.ylabel('Weight (absolute value)')
  plt.title(f'Logistic Regression Weights - {type}')
  plt.show()
  return weighted_words, abs_weights

word_wts_main, abs_wts_main = viz_weights(feature_names_tfidf, coeff, 'not spell checked')
word_wts_check, abs_wts_check = viz_weights(feature_names_tfidf_check, coeff_check, 'spell checked')

In [None]:
def viz_weights(feat_name, coeffs, type):
    """Return the 30 largest-magnitude coefficients without plotting.

    This redefinition replaces the plotting `viz_weights` from the previous
    cell for every later call. `type` is accepted but not used.

    Returns:
        (weighted_words, abs_weights): top 30 words by |coefficient| and the
        absolute values of their coefficients, largest first.
    """
    ranked_pairs = sorted(dict(zip(feat_name, coeffs)).items(),
                          key=lambda pair: abs(pair[1]),
                          reverse=True)
    leading = ranked_pairs[:30]
    weighted_words = [name for name, _ in leading]
    abs_weights = [abs(coefficient) for _, coefficient in leading]

    return weighted_words, abs_weights

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8))

word_wts_main, abs_wts_main = viz_weights(feature_names_tfidf, coeff, 'not spell checked')
word_wts_check, abs_wts_check = viz_weights(feature_names_tfidf_check, coeff_check, 'spell checked')

ax1.plot(word_wts_main, abs_wts_main, color='blue', label='not spell checked')
# Rotate the tick labels the plot already created. set_xticklabels is only
# reliable after tick positions are fixed with set_xticks.
ax1.tick_params(axis='x', labelrotation=45)
ax1.set_ylabel('Weight (absolute value)')
ax1.set_title('Logistic Regression Weights')
ax1.legend()

ax2.plot(word_wts_check, abs_wts_check, color='green', label='spell checked')
ax2.tick_params(axis='x', labelrotation=45)
ax2.set_xlabel('Word')
ax2.set_ylabel('Weight (absolute value)')
ax2.legend()

plt.tight_layout()
plt.show()


In [None]:
word_weight_main = dict(zip(word_wts_main, abs_wts_main))
word_weight_check = dict(zip(word_wts_check, abs_wts_check))

weight_df_main = pd.DataFrame.from_dict(word_weight_main, orient='index')
weight_df_main = weight_df_main.rename_axis('word').reset_index().rename(columns={0: "weight_main"})

weight_df_check = pd.DataFrame.from_dict(word_weight_check, orient='index')
weight_df_check = weight_df_check.rename_axis('word').reset_index().rename(columns={0: "weight_check"})

weight_df = pd.merge(weight_df_main, weight_df_check, on='word',how='outer')

plt.figure(figsize=(15,4))
plt.xticks(rotation=45)
plt.xlabel('Word')
plt.ylabel('Weight (absolute value)')
plt.title(f'Logistic Regression Weights Overlay Same Order as not spell checked')
plt.plot(weight_df['word'], weight_df['weight_main'], label='not spell checked', color='blue')
plt.plot(weight_df['word'], weight_df['weight_check'], label='spell checked', color='green')
plt.legend()
plt.show()
# weight_df

## Create dataframe to store weights to view words grouped by label (student or AI)


In [None]:
# NOTE(review): the original cell held the unfinished identifier `typo_`, which
# raises NameError on Run All; previewing typo_df is the presumed intent - confirm.
typo_df.head()

In [None]:
def viz_merged_label(weight_df, word_df, col, type):
  """Overlay model weights (line) on per-label word counts (stacked bars).

  Args:
    weight_df: DataFrame with a `word` column and the weight column `col`.
    word_df: DataFrame with `word`, `score_label_0` and `score_label_1` columns.
    col: name of the weight column in `weight_df` to draw.
    type: tag used for the line's legend entry and the plot title.
  """
  # left merge keeps the rows and order of weight_df
  combined = weight_df.merge(word_df, on='word', how='left')
  fig, count_ax = plt.subplots(figsize=(20, 6))

  # stacked bars: student counts at the bottom, ai counts on top
  words = combined['word']
  student_counts = combined['score_label_0']
  count_ax.bar(words, student_counts, label='student', color='blue')
  count_ax.bar(words, combined['score_label_1'], bottom=student_counts, label='ai', color='green')
  plt.xticks(rotation=45)

  # second y-axis sharing the same x-axis, used for the weights
  weight_ax = count_ax.twinx()
  weight_ax.plot(weight_df['word'], weight_df[col], color='red', label=type)

  count_ax.set_xlabel('Word')
  count_ax.set_ylabel('Counts')
  weight_ax.set_ylabel('Weights')
  count_ax.set_title(f'Distribution of Top Weighted Words based on Label and logreg model - {type}')

  # one legend holding the entries of both axes
  bar_handles, bar_names = count_ax.get_legend_handles_labels()
  line_handles, line_names = weight_ax.get_legend_handles_labels()
  weight_ax.legend(bar_handles + line_handles, bar_names + line_names, loc='upper right')


  plt.show()

weight_df['weight_main'] = weight_df['weight_main'].fillna(0.0)
weight_df['weight_check'] = weight_df['weight_check'].fillna(0.0)

viz_merged_label(weight_df[weight_df['weight_main'] > 0], word_df_main, 'weight_main', 'not spell checked')
viz_merged_label(weight_df[weight_df['weight_check'] > 0], word_df_check, 'weight_check', 'spell checked')

In [None]:
label_0 = data[data['label'] == 0]
label_1 = data[data['label'] == 1]


check_unique(label_0['text'], label_1['text'], 'not spell checked')
print('........................................')
check_unique(label_0['text_spell_checked'], label_1['text_spell_checked'], 'spell checked')

In [None]:
# Select and rename in one expression: rename(inplace=True) on a column slice
# of typo_df triggers SettingWithCopyWarning, and rename() already returns a
# new frame.
temp_typo_df = typo_df[['corrected word', 'times corrected']].rename(columns={'corrected word': 'word'})

# BERT

In [None]:
! pip install transformers datasets evaluate accelerate

In [None]:
from huggingface_hub import notebook_login
import torch
from transformers import DataCollatorWithPadding
import evaluate
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
from transformers import AutoTokenizer
from datasets import Dataset
from sklearn.model_selection import train_test_split


notebook_login()


Optionally downsample the dataset for quick testing (uncomment the cell below).

In [None]:
# percent = 5
# data = data.sample(frac=(percent/100))
# print(data.shape)

In [None]:
data_simple = data[['text','label']].copy()
# data_simple.head()

train_df, test_df = train_test_split(data_simple, test_size=0.2, random_state=42)

train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)


In [None]:
tokenizer = AutoTokenizer.from_pretrained("distilbert/distilbert-base-uncased")
max_length = tokenizer.model_max_length  # Get the maximum sequence length supported by the tokenizer


def preprocess_function(examples):
    """Tokenize a batch of essays with the module-level `tokenizer`.

    Sequences are truncated to 512 tokens. No padding is applied here: the
    DataCollatorWithPadding passed to the Trainer pads each training batch to
    its own longest sequence. Padding inside `map` would pad every essay to the
    longest one in the whole map batch. Tensors are not requested either; the
    datasets are switched to torch format with `set_format` afterwards.

    Args:
        examples: batch dict containing a `text` list.

    Returns:
        The tokenizer output (`input_ids`, `attention_mask`) for the batch.
    """
    return tokenizer(
        examples['text'],
        truncation=True,
        max_length=512 # otherwise shape too large for distilbert model
    )

In [None]:
train_dataset = train_dataset.map(preprocess_function, batched=True)
test_dataset = test_dataset.map(preprocess_function, batched=True)

train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
test_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

In [None]:
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Compute accuracy for the Trainer from a (scores, labels) pair.

    The predicted class is the argmax over axis 1 of the scores; the result is
    whatever the module-level `accuracy` metric returns.
    """
    class_scores, references = eval_pred
    predicted_classes = np.argmax(class_scores, axis=1)
    return accuracy.compute(predictions=predicted_classes, references=references)

In [None]:
id2label = {0: "student", 1: "ai"}
label2id = {"student": 0, "ai": 1}

model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert/distilbert-base-uncased", num_labels=2, id2label=id2label, label2id=label2id)

In [None]:
from transformers import TrainerCallback

class LoggingCallback(TrainerCallback):
    """Trainer callback that prints the logged losses and eval accuracy."""

    def on_log(self, args, state, control, logs=None, **kwargs):
        # Report only from the main process, and only when logs were supplied.
        if not state.is_world_process_zero or logs is None:
            return
        # Log training loss and evaluation metrics (missing entries print as None)
        print(f"Training Loss: {logs.get('loss')}, Eval Loss: {logs.get('eval_loss')}, Eval Accuracy: {logs.get('eval_accuracy')}")


In [None]:
training_args = TrainingArguments(
    output_dir="capstone_model_small",
    learning_rate=2e-5,
    per_device_train_batch_size=2, #16
    per_device_eval_batch_size=2, #16
    num_train_epochs=10, # 2
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # register the LoggingCallback defined above; the bare TrainerCallback()
    # base class used previously has no logging behaviour of its own
    callbacks=[LoggingCallback()],
)

In [None]:
# torch.cuda.empty_cache()

# !pip install numba

# from numba import cuda
# device = cuda.get_current_device()
# device.reset()


In [None]:
torch.cuda.is_available()

In [None]:
trainer.train()

In [None]:
trainer.push_to_hub()

## Inference

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from transformers import AutoModel

In [None]:
predictions = trainer.predict(test_dataset)
predicted_labels = np.argmax(predictions.predictions, axis=1)
true_labels = test_dataset["label"]

In [None]:
# Use a distinct name: `accuracy` already holds the metric loaded with
# evaluate.load(...) that compute_metrics calls. Overwriting it with a float
# would break any later trainer evaluation.
test_accuracy = accuracy_score(true_labels, predicted_labels)
print("Accuracy:", test_accuracy)

In [None]:
# predictions.predictions holds one raw score (logit) per class, not
# probabilities. For two classes the softmax probability of class 1 is
# 1 / (1 + exp(logit_0 - logit_1)). Ranking by the class-1 logit alone can
# order essays differently, because it ignores the class-0 logit.
logits = predictions.predictions
predicted_probabilities = 1.0 / (1.0 + np.exp(logits[:, 0] - logits[:, 1]))  # P(label == 1)
auc = roc_auc_score(true_labels, predicted_probabilities)
print("AUC:", auc)

In [None]:
train_logs = trainer.state.log_history
# print(type(train_logs))

# Convert the list of dictionaries into a DataFrame
df = pd.DataFrame(train_logs)
# print(df.head())

loss_df = df[df['loss'].notna()]
# print(len(loss_df))

# Plot the loss
plt.plot(loss_df['epoch'], loss_df['loss'], label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.show()

In [None]:
from transformers import pipeline
car_free_cities_prompt = """Write an explanatory essay to inform fellow citizens
                          about the advantages of limiting car usage. Your essay
                          must be based on ideas and information that can be found
                          in the passage set. Manage your time carefully so that
                          you can read the passages; plan your response; write your
                          response; and revise and edit your response. Be sure to use
                          evidence from multiple sources; and avoid overly relying on
                          one source. Your response should be in the form of a multiparagraph
                          essay. Write your essay in the space provided."""

gpt_essay = """It's super important to think about using cars less because it can help our
planet and make things better for everyone. When we use cars less, it's like giving a big
high-five to the environment, our wallets, and even our communities! First off, using cars
less can make the air cleaner. Cars let out stuff called pollutants that can make the air
dirty and not good to breathe. When we don't use cars as much, there's less of that yucky
stuff in the air, which is awesome for our health and the planet. Also, using cars less
can help with traffic. Have you ever been stuck in a big line of cars that doesn't move? That's
called traffic, and it's no fun! When we don't drive as much, there are fewer cars on the road,
which means less traffic jams. That means we can get where we need to go faster and without all
the honking and frustration. Another cool thing about using cars less is that it can save us
money. Cars cost a lot of money to buy, put gas in, and fix when they break. But if we don't
use them as much, we don't have to spend as much money on them. That means we can save money
for other fun things, like going on trips or buying snacks! And guess what? Using cars less
can even help make our communities better. When we walk or bike instead of driving everywhere,
we get to see our neighborhood up close. We might even run into friends or neighbors along the
way! Plus, walking and biking are good exercise, so it's good for our bodies too. So, it's pretty
clear that using cars less is a super smart idea. It helps keep the air clean, makes traffic less
of a headache, saves us money, and brings our communities closer together. Let's all try to use
cars a little less and make our world a better place!"""



classifier = pipeline("text-classification", model="dgambone/capstone_model")

classifier(gpt_essay)

In [None]:
# ######### TEMP ##########
# # import torch

# batch_size = 16
# num_samples = len(test_dataset["text"])
# predicted_labels = []

# for i in range(0, num_samples, batch_size):
#     batch_texts = test_dataset["text"][i:i+batch_size]
#     inputs = tokenizer(batch_texts, padding=True, truncation=True, return_tensors="pt", max_length=512)
#     with torch.no_grad():
#         outputs = model(**inputs)
#     batch_predictions = np.argmax(outputs.logits.cpu().numpy(), axis=1)
#     predicted_labels.extend(batch_predictions)
