In [1]:
import sys
import os

original_sys_path = sys.path.copy()
try:
    sys.path.insert(0, os.path.abspath(".."))
    from src.modeling_helpers import (
        MultilabelModel,
        AutoTokenizer,
        load_trial_data,
        CustomDataset,
        custom_collate_fn,
    )
finally:
    sys.path = original_sys_path

import numpy as np
import pandas as pd
from statsmodels.stats.contingency_tables import mcnemar
import torch
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
import itertools


def mcnemar_test(true_labels, preds_model_1, preds_model_2, exact):
    """
    Performs McNemar's test to compare the performance of two binary predictive models
    separately for positive instances, negative instances, and all instances.

    Parameters:
    - true_labels: The true labels of the data.
    - preds_model_1: The predictions made by the first model.
    - preds_model_2: The predictions made by the second model.
    - exact: If exact test should be used.

    Returns:
    - A dictionary with the McNemar Test Statistic, the p-value, and the contingency table
      for positive instances, negative instances, and all instances.
    """
    results = {}

    # Define a local function to calculate the contingency table and perform the test
    def perform_test(true_labels_subset, preds_model_1_subset, preds_model_2_subset):
        good_preds_model_1 = (preds_model_1_subset == true_labels_subset).astype(int)
        good_preds_model_2 = (preds_model_2_subset == true_labels_subset).astype(int)

        a, b, c, d = 0, 0, 0, 0
        for i in range(len(good_preds_model_1)):
            if good_preds_model_1[i] == 1 and good_preds_model_2[i] == 1:
                a += 1
            elif good_preds_model_1[i] == 1 and good_preds_model_2[i] == 0:
                b += 1
            elif good_preds_model_1[i] == 0 and good_preds_model_2[i] == 1:
                c += 1
            elif good_preds_model_1[i] == 0 and good_preds_model_2[i] == 0:
                d += 1

        contingency_table = np.array([[a, b], [c, d]])
        mcnemar_result = mcnemar(contingency_table, exact=exact)
        return mcnemar_result.statistic, mcnemar_result.pvalue, contingency_table

    # All instances
    all_stat, all_pval, all_contingency = perform_test(
        true_labels, preds_model_1, preds_model_2
    )
    results["all"] = (all_stat, all_pval, all_contingency)

    # Positive instances
    positive_mask = np.where(true_labels == 1)
    positive_stat, positive_pval, positive_contingency = perform_test(
        true_labels[positive_mask],
        preds_model_1[positive_mask],
        preds_model_2[positive_mask],
    )
    results["positive"] = (positive_stat, positive_pval, positive_contingency)

    # Negative instances
    negative_mask = np.where(true_labels == 0)
    negative_stat, negative_pval, negative_contingency = perform_test(
        true_labels[negative_mask],
        preds_model_1[negative_mask],
        preds_model_2[negative_mask],
    )
    results["negative"] = (negative_stat, negative_pval, negative_contingency)

    # Formatting the results for output
    formatted_results = {
        category: {
            "Statistic": stat,
            "P-Value": pval,
            "Contingency Table": contingency.tolist(),
        }
        for category, (stat, pval, contingency) in results.items()
    }

    return formatted_results


def load_model(model_config):
    """
    Loads a model based on the provided configuration.

    Parameters:
    - model_config: A dictionary containing the model's configuration,
                    including its feature use configuration and model saving path.

    Returns:
    - model: The loaded model.
    """
    # Unpack the configuration
    feature_use_config = model_config["feature_use_config"]
    model_saving_path = model_config["model_saving_path"]

    # Load tokenizers (adjust as necessary for your specific setup)
    group_desc_tokenizer = AutoTokenizer.from_pretrained(
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext"
    )
    eligibility_tokenizer = AutoTokenizer.from_pretrained(
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext"
    )
    smiles_tokenizer = AutoTokenizer.from_pretrained("DeepChem/ChemBERTa-77M-MLM")
    smiles_tokenizer.add_tokens(["[PLACEBO]", "[NOSMILES]"])

    # Placeholder for NUM_LABELS
    NUM_LABELS = 27

    # Instantiate model
    model = MultilabelModel(
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext",
        "DeepChem/ChemBERTa-77M-MLM",
        len(smiles_tokenizer),
        NUM_LABELS,
        feature_use_config=feature_use_config,
    )

    # Load the model's state dict
    model.load_state_dict(torch.load(model_saving_path))

    return model


