Importing all dependencies

In [1]:
import numpy as np
import pickle as pickle
from sklearn.preprocessing import normalize
from sklearn.ensemble import RandomForestRegressor
import os
import xgboost as xgb
from sklearn.metrics import mean_squared_error, accuracy_score, f1_score
from scipy.signal import welch
import os

In [2]:
channel = [1,2,3,4,6,11,13,17,19,20,21,25,29,31] #14 Channels chosen to fit Emotiv Epoch+
band = [4,8,12,16,25,45] #5 bands
window_size = 256 #Averaging band power of 2 sec
step_size = 16 #Each 0.125 sec update once
sample_rate = 128 #Sampling rate of 128 Hz
subjectList = ['01','02','03','04','05','06','07','08','09','10','11','12','13','14','15','16','17','18','19','20','21','22','23','24','25','26','27','28','29','30','31','32']


In [3]:
def band_power(data, band, sample_rate):
    """Calculate band power for a given signal."""
    freqs, psd = welch(data, sample_rate, nperseg=sample_rate*2)
    bandpower = []
    for (low, high) in zip(band[:-1], band[1:]):
        idx_band = np.logical_and(freqs >= low, freqs < high)
        bandpower.append(np.sum(psd[idx_band]))
    return bandpower

def FFT_Processing(sub, channel, band, window_size, step_size, sample_rate):
    '''
    arguments:  string subject
                list channel indices
                list band
                int window size for FFT
                int step size for FFT
                int sample rate for FFT
    return:     void
    '''
    meta = []
    with open(f'data_preprocessed_python/s{sub}.dat', 'rb') as file:
        subject = pickle.load(file, encoding='latin1')  

        for i in range(40): 
            data = subject["data"][i]
            labels = subject["labels"][i]
            start = 0

            while start + window_size < data.shape[1]:
                meta_array = []
                meta_data = []  
                for j in channel:
                    X = data[j][start: start + window_size] 
                    Y = band_power(X, band, sample_rate)  
                    meta_data = meta_data + list(Y)

                meta_array.append(np.array(meta_data))
                meta_array.append(labels)

                meta.append(np.array(meta_array))    
                start += step_size
                
        meta = np.array(meta)

        if not os.path.exists('out'):
            os.makedirs('out')

        np.save(f'out/s{sub}', meta, allow_pickle=True, fix_imports=True)



def testing(M, L, model):
    '''
    arguments:  M: testing dataset
                L: testing dataset label
                model: scikit-learn model

    return:     void
    '''
    output = model.predict(M[0:468480:32])
    label = L[0:468480:32]

    k = 0
    l = 0

    for i in range(len(label)):
        k = k + (output[i] - label[i]) ** 2 

      
        if (output[i] > 5 and label[i] > 5) or (output[i] < 5 and label[i] < 5):
            l += 1

    print("l2 error:", k / len(label), "classification accuracy:", l / len(label), l, len(label))

In [4]:
for subjects in subjectList:
    FFT_Processing(subjects, channel, band, window_size, step_size, sample_rate)



In [4]:
data_training = []
label_training = []
data_testing = []
label_testing = []
data_validation = []
label_validation = []

for subjects in subjectList:
    file_path = os.path.join('out', f's{subjects}.npy')
    
    with open(file_path, 'rb') as file:
        sub = np.load(file, allow_pickle=True)
        for i in range(sub.shape[0]):
            if i % 8 == 0:
                data_testing.append(sub[i][0])
                label_testing.append(sub[i][1])
            elif i % 8 == 1:
                data_validation.append(sub[i][0])
                label_validation.append(sub[i][1])
            else:
                data_training.append(sub[i][0])
                label_training.append(sub[i][1])


np.save(os.path.join('out', 'data_training'), np.array(data_training), allow_pickle=True, fix_imports=True)
np.save(os.path.join('out', 'label_training'), np.array(label_training), allow_pickle=True, fix_imports=True)
print("training dataset:", np.array(data_training).shape, np.array(label_training).shape)

np.save(os.path.join('out', 'data_testing'), np.array(data_testing), allow_pickle=True, fix_imports=True)
np.save(os.path.join('out', 'label_testing'), np.array(label_testing), allow_pickle=True, fix_imports=True)
print("testing dataset:", np.array(data_testing).shape, np.array(label_testing).shape)

