###  Module & Utility Imports

In [1]:
import os
import re
import pandas as pd
import numpy as np

from Utilities.EvaluationMain import *
from Utilities.Utilities import ReadYaml, SerializeObjects, DeserializeObjects, LoadModelConfigs, LoadParams
from Models.Caller64 import *
from Utilities.Visualization import VisReconGivenZ_FCA, HeatMapFreqZ_FCA, VisReconGivenFC_ZA, VisReconExtractZ_FC

### Load Model Configurations and Evaluation Tables (Accuracy & MI)

In [2]:
def _concat_csv_tables(directory, file_names):
    """Read every CSV in *file_names* from *directory* and stack them row-wise."""
    frames = [pd.read_csv(os.path.join(directory, name)) for name in file_names]
    # pd.concat raises on an empty list, so fall back to an empty frame
    return pd.concat(frames, axis=0) if frames else pd.DataFrame()


def load_evaluation_tables(directory, acc_keyword, acc_pattern, mi_keyword, mi_pattern):
    """
    Load and combine evaluation tables from a directory, selected by filename substrings.

    A file is treated as an accuracy table when its name contains both `acc_keyword`
    and `acc_pattern`, and as an MI table when its name contains both `mi_keyword`
    and `mi_pattern`. Matching files are read with `pd.read_csv` and stacked row-wise;
    the per-file row index is kept as-is (it is not renumbered).

    Parameters:
        directory (str): Path to the directory containing CSV table files.
        acc_keyword (str): Keyword to identify accuracy tables.
        acc_pattern (str): Additional substring that accuracy table filenames must contain.
        mi_keyword (str): Keyword to identify MI (Mutual Information) tables.
        mi_pattern (str): Additional substring that MI table filenames must contain.

    Returns:
        acc_df (DataFrame): Concatenated accuracy tables. An 'RMSE' column
                            (square root of 'MSEdenorm') is added when 'MSEdenorm' exists.
                            Empty DataFrame when no file matches.
        mi_df (DataFrame): Concatenated MI tables. Empty DataFrame when no file matches.
    """
    # Sort the listing so the row order does not depend on the file system
    table_list = sorted(os.listdir(directory))

    # Load and combine accuracy tables
    acc_list = [tab for tab in table_list if acc_keyword in tab and acc_pattern in tab]
    acc_df = _concat_csv_tables(directory, acc_list)
    # Compute RMSE if the 'MSEdenorm' column is available
    if 'MSEdenorm' in acc_df.columns:
        acc_df['RMSE'] = np.sqrt(acc_df['MSEdenorm'])

    # Load and combine MI tables
    mi_list = [tab for tab in table_list if mi_keyword in tab and mi_pattern in tab]
    mi_df = _concat_csv_tables(directory, mi_list)

    return acc_df, mi_df


def load_config_models(config_directory, include_keyword='Config', exclude_keyword='Eval', key='Models'):
    """
    Load configuration files from the specified directory and extract model keys.

    Parameters:
        config_directory (str): Path to the directory containing YAML configuration files.
        include_keyword (str): Only consider files whose name contains this keyword.
        exclude_keyword (str): Skip files whose name contains this keyword.
        key (str): The key in the YAML file from which to extract model definitions.

    Returns:
        model_dict (dict): A dictionary mapping configuration file names (without extension)
                           to a list of model keys. A file that is empty or has no `key`
                           entry maps to an empty list.
    """
    model_dict = {}
    # Sorted so the resulting key order does not depend on the file system
    for file_name in sorted(os.listdir(config_directory)):
        if include_keyword not in file_name or exclude_keyword in file_name:
            continue
        # `or {}` tolerates a loader result of None (e.g. an empty YAML document)
        config_data = ReadYaml(os.path.join(config_directory, file_name)) or {}
        models = config_data.get(key) or {}
        # splitext strips only the final extension, so names that contain extra dots
        # (e.g. 'Config.A.yml' / 'Config.B.yml') no longer collapse onto one key
        model_dict[os.path.splitext(file_name)[0]] = list(models)
    return model_dict








# --- Main-model evaluation tables -------------------------------------------
# Accuracy and MI files are both selected with the 'Nj1_FC' filename tag.
eval_directory = './EvalResults/Tables/'
AcctableSet, MItableSet = load_evaluation_tables(
    eval_directory, 'Acc', 'Nj1_FC', 'MI', 'Nj1_FC')

