In [1]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical


In [2]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# 加载数据的函数
def load_data(directory, sequence_length=4096):
    X, y = [], []
    for label in range(4):  # 四种状态
        folder = os.path.join(directory, str(label))
        for file in os.listdir(folder):
            if file.endswith('.txt'):
                file_path = os.path.join(folder, file)
                with open(file_path, 'r') as f:
                    waveform = [float(line.strip()) for line in f.readlines()]
                    if len(waveform) == sequence_length:
                        X.append(waveform)
                        y.append(label)
    X = np.array(X)
    y = np.array(y)
    return X, y

# 加载测试数据的函数
def load_test_data(test_directory, sequence_length=4096):
    X_test = []
    for i in range(2000):  # 根据您的文件数量调整
        file_path = os.path.join(test_directory, f"{i}.txt")
        df = pd.read_csv(file_path, sep='\t', header=None).T
        waveform = df.values.flatten()
        if len(waveform) == sequence_length:
            X_test.append(waveform)
    X_test = np.array(X_test)
    return X_test

# 重塑数据以适应LSTM输入
def reshape_data(X):
    return X.reshape((X.shape[0], X.shape[1], 1))

# 加载训练数据
X, y = load_data('data/train')
X = reshape_data(X)
y = to_categorical(y, num_classes=4)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# 加载测试数据
X_test = load_test_data('data/test1')
X_test = reshape_data(X_test)


In [3]:
# 查看数据维度
print("训练数据维度: ", X_train.shape)
print("验证数据维度: ", X_val.shape)
print("测试数据维度: ", X_test.shape)

# 查看标签的维度
print("训练标签维度: ", y_train.shape)
print("验证标签维度: ", y_val.shape)

# 查看一些训练样本
print("训练样本示例:\n", X_train[:2])

# 查看一些测试样本
print("测试样本示例:\n", X_test[:2])

# 检查训练标签分布
unique, counts = np.unique(np.argmax(y_train, axis=1), return_counts=True)
print("训练集标签分布: ", dict(zip(unique, counts)))

# 检查验证标签分布
unique, counts = np.unique(np.argmax(y_val, axis=1), return_counts=True)
print("验证集标签分布: ", dict(zip(unique, counts)))


训练数据维度:  (3200, 4096, 1)
验证数据维度:  (800, 4096, 1)
测试数据维度:  (2000, 4096, 1)
训练标签维度:  (3200, 4)
验证标签维度:  (800, 4)
训练样本示例:
 [[[-0.58664616]
  [-0.94184249]
  [-0.25689108]
  ...
  [-0.39397053]
  [ 0.40303024]
  [-2.12762123]]

 [[-0.24358047]
  [-0.37693999]
  [ 1.24655105]
  ...
  [ 3.09291723]
  [ 2.64895901]
  [ 1.81705709]]]
测试样本示例:
 [[[ -2.29395955]
  [ -0.51380474]
  [  1.34389521]
  ...
  [ -0.77486932]
  [  0.55476074]
  [ -0.66728662]]

 [[ -9.38502737]
  [ 25.77933458]
  [ 31.80100779]
  ...
  [-69.92468496]
  [-32.02012487]
  [ 18.34911875]]]
训练集标签分布:  {0: 791, 1: 787, 2: 806, 3: 816}
验证集标签分布:  {0: 209, 1: 213, 2: 194, 3: 184}


In [4]:
import torch
import numpy as np
from torch.utils.data import TensorDataset, DataLoader

# 定义重塑数据的函数
def reshape_data_for_lstm(X):
    # 重塑数据为 (batch_size, 64, 64)
    batch_size = X.shape[0]
    return X.reshape(batch_size, 64, 64)

# 将数据从 numpy 数组转换为 PyTorch 张量，并应用重塑
X_train_tensor = torch.tensor(reshape_data_for_lstm(X_train.squeeze(-1)), dtype=torch.float32)
X_val_tensor = torch.tensor(reshape_data_for_lstm(X_val.squeeze(-1)), dtype=torch.float32)
X_test_tensor = torch.tensor(reshape_data_for_lstm(X_test.squeeze(-1)), dtype=torch.float32)

# 转换标签为 PyTorch 张量
y_train_tensor = torch.tensor(y_train, dtype=torch.long).squeeze()
y_val_tensor = torch.tensor(y_val, dtype=torch.long).squeeze()

# 创建 TensorDataset
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

# 创建 DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# 验证 DataLoader 输出
for data, target in train_loader:
    print("Batch data size:", data.size())  # 应该输出: torch.Size([64, 64, 64])
    print("Batch target size:", target.size())
    break  # 只打印第一批次的信息


Batch data size: torch.Size([64, 64, 64])
Batch target size: torch.Size([64, 4])


In [5]:
# one-hot 编码标签转换为整数索引
y_train_indices = np.argmax(y_train, axis=1)
y_val_indices = np.argmax(y_val, axis=1)

# 转换为 PyTorch 张量
y_train_tensor = torch.tensor(y_train_indices, dtype=torch.long)
y_val_tensor = torch.tensor(y_val_indices, dtype=torch.long)

# 重新创建 TensorDataset
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

# 重新创建 DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# 再次验证 DataLoader 输出
for data, target in train_loader:
    print("Batch data size:", data.size())  # 应该输出: torch.Size([64, 64, 64])
    print("Batch target size:", target.size())  # 应该输出: torch.Size([64])
    break  # 只打印第一批次的信息


