# LLM Detect AI Generated Text (BERT)
[Link](https://www.kaggle.com/code/sunshine888888/llm-detect-ai-generated-bert/notebook)

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file in it.
for folder, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        print(os.path.join(folder, file_name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Load the competition's train and test essays.
train = pd.read_csv('/kaggle/input/llm-detect-ai-generated-text/train_essays.csv')
test = pd.read_csv('/kaggle/input/llm-detect-ai-generated-text/test_essays.csv')
train_len = len(train)  # number of rows in the competition training file

all_data = pd.concat([train,test])  # train rows followed by test rows

In [None]:
train.head()

In [None]:
test.head()

In [None]:
# Load the competition prompts; their prompt_name column is used later to filter the external data.
prompts = pd.read_csv('/kaggle/input/llm-detect-ai-generated-text/train_prompts.csv')
prompts

In [None]:
train['generated'].value_counts()

In [None]:
# Load the external DAIGT v2 training set.
new_data = pd.read_csv('/kaggle/input/daigt-v2-train-dataset/train_v2_drcat_02.csv')
new_data.head()
# Training uses this external dataset: it has more samples and a more balanced label distribution.

In [None]:
new_data.label.value_counts()

In [None]:
new_data.prompt_name.value_counts()

In [None]:
# Keep only the external essays whose prompt_name is one of the competition
# prompts, and only the first two columns of that dataset
# (presumably text and label, which later cells read -- verify against the CSV).
title = prompts['prompt_name']
matches_competition_prompt = new_data.prompt_name.isin(title)
new_train = new_data.loc[matches_competition_prompt].iloc[:, 0:2]
new_train

In [None]:
# Rebuild the training set from the filtered external data.
train_data = new_train.reset_index(drop=True)  # renumber the index from 0 and discard the old one
train_data  # label 0 = written by a student, 1 = machine-generated

In [None]:
train_data.label.value_counts()

In [None]:
# Bar chart of the label distribution (student-written vs. generated).
import matplotlib.pyplot as plt
fig,ax = plt.subplots()
# value_counts() orders rows by frequency, not by label value. Sort by label so
# the bar order is deterministic and each tick name is derived from the label
# that bar actually represents.
distribution = train_data.label.value_counts().sort_index()

distribution.plot(kind='bar',ax=ax,label='distribution of generated',alpha=.65)
# Write each count just above its bar (ha/va control horizontal/vertical alignment).
for i, count in enumerate(distribution):
    ax.text(i, count, str(count), ha='center', va='bottom')
label_names = {0: "ungenerated", 1: "generated"}
ax.set_xticklabels([label_names.get(value, str(value)) for value in distribution.index], rotation=0)
ax.set_ylabel('count')
plt.legend(loc='best')

plt.show()

In [None]:
import re
import string

# Cleaning Functions
def remove_tag(text):
    """Delete every '@' that is followed by one or more non-whitespace characters, together with those characters."""
    return re.sub(r'@\S+', '', text)

def remove_URL(text):
    """Delete web links: anything starting with http://, https:// or www. up to the next whitespace."""
    link_pattern = re.compile(r'https?://\S+|www\.\S+')
    return link_pattern.sub('', text)

def remove_html(text):
    """Delete <...> tags and (...) parenthesised spans, delimiters included.

    Empty pairs such as '<>' or '()' are left alone because each pattern
    requires at least one character between the delimiters.
    """
    return re.sub(r'<[^>]+>|\([^)]+\)', '', text)

def remove_punct(text):
    """Delete every character listed in string.punctuation (ASCII punctuation such as ! " # $ % & ' ( ) * + , - . /)."""
    # A translation table whose third argument lists the characters to drop.
    return text.translate(str.maketrans('', '', string.punctuation))

In [None]:
train_data['text'][0]

In [None]:
# Fetch the NLTK resources used for cleaning: the English stop-word list and
# the Punkt models that word_tokenize relies on.
import nltk
nltk.download('stopwords')
nltk.download('punkt')
# Recent NLTK releases load word_tokenize's models from the 'punkt_tab'
# resource instead of 'punkt'; without it word_tokenize raises LookupError.
# nltk.download() reports a failed download by returning False rather than raising.
nltk.download('punkt_tab')

from nltk.corpus import stopwords
stop = set(stopwords.words('english'))

from nltk.tokenize import word_tokenize  # splits a text into word tokens

In [None]:
# Build the 'cleaned' column in one pass: strip @-tags, URLs, <...>/(...) spans
# and punctuation, lower-case, tokenise, drop stop words, and join the tokens
# back into a single space-separated string.
train_data['cleaned'] = (
    train_data['text']
    .map(remove_tag)
    .map(remove_URL)
    .map(remove_html)
    .map(remove_punct)
    .map(str.lower)
    .map(word_tokenize)
    .map(lambda tokens: ' '.join([token for token in tokens if token not in stop]))
)

In [None]:
train_data

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
def get_top_ngram(corpus, n=None):
    """Return the ten most frequent n-grams of `corpus` as (ngram, count) pairs, most frequent first.

    `n` is passed straight to CountVectorizer as ngram_range=(n, n), so the
    caller has to supply an integer (the notebook uses n=2 for bigrams).
    """
    vectorizer = CountVectorizer(ngram_range=(n, n))
    # Rows are documents, columns are n-grams, entries are occurrence counts.
    counts_matrix = vectorizer.fit_transform(corpus)
    totals = counts_matrix.sum(axis=0)  # one total per n-gram column
    # vocabulary_ maps each n-gram to its column index in the count matrix.
    ranked = [(term, totals[0, column]) for term, column in vectorizer.vocabulary_.items()]
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:10]

In [None]:
# Top-10 bigrams of the AI-generated essays (label == 1) as a horizontal bar chart.
generated_essays = train_data.loc[train_data['label'] == 1, 'cleaned']
top_n_bigrams = get_top_ngram(generated_essays, 2)[:10]  # n=2 -> bigrams
# Unzip the (bigram, count) pairs into one list of bigrams and one list of counts.
x, y = (list(column) for column in zip(*top_n_bigrams))
plt.barh(x, y)

In [None]:
# Word Cloud for AI-Generated Text
from wordcloud import WordCloud
# Concatenate the cleaned text of every essay labelled 1 into a single string.
ai_generated_text = " ".join(train_data[train_data['label'] == 1]['cleaned'])
wordcloud = WordCloud(width=800, height=400, max_words=100, background_color='white').generate(ai_generated_text)

plt.figure(figsize=(10, 6))
plt.imshow(wordcloud, interpolation='bilinear')  # draw the cloud as an image, using bilinear interpolation
plt.title('Word Cloud for AI-Generated Text')
plt.axis('off')  # turn off axis lines and labels
plt.show()

In [None]:
# Top-10 bigrams of the student-written essays (label == 0) as a horizontal bar chart.
student_essays = train_data.loc[train_data['label'] == 0, 'cleaned']
top_n_bigrams = get_top_ngram(student_essays, 2)[:10]  # n=2 -> bigrams
# Unzip the (bigram, count) pairs into one list of bigrams and one list of counts.
x, y = (list(column) for column in zip(*top_n_bigrams))
plt.barh(x, y)

In [None]:
# Word Cloud for student-written text
from wordcloud import WordCloud
# Concatenate the cleaned text of every essay labelled 0 (student-written).
student_text = " ".join(train_data[train_data['label'] == 0]['cleaned'])
wordcloud = WordCloud(width=800, height=400, max_words=100, background_color='white').generate(student_text)

plt.figure(figsize=(10, 6))
plt.imshow(wordcloud, interpolation='bilinear')
# This figure shows the label-0 essays, so the title must not say "AI-Generated".
plt.title('Word Cloud for Student-Written Text')
plt.axis('off')
plt.show()

In [None]:
from transformers import BertTokenizer
model_name = 'bert-large-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)  # load the tokenizer that belongs to this pretrained model

In [None]:
from torch.utils.data import Dataset
import torch

# Dataset definition
class LLMDataset(Dataset):
    """Tokenise the ``cleaned`` column of a DataFrame, one row per sample.

    Args:
        df: pandas DataFrame with a ``cleaned`` text column and, when
            ``is_grad`` is true, a ``label`` column.
        is_grad: True for train/validation data (items carry ``labels``),
            False for test data (items carry no labels).
        tokenizer: object exposing ``encode_plus`` that returns
            ``input_ids``, ``attention_mask`` and ``token_type_ids`` tensors.
        max_length: length every sample is padded or truncated to.

    Each item is a dict of 1-D tensors ``input_ids``, ``attention_mask`` and
    ``token_type_ids``; labelled items additionally hold ``labels`` as a
    float tensor of shape [1].
    """

    def __init__(self,df,is_grad,tokenizer,max_length=512):
        self.df = df # pandas.DataFrame
        self.is_grad = is_grad # True: train,valid / False: test
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.df) # number of samples

    def __getitem__(self,idx):
        # A Dataset index is a position, so look the row up positionally; this
        # also works when the frame's index is not 0..n-1.
        text = self.df['cleaned'].iloc[idx]

        encoded_dict = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True, # add the special tokens (e.g. CLS and SEP) around the text
            padding='max_length', # pad short texts up to max_length
            truncation=True, # cut texts longer than max_length
            max_length=self.max_length,
            return_tensors='pt', # PyTorch tensors
            return_attention_mask=True, # lets attention ignore the padded positions
        )

        # return_tensors='pt' yields [1, max_length]; drop that leading dimension
        # so the DataLoader can stack samples into [batch, max_length].
        item = {key: encoded_dict[key].squeeze(0)
                for key in ('input_ids','attention_mask','token_type_ids')}

        if self.is_grad:
            label = self.df['label'].iloc[idx]
            # Stored as float with shape [1]; the training loop reshapes and casts it.
            item['labels'] = torch.tensor(label,dtype=torch.float).unsqueeze(dim=0)
        return item

