# 基于PyTorch的模型
**注意：本文件为本小组对比探究用例，并非主要的探究内容，少部分内容由AI生成**

为保证代码运行的稳定性，建议您使用以下版本的python及其库：

| 名称 | python | numpy | pandas | matplotlib | seaborn | torch | scikit-learn |
| ------- | ------- | ------- | ------- | ------- | ------- | ------- | ------- |
| 版本号 | 3.11.13 | 2.3.1 | 2.3.1 | 3.10.3 | 0.13.2 | 2.7.1+cpu | 1.7.0 |

In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import joblib
import os
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.font_manager import FontProperties
import warnings

# 忽略警告
warnings.filterwarnings('ignore')

In [2]:
# Configure matplotlib so Chinese labels render correctly.
# FontProperties() does not validate the path it is given, so the font files
# are checked explicitly; relying on an exception from the constructor would
# never reach the macOS font or the warning below.
_CJK_FONT_FILES = (
    r'C:\Windows\Fonts\simhei.ttf',        # Windows
    '/System/Library/Fonts/PingFang.ttc',  # macOS
)

font = None
for _font_file in _CJK_FONT_FILES:
    if os.path.isfile(_font_file):
        font = FontProperties(fname=_font_file, size=12)
        break

# 'SimHei' always stays in the list, so a font registered with matplotlib under
# that name keeps working even when neither file above exists.
_cjk_families = ['SimHei']
if font is None:
    print("警告: 中文字体设置失败，图表可能无法显示中文")
else:
    try:
        # Ask the font file for its family name so rcParams points at the
        # font that was actually found.
        _detected_family = font.get_name()
    except (OSError, RuntimeError):
        # Unreadable font file: keep the default family list and warn.
        print("警告: 中文字体设置失败，图表可能无法显示中文")
    else:
        if _detected_family not in _cjk_families:
            _cjk_families.insert(0, _detected_family)

plt.rcParams['font.sans-serif'] = _cjk_families  # family used for Chinese labels
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly

# Seed the PyTorch and NumPy global RNGs for reproducibility.
torch.manual_seed(42)
np.random.seed(42)

In [3]:
# Data loading
def load_data(file_path, is_train=True):
    """Read a CSV file and split it into ids, features and (optionally) labels.

    Column 0 is returned as the ids and columns 2 onward as the feature
    matrix. Column 1 is returned as the labels only when ``is_train`` is true;
    for a test file it is skipped.

    Args:
        file_path: Path of the CSV file; its first row is treated as a header.
        is_train: Whether to also return the label column.

    Returns:
        ``(ids, features, labels)`` when ``is_train`` is true, otherwise
        ``(ids, features)``. All elements are NumPy arrays.
    """
    frame = pd.read_csv(file_path, header=0)

    sample_ids = frame.iloc[:, 0].values
    feature_matrix = frame.iloc[:, 2:].values
    n_samples, n_features = feature_matrix.shape

    if not is_train:
        print(f"测试集加载完成: {n_samples}个样本, {n_features}个特征")
        return sample_ids, feature_matrix

    targets = frame.iloc[:, 1].values
    print(f"训练集加载完成: {n_samples}个样本, {n_features}个特征")
    return sample_ids, feature_matrix, targets

In [4]:
# Data preprocessing
# Standardises the feature matrix with scikit-learn's StandardScaler.
def preprocess_features(features, scaler=None, fit_scaler=False):
    """Standardise ``features``, either fitting a new scaler or reusing one.

    Args:
        features: Feature matrix to scale.
        scaler: An already fitted scaler (any object with ``transform``).
            Required when ``fit_scaler`` is false; ignored otherwise.
        fit_scaler: When true, fit a fresh ``StandardScaler`` on ``features``.

    Returns:
        ``(scaled_features, scaler)`` when ``fit_scaler`` is true, otherwise
        just ``scaled_features``.

    Raises:
        ValueError: If ``fit_scaler`` is false and no ``scaler`` was given.
    """
    if fit_scaler:
        # Fit new scaling statistics on the given data.
        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(features)
        return scaled_features, scaler

    # Reuse existing statistics; fail clearly instead of with an opaque
    # AttributeError on None.
    if scaler is None:
        raise ValueError(
            "preprocess_features needs a fitted scaler when fit_scaler=False"
        )
    return scaler.transform(features)

