In [15]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import itertools
import seaborn as sns
import pickle as pkl
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from sklearn import manifold
import sklearn
import matplotlib.pyplot as plt
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

In [None]:
fault_rates = [0.5]

# Every pass rebinds the same four names, so once the loop is done only the
# arrays belonging to the last fault rate in the list remain available.
for fault_rate in fault_rates:
    # Training features and labels. Labels are collapsed to the index of the
    # largest entry along axis 1 (presumably one-hot rows -- TODO confirm).
    x_train = np.load('%s_x_train.npy' % fault_rate)
    y_train = np.load('%s_y_train.npy' % fault_rate).argmax(axis=1)
    # Test features and labels are read from files carrying a "weight_" prefix.
    x_test = np.load('weight_%s_x_test.npy' % fault_rate)
    y_test = np.load('weight_%s_y_test.npy' % fault_rate).argmax(axis=1)

# Report the shape of each loaded array: train x, train y, test x, test y.
for loaded in (x_train, y_train, x_test, y_test):
    print(loaded.shape)

(74586, 60, 2)
(74586,)
(7140, 60, 2)
(7140,)


In [3]:
# Pick the GPU when one is available, otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Balanced per-class weights computed from the training labels.
classes = np.unique(y_train)
balanced_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)

# Keep the weights as a float32 tensor on the chosen device.
class_weights = torch.tensor(balanced_weights, dtype=torch.float32).to(device)

In [16]:
BATCH_SIZE = 32

# torch.as_tensor hands back an existing tensor unchanged when dtype already
# matches, so re-running this cell does not copy the data again or emit the
# "copy construct from a tensor" warning that torch.tensor produces.
x_train = torch.as_tensor(x_train, dtype=torch.float32)
y_train = torch.as_tensor(y_train, dtype=torch.long)
x_test = torch.as_tensor(x_test, dtype=torch.float32)
y_test = torch.as_tensor(y_test, dtype=torch.long)

# Hold out 20% of the training data for validation (like validation_split=0.2).
# The split is written to new names (x_fit / y_fit) instead of overwriting
# x_train / y_train: overwriting would make every re-run of this cell split the
# already reduced training set again, silently shrinking it each time.
x_fit, x_val, y_fit, y_val = train_test_split(
    x_train, y_train, test_size=0.2, random_state=42
)

# Build the DataLoaders; only the training loader shuffles.
train_dataset = TensorDataset(x_fit, y_fit)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

