In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np


In [8]:
#数据预处理
import os
import re
import torch
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import nltk

# Fetch the NLTK English stop-word corpus that clean_email_text reads via
# stopwords.words('english'). The recorded cell output shows NLTK reporting the
# package as already up-to-date when it is present locally.
nltk.download('stopwords')

# Patterns used by clean_email_text, compiled once at definition time.
_HEADER_KEYWORD_RE = re.compile(r'(subject:|to:|cc:|from:)')
_NON_LETTER_RE = re.compile(r'[^a-z\s]')
_WHITESPACE_RE = re.compile(r'\s+')

# Lazily filled cache so the stop-word corpus is read only once per kernel
# instead of once per email.
_TEXT_TOOLS = {}


def _get_text_tools():
    """Return the shared ``(stop_words, stemmer)`` pair, building it on first use."""
    if not _TEXT_TOOLS:
        _TEXT_TOOLS['stop_words'] = frozenset(stopwords.words('english'))
        _TEXT_TOOLS['stemmer'] = PorterStemmer()
    return _TEXT_TOOLS['stop_words'], _TEXT_TOOLS['stemmer']


def clean_email_text(text):
    """
    Normalize the raw text of a single email into a space-separated string of stems.

    Steps, in order:
      1. Lower-case the text.
      2. Replace the literal tokens ``subject:``, ``to:``, ``cc:`` and ``from:``
         with a space wherever they occur (the match is not anchored to line starts).
      3. Replace every character that is not ``a-z`` or whitespace with a space.
      4. Collapse whitespace runs and strip the ends.
      5. Drop English stop words and Porter-stem the remaining tokens.

    Args:
        text: Raw email content as a ``str``.

    Returns:
        The cleaned tokens joined by single spaces (an empty string if nothing survives).
    """
    text = text.lower()
    text = _HEADER_KEYWORD_RE.sub(' ', text)
    # Dashes, underscores and pipes (forwarding markers / divider lines) are all
    # non-letters, so this single pass removes them too; a separate separator
    # regex would be redundant once whitespace is collapsed below.
    text = _NON_LETTER_RE.sub(' ', text)
    text = _WHITESPACE_RE.sub(' ', text).strip()

    stop_words, stemmer = _get_text_tools()
    # Stop words are filtered on the surface form, before stemming.
    return " ".join(stemmer.stem(w) for w in text.split() if w not in stop_words)

def _load_folder(folder):
    """Return the cleaned text of every regular file in ``folder``, in sorted filename order."""
    cleaned = []
    # os.listdir() returns entries in arbitrary order; sorting makes the sample
    # order (and therefore any seeded split computed from it) reproducible.
    for filename in sorted(os.listdir(folder)):
        path = os.path.join(folder, filename)
        # Skip sub-directories (e.g. .ipynb_checkpoints), which open() cannot read.
        if not os.path.isfile(path):
            continue
        # latin-1 maps every byte to a character, so decoding never raises.
        with open(path, 'r', encoding='latin-1') as f:
            cleaned.append(clean_email_text(f.read()))
    return cleaned


def load_data(legitimate_path, spam_path):
    """
    Read and clean every email file found in the two folders.

    Args:
        legitimate_path: Directory containing legitimate (ham) emails; labelled 0.
        spam_path: Directory containing spam emails; labelled 1.

    Returns:
        ``(emails, labels)``: two equal-length lists. ``emails`` holds the cleaned
        text of each file (all legitimate emails first, then all spam) and
        ``labels`` holds the matching 0/1 label.

    Raises:
        OSError: If a directory cannot be listed or a file cannot be read.
    """
    legitimate = _load_folder(legitimate_path)
    spam = _load_folder(spam_path)

    emails = legitimate + spam
    labels = [0] * len(legitimate) + [1] * len(spam)
    return emails, labels


# Dataset folders, relative to the notebook's working directory.
legit_dir = 'enron1/ham'
spam_dir = 'enron1/spam'

print("正在读取并清洗数据...")
# texts: cleaned email strings; y: matching labels (0 = legitimate, 1 = spam).
texts, y = load_data(legit_dir, spam_dir)

# 4. TF-IDF feature extraction (vocabulary capped at 5000 terms).
# NOTE(review): the vectorizer is fitted on ALL emails before the train/test
# split, so the vocabulary and IDF statistics also see the test emails.
# Fitting on the training texts only would avoid that leakage.
# .toarray() materialises the sparse matrix as a dense array.
print("正在生成 TF-IDF 特征向量...")
tfidf = TfidfVectorizer(max_features=5000)
X = tfidf.fit_transform(texts).toarray()