# --- Benchmark evaluation tables --------------------------------------------
# Accuracy and MI files are both selected with the 'NjAll' filename tag.
bench_directory = './Benchmarks/EvalResults/Tables/'
BenchAcctableSet, BenchMItableSet = load_evaluation_tables(
    bench_directory, 'Acc', 'NjAll', 'MI', 'NjAll')

# Benchmark metric label -> label used for the main models
metrics_map = {
    '(i) $I(V;\\acute{\\Theta} \\mid X)$': '(ii) $I(V;\\acute{\\Theta} \\mid \\acute{Z})$',
    '(ii) $I(S;\\acute{\\Theta} \\mid X)$': '(iii) $I(S;\\acute{\\Theta} \\mid \\acute{Z})$',
}

# Relabel the benchmark metrics; note that the 'Metrics' column itself is
# overwritten (no separate column is created), unmapped labels are left unchanged.
BenchMItableSet = BenchMItableSet.assign(
    Metrics=BenchMItableSet['Metrics'].replace(metrics_map))

### Functions to Construct Analysis Table and Perform ISCORE-Based Parameter Selection for Main Models


In [3]:
def softplus(x):
    """
    Return softplus(x) = log(1 + exp(x)), evaluated element-wise.

    Computed as logaddexp(0, x) = log(exp(0) + exp(x)), which stays finite for
    large positive x where exp(x) on its own would overflow to inf.

    Parameters:
        x (scalar, ndarray or Series): Input value(s).

    Returns:
        Same kind of object as `x` holding the softplus values.
    """
    return np.logaddexp(0, x)

def load_config_models(config_directory, include_keyword='Config', exclude_keyword='Eval', key='Models'):
    """
    Load configuration files from the specified directory and extract model keys.

    Parameters:
        config_directory (str): Path to the directory containing YAML configuration files.
        include_keyword (str): Only consider files whose name contains this keyword.
        exclude_keyword (str): Skip files whose name contains this keyword.
        key (str): The key in the YAML file from which to extract model definitions.

    Returns:
        dict: A dictionary mapping configuration file names (without extension)
              to a list of model keys. A file that is empty or has no `key`
              entry maps to an empty list.
    """
    # NOTE(review): a function with this name and signature is also defined in an
    # earlier cell of this notebook; this definition shadows it once the cell runs.
    model_dict = {}
    # Sorted so the resulting key order does not depend on the file system
    for file_name in sorted(os.listdir(config_directory)):
        if include_keyword not in file_name or exclude_keyword in file_name:
            continue
        # `or {}` tolerates a loader result of None (e.g. an empty YAML document)
        config_data = ReadYaml(os.path.join(config_directory, file_name)) or {}
        models = config_data.get(key) or {}
        # splitext strips only the final extension, so names that contain extra dots
        # (e.g. 'Config.A.yml' / 'Config.B.yml') no longer collapse onto one key
        model_dict[os.path.splitext(file_name)[0]] = list(models)
    return model_dict