val_dataset = TensorDataset(x_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# Batch the test set as well: without batch_size the loader yields one sample
# per step. Sample order is unchanged because shuffle stays off.
test_dataset = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

  x_train = torch.tensor(x_train, dtype=torch.float32)
  y_train = torch.tensor(y_train, dtype=torch.long)


In [5]:
# Peek at the first training batch only and report its tensor shapes.
first_train_batch = next(iter(train_loader), None)
if first_train_batch is not None:
    batch_x, batch_y = first_train_batch
    print("Train batch - x shape:", batch_x.shape)  # input features
    print("Train batch - y shape:", batch_y.shape)  # labels

# Same check for the first validation batch.
first_val_batch = next(iter(val_loader), None)
if first_val_batch is not None:
    batch_x, batch_y = first_val_batch
    print("Val batch - x shape:", batch_x.shape)    # input features
    print("Val batch - y shape:", batch_y.shape)    # labels

Train batch - x shape: torch.Size([32, 60, 2])
Train batch - y shape: torch.Size([32])
Val batch - x shape: torch.Size([32, 60, 2])
Val batch - y shape: torch.Size([32])


In [None]:
class BaseCNN(nn.Module):
    """1-D convolutional classifier over sequences shaped [batch, length, channels].

    Four Conv1d -> BatchNorm1d -> ReLU stages (32, 64, 128, 256 filters, kernel
    size 3, padding 1) are followed by global average pooling over the length
    axis, dropout, and a single linear layer that produces the class logits.

    Args:
        num_classes: Number of output logits.
        in_channels: Size of the last input axis (features per time step).
        dropout: Drop probability applied to the pooled features.
    """

    def __init__(self, num_classes=10, in_channels=2, dropout=0.5):
        super().__init__()

        # Convolution stack; padding=1 with kernel_size=3 keeps the length unchanged.
        self.conv1 = nn.Conv1d(in_channels=in_channels, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv1d(128, 256, kernel_size=3, padding=1)

        # One batch-norm layer per convolution.
        self.bn1 = nn.BatchNorm1d(32)
        self.bn2 = nn.BatchNorm1d(64)
        self.bn3 = nn.BatchNorm1d(128)
        self.bn4 = nn.BatchNorm1d(256)

        # Global average pooling collapses the length axis to 1, so the head
        # does not depend on the sequence length.
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)

        # Classification head.
        self.fc = nn.Linear(256, num_classes)

        # Dropout applied to the pooled feature vector.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return logits of shape [batch, num_classes] for x of shape [batch, length, in_channels]."""
        # Conv1d expects channels first: [batch, length, channels] -> [batch, channels, length].
        x = x.transpose(1, 2)

        # Convolution -> batch norm -> ReLU, four times.
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))

        # Global average pooling -> [batch, 256, 1], then flatten -> [batch, 256].
        x = self.global_avg_pool(x)
        x = torch.flatten(x, 1)

        x = self.dropout(x)

        # Linear head -> [batch, num_classes].
        return self.fc(x)

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over `loader` and return (mean batch loss, accuracy in %).

    When `optimizer` is given the model is put in train mode and updated after
    every batch; otherwise the model is put in eval mode and gradients are off.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.set_grad_enabled(training):
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            if training:
                loss.backward()
                optimizer.step()

            # Accumulate the batch loss and the number of correct predictions.
            loss_sum += loss.item()
            n_correct += (outputs.argmax(dim=1) == batch_y).sum().item()
            n_seen += batch_y.size(0)
    # Loss is averaged over batches, accuracy over samples.
    return loss_sum / len(loader), 100 * n_correct / n_seen


# Build the model. The number of logits follows the class-weight vector,
# because CrossEntropyLoss requires one weight per class.
model = BaseCNN(num_classes=len(class_weights)).to(device)

# Optimizer and class-weighted loss.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(weight=class_weights)
EPOCHS = 50
# Early stopping: give up after this many consecutive epochs without a new
# best validation loss.
PATIENCE = 5
best_val_loss = float('inf')
epochs_without_improvement = 0

# Per-epoch metrics are collected here and turned into a DataFrame once at the end.
epoch_records = []