In [None]:
# Work on an independent copy so that later changes to train_df leave the
# freshly built train_data untouched (a plain assignment would only alias it).
train_df = train_data.copy()
train_df

In [None]:
# Apply the same text preprocessing to the test set: strip @-tags, URLs,
# <...>/(...) spans and punctuation, lower-case, tokenise, drop stop words,
# and join the tokens back into a single space-separated string.
test['cleaned'] = (
    test['text']
    .map(remove_tag)
    .map(remove_URL)
    .map(remove_html)
    .map(remove_punct)
    .map(str.lower)
    .map(word_tokenize)
    .map(lambda tokens: ' '.join([token for token in tokens if token not in stop]))
)

In [None]:
test.head()

In [None]:
# Wrap both frames as datasets: labelled items for training, unlabelled items for test.
train_dataset = LLMDataset(train_df,True,tokenizer)
test_dataset = LLMDataset(test,False,tokenizer)

In [None]:
from torch.utils.data import Subset

# random_split() would draw the validation rows from random positions, while the
# later evaluation cells take the validation rows as train_df[train_size:].
# Shuffle the frame once (fixed seed) and split it contiguously, so both views
# describe the same rows and the evaluation never scores rows that were trained on.
train_df = train_df.sample(frac=1, random_state=42).reset_index(drop=True)
full_dataset = LLMDataset(train_df,True,tokenizer)

