In [1]:
from utils import *
import pandas as pd
import mne
from glob import glob
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from sklearn.preprocessing import normalize, StandardScaler
plt.style.use('seaborn-whitegrid')

# PSS Score

In [3]:
def get_threshold(scores):
    scores = np.array(scores)
    lower_threshold = scores.mean() - (scores.std()/2)
    upper_threshold = scores.mean() + (scores.std()/2)
    return scores.mean(), (lower_threshold,upper_threshold)

def get_stress_type(score, grade):
    """ Non-stress (0): score < lower_threshold
        Neutral    (1): lower_threshold <= score <= upper_threshold
        Stress     (2): score > lower_threshold """
    if(score < grade[0]):
        return 0
    elif(score <= grade[1]):
        return 1
    elif(score > grade[1]):
        return 2

def PSS_printer(PSS):
    # peak at info
    temp = PSS.popitem()
    PSS[temp[0]] = temp[1]
    column = list(temp[1].keys())
    space = "\t\t"
    print(f"Name{space}",f"{space}".join(column),sep="" )
    print("="*60)
    for name, info in PSS.items():
        print(f"{name}{space}",sep="",end="")
        for col in column:
            print(f"{info[col]}{space}",end="")

        print()

def get_PSS():
    try:
        PSS = load('PSS')
        type_count = load('type_count')
    except:
        TYPE_DEF = {0:'Non-Stress', 1:'Neutral', 2: 'Stress'}
        PSS = dict()
        scores = []
        with open('./PSS_scores.csv','r') as f:
            f.readline() # skip header
            for line in f.readlines(): 
                name,score = line.split(',')
                PSS[name] = {'score':int(score)}
                scores.append(int(score))

        mean, grade = get_threshold(scores)
        print(f"Total={len(PSS)} | Mean={mean} | Lower Thres={grade[0]} | Higher Thres={grade[1]}")

        type_count = {0:0, 1:0, 2:0}
        for name, dict_info in PSS.items():
            label = get_stress_type(dict_info['score'], grade)
            dict_info['type'] = label
            dict_info['type_definition'] = TYPE_DEF[label]
            type_count[label] = type_count[label] + 1

        # print(f"Non Stress={type_count[0]} | Neutral={type_count[1]} | Stress={type_count[2]}")

        # PSS_printer(PSS)

        sampling_rate = 125 #Hz
        files = glob(f"data/*.csv")
        for index, f in enumerate(files):
            name = f.split('/')[1].split('__')[0]
            pd_raw = pd.read_csv(f, dtype={'Marker':str})
            pd_raw = pd_raw.drop(columns='timestamps')
            raw = dataframe_to_raw(pd_raw, sfreq=sampling_rate)
            PSS[name]['raw'] = raw
            print(f"{index} {name} | time: {len(pd_raw)/125}")

        save(PSS, 'PSS')
        save(type_count, 'type_count')
    finally:
        return PSS, type_count

PSS, type_count = get_PSS()

In [4]:
def get_freq(PSS):
    # peak at info
    temp = PSS.popitem()
    PSS[temp[0]] = temp[1]
    raw = temp[1]['raw']
    power,freq = mne.time_frequency.psd_welch(raw,n_fft=125, verbose=True)
    return freq


for name, info in PSS.items():
    raw = info['raw']
    raw.filter(l_freq=1,h_freq=None, method='iir', iir_params={'order':3.0, 'ftype':'butter'}, verbose=False) # Slow drift
    raw.notch_filter(freqs=[50])
    # epochs = mne.Epochs(raw, np.array([[125*60*1, 0, 1]]), tmin=0, tmax=30, baseline=(0,30), verbose=False)
    # print(name)

    # a = epochs.plot_psd(picks=['F3','F4','T3','T4'])
    # print("="*40)


freq = get_freq(PSS)
print(freq)


band_names = np.array(['Delta', 'Theta', 'Alpha', 'Beta', 'Gamma', 'Slow', 'Low_beta'])
filter_list = [[1,3],[4,7],[8,12],[13,30],[30,43], [4,13], [13,17]]
bands = []
for filt in filter_list:
    pt = np.argwhere((freq >= filt[0]) & (freq <= filt[1])).reshape(-1)
    bands.append(pt)
bands = np.array(bands)
print(bands)

Effective window size : 1.000 (s)
[ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12. 13. 14. 15. 16. 17.
 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35.
 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53.
 54. 55. 56. 57. 58. 59. 60. 61. 62.]
[array([1, 2, 3]) array([4, 5, 6, 7]) array([ 8,  9, 10, 11, 12])
 array([13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
        30])
 array([30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43])
 array([ 4,  5,  6,  7,  8,  9, 10, 11, 12, 13])
 array([13, 14, 15, 16, 17])]


  bands = np.array(bands)


In [5]:
def get_markers():
    sampling_rate = 125 #Hz
    step_minutes = np.arange(0,5,0.25)
    print(f"{step_minutes=}")
    step_minutes = np.expand_dims(step_minutes * sampling_rate * 60, axis=1)
    markers = np.concatenate( [step_minutes, np.zeros( step_minutes.shape ), np.ones( step_minutes.shape ) ], axis=1  ).astype(np.int64)
    return markers
