In [25]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModel, AutoTokenizer
from torch.optim import AdamW
from tqdm import tqdm
from tokenizers import Tokenizer 
from sklearn.metrics import accuracy_score
import os

In [26]:
# --- Configuration ---
# Values shared by both experiments.
MAX_LEN = 512      # token budget per encoded (criteria, patient) pair
BATCH_SIZE = 8
NUM_LABELS = 2

# Seed torch's global RNG so weight initialisation, dropout and DataLoader
# shuffling are repeatable across "Restart Kernel -> Run All".
SEED = 42
torch.manual_seed(SEED)

# For the BioBERT Transformer model
EPOCHS_TRANSFORMER = 3
LEARNING_RATE_TRANSFORMER = 2e-5


# For the RNN trained from scratch
EPOCHS_SCRATCH = 20
LEARNING_RATE_SCRATCH = 1e-3
EMBEDDING_DIM = 256
RNN_HIDDEN_SIZE = 256
RNN_NUM_LAYERS = 1
RNN_DROPOUT = 0.3

# Device: prefer Apple-silicon MPS (the original target), then CUDA, then CPU.
# Hard-coding "mps" makes every `.to(DEVICE)` fail on machines without it.
# `getattr` guards against torch builds that predate `torch.backends.mps`.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    DEVICE = torch.device("mps")
elif torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")



In [27]:
class DataCreator_RNN(Dataset):
    """Dataset of (criteria, patient) text pairs for the from-scratch RNN.

    Each item is encoded with ``tokenizer.encode(criteria, patient)`` and then
    cut or right-padded to exactly ``max_len`` positions.

    Args:
        df: Object indexable by ``'patient'``, ``'criteria'`` and ``'label'``,
            each yielding an iterable of equal length.
        tokenizer: Object exposing ``token_to_id(str)`` and ``encode(a, b)``;
            the encoding must provide ``.ids`` and ``.attention_mask`` lists.
        max_len: Fixed length of every returned sequence.
    """

    def __init__(self, df, tokenizer, max_len):
        self.patient_texts = list(df['patient'])
        self.criteria_texts = list(df['criteria'])
        self.labels = list(df['label'])
        self.tokenizer = tokenizer
        self.max_len = max_len

        # Look up the padding id once; fall back to 0 when the vocabulary has no [PAD].
        pad_id = tokenizer.token_to_id("[PAD]")
        if pad_id is None:
            print("Warning: [PAD] token not found in tokenizer. Using 0 for padding token ID in DataCreator.")
            pad_id = 0
        self.pad_token_id = pad_id

    def __len__(self):
        """Number of examples (one per label)."""
        return len(self.labels)

    def __getitem__(self, index):
        """Return ``input_ids``, ``attention_mask`` and ``label`` tensors for one example."""
        patient_text = str(self.patient_texts[index])
        criteria_text = str(self.criteria_texts[index])
        label = torch.tensor(self.labels[index], dtype=torch.long)

        # Criteria goes first, patient second, as a sentence pair.
        encoding = self.tokenizer.encode(criteria_text, patient_text)
        token_ids = encoding.ids
        mask = encoding.attention_mask

        # Negative shortfall -> sequence is too long and gets cut;
        # otherwise it is the number of padding positions to append.
        shortfall = self.max_len - len(token_ids)
        if shortfall < 0:
            token_ids = token_ids[:self.max_len]
            mask = mask[:self.max_len]
        else:
            token_ids.extend([self.pad_token_id] * shortfall)
            mask.extend([0] * shortfall)

        item = {}
        item["input_ids"] = torch.tensor(token_ids, dtype=torch.long)
        item["attention_mask"] = torch.tensor(mask, dtype=torch.long)
        item["label"] = label
        return item


