In [1]:
import skopt
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Subset
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import random
from sklearn.base import BaseEstimator, ClassifierMixin
from skopt import BayesSearchCV
from skopt.space import Real, Integer, Categorical
from skopt.utils import dimensions_aslist
import warnings
warnings.filterwarnings('ignore')
dimensions_aslist.original = dimensions_aslist
def patched_dimensions_aslist(search_space):
    return [d if isinstance(d, Categorical) else d for d in search_space]
skopt.utils.dimensions_aslist = patched_dimensions_aslist
# Fix random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

In [2]:

# 1. Enhanced Data Loading and Preprocessing
def load_data():
    train_df = pd.read_csv('train_smote.csv')
    test_df = pd.read_csv('test_smote.csv')

    # Feature scaling (normalization)
    train_features = train_df.drop(columns=['failure mode']).values.astype(np.float32)
    test_features = test_df.drop(columns=['failure mode']).values.astype(np.float32)

    # Normalize using training set statistics
    feature_mean = train_features.mean(axis=0)
    feature_std = train_features.std(axis=0) + 1e-8  # Avoid division by zero
    train_features = (train_features - feature_mean) / feature_std
    test_features = (test_features - feature_mean) / feature_std

    # Labels (assuming classes start from 1)
    train_labels = (train_df['failure mode'].values - 1).astype(np.int64)
    test_labels = (test_df['failure mode'].values - 1).astype(np.int64)

    # ===== 新增数据验证 =====
    print("\n=== Data shape verification ===")
    print(f"Training data shape: {train_features.shape}")
    print(f"Training label shape: {train_labels.shape}")
    print(f"Test data shape: {test_features.shape}")
    print(f"Test label shape: {test_labels.shape}")
    print(f"Unique label value: {np.unique(train_labels)}")

    assert train_features.shape[1] == test_features.shape[1], "The training and testing feature dimensions are inconsistent!"
    assert len(np.unique(train_labels)) == len(np.unique(test_labels)), "The training and test label categories are inconsistent!"


    return (train_features, train_labels), (test_features, test_labels)

(train_features, train_labels), (test_features, test_labels) = load_data()

# 2. Improved Bayesian Neural Network Implementation

class BayesianLinear(nn.Module):
    def __init__(self, in_features, out_features, prior_sigma1=1.0, prior_sigma2=0.1, prior_pi=0.5):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        # Weight parameters (mean and rho)
        self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features).normal_(0, 0.1))
        self.weight_rho = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-5, -4))

        # Bias parameters (mean and rho)
        self.bias_mu = nn.Parameter(torch.Tensor(out_features).normal_(0, 0.1))
        self.bias_rho = nn.Parameter(torch.Tensor(out_features).uniform_(-5, -4))

        # Scale mixture prior parameters
        self.prior_sigma1 = prior_sigma1
        self.prior_sigma2 = prior_sigma2
        self.prior_pi = prior_pi

        # Initialize prior distributions
        self.weight_prior = self.scale_mixture_prior()
        self.bias_prior = self.scale_mixture_prior()

        # For KL divergence approximation
        self.kl = 0

    def scale_mixture_prior(self):
        mix = torch.distributions.Categorical(torch.tensor([self.prior_pi, 1.0 - self.prior_pi]))
        comp = torch.distributions.Normal(
            torch.tensor([0.0, 0.0]),
            torch.tensor([self.prior_sigma1, self.prior_sigma2])
        )
        return torch.distributions.MixtureSameFamily(mix, comp)

    def forward(self, x, sample=True):

        assert x.size(-1) == self.in_features, \
        f"输入特征维度{x.size(-1)}与层输入特征数{self.in_features}不匹配"

        if sample:
            weight_sigma = torch.log1p(torch.exp(self.weight_rho))
            bias_sigma = torch.log1p(torch.exp(self.bias_rho))

            assert self.weight_mu.shape == (self.out_features, self.in_features), \
            f"权重mu形状应为{(self.out_features, self.in_features)}，实际为{self.weight_mu.shape}"
            assert self.bias_mu.shape == (self.out_features,), \
            f"偏置mu形状应为{(self.out_features,)}, 实际为{self.bias_mu.shape}"

            eps_weight = torch.randn_like(self.weight_mu)
            eps_bias = torch.randn_like(self.bias_mu)

            weight = self.weight_mu + weight_sigma * eps_weight
            bias = self.bias_mu + bias_sigma * eps_bias

            self.kl = self.kl_divergence(weight, weight_sigma, bias, bias_sigma)
        else:
            weight = self.weight_mu
            bias = self.bias_mu

        output = F.linear(x, weight, bias)

    # 输出维度验证
        assert output.size(-1) == self.out_features, \
        f"输出特征维度{output.size(-1)}与层输出特征数{self.out_features}不匹配"

        return output

    def kl_divergence(self, weight, weight_sigma, bias, bias_sigma):
        # 权重后验分布
        weight_post = torch.distributions.Normal(self.weight_mu, weight_sigma)
        # 偏置后验分布
        bias_post = torch.distributions.Normal(self.bias_mu, bias_sigma)

        # 计算权重KL散度
        kl_weight = torch.sum(torch.distributions.kl.kl_divergence(
            weight_post,
            torch.distributions.Normal(0, self.prior_sigma1)
        ))
        # 计算偏置KL散度
        kl_bias = torch.sum(torch.distributions.kl.kl_divergence(
            bias_post,
            torch.distributions.Normal(0, self.prior_sigma1)
        ))
        return kl_weight + kl_bias


