<a href="https://colab.research.google.com/github/AbrahamArellano/do-we-still-need-clinical-lms/blob/main/dlh_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


**Reproduction of paper "Do We Still Need Clinical Language Models?"**


Project work for CS 598 Deep Learning for Healthcare, UIUC, Spring 2025. We are reporducing the [Do We Still Need Clinical Language Models?](https://arxiv.org/pdf/2302.08091).


In [None]:
print("Reproduction of the paper \'Do we still need CLinical Language Models\'")

Reproduction of the paper 'Do we still need CLinical Language Models'


In [None]:
# Connect to google drive and mount the filesystems
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
# Access a file
dataset = '/content/drive/MyDrive/DL4H-Project/dataset'
mednli_shared = '/content/drive/Shareddrives'
!ls $dataset


clip  mednli  radqa


In [None]:
import json
import random

## MedNLI Training dataset
# Filepath of the original dataset
input_file = dataset + "/mednli/mli_train_v1.jsonl"

!ls $input_file

# Output filepaths for subsets
output_files = {
    "1%": dataset + "/mednli/mli_train_1_percent.jsonl",
    "5%": dataset + "/mednli/mli_train_5_percent.jsonl",
    "10%": dataset + "/mednli/mli_train_10_percent.jsonl",
    "25%": dataset + "/mednli/mli_train_25_percent.jsonl",
}

# Proportions for subsets
proportions = {
    "1%": 0.01,
    "5%": 0.05,
    "10%": 0.10,
    "25%": 0.25,
}

# Read the original dataset
with open(input_file, "r") as f:
    data = f.readlines()

# Shuffle the data for randomness
random.shuffle(data)

# Create subsets
for subset, proportion in proportions.items():
    subset_size = int(len(data) * proportion)
    subset_data = data[:subset_size]

    # Write the subset to a new file
    with open(output_files[subset], "w") as f:
        f.writelines(subset_data)

print("MedNLI subsets created successfully!")
!ls '/content/drive/MyDrive/DL4H-Project/dataset/mednli'

/content/drive/MyDrive/DL4H-Project/dataset/mednli/mli_train_v1.jsonl
MedNLI subsets created successfully!
LICENSE.txt	   mli_train_10_percent.jsonl  mli_train_5_percent.jsonl  SHA256SUMS.txt
mli_dev_v1.jsonl   mli_train_1_percent.jsonl   mli_train_v1.jsonl
mli_test_v1.jsonl  mli_train_25_percent.jsonl  README.txt


In [None]:
# RadQA Subset creation
# Filepath of the original dataset
input_file = dataset + "/radqa/train.json"

# Output filepaths for subsets
output_files = {
    "1%": dataset + "/radqa/train_1_percent.json",
    "5%": dataset + "/radqa/train_5_percent.json",
    "10%": dataset + "/radqa/train_10_percent.json",
    "25%": dataset + "/radqa/train_25_percent.json",
}

# Proportions for subsets
proportions = {
    "1%": 0.01,
    "5%": 0.05,
    "10%": 0.10,
    "25%": 0.25,
}

# Read the original dataset
with open(input_file, "r") as f:
    data = json.load(f)

# Extract the "data" field containing the dataset
original_data = data["data"]

# Shuffle the data for randomness
random.shuffle(original_data)

# Create subsets
for subset, proportion in proportions.items():
    subset_size = int(len(original_data) * proportion)
    subset_data = original_data[:subset_size]

    # Write the subset to a new file
    subset_output = {"data": subset_data}
    with open(output_files[subset], "w") as f:
        json.dump(subset_output, f, indent=2)

print("Subsets created successfully!")
!ls "/content/drive/MyDrive/DL4H-Project/dataset/radqa/"

Subsets created successfully!
dev.json     README.txt      test.json		    train_1_percent.json   train_5_percent.json
LICENSE.txt  SHA256SUMS.txt  train_10_percent.json  train_25_percent.json  train.json


In [None]:


import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration, T5Model
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split

In [None]:
# Define a custom dataset class
class MedNLIDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        input_text = f"mnli premise: {item['sentence1']} hypothesis: {item['sentence2']}"
        target_text = item['gold_label']

        inputs = self.tokenizer(
            input_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        targets = self.tokenizer(
            target_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        return {
            "input_ids": inputs["input_ids"].squeeze(),
            "attention_mask": inputs["attention_mask"].squeeze(),
            "labels": targets["input_ids"].squeeze(),
        }


In [None]:
# Load the dataset
def load_mednli_data(file_path):
    with open(file_path, "r") as f:
        return [json.loads(line) for line in f]


In [None]:
# Filepath to the dataset
train_file = dataset + "/mednli/mli_train_1_percent.jsonl" # Change to desired subset
data = load_mednli_data(train_file)

# Split into train and validation sets
train_data, val_data = train_test_split(data, test_size=0.1, random_state=42)

# Load the tokenizer and model
tokenizer = T5Tokenizer.from_pretrained("t5-base")
model = T5ForConditionalGeneration.from_pretrained("t5-base")
# model = T5Model.from_pretrained("t5-base")

# Create DataLoaders
train_dataset = MedNLIDataset(train_data, tokenizer)
val_dataset = MedNLIDataset(val_data, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/892M [00:00<?, ?B/s]

In [None]:
# Define optimizer and device
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Training loop
epochs = 3
for epoch in range(epochs):
    model.train()
    train_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        loss = outputs.loss
        train_loss += loss.item()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}, Training Loss: {train_loss / len(train_loader)}")

    # Validation loop
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            val_loss += outputs.loss.item()

    print(f"Epoch {epoch + 1}, Validation Loss: {val_loss / len(val_loader)}")

print("Training complete!")


TypeError: T5Model.forward() got an unexpected keyword argument 'labels'

**RadQA Task**

Prepare the Dataset:
The RadQA dataset is in a question-answering format. We need to preprocess it to create input-output pairs suitable for T5. For example:

**Input:** "question: \<question\> context: \<context\>"<br>
**Output:** The answer text or "no answer" if the question is unanswerable.


In [None]:
import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration, T5Model, EncoderDecoderCache
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
import json

# Define a custom dataset class
class RadQADataset(Dataset):
    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        question = item["question"]
        context = item["context"]
        if item["answers"]:
            answer = item["answers"][0]["text"]
        else:
            answer = "no answer"

        input_text = f"question: {question} context: {context}"
        target_text = answer

        inputs = self.tokenizer(
            input_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        targets = self.tokenizer(
            target_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        return {
            "input_ids": inputs["input_ids"].squeeze(),
            "attention_mask": inputs["attention_mask"].squeeze(),
            "labels": targets["input_ids"].squeeze(),
        }

In [None]:
# Load the RadQA dataset
def load_radqa_data(file_path):
    with open(file_path, "r") as f:
        data = json.load(f)["data"]
    processed_data = []
    for entry in data:
        for paragraph in entry["paragraphs"]:
            context = paragraph["context"]
            for qa in paragraph["qas"]:
                processed_data.append({
                    "question": qa["question"],
                    "context": context,
                    "answers": qa["answers"]
                })
    return processed_data

# Filepath to the dataset
train_file = dataset + "/radqa/train_1_percent.json"  # Change to desired subset
data = load_radqa_data(train_file)

# Split into train and validation sets
train_data, val_data = train_test_split(data, test_size=0.1, random_state=42)


In [None]:
# Load the tokenizer and model
tokenizer = T5Tokenizer.from_pretrained("t5-base")
model = T5ForConditionalGeneration.from_pretrained("t5-base")
# model =  T5Model.from_pretrained("t5-base")

# If you have past_key_values (tuple format)
# past_key_values = EncoderDecoderCache.from_legacy_cache(past_key_values)


# Create DataLoaders
train_dataset = RadQADataset(train_data, tokenizer)
val_dataset = RadQADataset(val_data, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

# Define optimizer and device
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

T5ForConditionalGeneration(
  (shared): Embedding(32128, 768)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 768)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=768, out_features=768, bias=False)
              (k): Linear(in_features=768, out_features=768, bias=False)
              (v): Linear(in_features=768, out_features=768, bias=False)
              (o): Linear(in_features=768, out_features=768, bias=False)
              (relative_attention_bias): Embedding(32, 12)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseActDense(
              (wi): Linear(in_features=768, out_features=3072, bias=False)
              (wo): Linear(in_features=3072, out_features=768, bias=False)
              (dropout): Dro

In [None]:
# Training loop
epochs = 3
for epoch in range(epochs):
    model.train()
    train_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels,
            use_cache=True,
        )
        loss = outputs.loss
        train_loss += loss.item()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}, Training Loss: {train_loss / len(train_loader)}")

    # Validation loop
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            val_loss += outputs.loss.item()

    print(f"Epoch {epoch + 1}, Validation Loss: {val_loss / len(val_loader)}")

print("Training complete!")

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch 1, Training Loss: 16.16879806518555
Epoch 1, Validation Loss: 10.856307983398438
Epoch 2, Training Loss: 8.14484748840332
Epoch 2, Validation Loss: 4.362302780151367
Epoch 3, Training Loss: 3.3304977893829344
Epoch 3, Validation Loss: 0.962807834148407
Training complete!


In [None]:
model.save_pretrained("/content/drive/MyDrive/DL4H-Project/t5_radqa_model")
tokenizer.save_pretrained("/content/drive/MyDrive/DL4H-Project/t5_radqa_model")

('/content/drive/MyDrive/DL4H-Project/t5_radqa_model/tokenizer_config.json',
 '/content/drive/MyDrive/DL4H-Project/t5_radqa_model/special_tokens_map.json',
 '/content/drive/MyDrive/DL4H-Project/t5_radqa_model/spiece.model',
 '/content/drive/MyDrive/DL4H-Project/t5_radqa_model/added_tokens.json')

**Evaluation of RadQA Model**

In [None]:
from transformers import T5Tokenizer, T5ForConditionalGeneration
from transformers import DataCollatorForSeq2Seq
from torch.utils.data import DataLoader, Dataset
import torch
import json
from sklearn.metrics import f1_score

# Define a custom dataset class for evaluation
class RadQAEvalDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        question = item["question"]
        context = item["context"]
        input_text = f"question: {question} context: {context}"

        inputs = self.tokenizer(
            input_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        return {
            "input_ids": inputs["input_ids"].squeeze(),
            "attention_mask": inputs["attention_mask"].squeeze(),
            "answers": item["answers"],  # Keep the ground truth answers for evaluation
        }

In [None]:
# Load the RadQA dataset for evaluation
def load_radqa_eval_data(file_path):
    with open(file_path, "r") as f:
        data = json.load(f)["data"]
    processed_data = []
    for entry in data:
        for paragraph in entry["paragraphs"]:
            context = paragraph["context"]
            for qa in paragraph["qas"]:
                processed_data.append({
                    "question": qa["question"],
                    "context": context,
                    "answers": qa["answers"]
                })
    return processed_data

In [None]:
# Function to compute Exact Match (EM) and F1-score
def compute_metrics(predictions, references):
    em = 0
    f1 = 0
    for pred, ref in zip(predictions, references):
        # Exact Match
        em += int(pred.strip() == ref.strip())

        # F1-score
        pred_tokens = pred.split()
        ref_tokens = ref.split()
        common_tokens = set(pred_tokens) & set(ref_tokens)
        if len(common_tokens) == 0:
            f1 += 0
        else:
            precision = len(common_tokens) / len(pred_tokens)
            recall = len(common_tokens) / len(ref_tokens)
            f1 += 2 * (precision * recall) / (precision + recall)

    em = em / len(predictions) * 100
    f1 = f1 / len(predictions) * 100
    return {"Exact Match": em, "F1": f1}

# Evaluation pipeline
def evaluate_model(model_path, eval_file, batch_size=8):
    # Load the trained model and tokenizer
    tokenizer = T5Tokenizer.from_pretrained(model_path)
    model = T5ForConditionalGeneration.from_pretrained(model_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Load the evaluation dataset
    eval_data = load_radqa_eval_data(eval_file)
    eval_dataset = RadQAEvalDataset(eval_data, tokenizer)
    eval_loader = DataLoader(eval_dataset, batch_size=batch_size)

    # Generate predictions
    model.eval()
    predictions = []
    references = []
    with torch.no_grad():
        for batch in eval_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)

            # Generate predictions
            outputs = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=50
            )
            preds = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
            predictions.extend(preds)

            # Collect references
            for answer_list in batch["answers"]:
                if answer_list:  # If there are answers
                    references.append(answer_list[0]["text"])
                else:
                    references.append("no answer")

    # Compute metrics
    metrics = compute_metrics(predictions, references)
    return metrics

# Example usage
model_path = "/content/drive/MyDrive/DL4H-Project/t5_radqa_model"  # Path to the trained model
eval_file = dataset + "/radqa/train_1_percent.json"  # Path to the evaluation dataset
!ls $model_path
!ls $eval_file

metrics = evaluate_model(model_path, eval_file)
print("Evaluation Metrics:", metrics)

added_tokens.json  generation_config.json  special_tokens_map.json  tokenizer_config.json
config.json	   model.safetensors	   spiece.model
/content/drive/MyDrive/DL4H-Project/dataset/radqa/train_1_percent.json


RuntimeError: each element in list of batch should be of equal size