# Evaluate models on testset

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
!pip install -q transformers
!pip install -q datasets
!pip install -q sentencepiece

[K     |████████████████████████████████| 365 kB 34.8 MB/s 
[K     |████████████████████████████████| 212 kB 73.7 MB/s 
[K     |████████████████████████████████| 115 kB 68.2 MB/s 
[K     |████████████████████████████████| 141 kB 60.7 MB/s 
[K     |████████████████████████████████| 127 kB 74.2 MB/s 
[K     |████████████████████████████████| 1.3 MB 35.2 MB/s 
[?25h

In [8]:
from datasets import load_dataset, load_from_disk, Dataset
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from tqdm import tqdm
from IPython.display import clear_output
import torch
from transformers import AutoModelForQuestionAnswering, AutoTokenizer
model_checkpoint = "HooshvareLab/bert-fa-base-uncased"
DRIVE_PATH = "/content/drive/MyDrive/PQA/"
max_length = 512 # The maximum length of a feature (question and context)
doc_stride = 256 # The authorized overlap between two part of the context when splitting it is needed.

# Load and prepare Test set

In [9]:
def prepare_train_features(examples):
    tokenized_examples = tokenizer(
        examples["question"],
        examples["context"],
        truncation="only_second",
        max_length=max_length,
        stride=doc_stride,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,)
    
    sample_mapping = tokenized_examples.pop("overflow_to_sample_mapping")
    offset_mapping = tokenized_examples.pop("offset_mapping")
    tokenized_examples["start_positions"] = []
    tokenized_examples["end_positions"] = []
    for i, offsets in enumerate(offset_mapping):
        # We will label impossible answers with the index of the CLS token.
        input_ids = tokenized_examples["input_ids"][i]
        cls_index = input_ids.index(tokenizer.cls_token_id)
        # Grab the sequence corresponding to that example (to know what is the context and what is the question).
        sequence_ids = tokenized_examples.sequence_ids(i)
        # One example can give several spans, this is the index of the example containing this span of text.
        sample_index = sample_mapping[i]
        answers = examples["answers"][sample_index]
        # If no answers are given, set the cls_index as answer.
        if len(answers["answer_start"]) == 0:
            tokenized_examples["start_positions"].append(cls_index)
            tokenized_examples["end_positions"].append(cls_index)
        else:
            # Start/end character index of the answer in the text.
            start_char = answers["answer_start"][0]
            end_char = start_char + len(answers["text"][0])
            # Start token index of the current span in the text.
            token_start_index = 0
            while sequence_ids[token_start_index] != 1:
                token_start_index += 1
            # End token index of the current span in the text.
            token_end_index = len(input_ids) - 1
            while sequence_ids[token_end_index] != 1:
                token_end_index -= 1
            # Detect if the answer is out of the span (in which case this feature is labeled with the CLS index).
            if not (offsets[token_start_index][0] <= start_char and offsets[token_end_index][1] >= end_char):
                tokenized_examples["start_positions"].append(cls_index)
                tokenized_examples["end_positions"].append(cls_index)
            else:
                # Otherwise move the token_start_index and token_end_index to the two ends of the answer.
                # Note: we could go after the last offset if the answer is the last word (edge case).
                while token_start_index < len(offsets) and offsets[token_start_index][0] <= start_char:
                    token_start_index += 1
                tokenized_examples["start_positions"].append(token_start_index - 1)
                while offsets[token_end_index][1] >= end_char:
                    token_end_index -= 1
                tokenized_examples["end_positions"].append(token_end_index + 1)

    return tokenized_examples

In [10]:
test_dataset = load_from_disk(DRIVE_PATH + "test.hf").shuffle(seed=42)
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
tokenized_test = test_dataset.map(prepare_train_features, batched=True, remove_columns=test_dataset.column_names)



Downloading config.json:   0%|          | 0.00/440 [00:00<?, ?B/s]

Downloading vocab.txt:   0%|          | 0.00/1.14M [00:00<?, ?B/s]



# Load Pars BERT model

In [6]:
DRIVE_PATH = "/content/drive/MyDrive/PQA/"
model_path = DRIVE_PATH + f"checkpoints/checkpoint-2548/"  ## load model trained for 2 epochs
model = AutoModelForQuestionAnswering.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Prediction Class

In [7]:
class AnswerPredictor:
  def __init__(self, model, tokenizer, device='cuda', n_best=10, max_length=512, stride=256, no_answer=False):
      """Initializes PyTorch Question Answering Prediction
      It's best to leave use the default values.
      Args:
          model: Fine-tuned torch model
          tokenizer: Transformers tokenizer
          device (torch.device): Running device
          n_best (int): Number of best possible answers
          max_length (int): Tokenizer max length
          stride (int): Tokenizer stride
          no_answer (bool): If True, model can return "no answer"
      """
      self.model = model.eval().to(device)
      self.tokenizer = tokenizer
      self.device = device
      self.max_length = max_length
      self.stride = stride
      self.no_answer = no_answer
      self.n_best = n_best


  def model_pred(self, questions, contexts, batch_size=1):
      n = len(contexts)
      if n%batch_size!=0:
          raise Exception("batch_size must be divisible by sample length")

      tokens = self.tokenizer(questions, contexts, add_special_tokens=True, 
                              return_token_type_ids=True, return_tensors="pt", padding=True, 
                              return_offsets_mapping=True, truncation="only_second", 
                              max_length=self.max_length, stride=self.stride)

      start_logits, end_logits = [], []
      for i in tqdm(range(0, n-batch_size+1, batch_size)):
          with torch.no_grad():
              out = self.model(tokens['input_ids'][i:i+batch_size].to(self.device), 
                          tokens['attention_mask'][i:i+batch_size].to(self.device), 
                          tokens['token_type_ids'][i:i+batch_size].to(self.device))

              start_logits.append(out.start_logits)
              end_logits.append(out.end_logits)

      return tokens, torch.stack(start_logits).view(n, -1), torch.stack(end_logits).view(n, -1)


  def __call__(self, questions, contexts, batch_size=1, answer_max_len=100):
      """Creates model prediction
      
      Args: 
          questions (list): Question strings
          contexts (list): Contexts strings
          batch_size (int): Batch size
          answer_max_len (int): Sets the longests possible length for any answer
        
      Returns:
          dict: The best prediction of the model
              (e.g {0: {"text": str, "score": int}})
      """
      tokens, starts, ends = self.model_pred(questions, contexts, batch_size=batch_size)
      start_indexes = starts.argsort(dim=-1, descending=True)[:, :self.n_best]
      end_indexes = ends.argsort(dim=-1, descending=True)[:, :self.n_best]

      preds = {}
      for i, (c, q) in enumerate(zip(contexts, questions)):  
          min_null_score = starts[i][0] + ends[i][0] # 0 is CLS Token
          start_context = tokens['input_ids'][i].tolist().index(self.tokenizer.sep_token_id)
          
          offset = tokens['offset_mapping'][i]
          valid_answers = []
          for start_index in start_indexes[i]:
              # Don't consider answers that are in questions
              if start_index<start_context:
                  continue
              for end_index in end_indexes[i]:
                  # Don't consider out-of-scope answers, either because the indices are out of bounds or correspond
                  # to part of the input_ids that are not in the context.
                  if (start_index >= len(offset) or end_index >= len(offset)
                      or offset[start_index] is None or offset[end_index] is None):
                      continue
                  # Don't consider answers with a length that is either < 0 or > max_answer_length.
                  if end_index < start_index or (end_index-start_index+1) > answer_max_len:
                      continue

                  start_char = offset[start_index][0]
                  end_char = offset[end_index][1]
                  valid_answers.append({"score": (starts[i][start_index] + ends[i][end_index]).item(),
                                        "text": c[start_char: end_char]})
                  
          if len(valid_answers) > 0:
              best_answer = sorted(valid_answers, key=lambda x: x["score"], reverse=True)[0]
          else:
              best_answer = {"text": "", "score": min_null_score}

          if self.no_answer:
              preds[i] = best_answer if best_answer["score"] >= min_null_score else {"text": "", "score": min_null_score}
          else:
              preds[i] = best_answer

      return preds


# EM and F1 

In [11]:
def compute_f1(prediction, answer):
    pred_tokens = prediction.split()
    answer_tokens = answer.split()
    
    # if either the prediction or the truth is no-answer then f1 = 1 if they agree, 0 otherwise
    if len(pred_tokens) == 0 or len(answer_tokens) == 0:
        return int(pred_tokens == answer_tokens)
    
    common_tokens = set(pred_tokens) & set(answer_tokens)
    
    # if there are no common tokens then f1 = 0
    if len(common_tokens) == 0:
        return 0
    
    prec = len(common_tokens) / len(pred_tokens)
    rec = len(common_tokens) / len(answer_tokens)
    
    return 2 * (prec * rec) / (prec + rec)

def compute_exact_match(prediction, answer):
    return int(prediction == answer)

# Model Evaluation

In [None]:
predictor = AnswerPredictor(model, tokenizer, device='cuda', n_best=10, no_answer=True)

EH = 0
F1 = 0

for example in test_dataset: 

  #print(len(example['answers']))
  #if len(example['answers']) != 2: 
    #print(example['answers'])
  if example['answers']['text'] == [] :
    context = example['context'] 
    question = example['question'] 
    preds = predictor([question], [context], batch_size=1)
    pred = preds[0]['text'].strip()
    if pred == [] : 
      EH += 1
      F1 += 1

    continue

  context = example['context'] 
  question = example['question'] 
  answer = example['answers']['text'][0]
  preds = predictor([question], [context], batch_size=1)
  pred = preds[0]['text'].strip()

  EH += compute_exact_match(pred, answer)
  F1 += compute_f1(pred, answer)

EH /= len(test_dataset)
F1 /= len(test_dataset)

In [11]:
print('Exact match of Pars BERT on testset : ' + str(EH))
print('F1 score of Pars BERT on testset : ' + str(F1))

Exact match of Pars BERT on testset : 0.1896853146853147
F1 score of Pars BERT on testset : 0.3427832896026578


#Evaluation of ParsT5

In [None]:
! gdown 1Lcs5eGTIhy0JUY9FW2pn-80m3CHyVtvQ
! unzip ParsT5.zip
!pip install transformers

In [24]:
from transformers import T5ForConditionalGeneration, AutoTokenizer
model_path = '/content/content/drive/MyDrive/parsT5_QA/model_4'
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = T5ForConditionalGeneration.from_pretrained(model_path)

In [None]:
EH = 0
F1 = 0

cnt = 0 
for example in test_dataset: 

  cnt += 1
  print(cnt)

  context = example['context'] 
  question = example['question']
  input = 'متن: ' + context + '، پرسش: ' + question
  input_ids = tokenizer.encode(input, return_tensors='pt')
  output_ids = model.generate(input_ids, max_length=150, num_beams=2, repetition_penalty=2.5, length_penalty=1.0, early_stopping=True)
  output = ' '.join([tokenizer.decode(id) for id in output_ids])
  pred = output.replace('<pad>', '').replace('</s>', '').strip()

  
  if example['answers']['text'] == [] :
    if pred == 'بدون پاسخ' : 
      EH += 1
      F1 += 1
    continue

  answer = example['answers']['text'][0]


  EH += compute_exact_match(pred, answer)
  F1 += compute_f1(pred, answer)

EH /= len(test_dataset)
F1 /= len(test_dataset)

In [35]:
print('Exact match of Pars BERT on testset : ' + str(EH))
print('F1 score of Pars BERT on testset : ' + str(F1))

Exact match of Pars BERT on testset : 0.3173076923076923
F1 score of Pars BERT on testset : 0.38419642354479394
