In [None]:
# 6类数据，分为浅呼吸躺，深呼吸躺，浅呼吸坐，深呼吸坐，浅呼吸走，深呼吸走， 0代表胸式呼吸 1代表腹式呼吸

In [None]:
# 统计一下data.xlsx每一个label分别有多少个数据，总共有6个label（呼吸躺，深呼吸躺，浅呼吸坐，深呼吸坐，浅呼吸走，深呼吸走， 0代表胸式呼吸 1代表腹式呼吸）

In [68]:
import pandas as pd
import torch
import torch.nn as nn

# 读取Excel文件
file_path = 'data/data.xlsx'
df = pd.read_excel(file_path, sheet_name='Sheet1',header=None)

# 统计每个label的数量
label_counts = df.iloc[:,-1].value_counts()

# 打印统计结果
print(label_counts)


6
lying-1    17625
sit-0      16103
lying-0    15824
sit-1      14176
walk-1     10012
walk-0      7519
label          1
Name: count, dtype: int64


In [69]:
import pandas as pd
df = pd.read_excel(file_path, sheet_name='Sheet1')
label_counts = df['label'].value_counts()
print(label_counts)

label
lying-1    17625
sit-0      16103
lying-0    15824
sit-1      14176
walk-1     10012
walk-0      7519
Name: count, dtype: int64


In [70]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

# 提取特征和标签
features = df[['x_imu', 'y_imu', 'z_imu', 'x_gry', 'y_gry', 'z_gry']]
labels = df['label']

# 特征标准化
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

# 标签编码
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(features_scaled, labels_encoded, test_size=0.2, random_state=42)

# 转换为PyTorch张量
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)


In [60]:
from torch.utils.data import DataLoader, TensorDataset

# 创建数据集和数据加载器
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)



In [92]:
import torch.nn as nn
import torch.optim as optim

class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # RNN层
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        # 全连接层
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])
        return out

# 参数
input_size = features.shape[1]  # 特征数量
print(f'input_size:{input_size}')
hidden_size = 64
output_size = len(label_encoder.classes_)  # 标签类别数量
print(f'hidden_size:{hidden_size}')
num_layers = 1

# 实例化模型、损失函数和优化器
model = RNNModel(input_size, hidden_size, output_size, num_layers)
criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)
optimizer = optim.Adam(model.parameters(), lr=0.0001)

input_size:6
hidden_size:64


In [94]:
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        outputs = model(inputs.unsqueeze(1))  # 需要增加一个维度以适应 RNN 的输入
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i + 1) % 10 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
torch.save(model.state_dict(), 'rnn_model.pth')



Epoch [1/100], Step [10/4063], Loss: 0.3262
Epoch [1/100], Step [20/4063], Loss: 0.4283
Epoch [1/100], Step [30/4063], Loss: 0.5276
Epoch [1/100], Step [40/4063], Loss: 0.4969
Epoch [1/100], Step [50/4063], Loss: 0.3166
Epoch [1/100], Step [60/4063], Loss: 0.3561
Epoch [1/100], Step [70/4063], Loss: 0.8987
Epoch [1/100], Step [80/4063], Loss: 0.4537
Epoch [1/100], Step [90/4063], Loss: 0.3855
Epoch [1/100], Step [100/4063], Loss: 0.3303
Epoch [1/100], Step [110/4063], Loss: 0.3601
Epoch [1/100], Step [120/4063], Loss: 0.4000
Epoch [1/100], Step [130/4063], Loss: 0.7524
Epoch [1/100], Step [140/4063], Loss: 0.4006
Epoch [1/100], Step [150/4063], Loss: 0.3959
Epoch [1/100], Step [160/4063], Loss: 0.2584
Epoch [1/100], Step [170/4063], Loss: 0.4103
Epoch [1/100], Step [180/4063], Loss: 0.7593
Epoch [1/100], Step [190/4063], Loss: 0.3038
Epoch [1/100], Step [200/4063], Loss: 0.4851
Epoch [1/100], Step [210/4063], Loss: 0.8012
Epoch [1/100], Step [220/4063], Loss: 0.7633
Epoch [1/100], Step

