In [1]:
!pip install sklearn-genetic-opt
# Import necessary libraries
import pickle
import numpy as np
import pandas as pd
from scipy.signal import welch
from scipy.integrate import simps
from matplotlib import pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression, Lasso
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn_genetic import GAFeatureSelectionCV, ExponentialAdapter

import time

Collecting sklearn-genetic-opt
  Downloading sklearn_genetic_opt-0.10.1-py3-none-any.whl.metadata (10.0 kB)
Downloading sklearn_genetic_opt-0.10.1-py3-none-any.whl (33 kB)
[0mInstalling collected packages: sklearn-genetic-opt
Successfully installed sklearn-genetic-opt-0.10.1


2024-07-02 05:36:03.137498: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-02 05:36:03.137600: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-02 05:36:03.271515: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
# Function to read data
def read_data(filename):
    x = pickle._Unpickler(open(filename, 'rb'))
    x.encoding = 'latin1'
    data = x.load()
    return data

In [3]:
# Load data
files = [f"{i:02d}" for i in range(1, 23)]
labels = []
data = []
for i in files:
    fileph = "/kaggle/input/deap-dataset/data_preprocessed_python/s" + i + ".dat"
    d = read_data(fileph)
    labels.append(d['labels'])
    data.append(d['data'])

In [4]:
# Convert to numpy arrays
labels = np.array(labels)
data = np.array(data)

In [5]:
# Reshape data
labels = labels.reshape(880, 4)
data = data.reshape(880, 40, 8064)

In [6]:
# Extract EEG data
eeg_data = data[:, :32, :]

# Create labels dataframe
df_label = pd.DataFrame({'Valence': labels[:, 0], 'Arousal': labels[:, 1]})

In [7]:
# Describe and info
print(df_label.describe())
print(df_label.info())

# Binarize labels
labels_valence = (labels[:, 0] > 5).astype(int)
labels_arousal = (labels[:, 1] > 5).astype(int)
labels_dominance = (labels[:, 2] > 5).astype(int)
labels_liking = (labels[:, 3] > 6).astype(int)

          Valence     Arousal
count  880.000000  880.000000
mean     5.218034    5.238898
std      2.093837    1.879631
min      1.000000    1.000000
25%      3.650000    3.895000
50%      5.040000    5.490000
75%      7.040000    6.795000
max      9.000000    9.000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 880 entries, 0 to 879
Data columns (total 2 columns):
 #   Column   Non-Null Count  Dtype  
---  ------   --------------  -----  
 0   Valence  880 non-null    float64
 1   Arousal  880 non-null    float64
dtypes: float64(2)
memory usage: 13.9 KB
None


In [8]:
# Function to compute band power
def bandpower(data, sf, band):
    band = np.asarray(band)
    low, high = band
    nperseg = (2 / low) * sf
    freqs, psd = welch(data, sf, nperseg=nperseg)
    freq_res = freqs[1] - freqs[0]
    idx_band = np.logical_and(freqs >= low, freqs <= high)
    bp = simps(psd[idx_band], dx=freq_res)
    return bp

In [9]:
# Get band power
def get_band_power(people, channel, band):
    bands = {"delta": (0.5, 4), "theta": (4, 8), "alpha": (8, 12), "beta": (12, 30), "gamma": (30, 64)}
    return bandpower(eeg_data[people, channel], 128, bands[band])

eeg_band = []
for i in range(len(eeg_data)):
    for j in range(len(eeg_data[0])):
        eeg_band.extend([get_band_power(i, j, b) for b in ["delta", "theta", "alpha", "beta", "gamma"]])

eeg_band = np.array(eeg_band).reshape((880, 160))  # 5 bands x 32 channels
np.save("eeg_band.npy", eeg_band)
eeg_band = np.load("eeg_band.npy")

In [10]:
labels_combined = np.vstack((labels_valence, labels_arousal)).T
model_names = ['Logistic Regression', 'Random Forest', 'XGBoost', 'SVM']
models_lr = LogisticRegression(),
models_rf = RandomForestClassifier()
models_xgb = XGBClassifier(),
models_svc = SVC (kernel = 'linear')

In [11]:
# Function to compute metrics
def metrics(y_test, pred):
    accuracy = accuracy_score(y_test, pred)
    precision = precision_score(y_test, pred, average='macro')
    recall = recall_score(y_test, pred, average='macro')
    f1 = f1_score(y_test, pred, average='macro')
    return accuracy, precision, recall, f1

**without feature selection**

In [26]:
# Cross-validation and classification with feature selection
results = {name: {'accuracy': [], 'precision': [], 'recall': [], 'f1': [], 'time': []} for name in model_names}
cv = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)
  # Adjust the number of features to select
