# 导入库

In [1]:
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rcParams
import pandas_profiling as ppf
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
# # 设置中文和'-'负号显示问题
# from pylab import mpl
# mpl.rcParams['font.sans-serif'] = ['FangSong']  # 设置matplotlib可以显示汉语
# mpl.rcParams['axes.unicode_minus'] = True

# Preprocessing tools
import math
import time
import numpy as np
import pandas as pd

# DL/ML Algorithm
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from tsai.all import *

# import data
import joblib


import os, sys
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import warnings
warnings.filterwarnings(action="ignore")

In [2]:
# class HiddenPrints:
#     def __enter__(self):
#         self._original_stdout = sys.stdout
#         sys.stdout = open(os.devnull, 'w')

#     def __exit__(self, exc_type, exc_val, exc_tb):
#         sys.stdout.close()
#         sys.stdout = self._original_stdout

In [3]:
def R2Score(YTrue, YPre):
    """Return the coefficient of determination (R^2) of ``YPre`` against ``YTrue``.

    Computed as ``1 - SS_res / SS_tot`` where ``SS_res`` is the sum of squared
    residuals and ``SS_tot`` is the sum of squared deviations of ``YTrue`` from
    its own mean. Both arguments only need to support element-wise arithmetic,
    ``.mean()`` and ``.sum()``.
    """
    residual = YTrue - YPre
    deviation = YTrue - YTrue.mean()
    ss_res = (residual ** 2).sum()
    ss_tot = (deviation ** 2).sum()
    return 1 - ss_res / ss_tot

# 导入数据

In [4]:
# Column names applied to the header-less CSV files: eight input columns
# ("One".."Eight"), then "Rect" and "Time_ns".
cols = ["One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Rect", "Time_ns"]

# Every file is read without a header row.
train1_data = pd.read_csv("data/Train1.csv", header=None)
train2_data = pd.read_csv("data/Train2.csv", header=None)
train3_data = pd.read_csv("data/Train3.csv", header=None)
train_data = pd.read_csv("data/Train.csv", header=None)

test_data = pd.read_csv("data/Test.csv", header=None)
valid_data = pd.read_csv("data/Valid.csv", header=None)

# Name the columns of the three splits used in the rest of the notebook.
train_data.columns = cols
test_data.columns = cols
valid_data.columns = cols

In [5]:
# Model inputs: every column except 'Rect' (the label) and 'Time_ns'.
train_features = train_data.drop(columns=['Rect', 'Time_ns'])
test_features = test_data.drop(columns=['Rect', 'Time_ns'])
valid_features = valid_data.drop(columns=['Rect', 'Time_ns'])

# Regression labels.
train_labels = train_data['Rect']
test_labels = test_data['Rect']
valid_labels = valid_data['Rect']

# Inputs and label side by side. ignore_index=True resets the column labels to
# 0..8 here; the real names are restored at the end of the cell.
train_all = pd.concat([train_features, train_labels], axis=1, ignore_index=True, sort=False)
test_all = pd.concat([test_features, test_labels], axis=1, ignore_index=True, sort=False)
valid_all = pd.concat([valid_features, valid_labels], axis=1, ignore_index=True, sort=False)

# Reduced input sets: six columns (without 'Two' and 'Five') ...
train_six_features = train_data.drop(columns=['Two', 'Five', 'Rect', 'Time_ns'])
test_six_features = test_data.drop(columns=['Two', 'Five', 'Rect', 'Time_ns'])
valid_six_features = valid_data.drop(columns=['Two', 'Five', 'Rect', 'Time_ns'])

# ... and two columns ('Three' and 'Four' only).
train_two_features = train_data[['Three', 'Four']]
test_two_features = test_data[['Three', 'Four']]
valid_two_features = valid_data[['Three', 'Four']]

# Train and test rows stacked into one frame (validation rows are not included).
sEMG_all = pd.concat([train_all, test_all], axis=0, ignore_index=True, sort=False)

# Restore the column names (everything in `cols` except the trailing 'Time_ns').
train_all.columns = cols[:-1]
test_all.columns = cols[:-1]
valid_all.columns = cols[:-1]
sEMG_all.columns = cols[:-1]
train_features, train_labels, train_all, sEMG_all, train_six_features, train_two_features