第一次的学习：学习率
optimizer = optim.Adam(model.parameters(), lr=0.001)(刚开始没有归一化，归一化之后就发现效果还不错——）

In [95]:
from sklearn.metrics import confusion_matrix

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.unsqueeze(1))
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 生成混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
print(conf_matrix)


[[2860  296    0    0    0    0]
 [ 343 3142    0    0    0    0]
 [   1    0 3124   19   40   53]
 [   0    0    3 2869    5    4]
 [   6    0   64   15  830  588]
 [   1    0   66    0  300 1623]]


第二次的学习：学习率
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [17]:
from sklearn.metrics import confusion_matrix

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.unsqueeze(1))
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 生成混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
print(conf_matrix)

[[2838    0    0  294    0   24]
 [ 328 2171    0  923    0   63]
 [  23    0    0 3116    0   98]
 [   2    0    0 2709    0  170]
 [ 115    2   24  731    0  631]
 [ 104    0    0  729    0 1157]]


第三次的学习：学习率
optimizer = optim.Adam(model.parameters(), lr=0.0005) weight_decay=1e-5

In [20]:
from sklearn.metrics import confusion_matrix

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.unsqueeze(1))
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 生成混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
print(conf_matrix)

[[2852  301    3    0    0    0]
 [ 339 3096   22    0    0   28]
 [   1    0 3071   47    0  118]
 [   0    0   68 2683    3  127]
 [   7   13  730   19    1  733]
 [   1    4  729   33    0 1223]]


增加惩罚系数在lr最好的

In [23]:
from sklearn.metrics import confusion_matrix

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.unsqueeze(1))
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 生成混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
print(conf_matrix)


[[2857  239    2   58    0    0]
 [ 341 2685    0  459    0    0]
 [   1    3 3081    9    0  143]
 [   0   87  644 2127    1   22]
 [   4   68  705   16    2  708]
 [   2  206  641    3    0 1138]]


增加惩罚项之后效果很差

In [26]:
from sklearn.metrics import confusion_matrix

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.unsqueeze(1))
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 生成混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
print(conf_matrix)


[[   2    0 3154    0    0    0]
 [   0 2192 1249    0    0   44]
 [   0    5 3226    0    0    6]
 [   0   47 2788    0    0   46]
 [   4  230 1209    0    0   60]
 [   0  480 1355    0    0  155]]


感觉仅仅学习率0.001的时候稍微好一点点

In [29]:
from sklearn.metrics import confusion_matrix

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.unsqueeze(1))
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 生成混淆矩阵
conf_matrix = confusion_matrix(all_labels, all_preds)
print(conf_matrix)


[[2850   62  241    0    1    2]
 [ 339 2668  471    7    0    0]
 [   0    1 3116    0   23   97]
 [   0    0 2709    0    5  167]
 [   0   17  743    2  107  634]
 [   0    7  733    0  100 1150]]


In [97]:
import serial
import numpy as np
import time
from collections import Counter
import torch

# 初始化串口
ser = serial.Serial('COM4', 115200, timeout=1)

def read_imu_data():
    data = []
    while len(data) < 300:  # 读取1000个数据点
        line = ser.readline().decode('utf-8').strip()
        # print(line)
        if line:
            try:
                values = list(map(int, line.split(',')))  # 将字符串转换为整数列表
                if len(values) == 6:  # 确保每行有6个数据
                    data.append(values)
            except ValueError:
                pass  # 跳过不正确的行
    return np.array(data)

def normalize_data(data, min_value=0, max_value=65535):
    return (data - min_value) / (max_value - min_value)

def prepare_data(imu_data):
    imu_tensor = torch.tensor(imu_data, dtype=torch.float32)
    imu_tensor = imu_tensor.unsqueeze(0)  # (1, 1000, 6)
    return imu_tensor

# 加载模型
model = RNNModel(input_size=6, hidden_size=64, output_size=len(label_encoder.classes_), num_layers=1)
model.load_state_dict(torch.load('rnn_model.pth'))
model.eval()