In [28]:
class DataCreator_Transformer(Dataset):
    """Dataset of (criteria, patient) text pairs for a callable (Hugging Face-style) tokenizer.

    Padding and truncation to ``max_len`` are delegated to the tokenizer call.

    Args:
        df: Object indexable by ``'patient'``, ``'criteria'`` and ``'label'``.
        tokenizer: Callable tokenizer with a ``pad_token_id`` attribute; calling it
            must return a mapping with ``"input_ids"`` and ``"attention_mask"`` tensors.
        max_len: Value passed to the tokenizer as ``max_length``.
    """

    def __init__(self, df, tokenizer, max_len):
        self.patient_texts = list(df['patient'])
        self.criteria_texts = list(df['criteria'])
        self.labels = list(df['label'])
        self.tokenizer = tokenizer
        self.max_len = max_len

        # Stored for reference only; the tokenizer call below does the padding itself.
        pad_id = tokenizer.pad_token_id
        if pad_id is None:
            print("Warning: [PAD] token not found in tokenizer. Using 0 for padding token ID.")
            pad_id = 0
        self.pad_token_id = pad_id

    def __len__(self):
        """Number of examples (one per label)."""
        return len(self.labels)

    def __getitem__(self, index):
        """Return ``input_ids``, ``attention_mask`` and ``label`` tensors for one example."""
        patient_text = str(self.patient_texts[index])
        criteria_text = str(self.criteria_texts[index])
        label = torch.tensor(self.labels[index], dtype=torch.long)

        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        # Criteria is the first segment, patient text the second.
        encoding = self.tokenizer(criteria_text, patient_text, **tokenizer_options)

        # return_tensors='pt' adds a leading batch axis; drop it so the
        # DataLoader can stack individual examples.
        item = {key: encoding[key].squeeze(0) for key in ("input_ids", "attention_mask")}
        item["label"] = label
        return item

In [29]:
class TransformerClassifier(nn.Module):
    """Pretrained encoder followed by dropout and a linear classification head.

    Args:
        model_name: Identifier passed to ``AutoModel.from_pretrained``.
        num_labels: Number of output logits.
    """

    def __init__(self, model_name, num_labels):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        # The head's input width follows the encoder's configured hidden size.
        encoder_width = self.encoder.config.hidden_size
        self.classifier = nn.Linear(encoder_width, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return logits computed from the encoder's first-position hidden state."""
        encoded = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the sequence axis (presumably the [CLS] token for
        # BERT-style tokenizers) is used as the pooled representation.
        first_position = encoded.last_hidden_state[:, 0]
        return self.classifier(self.dropout(first_position))

In [30]:
class RNNClassifierFromScratch(nn.Module):
    """Embedding -> LSTM -> dropout -> linear classifier, trained from scratch.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each token embedding.
        hidden_size: LSTM hidden-state width.
        num_layers: Number of stacked LSTM layers.
        num_labels: Number of output logits.
        dropout_rate: Dropout before the classifier; also used between LSTM
            layers when ``num_layers > 1``.
    """

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_size: int, num_layers: int, num_labels: int, dropout_rate: float):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # nn.LSTM only applies dropout between layers, so pass 0 for a single layer.
        inter_layer_dropout = dropout_rate if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )

        self.dropout_classifier = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(hidden_size, num_labels)

        self.num_layers = num_layers
        self.hidden_size = hidden_size

    def forward(self, input_ids, attention_mask):
        """Return logits computed from the top LSTM layer's final hidden state."""
        token_vectors = self.embedding(input_ids)

        # Real (unpadded) length of each row; packing needs CPU lengths >= 1.
        seq_lengths = attention_mask.sum(dim=1).cpu().clamp(min=1)

        # Packing makes the LSTM stop at each sequence's true end, so padding
        # positions never reach the recurrent state.
        packed_input = nn.utils.rnn.pack_padded_sequence(
            token_vectors, seq_lengths, batch_first=True, enforce_sorted=False
        )
        _, (last_hidden, _) = self.lstm(packed_input)

        # last_hidden[-1] is the final state of the top layer.
        return self.classifier(self.dropout_classifier(last_hidden[-1]))



