In [34]:
import os
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve, auc, RocCurveDisplay, balanced_accuracy_score
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Benchmarking for all videos considered at once

In [35]:
def compute_roc_auc(df, csv_file, proba='post_proba'):
    """Compute the ROC AUC for one score column and record it on ``csv_file``.

    Args:
        df: Frame providing a 'label' column (ground truth) and the column
            named by ``proba`` (scores).
        csv_file: Object supporting item assignment; the score is stored under
            the 'roc_auc' key (a column when this is a DataFrame).
        proba: Name of the score column. Defaults to 'post_proba'.

    Returns:
        The value returned by ``roc_auc_score``.
    """
    ground_truth = df['label']
    scores = df[proba]
    area_under_curve = roc_auc_score(ground_truth, scores)

    # Side effect: the score is also written onto the caller's results object.
    csv_file['roc_auc'] = area_under_curve
    return area_under_curve

In [36]:
def get_confusion_matrix(true_labels: list, avg_model_labels: list, file_name: str, output_dir: str):
    """Plot, save and show a 2x2 confusion matrix annotated with counts and percentages.

    Args:
        true_labels: Ground-truth labels; only classes 0 and 1 are tabulated.
        avg_model_labels: Predicted labels, compared against ``true_labels``.
        file_name: Name of the image file to write.
        output_dir: Directory in which ``file_name`` is saved.
    """
    class_ids = [0, 1]
    counts = confusion_matrix(true_labels, avg_model_labels, labels=class_ids)

    # Express every cell as a share of all tabulated samples.
    shares = (counts / np.sum(counts)) * 100

    # Cell text: raw count on the first line, percentage in parentheses below it.
    cell_text = []
    for count_row, share_row in zip(counts, shares):
        cell_text.append([f"{count}\n({share:.2f}%)" for count, share in zip(count_row, share_row)])
    cell_text = np.array(cell_text)

    fig, ax = plt.subplots(figsize=(10, 7))
    # fmt='' because the annotations are already fully formatted strings.
    sns.heatmap(counts, annot=cell_text, fmt='',
                xticklabels=class_ids,
                yticklabels=class_ids,
                cmap="Blues",
                ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title('Confusion Matrix', fontsize=14, fontweight='bold')

    fig.savefig(os.path.join(output_dir, file_name))

    plt.show()

In [37]:
def calculate_metrics(true_labels, pred_labels, label_name):
    """Compute binary-classification metrics, as percentages rounded to 2 decimals.

    Class 1 is treated as the positive class for precision, recall and F1.

    Args:
        true_labels: Iterable of ground-truth labels (0 or 1).
        pred_labels: Iterable of predicted labels, aligned with ``true_labels``.
        label_name: Identifier copied unchanged into the first slot of the result.

    Returns:
        A list ``[label_name, balanced_accuracy, accuracy, precision, recall, f1,
        correct_no, incorrect_no, correct_spur, incorrect_spur]``. The last four
        entries are the percentages of class-0 ("no") and class-1 ("spur")
        samples that were predicted correctly / incorrectly. When a class does
        not occur in ``true_labels`` its two rates are NaN instead of raising
        ``ZeroDivisionError``.
    """
    # Materialise the inputs so any iterable (list, Series, ndarray) is accepted.
    true_labels = list(true_labels)
    pred_labels = list(pred_labels)

    def as_percent(score):
        return round(score * 100, 2)

    accuracy = as_percent(accuracy_score(true_labels, pred_labels))  # (TP + TN) / n
    precision = as_percent(
        precision_score(true_labels, pred_labels, pos_label=1, average='binary'))  # TP / (TP + FP)
    recall = as_percent(
        recall_score(true_labels, pred_labels, pos_label=1, average='binary'))  # TP / (TP + FN)
    f1 = as_percent(
        f1_score(true_labels, pred_labels, pos_label=1, average='binary'))  # 2 * P * R / (P + R)
    balanced_accuracy = as_percent(balanced_accuracy_score(true_labels, pred_labels))

    def class_rates(class_id):
        # One pass over the samples of this class: True where the prediction matches.
        outcomes = [true == pred for true, pred in zip(true_labels, pred_labels) if true == class_id]
        if not outcomes:
            # Class absent from the ground truth: the rates are undefined.
            return float('nan'), float('nan')
        n_total = len(outcomes)
        n_correct = sum(1 for matched in outcomes if matched)
        return (round(n_correct / n_total * 100, 2),
                round((n_total - n_correct) / n_total * 100, 2))

    correct_no, incorrect_no = class_rates(0)
    correct_spur, incorrect_spur = class_rates(1)

    return [label_name, balanced_accuracy, accuracy, precision, recall, f1, correct_no, incorrect_no, correct_spur, incorrect_spur]

In [38]:
def evaluate_models(df, video_idx=None):
    """Evaluate every ``post_pred*`` column of ``df`` against its 'label' column.

    Args:
        df: Frame with a 'label' column and one or more columns whose name
            starts with 'post_pred'.
        video_idx: Optional identifier stored in the first column of every
            result row. When given (anything other than None) that column is
            named 'video_id', otherwise 'k'.

    Returns:
        A DataFrame with one row of ``calculate_metrics`` output per evaluated
        prediction column.

    Rows where the label and the prediction are both missing (None or NaN) are
    dropped before scoring. A prediction column whose missing entries do not
    line up with the missing labels is skipped with a message, because
    filtering the two sides independently would pair labels with the wrong
    predictions.
    """
    # `is not None` so that falsy but valid identifiers (0, '') still count as a video id.
    id_column = 'video_id' if video_idx is not None else 'k'
    results_df = pd.DataFrame(
        columns=[id_column, 'balanced_accuracy', 'accuracy', 'precision', 'recall', 'f1_score',
                 'correct_no', 'incorrect_no', 'correct_spur', 'incorrect_spur'])

    true_labels = df['label']
    # isna() catches both None and the NaN pandas uses for empty CSV cells.
    true_missing = true_labels.isna()

    for col in df.columns:
        if not col.startswith('post_pred'):
            continue

        pred_labels = df[col]
        if not true_missing.equals(pred_labels.isna()):
            print(f"Skipping evaluation for {col} due to inconsistent label lengths after filtering None values.")
            continue

        # Same mask on both sides keeps labels and predictions paired row by row.
        present = ~true_missing
        results_df.loc[len(results_df)] = calculate_metrics(
            true_labels[present].tolist(), pred_labels[present].tolist(), video_idx)

    return results_df

In [39]:
def compute_auc_roc(output_file_path, results_frame_df, roc_curve_file_name, csv_file, save=True):
    """Plot the ROC curve for ``results_frame_df`` and optionally save the figure.

    Args:
        output_file_path: Directory the figure is saved into.
        results_frame_df: Frame handed to ``compute_roc_auc`` and
            ``compute_roc_curve`` (both read it with their default score column).
        roc_curve_file_name: File name of the saved figure.
        csv_file: Passed through to ``compute_roc_auc``, which stores the AUC on it.
        save: When true, write the figure to disk; otherwise it is only plotted.
    """
    area = compute_roc_auc(results_frame_df, csv_file)
    # The thresholds returned alongside the rates are not needed for the plot.
    false_positive_rate, true_positive_rate, _ = compute_roc_curve(results_frame_df)

    curve = RocCurveDisplay(fpr=false_positive_rate, tpr=true_positive_rate, roc_auc=area)
    plotted = curve.plot()

    if save:
        target = os.path.join(output_file_path, roc_curve_file_name)
        plotted.figure_.savefig(target)

In [40]:
def compute_roc_curve(df, proba='post_proba'):
    """Return the ROC curve of one score column, with class 1 as the positive class.

    Args:
        df: Frame providing a 'label' column and the column named by ``proba``.
        proba: Name of the score column. Defaults to 'post_proba'.

    Returns:
        Whatever ``roc_curve`` returns; the caller in this notebook unpacks it
        as ``(fpr, tpr, thresholds)``.
    """
    # Labels are used as stored (no string-to-int mapping is applied here);
    # scores are cast to float before being handed to roc_curve.
    return roc_curve(df['label'], df[proba].astype(float), pos_label=1)

In [41]:
def create_benchmark(video_df, sub_dest_dir):
    """Evaluate ``video_df`` as a whole and write the metrics to 'eval_output.csv'.

    Args:
        video_df: Frame with the label, prediction and score columns expected by
            ``evaluate_models`` and ``compute_auc_roc``.
        sub_dest_dir: Directory that receives 'eval_output.csv'.
    """
    # Note: the confusion-matrix plot (get_confusion_matrix on the 'pred_avg'
    # column, saved as 'confusion_matrix.png') is currently disabled.

    metrics_df = evaluate_models(video_df)

    # save=False: the ROC curve is plotted but 'roc_curve.pdf' is not written.
    # The AUC itself is stored on metrics_df by this call.
    compute_auc_roc(
        output_file_path=sub_dest_dir,
        results_frame_df=video_df,
        roc_curve_file_name='roc_curve.pdf',
        csv_file=metrics_df,
        save=False,
    )

    metrics_df.to_csv(os.path.join(sub_dest_dir, 'eval_output.csv'), index=False)

# Get parameters

In [42]:
# Directory holding one sub-directory of per-video CSV files for each model
source_dir = Path('../output/benchmarking') / 'post_pro_sources'
# Directory that receives one aggregated CSV per model
results_dir = Path('../output/benchmarking') / 'post_pro_result'

# Run Benchmarking

In [43]:
def process_directories(source_dir, results_dir):
    """Benchmark every model directory under ``source_dir`` and write one CSV per model.

    Each sub-directory of ``source_dir`` is treated as a model. Every
    ``id_*.csv`` file inside it is evaluated separately (metrics from
    ``evaluate_models`` plus a 'roc_auc' column added by ``compute_roc_auc``),
    the per-file rows are stacked, and a final 'average' row holding the mean
    of each numeric column is appended. The table is saved as
    ``<results_dir>/<model name>.csv``.

    Args:
        source_dir: Directory containing one sub-directory per model.
        results_dir: Output directory; created if it does not exist.

    Returns:
        dict mapping each model name to the DataFrame written for it. Models
        without any usable CSV file are reported and left out.
    """
    required_columns = ['frame', 'post_proba', 'post_pred', 'label']
    # Models whose 'label' column is mapped from 'no'/'spur' strings to 0/1
    string_label_models = ('nnunet', 'vresnet')

    source_dir = Path(source_dir)
    results_dir = Path(results_dir)
    results_dir.mkdir(parents=True, exist_ok=True)

    results_by_model = {}

    # sorted() makes the processing (and row) order independent of the filesystem.
    for model_dir in sorted(source_dir.iterdir()):
        if not model_dir.is_dir():
            continue

        model_name = model_dir.name
        all_results = []

        # Read and evaluate each per-video CSV file
        for file in sorted(model_dir.glob("id_*.csv")):
            video_name = file.stem
            temp_df = pd.read_csv(file)

            # Make sure every required column is present
            if not all(column in temp_df.columns for column in required_columns):
                print(f"Le fichier {file} ne contient pas toutes les colonnes requises.")
                continue

            # .copy() so the label rewrite below acts on an independent frame
            # rather than on a slice of the frame that was read.
            temp_df = temp_df[required_columns].copy()

            if model_name in string_label_models:
                temp_df['label'] = temp_df['label'].replace({'no': 0, 'spur': 1})

            # Per-video metrics; compute_roc_auc adds the 'roc_auc' column in place
            results_frame_df = evaluate_models(temp_df, video_name)
            compute_roc_auc(temp_df, results_frame_df)
            all_results.append(results_frame_df)

        if not all_results:
            # pd.concat raises on an empty list, so report and move on instead.
            print(f"No usable 'id_*.csv' file found for model '{model_name}'; nothing written.")
            continue

        # Stack the results of all files
        final_results_df = pd.concat(all_results, ignore_index=True)

        # Build the 'average' row by column name, so a non-numeric metric
        # column cannot shift the values into the wrong columns.
        numeric_cols = final_results_df.select_dtypes(include=['number']).columns
        mean_record = final_results_df[numeric_cols].mean().to_dict()
        mean_record[final_results_df.columns[0]] = 'average'
        mean_row = pd.DataFrame([mean_record], columns=final_results_df.columns)

        final_results_df = pd.concat([final_results_df, mean_row], ignore_index=True)

        # Save the per-model CSV file
        final_results_df.to_csv(results_dir / f"{model_name}.csv", index=False)
        results_by_model[model_name] = final_results_df

    return results_by_model
        

In [44]:
# Run the benchmark for every model directory; one CSV per model is written to results_dir.
final_results = process_directories(source_dir=source_dir, results_dir=results_dir)

# Compare models