<a href="https://colab.research.google.com/github/abdmomin/abdmomin-portfolio/blob/main/bioasq_ir_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import html
import os
import json
import random
import timeit
from tqdm import tqdm

import requests
import torch
import transformers
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline

from IPython.display import display, HTML

## Download the data

In [None]:
# Make sure the local download directory exists; this is a no-op when it is already there.
os.makedirs(name='./data/', exist_ok=True)

In [None]:
# url = 'https://raw.githubusercontent.com/Popescu-PfeifferMarc/ir-bioasq/refs/heads/master/dataset/12B1_golden.json?token=GHSAT0AAAAAAC2O7C4GVUF5YMV2AS7DRZUQZZ74Q2Q'

# The dataset URL carries an access token as a query parameter. A fresh token can
# be supplied through the BIOASQ_RAW_TOKEN environment variable without editing
# the notebook; when the variable is unset (or empty) the original token is used,
# so URL keeps its previous value.
# NOTE(review): the fallback token is committed in plain text — consider revoking
# it and dropping the default once everyone uses the environment variable.
_DATASET_URL = 'https://raw.githubusercontent.com/Popescu-PfeifferMarc/ir-bioasq/refs/heads/master/dataset/training12b_new.json'
_DEFAULT_TOKEN = 'GHSAT0AAAAAAC2O7C4GUO4JY4TTYFNDAKKSZ2A7LKQ'
URL = _DATASET_URL + '?token=' + (os.environ.get('BIOASQ_RAW_TOKEN') or _DEFAULT_TOKEN)

# Local cache location of the downloaded training file.
DATA_PATH = './data/training12B.json'

In [None]:
# Download the dataset only when no local copy exists yet, so a re-run keeps
# working from the cached file even if the tokenised URL is no longer valid.
if not os.path.exists(DATA_PATH):
  # requests applies no timeout by default; set one so the cell cannot hang forever.
  res = requests.get(URL, timeout=60)
  # Fail on HTTP errors (404, 403, ...) before trying to parse the body as JSON.
  res.raise_for_status()

  # Validate the payload before it is written, so the cache never holds broken JSON.
  try:
    json.loads(res.content)
  except json.JSONDecodeError as e:
    print("Error parsing JSON: ", e)
    raise

  with open(DATA_PATH, 'wb') as f:
    f.write(res.content)

# Always load from disk so the in-memory data matches the cached file.
with open(DATA_PATH, 'rb') as f:
  data_dict = json.load(f)

## Reading the data

In [None]:
# Display the top-level keys of the parsed dataset.
data_dict.keys()

dict_keys(['questions'])

In [None]:
# questions = []
# answers = []
# context = []

# for question in data_dict['questions']:
#     questions.append(question['body'])

#     if question['snippets']:
#         longest_entry = max(question['snippets'], key=lambda x: len(x["text"]))
#         context.append(longest_entry['text'])

#         answers.append(dict(
#             text=question.get('ideal_answer', [None])[0],
#             start_idx=longest_entry['offsetInBeginSection'],
#             end_idx=longest_entry['offsetInEndSection']))

In [None]:
# Build three parallel lists: question text, answer record, and context passage.
questions = []
answers = []
context = []

for q in data_dict['questions']:
  snippets = q.get('snippets') or []
  if not snippets:
    # Without a snippet there is no context passage; skip the question instead
    # of failing with an IndexError on snippets[0].
    continue
  first_snippet = snippets[0]

  questions.append(q['body'])
  # Only the first snippet of each question is used as context.
  context.append(first_snippet['text'])

  # NOTE(review): the offsets are named offsetInBeginSection / offsetInEndSection,
  # so they may be relative to the source document section rather than to the
  # snippet text stored in `context` — confirm before trusting the span labels.
  answers.append(dict(text=q['ideal_answer'][0],
                      start_idx=first_snippet['offsetInBeginSection'],
                      end_idx=first_snippet['offsetInEndSection']))

In [None]:
# Sanity check: the three lists are filled in the same loop, so their lengths should match.
len(questions), len(answers), len(context)

(5049, 5049, 5049)

