In [None]:
# ! pip install librosa
# ! pip install parselmouth
# ! pip install pyAudioAnalysis
# ! pip install torch
# ! pip install transformers
# ! pip install pandas
# ! pip install tqdm
# ! pip install scikit-learn
# ! pip install parselmouth
# ! pip install nolds

In [1]:
%cd Project

/content/Project


In [12]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchaudio
import librosa
import parselmouth
from parselmouth.praat import call
from glob import glob
from tqdm import tqdm
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, recall_score, precision_score, roc_auc_score, roc_curve, auc
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from transformers import AutoFeatureExtractor, WhisperModel, AutoModelForAudioClassification
import logging
import gc
from torch.utils.data import Dataset, DataLoader
import nolds
from sklearn.model_selection import GroupKFold
from pathlib import Path

In [3]:
base_path = "ParkCeleb_filtered"
groups = {"PD": 1, "CN": 0}

parkinson_files = []
control_files = []

# Loop over each group (PD, CN)
for group, label in groups.items():
    group_path = os.path.join(base_path, group)
    # Recursively search for any .wav file
    wav_files = glob(os.path.join(group_path, "**", "*.wav"), recursive=True)

    if label == 1:
        parkinson_files.extend(wav_files)
    else:
        control_files.extend(wav_files)

# Merge and create labels
all_files = parkinson_files + control_files
labels = [1] * len(parkinson_files) + [0] * len(control_files)

print(f"Found {len(parkinson_files)} Parkinson files and {len(control_files)} Control files.")


Found 8708 Parkinson files and 5349 Control files.


# Load ParkCeleb dataset and extract speaker IDs

In [4]:
def load_parkceleb_dataset_with_speakers(dataset_path):
    groups = {"PD": 1, "CN": 0}
    audio_files = []
    labels = []
    speaker_ids = []

    # Loop over each group (PD, CN)
    for group, label in groups.items():
        group_path = os.path.join(dataset_path, group)
        # Recursively search for the specific file
        for audio_file in glob(os.path.join(group_path, "**", "*.wav"), recursive=True):
            audio_files.append(audio_file)
            labels.append(label)

            # Extract speaker ID from the path (assuming structure where speaker ID is the directory name after the group name e.g. CN/cn_01 or PD/pd_01)
            try:
                # Example: dataset_path/PD/speaker01/utterance_1.wav
                relative_path = os.path.relpath(audio_file, group_path)
                speaker_id = relative_path.split(os.sep)[0]
                speaker_ids.append(speaker_id)
            except IndexError:
                print(f"Could not extract speaker ID from path: {audio_file}")
                speaker_ids.append(None) # Handle cases where speaker ID cannot be extracted, caused by errors in dataset

    # Filter out files where speaker ID couldn't be determined, some stupid bug
    valid_indices = [i for i, spkr_id in enumerate(speaker_ids) if spkr_id is not None]
    audio_files = [audio_files[i] for i in valid_indices]
    labels = [labels[i] for i in valid_indices]
    speaker_ids = [speaker_ids[i] for i in valid_indices]

    return audio_files, labels, speaker_ids

# Create whisper features

In [5]:
# Read time before diagnosis from speakers_info.csv
def get_time_before_diagnosis(speaker_path):
    info_path = os.path.join(speaker_path, "speakers_info.csv")

    try:
        df = pd.read_csv(info_path)
        # Filter rows based on conditions
        filtered = df[
            (df['status'] == 'target') &
            (df['before_after_diagnosis'] == 'before') &
            (df['years_from_diagnosis'].between(0, 11, inclusive='neither'))
        ]
        if not filtered.empty:
            return filtered['years_from_diagnosis'].iloc[0]
    except Exception as e:
        print(f"Error reading {info_path}: {e}")
    return None

