In [18]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import copy

data preparation(by k_fold):

In [19]:
# constants:
GESTURE_NUM = 13
VCC = 5

In [20]:
# load data
ges_dfs = [[i] for i in range(GESTURE_NUM)]
folder_path = ".\\gesture_data\\"
for i in range(GESTURE_NUM):
    file_path = folder_path + f"gesture{i+1}.csv"
    df = pd.read_csv(file_path)
    zero_rows = (df == 0).all(axis=1)
    zero_rows_indexes = list(zero_rows[zero_rows].index)
    ges_df_list = []
    ges_dfs[i].pop()
    start = 0
    for j in range(len(zero_rows_indexes)):
        ges_df = df.iloc[start:zero_rows_indexes[j],:]
        start = zero_rows_indexes[j] + 1
        ges_df_list.append(ges_df)     
    ges_dfs[i].extend(ges_df_list)

len(ges_dfs),len(ges_dfs[0])

(13, 390)

In [21]:
ges_dfs[0][0].shape

(14, 10)

In [22]:
# get time-step in lstm
time_step =zero_rows_indexes[1] - zero_rows_indexes[0] - 1 # remove all-zero row
time_step

14

In [23]:
def set_shuffle(setX:list,setY:list):
    """
    shuffle setX and setY, but still keep setX and setY's mapping
    return:
       setX,setY
    """
    import random
    ges_set = list(zip(setY,setX))
    keys = [i+1 for i in range(len(ges_set))]
    ges_set_dict = dict(zip(keys,ges_set))
    random.shuffle(keys)
    setX = []
    setY = []
    for i in range(len(keys)):
        ges_sample = ges_set_dict[keys[i]]
        setX.append(ges_sample[1])
        setY.append(ges_sample[0])
    return setX,setY

In [24]:
# get training set, CV set and testing set:
ges_trainingX = []
ges_trainingY = []
# ges_validationX = []
# ges_validationY = []
ges_testingX = []
ges_testingY = []


for i in range(GESTURE_NUM):
    test_split = round(len(ges_dfs[i]) * 0.10) 
    ges_trainingX.extend(ges_dfs[i][:len(ges_dfs[i])-test_split]) # training set
    ges_testingX.extend(ges_dfs[i][len(ges_dfs[i])-test_split:]) # test set

    ges_trainingY.extend([i+1]*(len(ges_dfs[i])-test_split))
    ges_testingY.extend([i+1]*test_split)

# shuffle training and testing set
ges_trainingX,ges_trainingY = set_shuffle(ges_trainingX,ges_trainingY)
ges_testingX,ges_testingY = set_shuffle(ges_testingX,ges_testingY)


In [25]:
# shape of all sets:
print("trainX shape--","(",len(ges_trainingX),",",ges_trainingX[0].shape,")")
print("testX shape--","(",len(ges_testingX),",",ges_testingX[0].shape,")")

trainX shape-- ( 4563 , (14, 10) )
testX shape-- ( 507 , (14, 10) )


In [26]:
from sklearn.preprocessing import MinMaxScaler

In [27]:
# concat all training data in X, and do normalization:
ges_trainingX_df = pd.concat(ges_trainingX)
ges_testingX_df = pd.concat(ges_testingX)
# #用于后续性能优化：让analog pin口读数转为电压
# ges_trainingX_df = transfrom_flex_raw(ges_trainingX_df)
# ges_testingX_df = transfrom_flex_raw(ges_testingX_df)

# transfrom flex sensor data to voltage:

scaler = MinMaxScaler(feature_range=(0,1))
ges_trainingX_scaled = scaler.fit_transform(ges_trainingX_df)
ges_testingX_scaled = scaler.fit_transform(ges_testingX_df)

In [28]:
ges_trainingX_scaled.shape,ges_testingX_scaled.shape

((63882, 10), (7098, 10))

In [29]:
def splitX(dataset,time_step):
    dataX = []
    for i in range(time_step,len(dataset)+time_step,time_step):
        dataX.append(dataset[i-time_step:i,0:dataset.shape[1]]) 
    return np.array(dataX)

In [30]:
def to_categorical_numpy(y, num_classes=None):
    """
    将整数数组转换为 one-hot 编码的 NumPy 数组。
    
    参数:
    - y: 一个包含整数标签的 1D NumPy 数组。
    - num_classes: one-hot 编码的目标类别数。
    
    返回:
    - one_hot: one-hot 编码的 2D NumPy 数组。
    """
    if num_classes is None:
        num_classes = np.max(y) + 1
    
    one_hot = np.zeros((y.shape[0], num_classes))
    one_hot[np.arange(y.shape[0]), y] = 1
    
    return one_hot

