### Setup: Install and import

In [None]:
!pip install transformers torch
!pip install nltk
!pip install tqdm
!pip install openai

In [None]:
# Mount Google Drive so files stored there can be read from the Colab runtime.
from google.colab import drive

# The drive contents become available under /content/gdrive.
drive.mount('/content/gdrive')

In [None]:
import random
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from transformers import RobertaTokenizer, RobertaModel, RobertaConfig, AdamW
import nltk

from tqdm import tqdm
import numpy as np

import openai

### Load Pre-trained RoBERTa Model and Tokenizer

In [None]:
# Load the pre-trained 'roberta-base' tokenizer and encoder; both are reused by later cells.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
roberta_base = RobertaModel.from_pretrained('roberta-base')

### Dataset

In [None]:
!pip install datasets

In [None]:
# GPT-wiki-intro dataset
# https://huggingface.co/datasets/aadityaubhat/GPT-wiki-intro
from datasets import load_dataset

# Only the 'train' split is kept.
dataset = load_dataset("aadityaubhat/GPT-wiki-intro")['train']

In [None]:
# truncate
def truncate(example):
    """
    Cut 'wiki_intro' and 'generated_intro' down to the same number of characters.

    The shorter of the two strings sets the limit; the remaining fields are
    copied through unchanged.
    """
    wiki = example['wiki_intro']
    generated = example['generated_intro']
    limit = min(len(wiki), len(generated))

    result = {
        'wiki_intro': wiki[:limit],
        'generated_intro': generated[:limit],
    }
    # Pass-through fields, in the same order as before.
    for key in ('title_len', 'wiki_intro_len', 'generated_intro_len',
                'prompt_tokens', 'generated_text_tokens'):
        result[key] = example[key]
    return result


# Apply truncate across the loaded dataset.
Wiki_data = dataset.map(truncate)

In [None]:
# Build one flat list of texts (all human-written intros first, then all
# generated intros) and a parallel list of labels.
# list() keeps the concatenation working even if column access returns a
# non-list sequence type.
Wiki_texts = list(Wiki_data['wiki_intro']) + list(Wiki_data['generated_intro'])

# 1 for human generated, 0 for machine generated.
# Both columns have one entry per row, so len(Wiki_data) gives the count
# without reading each column a second time.
Wiki_labels = [1] * len(Wiki_data) + [0] * len(Wiki_data)

In [None]:
def downsample_data(texts, labels, num_samples=2400):
    """
    Draw a random subset of (text, label) pairs without replacement.

    Args:
        texts: Sequence of texts.
        labels: Sequence of labels parallel to ``texts``.
        num_samples: Number of pairs to draw.

    Returns:
        A tuple ``(sampled_texts, sampled_labels, sampled_indices)`` of three
        lists, where ``sampled_indices[i]`` is the position in ``texts`` /
        ``labels`` that the i-th sampled pair came from.

    Raises:
        ValueError: If ``num_samples`` is negative or larger than the number
            of available pairs (raised by ``random.sample``).
    """
    # Pairs are formed positionally, so only positions present in both inputs count.
    population_size = min(len(texts), len(labels))

    # Sample positions rather than pairs: recovering an index afterwards with
    # texts.index() costs a full scan per sample and returns the first
    # occurrence for duplicated texts, which can point at the wrong label.
    sampled_indices = random.sample(range(population_size), num_samples)

    sampled_texts = [texts[i] for i in sampled_indices]
    sampled_labels = [labels[i] for i in sampled_indices]
    return sampled_texts, sampled_labels, sampled_indices

In [None]:
# Draw 2000 random (text, label) pairs, then slice them into
# 1700 training / 150 validation / 150 test examples.
Wiki_sampled_texts, Wiki_sampled_labels, Wiki_sampled_indices = downsample_data(Wiki_texts, Wiki_labels, 2000)

# Training split: first 1700 samples.
Wiki_train_texts = Wiki_sampled_texts[:1700]
Wiki_train_labels = Wiki_sampled_labels[:1700]
Wiki_train_indices = Wiki_sampled_indices[:1700]

# Validation split: next 150 samples.
Wiki_val_texts = Wiki_sampled_texts[1700:1850]
Wiki_val_labels = Wiki_sampled_labels[1700:1850]
Wiki_val_indices = Wiki_sampled_indices[1700:1850]

# Test split: remaining samples.
Wiki_test_texts = Wiki_sampled_texts[1850:]
Wiki_test_labels = Wiki_sampled_labels[1850:]
Wiki_test_indices = Wiki_sampled_indices[1850:]

