In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shutil
import pywt
from scipy.stats import entropy
from scipy.io import loadmat
from scipy.signal import welch
from sklearn.decomposition import FastICA
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from skimage.metrics import structural_similarity as ssim

In [None]:
# Define the local folder path where you want to save the CSV files
local_folder = '/content/local_data_folder'

# Create the local folder if it doesn't exist
os.makedirs(local_folder, exist_ok=True)

# to clear the contents of the directory uncomment the next 2 lines
# shutil.rmtree(local_folder, ignore_errors=True)
# os.makedirs(local_folder, exist_ok=True)

In [None]:
raw_data_folder = '/content/drive/MyDrive/Data/raw_data'
# filtered_data_folder = '/content/drive/MyDrive/Data/filtered_data'

In [None]:
files = os.listdir(raw_data_folder)

In [None]:
import numpy as np
from scipy.signal import butter, lfilter

def calculate_alpha(eeg_data_signal):

    # Define the alpha band frequency range
    alpha_low = 8.0  # Lower alpha band frequency (Hz)
    alpha_high = 12.0  # Upper alpha band frequency (Hz)

    # Define the sampling frequency (replace with your actual sampling frequency)
    sampling_frequency = 128  # For example, 1000 Hz

    # Design a bandpass filter to extract the alpha band
    def butter_bandpass(lowcut, highcut, fs, order=5):
        nyquist = 0.5 * fs
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = butter(order, [low, high], btype='band')
        return b, a

    def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
        b, a = butter_bandpass(lowcut, highcut, fs, order=order)
        y = lfilter(b, a, data, axis=1)
        return y

    # Apply the alpha bandpass filter to the EEG data
    alpha_band_data = butter_bandpass_filter(eeg_data_signal, alpha_low, alpha_high, sampling_frequency)
    return alpha_band_data

    # Now, alpha_band_data contains the filtered EEG data in the alpha frequency range.


In [None]:
def calculate_beta(eeg_data_signal):
    sampling_rate = 128
    central_frequency = 22

    sigma = 1 / (2 * np.pi * central_frequency)
    scales = np.arange(13, 31)


    morlet_coefficients = []
    for channel in eeg_data:
        coefficients, frequencies = pywt.cwt(channel, scales, 'morl', sampling_period=1/sampling_rate)
        morlet_coefficients.append(coefficients)

    beta_band_power_array = np.abs(np.array(morlet_coefficients)) ** 2 #This shape is 32,18,3200
    collapsed_beta_band_power_array = np.mean(beta_band_power_array, axis = 1)

    return collapsed_beta_band_power_array

In [None]:
# loopnumber=0
test_names = ["Mirror_image_sub", "Stroop_sub", "Arithmetic_sub", "Relax_sub"]


beta_shape = (480, 32, 3200)
beta_aggregate = np.zeros(beta_shape)
alpha_aggregate = np.zeros(beta_shape)

# Loop through each .mat file in the folder
loopnumber = 0

for test_name in test_names:
    for file_name in files:
        if file_name.endswith('.mat')and file_name.startswith(test_name):
            data = loadmat(os.path.join(raw_data_folder, file_name))

            x = data['Data']
            df = pd.DataFrame(x)
        # Save the DataFrame as a CSV file in the local folder
            csv_file_name = os.path.splitext(file_name)[0] + '.csv'
            csv_file_path = os.path.join(local_folder, csv_file_name)
            df.to_csv(csv_file_path, index=False)

            eeg_data = pd.read_csv(csv_file_path)
            eeg_array = eeg_data.values
        # print(eeg_array.size)
            eeg_array = eeg_array.T
            n_components = 4
            independent_components = np.zeros((n_components, eeg_array.shape[0], eeg_array.shape[1]))

            ica = FastICA(n_components=n_components, random_state=0)
            for i in range(eeg_array.shape[0]):
                channel_data = eeg_array[i, :].reshape(-1, 1)
                independent_channel = ica.fit_transform(channel_data)
                independent_components[:, i, :] = independent_channel.T


            ica_data = independent_components[0, :, :]
            ica_data=ica_data.T # ica_data.shape


            rows_indices = [i for i in range(32)]

            stress = ica_data[rows_indices, :]

            eeg_data = stress

            alpha_array = calculate_alpha(eeg_data)
            collapsed_beta_band_power_array = calculate_beta(eeg_data)

            beta_aggregate[loopnumber] = collapsed_beta_band_power_array
            alpha_aggregate[loopnumber] = alpha_array
            loopnumber = loopnumber + 1