=== Data shape verification ===
Training data shape: (676, 69)
Training label shape: (676,)
Test data shape: (43, 69)
Test label shape: (43,)
Unique label value: [0 1 2 3]


In [3]:

class BayesianMLP(nn.Module):
    def __init__(
        self,
        input_dim=69,
        hidden_dims=(256, 128, 64),
        output_dim=4,
        dropout_prob=0.1,
        prior_sigma1=1.0,
        prior_sigma2=0.1,
        prior_pi=0.5
    ):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(
                BayesianLinear(
                    prev_dim,
                    hidden_dim,
                    prior_sigma1=prior_sigma1,
                    prior_sigma2=prior_sigma2,
                    prior_pi=prior_pi
                )
            )
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_prob))
            prev_dim = hidden_dim

        self.layers = nn.Sequential(*layers)
        self.output_layer = BayesianLinear(
            prev_dim,
            output_dim,
            prior_sigma1=prior_sigma1,
            prior_sigma2=prior_sigma2,
            prior_pi=prior_pi
        )
        self.kl = 0

    def forward(self, x, sample=True):
        self.kl = 0
        x = self.layers(x)
        x = self.output_layer(x, sample)
        self.kl += self.output_layer.kl
        return x

    def get_kl(self):
        return self.kl

class SklearnBayesianMLP(BaseEstimator, ClassifierMixin):
    def __init__(self, hidden_dims=(256, 128, 64), dropout_prob=0.1,
                 prior_sigma1=1.0, prior_sigma2=0.1, prior_pi=0.5,
                 lr=1e-3, epochs=50, batch_size=32, device='cuda'):
        # 增强类型转换逻辑
        if isinstance(hidden_dims, np.ndarray):
            self.hidden_dims = tuple(hidden_dims.astype(int).tolist())
        elif isinstance(hidden_dims, list):
            self.hidden_dims = tuple(hidden_dims)
        else:
            self.hidden_dims = hidden_dims

        # 确保所有数值参数为Python原生类型
        self.dropout_prob = float(dropout_prob)
        self.prior_sigma1 = float(prior_sigma1)
        self.prior_sigma2 = float(prior_sigma2)
        self.prior_pi = float(prior_pi)
        self.lr = float(lr)
        self.epochs = int(epochs)
        self.batch_size = int(batch_size)
        self.device = device
        self.model = None

    def fit(self, X, y):
        # Convert data to PyTorch tensors
        X_tensor = torch.tensor(X).float().to(self.device)
        y_tensor = torch.tensor(y).long().to(self.device)
        dataset = TensorDataset(X_tensor, y_tensor)
        loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

        # Initialize model
        self.model = BayesianMLP(
            input_dim=X.shape[1],
            hidden_dims=self.hidden_dims,
            dropout_prob=self.dropout_prob,
            prior_sigma1=self.prior_sigma1,
            prior_sigma2=self.prior_sigma2,
            prior_pi=self.prior_pi
        ).to(self.device)

        optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        criterion = nn.CrossEntropyLoss()

        # Training loop
        for epoch in range(self.epochs):
            self.model.train()
            for inputs, targets in loader:
                optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = criterion(outputs, targets) + 1e-3 * self.model.get_kl()
                loss.backward()
                optimizer.step()
        return self

    def predict(self, X):
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.tensor(X).float().to(self.device)
            outputs = self.model(X_tensor, sample=False)
            return torch.argmax(outputs, dim=1).cpu().numpy()

