In [None]:
!pip install transformers datasets

In [2]:
from typing import Tuple, Mapping
import pandas as pd
import torch
from transformers import AutoTokenizer
import datasets

class QGDataset(torch.utils.data.Dataset):
    def __init__(
        self,
        data: datasets.Dataset,
        max_length: int,
        pad_mask_id: int,
        tokenizer: AutoTokenizer
    ) -> None:
        self.data = pd.DataFrame(data)
        self.max_length = max_length
        self.pad_mask_id = pad_mask_id
        self.tokenizer = tokenizer

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int) -> Mapping[str, torch.Tensor]:
        item = self.data.loc[index]
        input_ids, attention_mask = self._encode_text(item.text)
        labels, _ = self._encode_text(item.question)
        masked_labels = self._mask_label_padding(labels)
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": masked_labels
        }

    def _encode_text(self, text: str) -> Tuple[torch.Tensor, torch.Tensor]:
        encoded_text = self.tokenizer(
            text,
            padding="max_length",
            max_length=self.max_length,
            truncation=True,
            return_tensors='pt'
        )
        return (
            encoded_text["input_ids"].squeeze(),
            encoded_text["attention_mask"].squeeze()
        )

    def _mask_label_padding(self, labels: torch.Tensor) -> torch.Tensor:
        labels[labels == self.tokenizer.pad_token_id] = self.pad_mask_id
        return labels

In [1]:
import numpy as np
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    AutoModelForSequenceClassification,
)
from typing import Any, List, Mapping, Tuple


class QuestionGenerator:

    def __init__(self) -> None:

        QG_PRETRAINED = "drive/MyDrive/final582/models/version1/IAmA-question-generator"
        self.TOPIC_TOKEN = "<topic>"
        self.CONTEXT_TOKEN = "<context>"
        self.SEQ_LENGTH = 512

        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")

        self.qg_tokenizer = AutoTokenizer.from_pretrained(
            QG_PRETRAINED, use_fast=False)
        self.qg_model = AutoModelForSeq2SeqLM.from_pretrained(QG_PRETRAINED)
        self.qg_model.to(self.device)
        self.qg_model.eval()

        self.qa_evaluator = QAEvaluator()

    def generate(
        self,
        qg_inputs: str,
        use_evaluator: bool = False,
        num_questions: bool = None
    ) -> List:
        """Takes an article and generates a set of question and answer pairs. If use_evaluator
        is True then QA pairs will be ranked and filtered based on their quality. answer_style
        should selected from ["all", "sentences", "multiple_choice"].
        """

        print("Generating questions...\n")

        qg_answers = qg_inputs
        generated_questions = self.generate_questions_from_inputs(qg_inputs)

        # message = "{} questions doesn't match {} answers".format(
        #     len(generated_questions), len(qg_answers)
        # )
        # assert len(generated_questions) == len(qg_answers), message

        if use_evaluator:
            print("Evaluating QA pairs...\n")
            encoded_qa_pairs = self.qa_evaluator.encode_qa_pairs(
                generated_questions, qg_answers
            )
            scores = self.qa_evaluator.get_scores(encoded_qa_pairs)

            if num_questions:
                qa_list = self._get_ranked_qa_pairs(
                    generated_questions, qg_answers, scores, num_questions
                )
            else:
                qa_list = self._get_ranked_qa_pairs(
                    generated_questions, qg_answers, scores
                )

        else:
            print("Skipping evaluation step.\n")
            qa_list = self._get_all_qa_pairs(generated_questions, qg_inputs)

        return qa_list

    def generate_qg_inputs(self, text: str) -> Tuple[List[str], List[str]]:
        """Given a text, returns a list of model inputs and a list of corresponding answers.
        Model inputs take the form "TOPIC_TOKEN <answer text> context_token <context text>" where
        the answer is a string extracted from the text, and the context is the wider text surrounding
        the context.
        """
        inputs = []
        answers = []
        segments = self._split_into_segments(text)

        for segment in segments:
            sentences = self._split_text(segment)
            prepped_inputs, prepped_answers = self._prepare_qg_inputs(
                sentences, segment
            )
            inputs.extend(prepped_inputs)
            answers.extend(prepped_answers)

        return inputs, answers

    def generate_questions_from_inputs(self, qg_inputs: List) -> List[str]:
        """Given a list of concatenated answers and contexts, with the form:
        "TOPIC_TOKEN <answer text> context_token <context text>", generates a list of 
        questions.
        """
        generated_questions = []

        for qg_input in qg_inputs:
            question = self._generate_question(qg_input)
            generated_questions.append(question)

        return generated_questions



    @torch.no_grad()
    def _generate_question(self, qg_input: str) -> str:
        """Takes qg_input which is the concatenated answer and context, and uses it to generate
        a question sentence. The generated question is decoded and then returned.
        """
        encoded_input = self._encode_qg_input(qg_input)
        output = self.qg_model.generate(input_ids=encoded_input["input_ids"])
        question = self.qg_tokenizer.decode(
            output[0],
            skip_special_tokens=True
        )
        return question

    def _encode_qg_input(self, qg_input: str) -> torch.tensor:
        """Tokenizes a string and returns a tensor of input ids corresponding to indices of tokens in 
        the vocab.
        """
        return self.qg_tokenizer(
            qg_input,
            padding='max_length',
            max_length=self.SEQ_LENGTH,
            truncation=True,
            return_tensors="pt",
        ).to(self.device)

    def _get_ranked_qa_pairs(
        self, generated_questions: List[str], qg_answers: List[str], scores, num_questions: int = 10
    ) -> List[Mapping[str, str]]:
        """Ranks generated questions according to scores, and returns the top num_questions examples.
        """
        if num_questions > len(scores):
            num_questions = len(scores)
            print((
                f"\nWas only able to generate {num_questions} questions.",
                "For more questions, please input a longer text.")
            )

        qa_list = []

        for i in range(num_questions):
            index = scores[i]
            qa = {
                "question": generated_questions[index].split("?")[0] + "?",
                "answer": qg_answers[index]
            }
            qa_list.append(qa)

        return qa_list

    def _get_all_qa_pairs(self, generated_questions: List[str], qg_answers: List[str]):
        """Formats question and answer pairs without ranking or filtering."""
        qa_list = []

        for question, answer in zip(generated_questions, qg_answers):
            qa = {
                "question": question.split("?")[0] + "?",
                "answer": answer
            }
            qa_list.append(qa)

        return qa_list