In [None]:
# Report the size of each Wiki split.
print(f"Training Data Size: {len(Wiki_train_texts)}")
print(f"Validation Data Size: {len(Wiki_val_texts)}")
print(f"Testing Data Size: {len(Wiki_test_texts)}")

In [None]:
# PubMedQA
# https://pubmedqa.github.io/

# Expected files (read from the mounted Google Drive folder below):
# data/ori_pqaa.json      - 2.6 MB Downloaded from https://drive.google.com/file/d/15v1x6aQDlZymaHGP7cZJZZYFfeJt2NdS/view
# data/ori_pqal.json      - 533.4 MB Downloaded from https://github.com/pubmedqa/pubmedqa/blob/master/data/ori_pqal.json
# NOTE(review): the two file sizes above look swapped (only ori_pqaa is subsampled below) — confirm.
# NOTE(review): ori_pqal is treated as machine-generated and ori_pqaa as human-generated —
# confirm this matches the dataset's documentation.
import json
import random

# Paths are relative, so they resolve against the current working directory.
ori_pqal_path = './gdrive/MyDrive/CPSC_588_dataset/ori_pqal.json'
with open(ori_pqal_path, 'r') as file:
    ori_pqal = json.load(file)
# Every LONG_ANSWER from ori_pqal is labelled "0".
machine_generated_dataset = [{"text": item["LONG_ANSWER"], "label": "0"} for item in ori_pqal.values()]

ori_pqaa_path = './gdrive/MyDrive/CPSC_588_dataset/ori_pqaa.json'
with open(ori_pqaa_path, 'r') as file:
    ori_pqaa = json.load(file)
# Every LONG_ANSWER from ori_pqaa is labelled "1".
human_generated_dataset = [{"text": item["LONG_ANSWER"], "label": "1"} for item in ori_pqaa.values()]

# Keep a random 1000-item subset; no seed is set in this cell.
human_generated_dataset = random.sample(human_generated_dataset, 1000)

print(machine_generated_dataset[0]) # {'text': '...', 'label': '0'}
print(human_generated_dataset[0]) # {'text': '...', 'label': '1'}

In [None]:
# Merge both PubMedQA sources into parallel text / integer-label lists.
combined_dataset = machine_generated_dataset + human_generated_dataset

texts = [item['text'] for item in combined_dataset]
labels = [int(item['label']) for item in combined_dataset]

# 80/10/10 train/validation/test split with a fixed seed.
# NOTE(review): the PMQA_* names assigned here are reassigned by the
# downsample_data-based cell that follows, so this split appears unused.
PMQA_train_texts, temp_texts, PMQA_train_labels, temp_labels = train_test_split(
    texts, labels, test_size=0.2, random_state=42)

PMQA_val_texts, PMQA_test_texts, PMQA_val_labels, PMQA_test_labels = train_test_split(
    temp_texts, temp_labels, test_size=0.5, random_state=42)

In [None]:
# Draw 2000 random (text, label) pairs, then slice them into
# 1700 training / 150 validation / 150 test examples.
PMQA_sampled_texts, PMQA_sampled_labels, PMQA_sampled_indices = downsample_data(texts, labels, 2000)

# Training split: first 1700 samples.
PMQA_train_texts = PMQA_sampled_texts[:1700]
PMQA_train_labels = PMQA_sampled_labels[:1700]
PMQA_train_indices = PMQA_sampled_indices[:1700]

# Validation split: next 150 samples.
PMQA_val_texts = PMQA_sampled_texts[1700:1850]
PMQA_val_labels = PMQA_sampled_labels[1700:1850]
PMQA_val_indices = PMQA_sampled_indices[1700:1850]

# Test split: remaining samples.
PMQA_test_texts = PMQA_sampled_texts[1850:]
PMQA_test_labels = PMQA_sampled_labels[1850:]
PMQA_test_indices = PMQA_sampled_indices[1850:]

In [None]:
# Report the size of each PubMedQA split.
print(f"Training Data Size: {len(PMQA_train_texts)}")
print(f"Validation Data Size: {len(PMQA_val_texts)}")
print(f"Testing Data Size: {len(PMQA_test_texts)}")

# Sanity check: show the first training sample and look its stored index up in `texts`.
print("first string in PMQA_train_texts:", PMQA_train_texts[0])
print("first label in PMQA_train_texts:", PMQA_train_labels[0])
print("first index in PMQA_train_texts:", PMQA_train_indices[0])
print("this index in texts:", texts[PMQA_train_indices[0]])

### Data Preparation