In [4]:
# 3. Implementation of meta-model components
class MetaController:
    """动态学习率调整的元控制器"""
    def __init__(self, init_lr=1e-3, decay_factor=0.5, patience=3):
        self.lr = init_lr
        self.decay_factor = decay_factor
        self.patience = patience
        self.best_loss = float('inf')
        self.wait = 0

    def update(self, val_loss):
        """根据验证损失调整学习率"""
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                self.lr *= self.decay_factor
                self.wait = 0
                print(f"MetaController: 学习率衰减至 {self.lr:.2e}")

In [5]:
class BayesianEnsemble:
    """BNN integration model based on uncertainty"""
    def __init__(self, n_models=5):
        self.n_models = n_models
        self.models = []

    def fit(self, train_features, train_labels, param_space, device='cuda'):
        opt = BayesSearchCV(
            estimator=SklearnBayesianMLP(device=device),  # Use wrapped model
            search_spaces=param_space,
            n_iter=10,
            cv=3,
            n_jobs=-1 if device == 'cuda' else 1
        )
        opt.fit(train_features, train_labels)

        # Train ensemble with best params
        for _ in range(self.n_models):
            model = SklearnBayesianMLP(**opt.best_params_)
            model.fit(train_features, train_labels)
            self.models.append(model)

    def predict(self, X, num_samples=100):
        """加权集成预测（权重=1/熵）"""
        all_probs = []
        for model in self.models:
            model.eval()
            probs = []
            for _ in range(num_samples):
                with torch.no_grad():
                    outputs = model(X, sample=True)
                    prob = F.softmax(outputs, dim=1)
                    probs.append(prob.cpu().numpy())
            mean_probs = np.mean(probs, axis=0)
            entropy = -np.sum(mean_probs * np.log(mean_probs + 1e-8), axis=1)
            weights = 1 / (entropy + 1e-8)
            all_probs.append(mean_probs * weights[:, None])

        final_probs = np.mean(all_probs, axis=0)
        return np.argmax(final_probs, axis=1)

def evaluate(model, dataloader, criterion=nn.CrossEntropyLoss(), scaling_factor=1e-3, num_samples=30, device='cuda'):
    model.eval()
    total_loss = 0.0
    total_samples = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            batch_size = inputs.size(0)

            # 多次采样计算平均损失
            batch_loss = 0.0
            for _ in range(num_samples):
                outputs = model(inputs, sample=True)
                loss = criterion(outputs, targets)
                kl = model.get_kl()
                batch_loss += (loss + scaling_factor * kl).item()

            total_loss += (batch_loss / num_samples) * batch_size
            total_samples += batch_size

    return total_loss / total_samples