markers = get_markers()
# markers

step_minutes=array([0.  , 0.25, 0.5 , 0.75, 1.  , 1.25, 1.5 , 1.75, 2.  , 2.25, 2.5 ,
       2.75, 3.  , 3.25, 3.5 , 3.75, 4.  , 4.25, 4.5 , 4.75])


In [6]:
# features = None
for name,info in PSS.items():
    raw = info['raw']
    epochs = mne.Epochs(raw, markers, tmin=0, tmax=15, baseline=(0,15), verbose=False)
    feature_epochs = None
    for evoked in epochs.iter_evoked():
        feature = None
        slow, gamma = None, None
        a_f3, a_f4 = None, None
        a_t7, a_t8 = None, None
        b_f3, b_f4 = None, None
        b_t7, b_t8 = None, None
        for index, band in enumerate(bands):
            power,freq = mne.time_frequency.psd_welch(evoked,n_fft=125, verbose=False)
            power = power.squeeze()
            power = 10 * np.log10(power)
            data = power[::,band].mean(axis=1).reshape(1,-1)
            # for asym
            if(band_names[index] == 'Alpha'):
                a_f3 = data[:,raw.ch_names.index('F3')]
                a_f4 = data[:,raw.ch_names.index('F4')]
                # We use t3 as t7 and t4 as t8
                a_t7 = data[:,raw.ch_names.index('T3')]
                a_t8 = data[:,raw.ch_names.index('T4')]
            if(band_names[index] == 'Beta'):
                b_f3 = data[:,raw.ch_names.index('F3')]
                b_f4 = data[:,raw.ch_names.index('F4')]
                # We use t3 as t7 and t4 as t8
                b_t7 = data[:,raw.ch_names.index('T3')]
                b_t8 = data[:,raw.ch_names.index('T4')]

            ####### Mean for visualization #######
            data = data.mean().reshape(1,-1)
            # for relative gamma
            if(band_names[index] == 'Slow'): slow = data
            if(band_names[index] == 'Gamma'): gamma = data

            if(type(feature) == type(None)): feature = data
            else: feature = np.concatenate([feature, data], axis=1)

        # the eighth feature: relative gamma is slow/gamma
        relative_gamma = slow/gamma
        feature = np.concatenate([feature, relative_gamma], axis=1)
        # The asymetry
        alpha_frontal = ((a_f4 - a_f3) / (a_f4 + a_f3)).reshape(1,-1)
        feature = np.concatenate([feature, alpha_frontal], axis=1)
        # alpha_temporal
        alpha_temporal = ((a_t8 - a_t7) / (a_t8 + a_t7)).reshape(1,-1)
        feature = np.concatenate([feature, alpha_temporal], axis=1)
        # alpha_asymmetry
        alpha_asymmetry = alpha_frontal + alpha_temporal
        feature = np.concatenate([feature, alpha_asymmetry], axis=1)
        # beta_frontal
        beta_frontal = ((b_f4 - b_f3) / (b_f4 + b_f3)).reshape(1,-1)
        feature = np.concatenate([feature, beta_frontal], axis=1)
        # beta_temporal
        beta_temporal = ((b_t8 - b_t7) / (b_t8 + b_t7)).reshape(1,-1)
        feature = np.concatenate([feature, beta_temporal], axis=1)

        if(type(feature_epochs) == type(None)): feature_epochs = feature
        else: feature_epochs = np.concatenate( [feature_epochs, feature], axis=0 )
    info['feature'] = feature_epochs
print(f"{feature_epochs.shape=}")


feature_epochs.shape=(20, 13)


In [7]:
# plt.plot(feature)
feature_names = list(band_names)
feature_names.append('Relative_Gamma')
feature_names.append('Alpha_Frontal')
feature_names.append('Alpha_Temporal')
feature_names.append('Alpha_Asymmetry')
feature_names.append('Beta_Frontal')
feature_names.append('Beta_Temporal')
feature_names = np.array(feature_names)
feature_names[[3,10]]
X_ori,y_ori = [], []
filtered_participants = []
filtered_scored = []
non_stress_count, stress_count = 0,0
for index,(name,info) in enumerate(PSS.items()):
    for feature in info['feature']:
        # Neutral
        # if(info['type'] == 1): continue
        if(info['type'] == 1 or (info['score'] >= 17 and info['score'] <= 23)): continue
        # Non-Stress
        elif(info['type'] == 0):
            non_stress_count = non_stress_count + 1
            # print(name, info['score'])
            y_ori.append(0)
        # Stress
        elif(info['type'] == 2):
            stress_count = stress_count + 1
            y_ori.append(1)
        X_ori.append(feature)
        filtered_participants.append(name)
        filtered_scored.append(info['score'])
print(f"{non_stress_count=} {stress_count=}")
print(f"{np.array(X_ori).shape=}")
print(f"{np.array(y_ori).shape=}")

non_stress_count=260 stress_count=400
np.array(X_ori).shape=(660, 13)
np.array(y_ori).shape=(660,)


