修正后的基于LSTM的堆叠模型（二分类）

In [1]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, classification_report
from sklearn.ensemble import RandomForestClassifier

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import warnings
warnings.filterwarnings("ignore")

# === 加载数据 ===
dataTrain = pd.read_csv("allAtt_onehot_large_train_new4.csv")
dataTest = pd.read_csv("allAtt_onehot_large_test_new4.csv")

x_train, y_train = dataTrain.iloc[:, 1:30].values, dataTrain.iloc[:, 30:].values
x_test, y_test = dataTest.iloc[:, 1:30].values, dataTest.iloc[:, 30:].values

y_train_int = np.argmax(y_train, axis=1)
y_test_int = np.argmax(y_test, axis=1)

# === 定义基模型 ===
xgb = XGBClassifier(objective="multi:softprob", num_class=2, eval_metric="mlogloss", use_label_encoder=False)
lgb = LGBMClassifier(objective='multiclass', num_class=2)
cat = CatBoostClassifier(iterations=300, learning_rate=0.1, depth=6, loss_function='MultiClass', verbose=0)
rf = RandomForestClassifier(n_estimators=100, random_state=42)

# === 模型训练 ===
xgb.fit(x_train, y_train_int)
lgb.fit(x_train, y_train_int)
cat.fit(x_train, y_train_int)
rf.fit(x_train, y_train_int)

# === 获取各模型预测概率 ===
def get_stacked_proba(model, X):
    return model.predict_proba(X)

train_probs = np.concatenate([
    get_stacked_proba(xgb, x_train),
    get_stacked_proba(lgb, x_train),
    get_stacked_proba(cat, x_train),
    get_stacked_proba(rf, x_train)
], axis=1)  # shape: [N, 8] (4个模型 × 2个类别)

test_probs = np.concatenate([
    get_stacked_proba(xgb, x_test),
    get_stacked_proba(lgb, x_test),
    get_stacked_proba(cat, x_test),
    get_stacked_proba(rf, x_test)
], axis=1)  # shape: [N, 8]

# === 构建 PyTorch MLP 模型 ===
class MLPStack(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim=2, dropout_rate=0.3):
        super(MLPStack, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.softmax(x)
        return x

# 转换为PyTorch tensors
train_probs_tensor = torch.FloatTensor(train_probs)
test_probs_tensor = torch.FloatTensor(test_probs)
y_train_tensor = torch.LongTensor(y_train_int)
y_test_tensor = torch.LongTensor(y_test_int)

# 创建数据加载器
train_dataset = TensorDataset(train_probs_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 初始化模型
input_dim = train_probs.shape[1]  # 堆叠的所有预测概率
hidden_dim = 64
output_dim = 2  # 修正为二分类
model = MLPStack(input_dim, hidden_dim, output_dim)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# === 训练 MLP ===
epochs = 30
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(epochs):
    model.train()
    total_loss = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}')

# === 预测 + 评估 ===
model.eval()
with torch.no_grad():
    test_probs_tensor = test_probs_tensor.to(device)
    outputs = model(test_probs_tensor)
    _, predicted = torch.max(outputs, 1)
    
    predicted = predicted.cpu().numpy()
    
acc = accuracy_score(y_test_int, predicted)
print(f"\n✅ Final MLP Stacking Accuracy: {acc:.4f}")
print("📊 Classification Report:")
print(classification_report(y_test_int, predicted))

FileNotFoundError: [Errno 2] No such file or directory: 'allAtt_onehot_large_train_new4.csv'

修正后的纯PyTorch堆叠模型（二分类）

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# === 加载数据 ===
dataTrain = pd.read_csv("allAtt_onehot_large_train_new7.csv")
dataTrain = dataTrain.drop(['Hist_H_Prob','Hist_D_Prob','Hist_A_Prob'], axis=1)
dataTest = pd.read_csv("allAtt_onehot_large_test_new7.csv")
dataTest = dataTest.drop(['Hist_H_Prob','Hist_D_Prob','Hist_A_Prob'], axis=1)

x_train, y_train = dataTrain.iloc[:, 4:38].values, dataTrain.iloc[:, 38:].values
x_train_reshaped = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
y_train_int = np.argmax(y_train, axis=1)

x_test, y_test = dataTest.iloc[:, 4:38].values, dataTest.iloc[:, 38:].values 
x_test_reshaped = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)
y_test_int = np.argmax(y_test, axis=1)

# === PyTorch 堆叠模型 ===
class StackingModel(nn.Module):
    def __init__(self, input_size, lstm_hidden_size, fc_hidden_size, num_classes=2):
        super(StackingModel, self).__init__()
        # LSTM部分
        self.lstm = nn.LSTM(input_size, lstm_hidden_size, batch_first=True)
        self.lstm_bn = nn.BatchNorm1d(lstm_hidden_size)
        
        # 全连接部分 - 将LSTM输出和原始特征结合
        self.fc1 = nn.Linear(lstm_hidden_size, fc_hidden_size)
        self.fc_bn = nn.BatchNorm1d(fc_hidden_size)
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(fc_hidden_size, num_classes)
        
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        # LSTM处理
        lstm_out, _ = self.lstm(x)
        lstm_out = lstm_out[:, -1, :]  # 取最后一个时间步的输出
        lstm_out = self.lstm_bn(lstm_out)
        
        # 全连接层处理
        x = self.relu(self.fc1(lstm_out))
        x = self.fc_bn(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.softmax(x)
        
        return x

# 转换为PyTorch tensors
x_train_tensor = torch.FloatTensor(x_train_reshaped)
y_train_tensor = torch.LongTensor(y_train_int)
x_test_tensor = torch.FloatTensor(x_test_reshaped)

# 创建数据加载器
train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 初始化模型
input_size = 1  # 每个时间步的特征维度
lstm_hidden_size = 64
fc_hidden_size = 32
num_classes = 2  # 修正为二分类
model = StackingModel(input_size, lstm_hidden_size, fc_hidden_size, num_classes)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# === 训练模型 ===
epochs = 30
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 添加早停机制
best_val_loss = float('inf')
patience = 5
counter = 0

for epoch in range(epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        # 计算准确率
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}, ' 
          f'Accuracy: {correct/total:.4f}')
    
    # 验证
    model.eval()
    with torch.no_grad():
        val_inputs = x_train_tensor[-380:].to(device)  # 使用最后380个样本作为验证集
        val_labels = y_train_tensor[-380:].to(device)
        val_outputs = model(val_inputs)
        val_loss = criterion(val_outputs, val_labels)
        
        # 早停判断
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            counter = 0
            torch.save(model.state_dict(), 'best_stacking_model.pt')
        else:
            counter += 1
            if counter >= patience:
                print(f'Early stopping at epoch {epoch+1}')
                break