def ML(model):
    print(model)
    # for model_name, model in zip(model_names, models_rf): 
    for label_type in range(2):  # For each label (valence, arousal, dominance, liking)
        X, y = eeg_band, labels_combined[:, label_type]
        scaler = StandardScaler()
        X = scaler.fit_transform(X)
        mean_acc = []
        mean_f1 = []
        for i, (train_index, test_index) in enumerate(cv.split(X, y)):
    #         print(f"{model_name} - Label {label_type} - Fold {i+1}")
            start_time = time.perf_counter()

            X_train, X_test = X_selected[train_index], X_selected[test_index]
            y_train, y_test = y[train_index], y[test_index]

            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)

            model.fit(X_train, y_train)
            pred = model.predict(X_test)
            accuracy, precision, recall, f1 = metrics(y_test, pred)
            mean_acc.append(accuracy)
            mean_f1.append(f1)
            results[model_name]['accuracy'].append(accuracy)
            results[model_name]['precision'].append(precision)
            results[model_name]['recall'].append(recall)
            results[model_name]['f1'].append(f1)

            elapsed_time = time.perf_counter() - start_time
            results[model_name]['time'].append(elapsed_time)
#             print(f"Time: {elapsed_time:.2f} seconds")
        if label_type == 0:
            print('Valence')
        else:
            print('Arousal')
        print('mean acc = ', np.mean(mean_acc))
        print('mean f1 = ', np.mean(mean_f1))
        print('mean time = ', np.mean(results[model_name]['time']))
# # Display results
# for model_name in model_names:
#     print(f"{model_name} Results:")
#     for metric in ['accuracy', 'precision', 'recall', 'f1', 'time']:
#         values = results[model_name][metric]
#         print(f"{metric.capitalize()}: Mean = {np.mean(values)* 100:.2f}, Std = {np.std(values):.4f}")


In [27]:
ML(LogisticRegression())
ML(RandomForestClassifier())  
ML(XGBClassifier())
ML(SVC(kernel = 'linear'))

LogisticRegression()


NameError: name 'X_selected' is not defined

**Filter-based feature selection method**


In [28]:
# Cross-validation and classification with feature selection
results = {name: {'accuracy': [], 'precision': [], 'recall': [], 'f1': [], 'time': []} for name in model_names}
cv = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)
  # Adjust the number of features to select
def ML(model, selected_features):
    print(model)
    print('selected_features = ', selected_features)
    num_features_to_select = selected_features
    # for model_name, model in zip(model_names, models_rf): 
    for label_type in range(2):  # For each label (valence, arousal, dominance, liking)
        X, y = eeg_band, labels_combined[:, label_type]
        selector = SelectKBest(f_classif, k=num_features_to_select)
        X_selected = selector.fit_transform(X, y)
        scaler = StandardScaler()
        X_selected = scaler.fit_transform(X_selected)
        mean_acc = []
        mean_f1 = []
        for i, (train_index, test_index) in enumerate(cv.split(X_selected, y)):
    #         print(f"{model_name} - Label {label_type} - Fold {i+1}")
            start_time = time.perf_counter()

            X_train, X_test = X_selected[train_index], X_selected[test_index]
            y_train, y_test = y[train_index], y[test_index]

            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)

            model.fit(X_train, y_train)
            pred = model.predict(X_test)
            accuracy, precision, recall, f1 = metrics(y_test, pred)
            mean_acc.append(accuracy)
            mean_f1.append(f1)
            results[model_name]['accuracy'].append(accuracy)
            results[model_name]['precision'].append(precision)
            results[model_name]['recall'].append(recall)
            results[model_name]['f1'].append(f1)

            elapsed_time = time.perf_counter() - start_time
            results[model_name]['time'].append(elapsed_time)
#             print(f"Time: {elapsed_time:.2f} seconds")
        if label_type == 0:
            print('Valence')
        else:
            print('Arousal')
        print('mean acc = ', np.mean(mean_acc))
        print('mean f1 = ', np.mean(mean_f1))
        print('mean time = ', np.mean(results[model_name]['time']))