In [None]:
# First split: 80% training data, 20% held out (stored under the val_* names for now).
# A fixed random_state keeps the partition identical between runs.
(train_questions, val_questions,
 train_answers, val_answers,
 train_context, val_context) = train_test_split(
    questions, answers, context,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Second split: cut the held-out 20% in half, giving test and validation sets.
# NOTE(review): this cell reads and overwrites the val_* variables, so running it
# twice in a row halves the validation set again; re-run the previous split cell
# first when repeating it.
(test_questions, val_questions,
 test_answers, val_answers,
 test_context, val_context) = train_test_split(
    val_questions, val_answers, val_context,
    test_size=0.5,
    random_state=42,
)

In [None]:
# Number of questions in the train, validation and test splits.
len(train_questions), len(val_questions), len(test_questions)

(4039, 505, 505)

In [None]:
# Number of answer records in the train, validation and test splits.
len(train_answers), len(val_answers), len(test_answers)

(4039, 505, 505)

In [None]:
# Number of context passages in the train, validation and test splits.
len(train_context), len(val_context), len(test_context)

(4039, 505, 505)

In [None]:
# The transformers imports live in the notebook's import cell at the top, so a
# fresh "Restart & Run All" surfaces every dependency up front.

# Hugging Face checkpoint that is fine-tuned below, and its matching tokenizer.
MODEL_NAME = "deepset/roberta-base-squad2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/79.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

In [None]:
# Tokenize each split as (context, question) pairs, truncating over-long inputs
# and padding every example in a split to a common length.
train_encodings, val_encodings, test_encodings = (
    tokenizer(split_context, split_questions, truncation=True, padding=True)
    for split_context, split_questions in (
        (train_context, train_questions),
        (val_context, val_questions),
        (test_context, test_questions),
    )
)

In [None]:
def add_token_positions(encodings, answers, truncated_position=None):
  """Convert character-level answer offsets into token positions, in place.

  For every answer the character offsets `start_idx` / `end_idx` are mapped to
  token indices with `encodings.char_to_token`, and the two resulting lists are
  stored on `encodings` under the keys 'start_positions' and 'end_positions'.

  Args:
    encodings: tokenized batch exposing `char_to_token(batch_index, char_index)`
      and a dict-style `update`.
    answers: sequence of dicts with integer 'start_idx' and 'end_idx' entries;
      entry i belongs to example i of `encodings`.
    truncated_position: token position recorded when the start offset maps to
      no token. Defaults to the global `tokenizer.model_max_length`, which is
      what this function always used before the parameter existed.

  Returns:
    None. `encodings` is modified in place.
  """
  start_positions = []
  end_positions = []

  for i, answer in enumerate(answers):
    start_char = answer['start_idx']
    end_char = answer['end_idx']

    # Negative offsets are not looked up; they are mapped to token 0.
    start_token = encodings.char_to_token(i, start_char) if start_char >= 0 else 0
    if start_token is None:
      # The start offset has no token (e.g. the passage was truncated).
      if truncated_position is None:
        start_token = tokenizer.model_max_length
      else:
        start_token = truncated_position

    # The end offset may also map to no token. Walk backward one character at a
    # time until a token is found; fall back to 0 when the walk passes character 0.
    end_token = None
    probe = end_char
    while end_token is None and probe >= 0:
      end_token = encodings.char_to_token(i, probe)
      probe -= 1
    if end_token is None:
      end_token = 0

    start_positions.append(start_token)
    end_positions.append(end_token)

  # Attach the token-based labels to the encodings object.
  encodings.update({'start_positions': start_positions, 'end_positions': end_positions})

In [None]:
# Attach token-level start/end labels to every split, in train/val/test order.
for split_encodings, split_answers in (
    (train_encodings, train_answers),
    (val_encodings, val_answers),
    (test_encodings, test_answers),
):
  add_token_positions(split_encodings, split_answers)

In [None]:
class BioASQDataset(torch.utils.data.Dataset):
    """Map-style dataset that serves tokenized QA examples as tensors."""

    def __init__(self, encodings):
        # Mapping of field name -> per-example values; must also expose `.input_ids`.
        self.encodings = encodings

    def __len__(self):
        """Return the number of examples, taken from the length of `input_ids`."""
        return len(self.encodings.input_ids)

    def __getitem__(self, idx):
        """Return example `idx` as a dict mapping each field name to a tensor."""
        example = {}
        for field, values in self.encodings.items():
            example[field] = torch.tensor(values[idx])
        return example

# Wrap each labelled encoding in a Dataset, in train/val/test order.
train_dataset, val_dataset, test_dataset = map(
    BioASQDataset, (train_encodings, val_encodings, test_encodings)
)

In [None]:
# Colab-only import, kept in this cell so the rest of the notebook can still be
# imported outside Colab.
from google.colab import drive

# Mount Google Drive; the model checkpoints are later saved below this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !rm -rf /content/drive/MyDrive/models/deepset

In [None]:
# Fine-tuning hyperparameters.
EPOCHS = 3
BATCH_SIZE = 16
LEARNING_RATE = 5e-5

# Checkpoint directory on Drive; the name encodes the model and the settings used.
MODEL_SAVE_PATH = "/content/drive/MyDrive/models/{}-lr{}-epoch{}/".format(
    MODEL_NAME, LEARNING_RATE, EPOCHS
)

In [None]:
# Only show error-level messages from the transformers library.
transformers.utils.logging.set_verbosity_error()

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Load the pretrained QA checkpoint, then move it to the selected device.
model = AutoModelForQuestionAnswering.from_pretrained(MODEL_NAME)
model = model.to(device)

# Print the parameter count reported by the model.
print(model.num_parameters())

model.safetensors:   0%|          | 0.00/496M [00:00<?, ?B/s]

124056578


In [None]:
# Training batches are reshuffled on every pass over the data.
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True
)