In [31]:
def train_and_evaluate_transformer_model(
    model_name: str,
    df: pd.DataFrame,
    num_labels: int,
    max_len: int,
    batch_size: int,
    epochs: int,
    learning_rate: float,
    device: torch.device
):
    """Fine-tune a pretrained transformer classifier and report validation/test metrics.

    The data is split 60/20/20 (train/val/test), stratified on ``label`` with a
    fixed ``random_state``. After every epoch the model is validated, and its
    weights are saved whenever validation accuracy improves. The test set is
    scored with the best saved checkpoint, so the reported test metrics belong
    to the model stored at ``saved_model_path``.

    Args:
        model_name: Identifier passed to ``AutoTokenizer`` / ``AutoModel``.
        df: Frame with ``patient``, ``criteria`` and ``label`` columns. It is
            not modified; cleaning happens on a copy.
        num_labels: Number of target classes.
        max_len: Maximum token length per encoded pair.
        batch_size: Batch size for all three data loaders.
        epochs: Number of training epochs.
        learning_rate: AdamW learning rate.
        device: Device on which the model and batches are placed.

    Returns:
        dict with ``model_name``, ``final_val_loss``, ``final_val_accuracy``
        (last epoch; NaN when ``epochs == 0``), ``best_val_accuracy``,
        ``final_test_loss``, ``final_test_accuracy`` and ``saved_model_path``.
    """
    print(f"\n--- Starting training for Transformer model: {model_name} ---")

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    print(f"Tokenizer for {model_name} loaded.")

    # Coerce labels to integers and drop rows whose label is not numeric.
    # Chained, non-inplace calls leave the caller's DataFrame untouched.
    df = (
        df.assign(label=pd.to_numeric(df['label'], errors='coerce'))
        .dropna(subset=['label'])
        .astype({'label': int})
    )

    # 20% test, then 25% of the remaining 80% (= 20% overall) for validation.
    train_val_df, test_df = train_test_split(
        df, test_size=0.2, stratify=df['label'], random_state=42
    )
    train_df, val_df = train_test_split(
        train_val_df, test_size=0.25, stratify=train_val_df['label'], random_state=42
    )

    print(f"Dataset split: Train={len(train_df)} | Val={len(val_df)} | Test={len(test_df)}")

    train_dataset = DataCreator_Transformer(df=train_df, tokenizer=tokenizer, max_len=max_len)
    val_dataset = DataCreator_Transformer(df=val_df, tokenizer=tokenizer, max_len=max_len)
    test_dataset = DataCreator_Transformer(df=test_df, tokenizer=tokenizer, max_len=max_len)

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = TransformerClassifier(model_name, num_labels)
    model.to(device)
    print(f"Model {model_name} initialized")

    optimizer = AdamW(model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    def _evaluate(dataloader, num_examples, desc=None):
        """Return (mean per-batch loss, accuracy) of ``model`` on ``dataloader``."""
        model.eval()
        running_loss = 0.0
        num_correct = 0
        batches = tqdm(dataloader, desc=desc) if desc is not None else dataloader
        with torch.no_grad():
            for batch in batches:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["label"].to(device)

                outputs = model(input_ids, attention_mask)
                running_loss += loss_fn(outputs, labels).item()
                num_correct += (torch.argmax(outputs, dim=1) == labels).sum().item()
        return running_loss / len(dataloader), num_correct / num_examples

    best_val_accuracy = 0.0
    checkpoint_saved = False  # True once this run has written model_save_path
    model_save_name = model_name.replace('/', '_').replace('-', '_')
    model_save_path = f"{model_save_name}_clinical_model.pt"

    # Defined up front so the return value is well-formed even when epochs == 0.
    avg_val_loss = float("nan")
    val_accuracy = float("nan")

    for epoch in range(epochs):
        model.train()
        total_train_loss = 0

        print(f"\nEpoch {epoch + 1}/{epochs} (Model: {model_name})")
        loop = tqdm(train_dataloader, leave=True)

        for batch in loop:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = loss_fn(outputs, labels)

            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()
            loop.set_description(f"Epoch {epoch + 1}")
            loop.set_postfix(loss=loss.item())

        avg_train_loss = total_train_loss / len(train_dataloader)
        print(f"Average Training Loss: {avg_train_loss:.4f}")

        avg_val_loss, val_accuracy = _evaluate(val_dataloader, len(val_dataset))

        print(f"Validation loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.4f}")

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), model_save_path)
            checkpoint_saved = True
            print(f"Model saved to {model_save_path} (Best validation accuracy: {best_val_accuracy:.4f})")

    print(f"\n--- Finished training for {model_name} ---")

    # Score the test set with the best checkpoint, not whatever the last epoch
    # produced. The flag (rather than a file-existence check) avoids loading a
    # stale file left behind by an earlier run.
    if checkpoint_saved:
        model.load_state_dict(torch.load(model_save_path, map_location=device))
        print(f"Loaded best checkpoint from {model_save_path} for test evaluation.")

    print(f"\n--- Evaluating {model_name} on the TEST SET ---")
    avg_test_loss, test_accuracy = _evaluate(
        test_dataloader, len(test_dataset), desc="Test Evaluation"
    )

    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

    return {
        "model_name": model_name,
        "final_val_loss": avg_val_loss,
        "final_val_accuracy": val_accuracy,
        "best_val_accuracy": best_val_accuracy,
        "final_test_loss": avg_test_loss,
        "final_test_accuracy": test_accuracy,
        "saved_model_path": model_save_path
    }