def extract_whisper_features(audio_paths, model_name="openai/whisper-small"):
    model = WhisperModel.from_pretrained(model_name)
    feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    features = []
    labels = []
    ids = []
    times = []
    lengths = []
    successful_paths = []

    for audio_path in tqdm(audio_paths, desc="Extracting Whisper features"):
        try:
            audio_array, sr = librosa.load(audio_path, sr=16000, mono=True)
            # print(audio_path)

            # Extract features using the feature extractor
            inputs = feature_extractor(
                audio_array,
                sampling_rate=16000,
                return_tensors="pt"
            )

            # Define a decoder input for Whisper (needed for the forward pass structure)
            # This is not really used, but whisper requires it to be passed, regardless if you do or do not use the decoder.
            decoder_input_ids = torch.tensor([[1] * 100]).to(device) # Add 100 tokens for the decoder input

            # Move inputs to the correct device
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # Forward pass to get hidden states
            with torch.no_grad():
                outputs = model(**inputs, decoder_input_ids=decoder_input_ids, output_hidden_states=True)

            # Get the last hidden state from the encoder and average across the sequence dimension
            embeddings = outputs.encoder_hidden_states[-1].mean(dim=1).squeeze().cpu().numpy()


            # get patient id (pd_01 etc)
            patient_id = audio_path.split("/")[2]

            # Get time before diagnosis
            speaker_dir = Path(audio_path).parent
            audio_dir = Path(speaker_dir).parent
            time_before_diagnosis = get_time_before_diagnosis(audio_dir)

            group = Path(audio_path).parent.parent.name  # Parent directory of speaker dir
            label = 1 if group == "PD" else 0  # Directly use group name

            features.append(embeddings)
            successful_paths.append(audio_path)
            labels.append(label)  # Use corrected label
            ids.append(patient_id)
            times.append(time_before_diagnosis)
            lengths.append(librosa.get_duration(y=audio_array, sr=sr))

        except Exception as e:
            print(f"Failed to process {audio_path}: {e}")

    if not features:
        return np.array([]), [] # Return empty arrays if no features were extracted

    return np.array(features), list(labels), list(ids), list(times), list(lengths), successful_paths


# Extract traditional features

