<a href="https://colab.research.google.com/github/Sakuni-Weerasinghe/Automatic-Question-and-Answer-Generation-based-on-Large-Language-Models/blob/master/ResearchQuestionGeneration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the third-party packages that the import cell below relies on.
# NOTE(review): no versions are pinned, so a fresh runtime may resolve to
# different releases than the ones this notebook was written against.
!pip install transformers
!pip install tokenizers
!pip install sentencepiece
!pip install pytorch-lightning
!pip install nltk

Import Libraries

In [None]:
from typing import List, Dict
import tqdm.notebook as tq
from tqdm.notebook import tqdm
import json
import pandas as pd
import numpy as np

from transformers import T5Tokenizer
import seaborn as sns
import matplotlib.pyplot as plt
import pytorch_lightning as pl
from sklearn.model_selection import train_test_split
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning import Trainer
from torch.utils.data import Dataset, DataLoader
import torch
from torch.utils.data import Dataset

import nltk
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
from nltk.translate.bleu_score import SmoothingFunction
from nltk import ngrams
from nltk.tokenize import word_tokenize

from transformers import (
    AdamW,
    T5ForConditionalGeneration,
    T5TokenizerFast as T5Tokenizer
    )

In [None]:
def extract_from_my_dataset(data):
    """Flatten the nested topic/question structure into nine parallel lists.

    Args:
        data: Iterable of topic dicts. Each may carry "topic", "sub-topic",
            "context" and a "questions" list of dicts with "question",
            "correct_answer" and "option1".."option4". Any missing key is
            read as "" (or as an empty list for "questions").

    Returns:
        A 9-tuple of lists with one entry per question, in this order:
        topics, sub_topics, contexts, questions, correct_answers,
        options1, options2, options3, options4. Topic-level values are
        repeated for every question that belongs to the topic.
    """
    question_fields = (
        "question",
        "correct_answer",
        "option1",
        "option2",
        "option3",
        "option4",
    )

    # One flat record per question; topics without questions contribute nothing.
    records = [
        (
            entry.get("topic", ""),
            entry.get("sub-topic", ""),
            entry.get("context", ""),
            *(qna_set.get(field, "") for field in question_fields),
        )
        for entry in data
        for qna_set in entry.get("questions", [])
    ]

    if not records:
        return tuple([] for _ in range(9))

    # Transpose the list of records into nine column lists.
    return tuple(list(column) for column in zip(*records))