In [None]:
class TextDataset(Dataset):
    """Dataset that tokenizes one text per lookup and pairs it with its label."""

    def __init__(self, texts, labels, tokenizer, max_len=512):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        raw_text = str(self.texts[item])
        raw_label = self.labels[item]

        # Every sample is padded / truncated to exactly max_len tokens.
        encode_kwargs = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )
        encoded = self.tokenizer.encode_plus(raw_text, **encode_kwargs)

        sample = {'text': raw_text}
        for field in ('input_ids', 'attention_mask'):
            # flatten() turns the tokenizer output into a 1-D tensor per sample.
            sample[field] = encoded[field].flatten()
        sample['labels'] = torch.tensor(raw_label, dtype=torch.long)
        return sample

# Wrap each split in a TextDataset and a DataLoader.
# Only the training loaders shuffle; validation and test loaders keep a fixed
# order so evaluation is repeatable and both corpora are handled the same way.
Wiki_train_dataset = TextDataset(Wiki_train_texts, Wiki_train_labels, tokenizer)
Wiki_val_dataset = TextDataset(Wiki_val_texts, Wiki_val_labels, tokenizer)
Wiki_test_dataset = TextDataset(Wiki_test_texts, Wiki_test_labels, tokenizer)

Wiki_train_loader = DataLoader(Wiki_train_dataset, batch_size=16, shuffle=True)
Wiki_val_loader = DataLoader(Wiki_val_dataset, batch_size=16, shuffle=False)
Wiki_test_loader = DataLoader(Wiki_test_dataset, batch_size=16, shuffle=False)

PMQA_train_dataset = TextDataset(PMQA_train_texts, PMQA_train_labels, tokenizer)
PMQA_val_dataset = TextDataset(PMQA_val_texts, PMQA_val_labels, tokenizer)
PMQA_test_dataset = TextDataset(PMQA_test_texts, PMQA_test_labels, tokenizer)

PMQA_train_loader = DataLoader(PMQA_train_dataset, batch_size=16, shuffle=True)
PMQA_val_loader = DataLoader(PMQA_val_dataset, batch_size=16, shuffle=False)
PMQA_test_loader = DataLoader(PMQA_test_dataset, batch_size=16, shuffle=False)

### Create a Custom Classifier

In [None]:
class BaselineRobertaClassifier(nn.Module):
    """Wraps the given encoder with a single linear head that produces two logits."""

    def __init__(self, roberta_base):
        super().__init__()
        self.roberta = roberta_base
        hidden_size = roberta_base.config.hidden_size
        self.classifier = nn.Linear(hidden_size, 2)

    def forward(self, input_ids, attention_mask):
        # Element 1 of the encoder output is what the classification head consumes.
        encoder_outputs = self.roberta(input_ids, attention_mask)
        return self.classifier(encoder_outputs[1])

class RobertaClassifier(nn.Module):
    """Classifier that combines the encoder output with statistical feature embeddings.

    With ``fusion_type='early'`` the transformed statistical features are
    concatenated to the encoder output and fed to one linear head. Any other
    value selects late fusion: the encoder output is scaled element-wise by a
    sigmoid gate computed from the statistical features, and the logits of the
    two separate heads are summed.
    """

    def __init__(self, roberta_base, stat_emb_dim, fusion_type='early'):
        super().__init__()
        self.fusion_type = fusion_type
        self.roberta = roberta_base
        hidden_size = roberta_base.config.hidden_size

        # Linear + ReLU projection applied to the statistical embeddings
        self.stat_emb_transform = nn.Linear(stat_emb_dim, stat_emb_dim)
        self.activation = nn.ReLU()

        if fusion_type != 'early':
            # late fusion: one head per source plus the gating layer
            self.classifier = nn.Linear(hidden_size, 2)
            self.stat_emb_classifier = nn.Linear(stat_emb_dim, 2)
            self.conditional_weights = nn.Linear(stat_emb_dim, hidden_size)
        else:
            self.classifier = nn.Linear(hidden_size + stat_emb_dim, 2)

    def forward(self, input_ids, attention_mask, statistical_features):
        pooled = self.roberta(input_ids, attention_mask)[1]
        stat = self.activation(self.stat_emb_transform(statistical_features))

        if self.fusion_type == 'early':
            return self.classifier(torch.cat((pooled, stat), dim=1))

        # late fusion: gate the encoder output, then add the two sets of logits
        gate = torch.sigmoid(self.conditional_weights(stat))
        roberta_logits = self.classifier(pooled * gate)
        stat_logits = self.stat_emb_classifier(stat)
        return roberta_logits + stat_logits

### Calculate the statistical features

In [None]:
import nltk
# Fetch the NLTK resources used for tokenizing, POS tagging and the tag-set help table.
# NOTE(review): resource names can differ between NLTK releases — confirm these
# match the installed version.
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('tagsets')
# Prints the tag-set documentation to the cell output.
nltk.help.upenn_tagset()


