In [239]:
import math
import os
import pickle
import random
from os.path import exists

import h5py
import mne
import neurokit2 as nk
import numpy as np
import pandas as pd
from scipy import stats
from scipy.signal import welch
from sklearn.preprocessing import MinMaxScaler, StandardScaler


[5,
 19,
 16,
 13,
 18,
 14,
 4,
 3,
 8,
 15,
 17,
 22,
 1,
 21,
 9,
 23,
 20,
 6,
 11,
 12,
 10,
 2,
 7]

In [240]:
# Number of participants; subject ids run from 1 to `participants`.
participants = 23
# NOTE(review): not referenced by any code visible in this part of the notebook;
# the preprocessing call passes its own train_percent. Confirm which value is intended.
train_percentage = 0.7
# Fixed subject order (a permutation of 1..participants) that drives the train/test split.
# No shuffle is performed here: an unseeded random.shuffle() whose result is immediately
# replaced by this literal has no effect on `subjects`, and hard-coding the order keeps
# the split identical across kernel restarts.
subjects = [5, 19, 16, 13, 18, 14, 4, 3, 8, 15, 17, 22, 1, 21, 9, 23, 20, 6, 11, 12, 10, 2, 7]
# Catch typos if the order is ever edited by hand.
assert sorted(subjects) == list(range(1, participants + 1)), "subjects must be a permutation of 1..participants"

In [241]:
# Two-class labelling: 1 for the two highest-anxiety rating ranges, 0 for everything else.
def assign_labels_two_class(filepath):
    """Read valence/arousal ratings from a CSV and return binary labels, each repeated twice.

    A row is labelled 1 when its ratings fall in either of these ranges and 0 otherwise:

    * ``0 < valence <= 2`` and ``7 <= arousal <= 9``
    * ``2 < valence <= 4`` and ``6 <= arousal < 7``

    Both ranges already imply ``valence <= 5`` and ``arousal >= 5``. Rows inside that
    quadrant but outside both ranges (for example valence 1 with arousal 6) are labelled 0.
    NOTE(review): presumably the two ranges are the "severe" and "moderate" levels of a
    four-level scheme and the gaps between them are intentional -- confirm.

    Parameters
    ----------
    filepath : str or path-like
        CSV file containing a ``valence`` and an ``Arousal`` column.

    Returns
    -------
    pandas.DataFrame
        A single int64 column ``labels`` with a fresh 0..2n-1 index, where every input
        row contributes two consecutive identical labels (l0, l0, l1, l1, ...).
        NOTE(review): presumably each CSV row covers two consecutive trials -- confirm.
    """
    ratings = pd.read_csv(filepath)
    valence = ratings['valence']
    arousal = ratings['Arousal']

    # Vectorised form of the per-row range checks; comparisons with NaN are False,
    # so rows with missing ratings end up with label 0.
    low_valence_high_arousal = (valence > 0) & (valence <= 2) & (arousal >= 7) & (arousal <= 9)
    mid_valence_mid_arousal = (valence > 2) & (valence <= 4) & (arousal >= 6) & (arousal < 7)
    binary = (low_valence_high_arousal | mid_valence_mid_arousal).astype('int64').rename('labels')

    # repeat(2) emits each label twice in a row; the index is then renumbered from 0.
    labels = binary.repeat(2).reset_index(drop=True).to_frame()
    return labels

In [242]:
def load_dataset(subject):
    """Load one subject's preprocessed recording from its HDF5-based ``.mat`` file.

    Parameters
    ----------
    subject : int or str
        Value substituted into ``./Preprocessed_data(mat)/S{}preprocessed.mat``.

    Returns
    -------
    numpy.ndarray or None
        The first dataset stored in the file, multiplied by 1e-6, or ``None`` when the
        file does not exist.
        NOTE(review): the 1e-6 factor presumably converts microvolts to volts -- confirm.
    """
    mat_path = './Preprocessed_data(mat)/S{}preprocessed.mat'.format(subject)
    if not exists(mat_path):
        # Callers must be prepared for a missing recording.
        return None

    with h5py.File(mat_path, 'r') as handle:
        # Only the first top-level key of the file is read.
        dataset_name = list(handle.keys())[0]
        scaled = np.array(handle[dataset_name]) * 1e-6
    return scaled