train_size = int(0.8 * len(full_dataset)) # train:valid = 8:2
valid_size = len(full_dataset) - train_size

train_dataset = Subset(full_dataset, range(train_size))
valid_dataset = Subset(full_dataset, range(train_size, train_size + valid_size))
print(f'{len(train_dataset)} train samples')
print(f'{len(valid_dataset)} valid samples')
print(f'{len(test_dataset)} test samples')

In [None]:
from torch.utils.data import DataLoader

# pin_memory=True requests page-locked host memory, which speeds up host-to-GPU copies.
train_dataloader = DataLoader(train_dataset,batch_size=8,shuffle=True,pin_memory=True)  # reshuffled every epoch
valid_dataloader = DataLoader(valid_dataset,batch_size=8,shuffle=False,pin_memory=True)  # fixed order

In [None]:
# Build the test loader.
# shuffle must stay False: the predictions are later written into the submission
# frame by position, so they have to come out in the same order as the test rows.
test_dataloader = DataLoader(test_dataset,batch_size=1,shuffle=False,pin_memory=True)

In [None]:
# Unlabelled view of the rows from position train_size onward, fed one sample per batch.
# NOTE(review): these rows are the validation set only if the train/valid split
# assigns exactly the rows train_size.. to validation -- verify against the split cell.
valid_eval_dataset = LLMDataset(train_df[train_size:].reset_index(drop=False),False,tokenizer)
valid_eval_dataloader = DataLoader(valid_eval_dataset,batch_size=1,shuffle=False,pin_memory=True)

