In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import random

# 自定义 MAPE 损失函数
class MAPE_Loss(nn.Module):
    def __init__(self):
        super(MAPE_Loss, self).__init__()

    def forward(self, y_pred, y_true):
        epsilon = 1e-8  # 避免除零
        return torch.mean(torch.abs((y_true - y_pred) / (y_true + epsilon))) * 100

# 自定义 RMSE 损失函数
class RMSE_Loss(nn.Module):
    def __init__(self):
        super(RMSE_Loss, self).__init__()

    def forward(self, y_pred, y_true):
        return torch.sqrt(torch.mean((y_true - y_pred) ** 2))

In [2]:
# 读取数据
data = pd.read_csv("../data/dataset.csv")

# 数据分割
data['target_class'] = pd.qcut(data['Cs'], q=10, labels=False)
X = data.drop(['Cs', 'target_class'], axis=1).values
y = data['Cs'].values
stratify_column = data['target_class']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=21, stratify=stratify_column)

# 数据标准化
scaler = StandardScaler()
scaler.fit(X_train)

X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 将数据转换为 PyTorch 张量
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)

X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

# 创建 DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [3]:
class ANN(nn.Module):
    def __init__(self, input_dim):
        super(ANN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 12)
        self.layer2 = nn.Linear(12, 90)
        self.layer3 = nn.Linear(90, 90)
        self.layer4 = nn.Linear(90, 60)
        self.layer5 = nn.Linear(60, 70)
        self.layer6 = nn.Linear(70, 30)
        self.output = nn.Linear(30, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        x = torch.relu(self.layer5(x))
        x = torch.relu(self.layer6(x))
        x = self.output(x)
        return x

# 初始化模型
input_dim = X_train_scaled.shape[1]
model = ANN(input_dim)

# 选择损失函数 (可以选择 MAPE 或 RMSE)
loss_function = MAPE_Loss()
# loss_function = RMSE_Loss()  # 或者使用 MAPE_Loss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# # 设置 PyTorch 随机种子
# def set_random_seed(seed):
#     torch.manual_seed(seed)
#     torch.cuda.manual_seed_all(seed)  # 如果使用 GPU
#     np.random.seed(seed)  # 设置 Numpy 随机种子
#     random.seed(seed)  # 设置 Python 原生随机数生成器的种子
#     torch.backends.cudnn.deterministic = True  # 保证卷积操作的确定性
#     torch.backends.cudnn.benchmark = False  # 禁用 cudnn 自动优化（用于固定输入尺寸）
#
# # 设置种子
# set_random_seed(21)

In [4]:
# 训练模型
num_epochs = 5000
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = loss_function(outputs, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # 每 10 个 epoch 打印一次平均损失
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}")

# 保存模型
torch.save(model.state_dict(), "ann_model_CPU_5000.pth")

Epoch [10/5000], Loss: 32.6088
Epoch [20/5000], Loss: 25.0076
Epoch [30/5000], Loss: 20.8672
Epoch [40/5000], Loss: 17.4715
Epoch [50/5000], Loss: 15.1184
Epoch [60/5000], Loss: 13.8035
Epoch [70/5000], Loss: 13.0016
Epoch [80/5000], Loss: 12.6784
Epoch [90/5000], Loss: 12.3825
Epoch [100/5000], Loss: 12.3214
Epoch [110/5000], Loss: 11.6720
Epoch [120/5000], Loss: 11.3387
Epoch [130/5000], Loss: 11.1410
Epoch [140/5000], Loss: 10.9073
Epoch [150/5000], Loss: 10.5253
Epoch [160/5000], Loss: 10.3899
Epoch [170/5000], Loss: 10.4905
Epoch [180/5000], Loss: 9.8678
Epoch [190/5000], Loss: 10.2188
Epoch [200/5000], Loss: 10.0219
Epoch [210/5000], Loss: 9.6355
Epoch [220/5000], Loss: 8.9991
Epoch [230/5000], Loss: 8.6299
Epoch [240/5000], Loss: 9.0249
Epoch [250/5000], Loss: 8.2829
Epoch [260/5000], Loss: 8.1197
Epoch [270/5000], Loss: 7.9727
Epoch [280/5000], Loss: 8.4938
Epoch [290/5000], Loss: 8.9016
Epoch [300/5000], Loss: 7.6739
Epoch [310/5000], Loss: 7.5248
Epoch [320/5000], Loss: 7.570

In [7]:
# 重新定义模型结构
model = ANN(input_dim)  # 使用相同的模型结构

# 加载模型参数，确保使用 weights_only=True 来提高安全性
model.load_state_dict(torch.load("ann_model.pth", weights_only=True))  # 加载模型参数

# 评估模型 (确保在 GPU 上)
model.eval()
with torch.no_grad():
    y_train_pred = model(X_train_tensor).cpu().numpy()  # 转换为 CPU 数据，便于后续处理
    y_test_pred = model(X_test_tensor).cpu().numpy()

from function import metrics_to_dataframe, plot_actual_vs_predicted
# 绘制实际 vs 预测图
# plot_actual_vs_predicted(y_train, y_train_pred, y_test, y_test_pred, 'Artificial Neural Network', 'ann.png')

# 计算并显示评估指标
ann_metrics = metrics_to_dataframe(y_train, y_train_pred, y_test, y_test_pred, 'ANN')
ann_metrics.to_csv('ann_metrics.csv', index=False)
ann_metrics

Unnamed: 0,model,R2_train,MAE_train,MAPE_train,RMSE_train,R2_test,MAE_test,MAPE_test,RMSE_test
0,ANN,0.943932,5.739903,6.449725,9.45267,0.963017,5.347574,6.180498,7.632015


In [15]:
y_train.shape, y_train_pred.shape

((480,), (480, 1))

In [17]:
ann_train = pd.DataFrame({'Actual': y_train, 'Predicted': y_train_pred.squeeze()})
ann_test = pd.DataFrame({'Actual': y_test, 'Predicted': y_test_pred.squeeze()})
ann_train.to_csv('ann_train.csv', index=False)
ann_test.to_csv('ann_test.csv', index=False)