# # Display results
# for model_name in model_names:
#     print(f"{model_name} Results:")
#     for metric in ['accuracy', 'precision', 'recall', 'f1', 'time']:
#         values = results[model_name][metric]
#         print(f"{metric.capitalize()}: Mean = {np.mean(values)* 100:.2f}, Std = {np.std(values):.4f}")


In [29]:
ML(LogisticRegression(), 50)
ML(RandomForestClassifier(), 50)  
ML(XGBClassifier(), 50)
ML(SVC(kernel = 'linear'), 50)
ML(LogisticRegression(), 100)
ML(RandomForestClassifier(), 100)  
ML(XGBClassifier(), 100)
ML(SVC(kernel = 'linear'), 100)
ML(LogisticRegression(), 150)
ML(RandomForestClassifier(), 150)  
ML(XGBClassifier(), 150)
ML(SVC(kernel = 'linear'), 150)

LogisticRegression()
selected_features =  50


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


NameError: name 'model_name' is not defined

**Embedded-based feature selection method**

In [None]:
from sklearn.model_selection import GridSearchCV
# Cross-validation and classification with feature selection
results = {name: {'accuracy': [], 'precision': [], 'recall': [], 'f1': [], 'time': []} for name in model_names}
cv = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)


def ML(model):
    print(model)
    # for model_name, model in zip(model_names, models_rf): 
    for label_type in range(2):  # For each label (valence, arousal, dominance, liking)
        X, y = eeg_band, labels_combined[:, label_type]
        selected_features = []
        for label_index in range(labels_combined.shape[1]):
            y = labels_combined[:, label_index]
            lasso = Lasso()
            params = {'alpha': [0.0001, 0.001, 0.01, 0.1, 1, 10]}
            cv = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)
            lasso_cv = GridSearchCV(lasso, param_grid=params, cv=cv)
            lasso_cv.fit(eeg_band, y)
            best_lasso = lasso_cv.best_estimator_
            lasso_coef = np.abs(best_lasso.coef_)
            selected_features.append(np.where(lasso_coef > 0.000001)[0])
        # Combine selected features for all labels
        selected_features = np.unique(np.concatenate(selected_features))
        print("Selected Feature: {}".format(len(selected_features)))
        X_selected = eeg_band[:, selected_features]
        scaler = StandardScaler()
        X_selected = scaler.fit_transform(X_selected)
        mean_acc = []
        mean_f1 = []
        for i, (train_index, test_index) in enumerate(cv.split(X_selected, y)):
    #         print(f"{model_name} - Label {label_type} - Fold {i+1}")
            start_time = time.perf_counter()

            X_train, X_test = X_selected[train_index], X_selected[test_index]
            y_train, y_test = y[train_index], y[test_index]

            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)

            model.fit(X_train, y_train)
            pred = model.predict(X_test)
            accuracy, precision, recall, f1 = metrics(y_test, pred)
            mean_acc.append(accuracy)
            mean_f1.append(f1)
            results[model_name]['accuracy'].append(accuracy)
            results[model_name]['precision'].append(precision)
            results[model_name]['recall'].append(recall)
            results[model_name]['f1'].append(f1)

            elapsed_time = time.perf_counter() - start_time
            results[model_name]['time'].append(elapsed_time)
#             print(f"Time: {elapsed_time:.2f} seconds")
        if label_type == 0:
            print('Valence')
        else:
            print('Arousal')
        print('mean acc = ', np.mean(mean_acc))
        print('mean f1 = ', np.mean(mean_f1))
        print('mean time = ', np.mean(results[model_name]['time']))
# # Display results
# for model_name in model_names:
#     print(f"{model_name} Results:")
#     for metric in ['accuracy', 'precision', 'recall', 'f1', 'time']:
#         values = results[model_name][metric]
#         print(f"{metric.capitalize()}: Mean = {np.mean(values)* 100:.2f}, Std = {np.std(values):.4f}")


In [None]:
ML(LogisticRegression())
ML(RandomForestClassifier())  
ML(XGBClassifier())
ML(SVC(kernel = 'linear'))

**Wrapper-based feature selection method**

In [16]:
# Cross-validation and classification with feature selection
results = {name: {'accuracy': [], 'precision': [], 'recall': [], 'f1': [], 'time': []} for name in model_names}
cv = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)


