# Question Generator example


First we need to install HuggingFace's transformers library.

In [None]:
!pip install transformers

Collecting transformers
[?25l  Downloading https://files.pythonhosted.org/packages/27/3c/91ed8f5c4e7ef3227b4119200fc0ed4b4fd965b1f0172021c25701087825/transformers-3.0.2-py3-none-any.whl (769kB)
[K     |████████████████████████████████| 778kB 2.8MB/s 
Collecting sacremoses
[?25l  Downloading https://files.pythonhosted.org/packages/7d/34/09d19aff26edcc8eb2a01bed8e98f13a1537005d31e95233fd48216eed10/sacremoses-0.0.43.tar.gz (883kB)
[K     |████████████████████████████████| 890kB 16.2MB/s 
Collecting tokenizers==0.8.1.rc1
[?25l  Downloading https://files.pythonhosted.org/packages/40/d0/30d5f8d221a0ed981a186c8eb986ce1c94e3a6e87f994eae9f4aa5250217/tokenizers-0.8.1rc1-cp36-cp36m-manylinux1_x86_64.whl (3.0MB)
[K     |████████████████████████████████| 3.0MB 13.9MB/s 
Collecting sentencepiece!=0.1.92
[?25l  Downloading https://files.pythonhosted.org/packages/d4/a4/d0a884c4300004a78cca907a6ff9a5e9fe4f090f5d95ab341c53d28cbc58/sentencepiece-0.1.91-cp36-cp36m-manylinux1_x86_64.whl (1.1MB)
[K 

Next we have to clone the github repo and import `questiongenerator`:

In [None]:
!git clone https://github.com/amontgomerie/question_generator/

Cloning into 'question_generator'...
remote: Enumerating objects: 80, done.[K
remote: Counting objects: 100% (80/80), done.[K
remote: Compressing objects: 100% (77/77), done.[K
remote: Total 80 (delta 36), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (80/80), done.


In [2]:
!pip install spacy

Collecting spacy
  Downloading spacy-3.0.5-cp38-cp38-win_amd64.whl (11.8 MB)
Collecting wasabi<1.1.0,>=0.8.1
  Downloading wasabi-0.8.2-py3-none-any.whl (23 kB)
Collecting murmurhash<1.1.0,>=0.28.0
  Downloading murmurhash-1.0.5-cp38-cp38-win_amd64.whl (21 kB)
Collecting blis<0.8.0,>=0.4.0
  Downloading blis-0.7.4-cp38-cp38-win_amd64.whl (6.5 MB)
Collecting pathy>=0.3.5
  Downloading pathy-0.4.0-py3-none-any.whl (36 kB)
Collecting spacy-legacy<3.1.0,>=3.0.0
  Downloading spacy_legacy-3.0.1-py2.py3-none-any.whl (7.0 kB)
Collecting preshed<3.1.0,>=3.0.2
  Downloading preshed-3.0.5-cp38-cp38-win_amd64.whl (112 kB)
Collecting catalogue<2.1.0,>=2.0.1
  Downloading catalogue-2.0.1-py3-none-any.whl (9.6 kB)
Collecting srsly<3.0.0,>=2.4.0
  Downloading srsly-2.4.0-cp38-cp38-win_amd64.whl (451 kB)
Collecting pydantic<1.8.0,>=1.7.1
  Downloading pydantic-1.7.3-cp38-cp38-win_amd64.whl (1.8 MB)
Collecting typer<0.4.0,>=0.3.0
  Downloading typer-0.3.2-py3-none-any.whl (21 kB)
Collecting thinc<8.1.0

In [6]:
# %load questiongenerator.py
import os
import sys
import math
import numpy as np
import torch
import spacy
import re
import random
import json
import en_core_web_sm
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    AutoModelForSequenceClassification,
)


class QuestionGenerator:
    def __init__(self, model_dir=None):

        QG_PRETRAINED = "iarfmoose/t5-base-question-generator"
        self.ANSWER_TOKEN = "<answer>"
        self.CONTEXT_TOKEN = "<context>"
        self.SEQ_LENGTH = 512

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.qg_tokenizer = AutoTokenizer.from_pretrained(QG_PRETRAINED, use_fast=False)
        self.qg_model = AutoModelForSeq2SeqLM.from_pretrained(QG_PRETRAINED)
        self.qg_model.to(self.device)

        self.qa_evaluator = QAEvaluator(model_dir)

    def generate(
        self, article, use_evaluator=True, num_questions=None, answer_style="all"
    ):

        print("Generating questions...\n")

        qg_inputs, qg_answers = self.generate_qg_inputs(article, answer_style)
        generated_questions = self.generate_questions_from_inputs(qg_inputs)

        message = "{} questions doesn't match {} answers".format(
            len(generated_questions), len(qg_answers)
        )
        assert len(generated_questions) == len(qg_answers), message

        if use_evaluator:

            print("Evaluating QA pairs...\n")

            encoded_qa_pairs = self.qa_evaluator.encode_qa_pairs(
                generated_questions, qg_answers
            )
            scores = self.qa_evaluator.get_scores(encoded_qa_pairs)
            if num_questions:
                qa_list = self._get_ranked_qa_pairs(
                    generated_questions, qg_answers, scores, num_questions
                )
            else:
                qa_list = self._get_ranked_qa_pairs(
                    generated_questions, qg_answers, scores
                )

        else:
            print("Skipping evaluation step.\n")
            qa_list = self._get_all_qa_pairs(generated_questions, qg_answers)

        return qa_list

    def generate_qg_inputs(self, text, answer_style):

        VALID_ANSWER_STYLES = ["all", "sentences", "multiple_choice"]

        if answer_style not in VALID_ANSWER_STYLES:
            raise ValueError(
                "Invalid answer style {}. Please choose from {}".format(
                    answer_style, VALID_ANSWER_STYLES
                )
            )

        inputs = []
        answers = []

        if answer_style == "sentences" or answer_style == "all":
            segments = self._split_into_segments(text)
            for segment in segments:
                sentences = self._split_text(segment)
                prepped_inputs, prepped_answers = self._prepare_qg_inputs(
                    sentences, segment
                )
                inputs.extend(prepped_inputs)
                answers.extend(prepped_answers)

        if answer_style == "multiple_choice" or answer_style == "all":
            sentences = self._split_text(text)
            prepped_inputs, prepped_answers = self._prepare_qg_inputs_MC(sentences)
            inputs.extend(prepped_inputs)
            answers.extend(prepped_answers)

        return inputs, answers

    def generate_questions_from_inputs(self, qg_inputs):
        generated_questions = []

        for qg_input in qg_inputs:
            question = self._generate_question(qg_input)
            generated_questions.append(question)

        return generated_questions

    def _split_text(self, text):
        MAX_SENTENCE_LEN = 128

        sentences = re.findall(".*?[.!\?]", text)

        cut_sentences = []
        for sentence in sentences:
            if len(sentence) > MAX_SENTENCE_LEN:
                cut_sentences.extend(re.split("[,;:)]", sentence))
        # temporary solution to remove useless post-quote sentence fragments
        cut_sentences = [s for s in sentences if len(s.split(" ")) > 5]
        sentences = sentences + cut_sentences

        return list(set([s.strip(" ") for s in sentences]))

    def _split_into_segments(self, text):
        MAX_TOKENS = 490

        paragraphs = text.split("\n")
        tokenized_paragraphs = [
            self.qg_tokenizer(p)["input_ids"] for p in paragraphs if len(p) > 0
        ]

        segments = []
        while len(tokenized_paragraphs) > 0:
            segment = []
            while len(segment) < MAX_TOKENS and len(tokenized_paragraphs) > 0:
                paragraph = tokenized_paragraphs.pop(0)
                segment.extend(paragraph)
            segments.append(segment)
        return [self.qg_tokenizer.decode(s) for s in segments]

    def _prepare_qg_inputs(self, sentences, text):
        inputs = []
        answers = []

        for sentence in sentences:
            qg_input = "{} {} {} {}".format(
                self.ANSWER_TOKEN, sentence, self.CONTEXT_TOKEN, text
            )
            inputs.append(qg_input)
            answers.append(sentence)

        return inputs, answers

    def _prepare_qg_inputs_MC(self, sentences):

        spacy_nlp = en_core_web_sm.load()
        docs = list(spacy_nlp.pipe(sentences, disable=["parser"]))
        inputs_from_text = []
        answers_from_text = []

        for i in range(len(sentences)):
            entities = docs[i].ents
            if entities:
                for entity in entities:
                    qg_input = "{} {} {} {}".format(
                        self.ANSWER_TOKEN, entity, self.CONTEXT_TOKEN, sentences[i]
                    )
                    answers = self._get_MC_answers(entity, docs)
                    inputs_from_text.append(qg_input)
                    answers_from_text.append(answers)

        return inputs_from_text, answers_from_text

    def _get_MC_answers(self, correct_answer, docs):

        entities = []
        for doc in docs:
            entities.extend([{"text": e.text, "label_": e.label_} for e in doc.ents])

        # remove duplicate elements
        entities_json = [json.dumps(kv) for kv in entities]
        pool = set(entities_json)
        num_choices = (
            min(4, len(pool)) - 1
        )  # -1 because we already have the correct answer

        # add the correct answer
        final_choices = []
        correct_label = correct_answer.label_
        final_choices.append({"answer": correct_answer.text, "correct": True})
        pool.remove(
            json.dumps({"text": correct_answer.text, "label_": correct_answer.label_})
        )

        # find answers with the same NER label
        matches = [e for e in pool if correct_label in e]

        # if we don't have enough then add some other random answers
        if len(matches) < num_choices:
            choices = matches
            pool = pool.difference(set(choices))
            choices.extend(random.sample(pool, num_choices - len(choices)))
        else:
            choices = random.sample(matches, num_choices)

        choices = [json.loads(s) for s in choices]
        for choice in choices:
            final_choices.append({"answer": choice["text"], "correct": False})
        random.shuffle(final_choices)
        return final_choices

    def _generate_question(self, qg_input):
        self.qg_model.eval()
        encoded_input = self._encode_qg_input(qg_input)
        with torch.no_grad():
            output = self.qg_model.generate(input_ids=encoded_input["input_ids"])
        question = self.qg_tokenizer.decode(output[0], skip_special_tokens=True)
        return question

    def _encode_qg_input(self, qg_input):
        return self.qg_tokenizer(
            qg_input,
            padding='max_length',
            max_length=self.SEQ_LENGTH,
            truncation=True,
            return_tensors="pt",
        ).to(self.device)

    def _get_ranked_qa_pairs(
        self, generated_questions, qg_answers, scores, num_questions=10
    ):
        if num_questions > len(scores):
            num_questions = len(scores)
            print(
                "\nWas only able to generate {} questions. For more questions, please input a longer text.".format(
                    num_questions
                )
            )

        qa_list = []
        for i in range(num_questions):
            index = scores[i]
            qa = self._make_dict(
                generated_questions[index].split("?")[0] + "?", qg_answers[index]
            )
            qa_list.append(qa)
        return qa_list

    def _get_all_qa_pairs(self, generated_questions, qg_answers):
        qa_list = []
        for i in range(len(generated_questions)):
            qa = self._make_dict(
                generated_questions[i].split("?")[0] + "?", qg_answers[i]
            )
            qa_list.append(qa)
        return qa_list

    def _make_dict(self, question, answer):
        qa = {}
        qa["question"] = question
        qa["answer"] = answer
        return qa


class QAEvaluator:
    def __init__(self, model_dir=None):

        QAE_PRETRAINED = "iarfmoose/bert-base-cased-qa-evaluator"
        self.SEQ_LENGTH = 512

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.qae_tokenizer = AutoTokenizer.from_pretrained(QAE_PRETRAINED)
        self.qae_model = AutoModelForSequenceClassification.from_pretrained(
            QAE_PRETRAINED
        )
        self.qae_model.to(self.device)

    def encode_qa_pairs(self, questions, answers):
        encoded_pairs = []
        for i in range(len(questions)):
            encoded_qa = self._encode_qa(questions[i], answers[i])
            encoded_pairs.append(encoded_qa.to(self.device))
        return encoded_pairs

    def get_scores(self, encoded_qa_pairs):
        scores = {}
        self.qae_model.eval()
        with torch.no_grad():
            for i in range(len(encoded_qa_pairs)):
                scores[i] = self._evaluate_qa(encoded_qa_pairs[i])

        return [
            k for k, v in sorted(scores.items(), key=lambda item: item[1], reverse=True)
        ]

    def _encode_qa(self, question, answer):
        if type(answer) is list:
            for a in answer:
                if a["correct"]:
                    correct_answer = a["answer"]
        else:
            correct_answer = answer
        return self.qae_tokenizer(
            text=question,
            text_pair=correct_answer,
            padding="max_length",
            max_length=self.SEQ_LENGTH,
            truncation=True,
            return_tensors="pt",
        )

    def _evaluate_qa(self, encoded_qa_pair):
        output = self.qae_model(**encoded_qa_pair)
        return output[0][0][1]


def print_qa(qa_list, show_answers=True):
    for i in range(len(qa_list)):
        space = " " * int(np.where(i < 9, 3, 4))  # wider space for 2 digit q nums

        print("{}) Q: {}".format(i + 1, qa_list[i]["question"]))

        answer = qa_list[i]["answer"]

        # print a list of multiple choice answers
        if type(answer) is list:

            if show_answers:
                print(
                    "{}A: 1.".format(space),
                    answer[0]["answer"],
                    np.where(answer[0]["correct"], "(correct)", ""),
                )
                for j in range(1, len(answer)):
                    print(
                        "{}{}.".format(space + "   ", j + 1),
                        answer[j]["answer"],
                        np.where(answer[j]["correct"] == True, "(correct)", ""),
                    )

            else:
                print("{}A: 1.".format(space), answer[0]["answer"])
                for j in range(1, len(answer)):
                    print("{}{}.".format(space + "   ", j + 1), answer[j]["answer"])
            print("")

        # print full sentence answers
        else:
            if show_answers:
                print("{}A:".format(space), answer, "\n")


Make sure that we're using the GPU:

In [8]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
assert device == torch.device('cuda'), "Not using CUDA. Set: Runtime > Change runtime type > Hardware Accelerator: GPU"

AssertionError: Not using CUDA. Set: Runtime > Change runtime type > Hardware Accelerator: GPU


Now we can create a `QuestionGenerator` and feed it some text. We are going to use a BBC article about Twitter getting hacked.

The models should be automatically loaded when instantiating the `QuestionGenerator` class, but if you have them saved somewhere else you can pass the path to the folder containing them as an argument like `QuestionGenerator(MODEL_DIR)`.

In [16]:
qg = QuestionGenerator()


In [17]:
with open('articles/owl_rescue.txt', 'r') as a:
    article = a.read()

Now We can call `QuestionGenerator`'s `generate()` method. We can choose an answer style from `['all', 'sentences', 'multiple_choice']`. 

You can choose how many questions you want to generate by setting `num_questions`. Note that the quality of questions may decrease if `num_questions` is high.

If you just want to print the questions without showing the answers, you can optionally set `show_answers=False` when calling `print_qa()`.

In [15]:
qa_list = qg.generate(
    article, 
    num_questions=1, 
    answer_style='sentences'
)
print_qa(qa_list)

Generating questions...

Evaluating QA pairs...

1) Q: How many people were involved in the rescue?
   A: </s> The eagle owl rescue involved a team of 12 firefighters, plus a six-member volunteer technical team and two staff from the nearby bat centre, the fire service reported (in German). 

