# AI-text-detector

In [125]:
from transformers import BertTokenizer, BertForSequenceClassification
import torch
import numpy as np
import seaborn as sns
from torch.utils.data import DataLoader, Dataset

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# optim
import torch.optim as optim

## Training

In [129]:
# load texts and labels
def text_edit(text):
    # remove first two lines
    text = '\n'.join(text.split('\n')[2:])

    # cut longer than 2000 characters
    text = text[:2000]

    # # remove everything After Kilde:
    # text = text.split('Kilde:')[0]

    # remove empty lines
    # text = '\n'.join([line for line in text.split('\n') if line != ''])

    # remove new lines
    text = ' '.join(text.split('\n'))

    # remove double spaces
    # text = ' '.join(text.split('  '))

    # remove emojis
    # text = text.encode('ascii', 'ignore').decode('ascii')
   

    # Assuming 'text' is defined and contains sentences
    sentences = text.split('.')
    num_sentences = len(sentences)
    if num_sentences > 4:

        # Create an exponential distribution for probabilities
        # The exponential distribution should favor lower indices for rand_low and higher indices for rand_high
        probabilities = np.exp(np.linspace(0, 2, num_sentences-1))
        probabilities /= probabilities.sum()  # Normalize to make it a valid probability distribution

        # Choose rand_low and rand_high using the defined probabilities
        rand_low = np.random.choice(np.arange(num_sentences-1), p=probabilities[::-1])
        rand_high = np.random.choice(np.arange(rand_low+1, num_sentences), 
                        p=probabilities[rand_low:] / probabilities[rand_low:].sum())

        # Join the selected range of sentences
        text = '.'.join(sentences[rand_low:rand_high])

    return text

def load_data():

    import os

    data = {
        'human': [],
        'bot': []
    }

    data_sources = {
        'human' : ['data/heste-nettet-nyheder/'],
        'bot' : ['data/heste-nettet-nyheder-ai/gpt-3.5-turbo/', 'data/heste-nettet-nyheder-ai/gpt-4-0613/']
    }

    for source in data_sources:
        for path in data_sources[source]:
            for filename in os.listdir(path):
                with open(path + filename, 'r', encoding='utf-8') as f:
                    content = f.read()
                    text = text_edit(content)
                    data[source].append(text)

    # cut to same length
    min_len = min(len(data['human']), len(data['bot']))
    data['human'] = data['human'][:min_len]
    data['bot'] = data['bot'][:min_len]
    
    my_texts = np.array(data['human'] + data['bot'])
    my_labels = np.array([0]*len(data['human']) + [1]*len(data['bot'])) 
    
        

    return list(my_texts), my_labels

def preprocess(text):
    inputs = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors="pt")
    return inputs

class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels
        self.inputs = preprocess(texts)

        # dict_keys(['input_ids', 'token_type_ids', 'attention_mask'])
        self.input_ids = self.inputs['input_ids']
        self.attention_mask = self.inputs['attention_mask']

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        input_ids = self.input_ids[idx]
        attention_mask = self.attention_mask[idx]

        return {
            # 'text': text,
            'label': label,
            'input_ids': input_ids,
            'attention_mask': attention_mask
        }

model_name = "bert-base-multilingual-cased"  # or another model suitable for Danish
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name)

texts, labels = load_data()
texts_train, texts_test, labels_train, labels_test = train_test_split(texts, labels, test_size=0.2)

dataset_train = TextDataset(texts_train, labels_train)
dataloader_train = DataLoader(dataset_train, batch_size=32, shuffle=True)

dataset_test = TextDataset(texts_test, labels_test)
dataloader_test = DataLoader(dataset_test, batch_size=32, shuffle=True)