In [5]:
# Dataset construction
def create_datasets(features, labels, test_size=0.2):
    """Build a ``TensorDataset`` and randomly split it into train and validation parts.

    Args:
        features: Array-like feature matrix; converted to a float32 tensor.
        labels: Array-like labels; converted to a float32 tensor with an extra
            axis inserted at position 1 (a 1-D label vector becomes a column).
        test_size: Fraction of the samples reserved for validation.

    Returns:
        ``(train_dataset, val_dataset)`` as produced by ``random_split``.
    """
    x = torch.tensor(features, dtype=torch.float32)
    y = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)
    dataset = TensorDataset(x, y)

    # The training share is truncated to an integer; validation gets the rest,
    # so the two sizes always add up to the full dataset.
    n_total = len(dataset)
    n_train = int((1 - test_size) * n_total)
    n_val = n_total - n_train

    train_part, val_part = random_split(dataset, [n_train, n_val])
    return train_part, val_part

In [6]:
# Multi-layer perceptron for binary classification
class MLP(nn.Module):
    """Fully connected network that outputs one value in (0, 1) per sample.

    Each hidden layer is ``Linear -> LeakyReLU(0.1) -> Dropout``; the output
    layer is ``Linear(…, 1)`` followed by ``Sigmoid``. The layers live in
    ``self.model`` (an ``nn.Sequential``), so state-dict keys look like
    ``model.<index>.weight``.

    Args:
        input_size: Number of input features.
        hidden_layers: Sizes of the hidden layers, in order. Any iterable of
            ints works; the default is an immutable tuple so it cannot be
            mutated across instances.
        dropout_rate: Dropout probability applied after every hidden layer.
    """

    def __init__(self, input_size, hidden_layers=(128, 64), dropout_rate=0.3):
        super().__init__()
        layers = []

        in_features = input_size
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(in_features, hidden_size))  # fully connected
            layers.append(nn.LeakyReLU(0.1))                    # activation
            layers.append(nn.Dropout(dropout_rate))             # regularisation
            in_features = hidden_size

        # Output layer; Sigmoid squashes the result into (0, 1) so it can be
        # thresholded and fed to a binary cross-entropy loss.
        layers.append(nn.Linear(in_features, 1))
        layers.append(nn.Sigmoid())

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the stacked layers and return the result."""
        return self.model(x)

In [7]:
# Training routine
# `epochs` and `lr` are the knobs to tune.
def train_model(model, train_loader, val_loader, epochs=50, lr=0.001):
    """Train ``model`` with Adam and binary cross-entropy, validating every epoch.

    After each epoch the validation loss, accuracy and F1 score are computed
    with predictions thresholded at 0.5. Whenever the validation accuracy
    strictly improves, the weights are written to ``best_model.pth`` and the
    validation targets/predictions to ``val_targets.npy`` / ``val_preds.npy``.
    When training ends, ``plot_training_history`` is called with the history.

    Args:
        model: Network whose outputs are probabilities (``nn.BCELoss`` is used).
        train_loader: Loader yielding ``(inputs, targets)`` training batches.
        val_loader: Loader yielding ``(inputs, targets)`` validation batches.
        epochs: Number of passes over the training data.
        lr: Learning rate for Adam.

    Returns:
        ``(model, history)``. ``model`` holds the weights of the final epoch
        (the best weights are only on disk); ``history`` maps ``train_loss``,
        ``val_loss``, ``val_acc`` and ``val_f1`` to per-epoch lists.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    bce = nn.BCELoss()  # binary cross-entropy on probabilities
    adam = optim.Adam(model.parameters(), lr=lr)

    # Per-epoch training record
    history = {key: [] for key in ('train_loss', 'val_loss', 'val_acc', 'val_f1')}
    best_val_acc = 0.0

    for epoch_no in range(1, epochs + 1):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            adam.zero_grad()                           # clear old gradients
            batch_loss = bce(model(batch_x), batch_y)  # forward pass + loss
            batch_loss.backward()                      # back-propagate
            adam.step()                                # update the weights

            # Weight by batch size so the epoch average is per sample.
            train_loss_sum += batch_loss.item() * batch_x.size(0)

        # ---- validation pass ----
        model.eval()
        val_loss_sum = 0.0
        all_preds, all_targets = [], []
        with torch.no_grad():  # no gradients needed for evaluation
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)

                batch_probs = model(batch_x)
                val_loss_sum += bce(batch_probs, batch_y).item() * batch_x.size(0)

                # Threshold at 0.5, then move to CPU and convert to NumPy.
                batch_preds = (batch_probs > 0.5).float()
                all_preds.extend(batch_preds.cpu().numpy())
                all_targets.extend(batch_y.cpu().numpy())

        # ---- epoch metrics ----
        train_loss = train_loss_sum / len(train_loader.dataset)  # mean training loss
        val_loss = val_loss_sum / len(val_loader.dataset)        # mean validation loss
        val_acc = accuracy_score(all_targets, all_preds)         # share of correct predictions
        val_f1 = f1_score(all_targets, all_preds)                # harmonic mean of precision/recall

        epoch_metrics = (
            ('train_loss', train_loss),
            ('val_loss', val_loss),
            ('val_acc', val_acc),
            ('val_f1', val_f1),
        )
        for key, value in epoch_metrics:
            history[key].append(value)

        # Checkpoint on a strict accuracy improvement, together with the
        # validation predictions later used for the confusion matrix.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
            np.save('val_targets.npy', np.array(all_targets))
            np.save('val_preds.npy', np.array(all_preds))

        print(f"Epoch {epoch_no}/{epochs} | "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"Val Acc: {val_acc:.4f} | "
              f"Val F1: {val_f1:.4f}")

    print(f"训练完成，最佳验证准确率: {best_val_acc:.4f}")

    # Draw the training curves
    plot_training_history(history)

    return model, history