In [6]:
def extract_traditional_features(audio_paths):
    features = []
    labels = []
    ids = []
    times = []
    lengths = []
    successful_paths = []

    for audio_path in tqdm(audio_paths, desc="Extracting traditional features"):
        try:
            # Load audio ONCE with librosa
            audio_array, sr = librosa.load(audio_path, sr=16000)

            patient_id = audio_path.split("/")[2]
            speaker_dir = Path(audio_path).parent
            audio_dir = Path(speaker_dir).parent
            time_before_diagnosis = get_time_before_diagnosis(audio_dir)
            duration = librosa.get_duration(y=audio_array, sr=sr)
            group = Path(audio_path).parent.parent.name
            label = 1 if group == "PD" else 0

            # Create parselmouth Sound object from librosa's data
            try:
                # Ensure 'audio_array' is float64 for parselmouth, librosa might return float32
                sound = parselmouth.Sound(audio_array.astype(np.float64), sampling_frequency=sr)
            except Exception as e:
                print(f"Failed to create parselmouth.Sound from array for {audio_path}: {e}")
                # Skip this file if parselmouth object creation fails
                continue

            feature_vector = []

            # Jitter features
            try:
                pointProcess = call(sound, "To PointProcess (periodic, cc)", 75, 600)
                jitter_local = call(pointProcess, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
                jitter_local_absolute = call(pointProcess, "Get jitter (local, absolute)", 0, 0, 0.0001, 0.02, 1.3)
                jitter_rap = call(pointProcess, "Get jitter (rap)", 0, 0, 0.0001, 0.02, 1.3)
                jitter_ppq5 = call(pointProcess, "Get jitter (ppq5)", 0, 0, 0.0001, 0.02, 1.3)
                feature_vector.extend([jitter_local, jitter_local_absolute, jitter_rap, jitter_ppq5])
            except Exception as e:
                feature_vector.extend([0, 0, 0, 0])

            # Shimmer features
            try:
                shimmer_local = call([sound, pointProcess], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
                shimmer_local_db = call([sound, pointProcess], "Get shimmer (local_dB)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
                feature_vector.extend([shimmer_local, shimmer_local_db])
            except Exception as e:
                feature_vector.extend([0, 0])

            # Harmonics-to-noise ratio
            try:
                harmonicity = call(sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)
                hnr = call(harmonicity, "Get mean", 0, 0)
                feature_vector.append(hnr)
            except Exception as e:
                feature_vector.append(0)

            # Percentage of vocalic intervals (using RMS energy)
            rms_frames = librosa.feature.rms(y=audio_array).mean()
            feature_vector.append(rms_frames)

            # MFCCs
            try:
                mfccs = librosa.feature.mfcc(y=audio_array, sr=sr, n_mfcc=13)
                mfcc_means = np.mean(mfccs, axis=1)
                feature_vector.extend(mfcc_means)
            except Exception as e:
                feature_vector.extend([0] * 13)

            # Spectral features
            try:
                spectral_centroid = librosa.feature.spectral_centroid(y=audio_array, sr=sr).mean()
                spectral_bandwidth = librosa.feature.spectral_bandwidth(y=audio_array, sr=sr).mean()
                feature_vector.extend([spectral_centroid, spectral_bandwidth])
            except Exception as e:
                feature_vector.extend([0, 0])

            # F0 Statistics (Prosodic Features)
            f0_values = []
            try:
                pitch = sound.to_pitch()
                f0_values = pitch.selected_array['frequency']
                f0_values = f0_values[f0_values != 0] # Exclude unvoiced frames (0 Hz)
                if len(f0_values) > 0:
                    f0_mean = np.mean(f0_values)
                    f0_std = np.std(f0_values)
                    f0_range = np.max(f0_values) - np.min(f0_values)
                else:
                    f0_mean, f0_std, f0_range = 0, 0, 0 # Handle case with no voiced frames

                feature_vector.extend([f0_mean, f0_std, f0_range])
            except Exception as e:
                feature_vector.extend([0, 0, 0])

            # Nonlinear Dynamics (RPDE, D2, DFA)
            rpde_val, d2_val, dfa_val = 0, 0, 0 # Default to 0 if not computed

            if len(f0_values) > 10: # Need enough points for nolds functions
                try:
                    # These can be computationally expensive, consider skipping for large files
                    if len(f0_values) < 5000:  # Limit to prevent memory issues
                        rpde_val = nolds.rpde(f0_values)
                        d2_val = nolds.d2(f0_values)
                        dfa_val = nolds.dfa(f0_values)
                except Exception as e:
                    pass # Keep default 0 values

            feature_vector.extend([rpde_val, d2_val, dfa_val])

            # Check if feature vector has consistent length
            expected_length = 29 # Update if adding/removing features
            if len(feature_vector) == expected_length:
                features.append(feature_vector)
                labels.append(label)
                ids.append(patient_id)
                times.append(time_before_diagnosis)
                lengths.append(duration)  # Using duration instead of audio_length
                successful_paths.append(audio_path)
            else:
                print(f"Skipping {audio_path} due to inconsistent feature vector length ({len(feature_vector)} != {expected_length})")

            # Force garbage collection to free memory
            gc.collect()

        except Exception as e:
            print(f"An error occurred processing file: {audio_path}. Error: {e}")

    features_array = np.array(features, dtype=np.float32)
    return features_array, labels, ids, times, lengths, successful_paths  # Return all data, not just features

# Loading CSV helper functions

In [7]:
# New function to load features from CSV
def load_features_from_csv(filename):
    if not os.path.exists(filename):
        print(f"File not found: {filename}")
        return None, None, None, None, None, None

    print(f"Loading features from: {filename}")
    df = pd.read_csv(filename)
    required_columns = {'label', 'id', 'time_before_diagnosis', 'length'}
    if not required_columns.issubset(df.columns):
        print(f"Error: One or more required columns {required_columns} missing in {filename}")
        return None, None, None, None, None, None

    labels = df['label'].values.astype(int)
    speaker_ids = df['id'].values
    times = df['time_before_diagnosis'].values
    lengths = df['length'].values

    features = df.drop(columns=['label', 'id', 'time_before_diagnosis', 'length']).values.astype(np.float32)
    features = np.nan_to_num(features)

    return features, labels, speaker_ids, times, lengths, df



# Save features function
def save_features(features, labels, ids, time_before_diagnosis, audio_length, filename):
    """Save features and labels to a CSV file."""
    if features.size == 0 or not labels:
        print(f"Warning: No features or labels to save for {filename}")
        return
    df = pd.DataFrame(features)
    df['label'] = labels
    df['id'] = ids
    df['time_before_diagnosis'] = time_before_diagnosis
    df['length'] = audio_length
    df.to_csv(filename, index=False)
    print(f"Saved features to {filename}")


# Neural Networks

In [8]:
class DeepNN(torch.nn.Module):
    def __init__(self, input_dim):
        super(DeepNN, self).__init__()
        self.model = torch.nn.Sequential(
            torch.nn.Linear(input_dim, 256),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(256, 128),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(128, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1),
            torch.nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)


class ImprovedDeepNN(nn.Module):
    def __init__(self, input_dim, num_classes=1, dropout=0.3):
        super(ImprovedDeepNN, self).__init__()

        self.fc1 = nn.Linear(input_dim, 512)
        self.bn1 = nn.BatchNorm1d(512)

        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)

        self.fc3 = nn.Linear(256, 128)
        self.bn3 = nn.BatchNorm1d(128)

        self.fc4 = nn.Linear(128, 64)
        self.bn4 = nn.BatchNorm1d(64)

        self.output = nn.Linear(64, num_classes)  # 1 for binary, >1 for multi-class (depends on number of bins)
        self.dropout = nn.Dropout(dropout)

        self.num_classes = num_classes

    def forward(self, x):
        x = F.relu(self.bn1(self.fc1(x)))
        x = self.dropout(x)

        x = F.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)

        x = F.relu(self.bn3(self.fc3(x)))
        x = self.dropout(x)

        x = F.relu(self.bn4(self.fc4(x)))
        x = self.dropout(x)

        if self.num_classes == 1:
            return torch.sigmoid(self.output(x))  # Binary classification
        else:
            return self.output(x)  # Use CrossEntropyLoss with raw logits

# Model Training/Testing/Comparison Helper Functions

In [9]:
def calculate_metrics(y_true, y_pred_prob):
    y_pred = (y_pred_prob > 0.5).astype(int)
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "roc_auc": roc_auc_score(y_true, y_pred_prob) if len(np.unique(y_true)) > 1 else np.nan,
        "sensitivity": recall_score(y_true, y_pred),
        "specificity": recall_score(1 - y_true, 1 - y_pred),
        "f1_score": f1_score(y_true, y_pred),
        "confusion_matrix": confusion_matrix(y_true, y_pred)
    }

