In [1]:
import YXJ
import torch
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
import numpy as np
import torch.nn.functional as F

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np

class TimeSeriesDataset(Dataset):
    def __init__(self, csv_file, steps_per_sample):
        self.data_frame = pd.read_csv(csv_file) # 读取CSV文件
        self.steps_per_sample = steps_per_sample
        # 预处理数据
        n_samples = len(self.data_frame) // steps_per_sample
        self.data_frame = self.data_frame.iloc[:n_samples * steps_per_sample]
        self.data_frame = self.data_frame.values.reshape(n_samples, steps_per_sample,-1)
        
    def __len__(self):
        return len(self.data_frame)
    
    def __getitem__(self, idx):
        sample = self.data_frame[idx]
        X = sample[:, :-1]  # 所有行，除了最后一列
        y = sample[0, -1]   # 第一个时间步的标签
        
        # 转换为Tensor
        X = torch.tensor(X, dtype=torch.float32)
        X = X.transpose(0, 1)
        y = torch.tensor(y, dtype=torch.long)
        return X, y


# 使用示例
SAMPLES_PER_GESTURE = 100
dataset = TimeSeriesDataset('normalized_data.csv', SAMPLES_PER_GESTURE)
GESTURES = ["smoke", "nosmoke"]  # 手势列表
NUM_GESTURES= len(GESTURES);

# 分割数据集
from torch.utils.data import random_split
dataset_size = len(dataset)
train_size = int(dataset_size * 0.7)  # 70% 数据用于训练
valid_size = int(dataset_size * 0.2)  # 20% 数据用于验证
test_size = dataset_size - train_size - valid_size  # 剩余10% 用于测试
batch_size =64;
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])
train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

YXJ.check_dataloader(test_loader)
input_size=SAMPLES_PER_GESTURE*9

torch.Size([64, 9, 100]) tensor([0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0,
        0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 0, 0,
        0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0])
torch.Size([64, 9, 100]) tensor([0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 1,
        1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,
        0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0])
torch.Size([64, 9, 100]) tensor([1, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1,
        0, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0,
        1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0])
torch.Size([64, 9, 100]) tensor([1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0,
        1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1,
        0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1])
torch.Size([64, 9, 100]) tensor(

In [3]:
import torch
import torch.nn as nn

class GRUNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(GRUNet, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        # 初始化隐藏状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) 
        # 前向传播GRU
        out, _ = self.gru(x, h0)
        # 取最后一个时间步
        out = out[:, -1, :]
        out = self.fc(out)
        return out

In [4]:
def validate(model, device, criterion, test_loader):
    model.eval()
    validation_loss = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            validation_loss += criterion(output, target).item()
    validation_loss /= len(test_loader)  # 计算平均验证损失
    return validation_loss
    
def train(model, device, train_loader, criterion, optimizer, epochs):
    model.train()
    train_losses = []  # 用于存储每个epoch的损失值
    for epoch in range(epochs):
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()           # 清零梯度
            output = model(data)            # 前向传播
            loss = criterion(output, target) # 计算损失
            loss.backward()                 # 反向传播
            optimizer.step()                # 更新参数
        train_losses.append(loss.item())
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')
    return train_losses
    
def train_vali(model, device, train_loader, test_loader, criterion, optimizer, epochs):
    model.train()
    train_losses = []
    validation_losses = []
    for epoch in range(epochs):
        model.train()
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()  # 清零梯度
            output = model(data)  # 前向传播
            loss = criterion(output, target)  # 计算损失
            loss.backward()  # 反向传播
            optimizer.step()  # 更新参数
        train_losses.append(loss.item())
        
        # 在每个epoch后计算验证集上的损失
        validation_loss = validate(model, device, criterion, test_loader)
        validation_losses.append(validation_loss)
        
        print(f'Epoch {epoch+1}, Training Loss: {loss.item()}, Validation Loss: {validation_loss}')

    return train_losses, validation_losses
    
def test(model, device, test_loader):
    model.eval()  # 将模型设置为评估模式
    test_loss = 0
    correct = 0  # 用于累计正确预测的数量
    with torch.no_grad():  # 在评估阶段不计算梯度
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item()  # 累加损失值
            pred = output.argmax(dim=1, keepdim=True)  # 找到概率最高的类别作为预测
            correct += pred.eq(target.view_as(pred)).sum().item()  # 如果预测正确，则累加到correct变量中

    test_loss /= len(test_loader.dataset)  # 计算平均损失
    accuracy = 100. * correct / len(test_loader.dataset)  # 计算准确率

    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.4f}%)')
    print(len(test_loader.dataset))

In [5]:
# 使用GPU
use_cuda = torch.cuda.is_available()
device=torch.device('cuda' if use_cuda else 'cpu')
hidden_size =30
layers_num =30
model = GRUNet(100, hidden_size,layers_num,2).to(device) 
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)  # input_size, hidden_size, num_layers, num_classes

# 假设 train_loader 和 test_loader 已经定义
train_losses, validation_losses=train_vali(model, device, train_loader, valid_loader, criterion, optimizer, epochs=100)
YXJ.draw_loss(train_losses,validation_losses)
test(model, device, valid_loader)
test(model, device, test_loader)

KeyboardInterrupt: 

In [None]:
print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.4f}%)')
print(len(test_loader.dataset))