In [None]:
import torch
import torch.nn as nn

In [None]:
# Run settings. In the code visible in this notebook only 'model_name' and
# 'epochs' are read; the remaining keys are not referenced.
configs = {
    'model_name':'bert-large-uncased',
    'num_labels':2,
    'batch_size':8,  # the DataLoaders hard-code their own batch sizes
    'epochs':4,
    'learning_rate':5e-6,  # NOTE(review): the optimizer cell uses its own literal learning rate -- confirm which is intended
}

In [None]:
import numpy as np
import torch
import torch.nn as nn
from transformers import BertForSequenceClassification

# Never detach tensors inside forward: the logits must stay attached for backprop.
class LLMModel(nn.Module):
    '''
    Thin wrapper around BertForSequenceClassification.

    The wrapper is not strictly needed: BertForSequenceClassification already
    ends in a classification layer, so it could be used directly. It was kept
    by the original author as an exercise in subclassing nn.Module.
    '''
    def __init__(self,model_name):
        super().__init__()
        # from_pretrained loads the pretrained weights that belong to model_name.
        self.model = BertForSequenceClassification.from_pretrained(model_name)

    def forward(self,input_ids,attention_mask):
        # The logits are the classification head's raw scores; apply softmax to
        # turn them into probabilities.
        return self.model(input_ids=input_ids,attention_mask=attention_mask).logits

In [None]:
# Pick the compute device: CUDA when a GPU is available, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device == 'cuda':
    print('GPU is running on..')
else:
    print('CPU is running on..')

In [None]:
# Instantiate the classifier from the configured checkpoint name and move it to the selected device.
model = LLMModel(configs['model_name']).to(device)

In [None]:
# loss function: cross-entropy over the class logits
loss_fn = nn.CrossEntropyLoss()

# optimizer
# transformers.AdamW is deprecated and no longer shipped by recent transformers
# releases; torch.optim.AdamW is the supported replacement. weight_decay=0.0
# reproduces the transformers default (torch's own default is 0.01).
# NOTE(review): configs['learning_rate'] is 5e-6 but 6e-6 is used here -- confirm which is intended.
optimizer = torch.optim.AdamW(model.parameters(),
                              lr=6e-6,
                              eps=1e-8,
                              weight_decay=0.0)

# metric for validation
# f1_score(y_label,y_pred)
from sklearn.metrics import f1_score

metric = f1_score

In [None]:
import gc,os
from tqdm.auto import tqdm # visualizing tool for progress