def average_metrics(results_list):
    return {
        key: np.nanmean([res[key] for res in results_list])
        for key in results_list[0].keys()
    }

def empty_metrics():
    return {
        "accuracy": np.nan,
        "roc_auc": np.nan,
        "sensitivity": np.nan,
        "specificity": np.nan,
        "f1_score": np.nan,
        "confusion_matrix": np.array([[0, 0], [0, 0]])
    }

# Training and testing DNN

In [10]:
def train_evaluate_model(X_train, y_train, X_test, y_test, feature_type, model_to_use):
    """Train and evaluate model for specific feature type"""
    if X_train.size == 0 or X_test.size == 0:
        print(f"Warning: Empty feature sets for {feature_type}. Skipping evaluation.")
        return empty_metrics()

    # For neural network models
    if not model_to_use.__name__.startswith('RandomForestClassifier'):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Create model with appropriate input dimensions
        input_dim = X_train.shape[1]
        model = model_to_use(input_dim).to(device)

        # Rest of training logic remains the same
        criterion = torch.nn.BCELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        # Convert data to tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32).reshape(-1, 1).to(device)

        # Training loop
        dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True, drop_last=True)

        for epoch in range(100):
            model.train()
            for batch_X, batch_y in dataloader:
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Evaluation
        model.eval()
        with torch.no_grad():
            y_pred_prob = model(X_test_tensor).cpu().numpy()

    # For Random Forest
    elif model_to_use.__name__ == 'RandomForestClassifier':
        # Create and train the random forest model
        model = model_to_use(
            n_estimators=100,
            max_depth=None,
            min_samples_split=2,
            min_samples_leaf=1,
            max_features='sqrt',
            bootstrap=True,
            random_state=37,
            n_jobs=-1,
            class_weight='balanced'
        )

        model.fit(X_train, y_train)

        # Get probability predictions
        y_pred_prob = model.predict_proba(X_test)[:, 1].reshape(-1, 1)

    # For other sklearn models
    else:
        model = model_to_use()
        model.fit(X_train, y_train)
        y_pred_prob = model.predict_proba(X_test)[:, 1].reshape(-1, 1)

    return calculate_metrics(y_test, y_pred_prob)

