In [32]:
!pip install numpy==1.26.4 librosa==0.10.2.post1 tqdm==4.67.1 pandas==2.2.2 joblib==1.4.2 scikit-learn==1.6.1 tensorflow==2.17.1 python_speech_features==0.6 -q

In [None]:
import os
import numpy as np
import librosa
from tqdm import tqdm
import pandas as pd
import warnings
import json
import joblib

from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from sklearn.naive_bayes import GaussianNB

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
# from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from scikeras.wrappers import KerasClassifier
from tensorflow.keras.callbacks import EarlyStopping

from python_speech_features import logfbank, fbank

In [None]:
# Suppress warnings for cleaner output.
# NOTE(review): called with only the 'ignore' action, this filter hides every
# warning category for the rest of the session, including diagnostics emitted
# by the audio/ML libraries used below. Consider narrowing it while debugging.
warnings.filterwarnings('ignore')

In [None]:
# ------------------ Feature Extraction ------------------
def pad_or_truncate(features, target_length):
    """
    Forces a 2-D feature matrix to contain exactly `target_length` rows.

    Parameters:
    - features (np.ndarray): Feature matrix of shape (Time, Features).
    - target_length (int): Desired number of time frames (rows).

    Returns:
    - np.ndarray: Matrix with `target_length` rows. Short inputs get all-zero
      rows appended at the end; long inputs keep only their leading rows.
    """
    missing_frames = target_length - features.shape[0]

    if missing_frames > 0:
        # Too short: zero-pad at the end of the time axis only, never the feature axis.
        return np.pad(features, ((0, missing_frames), (0, 0)), mode='constant')

    # Already long enough (or exactly right): keep the first `target_length` frames.
    return features[:target_length, :]

def extract_filterbank_energies(audio, samplerate=44100, nfilt=40):
    """
    Extracts Filterbank Energies from the audio signal.

    Parameters:
    - audio (np.ndarray): Audio time series.
    - samplerate (int): Sampling rate.
    - nfilt (int): Number of filterbanks.

    Returns:
    - np.ndarray: Filterbank energy features of shape (frames, nfilt).
    """
    # python_speech_features frames the signal with a 25 ms window by default
    # but uses nfft=512 unless told otherwise. At 44.1 kHz a frame is ~1103
    # samples, so the library would truncate every frame to 512 samples before
    # the FFT (it only emits a warning). Grow the FFT size to the next power of
    # two that covers a whole frame; for rates where 512 already suffices
    # (e.g. 16 kHz) the library default is kept.
    frame_length = int(np.ceil(0.025 * samplerate))
    nfft = 512
    while nfft < frame_length:
        nfft *= 2

    # fbank returns (filterbank energies, per-frame total energy); only the
    # filterbank matrix is needed here.
    features, _ = fbank(audio, samplerate, nfilt=nfilt, nfft=nfft)
    return features

def extract_log_filterbank_energies(audio, samplerate=44100, nfilt=40):
    """
    Extracts Log Filterbank Energies from the audio signal.

    Parameters:
    - audio (np.ndarray): Audio time series.
    - samplerate (int): Sampling rate.
    - nfilt (int): Number of filterbanks.

    Returns:
    - np.ndarray: Log Filterbank energy features of shape (frames, nfilt).
    """
    # The library default nfft=512 is shorter than a 25 ms frame at high
    # sampling rates (~1103 samples at 44.1 kHz), which makes it truncate each
    # frame before the FFT. Use the next power of two covering a full frame,
    # never going below the library default.
    frame_length = int(np.ceil(0.025 * samplerate))
    nfft = 512
    while nfft < frame_length:
        nfft *= 2

    # Unlike fbank, logfbank returns the feature matrix directly (no tuple).
    features = logfbank(audio, samplerate, nfilt=nfilt, nfft=nfft)
    return features

