In [None]:
!pip install -r requirments.txt

In [None]:
import joblib
import pandas as pd
import numpy as np
from sklearn.metrics import log_loss, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import shap
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
import os
from mutagen.mp3 import MP3

In [None]:
# Input files, given as paths relative to the notebook's working directory.
testFeaturesPath = 'Features/test_audio_features.csv'   # per-file acoustic features
metadataPath = 'metadata.csv'                           # main metadata (uid, age, gender, ...)
testLabelPath = 'acoustic_test_labels.csv'              # one-hot diagnosis columns per uid
additionalMetadataPath = 'additional_metadata.csv'      # extra metadata merged on uid later

In [None]:
# Fitted pipeline serialized with joblib.
model = joblib.load('Model/model.joblib')

# Feature table: the uid is the file name with its last four characters
# (the extension) removed.
TestData = pd.read_csv(testFeaturesPath)
TestData['uid'] = [name[:-4] for name in TestData['file_name']]

# Demographics: gender becomes 1 for 'male' and 0 for every other value.
meta = pd.read_csv(metadataPath)[['uid', 'age', 'gender']]
meta['gender'] = meta['gender'].map(lambda value: 1 if value == 'male' else 0)

test_data = TestData.merge(meta, on='uid')

# Collapse the three one-hot diagnosis columns into one integer label:
# control -> 0, mci -> 1, adrd -> 2.
one_hot_cols = ['diagnosis_control', 'diagnosis_mci', 'diagnosis_adrd']
test_label = pd.read_csv(testLabelPath)
control, mci, adrd = (test_label[col] for col in one_hot_cols)
test_label['label'] = (control + 2 * mci + 3 * adrd - 1).astype(np.uint8)
test_label = test_label.drop(columns=one_hot_cols)

# Attach the labels and remove the augmentation marker column.
test_data = test_data.merge(test_label, on='uid').drop(columns='augmentation_type')

In [None]:
# Inspect the merged table (features + age/gender + label) built in the previous cell.
test_data

In [None]:
# Identifiers and ground-truth labels, kept aligned row-for-row with X_test.
uids = test_data['uid'].values
labels = test_data['label'].values

# Every remaining column is passed to the model as a feature. errors='ignore'
# tolerates columns that were already removed (e.g. 'augmentation_type').
X_test = test_data.drop(columns=['uid', 'label', 'augmentation_type', 'file_name'], errors='ignore')
feature_names = X_test.columns.tolist()

# The encoder is fitted on the test labels only; class_labels therefore lists
# the classes that actually occur in the test set, in sorted order.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(labels)
class_labels = label_encoder.classes_

# Probabilities are not computed here: the evaluation cell computes
# y_test_proba itself before anything reads it, so a call here would only
# repeat the same inference.

# ---------------------------------------------
# Evaluate on Test Set
# ---------------------------------------------

In [None]:
# Score the test set with the fitted pipeline.
y_test_proba = model.predict_proba(X_test)
y_test_pred = model.predict(X_test)

# Overall metrics. Precision, recall and F1 are averaged with
# average='weighted'; zero_division=0 reports 0 for an undefined ratio.
acc = accuracy_score(labels, y_test_pred)
precision = precision_score(labels, y_test_pred, average='weighted', zero_division=0)
recall = recall_score(labels, y_test_pred, average='weighted', zero_division=0)
f1 = f1_score(labels, y_test_pred, average='weighted', zero_division=0)
# Pass the model's own class order when it exposes one, so the probability
# columns are matched explicitly and a class that is absent from the test
# labels does not break the computation. Falls back to the default (None).
loss = log_loss(labels, y_test_proba, labels=getattr(model, 'classes_', None))

# One-element lists keep the layout used by the DataFrame construction and
# the [0] lookups further down.
test_metrics = {
    'Accuracy': [acc],
    'Precision': [precision],
    'Recall': [recall],
    'F1-Score': [f1],
    'Log Loss': [loss],
}

# Build the confusion matrix over every label seen in the truth or in the
# predictions, and locate each class by value. Indexing by enumeration
# position is only correct when the labels are exactly 0..K-1 with none missing.
cm_labels = np.union1d(class_labels, y_test_pred)
cm = confusion_matrix(labels, y_test_pred, labels=cm_labels)