In [None]:
def parse_json(filepath):
    """Load and return the parsed contents of a JSON file.

    The file is opened explicitly as UTF-8 so that non-ASCII text in the
    dataset is decoded the same way regardless of the platform's default
    encoding.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The Python object produced by ``json.load`` for the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(filepath, encoding="utf-8") as file:
        return json.load(file)

In [None]:
# Load the raw nested dataset from disk.
data = parse_json("data.json")  # Replace with your actual dataset

# Flatten it into nine parallel per-question lists.
(
    topics,
    sub_topics,
    contexts,
    questions,
    correct_answers,
    option1,
    option2,
    option3,
    option4,
) = extract_from_my_dataset(data)

# Assemble one row per question; column order matches the extraction order.
my_dataset_df = pd.DataFrame(dict(
    topic=topics,
    sub_topic=sub_topics,
    context=contexts,
    question=questions,
    correct_answer=correct_answers,
    option1=option1,
    option2=option2,
    option3=option3,
    option4=option4,
))

Export as `*.csv` and upload to GDrive

In [None]:
# Mount Google Drive at /content/drive (Colab-only). force_remount=True
# remounts even if the drive is already mounted in this runtime.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

In [None]:
# Two-stage split with a fixed seed: hold out 20% of all rows for testing,
# then hold out 20% of the remaining rows for validation.
train, test = train_test_split(my_dataset_df, test_size=0.2, random_state=42)
train, val = train_test_split(train, test_size=0.2, random_state=42)

# Report the (rows, columns) shape of each split, one per line.
print(train.shape, test.shape, val.shape, sep="\n")

In [None]:
import os

# Write each split to a CSV file in the current working directory.
train.to_csv("train.csv", index=False)
test.to_csv("test.csv", index=False)
val.to_csv("val.csv", index=False)

# Move the freshly written CSVs into the mounted Drive folder. The path is
# relative, so this assumes the working directory is the one that contains
# the `drive` mount point.
!mv train.csv test.csv val.csv drive/MyDrive/Research_Final/QG/DataSet

# Fetch previously uploaded copies of the splits back into the working directory.
# NOTE(review): these are share-page style URLs (`/file/d/<id>`); confirm that
# gdown resolves them to the CSV contents rather than an HTML page.
!gdown 'https://drive.google.com/file/d/1-XbPy7_ooHU1Wg8-RDWXw7ZGiq60kUSP' -O train.csv
!gdown 'https://drive.google.com/file/d/1-SRGIQoJ4fYh1iX9Suc5RmOJgCgny1y3' -O test.csv
!gdown 'https://drive.google.com/file/d/1-OexDBlKaRWIZac2lKqGxIwnZi0UdcDk' -O val.csv


In [None]:
# Read the three split files back from the working directory.
train_dataset, test_dataset, val_dataset = (
    pd.read_csv(csv_name) for csv_name in ('train.csv', 'test.csv', 'val.csv')
)

In [None]:
# Build the cleaned frames from the in-memory splits: copy, drop every row
# that has a missing value, then remove the four distractor-option columns
# (only the context, question and correct answer are used for training).
train_dataset_1 = (
    train.copy()
    .dropna()
    .drop(columns=['option1', 'option2','option3','option4'])
)
test_dataset_1 = (
    test.copy()
    .dropna()
    .drop(columns=['option1', 'option2','option3','option4'])
)
val_dataset_1 = (
    val.copy()
    .dropna()
    .drop(columns=['option1', 'option2','option3','option4'])
)

# Show the resulting shapes, each labelled with its frame name.
print(train_dataset_1.shape,'train_dataset_1')
print(test_dataset_1.shape, 'test_dataset_1')
print(val_dataset_1.shape, 'val_dataset_1')


**Pytorch Lightning Dataset**

In [None]:
# Separator inserted between the answer and the context (model input) and
# between the answer and the question (model target).
SEP_TOKEN = '<sep>'
# Threshold compared against a uniform random draw in QGDataset.__getitem__;
# draws at or below it replace the input answer with '[MASK]'.
MASKING_CHANCE = 0.3
#30% chance to replace the answer with '[MASK]'

In [None]:
class QGDataset(Dataset):
    """Torch dataset that turns (answer, context, question) rows into T5 inputs.

    Each item encodes ``"<answer> <sep> <context>"`` as the source sequence and
    ``"<answer> <sep> <question>"`` as the target sequence. With probability
    ``MASKING_CHANCE`` the answer in the *source* is replaced by ``'[MASK]'``;
    the target always contains the real answer. The draw happens on every
    ``__getitem__`` call, so the same index can yield different sources.

    Args:
        data: Frame with 'correct_answer', 'context' and 'question' columns.
        tokenizer: Tokenizer used for both source and target text.
        source_max_token_len: Length the source is padded/truncated to.
        target_max_token_len: Length the target is padded/truncated to.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: T5Tokenizer,
        source_max_token_len: int,
        target_max_token_len: int,
    ):
        self.tokenizer = tokenizer
        self.data = data
        self.source_max_token_len = source_max_token_len
        self.target_max_token_len = target_max_token_len

    def __len__(self):
        """Return the number of rows in the backing frame."""
        return len(self.data)

    def __getitem__(self, index: int):
        """Encode the row at positional ``index``.

        Returns:
            dict with the raw ``answer_text``, ``context`` and ``question``
            plus 1-D tensors ``input_ids``, ``attention_mask`` and ``labels``
            (padding positions in ``labels`` are set to -100).
        """
        data_row = self.data.iloc[index]  # positional lookup, independent of the frame's index labels

        # Keep the real answer unless the random draw falls within MASKING_CHANCE.
        if np.random.rand() > MASKING_CHANCE:
            answer = data_row['correct_answer']
        else:
            answer = '[MASK]'

        source_encoding = self.tokenizer(
            '{} {} {}'.format(answer, SEP_TOKEN, data_row['context']),
            max_length=self.source_max_token_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors='pt'
        )

        target_encoding = self.tokenizer(
            '{} {} {}'.format(data_row['correct_answer'], SEP_TOKEN, data_row['question']),
            max_length=self.target_max_token_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors='pt'
        )

        # Replace padding ids with -100 in the labels. The id is taken from the
        # tokenizer instead of assuming it is 0, so the masking stays correct
        # for tokenizers whose padding id differs.
        labels = target_encoding['input_ids']
        labels[labels == self.tokenizer.pad_token_id] = -100

        return dict(
            answer_text=data_row['correct_answer'],
            context=data_row['context'],
            question=data_row['question'],
            input_ids=source_encoding['input_ids'].flatten(),
            attention_mask=source_encoding['attention_mask'].flatten(),
            labels=labels.flatten().to(torch.long)
        )


**Pytorch Lightning DataModule**