np.save(os.path.join('out', 'data_validation'), np.array(data_validation), allow_pickle=True, fix_imports=True)
np.save(os.path.join('out', 'label_validation'), np.array(label_validation), allow_pickle=True, fix_imports=True)
print("validation dataset:", np.array(data_validation).shape, np.array(label_validation).shape)


training dataset: (468480, 70) (468480, 4)
testing dataset: (78080, 70) (78080, 4)
validation dataset: (78080, 70) (78080, 4)


In [5]:
with open('out\data_training.npy', 'rb') as fileTrain:
    X  = np.load(fileTrain)
    
with open('out\label_training.npy', 'rb') as fileTrainL:
    Y  = np.load(fileTrainL)
    
X = normalize(X)
Z = np.ravel(Y[:, [1]])

Arousal_Train = np.ravel(Y[:, [0]])
Valence_Train = np.ravel(Y[:, [1]])
Domain_Train = np.ravel(Y[:, [2]])
Like_Train = np.ravel(Y[:, [3]])



with open('out\data_validation.npy', 'rb') as fileTrain:
    M  = np.load(fileTrain)
    
with open('out\label_validation.npy', 'rb') as fileTrainL:
    N  = np.load(fileTrainL)

M = normalize(M)
L = np.ravel(N[:, [1]])

Arousal_Test = np.ravel(N[:, [0]])
Valence_Test = np.ravel(N[:, [1]])
Domain_Test = np.ravel(N[:, [2]])
Like_Test = np.ravel(N[:, [3]])

  with open('out\data_training.npy', 'rb') as fileTrain:
  with open('out\label_training.npy', 'rb') as fileTrainL:
  with open('out\data_validation.npy', 'rb') as fileTrain:
  with open('out\label_validation.npy', 'rb') as fileTrainL:


Random Forest Regression 

Testing different number of estimators to compare performace

In [8]:
# Function to evaluate model performance
def evaluate_model(n_estimators, X_train, y_train, X_test, y_test):
    model = RandomForestRegressor(n_estimators=n_estimators, n_jobs=6)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    accuracy = sum((predictions > 5) == (y_test > 5)) / len(predictions)
    print(f"n_estimators: {n_estimators} - l2 error: {mse}, classification accuracy: {accuracy}")
    return mse, accuracy

# Define the training and testing datasets
X_train = X[0:468480:32]
y_train = Valence_Train[0:468480:32]
X_test = M[0:78080:32]
y_test = Valence_Test[0:78080:32]

# Test with different numbers of estimators
for n in [512,600, 800, 1000]:
    evaluate_model(n, X_train, y_train, X_test, y_test)


n_estimators: 512 - l2 error: 1.9218373778684024, classification accuracy: 0.8372950819672131
n_estimators: 600 - l2 error: 1.9068313006193074, classification accuracy: 0.8381147540983607
n_estimators: 800 - l2 error: 1.918188828644976, classification accuracy: 0.840983606557377
n_estimators: 1000 - l2 error: 1.9076483796302401, classification accuracy: 0.8385245901639344


With 600 eliminators

Liking

In [26]:
n_estimators = 650
min_samples_leaf = 1  
n_jobs = 6  


Lik_R = RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=min_samples_leaf, n_jobs=n_jobs)
Lik_R.fit(X[0:468480:32], Like_Train[0:468480:32])


def testing(X_test, y_test, model):
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    accuracy = sum((predictions > 5) == (y_test > 5)) / len(predictions)
    std_dev = np.std(predictions)
    f1 = f1_score((y_test > 5), (predictions > 5))
    print(f"MSE: {mse}, Classification Accuracy: {accuracy}")
    print(f"Standard Deviation of Predictions: {std_dev}")
    print(f"F1 Score: {f1}")

testing(M[0:78080:32], Like_Test[0:78080:32], Lik_R)


MSE: 2.505508366623144, Classification Accuracy: 0.865983606557377
Standard Deviation of Predictions: 0.9815188453257646
F1 Score: 0.903397341211226


Dominance

In [25]:
n_estimators = 650
min_samples_leaf = 1  
n_jobs = 6  # Number of parallel jobs for fitting

# Train and evaluate the Dominance model
Dom_R = RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=min_samples_leaf, n_jobs=n_jobs)
Dom_R.fit(X[0:468480:32], Domain_Train[0:468480:32])


