In [3]:

import numpy as np
import pyeeg as pe
import pickle as pickle
import pandas as pd
import math

from sklearn import svm
from sklearn.preprocessing import normalize
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import AdaBoostRegressor

import os
import tensorflow as tf
import time

In [4]:
channel = [1,2,3,4,6,11,13,17,19,20,21,25,29,31] #14 Channels chosen to fit Emotiv Epoch+
band = [4,8,12,16,25,45] #5 bands
window_size = 256 #Averaging band power of 2 sec
step_size = 16 #Each 0.125 sec update once
sample_rate = 128 #Sampling rate of 128 Hz
subjectList = ['01','02','03','04','05','06','07','08','09','10','11','12','13','14','15','16','17','18','19','20','21','22','23','24','25','26','27','28','29','30','31','32']
#List of subjects

In [5]:
def bin_power(X, Band, Fs):
    """Compute power in each frequency bin specified by Band from FFT result of
    X. By default, X is a real signal.

    Note
    -----
    A real signal can be synthesized, thus not real.

    Parameters
    -----------

    Band
        list

        boundary frequencies (in Hz) of bins. They can be unequal bins, e.g.
        [0.5,4,7,12,30] which are delta, theta, alpha and beta respectively.
        You can also use range() function of Python to generate equal bins and
        pass the generated list to this function.

        Each element of Band is a physical frequency and shall not exceed the
        Nyquist frequency, i.e., half of sampling frequency.

     X
        list

        a 1-D real time series.

    Fs
        integer

        the sampling rate in physical frequency

    Returns
    -------

    Power
        list

        spectral power in each frequency bin.

    Power_ratio
        list

        spectral power in each frequency bin normalized by total power in ALL
        frequency bins.

    """

    C = np.fft.fft(X)
    C = abs(C)
    Power = np.zeros(len(Band) - 1)
    for Freq_Index in range(0, len(Band) - 1):
        Freq = float(Band[Freq_Index])
        Next_Freq = float(Band[Freq_Index + 1])
        Power[Freq_Index] = sum(
            C[int(np.floor(Freq / Fs * len(X))): 
                int(np.floor(Next_Freq / Fs * len(X)))]
        )
    Power_Ratio = Power / sum(Power)
    return Power, Power_Ratio


In [6]:
def FFT_Processing (sub, channel, band, window_size, step_size, sample_rate):
    '''
    arguments:  string subject
                list channel indice
                list band
                int window size for FFT
                int step size for FFT
                int sample rate for FFT
    return:     void
    '''
    meta = []
    with open('dataset\s' + sub + '.dat', 'rb') as file:

        subject = pickle.load(file, encoding='latin1') #resolve the python 2 data problem by encoding : latin1

        for i in range (0,40):
            # loop over 0-39 trails
            data = subject["data"][i]
            labels = subject["labels"][i]
            start = 0;

            while start + window_size < data.shape[1]:
                meta_array = []
                meta_data = [] #meta vector for analysis
                for j in channel:
                    X = data[j][start : start + window_size] #Slice raw data over 2 sec, at interval of 0.125 sec
                    Y = bin_power(X, band, sample_rate) #FFT over 2 sec of channel j, in seq of theta, alpha, low beta, high beta, gamma
                    meta_data = meta_data + list(Y[0])

                meta_array.append(np.array(meta_data))
                meta_array.append(labels)

                meta.append(np.array(meta_array))    
                start = start + step_size
                
        meta = np.array(meta)
        np.save('out\s' + sub, meta, allow_pickle=True, fix_imports=True)

def testing (M, L, model):
    '''
    arguments:  M: testing dataset
                L: testing dataset label
                model: scikit-learn model

    return:     void
    '''
    output = model.predict(M[0:78080:32])
    label = L[0:78080:32]

    k = 0
    l = 0

    for i in range(len(label)):
        k = k + (output[i] - label[i])*(output[i] - label[i]) #square difference 

        #a good guess
        if (output[i] > 5 and label[i] > 5):
            l = l + 1
        elif (output[i] < 5 and label[i] <5):
            l = l + 1

    print ("l2 error:", k/len(label), "classification accuracy:", l / len(label),l, len(label))

In [7]:
for subjects in subjectList:
    FFT_Processing (subjects, channel, band, window_size, step_size, sample_rate)

  meta.append(np.array(meta_array))


In [9]:
#for subjects in subjectList:
data_training = []
label_training = []
data_testing = []
label_testing = []
data_validation = []
label_validation = []