# Train model + Feature comparison

In [14]:
def crossval_compare_features(dataset_path, k=5, save_figures=True, min_audio_length=0, deep_features="Whisper Small" ,model=None):
    # Determine which model to use
    if model == "random_forest":
        model_to_use = RandomForestClassifier
        model_name = "RandomForest"
    elif isinstance(model, type) or callable(model):
        # If model is a class or function
        model_to_use = model
        model_name = model.__name__
    else:
        # If model is an instance
        model_to_use = model
        model_name = model.__class__.__name__

    save_folder = f'visualization_results_{model_name}'
    # Create directory for saving figures if needed
    os.makedirs(save_folder, exist_ok=True)

    # Load data with proper alignment
    all_audio_paths, all_labels, all_speaker_ids = load_parkceleb_dataset_with_speakers(dataset_path)

    # Load features
    if deep_features == "Whisper Small":
        whisper_data = load_features_from_csv("whisper_small_all_features.csv")
        traditional_data = load_features_from_csv("traditional_all_features.csv")
    elif deep_features == "Whisper Medium":
        whisper_data = load_features_from_csv("whisper_medium_all_features.csv")
        traditional_data = load_features_from_csv("traditional_all_features.csv")


    if whisper_data[0] is None:
        print("Whisper data is None. Generating features.")
        features, labels, ids, times, lengths, _ = extract_whisper_features(all_audio_paths)
        # Change CSV names according to whisper model used
        save_features(features, labels, ids, times, lengths, "whisper_medium_all_features.csv")
        whisper_data = load_features_from_csv("whisper_medium_all_features.csv")


    if traditional_data[0] is None:
        print("Traditional data is None. Generating features.")
        features, labels, ids, times, lengths, _ = extract_traditional_features(dataset_path)
        save_features(features, labels, ids, times, lengths, "traditional_all_features.csv")
        traditional_data = load_features_from_csv("traditional_all_features.csv")

    # Dictionary to store all results
    results = {}
    all_fold_metrics = {}  # Store metrics for each fold for later visualization

    for feature_type, (features, labels, speakers, times, lengths, _) in [
        ("Whisper", whisper_data),
        ("Traditional", traditional_data)
    ]:
        if features is None:
            continue

        length_mask = lengths >= min_audio_length
        features = features[length_mask]
        labels = labels[length_mask]
        speakers = speakers[length_mask]
        times = times[length_mask]
        lengths = lengths[length_mask]

        print(f"\n=== Processing {feature_type} Features with {model_name} ===")
        print(f"Feature dimension: {features.shape[1]}")
        print(f"Class distribution: {np.bincount(labels)}")

        # Speaker-level stratified K-Fold
        unique_speakers = np.unique(speakers)
        speaker_labels = np.array([1 if 'pd' in s else 0 for s in unique_speakers])

        skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=37)
        fold_results = []
        fold_metrics_dict = {
            'accuracy': [], 'roc_auc': [], 'sensitivity': [],
            'specificity': [], 'f1_score': [], 'confusion_matrices': [],
            'y_test': [], 'y_pred_prob': [], 'feature_importance': []  # Added feature importance
        }

        for fold, (train_idx, test_idx) in enumerate(skf.split(unique_speakers, speaker_labels)):
            print(f"\n=== Fold {fold+1} ===")
            train_speakers = set(unique_speakers[train_idx])
            test_speakers = set(unique_speakers[test_idx])

            # Select data for current fold
            train_mask = np.isin(speakers, list(train_speakers))
            test_mask = np.isin(speakers, list(test_speakers))

            X_train, X_test = features[train_mask], features[test_mask]
            y_train, y_test = labels[train_mask], labels[test_mask]

            # Check for valid class distribution
            if len(np.unique(y_test)) < 2:
                print(f"Skipping fold {fold+1} - single class in test set")
                continue

            # Standardize features
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)

            # Train and Eval
            metrics = train_evaluate_model(X_train, y_train, X_test, y_test, feature_type, model)
            fold_results.append(metrics)

            # Extract y_pred_prob for ROC curve generation (we'll need to regenerate it)
            y_pred = (metrics['confusion_matrix'][:, 1].sum(), metrics['confusion_matrix'][:, 0].sum())

            # Store detailed metrics for visualization
            for key in ['accuracy', 'roc_auc', 'sensitivity', 'specificity', 'f1_score']:
                fold_metrics_dict[key].append(metrics[key])
            fold_metrics_dict['confusion_matrices'].append(metrics['confusion_matrix'])
            fold_metrics_dict['y_test'].append(y_test)

            # Visualize confusion matrix for this fold
            plt.figure(figsize=(6, 5))
            cm = metrics['confusion_matrix']
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                        xticklabels=['Control', 'PD'], yticklabels=['Control', 'PD'])
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.title(f'{feature_type} - Fold {fold+1} Confusion Matrix')
            if save_figures:
                plt.savefig(f'{save_folder}/{feature_type}_fold{fold+1}_confusion.png', bbox_inches='tight')
                plt.close()
            else:
                plt.show()

        # Store results
        if fold_results:
            results[feature_type] = average_metrics(fold_results)
            all_fold_metrics[feature_type] = fold_metrics_dict

            # Visualize metrics across folds
            plt.figure(figsize=(10, 6))
            metrics_across_folds = pd.DataFrame({
                'Accuracy': fold_metrics_dict['accuracy'],
                'AUC': fold_metrics_dict['roc_auc'],
                'Sensitivity': fold_metrics_dict['sensitivity'],
                'Specificity': fold_metrics_dict['specificity'],
                'F1 Score': fold_metrics_dict['f1_score']
            })

            metrics_across_folds.index = [f'Fold {i+1}' for i in range(len(metrics_across_folds))]
            ax = metrics_across_folds.plot(kind='bar', figsize=(10, 6))
            plt.axhline(y=0.5, color='r', linestyle='--', alpha=0.3)  # baseline
            plt.title(f'{feature_type} - Performance Metrics Across Folds')
            plt.ylabel('Score')
            plt.ylim(0, 1.05)
            plt.grid(axis='y', linestyle='--', alpha=0.7)
            plt.legend(loc='lower center', bbox_to_anchor=(0.5, -0.25), ncol=5)

            # Add value labels on bars
            for container in ax.containers:
                ax.bar_label(container, fmt='%.2f', fontsize=8)

            if save_figures:
                plt.savefig(f'{save_folder}/{feature_type}_metrics_by_fold.png', bbox_inches='tight')
                plt.close()
            else:
                plt.show()

            # Visualize average confusion matrix
            plt.figure(figsize=(6, 5))
            avg_cm = np.mean([cm for cm in fold_metrics_dict['confusion_matrices']], axis=0)
            sns.heatmap(avg_cm, annot=True, fmt='.1f', cmap='Blues',
                        xticklabels=['Control', 'PD'], yticklabels=['Control', 'PD'])
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.title(f'{feature_type} - Average Confusion Matrix')
            if save_figures:
                plt.savefig(f'{save_folder}/{feature_type}_avg_confusion.png', bbox_inches='tight')
                plt.close()
            else:
                plt.show()


        else:
            results[feature_type] = empty_metrics()

    # Print results
    print("\n=== Final Results ===")
    for feature_type, metrics in results.items():
        print(f"\n{feature_type} Features:")
        for metric, value in metrics.items():
            if metric != 'confusion_matrix':
                print(f"{metric:15}: {value:.4f}")

    # Compare feature types if both are available
    if len(results) > 1:
        # Create comparison bar chart
        metric_names = ['Accuracy', 'AUC', 'Sensitivity', 'Specificity', 'F1 Score']
        feature_types = list(results.keys())

        plt.figure(figsize=(10, 6))
        width = 0.35
        x = np.arange(len(metric_names))

        for i, feature_type in enumerate(feature_types):
            metrics_values = [
                results[feature_type]['accuracy'],
                results[feature_type]['roc_auc'],
                results[feature_type]['sensitivity'],
                results[feature_type]['specificity'],
                results[feature_type]['f1_score']
            ]
            plt.bar(x + (i - 0.5*(len(feature_types)-1)) * width, metrics_values,
                   width, label=feature_type)

            # Add value labels on bars
            for j, value in enumerate(metrics_values):
                plt.text(x[j] + (i - 0.5*(len(feature_types)-1)) * width, value + 0.01,
                         f'{value:.3f}', ha='center', fontsize=8)

        plt.axhline(y=0.5, color='r', linestyle='--', alpha=0.3)  # baseline
        plt.xlabel('Metric')
        plt.ylabel('Score')
        plt.title('Performance Comparison Between Feature Types')
        plt.xticks(x, metric_names)
        plt.ylim(0, 1.05)
        plt.legend()
        plt.grid(axis='y', linestyle='--', alpha=0.5)

        if save_figures:
            plt.savefig(f'{save_folder}/feature_comparison.png', bbox_inches='tight')
            plt.close()
        else:
            plt.show()

    return results