In [32]:
def train_and_evaluate_rnn_model(
    df: pd.DataFrame,
    num_labels: int,
    max_len: int,
    batch_size: int,
    epochs: int,
    learning_rate: float,
    embedding_dim: int,
    hidden_size: int,
    num_rnn_layers: int,
    dropout: float,
    device: torch.device,
    tokenizer_path: str = "/Users/nishitha/Desktop/Learn/NLP/Clinical trials eligibility/BPE/bpe_tokenizer.json",
):
    """Train the from-scratch LSTM classifier and report validation/test metrics.

    The data is split 60/20/20 (train/val/test), stratified on ``label`` with a
    fixed ``random_state``. After every epoch the model is validated, and its
    weights are saved whenever validation accuracy improves. The test set is
    scored with the best saved checkpoint, so the reported test metrics belong
    to the model stored at ``saved_model_path``.

    Args:
        df: Frame with ``patient``, ``criteria`` and ``label`` columns. It is
            not modified; cleaning happens on a copy.
        num_labels: Number of target classes.
        max_len: Fixed token length per encoded pair.
        batch_size: Batch size for all three data loaders.
        epochs: Number of training epochs.
        learning_rate: AdamW learning rate.
        embedding_dim: Token-embedding width.
        hidden_size: LSTM hidden-state width.
        num_rnn_layers: Number of stacked LSTM layers.
        dropout: Dropout rate handed to the model.
        device: Device on which the model and batches are placed.
        tokenizer_path: Path to the serialized tokenizer JSON loaded with
            ``Tokenizer.from_file``. Defaults to the original hard-coded
            location; override it when running on another machine.

    Returns:
        dict with ``model_name``, ``final_val_loss``, ``final_val_accuracy``
        (last epoch; NaN when ``epochs == 0``), ``best_val_accuracy``,
        ``final_test_loss``, ``final_test_accuracy`` and ``saved_model_path``.
    """
    print("\n--- Starting training for Simple RNN Model ---")

    tokenizer = Tokenizer.from_file(tokenizer_path)
    vocab_size = tokenizer.get_vocab_size()
    print(f"Custom tokenizer loaded. Vocabulary size: {vocab_size}")

    # Coerce labels to integers and drop rows whose label is not numeric.
    # Chained, non-inplace calls leave the caller's DataFrame untouched.
    df = (
        df.assign(label=pd.to_numeric(df['label'], errors='coerce'))
        .dropna(subset=['label'])
        .astype({'label': int})
    )

    # 20% test, then 25% of the remaining 80% (= 20% overall) for validation.
    train_val_df, test_df = train_test_split(
        df, test_size=0.2, stratify=df['label'], random_state=42
    )
    train_df, val_df = train_test_split(
        train_val_df, test_size=0.25, stratify=train_val_df['label'], random_state=42
    )

    print(f"Dataset split: Train={len(train_df)} | Val={len(val_df)} | Test={len(test_df)}")

    train_dataset = DataCreator_RNN(df=train_df, tokenizer=tokenizer, max_len=max_len)
    val_dataset = DataCreator_RNN(df=val_df, tokenizer=tokenizer, max_len=max_len)
    test_dataset = DataCreator_RNN(df=test_df, tokenizer=tokenizer, max_len=max_len)

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = RNNClassifierFromScratch(vocab_size, embedding_dim, hidden_size, num_rnn_layers, num_labels, dropout)
    model.to(device)
    print(f"RNNClassifierFromScratch initialized on {device}.")

    optimizer = AdamW(model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    def _evaluate(dataloader, num_examples, desc=None):
        """Return (mean per-batch loss, accuracy) of ``model`` on ``dataloader``."""
        model.eval()
        running_loss = 0.0
        num_correct = 0
        batches = tqdm(dataloader, desc=desc) if desc is not None else dataloader
        with torch.no_grad():
            for batch in batches:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["label"].to(device)

                outputs = model(input_ids, attention_mask)
                running_loss += loss_fn(outputs, labels).item()
                num_correct += (torch.argmax(outputs, dim=1) == labels).sum().item()
        return running_loss / len(dataloader), num_correct / num_examples

    best_val_accuracy = 0.0
    checkpoint_saved = False  # True once this run has written model_save_path
    model_save_path = "scratch_rnn_clinical_model.pt"

    # Defined up front so the return value is well-formed even when epochs == 0.
    avg_val_loss = float("nan")
    val_accuracy = float("nan")

    for epoch in range(epochs):
        model.train()
        total_train_loss = 0

        print(f"\nEpoch {epoch + 1}/{epochs} (Model: Simple RNN)")
        loop = tqdm(train_dataloader, leave=True)

        for batch in loop:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = loss_fn(outputs, labels)

            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()
            loop.set_description(f"Epoch {epoch + 1}")
            loop.set_postfix(loss=loss.item())

        avg_train_loss = total_train_loss / len(train_dataloader)
        print(f"Average Training Loss: {avg_train_loss:.4f}")

        avg_val_loss, val_accuracy = _evaluate(val_dataloader, len(val_dataset))

        print(f"Validation loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.4f}")

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), model_save_path)
            checkpoint_saved = True
            print(f"Model saved to {model_save_path} (Best validation accuracy: {best_val_accuracy:.4f})")

    print("\n--- Finished training for Simple RNN Model ---")

    # Score the test set with the best checkpoint, not whatever the last epoch
    # produced. The flag (rather than a file-existence check) avoids loading a
    # stale file left behind by an earlier run.
    if checkpoint_saved:
        model.load_state_dict(torch.load(model_save_path, map_location=device))
        print(f"Loaded best checkpoint from {model_save_path} for test evaluation.")

    print("\n--- Evaluating Simple RNN Model on the TEST SET ---")
    avg_test_loss, test_accuracy = _evaluate(
        test_dataloader, len(test_dataset), desc="Test Evaluation"
    )

    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

    return {
        "model_name": "Simple RNN Classifier",
        "final_val_loss": avg_val_loss,
        "final_val_accuracy": val_accuracy,
        "best_val_accuracy": best_val_accuracy,
        "final_test_loss": avg_test_loss,
        "final_test_accuracy": test_accuracy,
        "saved_model_path": model_save_path
    }