# 5. Convert to PyTorch tensors (float features, integer class labels).
X_tensor = torch.FloatTensor(X)
y_tensor = torch.LongTensor(y)

# 6. Stratified 80/20 train/test split with a fixed seed.
# NOTE(review): the training cell further down re-splits X_tensor / y_tensor
# with test_size=0.3 and overwrites these four names.
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor, y_tensor, test_size=0.2, random_state=42, stratify=y_tensor
)

print(f"预处理完成！")
print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")

[nltk_data] Downloading package stopwords to /Users/a1012/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


正在读取并清洗数据...
正在生成 TF-IDF 特征向量...
预处理完成！
训练集大小: torch.Size([4137, 5000])
测试集大小: torch.Size([1035, 5000])


In [9]:
#定义变分线性层 (Variational Linear Layer)
class VariationalLinear(nn.Module):
    """
    Linear layer with a factorized Gaussian variational posterior over its
    weights and bias.

    Each parameter has a learnable mean (``*_mu``) and a learnable ``*_rho``;
    the standard deviation is ``sigma = softplus(rho) = log(1 + exp(rho))``,
    which keeps it positive. Note that ``rho`` is the softplus pre-activation,
    not the logarithm of ``sigma``.

    Every call to ``forward`` draws a fresh weight sample, regardless of
    ``train()`` / ``eval()`` mode, and stores the analytic KL divergence between
    the posterior and the zero-mean prior ``N(0, prior_sigma^2)`` in
    ``self.kl_loss``.

    Args:
        in_features: Size of each input sample.
        out_features: Size of each output sample.
        prior_sigma: Standard deviation of the zero-mean Gaussian prior; must be > 0.

    Raises:
        ValueError: If ``prior_sigma`` is not strictly positive.
    """

    def __init__(self, in_features, out_features, prior_sigma=1.0):
        super().__init__()
        if prior_sigma <= 0:
            raise ValueError("prior_sigma must be strictly positive")
        self.in_features = in_features
        self.out_features = out_features

        # Posterior means, initialised from N(0, 0.1^2).
        self.w_mu = nn.Parameter(torch.empty(out_features, in_features).normal_(0, 0.1))
        self.b_mu = nn.Parameter(torch.empty(out_features).normal_(0, 0.1))

        # Softplus pre-activations of the posterior standard deviations.
        # rho = -3 gives a small initial sigma (softplus(-3) is roughly 0.049).
        self.w_rho = nn.Parameter(torch.full((out_features, in_features), -3.0))
        self.b_rho = nn.Parameter(torch.full((out_features,), -3.0))

        # Standard deviation of the prior.
        self.prior_sigma = prior_sigma

        # Defined up front so reading kl_loss before the first forward pass
        # yields zero instead of raising AttributeError; overwritten by forward().
        self.kl_loss = torch.zeros(())

    def forward(self, x):
        """
        Sample weights and bias, apply the affine map, and refresh ``self.kl_loss``.

        Args:
            x: Input tensor whose last dimension is ``in_features``.

        Returns:
            ``F.linear(x, w, b)`` for the sampled ``w`` and ``b``.
        """
        # softplus is the numerically stable form of log(1 + exp(rho)); the
        # naive expression overflows to inf once exp(rho) leaves float range.
        w_sigma = F.softplus(self.w_rho)
        b_sigma = F.softplus(self.b_rho)

        # 1. Reparameterization trick: w = mu + sigma * epsilon, epsilon ~ N(0, 1),
        #    so gradients flow to mu and rho through the sample.
        w = self.w_mu + w_sigma * torch.randn_like(w_sigma)
        b = self.b_mu + b_sigma * torch.randn_like(b_sigma)

        # 2. Closed-form KL( N(mu, sigma^2) || N(0, prior_sigma^2) ), per element:
        #    log(prior_sigma / sigma) + (sigma^2 + mu^2) / (2 * prior_sigma^2) - 1/2
        prior_var2 = 2 * self.prior_sigma ** 2
        kl_w = torch.log(self.prior_sigma / w_sigma) + (w_sigma ** 2 + self.w_mu ** 2) / prior_var2 - 0.5
        kl_b = torch.log(self.prior_sigma / b_sigma) + (b_sigma ** 2 + self.b_mu ** 2) / prior_var2 - 0.5
        self.kl_loss = kl_w.sum() + kl_b.sum()

        return F.linear(x, w, b)