# Validation and test batches keep their dataset order.
val_dataloader, test_dataloader = (
    torch.utils.data.DataLoader(dataset=split_dataset, batch_size=BATCH_SIZE, shuffle=False)
    for split_dataset in (val_dataset, test_dataset)
)

In [None]:
# Fine-tune the model, run a validation pass after each epoch, and save a checkpoint.
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

start = timeit.default_timer()
for epoch in range(EPOCHS):
  # ---- training pass ----
  model.train()
  train_running_loss = 0
  for idx, sample in enumerate(tqdm(train_dataloader, leave=True)):
    # Move exactly the tensors the model consumes onto the target device.
    batch = {name: sample[name].to(device)
             for name in ('input_ids', 'attention_mask', 'start_positions', 'end_positions')}
    loss = model(**batch).loss

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    train_running_loss += loss.item()

  # Mean loss per batch for this epoch.
  train_loss = train_running_loss / (idx + 1)

  # ---- validation pass (no gradients) ----
  model.eval()
  val_running_loss = 0
  with torch.inference_mode():
    for idx, sample in enumerate(tqdm(val_dataloader)):
      batch = {name: sample[name].to(device)
               for name in ('input_ids', 'attention_mask', 'start_positions', 'end_positions')}
      val_running_loss += model(**batch).loss.item()
    val_loss = val_running_loss / (idx + 1)

  divider = "-"*30
  print(divider)
  print(f"EPOCH: {epoch+1:02d} | Train Loss: {train_loss:.4f}")
  print(f"EPOCH: {epoch+1:02d} | Valid Loss: {val_loss:.4f}")
  print(divider)
  # Elapsed time is measured from before the first epoch, so it is cumulative.
  stop = timeit.default_timer()
  print(f"Training Time: {stop-start:.2f}s")

  # Overwrite the checkpoint (model and tokenizer) after every epoch.
  model.save_pretrained(MODEL_SAVE_PATH)
  tokenizer.save_pretrained(MODEL_SAVE_PATH)

  torch.cuda.empty_cache()

In [None]:
# Evaluate on the test split: compare the predicted start/end token positions
# with the labelled ones.
preds = []
true = []

model.eval()
with torch.inference_mode():
  for sample in tqdm(test_dataloader, leave=True):
    input_ids = sample['input_ids'].to(device)
    attention_mask = sample['attention_mask'].to(device)

    # No labels are passed: only the logits are needed here.
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    # Most likely start / end token for each example in the batch.
    start_pred = torch.argmax(outputs['start_logits'], dim=1).cpu()
    end_pred = torch.argmax(outputs['end_logits'], dim=1).cpu()

    preds.extend(zip(start_pred.tolist(), end_pred.tolist()))
    true.extend(zip(sample['start_positions'].tolist(), sample['end_positions'].tolist()))

# Flatten the (start, end) pairs so every position counts as one prediction.
preds = [position for pair in preds for position in pair]
true = [position for pair in true for position in pair]

# Accuracy over all positions. Averaging per-batch accuracies instead would
# over-weight a smaller final batch.
accuracy = sum(p == t for p, t in zip(preds, true)) / len(true)
# Macro F1, treating each token position as a class label.
f1_value = f1_score(true, preds, average="macro")
print(f"\nAccuracy: {accuracy*100:.2f}% | F1 Score: {f1_value*100:.2f}%")