def extract_spectral_subband_centroids(audio, samplerate=44100, nfilt=40):
    """
    Extracts Spectral Subband Centroids from the audio signal.

    The previous implementation computed a single energy-weighted filter-index
    centroid per frame and broadcast that one scalar into all `nfilt` columns,
    so every column of the result was identical. This version delegates to
    python_speech_features' `ssc`, which yields one centroid per subband while
    keeping the same (frames, nfilt) output shape.

    Parameters:
    - audio (np.ndarray): Audio time series.
    - samplerate (int): Sampling rate.
    - nfilt (int): Number of filterbanks (subbands).

    Returns:
    - np.ndarray: Spectral Subband Centroids of shape (frames, nfilt).
    """
    # Imported locally because the notebook's import cell only pulls in
    # fbank/logfbank from this (already required) package.
    from python_speech_features import ssc

    # The library default nfft=512 would truncate 25 ms frames at high sampling
    # rates (~1103 samples at 44.1 kHz); use the next power of two that covers
    # a whole frame, never going below the library default.
    frame_length = int(np.ceil(0.025 * samplerate))
    nfft = 512
    while nfft < frame_length:
        nfft *= 2

    return ssc(audio, samplerate, nfilt=nfilt, nfft=nfft)

def extract_spncc(audio, samplerate=44100, nfilt=40, ncep=13):
    """
    Extracts Power-Normalized Cepstral Coefficients (SPNCC) from the audio signal.

    Each frame's filterbank energies are divided by the frame's total energy,
    log-compressed, and decorrelated with a DCT across the filterbank axis.

    Parameters:
    - audio (np.ndarray): Audio time series.
    - samplerate (int): Sampling rate.
    - nfilt (int): Number of filterbanks.
    - ncep (int): Number of cepstral coefficients. At most `nfilt` coefficients
      exist, so larger values yield `nfilt` columns.

    Returns:
    - np.ndarray: SPNCC features of shape (frames, min(ncep, nfilt)).
    """
    # (frames, nfilt) filterbank energies.
    filter_banks = extract_filterbank_energies(audio, samplerate, nfilt)

    # Normalize each frame by its total energy; the epsilon guards silent frames.
    frame_power = np.sum(filter_banks, axis=1, keepdims=True)
    power_normalized = filter_banks / (frame_power + 1e-10)
    log_spectrum = np.log(power_normalized + 1e-10)

    # librosa expects S laid out as (bands, frames) and applies the DCT along
    # the band axis. Passing the (frames, bands) matrix directly made the DCT
    # run over time and returned (nfilt, ncep); transpose on the way in and
    # back out to get one cepstral vector per frame.
    spncc = librosa.feature.mfcc(S=log_spectrum.T, n_mfcc=ncep).T
    return spncc

def extract_msrcc(audio, samplerate=44100, nfilt=40, ncep=13):
    """
    Extracts Magnitude-based Spectral Root Cepstral Coefficients (MSRCC) from the audio signal.

    The filterbank energies are square-rooted, log-compressed, and decorrelated
    with a DCT across the filterbank axis.

    Parameters:
    - audio (np.ndarray): Audio time series.
    - samplerate (int): Sampling rate.
    - nfilt (int): Number of filterbanks.
    - ncep (int): Number of cepstral coefficients. At most `nfilt` coefficients
      exist, so larger values yield `nfilt` columns.

    Returns:
    - np.ndarray: MSRCC features of shape (frames, min(ncep, nfilt)).
    """
    # (frames, nfilt) filterbank energies.
    filter_banks = extract_filterbank_energies(audio, samplerate, nfilt)

    # NOTE(review): taking the log of a square root only halves the log-energies,
    # so these coefficients are close to a scaled log-filterbank cepstrum. A root
    # cepstrum would normally use the root in place of the log; confirm which
    # definition is intended before changing it.
    root_power_spectrum = np.sqrt(filter_banks + 1e-10)
    log_root_spectrum = np.log(root_power_spectrum + 1e-10)

    # librosa expects S laid out as (bands, frames) and applies the DCT along
    # the band axis. Passing the (frames, bands) matrix directly made the DCT
    # run over time and returned (nfilt, ncep); transpose on the way in and
    # back out to get one cepstral vector per frame.
    msrcc = librosa.feature.mfcc(S=log_root_spectrum.T, n_mfcc=ncep).T
    return msrcc