def compute_baseline_predictions(
    train_df: pd.DataFrame, test_df: pd.DataFrame
) -> float:

    # Extract actual labels from the test set
    actuals = test_df.drop(
        columns=["group_description", "eligibility_criteria", "smiles"]
    ).values

    # Calculate the majority class from the training set
    train_df_label = train_df.drop(
        columns=["group_description", "eligibility_criteria", "smiles"]
    )
    majority_classes = (train_df_label.mean() > 0.5).astype(int).values

    # Create predictions using majority classes
    predictions_majority_class = np.tile(majority_classes, (actuals.shape[0], 1))

    return predictions_majority_class


def compute_predictions(model, dataloader, device):
    """
    Validate the model on a given dataset.

    Args:
        model (nn.Module): The model to be evaluated.
        dataloader (DataLoader): Dataloader for the validation data.
        device (torch.device): Device to run the validation (e.g., 'cuda' or 'cpu').
    """
    model.eval()
    model = model.to(device)
    predictions, actuals = [], []
    progress_bar = tqdm(dataloader, leave=False)

    with torch.no_grad():
        for batch in progress_bar:
            inputs = {k: v.to(device) for k, v in batch.items() if k != "labels"}
            outputs = model(**inputs)

            logits = torch.sigmoid(outputs).cpu().numpy()

            predictions.append(logits > 0.5)

    model = model.to("cpu")

    return np.vstack(predictions).astype(int)

In [2]:
# Load data
train_df_base, val_df, test_df = load_trial_data(
    "../data/classification/smiles/train_base"
)
train_df_augmented, _, _ = load_trial_data(
    "../data/classification/smiles/train_augmented"
)

# Configuration for each model
model_configs = [
    {
        "feature_use_config": {
            "group_desc": False,
            "eligibility": False,
            "smiles": True,
        },
        "model_saving_path": "../models/smiles/train_base/smiles_only/model.pt",
    },
    {
        "feature_use_config": {
            "group_desc": True,
            "eligibility": False,
            "smiles": True,
        },
        "model_saving_path": "../models/smiles/train_base/smiles_group_desc/model.pt",
    },
    {
        "feature_use_config": {"group_desc": True, "eligibility": True, "smiles": True},
        "model_saving_path": "../models/smiles/train_base/all/model.pt",
    },
    {
        "feature_use_config": {
            "group_desc": True,
            "eligibility": False,
            "smiles": True,
        },
        "model_saving_path": "../models/smiles/train_augmented/smiles_group_desc/model.pt",
    },
    {
        "feature_use_config": {"group_desc": True, "eligibility": True, "smiles": True},
        "model_saving_path": "../models/smiles/train_augmented/all/model.pt",
    },
]

# Load all models
models = [load_model(config) for config in tqdm(model_configs)]

group_desc_tokenizer = AutoTokenizer.from_pretrained(
    "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext"
)
eligibility_tokenizer = AutoTokenizer.from_pretrained(
    "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext"
)
smiles_tokenizer = AutoTokenizer.from_pretrained("DeepChem/ChemBERTa-77M-MLM")
smiles_tokenizer.add_tokens(["[PLACEBO]", "[NOSMILES]"])

# Test datasets
test_dataset = CustomDataset(
    test_df,
    group_desc_tokenizer,
    eligibility_tokenizer,
    smiles_tokenizer,
    text_max_len=512,
    smiles_max_len=512,
)

# Test dataloaders
test_dataloader = DataLoader(
    test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate_fn
)

# Use .flatten() to micro-average
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
true_labels = test_df.drop(
    columns=["group_description", "eligibility_criteria", "smiles"]
).values.flatten()
baseline_base_preds = compute_baseline_predictions(train_df_base, test_df).flatten()
baseline_augmented_preds = compute_baseline_predictions(
    train_df_augmented, test_df
).flatten()
models_0_preds = compute_predictions(models[0], test_dataloader, device).flatten()
models_1_preds = compute_predictions(models[1], test_dataloader, device).flatten()
models_2_preds = compute_predictions(models[2], test_dataloader, device).flatten()
models_3_preds = compute_predictions(models[3], test_dataloader, device).flatten()
models_4_preds = compute_predictions(models[4], test_dataloader, device).flatten()

