In [1]:
from sklearnex import patch_sklearn

patch_sklearn()

import numpy as np
import scipy.io
import os
import pickle

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.covariance import LedoitWolf
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

from pyriemann.estimation import Covariances
from pyriemann.classification import TSclassifier
from pyriemann.spatialfilters import CSP
from pyriemann.tangentspace import TangentSpace
from pyriemann.utils.mean import mean_logeuclid
#from pyriemann.classification import SVC


from sklearn.svm import SVC

import h5py

import time

import gc

Intel(R) Extension for Scikit-learn* enabled (https://github.com/uxlfoundation/scikit-learn-intelex)


In [2]:
# Wall-clock start of the run; the final cell subtracts this value to report the total elapsed minutes.
start_time = time.time()

# Functions

In [None]:
import pickle

def load_testing_transformed_data(pickle_file_path, test_sub_id, fold_name):
    """
    Load transformed data (x and y) for a specific test subject and fold.

    The unpickled object is indexed first by ``test_sub_id`` and then by
    ``fold_name``; the selected entry must expose the keys 'x' and 'y'.

    Parameters:
        pickle_file_path (str): Path to the saved pickle file.
        test_sub_id (int): Subject index (or key) in the unpickled container.
        fold_name (str): Fold identifier, e.g., 'Fold1' or 'Fold2'.

    Returns:
        dict: Dictionary with keys 'x' and 'y' for the requested subject and fold.

    Raises:
        ValueError: If the subject, the fold, or the 'x'/'y' entries cannot be
            found (missing key, or subject index out of range).
    """
    # NOTE: pickle.load can execute arbitrary code while deserialising; only
    # open files that were written by this project's own pipeline.
    with open(pickle_file_path, 'rb') as f:
        transformed_data = pickle.load(f)

    try:
        subject_data = transformed_data[test_sub_id][fold_name]
        return {
            'x': subject_data['x'],
            'y': subject_data['y']
        }
    except (KeyError, IndexError) as e:
        # IndexError covers list-like containers addressed with an out-of-range
        # subject index; both lookup failures are reported the same way.
        raise ValueError(f"Invalid key: {e}. Check if subject ID and fold name are correct.") from e

# Example usage:
# result = load_testing_transformed_data("TransformedTesting.pkl", test_sub_id=0, fold_name='Fold1')
# X, Y = result['x'], result['y']


In [None]:
import pickle

def load_training_transformed_data(pickle_file_path, test_sub_id):
    """
    Load transformed data (x and y) for a specific test subject.

    The unpickled object is indexed by ``test_sub_id``; the selected entry
    must expose the keys 'x' and 'y'.

    Parameters:
        pickle_file_path (str): Path to the saved pickle file.
        test_sub_id (int): Subject index (or key) in the unpickled container.

    Returns:
        dict: Dictionary with keys 'x' and 'y' for the requested subject.

    Raises:
        ValueError: If the subject or its 'x'/'y' entries cannot be found
            (missing key, or subject index out of range).
    """
    # NOTE: pickle.load can execute arbitrary code while deserialising; only
    # open files that were written by this project's own pipeline.
    with open(pickle_file_path, 'rb') as f:
        transformed_data = pickle.load(f)

    try:
        subject_data = transformed_data[test_sub_id]
        return {
            'x': subject_data['x'],
            'y': subject_data['y']
        }
    except (KeyError, IndexError) as e:
        # IndexError covers list-like containers addressed with an out-of-range
        # subject index; both lookup failures are reported the same way.
        raise ValueError(f"Invalid subject ID: {e}. Check if subject ID is correct.") from e

# Example usage:
# result = load_training_transformed_data("TransformedTraining.pkl", test_sub_id=0)
# X, Y = result['x'], result['y']


In [None]:
def SVM_UnCalibrated_Classification(XTrain, XTest, YTrain, YTest, random_state=42):
    """
    Train a linear SVM on the training set and score it on the test set
    without any extra probability calibration step.

    Parameters:
        XTrain, YTrain: Training features and labels passed to ``SVC.fit``.
        XTest, YTest: Test features and labels used for every metric.
        random_state (int | None): Seed forwarded to ``SVC``. With
            ``probability=True`` the probability estimates come from an
            internal shuffled cross-validation, so a fixed seed is needed for
            reproducible probabilities and Brier scores.

    Returns:
        dict: accuracy, recall, precision, f1_score, brier_score, plus the
        calibration curve sampled on a fixed grid ("calib_curve_pred" holds the
        grid, "calib_curve_true" the interpolated fraction of positives, NaN
        outside the range of observed mean predictions).

    Note:
        Column 1 of ``predict_proba`` is used as the positive-class
        probability and the sklearn metrics are called with their default
        positive label, so a binary problem is assumed -- TODO confirm the
        label encoding against the data files.
    """
    # Imported here so the function does not depend on a later notebook cell
    # having been executed before it is called.
    from sklearn.calibration import calibration_curve
    from sklearn.metrics import brier_score_loss

    SVM = SVC(C=1.0, kernel='linear', probability=True, random_state=random_state)
    SVM.fit(XTrain, YTrain)

    # Hard predictions and positive-class probabilities on the test set
    y_pred = SVM.predict(XTest)
    y_prob_uncalibrated = SVM.predict_proba(XTest)[:, 1]

    # Calibration curve, resampled on a common grid so that curves from
    # different runs can be compared point by point
    calib_x = np.linspace(0.05, 0.95, 10)
    prob_true_uncal, prob_pred_uncal = calibration_curve(YTest, y_prob_uncalibrated, n_bins=10)
    interp_true_uncal = np.interp(calib_x, prob_pred_uncal, prob_true_uncal, left=np.nan, right=np.nan)

    metrics = {
        "accuracy": accuracy_score(YTest, y_pred),
        "recall": recall_score(YTest, y_pred),
        "precision": precision_score(YTest, y_pred),
        "f1_score": f1_score(YTest, y_pred),
        "brier_score": brier_score_loss(YTest, y_prob_uncalibrated),
        "calib_curve_pred": calib_x,
        "calib_curve_true": interp_true_uncal
    }

    return metrics

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.frozen import FrozenEstimator
from sklearn.metrics import brier_score_loss
import numpy as np

def SVM_Calibrated_Classification(XTrain, XTest, YTrain, YTest, random_state=42):
    """
    Train a linear SVM, calibrate its probabilities on part of the test set
    and score it on the remaining part, averaged over a 2-fold split.

    The test set is split with a stratified, shuffled 2-fold scheme. In each
    fold one half calibrates the already-fitted SVM (wrapped in
    ``FrozenEstimator`` so it is not refitted) and the other half is used for
    evaluation.

    Parameters:
        XTrain, YTrain: Training features and labels for the base SVM.
        XTest, YTest: Test features and labels; converted to NumPy arrays so
            they can be indexed with the fold index arrays.
        random_state (int | None): Seed for both the fold shuffling and the
            SVC's internal probability estimation.

    Returns:
        dict: Fold-averaged accuracy, recall, precision, f1_score and
        brier_score, plus the calibration curve on a fixed grid
        ("calib_curve_pred" is the grid, "calib_curve_true" the NaN-aware mean
        of the per-fold interpolated curves).
    """
    XTest = np.asarray(XTest)
    YTest = np.asarray(YTest)

    skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=random_state)
    calib_x = np.linspace(0.05, 0.95, 10)  # Common bin centers for interpolation

    # The base SVM only depends on the training data, so it is fitted once
    # instead of being refitted identically inside every fold.
    base_SVM = SVC(C=1.0, kernel='linear', probability=True, random_state=random_state)
    base_SVM.fit(XTrain, YTrain)

    # Per-fold scores, keyed by the metric name used in the returned dict
    fold_scores = {
        "accuracy": [],
        "recall": [],
        "precision": [],
        "f1_score": [],
        "brier_score": [],
    }
    prob_true_all = []

    for calib_idx, final_test_idx in skf.split(XTest, YTest):

        # Split calibration and final test set
        X_calib, X_final_test = XTest[calib_idx], XTest[final_test_idx]
        Y_calib, Y_final_test = YTest[calib_idx], YTest[final_test_idx]

        # Calibrate the frozen (already fitted) SVM on the calibration half
        calibrated_SVM = CalibratedClassifierCV(FrozenEstimator(base_SVM))
        calibrated_SVM.fit(X_calib, Y_calib)

        # Predictions on the held-out half
        y_pred = calibrated_SVM.predict(X_final_test)
        y_prob = calibrated_SVM.predict_proba(X_final_test)[:, 1]

        # Metrics
        fold_scores["accuracy"].append(accuracy_score(Y_final_test, y_pred))
        fold_scores["recall"].append(recall_score(Y_final_test, y_pred))
        fold_scores["precision"].append(precision_score(Y_final_test, y_pred))
        fold_scores["f1_score"].append(f1_score(Y_final_test, y_pred))
        fold_scores["brier_score"].append(brier_score_loss(Y_final_test, y_prob))

        # Calibration curve, resampled on the common grid (NaN outside the
        # range of mean predictions observed in this fold)
        prob_true, prob_pred = calibration_curve(Y_final_test, y_prob, n_bins=10)
        prob_true_all.append(np.interp(calib_x, prob_pred, prob_true, left=np.nan, right=np.nan))

    # Average across folds; grid points that are NaN in one fold fall back to
    # the other fold's value
    Metrics_Fold = {name: np.mean(values) for name, values in fold_scores.items()}
    Metrics_Fold["calib_curve_pred"] = calib_x
    Metrics_Fold["calib_curve_true"] = np.nanmean(prob_true_all, axis=0)

    return Metrics_Fold


