数据

In [9]:
%matplotlib inline
import json
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
import os  
from models.dnf import DNF, DeltaDelayedExponentialDecayScheduler
import numpy as np
class WeiboDataset(Dataset):
    def __init__(self, folder_path, selected_features=None):  # 新增selected_features参数
        # 设置默认特征（如果未指定则使用P1-P8）
        self.selected_features = selected_features if selected_features else [f"P{i}" for i in range(1, 9)]
        records = []
        for filename in os.listdir(folder_path):
            if filename.endswith('.json'):
                file_path = os.path.join(folder_path, filename)
                # 修改：添加 encoding="utf-8"
                with open(file_path, 'r', encoding="utf-8") as f:
                    data = json.load(f)
                    records.append(data)
        self.features = []
        self.labels = []
        
        for item in records:
            try:
                # 显式按顺序提取P1-P8并处理空值
                p_values = []
                for key in self.selected_features:  # 改为遍历选中特征
                    val = item['general'].get(key, 0.0)  # 获取值，若缺失则默认为0.0
                    # 处理空字符串
                    if isinstance(val, str):
                        if val.strip() == "":
                            val = 0.0  # 空字符串替换为0.0
                        else:
                            val = float(val)  # 尝试转换非空字符串
                    p_values.append(float(val))
                
                self.features.append(p_values)
                
                # 标签处理（兼容多种格式）
                label_val = item.get('label', 0)
                label = 1 if str(label_val).strip() in ("1", "true", "True") else 0
                self.labels.append(label)
                
            except (KeyError, ValueError) as e:
                print(f"文件 {item.get('id', '未知')} 数据异常: {str(e)}")
                continue  # 跳过无效样本
        self.num_features = len(self.selected_features)
        # 新增诊断信息
        print(f"\n数据集诊断信息:")
        print(f"总样本数: {len(self.labels)}")
        print(f"正样本比例: {sum(self.labels)/len(self.labels):.2%}")
        print(f"特征维度: {len(self.features[0]) if self.features else 0}")
        print(f"示例特征: {self.features[0] if self.features else []}")
        print(f"对应标签: {self.labels[0] if self.labels else []}")
    def __len__(self):
        return len(self.labels)
    def __getitem__(self, idx):
        return torch.FloatTensor(self.features[idx]), torch.FloatTensor([self.labels[idx]])
selected_features = ["P1", "P2", "P3", "P4", "P5", "P6", "P7","P8"]  # 可自由组合
dataset = WeiboDataset("./data/processed/weibo_21_well",selected_features=selected_features)  # 加载数据集
print(f"成功加载 {len(dataset)} 个样本")  # 先确认是否加载到数据
print(dataset[0])


数据集诊断信息:
总样本数: 7283
正样本比例: 46.24%
特征维度: 8
示例特征: [0.7, 0.5, 0.9, 0.8, 0.5, 1.0, 0.8, 1.0]
对应标签: 0
成功加载 7283 个样本
(tensor([0.7000, 0.5000, 0.9000, 0.8000, 0.5000, 1.0000, 0.8000, 1.0000]), tensor([0.]))


In [10]:
class DNFClassifier(nn.Module):
    def __init__(self, num_preds, num_conjuncts, n_out, delta=0.01, weight_init_type="normal"):
        super(DNFClassifier, self).__init__()
        self.dnf = DNF(num_preds, num_conjuncts, n_out, delta, weight_init_type=weight_init_type)
        self.sigmoid = nn.Sigmoid()
    def forward(self, x):
        return self.sigmoid(self.dnf(x))

In [11]:
# 参数配置
num_preds = dataset.num_features       # P1-P8特征维度
num_conjuncts = 10  # 合取项数量（可调整）
n_out = 1           # 二分类输出

model = DNFClassifier(num_preds, num_conjuncts, n_out)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Delta调度器（关键参数需根据实验调整）
delta_scheduler = DeltaDelayedExponentialDecayScheduler(
    initial_delta=0.01,
    delta_decay_delay=100,
    delta_decay_steps=50,
    delta_decay_rate=0.1
)

# ------------------------------------------------------------------
# 数据加载及划分训练集和验证集
train_size = int(0.8 * len(dataset))      # 80%作为训练集
val_size = len(dataset) - train_size      # 剩余作为验证集
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
# ------------------------------------------------------------------


