In [1]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

In [2]:
class TimeSeriesDataset(Dataset):
    def __init__(self, csv_file, num_channels, seq_length, transform=None):
        self.data = pd.read_csv(csv_file)
        self.num_channels = num_channels
        self.seq_length = seq_length
        self.transform = transform
        self.samples = self._extract_samples()

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        features = sample['data']
        label = sample['label']
        
        if self.transform:
            features = self.transform(features)
        
        # 将标签转换为 Long 类型
        label = torch.tensor(label, dtype=torch.long)
        
        return features, label

    def _extract_samples(self):
        samples = []
        # 遍历数据集，从第一个样本到倒数第 seq_length 个样本
        for i in range(len(self.data) - self.seq_length + 1):
            sample_data = np.zeros((self.seq_length, self.num_channels), dtype=np.float32)  # 存储当前样本的特征数据
            # 遍历每个通道（每列数据）
            for j, channel_name in enumerate(['J1', 'J2', 'J3', 'M1', 'M2', 'M3']):
                # 根据通道名称获取数据列
                channel_data = self.data.iloc[i:i+self.seq_length][channel_name].values.astype(np.float32)
                # 将当前通道的数据加入到样本数据中的对应列
                sample_data[:, j] = channel_data
            # 提取标签，这里假设标签在时间序列的最后一个时间点上
            label = self.data.iloc[i + self.seq_length - 1]['label']
            # 组合成一个样本，包括特征数据和标签
            sample = {'data': sample_data, 'label': label}
            # 将当前样本加入到样本列表中
            samples.append(sample)
        return samples


In [3]:
# 加载数据
train_file = 'new_train_peek_data.csv'
test_file = 'new_test_peek_data.csv'
num_channels = 6  # J1-J3 和 M1-M3 共 6 个通道
seq_length = 200   # 序列长度
train_dataset = TimeSeriesDataset(train_file, num_channels, seq_length)
test_dataset = TimeSeriesDataset(test_file, num_channels, seq_length)
# 创建 DataLoader
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)


In [4]:
print(train_dataset[12000])
print(test_dataset[12000])
# 打印训练集的标签列数据类型
print("训练集标签数据类型:", type(train_dataset.data['label']))

# 打印测试集的标签列数据类型
print("测试集标签数据类型:", type(test_dataset.data['label']))


(array([[ 122.,   44.,  105., 2422., 1799., 1935.],
       [ 122.,   44.,  105., 1723., 1705., 1767.],
       [ 121.,   44.,  105., 1704., 1692., 1753.],
       ...,
       [ 120.,   47.,  110., 1550., 1682., 1704.],
       [ 124.,   45.,  108., 1226., 1351., 1278.],
       [ 121.,   45.,  107., 2508., 1878., 2023.]], dtype=float32), tensor(2))
(array([[ 152.,   83.,  108., 1452., 1472., 1368.],
       [ 151.,   83.,  109., 1370., 1402., 1226.],
       [ 155.,   85.,  111., 1402., 1421., 1262.],
       ...,
       [ 158.,   85.,  111., 1319., 1330., 1157.],
       [ 158.,   85.,  111., 1609., 1618., 1777.],
       [ 158.,   85.,  110., 1424., 1441., 1326.]], dtype=float32), tensor(2))
训练集标签数据类型: <class 'pandas.core.series.Series'>
测试集标签数据类型: <class 'pandas.core.series.Series'>


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # 前向传播
        out, _ = self.lstm(x, (h0, c0))
        # 取最后一个时间步的输出并应用 softmax
        out = F.softmax(self.fc(out[:, -1, :]), dim=1)
        return out

best_accuracy = 0.0  # 初始化最佳准确率
# 设置超参数
input_size = 6  # 输入特征维度，即通道数
hidden_size = 64  # 隐藏层维度
num_layers = 4  # LSTM 层数
num_classes = 5  # 输出类别数，即标签类别数
batch_size = 128
learning_rate = 0.0001
num_epochs = 5

# 初始化模型、损失函数和优化器
model = LSTMModel(input_size, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练模型
for epoch in range(num_epochs):
    model.train()  # 设置模型为训练模式
    for inputs, labels in train_loader:
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # 在训练集上打印每个epoch的损失
    print(f'Training Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    
    # 在测试集上评估模型
    model.eval()  # 设置模型为评估模式
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = correct / total
    print(f'Test Accuracy: {100 * accuracy:.2f}%')
    
    # 保存最佳模型参数
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), 'best_model.pth')


Training Epoch [1/5], Loss: 1.8307
Test Accuracy: 32.12%
Training Epoch [2/5], Loss: 1.8205
Test Accuracy: 44.33%
Training Epoch [3/5], Loss: 1.8872
Test Accuracy: 25.25%
Training Epoch [4/5], Loss: 1.0751
Test Accuracy: 1.30%
Training Epoch [5/5], Loss: 0.9406
Test Accuracy: 20.14%
