In [2]:
!pip install transformers torch datasets



In [3]:
from datasets import load_dataset

# Load the dataset
dataset = load_dataset('ms_marco', 'v1.1')


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [4]:
from transformers import T5Tokenizer, T5ForConditionalGeneration, pipeline

# Load T5 model and tokenizer
tokenizer = T5Tokenizer.from_pretrained('t5-large')
model = T5ForConditionalGeneration.from_pretrained('t5-large')

# Set up T5 for question answering using a text-to-text format
def answer_question_t5(question, context, max_length=200):
    input_text = f"question: {question} context: {context}"
    input_ids = tokenizer.encode(input_text, return_tensors='pt')

    # Generate the answer tokens
    outputs = model.generate(input_ids, max_length=max_length, num_beams=5, early_stopping=True)
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

    return answer


You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [5]:
!pip install nltk



In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
from transformers import DPRQuestionEncoder, DPRContextEncoder
from transformers import DPRQuestionEncoderTokenizer, DPRContextEncoderTokenizer

In [8]:
def getEncoders(base_path = "/content/drive/My Drive/Colab Notebooks/DPR/"):
  question_encoder = DPRQuestionEncoder.from_pretrained(f"{base_path}question_encoder")
  context_encoder = DPRContextEncoder.from_pretrained(f"{base_path}context_encoder")
  return question_encoder, context_encoder

In [9]:
def getTokenizers(base_path = "/content/drive/My Drive/Colab Notebooks/DPR/"):
  question_encoder_tokenizer = DPRQuestionEncoderTokenizer.from_pretrained(f"{base_path}question_encoder_tokenizer")
  context_encoder_tokenizer = DPRContextEncoderTokenizer.from_pretrained(f"{base_path}context_encoder_tokenizer")
  return question_encoder_tokenizer, context_encoder_tokenizer

In [10]:
def normalize_embeddings(embeddings):
    norms = embeddings.norm(p=2, dim=1, keepdim=True)
    return embeddings.div(norms)

def encode_query(query, question_encoder, tokenizer):
    inputs = tokenizer(query, return_tensors="pt", padding=True, truncation=True, max_length=512)
    inputs = {k: v.to(question_encoder.device) for k, v in inputs.items()}
    with torch.no_grad():
        query_embedding = question_encoder(**inputs).pooler_output
    # Normalize the embedding
    query_embedding = normalize_embeddings(query_embedding)
    return query_embedding

def encode_passages(passages, context_encoder, tokenizer):
    inputs = tokenizer(passages, padding=True, truncation=True, max_length=512, return_tensors="pt")
    inputs = {k: v.to(context_encoder.device) for k, v in inputs.items()}
    with torch.no_grad():
        passage_embeddings = context_encoder(**inputs).pooler_output
    # Normalize the embeddings
    passage_embeddings = normalize_embeddings(passage_embeddings)
    return passage_embeddings

In [11]:
import torch
import torch.nn.functional as F

question_encoder, context_encoder = getEncoders()
question_encoder_tokenizer, context_encoder_tokenizer = getTokenizers()

def retrieve_passages(query_embedding, passage_embeddings):
    query_embedding = F.normalize(query_embedding, p=2, dim=1)
    passage_embeddings = F.normalize(passage_embeddings, p=2, dim=1)
    similarities = torch.matmul(query_embedding, passage_embeddings.T)

    # Convert similarities to probabilities to rank passages
    similarities = similarities.squeeze(0).cpu().numpy()

    # Rank passages based on similarities
    ranked_indices = similarities.argsort()[::-1]

    return ranked_indices


def retrieve_top_n_passages(query, passages, n=5):
    query_embedding = encode_query(query, question_encoder, question_encoder_tokenizer)
    passage_embeddings = encode_passages(passages, context_encoder, context_encoder_tokenizer)
    ranked_indices = retrieve_passages(query_embedding, passage_embeddings)
    return [passages[idx] for idx in ranked_indices[:n]]


In [12]:
!pip install Rouge
from rouge import Rouge



8


In [15]:
import nltk
import os
nltk.download('punkt')
from concurrent.futures import ThreadPoolExecutor, as_completed

def calculate_f1(generated_answer, actual_answers):
    """Calculates the F1 score.

    Args:
        generated_answer (str): The answer generated by the model.
        actual_answers (list[str]): A list of correct answers.

    Returns:
        float: The F1 score.
    """
    if not actual_answers or not actual_answers[0]:
        actual_answers.append(" ")
    generated_tokens = nltk.word_tokenize(generated_answer)
    actual_answer_tokens = nltk.word_tokenize(actual_answers[0])

    common_tokens = set(generated_tokens) & set(actual_answer_tokens)

    precision = len(common_tokens) / len(generated_tokens) if len(generated_tokens) > 0 else 0
    recall = len(common_tokens) / len(actual_answer_tokens) if len(actual_answer_tokens) > 0 else 0

    if precision + recall == 0:
        return 0

    f1 = 2 * (precision * recall) / (precision + recall)
    return f1


def process_entry(data_entry, useDpr=True):
    question = data_entry['query']
    context = ', '.join(retrieve_top_n_passages(question, data_entry['passages']['passage_text'], n=3)) if useDpr else ', '.join(data_entry['passages']['passage_text'])
    generated_answer = answer_question_t5(question, context)
    actual_answers = data_entry['answers']
    f1_score = calculate_f1(generated_answer, actual_answers)
    rouge = Rouge()
    rouge_score = rouge.get_scores(generated_answer, actual_answers[0])[0]['rouge-l']['f']
    return (f1_score, rouge_score)

def calculate_avg_f1(dataset, useDpr=True,  max_workers=os.cpu_count()):
    scores = {"f1_score": 0.0, "rouge_score": 0.0}
    f1_scores = []
    rouge_scores = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_entry = {executor.submit(process_entry, entry, useDpr): entry for entry in dataset}
        for future in as_completed(future_to_entry):
            f1_score, rouge_score = future.result()
            f1_scores.append(f1_score)
            rouge_scores.append(rouge_score)

    scores["f1_score"] = sum(f1_scores) / len(f1_scores) if f1_scores else 0
    scores["rouge_score"] = sum(rouge_scores) / len(rouge_scores) if rouge_scores else 0
    return scores

# Usage
val_data = dataset['validation']
filtered_dataset = [entry for entry in val_data if entry['answers'] and entry['answers'][0]]
result = calculate_avg_f1(filtered_dataset, False)
print('score', result)


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


score {'f1_score': 0.2219083806806626, 'rouge_score': 0.2241029025437013}
