In [None]:
import pandas as pd
import os

import numpy as np
import utils
import matplotlib.pyplot as plt
from seaborn import heatmap
from sklearn.metrics import f1_score

from sklearn.metrics import confusion_matrix
from statsmodels.stats.contingency_tables import mcnemar
from statsmodels.stats.multitest import multipletests



In [None]:
# Keep text in exported SVG figures as editable text rather than converting glyphs to paths.
plt.rcParams.update({'svg.fonttype': 'none'})

## Functions

In [None]:
def preprocess_dataframes(dataframes):
    """
    Normalize every DataFrame in ``dataframes`` to strictly binary "Yes"/"No" labels.

    Known spellings of positive, negative and uncertain labels are first mapped to
    "Yes"/"No"/"Maybe". "Maybe" is then counted as positive ("Yes"), and every value
    that is still not exactly "Yes" afterwards (including NaN and any unrecognized
    value) becomes "No".

    The DataFrame objects passed in are never modified: each dictionary entry is
    replaced by a new, normalized DataFrame, so other references to the original
    frames (for example a shallow copy of the dictionary) keep their original values.

    Parameters:
    - dataframes: dict mapping a name to a pandas DataFrame of label values.

    Returns:
    - The same dictionary object, with every value replaced by its normalized DataFrame.
    """
    # Applied in this order; each step unifies one family of label encodings.
    replacement_steps = [
        {"yes": "Yes", "no": "No", "None": "No"},
        {1: "Yes", "0": "No", -1: "Maybe", "None": "No"},
        {1.0: "Yes", 0.0: "No", -1.0: "Maybe", np.nan: "No"},
        {"1.0": "Yes", "0.0": "No", "-1.0": "Maybe", "nan": "No"},
        # Uncertain findings are treated as positive.
        {"Maybe": "Yes"},
    ]

    # Only existing keys are reassigned below, so iterating over items() stays safe.
    for df_name, df in dataframes.items():
        normalized = df
        for mapping in replacement_steps:
            # Non-inplace replace returns a new frame; the caller's frame stays untouched.
            normalized = normalized.replace(mapping)

        # Then, replace all values that are not "Yes" with "No"
        dataframes[df_name] = normalized.apply(
            lambda col: col.map(lambda x: "No" if x != "Yes" else "Yes")
        )
    return dataframes

In [None]:
def calculate_f1_scores(df_ground_truth, df_predictions):
    """
    Calculate individual, average, micro, and macro F1 scores for multi-label classification.

    Both frames must contain only 'Yes'/'No' values. Only labels (columns) present in
    both frames are scored, and rows are compared by position, so both frames must list
    the same reports in the same order.

    Parameters:
    - df_ground_truth: DataFrame containing the ground truth labels.
    - df_predictions: DataFrame containing the predicted labels.

    Returns:
    - A tuple containing the dictionary of F1 scores for each label, average F1 score
      (mean of the per-label scores), micro F1 score, and macro F1 score.

    Raises:
    - ValueError: if either frame contains values other than 'Yes' or 'No', or if the
      two frames have no label column in common.
    """
    # Reject anything that is not 'Yes'/'No'. Only the frame name and the offending
    # columns are reported, not the full DataFrame contents.
    for frame_name, frame in (("df_ground_truth", df_ground_truth),
                              ("df_predictions", df_predictions)):
        valid_per_column = frame.isin(['Yes', 'No']).all()
        if not valid_per_column.all():
            offending = valid_per_column.index[~valid_per_column].tolist()
            raise ValueError(
                f"{frame_name} dataframe contains values other than 'Yes' or 'No' "
                f"in column(s) {offending}. Please ensure all values are 'Yes' or 'No'."
            )

    # Align columns: score only labels present in both frames, in ground-truth order.
    common_columns = df_ground_truth.columns.intersection(df_predictions.columns)
    if len(common_columns) == 0:
        raise ValueError("df_ground_truth and df_predictions have no label columns in common.")

    # Convert 'Yes'/'No' to binary format
    label_to_binary = {'Yes': 1, 'No': 0}
    df_ground_truth_binary = df_ground_truth[common_columns].apply(lambda col: col.map(label_to_binary))
    df_predictions_binary = df_predictions[common_columns].apply(lambda col: col.map(label_to_binary))

    # Calculate F1 scores for each label (0 instead of a warning when a label has no positives)
    f1_scores = {label: f1_score(df_ground_truth_binary[label], df_predictions_binary[label], zero_division=0)
                 for label in common_columns}

    average_f1 = sum(f1_scores.values()) / len(f1_scores)

    # Micro F1 pools true positives, false positives and false negatives over all labels,
    # which is the positive-class F1 of the flattened label matrix. Using average='micro'
    # on the flattened binary labels would average over both classes 0 and 1 and return
    # plain accuracy instead.
    micro_f1 = f1_score(df_ground_truth_binary.values.ravel(), df_predictions_binary.values.ravel(),
                        average='binary', zero_division=0)
    macro_f1 = f1_score(df_ground_truth_binary, df_predictions_binary, average='macro', zero_division=0)

    return f1_scores, average_f1, micro_f1, macro_f1