In [None]:
from nltk.data import load
# Tag names are the keys of the pickled tag-set help table.
upenn_tagset_info = load('help/tagsets/upenn_tagset.pickle')
upenn_tagset = list(upenn_tagset_info.keys())
#print(upenn_tagset)
#print(len(upenn_tagset))
# Print each tag with its position so the index-based selection below can be checked.
for index, tag in enumerate(upenn_tagset):
    print(f"index:{index} , tag:{tag}")
# Keep a hand-picked subset of tags by position (positions 3, 9, 14 and 19-24 are dropped).
# NOTE(review): this depends on the key order of the loaded table — confirm the
# dropped positions are the intended tags.
upenn_tagset_meaningful = upenn_tagset[0:3] + upenn_tagset[4:9] + upenn_tagset[10:14] + upenn_tagset[15:19] + upenn_tagset[25:]
#print(upenn_tagset_meaningful)
#print(len(upenn_tagset_meaningful))

In [None]:
import json
import pandas as pd

In [None]:
def calculate_tag_dist(text: str):
    """
    Compute the relative frequency of each tag in ``upenn_tagset_meaningful`` for ``text``.

    The text is word-tokenized and POS-tagged with NLTK; only tags listed in
    ``upenn_tagset_meaningful`` are counted, and the counts are normalised by
    their own sum.

    Args:
        text: The text to analyse.

    Returns:
        A list of floats, one per tag in ``upenn_tagset_meaningful`` (same
        order). If none of those tags occurs (e.g. empty text), every entry
        is 0.0.
    """
    tokens = nltk.tokenize.word_tokenize(text)
    tagged_text = nltk.pos_tag(tokens)
    tag_fd = nltk.FreqDist(tag for (word, tag) in tagged_text)
    tag_count = [tag_fd.get(tag, 0) for tag in upenn_tagset_meaningful]
    count_sum = sum(tag_count)

    # Without this guard an input with no counted tags raises ZeroDivisionError.
    if count_sum == 0:
        return [0.0] * len(tag_count)

    return [count / count_sum for count in tag_count]

def calculate_statistical_features(input_text):
    """
    Build the statistical feature tensor for a collection of texts.

    Each text contributes one row: its POS-tag distribution as returned by
    calculate_tag_dist. The result is intended to have shape
    [batch_size, stat_emb_dim].
    """
    rows = []
    for text in input_text:
        rows.append(calculate_tag_dist(text))
    return torch.tensor(rows)

# pos_embeddings = calculate_statistical_features(train_texts)
# torch.save(pos_embeddings, "pos_embeddings.pt")


### Model Training

In [None]:
def train_epoch_baseline(model, data_loader, loss_fn, optimizer, device, n_examples):
    """
    Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning per-class scores.
        data_loader: Iterable of batches with "input_ids", "attention_mask"
            and "labels" entries; must support ``len()``.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar loss.
        optimizer: Optimizer updating the model's parameters.
        device: Device the batch tensors are moved to.
        n_examples: Denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``: accuracy is a 0-dim double tensor (correct
        predictions divided by ``n_examples``); mean_loss is the mean of the
        per-batch loss values, or NaN if the loader yields no batches.
    """
    model.train()
    losses = []
    # Tensor accumulator so .double() below also works when the loader is empty.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    # Drop gradients that may be left over from an earlier, interrupted run so
    # they are not added to the first step of this epoch.
    optimizer.zero_grad()

    for d in tqdm(data_loader, total=len(data_loader), desc="Training"):
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        labels = d["labels"].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, labels)

        correct_predictions += torch.sum(preds == labels)
        losses.append(loss.item())

        loss.backward()
        # Clip the global gradient norm to 1.0 before the update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        optimizer.zero_grad()

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss

def validate_epoch(model, data_loader, loss_fn, device, n_examples, calculate_stat_features = None):
    """
    Evaluate ``model`` on ``data_loader`` without updating parameters.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning per-class scores.
        data_loader: Iterable of batches with "input_ids", "attention_mask"
            and "labels" entries.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar loss.
        device: Device the batch tensors are moved to.
        n_examples: Denominator used for the accuracy.
        calculate_stat_features: Accepted for call compatibility; not used.

    Returns:
        ``(accuracy, mean_loss)``: accuracy is a 0-dim double tensor (correct
        predictions divided by ``n_examples``); mean_loss is the mean of the
        per-batch loss values, or NaN if the loader yields no batches.
    """
    model.eval()
    losses = []
    # Tensor accumulator so .double() below also works when the loader is empty.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            labels = d["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, labels)

            correct_predictions += torch.sum(preds == labels)
            losses.append(loss.item())

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss

