In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import random
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import f1_score, accuracy_score

from sklearn.preprocessing import LabelEncoder

from transformers import AutoTokenizer, AutoModel, Trainer, TrainingArguments
from torch.utils.data import Dataset
import os

# Reproducibility: seed every random number generator used in this notebook.
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed handed to each generator (defaults to 42).
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    # The CUDA generators are only seeded when a GPU is reported as available.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


set_seed()
# Sets the WANDB_DISABLED flag for this process (presumably read by the
# Weights & Biases integration of transformers -- confirm for your version).
os.environ["WANDB_DISABLED"] = "true"

In [None]:
# Raw data: read the CSV and pull the three columns out as lists of strings.
df = pd.read_csv('data.csv')

texts, labels_type, labels_urgency = (
    df[column].astype(str).tolist()
    for column in ('letter_text', 'referal_type', 'urgency')
)

# Encode labels: one independent encoder per task so each task has its own id space.
label_encoder_type, label_encoder_urgency = LabelEncoder(), LabelEncoder()
labels_type_enc = label_encoder_type.fit_transform(labels_type)
labels_urgency_enc = label_encoder_urgency.fit_transform(labels_urgency)

num_classes_type = len(label_encoder_type.classes_)
num_classes_urgency = len(label_encoder_urgency.classes_)

# Train/test split: a single call keeps texts and both label arrays aligned.
(X_train, X_test,
 y_train_type, y_test_type,
 y_train_urgency, y_test_urgency) = train_test_split(
    texts,
    labels_type_enc,
    labels_urgency_enc,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Multi-Output BERT Model
class MultiOutputBertModel(nn.Module):
    """Pretrained encoder with two linear classification heads on its pooled output.

    One head scores the referral-type classes and the other the urgency classes;
    both read the same dropout-regularised `pooler_output` of the encoder.
    """

    def __init__(self, model_name, num_type_classes, num_urgency_classes):
        """Load the encoder and create the two heads.

        Args:
            model_name: Identifier passed to `AutoModel.from_pretrained`.
            num_type_classes: Output width of the referral-type head.
            num_urgency_classes: Output width of the urgency head.
        """
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)

        # Both heads consume the encoder's hidden size.
        encoder_width = self.bert.config.hidden_size
        self.type_classifier = nn.Linear(encoder_width, num_type_classes)
        self.urgency_classifier = nn.Linear(encoder_width, num_urgency_classes)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None,
                type_labels=None, urgency_labels=None):
        """Run the encoder and both heads.

        Returns:
            dict with keys 'loss', 'type_logits' and 'urgency_logits'. 'loss' is the
            sum of the two cross-entropy losses when BOTH label tensors are given,
            otherwise None.
        """
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        features = self.dropout(encoded.pooler_output)

        type_logits = self.type_classifier(features)
        urgency_logits = self.urgency_classifier(features)

        result = {
            'loss': None,
            'type_logits': type_logits,
            'urgency_logits': urgency_logits,
        }
        # Without a complete pair of label tensors there is nothing to optimise.
        if type_labels is None or urgency_labels is None:
            return result

        criterion = nn.CrossEntropyLoss()
        type_loss = criterion(type_logits, type_labels)
        urgency_loss = criterion(urgency_logits, urgency_labels)
        result['loss'] = type_loss + urgency_loss
        return result