# print table summary with total samples in each dataset, positive samples, and negative samples
def pretty_print_info_table(labels_train, labels_test):
    data_for_table = {
        'train': {
            'total': len(labels_train),
            'pos': np.sum(labels_train),
            'neg': len(labels_train) - np.sum(labels_train)
        },
        'test': {
            'total': len(labels_test),
            'pos': np.sum(labels_test),
            'neg': len(labels_test) - np.sum(labels_test)
        }
    }
    data_for_table['total'] = {
        'total': data_for_table['train']['total'] + data_for_table['test']['total'],
        'pos': data_for_table['train']['pos'] + data_for_table['test']['pos'],
        'neg': data_for_table['train']['neg'] + data_for_table['test']['neg']
    }

    print("""
    Info table:
    +----------------+---------+---------+-------+
    |                | Training| Testing | Total
    +----------------+---------+---------+-------+
    | Total samples  | {:7d} | {:7d} | {:7d} |
    | Pos. samples   | {:7d} | {:7d} | {:7d} |  (AI generated)
    | Neg. samples   | {:7d} | {:7d} | {:7d} |  (Human written)
    """.format(
        data_for_table['train']['total'],
        data_for_table['test']['total'],
        data_for_table['total']['total'],
        data_for_table['train']['pos'],
        data_for_table['test']['pos'],
        data_for_table['total']['pos'],
        data_for_table['train']['neg'],
        data_for_table['test']['neg'],
        data_for_table['total']['neg'],
        
        ))

pretty_print_info_table(labels_train, labels_test)

# weight for positive class
weight_pos = len(texts_train) / (2 * np.sum(labels_train))
# weight for negative class
weight_neg = len(texts_train) / (2 * (len(texts_train) - np.sum(labels_train)))
# class weights
class_weights = torch.tensor([weight_neg, weight_pos])

# the class weights are defined, but im unsure how to apply them as the loss function is incooperated in the forward pass of the model, so i will just balance the dataset instead.


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-multilingual-cased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



    Info table:
    +----------------+---------+---------+-------+
    |                | Training| Testing | Total
    +----------------+---------+---------+-------+
    | Total samples  |     635 |     159 |     794 |
    | Pos. samples   |     319 |      78 |     397 |  (AI generated)
    | Neg. samples   |     316 |      81 |     397 |  (Human written)
    


In [130]:
# Assume you're using a GPU for training
device = torch.device("mps" if torch.has_mps else "cpu")
print("Using device:", device)
model.to(device)

# Training loop
from transformers import AdamW
optimizer = optim.AdamW(model.parameters(), lr=1e-6, weight_decay=0.01)

# early stopping
def early_stopping(losses, patience=10):
    if len(losses) < patience:
        return False
    return losses[-1] > max(losses[-patience:-1])

losses = {'train': [], 'test': []}
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    # Train
    loss_total_train = 0
    for i, batch in enumerate(dataloader_train):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)


        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        #rint(outputs)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        loss_total_train += loss.item()

    print(f"Epoch {epoch} loss: {loss_total_train / len(dataset_train)}")
    losses['train'].append(loss_total_train / len(dataset_train))

    # Test
    model.eval()
    loss_total_test = 0
    with torch.no_grad():
        for i, batch in enumerate(dataloader_test):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss_total_test += loss.item()

    print(f"Epoch {epoch} test loss: {loss_total_test / len(dataset_test)}")
    losses['test'].append(loss_total_test / len(dataset_test))

    # if validation loss is lowest, save model
    if min(losses['test']) == losses['test'][-1]:
        model.save_pretrained("models/bert_classifier", save_function=torch.save)
        print("Model saved")

    if early_stopping(losses['test']):
        print("Early stopping")
        break

    

# load model
model = BertForSequenceClassification.from_pretrained("models/bert_classifier")


# Plot the losses
plt.figure(figsize=(10, 5))
plt.plot(losses['train'], label='train')
plt.plot(losses['test'], label='test')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('Loss')