(      One  Two  Three  Four  Five  Six  Seven  Eight
 0      84  268    736   161    57  285     76    209
 1      90  262    717   147    54  292     78    216
 2      96  291    708   173    61  297     80    205
 3      96  361    719   164    58  302     80    208
 4      85  374    746   148    55  309     75    215
 ...   ...  ...    ...   ...   ...  ...    ...    ...
 8926  212  596    640   186    65  573    360    488
 8927  215  588    647   189    69  588    434    561
 8928  194  589    653   185    60  601    477    573
 8929  200  464    649   160    53  629    499    579
 8930  197  438    629   143    53  661    508    606
 
 [8931 rows x 8 columns],
 0       0.18
 1       0.19
 2       0.21
 3       0.22
 4       0.23
         ... 
 8926    0.29
 8927    0.30
 8928    0.31
 8929    0.32
 8930    0.33
 Name: Rect, Length: 8931, dtype: float64,
       One  Two  Three  Four  Five  Six  Seven  Eight  Rect
 0      84  268    736   161    57  285     76    209  0.18
 1     

# 机器学习模型

In [6]:
# 加载机器学习模型
# Load the pre-trained machine-learning models (indexed by key further down).
# NOTE(review): joblib.load unpickles the file and can therefore execute arbitrary
# code — only load model files that come from a trusted source.
MLModels = joblib.load('models/sEMGML.pkl')

# Blending weights for the ML ensemble, one per model; they are multiplied against
# the per-model prediction columns (presumably tuned offline — TODO confirm).
MLWeight = np.array([0.42, 0.04, 0.14, 0.11, 0.09, 0.2])

## ML测试

In [7]:
# One prediction column per ML model, one row per sample, for each split.
MLTrain_Results = np.zeros((len(train_features), len(MLModels)))
MLTest_Results = np.zeros((len(test_features), len(MLModels)))
MLValid_Results = np.zeros((len(valid_features), len(MLModels)))

# Column order follows the iteration order of MLModels, which is also the order
# the entries of MLWeight are applied in.
for idx, model in enumerate(MLModels):
    estimator = MLModels[model]
    MLTrain_Results[:, idx] = estimator.predict(train_features)
    MLTest_Results[:, idx] = estimator.predict(test_features)
    MLValid_Results[:, idx] = estimator.predict(valid_features)

MLTrain_Results, MLTest_Results, MLValid_Results

(array([[0.43085063, 0.30477173, 0.22223776, 0.22317647, 0.28845479,
         0.37974083],
        [0.44880923, 0.30763618, 0.22979021, 0.22835294, 0.26067798,
         0.37974083],
        [0.44601176, 0.31969189, 0.2441958 , 0.24035294, 0.27522198,
         0.33478057],
        ...,
        [0.27416195, 0.27762241, 0.29944056, 0.29235294, 0.1957659 ,
         0.25217813],
        [0.33256748, 0.28362576, 0.30496503, 0.29682353, 0.23436033,
         0.25018883],
        [0.34077732, 0.27405009, 0.30132867, 0.28470588, 0.28078823,
         0.29625681]]),
 array([[0.32102138, 0.39810778, 0.42202797, 0.34352941, 0.49197519,
         0.41710743],
        [0.31598836, 0.41696216, 0.45503497, 0.37811765, 0.49892752,
         0.43328741],
        [0.25276803, 0.37851528, 0.28972028, 0.27741176, 0.39213947,
         0.33069032],
        ...,
        [0.26341001, 0.26353859, 0.25391608, 0.27435294, 0.30275246,
         0.2670275 ],
        [0.27854299, 0.26846458, 0.26195804, 0.26423529, 0.255

In [8]:
# R^2 of the weighted ML ensemble on the training split.
R2Score(train_labels, MLTrain_Results @ MLWeight)

0.934382825970081

In [9]:
# R^2 of the weighted ML ensemble on the test split.
R2Score(test_labels, MLTest_Results @ MLWeight)

0.8338765577848803

In [10]:
# R^2 of the weighted ML ensemble on the validation split.
R2Score(valid_labels, MLValid_Results @ MLWeight)

0.8268109106138984

# 深度学习模型

In [11]:
from tsai.inference import load_learner

In [12]:
# # 加载深度学习回归模型
# DLModels = {}
# DLModels['10LSTM'] = load_learner(Path('./models/10LSTMRegression.pkl'), cpu=False)
# DLModels['11LSTM'] = load_learner(Path('./models/11LSTMRegression.pkl'), cpu=False)
# DLModels['12LSTM'] = load_learner(Path('./models/12LSTMRegression.pkl'), cpu=False)
# DLModels['13LSTM'] = load_learner(Path('./models/13LSTMRegression.pkl'), cpu=False)
# DLModels['14LSTM'] = load_learner(Path('./models/14LSTMRegression.pkl'), cpu=False)
# DLModels['15XCMPlus'] = load_learner(Path('./models/15XCMPlusRegression.pkl'), cpu=False)

# # 初始化最优权重值
# DLWeight = np.array([0.45, 0.26, 0, 0, 0.14, 0.15])

In [13]:
# Float32 tensors for the deep-learning models: the eight input columns reshaped
# to (samples, 1, 8) and the 'Rect' label as a flat 1-D tensor, for each split.
DLTrain_Features = torch.tensor(
    train_data.drop(columns=['Rect', 'Time_ns']).values, dtype=torch.float32
).reshape(-1, 1, 8)
DLTrain_Labels = torch.tensor(train_data['Rect'].values, dtype=torch.float32).reshape(-1)

DLTest_Features = torch.tensor(
    test_data.drop(columns=['Rect', 'Time_ns']).values, dtype=torch.float32
).reshape(-1, 1, 8)
DLTest_Labels = torch.tensor(test_data['Rect'].values, dtype=torch.float32).reshape(-1)

DLValid_Features = torch.tensor(
    valid_data.drop(columns=['Rect', 'Time_ns']).values, dtype=torch.float32
).reshape(-1, 1, 8)
DLValid_Labels = torch.tensor(valid_data['Rect'].values, dtype=torch.float32).reshape(-1)

In [14]:
# # 加载深度学习回归模型

# Load the exported deep-learning regression learners, keyed by model tag.
# Each tag maps to the file './models/<tag>Regression.pkl'; insertion order matters
# because DLWeight below is applied positionally.
DLModels = {
    tag: load_learner(Path(f'./models/{tag}Regression.pkl'), cpu=False)
    for tag in (
        '30LSTM',
        '31LSTM',
        '32LSTM',
        '33MLSTM_FCN',
        '34MLSTM_FCNPlus',
        '35LSTM',
    )
}

# Blending weights for the DL ensemble, one per learner (the last two learners
# carry zero weight).
DLWeight = torch.tensor([0.15, 0.25, 0.28, 0.32, 0, 0], dtype=torch.float32)

## DL测试

In [15]:
# Per-learner predictions for each split, keyed like DLModels. Only the first of
# the three values returned by get_X_preds is kept.
DLTrainProbas, DLTestProbas, DLValidProbas = {}, {}, {}

for tag, learner in DLModels.items():
    train_out, _, _ = learner.get_X_preds(DLTrain_Features)
    test_out, _, _ = learner.get_X_preds(DLTest_Features)
    valid_out, _, _ = learner.get_X_preds(DLValid_Features)

    DLTrainProbas[tag] = train_out
    DLTestProbas[tag] = test_out
    DLValidProbas[tag] = valid_out

In [16]:
# Stack the per-learner predictions column-wise (dim 1), in dict insertion order,
# so that column k lines up with entry k of DLWeight.
DLTrainProbasCat = torch.cat(list(DLTrainProbas.values()), dim=1)
DLTestProbasCat = torch.cat(list(DLTestProbas.values()), dim=1)
DLValidProbasCat = torch.cat(list(DLValidProbas.values()), dim=1)

In [17]:
# Preview the stacked DL predictions for the training split (one column per learner).
DLTrainProbasCat

TensorBase([[0.2469, 0.2998, 0.3168, 0.2180, 0.1997, 0.2691],
        [0.2669, 0.2944, 0.3257, 0.2173, 0.1927, 0.2893],
        [0.3260, 0.2870, 0.3670, 0.2292, 0.2107, 0.2979],
        ...,
        [0.2597, 0.3004, 0.3020, 0.2846, 0.2918, 0.2497],
        [0.2928, 0.2761, 0.2724, 0.2522, 0.2791, 0.3193],
        [0.3289, 0.2669, 0.2724, 0.2417, 0.2663, 0.3195]])

In [18]:
# R^2 of each of the six DL learners on its own, training split.
for model_col in range(6):
    train_score = R2Score(DLTrain_Labels, DLTrainProbasCat[:, model_col])
    print(train_score)

TensorBase(0.9124)
TensorBase(0.9267)
TensorBase(0.8937)
TensorBase(0.9440)
TensorBase(0.9430)
TensorBase(0.9144)


In [19]:
# R^2 of the weighted DL ensemble on the training split.
R2Score(DLTrain_Labels, DLTrainProbasCat @ DLWeight)

TensorBase(0.9297)

In [20]:
# R^2 of each of the six DL learners on its own, test split.
for model_col in range(6):
    test_score = R2Score(DLTest_Labels, DLTestProbasCat[:, model_col])
    print(test_score)

TensorBase(0.8396)
TensorBase(0.8367)
TensorBase(0.8351)
TensorBase(0.8345)
TensorBase(0.8342)
TensorBase(0.8333)


In [21]:
# R^2 of the weighted DL ensemble on the test split.
R2Score(DLTest_Labels, DLTestProbasCat @ DLWeight)

TensorBase(0.8475)

In [22]:
# R^2 of each of the six DL learners on its own, validation split.
for model_col in range(6):
    valid_score = R2Score(DLValid_Labels, DLValidProbasCat[:, model_col])
    print(valid_score)

TensorBase(0.8463)
TensorBase(0.8285)
TensorBase(0.8256)
TensorBase(0.8176)
TensorBase(0.8167)
TensorBase(0.8385)


In [23]:
# R^2 of the weighted DL ensemble on the validation split.
R2Score(DLValid_Labels, DLValidProbasCat @ DLWeight)

TensorBase(0.8410)

# 集成ML和DL

In [24]:
# Collapse each ensemble's per-model test predictions into a single prediction
# vector by applying its blending weights.
MLTestResult = MLTest_Results @ MLWeight
DLTestResult = np.array(DLTestProbasCat @ DLWeight)

# Running best R^2 and the ML share that achieved it; both are overwritten by the
# grid search in the following cell (best_weight becomes an integer in 0..10000).
best_value = 0
best_weight = []

In [25]:
# Grid-search the ML/DL blend in steps of 1/10000: `ml_share` is the ML weight
# (out of 10000) and the DL ensemble gets the remainder.
# NOTE(review): the blend is selected on the *test* split, so the test R^2 printed
# below is optimistic; consider tuning on the validation split instead.
for ml_share in range(10001):
    dl_share = 10000 - ml_share
    test_re = (MLTestResult * ml_share + DLTestResult * dl_share) / 10000

    R2 = R2Score(test_labels, test_re)
    if R2 > best_value:
        best_value, best_weight = R2, ml_share

# Prints the final loop value (10000), the best R^2 and the ML share that gave it.
print(ml_share, best_value, best_weight)

10000 0.8485710940887329 2112


In [29]:
# Apply the ML/DL blend to the training split and score it.
MLTrainResult = MLTrain_Results @ MLWeight
DLTrainResult = np.array(DLTrainProbasCat @ DLWeight)

# NOTE(review): 0.2112 / 0.7888 are hand-copied from the grid-search result
# (best ML share 2112 out of 10000); update them if that search is re-run.
train_re = MLTrainResult * 0.2112 + DLTrainResult * 0.7888

R2Score(train_labels, train_re)

0.9326221211456382

In [31]:
# Apply the ML/DL blend to the validation split and score it.
MLValidResult = MLValid_Results @ MLWeight
DLValidResult = np.array(DLValidProbasCat @ DLWeight)

# NOTE(review): 0.2112 / 0.7888 are hand-copied from the grid-search result
# (best ML share 2112 out of 10000); update them if that search is re-run.
valid_re = MLValidResult * 0.2112 + DLValidResult * 0.7888

R2Score(valid_labels, valid_re)

0.8423466037121745

In [33]:
# plt.figure()
# plt.plot(np.arange(len(valid_labels)), valid_labels,'go-',label='True Value')
# plt.plot(np.arange(len(valid_labels)), valid_re,'ro-',label='Predict Value')
# # plt.title(f'{name} score: {score}')
# plt.xlabel = "Samples"
# plt.ylabel = "Angle of Rotation"
# plt.legend()
# plt.show()

# 延时卷积投票算法

## 这边tsai会输出很多空行，需要到pycharm中计算

In [42]:
# Delayed sliding-window ("voting") smoothing of the blended ML+DL predictions,
# swept over odd window sizes 3, 5, ..., 99 and scored with R^2 on the test split.


def _fuse_single_sample(row, blend_weight):
    """Blend the ML and DL ensemble predictions for one feature row.

    Parameters
    ----------
    row : one row of eight feature values; it is reshaped to (1, 8) for the ML
        models and to (1, 1, 8) for the DL learners.
    blend_weight : two-element sequence ``[ML share, DL share]``.

    Returns
    -------
    float
        The blended single-value prediction for this row.
    """
    sample = np.array(row).reshape(1, -1)

    # ML: weighted sum of the per-model predictions.
    ml_pred = 0
    for k, name in enumerate(MLModels):
        ml_pred += MLWeight[k] * MLModels[name].predict(sample)

    # DL: weighted sum of the third value returned by get_X_preds.
    dl_input = sample.reshape(1, 1, 8)
    dl_pred = 0
    for k, name in enumerate(DLModels):
        _, _, preds = DLModels[name].get_X_preds(dl_input)
        dl_pred += DLWeight[k] * np.array(preds)

    return (blend_weight[0] * ml_pred + blend_weight[1] * dl_pred).item()


def _delayed_window_smooth(samples, window_size, window_weight):
    """Smooth a stream of predictions with a centred circular window.

    Samples are consumed one at a time, as they would arrive online. The output
    for position ``p`` is produced once sample ``p + window_size // 2`` has
    arrived, as the ``window_weight``-weighted sum of the window contents.

    * The first ``window_size // 2`` outputs are the raw samples (the window is
      still filling).
    * The last ``window_size // 2`` outputs are the raw samples left in the newer
      half of the window once the input is exhausted.
    * If there are fewer samples than ``window_size`` the window never fills and
      the positions that would have been smoothed stay 0.

    NOTE(review): the centre slot is overwritten in place with its smoothed value,
    so later windows mix already-smoothed older samples with raw newer ones. This
    is kept as-is because changing it would alter the reported scores; confirm
    whether a plain moving average over raw samples was intended.

    Parameters
    ----------
    samples : sequence of float predictions in arrival order.
    window_size : odd window length.
    window_weight : 1-D tensor of ``window_size`` weights.

    Returns
    -------
    torch.Tensor
        1-D float tensor with one output per input sample.
    """
    half_window_size = window_size // 2
    n_samples = len(samples)

    window_slide = torch.zeros(window_weight.shape)
    smoothed = torch.zeros(n_samples)
    idx = 0        # while filling: next free slot; afterwards: centre slot
    out_idx = 0    # next output position to write
    filling = True

    for value in samples:
        # Fill the window first.
        if filling:
            window_slide[idx] = value
            # No full window exists for the leading samples: emit them unchanged.
            if idx < half_window_size:
                smoothed[out_idx] = value
                out_idx += 1
                if out_idx == n_samples:
                    break

            idx += 1
            if idx == window_size:
                # Window is full: the centre slot is the first one to smooth.
                idx = half_window_size
                filling = False
            else:
                continue

        # Newest sample goes half a window ahead of the centre slot.
        window_slide[(idx + half_window_size) % window_size] = value

        # Weighted vote over the whole window, accumulated into the centre slot.
        window_slide[idx] *= window_weight[0]
        for offset in range(1, window_size):
            window_slide[idx] += window_slide[(idx + offset) % window_size] * window_weight[offset]

        smoothed[out_idx] = window_slide[idx].item()
        out_idx += 1

        # Input exhausted: flush the newer half of the window unchanged. The wrap
        # uses the current window size so every swept size reads the right slots.
        if out_idx == n_samples - half_window_size:
            while out_idx < n_samples:
                idx = (idx + 1) % window_size
                smoothed[out_idx] = window_slide[idx].item()
                out_idx += 1

        idx = (idx + 1) % window_size

    return smoothed


# R^2 per window size, indexed by the window size itself (unused rows stay 0).
results = torch.zeros((100, 1))

# Labels as a tensor. Going through NumPy keeps the cell re-runnable: it accepts
# both the original pandas Series and the tensor left by a previous run.
test_labels = torch.tensor(np.asarray(test_labels))

# [ML share, DL share] of the final blend.
# NOTE(review): hand-copied from the blend grid-search result (2112 / 10000).
MLDL_weight = torch.tensor([0.2112, 0.7888])

# The blended per-sample predictions do not depend on the window size, so run the
# models once per row here instead of once per row per window size.
fused_test_preds = [
    _fuse_single_sample(row, MLDL_weight) for _, row in test_features.iterrows()
]

for ii in range(3, 100, 2):
    # Uniform weights: every slot of the window gets 1 / window size.
    window_weight = torch.tensor([1 / ii] * ii)

    re = _delayed_window_smooth(fused_test_preds, ii, window_weight)

    # Score against the labels of the same split the predictions came from.
    results[ii] = R2Score(test_labels, re)
    print(ii, results[ii])