In [1]:
# 引入 library
import scipy.io as sio
import matplotlib.pyplot as plt
import numpy as np

In [2]:
# 讀取資料
path = 'S_All_A1_E1/'
mat = 'S1_A1_E1'
data = []
for i in range(1,28,1):
    file = path + 'S' + str(i) + '_A1_E1'
    data.append(sio.loadmat(file))
    plt.close()
print('資料讀取完畢！')

資料讀取完畢！


In [3]:
# 讀取受試者資料
# subject_emg[受試者編號][emg編號][訊號]
# subject_emg[受試者編號][狀態]
subject_emg = []
subject_restimulus = []
# 選擇受試者
subject_ID_begin = 0
subject_ID_end = 1
for sub_data in enumerate(data[subject_ID_begin:subject_ID_end]):
    print('第' + str(sub_data[0]+1) + '受試者資料讀取')
    subject_emg.append(sub_data[1]['emg'].T)
    subject_restimulus.append(sub_data[1]['restimulus'])
print('受試者資料讀取完畢！')

第1受試者資料讀取
受試者資料讀取完畢！


In [4]:
# 設定窗口大小與位移量(單位:sample point)
sliding_window_size = 80
sliding_duration = 40

In [5]:
# 資料預處理
import math
import pywt

x_data = []
y_data = []
count = 0
for emg, restimulus in zip(subject_emg, subject_restimulus):
    count += 1
    print('第' + str(count) + '位受試者資料預處理')
    total_len = len(restimulus)
    # math.ceil -> 無條件進位
    sliding_times = math.ceil((total_len - sliding_window_size) / sliding_duration) + 1

    # 資料分割 + 特徵提取
    window_begin = 0
    for i in range(sliding_times):
    #   特徵提取
        feature_matrix = []
        for e in emg:
            emg_segment = e[window_begin:window_begin+sliding_window_size]
        #   使用多階小波包轉換
        #   小波包基底: db5
        #   層數: 4
            wp = pywt.WaveletPacket(data=emg_segment, wavelet='db5', mode='symmetric', maxlevel=4)
        #   對第四層每一節點做能量值計算
            wavelet_energy = []
            for j in [node.path for node in wp.get_level(wp.maxlevel, 'natural')]:
                wavelet_energy.append(np.sum( (np.array(wp[j].data)) ** 2 ))
            feature_matrix.append(wavelet_energy)
        x_data.append(feature_matrix)
    #   標標籤
        restimulus_segment = restimulus[window_begin:window_begin+sliding_window_size]
    #   np.sqeeze()把矩陣內的單維向量的框框消掉
        counts = np.bincount(np.squeeze(restimulus_segment))
        #返回眾數(注意:此方法只有非負數列才可使用)
        label_action_ID = np.argmax(counts)
        y_data.append(label_action_ID)
        window_begin = window_begin + sliding_duration
print('資料預處理完畢！共', len(x_data), '筆資料')
print('資料標籤數量分布：', np.bincount(np.squeeze(y_data)))

第1位受試者資料預處理
資料預處理完畢！共 2525 筆資料
資料標籤數量分布： [1588   94   62  104   76   96   78   63   77   64   80   62   81]


In [6]:
# 讓每一種標籤的資料都平均一點
import random
x_filter_data = []
y_filter_data = []
for i in range(len(x_data)):
    if y_data[i] == 0:
        if random.randint(1,15) == 1:
            x_filter_data.append(x_data[i])
            y_filter_data.append(y_data[i])
    else:
        x_filter_data.append(x_data[i])
        y_filter_data.append(y_data[i])
x_data = x_filter_data
y_data = y_filter_data
del x_filter_data
del y_filter_data
print('資料篩選完成')
print('資料數量： x -> ', len(x_data), ', y ->', len(y_data))
print('資料標籤分佈：', np.bincount(np.squeeze(y_data)))

資料篩選完成
資料數量： x ->  1055 , y -> 1055
資料標籤分佈： [118  94  62 104  76  96  78  63  77  64  80  62  81]


In [7]:
# 正規化
from sklearn import preprocessing
# 34095 x 10 x 16
x_data = list(preprocessing.scale(np.array(x_data).reshape((-1, 16))).reshape(-1, 10, 16))

In [8]:
for i in range(len(x_data)):
    x_data[i] = [x_data[i]]

In [9]:
import torch
import torch.utils.data as Data

# 訓練批次
EPOCH = 200
# 一個Batch的樣本數
BATCH_SIZE = 1024
# Learning rate
LR = 0.0005

# 先转换成 torch 能识别的 Dataset
torch_dataset = Data.TensorDataset(torch.FloatTensor(x_data), torch.LongTensor(y_data))

# 把 dataset 放入 DataLoader
loader = Data.DataLoader(
    dataset=torch_dataset,      # torch TensorDataset format
    batch_size=BATCH_SIZE,      # mini batch size
    shuffle=True,               # 要不要打乱数据 (打乱比较好)
)

