In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import joblib

from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, Dropout, Flatten,
                                     Dense, LSTM, MultiHeadAttention, Concatenate, Reshape)
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.saving import register_keras_serializable

from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.neighbors import KNeighborsClassifier, NeighborhoodComponentsAnalysis
from sklearn.metrics import classification_report, accuracy_score
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline

# =============================================================================
# --- Configuration ---
# =============================================================================
# DATASET = "MPOWER_DATASET"
MODE = "ALL_VALIDS"
FEATURE_MODE = "BASIC"
MODEL_NAME = "feature_extractor_cnn_att_lstm" # MODIFIED: Model name updated
# ------------------------------------

# Path Setup
dataset = "Italian"
FEATURES_FILE_PATH = os.path.join(os.getcwd(), dataset, "data", f"features_{MODE}_{FEATURE_MODE}.npz")
RESULTS_PATH = os.path.join(os.getcwd(), dataset, f"results_{MODE}_{FEATURE_MODE}")
MODEL_PATH = os.path.join(RESULTS_PATH, MODEL_NAME)
os.makedirs(MODEL_PATH, exist_ok=True)

HISTORY_SAVE_PATH = os.path.join(MODEL_PATH, "history.csv")
BEST_EXTRACTOR_PATH = os.path.join(MODEL_PATH, "best_feature_extractor.keras") # MODIFIED: Path name updated
KNN_MODEL_PATH = os.path.join(MODEL_PATH, "knn_classifier.joblib") # NEW: Path for k-NN model

# Hyperparameters
EPOCHS = 30
BATCH_SIZE = 32
LEARNING_RATE = 0.001
DROPOUT_RATE = 0.5
L2_STRENGTH = 0.01