def prepare_analysis_table(mi_df, acc_df, target_models, mi_metrics):
    """
    Build the per-model analysis table from MI and accuracy results.

    Steps: filter both tables to `target_models`, average the MI 'Values' per
    (Model, Metrics) and pivot the metrics into columns, inner-join with the
    accuracy columns on 'Model', split the model name on '_' into parameter
    columns, and add the ISCORE / Scaling / ISCOREScal columns.

    Parameters:
        mi_df (DataFrame): MI results with at least 'Model', 'Metrics' and 'Values' columns.
        acc_df (DataFrame): Accuracy results with at least 'Model', 'MeanKldRes' and 'RMSE' columns.
        target_models (list): Model names to include in the analysis.
        mi_metrics (list): MI metric names. Currently not used for filtering: every
                           metric present in `mi_df` is pivoted into its own column.

    Returns:
        DataFrame: One row per model, sorted by 'Model', containing the pivoted MI
                   metrics, 'FQI' (renamed from 'MeanKldRes'), 'RMSE', the parsed
                   name parts, and 'ISCORE', 'Scaling', 'ISCOREScal'.

    Raises:
        KeyError: If one of the three MI metric columns needed for ISCORE is absent.
    """
    # Restrict both tables to the requested models
    mi_table = mi_df[mi_df['Model'].isin(target_models)]
    acc_table = acc_df[acc_df['Model'].isin(target_models)]

    # Keep only the accuracy columns used below; 'MeanKldRes' is reported as 'FQI'
    acc_table = (
        acc_table[['Model', 'MeanKldRes', 'RMSE']]
        .rename(columns={'MeanKldRes': 'FQI'})
        .reset_index(drop=True)
    )

    # Average 'Values' per (Model, Metrics). Selecting the column explicitly keeps
    # the mean from failing on any non-numeric column present in the MI table.
    mi_grouped = mi_table.groupby(['Model', 'Metrics'], as_index=False)['Values'].mean()
    mi_pivot = pd.pivot(mi_grouped, index='Model', columns='Metrics', values='Values').reset_index()
    # A metric that a model does not report counts as 0
    mi_pivot = mi_pivot.fillna(0)

    # Merge MI and accuracy tables (models missing from either side are dropped)
    merged_table = (
        pd.merge(mi_pivot, acc_table, on='Model', how='inner')
        .sort_values('Model')
        .reset_index(drop=True)
    )

    # Split the 'Model' string into structural parameters
    split_cols = merged_table['Model'].str.split('_', expand=True)
    if split_cols.shape[1] == 6:
        split_cols.columns = ['Prefix', 'Type', 'Depth', 'LatDim', 'Comp', 'Source']
        merged_table = pd.concat([merged_table, split_cols], axis=1)
    elif split_cols.shape[1] == 4:
        # Names with only three parts leave the 4th field empty: for those rows the
        # 3rd field is moved into the 4th ('Source') and the 3rd ('LatDim') is set to 0
        mask = split_cols[3].isna() | (split_cols[3] == 'None')
        split_cols.loc[mask, 3] = split_cols.loc[mask, 2]
        split_cols.loc[mask, 2] = 0
        split_cols.columns = ['Prefix', 'Type', 'LatDim', 'Source']
        merged_table = pd.concat([merged_table, split_cols], axis=1)
    else:
        print("Warning: Unexpected model naming format. Check the 'Model' column.")

    # Metric columns entering the composite score: (i) + (iii) - (ii)
    metric_i = '(i) $I(V; \\acute{Z} \\mid Z)$'
    metric_ii = '(ii) $I(V;\\acute{\\Theta} \\mid \\acute{Z})$'
    metric_iii = '(iii) $I(S;\\acute{\\Theta} \\mid \\acute{Z})$'
    missing = [col for col in (metric_i, metric_ii, metric_iii) if col not in merged_table.columns]
    if missing:
        raise KeyError(f"MI metric column(s) required for ISCORE are missing: {missing}")

    # Compute composite information score (ISCORE)
    merged_table['ISCORE'] = softplus(
        merged_table[metric_i] + merged_table[metric_iii] - merged_table[metric_ii]
    )

    # Scaling is the mean of exp(-FQI) and exp(-RMSE); it multiplies ISCORE
    merged_table['Scaling'] = (np.exp(-merged_table['FQI']) + np.exp(-merged_table['RMSE'])) / 2
    merged_table['ISCOREScal'] = merged_table['ISCORE'] * merged_table['Scaling']

    return merged_table


### Construct Analysis Table and Perform ISCORE-Based Parameter Selection for Main Models

In [4]:
# Load configuration models and combine all model keys across configuration files
config_directory = './Config/'
TabLists = load_config_models(config_directory)
# Flatten with a comprehension: unlike np.concatenate this also works when no
# configuration file is found, and it keeps the keys as plain Python objects
AnalTabList = [model for tabs in TabLists.values() for model in tabs]

# Column name -> symbol used for that hyperparameter in the report
legend_map = {
    'Depth': r'$\zeta$',
    'LatDim': r'$J$',
    'Comp': r'$C$'
}

# Define ablation models to exclude from final analysis (if needed later)
AblationList = [
    'FC_ART_1_50_800_Mimic', 'FC_ART_1_50_800_VitalDB',
    'SKZ_ART_1_50_800_Mimic', 'SKZ_ART_1_50_800_VitalDB',
    'FC_II_1_50_800_Mimic', 'FC_II_1_50_800_VitalDB',
    'SKZ_II_1_50_800_Mimic', 'SKZ_II_1_50_800_VitalDB'
]

# Define MI metrics to be used in the analysis
AnalMetricList = [
    '(i) $I(V; \\acute{Z} \\mid Z)$',
    '(ii) $I(V;\\acute{\\Theta} \\mid \\acute{Z})$',
    '(iii) $I(S;\\acute{\\Theta} \\mid \\acute{Z})$'
]


