<h2>Hyperparameter tuning - highest_prob

In [None]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import ShuffleSplit, KFold
from mne.decoding import CSP

from sklearn.model_selection import ParameterSampler
import numpy as np


current_directory = os.path.abspath('')

project_root = os.path.abspath(os.path.join(current_directory, '..', '..', '..', '..'))

sys.path.append(project_root)

print("ROOT:", project_root)
from Early_predict_UQ.data.make_dataset import make_data


# Epochs span tmin = 2 to tmax = 6, the interval during which the motor imagery task took place.
def early_pred(probabilities, predict, numTimesBelowThreshold, patience, confidence_type, threshold):
    """Update the early-stopping state with one window's class probabilities.

    Parameters
    ----------
    probabilities : numpy.ndarray
        Class probabilities for a single window; flattened before use.
    predict : bool
        Whether the stopping decision has already been taken.
    numTimesBelowThreshold : int
        Running count of windows whose confidence exceeded ``threshold``.
        NOTE: despite its name, this counts windows *above* the threshold,
        and it is never reset when a window falls below it.
    patience : int
        Count at which the stopping decision is taken.
    confidence_type : str
        ``'highest_prob'`` uses the largest class probability; any other
        value uses ``1 - 1 / (1 + p0 + (p0 - p1))`` with ``p0 >= p1`` the two
        largest probabilities (this branch needs at least two classes).
    threshold : float
        Confidence must be strictly greater than this value to be counted.

    Returns
    -------
    tuple
        ``(predict, confidence, numTimesBelowThreshold)`` after the update.
    """
    ranked = sorted(probabilities.flatten(), reverse=True)
    if confidence_type == 'highest_prob':
        confidence = ranked[0]
    else:
        # Reward both a high top probability and a large margin to the runner-up.
        confidence = 1 - (1 / (1 + (ranked[0] + (ranked[0] - ranked[1]))))
    if confidence > threshold and not predict:
        numTimesBelowThreshold += 1
        if numTimesBelowThreshold == patience:
            predict = True
    return predict, confidence, numTimesBelowThreshold

def run_sliding_classification(subjects, threshold, patience, confidence_type, w_length, w_step, sfreq,
                               n_components=4):
    """Evaluate sliding-window early prediction with CSP + LDA over subjects.

    For each subject, three shuffle splits (20 % test) are drawn. CSP and LDA
    are fitted on the full-length training epochs. Every test epoch is then
    scanned window by window, and ``early_pred`` decides when to stop. The LDA
    prediction at the stopping window (or at the last window if no early
    decision was taken) is scored against the true label.

    Parameters
    ----------
    subjects : iterable of int
        Subject identifiers, each passed to ``make_data`` as ``[subject]``.
    threshold, patience, confidence_type
        Forwarded unchanged to ``early_pred``.
    w_length : int
        Window length in samples.
    w_step : int
        Offset in samples between consecutive window starts.
    sfreq : float
        Sampling frequency used to convert sample indices to seconds.
    n_components : int, default 4
        Number of CSP components.

    Returns
    -------
    accuracy : float
        Mean over subjects of the mean over splits of the per-epoch scores.
    mean_prediction_time : float
        Mean centre of the deciding window, in seconds, offset by
        ``epochs.tmin``, averaged the same way.
    epochs, labels
        Epochs object and derived labels of the last processed subject.

    Raises
    ------
    ValueError
        If ``subjects`` is empty, the window parameters are not positive, or
        no window of ``w_length`` samples fits into an epoch.
    """
    subjects = list(subjects)
    if not subjects:
        raise ValueError("subjects must contain at least one subject")
    if w_length <= 0 or w_step <= 0:
        raise ValueError("w_length and w_step must be positive sample counts")

    scores_across_subjects = []
    prediction_time_across_subjects = []
    for person in subjects:
        print("Person %d" % (person))
        epochs, _ = make_data([person])
        # Labels are derived from the event codes rather than taken from make_data.
        labels = epochs.events[:, -1] - 4
        epochs_data = epochs.get_data(copy=False)

        # Window start indices depend only on the epoch length, so compute them once.
        w_start = np.arange(0, epochs_data.shape[2] - w_length, w_step)
        if w_start.size == 0:
            raise ValueError(
                "w_length=%d leaves no window inside epochs of %d samples"
                % (w_length, epochs_data.shape[2])
            )

        cv = ShuffleSplit(3, test_size=0.2, random_state=42)
        lda = LinearDiscriminantAnalysis()
        csp = CSP(n_components=n_components, reg=None, log=True, norm_trace=False)

        scores_cv_splits = []
        predict_time_cv_splits = []
        for train_idx, test_idx in cv.split(epochs_data):
            y_train, y_test = labels[train_idx], labels[test_idx]
            X_train = csp.fit_transform(epochs_data[train_idx], y_train)
            lda.fit(X_train, y_train)

            scores_across_epochs = []
            predict_time_across_epochs = []
            for epoch_idx, true_label in zip(test_idx, y_test):
                # Keep a leading epoch axis so CSP receives a 3-D array. Only
                # the epoch under test is transformed, not the whole test set.
                epoch = epochs_data[epoch_idx][np.newaxis]
                predict = False
                numTimesBelowThreshold = 0
                for n in w_start:
                    features = csp.transform(epoch[:, :, n:(n + w_length)])
                    probabilities = lda.predict_proba(features)
                    predict, confidence, numTimesBelowThreshold = early_pred(
                        probabilities, predict, numTimesBelowThreshold, patience, confidence_type, threshold
                    )
                    if predict:
                        break
                # Here `n`/`features` belong to the stopping window, or to the
                # last window when no early decision was taken.
                score = lda.score(features, [true_label])
                predict_time = (n + w_length / 2.0) / sfreq + epochs.tmin
                scores_across_epochs.append(score)
                predict_time_across_epochs.append(predict_time)

            scores_cv_splits.append(np.mean(scores_across_epochs))
            predict_time_cv_splits.append(np.mean(predict_time_across_epochs))

        scores_across_subjects.append(np.mean(scores_cv_splits))
        prediction_time_across_subjects.append(np.mean(predict_time_cv_splits))

    accuracy = np.mean(scores_across_subjects)
    mean_prediction_time = np.mean(prediction_time_across_subjects)
    return accuracy, mean_prediction_time, epochs, labels