In [16]:
DeepNN_small_results = crossval_compare_features("parkceleb_filtered", min_audio_length=0.1, deep_features="Whisper Small" ,model=DeepNN)

Loading features from: whisper_small_all_features.csv
Loading features from: traditional_all_features.csv

=== Processing Whisper Features with DeepNN ===
Feature dimension: 768
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Processing Traditional Features with DeepNN ===
Feature dimension: 29
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Final Results ===

Whisper Features:
accuracy       : 0.5445
roc_auc        : 0.5582
sensitivity    : 0.5504
specificity    : 0.5279
f1_score       : 0.5948

Traditional Features:
accuracy       : 0.5547
roc_auc        : 0.5735
sensitivity    : 0.5370
specificity    : 0.6020
f1_score       : 0.5954


<Figure size 1000x600 with 0 Axes>

<Figure size 1000x600 with 0 Axes>

In [17]:
ImprovedDeepNN_small_results = crossval_compare_features("parkceleb_filtered", min_audio_length=0.1, deep_features="Whisper Small", model=ImprovedDeepNN)

Loading features from: whisper_small_all_features.csv
Loading features from: traditional_all_features.csv

=== Processing Whisper Features with ImprovedDeepNN ===
Feature dimension: 768
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Processing Traditional Features with ImprovedDeepNN ===
Feature dimension: 29
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Final Results ===

