# Modules

In [7]:
!pip install audb




[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [8]:
!pip install audiofile




[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [9]:
!pip install opensmile




[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [10]:
!pip install xgboost




[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [11]:
import os
import time
import numpy as np
import pandas as pd
import seaborn as sns
import audb
import audiofile
from xgboost import XGBClassifier
import opensmile
from sklearn.utils import shuffle
import random
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score, roc_curve, auc, confusion_matrix, log_loss, f1_score, accuracy_score, precision_score, recall_score
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV
%matplotlib inline


# Code

In [12]:
# from google.colab import drive
# drive.mount('/content/drive')


In [42]:
# Shared openSMILE extractor, configured for the ComParE_2016 feature set at the
# Functionals level. process_audio_files() calls smile.process_signal() on every
# signal it loads, so this cell must run before any feature extraction.
smile = opensmile.Smile(
    feature_set=opensmile.FeatureSet.ComParE_2016,
    feature_level=opensmile.FeatureLevel.Functionals,
)

In [43]:
import os

def load_data_from_folds(directory, dir_aug, n_folds=10):
    """Collect the .wav paths of every fold for the original and augmented data.

    Expected layout (fold numbers start at 1):
        <directory>/fold<k>/HS        <directory>/fold<k>/PD
        <dir_aug>/fold<k>/HS_aug      <dir_aug>/fold<k>/PD_aug

    Args:
        directory: Root folder of the original recordings.
        dir_aug: Root folder of the augmented recordings.
        n_folds: Number of fold folders to read (fold1 .. fold<n_folds>).

    Returns:
        dict with keys "HS", "PD", "HS_aug" and "PD_aug". Each value is a list
        with one entry per fold; each entry is the sorted list of full paths of
        the ".wav" files found in that fold's folder.

    Raises:
        OSError: propagated from os.listdir when an expected folder is missing.
    """
    def _list_wavs(folder):
        # os.listdir returns entries in arbitrary order. Sorting makes the file
        # order (and therefore any seeded shuffle applied later) reproducible.
        return sorted(
            os.path.join(folder, name)
            for name in os.listdir(folder)
            if name.endswith(".wav")
        )

    data = {"HS": [], "PD": [], "HS_aug": [], "PD_aug": []}

    for fold in range(1, n_folds + 1):
        fold_name = f"fold{fold}"
        data["HS"].append(_list_wavs(os.path.join(directory, fold_name, "HS")))
        data["PD"].append(_list_wavs(os.path.join(directory, fold_name, "PD")))
        data["HS_aug"].append(_list_wavs(os.path.join(dir_aug, fold_name, "HS_aug")))
        data["PD_aug"].append(_list_wavs(os.path.join(dir_aug, fold_name, "PD_aug")))

    return data


In [44]:
#Test
# Dataset roots. The defaults are the original author's local Windows paths; set the
# PD_DATA_DIR / PD_AUG_DATA_DIR environment variables to run the notebook on another
# machine without editing this cell.
directory = os.environ.get(
    "PD_DATA_DIR",
    r'C:\Users\Hp\Desktop\Lecture Files\DH604\Publication\data\datasets_16k',
)
dir_aug = os.environ.get(
    "PD_AUG_DATA_DIR",
    r'C:\Users\Hp\Desktop\Lecture Files\DH604\Publication\data\datasets_aug_16k',
)
# Smoke test of the loader: fails fast if a fold folder is missing.
data_ = load_data_from_folds(directory, dir_aug)

In [45]:
def process_audio_files(files, label, duration=10):
    """Extract openSMILE features for each .wav file and tag the rows with a label.

    Args:
        files: Iterable of file paths. Entries that do not end in ".wav" are skipped.
        label: Value written to the 'Y' column of every produced row.
        duration: Forwarded to audiofile.read(duration=...).

    Returns:
        pandas.DataFrame with 'file_name' as the first column, the feature columns
        returned by smile.process_signal, and a trailing 'Y' column; rows are
        concatenated with a fresh integer index. An empty DataFrame is returned
        when no file could be processed.

    Notes:
        A file that fails to load or process is reported with print() and skipped
        (best-effort), so one broken recording does not abort the whole run.
    """
    results = []
    for file_path in files:
        if not file_path.endswith(".wav"):
            continue
        try:
            signal, sampling_rate = audiofile.read(
                file_path,
                duration=duration,
                always_2d=True
            )
            features = smile.process_signal(signal, sampling_rate)
            features.insert(0, 'file_name', file_path)
            features['Y'] = label
            results.append(features)
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

    if not results:
        # pd.concat([]) raises "No objects to concatenate"; hand back an empty
        # frame instead so an empty class/fold does not crash the caller here.
        return pd.DataFrame()

    return pd.concat(results, ignore_index=True)

In [46]:
def drop_na(X, y):
    """Remove every row of X that contains a missing value, and its label in y.

    Rows are selected by index label, so X and y are expected to share an index.

    Returns:
        (X, y) restricted to the index labels of the complete rows of X.
    """
    complete_rows = X.dropna(axis=0).index
    return X.loc[complete_rows], y.loc[complete_rows]

In [47]:
def get_train_test_data(fold_number, data):
    """Build the scaled train/test feature matrices for one cross-validation fold.

    Fold `fold_number` of the original recordings is held out as the test set.
    The training set is made of the original AND augmented recordings of all the
    other folds; the augmented files of the held-out fold are not used at all.

    Args:
        fold_number: 1-based number of the fold to hold out.
        data: dict with keys "HS", "PD", "HS_aug", "PD_aug", each a list with one
            list of file paths per fold (as built by load_data_from_folds).

    Returns:
        (X_train, y_train, X_test, y_test). The X frames hold the extracted
        features after dropping rows with missing values and standardising with a
        StandardScaler fitted on the training rows only. The y series hold the
        labels (0 for HS, 1 for PD).

    Raises:
        ValueError: if fold_number is not between 1 and the number of folds.
    """
    n_folds = len(data["HS"])
    if not 1 <= fold_number <= n_folds:
        # Without this check fold_number=0 would index the list with -1: the last
        # fold would become the test set while no fold is excluded from training.
        raise ValueError(
            f"fold_number must be between 1 and {n_folds}, got {fold_number}"
        )
    held_out = fold_number - 1

    test_hs = data["HS"][held_out]
    test_pd = data["PD"][held_out]

    def _training_files(original_key, augmented_key):
        # Original folds first, then augmented folds, flattened to one list.
        return [
            path
            for key in (original_key, augmented_key)
            for i, fold_files in enumerate(data[key])
            if i != held_out
            for path in fold_files
        ]

    train_hs = _training_files("HS", "HS_aug")
    train_pd = _training_files("PD", "PD_aug")

    # Process the files
    X_train_hs = process_audio_files(train_hs, label=0)
    X_train_pd = process_audio_files(train_pd, label=1)

    X_test_hs = process_audio_files(test_hs, label=0)
    X_test_pd = process_audio_files(test_pd, label=1)

    # Combine HS and PD for train and test
    X_train = pd.concat([X_train_hs, X_train_pd], ignore_index=True)
    X_test = pd.concat([X_test_hs, X_test_pd], ignore_index=True)

    # Split off the labels; drop() returns a new frame instead of mutating in place.
    y_train = X_train["Y"]
    X_train = X_train.drop(columns=["Y", 'file_name'])

    y_test = X_test["Y"]
    X_test = X_test.drop(columns=["Y", 'file_name'])

    # Shuffle the train and test sets (features and labels are permuted together)
    X_train, y_train = shuffle(X_train, y_train, random_state=42)
    X_test, y_test = shuffle(X_test, y_test, random_state=42)

    # Drop NA
    X_train, y_train = drop_na(X_train, y_train)
    X_test, y_test = drop_na(X_test, y_test)

    # Scaling the data: fit on train only, then apply the same transform to test.
    scaler = StandardScaler()
    X_train = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index)
    X_test = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns, index=X_test.index)

    return X_train, y_train, X_test, y_test

In [76]:
from sklearn.base import clone
from itertools import product

def tune_and_evaluate(X_train, y_train, X_test, y_test, model_type):
    """Fit one model per hyperparameter combination and score it on train and test.

    Every combination of the grid defined for `model_type` is tried (exhaustive
    product of the value lists). A combination that raises during fitting or
    scoring is reported with print() and skipped.

    Args:
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels.
        model_type: One of "random_forest", "logistic_regression", "svm", "xgboost".

    Returns:
        list of dicts, one per successfully evaluated combination, holding the
        train/test accuracy, ROC AUC, F1, precision, recall and log loss together
        with the hyperparameter values of that combination.

    Raises:
        ValueError: if model_type is not one of the supported names.
    """
    results = []

    if model_type == "random_forest":
        base_model = RandomForestClassifier(random_state=42)
        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [3, 5, 7, 10, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 5, 10],
            'max_features': ['sqrt', 'log2', 10, None]}

    elif model_type == "logistic_regression":
        base_model = LogisticRegression(random_state=42)
        param_grid = {
            'penalty': ['l1', 'l2', 'elasticnet', None],
            'C': [0.001, 0.01, 0.1, 1, 10, 100],
            'solver': ['liblinear', 'saga'],
            'max_iter': [1000, 2000],
            'l1_ratio': [0, 0.5, 1]}

    elif model_type == 'svm':
        base_model = SVC(probability=True, random_state=42)
        param_grid = {
            'C': [0.01, 0.1, 1, 10],
            'gamma': ['scale', 'auto', 0.001, 0.01, 0.1],
            'kernel': ['rbf', 'linear'],
            'shrinking': [True, False]}

    elif model_type == 'xgboost':
        base_model = XGBClassifier(random_state=42)
        param_grid = {
            'n_estimators': [50, 100, 150],
            'max_depth': [2, 3, 5],
            'learning_rate': [0.01, 0.05, 0.1],
            'min_child_weight': [1, 3],
            'subsample': [0.8],
            'colsample_bytree': [0.8],
            'gamma': [0, 0.1],
            'reg_alpha': [0, 0.01, 0.1],
            'reg_lambda': [0.5, 1]}

    else:
        raise ValueError(f"Unsupported model_type: {model_type}")

    if model_type == 'xgboost':
        # XGBoost rejects feature names containing '[', ']' or '<'. Rename once,
        # on copies, so the caller's DataFrames are left untouched and the work
        # is not repeated for every grid point.
        def _xgb_safe(col):
            return col.replace('[', '(').replace(']', ')').replace('<', '_less_than_')

        X_train = X_train.copy()
        X_train.columns = [_xgb_safe(col) for col in X_train.columns]
        X_test = X_test.copy()
        X_test.columns = [_xgb_safe(col) for col in X_test.columns]

    param_names = list(param_grid.keys())

    for param_values in product(*param_grid.values()):
        param_dict = dict(zip(param_names, param_values))
        model = clone(base_model).set_params(**param_dict)

        try:
            model.fit(X_train, y_train)
            y_train_pred = model.predict(X_train)
            y_test_pred = model.predict(X_test)

            # Probability of the positive class (label 1) for AUC and log loss.
            y_train_proba = model.predict_proba(X_train)[:, 1]
            y_test_proba = model.predict_proba(X_test)[:, 1]

            train_loss = log_loss(y_train, y_train_proba)
            test_loss = log_loss(y_test, y_test_proba)

            metrics_dict = {
                "accuracy_train": accuracy_score(y_train, y_train_pred),
                "accuracy_test": accuracy_score(y_test, y_test_pred),
                "roc_auc_train": roc_auc_score(y_train, y_train_proba),
                "roc_auc_test": roc_auc_score(y_test, y_test_proba),
                "f1_train": f1_score(y_train, y_train_pred),
                "f1_test": f1_score(y_test, y_test_pred),
                "precision_train": precision_score(y_train, y_train_pred),
                "precision_test": precision_score(y_test, y_test_pred),
                "recall_train": recall_score(y_train, y_train_pred),
                "recall_test": recall_score(y_test, y_test_pred),
                "log_loss_train": train_loss,
                "log_loss_test": test_loss,
                **param_dict  # Include parameters in the row
            }

            results.append(metrics_dict)
        except Exception as e:
            # Invalid combinations (e.g. solver/penalty pairs the estimator
            # rejects) end up here; report and move on to the next grid point.
            print(f" Skipping params {param_dict} due to error: {e}")
            continue

    return results

In [49]:
# def aggregate_and_save_results(metrics, model_type):
#     all_metrics = []

#     for fold_metrics in metrics:
#         for metric in fold_metrics:
#             all_metrics.append(metric)

#     df = pd.DataFrame(all_metrics)

#     aggregation_columns = ['accuracy_train', 'accuracy_test', 'roc_auc_train', 'roc_auc_test',
#                            'f1_train', 'f1_test', 'precision_train', 'precision_test', 'recall_train', 
#                            'recall_test', 'log_loss_train', 'log_loss_test']
    
#     df_agg = df.groupby([col for col in df.columns if col not in aggregation_columns]).agg(
#         {metric: ['mean', 'std'] for metric in aggregation_columns}
#     ).reset_index()

#     df_agg.columns = ['_'.join(col).strip() for col in df_agg.columns.values]

#     output_file = f"{model_type}_hyperparameter_tuning_results.csv"
#     df_agg.to_csv(output_file, index=False)

#     print(f"Aggregated results saved to {output_file}")


In [72]:
def aggregate_and_save_results(metrics, model_type):
    """Aggregate per-fold tuning results per hyperparameter combination and save them.

    Args:
        metrics: List with one entry per fold; each entry is a list of dicts holding
            the metric values and hyperparameters of one evaluated combination
            (as returned by tune_and_evaluate). The dicts are not modified.
        model_type: Used as the prefix of the output CSV file name.

    Returns:
        pandas.DataFrame with one row per hyperparameter combination containing:
        the hyperparameter columns, `<metric>_mean` / `<metric>_std` across folds,
        the per-fold train/test log losses (rounded to 3 decimals) and their
        ratios as lists, and `mean_loss_ratio` (mean train loss / mean test loss).
        The same table is written to
        "<model_type>_aug_hyperparameter_tuning_results.csv" in the working directory.

    Raises:
        ValueError: if `metrics` contains no result rows.
    """
    loss_columns = ['log_loss_train', 'log_loss_test']
    other_metrics = ['accuracy_train', 'accuracy_test', 'roc_auc_train', 'roc_auc_test',
                     'f1_train', 'f1_test', 'precision_train', 'precision_test',
                     'recall_train', 'recall_test']

    # Copy each row while tagging it with its fold so the caller's dicts stay intact.
    rows = [
        {**metric, 'fold_number': fold_number}
        for fold_number, fold_metrics in enumerate(metrics, start=1)
        for metric in fold_metrics
    ]
    if not rows:
        raise ValueError("No tuning results to aggregate.")

    df = pd.DataFrame(rows)

    # Every column that is neither a metric nor the fold tag is a hyperparameter.
    param_columns = [col for col in df.columns if col not in loss_columns + other_metrics + ['fold_number']]

    # dropna=False keeps combinations whose hyperparameter value is None/NaN
    # (e.g. max_depth=None); the groupby default would silently discard them.
    grouped = df.groupby(param_columns, dropna=False)

    agg_results = grouped.agg({
        metric: ['mean', 'std'] for metric in other_metrics + loss_columns
    })
    agg_results.columns = ['_'.join(col).strip() for col in agg_results.columns.values]

    # Per-fold losses of each combination, visited in the same group order as
    # the aggregated rows above (both come from the same groupby object).
    train_lists, test_lists, ratio_lists = [], [], []
    for _, group in grouped:
        train_losses = [round(x, 3) for x in group['log_loss_train'].tolist()]
        test_losses = [round(x, 3) for x in group['log_loss_test'].tolist()]
        ratios = [
            round(train / test, 3) if test != 0 else np.nan
            for train, test in zip(train_losses, test_losses)
        ]
        train_lists.append(train_losses)
        test_lists.append(test_losses)
        ratio_lists.append(ratios)

    agg_results['train_loss_across_folds'] = train_lists
    agg_results['test_loss_across_folds'] = test_lists
    agg_results['train_test_loss_ratio_across_folds'] = ratio_lists

    final_results = agg_results.reset_index()

    final_results['mean_loss_ratio'] = np.where(
        final_results['log_loss_test_mean'] != 0,
        final_results['log_loss_train_mean'] / final_results['log_loss_test_mean'],
        np.nan
    )
    final_results['mean_loss_ratio'] = final_results['mean_loss_ratio'].round(3)

    output_file = f"{model_type}_aug_hyperparameter_tuning_results.csv"
    final_results.to_csv(output_file, index=False)

    print(f"Results saved to {output_file}")
    return final_results

In [77]:
#MIQ
# Names of the 30 extracted feature columns that main_with_tuning() keeps in
# X_train / X_test when feature selection is enabled.
# NOTE(review): "MIQ" presumably names the feature-selection criterion that
# produced this list — confirm and record how the list was derived.
selected_features = [
    "audspecRasta_lengthL1norm_sma_upleveltime75",
    "audSpec_Rfilt_sma[6]_percentile1.0",
    "audSpec_Rfilt_sma[13]_quartile2",
    "audSpec_Rfilt_sma[21]_quartile2",
    "pcm_fftMag_spectralRollOff25.0_sma_iqr1-2",
    "pcm_fftMag_spectralRollOff90.0_sma_quartile1",
    "pcm_fftMag_spectralCentroid_sma_lpc2",
    "mfcc_sma[2]_leftctime",
    "mfcc_sma[4]_minSegLen",
    "mfcc_sma[8]_meanSegLen",
    "audSpec_Rfilt_sma_de[6]_minSegLen",
    "audSpec_Rfilt_sma_de[8]_minSegLen",
    "audSpec_Rfilt_sma_de[12]_minSegLen",
    "audSpec_Rfilt_sma_de[21]_minSegLen",
    "pcm_fftMag_spectralRollOff90.0_sma_de_minSegLen",
    "mfcc_sma_de[3]_minSegLen",
    "mfcc_sma_de[7]_upleveltime25",
    "voicingFinalUnclipped_sma_range",
    "voicingFinalUnclipped_sma_upleveltime90",
    "jitterDDP_sma_de_quartile2",
    "jitterDDP_sma_de_upleveltime75",
    "jitterDDP_sma_de_lpc0",
    "audSpec_Rfilt_sma[7]_qregerrQ",
    "pcm_fftMag_spectralSlope_sma_qregc3",
    "pcm_fftMag_psySharpness_sma_flatness",
    "mfcc_sma[5]_peakMeanRel",
    "mfcc_sma[6]_peakMeanRel",
    "mfcc_sma[7]_peakMeanRel",
    "audSpec_Rfilt_sma_de[1]_peakRangeRel",
    "pcm_fftMag_spectralEntropy_sma_de_stddevFallingSlope"
]

In [74]:
def main_with_tuning(directory, dir_aug, model_type, feature_list):
    """Run the hyperparameter sweep on every fold and save the aggregated results.

    Args:
        directory: Root folder of the original recordings.
        dir_aug: Root folder of the augmented recordings.
        model_type: Model name understood by tune_and_evaluate.
        feature_list: Controls feature selection.
            - a non-empty list/tuple of column names: use exactly those columns;
            - any other truthy value (e.g. True): use the notebook-level
              `selected_features` list;
            - falsy: keep all extracted features.

    Returns:
        The aggregated results DataFrame produced by aggregate_and_save_results
        (which also writes it to a CSV file).
    """
    data = load_data_from_folds(directory, dir_aug)
    metrics = []

    # Resolve the feature subset once. Previously a list passed here was only
    # used as a flag and the global selected_features was always applied.
    if isinstance(feature_list, (list, tuple)) and feature_list:
        features = list(feature_list)
    elif feature_list:
        features = selected_features
    else:
        features = None

    # Iterate over however many folds the loader returned (1-based fold numbers).
    for fold_number in range(1, len(data["HS"]) + 1):
        print(f"\n Evaluating Fold {fold_number} with Hyperparameter Tuning...")
        X_train, y_train, X_test, y_test = get_train_test_data(fold_number, data)

        if features is not None:
            X_train = X_train[features]
            X_test = X_test[features]

        print("X_train shape:", X_train.shape)
        print("y_train shape:", y_train.shape)
        print("X_test shape:", X_test.shape)
        print("y_test shape:", y_test.shape)

        fold_metrics = tune_and_evaluate(X_train, y_train, X_test, y_test, model_type)
        metrics.append(fold_metrics)

    # Save per-parameter results across all folds
    return aggregate_and_save_results(metrics, model_type)


In [60]:
# main_with_tuning(directory, dir_aug, model_type="random_forest" , feature_list=True)

In [61]:
# main_with_tuning(directory, dir_aug, model_type="svm" , feature_list=True)

In [62]:
# main_with_tuning(directory, dir_aug, model_type="logistic_regression" , feature_list=True)

In [78]:
# Run the XGBoost grid over every fold with feature selection enabled; the
# aggregated table is written to xgboost_aug_hyperparameter_tuning_results.csv.
main_with_tuning(directory, dir_aug, model_type="xgboost" , feature_list=True)


 Evaluating Fold 1 with Hyperparameter Tuning...
X_train shape: (105, 30)
y_train shape: (105,)
X_test shape: (8, 30)
y_test shape: (8,)

 Evaluating Fold 2 with Hyperparameter Tuning...
X_train shape: (105, 30)
y_train shape: (105,)
X_test shape: (8, 30)
y_test shape: (8,)

 Evaluating Fold 3 with Hyperparameter Tuning...
X_train shape: (105, 30)
y_train shape: (105,)
X_test shape: (8, 30)
y_test shape: (8,)

 Evaluating Fold 4 with Hyperparameter Tuning...
X_train shape: (105, 30)
y_train shape: (105,)
X_test shape: (8, 30)
y_test shape: (8,)

 Evaluating Fold 5 with Hyperparameter Tuning...
X_train shape: (108, 30)
y_train shape: (108,)
X_test shape: (6, 30)
y_test shape: (6,)

 Evaluating Fold 6 with Hyperparameter Tuning...
X_train shape: (105, 30)
y_train shape: (105,)
X_test shape: (8, 30)
y_test shape: (8,)

 Evaluating Fold 7 with Hyperparameter Tuning...
X_train shape: (105, 30)
y_train shape: (105,)
X_test shape: (8, 30)
y_test shape: (8,)

 Evaluating Fold 8 with Hyperpara