In [31]:
# 用numpy写了原来keras中to_categorical的平替方法
ges_trainingX = splitX(ges_trainingX_scaled,time_step)
ges_trainingY = np.array(ges_trainingY) # generate an one-hot encoding for labels, this encoding is a dim=14 vector, where first element points to label=0(however, no label here =0)
ges_testingX = splitX(ges_testingX_scaled,time_step)
ges_testingY = np.array(ges_testingY)

In [32]:
# view shapes
print("trainX Shape-- ",ges_trainingX.shape)
print("trainY Shape-- ",ges_trainingY.shape)
print("testX Shape-- ",ges_testingX.shape)
print("testY Shape-- ",ges_testingY.shape)


trainX Shape--  (4563, 14, 10)
trainY Shape--  (4563,)
testX Shape--  (507, 14, 10)
testY Shape--  (507,)


训练之前，将训练集，验证集和测试集封装成torch中的dataloader

In [33]:
from torch.utils.data import DataLoader,TensorDataset,SubsetRandomSampler
from sklearn.model_selection import KFold

In [34]:
# 将 NumPy 数组转换为 PyTorch Tensor
trainX_tensor = torch.tensor(ges_trainingX,dtype=torch.float32)
trainY_tensor = torch.tensor(ges_trainingY,dtype=torch.float32)
testX_tensor = torch.tensor(ges_testingX,dtype=torch.float32)
testY_tensor = torch.tensor(ges_testingY,dtype=torch.float32)

# 创建 TensorDataset
train_dataset = TensorDataset(trainX_tensor, trainY_tensor)
test_dataset = TensorDataset(testX_tensor, testY_tensor)

# 创建 DataLoader
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)  # 测试集通常不打乱

Model training：

In [35]:
# constants
num_layers = 1  #一层lstm
num_directions = 2  #双向lstm
lr = 1e-3 # 学习率
batch_size = 16   
epochs = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [36]:
GESTURE_NUM = 13
num_classses =GESTURE_NUM+1
time_step = 14
input_size = 10
hidden_size = 50 # how many hidden layer on LSTM

In [37]:
class BiLSTMModel(nn.Module):
    def __init__(self, input_size,hidden_size, num_layers, num_directions, num_classes,dropout_prob):
        super(BiLSTMModel, self).__init__()

        self.input_size = input_size # 数据维度
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_directions = num_directions
        
        
        self.lstm1 = nn.LSTM(input_size, hidden_size, 
                             num_layers = num_layers, bidirectional = (num_directions == 2))
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, 
                             num_layers = num_layers, bidirectional = (num_directions == 2))
        self.dropout = nn.Dropout(dropout_prob)
        self.attention_weights_layer = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(inplace=True)
        )
        self.liner = nn.Linear(hidden_size, num_classes)
        self.act_func = nn.Softmax(dim=1)
    
    def forward(self,x):
        """
        x: input data with shape: [batch_size, time_step, embedding_size]
        """
        #x [batch_size, time_step, embedding_size]
        x = x.permute(1, 0, 2) #[time_step, batch_size, embedding_size]
        #LSTM 期望将时间步放在最前面        
        #由于数据集不一定是预先设置的batch_size的整数倍，所以用size(1)获取当前数据实际的batch
        batch_size = x.size(1)

        # LSTM最初的前向输出，即记忆单元C和隐状图H
        h_0 = torch.randn(self.num_layers * self.num_directions, batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_layers * self.num_directions, batch_size, self.hidden_size).to(device)

        # 第一层LSTM
        #out[seq_len, batch, num_directions * hidden_size]。多层lstm，out只保存最后一层每个时间步t的输出h_t
        #h_n, c_n [num_layers(1) * num_directions, batch, hidden_size]，h_n是全部
        out, (h_n, c_n) = self.lstm1(x, (h_0, c_0))
        # 第一层Dropout，
        out = self.dropout(out)
        #双向LSTM输出拆成前向和后向输出
        #将双向lstm的输出拆分为前向输出和后向输出
        (forward_out, backward_out) = torch.chunk(out, 2, dim = 2)
        out = forward_out + backward_out  #[seq_len, batch, hidden_size]

         # 第二层LSTM
        out, (h_n, c_n) = self.lstm2(out, (h_n, c_n))
        # 第二层Dropout
        out = self.dropout(out)
        
        # 再次将双向LSTM的输出拆分为前向输出和后向输出，并求和
        (forward_out, backward_out) = torch.chunk(out, 2, dim=2)
        out = forward_out + backward_out  # [seq_len, batch, hidden_size]
    
        # 调整out的维度以符合后续处理的要求
        out = out.permute(1, 0, 2)  # [batch, seq_len, hidden_size]
        #print("LSTM out",out)
         #为了使用到lstm最后一个时间步时每层lstm的表达，用h_n生成attention的权重
        h_n = h_n.permute(1, 0, 2)  #[batch, num_layers * num_directions,  hidden_size]
        h_n = torch.sum(h_n, dim=1) #[batch, 1,  hidden_size]
        h_n = h_n.squeeze(dim=1)  #[batch, hidden_size]
        # 得到attention层的权重
        attention_w = self.attention_weights_layer(h_n)  #[batch, hidden_size]
        attention_w = attention_w.unsqueeze(dim=1) #[batch, 1, hidden_size]
        attention_context = torch.bmm(attention_w, out.transpose(1, 2))  #[batch, 1, seq_len]
        #print("attention_context",x,sep='\n')
        softmax_w = F.softmax(attention_context, dim=-1)  #[batch, 1, seq_len], 用softmax对刚才的权重归一化,a-> a'
        
        x = torch.bmm(softmax_w, out)  #[batch, 1, hidden_size] 抽取sequence内的重要信息
        x = x.squeeze(dim=1)  #[batch, hidden_size]
        #print("after squeeze:",x,sep='\n')
        x = self.liner(x)
        #x = self.act_func(x)
        return x
        