In [10]:
import torch.nn as nn
import torchvision
import torch.nn.functional as F
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        # nn.Sequential()可以快速搭建神經網路
        # 卷積運算所使用的mask一般稱為kernel map，這邊為5x5的map
        # stride決定kernel map一次要走幾格
        # 上面用5x5的kernel map去跑28x28的圖片，卷積完會只剩下26x26，故加兩層
        # zero-padding 讓圖片大小相同
        self.conv1 = nn.Sequential( # input shape(channel=1, height=28, weight=28)
            nn.Conv2d(
                in_channels = 1, # 輸入信號的通道
                out_channels = 4, # 卷積產生的通道
                kernel_size = (3, 5), # 卷積核的尺寸
                stride = 1, # 卷積步長
                padding = (1,2) # 輸入的每一條邊補充0的層數
            ),  # output shape(4, 10, 16)
            nn.ReLU()
        )
        self.conv2 = nn.Sequential(
            # input shape(4, 10, 16)
            nn.Conv2d(4, 8, (3,5), 1, (1,2)), # output shape(8, 10, 16)
            nn.ReLU()
        )
        self.hidden = nn.Linear(8*10*16, 256)
        self.hidden2 = nn.Linear(256, 256)
        self.hidden3 = nn.Linear(256, 256)
        self.out = nn.Linear(256, 13) # fully connected layer
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1) # 展平多維卷積圖層 (batch_size, 32*10*16)
        x = self.hidden(x)
        x = self.hidden2(x)
        x = self.hidden3(x)
        output = F.softmax(self.out(x))
        return output
    
cnn = CNN()
print(cnn)

CNN(
  (conv1): Sequential(
    (0): Conv2d(1, 4, kernel_size=(3, 5), stride=(1, 1), padding=(1, 2))
    (1): ReLU()
  )
  (conv2): Sequential(
    (0): Conv2d(4, 8, kernel_size=(3, 5), stride=(1, 1), padding=(1, 2))
    (1): ReLU()
  )
  (hidden): Linear(in_features=1280, out_features=256, bias=True)
  (hidden2): Linear(in_features=256, out_features=256, bias=True)
  (hidden3): Linear(in_features=256, out_features=256, bias=True)
  (out): Linear(in_features=256, out_features=13, bias=True)
)


In [11]:
# optimize all cnn parameters
optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)
# the target label is not one-hotted
# pytorch 的 CrossEntropyLoss 會自動把張量轉為 one hot形式
loss_func = nn.CrossEntropyLoss()