Whisper Features:
accuracy       : 0.5282
roc_auc        : 0.5490
sensitivity    : 0.5180
specificity    : 0.5405
f1_score       : 0.5725

Traditional Features:
accuracy       : 0.5635
roc_auc        : 0.5913
sensitivity    : 0.5703
specificity    : 0.5752
f1_score       : 0.6146


<Figure size 1000x600 with 0 Axes>

<Figure size 1000x600 with 0 Axes>

In [18]:
RF_small_results = crossval_compare_features("parkceleb_filtered", min_audio_length=0.1, deep_features="Whisper Small", model=RandomForestClassifier)

Loading features from: whisper_small_all_features.csv
Loading features from: traditional_all_features.csv

=== Processing Whisper Features with RandomForestClassifier ===
Feature dimension: 768
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Processing Traditional Features with RandomForestClassifier ===
Feature dimension: 29
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Final Results ===

Whisper Features:
accuracy       : 0.6056
roc_auc        : 0.5710
sensitivity    : 0.7975
specificity    : 0.3222
f1_score       : 0.7094

Traditional Features:
accuracy       : 0.5909
roc_auc        : 0.6060
sensitivity    : 0.7080
specificity    : 0.4384
f1_score       : 0.6777


<Figure size 1000x600 with 0 Axes>

<Figure size 1000x600 with 0 Axes>

In [19]:
DeepNN_med_results = crossval_compare_features("parkceleb_filtered", min_audio_length=0.1, deep_features="Whisper Medium" ,model=DeepNN)

Loading features from: whisper_medium_all_features.csv
Loading features from: traditional_all_features.csv