def create_parameterslist():
    """Draw ten random hyperparameter settings for the sliding-window search.

    Returns
    -------
    list of dict
        Each dict has the keys ``'csp_components'``, ``'w_length'`` and
        ``'w_step'``.

    NOTE(review): each ``w_step`` candidate is drawn below its own ``w_length``
    candidate, but ``ParameterSampler`` then samples the two lists
    independently, so a returned ``w_step`` may exceed the returned
    ``w_length``. Confirm whether the pairing was meant to be kept.
    """
    rng = np.random.RandomState(42)
    # Candidate window lengths, uniform on [0.1, 1), rounded to two decimals.
    w_length_values = np.round(rng.uniform(0.1, 1, 10), 2)
    # One candidate step per candidate length, drawn between 0.1 and that length.
    w_step_values = [
        np.round(rng.uniform(0.1, w_length, 1)[0], 2) for w_length in w_length_values
    ]

    parameters = {
        'csp_components': [2, 4, 6, 8, 10],
        'w_length': w_length_values,
        'w_step': w_step_values
    }

    return list(ParameterSampler(parameters, n_iter=10, random_state=rng))


def hyperparameter_tuning(parameters_list, threshold=0.4, patience=4, confidence_type='highest_prob'):
    """Return the parameter set with the highest mean accuracy.

    Reads the module-level ``subjects`` and ``sfreq``. ``w_length`` and
    ``w_step`` in each parameter set are multiplied by ``sfreq`` and truncated
    to whole samples before being passed to ``run_sliding_classification``.

    Parameters
    ----------
    parameters_list : list of dict
        Settings with keys ``'csp_components'``, ``'w_length'``, ``'w_step'``.
    threshold, patience, confidence_type
        Early-stopping settings forwarded to ``run_sliding_classification``.
        NOTE(review): the defaults 0.4 / 4 mirror the cost-function cell of
        this notebook; confirm they are the intended values here.

    Returns
    -------
    dict or None
        The best parameter set, or ``None`` if ``parameters_list`` is empty.
    """
    best_accuracy = -np.inf
    best_params = None

    for n, param_set in enumerate(parameters_list):
        w_length = int(sfreq * param_set['w_length'])
        w_step = int(sfreq * param_set['w_step'])

        mean_accuracy, _, _, _ = run_sliding_classification(
            subjects, threshold, patience, confidence_type, w_length, w_step, sfreq,
            n_components=param_set['csp_components'],
        )

        print(f"Iteration {n+1}/{len(parameters_list)}: Mean accuracy for parameters {param_set} is {mean_accuracy}")

        if mean_accuracy > best_accuracy:
            best_accuracy = mean_accuracy
            best_params = param_set

    return best_params