In [None]:
class QGDataModule(pl.LightningDataModule):
    """Lightning data module that wraps the three splits in ``QGDataset``s.

    Args:
        train_dataset_1: Training frame.
        val_dataset_1: Validation frame.
        test_dataset_1: Test frame.
        tokenizer: Tokenizer forwarded to every ``QGDataset``.
        batch_size: Batch size of the training loader (the validation and
            test loaders always use a batch size of 1).
        source_max_token_len: Source length forwarded to ``QGDataset``.
        target_max_token_len: Target length forwarded to ``QGDataset``.
    """

    def __init__(
        self,
        train_dataset_1: pd.DataFrame,
        val_dataset_1: pd.DataFrame,
        test_dataset_1: pd.DataFrame,
        tokenizer: T5Tokenizer,
        batch_size,
        source_max_token_len: int,
        target_max_token_len: int
        ):
        super().__init__()
        self.batch_size = batch_size
        self.train_dataset_1 = train_dataset_1
        self.val_dataset_1 = val_dataset_1
        self.test_dataset_1 = test_dataset_1
        self.tokenizer = tokenizer
        self.source_max_token_len = source_max_token_len
        self.target_max_token_len = target_max_token_len

    def setup(self,stage=None):
        """Create the ``QGDataset`` wrappers for all three splits."""
        def wrap(frame):
            # Every split shares the tokenizer and the length limits.
            return QGDataset(
                frame,
                self.tokenizer,
                self.source_max_token_len,
                self.target_max_token_len,
            )

        self.train_dataset_2 = wrap(self.train_dataset_1)
        self.val_dataset_2 = wrap(self.val_dataset_1)
        self.test_dataset_2 = wrap(self.test_dataset_1)

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        return DataLoader(
            self.train_dataset_2,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=2,
        )

    def val_dataloader(self):
        """Unshuffled, one-item-per-batch loader over the validation dataset."""
        return DataLoader(self.val_dataset_2, batch_size=1, num_workers=2)

    def test_dataloader(self):
        """Unshuffled, one-item-per-batch loader over the test dataset."""
        return DataLoader(self.test_dataset_2, batch_size=1, num_workers=2)

Hyperparameters

In [None]:
#use t5-base, gpt-2 as other models
# Pretrained checkpoint loaded for both the tokenizer and the model.
MODEL_NAME = 't5-small'
# Padding/truncation lengths (in tokens) for the source and target sequences.
# NOTE(review): 1000 source tokens is a large fixed padding length — confirm it
# is intended for this checkpoint and fits in memory at BATCH_SIZE.
SOURCE_MAX_TOKEN_LEN = 1000
TARGET_MAX_TOKEN_LEN = 80

# Training schedule: epochs passed to the Trainer, training batch size and
# the optimizer learning rate.
N_EPOCHS = 20
BATCH_SIZE = 8
LEARNING_RATE = 0.0001

In [None]:
# Fraction of each cleaned split to keep (1 keeps every row).
DF_TAKE_PERCENTAGE = 1

# Row counts to take from the train, test and validation frames.
TAKE_TRAIN, TAKE_TEST, TAKE_VAL = (
    int(len(frame) * DF_TAKE_PERCENTAGE)
    for frame in (train_dataset_1, test_dataset_1, val_dataset_1)
)

**Initializing training module**

Setting Model

In [None]:
# Load the pretrained tokenizer and register the separator as an extra token;
# the model's embedding matrix is later resized to TOKENIZER_LEN.
tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME)
print('tokenizer len before: ', len(tokenizer))
tokenizer.add_tokens(SEP_TOKEN)
print('tokenizer len after: ', len(tokenizer))
TOKENIZER_LEN = len(tokenizer)

# Keep the leading TAKE_* rows of each cleaned split.
train_dataset_3 = train_dataset_1[:TAKE_TRAIN]
test_dataset_3 = test_dataset_1[:TAKE_TEST]
val_dataset_3 = val_dataset_1[:TAKE_VAL]


# Pass the splits by keyword: QGDataModule declares its parameters in the
# order (train, val, test), so the previous positional call with
# (train, test, val) validated on the test split and tested on the
# validation split.
data_module = QGDataModule(
    train_dataset_1=train_dataset_3,
    val_dataset_1=val_dataset_3,
    test_dataset_1=test_dataset_3,
    tokenizer=tokenizer,
    batch_size=BATCH_SIZE,
    source_max_token_len=SOURCE_MAX_TOKEN_LEN,
    target_max_token_len=TARGET_MAX_TOKEN_LEN,
)

data_module.setup()