## Constants

In [None]:
# Root directory holding all inference outputs and label files used in this notebook.
base_path = '/base/path/for/inference_results/'

# Reports used as few-shot examples; their 'acc' values are handed to the data loader below.
few_shot_df = pd.read_csv(os.path.join(base_path, 'few_shot_reports_imagenome.csv'))
few_shot_list = few_shot_df['acc'].to_list()

# Findings (label columns) that are evaluated.
diseases = [
    'Atelectasis',
    'Pleural_Effusion',
    'Pneumonia',
    'Pneumothorax',
]

In [None]:
# Location of each labeler's output, keyed by the name used throughout the notebook.
file_paths = {
    # LLM outputs: one sub-directory per model under base_path.
    'gpt_4': os.path.join(base_path, 'zero_shot_gpt_4/gpt_labeled_reports.csv'),
    'gpt_35': os.path.join(base_path, 'zero_shot_gpt_35/gpt_labeled_reports.csv'),
    'llama13b': os.path.join(base_path, 'zero_shot_llama13b/llm_labeled_reports.csv'),
    'llama70b': os.path.join(base_path, 'zero_shot_llama70b/llm_labeled_reports.csv'),
    'mistral7b': os.path.join(base_path, 'zero_shot_mistral7b/llm_labeled_reports.csv'),
    # Previously pointed at the mistral7b directory, which made the Mixtral results a
    # silent duplicate of Mistral-7B.
    # NOTE(review): directory name assumed to follow the zero_shot_<model> pattern of
    # the other entries; confirm it matches the folder on disk.
    'mixtral8x7b': os.path.join(base_path, 'zero_shot_mixtral8x7b/llm_labeled_reports.csv'),
    'qwen72b': os.path.join(base_path, 'zero_shot_qwen72b/llm_labeled_reports.csv'),

    # Reference labels and rule-/BERT-based baselines, stored directly under base_path.
    'ground_truth': os.path.join(base_path, 'reports_imagenome_labeled_wo_few_shots.csv'),
    'chexbert': os.path.join(base_path, 'chexbert_labeled_imagenome.csv'),
    'chexpert': os.path.join(base_path, 'chexPert_labeled_imagenome.csv'),
}


## Pre-Processing

In [None]:
# Load every labeled-report file through the shared loader; few_shot_list is passed along to it.
dataframes = {name: utils.load_and_preprocess_data(path, few_shot_list) for name, path in file_paths.items()}


# Create a raw copy that will not get processed, for further analysis.
# Each DataFrame is copied individually: dict.copy() alone is shallow and would share
# the DataFrame objects, so in-place edits made to `dataframes` later on would also
# change the "raw" frames.
dataframes_raw = {name: df.copy() for name, df in dataframes.items()}

In [None]:
# Keep only the disease columns under evaluation and treat "No Information" /
# "Undefined" as negative ("No").
# New DataFrames are built instead of editing in place, so frames that are still
# referenced elsewhere (e.g. through dataframes_raw) keep all their columns and values,
# and re-running this cell gives the same result.
dataframes = {
    name: df.loc[:, df.columns.isin(diseases)].replace(["No Information", "Undefined"], "No")
    for name, df in dataframes.items()
}

In [None]:
# Normalize every frame to strictly binary "Yes"/"No" labels (uncertain "Maybe" counts as "Yes").
dataframes = preprocess_dataframes(dataframes)

In [None]:
# Pull each preprocessed frame out of the dictionary under its own variable name.
(
    gpt_4, gpt_35,
    llama13b, llama70b, mistral7b, mixtral8x7b, qwen72b,
    ground_truth, chexbert, chexpert,
) = (
    dataframes[key]
    for key in (
        'gpt_4', 'gpt_35',
        'llama13b', 'llama70b', 'mistral7b', 'mixtral8x7b', 'qwen72b',
        'ground_truth', 'chexbert', 'chexpert',
    )
)