Batch data size: torch.Size([64, 64, 64])
Batch target size: torch.Size([64])


In [6]:
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    def __init__(self):
        super(LSTMClassifier, self).__init__()
        # LSTM 层
        self.lstm_layers = nn.ModuleList([
            nn.LSTM(64, 256, batch_first=True),
            nn.LSTM(256, 128, batch_first=True),
            nn.LSTM(128, 64, batch_first=True)
        ])
        
        # 分类器
        self.classifier = nn.Sequential(
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(64, 4)  # 输出层，因为您有 4 个类别
        )
    
    def forward(self, x):
        # 通过 LSTM 层
        for lstm_layer in self.lstm_layers:
            x, _ = lstm_layer(x)
        
        # 取 LSTM 最后一层的最后一个时间步的输出
        x = x[:, -1, :]
        
        # 通过分类器
        x = self.classifier(x)
        return x

# 创建模型实例
model = LSTMClassifier()

# 打印模型结构
print(model)


LSTMClassifier(
  (lstm_layers): ModuleList(
    (0): LSTM(64, 256, batch_first=True)
    (1): LSTM(256, 128, batch_first=True)
    (2): LSTM(128, 64, batch_first=True)
  )
  (classifier): Sequential(
    (0): Linear(in_features=64, out_features=64, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=64, out_features=4, bias=True)
  )
)


In [7]:
# 将模型移动到正确的设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [8]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100, patience=6):
    best_acc = 0.0
    best_model_wts = None
    epochs_no_improve = 0  # 跟踪没有改进的 epochs 数

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0

        # 训练阶段
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)
        print(f'Epoch {epoch+1}/{num_epochs} Train loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        # 验证阶段
        model.eval()
        val_loss = 0.0
        val_corrects = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels.data)

        val_loss = val_loss / len(val_loader.dataset)
        val_acc = val_corrects.double() / len(val_loader.dataset)
        print(f'Validation loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # 早停逻辑
        if val_acc > best_acc:
            best_acc = val_acc
            best_model_wts = model.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f'Early stopping triggered after {epoch+1} epochs!')
            break

    # 加载最佳模型权重
    model.load_state_dict(best_model_wts)
    return model

# 训练模型，包含早停
best_model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50, patience=6)

# 保存最佳模型权重
torch.save(best_model.state_dict(), 'best_lstm_model.pth')



Epoch 1/50 Train loss: 1.1246 Acc: 0.4384
Validation loss: 0.9249 Acc: 0.4713
Epoch 2/50 Train loss: 0.9432 Acc: 0.4966
Validation loss: 0.9176 Acc: 0.4888
Epoch 3/50 Train loss: 0.9271 Acc: 0.5034
Validation loss: 0.9114 Acc: 0.4888
Epoch 4/50 Train loss: 0.9190 Acc: 0.4947
Validation loss: 0.9163 Acc: 0.4713
Epoch 5/50 Train loss: 0.9187 Acc: 0.5219
Validation loss: 0.9310 Acc: 0.6200
Epoch 6/50 Train loss: 0.9919 Acc: 0.5941
Validation loss: 0.9442 Acc: 0.5400
Epoch 7/50 Train loss: 0.8780 Acc: 0.5797
Validation loss: 0.7250 Acc: 0.7000
Epoch 8/50 Train loss: 0.8765 Acc: 0.6034
Validation loss: 1.0087 Acc: 0.5400
Epoch 9/50 Train loss: 0.9738 Acc: 0.5572
Validation loss: 0.9647 Acc: 0.5575
Epoch 10/50 Train loss: 0.8411 Acc: 0.6231
Validation loss: 0.8285 Acc: 0.6075
Epoch 11/50 Train loss: 0.8570 Acc: 0.6081
Validation loss: 0.8929 Acc: 0.6012
Epoch 12/50 Train loss: 0.6251 Acc: 0.6900
Validation loss: 0.5257 Acc: 0.7350
Epoch 13/50 Train loss: 0.4393 Acc: 0.7500
Validation loss: 0

In [12]:
# 确保模型处于评估模式
best_model.eval()

# 创建测试集的 DataLoader
test_loader = DataLoader(TensorDataset(X_test_tensor), batch_size=64, shuffle=False)

# 进行预测
pred_y = []
with torch.no_grad():
    for inputs in test_loader:
        inputs = inputs[0].to(device)
        outputs = best_model(inputs)
        _, preds = torch.max(outputs, 1)
        pred_y.extend(preds.cpu().numpy())

# 将预测结果转换为 NumPy 数组
pred_y = np.array(pred_y)


In [13]:
import pandas as pd

# 创建 DataFrame 并保存预测结果
df_test = pd.DataFrame()
df_test["label"] = pred_y
df_test.to_csv('predicted_labels.csv', index=False, header=False)


In [14]:
# 准确率计算函数
def calculate_accuracy(model, data_loader):
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

# 计算训练集上的准确率
train_accuracy = calculate_accuracy(best_model, train_loader)
print(f'Training set accuracy: {train_accuracy:.4f}')

# 计算验证集上的准确率
val_accuracy = calculate_accuracy(best_model, val_loader)
print(f'Validation set accuracy: {val_accuracy:.4f}')




Training set accuracy: 0.9828
Validation set accuracy: 0.9437