In [243]:
def change_shape(data,i,label):
    """Pair each trial of one subject with its label, with trials laid out channel-major.

    Parameters
    ----------
    data : numpy.ndarray
        3-D array unpacked as ``(n_chunks, time_length, n_channels)``.
    i : int
        1-based subject number; selects rows ``(i - 1) * 12`` to ``(i - 1) * 12 + 11``
        of ``label``.
    label : pandas.DataFrame
        Label table whose first column holds the label of every trial of every subject,
        12 consecutive rows per subject.

    Returns
    -------
    list of (numpy.ndarray, label)
        One ``(n_channels, time_length)`` array and its label per trial.

    Raises
    ------
    IndexError
        If fewer label rows than trials are available for this subject.
    """
    n_chunks, time_length, n_channels = data.shape

    trials_per_subject = 12  # label rows reserved for each subject
    label_start_index = (i - 1) * trials_per_subject
    label_end_index = label_start_index + trials_per_subject
    subject_labels = label.iloc[label_start_index:label_end_index].values

    # The time and channel axes must be swapped with a transpose. reshape() keeps the
    # flat memory order, so it would only relabel the axes and interleave samples from
    # different channels into each output row. ascontiguousarray() makes a compact copy
    # so the result does not alias the caller's array.
    channel_major = np.ascontiguousarray(np.transpose(data, (0, 2, 1)))

    return [(channel_major[j], subject_labels[j][0]) for j in range(n_chunks)]

In [244]:
def filtering_and_channel_selection(
    data,
    sfreq=128,
    channel_name=['F7','F8'],
    all_channels = ['AF3','F7','F3','FC5','T7','P7','O1','O2','P8','T8','FC6','F4','F8','AF4'],
    l_freq = 4.0,
    h_freq = 45.0,
    new_sfreq=250,
):
    """Band-pass filter every trial, keep the requested channels and resample.

    Parameters
    ----------
    data : sequence of (numpy.ndarray, label)
        Trials as ``(n_channels, n_times)`` arrays whose rows follow ``all_channels``.
    sfreq : float
        Sampling frequency of the input trials, in Hz.
    channel_name : list of str
        Channels to keep. The default list is never mutated here.
    all_channels : list of str
        Names of all channels in the input, in row order. Never mutated here.
    l_freq, h_freq : float
        Band-pass edges passed to ``Raw.filter``.
    new_sfreq : float
        Sampling frequency passed to ``Raw.resample``.

    Returns
    -------
    (list, int, int)
        The processed ``(array, label)`` pairs, then the channel count and sample count
        of the last processed trial. For empty ``data`` this is
        ``([], len(channel_name), 0)``.
    """
    # With no trials there is nothing to read the output shape from.
    if len(data) == 0:
        return [], len(channel_name), 0

    # Identical for every trial, so build it once; RawArray copies `info` by default.
    info = mne.create_info(ch_names=all_channels, sfreq=sfreq, ch_types='eeg')

    data_raw = []
    for trial, trial_label in data:
        raw = mne.io.RawArray(trial, info)
        raw = raw.filter(l_freq=l_freq, h_freq=h_freq)
        # NOTE(review): MNE logs pick_channels() as legacy in favour of inst.pick(...).
        # Kept as-is because the two may order the selected channels differently --
        # confirm before switching.
        raw = raw.pick_channels(channel_name)
        resampled_data = raw.resample(new_sfreq).get_data()
        data_raw.append((resampled_data, trial_label))

    channel, time_length = resampled_data.shape
    return data_raw, channel, time_length

