<a href="https://colab.research.google.com/github/Srinjoy2002/eeg_classify/blob/main/eeg_emotion_analysis1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'seed-iv:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F218098%2F472319%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20241010%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20241010T190506Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D7cf03874ded18db1cfc1e959297ebf6b5d5aae3ddce1a730a6a5d91aa978b5124be71e9477fb5177dce037a37c6c2c3b3d9e04f659d3fe69bda0e5cbb841b8ad0a2707caed5680186cacc9d2afdf2f25ff5b166306c8fa60bd8243f892ec4f9c41800de91e3f77911e762f8e886317149dac6afc3a8d8d1424a19e9a872ca037defee6b38afb4af8c89294dd4cf65df2589d01dc34e03d013618f61d74254b77e2780c59ab1c85d1ed40874096dc437939127a00872e6d21dc6fae36a55c0679b17a83251ccb16cce16c4a73291ec1423273b63db1184500032adfda93173afc291abf0ec76ab90a35eadb7a840bee68465bdb913b1cef3627510d3ae544fcfb'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = int(50 * dl / int(total_length))
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            if filename.endswith('.zip'):
              with ZipFile(tfile) as zfile:
                zfile.extractall(destination_path)
            else:
              with tarfile.open(tfile.name) as tarfile:
                tarfile.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError as e:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError as e:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import numpy as np
import tensorflow as tf

import scipy
import os
from os import listdir
from scipy.io import loadmat
import pickle
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from tensorflow.keras import regularizers
from tensorflow.keras import activations

In [None]:
import os
import numpy as np
from scipy.io import loadmat

# Correct paths for Kaggle dataset
directories = ["/kaggle/input/seed-iv/eeg_feature_smooth/{}".format(i+1) for i in range(3)]
print(directories)

# EEG channel coordinates
channel_coords = [
    ['0', '0', 'AF3', 'FP1', 'FPZ', 'FP2', 'AF4', '0', '0'],
    ['F7', 'F5', 'F3', 'F1', 'FZ', 'F2', 'F4', 'F6', 'F8'],
    ['FT7', 'FC5', 'FC3', 'FC1', 'FCZ', 'FC2', 'FC4', 'FC6', 'FT8'],
    ['T7', 'C5', 'C3', 'C1', 'CZ', 'C2', 'C4', 'C6', 'T8'],
    ['TP7', 'CP5', 'CP3', 'CP1', 'CPZ', 'CP2', 'CP4', 'CP6', 'TP8'],
    ['P7', 'P5', 'P3', 'P1', 'PZ', 'P2', 'P4', 'P6', 'P8'],
    ['0', 'PO7', 'PO5', 'PO3', 'POZ', 'PO4', 'PO6', 'PO8', '0'],
    ['0', '0', 'CB1', 'O1', 'OZ', 'O2', 'CB2', '0', '0']
]

channel_list = [
    'FP1', 'FPZ', 'FP2', 'AF3', 'AF4', 'F7', 'F5', 'F3', 'F1', 'FZ',
    'F2', 'F4', 'F6', 'F8', 'FT7', 'FC5', 'FC3', 'FC1', 'FCZ', 'FC2',
    'FC4', 'FC6', 'FT8', 'T7', 'C5', 'C3', 'C1', 'CZ', 'C2', 'C4',
    'C6', 'T8', 'TP7', 'CP5', 'CP3', 'CP1', 'CPZ', 'CP2', 'CP4',
    'CP6', 'TP8', 'P7', 'P5', 'P3', 'P1', 'PZ', 'P2', 'P4', 'P6',
    'P8', 'PO7', 'PO5', 'PO3', 'POZ', 'PO4', 'PO6', 'PO8', 'CB1',
    'O1', 'OZ', 'O2', 'CB2'
]

# Verifying the length of channel coordinates
print(len(channel_coords), len(channel_coords[0]))

# Creating a dictionary to map channel names to coordinates
coord_dict = {}
for n in range(len(channel_list)):
    for i, l in enumerate(channel_coords):
        for j, x in enumerate(l):
            if channel_list[n] == x:
                coord_dict[n] = (i, j)
print(coord_dict)