class QAEvaluator:
    """Wrapper for a transformer model which evaluates the quality of question-answer pairs.
    Given a QA pair, the model will generate a score. Scores can be used to rank and filter
    QA pairs.
    """

    def __init__(self) -> None:

        QAE_PRETRAINED = "iarfmoose/bert-base-cased-qa-evaluator"
        self.SEQ_LENGTH = 512

        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu")

        self.qae_tokenizer = AutoTokenizer.from_pretrained(QAE_PRETRAINED)
        self.qae_model = AutoModelForSequenceClassification.from_pretrained(
            QAE_PRETRAINED
        )
        self.qae_model.to(self.device)
        self.qae_model.eval()

    def encode_qa_pairs(self, questions: List[str], answers: List[str]) -> List[torch.tensor]:
        """Takes a list of questions and a list of answers and encodes them as a list of tensors."""
        encoded_pairs = []

        for question, answer in zip(questions, answers):
            encoded_qa = self._encode_qa(question, answer)
            encoded_pairs.append(encoded_qa.to(self.device))

        return encoded_pairs

    def get_scores(self, encoded_qa_pairs: List[torch.tensor]) -> List[float]:
        """Generates scores for a list of encoded QA pairs."""
        scores = {}

        for i in range(len(encoded_qa_pairs)):
            scores[i] = self._evaluate_qa(encoded_qa_pairs[i])

        return [
            k for k, v in sorted(scores.items(), key=lambda item: item[1], reverse=True)
        ]

    def _encode_qa(self, question: str, answer: str) -> torch.tensor:
        """Concatenates a question and answer, and then tokenizes them. Returns a tensor of 
        input ids corresponding to indices in the vocab.
        """
        if type(answer) is list:
            for a in answer:
                if a["correct"]:
                    correct_answer = a["answer"]
        else:
            correct_answer = answer

        return self.qae_tokenizer(
            text=question,
            text_pair=correct_answer,
            padding="max_length",
            max_length=self.SEQ_LENGTH,
            truncation=True,
            return_tensors="pt",
        )

    @torch.no_grad()
    def _evaluate_qa(self, encoded_qa_pair: torch.tensor) -> float:
        """Takes an encoded QA pair and returns a score."""
        output = self.qae_model(**encoded_qa_pair)
        return output[0][0][1]


def print_qa(qa_list: List[Mapping[str, str]], show_answers: bool = True) -> None:
    """Formats and prints a list of generated questions and answers."""

    for i in range(len(qa_list)):
        # wider space for 2 digit q nums
        space = " " * int(np.where(i < 9, 3, 4))

        print(f"{i + 1}) Q: {qa_list[i]['question']}")

        answer = qa_list[i]["answer"]

        # print a list of multiple choice answers
        if type(answer) is list:

            if show_answers:
                print(
                    f"{space}A: 1. {answer[0]['answer']} "
                    f"{np.where(answer[0]['correct'], '(correct)', '')}"
                )
                for j in range(1, len(answer)):
                    print(
                        f"{space + '   '}{j + 1}. {answer[j]['answer']} "
                        f"{np.where(answer[j]['correct']==True,'(correct)', '')}"
                    )

            else:
                print(f"{space}A: 1. {answer[0]['answer']}")
                for j in range(1, len(answer)):
                    print(f"{space + '   '}{j + 1}. {answer[j]['answer']}")

            print("")

        # print full sentence answers
        else:
            if show_answers:
                print(f"{space}A: {answer}\n")