def ML(model):
    print(model)
    clf = SVC(kernel='linear')
    # for model_name, model in zip(model_names, models_rf): 
    for label_type in range(2):  # For each label (valence, arousal, dominance, liking)
        X, y = eeg_band, labels_combined[:, label_type]
        scaler = StandardScaler()
        X = scaler.fit_transform(X)
        X = scaler.transform(X)
        mutation_scheduler = ExponentialAdapter(0.8, 0.2, 0.01)
        crossover_scheduler = ExponentialAdapter(0.2, 0.8, 0.01)
        evolved_estimator = GAFeatureSelectionCV(
            estimator=clf,
            scoring="accuracy",
            population_size=30,
            generations=20,
            mutation_probability=mutation_scheduler,
            crossover_probability=crossover_scheduler,
            n_jobs=-1,
            cv=5,
            verbose=True
        )
        
        evolved_estimator.fit(X, y)
        reduced_X = evolved_estimator.transform(X)
        print(f'Number of selected features for label: {reduced_X.shape[1]}')
        scaler = StandardScaler()
        X_selected = scaler.fit_transform(reduced_X)
        mean_acc = []
        mean_f1 = []
        for i, (train_index, test_index) in enumerate(cv.split(X_selected, y)):
    #         print(f"{model_name} - Label {label_type} - Fold {i+1}")
            start_time = time.perf_counter()

            X_train, X_test = X_selected[train_index], X_selected[test_index]
            y_train, y_test = y[train_index], y[test_index]

#             scaler = StandardScaler()
#             X_train = scaler.fit_transform(X_train)
#             X_test = scaler.transform(X_test)

            clf.fit(X_train, y_train)
            pred = clf.predict(X_test)
            accuracy, precision, recall, f1 = metrics(y_test, pred)
            mean_acc.append(accuracy)
            mean_f1.append(f1)
            results['SVM']['accuracy'].append(accuracy)
            results['SVM']['precision'].append(precision)
            results['SVM']['recall'].append(recall)
            results['SVM']['f1'].append(f1)

            elapsed_time = time.perf_counter() - start_time
            results['SVM']['time'].append(elapsed_time)
#             print(f"Time: {elapsed_time:.2f} seconds")
        if label_type == 0:
            print('Valence')
        else:
            print('Arousal')
        print('mean acc = ', np.mean(mean_acc))
        print('mean f1 = ', np.mean(mean_f1))
        print('mean time = ', np.mean(results['SVM']['time']))
# # Display results
# for model_name in model_names:
#     print(f"{model_name} Results:")
#     for metric in ['accuracy', 'precision', 'recall', 'f1', 'time']:
#         values = results[model_name][metric]
#         print(f"{metric.capitalize()}: Mean = {np.mean(values)* 100:.2f}, Std = {np.std(values):.4f}")


In [17]:
# ML(LogisticRegression())
# ML(RandomForestClassifier())  
# ML(XGBClassifier())
ML(SVC(kernel = 'linear'))

SVC(kernel='linear')
gen	nevals	fitness 	fitness_std	fitness_max	fitness_min
0  	30    	0.521439	0.00750765 	0.535227   	0.511364   
1  	60    	0.528523	0.00721897 	0.535227   	0.519318   
2  	60    	0.529621	0.00691804 	0.535227   	0.519318   
3  	60    	0.525871	0.00740344 	0.535227   	0.511364   
4  	60    	0.529962	0.00695867 	0.535227   	0.519318   
5  	60    	0.533409	0.00467247 	0.535227   	0.519318   
6  	60    	0.53072 	0.00702639 	0.535227   	0.514773   
7  	60    	0.533902	0.00399168 	0.535227   	0.520455   
8  	60    	0.53322 	0.00527876 	0.535227   	0.514773   
9  	60    	0.530644	0.00721976 	0.535227   	0.514773   
10 	60    	0.532197	0.00610777 	0.535227   	0.519318   
11 	60    	0.530871	0.00814218 	0.535227   	0.511364   
12 	60    	0.5325  	0.00555619 	0.535227   	0.518182   
13 	60    	0.532311	0.00587317 	0.535227   	0.519318   
14 	60    	0.532689	0.00570211 	0.535227   	0.519318   
15 	60    	0.532273	0.00672666 	0.535227   	0.514773   
16 	60    	0.533447	0.00457