In [None]:
# Combine Mixtral, Llama70 and Qwen in a majority-vote ensemble.
# NOTE: this only works for the binary use case; there is no straightforward way
# to apply this to the multi-class case using just three models (potentially each model has a different label).
label_to_vote = {'Yes': 1, 'No': 0}

# One vote (1) per model for every 'Yes', zero for every 'No'.
llama70b_numerical = llama70b.apply(lambda col: col.map(label_to_vote))
mixtral8x7b_numerical = mixtral8x7b.apply(lambda col: col.map(label_to_vote))
qwen_72b_numerical = qwen72b.apply(lambda col: col.map(label_to_vote))

# Number of models voting 'Yes' per cell. Frames are added label-aligned, so a cell
# missing from any of the three frames becomes NaN and ends up as 'No' below.
majority_vote_sum = llama70b_numerical + mixtral8x7b_numerical + qwen_72b_numerical

# A finding is positive when at least two of the three models say 'Yes'.
model_ensemble = majority_vote_sum.apply(lambda col: col.map(lambda votes: 'Yes' if votes >= 2 else 'No'))


## Evaluation

In [None]:
# Reference labels every labeler is scored against.
y_true = ground_truth

# Predictions of each evaluated labeler, keyed by model name (insertion order is kept).
predictions = dict(
    gpt_4=gpt_4,
    gpt_35=gpt_35,
    llama13b=llama13b,
    llama70b=llama70b,
    mistral7b=mistral7b,
    mixtral8x7b=mixtral8x7b,
    qwen72b=qwen72b,
    chexbert=chexbert,
    chexpert=chexpert,
    ensemble=model_ensemble,
)

In [None]:

# Score every labeler against the ground truth.
model_scores = {}
for model_name, y_pred in predictions.items():
    model_scores[model_name] = calculate_f1_scores(y_true, y_pred)

# Collect the scores row-wise: one row per finding plus the three summary rows,
# one column per model.
scores_data = {}
for model_name, (per_label_f1, mean_f1, micro, macro) in model_scores.items():
    column_name = f'F1 Score {model_name.title()}'
    row_values = {**per_label_f1, 'Average': mean_f1, 'Micro F1': micro, 'Macro F1': macro}
    for row_name, value in row_values.items():
        scores_data.setdefault(row_name, {})[column_name] = value

f1_scores_df = pd.DataFrame(scores_data).T.round(3)

# Fixed presentation order of the model columns.
column_order = [
    'F1 Score Chexpert',
    'F1 Score Chexbert',
    'F1 Score Mistral7B',
    'F1 Score Llama13B',
    'F1 Score Mixtral8X7B',
    'F1 Score Llama70B',
    'F1 Score Qwen72B',
    'F1 Score Ensemble',
    'F1 Score Gpt_35',
    'F1 Score Gpt_4',
]
f1_scores_df = f1_scores_df[column_order]
f1_scores_df

In [None]:
# Write the F1 table next to the inference results.
# NOTE(review): the file name says "few_shot" while the LLM inputs above are read from
# zero_shot_* directories; confirm the name is intended.
f1_scores_path = os.path.join(base_path, 'f1_scores_few_shot_positive.csv')
f1_scores_df.to_csv(f1_scores_path)

## Label Frequency in Ground Truth

In [None]:
# Tally how often each label value occurs per disease column
# (diseases as rows, label values as columns; combinations that never occur count as 0).
ground_truth_count = (
    ground_truth
    .apply(pd.Series.value_counts)
    .T
    .fillna(0)
    .astype(int)
)

#ground_truth_count.to_csv('ground_truth_count.csv')
ground_truth_count

## Statistics