In [None]:
np.save('/content/drive/My Drive/32_chnl/raw_data_Beta.npy', beta_aggregate)
np.save('/content/drive/My Drive/32_chnl/raw_data_Alpha.npy', alpha_aggregate)

NameError: ignored

In [None]:
beta_aggregate = np.load('/content/drive/My Drive/32_chnl/raw_data_Beta.npy')
alpha_aggregate = np.load('/content/drive/My Drive/32_chnl/raw_data_Alpha.npy')

In [None]:
# Split the original array into four smaller arrays
taskwise_beta_split = np.split(beta_aggregate, 4, axis=0)

# Each split_array will have shape (120, 32, 3200)
beta_Mirror_task, beta_stroop_task, beta_arthimetic_task, beta_rest_task = taskwise_beta_split
# print(beta_Mirror_task.shape)

In [None]:
# Split the original array into four smaller arrays
taskwise_alpha_split = np.split(alpha_aggregate, 4, axis=0)

# Each split_array will have shape (120, 32, 3200)
alpha_Mirror_task, alpha_stroop_task, alpha_arthimetic_task, alpha_rest_task = taskwise_alpha_split
# print(beta_Mirror_task.shape)

In [None]:
def calculate_entropy(input_array):
    entropy_values = np.zeros((120, 1))
    i = 0
    for data in input_array:
        data_T = data.reshape(3200,)

        num_bins = int(np.sqrt(len(data_T)))  # Adjust the number of bins as needed
        # num_bins = 20  # Adjust the number of bins as needed

        # Calculate the histogram and probabilities
        hist, bin_edges = np.histogram(data_T, bins=num_bins, density=True)
        probabilities = hist / np.sum(hist)

        # Calculate entropy using scipy's entropy function
        data_entropy = entropy(probabilities, base=2)
        # print("data entropy is",data_entropy,"\n")
        entropy_values[i] = data_entropy
        i += 1
    return entropy_values

In [None]:
def calculate_psd(input_array, sampling_rate):
    # Calculate PSD for a 3D array of shape (120, 1, 3200) and returns a 2D array of shape (120, 1) with PSD values.
    # Reshape the array to (120, 3200) by removing the middle dimension
    reshaped_array = input_array[:, 0, :]

    # Initialize an array to store the PSD values
    psd_array = np.zeros((120, 1))

    # Calculate PSD for each signal in the reshaped array
    for i in range(120):
        signal = reshaped_array[i, :]
        f, pxx = welch(signal, fs=sampling_rate, nperseg=256)  # Adjust nperseg as needed
        psd_value = np.mean(pxx)  # You can use np.sum(pxx) for total power instead of mean
        psd_array[i, 0] = psd_value

    return psd_array


In [None]:
import numpy as np

feature_matrix_1 = np.empty((0, 96))