TRAIN

In [12]:
import matplotlib.pyplot as plt

# 训练参数
num_epochs = 50
patience = 5  # 早停容忍的 epoch 数量
best_val_loss = float("inf")
trigger_times = 0

train_loss_list = []
val_loss_list = []
accuracy_list = []

for epoch in range(num_epochs):
    # --------------------- 训练阶段 ---------------------
    model.train()
    running_train_loss = 0
    for step, (inputs, labels) in enumerate(train_loader):
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # 更新 delta 值（假设 delta_scheduler 及 model.dnf 已定义）
        current_step = epoch * len(train_loader) + step
        delta_scheduler.step(model.dnf, current_step)
        
        running_train_loss += loss.item() * inputs.size(0)
    
    avg_train_loss = running_train_loss / len(train_loader.dataset)
    train_loss_list.append(avg_train_loss)
    
    # --------------------- 验证阶段 ---------------------
    model.eval()
    total_loss = 0
    total_samples = 0
    corrects = 0  # 累计正确预测数量
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * inputs.size(0)
            total_samples += inputs.size(0)
            
            # 二分类预测（阈值0.5）
            predicted = (outputs > 0.5).float()
            corrects += (predicted == labels).sum().item()
            
    avg_val_loss = total_loss / total_samples
    accuracy = corrects / total_samples
    val_loss_list.append(avg_val_loss)
    accuracy_list.append(accuracy)
    
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, " 
          f"Validation Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.4f}")
    
    # --------------------- 早停策略 ---------------------
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        trigger_times = 0
        # 可以在这里保存模型最佳权重
        best_model_state = model.state_dict()
    else:
        trigger_times += 1
        print(f"EarlyStopping Trigger Times: {trigger_times}")
        if trigger_times >= patience:
            print("Early stopping!\n")
            break



# 保存最佳模型
torch.save(best_model_state, "./models/best_model.pth")

Epoch 1/50, Training Loss: 0.6824, Validation Loss: 0.6609, Accuracy: 0.5724
Epoch 2/50, Training Loss: 0.6487, Validation Loss: 0.6173, Accuracy: 0.6973
Epoch 3/50, Training Loss: 0.6043, Validation Loss: 0.5800, Accuracy: 0.7502
Epoch 4/50, Training Loss: 0.5724, Validation Loss: 0.5567, Accuracy: 0.7467
Epoch 5/50, Training Loss: 0.5586, Validation Loss: 0.5528, Accuracy: 0.7495
Epoch 6/50, Training Loss: 0.5546, Validation Loss: 0.5489, Accuracy: 0.7481
Epoch 7/50, Training Loss: 0.5511, Validation Loss: 0.5468, Accuracy: 0.7461
Epoch 8/50, Training Loss: 0.5498, Validation Loss: 0.5448, Accuracy: 0.7433
Epoch 9/50, Training Loss: 0.5495, Validation Loss: 0.5448, Accuracy: 0.7461
EarlyStopping Trigger Times: 1
Epoch 10/50, Training Loss: 0.5489, Validation Loss: 0.5440, Accuracy: 0.7440
Epoch 11/50, Training Loss: 0.5485, Validation Loss: 0.5443, Accuracy: 0.7461
EarlyStopping Trigger Times: 1
Epoch 12/50, Training Loss: 0.5478, Validation Loss: 0.5438, Accuracy: 0.7447
Epoch 13/50

eval

In [13]:
from sklearn.metrics import accuracy_score, f1_score, classification_report

# 加载保存的最佳模型状态
model.load_state_dict(torch.load("./models/best_model.pth"))
model.eval()

all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in val_loader:
        outputs = model(inputs)
        # 二分类预测（阈值 0.5）
        preds = (outputs > 0.5).float()
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)

accuracy = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds, average='binary')  # 二分类情况

print("Validation Accuracy: {:.4f}".format(accuracy))
print("Validation F1 Score: {:.4f}".format(f1))
print("\nClassification Report:\n", classification_report(all_labels, all_preds))

Validation Accuracy: 0.7481
Validation F1 Score: 0.7230

Classification Report:
               precision    recall  f1-score   support

         0.0       0.79      0.75      0.77       811
         1.0       0.71      0.74      0.72       646

    accuracy                           0.75      1457
   macro avg       0.75      0.75      0.75      1457
