In [5]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import warnings
# Silence every Python warning from here on: no category or module filter is
# given, so this also hides deprecation notices raised by the libraries above.
warnings.filterwarnings("ignore")

In [8]:
# Define the path to the PyDreamer dataset folder.
# The PYDREAMER_DIR environment variable, when set, overrides the default
# location so the notebook is not tied to one machine's directory layout.
dataset_folder = os.environ.get(
    'PYDREAMER_DIR',
    'D:\\Tutorial\\Amir Kair University\\Article_Masters\\1_SelfCode\\Python\\EEG_ACTNN-main\\PyDreamer',
)

# Row i of every stimuli_eeg<N>.txt file is stored under ELECTRODES[i].
ELECTRODES = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
PERSON_PREFIX = 'person'   # subject folders are named person<ID>
N_CLIPS = 18               # clips are numbered 1..N_CLIPS

# Debug limits: only the first person / first clip is loaded for now.
# Set either one to None to load everything.
MAX_PERSONS = 1
MAX_CLIPS_PER_PERSON = 1


def _person_id(folder_name):
    """Return the integer ID encoded in a 'person<ID>' name, or None if the name does not match."""
    suffix = folder_name[len(PERSON_PREFIX):]
    if folder_name.startswith(PERSON_PREFIX) and suffix.isdecimal():
        return int(suffix)
    return None


def _read_text(path):
    """Return the whole content of a text file with surrounding whitespace removed."""
    with open(path, 'r') as f:
        return f.read().strip()


def _read_first_line_fields(path):
    """Return the tab-separated fields (as strings) of the first line of a file."""
    with open(path, 'r') as f:
        return f.readline().strip().split('\t')


def _read_eeg_rows(path):
    """Return one list of floats per line of an EEG file, dropping the leading electrode name."""
    with open(path, 'r') as f:
        return [list(map(float, row.strip().split('\t')[1:])) for row in f]


# Keep only real subject folders (stray files such as notes are skipped) and
# visit them in numeric ID order, so the debug limit always picks the same person.
person_folders = sorted(
    (
        name
        for name in os.listdir(dataset_folder)
        if _person_id(name) is not None
        and os.path.isdir(os.path.join(dataset_folder, name))
    ),
    key=_person_id,
)[:MAX_PERSONS]

clip_ids = list(range(1, N_CLIPS + 1))[:MAX_CLIPS_PER_PERSON]

# Initialize an empty list to store stimuli data (one dict per person/clip)
stimuli_data = []

for person_folder in person_folders:
    person_path = os.path.join(dataset_folder, person_folder)

    # Read age, gender, and person ID data
    age = int(_read_text(os.path.join(person_path, 'age.txt')))
    gender = _read_text(os.path.join(person_path, 'gender.txt'))
    person_id = _person_id(person_folder)

    # The score files hold one value per clip on their first line, so read
    # each of them once per person instead of once per clip.
    scores = {
        label: _read_first_line_fields(os.path.join(person_path, f'{label.lower()}.txt'))
        for label in ('Valence', 'Arousal', 'Dominance')
    }

    for clip_id in clip_ids:
        # Read stimuli EEG data
        eeg_data = _read_eeg_rows(os.path.join(person_path, f'stimuli_eeg{clip_id}.txt'))

        # Metadata first, then the emotion scores, then one column per electrode
        clip_data = {
            'Person_ID': person_id,
            'Clip_ID': clip_id,
            'Age': age,
            'Gender': gender,
        }
        for label, fields in scores.items():
            clip_data[label] = float(fields[clip_id - 1])

        # Each electrode column holds the full list of samples for this clip
        for row_index, electrode in enumerate(ELECTRODES):
            clip_data[electrode] = eeg_data[row_index]

        stimuli_data.append(clip_data)


# Create a DataFrame for stimuli data
stimuli_df = pd.DataFrame(stimuli_data)


In [11]:
# Separate the frame into model inputs (everything except metadata and
# scores, i.e. the per-electrode columns) and the three emotion targets.
metadata_columns = ['Person_ID', 'Clip_ID', 'Age', 'Gender']
label_columns = ['Valence', 'Arousal', 'Dominance']

x_df = stimuli_df.drop(columns=metadata_columns + label_columns)
y_df = stimuli_df[label_columns]

In [49]:
# Number of samples stored in the first electrode column of the first row
len(x_df.iloc[0, 0])

25472

In [64]:
# Stack the per-electrode sample lists of the first row into one array of
# shape (n_electrodes, n_samples). The shape is taken from the data itself
# rather than hard-coded, so rows with a different length work as well.
first_clip = np.array(list(x_df.values[0]), dtype=np.float64)

# Build the float64 tensor directly from the float64 array (no intermediate
# float32 copy) and prepend the batch and CNN-channel axes:
# (1, 1, n_electrodes, n_samples).
x_t = torch.tensor(first_clip, dtype=torch.float64)[None, None]

x_t

tensor([[[[4388.2051, 4375.8975, 4378.4614,  ..., 4371.2822, 4371.7949,
           4386.6665],
          [4102.5640, 4093.8462, 4091.2820,  ..., 4092.3076, 4089.7437,
           4098.9741],
          [4219.4873, 4252.8203, 4230.2563,  ..., 4237.4360, 4214.3589,
           4175.8975],
          ...,
          [4454.3589, 4466.6665, 4461.0259,  ..., 4475.3848, 4466.1538,
           4451.7949],
          [4326.1538, 4372.8203, 4328.2051,  ..., 4377.9487, 4351.7949,
           4302.5640],
          [4165.1284, 4247.1797, 4203.5898,  ..., 4230.2563, 4206.6665,
           4133.3335]]]], dtype=torch.float64)

