# 資料預處理

In [1]:
import os
os.chdir("C:\\Users\\zxpay\\Desktop\\BME_Senior\\EEG_CLF\\Emotion\\Code")
import sys
from Get_file import Get_file   # return dirpath, dirnames, filenames
from FFT import FFT, PlotDataOnTimeDomain
from BandPassFilter import BandPassFilter, PlotDataOnFreqDomain, BandPassListFilter
import pyedflib
import numpy as np
import datetime

import matplotlib.pyplot as plt
from sklearn.decomposition import FastICA, PCA

%matplotlib inline

# Directory that Get_file() scans for per-recording sub-folders containing the EDF files
data_path = "C:\\Users\\zxpay\\Desktop\\BME_Senior\\EEG_CLF\\Emotion\\database"  # EEG data directory
# Output directory for figures (not referenced in the cells shown here)
BandPassICAFigureSavingDirection = "C:\\Users\\zxpay\\Desktop\\BME_Senior\\EEG_CLF\\Emotion\\Code\\Fig"

# Setting parameters
# Passed as `fs` to BandPassFiveSeconds; presumably samples per second -- confirm against the recordings
SampleFrequency = 128
# When True, the cells print the shape of every loaded / processed array
DEBUG = True
# NOTE(review): not used in the cells shown here; presumably the number of signals per EDF file (40)
LabelSize = 40
# Lower / upper cut-off values handed to the band-pass filter
LowerFreqCut = 7.9
HigherFreqCut = 30.1
# NOTE(review): not used in the cells shown here; presumably the filter order -- confirm
BandPassOrder = 3
channel_we_use = [2,3,4,5,6,7,8,9,10,11,12,13,14,15]   # signal indices 0~39 (40 in total); the 14 channels we use
# NOTE(review): not used in the cells shown here; presumably toggles saving figures to disk
SAVE_FIG = False


### 讀取檔案

In [2]:
# Four labels: one list per emotion class, one entry per loaded recording
normal_label = []   # 0
scare_label = []    # 1
touch_label = []    # 2
laugh_label = []    # 3

# Four signals: one list per emotion class, each entry is an
# (n_signals, n_samples) array read from one EDF file
normal_signal = []
scare_signal = []
touch_signal = []
laugh_signal = []

signal = []
labels = []

# Signal labels of the recordings --> 40 channels
# ['COUNTER' 'INTERPOLATED' 'AF3' 'F7' 'F3' 'FC5' 'T7' 'P7' 'O1' 'O2' 'P8'
#  'T8' 'FC6' 'F4' 'F8' 'AF4' 'RAW_CQ' 'GYROX' 'GYROY' 'MARKER'
#  'MARKER_HARDWARE' 'SYNC' 'TIME_STAMP_s' 'TIME_STAMP_ms' 'CQ_AF3' 'CQ_F7'
#  'CQ_F3' 'CQ_FC5' 'CQ_T7' 'CQ_P7' 'CQ_O1' 'CQ_O2' 'CQ_P8' 'CQ_T8' 'CQ_FC6'
#  'CQ_F4' 'CQ_F8' 'CQ_AF4' 'CQ_CMS' 'CQ_DRL']

_, data_files, _ = Get_file(data_path)
if not data_files:
    # Raising a plain string is itself a TypeError; raise a real exception instead.
    raise FileNotFoundError("Not found any files ! check your data_path")

print('Get data from database ...')
for data in data_files:
    data_dir = os.path.join(data_path, data)
    _, _, file_names = Get_file(data_dir)
    for fn in file_names:
        # splitext copes with names that have no dot or several dots
        if os.path.splitext(fn)[1] != '.edf':
            continue

        # The context manager closes the EDF file handle even if a read fails.
        with pyedflib.EdfReader(os.path.join(data_dir, fn)) as fedf:
            n = fedf.signals_in_file
            signal_labels = np.asarray(fedf.getSignalLabels())
            # Buffer is sized from the sample count of the first signal
            sigbufs = np.zeros((n, fedf.getNSamples()[0]))
            for i in np.arange(n):
                sigbufs[i, :] = fedf.readSignal(i)

        # The emotion class is encoded in the file name.
        if 'normal' in fn:
            normal_label.append(0)
            normal_signal.append(sigbufs)
        elif 'scare' in fn:
            scare_label.append(1)
            scare_signal.append(sigbufs)
        elif 'touch' in fn:
            touch_label.append(2)
            touch_signal.append(sigbufs)
        elif 'laugh' in fn:
            laugh_label.append(3)
            laugh_signal.append(sigbufs)
        else:
            # Make dropped recordings visible instead of discarding them silently.
            print('Warning: no emotion keyword in file name, skipped :', fn)

        if DEBUG:
            print(sigbufs.shape)

print('Okay')
if DEBUG:
    print('normal signal length : ', len(normal_signal))
    print('scare signal length : ', len(scare_signal))
    print('touch signal length : ', len(touch_signal))
    print('laugh signal length : ', len(laugh_signal))


Get data from database ...
(40, 38400)
(40, 105344)
(40, 38400)
(40, 105344)
(40, 179328)
(40, 309248)
(40, 38400)
(40, 179328)
(40, 309760)
(40, 38400)
(40, 105728)
(40, 158976)
(40, 58112)
(40, 50944)
(40, 33152)
(40, 3968)
(40, 38400)
(40, 105344)
(40, 178944)
(40, 308992)
(40, 38400)
(40, 105728)
(40, 179840)
(40, 237312)
(40, 38400)
(40, 105600)
(40, 179584)
(40, 38400)
(40, 178944)
(40, 308864)
(40, 38400)
(40, 105344)
(40, 179072)
(40, 83328)
(40, 117632)
(40, 107392)
(40, 38400)
(40, 308992)
(40, 38400)
(40, 179072)
(40, 105344)
(40, 308736)
(40, 38400)
(40, 313472)
(40, 38400)
(40, 105344)
(40, 38400)
(40, 105344)
(40, 38528)
(40, 179200)
(40, 308736)
(40, 38400)
(40, 179328)
(40, 38400)
(40, 105344)
Okay
normal signal length :  17
scare signal length :  11
touch signal length :  10
laugh signal length :  17