In [None]:
# Multi-Output Dataset Class
class MultiOutputTextDataset(Dataset):
    """Map-style dataset yielding tokenised text plus one label per task.

    Each item is a dict with 'input_ids', 'attention_mask', 'type_labels' and
    'urgency_labels'. Texts are tokenised lazily on access and padded or
    truncated to `max_length`.
    """

    def __init__(self, texts, type_labels, urgency_labels, tokenizer, max_length=512):
        """Store the samples.

        Args:
            texts: Iterable of documents (list, NumPy array, pandas Series, ...).
            type_labels: Iterable of integer referral-type ids, aligned with `texts`.
            urgency_labels: Iterable of integer urgency ids, aligned with `texts`.
            tokenizer: Callable accepting `(text, truncation=, padding=, max_length=,
                return_tensors=)` and returning a mapping with 'input_ids' and
                'attention_mask' tensors.
            max_length: Fixed sequence length every sample is padded/truncated to.

        Raises:
            ValueError: If the three inputs do not have the same length.
        """
        # Materialise as plain lists so __getitem__ always indexes by POSITION.
        # A pandas Series is indexed by label, so a Series whose index is not
        # 0..n-1 (e.g. a filtered frame) would otherwise raise KeyError or
        # silently return a different row.
        self.texts = list(texts)
        self.type_labels = list(type_labels)
        self.urgency_labels = list(urgency_labels)

        if not (len(self.texts) == len(self.type_labels) == len(self.urgency_labels)):
            raise ValueError(
                "texts, type_labels and urgency_labels must have the same length, got "
                f"{len(self.texts)}, {len(self.type_labels)} and {len(self.urgency_labels)}"
            )

        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenise sample `idx` and return it together with both labels."""
        text = str(self.texts[idx])
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        return {
            # The tokenizer returns a leading batch axis of size 1; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'type_labels': torch.tensor(self.type_labels[idx], dtype=torch.long),
            'urgency_labels': torch.tensor(self.urgency_labels[idx], dtype=torch.long)
        }

In [None]:
# Custom Trainer for Multi-Output
class MultiOutputTrainer(Trainer):
    """Trainer for models whose forward returns a dict with 'loss', 'type_logits'
    and 'urgency_logits' and whose batches carry 'type_labels' / 'urgency_labels'."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Forward the batch and return the model's own combined loss.

        Args:
            model: The model being trained.
            inputs: Batch dict forwarded to the model as keyword arguments.
            return_outputs: When True, return `(loss, outputs)` instead of `loss`.
            num_items_in_batch: Accepted for signature compatibility; unused here.
        """
        outputs = model(**inputs)
        loss = outputs['loss']
        return (loss, outputs) if return_outputs else loss

    def prediction_step(self, model, inputs, prediction_loss_fn, ignore_keys=None):
        """Evaluate one batch and return `(loss, predictions, labels)`.

        Args:
            model: The model to evaluate.
            inputs: Batch dict; moved to the right device via `_prepare_inputs`.
            prediction_loss_fn: Despite its name, this receives the boolean the base
                Trainer passes positionally as `prediction_loss_only`. When truthy,
                only the loss is returned so logits are not accumulated during
                loss-only evaluation.
            ignore_keys: Accepted for signature compatibility; unused here.

        Returns:
            `(loss, None, None)` when `prediction_loss_fn` is truthy; otherwise
            `(loss, [type_logits, urgency_logits], labels)`, where `labels` is
            `[type_labels, urgency_labels]`, or None if either label is missing
            from the batch.
        """
        inputs = self._prepare_inputs(inputs)

        with torch.no_grad():
            outputs = model(**inputs)

        loss = outputs['loss']
        if loss is not None:
            loss = loss.detach()

        # Honour the loss-only contract of Trainer.prediction_step.
        if prediction_loss_fn:
            return (loss, None, None)

        # Logits for both tasks, in the order (type, urgency).
        predictions = [outputs['type_logits'], outputs['urgency_logits']]

        type_labels = inputs.get('type_labels')
        urgency_labels = inputs.get('urgency_labels')
        if type_labels is None or urgency_labels is None:
            # Return None rather than a list containing None for unlabeled batches.
            labels = None
        else:
            labels = [type_labels, urgency_labels]

        return (loss, predictions, labels)

In [None]:
# Training function
def train_multi_output_model(model_name, train_dataset, eval_dataset, num_type_classes, num_urgency_classes, output_dir,
                             num_train_epochs=15, train_batch_size=16, eval_batch_size=32):
    """Build a MultiOutputBertModel for `model_name` and train it with MultiOutputTrainer.

    Args:
        model_name: Checkpoint identifier used for both tokenizer and encoder.
        train_dataset: Dataset used for optimisation.
        eval_dataset: Dataset evaluated at the end of every epoch.
        num_type_classes: Number of referral-type classes.
        num_urgency_classes: Number of urgency classes.
        output_dir: Directory handed to TrainingArguments (logs go to `<output_dir>/logs`).
        num_train_epochs: Number of training epochs (default 15, the previous fixed value).
        train_batch_size: Per-device training batch size (default 16).
        eval_batch_size: Per-device evaluation batch size (default 32).

    Returns:
        Tuple `(trainer, model, tokenizer)` after training has finished.
    """
    # Initialize tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = MultiOutputBertModel(model_name, num_type_classes, num_urgency_classes)

    # Training arguments: log and evaluate once per epoch, never write checkpoints.
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=num_train_epochs,
        per_device_train_batch_size=train_batch_size,
        per_device_eval_batch_size=eval_batch_size,
        logging_dir=f"{output_dir}/logs",
        report_to="none",
        logging_strategy="epoch",
        eval_strategy="epoch",
        save_strategy="no",
        load_best_model_at_end=False
    )

    # Initialize trainer
    # NOTE(review): recent transformers releases deprecate the `tokenizer=` keyword in
    # favour of `processing_class=` -- confirm against the installed version.
    trainer = MultiOutputTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        tokenizer=tokenizer
    )

    # Train the model
    trainer.train()

    return trainer, model, tokenizer

In [None]:
def evaluate_multi_output_model(trainer, test_dataset, y_test_type, y_test_urgency,
                               label_encoder_type, label_encoder_urgency, model_name="Multi-Output BERT"):
    """Print classification reports and plot confusion matrices for both tasks.

    Args:
        trainer: Object whose `predict(test_dataset)` result exposes `.predictions`
            as a pair `(type_logits, urgency_logits)`.
        test_dataset: Dataset handed to `trainer.predict`.
        y_test_type: True encoded referral-type labels for `test_dataset`.
        y_test_urgency: True encoded urgency labels for `test_dataset`.
        label_encoder_type: Fitted encoder; `classes_` supplies referral-type names.
        label_encoder_urgency: Fitted encoder; `classes_` supplies urgency names.
        model_name: Title used in the printed header and the figure.

    Returns:
        None. Results are printed and shown with matplotlib.
    """

    def _to_numpy(scores):
        # Tensors expose `.cpu()`; arrays are passed through untouched.
        return scores.cpu().numpy() if hasattr(scores, 'cpu') else scores

    # Run inference and separate the logits of the two heads.
    prediction_output = trainer.predict(test_dataset)
    type_scores = prediction_output.predictions[0]
    urgency_scores = prediction_output.predictions[1]
    type_scores = _to_numpy(type_scores)
    urgency_scores = _to_numpy(urgency_scores)

    # Highest-scoring class per sample.
    y_pred_type = np.argmax(type_scores, axis=1)
    y_pred_urgency = np.argmax(urgency_scores, axis=1)

    # Classes that occur in the ground truth or in the predictions (sorted, unique).
    present_type_classes = np.union1d(y_test_type, y_pred_type)
    present_urgency_classes = np.union1d(y_test_urgency, y_pred_urgency)

    banner = "=" * 60
    print("\n" + banner)
    print(f"{model_name.upper()} Performance")
    print(banner)

    # Class analysis
    print("\nClass presence analysis:")
    print(f"Referral Type - Classes present: {len(present_type_classes)}/{len(label_encoder_type.classes_)}")
    print(f"Urgency - Classes present: {len(present_urgency_classes)}/{len(label_encoder_urgency.classes_)}")

    # Human-readable names for the classes that are present.
    present_type_names = [label_encoder_type.classes_[i] for i in present_type_classes]
    present_urgency_names = [label_encoder_urgency.classes_[i] for i in present_urgency_classes]

    # One entry per task: (title, y_true, y_pred, class ids, class names, colour map).
    tasks = (
        ("Referral Type", y_test_type, y_pred_type,
         present_type_classes, present_type_names, 'Blues'),
        ("Urgency", y_test_urgency, y_pred_urgency,
         present_urgency_classes, present_urgency_names, 'Reds'),
    )

    # Classification reports restricted to the present classes.
    for title, y_true, y_pred, class_ids, class_names, _ in tasks:
        print(f"\nClassification Report: {title}")
        print(classification_report(y_true, y_pred,
                                    labels=class_ids,
                                    target_names=class_names,
                                    zero_division=0))

    # Confusion matrices, one panel per task.
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    fig.suptitle(f'{model_name} Performance Visualization', fontsize=16)

    for ax, (title, y_true, y_pred, class_ids, class_names, cmap) in zip(axes, tasks):
        matrix = confusion_matrix(y_true, y_pred, labels=class_ids)
        sns.heatmap(matrix, annot=True, fmt='d', cmap=cmap, ax=ax,
                    xticklabels=class_names,
                    yticklabels=class_names)
        ax.set_title(f"Confusion Matrix: {title}")
        ax.set_xlabel("Predicted Label")
        ax.set_ylabel("True Label")

    # Leave room at the top for the suptitle.
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

# Full Dataset

## BERT

In [None]:
# Tokenizer used to build the two datasets for the bert-base-uncased run.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Wrap the hold-out split created earlier in dataset objects.
train_dataset = MultiOutputTextDataset(
    texts=X_train,
    type_labels=y_train_type,
    urgency_labels=y_train_urgency,
    tokenizer=tokenizer,
)
test_dataset = MultiOutputTextDataset(
    texts=X_test,
    type_labels=y_test_type,
    urgency_labels=y_test_urgency,
    tokenizer=tokenizer,
)

# Train; the test split doubles as the per-epoch evaluation set.
trainer, model, tokenizer = train_multi_output_model(
    "bert-base-uncased",
    train_dataset,
    test_dataset,
    num_classes_type,
    num_classes_urgency,
    "./multi_output_bert",
)

# Report metrics and confusion matrices on the test split.
evaluate_multi_output_model(
    trainer=trainer,
    test_dataset=test_dataset,
    y_test_type=y_test_type,
    y_test_urgency=y_test_urgency,
    label_encoder_type=label_encoder_type,
    label_encoder_urgency=label_encoder_urgency,
)

## ClinicalBERT

In [None]:
# Create tokenizer for ClinicalBERT (loaded once; the same instance builds the datasets)
clinical_tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")

# Create datasets
# Reuse the tokenizer loaded above instead of loading the same checkpoint twice.
tokenizer = clinical_tokenizer

train_dataset = MultiOutputTextDataset(
    X_train, y_train_type, y_train_urgency, tokenizer
)

test_dataset = MultiOutputTextDataset(
    X_test, y_test_type, y_test_urgency, tokenizer
)

# Train the multi-output model (the test split is also the per-epoch evaluation set)
trainer, model, tokenizer = train_multi_output_model(
    model_name="emilyalsentzer/Bio_ClinicalBERT",
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    num_type_classes=num_classes_type,
    num_urgency_classes=num_classes_urgency,
    output_dir="./multi_output_clinical_bert"
)

# Evaluate the model
evaluate_multi_output_model(
    trainer, test_dataset, y_test_type, y_test_urgency,
    label_encoder_type, label_encoder_urgency
)


## 5-Fold Cross Validation for Both Models

In [None]:

# Re-assemble texts and encoded labels into one frame so folds can be taken by position.
df = pd.DataFrame({
    'text': texts,
    'referal_type': labels_type_enc,
    'urgency': labels_urgency_enc
}).reset_index(drop=True)

X = df[['text']]
y_type = df['referal_type'].values
y_urgency = df['urgency'].values

# Determine number of classes
num_type_classes = len(np.unique(y_type))
num_urgency_classes = len(np.unique(y_urgency))

print(f"Number of type classes: {num_type_classes}")
print(f"Number of urgency classes: {num_urgency_classes}")
print(f"Data shape: {X.shape}")

# === Prepare Cross-Validation ===
# A fixed random_state means both models are evaluated on identical folds.
kf = KFold(n_splits=5, shuffle=True, random_state=42)

bert_results = {"f1_type": [], "acc_type": [], "f1_urgency": [], "acc_urgency": []}
clinical_results = {"f1_type": [], "acc_type": [], "f1_urgency": [], "acc_urgency": []}

for model_name, result_dict in [("bert-base-uncased", bert_results),
                                 ("emilyalsentzer/Bio_ClinicalBERT", clinical_results)]:
    print(f"\nRunning {model_name}")

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"\nFold {fold+1}")

        # Fold-scoped names: the training part deliberately does NOT reuse
        # X_train / y_train_type / y_train_urgency, so the hold-out split from the
        # earlier cell is not overwritten by a per-fold DataFrame.
        fold_X_train = X.iloc[train_idx].reset_index(drop=True)
        X_val = X.iloc[val_idx].reset_index(drop=True)
        fold_y_train_type = y_type[train_idx]
        fold_y_train_urgency = y_urgency[train_idx]
        y_val_type = y_type[val_idx]
        y_val_urgency = y_urgency[val_idx]

        # Create tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Datasets
        train_dataset = MultiOutputTextDataset(
            fold_X_train['text'], fold_y_train_type, fold_y_train_urgency, tokenizer
        )
        val_dataset = MultiOutputTextDataset(X_val['text'], y_val_type, y_val_urgency, tokenizer)

        # Train
        trainer, model, tokenizer = train_multi_output_model(
            model_name=model_name,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            num_type_classes=num_type_classes,
            num_urgency_classes=num_urgency_classes,
            output_dir=f"./{model_name.replace('/', '_')}_fold{fold+1}"
        )

        # Predict: predictions[0] / predictions[1] are the type / urgency logits.
        predictions = trainer.predict(val_dataset)
        type_preds = predictions.predictions[0].argmax(axis=1)
        urgency_preds = predictions.predictions[1].argmax(axis=1)

        # Evaluate
        result_dict["f1_type"].append(f1_score(y_val_type, type_preds, average='macro'))
        result_dict["acc_type"].append(accuracy_score(y_val_type, type_preds))
        result_dict["f1_urgency"].append(f1_score(y_val_urgency, urgency_preds, average='macro'))
        result_dict["acc_urgency"].append(accuracy_score(y_val_urgency, urgency_preds))

        print(f"Fold {fold+1} Results:")
        print(f"  Type F1: {result_dict['f1_type'][-1]:.4f}, Acc: {result_dict['acc_type'][-1]:.4f}")
        print(f"  Urgency F1: {result_dict['f1_urgency'][-1]:.4f}, Acc: {result_dict['acc_urgency'][-1]:.4f}")

# Summary Table
# Fold numbers are derived from the splitter so the table stays valid if n_splits changes.
n_folds = kf.get_n_splits()
df_results = pd.DataFrame({
    "Fold": list(range(1, n_folds + 1)),
    "BERT_F1_Type": bert_results['f1_type'],
    "ClinicalBERT_F1_Type": clinical_results['f1_type'],
    "BERT_F1_Urgency": bert_results['f1_urgency'],
    "ClinicalBERT_F1_Urgency": clinical_results['f1_urgency'],
    # Accuracy is collected per fold above, so report it alongside F1.
    "BERT_Acc_Type": bert_results['acc_type'],
    "ClinicalBERT_Acc_Type": clinical_results['acc_type'],
    "BERT_Acc_Urgency": bert_results['acc_urgency'],
    "ClinicalBERT_Acc_Urgency": clinical_results['acc_urgency']
})

# Calculate means and standard deviations
print("\n Summary Statistics ")
print(f"BERT Type F1: {np.mean(bert_results['f1_type']):.4f} ± {np.std(bert_results['f1_type']):.4f}")
print(f"ClinicalBERT Type F1: {np.mean(clinical_results['f1_type']):.4f} ± {np.std(clinical_results['f1_type']):.4f}")
print(f"BERT Urgency F1: {np.mean(bert_results['f1_urgency']):.4f} ± {np.std(bert_results['f1_urgency']):.4f}")
print(f"ClinicalBERT Urgency F1: {np.mean(clinical_results['f1_urgency']):.4f} ± {np.std(clinical_results['f1_urgency']):.4f}")
print(f"BERT Type Acc: {np.mean(bert_results['acc_type']):.4f} ± {np.std(bert_results['acc_type']):.4f}")
print(f"ClinicalBERT Type Acc: {np.mean(clinical_results['acc_type']):.4f} ± {np.std(clinical_results['acc_type']):.4f}")
print(f"BERT Urgency Acc: {np.mean(bert_results['acc_urgency']):.4f} ± {np.std(bert_results['acc_urgency']):.4f}")
print(f"ClinicalBERT Urgency Acc: {np.mean(clinical_results['acc_urgency']):.4f} ± {np.std(clinical_results['acc_urgency']):.4f}")

print("\nDetailed Results:")
print(df_results)

# Vascular-Only

In [None]:
# Reload the raw CSV and keep only rows whose referral type is not 'non vascular'.
df = pd.read_csv('data.csv')
df = df.loc[df['referal_type'].ne('non vascular')]

print("Referral types after filtering:")
print(df['referal_type'].value_counts())

# Pull the three columns out as lists of strings.
texts, labels_type, labels_urgency = (
    df[column].astype(str).tolist()
    for column in ('letter_text', 'referal_type', 'urgency')
)

# Encode labels: encoders are re-fitted because the set of referral types changed.
label_encoder_type, label_encoder_urgency = LabelEncoder(), LabelEncoder()
labels_type_enc = label_encoder_type.fit_transform(labels_type)
labels_urgency_enc = label_encoder_urgency.fit_transform(labels_urgency)

num_classes_type = len(label_encoder_type.classes_)
num_classes_urgency = len(label_encoder_urgency.classes_)

print(f"\nNumber of referral type classes: {num_classes_type}")
print(f"Referral type classes: {label_encoder_type.classes_}")
print(f"Number of urgency classes: {num_classes_urgency}")
print(f"Urgency classes: {label_encoder_urgency.classes_}")

# Train/test split: a single call keeps texts and both label arrays aligned.
(X_train, X_test,
 y_train_type, y_test_type,
 y_train_urgency, y_test_urgency) = train_test_split(
    texts,
    labels_type_enc,
    labels_urgency_enc,
    test_size=0.2,
    random_state=42,
)

## BERT

In [None]:
# Tokenizer used to build the two datasets for the bert-base-uncased run.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Wrap the current train/test split in dataset objects.
train_dataset = MultiOutputTextDataset(
    texts=X_train,
    type_labels=y_train_type,
    urgency_labels=y_train_urgency,
    tokenizer=tokenizer,
)
test_dataset = MultiOutputTextDataset(
    texts=X_test,
    type_labels=y_test_type,
    urgency_labels=y_test_urgency,
    tokenizer=tokenizer,
)

# Train; the test split doubles as the per-epoch evaluation set.
# NOTE(review): the output directory is named "nonvascular" although this section
# filters the 'non vascular' rows OUT -- confirm the intended name.
trainer, model, tokenizer = train_multi_output_model(
    "bert-base-uncased",
    train_dataset,
    test_dataset,
    num_classes_type,
    num_classes_urgency,
    "./multi_output_bert_nonvascular",
)

# Report metrics and confusion matrices on the test split.
evaluate_multi_output_model(
    trainer=trainer,
    test_dataset=test_dataset,
    y_test_type=y_test_type,
    y_test_urgency=y_test_urgency,
    label_encoder_type=label_encoder_type,
    label_encoder_urgency=label_encoder_urgency,
)

## ClinicalBERT

In [None]:
# Tokenizer used to build the two datasets for the Bio_ClinicalBERT run.
tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")

# Wrap the current train/test split in dataset objects.
train_dataset = MultiOutputTextDataset(
    texts=X_train,
    type_labels=y_train_type,
    urgency_labels=y_train_urgency,
    tokenizer=tokenizer,
)
test_dataset = MultiOutputTextDataset(
    texts=X_test,
    type_labels=y_test_type,
    urgency_labels=y_test_urgency,
    tokenizer=tokenizer,
)

# Train; the test split doubles as the per-epoch evaluation set.
# NOTE(review): the output directory is named "nonvascular" although this section
# filters the 'non vascular' rows OUT -- confirm the intended name.
trainer, model, tokenizer = train_multi_output_model(
    "emilyalsentzer/Bio_ClinicalBERT",
    train_dataset,
    test_dataset,
    num_classes_type,
    num_classes_urgency,
    "./multi_output_clinical_bert_nonvascular",
)

# Report metrics and confusion matrices on the test split.
evaluate_multi_output_model(
    trainer=trainer,
    test_dataset=test_dataset,
    y_test_type=y_test_type,
    y_test_urgency=y_test_urgency,
    label_encoder_type=label_encoder_type,
    label_encoder_urgency=label_encoder_urgency,
)

## 5-Fold Cross Validation for Both Models

In [None]:
# Re-assemble texts and encoded labels into one frame so folds can be taken by position.
df = pd.DataFrame({
    'text': texts,
    'referal_type': labels_type_enc,
    'urgency': labels_urgency_enc
}).reset_index(drop=True)

X = df[['text']]
y_type = df['referal_type'].values
y_urgency = df['urgency'].values

# Determine number of classes
num_type_classes = len(np.unique(y_type))
num_urgency_classes = len(np.unique(y_urgency))

print(f"Number of type classes: {num_type_classes}")
print(f"Number of urgency classes: {num_urgency_classes}")
print(f"Data shape: {X.shape}")

# Prepare Cross-Validation
# A fixed random_state means both models are evaluated on identical folds.
kf = KFold(n_splits=5, shuffle=True, random_state=42)

bert_results = {"f1_type": [], "acc_type": [], "f1_urgency": [], "acc_urgency": []}
clinical_results = {"f1_type": [], "acc_type": [], "f1_urgency": [], "acc_urgency": []}

for model_name, result_dict in [("bert-base-uncased", bert_results),
                                 ("emilyalsentzer/Bio_ClinicalBERT", clinical_results)]:
    print(f"\nRunning {model_name}")

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"\nFold {fold+1}")

        # Fold-scoped names: the training part deliberately does NOT reuse
        # X_train / y_train_type / y_train_urgency, so the hold-out split from the
        # earlier cell is not overwritten by a per-fold DataFrame.
        fold_X_train = X.iloc[train_idx].reset_index(drop=True)
        X_val = X.iloc[val_idx].reset_index(drop=True)
        fold_y_train_type = y_type[train_idx]
        fold_y_train_urgency = y_urgency[train_idx]
        y_val_type = y_type[val_idx]
        y_val_urgency = y_urgency[val_idx]

        # Create tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Datasets
        train_dataset = MultiOutputTextDataset(
            fold_X_train['text'], fold_y_train_type, fold_y_train_urgency, tokenizer
        )
        val_dataset = MultiOutputTextDataset(X_val['text'], y_val_type, y_val_urgency, tokenizer)

        # Train
        # The "_vascular" suffix keeps these run directories separate from a
        # cross-validation run that uses the plain "<model>_fold<k>" naming.
        trainer, model, tokenizer = train_multi_output_model(
            model_name=model_name,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            num_type_classes=num_type_classes,
            num_urgency_classes=num_urgency_classes,
            output_dir=f"./{model_name.replace('/', '_')}_vascular_fold{fold+1}"
        )

        # Predict: predictions[0] / predictions[1] are the type / urgency logits.
        predictions = trainer.predict(val_dataset)
        type_preds = predictions.predictions[0].argmax(axis=1)
        urgency_preds = predictions.predictions[1].argmax(axis=1)

        # Evaluate
        result_dict["f1_type"].append(f1_score(y_val_type, type_preds, average='macro'))
        result_dict["acc_type"].append(accuracy_score(y_val_type, type_preds))
        result_dict["f1_urgency"].append(f1_score(y_val_urgency, urgency_preds, average='macro'))
        result_dict["acc_urgency"].append(accuracy_score(y_val_urgency, urgency_preds))

        print(f"Fold {fold+1} Results:")
        print(f"  Type F1: {result_dict['f1_type'][-1]:.4f}, Acc: {result_dict['acc_type'][-1]:.4f}")
        print(f"  Urgency F1: {result_dict['f1_urgency'][-1]:.4f}, Acc: {result_dict['acc_urgency'][-1]:.4f}")


# Summary Table
# Fold numbers are derived from the splitter so the table stays valid if n_splits changes.
n_folds = kf.get_n_splits()
df_results = pd.DataFrame({
    "Fold": list(range(1, n_folds + 1)),
    "BERT_F1_Type": bert_results['f1_type'],
    "ClinicalBERT_F1_Type": clinical_results['f1_type'],
    "BERT_F1_Urgency": bert_results['f1_urgency'],
    "ClinicalBERT_F1_Urgency": clinical_results['f1_urgency'],
    # Accuracy is collected per fold above, so report it alongside F1.
    "BERT_Acc_Type": bert_results['acc_type'],
    "ClinicalBERT_Acc_Type": clinical_results['acc_type'],
    "BERT_Acc_Urgency": bert_results['acc_urgency'],
    "ClinicalBERT_Acc_Urgency": clinical_results['acc_urgency']
})

# Calculate means and standard deviations
print("\nSummary Statistics")
print(f"BERT Type F1: {np.mean(bert_results['f1_type']):.4f} ± {np.std(bert_results['f1_type']):.4f}")
print(f"ClinicalBERT Type F1: {np.mean(clinical_results['f1_type']):.4f} ± {np.std(clinical_results['f1_type']):.4f}")
print(f"BERT Urgency F1: {np.mean(bert_results['f1_urgency']):.4f} ± {np.std(bert_results['f1_urgency']):.4f}")
print(f"ClinicalBERT Urgency F1: {np.mean(clinical_results['f1_urgency']):.4f} ± {np.std(clinical_results['f1_urgency']):.4f}")
print(f"BERT Type Acc: {np.mean(bert_results['acc_type']):.4f} ± {np.std(bert_results['acc_type']):.4f}")
print(f"ClinicalBERT Type Acc: {np.mean(clinical_results['acc_type']):.4f} ± {np.std(clinical_results['acc_type']):.4f}")
print(f"BERT Urgency Acc: {np.mean(bert_results['acc_urgency']):.4f} ± {np.std(bert_results['acc_urgency']):.4f}")
print(f"ClinicalBERT Urgency Acc: {np.mean(clinical_results['acc_urgency']):.4f} ± {np.std(clinical_results['acc_urgency']):.4f}")

print("\nDetailed Results:")
print(df_results)