Using device: mps
Epoch 0 loss: 0.022102032871696892
Epoch 0 test loss: 0.021796397443087596
Model saved
Epoch 1 loss: 0.021631550131820318
Epoch 1 test loss: 0.02127708316599048
Model saved
Epoch 2 loss: 0.021069163318694108
Epoch 2 test loss: 0.020522417887201848
Model saved
Epoch 3 loss: 0.020108799108370084
Epoch 3 test loss: 0.01947249931359441
Model saved
Epoch 4 loss: 0.01887018164311807
Epoch 4 test loss: 0.018250042537473282
Model saved
Epoch 5 loss: 0.01759906232826353
Epoch 5 test loss: 0.016826564410947403
Model saved
Epoch 6 loss: 0.0162020371185513
Epoch 6 test loss: 0.015327596814377504
Model saved
Epoch 7 loss: 0.014461705065149022
Epoch 7 test loss: 0.013743061314588823
Model saved
Epoch 8 loss: 0.012437549303835772
Epoch 8 test loss: 0.012518148752128554
Model saved
Epoch 9 loss: 0.010898134088891698
Epoch 9 test loss: 0.01146589735019132
Model saved
Epoch 10 loss: 0.009432944279956067
Epoch 10 test loss: 0.010735477853870991
Model saved
Epoch 11 loss: 0.0083635049776

## Model Validation

In [128]:
# Evaluate the model
from sklearn.metrics import classification_report, confusion_matrix

model.eval()
predictions = []
true_labels = []
with torch.no_grad():
    for i, batch in enumerate(dataloader_test):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        predictions.extend(logits.argmax(dim=1).cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

print(classification_report(true_labels, predictions))

# Confusion matrix
cm = confusion_matrix(true_labels, predictions)
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

plt.figure(figsize=(5, 5))
plt.imshow(cm, cmap=plt.cm.RdBu, alpha=.7 )

for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, "{:.5f}".format(cm[i, j]), horizontalalignment="center")

plt.xticks([0, 1], ['human', 'bot'])
plt.yticks([0, 1], ['human', 'bot'])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion matrix for BERT detector, for detecting AI generated texts")

RuntimeError: Placeholder storage has not been allocated on MPS device!

## Model Application

In [None]:
# load my model
from transformers import BertForSequenceClassification
model = BertForSequenceClassification.from_pretrained("models/bert_classifier")
model.to(device);

In [None]:
# test
test_string = "This is a test"

model.eval()    
with torch.no_grad():
    inputs = preprocess(test_string)
    input_ids = inputs['input_ids'].squeeze(1).to(device)
    attention_mask = inputs['attention_mask'].squeeze(1).to(device)
    outputs = model(input_ids, attention_mask=attention_mask)
    logits = outputs.logits
    print(logits)
    print(torch.argmax(logits).item())

tensor([[-0.8824,  1.1385]], device='mps:0')
1


## Ftitting it into router format

In [None]:
# to fit into the pipeline i need to make a predict function

def predict(text):
    model.eval()    
    with torch.no_grad():
        inputs = preprocess(text)
        input_ids = inputs['input_ids'].squeeze(1).to(device)
        attention_mask = inputs['attention_mask'].squeeze(1).to(device)
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        return torch.argmax(logits).item()

# test
test_string = "This is a test"
print(predict(test_string))

1


## Test on data.csv

In [None]:
df = pd.read_csv('data.csv')
df

df['bert_prediction'] = df['text'].apply(predict)

df

Unnamed: 0,text,is_generated,bert_prediction
0,"Det gør firmaerne, der står bag AI'en - eksemp...",0,0
1,"Tror det bliver for kedeligt, hvis vi bare læs...",0,0
2,"Godt spørgsmål! Det er nemlig meget, meget svæ...",0,1
3,"Ja, det er begyndt at ske. Det er dog ikke så ...",1,1
4,Det vil jeg tro - uden at vide det helt præcis...,0,1
5,"Selvom det er svært at forudsige, hvornår AI v...",1,1
6,"Det afhænger nok meget af, hvor fremme i skoen...",0,0
7,"En tech-korrespondent er en journalist, der dæ...",1,1
8,"ChatGPT er en generativ AI-model, der lærer at...",1,1
9,"Hej, den har læst en masse tekst og så regner ...",0,1