In [3]:
def CheckListTheSameSize(ListData):
    """Tell whether all arrays in ListData share one shape.

    Each element is compared with its predecessor through its ``.shape``
    attribute.

    Returns:
        True when every neighbouring pair has equal shapes (a single-element
        list counts as True); False otherwise, and also False for an empty
        list.
    """
    neighbours = zip(ListData[:-1], ListData[1:])
    matching_pairs = sum(prev.shape == cur.shape for prev, cur in neighbours)
    # For an empty list this compares 0 with -1 and therefore yields False.
    return matching_pairs == len(ListData) - 1
def CalculateZeros(NumpyArray):
    """Return how many elements of NumpyArray compare equal to 0."""
    return np.sum(NumpyArray == 0)

### Analysis steps: Filter ==> ICA ==> FFT ==> Build model

In [18]:
# Filter: every recording is cut into windows of 5 * fs samples
# Return Data Shape : (DataNumbers, Channels, Features)
def BandPassFiveSeconds(SignalListData, LowerFreqCut, HigherFreq, fs, channels=None):
    """Band-pass filter each recording window by window and average the windows.

    Every recording is split into consecutive, non-overlapping windows of
    ``5 * fs`` samples (trailing samples that do not fill a window are
    dropped). Each window of each selected channel is passed through
    ``BandPassFilter`` and the filtered windows are averaged sample-wise, so
    one recording contributes one ``(channels, 5 * fs)`` slice to the result.

    Args:
        SignalListData: sequence of 2-D arrays indexed as [channel][sample].
        LowerFreqCut: lower cut-off handed to ``BandPassFilter``.
        HigherFreq: upper cut-off handed to ``BandPassFilter``.
        fs: value used to size the windows (``5 * fs`` samples) and handed to
            ``BandPassFilter``; must be an integer so it can size the arrays.
        channels: indices of the channels to keep. Defaults to indices 2..15,
            the 14 channels used in this notebook.

    Returns:
        float32 array of shape (len(SignalListData), len(channels), 5 * fs).
        Rows whose windows could not be filtered stay all-zero; a warning is
        printed whenever the result contains zeros.
    """
    if channels is None:
        # signal indices 0~39 (40 in total); the 14 channels we use
        channels = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
    SamplesPerFiveSeconds = 5 * fs
    SignalData = np.zeros([len(SignalListData), len(channels), SamplesPerFiveSeconds], dtype=np.float32)
    for i, Recording in enumerate(SignalListData):
        LabelNum = Recording.shape[0]
        FeaturesNum = Recording.shape[1]
        TimesToSplit = FeaturesNum // SamplesPerFiveSeconds
        print('Features Numbers : ', FeaturesNum)
        print('Times to Split Numbers :', TimesToSplit)
        # Walk the recording's channels in ascending order and keep the selected ones.
        UsedChannels = [ch for ch in range(LabelNum) if ch in channels]
        for ChCNT, ch in enumerate(UsedChannels):
            ChannelBuffer = np.zeros([TimesToSplit, SamplesPerFiveSeconds], dtype=np.float32)
            FilteredWindows = 0
            for t in range(TimesToSplit):
                # Derive the offset from t so a failed window cannot shift the later ones.
                Start = t * SamplesPerFiveSeconds
                Window = Recording[ch][Start:Start + SamplesPerFiveSeconds]
                try:
                    ChannelBuffer[FilteredWindows][:] = BandPassFilter(Window, LowerFreqCut, HigherFreq, fs)
                except Exception as e:
                    # Best effort: report the failure and leave this window out of the mean.
                    print('Warning: band-pass failed (recording %d, channel %d, window %d): %r'
                          % (i, ch, t, e))
                    continue
                FilteredWindows += 1
            if FilteredWindows == 0:
                # Nothing to average (recording shorter than one window, or every
                # window failed); keep the zero row instead of producing NaN.
                print('Warning: no filtered window (recording %d, channel %d)' % (i, ch))
                continue
            SignalData[i][ChCNT][:] = np.mean(ChannelBuffer[:FilteredWindows], axis=0)
    print()
    if CalculateZeros(SignalData) != 0:
        print("Warning: Have Zero Values in the Numpy array !!!")
    return SignalData

# Run the windowed band-pass step once per emotion class; each call returns
# one array holding every recording of that class.
NormalSignalArray = BandPassFiveSeconds(normal_signal, LowerFreqCut, HigherFreqCut, SampleFrequency)
ScareSignalArray = BandPassFiveSeconds(scare_signal, LowerFreqCut, HigherFreqCut, SampleFrequency)
TouchSignalArray = BandPassFiveSeconds(touch_signal, LowerFreqCut, HigherFreqCut, SampleFrequency)
LaughSignalArray = BandPassFiveSeconds(laugh_signal, LowerFreqCut, HigherFreqCut, SampleFrequency)

# Shapes are (recordings, channels, samples per window)
if DEBUG:
    print('Normal Data shape:', NormalSignalArray.shape)
    print('Scare Data shape:', ScareSignalArray.shape)
    print('Touch Data shape:', TouchSignalArray.shape)
    print('Laugh Data shape:', LaughSignalArray.shape)

Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38528
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60
Features Numbers :  38400
Times to Split Numbers : 60

Features Numbers :  105344
Times to Split Numbers : 164
Features Numbers :  10534