In [None]:
import pickle
import openpyxl
import numpy as np
import pandas as pd
from pathlib import Path

from scipy.signal import welch
from scipy.stats import skew, kurtosis as kurt, entropy as entr

from sklearn.preprocessing import StandardScaler

Hyperparameters

In [None]:
sf = 128 # sampling frequency = 128 Hz; also used to turn seconds into sample counts and as the rate passed to welch
bins = 10 # number of histogram bins used in entropy calculation

Function to load file

In [None]:
def load_file(file_path):
    """Load one participant's pickled recording and return its EEG data and labels.

    Parameters
    ----------
    file_path : str or pathlib.Path
        Location of the participant's pickle file.

    Returns
    -------
    tuple
        ``(eeg_data, labels)`` on success:

        * ``eeg_data`` -- ``participant_data["data"]`` restricted to the first
          32 channels, with the first ``3 * sf`` samples of every channel
          dropped (the 3-second pre-trial baseline).
        * ``labels`` -- the first two columns of ``participant_data["labels"]``
          (valence, arousal).

        ``(None, None)`` on any failure (missing file, corrupt pickle, missing
        keys, ...). Failure is reported as a pair so that callers can always
        unpack the result into two names and then test them for ``None``.
    """
    file_path = Path(file_path) # ensure it is a path object

    if not file_path.exists():
        print(f"❌ File not found: {file_path}")
        return None, None

    try:
        # NOTE: pickle can execute arbitrary code while loading; only open trusted files.
        with file_path.open("rb") as f:
            participant_data = pickle.load(f, encoding="latin1")

        # extract first 32 channels out of 40 and remove 3-sec pre-trial baseline from each channel
        eeg_data = participant_data["data"][:, :32, 3*sf:] # expected shape (40, 32, 63*sf - 3*sf) = (trials, channels, sample values)
        labels = participant_data["labels"][:, :2] # expected shape (40, 2) = (trials, [valence arousal])

    except (pickle.UnpicklingError, EOFError) as e:
        print(f"❌ Error loading pickle file: {file_path} -> {e}")
        return None, None

    except Exception as e:
        # best-effort loader: report the problem and let the caller skip this file
        print(f"❌ Unexpected error while loading {file_path}: {e}")
        return None, None

    return eeg_data, labels

Prepare labels for target variable

In [None]:
def calculate_focus_level(valence, arousal):
    """Map one (valence, arousal) rating pair to a binary focus label.

    Returns 1 (highly focused) when both ratings are at least 4.5,
    otherwise 0 (distracted).
    """
    threshold = 4.5
    is_focused = valence >= threshold and arousal >= threshold
    return 1 if is_focused else 0

In [None]:
def prepare_labels(labels):
    """Convert rows of (valence, arousal) ratings into an array of focus labels.

    Each row is passed to ``calculate_focus_level``; the results are
    returned as a 1-D NumPy array in the same order as the input rows.
    """
    focus_levels = []
    for valence, arousal in labels:
        focus_levels.append(calculate_focus_level(valence, arousal))

    return np.array(focus_levels)

Functions to extract bandpower and statistical features

In [None]:
# frequency bands in Hz as (low, high); low is inclusive, high is exclusive
bands = {
    "theta": (4, 8),
    "alpha": (8, 12),
    "beta": (12, 30),
    "gamma": (30, 45)
}

def compute_bandpower(eeg_channel_arr, band_name, fs=None, nperseg=256):
    """Return the mean Welch power spectral density of a signal within a band.

    Parameters
    ----------
    eeg_channel_arr : array_like
        Samples of a single channel.
    band_name : str
        Key into ``bands`` ("theta", "alpha", "beta" or "gamma").
    fs : float, optional
        Sampling frequency in Hz. Defaults to the notebook-level ``sf``.
    nperseg : int, optional
        Segment length passed to ``scipy.signal.welch`` (default 256).

    Returns
    -------
    float
        Mean of the PSD values whose frequency lies in ``[low, high)``.

    Raises
    ------
    KeyError
        If ``band_name`` is not a key of ``bands``.
    """
    # unpack into plain names instead of shadowing the builtin `range`
    low, high = bands[band_name]

    if fs is None:
        fs = sf

    # get array of freqs and corresponding power values
    freqs, psd = welch(eeg_channel_arr, fs, nperseg=nperseg)

    # choose freqs within the freq range
    in_band = (freqs >= low) & (freqs < high)

    # calculate mean of all the power values for one channel
    return np.mean(psd[in_band])