100%|██████████| 16/16 [00:06<00:00,  2.30it/s]


Accuracy: 65.49% | F1 Score: 71.17%





In [None]:
def model_inference(question, context, model_path=MODEL_SAVE_PATH):
  """Answer `question` from `context` with a saved model and highlight the answer.

  Loads the model and tokenizer stored at `model_path`, runs a Hugging Face
  question-answering pipeline, prints the inference time and the answer text,
  and displays the context as HTML with the predicted span highlighted.

  Args:
    question: the question string.
    context: the passage the answer is extracted from.
    model_path: directory of the saved model/tokenizer; defaults to MODEL_SAVE_PATH.

  Returns:
    None. Results are printed and displayed.
  """
  model = AutoModelForQuestionAnswering.from_pretrained(model_path)
  tokenizer = AutoTokenizer.from_pretrained(model_path)
  qa_model = pipeline(task="question-answering", model=model, tokenizer=tokenizer)

  # Time the prediction itself; building the pipeline is setup, not inference.
  start = timeit.default_timer()
  result = qa_model(question=question, context=context)
  stop = timeit.default_timer()
  print(f"Inference Time: {stop-start:.2f}s")

  # 'end' is an exclusive character offset (answer == context[start:end]), so it
  # is used as the slice bound directly. The text is HTML-escaped because
  # passages can contain characters such as '<' or '&' that would otherwise be
  # interpreted as markup.
  span_start, span_end = result['start'], result['end']
  before_text = html.escape(context[:span_start])
  colored_text = html.escape(context[span_start:span_end])
  after_text = html.escape(context[span_end:])
  print(f"Answer: {result['answer']}")
  display(HTML(f"""<p style='font-size: 16px; width: 50%;'>{before_text}
    <span style='background-color: #33447f; color: white; width: {len(result["answer"])}em;'>{colored_text}</span>
    {after_text}</p>"""))

In [None]:
# Pick one random test example and show the model's highlighted answer.
# The picked items get their own names: rebinding `context` here would replace
# the global list of passages that the split cells read, breaking a re-run.
random_idx = random.randint(0, len(test_questions)-1)
sample_question = test_questions[random_idx]
sample_context = test_context[random_idx]
print(sample_question)
model_inference(sample_question, sample_context)

What is a ciliopathy?
Inference Time: 0.00s
Answer: multiple organ systems.


In [None]:
# data = [
#   {
#     "question": "What is the role of p53 in cancer suppression?",
#     "answer": "p53 acts as a tumor suppressor by regulating cell cycle and apoptosis.",
#     "context": "p53, known as the guardian of the genome, plays a critical role in preventing cancer. It induces apoptosis, DNA repair, or cell cycle arrest in response to genomic instability. Mutations in p53 are found in approximately 50% of human cancers."
#   },
#   {
#     "question": "How does metformin help in managing diabetes?",
#     "answer": "Metformin reduces glucose production in the liver and improves insulin sensitivity.",
#     "context": "Metformin is the first-line medication for type 2 diabetes. It works by suppressing hepatic gluconeogenesis and increasing insulin-mediated glucose uptake. It is well-tolerated and can also have cardiovascular benefits."
#   },
#   {
#     "question": "What are the common side effects of aspirin?",
#     "answer": "The common side effects of aspirin include gastrointestinal irritation and increased risk of bleeding.",
#     "context": "Aspirin, widely used as an analgesic and antipyretic, is associated with adverse effects such as gastric ulcers, bleeding, and allergic reactions. It inhibits COX-1 and COX-2 enzymes, which are involved in prostaglandin synthesis."
#   },
#   {
#     "question": "What is the function of CRISPR-Cas9 in genetic engineering?",
#     "answer": "CRISPR-Cas9 is a tool used for precise genome editing by cutting DNA at specific sites.",
#     "context": "CRISPR-Cas9, derived from bacterial defense systems, enables scientists to target and modify DNA sequences with high precision. It is used in correcting genetic disorders, creating disease models, and studying gene functions."
#   },
#   {
#     "question": "What is the mechanism of action of beta-blockers?",
#     "answer": "Beta-blockers block beta-adrenergic receptors, reducing heart rate and blood pressure.",
#     "context": "Beta-blockers, such as propranolol and metoprolol, are used in managing hypertension, angina, and arrhythmias. They inhibit the effects of adrenaline on beta-receptors, leading to decreased cardiac output and reduced oxygen demand."
#   }
# ]