In [61]:
def test(model, test_loader, loss_func):
    model.eval()
    loss_val = 0.0
    corrects = 0.0
    for datas, labels in test_loader:
        datas = datas.to(device)
        labels = labels.to(device)
        
        preds = model(datas)
        loss = loss_func(preds, labels.long())
        loss_val += loss.item() * datas.size(0)
        
        #获取预测的最大概率出现的位置
        preds = nn.Softmax(dim=1)(preds)
        preds = torch.argmax(preds, dim=1)
        #labels = torch.argmax(labels, dim=0)
        corrects += torch.sum(preds == labels).item()
    test_loss = loss_val / len(test_loader.dataset) # 计算整个测试集的总损失
    test_acc = corrects / len(test_loader.dataset) # 计算整个测试集的总正确率
    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")
    return test_loss,test_acc

In [68]:
def train(model, train_loader, optimizer, loss_func, epochs,k_fold):
    """
    define training process of upon model
    return:
    model: the model after training
    train_losses: loss for each epoch in training
    train_accss: accuracy for each epoch in training
    CV_losses: loss for each epoch in validation
    CV_accs: accuracy for each epoch in validation
    """
    k = k_fold # k折交叉验证
    best_val_acc_list = [0.0 for i in range(k)] # 跟踪每个折最佳验证准确率
    best_models = [] # 记录每一折训练得到的最优model，然后根据正确率得到一个全局最优model
    best_model_params = copy.deepcopy(model.state_dict()) # 创建当前模型参数的深拷贝
    kf = KFold(n_splits=k)
    average_val_acc = 0.0
    train_loss_folds = []
    train_acc_folds = []
    CV_loss_folds = []
    CV_acc_folds = []
    for fold,(train_indexes,val_indexes) in enumerate(kf.split(train_loader.dataset)):
        train_sampler = SubsetRandomSampler(train_indexes) # 告诉dataloader应该加载与len(train_indexes)数量相同，与train_indexes对应的样本
        val_sampler = SubsetRandomSampler(val_indexes)
        curr_train_loader = DataLoader(train_loader.dataset,batch_size=16,sampler=train_sampler)
        val_loader = DataLoader(train_loader.dataset,batch_size=16,sampler=val_sampler) 

        # 记录训练损失和正确率，用于画图：
        train_loss_epochs = []
        train_acc_epochs = []
        # 记录每次验证的损失和正确率，用于画图：
        CV_loss_epochs = []
        CV_acc_epochs = []
        for epoch in range(epochs):
            print(f"Fold{fold+1}, epoch{epoch+1}:...")
            model.train() # 设置为训练模式
            loss_val = 0.0
            corrects = 0.0
            for datas, labels in curr_train_loader:
                # datas: (batch_size,input_size(14),features(10))
                # labels: (batch_size,input_size(14))
                datas = datas.to(device)
                labels = labels.to(device)
                
                preds = model(datas) # 前向传播
                #print(labels.long())
                loss = loss_func(preds, labels.long()) # 计算损失,tensor大小是1
                
                optimizer.zero_grad() # 清除优化器梯度（来自于上一次反向传播）
                loss.backward() # 反向传播, 计算模型参数梯度
                optimizer.step() # 根据计算得到的梯度，使用优化器更新模型的参数。
                
                loss_val += loss.item() * datas.size(0) #获取loss，并乘以当前批次大小
                
                #获取预测的最大概率出现的位置
                preds = nn.Softmax(dim=1)(preds)
                preds = torch.argmax(preds, dim=1)
                #labels = torch.argmax(labels, dim=0)
                corrects += torch.sum(preds == labels).item()
            train_loss = loss_val / len(curr_train_loader.dataset) # 计算整个模型的总损失
            train_acc = corrects / len(curr_train_loader.dataset) # 计算整个模型的总正确率
            print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
            train_loss_epochs.append(train_loss)
            train_acc_epochs.append(train_acc)
            # if(epoch % 2 == 0): 每个epoch都进行评估：
            val_loss,val_acc = test(model, val_loader, loss_func)
            if(best_val_acc_list[fold] < val_acc): #出现最优模型时，保存最优模型
                best_val_acc_list[fold] = val_acc
                best_model_params = copy.deepcopy(model.state_dict())
            # 更新平均accuracy指标
            average_val_acc+=val_acc
            CV_loss_epochs.append(val_loss)
            CV_acc_epochs.append(val_acc)
        model.load_state_dict(best_model_params)
        best_models.append(model)
        train_loss_folds.append(train_loss_epochs)
        train_acc_folds.append(train_acc_epochs)
        CV_loss_folds.append(CV_loss_epochs)
        CV_acc_folds.append(CV_acc_epochs)
        print(len(CV_loss_folds),len(CV_acc_folds),sep=",")
        
    # 计算所有折的平均验证accuracy
    average_val_acc = average_val_acc / (k*epochs)
    print(f'Average Validation Accuracy: {average_val_acc:.4f}')
    # 找到所有折中最好的model作为返回model，以及当前折的loss和acc记录返回
    best_val_acc_index = np.argmax(np.array(best_val_acc_list))
    print(best_val_acc_index)
    model = best_models[best_val_acc_index]
    train_losses = train_loss_folds[best_val_acc_index]
    train_accs = train_acc_folds[best_val_acc_index]
    CV_losses = CV_loss_folds[best_val_acc_index]
    CV_accs = CV_acc_folds[best_val_acc_index]
    return model,train_losses,train_accs,CV_losses,CV_accs
    