models_predictions = {
    "Majority Class Base": baseline_base_preds,
    "Majority Class Augmented": baseline_augmented_preds,
    "SMILES Base": models_0_preds,
    "Group Description & SMILES Base": models_1_preds,
    "Eligibility Criteria, Group Description & SMILES Base": models_2_preds,
    "Group Description & SMILES Augmented": models_3_preds,
    "Eligibility Criteria, Group Description & SMILES Augmented": models_4_preds,
}

# Perform pairwise comparisons
comparison_results = {}

for (model_1_name, preds_model_1), (
    model_2_name,
    preds_model_2,
) in itertools.combinations(models_predictions.items(), 2):
    results = mcnemar_test(true_labels, preds_model_1, preds_model_2, exact=True)
    all_instance_results = results["all"]
    statistic, pvalue = (
        all_instance_results["Statistic"],
        all_instance_results["P-Value"],
    )
    comparison_name = f"{model_1_name} vs {model_2_name}"
    comparison_results[comparison_name] = {"Statistic": statistic, "P-Value": pvalue}

original_alpha = 0.05
number_of_comparisons = len(comparison_results)
adjusted_alpha = original_alpha / number_of_comparisons  # Bonferroni correction

model_names = list(models_predictions.keys())
comparison_matrix = pd.DataFrame(index=model_names, columns=model_names)

# Populate the DataFrame with p-values
for comparison, result in comparison_results.items():
    model_1_name, model_2_name = comparison.split(" vs ")
    pvalue = result["P-Value"]
    # Fill symmetric cells with the p-value
    comparison_matrix.loc[model_1_name, model_2_name] = pvalue
    comparison_matrix.loc[model_2_name, model_1_name] = pvalue

# Fill the diagonal with 1 since a model compared with itself is not significant
np.fill_diagonal(comparison_matrix.values, 1)
comparison_matrix = comparison_matrix.applymap(
    lambda x: "{:.2e}".format(float(x)) if isinstance(x, (int, float)) else x
)
comparison_matrix

  0%|          | 0/5 [00:00<?, ?it/s]

  return self.fget.__get__(instance, owner)()
Some weights of RobertaModel were not initialized from the model checkpoint at DeepChem/ChemBERTa-77M-MLM and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at DeepChem/ChemBERTa-77M-MLM and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at DeepChem/ChemBERTa-77M-MLM and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not i

  0%|          | 0/24 [00:00<?, ?it/s]

  0%|          | 0/24 [00:00<?, ?it/s]

  0%|          | 0/24 [00:00<?, ?it/s]

  0%|          | 0/24 [00:00<?, ?it/s]

  0%|          | 0/24 [00:00<?, ?it/s]

  comparison_matrix = comparison_matrix.applymap(


Unnamed: 0,Majority Class Base,Majority Class Augmented,SMILES Base,Group Description & SMILES Base,"Eligibility Criteria, Group Description & SMILES Base",Group Description & SMILES Augmented,"Eligibility Criteria, Group Description & SMILES Augmented"
Majority Class Base,1.0,1.0,0.448,1.56e-24,1.0500000000000001e-58,2.11e-62,1.2899999999999998e-134
Majority Class Augmented,1.0,1.0,0.448,1.56e-24,1.0500000000000001e-58,2.11e-62,1.2899999999999998e-134
SMILES Base,0.448,0.448,1.0,1.77e-25,1.54e-64,6.6e-64,3.62e-151
Group Description & SMILES Base,1.56e-24,1.56e-24,1.77e-25,1.0,9.589999999999999e-23,9.76e-18,2.72e-76
"Eligibility Criteria, Group Description & SMILES Base",1.0500000000000001e-58,1.0500000000000001e-58,1.54e-64,9.589999999999999e-23,1.0,0.703,4.5e-31
Group Description & SMILES Augmented,2.11e-62,2.11e-62,6.6e-64,9.76e-18,0.703,1.0,6.2599999999999994e-30
"Eligibility Criteria, Group Description & SMILES Augmented",1.2899999999999998e-134,1.2899999999999998e-134,3.62e-151,2.72e-76,4.5e-31,6.2599999999999994e-30,1.0
