In [1]:
import numpy as np
import pywt

import seaborn as sns #绘制confusion matrix heatmap

import os
import scipy.io as sio


from python_speech_features import mfcc as pmfcc

import tqdm

import matplotlib.pyplot as plt
%matplotlib inline


In [None]:
pmfcc()

In [2]:
import warnings
warnings.simplefilter('ignore') #忽略警告

In [3]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier

import xgboost

  from numpy.core.umath_tests import inner1d


In [4]:
sample_rate = 256
origin_channel = 16


SAMPLE_CHANNEL = ['Pz' , 'PO3' , 'PO4' , 'O1' , 'O2' , 'Oz' , 'O9' , 'FP2' ,
                  'C4' , 'C6' , 'CP3' , 'CP1' ,
                  'CPZ' , 'CP2' , 'CP4' , 'PO8']

LABEL2STR = {0:'sen' , 1:'hong' , 2:'zhao',
             3:'fen' , 4:'xiao' , 5:'yu' , 
             6:'bin' , 7:'wang' , 8:'wei' , 
             9:'fei'}

CLIP_FORWARD = 1 #首部裁掉时间
CLIP_BACKWARD = 1 #尾部裁掉时间

trial_time = 3 #segment second


#是否进行归一化
#reference:a study on performance increasing in ssvep based bci application
#IS_NORMALIZE = True

#是否进行滤波
#IS_FILTER = False
#EEG频率范围
#reference:a study on performance increasing in ssvep based bci application
LO_FREQ = 0.5
HI_FREQ = 40

#是否陷波
#IS_NOTCH = False
NOTCH_FREQ = 50 #陷波 工频


In [5]:
def load_data(filename):

    data = sio.loadmat(file_name=filename)['data_received'] #length*16 matrix

    data = data[CLIP_FORWARD * sample_rate : - CLIP_BACKWARD * sample_rate] #首部 尾部 进行裁剪
   
    return data 

In [6]:
#data = load_data('./circle/0/10/1.mat')
#amfcc = pmfcc(data[: , 0] , samplerate=256)
#plt.plot(amfcc.T[5 , :])

In [7]:
#amfcc.shape #原数据的长度除以3 为1536

In [8]:
#data[: , 0].shape

In [6]:
def separate(data , label , overlap_length = 128):
    '''
    最长重叠长度为size长 256*3 个数据点
    '''
    train_data = []
    train_labels = []

    size = sample_rate * trial_time #一小段 256*3 个数据点
    data_length = data.shape[0]

    idx = 0

    while idx<=data_length-size:
        train_data.append(data[idx : idx+size , :])
        train_labels.append(label)

        idx = idx + (size - overlap_length)

    return np.array(train_data) , np.array(train_labels)

In [7]:
def train_val(data , ratio = 0.9):
    '''
    将数据分为 训练集 和 验证集
    '''
    
    seg = int(ratio * data.shape[0])
    
    return data[ : seg] , data[seg : ]

def shuffle_t_v(filenames):
    np.random.shuffle(filenames)
    
    return filenames