In [245]:
def chunks_division(data, split_sec,fs=250):
    """Cut every trial into consecutive, non-overlapping windows of ``split_sec`` seconds.

    Parameters
    ----------
    data : iterable of (array-like, label)
        Trials as 2-D ``(n_channels, n_samples)`` arrays with their label.
    split_sec : int or float
        Window length in seconds; fractional values are supported.
    fs : int or float
        Sampling frequency of the trials, in Hz.

    Returns
    -------
    list of (numpy.ndarray, label)
        One ``(n_channels, window_size)`` array per window, each carrying the label of
        the trial it was cut from. Trailing samples that do not fill a whole window are
        dropped.
    """
    # Slice bounds must be integers; without the coercion a fractional split_sec
    # produces a float window size and the slicing below raises TypeError.
    window_size = int(round(split_sec * fs))

    splitted_chunk = []
    for value, label in data:
        value = np.array(value)
        _, n_samples = value.shape
        n_chunks = n_samples // window_size
        for k in range(n_chunks):
            start = k * window_size
            splitted_chunk.append((value[:, start:start + window_size], label))
    return splitted_chunk

In [246]:
def fft(data, min_range=8,max_range=30,fs=250,time=1):
    """Turn every chunk into normalised FFT-magnitude features of a frequency band.

    Parameters
    ----------
    data : iterable of (numpy.ndarray, label)
        Chunks as 2-D ``(n_channels, n_samples)`` arrays with their label.
    min_range, max_range : float
        Band limits; bins ``int(min_range * time)`` up to (not including)
        ``int(max_range * time)`` of the first half of the spectrum are kept.
        NOTE(review): these bins correspond to Hz only if each chunk holds
        ``fs * time`` samples -- confirm against the caller.
    fs : float
        Sampling frequency in Hz.
    time : float
        Chunk duration in seconds.

    Returns
    -------
    list of (numpy.ndarray, label)
        Per chunk, a 1-D vector with the selected bins of channel 0, then channel 1,
        and so on. Each channel's bins are divided by their sum. A channel whose
        selected magnitudes sum to zero yields zeros rather than NaN.
    """
    time_range = fs * time
    minimum = int((min_range * time_range) / fs)
    maximum = int((max_range * time_range) / fs)

    featuress = []
    for chunk, label in data:
        # Magnitude spectrum of all channels at once; only the first half is kept.
        magnitudes = np.abs(np.fft.fft(chunk, axis=1))
        band = magnitudes[:, :chunk.shape[1] // 2][:, minimum:maximum]

        # Normalise per channel, skipping the division where the sum is zero
        # (0 / 0 would otherwise put NaN into the feature vector).
        totals = band.sum(axis=1, keepdims=True)
        normalised = np.divide(band, totals, out=np.zeros_like(band), where=totals != 0)

        # Row-major flattening equals concatenating the channels in order.
        featuress.append((normalised.reshape(-1), label))
    return featuress


In [247]:
# Other channel names that can be passed as channel_selection: 'AF3','FC5','P7','P8','FC6','AF4'
def preprocessing(indices,original_frequency=128,new_frequency=250,train_percent=0.8,channel_selection=['F7','F8'],window_size_time=1,min_freq_range=0,max_freq_range=30):
    """Run the whole per-subject pipeline and split the result by subject into train and test.

    For every subject, in the order given: load the recording, attach labels, filter /
    select channels / resample, cut into windows and compute FFT features.

    Parameters
    ----------
    indices : iterable of int
        Subject numbers, in split order: the first ``ceil(train_percent * n_subjects)``
        subjects go to the training set and the remaining ones to the test set.
    original_frequency : float
        Sampling frequency of the stored recordings, in Hz.
    new_frequency : float
        Sampling frequency after resampling; also used for windowing and FFT features.
    train_percent : float
        Fraction of subjects assigned to the training set.
    channel_selection : list of str
        Channels to keep. The default list is never mutated here.
    window_size_time : int or float
        Window length in seconds.
    min_freq_range, max_freq_range : float
        Band limits forwarded to ``fft``.

    Returns
    -------
    (list, list, list, list)
        ``fft_class_train, fft_class_test, class_train, class_test`` -- FFT feature
        pairs and windowed signal pairs, each as ``(array, label)`` tuples.

    Raises
    ------
    FileNotFoundError
        If ``load_dataset`` finds no recording for one of the subjects.
    """
    file_path = 'labels.csv'
    labels = assign_labels_two_class(file_path)

    subject_ids = list(indices)
    # The split is sized from the subjects actually passed in, not from a module-level
    # count, so subsets of subjects are split with the requested ratio too.
    train_limit = math.ceil(train_percent * len(subject_ids))

    fft_class_train = []
    fft_class_test = []
    class_train = []
    class_test = []

    for position, subject in enumerate(subject_ids):
        eeg_data = load_dataset(subject)
        if eeg_data is None:
            # Fail with a clear message instead of an AttributeError further down.
            raise FileNotFoundError('no preprocessed recording found for subject {}'.format(subject))

        data = change_shape(eeg_data, subject, label=labels)
        data, _, _ = filtering_and_channel_selection(data, sfreq=original_frequency, channel_name=channel_selection, new_sfreq=new_frequency)
        # Windowing and features must use the resampled rate, not their own defaults.
        data = chunks_division(data, split_sec=window_size_time, fs=new_frequency)
        fft_chunk = fft(data, time=window_size_time, min_range=min_freq_range, max_range=max_freq_range, fs=new_frequency)

        # Zero-based position: exactly `train_limit` subjects land in the training set
        # (a 1-based counter compared with `<` would give one fewer, e.g. leave a test
        # subject even for train_percent=1.0).
        if position < train_limit:
            fft_class_train += fft_chunk
            class_train += data
        else:
            fft_class_test += fft_chunk
            class_test += data

    return fft_class_train, fft_class_test, class_train, class_test

In [248]:
# Run the pipeline over `subjects` in their listed order: 128 Hz input resampled to 250 Hz.
# NOTE(review): train_percent is passed as 0.8 here, while the module-level
# `train_percentage` is 0.7 and is not used by this call -- confirm which is intended.
fft_class_train,fft_class_test,class_train,class_test=preprocessing(subjects,original_frequency=128,new_frequency=250,train_percent=0.8)

Creating RawArray with float64 data, n_channels=14, n_times=1920
    Range : 0 ... 1919 =      0.000 ...    14.992 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 4 - 45 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 4.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 3.00 Hz)