# training and testing
for epoch in range(EPOCH):
    print('第', epoch, '次訓練')
    # enumerate : 枚舉可列舉對象．ex.
    # A = [a, b, c]
    # list(enumerate(A)) = [(0,a), (1,b), (2,c)]
    for step, (b_x, b_y) in enumerate(loader):
        output = cnn(b_x)
        loss = loss_func(output, b_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (step % 50 == 0):
            print('step:' + str(step))
            print('loss:' + str(loss))
print('訓練完成！！')

第 0 次訓練




step:0
loss:tensor(2.5648, grad_fn=<NllLossBackward>)
第 1 次訓練
step:0
loss:tensor(2.5580, grad_fn=<NllLossBackward>)
第 2 次訓練
step:0
loss:tensor(2.5448, grad_fn=<NllLossBackward>)
第 3 次訓練
step:0
loss:tensor(2.5281, grad_fn=<NllLossBackward>)
第 4 次訓練
step:0
loss:tensor(2.5228, grad_fn=<NllLossBackward>)
第 5 次訓練
step:0
loss:tensor(2.5119, grad_fn=<NllLossBackward>)
第 6 次訓練
step:0
loss:tensor(2.4975, grad_fn=<NllLossBackward>)
第 7 次訓練
step:0
loss:tensor(2.4888, grad_fn=<NllLossBackward>)
第 8 次訓練
step:0
loss:tensor(2.4686, grad_fn=<NllLossBackward>)
第 9 次訓練
step:0
loss:tensor(2.4459, grad_fn=<NllLossBackward>)
第 10 次訓練
step:0
loss:tensor(2.4309, grad_fn=<NllLossBackward>)
第 11 次訓練
step:0
loss:tensor(2.4014, grad_fn=<NllLossBackward>)
第 12 次訓練
step:0
loss:tensor(2.3749, grad_fn=<NllLossBackward>)
第 13 次訓練
step:0
loss:tensor(2.3725, grad_fn=<NllLossBackward>)
第 14 次訓練
step:0
loss:tensor(2.3612, grad_fn=<NllLossBackward>)
第 15 次訓練
step:0
loss:tensor(2.3422, grad_fn=<NllLossBackward>)
第 16 次訓練
s

step:0
loss:tensor(2.1132, grad_fn=<NllLossBackward>)
第 131 次訓練
step:0
loss:tensor(2.1114, grad_fn=<NllLossBackward>)
第 132 次訓練
step:0
loss:tensor(2.1036, grad_fn=<NllLossBackward>)
第 133 次訓練
step:0
loss:tensor(2.0996, grad_fn=<NllLossBackward>)
第 134 次訓練
step:0
loss:tensor(2.0960, grad_fn=<NllLossBackward>)
第 135 次訓練
step:0
loss:tensor(2.1051, grad_fn=<NllLossBackward>)
第 136 次訓練
step:0
loss:tensor(2.1040, grad_fn=<NllLossBackward>)
第 137 次訓練
step:0
loss:tensor(2.0924, grad_fn=<NllLossBackward>)
第 138 次訓練
step:0
loss:tensor(2.0888, grad_fn=<NllLossBackward>)
第 139 次訓練
step:0
loss:tensor(2.0845, grad_fn=<NllLossBackward>)
第 140 次訓練
step:0
loss:tensor(2.0867, grad_fn=<NllLossBackward>)
第 141 次訓練
step:0
loss:tensor(2.0871, grad_fn=<NllLossBackward>)
第 142 次訓練
step:0
loss:tensor(2.0837, grad_fn=<NllLossBackward>)
第 143 次訓練
step:0
loss:tensor(2.0850, grad_fn=<NllLossBackward>)
第 144 次訓練
step:0
loss:tensor(2.0917, grad_fn=<NllLossBackward>)
第 145 次訓練
step:0
loss:tensor(2.0859, grad_fn=<NllL

In [12]:
from sklearn import metrics
begin = 0
end = len(y_data)
# end = 500
test_output = cnn(torch.FloatTensor(x_data[begin:end]))
pred_y = torch.max(test_output, 1)[1].data.numpy().squeeze()
y_cut_data = y_data[begin:end]
# 混淆矩陣
confusion_matrix = metrics.confusion_matrix(y_cut_data,pred_y)
print('混淆矩陣')
print('true/predict')
print(confusion_matrix)
classification_report = metrics.classification_report(y_cut_data,pred_y)
print('\n<============================>\n\n分類報告')
print(classification_report)

print('<============================>\n\n分類準確率')
correct = 0
for i in range(len(y_cut_data)):
    if y_cut_data[i] == pred_y[i]:
        correct += 1
print('測試資料數：', len(y_cut_data), ', 預測正確數：', correct, '準確率：', (correct/len(y_cut_data)))

混淆矩陣
true/predict
[[90  2  1  7  0  6  0  2  0  0 10  0  0]
 [ 5 66  4 11  0  3  0  0  0  1  2  0  2]
 [ 2  3 53  1  0  0  0  0  0  0  0  0  3]
 [ 7 15  2 63  0 11  0  0  0  0  3  0  3]
 [ 0  8  0  1 61  5  0  0  0  1  0  0  0]
 [ 0  0  0  1  0 95  0  0  0  0  0  0  0]
 [ 1  6  3  8  5 13 42  0  0  0  0  0  0]
 [ 0  0  0  0  4  2  0 56  0  0  1  0  0]
 [ 7  7  0  9  5 42  2  1  0  0  4  0  0]
 [ 0  3  0  0  0  0  0  0  0 59  0  0  2]
 [26  2  0  0  1 13  2  0  0  0 36  0  0]
 [ 2  0  0  0  0  0  2  0  0 49  0  0  9]
 [45  2  5  0  0  0  0  0  0  3  0  0 26]]


分類報告
              precision    recall  f1-score   support

           0       0.49      0.76      0.59       118
           1       0.58      0.70      0.63        94
           2       0.78      0.85      0.82        62
           3       0.62      0.61      0.61       104
           4       0.80      0.80      0.80        76
           5       0.50      0.99      0.66        96
           6       0.88      0.54      0.67      

  'precision', 'predicted', average, warn_for)


In [18]:
z = np.array([1,2,3,4,5,6,7,8,9,10,11,12])
zz = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
print(z)
print('\n')
print(zz)
print('\n')
print(preprocessing.scale(z))
print('\n')
print(preprocessing.scale(zz))


[ 1  2  3  4  5  6  7  8  9 10 11 12]


[[ 1  2  3]
 [ 4  5  6]
 [ 7  8  9]
 [10 11 12]]


[-1.59325501 -1.30357228 -1.01388955 -0.72420682 -0.43452409 -0.14484136
  0.14484136  0.43452409  0.72420682  1.01388955  1.30357228  1.59325501]


[[-1.34164079 -1.34164079 -1.34164079]
 [-0.4472136  -0.4472136  -0.4472136 ]
 [ 0.4472136   0.4472136   0.4472136 ]
 [ 1.34164079  1.34164079  1.34164079]]
range(0, 27)
<class 'NoneType'>