if __name__ == "__main__":
    subjects = [1, 2, 3, 4, 5, 6, 7, 8, 9]  # 9 subjects
    sfreq = 250
    # NOTE(review): early-stopping settings were missing from this cell; these
    # values mirror the cost-function cell and should be confirmed.
    threshold = 0.4
    patience = 4
    confidence_type = 'highest_prob'
    epochs, labels = make_data([1])  # just to access the epoch.shape
    epochs_data = epochs.get_data(copy=False)

    # Hyperparameter tuning
    print("Hyperparameter tuning: \n\n")
    parameters_list = create_parameterslist()
    best_params = hyperparameter_tuning(parameters_list, threshold, patience, confidence_type)

    print("\n\n Classification: \n\n")
    accuracy, mean_prediction_time, epochs, labels = run_sliding_classification(
        subjects, threshold, patience, confidence_type,
        int(sfreq * best_params['w_length']), int(sfreq * best_params['w_step']), sfreq,
        n_components=best_params['csp_components'],
    )
    accuracy_array = np.array(accuracy)



'''
To do - dynamic stop:
- sliding
    - make the for loops work and provide the mean predict time and score correctly X
    - then advance to all subjects X
    - modularize
    - then start the hyperparameter tuning to maximize classification accuracy and minimize predict_time
    - then loop across all threshold values
    - make it take into account all the subjects
    - make it work using SVM
    - provide the plots for all subjects for each condition; let it save the plots to a folder automatically (potentially also the values, to make plots somewhere else)
    - NB: watch the memory and time usage for Codespaces
- expanding:
    - make a new file, adjust to use expanding window
    - save its plots into another folder automatically

To do - static:
- make a new file and adjust the dynamic version to use specific predict times with the cost function
- save the plots

To do - whole:
- already did that

'''




In [2]:
import numpy as np

# Fixed seed so the drawn candidates are reproducible.
rng = np.random.RandomState(42)

# Ten candidate window lengths, uniform on [0.1, 1), rounded to two decimals.
w_length_values = np.round(rng.uniform(0.1, 1, 10), 2)

# One candidate step per window length, drawn between 0.1 and that length.
w_step_values = [
    np.round(rng.uniform(0.1, upper, 1)[0], 2) for upper in w_length_values
]

print("w_length_values:", w_length_values)
print("w_step_values:", w_step_values)


w_length_values: [0.44 0.96 0.76 0.64 0.24 0.24 0.15 0.88 0.64 0.74]
w_step_values: [0.11, 0.93, 0.65, 0.21, 0.13, 0.13, 0.12, 0.51, 0.33, 0.29]


<h2> Hyperparameter tuning - cost func

In [None]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import ShuffleSplit
from mne.decoding import CSP

current_directory = os.path.abspath('')
project_root = os.path.abspath(os.path.join(current_directory, '..', '..', '..', '..'))
sys.path.append(project_root)
print("ROOT:", project_root)
from Early_predict_UQ.data.make_dataset import make_data

def early_pred(probabilities, predict, numTimesBelowThreshold, patience, confidence_type, threshold):
    """Advance the early-stopping state by one window.

    The confidence is the largest class probability when ``confidence_type``
    is ``'highest_prob'``; otherwise it is ``1 - 1 / (1 + p0 + (p0 - p1))``,
    where ``p0`` and ``p1`` are the two largest probabilities. While no
    decision has been taken, each window with confidence strictly above
    ``threshold`` increments the counter, and the decision is taken once the
    counter equals ``patience``.

    Returns ``(predict, confidence, numTimesBelowThreshold)``.
    """
    ordered = sorted(probabilities.flatten(), reverse=True)
    best = ordered[0]
    if confidence_type != 'highest_prob':
        margin = best - ordered[1]
        confidence = 1 - (1 / (1 + (best + margin)))
    else:
        confidence = best

    # Nothing to update once decided, or when this window is not confident.
    if predict or not (confidence > threshold):
        return predict, confidence, numTimesBelowThreshold

    numTimesBelowThreshold += 1
    if numTimesBelowThreshold == patience:
        predict = True
    return predict, confidence, numTimesBelowThreshold