/Users/ehsanulkabir/question_generator


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
assert device == torch.device('cuda'), "Not using CUDA. Set: Runtime > Change runtime type > Hardware Accelerator: GPU"

dataset = datasets.load_dataset("ehsanul007/IAmA-question-generator")
qg = QGGenerator()

Downloading and preparing dataset csv/ehsanul007--IAmA-question-generator to /Users/ehsanulkabir/.cache/huggingface/datasets/ehsanul007___csv/ehsanul007--IAmA-question-generator-8b5b8739bed0e292/0.0.0/6954658bab30a358235fa864b05cf819af0e179325c740e4bc853bcc7ec513e1...


Downloading data: 100%|██████████| 5.02M/5.02M [00:00<00:00, 22.6MB/s]
Downloading data: 100%|██████████| 1.15M/1.15M [00:00<00:00, 10.8MB/s]
Downloading data: 100%|██████████| 4.96M/4.96M [00:00<00:00, 22.6MB/s]
Downloading data files: 100%|██████████| 2/2 [00:01<00:00,  1.37it/s]
Extracting data files: 100%|██████████| 2/2 [00:00<00:00, 965.21it/s]
                                                             

Dataset csv downloaded and prepared to /Users/ehsanulkabir/.cache/huggingface/datasets/ehsanul007___csv/ehsanul007--IAmA-question-generator-8b5b8739bed0e292/0.0.0/6954658bab30a358235fa864b05cf819af0e179325c740e4bc853bcc7ec513e1. Subsequent calls will reuse this data.


100%|██████████| 2/2 [00:00<00:00, 993.09it/s]


In [12]:
# load sample values from dataset
text = dataset["train"]["text"][:10]

In [11]:
qg.generate(text)

['<topic> Vegan lifestyle. <context> I am a VIP Host at a Las Vegas Strip Club! A strip club is a venue where strippers provide adult entertainment, predominantly in the form of striptease or other erotic dances. Strip clubs typically adopt a nightclub or bar style, and can also adopt a theatre or cabaret-style. American-style strip clubs began to appear outside North America after World War II, arriving in Asia in the late 1980s and Europe in 1978, where they competed against the local English and French styles of striptease and erotic performances.',
 "<topic> DLC policy <context> We are Haemimont Games, Developers of Tropico and Surviving Mars. Haemimont Games AD is a Bulgarian video game developer founded by Gabriel Dobrev in September 1997 and based in Sofia. The company primarily focuses on producing simulation, city-building and ancient history strategy games but has developed titles in the role-playing genre as well. It has about 60 employees, making it Bulgaria's largest video

In [None]:
qg = QuestionGenerator()

with open('articles/indian_matchmaking.txt', 'r') as a:
    article = a.read()

HBox(children=(FloatProgress(value=0.0, description='Downloading', max=1208.0, style=ProgressStyle(description…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=791656.0, style=ProgressStyle(descripti…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=39.0, style=ProgressStyle(description_w…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=121.0, style=ProgressStyle(description_…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=25.0, style=ProgressStyle(description_w…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=891612736.0, style=ProgressStyle(descri…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=482.0, style=ProgressStyle(description_…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=213450.0, style=ProgressStyle(descripti…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=112.0, style=ProgressStyle(description_…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=49.0, style=ProgressStyle(description_w…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=433295505.0, style=ProgressStyle(descri…




Now We can call `QuestionGenerator`'s `generate()` method. We can choose an answer style from `['all', 'sentences', 'multiple_choice']`. 

You can choose how many questions you want to generate by setting `num_questions`. Note that the quality of questions may decrease if `num_questions` is high.

If you just want to print the questions without showing the answers, you can optionally set `show_answers=False` when calling `print_qa()`.

In [None]:
qa_list = qg.generate(
    article, 
    num_questions=10, 
    answer_style='all'
)
print_qa(qa_list)

Generating questions...

Evaluating QA pairs...

1) Q: What would have been offended if Sima Aunty spoke about?
   A: In fact, I would have been offended if Sima Aunty was woke and spoke about choice, body positivity and clean energy during matchmaking. 

2) Q: What does she think of Indian Matchmaking?
   A: " Ms Vetticad describes Indian Matchmaking as "occasionally insightful" and says "parts of it are hilarious because Ms Taparia's clients are such characters and she herself is so unaware of her own regressive mindset". 

3) Q: What do parents do to find a suitable match?
   A: Parents also trawl through matrimonial columns in newspapers to find a suitable match for their children. 

4) Q: In what country does Sima taparia try to find suitable matches for her wealthy clients?
   A: 1. Sima Aunty 
      2. US (correct)
      3. Delhi 
      4. Netflix 

5) Q: What is the reason why she is being called out?
   A: No wonder, then, that critics have called her out on social media for p