def testing(X_test, y_test, model):
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    accuracy = sum((predictions > 5) == (y_test > 5)) / len(predictions)
    std_dev = np.std(predictions)
    f1 = f1_score((y_test > 5), (predictions > 5))
    print(f"MSE: {mse}, Classification Accuracy: {accuracy}")
    print(f"Standard Deviation of Predictions: {std_dev}")
    print(f"F1 Score: {f1}")

testing(M[0:78080:32], Domain_Test[0:78080:32], Dom_R)


MSE: 1.8486444326771718, Classification Accuracy: 0.8385245901639344
Standard Deviation of Predictions: 1.0977129969113386
F1 Score: 0.8697951090548579


Arousal

In [24]:
n_estimators = 650
min_samples_leaf = 1 
n_jobs = 6  # Number of parallel jobs for fitting

# Train and evaluate the Arousal model
Aro_R = RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=min_samples_leaf, n_jobs=n_jobs)
Aro_R.fit(X[0:468480:32], Arousal_Train[0:468480:32])


def testing(X_test, y_test, model):
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    accuracy = sum((predictions > 5) == (y_test > 5)) / len(predictions)
    std_dev = np.std(predictions)
    f1 = f1_score((y_test > 5), (predictions > 5))
    print(f"MSE: {mse}, Classification Accuracy: {accuracy}")
    print(f"Standard Deviation of Predictions: {std_dev}")
    print(f"F1 Score: {f1}")

testing(M[0:78080:32], Arousal_Test[0:78080:32], Aro_R)


MSE: 2.143780738814238, Classification Accuracy: 0.8213114754098361
Standard Deviation of Predictions: 0.925116785044467
F1 Score: 0.8474457662701189


Valence

In [23]:
n_estimators = 650
min_samples_leaf = 1  
n_jobs = 6  # Number of parallel jobs for fitting

# Initialize Random Forest Regressor
Val_R = RandomForestRegressor(n_estimators=n_estimators, min_samples_leaf=min_samples_leaf, n_jobs=n_jobs)

# Fit the regressor to the training data
Val_R.fit(X[0:468480:32], Valence_Train[0:468480:32])

# Evaluate on testing data
predictions = Val_R.predict(M[0:78080:32])
mse = mean_squared_error(Valence_Test[0:78080:32], predictions)
accuracy = sum((predictions > 5) == (Valence_Test[0:78080:32] > 5)) / len(predictions)

print(f"L2 error: {mse}, Classification Accuracy: {accuracy}")

# Calculate standard deviation of predictions
std_dev = np.std(predictions)
print(f"Standard Deviation of Predictions: {std_dev}")

# Calculate F1 score
f1 = f1_score((Valence_Test[0:78080:32] > 5), (predictions > 5))
print(f"F1 Score: {f1}")


L2 error: 1.9132658834413618, Classification Accuracy: 0.8422131147540983
Standard Deviation of Predictions: 0.9253473228831534
F1 Score: 0.8649596632760435


In [37]:
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# Function to calculate MAE and print results
def calculate_mae(model, X_test, y_test, label):
    predictions = model.predict(X_test)
    mae = mean_absolute_error(y_test, predictions)
    print(f"{label} - Validation MAE: {mae}")


calculate_mae(Val_R, M[0:78080:32], Valence_Test[0:78080:32], "Valence")

# Calculate MAE for Dominance
calculate_mae(Dom_R, M[0:78080:32], Domain_Test[0:78080:32], "Dominance")

# Calculate MAE for Arousal
calculate_mae(Aro_R, M[0:78080:32], Arousal_Test[0:78080:32], "Arousal")

# Calculate MAE for Liking
calculate_mae(Lik_R, M[0:78080:32], Like_Test[0:78080:32], "Liking")



Valence - Validation MAE: 1.1088813808322822
Dominance - Validation MAE: 1.0850908890290025
Arousal - Validation MAE: 1.2048426103404783
Liking - Validation MAE: 1.292865510718789


In [7]:
pip install mne

Defaulting to user installation because normal site-packages is not writeable
Collecting mne
  Downloading mne-1.7.0-py3-none-any.whl.metadata (13 kB)
Collecting jinja2 (from mne)
  Downloading jinja2-3.1.4-py3-none-any.whl.metadata (2.6 kB)
Collecting lazy-loader>=0.3 (from mne)
  Downloading lazy_loader-0.4-py3-none-any.whl.metadata (7.6 kB)