In [40]:
test_a = np.arange(5)
test_b = [i for i in range(5,10)]
max_index = np.argmax(test_a)
print(test_b)
print(test_b[max_index])

[5, 6, 7, 8, 9]
9


In [71]:
epochs=20

In [72]:
model = BiLSTMModel(input_size, hidden_size, num_layers, num_directions, num_classses,dropout_prob=0.1)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_func = nn.CrossEntropyLoss() # 与categorical crossEntropyLoss不同，CrossEntropyLoss期望输出是一个类索引，而不是独热编码
model,train_loss,train_acc,CV_loss,CV_acc = train(model, train_loader,optimizer, loss_func, epochs,k_fold=5)

Fold1, epoch1:...
Train Loss: 1.7829, Train Acc: 0.1457
Test Loss: 0.3489, Test Acc: 0.0717
Fold1, epoch2:...
Train Loss: 1.2518, Train Acc: 0.3088
Test Loss: 0.2821, Test Acc: 0.0936
Fold1, epoch3:...
Train Loss: 0.9647, Train Acc: 0.4181
Test Loss: 0.2212, Test Acc: 0.1102
Fold1, epoch4:...
Train Loss: 0.8230, Train Acc: 0.4751
Test Loss: 0.1848, Test Acc: 0.1289
Fold1, epoch5:...
Train Loss: 0.7281, Train Acc: 0.5119
Test Loss: 0.1711, Test Acc: 0.1319
Fold1, epoch6:...
Train Loss: 0.6706, Train Acc: 0.5216
Test Loss: 0.1475, Test Acc: 0.1431
Fold1, epoch7:...
Train Loss: 0.6391, Train Acc: 0.5402
Test Loss: 0.1420, Test Acc: 0.1475
Fold1, epoch8:...
Train Loss: 0.5680, Train Acc: 0.5781
Test Loss: 0.1423, Test Acc: 0.1449
Fold1, epoch9:...
Train Loss: 0.5458, Train Acc: 0.5819
Test Loss: 0.1365, Test Acc: 0.1414
Fold1, epoch10:...
Train Loss: 0.5135, Train Acc: 0.5946
Test Loss: 0.1235, Test Acc: 0.1528
Fold1, epoch11:...
Train Loss: 0.4952, Train Acc: 0.6022
Test Loss: 0.1189, Tes

In [75]:
test(model,test_loader,loss_func)

Test Loss: 1.7972, Test Acc: 0.5049


(1.7971768137030817, 0.504930966469428)

In [98]:
test(model,test_loader,loss_func)

Test Loss: 1.9838184990591317, Test Acc: 0.4714003944773176


(1.9838184990591317, 0.4714003944773176)