def combine(freq = 10):
    '''
    训练数据与验证数据
    :freq: 指定闪烁的频率
    
    '''
    
    if freq not in [10 , 15 , 20 , 25]:
        print('freq must in 10,15,20,25')
        return 
    
    ratio = 0.9 #训练集的占比
    overlap_length = 2*256 #重叠2秒数据
    
    #保证随机性 进行置乱
    person_0_filenames = shuffle_t_v( os.listdir('circle/0/%s/' % freq) )
    person_1_filenames = shuffle_t_v( os.listdir('circle/1/%s/' % freq) )
    person_2_filenames = shuffle_t_v( os.listdir('circle/2/%s/' % freq) )
    person_3_filenames = shuffle_t_v( os.listdir('circle/3/%s/' % freq) )
    person_4_filenames = shuffle_t_v( os.listdir('circle/4/%s/' % freq) )
    person_5_filenames = shuffle_t_v( os.listdir('circle/5/%s/' % freq) )
    person_6_filenames = shuffle_t_v( os.listdir('circle/6/%s/' % freq) )
    person_7_filenames = shuffle_t_v( os.listdir('circle/7/%s/' % freq) )
    person_8_filenames = shuffle_t_v( os.listdir('circle/8/%s/' % freq) )
    person_9_filenames = shuffle_t_v( os.listdir('circle/9/%s/' % freq) )

    #打开信号文件 并 合并
    person_0 = np.concatenate([load_data('circle/0/%s/' % freq + filename) for filename in person_0_filenames] , axis = 0)
    person_1 = np.concatenate([load_data('circle/1/%s/' % freq + filename) for filename in person_1_filenames] , axis = 0)
    person_2 = np.concatenate([load_data('circle/2/%s/' % freq + filename) for filename in person_2_filenames] , axis = 0)
    person_3 = np.concatenate([load_data('circle/3/%s/' % freq + filename) for filename in person_3_filenames] , axis = 0)
    person_4 = np.concatenate([load_data('circle/4/%s/' % freq + filename) for filename in person_4_filenames] , axis = 0)
    person_5 = np.concatenate([load_data('circle/5/%s/' % freq + filename) for filename in person_5_filenames] , axis = 0)
    person_6 = np.concatenate([load_data('circle/6/%s/' % freq + filename) for filename in person_6_filenames] , axis = 0)
    person_7 = np.concatenate([load_data('circle/7/%s/' % freq + filename) for filename in person_7_filenames] , axis = 0)
    person_8 = np.concatenate([load_data('circle/8/%s/' % freq + filename) for filename in person_8_filenames] , axis = 0)
    person_9 = np.concatenate([load_data('circle/9/%s/' % freq + filename) for filename in person_9_filenames] , axis = 0)
    
    person_0_train , person_0_val = train_val(person_0)
    person_1_train , person_1_val = train_val(person_1)
    person_2_train , person_2_val = train_val(person_2)
    person_3_train , person_3_val = train_val(person_3)
    person_4_train , person_4_val = train_val(person_4)
    person_5_train , person_5_val = train_val(person_5)
    person_6_train , person_6_val = train_val(person_6)
    person_7_train , person_7_val = train_val(person_7)
    person_8_train , person_8_val = train_val(person_8)
    person_9_train , person_9_val = train_val(person_9)
    
    #数据分段阶段
    
    #============
    #训练数据分段
    train_person_data_0 , train_person_labels_0 = separate(person_0_train , label = 0 , overlap_length=overlap_length)
    train_person_data_1 , train_person_labels_1 = separate(person_1_train , label = 1 , overlap_length=overlap_length)
    train_person_data_2 , train_person_labels_2 = separate(person_2_train , label = 2 , overlap_length=overlap_length)
    train_person_data_3 , train_person_labels_3 = separate(person_3_train , label = 3 , overlap_length=overlap_length)
    train_person_data_4 , train_person_labels_4 = separate(person_4_train , label = 4 , overlap_length=overlap_length)
    train_person_data_5 , train_person_labels_5 = separate(person_5_train , label = 5 , overlap_length=overlap_length)
    train_person_data_6 , train_person_labels_6 = separate(person_6_train , label = 6 , overlap_length=overlap_length)
    train_person_data_7 , train_person_labels_7 = separate(person_7_train , label = 7 , overlap_length=overlap_length)
    train_person_data_8 , train_person_labels_8 = separate(person_8_train , label = 8 , overlap_length=overlap_length)
    train_person_data_9 , train_person_labels_9 = separate(person_9_train , label = 9 , overlap_length=overlap_length)

    #合并数据
    train_data = np.concatenate((train_person_data_0 , train_person_data_1 , train_person_data_2 ,
                                 train_person_data_3 , train_person_data_4 , train_person_data_5 ,
                                 train_person_data_6 , train_person_data_7 , train_person_data_8 ,
                                 train_person_data_9 ))
    
    train_labels = np.concatenate((train_person_labels_0 , train_person_labels_1 , train_person_labels_2 ,
                                   train_person_labels_3 , train_person_labels_4 , train_person_labels_5 ,
                                   train_person_labels_6 , train_person_labels_7 , train_person_labels_8 ,
                                   train_person_labels_9 ))
    
    #产生索引并置乱
    idx_train_data = list(range(train_data.shape[0]))
    np.random.shuffle(idx_train_data)

    #将训练数据置乱
    train_data = train_data[idx_train_data]
    train_labels = train_labels[idx_train_data]
    
    #============
    #验证数据分段
    val_person_data_0 , val_person_labels_0 = separate(person_0_val , label = 0 , overlap_length=0)
    val_person_data_1 , val_person_labels_1 = separate(person_1_val , label = 1 , overlap_length=0)
    val_person_data_2 , val_person_labels_2 = separate(person_2_val , label = 2 , overlap_length=0)
    val_person_data_3 , val_person_labels_3 = separate(person_3_val , label = 3 , overlap_length=0)
    val_person_data_4 , val_person_labels_4 = separate(person_4_val , label = 4 , overlap_length=0)
    val_person_data_5 , val_person_labels_5 = separate(person_5_val , label = 5 , overlap_length=0)
    val_person_data_6 , val_person_labels_6 = separate(person_6_val , label = 6 , overlap_length=0)
    val_person_data_7 , val_person_labels_7 = separate(person_7_val , label = 7 , overlap_length=0)
    val_person_data_8 , val_person_labels_8 = separate(person_8_val , label = 8 , overlap_length=0)
    val_person_data_9 , val_person_labels_9 = separate(person_9_val , label = 9 , overlap_length=0)
    
    #合并数据
    val_data = np.concatenate((val_person_data_0 , val_person_data_1 , val_person_data_2 ,
                               val_person_data_3 , val_person_data_4 , val_person_data_5 ,
                               val_person_data_6 , val_person_data_7 , val_person_data_8 ,
                               val_person_data_9 ))
    
    val_labels = np.concatenate((val_person_labels_0 , val_person_labels_1 , val_person_labels_2 ,
                                 val_person_labels_3 , val_person_labels_4 , val_person_labels_5 ,
                                 val_person_labels_6 , val_person_labels_7 , val_person_labels_8 ,
                                 val_person_labels_9 ))
    
    #产生索引并置乱
    idx_val_data = list(range(val_data.shape[0]))
    np.random.shuffle(idx_val_data)

    #将训练数据置乱
    val_data = val_data[idx_val_data]
    val_labels = val_labels[idx_val_data]
    
    return train_data , train_labels , val_data , val_labels