- Upper passband edge: 45.00 Hz
- Upper transition bandwidth: 11.25 Hz (-6 dB cutoff frequency: 50.62 Hz)
- Filter length: 213 samples (1.664 s)

NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Creating RawArray with float64 data, n_channels=14, n_times=1920
    Range : 0 ... 1919 =      0.000 ...    14.992 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 4 - 45 Hz

FIR filter paramet

Creating RawArray with float64 data, n_channels=14, n_times=1920
    Range : 0 ... 1919 =      0.000 ...    14.992 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 4 - 45 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 4.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 3.00 Hz)
- Upper passband edge: 45.00 Hz
- Upper transition bandwidth: 11.25 Hz (-6 dB cutoff frequency: 50.62 Hz)
- Filter length: 213 samples (1.664 s)

NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Creating RawArray with float64 data, n_channels=14, n_times=1920
    Range : 0 ... 1919 =      0.000 ...    14.992 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 4 - 45 Hz

FIR filter paramet

In [249]:
# Write the FFT feature sets to ./samples/ as pickle files.
# NOTE(review): class_train / class_test are not written here -- confirm that is intended.
samples_dir = './samples'
# open(..., 'wb') does not create missing directories, so make sure the target exists
# instead of failing after the preprocessing has already run.
os.makedirs(samples_dir, exist_ok=True)
for file_name, payload in (
    ('fft_class_train.pkl', fft_class_train),
    ('fft_class_test.pkl', fft_class_test),
):
    with open('{}/{}'.format(samples_dir, file_name), 'wb') as f:
        pickle.dump(payload, f)