In [None]:
# ------------------ Dataset Loader ------------------
class DysarthriaDataset:
    """
    Indexes the .wav files of a speaker-organised corpus and turns each file
    into a fixed-size feature vector labelled with its severity class.

    `data` holds the file paths and `labels` the matching integer labels; the
    label of a severity level is its position in `severity_mapping`.
    """

    # Feature types that __getitem__ knows how to extract.
    SUPPORTED_FEATURE_TYPES = ('fbank', 'logfbank', 'subband_centroid', 'spncc', 'msrcc')

    def __init__(self, data_path, severity_mapping, sr=44100, target_length=128, feature_type='logfbank', n_mfcc=13, nfilt=40):
        """
        Initializes the dataset loader.

        Parameters:
        - data_path (str): Path to the dataset directory.
        - severity_mapping (dict): Mapping of severity levels to speaker IDs.
        - sr (int): Sampling rate for audio files.
        - target_length (int): Target length for feature matrices.
        - feature_type (str): Type of feature to extract ('fbank', 'logfbank', 'spncc', 'msrcc', 'subband_centroid').
        - n_mfcc (int): Number of MFCC coefficients to extract (for SPNCC and MSRCC).
        - nfilt (int): Number of filterbanks.

        Raises:
        - ValueError: If `feature_type` is not one of SUPPORTED_FEATURE_TYPES.
        """
        # Fail fast: previously a typo in feature_type only surfaced once the
        # first audio file had been loaded.
        if feature_type not in self.SUPPORTED_FEATURE_TYPES:
            raise ValueError(f"Unsupported feature type: {feature_type}")

        self.data_path = data_path
        self.severity_mapping = severity_mapping
        self.sr = sr
        self.target_length = target_length
        self.feature_type = feature_type
        self.n_mfcc = n_mfcc
        self.nfilt = nfilt
        self.data = []
        self.labels = []

        # Integer label = position of the severity level in the mapping.
        severity_to_label = {severity: i for i, severity in enumerate(severity_mapping)}

        for severity, speakers in severity_mapping.items():
            for speaker in speakers:
                speaker_path = os.path.join(data_path, speaker)
                if not os.path.isdir(speaker_path):
                    print(f"Warning: Speaker path {speaker_path} does not exist. Skipping.")
                    continue
                # os.listdir returns entries in arbitrary order; sorting keeps the
                # sample order (and therefore any seeded split) reproducible.
                for file in sorted(os.listdir(speaker_path)):
                    if file.endswith(".wav"):
                        self.data.append(os.path.join(speaker_path, file))
                        self.labels.append(severity_to_label[severity])

    def __len__(self):
        """Returns the number of indexed audio files."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Retrieves the feature vector and label for a given index.

        Parameters:
        - idx (int): Index of the sample.

        Returns:
        - tuple: (feature_vector (np.ndarray), label (int)), where the vector is
          the per-dimension mean followed by the per-dimension standard deviation
          over `target_length` frames.

        Raises:
        - ValueError: If `feature_type` was changed to an unsupported value.
        """
        file_path = self.data[idx]
        label = self.labels[idx]
        audio, _ = librosa.load(file_path, sr=self.sr)

        # Extract a (frames, dims) matrix according to feature_type.
        if self.feature_type == 'fbank':
            features = extract_filterbank_energies(audio, self.sr, self.nfilt)
        elif self.feature_type == 'logfbank':
            features = extract_log_filterbank_energies(audio, self.sr, self.nfilt)
        elif self.feature_type == 'subband_centroid':
            features = extract_spectral_subband_centroids(audio, self.sr, self.nfilt)
        elif self.feature_type == 'spncc':
            features = extract_spncc(audio, self.sr, self.nfilt, self.n_mfcc)
        elif self.feature_type == 'msrcc':
            features = extract_msrcc(audio, self.sr, self.nfilt, self.n_mfcc)
        else:
            raise ValueError(f"Unsupported feature type: {self.feature_type}")

        # NOTE(review): short recordings are zero-padded before the statistics are
        # taken, so padding frames contribute to the mean/std. Confirm this is
        # intended rather than aggregating over the real frames only.
        features_fixed = pad_or_truncate(features, self.target_length)

        # Summarise the time axis: mean and std per feature dimension, concatenated.
        feature_mean = features_fixed.mean(axis=0)
        feature_std = features_fixed.std(axis=0)
        feature_vector = np.concatenate([feature_mean, feature_std])

        return feature_vector, label

    def get_all_features_labels(self):
        """
        Extracts features and labels for the entire dataset.

        Returns:
        - tuple: (X (np.ndarray), y (np.ndarray))
        """
        X = []
        y = []
        for idx in tqdm(range(len(self)), desc=f"Extracting {self.feature_type} Features"):
            features, label = self[idx]
            X.append(features)
            y.append(label)
        return np.array(X), np.array(y)