for epoch in range(EPOCHS):
    train_loss, train_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
    val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

    # Epochs are reported 1-based, matching the 'Epoch' column of the log.
    print(f'Epoch: {epoch + 1} | '
          f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | '
          f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%')

    epoch_records.append({
        'Epoch': epoch + 1,
        'Train Loss': train_loss,
        'Train Acc (%)': train_acc,
        'Val Loss': val_loss,
        'Val Acc (%)': val_acc
    })

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= PATIENCE:
            print(f'Early stopping at epoch {epoch + 1}: '
                  f'no validation-loss improvement for {PATIENCE} epochs')
            break

# Assemble the training log and save it to Excel.
train_logs = pd.DataFrame(epoch_records, columns=[
    'Epoch',
    'Train Loss',
    'Train Acc (%)',
    'Val Loss',
    'Val Acc (%)'
])
train_logs.to_excel('training_baseCNN.xlsx', index=False, engine='openpyxl')
print("训练日志已保存到 training_baseCNN.xlsx")

Epoch: 0 | Train Loss: 0.4174 | Train Acc: 77.48% | Val Loss: 0.2190 | Val Acc: 90.06%
Epoch: 1 | Train Loss: 0.2402 | Train Acc: 86.33% | Val Loss: 0.1413 | Val Acc: 94.23%
Epoch: 2 | Train Loss: 0.2019 | Train Acc: 89.44% | Val Loss: 0.1331 | Val Acc: 91.55%
Epoch: 3 | Train Loss: 0.1728 | Train Acc: 91.45% | Val Loss: 0.1208 | Val Acc: 96.01%
Epoch: 4 | Train Loss: 0.1521 | Train Acc: 92.90% | Val Loss: 0.1036 | Val Acc: 96.51%
Epoch: 5 | Train Loss: 0.1387 | Train Acc: 93.90% | Val Loss: 0.0908 | Val Acc: 96.57%
Epoch: 6 | Train Loss: 0.1301 | Train Acc: 94.42% | Val Loss: 0.0884 | Val Acc: 97.71%
Epoch: 7 | Train Loss: 0.1250 | Train Acc: 94.99% | Val Loss: 0.0907 | Val Acc: 96.13%
Epoch: 8 | Train Loss: 0.1087 | Train Acc: 95.52% | Val Loss: 0.0820 | Val Acc: 97.51%
Epoch: 9 | Train Loss: 0.1099 | Train Acc: 95.39% | Val Loss: 0.0877 | Val Acc: 97.75%
Epoch: 10 | Train Loss: 0.1023 | Train Acc: 95.74% | Val Loss: 0.0771 | Val Acc: 98.35%
Epoch: 11 | Train Loss: 0.1102 | Train Acc

In [None]:
# Persist the whole model object, then reload it to confirm the file round-trips.
torch.save(model, 'model.pth')
# NOTE: weights_only=False performs a full pickle load, which can execute
# arbitrary code. It is acceptable here only because the file was written by
# the line above; do not use it on files from an untrusted source.
# map_location places the reloaded tensors on the device used for inference.
model = torch.load('model.pth', map_location=device, weights_only=False)
model.eval()  # evaluation mode: dropout off, batch norm uses running statistics

# True and predicted labels for every test sample, in loader order.
all_labels = []
all_predictions = []

# Inference on the test set without gradient tracking.
with torch.no_grad():
    for inputs, labels in test_loader:
        predicted = model(inputs.to(device)).argmax(dim=1)
        # Labels are only collected, so they never need to leave the CPU.
        all_labels.extend(labels.tolist())
        all_predictions.extend(predicted.cpu().tolist())

# Evaluation metrics; precision/recall/F1 are macro-averaged over classes.
accuracy = accuracy_score(all_labels, all_predictions)
precision = precision_score(all_labels, all_predictions, average='macro')
recall = recall_score(all_labels, all_predictions, average='macro')
f1 = f1_score(all_labels, all_predictions, average='macro')
# Confusion matrix (can be plotted later).
conf_matrix = confusion_matrix(all_labels, all_predictions)

# Summary metrics as a two-column table.
metrics_df = pd.DataFrame({
    'Metric': ['Accuracy', 'Precision', 'Recall', 'F1-Score'],
    'Value': [accuracy, precision, recall, f1]
})

# Per-sample true vs. predicted labels.
labels_df = pd.DataFrame({
    'True_Label': all_labels,
    'Predicted_Label': all_predictions
})

# Confusion matrix as a table.
conf_matrix_df = pd.DataFrame(conf_matrix)

# Write each table to its own sheet of one Excel workbook.
with pd.ExcelWriter('test_baseCNN.xlsx', engine='openpyxl') as writer:
    metrics_df.to_excel(writer, sheet_name='Metrics', index=False)
    labels_df.to_excel(writer, sheet_name='True_vs_Pred', index=False)
    conf_matrix_df.to_excel(writer, sheet_name='Confusion_Matrix', index=False)

print("测试结果已保存到 test_baseCNN.xlsx")

测试结果已保存到 test_baseCNN.xlsx
