In [2]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import random

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 如果有多个GPU
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 设置随机数种子
set_seed(66)

# 数据集定义
class AccelerometerDataset(Dataset):
    def __init__(self, dataX_file, dataY_file, dataZ_file, dataLabel_file):
        self.dataX = self.load_data(dataX_file)
        self.dataY = self.load_data(dataY_file)
        self.dataZ = self.load_data(dataZ_file)
        self.labels = self.load_labels(dataLabel_file)
        
        # 将三个轴的数据堆叠成三维输入 (samples, time_step, feature)
        self.data = np.stack((self.dataX, self.dataY, self.dataZ), axis=2)
        self.scaler = StandardScaler()
        self.data = self.scaler.fit_transform(self.data.reshape(-1, self.data.shape[2])).reshape(self.data.shape)
        
    def load_data(self, file):
        with open(file, 'r') as f:
            data = [list(map(float, line.strip().split())) for line in f]
        return np.array(data)
    
    def load_labels(self, file):
        with open(file, 'r') as f:
            labels = [int(line.strip().split()[0]) for line in f]
        return np.array(labels)
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# 创建数据集和数据加载器
dataset = AccelerometerDataset('dataset1/dataX.txt', 'dataset1/dataY.txt', 'dataset1/dataZ.txt', 'dataset1/dataLabel.txt')

# 拆分数据集为训练集和验证集
train_idx, val_idx = train_test_split(np.arange(len(dataset)), test_size=0.2, stratify=dataset.labels)

train_dataset = torch.utils.data.Subset(dataset, train_idx)
val_dataset = torch.utils.data.Subset(dataset, val_idx)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# 模型定义
# 定义自注意力机制
class SelfAttention(nn.Module):
    def __init__(self, hidden_dim):
        super(SelfAttention, self).__init__()
        self.hidden_dim = hidden_dim
        self.query = nn.Linear(hidden_dim, hidden_dim)
        self.key = nn.Linear(hidden_dim, hidden_dim)
        self.value = nn.Linear(hidden_dim, hidden_dim)
        self.softmax = nn.Softmax(dim=-1)
    
    def forward(self, x):
        queries = self.query(x)
        keys = self.key(x)
        values = self.value(x)
        
        attention_scores = torch.matmul(queries, keys.transpose(-2, -1)) / (self.hidden_dim ** 0.5)
        attention_weights = self.softmax(attention_scores)
        weighted_sum = torch.matmul(attention_weights, values)
        
        return weighted_sum

class BiLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(BiLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.attention = SelfAttention(hidden_size * 2)
        self.fc = nn.Linear(hidden_size*2, num_classes)  # 双向LSTM，hidden_size*2

    def forward(self, x):
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)  # 2 for bidirection
        c0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)
        
        # LSTM前向传播
        out, _ = self.lstm(x, (h0, c0))
        out = self.attention(out)
        # 取加权求和后的输出的平均值
        out = out.mean(dim=1)
        out = self.fc(out)
        return out

# 超参数
input_size = 3  # 输入维度（加速度数据的三个轴）
hidden_size = 49
num_layers = 2
num_classes = len(set(dataset.labels))  # 类别数
num_epochs = 100
learning_rate = 0.001

# 设备配置
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
model = BiLSTMModel(input_size, hidden_size, num_layers, num_classes).to(device)
print(model)
total_params = sum(p.numel() for p in model.parameters())
print(f'Total number of parameters: {total_params}')

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 保存性能最好的模型
# best_val_loss = float('inf')
best_val_accuracy = 0
best_model_path = 'best_model_BiLSTM_self.pth'

def calculate_accuracy(loader, model, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, labels in loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

# 训练循环
for epoch in range(num_epochs):
    model.train()
    for data, labels in train_loader:
        data, labels = data.to(device), labels.to(device)
        
        # 前向传播
        outputs = model(data)
        loss = criterion(outputs, labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # 计算训练集和验证集准确率
    train_accuracy = calculate_accuracy(train_loader, model, device)
    val_accuracy = calculate_accuracy(val_loader, model, device)
    
    # 验证循环
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for data, labels in val_loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    
    val_loss /= len(val_loader)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Val Accuracy: {val_accuracy:.2f}%')
    
    # # 如果验证损失减小，则保存模型
    # if val_loss < best_val_loss:
    #     best_val_loss = val_loss
    #     torch.save(model.state_dict(), best_model_path)
    #     print(f'Best model saved with loss: {best_val_loss:.4f}')

    # 如果验证准确率减小，则保存模型
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), best_model_path)
        print(f'Best model saved with accuracy: {best_val_accuracy:.2f}%')

print('训练完成')


Using device: cpu
BiLSTMModel(
  (lstm): LSTM(3, 49, num_layers=2, batch_first=True, bidirectional=True)
  (attention): SelfAttention(
    (query): Linear(in_features=98, out_features=98, bias=True)
    (key): Linear(in_features=98, out_features=98, bias=True)
    (value): Linear(in_features=98, out_features=98, bias=True)
    (softmax): Softmax(dim=-1)
  )
  (fc): Linear(in_features=98, out_features=2, bias=True)
)
Total number of parameters: 108880
Epoch [1/100], Train Loss: 0.6829, Val Loss: 0.6804, Train Accuracy: 59.32%, Val Accuracy: 59.20%
Best model saved with accuracy: 59.20%
Epoch [2/100], Train Loss: 0.6788, Val Loss: 0.6732, Train Accuracy: 59.32%, Val Accuracy: 59.20%
Epoch [3/100], Train Loss: 0.5835, Val Loss: 0.5578, Train Accuracy: 74.15%, Val Accuracy: 74.00%
Best model saved with accuracy: 74.00%
Epoch [4/100], Train Loss: 0.4627, Val Loss: 0.4086, Train Accuracy: 81.86%, Val Accuracy: 79.60%
Best model saved with accuracy: 79.60%
Epoch [5/100], Train Loss: 0.4110, V

In [3]:
from sklearn.metrics import precision_score, recall_score, f1_score
def calculate_metrics(loader, model, device):
    model.eval()
    all_labels = []
    all_predictions = []
    with torch.no_grad():
        for data, labels in loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())
    
    precision = precision_score(all_labels, all_predictions, average='weighted')
    recall = recall_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')
    accuracy = (np.array(all_predictions) == np.array(all_labels)).mean() * 100
    
    return accuracy, precision, recall, f1

In [4]:
# 加载最佳模型
model.load_state_dict(torch.load(best_model_path))

# 计算验证集的precision, recall和F1 score
val_accuracy, val_precision, val_recall, val_f1 = calculate_metrics(val_loader, model, device)
print(f'Validation Accuracy: {val_accuracy:.4f}%')
print(f'Validation Precision: {val_precision:.4f}')
print(f'Validation Recall: {val_recall:.4f}')
print(f'Validation F1 Score: {val_f1:.4f}')

Validation Accuracy: 91.2000%
Validation Precision: 0.9122
Validation Recall: 0.9120
Validation F1 Score: 0.9115


  model.load_state_dict(torch.load(best_model_path))