# Load transformed training and testing features

In [None]:
# Locations of the (transformed) training and testing features - baseline and spatially filtered.
# The root directory can be overridden with the AUDIOCUE_TRANSFORMED_DIR environment variable
# so the notebook can run on another machine; the default is the original location.
Transformed_Root = os.environ.get(
    'AUDIOCUE_TRANSFORMED_DIR',
    '/home/nicole/Documents/AudioCueWalking_analysis/Variables/AdaptVsNon/LOSO_CV/Dataset/Ns_100_Individual/Advance/Transformed/Ind_TS_Transformed',
)

Baseline_Path = os.path.join(Transformed_Root, 'Baseline')
SpatFilt_Path = os.path.join(Transformed_Root, 'SpatFiltSignals')

# Each feature set ships one pickle with the training data and one with the testing data
Baseline_Training_Path = os.path.join(Baseline_Path, 'TransformedTraining.pkl')
Baseline_Testing_Path = os.path.join(Baseline_Path, 'TransformedTesting.pkl')

SpatFilt_Training_Path = os.path.join(SpatFilt_Path, 'TransformedTraining.pkl')
SpatFilt_Testing_Path = os.path.join(SpatFilt_Path, 'TransformedTesting.pkl')

# Implement classification

In [None]:
# Subjects to process; each entry is passed to the loaders as `test_sub_id`.
sub_ids = [0]
# Folds evaluated per subject; the two-fold variant is kept for quick switching.
#fold_names = ['Fold1', 'Fold2']
fold_names = ['Fold1']

