# Fakeddit 文本‑only（BERT）基线

本 Notebook 按论文方法论的**文本分支**做一个可复现的 baseline：
- 使用 **Fakeddit v2.0 的 `multimodal_only_samples`**（仅图文样本，但这里只喂文本）
- 文本使用 **`clean_title`**
- 模型使用 **BERT** 做 2/3/6‑way 分类

后续你可以在此基础上增加图像分支做多模态融合。

**注意**：第一次运行需要下载 HuggingFace 模型权重（`bert-base-uncased`），请确保网络可用或已缓存。


## 0. 环境依赖（如本地未安装）
如果你还没有安装依赖，可以在终端或此 Notebook 中执行：
```bash
pip install torch torchvision torchaudio transformers scikit-learn tqdm
```


In [None]:
# 配置区（按需修改）
import os
from pathlib import Path

# Dataset locations: Fakeddit v2.0, multimodal-only split files.
DATA_ROOT = Path('Fakeddit datasetv2.0')
TRAIN_PATH = DATA_ROOT.joinpath('multimodal_only_samples', 'multimodal_train.tsv')
VAL_PATH = DATA_ROOT.joinpath('multimodal_only_samples', 'multimodal_validate.tsv')
TEST_PATH = DATA_ROOT.joinpath('multimodal_only_samples', 'multimodal_test_public.tsv')

# Task and checkpoint selection.
TASK = 2  # number of classes: 2, 3 or 6
MODEL_NAME = 'bert-base-uncased'  # can be used as-is when a GPU is available

# Tokenization and optimization hyper-parameters.
MAX_LEN = 128
BATCH_SIZE = 32
EPOCHS = 3
LR = 2e-5
WEIGHT_DECAY = 0.01
WARMUP_RATIO = 0.1

# Data-loading knobs.
MAX_SAMPLES = None  # None uses every sample; a small number gives a quick check
NUM_WORKERS = 2

# Artifacts go to a per-task directory that is created up front.
OUTPUT_DIR = Path('outputs', f'bert_text_only_{TASK}way')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Echo the resolved paths so a wrong DATA_ROOT is visible immediately.
print(f'Train: {TRAIN_PATH}')
print(f'Val: {VAL_PATH}')
print(f'Test: {TEST_PATH}')
print(f'Output: {OUTPUT_DIR}')


In [None]:
import csv
import random
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix

# tqdm is optional: it only draws progress bars. When it is not installed,
# fall back to a pass-through with the same call shape so loops run unchanged.
# Only ImportError is caught so that unrelated failures are not hidden.
try:
    from tqdm import tqdm
except ImportError:
    def tqdm(x, **kwargs):
        """Return the iterable *x* unchanged, ignoring progress-bar options."""
        return x