for task in taskwise_beta_split:
    each_task_beta_dimensions = (120, 32, 3200)
    channels = []

    for i in range(each_task_beta_dimensions[1]):
        reshaped_array = task[:, i:i + 1, :]
        channels.append(reshaped_array)


    featurerow = np.mean(channels[0], axis=2)

    for i in range(31):
        channel = channels[i+1]
        mean_array = np.mean(channel, axis=2)
        featurerow = np.hstack((featurerow, mean_array))

    for i in range(32):
        channel = channels[i]
        # Calculate entropy and add to the feature row
        entropy_array = calculate_entropy(channel)
        featurerow = np.hstack((featurerow, entropy_array))

    for i in range(32):
        channel = channels[i]
        # Calculate PSD and add to the feature row
        psd_array = calculate_psd(channel, 128)
        featurerow = np.hstack((featurerow, psd_array))

    # Append the feature row to the feature matrix
    feature_matrix_1 = np.vstack((feature_matrix_1, featurerow))

# Ensure feature_matrix dimensions are correct (number of rows, 96 columns)
print(feature_matrix_1.shape)

(480, 96)


In [None]:
feature_matrix_2 = np.empty((0, 96))
for task in taskwise_alpha_split:
    each_task_alpha_dimensions = (120, 32, 3200)
    channels = []

    for i in range(each_task_alpha_dimensions[1]):
        reshaped_array = task[:, i:i + 1, :]
        channels.append(reshaped_array)


    featurerow = np.mean(channels[0], axis=2)

    for i in range(31):
        channel = channels[i+1]
        mean_array = np.mean(channel, axis=2)
        featurerow = np.hstack((featurerow, mean_array))

    for i in range(32):
        channel = channels[i]
        # Calculate entropy and add to the feature row
        entropy_array = calculate_entropy(channel)
        featurerow = np.hstack((featurerow, entropy_array))

    # print(featurerow.min())

    for i in range(32):
        channel = channels[i]
        # Calculate PSD and add to the feature row
        psd_array = calculate_psd(channel, 128)
        featurerow = np.hstack((featurerow, psd_array))

    # Append the feature row to the feature matrix
    feature_matrix_2 = np.vstack((feature_matrix_2, featurerow))

# Ensure feature_matrix dimensions are correct (number of rows, 96 columns)
print(feature_matrix_2.shape)


(480, 96)


In [None]:
feature_matrix = np.hstack((feature_matrix_1, feature_matrix_2))

In [None]:
feature_matrix.shape

(480, 192)

In [None]:
# Assuming 'feature_matrix' is your NumPy array
df = pd.DataFrame(feature_matrix)
df.to_csv('/content/drive/My Drive/32_chnl/raw_feature_matrix.csv', index=False)

In [None]:
X = pd.read_csv('/content/drive/My Drive/32_chnl/raw_feature_matrix.csv')
y = np.concatenate([np.ones(360), np.zeros(120)])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Import necessary libraries
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, roc_curve, auc
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier


# Assuming X is your feature matrix and y is your target labels
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Logistic Regression
logistic_regression_model = LogisticRegression()
logistic_regression_model.fit(X_train, y_train)
logistic_regression_predictions = logistic_regression_model.predict(X_test)
logistic_regression_accuracy = accuracy_score(y_test, logistic_regression_predictions)
logistic_regression_f1 = f1_score(y_test, logistic_regression_predictions, average='weighted')
logistic_regression_auc_roc = roc_auc_score(y_test, logistic_regression_model.predict_proba(X_test)[:, 1])

# Support Vector Machine (SVM)
svm_model = SVC(kernel='linear')  # You can choose different kernel functions
svm_model.fit(X_train, y_train)
svm_predictions = svm_model.predict(X_test)
svm_accuracy = accuracy_score(y_test, svm_predictions)
svm_f1 = f1_score(y_test, svm_predictions, average='weighted')
svm_auc_roc = roc_auc_score(y_test, svm_model.decision_function(X_test))

# Random Forest
random_forest_model = RandomForestClassifier()
random_forest_model.fit(X_train, y_train)
random_forest_predictions = random_forest_model.predict(X_test)
random_forest_accuracy = accuracy_score(y_test, random_forest_predictions)
random_forest_f1 = f1_score(y_test, random_forest_predictions, average='weighted')
random_forest_auc_roc = roc_auc_score(y_test, random_forest_model.predict_proba(X_test)[:, 1])