# ------------------ Artificial Neural Network (ANN) with TensorFlow ------------------
def create_ann_model(input_dim, hidden_layers=(64, 32), dropout_rate=0.5, activation='relu', optimizer='adam', num_classes=4):
    """
    Builds and compiles a fully connected Keras classifier.

    The network is an input layer, then one Dense + Dropout pair per entry of
    `hidden_layers`, then a softmax output layer. It is compiled with sparse
    categorical cross-entropy, so labels are expected as integer class ids.
    Early stopping is not configured here; it has to be supplied as a callback
    at fit time.

    Parameters:
    - input_dim (int): Number of input features.
    - hidden_layers (sequence of int): Number of neurons in each hidden layer.
      May be empty, in which case the input feeds the output layer directly.
    - dropout_rate (float): Dropout rate applied after every hidden layer.
    - activation (str): Activation function for hidden layers.
    - optimizer (str): Optimizer for training.
    - num_classes (int): Number of output classes (size of the softmax layer).

    Returns:
    - tf.keras.Model: Compiled Keras model.
    """
    model = Sequential()

    # Declare the input shape with an explicit Input object instead of the
    # legacy `input_dim=` layer argument; this also works with no hidden layers.
    model.add(tf.keras.Input(shape=(input_dim,)))

    for neurons in (hidden_layers or ()):
        model.add(Dense(neurons, activation=activation))
        model.add(Dropout(dropout_rate))

    # Keep the output layer in float32 regardless of any global dtype policy.
    model.add(Dense(num_classes, activation='softmax', dtype='float32'))

    model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model


# Shared early-stopping callback: watches `val_loss`, tolerates 10 epochs without
# improvement, and restores the best weights seen when training stops.
# NOTE(review): `val_loss` is only reported when the fit call receives validation
# data (e.g. a validation split) — confirm the ANN is trained with one, otherwise
# this callback has nothing to monitor.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

In [None]:
# ------------------ Main Pipeline ------------------
# Column layout of the results CSV. "Feature_Type" is part of the schema from
# the start so the resume check can rely on it.
_RESULT_COLUMNS = ["MFCCs", "Feature_Type", "Classifier", "Accuracy", "Precision", "Recall", "F1-Score"]

# Feature types whose extraction ignores n_mfcc; their feature matrix can be
# reused across the n_mfcc sweep instead of re-reading every audio file.
_MFCC_INDEPENDENT_FEATURES = ('fbank', 'logfbank', 'subband_centroid')


def _load_result_records(results_csv_path):
    """Returns previously saved result rows as a list of dicts (empty when no CSV exists)."""
    if os.path.exists(results_csv_path):
        print(f"Loading existing results from {results_csv_path}...")
        return pd.read_csv(results_csv_path).to_dict("records")
    print("Initializing a new results DataFrame.")
    return []


def _completed_keys(records):
    """Returns the set of (n_mfcc, feature_type, classifier) runs already present in `records`."""
    done = set()
    for row in records:
        try:
            done.add((int(row["MFCCs"]), row["Feature_Type"], row["Classifier"]))
        except (KeyError, TypeError, ValueError):
            # Row from an older or partial CSV that cannot identify a run:
            # never treat it as completed.
            continue
    return done