def shuffle(data , labels):
    idx_data = list(range(data.shape[0]))
    np.random.shuffle(idx_data)

    data = data[idx_data]
    labels = labels[idx_data]
    
    return data , labels

In [12]:
train_X_ , train_y , val_X_ , val_y = combine(freq = 10) #10 15 20 25 hz

In [13]:
print(train_X_.shape , train_y.shape , val_X_.shape , val_y.shape)

(1590, 768, 16) (1590,) (50, 768, 16) (50,)


In [8]:
def mfcc(data):
    
    X = []
   
    for i in range(16):
        X.append( pmfcc( data[: , i] , samplerate=256).reshape([1 , -1]).squeeze() )
        
    return np.array( X ).reshape([1 , -1]).squeeze()

In [9]:
def feature_extraction_mfcc(data):
    X = []
    
    for datum in data:
        X.append( mfcc(datum) )

    return np.array(X)

In [None]:
train_X = feature_extraction_mfcc(train_X_)
val_X = feature_extraction_mfcc(val_X_)

In [16]:
print(train_X.shape , train_y.shape , val_X.shape , val_y.shape)

In [10]:
for freq in [10 , 15 , 20 , 25]:
    
    rf_score = []
    knn_score = []
    gnb_score = []
    train_X_ , train_y , val_X_ , val_y = combine(freq = freq)
    
    for i in tqdm.tqdm( range(20) ):

        train_X_ , train_y = shuffle(train_X_ , train_y)
        val_X_ , val_y = shuffle(val_X_ , val_y)
        
        train_X = feature_extraction_mfcc(train_X_)
        val_X = feature_extraction_mfcc(val_X_)

        rf = RandomForestClassifier()
        rf.fit(train_X , train_y)
        rf_score.append( rf.score(val_X , val_y) )
        
        knn = KNeighborsClassifier()
        knn.fit(train_X , train_y)
        knn_score.append( knn.score(val_X , val_y) )
        
        gnb = GaussianNB()
        gnb.fit(train_X , train_y)
        gnb_score.append( gnb.score(val_X , val_y) )
    
    print('freq:%d rf:%f knn:%f gnb:%f' % (freq , np.average(rf_score) , np.average(knn_score) , np.average(gnb_score)))

100%|█████████████████████████████████████████████████████████████████████████████████| 20/20 [45:29<00:00, 148.58s/it]


freq:10 rf:1.000000 knn:0.900000 gnb:0.983333


100%|█████████████████████████████████████████████████████████████████████████████████| 20/20 [49:18<00:00, 147.82s/it]


freq:15 rf:1.000000 knn:0.900000 gnb:1.000000


100%|█████████████████████████████████████████████████████████████████████████████████| 20/20 [48:53<00:00, 140.39s/it]


freq:20 rf:1.000000 knn:1.000000 gnb:0.983333


100%|█████████████████████████████████████████████████████████████████████████████████| 20/20 [43:16<00:00, 117.88s/it]


freq:25 rf:1.000000 knn:0.900000 gnb:1.000000


1.0

0.9

In [None]:
# xgb=xgboost.XGBClassifier()
# xgb.fit(train_X , train_y)
# xgb.score(val_X , val_y)

1.0

In [None]:
# mlp = MLPClassifier()
# mlp.fit(train_X , train_y)
# mlp.score(val_X , val_y)

0.9