# Gradient Boosting
gradient_boosting_model = GradientBoostingClassifier()
gradient_boosting_model.fit(X_train, y_train)
gradient_boosting_predictions = gradient_boosting_model.predict(X_test)
gradient_boosting_accuracy = accuracy_score(y_test, gradient_boosting_predictions)
gradient_boosting_f1 = f1_score(y_test, gradient_boosting_predictions, average='weighted')
gradient_boosting_auc_roc = roc_auc_score(y_test, gradient_boosting_model.predict_proba(X_test)[:, 1])

# K-Nearest Neighbors (K-NN)
knn_model = KNeighborsClassifier(n_neighbors=5)  # You can adjust the number of neighbors
knn_model.fit(X_train, y_train)
knn_predictions = knn_model.predict(X_test)
knn_accuracy = accuracy_score(y_test, knn_predictions)
knn_f1 = f1_score(y_test, knn_predictions, average='weighted')
knn_auc_roc = roc_auc_score(y_test, knn_model.predict_proba(X_test)[:, 1])

In [None]:
print(f"Logistic Regression Accuracy: {logistic_regression_accuracy * 100:.2f}%")
print(f"SVM Accuracy: {svm_accuracy * 100:.2f}%")
print(f"Random Forest Accuracy: {random_forest_accuracy * 100:.2f}%")
print(f"Gradient Boosting Accuracy: {gradient_boosting_accuracy * 100:.2f}%")
print(f"K-NN Accuracy: {knn_accuracy * 100:.2f}%")

Logistic Regression Accuracy: 69.79%
SVM Accuracy: 68.75%
Random Forest Accuracy: 72.92%
Gradient Boosting Accuracy: 70.83%
K-NN Accuracy: 71.88%


In [None]:
print(f"Logistic Regression F1 Score: {logistic_regression_f1 * 100:.2f}%")
print(f"SVM F1 Score: {svm_f1 * 100:.2f}%")
print(f"Random Forest F1 Score: {random_forest_f1 * 100:.2f}%")
print(f"Gradient Boosting F1 Score: {gradient_boosting_f1 * 100:.2f}%")
print(f"knn F1 Score: {knn_f1 * 100:.2f}%")

Logistic Regression F1 Score: 64.09%
SVM F1 Score: 65.30%
Random Forest F1 Score: 63.29%
Gradient Boosting F1 Score: 63.54%
knn F1 Score: 69.13%


In [None]:
print(f"Logistic Regression AUC-ROC Score: {logistic_regression_auc_roc * 100:.2f}%")
print(f"SVM AUC-ROC Score: {svm_auc_roc * 100:.2f}%")
print(f"Random Forest AUC-ROC Score: {random_forest_auc_roc * 100:.2f}%")
print(f"Gradient Boosting AUC-ROC Score: {gradient_boosting_auc_roc * 100:.2f}%")
print(f"K-NN AUC-ROC Score: {knn_auc_roc * 100:.2f}%")

Logistic Regression AUC-ROC Score: 67.91%
SVM AUC-ROC Score: 62.80%
Random Forest AUC-ROC Score: 67.53%
Gradient Boosting AUC-ROC Score: 68.52%
K-NN AUC-ROC Score: 74.53%


In [None]:
channels = ["CZ", "FZ", "Fp1", "F7", "F3", "FC1", "C3", "FC5", "FT9", "T7", "CP5", "CP1", "P3", "P7", "PO9", "O1", "PZ", "OZ", "O2", "PO10", "P8", "P4", "CP2", "CP6", "T8", "FT10", "FC6", "C4", "FC2", "F4", "F8", "Fp2"]

# Create a mapping dictionary from numbers to channel names
channel_mapping = {i+1: channel for i, channel in enumerate(channels)}

# Check the mapping
print(channel_mapping)