In [10]:
#构建变分贝叶斯神经网络 (BNN)
class BayesianSpamClassifier(nn.Module):
    """
    Two-layer Bayesian neural network built from ``VariationalLinear`` layers.

    Architecture: ``input_dim -> hidden_dim`` (ReLU) ``-> num_classes`` logits.

    Args:
        input_dim: Number of input features.
        hidden_dim: Width of the hidden layer. Defaults to 256.
        num_classes: Number of output logits. Defaults to 2 (legitimate vs. spam).
    """

    def __init__(self, input_dim, hidden_dim=256, num_classes=2):
        super().__init__()
        self.layer1 = VariationalLinear(input_dim, hidden_dim)
        self.layer2 = VariationalLinear(hidden_dim, num_classes)

    def forward(self, x):
        """Return unnormalized class logits for ``x``; each call samples new weights."""
        hidden = F.relu(self.layer1(x))
        return self.layer2(hidden)

    def get_kl_loss(self):
        """Return the summed KL term that both layers stored during their latest forward pass."""
        return self.layer1.kl_loss + self.layer2.kl_loss

In [11]:
def predict_with_uncertainty(model, x, num_samples=10):
    """
    Run ``num_samples`` forward passes and summarise the predicted class probabilities.

    The model is switched to eval mode and gradients are disabled. For models
    whose layers sample weights inside ``forward`` the passes differ, and their
    spread is used as an uncertainty measure.

    Args:
        model: Module mapping ``x`` to logits of shape [Batch, Classes].
        x: Input batch.
        num_samples: Number of forward passes to average over.

    Returns:
        ``(mean_pred, uncertainty)`` as NumPy arrays: the per-class probabilities
        averaged over the passes ([Batch, Classes]) and, for each input, the
        variance across passes summed over classes ([Batch]).
    """
    model.eval()
    with torch.no_grad():
        # One softmax output per stochastic forward pass.
        draws = [
            F.softmax(model(x), dim=1).cpu().numpy()
            for _ in range(num_samples)
        ]

    stacked = np.array(draws)  # [Samples, Batch, Classes]
    per_class_variance = stacked.var(axis=0)
    return stacked.mean(axis=0), per_class_variance.sum(axis=1)