# 加载最佳模型
model.load_state_dict(torch.load('best_stacking_model.pt'))

# === 模型评估 ===
model.eval()
with torch.no_grad():
    x_test_tensor = x_test_tensor.to(device)
    outputs = model(x_test_tensor)
    _, predicted = torch.max(outputs, 1)
    
    predicted = predicted.cpu().numpy()
    
acc = accuracy_score(y_test_int, predicted)
print(f"\n✅ Final Stacking Model Accuracy: {acc:.4f}")
print("📊 Classification Report:")
print(classification_report(y_test_int, predicted))

# 混淆矩阵
conf_matrix = confusion_matrix(y_test_int, predicted)
print("\nConfusion Matrix:")
print(conf_matrix)

# 可视化混淆矩阵
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", 
            xticklabels=[0, 1], yticklabels=[0, 1])  # 修正为二分类标签
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

修正后的LSTM-CatBoost组合模型（二分类）

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from catboost import CatBoostClassifier

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# === 加载数据 ===
dataTrain = pd.read_csv("allAtt_onehot_large_train_new7.csv")
dataTrain = dataTrain.drop(['Hist_H_Prob','Hist_D_Prob','Hist_A_Prob'], axis=1)
dataTest = pd.read_csv("allAtt_onehot_large_test_new7.csv")
dataTest = dataTest.drop(['Hist_H_Prob','Hist_D_Prob','Hist_A_Prob'], axis=1)

x_train, y_train = dataTrain.iloc[:, 4:38].values, dataTrain.iloc[:, 38:].values
x_train_reshaped = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
y_train_int = np.argmax(y_train, axis=1)

x_test, y_test = dataTest.iloc[:, 4:38].values, dataTest.iloc[:, 38:].values
x_test_reshaped = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)
y_test_int = np.argmax(y_test, axis=1)

# === 构建 PyTorch LSTM 模型 ===
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes=2):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.bn = nn.BatchNorm1d(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        lstm_out = lstm_out[:, -1, :]  # 取最后一个时间步的输出
        bn_out = self.bn(lstm_out)
        fc_out = self.fc(bn_out)
        out = self.softmax(fc_out)
        return out

# 转换为PyTorch tensors
x_train_tensor = torch.FloatTensor(x_train_reshaped)
y_train_tensor = torch.LongTensor(y_train_int)
x_test_tensor = torch.FloatTensor(x_test_reshaped)

# 创建数据加载器
train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 初始化模型
input_size = 1  # 每个时间步的特征维度
hidden_size = 64
num_classes = 2  # 修正为二分类
model = LSTMModel(input_size, hidden_size, num_classes)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# === 训练 LSTM ===
epochs = 20
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        # 计算准确率
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}, ' 
          f'Accuracy: {correct/total:.4f}')

# === 使用LSTM生成特征 ===
model.eval()
with torch.no_grad():
    x_train_tensor = x_train_tensor.to(device)
    lstm_train_outputs = model(x_train_tensor)
    
    x_test_tensor = x_test_tensor.to(device)
    lstm_test_outputs = model(x_test_tensor)
    
    # 获取预测的类别
    _, lstm_train_pred = torch.max(lstm_train_outputs, 1)
    _, lstm_test_pred = torch.max(lstm_test_outputs, 1)
    
    # 转换为numpy数组
    lstm_train_outputs = lstm_train_outputs.cpu().numpy()
    lstm_test_outputs = lstm_test_outputs.cpu().numpy()
    lstm_train_pred = lstm_train_pred.cpu().numpy()
    lstm_test_pred = lstm_test_pred.cpu().numpy()

# === 训练CatBoost模型 ===
# 使用LSTM输出的概率分布作为特征
catboost_model = CatBoostClassifier(
    iterations=1000,
    depth=10,
    learning_rate=0.05,
    loss_function='MultiClass',  # 对于二分类问题，CatBoost也使用这个损失函数
    verbose=0
)

catboost_model.fit(lstm_train_outputs, y_train_int)

# 用CatBoost进行预测
catboost_pred = catboost_model.predict(lstm_test_outputs)

# 打印准确率和分类报告
acc = accuracy_score(y_test_int, catboost_pred)
print(f"Test Accuracy: {acc * 100:.2f}%")

print("\nClassification Report:")
print(classification_report(y_test_int, catboost_pred))

# 混淆矩阵
conf_matrix = confusion_matrix(y_test_int, catboost_pred)
print("\nConfusion Matrix:")
print(conf_matrix)

# 可视化混淆矩阵
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", 
            xticklabels=[0, 1], yticklabels=[0, 1])  # 修正为二分类标签
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()