# Bookkeeping shared with select_best_model(): one checkpoint path and one
# validation loss per finished epoch.
best_model_epoch, valid_loss_values = [],[]
valid_loss_min = [1] # arbitrary starting loss used for the "improved" message
def train(model,device,train_dataloader,valid_dataloader,epochs,loss_fn,optimizer,metric):
    """Fine-tune `model`, validate after every epoch and checkpoint every epoch.

    Args:
        model: callable as model(input_ids, attention_mask) returning class logits.
        device: device the batches are moved to.
        train_dataloader / valid_dataloader: yield dicts with 'input_ids',
            'attention_mask' and 'labels'.
        epochs: number of passes over train_dataloader.
        loss_fn: called as loss_fn(logits, labels) with integer class labels.
        optimizer: optimizer over the model's parameters.
        metric: called as metric(y_true, y_pred) on the validation predictions.

    Side effects: writes 'epoch:<n>_model.pt' into the current working directory
    after every epoch, records path and validation loss in the module-level
    lists, and finally calls select_best_model(), which renames the checkpoint
    with the lowest validation loss to '*_best.pt'. Returns None.
    """
    # Start from clean bookkeeping so a second call cannot pick a checkpoint
    # path from an earlier run (that file may already have been renamed).
    best_model_epoch.clear()
    valid_loss_values.clear()
    valid_loss_min[:] = [1]

    for epoch in range(epochs):
        gc.collect() # free unreferenced objects before the next epoch
        model.train()

        train_loss = 0
        train_step = 0
        pbar = tqdm(train_dataloader)

        for batch in pbar:
            optimizer.zero_grad() # reset gradients from the previous step
            train_step += 1

            train_input_ids = batch['input_ids'].to(device)
            train_attention_mask = batch['attention_mask'].to(device)
            # reshape(-1) gives a 1-D [batch] target even when the final batch
            # holds a single sample (a bare squeeze() would collapse it to 0-d).
            train_labels = batch['labels'].reshape(-1).to(device).long()

            logits = model(train_input_ids, train_attention_mask)

            loss = loss_fn(logits, train_labels)
            loss.backward()
            optimizer.step()
            model.zero_grad()

            train_loss += loss.item()

            pbar.set_postfix({'train_loss':train_loss/train_step}) # running mean loss on the progress bar
        pbar.close()

        with torch.no_grad():
            model.eval()

            valid_loss = 0
            valid_step = 0

            y_pred = [] # predicted classes, for the validation metric
            y_true = []

            pbar = tqdm(valid_dataloader)
            for batch in pbar:
                valid_step += 1

                valid_input_ids = batch['input_ids'].to(device)
                valid_attention_mask = batch['attention_mask'].to(device)
                valid_labels = batch['labels'].reshape(-1).to(device).long()

                logits = model(valid_input_ids, valid_attention_mask)
                predictions = torch.argmax(logits, dim=1) # index of the larger logit

                loss = loss_fn(logits, valid_labels)
                valid_loss += loss.item()

                y_pred.extend(predictions.cpu().numpy())
                y_true.extend(valid_labels.cpu().numpy())

            # max(..., 1) avoids a division by zero on an empty validation loader
            valid_loss /= max(valid_step, 1)
            # Use the metric that was passed in instead of a hard-coded f1_score.
            score = metric(y_true,y_pred)

            print(f'Epoch [{epoch+1}/{epochs}] Score: {score}')
            print(f'Epoch [{epoch+1}/{epochs}] Valid_loss: {valid_loss}')

            if valid_loss < min(valid_loss_min):
                print('model improved!')
            else:
                print('model not improved')

            # Save and record the SAME absolute path so select_best_model() can
            # rename the file regardless of what the working directory is.
            checkpoint_path = os.path.join(os.getcwd(), f'epoch:{epoch+1}_model.pt')
            torch.save(model.state_dict(), checkpoint_path) # state_dict holds all learnable parameters and their current values
            print('save checkpoint!')
            valid_loss_min.append(valid_loss)
            print(f'valid_loss_min:{min(valid_loss_min)}')

        best_model_epoch.append(checkpoint_path)
        valid_loss_values.append(valid_loss)
        print('='*100)

    select_best_model() # defined below: marks the lowest-loss checkpoint as best
    print('Train/Valid Completed!!')
    del train_dataloader, valid_dataloader # drops only this function's references
    gc.collect()

def select_best_model():
    """Rename the checkpoint of the epoch with the lowest validation loss to '<name>_best.pt'.

    Reads the module-level lists best_model_epoch and valid_loss_values.
    """
    best_index = np.array(valid_loss_values).argmin()
    source_path = best_model_epoch[best_index]
    stem = source_path.split('.pt')[0]  # path without the '.pt' suffix
    os.rename(source_path, stem + '_best.pt')

In [None]:
print(f'Before training, files in current directory: {os.listdir()}')  # list every file and sub-directory of the current working directory

In [None]:
print('Training Start!')
print('=' * 100)

train(model,
    device,
    train_dataloader,
    valid_dataloader,
    configs['epochs'],
    loss_fn,
    optimizer,
    metric)

# The optimizer keeps references to every model parameter (plus its own state),
# so it has to be dropped together with the model for the memory to be released.
del model, optimizer, train_dataloader, valid_dataloader
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache() # hand cached GPU blocks back before the model is reloaded

In [None]:
def inference(model,test_dataloader):
    """Run `model` over `test_dataloader` without gradients.

    Batches are moved to the module-level `device`. Returns a list holding one
    numpy array of logits per batch, in loader order.
    """
    model.eval()
    batch_logits = []

    with torch.no_grad():
        for batch in tqdm(test_dataloader):
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            batch_logits.append(model(ids,mask).detach().cpu().numpy())

    return batch_logits

In [None]:
# Find the checkpoint that training marked as best ('*best.pt') in the working directory.
best_pt = None
for filename in os.listdir():
    if 'best.pt' in filename:
        best_pt = filename