def _build_classifiers(random_state, input_dim):
    """Returns {display name: {"model": estimator, "params": GridSearchCV grid}} for all classifiers."""
    return {
        "Logistic Regression": {
            "model": LogisticRegression(random_state=random_state, max_iter=1000),
            "params": {
                "classifier__C": [0.1, 1, 10, 100],
                "classifier__penalty": ['l2'],
                "classifier__solver": ['lbfgs']
            }
        },
        "Support Vector Machine": {
            "model": SVC(probability=True, random_state=random_state),
            "params": {
                "classifier__C": [0.1, 1, 10],
                "classifier__gamma": ['scale', 'auto'],
                "classifier__kernel": ['rbf', 'poly']
            }
        },
        "k-Nearest Neighbors": {
            "model": KNeighborsClassifier(),
            "params": {
                "classifier__n_neighbors": [3, 5, 7],
                "classifier__weights": ['uniform', 'distance'],
                "classifier__metric": ['euclidean', 'manhattan']
            }
        },
        "Decision Tree": {
            "model": DecisionTreeClassifier(random_state=random_state),
            "params": {
                "classifier__max_depth": [None, 10, 20, 30],
                "classifier__min_samples_split": [2, 5, 10],
                "classifier__criterion": ['gini', 'entropy']
            }
        },
        "Random Forest": {
            "model": RandomForestClassifier(random_state=random_state),
            "params": {
                "classifier__n_estimators": [100, 200],
                "classifier__max_depth": [None, 10, 20],
                "classifier__min_samples_split": [2, 5],
                "classifier__criterion": ['gini', 'entropy']
            }
        },
        "Gradient Boosting": {
            "model": GradientBoostingClassifier(random_state=random_state),
            "params": {
                "classifier__n_estimators": [100, 200],
                "classifier__learning_rate": [0.01, 0.1],
                "classifier__max_depth": [3, 5],
                "classifier__subsample": [0.8, 1.0]
            }
        },
        "Naive Bayes": {
            "model": GaussianNB(),
            "params": {
                "classifier__var_smoothing": [1e-9, 1e-8, 1e-7]
            }
        },
        "Artificial Neural Network (ANN)": {
            # The previous code referenced an undefined KerasClassifierWithCallbacks.
            # Use the scikeras wrapper imported by this notebook. Arguments of
            # create_ann_model are routed with the `model__` prefix; a small
            # validation split gives the early-stopping callback a `val_loss`
            # to monitor.
            "model": KerasClassifier(
                model=create_ann_model,
                model__input_dim=input_dim,
                callbacks=[early_stopping],
                validation_split=0.1,
                verbose=0
            ),
            "params": {
                "classifier__model__hidden_layers": [[64, 32], [128, 64, 32]],
                "classifier__model__dropout_rate": [0.3, 0.5],
                "classifier__model__activation": ['relu', 'tanh'],
                "classifier__model__optimizer": ['adam', 'rmsprop'],
                "classifier__batch_size": [32, 64],
                "classifier__epochs": [50, 100]
            }
        }
    }