per_class_test_metrics = {}
for class_label in class_labels:
    # cm_labels is sorted and contains class_label, so this is its row/column.
    class_idx = int(np.searchsorted(cm_labels, class_label))

    # One-vs-rest counts for this class.
    TP = cm[class_idx, class_idx]
    FN = cm[class_idx, :].sum() - TP
    FP = cm[:, class_idx].sum() - TP
    TN = cm.sum() - (TP + FP + FN)

    sensitivity = TP / (TP + FN) if (TP + FN) > 0 else 0.0
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0.0

    per_class_test_metrics[f'Class_{class_label}_Sensitivity'] = [sensitivity]
    per_class_test_metrics[f'Class_{class_label}_Specificity'] = [specificity]

test_metrics_combined = {**test_metrics, **per_class_test_metrics}

test_metrics_df = pd.DataFrame(test_metrics_combined)

print("Test Set Metrics:")
print(f"Accuracy: {acc:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"Log Loss: {loss:.4f}")

for class_label in class_labels:
    sens = test_metrics_combined[f'Class_{class_label}_Sensitivity'][0]
    spec = test_metrics_combined[f'Class_{class_label}_Specificity'][0]
    print(f"Class {class_label} Sensitivity: {sens:.4f}")
    print(f"Class {class_label} Specificity: {spec:.4f}")

# Long-format (Metric, Value) summary written next to the notebook.
metrics_summary_test = pd.DataFrame({
    'Metric': list(test_metrics_combined.keys()),
    'Value': [test_metrics_combined[key][0] for key in test_metrics_combined]
})

metrics_summary_test.to_csv("test_validation_results.csv")


In [None]:
# Column names given to the model's probability outputs, in output order.
class_prob_cols = ['diagnosis_control', 'diagnosis_mci', 'diagnosis_adrd']

proba_df = pd.DataFrame(y_test_proba, columns=class_prob_cols)
proba_df['uid'] = uids
# Confidence of a row = its largest class probability.
proba_df['confidence'] = proba_df[class_prob_cols].max(axis=1)

# For each uid keep the single row with the highest confidence.
best_row_per_uid = proba_df.groupby('uid')['confidence'].idxmax()
final_proba_df = (
    proba_df.loc[best_row_per_uid]
    .drop(columns=['confidence'])
    .reset_index(drop=True)
)

final_proba_df

In [None]:
# One row per scored test row: predicted class alongside the row's uid.
final_pred = (
    pd.DataFrame(y_test_pred, columns=['pred'])
    .assign(uid=uids)
    .reset_index(drop=True)
)


In [None]:
# Additional metadata table; it is merged on 'uid' in a later cell.
metadata = pd.read_csv(additionalMetadataPath)
# Full main metadata table (the loading cell above kept only uid/age/gender from this file).
mainMetadata = pd.read_csv(metadataPath)

In [None]:
def get_mp3_files_info(folder_path):
    """Collect the duration of every MP3 file directly inside ``folder_path``.

    Parameters
    ----------
    folder_path : str
        Directory to scan (not recursive). File names are matched on a
        case-insensitive ``.mp3`` suffix.

    Returns
    -------
    pandas.DataFrame
        Columns ``uid`` (the file name, extension included) and ``Length``
        (``audio.info.length`` as reported by mutagen). The two columns are
        present even when no file could be read, so callers can index them
        safely on an empty result.
    """
    data = []
    # os.listdir returns entries in arbitrary order; sort for reproducible rows.
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.lower().endswith('.mp3'):
            continue
        file_path = os.path.join(folder_path, file_name)
        try:
            length = MP3(file_path).info.length
        except Exception as e:
            # Best effort: report the unreadable file and keep scanning.
            print(f"Could not process file {file_name}: {e}")
            continue
        data.append({'uid': file_name, 'Length': length})

    # Explicit columns: an empty scan must not yield a column-less frame.
    return pd.DataFrame(data, columns=['uid', 'Length'])
# Directory holding the test MP3 files. Set the TEST_AUDIO_DIR environment
# variable to point at your own copy; the fallback is the original
# machine-specific location.
folder_path = os.environ.get(
    'TEST_AUDIO_DIR',
    '/home/siavash/Downloads/ACousticNIHCompetition/test_audios',
)
mp3_info_df = get_mp3_files_info(folder_path)
# Drop the 4-character '.mp3' suffix so 'uid' can be used as a merge key.
mp3_info_df['uid'] = mp3_info_df['uid'].apply(lambda x: x[:-4])
mp3_info_df

In [None]:
# Join predictions with both metadata tables, the labels and the audio
# lengths (all inner joins on 'uid'), then drop the bookkeeping columns.
bias_analysis_dataset = (
    final_pred
    .merge(metadata, on='uid')
    .merge(test_label, on='uid')
    .merge(mainMetadata, on='uid')
    .merge(mp3_info_df, on='uid')
    .drop(columns=['diagnosis', 'split', 'filesize_kb', 'hash'])
)
bias_analysis_dataset

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import chi2_contingency, ttest_ind
import warnings

# NOTE(review): this silences every warning for the rest of the session,
# including deprecation notices from pandas/seaborn - consider narrowing it.
warnings.filterwarnings('ignore')

sns.set(style="whitegrid")

# Work on a copy so the merged dataset from the previous cell stays untouched.
df = bias_analysis_dataset.copy()

print("First 5 rows of the dataset:")
print(df.head())

categorical_cols = ['race', 'language', 'handedness', 'education', 'corpus', 'gender']
numerical_cols = ['age', 'Length']

# Impute by assigning the filled column back. Calling fillna(inplace=True) on
# df[col] is chained assignment: it acts on a temporary object and leaves df
# unchanged when pandas Copy-on-Write is active.
for col in categorical_cols:
    if df[col].isnull().sum() > 0:
        mode = df[col].mode()[0]
        df[col] = df[col].fillna(mode)
        print(f"Filled missing values in '{col}' with mode: {mode}")

for col in numerical_cols:
    if df[col].isnull().sum() > 0:
        median = df[col].median()
        df[col] = df[col].fillna(median)
        print(f"Filled missing values in '{col}' with median: {median}")

# Outcome flag used by every test and plot below.
df['Correct'] = np.where(df['pred'] == df['label'], 'Correct', 'Incorrect')

print("\nPrediction Accuracy Distribution:")
print(df['Correct'].value_counts())

def chi_square_test(df, column, alpha=0.05):
    """Chi-square test of independence between ``column`` and ``df['Correct']``.

    Returns a dict with the keys 'Variable', 'Chi2', 'p-value' and
    'Significant' (whether the p-value is below ``alpha``).
    """
    observed = pd.crosstab(df[column], df['Correct'])
    statistic, p_value, _dof, _expected = chi2_contingency(observed)
    return {
        'Variable': column,
        'Chi2': statistic,
        'p-value': p_value,
        'Significant': p_value < alpha,
    }

# Test every categorical variable against the Correct/Incorrect outcome.
chi2_results = [chi_square_test(df, col) for col in categorical_cols]

chi2_df = pd.DataFrame(chi2_results)

# Keep only the variables flagged as significant.
significant_chi2 = chi2_df[chi2_df['Significant']]

print("\nSignificant Chi-Square Test Results:")
print(significant_chi2[['Variable', 'Chi2', 'p-value']])

def t_test(df, column, alpha=0.05):
    """Two-sample t-test (``equal_var=False``) of ``column`` between outcomes.

    Compares the rows where ``df['Correct']`` is 'Correct' with those where it
    is 'Incorrect'. Returns a dict with the keys 'Variable', 't-Statistic',
    'p-value' and 'Significant' (whether the p-value is below ``alpha``).
    """
    correct_mask = df['Correct'] == 'Correct'
    incorrect_mask = df['Correct'] == 'Incorrect'
    statistic, p_value = ttest_ind(
        df.loc[correct_mask, column],
        df.loc[incorrect_mask, column],
        equal_var=False,
    )
    return {
        'Variable': column,
        't-Statistic': statistic,
        'p-value': p_value,
        'Significant': p_value < alpha,
    }

# Test every numerical variable against the Correct/Incorrect outcome.
ttest_results = [t_test(df, col) for col in numerical_cols]

ttest_df = pd.DataFrame(ttest_results)

# Keep only the variables flagged as significant.
significant_ttest = ttest_df[ttest_df['Significant']]

print("\nSignificant t-Test Results:")
print(significant_ttest[['Variable', 't-Statistic', 'p-value']])

def plot_significant_categorical(df, column):
    """Show a count plot of ``column`` with bars split by the 'Correct' outcome."""
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.countplot(data=df, x=column, hue='Correct', ax=ax)
    ax.set_title(f'{column.capitalize()} vs Prediction Accuracy')
    ax.legend(title='Prediction', loc='upper right')
    # Tilt the category labels so long names do not overlap.
    ax.tick_params(axis='x', labelrotation=45)
    fig.tight_layout()
    plt.show()

# One count plot per categorical variable flagged by the chi-square tests.
for variable in significant_chi2['Variable']:
    plot_significant_categorical(df, variable)

def plot_significant_numerical(df, column):
    """Show a box plot of ``column`` for each value of the 'Correct' outcome."""
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.boxplot(data=df, x='Correct', y=column, ax=ax)
    ax.set_title(f'{column.capitalize()} Distribution by Prediction Accuracy')
    fig.tight_layout()
    plt.show()

# One box plot per numerical variable flagged by the t-tests.
for variable in significant_ttest['Variable']:
    plot_significant_numerical(df, variable)

def analyze_categorical(df, column):
    """Print per-category outcome percentages for ``column``.

    Prints the row-normalized (percent) crosstab of ``column`` against
    ``df['Correct']`` and names the category with the highest 'Incorrect'
    percentage. Returns nothing.
    """
    row_percentages = 100 * pd.crosstab(df[column], df['Correct'], normalize='index')

    print(f"\nDetailed Analysis for '{column}':")
    print(row_percentages)
    print("\nInterpretation:")

    error_pct = row_percentages['Incorrect']
    worst_category, worst_pct = error_pct.idxmax(), error_pct.max()
    print(f"- Category '{worst_category}' in '{column}' has the highest incorrect prediction rate of {worst_pct:.2f}%.")
    print("- Categories with lower incorrect rates may be contributing to higher accuracy.")

# Text breakdown for each categorical variable flagged by the chi-square tests.
for variable in significant_chi2['Variable']:
    analyze_categorical(df, variable)

def analyze_numerical(df, column):
    """Print the mean of ``column`` for correct and for incorrect predictions.

    Also prints which direction (higher or lower values) goes with incorrect
    predictions; anything other than a strictly larger incorrect mean is
    reported as "Lower". Returns nothing.
    """
    means = {
        outcome: df.loc[df['Correct'] == outcome, column].mean()
        for outcome in ('Correct', 'Incorrect')
    }
    mean_correct, mean_incorrect = means['Correct'], means['Incorrect']

    print(f"\nDetailed Analysis for '{column}':")
    print(f"- Mean {column} for Correct Predictions: {mean_correct:.2f}")
    print(f"- Mean {column} for Incorrect Predictions: {mean_incorrect:.2f}")

    direction = "Higher" if mean_incorrect > mean_correct else "Lower"
    print(f"--> {direction} values of '{column}' are associated with incorrect predictions.")

# Text breakdown for each numerical variable flagged by the t-tests.
for variable in significant_ttest['Variable']:
    analyze_numerical(df, variable)


In [None]:
# Row position in X_test of the sample to explain (a position, not a uid).
subject_index = 5

# Double brackets keep a one-row DataFrame rather than a Series.
X_single = X_test.iloc[[subject_index]].copy()

selector = model.named_steps['select']
xgb_model_final = model.named_steps['xgb']

# NOTE(review): only the 'select' step is applied before explaining 'xgb'.
# If the pipeline holds other transformers ahead of the classifier they are
# skipped here - confirm against the pipeline definition.
X_single_selected = selector.transform(X_single)

# Map the selector's kept column positions back to feature names.
selected_indices = selector.get_support(indices=True)
selected_feature_names = [feature_names[i] for i in selected_indices]

explainer = shap.TreeExplainer(xgb_model_final)

shap_values_full = explainer.shap_values(X_single_selected)

# Depending on the installed SHAP release, multiclass output is either one
# (n_samples, n_features, n_classes) array or a list holding one
# (n_samples, n_features) array per class. Normalize both to a per-class list.
if isinstance(shap_values_full, list):
    shap_values_list = [np.asarray(class_values) for class_values in shap_values_full]
else:
    shap_values_full = np.asarray(shap_values_full)
    shap_values_list = [
        shap_values_full[:, :, c] for c in range(shap_values_full.shape[2])
    ]
num_classes = len(shap_values_list)

class_names = list(label_encoder.classes_)
class_names_str = [str(cls) for cls in class_names]

for c in range(num_classes):
    print(f"shap_values_list[{c}].shape =", shap_values_list[c].shape)


def categorize_feature(feature_name):
    """Map a feature name to the feature group used in the SHAP summaries.

    Rules, in order:
    - prefix 'Embedding1_' -> 'Wav2Vec'
    - prefix 'Embedding2_' -> 'Whisper'
    - a whole token equal to 'age' (case-insensitive) -> 'Age'
    - a whole token equal to 'gender' (case-insensitive) -> 'Gender'
    - anything else -> 'OpenSMILE'

    Tokens are the pieces of the name separated by '_', '-', '.' or spaces.
    Matching whole tokens rather than substrings keeps names such as
    'average_loudness' or 'percentage_voiced' out of the 'Age' group.
    """
    if feature_name.startswith('Embedding1_'):
        return 'Wav2Vec'
    if feature_name.startswith('Embedding2_'):
        return 'Whisper'

    normalized = feature_name.lower()
    for separator in ('-', '.', ' '):
        normalized = normalized.replace(separator, '_')
    tokens = set(normalized.split('_'))

    if 'age' in tokens:
        return 'Age'
    if 'gender' in tokens:
        return 'Gender'
    return 'OpenSMILE'

# Group label for every selected feature, plus the sorted set of groups present.
feature_categories = list(map(categorize_feature, selected_feature_names))
unique_categories = sorted(set(feature_categories))
print("Unique Categories:", unique_categories)

# Sum the SHAP values of the explained row per (group, class).
category_shap_sums = {cat: np.zeros(num_classes) for cat in unique_categories}

for c in range(num_classes):
    # Row 0 is the only sample that was explained.
    row_shap = shap_values_list[c][0, :]
    for cat, contribution in zip(feature_categories, row_shap):
        category_shap_sums[cat][c] += contribution

print("\ncategory_shap_sums =", category_shap_sums)

# One row per group, one 'SHAP_<class>' column per class.
shap_columns = {
    f'SHAP_{class_names_str[c]}': [category_shap_sums[cat][c] for cat in unique_categories]
    for c in range(num_classes)
}
df_category_shap = pd.DataFrame({'Category': unique_categories, **shap_columns})

print("\n--- SHAP Summaries by Category for Subject Index:", subject_index, "---")
print(df_category_shap)

# Fixed colour per known group; unknown groups fall back to grey.
category_palette = {
    'Wav2Vec': '#1f77b4',
    'Whisper': '#ff7f0e',
    'OpenSMILE': '#2ca02c',
    'Age': '#9467bd',
    'Gender': '#8c564b',
}
for cat in unique_categories:
    category_palette.setdefault(cat, '#7f7f7f')

bar_colors = [category_palette[cat] for cat in unique_categories]

# One horizontal bar chart per class.
for c in range(num_classes):
    class_label = class_names_str[c]
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.barplot(
        x='SHAP_' + class_label,
        y='Category',
        data=df_category_shap,
        order=unique_categories,
        palette=bar_colors,
        ax=ax,
    )
    ax.set_title(f"Subject {subject_index}: SHAP by Category for Class = {class_label}", fontsize=18)
    ax.set_xlabel("SHAP Contribution", fontsize=14)
    ax.set_ylabel("Category", fontsize=14)
    # Zero line separates positive from negative contributions.
    ax.axvline(0, color='grey', linewidth=0.8)
    fig.tight_layout()
    plt.show()