# Initialize the array with the correct shape
n = 24
perSample = ['de_movingAve', 'de_LDS', 'psd_movingAve', 'psd_LDS']
array = np.zeros(shape=(len(directories), len(os.listdir(directories[0])), n, 4, 8, 9, 5, 64))

# Iterate over directories to load and process data
for h, dire in enumerate(directories):
    print(dire)
    data = [loadmat(os.path.join(dire, file)) for file in os.listdir(dire)]
    for i, bigsample in enumerate(data):
        for j in range(n):
            for k, key in enumerate(perSample):
                # Transpose and pad the sample
                sample = np.transpose(np.array(bigsample[key + str(j+1)]), (0, 2, 1))
                sample = np.pad(sample, [(0, 0), (0, 0), (0, 64 - sample.shape[2])])
                # Fill the array based on channel locations
                for l, channel in enumerate(sample):
                    array[h][i][j][k][coord_dict[l][0]][coord_dict[l][1]] = channel

# Output the shape of the final array
print(array.shape)


In [None]:
_X = array.reshape(np.prod(array.shape[0:3]), *array.shape[3:])
print(_X.shape)
X_loso = array[:,:,:,1,:,:,]
X_loso = np.transpose(X_loso, (0,1,2,6,3,4,5))
print(X_loso.shape)

In [None]:
session1_label = [1,2,3,0,2,0,0,1,0,1,2,1,1,1,2,3,2,2,3,3,0,3,0,3]
session2_label = [2,1,3,0,0,2,0,2,3,3,2,3,2,0,1,1,2,1,0,3,0,1,3,1]
session3_label = [1,2,2,1,3,3,3,1,1,2,1,0,2,3,3,0,2,3,0,0,2,0,1,0]
labels = {0: 'neutral', 1: 'sad', 2: 'fear', 3: 'happy'}

y = np.array(session1_label * 15 + session2_label * 15 + session3_label * 15)

print(y.shape)
y_loso = np.reshape(y, (3,15,24))
print(y_loso.shape)

In [None]:
def crossval_loso(generate_model, n_epochs, X, y):
    cvscores = []
    for i in range(15):
        a = [x for x in range(15) if x != i]
        print(a)
        X_train = X[:,a,:,:,:,:,:]
        X_test =  X[:,[i],:,:,:,:,:]
        X_train = X_train.reshape(np.prod(X_train.shape[0:3]), *X_train.shape[3:])
        X_test = X_test.reshape(np.prod(X_test.shape[0:3]), *X_test.shape[3:])
        y_train = y[:,a,:]
        y_test = y[:, [i], :]
        y_train = y_train.flatten()
        y_test = y_test.flatten()
        print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
        model = generate_model()
        model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
        print('------------------------------------------------------------------------')
        print(f'Training for fold {i} ...')
        model.fit(X_train, y_train, epochs=n_epochs, verbose=1) # validation_split=0.2)
        scores = model.evaluate(X_test, y_test, verbose=2)
        print("Score for fold %d - %s: %.6f%%" % (i, model.metrics_names[1], scores[1]*100))
        cvscores.append(scores[1])
    print('------------------------------------------------------------------------')
    print("Avg accuracies: %.6f%% (+/- %.6f%%)" % (np.mean(cvscores), np.std(cvscores)))

In [None]:
def crossval(generate_model, n_epochs, X_train, y_train, X_test, y_test, filename = None):
    kfold = StratifiedKFold(n_splits=4, shuffle=True, random_state=0)
    bestmodel = None
    bestAcc = 0
    cvscores = []
    fold = 1
    for train, test in kfold.split(X_train, y_train):
        model = generate_model()
        model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
        print('------------------------------------------------------------------------')
        print(f'Training for fold {fold} ...')
        model.fit(X_train[train], y_train[train],epochs=n_epochs, verbose=1) # validation_split=0.2)
        scores = model.evaluate(X_train[test], y_train[test], verbose=1)
        print("Score for fold %d - %s: %.2f%%" % (fold, model.metrics_names[1], scores[1]*100))
        if(scores[1] > bestAcc):
            bestAcc = scores[1]
            bestmodel = model
        cvscores.append(scores[1] * 100)
        fold += 1
    print('------------------------------------------------------------------------')
    print("Avg accuracies: %.2f%% (+/- %.2f%%)" % (np.mean(cvscores), np.std(cvscores)))
    test_loss, test_acc = bestmodel.evaluate(X_test,  y_test, verbose=2)
    print('\nTest accuracy:', test_acc)
    if filename:
        pickle.dump( bestmodel, open( filename, "wb" ) )