In [33]:
if __name__ == "__main__":
    # Read the cleaned dataset once; each experiment receives its own copy.
    df_full = pd.read_csv('../Dataset/cleaned_data_3.csv')

    all_results = []

    model_name_to_test = "dmis-lab/biobert-v1.1"

    # Settings that are identical for both experiments.
    common_kwargs = dict(
        num_labels=NUM_LABELS,
        max_len=MAX_LEN,
        batch_size=BATCH_SIZE,
        device=DEVICE,
    )

    # Experiment 1: fine-tune the pretrained transformer.
    result = train_and_evaluate_transformer_model(
        model_name=model_name_to_test,
        df=df_full.copy(),
        epochs=EPOCHS_TRANSFORMER,
        learning_rate=LEARNING_RATE_TRANSFORMER,
        **common_kwargs,
    )
    all_results.append(result)

    # Experiment 2: train the LSTM baseline from scratch.
    rnn_result = train_and_evaluate_rnn_model(
        df=df_full.copy(),
        epochs=EPOCHS_SCRATCH,
        learning_rate=LEARNING_RATE_SCRATCH,
        embedding_dim=EMBEDDING_DIM,
        hidden_size=RNN_HIDDEN_SIZE,
        num_rnn_layers=RNN_NUM_LAYERS,
        dropout=RNN_DROPOUT,
        **common_kwargs,
    )
    all_results.append(rnn_result)


    # Side-by-side summary, one stanza per model.
    print("\n--- Final Comparative Study Summary ---")
    for res in all_results:
        print(
            f"Model: {res['model_name']}",
            f"  Best Val Accuracy: {res['best_val_accuracy']:.4f}",
            f"  Final Test Accuracy: {res['final_test_accuracy']:.4f}",
            f"  Final Test Loss: {res['final_test_loss']:.4f}",
            f"  Saved Model: {res['saved_model_path']}",
            "-" * 30,
            sep="\n",
        )



--- Starting training for Simple RNN Model ---
Custom tokenizer loaded. Vocabulary size: 15552
Dataset split: Train=529 | Val=177 | Test=177
RNNClassifierFromScratch initialized on mps.

Epoch 1/20 (Model: Simple RNN)


Epoch 1:   7%|▋         | 5/67 [00:08<01:44,  1.68s/it, loss=0.627]


KeyboardInterrupt: 