In [8]:
def check_float(data):
    if(data.dtype == np.int64): data = data.astype(np.float64)
    return data

def NormJa(data):
    data = check_float(data)
    for index, row in enumerate(data):
        min = row.min()
        max = row.max()
        # mean = row.mean()
        row = (row.astype(np.float64) - min) / float(max - min)
        data[index] = row
        # print(row)
    return data

def StandardJa(data):
    data = check_float(data)
    for index, row in enumerate(data):
        mean = row.mean()
        std = row.std()
        row = (row - mean) / std
        data[index] = row
        # print(row)
    return data

In [9]:
print(feature_names[8:12])
print(feature_names[9:12])
print(feature_names[10:12])
print(feature_names[10:11])

['Alpha_Frontal' 'Alpha_Temporal' 'Alpha_Asymmetry' 'Beta_Frontal']
['Alpha_Temporal' 'Alpha_Asymmetry' 'Beta_Frontal']
['Alpha_Asymmetry' 'Beta_Frontal']
['Alpha_Asymmetry']


In [10]:
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import cross_val_score
from sklearn.utils import shuffle
X,y = np.array(X_ori)[:,:], np.array(y_ori)
# X = normalize(X.copy(), axis=0)
# X = NormJa(X.copy().T).T
X = StandardJa(X.copy().T).T
X_shuff,y_shuff = shuffle(X,y)
print(X.shape, y.shape)

param_grid = dict(kernel=['linear','poly','rbf', 'sigmoid'])#,'precomputed'])
# cv = StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=42)
grid = GridSearchCV(SVC(), param_grid=param_grid, cv=5)
grid.fit(X, y)
print(f"The best parameters are {grid.best_params_} with a score of {grid.best_score_:.2f}")

model = SVC(kernel='rbf')
# model = GaussianNB()
model.fit(X_shuff, y_shuff)
ans = model.predict(X_shuff)
acc = sum(ans == y_shuff) / len(y_shuff)
cross = cross_val_score(model, X_shuff, y_shuff, cv=5)
print(acc, cross.mean(), cross)
print(ans)
# 0.8181818181818182 0.6666666666666666
# 0.7878787878787878 0.7285714285714285
# 0.7878787878787878 0.6714285714285715
# 0.8181818181818182 0.6666666666666666

(660, 13) (660,)
The best parameters are {'kernel': 'rbf'} with a score of 0.64
0.9090909090909091 0.8833333333333334 [0.83333333 0.90151515 0.89393939 0.87878788 0.90909091]
[0 1 0 1 1 1 0 1 0 1 0 1 1 1 0 1 1 0 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 0 1 0
 0 1 0 1 1 1 1 1 0 0 1 0 1 1 1 0 1 0 0 0 1 0 1 0 1 1 0 1 1 0 1 1 1 1 1 1 0
 0 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 0 0 1
 1 1 0 1 1 0 1 1 1 1 1 1 1 1 1 0 0 1 0 1 0 1 1 0 1 1 1 1 0 0 1 0 0 0 1 1 1
 1 1 1 1 1 1 0 1 1 0 1 1 1 1 1 0 0 0 0 0 0 1 1 1 0 1 0 1 1 0 1 1 1 1 1 0 0
 0 1 1 1 0 1 1 1 0 1 1 1 1 0 1 1 1 1 1 1 0 0 1 1 0 1 1 1 1 1 0 1 1 0 1 0 1
 1 1 0 1 0 0 1 1 1 1 0 1 0 1 1 0 0 1 0 0 1 1 1 1 1 0 1 0 1 0 1 1 0 1 0 1 1
 1 1 0 1 0 1 1 1 0 1 0 0 1 1 1 1 1 0 1 0 1 1 1 1 0 0 1 1 1 1 1 0 1 1 1 1 0
 0 1 1 1 1 1 0 1 1 0 1 1 1 0 1 0 1 1 1 1 1 1 1 1 1 1 1 0 1 1 0 0 1 1 1 1 1
 1 1 1 1 1 1 0 1 0 0 0 1 1 1 0 1 1 0 1 0 1 0 1 1 0 1 1 0 1 0 1 1 0 1 1 0 1
 1 1 0 0 0 0 1 0 0 1 0 1 1 0 0 0 0 1 1 0 1 1 0 1 1 1 1 1 0 1 0 0 0 1 1 0 1


In [11]:
ans = model.predict(X)
acc = sum(ans == y) / len(y)
print(f"0: Non-Stress {type_count[0]}")
print(f"1: Stress {type_count[2]}")
print(f"Wrong\t|pred|label |Score |Name")
print("="*40)
for index,(i,j) in enumerate(zip(ans,y)):
    wrong = ""
    if(i != j):
        wrong = "X"
    print(f"{wrong}\t|{i}   |{j}     |{filtered_scored[index]}\t   |{filtered_participants[index]}")

0: Non-Stress 16
1: Stress 20
Wrong	|pred|label |Score |Name
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|0   |0     |15	   |amp
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |28	   |aui
	|1   |1     |