In [6]:
# %% 5. 完整训练流程
def main():
    # === 1. 数据加载与设备检测 ===
    (train_features, train_labels), (test_features, test_labels) = load_data()

    # 自动检测设备并确保与Tensor设备一致
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"使用设备: {device}")

    # === 2. 定义超参数搜索空间 ===
    param_space = {
        'hidden_dims': Categorical([(256, 128), (128, 64, 32), (512, 256)]),
        'dropout_prob': Real(0.1, 0.5, prior='uniform'),
        'prior_sigma1': Real(0.5, 2.0),
        'prior_sigma2': Real(0.1, 0.5),
        'lr': Real(1e-4, 1e-2, prior='log-uniform'),
        'batch_size': Integer(16, 64)
    }

    # === 3. 训练贝叶斯集成模型 ===
    print("\n=== 开始贝叶斯优化与集成训练 ===")
    ensemble = BayesianEnsemble(n_models=3)

    try:
        ensemble.fit(
            train_features=train_features,
            train_labels=train_labels,
            param_space=param_space,
            device=device
        )
    except Exception as e:
        print(f"集成训练失败: {str(e)}")
        return
    # 添加参数类型验证
    print("\n=== 最佳参数类型验证 ===")
    sample_model = ensemble.models[0]
    params = {
        'hidden_dims': sample_model.hidden_dims,
        'dropout_prob': sample_model.dropout_prob,
        'prior_sigma1': sample_model.prior_sigma1,
        'prior_sigma2': sample_model.prior_sigma2,
        'lr': sample_model.lr,
        'batch_size': sample_model.batch_size
    }
    for k, v in params.items():
        print(f"{k}: {type(v)} -> {v}")
    # === 4. 训练单个最佳模型（带动态学习率调整）===
    print("\n=== 训练最佳模型（带动态学习率）===")

    # 4.1 初始化模型与元控制器
    try:
        # 从集成模型中获取最佳参数（需确保参数类型转换）
        best_params = ensemble.models[0].get_params()
        best_params['device'] = device  # 确保设备参数正确传递

        best_model = BayesianMLP(
            input_dim=train_features.shape[1],
            hidden_dims=tuple(best_params['hidden_dims']),  # 显式转换为tuple
            dropout_prob=float(best_params['dropout_prob']),
            prior_sigma1=float(best_params['prior_sigma1']),
            prior_sigma2=float(best_params['prior_sigma2']),
        ).to(device)

    except KeyError as e:
        print(f"参数错误: 缺少必要参数 {str(e)}")
        return

    meta_controller = MetaController(init_lr=1e-3, decay_factor=0.5, patience=3)
    optimizer = optim.Adam(best_model.parameters(), lr=meta_controller.lr)
    criterion = nn.CrossEntropyLoss()
    scaling_factor = 1e-3  # 需与集成训练时一致

    # 4.2 创建数据加载器（使用完整训练集）
    try:
        train_dataset = TensorDataset(
            torch.tensor(train_features).float().to(device),
            torch.tensor(train_labels).long().to(device)
        )
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    except RuntimeError as e:
        print(f"数据加载失败: {str(e)}")
        return

    # 4.3 划分验证集（10%训练集）
    kf = KFold(n_splits=10, shuffle=True, random_state=42)
    train_idx, val_idx = next(kf.split(train_features))

    val_dataset = TensorDataset(
        torch.tensor(train_features[val_idx]).float().to(device),
        torch.tensor(train_labels[val_idx]).long().to(device)
    )
    val_loader = DataLoader(val_dataset, batch_size=32)

    # 4.4 训练循环
    best_val_loss = float('inf')
    for epoch in range(100):
        # 训练步骤
        best_model.train()
        total_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = best_model(inputs)
            loss = criterion(outputs, targets) + scaling_factor * best_model.get_kl()
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * inputs.size(0)

        train_loss = total_loss / len(train_loader.dataset)

        # 验证步骤
        val_loss = evaluate(
            model=best_model,
            dataloader=val_loader,
            criterion=criterion,
            scaling_factor=scaling_factor,
            device=device
        )

        # 动态学习率调整
        meta_controller.update(val_loss)
        optimizer.param_groups[0]['lr'] = meta_controller.lr

        # 保存最佳模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(best_model.state_dict(), "best_model.pth")

        print(f"Epoch {epoch+1:03d} | "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"LR: {meta_controller.lr:.2e}")

    # === 5. 最终评估 ===
    print("\n=== 最终评估 ===")

    # 5.1 加载最佳模型
    try:
        best_model.load_state_dict(torch.load("best_model.pth"))
    except FileNotFoundError:
        print("找不到模型文件")
        return

    # 5.2 评估单一模型
    test_tensor = torch.tensor(test_features).float().to(device)
    with torch.no_grad():
        outputs = best_model(test_tensor, sample=False)
    single_preds = torch.argmax(outputs, dim=1).cpu().numpy()

    print(f"单一模型测试准确率: {accuracy_score(test_labels, single_preds):.4f}")
    print(f"单一模型F1分数: {f1_score(test_labels, single_preds, average='macro'):.4f}")

    # 5.3 评估集成模型
    try:
        ensemble_preds = ensemble.predict(test_tensor)
        print(f"\n集成模型测试准确率: {accuracy_score(test_labels, ensemble_preds):.4f}")
        print(f"集成模型F1分数: {f1_score(test_labels, ensemble_preds, average='macro'):.4f}")
    except Exception as e:
        print(f"集成模型评估失败: {str(e)}")

if __name__ == "__main__":
    main()



=== Data shape verification ===
Training data shape: (676, 69)
Training label shape: (676,)
Test data shape: (43, 69)
Test label shape: (43,)
Unique label value: [0 1 2 3]
使用设备: cpu

=== 开始贝叶斯优化与集成训练 ===
集成训练失败: can only convert an array of size 1 to a Python scalar