In [None]:
def mcnemar_test_corrected(model1, model2, diseases, ground_truth):
    """
    Compare the correctness of two models against the ground truth with McNemar's test.

    For every disease a 2x2 table of paired outcomes is built
    (rows: model1 correct / incorrect, columns: model2 correct / incorrect) and tested.
    The per-disease tables are also summed into one table that is tested as
    "All Combined". All p-values of this call (per disease plus combined) are then
    Bonferroni-corrected together.

    The exact binomial test is used whenever a table has fewer than 25 discordant pairs
    (one model correct, the other incorrect); otherwise the chi-square version is used.
    The same criterion applies to the per-disease and the combined tables.

    Parameters:
    - model1: DataFrame of 'Yes'/'No' predictions of the first model.
    - model2: DataFrame of 'Yes'/'No' predictions of the second model.
    - diseases: iterable of column names to evaluate.
    - ground_truth: DataFrame of 'Yes'/'No' reference labels.

    Returns:
    - DataFrame with one row per disease plus an "All Combined" row and the columns
      "Disease", "McNemar Statistic", "P-value" and "Bonferroni-corrected P-value".

    Raises:
    - ValueError: if an evaluated column of model1, model2 or ground_truth contains
      values other than 'Yes' or 'No'.
    """
    valid_values = {"Yes", "No"}
    # Below this many discordant pairs the chi-square approximation is not used.
    exact_test_threshold = 25
    # Materialize once so a one-shot iterable can be traversed several times.
    diseases = list(diseases)

    def check_values(df, columns):
        for col in columns:
            if not df[col].isin(valid_values).all():
                raise ValueError(f"Column {col} contains values other than 'Yes' or 'No'.")

    def run_mcnemar(table):
        # Discordant pairs are the off-diagonal cells of the 2x2 table.
        discordant_pairs = table[0][1] + table[1][0]
        return mcnemar(table, exact=bool(discordant_pairs < exact_test_threshold))

    check_values(model1, diseases)
    check_values(model2, diseases)
    # An invalid reference label would otherwise silently count as an error of both models.
    check_values(ground_truth, diseases)

    results = []
    # Paired outcomes summed over all diseases; same layout as the per-disease tables:
    # [[both correct, only model1 correct], [only model2 correct, both incorrect]]
    combined_table = [[0, 0], [0, 0]]

    for disease in diseases:
        model1_correct = model1[disease] == ground_truth[disease]
        model2_correct = model2[disease] == ground_truth[disease]

        contingency_table = [
            [(model1_correct & model2_correct).sum(), (model1_correct & ~model2_correct).sum()],
            [(~model1_correct & model2_correct).sum(), (~model1_correct & ~model2_correct).sum()],
        ]

        for row in range(2):
            for col in range(2):
                combined_table[row][col] += contingency_table[row][col]

        result = run_mcnemar(contingency_table)
        results.append({
            "Disease": disease,
            "McNemar Statistic": result.statistic,
            "P-value": result.pvalue
        })

    # Perform McNemar test on the combined contingency table
    combined_result = run_mcnemar(combined_table)
    results.append({
        "Disease": "All Combined",
        "McNemar Statistic": combined_result.statistic,
        "P-value": combined_result.pvalue
    })

    results_df = pd.DataFrame(results)
    # The correction covers the rows of this call only (diseases + combined), not any
    # additional comparisons the caller makes by invoking this function repeatedly.
    p_values = results_df["P-value"].values
    bonferroni_corrected = multipletests(p_values, alpha=0.05, method='bonferroni')
    results_df['Bonferroni-corrected P-value'] = bonferroni_corrected[1]

    return results_df


# Compare GPT-4 against every labeler in `predictions`. That dictionary also contains
# gpt_4 itself, so one block of rows is GPT-4 compared with itself.
# NOTE(review): the Bonferroni correction inside mcnemar_test_corrected covers only the
# rows of a single call, not the number of model comparisons made here; confirm this
# is the intended correction.
mcnemar_frames = []
for model_name, model_predictions in predictions.items():
    results_df_mcnemar = mcnemar_test_corrected(gpt_4, model_predictions, diseases, y_true)
    results_df_mcnemar['Model'] = model_name  # Add a column for the model name
    mcnemar_frames.append(results_df_mcnemar)

# Concatenate once instead of growing a DataFrame inside the loop, drop the repeated
# per-call row indices, then index the rows by model name.
aggregated_results_mcnemar = (
    pd.concat(mcnemar_frames, axis=0, ignore_index=True)
    .set_index(["Model"])
)

# Keep only the test over all diseases combined, one row per model.
total_mcnemar = aggregated_results_mcnemar[aggregated_results_mcnemar["Disease"] == "All Combined"]
total_mcnemar

In [None]:
# Export the combined (all-disease) McNemar results as CSV next to the inference results.
mcnemar_output_path = os.path.join(base_path, 'total_mcnemar_zero_shot.csv')
total_mcnemar.to_csv(mcnemar_output_path)