In [None]:
def fully_conv():
    return tf.keras.Sequential([
    tf.keras.layers.SeparableConv1D(
128, 3, data_format='channels_last', padding="same", activation='relu',input_shape=X.shape[1:]),
    tf.keras.layers.MaxPooling1D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.SeparableConv1D(
128, 3, data_format='channels_last', padding="same", activation='relu'),
    tf.keras.layers.MaxPooling1D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.SeparableConv1D(
128, 3, data_format='channels_last', padding="same", activation='relu'),
    tf.keras.layers.MaxPooling1D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.SeparableConv1D(
128, 3, data_format='channels_last', padding="same", activation='relu'),
    tf.keras.layers.MaxPooling1D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10)
])

In [None]:
def single_conv():
    return tf.keras.Sequential([
    tf.keras.layers.Conv1D(
32, 3, activation='relu',input_shape=X.shape[1:]),
    tf.keras.layers.MaxPooling1D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(32),
    tf.keras.layers.Dense(4)
])
def LSTM():
    return tf.keras.Sequential([
    tf.keras.layers.LSTM(100),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10)
    ])
def dense():
    return tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=X.shape[1:]),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(32),
    tf.keras.layers.Dense(4)
])

In [None]:
def twodConv():
    return tf.keras.Sequential([
    tf.keras.layers.Conv2D(
32, 3, padding="same", activation="relu",input_shape=X.shape[1:]),
    tf.keras.layers.MaxPooling2D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.BatchNormalization(center=True, scale=True),
    tf.keras.layers.Conv2D(
64, 3, padding="same", activation="relu",input_shape=X.shape[1:]),
    tf.keras.layers.MaxPooling2D(
    pool_size=2, strides=2, data_format="channels_last"),
    tf.keras.layers.BatchNormalization(center=True, scale=True),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10)
])
def threedConv():
    model = tf.keras.Sequential()

    model.add(tf.keras.layers.Conv3D(32, kernel_size=5, padding="same", input_shape=X.shape[1:]))
    model.add(tf.keras.layers.BatchNormalization(trainable=False))
    model.add(tf.keras.layers.Activation(activations.relu))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.MaxPooling3D(pool_size=(2, 2, 2)))

    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(256))
    model.add(tf.keras.layers.BatchNormalization(trainable=False))
    model.add(tf.keras.layers.Activation(activations.relu))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.Dense(10))
    return model
def convlstm2d():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.ConvLSTM2D(16, kernel_size=5, padding="same", return_sequences = False, input_shape=X.shape[1:]))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(256, activation="relu"))
    model.add(tf.keras.layers.Dense(10))
    return model

In [None]:
X = _X.transpose(0, 5, 1,2,3,4)
print(X.shape)
X = X.reshape(X.shape[0], X.shape[1], np.prod(X.shape[2:]))
print(X.shape)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

In [None]:
# crossval(dense, 40, X_train, y_train, X_test, y_test)

In [None]:
# crossval(single_conv, 40, X_train, y_train, X_test, y_test)

In [None]:
X = _X.transpose(0, 5, 2,3, 4, 1)[:,:,:,:,:,:]

X = X.reshape(X.shape[0], X.shape[1], X.shape[2], X.shape[3], np.prod(X.shape[4:]))
print(X.shape)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

In [None]:
# crossval(threedConv, 100, X_train, y_train, X_test, y_test)

In [None]:
# crossval(threedConv, 100, X_train, y_train, X_test, y_test)

In [None]:
X = _X.transpose(0, 5, 2,3, 4, 1)[:,:,:,:,:,1]

X = X.reshape(X.shape[0], X.shape[1], X.shape[2], X.shape[3], np.prod(X.shape[4:]))
print(X.shape)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

In [None]:
crossval(convlstm2d, 50, X_train, y_train, X_test, y_test)

In [None]:
crossval_loso(convlstm2d, 50, X_loso, y_loso)

In [None]:
# crossval_loso(threedConv, 100, X_loso, y_loso)