def run_sliding_classification(subjects, threshold, patience, confidence_type, w_length, w_step, sfreq, n_components):
    """Evaluate sliding-window early prediction with CSP + LDA over subjects.

    For each subject, ten shuffle splits (20 % test) are drawn. CSP and LDA
    are fitted on the full-length training epochs. Every test epoch is then
    scanned window by window, and ``early_pred`` decides when to stop. The LDA
    prediction at the stopping window (or at the last window if no early
    decision was taken) is scored against the true label.

    Parameters
    ----------
    subjects : iterable of int
        Subject identifiers, each passed to ``make_data`` as ``[subject]``.
    threshold, patience, confidence_type
        Forwarded unchanged to ``early_pred``.
    w_length : int
        Window length in samples.
    w_step : int
        Offset in samples between consecutive window starts.
    sfreq : float
        Sampling frequency used to convert sample indices to seconds.
    n_components : int
        Number of CSP components.

    Returns
    -------
    accuracy : float
        Mean over subjects of the mean over splits of the per-epoch scores.
    mean_prediction_time : float
        Mean centre of the deciding window, in seconds, offset by
        ``epochs.tmin``, averaged the same way.

    Raises
    ------
    ValueError
        If the window parameters are not positive or no window of
        ``w_length`` samples fits into an epoch.
    """
    if w_length <= 0 or w_step <= 0:
        raise ValueError("w_length and w_step must be positive sample counts")

    scores_across_subjects = []
    prediction_time_across_subjects = []
    for person in subjects:
        print("Processing Person %d" % (person))  # Print statement to track progress
        epochs, _ = make_data([person])
        # Labels are derived from the event codes rather than taken from make_data.
        labels = epochs.events[:, -1] - 4
        epochs_data = epochs.get_data(copy=False)

        # Window start indices depend only on the epoch length, so compute them once.
        w_start = np.arange(0, epochs_data.shape[2] - w_length, w_step)
        if w_start.size == 0:
            raise ValueError(
                "w_length=%d leaves no window inside epochs of %d samples"
                % (w_length, epochs_data.shape[2])
            )

        cv = ShuffleSplit(10, test_size=0.2, random_state=42)
        lda = LinearDiscriminantAnalysis()
        csp = CSP(n_components=n_components, reg=None, log=True, norm_trace=False)

        scores_cv_splits = []
        predict_time_cv_splits = []
        for train_idx, test_idx in cv.split(epochs_data):
            y_train, y_test = labels[train_idx], labels[test_idx]
            X_train = csp.fit_transform(epochs_data[train_idx], y_train)
            lda.fit(X_train, y_train)

            scores_across_epochs = []
            predict_time_across_epochs = []
            for epoch_idx, true_label in zip(test_idx, y_test):
                # Keep a leading epoch axis so CSP receives a 3-D array. Only
                # the epoch under test is transformed, not the whole test set.
                epoch = epochs_data[epoch_idx][np.newaxis]
                predict = False
                numTimesBelowThreshold = 0
                for n in w_start:
                    features = csp.transform(epoch[:, :, n:(n + w_length)])
                    probabilities = lda.predict_proba(features)
                    predict, confidence, numTimesBelowThreshold = early_pred(
                        probabilities, predict, numTimesBelowThreshold, patience, confidence_type, threshold
                    )
                    if predict:
                        break
                # Here `n`/`features` belong to the stopping window, or to the
                # last window when no early decision was taken.
                score = lda.score(features, [true_label])
                predict_time = (n + w_length / 2.0) / sfreq + epochs.tmin
                scores_across_epochs.append(score)
                predict_time_across_epochs.append(predict_time)

            scores_cv_splits.append(np.mean(scores_across_epochs))
            predict_time_cv_splits.append(np.mean(predict_time_across_epochs))

        scores_across_subjects.append(np.mean(scores_cv_splits))
        prediction_time_across_subjects.append(np.mean(predict_time_cv_splits))

    accuracy = np.mean(scores_across_subjects)
    mean_prediction_time = np.mean(prediction_time_across_subjects)

    return accuracy, mean_prediction_time

if __name__ == "__main__":
    # Exhaustive grid search over CSP size, window length and window step
    # using the cost-function confidence.
    from itertools import product

    threshold = 0.4  # confidence threshold, expected in [0, 1]
    patience = 4  # confident windows required before predicting, expected in 1..36
    subjects = [1, 2, 3, 4, 5, 6, 7, 8, 9]  # 9 subjects
    confidence_type = 'cost'  # 'highest_prob' or the cost function. # hyperparameter - maybe compare in different files
    sfreq = 250

    # Define hyperparameter values (window sizes are converted to samples)
    n_components_values = [2, 4, 6, 8, 10]
    w_length_values = [int(sfreq * 0.1), int(sfreq * 0.5), int(sfreq * 1)]
    w_step_values = [int(sfreq * 0.1), int(sfreq * 0.5), int(sfreq * 1)]

    results = []

    # product() iterates in the same order as the equivalent nested loops.
    for n_components, w_length, w_step in product(n_components_values, w_length_values, w_step_values):
        print("Evaluating: n_components={}, w_length={}, w_step={}".format(n_components, w_length, w_step))  # Print statement to track progress
        accuracy, mean_prediction_time = run_sliding_classification(subjects, threshold, patience, confidence_type, w_length, w_step, sfreq, n_components)
        results.append((n_components, w_length, w_step, accuracy, mean_prediction_time))

    # Print results
    for result in results:
        print("n_components: {}, w_length: {}, w_step: {}, accuracy: {}, mean_prediction_time: {}".format(*result))