def main():
    """
    Runs the full experiment sweep: for every (n_mfcc, feature type) pair it
    extracts features, grid-searches each classifier with 5-fold stratified CV
    on the training split, evaluates the best estimator on the test split, and
    appends the metrics to `classifier_results.csv` (also saving the fitted
    model under `models/`). Runs already present in the CSV are skipped, so an
    interrupted sweep can be resumed.

    NOTE(review): the split is stratified by label over individual recordings,
    so recordings of the same speaker can land in both train and test. Confirm
    that utterance-level (not speaker-independent) evaluation is intended.
    """
    # ------------------ Configuration ------------------
    # Define severity mapping
    severity_mapping = {
        "HIGH": ["M01", "M04", "M12", "F03"],
        "MEDIUM": ["F02", "M07", "M16"],
        "LOW": ["F04", "M05", "M11"],
        "VERY LOW": ["F05", "M08", "M09", "M10", "M14"]
    }

    dataset_path = "/kaggle/input/dysarthria-data/noisereduced-uaspeech"  # Update this path as needed

    # Define the list of MFCCs to evaluate
    mfcc_list = [13, 26, 39, 52]

    # Define list of feature types to evaluate separately
    feature_types = ['msrcc', 'fbank', 'logfbank', 'subband_centroid', 'spncc']

    # Define target length for feature matrices
    target_length = 128

    # Define random state for reproducibility
    RANDOM_STATE = 42

    # Define the path for the CSV file
    results_csv_path = "classifier_results.csv"

    # Initialize or load existing results
    records = _load_result_records(results_csv_path)
    completed = _completed_keys(records)

    # (X, y) per feature type for features that do not depend on n_mfcc.
    feature_cache = {}

    # Iterate over different numbers of MFCCs
    for n_mfcc in mfcc_list:
        print(f"\n=== Processing with {n_mfcc} MFCCs ===")

        for feature_type in feature_types:
            print(f"\n--- Extracting and Evaluating Feature: {feature_type} ---")

            if feature_type in feature_cache:
                X, y = feature_cache[feature_type]
            else:
                # Initialize dataset with current feature type and MFCC
                dataset = DysarthriaDataset(
                    data_path=dataset_path,
                    severity_mapping=severity_mapping,
                    sr=44100,
                    target_length=target_length,
                    feature_type=feature_type,
                    n_mfcc=n_mfcc,
                    nfilt=40
                )

                # Extract features and labels
                X, y = dataset.get_all_features_labels()
                if feature_type in _MFCC_INDEPENDENT_FEATURES:
                    feature_cache[feature_type] = (X, y)

            if len(y) == 0:
                # Without this guard the failure shows up later as an obscure
                # dtype error in np.bincount.
                raise FileNotFoundError(
                    f"No .wav files found under {dataset_path}; check the dataset path and speaker folders."
                )

            print(f"Feature matrix shape: {X.shape}")
            print(f"Labels distribution: {np.bincount(y)}")

            # ------------------ Dataset Splitting ------------------
            # Split data into Train (70%), Validation (15%), Test (15%).
            # The validation part is held out but not used below; it is kept so
            # the test split stays the same 15% of the data.
            X_train, X_temp, y_train, y_temp = train_test_split(
                X, y, test_size=0.30, stratify=y, random_state=RANDOM_STATE
            )
            X_val, X_test, y_val, y_test = train_test_split(
                X_temp, y_temp, test_size=0.50, stratify=y_temp, random_state=RANDOM_STATE
            )

            print(f"Train set: {X_train.shape[0]} samples")
            print(f"Validation set: {X_val.shape[0]} samples")
            print(f"Test set: {X_test.shape[0]} samples")

            # ------------------ Classifier Definitions and Hyperparameter Grids ------------------
            classifiers = _build_classifiers(RANDOM_STATE, X_train.shape[1])

            # ------------------ Training, Evaluation, and Result Storage ------------------
            for clf_name, clf_info in classifiers.items():
                # Skip already processed classifiers for current feature and MFCC setting
                run_key = (n_mfcc, feature_type, clf_name)
                if run_key in completed:
                    print(f"Skipping {clf_name} with {feature_type} and {n_mfcc} MFCCs as it has already been processed.")
                    continue

                print(f"\n--- Training {clf_name} with {feature_type} and {n_mfcc} MFCCs ---")

                # Create pipeline: scaling is fitted inside each CV fold.
                pipeline = Pipeline([
                    ('scaler', StandardScaler()),
                    ('classifier', clf_info['model'])
                ])

                # Define GridSearchCV
                grid = GridSearchCV(
                    estimator=pipeline,
                    param_grid=clf_info['params'],
                    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
                    scoring='accuracy',
                    n_jobs=-1,
                    verbose=0
                )

                # Fit GridSearchCV
                grid.fit(X_train, y_train)

                # Best estimator (refit on the full training split by GridSearchCV)
                best_estimator = grid.best_estimator_
                best_params = grid.best_params_
                best_score = grid.best_score_

                print(f"Best Parameters for {clf_name}: {best_params}")
                print(f"Best Cross-Validation Accuracy: {best_score * 100:.2f}%")

                # Evaluate on Test Set
                y_pred = best_estimator.predict(X_test)
                test_accuracy = accuracy_score(y_test, y_pred) * 100
                test_precision, test_recall, test_f1, _ = precision_recall_fscore_support(
                    y_test, y_pred, average='weighted', zero_division=0
                )

                print(f"Test Accuracy: {test_accuracy:.2f}%")
                print(f"Test Precision: {test_precision * 100:.2f}%")
                print(f"Test Recall: {test_recall * 100:.2f}%")
                print(f"Test F1-Score: {test_f1 * 100:.2f}%")

                # Record the result. DataFrame.append no longer exists in pandas 2.x,
                # so rows are collected as dicts and the frame is rebuilt on save.
                records.append({
                    "MFCCs": n_mfcc,
                    "Feature_Type": feature_type,
                    "Classifier": clf_name,
                    "Accuracy": f"{test_accuracy:.2f}%",
                    "Precision": f"{test_precision * 100:.2f}%",
                    "Recall": f"{test_recall * 100:.2f}%",
                    "F1-Score": f"{test_f1 * 100:.2f}%"
                })
                completed.add(run_key)

                # Save results to CSV after every run so progress survives interruption
                pd.DataFrame(records, columns=_RESULT_COLUMNS).to_csv(results_csv_path, index=False)
                print(f"Results for {clf_name} with {feature_type} and {n_mfcc} MFCCs saved to {results_csv_path}.")

                # Optionally, save the best model for each classifier and feature setting
                model_save_path = f"models/{clf_name.replace(' ', '_')}_{feature_type}_mfcc_{n_mfcc}.joblib"
                os.makedirs(os.path.dirname(model_save_path), exist_ok=True)
                joblib.dump(best_estimator, model_save_path)
                print(f"Model saved to {model_save_path}.")

In [None]:
# Entry point: run the whole experiment sweep when this code is executed as the
# main module (rather than imported).
if __name__ == "__main__":
    main()