In [None]:
import warnings

# Suppress specific FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning, message=".*force_all_finite.*")
warnings.filterwarnings("ignore", message="The `cv='prefit'` option is deprecated")

In [None]:
# Results[test_sub_id][fold_name][feature_set] -> metrics dict returned by the classifier helper
Results = {}

for test_sub_id in sub_ids:
    print(f"Classification for Subject {test_sub_id}...")
    Results[test_sub_id] = {}

    for fold_name in fold_names:
        print(f"\nSubject {test_sub_id}, {fold_name} classification...")

        # ------- Load subject features -------
        Baseline_Training = load_training_transformed_data(Baseline_Training_Path, test_sub_id)
        Baseline_Testing = load_testing_transformed_data(Baseline_Testing_Path, test_sub_id, fold_name)

        SpatFilt_Training = load_training_transformed_data(SpatFilt_Training_Path, test_sub_id)
        SpatFilt_Testing = load_testing_transformed_data(SpatFilt_Testing_Path, test_sub_id, fold_name)

        # Load labels
        Y_Train_Baseline = Baseline_Training['y']
        Y_Train_SpatFilt = SpatFilt_Training['y']

        Y_Test_Baseline = Baseline_Testing['y']
        Y_Test_SpatFilt = SpatFilt_Testing['y']

        # Both feature sets must describe the same trials in the same order,
        # otherwise concatenating them (and sharing one label vector) is invalid.
        labels_match = (
            np.array_equal(Y_Train_Baseline, Y_Train_SpatFilt)
            and np.array_equal(Y_Test_Baseline, Y_Test_SpatFilt)
        )
        if not labels_match:
            # Raise instead of exiting: `sys` is not imported in this notebook,
            # and an exception keeps the kernel alive with a clear message.
            raise ValueError(
                f"Labels do not match between Baseline and SpatFilt features "
                f"for subject {test_sub_id}, {fold_name}"
            )

        # Labels are identical, so the baseline copies are used from here on
        Y_train_dataset = Y_Train_Baseline
        Y_test_dataset = Y_Test_Baseline

        # Load dataset
        Baseline_Training_data = Baseline_Training['x']
        Baseline_Testing_data = Baseline_Testing['x']

        SpatFilt_Training_data = SpatFilt_Training['x']
        SpatFilt_Testing_data = SpatFilt_Testing['x']

        # Sanity Check
        print("Loaded sizes: ")
        print("Baseline training and testing: ")
        print(Baseline_Training_data.shape)
        print(Baseline_Testing_data.shape)

        print("SpatFilt training and testing: ")
        print(SpatFilt_Training_data.shape)
        print(SpatFilt_Testing_data.shape)

        print("Labels training and testing: ")
        print(Y_train_dataset.shape)
        print(Y_test_dataset.shape)

        Results[test_sub_id][fold_name] = {}

        # ------- Classify baselines (currently disabled) -------
        # print("Baseline classification")
        # Baseline_metrics = SVM_Calibrated_Classification(Baseline_Training_data, Baseline_Testing_data, Y_train_dataset, Y_test_dataset)
        # print(f"Baseline Metrics for test subject {test_sub_id} for fold {fold_name}:")
        # print(Baseline_metrics)
        # Results[test_sub_id][fold_name]["Baseline"] = Baseline_metrics
        # del Baseline_metrics

        # ------- Classify Spatially Filtered (RCSP(A) + Rie) -------
        print("SpatFilt classification")
        SpatFilt_metrics = SVM_UnCalibrated_Classification(SpatFilt_Training_data, SpatFilt_Testing_data, Y_train_dataset, Y_test_dataset)
        #SpatFilt_metrics = SVM_Calibrated_Classification(SpatFilt_Training_data, SpatFilt_Testing_data, Y_train_dataset, Y_test_dataset)

        print(f"SpatFilt Metrics for test subject {test_sub_id} for fold {fold_name}:")
        print(SpatFilt_metrics)

        Results[test_sub_id][fold_name]["SpatFilt"] = SpatFilt_metrics

        del SpatFilt_metrics

        # ------- Create concatenated features Baseline + SpatFilt (per subject) -------
        # Features are stacked column-wise: SpatFilt columns first, then Baseline columns
        Baseline_Train_cat = np.concatenate((SpatFilt_Training_data, Baseline_Training_data), axis=1) #W/out cluster
        Baseline_Test_cat = np.concatenate((SpatFilt_Testing_data, Baseline_Testing_data), axis=1)

        # ------- Classify concatenation -------
        print("Concat classification")
        Concat_metrics = SVM_UnCalibrated_Classification(Baseline_Train_cat , Baseline_Test_cat, Y_train_dataset, Y_test_dataset)
        #Concat_metrics = SVM_Calibrated_Classification(Baseline_Train_cat , Baseline_Test_cat, Y_train_dataset, Y_test_dataset)

        print(f"Concat Metrics for test subject {test_sub_id} for fold {fold_name}:")
        print(Concat_metrics)

        Results[test_sub_id][fold_name]["Concat"] = Concat_metrics

        del Concat_metrics

        # Release the large per-fold arrays before the next iteration
        #del Baseline_Training_data , Baseline_Testing_data
        del SpatFilt_Training_data, SpatFilt_Testing_data, Baseline_Train_cat, Baseline_Test_cat

        # Save 'Results' (the whole dict accumulated so far, so each file is a checkpoint).
        # NOTE(review): the file name says "WCalibration" although the uncalibrated
        # classifier is the one enabled above -- confirm the intended name.
        filename = f"Results_Subject{test_sub_id}_{fold_name}_WCalibration.pkl"
        with open(filename, 'wb') as f:
            pickle.dump(Results, f)

        gc.collect()

        print(f"Cleared variables for test subject {test_sub_id} for fold {fold_name}")