In [12]:
#训练逻辑与 ELBO 损失
def train_step(model, optimizer, data, target, num_samples=5,
               class_weights=(1.0, 2.45), dataset_size=5975):
    """
    Perform one optimisation step on the negative ELBO for a single mini-batch.

    The loss is ``mean_over_samples(weighted cross-entropy) + KL / dataset_size``,
    where the expectation over the weight posterior is approximated with
    ``num_samples`` stochastic forward passes.

    Args:
        model: Module returning class logits and exposing ``get_kl_loss()``.
        optimizer: Optimizer over ``model``'s parameters.
        data: Input batch.
        target: Integer class labels for the batch.
        num_samples: Number of Monte Carlo forward passes; must be >= 1.
        class_weights: Per-class weights for the cross-entropy (index = class id),
            or ``None`` for an unweighted loss. The default up-weights class 1
            (spam) to compensate for class imbalance.
        dataset_size: Divisor applied to the KL term. The default (5975) is kept
            for backward compatibility only; pass the actual number of training
            examples so the KL term is scaled consistently with the per-example
            mean NLL.

    Returns:
        The scalar loss value (Python float) for this step.

    Raises:
        ValueError: If ``num_samples < 1`` or ``dataset_size <= 0``.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")
    if dataset_size <= 0:
        raise ValueError("dataset_size must be positive")

    optimizer.zero_grad()

    # Built once per step rather than once per Monte Carlo sample.
    weight = None
    if class_weights is not None:
        weight = torch.as_tensor(class_weights, dtype=torch.float32, device=data.device)

    # Monte Carlo estimate of the expected negative log-likelihood.
    total_nll = 0
    for _ in range(num_samples):
        output = model(data)
        total_nll = total_nll + F.cross_entropy(output, target, weight=weight)
    avg_nll = total_nll / num_samples

    # The KL term is analytic (it does not depend on the sampled noise), so the
    # value left by the last forward pass is the one to use.
    kl_loss = model.get_kl_loss() / dataset_size

    # Minimise the negative ELBO: NLL + scaled KL.
    loss = avg_nll + kl_loss
    loss.backward()
    optimizer.step()
    return loss.item()

In [None]:
if __name__ == "__main__":
    # Seed torch so weight initialisation, posterior sampling and DataLoader
    # shuffling are repeatable across Restart & Run All.
    torch.manual_seed(42)

    # Re-split the full feature matrix 70/30 (stratified, fixed seed); this
    # replaces the 80/20 split made in the preprocessing cell.
    X_train, X_test, y_train, y_test = train_test_split(
        X_tensor, y_tensor, test_size=0.3, random_state=42, stratify=y_tensor
    )

    # Model and optimizer. The input width is taken from the data because
    # TfidfVectorizer(max_features=5000) is only an upper bound on the number
    # of feature columns.
    input_dim = X_train.shape[1]
    model = BayesianSpamClassifier(input_dim=input_dim)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Mini-batch loader over the training split.
    from torch.utils.data import DataLoader, TensorDataset
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    # Training loop: report the mean per-batch loss after every epoch.
    num_epochs = 20
    print(f"开始训练步骤... (训练样本数: {len(X_train)})")
    model.train()
    for epoch in range(1, num_epochs + 1):
        epoch_loss = 0
        for batch_x, batch_y in train_loader:
            loss = train_step(model, optimizer, batch_x, batch_y)
            epoch_loss += loss
        print(f"Epoch {epoch}/{num_epochs}, Average Loss: {epoch_loss/len(train_loader):.4f}")

    # Test-set evaluation.
    print(f"\n开始在测试集上评估... (测试样本数: {len(X_test)})")

    def evaluate_test_set(model, X_test, y_test, num_monte_carlo=10):
        """
        Classify ``X_test`` by averaging ``num_monte_carlo`` stochastic forward passes.

        Returns:
            ``(y_pred, accuracy)``: predicted class ids as a NumPy array and the
            fraction of predictions equal to ``y_test``.
        """
        # Reuse the shared Monte Carlo helper instead of duplicating its
        # sampling loop; only the averaged probabilities are needed here.
        mean_probs, _ = predict_with_uncertainty(model, X_test, num_samples=num_monte_carlo)
        y_pred = np.argmax(mean_probs, axis=1)

        correct = (y_pred == y_test.numpy()).sum()
        accuracy = correct / len(y_test)

        return y_pred, accuracy

    # Run the evaluation and print the metrics.
    y_pred, test_acc = evaluate_test_set(model, X_test, y_test)

    from sklearn.metrics import classification_report, confusion_matrix
    print("\n" + "="*30)
    print(f"测试集准确率: {test_acc:.2%}")
    print("详细分类报告:")
    print(classification_report(y_test, y_pred, target_names=['Legitimate', 'Spam']))
    print("混淆矩阵:")
    print(confusion_matrix(y_test, y_pred))
    print("="*30)

 

开始训练步骤... (训练样本数: 3620)
Epoch 1/20, Average Loss: 536.8036
Epoch 2/20, Average Loss: 524.3176
Epoch 3/20, Average Loss: 512.2341
Epoch 4/20, Average Loss: 500.2995
Epoch 5/20, Average Loss: 488.4512
Epoch 6/20, Average Loss: 476.6684
Epoch 7/20, Average Loss: 464.9416
Epoch 8/20, Average Loss: 453.2673
Epoch 9/20, Average Loss: 441.6449
Epoch 10/20, Average Loss: 430.0746
Epoch 11/20, Average Loss: 418.5578
Epoch 12/20, Average Loss: 407.0972
Epoch 13/20, Average Loss: 395.6943
Epoch 14/20, Average Loss: 384.3550
Epoch 15/20, Average Loss: 373.0793
Epoch 16/20, Average Loss: 361.8732
Epoch 17/20, Average Loss: 350.7396
Epoch 18/20, Average Loss: 339.6835
Epoch 19/20, Average Loss: 328.7098
Epoch 20/20, Average Loss: 317.8231

开始在测试集上评估... (测试样本数: 1552)

测试集准确率: 98.32%
详细分类报告:
              precision    recall  f1-score   support

  Legitimate       0.99      0.98      0.99      1102
        Spam       0.96      0.98      0.97       450

    accuracy                           0.98      