{1: 'CZ', 2: 'FZ', 3: 'Fp1', 4: 'F7', 5: 'F3', 6: 'FC1', 7: 'C3', 8: 'FC5', 9: 'FT9', 10: 'T7', 11: 'CP5', 12: 'CP1', 13: 'P3', 14: 'P7', 15: 'PO9', 16: 'O1', 17: 'PZ', 18: 'OZ', 19: 'O2', 20: 'PO10', 21: 'P8', 22: 'P4', 23: 'CP2', 24: 'CP6', 25: 'T8', 26: 'FT10', 27: 'FC6', 28: 'C4', 29: 'FC2', 30: 'F4', 31: 'F8', 32: 'Fp2'}


In [None]:
import numpy as np
from sklearn.feature_selection import mutual_info_classif
from sklearn.preprocessing import StandardScaler

# Assuming you have a feature matrix X of shape (480, 96)
# where each row represents a sample (EEG recording) and each column is a feature

# Define your target variable (stress levels), let's call it y
# Make sure y is of shape (480,)
y = np.concatenate([np.ones(360), np.zeros(120)])

# Scale your feature matrix
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Calculate mutual information scores for each channel
mi_scores = mutual_info_classif(X_scaled, y)

# Create a list of (channel index, channel name, mutual information score) tuples
channel_scores = [(i, channel_mapping[i], score) for i, score in enumerate(mi_scores) if i in channel_mapping]

# Sort the list by mutual information score in descending order
channel_scores.sort(key=lambda x: x[2], reverse=True)
z=1
# Print the channels with their mutual information scores in decreasing order
for i, channel, score in channel_scores:
    print(f"{z}\t Channel {i} - {channel}: Mutual Information Score = {score}")
    z += 1


1	 Channel 27 - FC6: Mutual Information Score = 0.0566038026139597
2	 Channel 30 - F4: Mutual Information Score = 0.0466492246072423
3	 Channel 10 - T7: Mutual Information Score = 0.03740851384866817
4	 Channel 29 - FC2: Mutual Information Score = 0.035322690433829296
5	 Channel 32 - Fp2: Mutual Information Score = 0.030122597233637416
6	 Channel 14 - P7: Mutual Information Score = 0.027592471474403357
7	 Channel 17 - PZ: Mutual Information Score = 0.02707825969120048
8	 Channel 15 - PO9: Mutual Information Score = 0.025442390378667135
9	 Channel 12 - CP1: Mutual Information Score = 0.02527606481843958
10	 Channel 25 - T8: Mutual Information Score = 0.02354290814387472
11	 Channel 16 - O1: Mutual Information Score = 0.020351003724386807
12	 Channel 19 - O2: Mutual Information Score = 0.020090487507449906
13	 Channel 26 - FT10: Mutual Information Score = 0.017653252573319733
14	 Channel 20 - PO10: Mutual Information Score = 0.01684836954059521
15	 Channel 4 - F7: Mutual Information Scor

In [None]:
channels = [27, 30]
for channel in channels:
  selected_columns = [channel+63]
  psd = [0, 0, 0, 0]
  for i in range(4):
    selected_rows = [(i*120) + a for a in range(120)]
    psd[i] = np.mean(feature_matrix[selected_rows][:, selected_columns])

  print("for channel ", channel_mapping[channel])
  print("\tMirror Image Test : ", psd[0])
  print("\tStroop Color Test : ", psd[1])
  print("\tArithmetic Test   : ", psd[2])
  print("\tRelax state       : ", psd[3])

for channel  FC6
	Mirror Image Test :  1.9391661335766255e-05
	Stroop Color Test :  1.5457594697470777e-05
	Arithmetic Test   :  2.358307581634193e-05
	Relax state       :  1.1373670739332619e-05
for channel  F4
	Mirror Image Test :  1.8591146718272506e-05
	Stroop Color Test :  1.786607201798924e-05
	Arithmetic Test   :  1.7909278085325388e-05
	Relax state       :  1.650308811556642e-05