def set_seed(seed=42):
    """Seed Python's ``random``, NumPy and PyTorch (CPU and CUDA) generators.

    Args:
        seed: Value handed unchanged to each library's seeding function.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

# Fix every RNG seed before anything stochastic runs.
set_seed(42)

# Use the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('device:', device)


## 1) 标签映射说明
以下映射已在本地 TSV 中验证：
- 2‑way：`1=True`，`0=Fake`
- 3‑way：`0=True`，`1=Fake with true text`，`2=Fake with false text`
- 6‑way：`0=True`，`1=Satire/Parody`，`2=False Connection`，`3=Imposter`，`4=Manipulated`，`5=Misleading`


In [None]:
# Human-readable class names, keyed first by task size (2/3/6-way) and then
# by integer label id (the position of the name in each tuple).
LABEL_NAMES = {
    2: dict(enumerate(('Fake', 'True'))),
    3: dict(enumerate(('True', 'Fake-TrueText', 'Fake-FalseText'))),
    6: dict(enumerate((
        'True',
        'Satire/Parody',
        'False Connection',
        'Imposter',
        'Manipulated',
        'Misleading',
    ))),
}

def load_tsv(path, task=2, max_samples=None):
    """Read ``(title, label)`` pairs from a tab-separated split file.

    Rows whose ``clean_title`` is empty after stripping whitespace, or whose
    label cell is missing or empty, are skipped.

    Args:
        path: Location of a UTF-8 TSV file with a header row.
        task: Selects the label column ``f'{task}_way_label'``.
        max_samples: Stop once this many rows have been kept. ``None`` reads
            the whole file; a value of 0 or less yields no samples.

    Returns:
        A ``(texts, labels)`` tuple of two equally long lists: the stripped
        title strings and their integer labels.

    Raises:
        ValueError: If the header lacks ``clean_title`` or the label column,
            or if a label cell cannot be parsed as a number.
    """
    texts = []
    labels = []
    label_key = f'{task}_way_label'
    with open(path, newline='', encoding='utf-8') as f:
        # Spell the tab as an escape sequence: a raw tab inside the literal is
        # invisible and easily turned into spaces by editors or converters.
        reader = csv.DictReader(f, delimiter='\t')

        # Fail loudly on a schema mismatch instead of returning empty lists.
        # An entirely empty file has no header and simply yields no rows.
        header = reader.fieldnames
        if header is not None:
            missing = [col for col in ('clean_title', label_key) if col not in header]
            if missing:
                raise ValueError(f'{path}: missing column(s) {missing}')

        if max_samples is not None and max_samples <= 0:
            return texts, labels

        for row in reader:
            text = (row.get('clean_title') or '').strip()
            if not text:
                continue
            label_str = row.get(label_key)
            if label_str is None or label_str == '':
                continue
            # Labels may be serialized as floats (e.g. "1.0"), so parse via float.
            labels.append(int(float(label_str)))
            texts.append(text)
            if max_samples is not None and len(texts) >= max_samples:
                break
    return texts, labels

# Load all three splits with the same task and sample-cap settings.
(train_texts, train_labels), (val_texts, val_labels), (test_texts, test_labels) = (
    load_tsv(split_path, task=TASK, max_samples=MAX_SAMPLES)
    for split_path in (TRAIN_PATH, VAL_PATH, TEST_PATH)
)

# Number of usable samples per split.
print('train:', len(train_texts))
print('val  :', len(val_texts))
print('test :', len(test_texts))

# Class balance of the training and validation splits.
from collections import Counter
print('train label dist:', Counter(train_labels))
print('val label dist  :', Counter(val_labels))


In [None]:
# Load the tokenizer for the MODEL_NAME checkpoint; collate_fn uses it to
# encode each batch of raw strings.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

class TextDataset(Dataset):
    """Dataset of raw ``(text, label)`` pairs.

    Items are returned untouched; no tokenization happens here, so a collate
    function can encode a whole batch at once.
    """

    def __init__(self, texts, labels):
        """Store two parallel sequences.

        Args:
            texts: Sequence of input strings.
            labels: Sequence of labels, one per entry of *texts*.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        # Catch misaligned inputs here rather than as an IndexError (or a
        # silently ignored tail) in the middle of training.
        if len(texts) != len(labels):
            raise ValueError(
                f'texts and labels differ in length: {len(texts)} != {len(labels)}'
            )
        self.texts = texts
        self.labels = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``(text, label)`` pair stored at position *idx*."""
        return self.texts[idx], self.labels[idx]

def collate_fn(batch):
    """Tokenize a list of ``(text, label)`` pairs into one padded batch.

    Args:
        batch: Sequence of ``(text, label)`` pairs.

    Returns:
        The tokenizer's output for the batch texts (padded to the longest
        text in the batch, truncated to ``MAX_LEN`` tokens, PyTorch tensors),
        with an extra ``'labels'`` entry holding the labels as a long tensor.
    """
    batch_texts, batch_labels = zip(*batch)
    encoded = tokenizer(
        list(batch_texts),
        padding=True,
        truncation=True,
        max_length=MAX_LEN,
        return_tensors='pt',
    )
    encoded['labels'] = torch.tensor(batch_labels, dtype=torch.long)
    return encoded

# Wrap each split's raw (text, label) lists in a dataset.
train_ds = TextDataset(train_texts, train_labels)
val_ds = TextDataset(val_texts, val_labels)
test_ds = TextDataset(test_texts, test_labels)

# Only the training loader shuffles; validation and test keep file order.
# All three tokenize per batch through collate_fn.
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    test_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)


In [None]:
# Sequence-classification model with num_labels output classes, moved to the
# selected device.
num_labels = TASK
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=num_labels)
model.to(device)

# Inverse-frequency class weights: weight_k = total / (num_labels * count_k).
# A class that never occurs in the training labels is treated as count 1.
# Optional, but useful for the imbalanced 6-way task.
from collections import Counter
label_counts = Counter(train_labels)
total = sum(label_counts.values())
weights = torch.tensor(
    [total / (num_labels * label_counts.get(k, 1)) for k in range(num_labels)],
    dtype=torch.float,
    device=device,
)

# Use an explicit weighted cross-entropy on the logits.
import torch.nn as nn
criterion = nn.CrossEntropyLoss(weight=weights)

# AdamW with linear warmup over the first WARMUP_RATIO of all optimizer steps.
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
total_steps = EPOCHS * len(train_loader)
warmup_steps = int(WARMUP_RATIO * total_steps)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps,
)

# Gradient scaling for mixed precision; disabled when CUDA is unavailable.
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())
print('total steps:', total_steps, 'warmup:', warmup_steps)


In [None]:
@torch.no_grad()
def evaluate(model, loader):
    """Score *model* on every batch of *loader* without tracking gradients.

    The model is switched to eval mode and left there; callers that continue
    training must call ``model.train()`` again.

    Args:
        model: Called as ``model(**batch)``; its output must expose ``logits``.
        loader: Iterable of dict-like batches of tensors that include a
            ``'labels'`` entry (removed from the batch before the forward pass).

    Returns:
        ``(avg_loss, acc, f1, cm)``: the per-sample average of the module-level
        ``criterion`` loss (batch losses weighted by batch size), accuracy,
        macro-averaged F1, and the confusion matrix.
    """
    model.eval()
    all_preds = []
    all_labels = []
    total_loss = 0.0
    num_classes = None
    for batch in loader:
        labels = batch.pop('labels').to(device)
        batch = {k: v.to(device) for k, v in batch.items()}
        logits = model(**batch).logits
        num_classes = logits.size(-1)
        loss = criterion(logits, labels)
        total_loss += loss.item() * labels.size(0)
        preds = torch.argmax(logits, dim=-1)
        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())

    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro')
    # Pin the matrix to one row/column per model output class. Without an
    # explicit label list, a class absent from both targets and predictions
    # (likely on small subsets) shrinks the matrix and shifts its indices.
    class_ids = None if num_classes is None else list(range(num_classes))
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    avg_loss = total_loss / max(1, len(all_labels))
    return avg_loss, acc, f1, cm


In [None]:
# Fine-tune for EPOCHS passes over the training set, validating after each.
for epoch in range(1, EPOCHS + 1):
    model.train()  # evaluate() leaves the model in eval mode
    running_loss = 0.0
    for batch in tqdm(train_loader, desc=f'Epoch {epoch}/{EPOCHS}'):
        labels = batch.pop('labels').to(device)
        batch = {k: v.to(device) for k, v in batch.items()}
        optimizer.zero_grad()
        # Mixed-precision forward pass; autocast is disabled without CUDA.
        with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
            outputs = model(**batch)
            logits = outputs.logits
            loss = criterion(logits, labels)
        # Scale the loss before backward, then let the scaler drive the
        # optimizer step and refresh its scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()
        # Accumulate a sample-weighted sum so the epoch figure is per sample.
        running_loss += loss.item() * labels.size(0)

    train_loss = running_loss / max(1, len(train_ds))
    val_loss, val_acc, val_f1, val_cm = evaluate(model, val_loader)
    print(f'Epoch {epoch}: train_loss={train_loss:.4f} val_loss={val_loss:.4f} val_acc={val_acc:.4f} val_f1={val_f1:.4f}')
    # The newline must be an escape sequence: a single-quoted string literal
    # cannot span two physical lines.
    print('Val confusion matrix:\n', val_cm)


In [None]:
# Final evaluation on the held-out test split.
test_loss, test_acc, test_f1, test_cm = evaluate(model, test_loader)
print(f'Test: loss={test_loss:.4f} acc={test_acc:.4f} f1={test_f1:.4f}')
# The newline must be an escape sequence: a single-quoted string literal
# cannot span two physical lines.
print('Test confusion matrix:\n', test_cm)

# Save the fine-tuned model and its tokenizer side by side in OUTPUT_DIR.
model.save_pretrained(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)
print('Saved to', OUTPUT_DIR)


## 后续多模态融合建议
- 保留这份文本‑only 作为基线
- 添加图像分支（ResNet/ViT）并抽取图像向量
- 融合策略：
  - 拼接（concat）
  - 跨模态注意力（cross‑attention）
  - 语义一致性分支（cosine 或 MLP 预测一致性分数）
- 最终输入：`[text_emb, image_emb, consistency_score]`

如果你需要，我可以在此 Notebook 基础上继续扩展多模态版本。