# Finish

In [None]:
# Report the total wall-clock runtime since `start_time` was recorded.
end_time = time.time()
elapsed_time = end_time - start_time  # seconds
print("Elapsed time:", elapsed_time/60, "minutes")

# Calibration Curves

In [None]:
import matplotlib.pyplot as plt

# Calibration/test split ratios used when the results are stored per ratio
ratios = [0.3, 0.5, 0.7]
colors = ['r', 'g', 'b']  # Red for 0.3, Green for 0.5, Blue for 0.7
labels = ['30:70', '50:50', '70:30']  # Labels for the legend

# `test_sub_id` and `fold_name` are the last values left by the classification loop
concat_results = Results[test_sub_id][fold_name]["Concat"]

if "calib_curve_true" in concat_results:
    # Flat layout produced by the classification loop in this notebook:
    # a single metrics dict, hence a single curve.
    curves = [(concat_results, 'b', 'Concat')]
else:
    # Layout keyed by ratio: one metrics dict, and therefore one curve, per ratio.
    curves = [
        (concat_results[ratio], color, label)
        for ratio, color, label in zip(ratios, colors, labels)
    ]

fig, ax = plt.subplots()

for curve, color, label in curves:
    # NaN entries (grid points outside the observed probability range) show up as gaps
    ax.plot(curve["calib_curve_pred"], curve["calib_curve_true"], marker='o', color=color, label=label)

# Ideal calibration line (diagonal line)
ax.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfect Calibration')

# Labels and title
ax.set_xlabel('Mean predicted probability')
ax.set_ylabel('Fraction of positives')
ax.set_title(f'Calibration Curves for Test Subject {test_sub_id}, PS Fold: {fold_name}')
ax.legend()
ax.grid(True)
plt.show()