weighted avg       0.75      0.75      0.75      1457



In [14]:
# 训练完成后调用
rules = model.dnf.get_rules(threshold=0.5)  # 可调整阈值

# selected_features
print("==== 合取规则 ====")
for rule in rules["conjuncts"]:
    if "∅" not in rule:  # 过滤空规则
        print(rule)

print("\n==== 最终分类规则 ====")
for class_idx, rule in rules["disjuncts"].items():
    print(f"Class {class_idx}: {rule}")

==== 合取规则 ====
conj0 = ¬P2
conj1 = ¬P2 ∧ P5
conj2 = P2 ∧ ¬P5
conj6 = ¬P2
conj7 = P2 ∧ ¬P5
conj8 = P2
conj9 = ¬P2 ∧ P5

==== 最终分类规则 ====
Class 0: conj1 ∨ ¬conj2 ∨ ¬conj3 ∨ conj6 ∨ ¬conj7 ∨ ¬conj8 ∨ conj9


In [15]:
def explain_rule(rule: str, feature_map: dict) -> str:
    """将符号规则转换为自然语言"""
    explanation = []
    for term in rule.split():
        if "P" in term:
            # 提取特征和符号
            sign = "非" if "¬" in term else ""
            p_num = term.split("P")[-1]
            explanation.append(f"{sign}{feature_map[int(p_num)]}")
        elif "conj" in term:
            explanation.append(f"({term})")
    return " 或 ".join(explanation).replace("∧", " 且 ").replace("∨", " 或 ")
feature_map = {
    1: "信息充分性", 2: "信息准确性", 
    3: "内容完整性", 4: "意图正当性",
    5: "发布者信誉", 6: "情感中立性",
    7: "无诱导行为", 8: "信息一致性"
}

In [16]:
def generate_semantic_report(rules, feature_map):
    """生成完整语义报告"""
    report = []
    # 合取规则解释
    report.append("## 基础特征组合规则")
    for conj in rules["conjuncts"]:
        if "∅" not in conj:
            _, expr = conj.split("=")
            report.append(f"- 当 {explain_rule(expr.strip(), feature_map)} 时触发该规则")
    
    # 最终决策规则解释
    report.append("\n## 最终决策逻辑")
    for cls, rule in rules["disjuncts"].items():
        if "∅" not in rule:
            cls_name = "虚假信息" if cls == 0 else "真实信息"
            report.append(f"### {cls_name}判定条件")
            report.append(f"满足以下任一条件即判定为{cls_name}：")
            for term in rule.split("∨"):
                report.append(f"  - {explain_rule(term.strip(), feature_map)}")
    return "\n".join(report)
print(generate_semantic_report(rules, feature_map))

## 基础特征组合规则
- 当 非信息准确性 时触发该规则
- 当 非信息准确性 或 发布者信誉 时触发该规则
- 当 信息准确性 或 非发布者信誉 时触发该规则
- 当 非信息准确性 时触发该规则
- 当 信息准确性 或 非发布者信誉 时触发该规则
- 当 信息准确性 时触发该规则
- 当 非信息准确性 或 发布者信誉 时触发该规则

## 最终决策逻辑
### 虚假信息判定条件
满足以下任一条件即判定为虚假信息：
  - (conj1)
  - (¬conj2)
  - (¬conj3)
  - (conj6)
  - (¬conj7)
  - (¬conj8)
  - (conj9)


In [None]:
import random

with torch.no_grad():
    for inputs, labels in val_loader:
        # 从当前batch随机挑选一个样本
        random_idx = random.randint(0, inputs.size(0) - 1)
        single_input = inputs[random_idx : random_idx + 1]  # 保持维度一致
        single_label = labels[random_idx : random_idx + 1]
        print("随机样本输入 shape:", single_input.shape)
        print("随机样本标签:", single_label)
        break  # 只取第一个batch中的随机样
    predicted = model(single_input)
    print("随机样本模型预测输出:", predicted.item())


随机样本输入 shape: torch.Size([1, 8])
随机样本标签: tensor([[0.]])
随机样本模型预测输出: 0.08580563962459564


: 

In [None]:
from utils.reporter import generate_report
generate_report(model, val_loader, feature_map, "./reports/weibo_dnf_report.html")