def eval_model(model, data_loader, loss_fn, device, n_examples, calculate_stat_features = None):
    """
    Evaluate ``model`` on ``data_loader`` with a progress bar.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning per-class scores.
        data_loader: Iterable of batches with "input_ids", "attention_mask"
            and "labels" entries; must support ``len()``.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar loss.
        device: Device the batch tensors are moved to.
        n_examples: Denominator used for the accuracy.
        calculate_stat_features: Accepted for call compatibility; not used.

    Returns:
        ``(accuracy, mean_loss)``: accuracy is a 0-dim double tensor (correct
        predictions divided by ``n_examples``); mean_loss is the mean of the
        per-batch loss values, or NaN if the loader yields no batches.
    """
    model.eval()
    losses = []
    # Tensor accumulator so .double() below also works when the loader is empty.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    with torch.no_grad():
        for d in tqdm(data_loader, total=len(data_loader), desc="Testing"):
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            labels = d["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, labels)

            correct_predictions += torch.sum(preds == labels)
            losses.append(loss.item())

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss


In [None]:
# Initialize model
# NOTE: this rebinds `dataset`, which an earlier cell uses for the loaded
# Hugging Face dataset; here it only selects which corpus to train on.
dataset = "Wiki" # "PMQA"

num_epochs = 5
stat_emb_dim = 36
fusion_type = "early"

model = BaselineRobertaClassifier(roberta_base)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# torch.optim.AdamW replaces the deprecated transformers.AdamW. eps and
# weight_decay are set explicitly to the values transformers.AdamW used by
# default, because torch's own defaults differ.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, eps=1e-6, weight_decay=0.0)
loss_fn = nn.CrossEntropyLoss().to(device)

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print('-' * 10)

    # One optimisation pass over the training split of the selected corpus.
    train_acc, train_loss = train_epoch_baseline(
        model,
        Wiki_train_loader if dataset == "Wiki" else PMQA_train_loader,
        loss_fn,
        optimizer,
        device,
        len(Wiki_train_dataset) if dataset == "Wiki" else len(PMQA_train_dataset),
    )
    print(f'Train loss {train_loss}, accuracy {train_acc}')

    # Score the validation split after every epoch.
    val_acc, val_loss = validate_epoch(
        model,
        Wiki_val_loader if dataset == "Wiki" else PMQA_val_loader,
        loss_fn,
        device,
        len(Wiki_val_dataset) if dataset == "Wiki" else len(PMQA_val_dataset)
    )
    print(f'Validation loss {val_loss}, accuracy {val_acc}')


### Model Evaluation

In [None]:
# Evaluate the trained model on the test split of the corpus selected by `dataset`.
# The last argument is accepted by eval_model but not used by it.
test_acc, test_loss = eval_model(
    model,
    Wiki_test_loader if dataset == "Wiki" else PMQA_test_loader,
    loss_fn,
    device,
    len(Wiki_test_dataset) if dataset == "Wiki" else len(PMQA_test_dataset),
    calculate_statistical_features
)

print(f'Test loss {test_loss}, accuracy {test_acc}')

Testing: 100%|██████████| 107/107 [00:17<00:00,  6.18it/s]

Test loss 5.1303714614146103e-05, accuracy 0.9999999999999999





In [None]:
def preprocess(texts):
    """Tokenize a single string or a list of strings into padded, truncated PyTorch tensors."""
    return tokenizer(texts, return_tensors="pt", truncation=True, padding=True)

def predict(model, texts, device):
    """
    Return the predicted class index for each input text as a NumPy array.

    The model is switched to eval mode, the texts are tokenized with
    preprocess, and the argmax over the model's outputs is taken without
    tracking gradients.
    """
    model.eval()

    encoded = preprocess(texts)
    ids = encoded['input_ids'].to(device)
    mask = encoded['attention_mask'].to(device)

    with torch.no_grad():
        logits = model(input_ids=ids, attention_mask=mask)
        predicted = torch.argmax(logits, dim=1)

    return predicted.cpu().numpy()

# Example: classify one passage and print its predicted class id
# (per the label comments earlier in the notebook: 1 = human, 0 = machine).
text = "As ILC2s are elevated in patients with CRSwNP, they may drive nasal polyp formation in CRS. ILC2s are also linked with high tissue and blood eosinophilia and have a potential role in the activation and survival of eosinophils during the Th2 immune response. The association of innate lymphoid cells in CRS provides insights into its pathogenesis."
single_prediction = predict(model, text, device)
print(single_prediction[0])