In [None]:
def compute_stat_features(eeg_channel_arr):
    """Compute five summary statistics for one channel.

    Returns a tuple ``(mean, std, skewness, kurtosis, entropy)``. The entropy
    is computed from a density histogram of the samples that uses the
    notebook-level ``bins`` setting.
    """
    density, _edges = np.histogram(eeg_channel_arr, bins=bins, density=True)

    return (
        np.mean(eeg_channel_arr),
        np.std(eeg_channel_arr),
        skew(eeg_channel_arr),
        kurt(eeg_channel_arr),
        entr(density),
    )

Function to create feature matrix for 1 participant

In [None]:
def create_feature_matrix(eeg_data):
    """Build the feature matrix for one participant.

    ``eeg_data`` is iterated as trials, and each trial as channels. Every
    channel contributes ten values, in this order: theta, alpha, beta and
    gamma band power, the beta/alpha ratio, then mean, std, skewness,
    kurtosis and entropy. The values of all channels of a trial are laid out
    side by side in one row.

    Returns a NumPy array with one row per trial.
    """
    band_order = ("theta", "alpha", "beta", "gamma")
    rows = []

    for trial in eeg_data:
        row = []

        for channel in trial:
            theta, alpha, beta, gamma = [
                compute_bandpower(channel, band) for band in band_order
            ]
            beta_alpha_ratio = beta / alpha

            # band features first, then the statistical features
            row += [theta, alpha, beta, gamma, beta_alpha_ratio]
            row += list(compute_stat_features(channel))

        rows.append(row)

    return np.array(rows)

Create a single feature matrix for all participants

In [None]:
feature_matrix, focus_label = [], []

folder_path = Path("..")/"eeg_emotion_data" # folder containing .dat file for each participant

# sorted() keeps the row order reproducible; iterdir() order depends on the filesystem
for participant_file in sorted(folder_path.iterdir()):
    loaded = load_file(participant_file)

    # a failed load does not yield a usable (eeg_data, labels) pair,
    # so check before unpacking instead of crashing with a TypeError
    if loaded is None or loaded[0] is None or loaded[1] is None:
        print(f"⚠️ Skipping {participant_file}: Failed to load data")
        continue

    eeg_data, labels = loaded

    focus_label.extend(prepare_labels(labels).tolist())

    feature_matrix.append(create_feature_matrix(eeg_data))

if not feature_matrix:
    raise ValueError(f"No participant data could be loaded from {folder_path}")

# stack the per-participant matrices row-wise into one matrix
feature_matrix = np.concatenate(feature_matrix, axis=0)
focus_label = np.array(focus_label).reshape(-1, 1) # convert labels into a column vector

In [None]:
# expected: (total trials over all loaded participants, 32 channels * 10 features = 320)
feature_matrix.shape

In [None]:
# expected: (total trials, 1) -- one label per row of feature_matrix
focus_label.shape

Standardization

In [None]:
# Standardize each feature column with scikit-learn's StandardScaler.
# NOTE(review): the scaler is fit on every trial at once; if this table is later split
# into train/test sets, fit on the training part only to avoid leakage -- confirm downstream usage.
scalar = StandardScaler()
feature_matrix = scalar.fit_transform(feature_matrix)

# add labels column at the end of feature_matrix
# (done after scaling so the 0/1 labels are left unscaled)
# NOTE: this cell overwrites feature_matrix, so running it a second time would scale the
# label column and append the labels again; re-run the cell that builds feature_matrix first.
feature_matrix = np.hstack((feature_matrix, focus_label))

In [None]:
# one more column than before: the label column appended by hstack
feature_matrix.shape

Create and save the final matrix as a DataFrame

In [None]:
# per-channel feature names, in the order they are produced for each channel
features = ["Theta", "Alpha", "Beta", "Gamma", "BetaAlpha", "Mean", "Std", "Skew", "Kurt", "Entropy"]

# channel-major layout: all ten features of channel 1, then channel 2, ... up to channel 32,
# followed by the label column
column_names = [
    f"{feature}_{channel_no}"
    for channel_no in range(1, 33)
    for feature in features
] + ["Focus_Level"]

df = pd.DataFrame(data=feature_matrix, columns=column_names)
df.to_excel("../1280x321_features_binaryfocus.xlsx", index=False)

In [None]:
# preview the final table that was written to disk
df