In [65]:
# Display the concrete dtype behind the `torch.float` alias.
torch.float

torch.float32

In [66]:
import torch
from utils import generate_TS_channel_order
from networks import TSception

# Channel names in the order the EEG rows are stored in x_t (same order as
# the electrode columns of the stimuli DataFrame).
original_order = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6','F4', 'F8', 'AF4']

TS_order = generate_TS_channel_order(original_order)   # generate proper channel orders for the asymmetric spatial layer in TSception

# The model expects float32 input: (batch_size, cnn_channel, EEG_channel, data_points)
data = x_t.float()

# Reorder the EEG-channel axis to the order TSception expects. Only the
# channels listed in TS_order are kept, so the channel count afterwards is
# len(TS_order).
idx = [original_order.index(chan) for chan in TS_order]
data = data[:, :, idx, :]

TS = TSception(
    num_classes=3,
    input_size=tuple(data.shape[1:]),   # (cnn_channel, EEG_channel, data_points), taken from the reordered data
    sampling_rate=128,
    num_T=15,   # num_T controls the number of temporal filters
    num_S=15,   # num_S controls the size of hidden embedding due to the global average pooling. Please increase it if you need a larger model capacity, e.g., subject-independent case
    hidden=32,
    dropout_rate=0.5
)

# Forward pass through the freshly constructed (untrained) model
preds = TS(data)

In [67]:
# Display the tensor returned by the TSception forward pass (`preds = TS(data)`).
preds

tensor([[-0.1571,  0.2670, -0.0390]], grad_fn=<AddmmBackward0>)

In [None]:
# NOTE(review): PrepareData, CrossValidation and seed_all are presumably
# provided by these wildcard imports -- confirm and import them explicitly.
from cross_validation import *
from prepare_data_DEAP import *
import argparse


def _parse_shape(text):
    """Convert a string such as '1,28,512' or '(1, 28, 512)' into a tuple of ints.

    argparse calls this only for values given on the command line; the tuple
    default is used unchanged. `type=tuple` would instead split the string
    into single characters.
    """
    if isinstance(text, tuple):
        return text
    try:
        return tuple(int(part) for part in text.strip().strip('()[]').split(',') if part.strip())
    except ValueError:
        raise argparse.ArgumentTypeError(f'expected comma-separated integers, got {text!r}')


def _str2bool(text):
    """Convert 'true'/'false'-style strings into a bool.

    `type=bool` treats every non-empty string, including 'False', as True.
    """
    if isinstance(text, bool):
        return text
    lowered = text.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y'):
        return True
    if lowered in ('0', 'false', 'f', 'no', 'n'):
        return False
    raise argparse.ArgumentTypeError(f'expected a boolean value, got {text!r}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    ######## Data ########
    parser.add_argument('--dataset', type=str, default='DEAP')
    parser.add_argument('--data-path', type=str, default='/home/dingyi/data/deap/')
    parser.add_argument('--subjects', type=int, default=32)
    parser.add_argument('--num-class', type=int, default=2, choices=[2, 3, 4])
    parser.add_argument('--label-type', type=str, default='A', choices=['A', 'V'])
    parser.add_argument('--segment', type=int, default=4, help='segment length in seconds')
    parser.add_argument('--trial-duration', type=int, default=60, help='trial duration in seconds')
    parser.add_argument('--overlap', type=float, default=0)
    parser.add_argument('--sampling-rate', type=int, default=128)
    parser.add_argument('--input-shape', type=_parse_shape, default=(1, 28, 512))
    parser.add_argument('--data-format', type=str, default='raw')
    ######## Training Process ########
    parser.add_argument('--random-seed', type=int, default=2021)
    parser.add_argument('--max-epoch', type=int, default=500)
    parser.add_argument('--batch-size', type=int, default=64)
    parser.add_argument('--learning-rate', type=float, default=1e-3)
    parser.add_argument('--dropout', type=float, default=0.5)

    parser.add_argument('--save-path', default='./save/')
    parser.add_argument('--load-path', default='./save/max-acc.pth')
    parser.add_argument('--gpu', default='0')
    parser.add_argument('--save-model', type=_str2bool, default=True)
    ######## Model Parameters ########
    parser.add_argument('--model', type=str, default='TSception')
    parser.add_argument('--T', type=int, default=15)
    parser.add_argument('--graph-type', type=str, default='TS', choices=['TS', 'O'],
                        help='TS for the channel order of TSception, O for the original channel order')
    parser.add_argument('--hidden', type=int, default=32)

    ######## Reproduce the result using the saved model ######
    parser.add_argument('--reproduce', action='store_true')

    # parse_known_args() tolerates arguments this parser does not define.
    # Inside a Jupyter kernel sys.argv carries the kernel's own options,
    # which would make parse_args() abort the cell.
    args, unknown_args = parser.parse_known_args()
    if unknown_args:
        print(f'Ignoring unrecognized arguments: {unknown_args}')

    sub_to_run = np.arange(args.subjects)

    # Not named `pd`: that would rebind the pandas alias for the rest of the notebook.
    data_preparer = PrepareData(args)
    data_preparer.run(sub_to_run, split=True, feature=False, expand=True)

    cv = CrossValidation(args)
    seed_all(args.random_seed)
    cv.n_fold_CV(subject=sub_to_run, fold=10, reproduce=args.reproduce)  # To do leave one trial out please set fold=40