while True:
    imu_data = read_imu_data()
    imu_data_normalized = normalize_data(imu_data)
    input_tensor = prepare_data(imu_data_normalized)
    
    predictions = []
    
    # 使用模型进行推理
    with torch.no_grad():
        output = model(input_tensor)
        _, predicted_labels = torch.max(output, 1)
        predictions.extend(predicted_labels.cpu().numpy())

    # 投票机制
    most_common_label = Counter(predictions).most_common(1)[0][0]
    
    # 将预测结果转换为实际标签
    predicted_label_str = label_encoder.inverse_transform([most_common_label])
    print(f'Predicted label: {predicted_label_str[0]}')

    


  model.load_state_dict(torch.load('rnn_model.pth'))


Predicted label: sit-0
Predicted label: sit-0
Predicted label: walk-0
Predicted label: walk-0
Predicted label: walk-0
Predicted label: sit-1
Predicted label: sit-0
Predicted label: sit-1
Predicted label: sit-0
Predicted label: sit-0
Predicted label: sit-0
Predicted label: sit-0
Predicted label: walk-0
Predicted label: sit-0
Predicted label: walk-1
Predicted label: sit-1
Predicted label: walk-1
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-0
Predicted label: walk-1
Predicted label: walk-1
Predicted label: sit-0
Predicted label: sit-0
Predicted label: sit-1
Predicted label: sit-1
Predicted label: walk-0
Predicted label: walk-0
Predicted label: sit-1
Predicted label: sit-1
Predicted label: sit-0
Predicted label: sit-1
Predicted label: sit-1


KeyboardInterrupt: 

In [88]:
import torch

# 假设数据格式是 [x_imu, y_imu, z_imu, x_gry, y_gry, z_gry]
def prepare_data(imu_data):
    # 将数据转换为PyTorch张量
    imu_tensor = torch.tensor(imu_data, dtype=torch.float32)
    # 添加批次维度和时间步维度，使得输入符合RNN的要求
    imu_tensor = imu_tensor.unsqueeze(0).unsqueeze(1)  # (batch_size, seq_length, feature_size)
    return imu_tensor

# 准备数据
input_tensor = prepare_data(imu_data)


In [None]:
import torch

# 假设数据格式是 [x_imu, y_imu, z_imu, x_gry, y_gry, z_gry]
def prepare_data(imu_data):
    # 将数据转换为PyTorch张量
    imu_tensor = torch.tensor(imu_data, dtype=torch.float32)
    # 添加批次维度和时间步维度，使得输入符合RNN的要求
    imu_tensor = imu_tensor.unsqueeze(0).unsqueeze(1)  # (batch_size, seq_length, feature_size)
    return imu_tensor

# 准备数据
input_tensor = prepare_data(imu_data)


In [None]:
model = RNNModel(input_size=6, hidden_size=64, output_size=len(label_encoder.classes_), num_layers=1)
model.load_state_dict(torch.load('rnn_model.pth'))
model.eval()

# 使用模型进行推理
with torch.no_grad():
    output = model(input_tensor)
    _, predicted_label = torch.max(output, 1)

# 将预测结果转换为实际标签
predicted_label_str = label_encoder.inverse_transform(predicted_label.cpu().numpy())
print(f'Predicted label: {predicted_label_str[0]}')


In [None]:
import time

while True:
    imu_data = read_imu_data()
    input_tensor = prepare_data(imu_data)
    
    with torch.no_grad():
        output = model(input_tensor)
        _, predicted_label = torch.max(output, 1)
    
    predicted_label_str = label_encoder.inverse_transform(predicted_label.cpu().numpy())
    print(f'Predicted label: {predicted_label_str[0]}')

    # 可以加一个暂停时间，避免无限循环
    time.sleep(1)  # 例如，每秒推理一次


In [4]:
import serial.tools.list_ports

ports = serial.tools.list_ports.comports()
for port in ports:
    print(f"{port.device}: {port.description}")


COM4: USB 串行设备 (COM4)
