In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
# Load the feature matrix and the labels from local CSV files.
X_all = pd.read_csv('X_filtered.csv')
Y = pd.read_csv('Y.csv')

# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X_all,
    Y,
    test_size=0.2,
    random_state=42,
)

# Features become float32 tensors; labels are flattened to 1-D and stored as
# int64 (torch.long) so they can be used as class-index targets by the loss.
X_train_tensor, X_test_tensor = (
    torch.tensor(frame.values, dtype=torch.float32)
    for frame in (X_train, X_test)
)
y_train_tensor, y_test_tensor = (
    torch.tensor(labels.values.ravel(), dtype=torch.long)
    for labels in (y_train, y_test)
)

# **Define the neural network**
class NeuralNet(nn.Module):
    """Small fully connected classifier that returns raw class logits.

    Architecture: Linear(input_dim, 64) -> BatchNorm -> ReLU -> Dropout ->
    Linear(64, 32) -> BatchNorm -> ReLU -> Dropout -> Linear(32, num_classes).

    Args:
        input_dim: Number of input features per sample.
        num_classes: Number of output logits. Defaults to 3, the value that
            was previously hard-coded.
        dropout: Drop probability applied after each hidden block. Defaults
            to 0.2, the value that was previously hard-coded.
    """

    def __init__(self, input_dim, num_classes=3, dropout=0.2):
        super().__init__()
        # Layers are created in the same order as before so that seeded
        # parameter initialisation and state_dict keys stay unchanged.
        self.fc1 = nn.Linear(input_dim, 64)
        self.bn1 = nn.BatchNorm1d(64)
        self.fc2 = nn.Linear(64, 32)
        self.bn2 = nn.BatchNorm1d(32)
        self.fc3 = nn.Linear(32, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map a (batch, input_dim) float tensor to (batch, num_classes) logits.

        No softmax is applied here; the output is the raw result of the last
        linear layer.
        """
        x = self.relu(self.bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        return self.fc3(x)

# **Initialise the model**
# The network takes one input unit per feature column of the training frame.
_, input_dim = X_train.shape
model = NeuralNet(input_dim=input_dim)

# **Loss function & optimiser**
# Cross-entropy on the model's raw outputs, optimised with Adam (step size 1e-3).
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
# **Train the model**
epochs = 300
batch_size = 128  # mini-batch size
patience = 10  # early-stopping patience, in epochs
best_loss = float('inf')
no_improve_count = 0

# The dataset and loader do not change between epochs, so build them once.
# With shuffle=True the loader still draws a fresh permutation every epoch.
dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)

# BatchNorm1d cannot compute batch statistics from a single sample and raises
# in training mode. Drop the trailing batch only when it would hold exactly
# one sample; in every other case all samples are used, as before.
drop_last = len(dataset) > batch_size and len(dataset) % batch_size == 1
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=drop_last,
)

for epoch in range(epochs):
    model.train()

    epoch_loss = 0.0
    batch_count = 0

    for batch_X, batch_y in dataloader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        batch_count += 1

    # Mean of the per-batch training losses for this epoch.
    avg_epoch_loss = epoch_loss / batch_count

    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Loss = {avg_epoch_loss:.4f}")

    # Track whether the loss improved.
    # NOTE(review): early stopping monitors the *training* loss, not a
    # held-out validation loss — consider adding a validation split.
    if avg_epoch_loss < best_loss:
        best_loss = avg_epoch_loss
        no_improve_count = 0
    else:
        no_improve_count += 1

    # Stop once the loss has failed to improve for `patience` consecutive epochs.
    if no_improve_count >= patience:
        print(f"Early stopping at epoch {epoch} as loss hasn't improved for {patience} epochs")
        break

# **Evaluate the model on the held-out test set**
# eval() puts dropout and batch-norm layers into inference mode.
model.eval()
with torch.no_grad():  # no gradients are needed for evaluation
    test_outputs = model(X_test_tensor)
    # The predicted class is the index of the largest logit in each row.
    predicted = test_outputs.argmax(dim=1)
    n_correct = (predicted == y_test_tensor).sum().item()
    accuracy = n_correct / len(y_test_tensor)
    print(f"Test Accuracy: {accuracy:.4f}")

Epoch 0: Loss = 1.0995
Epoch 10: Loss = 0.9021
Epoch 20: Loss = 0.8610
Epoch 30: Loss = 0.8483
Epoch 40: Loss = 0.8421
Epoch 50: Loss = 0.8362
Epoch 60: Loss = 0.8295
Early stopping at epoch 69 as loss hasn't improved for 10 epochs
Test Accuracy: 0.6350
