# Setup

Install the requirements needed to execute this notebook.

In [None]:
!pip install datasets=='1.9.0' 
!pip install transformers
!pip install pytorch-lightning
!pip install git+https://github.com/Maluuba/nlg-eval.git@master #evaluation package

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting huggingface-hub<0.1.0
  Using cached huggingface_hub-0.0.19-py3-none-any.whl (56 kB)
Installing collected packages: huggingface-hub
  Attempting uninstall: huggingface-hub
    Found existing installation: huggingface-hub 0.7.0
    Uninstalling huggingface-hub-0.7.0:
      Successfully uninstalled huggingface-hub-0.7.0
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
transformers 4.19.2 requires huggingface-hub<1.0,>=0.1.0, but you have huggingface-hub 0.0.19 which is incompatible.
Successfully installed huggingface-hub-0.0.19
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting huggingface-hub<1.0,>=0.1.0
  Using cached huggingface_hub-0.7.0-py3-none-any.whl (86 kB)
Installing collected pa

Import the required Python packages.

In [None]:
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torch import LongTensor
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning import LightningDataModule, LightningModule, Trainer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from datasets import load_dataset
from argparse import ArgumentParser
from nlgeval import compute_individual_metrics, NLGEval
from google.colab import files, drive
from os import listdir, makedirs
from os.path import isfile, join
from string import punctuation
from re import sub
import json
import copy

Set the path to the folder containing the checkpoints and the test dataset. In this case the folder lives on Google Drive, which is mounted below, but the path can also point to a folder in the local session.

In [None]:
drive.mount('/content/drive')
folder_path = 'drive/MyDrive/Models tfg/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Model hyperparameters

The global variables of the notebook will be defined in this section.

The next cell specifies whether training, testing, or both should be performed.

In [None]:
TRAIN_MODEL = True
TEST_MODEL = True

The hyperparameters that are relevant for any model are listed in the following cell. The *PRETRAINED_MODEL* variable indicates which pre-trained model will be used. The available models can be found at *huggingface.co*. Models that have been tried include:

*   'facebook/bart-base'
*   't5-small'
*   't5-base'
*   't5-efficient-base-n14'

In [None]:
PRETRAINED_MODEL = 't5-small'
HIGHLIGHT_TOKEN = '[HL]' #token used to highlight the answer inside the context.
LEARNING_RATE = 0.0001
MAX_INPUT_LENGTH = 512
MAX_LABEL_LENGTH = 64

# Load and preprocess Datasets


The notebook will load the training datasets and perform a basic preprocessing of the data in this section.

## Read datasets

Load the train and validation datasets.

In [None]:
datasets = load_dataset("squad")

Downloading and preparing dataset squad/plain_text (download: 33.51 MiB, generated: 85.63 MiB, post-processed: Unknown size, total: 119.14 MiB) to /root/.cache/huggingface/datasets/squad/plain_text/1.0.0/6b6c4172d0119c74515f44ea0b8262efe4897f2ddb6613e5e915840fdc309c16...


Dataset squad downloaded and prepared to /root/.cache/huggingface/datasets/squad/plain_text/1.0.0/6b6c4172d0119c74515f44ea0b8262efe4897f2ddb6613e5e915840fdc309c16. Subsequent calls will reuse this data.


Load the test dataset from the folder_path.

In [None]:
with open(folder_path + 'test-v2.SQuAD.json', encoding='utf-8') as data:
    test = json.load(data)['data']

The datasets are deeply nested, so the next functions flatten them.

The datasets will be converted from a JSON format to a list. Each element of the list is a dataset entry, which is a dictionary with the keys 'context', 'answers' and 'question'. The 'answers' value is itself a dictionary holding the answer text and its start position ('answer_start') inside the context. Questions without an answer will be discarded.

As a result, the final format is as follows:



> **[{'context':context1, 'answers':answers1, 'question':question1}, {'context':context2, 'answers':answers2, 'question':question2},...]**
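As a rough illustration of this flattening, the sketch below applies it to a tiny hand-written record in the nested SQuAD JSON layout. The record contents and the `flatten` helper are made up for this example; the key names follow the code cells of this notebook.

```python
# Toy record in the nested SQuAD JSON layout (contents are made up).
raw = [{
    "title": "Example",
    "paragraphs": [{
        "context": "Ada Colau, the current mayor of Barcelona, was born in 1974.",
        "qas": [
            {"question": "Who is the current mayor of Barcelona?",
             "answers": [{"text": "Ada Colau", "answer_start": 0}]},
            {"question": "Who is the mayor of Madrid?",
             "answers": []},  # unanswerable: dropped below
        ],
    }],
}]

def flatten(dataset):
    """Return one flat dict per answerable question (hypothetical helper)."""
    return [
        {"context": p["context"],
         "answers": q["answers"][0],
         "question": q["question"]}
        for article in dataset
        for p in article["paragraphs"]
        for q in p["qas"]
        if q["answers"]
    ]

flat = flatten(raw)
print(flat)
```

Only the answerable question survives, and its entry carries the context, the first answer and the question side by side.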




In [None]:
# Transform a SQuAD training or validation dataset from the original format to
# a list where each entry looks like:
#             {'context':context1, 'answers':answers1, 'question':question1}
# Only the first annotated answer of each question is kept.
def transform_dataset_format(dataset):
    dataset_aux = []
    for data in dataset:
        # data['answers'] is a dict of lists, so check the list of texts.
        if len(data['answers']['text']) > 0:
            dataset_aux.append({
                'context': data['context'],
                'answers': {
                    'answer_start': data['answers']['answer_start'][0],
                    'text': data['answers']['text'][0]
                },
                'question': data['question']
            })
    return dataset_aux

# Transform a SQuAD test dataset from the original format to
# a list where each entry looks like:
#             {'context':context1, 'answers':answers1, 'question':question1}
def transform_dataset_format_test(dataset):
    dataset_aux = []
    for text in dataset:
        for paragraph in text['paragraphs']:
            for question in paragraph['qas']:
                if len(question['answers']) > 0:
                    dataset_aux.append({
                        'context': paragraph['context'],
                        'answers': question['answers'][0],
                        'question': question['question']
                        })

    return dataset_aux

The next cell transforms the datasets' formats.

In [None]:
train_dataset = transform_dataset_format(datasets['train'])
val_dataset = transform_dataset_format(datasets['validation'])
test_dataset = transform_dataset_format_test(test)

The main objective of the SQuAD dataset is to train question answering models, not to train question generation models. For this reason, there is a single question for each entry in the dataset.

This can pose a problem when evaluating the model. For instance, suppose the dataset entry is:


> * **context**: *Ada Colau, the current mayor of Barcelona, was born in 1974.*
> * **answer**: *Ada Colau*
> * **expected_question**: *Who is the current mayor of Barcelona?*

But let the question obtained from the model be:

> * **obtained_question**: *Who was born in 1974?*

Although the obtained question is also correct, it is different from the expected one. Therefore, a metric that only compares these two sentences will give a low value, which is undesired.

To avoid this, the next function builds a set of related questions for each entry of the test dataset by looking, within the same paragraph, for other questions that share at least one answer with it. This way, each dataset entry has multiple reference questions to compare against.
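To see why extra references help, here is a minimal sketch that scores a generated question against one reference and then against two. The token-overlap F1 used here is only a stand-in chosen for this illustration; it is not the metric computed by nlg-eval, and the helper names are hypothetical.

```python
from collections import Counter

def token_f1(hypothesis, reference):
    """Token-overlap F1 between two sentences (a simple stand-in metric)."""
    hyp, ref = hypothesis.lower().split(), reference.lower().split()
    overlap = sum((Counter(hyp) & Counter(ref)).values())
    if overlap == 0:
        return 0.0
    precision, recall = overlap / len(hyp), overlap / len(ref)
    return 2 * precision * recall / (precision + recall)

def best_score(hypothesis, references):
    """Score against every reference and keep the best match."""
    return max(token_f1(hypothesis, ref) for ref in references)

generated = "Who was born in 1974?"
single = ["Who is the current mayor of Barcelona?"]
multiple = single + ["Who was born in 1974?"]

print(best_score(generated, single))    # low: only "who" overlaps
print(best_score(generated, multiple))  # 1.0: one reference matches exactly
```

With a single reference the valid question is penalized; adding the related question as a second reference lets it reach the top score.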












In [None]:
# Function that returns a list, where each entry is a set of related questions
# to the corresponding entry in the test dataset. The test dataset needs to be 
# in its original format.
def get_related_questions(dataset):

    related_questions = []

    for text in dataset:
        for paragraph in text['paragraphs']:

            # For each entry in the dataset.
            for question in paragraph['qas']:

                # If the question has an answer.
                if len(question['answers']) > 0:

                    # Add the question to the related_questions set of the current entry.
                    questions = {question['question']}

                    # Save the possible answers to the question.
                    answers = { answer['text'] 
                               for answer in question['answers']}

                    for question2 in paragraph['qas']:
                        # Compare with the other entries in the same paragraph.

                        # Save the possible answers for the second question.
                        if len(question2['answers']) > 0:
                            answers2 = { answer['text'] 
                                        for answer in question2['answers']}
                        else:
                            answers2 = {answer['text'] 
                                        for answer in question2['plausible_answers']} 

                        # If the second question shares an answer with the 
                        # first question, add the second question to the 
                        # related_questions set of the entry.
                        if len( set(answers) & set(answers2))>0:
                            questions.add(question2['question'])

                    related_questions.append(questions)

    return related_questions

Execute the above function.

In [None]:
related_questions = get_related_questions(test)
for i in range(len(test_dataset)):
  test_dataset[i]['question'] = list(related_questions[i])


For instance, the next cell shows an example of a train dataset entry.

In [None]:
train_dataset[0]

{'answers': {'answer_start': 515, 'text': 'Saint Bernadette Soubirous'},
 'context': 'Architecturally, the school has a Catholic character. Atop the Main Building\'s gold dome is a golden statue of the Virgin Mary. Immediately in front of the Main Building and facing it, is a copper statue of Christ with arms upraised with the legend "Venite Ad Me Omnes". Next to the Main Building is the Basilica of the Sacred Heart. Immediately behind the basilica is the Grotto, a Marian place of prayer and reflection. It is a replica of the grotto at Lourdes, France where the Virgin Mary reputedly appeared to Saint Bernadette Soubirous in 1858. At the end of the main drive (and in a direct line that connects through 3 statues and the Gold Dome), is a simple, modern stone statue of Mary.',
 'question': 'To whom did the Virgin Mary allegedly appear in 1858 in Lourdes France?'}

## Tokenize Datasets

At this point, the entries of the datasets are dictionaries of strings in the desired format, but those strings still need to be tokenized. The answer must also be highlighted inside the context using the HIGHLIGHT_TOKEN.
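As a quick illustration of what a highlighted context looks like, the sketch below wraps an answer in the highlight token. The `highlight` helper is hypothetical: it locates the answer with `str.find`, whereas the notebook's own function relies on the 'answer_start' offset stored in each entry.

```python
HIGHLIGHT_TOKEN = '[HL]'  # same value as in the hyperparameters cell

def highlight(context, answer_text):
    """Wrap the first occurrence of answer_text in highlight tokens."""
    start = context.find(answer_text)
    if start == -1:
        raise ValueError('answer not found in context')
    end = start + len(answer_text)
    return (context[:start] + HIGHLIGHT_TOKEN + answer_text
            + HIGHLIGHT_TOKEN + context[end:])

context = 'Ada Colau, the current mayor of Barcelona, was born in 1974.'
highlighted = highlight(context, 'Ada Colau')
print(highlighted)
# [HL]Ada Colau[HL], the current mayor of Barcelona, was born in 1974.
```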

First, a function is needed that returns a tokenizer. The new token that highlights the answer inside the context must be added to it.

In [None]:
def prepare_tokenizer(pretrained_model):
    tokenizer = AutoTokenizer.from_pretrained(pretrained_model, model_max_length=MAX_INPUT_LENGTH)
    tokenizer.add_tokens([HIGHLIGHT_TOKEN], special_tokens=True)
    return tokenizer

The next cell is used to define the tokenize_database function, which tokenizes the given database and leaves it ready to be the input of the transformer.

In [None]:
# Highlight the answer inside the context using the HIGHLIGHT_TOKEN.
def highlight_answer(context, answer):
    answer_first_character = answer['answer_start']
    answer_last_character = answer_first_character + len(answer['text'])

    highlighted_context = context[:answer_first_character]
    highlighted_context = (highlighted_context + 
                           HIGHLIGHT_TOKEN + answer['text'] + HIGHLIGHT_TOKEN)
    highlighted_context = highlighted_context + context[answer_last_character:]

    return highlighted_context

# This function retokenizes a pad_token, i.e. if the given token is equal
# to the pad_token, it returns the given value. If any other token is given,
# it returns the token without any modifications.
def retokenize_pad_token( token, pad_token, value):
    if token != pad_token:
      return token
    else:
      return value

# For the given highlighted_context and label, it prepares the tokenized
# input for the transformer.
def tokenize_input_train(tokenizer, highlighted_context, label):

    # Prepare the input of the transformer.
    input = tokenizer(highlighted_context, 
                      padding='max_length',
                      max_length=MAX_INPUT_LENGTH, 
                      truncation=True, 
                      add_special_tokens=False)

    # Prepare the target of the transformer.
    target = tokenizer(label, 
                       padding='max_length',
                       max_length=MAX_LABEL_LENGTH,
                       truncation=True,
                       add_special_tokens=False)
    
    # Retokenize the padding tokens.
    tokenized_labels = [ retokenize_pad_token(token, tokenizer.pad_token_id, -100) 
                          for token in target['input_ids']]
    
    return {
        'input_ids': input['input_ids'],
        'attention_mask': input['attention_mask'],
        'labels': tokenized_labels
    }

# For the given highlighted_context, it prepares the tokenized
# input for the transformer.
def tokenize_input_test(tokenizer, highlighted_context):

    # Prepare the input of the transformer.
    input = tokenizer(highlighted_context, 
                      padding= False,
                      max_length=MAX_INPUT_LENGTH, 
                      truncation=True, 
                      add_special_tokens=False)

    return {
        'input_ids': input['input_ids'],
        'attention_mask': input['attention_mask']
    }

# Given a dict of tokenized inputs for the transformers, it converts them
# into torch tensors.
def convert_input_to_tensor(input):
    for key in input.keys():
        input[key] = LongTensor(input[key])

# Given an entry of a dataset, it tokenizes it. Depending on the is_test
# parameter, a tokenized label or an untokenized question will be returned.
def tokenize_entry(entry, is_test, tokenizer):

    highlighted_context = highlight_answer(entry['context'], entry['answers'])
    
    if not is_test:
        input = tokenize_input_train(tokenizer,
                                     highlighted_context=highlighted_context,
                                     label=entry['question'] + tokenizer.eos_token)
        

        convert_input_to_tensor(input)

        return {'input_ids': input['input_ids'], 
                'attention_mask': input['attention_mask'],
                'labels': input['labels']}
    else:
        input = tokenize_input_test(tokenizer,
                                    highlighted_context=highlighted_context)

        convert_input_to_tensor(input)

        return {'input_ids': input['input_ids'], 
                'attention_mask': input['attention_mask'],
                'question': entry['question']}

# Given a database, it tokenizes each of its entries.
def tokenize_database(database, is_test, tokenizer):
    tokenized_database = []
    for i in range(len(database)):
        tokenized_database.append(tokenize_entry(database[i], is_test, tokenizer))
    return tokenized_database
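
The label handling above can be illustrated without a real tokenizer. The sketch below pads a list of label ids to a fixed length and then replaces the padding with -100, which is the default `ignore_index` of PyTorch's cross-entropy loss, so padded positions do not contribute to the loss. The token ids are arbitrary, `pad_and_mask` is a hypothetical helper, and the pad id of 0 is an assumption that holds for T5 but not for every model.

```python
PAD_ID = 0           # assumed pad token id (T5 uses 0; other models differ)
IGNORE_INDEX = -100  # default ignore_index of PyTorch's cross-entropy loss

def pad_and_mask(label_ids, max_length):
    """Truncate or pad label ids to max_length, then mask the padding."""
    padded = (label_ids + [PAD_ID] * max_length)[:max_length]
    return [IGNORE_INDEX if tok == PAD_ID else tok for tok in padded]

masked = pad_and_mask([363, 19, 8, 1], max_length=6)
print(masked)
# [363, 19, 8, 1, -100, -100]
```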

Get a tokenizer for the pretrained model.

In [None]:
tokenizer = prepare_tokenizer(PRETRAINED_MODEL)

Execute the tokenization of the datasets.

In [None]:
tokenized_train_dataset = tokenize_database(train_dataset, False, tokenizer)
tokenized_val_dataset = tokenize_database(val_dataset, False, tokenizer)
tokenized_test_dataset = tokenize_database(test_dataset, True, tokenizer)

Example of a tokenized entry of the training set.

In [None]:
tokenized_train_dataset[0]

{'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0

# Pytorch lightning modules

To train, test and run inference on a model using the PyTorch Lightning Trainer, two modules must be built:



*   **Data Module** : This module performs the task of wrapping the access to the datasets, by storing them and providing methods that return their entries.
*   **Model Module** : This module is a wrapper for the different steps that the model needs to perform in each phase of training and test. For instance, it should provide the test_step(), training_step() or forward() functions.

Both modules are defined in the subsections below.



## Data Module


In this section, the data module will be defined.

First, the next class implements the `__getitem__` and `__len__` methods for the datasets.
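These two methods are all a map-style dataset needs in order to be indexed and batched. The sketch below shows the idea in plain Python; `ToyDataset` and `iterate_batches` are hypothetical names, and the manual batching loop only mimics, in a simplified way, how a loader walks the indices.

```python
class ToyDataset:
    """Minimal map-style dataset: supports len(ds) and ds[i]."""
    def __init__(self, entries):
        self.entries = entries

    def __getitem__(self, index):
        entry = self.entries[index]
        return [entry['input_ids'], entry['labels']]

    def __len__(self):
        return len(self.entries)

def iterate_batches(dataset, batch_size):
    """Yield lists of samples by walking the indices 0..len(dataset)-1."""
    for start in range(0, len(dataset), batch_size):
        stop = min(start + batch_size, len(dataset))
        yield [dataset[i] for i in range(start, stop)]

ds = ToyDataset([{'input_ids': [i, i + 1], 'labels': [i]} for i in range(5)])
batches = list(iterate_batches(ds, batch_size=2))
print(len(ds), len(batches))  # 5 3
```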

In [None]:
class SQuADDataLoader:
    def __init__(self, dataset:list, is_test):
        self.dataset = dataset
        self.is_test = is_test

    def __getitem__(self, index):
        if self.is_test:
            return [self.dataset[index]['input_ids'], 
            self.dataset[index]['attention_mask'], 
            self.dataset[index]['question']]
        else:
            return [self.dataset[index]['input_ids'], 
            self.dataset[index]['attention_mask'], 
            self.dataset[index]['labels']]

    def __len__(self):
        return len(self.dataset)


Finally, the next class defines the data module for the pytorch lightning trainer, using the class defined in the cell above to store the databases.

In [None]:
class SQuADDataModule(LightningDataModule):
    def __init__(self, train, val, test, batch_size):
        super().__init__()
        self.batch_size = batch_size

        self.train = SQuADDataLoader(train, False)
        self.val = SQuADDataLoader(val, False)
        self.test = SQuADDataLoader(test, True)

    def train_dataloader(self):
        return DataLoader(self.train, num_workers=8, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val,num_workers = 8, batch_size=self.batch_size, shuffle=False)

    def test_dataloader(self):
        return DataLoader(self.test,num_workers = 8, batch_size=1, shuffle=False)

## Model Module


In this section, the model module will be defined.

The next class computes the evaluation metrics for an obtained output against a set of expected outputs.

In [None]:
class evaluator:
    def __init__(self, eval):
        self.eval = eval
        self.total = 0                      # Total number of entries evaluated.
        self.total_metrics = {'Bleu_1': 0,  # Running sum of each metric.
          'Bleu_2': 0,
          'Bleu_3': 0,
          'Bleu_4': 0,
          'METEOR': 0,
          'ROUGE_L': 0} 
        self.total_exact_match = 0          # Number of exact matches.

    # It updates the total_metrics dict and total_exact_match 
    # from the given obtained and expected outputs.
    def update_total_metrics(self, obtained_output, expected_output):

        metrics_dict = self.nlg_eval(obtained_output, expected_output)
        exact_match = self.exact_match_score(obtained_output, expected_output)

        self.total += 1
        self.total_exact_match += exact_match
        # Accumulate only the tracked metrics; nlg-eval may return additional keys.
        for key in self.total_metrics.keys():
            self.total_metrics[key] += metrics_dict[key]

    # Return the averaged metrics stored in the class as percentages
    # (each running sum divided by the number of evaluated entries).
    def get_total_metrics(self):
        exact_match = 100.0 * self.total_exact_match / self.total

        total_metrics = copy.deepcopy(self.total_metrics)
        for key in self.total_metrics.keys():
            total_metrics[key] = 100.0 *  self.total_metrics[key] / self.total
        total_metrics['exact_match'] = exact_match
        
        return total_metrics  

    # Normalize a question by lower-casing it and removing articles,
    # punctuation and extra whitespace.
    def normalize_question(self, question):
        # Lower case
        normalized_question = question.lower()

        # Remove articles
        normalized_question = sub(r"\b(a|an|the)\b", " ", normalized_question)

        # Remove punctuation
        normalized_question = "".join(c for c in normalized_question 
                                      if c not in set(punctuation))

        # Remove redundant white spaces.
        normalized_question = " ".join(normalized_question.split())
        
        return normalized_question

    # Returns a metrics_dict containing the metrics that compare the 
    # obtained_output with the expected_output.
    def nlg_eval(self, obtained_output, expected_output):

        normalized_obtained_output = self.normalize_question(obtained_output)
        normalized_expected_output = []
        for question in expected_output:
            normalized_expected_output.append(self.normalize_question(question))

        metrics_dict = self.eval.compute_individual_metrics(normalized_expected_output, normalized_obtained_output)
        return metrics_dict

    # Returns 1 if the obtained_output exactly matches any expected_output, else 0.
    def exact_match_score(self, obtained_output, expected_output):
        for question in expected_output:
            if self.normalize_question(obtained_output) == self.normalize_question(question):
              return 1
        return 0
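
As a standalone illustration of the normalization and exact-match logic, the sketch below reproduces both steps as plain functions, outside the class. The function names and the sample questions are chosen for this example only.

```python
from re import sub
from string import punctuation

def normalize(question):
    """Lower-case, drop articles and punctuation, collapse whitespace."""
    text = question.lower()
    text = sub(r"\b(a|an|the)\b", " ", text)
    text = "".join(c for c in text if c not in punctuation)
    return " ".join(text.split())

def exact_match(obtained, references):
    """Return 1 if obtained equals any reference after normalization."""
    target = normalize(obtained)
    return int(any(target == normalize(ref) for ref in references))

print(normalize("Who is the current Mayor of Barcelona?"))
# who is current mayor of barcelona
print(exact_match("who is current mayor of Barcelona",
                  ["Who is the current mayor of Barcelona?"]))
# 1
```

Because both sides are normalized first, differences in casing, articles or the final question mark do not prevent an exact match.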

The next class implements the model module for the pytorch lightning trainer.

The test step generates several candidate questions for the given batch input, picks the candidate that scores best against the reference questions, and updates the evaluator with it. After testing has finished, the metrics are available via the get_metrics method.
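The selection of the best candidate can be sketched in plain Python. The score below is a simplified F-measure based on the longest common subsequence, in the spirit of ROUGE_L; it is an assumption made for this illustration and differs in its details from the nlg-eval implementation. All function names and sentences are hypothetical.

```python
def lcs_length(a, b):
    """Length of the longest common subsequence of two token lists."""
    table = [[0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i, tok_a in enumerate(a, start=1):
        for j, tok_b in enumerate(b, start=1):
            if tok_a == tok_b:
                table[i][j] = table[i - 1][j - 1] + 1
            else:
                table[i][j] = max(table[i - 1][j], table[i][j - 1])
    return table[-1][-1]

def lcs_f1(candidate, reference):
    """Simplified LCS-based F-measure between two sentences."""
    cand, ref = candidate.split(), reference.split()
    lcs = lcs_length(cand, ref)
    if lcs == 0:
        return 0.0
    precision, recall = lcs / len(cand), lcs / len(ref)
    return 2 * precision * recall / (precision + recall)

def pick_best(candidates, references):
    """Return the candidate with the highest score against any reference."""
    scores = [max(lcs_f1(c, r) for r in references) for c in candidates]
    return candidates[scores.index(max(scores))]

candidates = ["who was born in 1974",
              "when was ada colau born",
              "who is mayor of barcelona"]
references = ["who is current mayor of barcelona", "who was born in 1974"]
best = pick_best(candidates, references)
print(best)  # who was born in 1974
```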

In [None]:
class SQuADModel(LightningModule):
    def __init__(self, **kwargs):
        super().__init__()

        self.tokenizer = tokenizer

        self.model = AutoModelForSeq2SeqLM.from_pretrained(PRETRAINED_MODEL)
        self.model.resize_token_embeddings(len(self.tokenizer))
              
        self.evaluator = evaluator(NLGEval(no_glove=True,no_skipthoughts=True))
        

    def forward(self, input_ids,attention_mask=None,labels=None):
        return self.model(input_ids=input_ids,
                          attention_mask=attention_mask,
                          labels=labels,return_dict=True)
    
    def training_step(self, batch, batch_idx):
        outputs = self(batch[0],batch[1],batch[2])
        loss = outputs['loss']
        return loss
    
    def validation_step(self, batch, batch_idx):
        outputs = self(batch[0],batch[1],batch[2])
        loss = outputs['loss']
        self.log('dev_loss',loss)

    def test_step(self, batch, batch_idx):
        # Read the reference questions.
        ref_questions = [question[0] for question in batch[2]]

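        # Get the outputs for the given input (10 different outputs are returned).
        # Beam search is requested explicitly because greedy decoding cannot
        # return more than one sequence per input.
        sample_outputs = self.model.generate(
            input_ids =  batch[0],
            attention_mask = batch[1],
            max_length=MAX_INPUT_LENGTH,
            no_repeat_ngram_size=3,
            num_beams=10,
            num_return_sequences=10
        )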

        # Decode the outputs.
        decoded_questions = [self.tokenizer.decode(sample_output, skip_special_tokens=True)
                    for sample_output in sample_outputs]

        # Compare each output with the reference questions using the ROUGE_L metric.
        metrics = [self.evaluator.nlg_eval(decoded_question, ref_questions)['ROUGE_L']
                   for decoded_question in decoded_questions]

        # Choose the best question according to ROUGE_L.
        decoded_question = self.tokenizer.decode(sample_outputs[metrics.index(max(metrics))],
                                                skip_special_tokens=True)
        
        # Update the evaluator metrics using the chosen question.
        self.evaluator.update_total_metrics(decoded_question, ref_questions)
    
    def get_metrics(self):
        return self.evaluator.get_total_metrics()

    def configure_optimizers(self):
        return AdamW(self.parameters(), lr=LEARNING_RATE)  

# Define Training Parameters


The next cell defines the training parameters that the pytorch lightning trainer will use during the training of the model, as well as the checkpoint callback.

In [None]:
# Declare a parser.
parent_parser = ArgumentParser(add_help=False)
parent_parser = Trainer.add_argparse_args(parent_parser)
parser = ArgumentParser(parents=[parent_parser])

# Set arguments.
parser.set_defaults(
    deterministic=True,
    max_epochs=1, 
    gpus=1
)
args, extra = parser.parse_known_args()

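# Declare the checkpoint callback. The logged metric is 'dev_loss'; the file
# name embeds it as 'loss<value>' so that get_checkpoint_loss can parse it.
checkpoint_callback = ModelCheckpoint(
     monitor='dev_loss',
     dirpath= folder_path + 'checkpoints/',
     filename='squad-checkpoint--loss{dev_loss:.2f}--' + PRETRAINED_MODEL,
     auto_insert_metric_name=False,
)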

# Training

This section is used to perform the training of a model.

Declare the pytorch lightning trainer.

In [None]:
trainer = Trainer.from_argparse_args(args,
    callbacks=[checkpoint_callback]
    )    

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Declare the data and model modules.

In [None]:
model = SQuADModel()
data_module = SQuADDataModule(tokenized_train_dataset, tokenized_val_dataset, tokenized_test_dataset, batch_size=32)


Perform the training of the model. It is possible to train the model from a checkpoint using the ckpt_path parameter of the fit function.

In [None]:
if TRAIN_MODEL:
  trainer.fit(model, data_module)
  #trainer.fit(model, data_module, ckpt_path=folder_path + 'checkpoints/squad-questionanswer--epoch00--dev_loss1.36--t5-base.ckpt')

Missing logger folder: /content/lightning_logs
  rank_zero_warn(f"Checkpoint directory {dirpath} exists and is not empty.")
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type                       | Params
-----------------------------------------------------
0 | model | T5ForConditionalGeneration | 60.5 M
-----------------------------------------------------
60.5 M    Trainable params
0         Non-trainable params
60.5 M    Total params
241.971   Total estimated model params size (MB)



# Test

This section is used to perform the testing of a model.

Find the path to the best checkpoint (according to the dev loss monitor) for the given pre-trained model.
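The selection can be sketched on a few file names before touching the real folder. The names below are hypothetical and only follow the `<prefix>--loss<value>--<model>.ckpt` pattern that the parsing functions of this section expect; `parse_checkpoint` is a helper written for this example.

```python
def parse_checkpoint(name):
    """Split '<prefix>--loss<value>--<model>.ckpt' into (loss, model)."""
    _, loss_part, model_part = name.split('--')
    loss = float(loss_part.split('loss')[1])
    model = model_part.rsplit('.', 1)[0]
    return loss, model

# Hypothetical file names, for illustration only.
names = [
    'squad-checkpoint--loss1.52--t5-small.ckpt',
    'squad-checkpoint--loss1.36--t5-small.ckpt',
    'squad-checkpoint--loss1.20--t5-base.ckpt',
]
wanted = 't5-small'
matching = [n for n in names if parse_checkpoint(n)[1] == wanted]
best = min(matching, key=lambda n: parse_checkpoint(n)[0])
print(best)  # squad-checkpoint--loss1.36--t5-small.ckpt
```

The checkpoint with the lowest loss overall belongs to a different model, so it is filtered out before the minimum is taken.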

In [None]:
# Given a checkpoint name, it returns its loss.
def get_checkpoint_loss(checkpoint_name):
  loss_text = checkpoint_name.split("--")[1]
  loss = loss_text.split("loss")[1]
  return float(loss)

# Given a checkpoint name, it returns the pre-trained model used.
def get_checkpoint_model(checkpoint_name):
  model_text = checkpoint_name.split("--")[2]
  model = model_text.split(".")[0]
  return model

# Get the list of stored checkpoints.
checkpoints = [checkpoint 
               for checkpoint 
               in listdir(folder_path + 'checkpoints') 
               if isfile(join(folder_path + 'checkpoints', checkpoint))]

# Get the checkpoints for the desired pre-trained model.
model_checkpoints = [checkpoint 
                     for checkpoint 
                     in checkpoints 
                     if get_checkpoint_model(checkpoint) == PRETRAINED_MODEL]

# Search the best checkpoint according to the loss.
best_checkpoint_path = min(model_checkpoints, key=get_checkpoint_loss)
best_checkpoint_loss = get_checkpoint_loss(best_checkpoint_path)

Perform the testing of the model.

In [None]:
if TEST_MODEL:
  trainer.test(
    model=model,
    datamodule=data_module,
    ckpt_path=folder_path + 'checkpoints/' + best_checkpoint_path
    )

Get the obtained metrics and write them in a file.

In [None]:
if TEST_MODEL:
  # Get the metrics of the model.
  metrics = model.get_metrics()

  # Write the metrics in a file.
  metrics_dir = folder_path + 'metrics/'
  makedirs(metrics_dir,exist_ok=True)
  # Strip the '.ckpt' extension from the checkpoint name.
  with open(join(metrics_dir,'metrics_' + best_checkpoint_path[:-5] + '.txt'),
            'w',encoding='utf-8') as metrics_f:
      metrics_f.write(str(metrics))

# Inference

This section is used to perform inference using the model.

Prepare the model using the best checkpoint for the given pre-trained model.

In [None]:
model = SQuADModel.load_from_checkpoint(folder_path + 'checkpoints/' + best_checkpoint_path)


The next function returns the predicted question for the given context, answer and model.

In [None]:
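def predict(context, answer, model):
  # Prepare the input for the model.
  # Locate the answer so that it is highlighted at the right position.
  answer_start = context.find(answer)
  if answer_start == -1:
    raise ValueError('The answer must appear verbatim in the context.')
  input = {'answers': {'answer_start': answer_start, 'text': answer},
 'context': context,
 'question': ''}
  tokenized_input = tokenize_entry(input, True, tokenizer)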

  input_ids = tokenized_input['input_ids'].reshape([1, tokenized_input['input_ids'].size()[0]])
  attention_mask = tokenized_input['attention_mask'].reshape([1, tokenized_input['attention_mask'].size()[0]])
  
  #Use the gpu.
  input_ids = input_ids.to(0)
  attention_mask = attention_mask.to(0)
  model = model.to(0)
  
  # Get the output of the model.
  sample_output = model.model.generate(
      input_ids = input_ids,
      attention_mask = attention_mask,
      max_length=MAX_INPUT_LENGTH,
      no_repeat_ngram_size=3,
      num_return_sequences=1
  )

  # Decode the output.
  decode_question = model.tokenizer.decode(sample_output[0], skip_special_tokens=True)

  return decode_question
    

For instance, declare any context and answer.

In [None]:
context = 'The United Kingdom prime minister has spent the afternoon talking to his ministers.'
answer = 'The United Kingdom prime minister'

Prediction of the model.

In [None]:
predict(context, answer, model)

'Who has spent the afternoon talking to his ministers?'

# OCR Pipeline

In [None]:
!sudo apt install tesseract-ocr

!pip install pytesseract


In [None]:
import pytesseract
import shutil
import os
import random
from PIL import Image


In [None]:
image_path_in_colab='a.jpg'
b = Image.open(image_path_in_colab)

In [None]:
extractedInformation = pytesseract.image_to_string(b)


In [None]:
extractedInformation = sub("\n", " ", extractedInformation)[:-2]

In [None]:
extractedInformation

'This is a lot of 12 point text to test the ocr code and see if it works on all types of file format.  The quick brown dog jumped over the lazy fox. The quick brown dog jumped over the lazy fox. The quick brown dog jumped over the lazy fox. The quick brown dog jumped over the lazy fox.'
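
Extracted text usually keeps the line breaks of the image, which can leave double spaces after the replacement above. As a possible alternative, the sketch below collapses every run of whitespace into a single space. The `clean_ocr_text` helper and the sample string are made up for this example.

```python
from re import sub

def clean_ocr_text(text):
    """Collapse every run of whitespace into one space and trim the ends."""
    return sub(r"\s+", " ", text).strip()

raw = "This is a lot of 12 point text\nto test the ocr code.\n\nThe quick brown dog\n\x0c"
cleaned = clean_ocr_text(raw)
print(cleaned)
# This is a lot of 12 point text to test the ocr code. The quick brown dog
```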

In [None]:
model = SQuADModel.load_from_checkpoint(folder_path + 'checkpoints/' + best_checkpoint_path)
context = extractedInformation
answer = 'all types'
predict(context, answer, model)

'What type of 12 point text is used to test the ocr code?'