for subjects in subjectList:

    with open('out\s' + subjects + '.npy', 'rb') as file:
        sub = np.load(file,allow_pickle=True)
        for i in range (0,sub.shape[0]):
            if i % 8 == 0:
                data_testing.append(sub[i][0])
                label_testing.append(sub[i][1])
            elif i % 8 == 1:
                data_validation.append(sub[i][0])
                label_validation.append(sub[i][1])
            else:
                data_training.append(sub[i][0])
                label_training.append(sub[i][1])

np.save('out\data_training', np.array(data_training), allow_pickle=True, fix_imports=True)
np.save('out\label_training', np.array(label_training), allow_pickle=True, fix_imports=True)
print("training dataset:", np.array(data_training).shape, np.array(label_training).shape)

np.save('out\data_testing', np.array(data_testing), allow_pickle=True, fix_imports=True)
np.save('out\label_testing', np.array(label_testing), allow_pickle=True, fix_imports=True)
print("testing dataset:", np.array(data_testing).shape, np.array(label_testing).shape)

np.save('out\data_validation', np.array(data_validation), allow_pickle=True, fix_imports=True)
np.save('out\label_validation', np.array(label_validation), allow_pickle=True, fix_imports=True)
print("validation dataset:", np.array(data_validation).shape, np.array(label_validation).shape)

training dataset: (468480, 70) (468480, 4)
testing dataset: (78080, 70) (78080, 4)
validation dataset: (78080, 70) (78080, 4)


In [10]:
with open('out\data_training.npy', 'rb') as fileTrain:
    X  = np.load(fileTrain)
    
with open('out\label_training.npy', 'rb') as fileTrainL:
    Y  = np.load(fileTrainL)
    
X = normalize(X)
Z = np.ravel(Y[:, [1]])

Arousal_Train = np.ravel(Y[:, [0]])
Valence_Train = np.ravel(Y[:, [1]])
Domain_Train = np.ravel(Y[:, [2]])
Like_Train = np.ravel(Y[:, [3]])



with open('out\data_validation.npy', 'rb') as fileTrain:
    M  = np.load(fileTrain)
    
with open('out\label_validation.npy', 'rb') as fileTrainL:
    N  = np.load(fileTrainL)

M = normalize(M)
L = np.ravel(N[:, [1]])

Arousal_Test = np.ravel(N[:, [0]])
Valence_Test = np.ravel(N[:, [1]])
Domain_Test = np.ravel(N[:, [2]])
Like_Test = np.ravel(N[:, [3]])

In [11]:
clf = svm.SVR()
clf.fit(X[0:468480:32], Z[0:468480:32])  


In [12]:
Val_R = RandomForestRegressor(n_estimators=512, n_jobs=6)
Val_R.fit(X[0:468480:32], Valence_Train[0:468480:32])
testing (M, Valence_Test, Val_R)

l2 error: 1.8834456735387388 classification accuracy: 0.8319672131147541 2030 2440


In [13]:
Aro_R = RandomForestRegressor(n_estimators=512, n_jobs=6)
Aro_R.fit(X[0:468480:32], Arousal_Train[0:468480:32])
testing (M, Arousal_Test, Aro_R)


l2 error: 2.082245762870198 classification accuracy: 0.8319672131147541 2030 2440


In [14]:
Dom_R = RandomForestRegressor(n_estimators=512, n_jobs=6)
Dom_R.fit(X[0:468480:32], Domain_Train[0:468480:32])
testing (M, Domain_Test, Dom_R)

l2 error: 1.8270069410769632 classification accuracy: 0.8200819672131148 2001 2440


In [15]:
Lik_R = RandomForestRegressor(n_estimators=512, n_jobs=6)
Lik_R.fit(X[0:468480:32], Like_Train[0:468480:32])
testing (M, Like_Test, Lik_R)

l2 error: 2.485309634352356 classification accuracy: 0.8508196721311475 2076 2440


In [16]:
clf = AdaBoostRegressor(n_estimators=5000, learning_rate=0.01)
clf.fit(X[0:468480:32], Z[0:468480:32])

In [17]:
output = Val_R.predict(M[0:78080:32])
label = L[0:78080:32]

k = 0
l = 0

for i in range(len(label)):
    k = k + (output[i] - label[i])*(output[i] - label[i]) #square difference 
    
    #a good guess
    if (output[i] > 5 and label[i] > 5):
        l = l + 1
    elif (output[i] < 5 and label[i] <5):
        l = l + 1

print ("l2 error:", k/len(label), "classification accuracy:", l / len(label),l, len(label))

l2 error: 1.8834456735387388 classification accuracy: 0.8319672131147541 2030 2440