=== Processing Whisper Features with DeepNN ===
Feature dimension: 1024
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Processing Traditional Features with DeepNN ===
Feature dimension: 29
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Final Results ===

Whisper Features:
accuracy       : 0.5509
roc_auc        : 0.5661
sensitivity    : 0.5745
specificity    : 0.5179
f1_score       : 0.6069

Traditional Features:
accuracy       : 0.5413
roc_auc        : 0.5763
sensitivity    : 0.5271
specificity    : 0.5809
f1_score       : 0.5805


<Figure size 1000x600 with 0 Axes>

<Figure size 1000x600 with 0 Axes>

In [20]:
ImprovedDeepNN_med_results = crossval_compare_features("parkceleb_filtered", min_audio_length=0.1, deep_features="Whisper Medium", model=ImprovedDeepNN)

Loading features from: whisper_medium_all_features.csv
Loading features from: traditional_all_features.csv

=== Processing Whisper Features with ImprovedDeepNN ===
Feature dimension: 1024
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Processing Traditional Features with ImprovedDeepNN ===
Feature dimension: 29
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Final Results ===

Whisper Features:
accuracy       : 0.5668
roc_auc        : 0.5785
sensitivity    : 0.5785
specificity    : 0.5367
f1_score       : 0.6208

Traditional Features:
accuracy       : 0.5290
roc_auc        : 0.5663
sensitivity    : 0.5299
specificity    : 0.5500
f1_score       : 0.5754


<Figure size 1000x600 with 0 Axes>

<Figure size 1000x600 with 0 Axes>

In [21]:
RF_med_results = crossval_compare_features("parkceleb_filtered", min_audio_length=0.1, deep_features="Whisper Medium", model=RandomForestClassifier)

Loading features from: whisper_medium_all_features.csv
Loading features from: traditional_all_features.csv

=== Processing Whisper Features with RandomForestClassifier ===
Feature dimension: 1024
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Processing Traditional Features with RandomForestClassifier ===
Feature dimension: 29
Class distribution: [5239 8614]

=== Fold 1 ===

=== Fold 2 ===

=== Fold 3 ===

=== Fold 4 ===

=== Fold 5 ===

=== Final Results ===

Whisper Features:
accuracy       : 0.6094
roc_auc        : 0.6003
sensitivity    : 0.7973
specificity    : 0.3372
f1_score       : 0.7108

Traditional Features:
accuracy       : 0.5909
roc_auc        : 0.6060
sensitivity    : 0.7080
specificity    : 0.4384
f1_score       : 0.6777


<Figure size 1000x600 with 0 Axes>

<Figure size 1000x600 with 0 Axes>

# Sanity checks
I messed up the save to csv part of whisper feature generation, so I had to fix it. That's what the code below is for. The function is also fixed now.

In [None]:
df = pd.read_csv("whisper_small_all_features.csv")
print("Unique speaker IDs:", df['id'].unique())

In [None]:
# import shutil
shutil.copy2("whisper_all_features.csv", "whisper_all_features_BACKUP.csv")
shutil.copy2("traditional_all_features.csv", "traditional_all_features_BACKUP.csv")

In [None]:
df = pd.read_csv("whisper_all_features.csv")
print("PD count:", df[df['label'] == 1].shape[0])
print("CN count:", df[df['label'] == 0].shape[0])


df = pd.read_csv("traditional_all_features.csv")
print("PD count:", df[df['label'] == 1].shape[0])
print("CN count:", df[df['label'] == 0].shape[0])

In [None]:
def fix_labels_in_csv(csv_path):
    df = pd.read_csv(csv_path)

    # Create labels based on 'id' column
    df['label'] = df['id'].str.contains('pd', case=False).astype(int)

    # Save fixed CSV (backup original first!)
    df.to_csv(csv_path, index=False)
    print(f"Updated labels in {csv_path}. Class distribution: {df['label'].value_counts().to_dict()}")

# Apply to both feature CSVs
fix_labels_in_csv("whisper_medium_all_features.csv")
# fix_labels_in_csv("traditional_all_features.csv")

df = pd.read_csv("whisper_medium_all_features.csv")
print("PD count:", df[df['label'] == 1].shape[0])
print("CN count:", df[df['label'] == 0].shape[0])