if best_pt is None:
    # Fail with a clear message instead of a NameError (or a stale value) below.
    raise FileNotFoundError("No '*best.pt' checkpoint found in the current directory; run training first.")
print(f'Best model.pt: {best_pt}')
# map_location lets a checkpoint saved on GPU load on whichever device is in use now.
check_point = torch.load(best_pt, map_location=device)

# We have to load a model again because it was deleted after training/validation
model = LLMModel(configs['model_name']).to(device)
model.load_state_dict(check_point) # copy the saved parameters into the new model instance

In [None]:
train_df[train_size:]

In [None]:
pre_valid = inference(model,valid_eval_dataloader)  # per-batch logits for the validation rows

In [None]:
# inference() returns one logits array per batch; stacked, they form a 3-D array
# (this relies on every batch having the same size -- valid_eval_dataloader uses
# batch_size=1). argmax over the last axis gives the predicted class.
pre_valid = np.argmax(pre_valid,axis=2)
y_valid = train_df.label[train_size:].reset_index(drop=True)  # ground-truth labels of the same rows

In [None]:
# Plot the ROC curve and compute the area under it (AUC)
from sklearn.metrics import roc_auc_score, confusion_matrix, accuracy_score, roc_curve, auc, ConfusionMatrixDisplay  # evaluation metrics and plotting helpers


# ROC curve parameters
# NOTE(review): pre_valid holds hard 0/1 predictions (argmax), so this curve has a
# single operating point; passing class-1 scores would give the full curve.
fpr, tpr, thresholds = roc_curve(y_valid, pre_valid)  # false-positive rates, true-positive rates and thresholds
roc_auc = auc(fpr, tpr)  # area under the curve
print(roc_auc)

# Draw the ROC curve
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)  # the ROC curve itself
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # diagonal = random guessing
plt.xlim([0.0, 1.0])  # x-axis range
plt.ylim([0.0, 1.05])  # y-axis range
plt.xlabel('False Positive Rate')  # x-axis label
plt.ylabel('True Positive Rate')  # y-axis label
plt.title('Receiver operating characteristic example')  # figure title
plt.legend(loc="lower right")  # add the legend
plt.grid(color='purple', linestyle='--')  # add grid lines
plt.show()  # display the figure

# Draw the confusion matrix
ConfusionMatrixDisplay.from_predictions(y_valid,pre_valid, colorbar=True, display_labels=["0", "1"],
                                         cmap=plt.cm.Reds)  # built from the true and predicted labels
plt.title("Confusion Matrix")  # figure title
plt.show()  # display the figure

In [None]:
# Pick up the checkpoint marked as best, i.e. the one with the lowest
# validation loss across all epochs.

best_pt = None
for filename in os.listdir():
    if 'best.pt' in filename:
        best_pt = filename
if best_pt is None:
    # Fail with a clear message instead of a NameError (or a stale value) below.
    raise FileNotFoundError("No '*best.pt' checkpoint found in the current directory; run training first.")
print(f'Best model.pt: {best_pt}')
# map_location lets a checkpoint saved on GPU load on whichever device is in use now.
check_point = torch.load(best_pt, map_location=device)

# We have to load a model again because it was deleted after training/validation
model = LLMModel(configs['model_name']).to(device)
model.load_state_dict(check_point) # copy the saved parameters into the new model instance

predictions = inference(model,test_dataloader) # list with one logits array per test batch

In [None]:
predictions

In [None]:
import torch.nn.functional as F
# Stack the per-batch logits into one tensor and convert them to probabilities
# with a softmax over the class axis.
stacked_logits = torch.as_tensor(np.array(predictions))
pre_test = F.softmax(stacked_logits,dim=-1,dtype=torch.float32)
prob_test = pre_test[:,0,1] # probability of the "generated" class (index 1) for each test row
prob_test

In [None]:
# Load the competition's sample submission to use as the output template.
sample = pd.read_csv('/kaggle/input/llm-detect-ai-generated-text/sample_submission.csv')
sample

In [None]:
# Fill in the predicted probabilities by position.
# NOTE(review): this assumes the sample rows are in the same order as the test
# rows the predictions were computed from -- confirm.
sample['generated'] = prob_test
sample.head(10)

In [None]:
# Write the submission file into the current working directory, with a header row and without the index column.
sample.to_csv('submission.csv',index=False,header=True)