# Prepare the merged analysis table using the function
AnalAccMItable = prepare_analysis_table(MItableSet, AcctableSet, AnalTabList, AnalMetricList)

# Exclude ablation models from main analysis
SenseAccMItable = AnalAccMItable[~AnalAccMItable['Model'].isin(AblationList)]


# Parameter search: for each hyperparameter, average the numeric columns per
# (Type, setting) and keep, per Type, the setting with the highest mean ISCOREScal
param_frames = []
report_frames = []
for metric in ['Depth', 'LatDim', 'Comp']:
    ResTableGroup = SenseAccMItable.groupby(['Type', metric]).mean(numeric_only=True).reset_index()

    # Row label of the best setting within each Type
    best_rows = ResTableGroup.groupby("Type")["ISCOREScal"].idxmax()
    Param = (
        ResTableGroup.loc[best_rows, ["Type", metric, "ISCOREScal"]]
        .assign(Param=metric)
        .rename(columns={metric: 'Value'})[['Type', 'Param', 'Value', 'ISCOREScal']]
    )
    param_frames.append(Param)

    # Full per-setting table for the report (the scaled score is labelled 'ISCORE' there)
    SelBest = (
        ResTableGroup[['Type', metric, 'ISCOREScal']]
        .assign(Hyperparameter=legend_map[metric])
        .rename(columns={metric: 'Setting', 'ISCOREScal': 'ISCORE'})
    )
    report_frames.append(SelBest[['Type', 'Hyperparameter', 'Setting', 'ISCORE']])

# Concatenate once instead of growing the frames inside the loop
ParamSearch = pd.concat(param_frames, axis=0)
SelBestReport = pd.concat(report_frames, axis=0)



In [5]:
# Reshape to wide format: one row per (Hyperparameter, Setting), one ISCORE column per Type
SelBestReport_wide = SelBestReport.pivot_table(
    values='ISCORE',
    index=['Hyperparameter', 'Setting'],
    columns='Type',
).reset_index()

# Display labels: Type columns ART/II are shown as ABP/ECG, settings '500'/'800' as '5s'/'8s'
SelBestReport_wide = SelBestReport_wide.rename(columns={'ART': 'ABP', 'II': 'ECG'})
SelBestReport_wide['Setting'] = SelBestReport_wide['Setting'].replace({'500': '5s', '800': '8s'})


# Format every score with three decimals and wrap the per-Hyperparameter maximum in \textbf{}
for t in ['ABP', 'ECG']:
    group_best = SelBestReport_wide.groupby('Hyperparameter')[t].transform('max')
    max_mask = group_best == SelBestReport_wide[t]
    SelBestReport_wide[t] = [
        f"\\textbf{{{score:.3f}}}" if is_best else f"{score:.3f}"
        for score, is_best in zip(SelBestReport_wide[t], max_mask)
    ]

# Render the tabular body (vertical rule between the setting and score columns)
# and wrap it in a table environment
table_body = SelBestReport_wide.to_latex(index=False, escape=False, column_format="cc|cc")
latex_code = r"""\begin{table}[h]
\centering
\caption{Group-wise sensitivity analysis results by type and hyperparameter.}
\label{tab:SelBestReport}
""" + table_body + r"""\end{table}"""

# Output LaTeX code
print(latex_code)

\begin{table}[h]
\centering
\caption{Group-wise sensitivity analysis results by type and hyperparameter.}
\label{tab:SelBestReport}
\begin{tabular}{cc|cc}
\toprule
Hyperparameter & Setting & ABP & ECG \\
\midrule
$C$ & 5s & 2.031 & 2.047 \\
$C$ & 8s & \textbf{2.036} & \textbf{2.419} \\
$J$ & 30 & 2.024 & 2.198 \\
$J$ & 50 & \textbf{2.054} & \textbf{2.489} \\
$\zeta$ & 1 & \textbf{2.159} & \textbf{2.435} \\
$\zeta$ & 2 & 1.909 & 2.155 \\
\bottomrule
\end{tabular}
\end{table}


### Construct Analysis Table (ISCORE-Based) for Benchmarks

In [6]:
# Load benchmark configuration models and combine all model keys across configuration files
Bench_config_directory = './Benchmarks/Config/'
BenchTabLists = load_config_models(Bench_config_directory)
# Flatten with a comprehension: unlike np.concatenate this also works when no
# configuration file is found, and it keeps the keys as plain Python objects
BenchAnalTabList = [model for tabs in BenchTabLists.values() for model in tabs]