# =============================================================================
# --- Your Custom CNN Models ---
# =============================================================================
@register_keras_serializable()
class ParkinsonDetectorModel(Model):
    """Your end-to-end CNN model for original features."""
    def __init__(self, input_shape, **kwargs):
        super(ParkinsonDetectorModel, self).__init__(**kwargs)
        self.input_shape_config = input_shape
        self.reshape_in = Reshape((input_shape[0], input_shape[1], 1))
        self.conv1a = Conv2D(64, 5, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.conv1b = Conv2D(64, 5, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.pool1 = MaxPooling2D(5)
        self.drop1 = Dropout(DROPOUT_RATE)
        self.conv2a = Conv2D(64, 5, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.conv2b = Conv2D(64, 5, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.pool2 = MaxPooling2D(5)
        self.drop2 = Dropout(DROPOUT_RATE)
        self.flatten_cnn = Flatten()
        self.attention = MultiHeadAttention(num_heads=2, key_dim=64)
        self.flatten_att = Flatten()
        self.lstm1 = LSTM(128, return_sequences=True)
        self.lstm2 = LSTM(128, return_sequences=False)
        self.drop_lstm = Dropout(DROPOUT_RATE)
        self.concat = Concatenate()
        self.dense_bottleneck = Dense(128, activation='relu', name='bottleneck_features')
        self.dense_output = Dense(1, activation='sigmoid')

    def call(self, inputs, extract_features=False):
        x = self.reshape_in(inputs)
        x = self.conv1a(x)
        x = self.conv1b(x)
        x = self.pool1(x)
        x = self.drop1(x)
        x = self.conv2a(x)
        x = self.conv2b(x)
        x = self.pool2(x)
        x = self.drop2(x)
        cnn_flat = self.flatten_cnn(x)
        shape = tf.shape(x)
        sequence = tf.reshape(x, [-1, shape[1] * shape[2], shape[3]])
        att_out = self.attention(query=sequence, key=sequence, value=sequence)
        att_flat = self.flatten_att(att_out)
        lstm_seq = self.lstm1(sequence)
        lstm_out = self.lstm2(lstm_seq)
        lstm_out = self.drop_lstm(lstm_out)
        concatenated = self.concat([cnn_flat, att_flat, lstm_out])
        bottleneck = self.dense_bottleneck(concatenated)
        if extract_features:
            return bottleneck
        return self.dense_output(bottleneck)

    def get_config(self):
        config = super(ParkinsonDetectorModel, self).get_config()
        config.update({"input_shape": self.input_shape_config})
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

@register_keras_serializable()
class ParkinsonDetectorModelNCA(Model):
    """A modified version of your model for the small 8x8 NCA input."""
    def __init__(self, input_shape, **kwargs):
        super(ParkinsonDetectorModelNCA, self).__init__(**kwargs)
        self.input_shape_config = input_shape
        self.reshape_in = Reshape((input_shape[0], input_shape[1], 1))
        self.conv1a = Conv2D(64, 3, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.conv1b = Conv2D(64, 3, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.pool1 = MaxPooling2D(2)
        self.drop1 = Dropout(DROPOUT_RATE)
        self.conv2a = Conv2D(64, 3, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.conv2b = Conv2D(64, 3, activation='relu', kernel_regularizer=l2(L2_STRENGTH), padding='same')
        self.pool2 = MaxPooling2D(2)
        self.drop2 = Dropout(DROPOUT_RATE)
        self.flatten_cnn = Flatten()
        self.attention = MultiHeadAttention(num_heads=2, key_dim=64)
        self.flatten_att = Flatten()
        self.lstm1 = LSTM(128, return_sequences=True)
        self.lstm2 = LSTM(128, return_sequences=False)
        self.drop_lstm = Dropout(DROPOUT_RATE)
        self.concat = Concatenate()
        self.dense_bottleneck = Dense(128, activation='relu', name='bottleneck_features')
        self.dense_output = Dense(1, activation='sigmoid')

    def call(self, inputs, extract_features=False):
        x = self.reshape_in(inputs)
        x = self.conv1a(x)
        x = self.conv1b(x)
        x = self.pool1(x)
        x = self.drop1(x)
        x = self.conv2a(x)
        x = self.conv2b(x)
        x = self.pool2(x)
        x = self.drop2(x)
        cnn_flat = self.flatten_cnn(x)
        shape = tf.shape(x)
        sequence = tf.reshape(x, [-1, shape[1] * shape[2], shape[3]])
        att_out = self.attention(query=sequence, key=sequence, value=sequence)
        att_flat = self.flatten_att(att_out)
        lstm_seq = self.lstm1(sequence)
        lstm_out = self.lstm2(lstm_seq)
        lstm_out = self.drop_lstm(lstm_out)
        concatenated = self.concat([cnn_flat, att_flat, lstm_out])
        bottleneck = self.dense_bottleneck(concatenated)
        if extract_features:
            return bottleneck
        return self.dense_output(bottleneck)

    def get_config(self):
        config = super(ParkinsonDetectorModelNCA, self).get_config()
        config.update({"input_shape": self.input_shape_config})
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

# =============================================================================
# --- Helper Functions ---
# =============================================================================
def load_data(path: str) -> tuple:
    """Loads features and labels from the .npz file."""
    print(f"--- Loading data from {path} ---")
    with np.load(path) as data:
        mel_spectrograms = data['mel_spectrogram']
        mfccs = data['mfcc']
        labels = data['labels']
        X = np.concatenate((mel_spectrograms, mfccs), axis=1)
    print("Data loaded successfully.")
    return X, labels

def evaluate_cnn_and_report(model, X_test, y_test, model_name):
    """Evaluates a compiled Keras model and prints a classification report."""
    print(f"\n--- Evaluating Model: {model_name} ---")
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Loss: {loss:.4f}")

    y_pred_proba = model.predict(X_test)
    y_pred = (y_pred_proba > 0.5).astype("int32")

    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, digits=4))
    return accuracy # MODIFIED: Return the accuracy

def tune_and_evaluate_knn_pipeline(X_train, y_train, X_test, y_test, results_path, model_name):
    """Finds the best k-NN pipeline using SMOTE and NCA."""
    print(f"\n--- Tuning and Evaluating k-NN Pipeline for: {model_name} ---")
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('smote', SMOTE(random_state=42)),
        ('nca', NeighborhoodComponentsAnalysis(random_state=42, max_iter=200)),
        ('classifier', KNeighborsClassifier())
    ])
    param_dist = {
        'nca__n_components': [10, 20, 30, 40, 50],
        'classifier__n_neighbors': [5, 7, 9, 11, 13],
        'classifier__weights': ['distance'],
        'classifier__metric': ['manhattan']
    }
    search = RandomizedSearchCV(pipeline, param_dist, n_iter=15, cv=5, scoring='accuracy', n_jobs=1, random_state=42, verbose=1)
    search.fit(X_train, y_train)

    print(f"\n--- Results for {model_name} ---")
    print(f"Best cross-validation accuracy: {search.best_score_:.4f}")
    y_pred = search.predict(X_test)
    test_accuracy = accuracy_score(y_test, y_pred)
    print(f"Test Set Accuracy: {test_accuracy:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, digits=4))

    pipeline_path = os.path.join(results_path, f"{model_name}_pipeline.joblib")
    joblib.dump(search.best_estimator_, pipeline_path)
    print(f"✅ Best k-NN pipeline saved to: {pipeline_path}")
    return test_accuracy # MODIFIED: Return the accuracy

# =============================================================================
# --- Main Execution ---
# =============================================================================
if __name__ == '__main__':
    # MODIFIED: Dictionary to store final results
    results_summary = {}

    X, y = load_data(FEATURES_FILE_PATH)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    n_samples_train, d1, d2 = X_train.shape
    X_train_2d = X_train.reshape((n_samples_train, d1 * d2))
    X_test_2d = X_test.reshape((X_test.shape[0], d1 * d2))
    print(f"Data split and reshaped. Train shape: {X_train.shape}, Test shape: {X_test.shape}")

    # =========================================================================
    # --- Experiment 1: k-NN (Baseline) ---
    # =========================================================================
    print("\n\n" + "="*80)
    print("RUNNING EXPERIMENT 1: k-NN (Baseline on original features)")
    print("="*80)
    acc1 = tune_and_evaluate_knn_pipeline(X_train_2d, y_train, X_test_2d, y_test, RESULTS_PATH, "baseline_knn")
    results_summary["1: k-NN (Baseline)"] = acc1

    # =========================================================================
    # --- Experiment 2: CNN (End-to-End) ---
    # =========================================================================
    print("\n\n" + "="*80)
    print("RUNNING EXPERIMENT 2: CNN (End-to-End)")
    print("="*80)

    MODEL_CNN_PATH = os.path.join(RESULTS_PATH, "cnn_model.keras")
    model_cnn = ParkinsonDetectorModel(input_shape=(d1, d2))
    model_cnn.compile(optimizer=Adam(learning_rate=LEARNING_RATE), loss='binary_crossentropy', metrics=['accuracy'])

    print("\n--- Training CNN model ---")
    model_cnn.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=EPOCHS, batch_size=BATCH_SIZE,
                  callbacks=[ModelCheckpoint(MODEL_CNN_PATH, save_best_only=True, monitor='val_accuracy', mode='max')],
                  verbose=1)

    best_model_cnn = tf.keras.models.load_model(MODEL_CNN_PATH)
    acc2 = evaluate_cnn_and_report(best_model_cnn, X_test, y_test, "CNN_End_to_End")
    results_summary["2: CNN (End-to-End)"] = acc2

In [None]:
    # =========================================================================
    # --- Experiment 3: CNN + k-NN ---
    # =========================================================================
    # --- Experiment 3: CNN + k-NN ---
    print("RUNNING EXPERIMENT 3: CNN Feature Extractor + k-NN")
    # ...
    # Call the model to get features as Tensors
    X_train_features_tensor = best_model_cnn(X_train, extract_features=True)
    X_test_features_tensor = best_model_cnn(X_test, extract_features=True)

    # FIX: Convert Tensors to NumPy arrays for scikit-learn
    X_train_features = X_train_features_tensor.numpy()
    X_test_features = X_test_features_tensor.numpy()

    print(f"Extracted feature shapes: Train={X_train_features.shape}, Test={X_test_features.shape}")

    tune_and_evaluate_knn_pipeline(
        X_train_features, y_train, X_test_features, y_test, RESULTS_PATH, "cnn_plus_knn"
    )

In [None]:
    # =========================================================================
    # --- Experiment 4: NCA + CNN (no k-NN) ---
    # =========================================================================
    print("\n\n" + "="*80)
    print("RUNNING EXPERIMENT 4: NCA pre-processing + CNN (End-to-End)")
    print("="*80)

    print("\n--- Applying NCA to original 2D features ---")
    NCA_COMPONENTS = 64
    nca_pipeline = Pipeline([('scaler', StandardScaler()), ('nca', NeighborhoodComponentsAnalysis(n_components=NCA_COMPONENTS, random_state=42, max_iter=200))])
    X_train_nca = nca_pipeline.fit_transform(X_train_2d, y_train)
    X_test_nca = nca_pipeline.transform(X_test_2d)

    nca_img_dim = int(np.sqrt(NCA_COMPONENTS))
    X_train_nca_3d = X_train_nca.reshape(-1, nca_img_dim, nca_img_dim)
    X_test_nca_3d = X_test_nca.reshape(-1, nca_img_dim, nca_img_dim)

    MODEL_NCA_CNN_PATH = os.path.join(RESULTS_PATH, "nca_cnn_model.keras")
    model_nca_cnn = ParkinsonDetectorModelNCA(input_shape=(nca_img_dim, nca_img_dim))
    model_nca_cnn.compile(optimizer=Adam(learning_rate=LEARNING_RATE), loss='binary_crossentropy', metrics=['accuracy'])

    print("\n--- Training NCA+CNN model ---")
    model_nca_cnn.fit(X_train_nca_3d, y_train, validation_data=(X_test_nca_3d, y_test), epochs=EPOCHS, batch_size=BATCH_SIZE,
                      callbacks=[ModelCheckpoint(MODEL_NCA_CNN_PATH, save_best_only=True, monitor='val_accuracy', mode='max')],
                      verbose=1)

    best_model_nca_cnn = tf.keras.models.load_model(MODEL_NCA_CNN_PATH)
    acc4 = evaluate_cnn_and_report(best_model_nca_cnn, X_test_nca_3d, y_test, "NCA_plus_CNN")
    results_summary["4: NCA + CNN"] = acc4


In [None]:

    # =========================================================================
    # --- FINAL RESULTS SUMMARY ---
    # =========================================================================
    print("\n\n" + "="*80)
    print("✅ ALL EXPERIMENTS COMPLETE! FINAL RESULTS SUMMARY:")
    print("="*80)

    if not results_summary:
        print("No results were recorded.")
    else:
        # Find the best model
        best_model_name = max(results_summary, key=results_summary.get)
        best_accuracy = results_summary[best_model_name]

        print(f"{'Model':<40} | {'Final Test Accuracy':<20}")
        print("-" * 65)
        for name, acc in results_summary.items():
            print(f"{name:<40} | {acc:<20.4f}")

        print("-" * 65)
        print(f"\n🏆 Best Performing Model: '{best_model_name}' with an accuracy of {best_accuracy:.4f}")

    print("="*80)