In [8]:
# Inference routine (device handling mirrors the training routine)
def predict(model, test_features, batch_size=512):
    """Run ``model`` over ``test_features`` in batches, without gradients.

    Args:
        model: Trained network whose outputs are probabilities.
        test_features: Array-like feature matrix; converted to a float32 tensor.
        batch_size: Number of rows evaluated per forward pass.

    Returns:
        ``(predictions, probabilities)`` as flattened NumPy arrays.
        ``predictions`` is 1.0 where the output exceeds 0.5 and 0.0 otherwise;
        ``probabilities`` holds the raw model outputs.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # NumPy array -> tensor; order is preserved so results line up with ids.
    features = torch.tensor(test_features, dtype=torch.float32)
    loader = DataLoader(features, batch_size=batch_size, shuffle=False)

    pred_batches = []
    prob_batches = []  # raw probabilities in [0, 1]
    with torch.no_grad():
        for batch in loader:
            batch_probs = model(batch.to(device))
            batch_preds = (batch_probs > 0.5).float()
            pred_batches.append(batch_preds.cpu().numpy())
            prob_batches.append(batch_probs.cpu().numpy())

    if not pred_batches:
        # No rows at all: return two empty arrays.
        return np.array([]).flatten(), np.array([]).flatten()

    return (np.concatenate(pred_batches).flatten(),
            np.concatenate(prob_batches).flatten())

In [9]:
# Persisting predictions
def save_predictions(test_ids, predictions, output_file='submission.csv'):
    """Write the predictions to a CSV file with columns ``id`` and ``win``.

    Args:
        test_ids: Identifiers, one per prediction.
        predictions: Array of predicted labels; cast to ``int`` before saving.
        output_file: Destination CSV path.

    Returns:
        The DataFrame that was written.
    """
    win_column = predictions.astype(int)  # integer labels
    columns = {'id': test_ids, 'win': win_column}
    result_df = pd.DataFrame(columns)

    # The row index is not part of the output file.
    result_df.to_csv(output_file, index=False)
    print(f"预测结果已保存至: {output_file}")
    return result_df

In [10]:
# Visualisation (this part is entirely AI-generated)
def plot_training_history(history):
    """Save a 2x2 figure of the training curves to ``training_history.SVG``.

    Panels, in order: training/validation loss, validation accuracy,
    validation F1, and accuracy together with F1.

    Args:
        history: Mapping with per-epoch lists under ``train_loss``,
            ``val_loss``, ``val_acc`` and ``val_f1``.
    """
    # One entry per panel: (title, y-axis label, curves), where each curve is
    # (history key, legend label, colour or None for the default colour).
    panels = (
        ('训练和验证损失', '损失',
         (('train_loss', '训练损失', None), ('val_loss', '验证损失', None))),
        ('验证准确率', '准确率',
         (('val_acc', '验证准确率', 'green'),)),
        ('验证F1分数', 'F1分数',
         (('val_f1', '验证F1分数', 'purple'),)),
        ('验证集性能指标', '分数',
         (('val_acc', '准确率', 'green'), ('val_f1', 'F1分数', 'purple'))),
    )

    plt.figure(figsize=(15, 10))

    for position, (title, y_label, curves) in enumerate(panels, start=1):
        plt.subplot(2, 2, position)
        for key, legend_label, colour in curves:
            line_style = {'label': legend_label}
            if colour is not None:
                line_style['color'] = colour
            plt.plot(history[key], **line_style)
        plt.title(title, fontsize=14)
        plt.xlabel('训练轮次', fontsize=12)
        plt.ylabel(y_label, fontsize=12)
        plt.legend()
        plt.grid(True, linestyle='--', alpha=0.7)

    plt.tight_layout()
    plt.savefig('training_history.SVG', dpi=300)
    plt.close()
    print("训练历史图表已保存为 training_history.SVG")

In [11]:
# Confusion-matrix plot
def plot_confusion_matrix(y_true, y_pred, classes=['失败', '胜利']):
    """Draw the confusion matrix as a heatmap and save it to ``confusion_matrix.SVG``.

    Accuracy and F1 are printed in a caption below the axes.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        classes: Tick labels used on both heatmap axes.

    Returns:
        The confusion matrix computed by ``sklearn.metrics.confusion_matrix``.
    """
    cm = confusion_matrix(y_true, y_pred)

    heatmap_options = dict(
        annot=True, fmt='d', cmap='Blues',
        xticklabels=classes, yticklabels=classes,
        annot_kws={"size": 16, "weight": "bold"},
    )
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, **heatmap_options)

    labelled = (
        (plt.title, '混淆矩阵', 18),
        (plt.xlabel, '预测标签', 14),
        (plt.ylabel, '真实标签', 14),
    )
    for apply_label, text, size in labelled:
        apply_label(text, fontsize=size)

    # Caption with the headline metrics, positioned in axes coordinates
    # (centred, below the plot area).
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    caption = f'准确率: {accuracy:.4f} | F1分数: {f1:.4f}'
    caption_box = dict(facecolor='white', alpha=0.8)
    plt.text(0.5, -0.15, caption,
             ha='center', va='center', transform=plt.gca().transAxes,
             fontsize=12, bbox=caption_box)

    plt.tight_layout()
    plt.savefig('confusion_matrix.SVG', dpi=300, bbox_inches='tight')
    plt.close()
    print("混淆矩阵已保存为 confusion_matrix.SVG")
    return cm

In [12]:
# Prediction-distribution plot
def plot_prediction_distribution(predictions, probs=None):
    """Save a figure summarising the predictions to ``prediction_distribution.SVG``.

    The left panel is a bar chart of how many samples were predicted as loss
    (0) and win (1), annotated with percentages. The right panel, drawn only
    when ``probs`` is given, is a histogram of the predicted probabilities with
    the 0.5 decision boundary marked.

    Args:
        predictions: Array-like of predicted labels (0/1, int or float).
        probs: Optional array-like of predicted win probabilities.
    """
    plt.figure(figsize=(12, 6))

    # Predicted-label distribution
    plt.subplot(1, 2, 1)
    # Count both classes explicitly, in label order. value_counts() sorts by
    # frequency, so with the fixed tick labels below the bars would be
    # mislabelled whenever wins outnumber losses, and a class that was never
    # predicted would be missing altogether.
    labels = np.asarray(predictions).astype(int).ravel()
    win_counts = pd.Series(
        [int(np.sum(labels == 0)), int(np.sum(labels == 1))],
        index=[0, 1],
    )
    win_counts.plot(kind='bar', color=['#ff9999', '#66b3ff'])
    plt.title('预测结果分布', fontsize=14)
    plt.xlabel('胜负结果', fontsize=12)
    plt.ylabel('数量', fontsize=12)
    plt.xticks([0, 1], ['失败', '胜利'], rotation=0)

    # Percentage label above each bar
    total = labels.size
    for i, count in enumerate(win_counts):
        share = count / total if total else 0.0  # avoid dividing by zero
        plt.text(i, count + total*0.01, f'{share:.1%}',
                 ha='center', fontsize=12)

    # Predicted-probability distribution
    if probs is not None:
        plt.subplot(1, 2, 2)
        plt.hist(probs, bins=50, color='#88cc88', alpha=0.7)
        plt.title('预测概率分布', fontsize=14)
        plt.xlabel('胜利概率', fontsize=12)
        plt.ylabel('样本数量', fontsize=12)
        plt.axvline(0.5, color='r', linestyle='--', alpha=0.7)
        plt.text(0.52, plt.ylim()[1]*0.9, '决策边界', color='r')

    plt.tight_layout()
    plt.savefig('prediction_distribution.SVG', dpi=300)
    plt.close()
    print("预测分布图表已保存为 prediction_distribution.SVG")

In [13]:
# Main program: configuration

# Tunable hyperparameters
HIDDEN_LAYERS = [256, 128, 64]  # hidden-layer sizes, input side first
DROPOUT_RATE = 0.4              # dropout probability after each hidden layer
LEARNING_RATE = 0.0005          # Adam learning rate
EPOCHS = 50                     # number of training epochs
BATCH_SIZE = 256                # mini-batch size for the data loaders
TEST_SIZE = 0.2                 # fraction of the training data held out for validation

# File locations
TRAIN_PATH = 'train.csv'           # training data
TEST_PATH = 'test_template.csv'    # data to predict on
OUTPUT_FILE = 'submission.csv'     # where predictions are written
SCALER_FILE = 'scaler.pkl'         # persisted fitted StandardScaler

In [14]:
# Training stage: runs only when the training CSV is present.
if os.path.exists(TRAIN_PATH):
    print("-"*50)
    print("开始训练阶段")
    print("-"*50)
    
    # Load the training data (the id column is not needed for training)
    _, train_features, train_labels = load_data(TRAIN_PATH, is_train=True)
    
    # Standardise the features with a freshly fitted scaler.
    # NOTE(review): the scaler is fitted on all rows before the train/validation
    # split below, so validation rows contribute to the scaling statistics.
    scaled_features, scaler = preprocess_features(train_features, fit_scaler=True)
    
    # Persist the fitted scaler so the prediction stage can reuse it
    joblib.dump(scaler, SCALER_FILE)
    
    # Build the train/validation datasets
    train_dataset, val_dataset = create_datasets(scaled_features, train_labels, test_size=TEST_SIZE)
    
    # Only the training loader shuffles
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    
    # Initialise the model; the input width is the number of feature columns
    input_size = scaled_features.shape[1]
    model = MLP(input_size, HIDDEN_LAYERS, DROPOUT_RATE)
    
    # Train the model
    trained_model, history = train_model(
        model,
        train_loader,
        val_loader,
        epochs=EPOCHS,
        lr=LEARNING_RATE
    )
    
    # Draw the confusion matrix from the validation targets/predictions that
    # train_model saved for its best epoch. Failures here are reported but do
    # not abort the run.
    try:
        y_true = np.load('val_targets.npy')
        y_pred = np.load('val_preds.npy')
        cm = plot_confusion_matrix(y_true, y_pred)
    except Exception as e:
        print(f"绘制混淆矩阵时出错: {e}")
else:
    print(f"训练文件 {TRAIN_PATH} 不存在")

--------------------------------------------------
开始训练阶段
--------------------------------------------------
训练集加载完成: 144000个样本, 30个特征
Epoch 1/50 | Train Loss: 0.4173 | Val Loss: 0.3572 | Val Acc: 0.8339 | Val F1: 0.8336
Epoch 2/50 | Train Loss: 0.3718 | Val Loss: 0.3514 | Val Acc: 0.8385 | Val F1: 0.8356
Epoch 3/50 | Train Loss: 0.3654 | Val Loss: 0.3479 | Val Acc: 0.8386 | Val F1: 0.8361
Epoch 4/50 | Train Loss: 0.3599 | Val Loss: 0.3458 | Val Acc: 0.8400 | Val F1: 0.8343
Epoch 5/50 | Train Loss: 0.3578 | Val Loss: 0.3443 | Val Acc: 0.8402 | Val F1: 0.8386
Epoch 6/50 | Train Loss: 0.3551 | Val Loss: 0.3430 | Val Acc: 0.8397 | Val F1: 0.8397
Epoch 7/50 | Train Loss: 0.3531 | Val Loss: 0.3409 | Val Acc: 0.8410 | Val F1: 0.8398
Epoch 8/50 | Train Loss: 0.3521 | Val Loss: 0.3414 | Val Acc: 0.8418 | Val F1: 0.8402
Epoch 9/50 | Train Loss: 0.3496 | Val Loss: 0.3395 | Val Acc: 0.8429 | Val F1: 0.8384
Epoch 10/50 | Train Loss: 0.3484 | Val Loss: 0.3381 | Val Acc: 0.8415 | Val F1: 0.8407
Epoc

In [15]:
# Prediction stage: runs only when the test CSV is present.
if os.path.exists(TEST_PATH):
    print("\n" + "-"*50)
    print("开始预测阶段")
    print("-"*50)

    # Load the test data
    test_ids, test_features = load_data(TEST_PATH, is_train=False)

    # Reuse the scaler fitted during training when it is available.
    if os.path.exists(SCALER_FILE):
        scaler = joblib.load(SCALER_FILE)
    else:
        # Best-effort fallback: fit on the test features themselves.
        # NOTE(review): these statistics differ from the ones the model was
        # trained with, so predictions may be degraded.
        print("标准化器文件不存在")
        scaler = StandardScaler()
        scaler.fit(test_features)

    # Standardise the test features
    scaled_test_features = preprocess_features(test_features, scaler)

    # Rebuild the network with the same architecture used for training
    input_size = scaled_test_features.shape[1]
    model = MLP(input_size, HIDDEN_LAYERS, DROPOUT_RATE)

    # Load the best checkpoint written by the training stage.
    if os.path.exists('best_model.pth'):
        # map_location='cpu' lets a checkpoint saved from a CUDA run load on a
        # CPU-only machine; predict() moves the model to the active device.
        state_dict = torch.load('best_model.pth', map_location='cpu')
        model.load_state_dict(state_dict)
        print("已加载最佳模型参数")
    else:
        # NOTE(review): without a checkpoint the predictions below come from
        # randomly initialised weights.
        print("未找到训练好的模型参数")

    # Run inference
    predictions, probs = predict(model, scaled_test_features)

    # Save the results
    result_df = save_predictions(test_ids, predictions, OUTPUT_FILE)

    # Summarise and visualise the predictions
    win_rate = np.mean(predictions) * 100
    print(f"预测结果分布: 胜率 {win_rate:.2f}% | 负率 {100-win_rate:.2f}%")

    plot_prediction_distribution(predictions, probs)
else:
    # Mirror the training stage: report a missing input instead of silently
    # doing nothing.
    print(f"测试文件 {TEST_PATH} 不存在")


--------------------------------------------------
开始预测阶段
--------------------------------------------------
测试集加载完成: 36000个样本, 30个特征
已加载最佳模型参数
预测结果已保存至: submission.csv
预测结果分布: 胜率 49.86% | 负率 50.14%
预测分布图表已保存为 prediction_distribution.SVG