Collecting pooch>=1.5 (from mne)
  Downloading pooch-1.8.2-py3-none-any.whl.metadata (10 kB)
Downloading mne-1.7.0-py3-none-any.whl (7.4 MB)
   ---------------------------------------- 0.0/7.4 MB ? eta -:--:--
   ---------------------------------------- 0.0/7.4 MB 1.9 MB/s eta 0:00:04
    --------------------------------------- 0.2/7.4 MB 3.7 MB/s eta 0:00:02
   -- ------------------------------------- 0.5/7.4 MB 5.4 MB/s eta 0:00:02
   ---- ----------------------------------- 0.9/7.4 MB 6.3 MB/s eta 0:00:02
   ----- ---------------------------------- 1.1/7.4 MB 5.7 MB/s eta 0:00:02
   ------ --------------------------------- 1.2/7.4 MB 5.5 MB/s 

In [13]:
import numpy as np

# Load the data for the 10th participant
data = np.load('out/s11.npy', allow_pickle=True)

# Print the entire data
print("Data:")
print(data)

Data:
[[array([7.87144981e+02, 3.77151846e+01, 4.32964068e+01, 7.62226562e+01,
         1.08990425e+02, 4.81212478e+02, 7.64365890e+01, 6.84873556e+01,
         9.49679870e+01, 1.48996786e+02, 1.19034088e+04, 1.28909701e+03,
         1.04585046e+03, 1.11679837e+03, 1.91894730e+03, 5.57375580e+02,
         8.01786133e+01, 6.91122706e+01, 6.60244881e+01, 1.47913336e+02,
         1.47408972e+03, 1.11865534e+02, 1.03658569e+02, 1.12319744e+02,
         1.93100540e+02, 1.86821214e+03, 1.13156530e+02, 1.20627550e+02,
         8.07700412e+01, 2.12460194e+02, 2.61552034e+03, 2.14300810e+02,
         2.07436001e+02, 1.96798426e+02, 3.75748378e+02, 1.05539743e+01,
         4.73313102e+00, 2.18818934e+00, 2.32655896e+01, 3.11143297e+01,
         1.14535920e+03, 1.30416770e+02, 1.17319048e+02, 1.30780650e+02,
         1.83715690e+02, 3.51999124e+03, 2.80068108e+02, 2.36238632e+02,
         2.28704726e+02, 4.02428187e+02, 1.62173991e+03, 1.11358592e+02,
         9.23908411e+01, 9.13638390e+01, 1.70

In [11]:
import numpy as np
import matplotlib.pyplot as plt
import mne
from mne.viz import circular_layout, plot_connectivity_circle

# Load the data from the file
file_path = 'out/s01.npy'
data_dict = np.load(file_path, allow_pickle=True).item()

data = data_dict['data']
labels = data_dict['labels']

# Extract dimensions from the loaded data
num_trials, num_channels, num_samples = data.shape

# Assuming there are 4 labels (valence, arousal, dominance, liking)
num_labels = labels.shape[1]

# Print information about the loaded data
print(f"Number of trials: {num_trials}")
print(f"Number of EEG channels: {num_channels}")
print(f"Number of samples per channel: {num_samples}")
print(f"Number of labels (emotions): {num_labels}")

# Define channel names (assuming generic names since specific names are not provided)
# You can use this list to create your plots if specific channel names are not available
channel_names = [f'Channel {i+1}' for i in range(num_channels)]

# Prepare data for plotting
for label_idx, emotion in enumerate(['valence', 'arousal', 'dominance', 'liking']):
    fig, ax = plt.subplots(figsize=(8, 8))
    
    # Extract the data and labels for the current emotion
    emotion_data = labels[:, label_idx]
    connectivity_matrix = data[:, :, 0]  # Assuming we take the first sample for illustration
    
    # Set up the circular layout
    node_order = np.arange(num_channels)
    node_angles = circular_layout(channel_names, node_order, start_pos=90,
                                  group_boundaries=[0, num_channels // 2])
    
    # Plot the connectivity circle
    plot_connectivity_circle(connectivity_matrix, channel_names, n_lines=None,
                             node_angles=node_angles, node_colors=emotion_data,
                             title=emotion.capitalize(), fig=fig, colorbar=True)

    plt.show()


ImportError: cannot import name 'plot_connectivity_circle' from 'mne.viz' (C:\Users\user\AppData\Roaming\Python\Python312\site-packages\mne\viz\__init__.py)