# Materialise the loaders for ad-hoc inspection.
train_dataloader = data_module.train_dataloader()
val_dataloader = data_module.val_dataloader()
test_dataloader = data_module.test_dataloader()

In [None]:
class QGModel(pl.LightningModule):
    """Lightning wrapper around a pretrained T5 conditional-generation model.

    The embedding matrix is resized to ``TOKENIZER_LEN`` so the separator
    token added to the tokenizer has an embedding row.
    """

    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)
        # Account for tokens added to the tokenizer after pretraining.
        self.model.resize_token_embeddings(TOKENIZER_LEN)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and return ``(loss, logits)``."""
        result = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels,
        )
        return result.loss, result.logits

    def _loss_for(self, batch, metric_name):
        """Forward one batch, log its loss under ``metric_name`` and return it."""
        loss, _logits = self(
            batch['input_ids'],
            batch['attention_mask'],
            batch['labels'],
        )
        self.log(metric_name, loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log the loss for one training batch."""
        return self._loss_for(batch, 'train_loss')

    def validation_step(self, batch, batch_idx):
        """Compute and log the loss for one validation batch."""
        return self._loss_for(batch, 'val_loss')

    def test_step(self, batch, batch_idx):
        """Compute and log the loss for one test batch."""
        return self._loss_for(batch, 'test_loss')

    def configure_optimizers(self):
        """Optimise every parameter with AdamW at ``LEARNING_RATE``."""
        return AdamW(self.parameters(), lr=LEARNING_RATE)

Setting trainer

In [None]:
# Checkpoint callback writing into ./checkpoints and tracking 'val_loss'
# (lower is better).
# NOTE(review): per the Lightning docs save_top_k=-1 keeps every checkpoint
# rather than only the best one — confirm this is intended given the
# 'best-checkpoint' filename.
checkpoint_callback = ModelCheckpoint(
    dirpath='checkpoints',
    filename='best-checkpoint',
    save_top_k=-1,
    verbose=True,
    monitor='val_loss',
    mode='min'
    )

# Trainer limited to N_EPOCHS epochs on a single device.
trainer = pl.Trainer(
    callbacks=[checkpoint_callback],
    max_epochs=N_EPOCHS,
    devices=1
    )

**Training**

In [None]:
# Enable the TensorBoard notebook extension (provides the %tensorboard magic).
%load_ext tensorboard

In [None]:
# Alternative starting point: resume from a previously saved checkpoint.
# model = QGModel.load_from_checkpoint('checkpoints/best-checkpoint-v42.ckpt')
# Start from the pretrained weights, fit on the data module's train/val
# loaders, then evaluate the resulting weights on its test loader.
model = QGModel()
trainer.fit(model, data_module)
trainer.test(model, data_module)

### Load model

In [None]:
import os

# NOTE(review): this path is relative to the working directory, whereas the
# checkpoint callback writes into 'checkpoints/' — confirm the file location.
checkpoint_path = 'best-checkpoint-v5.ckpt'

# Check if the file exists before attempting to load
if os.path.exists(checkpoint_path):
    best_model = QGModel.load_from_checkpoint(checkpoint_path)
    # Inference only: freeze the parameters and switch to eval mode.
    best_model.freeze()
    best_model.eval()
    print("Model loaded successfully.")
else:
    # best_model is NOT defined on this path, so later cells that use it
    # will raise NameError.
    print(f"Checkpoint file '{checkpoint_path}' not found.")


Model loaded successfully.


### Common functions