# Prepare the merged analysis table using the function
BenchAnalAccMItable = prepare_analysis_table(BenchMItableSet, BenchAcctableSet, BenchAnalTabList, AnalMetricList)

### Best model selection

In [7]:
# Convert parameter search result into dictionary format:
# e.g., ParamDict = {'ART': {'Depth': '1', 'LatDim': '50', 'Comp': '800'}, ...}
ParamDict = {
    t: {
        row['Param']: row['Value']
        for _, row in ParamSearch[ParamSearch['Type'] == t].iterrows()
    }
    for t in ParamSearch['Type'].unique()
}


# For every (Source, Type) pair, keep the models whose Depth, LatDim and Comp all
# equal the settings selected for that Type
best_frames = []
for Source in ['Mimic', 'VitalDB']:
    for Type, Value in ParamDict.items():
        is_selected = (
            (SenseAccMItable['Type'] == Type)
            & (SenseAccMItable['Depth'] == Value['Depth'])
            & (SenseAccMItable['LatDim'] == Value['LatDim'])
            & (SenseAccMItable['Comp'] == Value['Comp'])
            & (SenseAccMItable['Source'] == Source)
        )
        best_frames.append(SenseAccMItable[is_selected])
# Concatenate once; with nothing to concatenate keep an empty frame that still has the columns
BestModelList = pd.concat(best_frames) if best_frames else SenseAccMItable.iloc[0:0]

# Build the comparison table including both ablation models and selected best models
AblaAccMItable = AnalAccMItable[AnalAccMItable['Model'].isin(AblationList + BestModelList['Model'].tolist())]

# Build the comparison table including both benchmark models and selected best models
MainCompList = ['SKZFC_ART_1_30_800_Mimic',  'SKZFC_II_1_30_800_Mimic', 'SKZFC_ART_1_30_800_VitalDB', 'SKZFC_II_1_30_800_VitalDB']
# Restrict the main-model rows to the columns the benchmark table has
SelModelComp = AnalAccMItable[AnalAccMItable['Model'].isin(MainCompList)][BenchAnalAccMItable.columns]
# NOTE: this rebinds BenchAnalAccMItable to itself plus the selected rows, so
# re-running this cell without rebuilding the benchmark table appends them again
BenchAnalAccMItable = pd.concat([BenchAnalAccMItable, SelModelComp])


# BenchAnalAccMItable[(BenchAnalAccMItable['Type'] =='II') & (BenchAnalAccMItable['Source'] =='Mimic')].sort_values('ISCOREScal')
# 1. ART & VitalDB: SKZFC_ART_1_30_800_VitalDB, VDWave_ART_VitalDB, TCVAE_ART_30_VitalDB
# 2. ART & Mimic: SKZFC_ART_1_30_800_Mimic, VDWave_ART_Mimic, TCVAE_ART_30_Mimic
# 3. II & VitalDB: SKZFC_II_1_30_800_VitalDB, VDWave_II_VitalDB, TCVAE_II_30_VitalDB
# 4. II & Mimic: SKZFC_II_1_30_800_Mimic, VDWave_II_Mimic, TCVAE_II_30_Mimic

### Save Evaluation Summary Tables to 'EvalResults/SummaryTables/' Directory

In [8]:
# Destination folder for the summary tables (created if it does not exist yet)
output_dir = './EvalResults/SummaryTables/'
os.makedirs(output_dir, exist_ok=True)

# Output file name -> DataFrame written to that file
tables_to_save = {
    'AnalAccMItable.csv': AnalAccMItable,
    'SenseAccMItable.csv': SenseAccMItable,
    'BenchAnalAccMItable.csv': BenchAnalAccMItable,
    'AblaAccMItable.csv': AblaAccMItable,
}

# Write every table as CSV without the row index and report where it went
for filename, df in tables_to_save.items():
    save_path = os.path.join(output_dir, filename)
    df.to_csv(path_or_buf=save_path, index=False)
    print(f"Saved: {save_path}")

Saved: ./EvalResults/SummaryTables/AnalAccMItable.csv
Saved: ./EvalResults/SummaryTables/SenseAccMItable.csv
Saved: ./EvalResults/SummaryTables/BenchAnalAccMItable.csv
Saved: ./EvalResults/SummaryTables/AblaAccMItable.csv