In [None]:
def generate(qgmodel: QGModel, answer: str, context: str) -> str:
    """Generate text for an ``"<answer> <sep> <context>"`` prompt.

    The prompt is tokenized exactly like the training sources (padded and
    truncated to ``SOURCE_MAX_TOKEN_LEN``) and decoded with ``num_beams=1``.
    Special tokens are kept in the decoded text.

    Runs on the GPU when one is available and falls back to the CPU
    otherwise, instead of failing on CPU-only runtimes.

    Args:
        qgmodel: Model wrapper whose ``.model`` attribute is used to generate.
        answer: Answer text, or ``'[MASK]'`` to let the model supply one.
        context: Passage the generated text should be based on.

    Returns:
        The decoded sequences concatenated in generation order (a single
        sequence for the single prompt encoded here).
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    source_encoding = tokenizer(
        '{} {} {}'.format(answer, SEP_TOKEN, context),
        max_length=SOURCE_MAX_TOKEN_LEN,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        add_special_tokens=True,
        return_tensors='pt'
    ).to(device)

    # The model must live on the same device as the encoded inputs.
    qgmodel.model.to(device)

    generated_ids = qgmodel.model.generate(
        input_ids=source_encoding['input_ids'],
        attention_mask=source_encoding['attention_mask'],
        num_beams=1,
        max_length=TARGET_MAX_TOKEN_LEN,
        repetition_penalty=2.5,
        length_penalty=1.0,
        early_stopping=True,
        use_cache=True
    )

    # A list (not a set) keeps the output order and does not collapse
    # identical sequences.
    preds = [
        tokenizer.decode(generated_id, skip_special_tokens=False, clean_up_tokenization_spaces=True)
        for generated_id in generated_ids
    ]

    return ''.join(preds)

In [None]:
def show_result(generated: str, answer: str, topic:str,sub_topic:str):
    """Print a generated result next to its reference values.

    The last two labels previously both read 'Conext: ' (misspelled and
    duplicated). Every call in this notebook passes the context as ``topic``
    and the reference question as ``sub_topic``, so the labels now say so.

    Args:
        generated: Text produced by the model.
        answer: Reference answer.
        topic: Value printed under the 'Context: ' label.
        sub_topic: Value printed under the 'Question: ' label.
    """
    print('Generated: ', generated)
    print()
    print('Answer: ', answer)
    print('Context: ', topic)
    print('Question: ', sub_topic)
    print('-----------------------------')

### View results manually

In [None]:
# Pick one test row by position (requires at least 13 rows in the test frame).
sample_question = test_dataset_1.iloc[12]


# Generate from the real answer and its context, then print the output next to
# the reference answer, context and question.
generated = generate(best_model, sample_question['correct_answer'], sample_question['context'])
show_result(generated, sample_question['correct_answer'], sample_question['context'], sample_question['question'])

#### Answer-aware question generation

In [None]:
# Answer-aware generation for the first ten test rows (fewer if the frame is shorter).
for i in range(min(len(test_dataset_1), 10)):
    row = test_dataset_1.iloc[i]
    context = row['context']
    answer = row['correct_answer']

    # The real answer is supplied, so the model only has to produce the question.
    generated = generate(best_model, answer, context)

    show_result(generated, answer, context, row['question'])

#### Generating both answer and question

In [None]:
# Same ten rows, but the answer is hidden from the model with '[MASK]'.
for i in range(min(len(test_dataset_1), 10)):
    row = test_dataset_1.iloc[i]
    context = row['context']
    original_answer = row['correct_answer']
    input_answer = '[MASK]'

    generated = generate(best_model, input_answer, context)

    # Show the reference answer alongside the output for comparison.
    show_result(generated, original_answer, context, row['question'])

**Evaluation**

In [None]:
def calculate_bleu(reference, candidate):
    """Sentence-level BLEU of ``candidate`` against a single ``reference``.

    Both strings are tokenized by whitespace; the score is computed by
    ``sentence_bleu`` using ``SmoothingFunction().method4``.

    Args:
        reference: Reference text.
        candidate: Text to score.

    Returns:
        Whatever ``sentence_bleu`` returns for the tokenized pair.
    """
    reference_tokens = reference.split()
    candidate_tokens = candidate.split()
    smoother = SmoothingFunction().method4
    # sentence_bleu expects a list of references, hence the wrapping list.
    return sentence_bleu(
        [reference_tokens],
        candidate_tokens,
        smoothing_function=smoother,
    )

def calculate_rouge_n(reference, generated, n=1):
    """Compute ROUGE-N precision, recall and F1 between two texts.

    Both texts are tokenized with ``word_tokenize`` and split into n-grams.
    The overlap is counted with clipped multiset counts: an n-gram that occurs
    k times in one text and m times in the other contributes ``min(k, m)``.
    (The previous set-based overlap counted each distinct n-gram once while
    the denominators counted duplicates, so texts with repeated n-grams were
    under-scored — identical texts could score below 1.0.)

    Args:
        reference: Reference text.
        generated: Generated text to evaluate.
        n: N-gram order (default 1).

    Returns:
        Tuple ``(precision, recall, f1_score)``; each component is 0 when its
        denominator would be zero.
    """
    from collections import Counter

    reference_counts = Counter(ngrams(word_tokenize(reference), n))
    generated_counts = Counter(ngrams(word_tokenize(generated), n))

    # Counter intersection keeps the minimum count of every shared n-gram.
    overlap = sum((reference_counts & generated_counts).values())
    reference_total = sum(reference_counts.values())
    generated_total = sum(generated_counts.values())

    precision = overlap / generated_total if generated_total > 0 else 0
    recall = overlap / reference_total if reference_total > 0 else 0
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return precision, recall, f1_score

