# Leave-one-theme-out generalization (Style vs TF-IDF vs Combined)

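For each **Theme bucket** `T`, train on all other themes and test on `T`. Domains that appear in `T` are removed from the training pool, and a stratified 12.5% of the remaining rows is held out as a validation split (used for threshold tuning and for fitting the stacked meta-model).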

Models:
- **Style-only (no Theme):** Claim/Framing + CTA + Evidence (Link/URL evidence removed as in v6)
- **TF-IDF** text baseline (URLs already stripped from text)
- **Combined** stacked ensemble (same meta-features as v6)

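Report:
- Per-theme macro-F1 bar chart (saved as PNG and PDF).
- Per-theme, per-model metrics table saved as CSV (macro-F1, macro-recall, ROC-AUC, accuracy, Brier score, ECE, tuned threshold).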


In [None]:
from __future__ import annotations

from pathlib import Path
from typing import List, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    brier_score_loss,
    f1_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer


def _resolve_upward(start: Path, rel: Path) -> Path:
    """Locate ``rel`` relative to ``start`` or the nearest ancestor that has it.

    ``start / rel`` is tried first, then ``parent / rel`` for every entry of
    ``start.parents`` in order (closest ancestor first). The first candidate
    that exists on disk is returned.

    Raises:
        FileNotFoundError: if no candidate path exists.
    """
    anchors = (start, *start.parents)
    hit = next((anchor / rel for anchor in anchors if (anchor / rel).exists()), None)
    if hit is None:
        raise FileNotFoundError(f"Could not resolve path upward: {rel}")
    return hit


# Locate the dataset by walking upward from the current working directory.
# NOTE(review): the "MBFC " path component has a trailing space — presumably
# this matches the on-disk directory name; confirm before "fixing" it.
HERE = Path.cwd().resolve()
DATA_PATH = _resolve_upward(
    HERE,
    Path("mbfc_channel_masked_logreg_fullpackage_v2_MBFC_C") / "MBFC " / "mega_samples_dedup_qwen_mbfc.csv",
)
print({"data_path": str(DATA_PATH)})

df = pd.read_csv(DATA_PATH, low_memory=False)
# If the file has no "source" column, treat its first column as the source.
if "source" not in df.columns:
    df = df.rename(columns={df.columns[0]: "source"})

# Drop rows with a missing message first: `astype(str)` would otherwise turn
# NaN into the literal text "nan", which survives the empty-string filter below.
df = df.dropna(subset=["message"]).copy()

# Strip URLs from message text.
# The pattern removes the scheme/prefix AND everything up to the next
# whitespace. The earlier pattern matched only the bare "http(s)://" scheme,
# so for URLs without "www." the host and path stayed in the text and were
# visible to the TF-IDF model.
# NOTE(review): if v6 used the scheme-only pattern, v6 numbers are not
# strictly comparable with this notebook — confirm against v6.
df["message"] = (
    df["message"]
    .astype(str)
    .str.replace(r"(?:https?://|www\.|t\.me/)\S*", " ", regex=True)
    .str.strip()
)
# Messages that were nothing but URLs/whitespace are now empty; discard them.
df = df[df["message"] != ""].copy()

# Keep only rows that have both a risk label and a normalized domain.
df = df.dropna(subset=["risk_label", "normalized_domain"]).copy()
df["y"] = df["risk_label"].astype(int)

print(
    {
        "rows": int(len(df)),
        "unique_domains": int(df["normalized_domain"].nunique()),
        "pos_rate": float(df["y"].mean()),
    }
)


# Output directory (relative to the current working directory) for the CSV and figures.
RESULTS_DIR = Path("mbfc_url_masked_logreg_leave_one_theme_out_results_v1")
RESULTS_DIR.mkdir(exist_ok=True)
print({"results_dir": str(RESULTS_DIR.resolve())})


In [None]:
# Theme normalization (same buckets as v6)

# Canonical theme buckets, in the order used for iteration and plotting.
THEME_BUCKETS = [
    "Finance/Crypto",
    "Public health & medicine",
    "Politics",
    "Lifestyle & well-being",
    "Crime & public safety",
    "Gaming/Gambling",
    "News/Information",
    "Sports",
    "Technology",
    "Conversation/Chat/Other",
    "Other theme",
]

# Ordered (bucket, keywords) rules: the FIRST rule with any keyword occurring
# as a substring of the lower-cased theme wins, so rule order is significant.
# NOTE(review): matching is by plain substring, so short keywords also hit
# inside longer words (e.g. "war" inside "software"). Left as-is because the
# buckets are meant to match v6.
_THEME_KEYWORD_RULES = (
    (
        "Finance/Crypto",
        (
            "crypto",
            "token",
            "coin",
            "airdrop",
            "ido",
            "staking",
            "defi",
            "exchange",
            "market",
            "finance",
            "econom",
        ),
    ),
    (
        "Public health & medicine",
        (
            "health",
            "covid",
            "vaccine",
            "vaccination",
            "medicine",
            "medical",
            "clinical",
            "disease",
            "pandemic",
            "public health",
            "hospital",
        ),
    ),
    (
        "Politics",
        (
            "politic",
            "election",
            "parliament",
            "congress",
            "senate",
            "government",
            "president",
            "minister",
            "policy",
            "war",
            "conflict",
            "ukraine",
            "russia",
        ),
    ),
    (
        "Crime & public safety",
        (
            "crime",
            "criminal",
            "terror",
            "shooting",
            "police",
            "public safety",
            "fraud",
            "scam",
        ),
    ),
    (
        "Gaming/Gambling",
        ("gaming", "gambling", "casino", "betting", "lottery", "poker"),
    ),
    (
        "Sports",
        ("sport", "football", "soccer", "basketball", "tennis", "nba", "nfl"),
    ),
    (
        "Technology",
        (
            "technology",
            "tech",
            "software",
            "app ",
            "platform",
            "ai ",
            " a.i.",
            "machine learning",
            "blockchain",
            "internet",
            "social media",
            "algorithm",
            "science",
            "research",
            "study",
        ),
    ),
    (
        "Lifestyle & well-being",
        (
            "lifestyle",
            "well-being",
            "wellbeing",
            "culture",
            "entertainment",
            "media",
            "celebrity",
            "social issues",
            "society",
            "family",
            "community",
        ),
    ),
    (
        "News/Information",
        ("news", "headline", "breaking", "coverage", "roundup", "update"),
    ),
    (
        "Conversation/Chat/Other",
        ("comment", "conversation", "chat", "q&a", "ama", "ask me anything"),
    ),
)


def _norm_theme(raw: object) -> Optional[str]:
    """Map a raw theme string onto one of ``THEME_BUCKETS``.

    Returns ``None`` for non-string or blank input. Otherwise the value is
    stripped, non-breaking hyphen / en dash / em dash are replaced by "-",
    and then:

    1. an exact match against ``THEME_BUCKETS`` is returned unchanged;
    2. else the first keyword rule that matches the lower-cased text wins;
    3. else ``"Other theme"`` is returned.
    """
    if not isinstance(raw, str):
        return None

    cleaned = raw.strip()
    if cleaned == "":
        return None

    for dash in ("\u2011", "\u2013", "\u2014"):
        cleaned = cleaned.replace(dash, "-")

    if cleaned in THEME_BUCKETS:
        return cleaned

    lowered = cleaned.lower()
    for bucket, keywords in _THEME_KEYWORD_RULES:
        for keyword in keywords:
            if keyword in lowered:
                return bucket

    return "Other theme"


# Bucket every row's raw theme; rows whose theme normalizes to None
# (non-string or blank) are discarded, then bucket sizes are printed.
raw_themes = df["theme"]
df["theme_norm"] = raw_themes.apply(_norm_theme)
df = df.dropna(subset=["theme_norm"]).copy()
theme_counts = df["theme_norm"].value_counts().to_dict()
print(theme_counts)


In [None]:
# Style-only (no Theme) tag normalization (v6 behavior: Link/URL removed)

# When True, tokenize_multi discards any label equal to "link/url" after
# lower-casing and removing all whitespace.
DROP_LINK_URL_LABEL = True
_LINK_URL_LABEL_NORM = "link/url"


def tokenize_multi(value: object) -> List[str]:
    """Split a multi-label cell into individual, whitespace-trimmed labels.

    Both "," and "+" act as separators. Non-string input yields ``[]`` and
    empty pieces are skipped. If ``DROP_LINK_URL_LABEL`` is set, labels that
    normalize to ``_LINK_URL_LABEL_NORM`` are omitted.
    """
    if not isinstance(value, str):
        return []

    labels: List[str] = []
    for chunk in value.replace("+", ",").split(","):
        label = chunk.strip()
        if not label:
            continue
        if DROP_LINK_URL_LABEL:
            squashed = "".join(label.lower().split())
            if squashed == _LINK_URL_LABEL_NORM:
                continue
        labels.append(label)
    return labels


# Canonical claim/framing buckets. _norm_claim_labels passes labels that match
# one of these exactly through unchanged and maps everything else onto one of
# them by keyword (falling back to "Other claim type").
# Note: "Statistics" and "None / assertion only" also appear in EVID_BUCKETS;
# the style tokens stay distinct because they are prefixed ("claim=" / "evid=").
CLAIM_BUCKETS = [
    "Verifiable factual statement",
    "Rumour / unverified report",
    "Announcement",
    "Opinion / subjective statement",
    "Misleading context / cherry-picking",
    "Promotional hype / exaggerated profit guarantee",
    "Emotional appeal / fear-mongering",
    "Scarcity/FOMO tactic",
    "Statistics",
    "Other claim type",
    "No substantive claim",
    "Fake content",
    "Speculative forecast / prediction",
    "None / assertion only",
]

# Canonical call-to-action buckets used by _norm_cta_labels
# (fallback bucket: "Other CTA").
CTA_BUCKETS = [
    "Visit external link / watch video",
    "Engage/Ask questions",
    "Join/Subscribe",
    "Buy / invest / donate",
    "Attend event / livestream",
    "Share / repost / like",
    "No CTA",
    "Other CTA",
]

# Canonical evidence buckets used by _norm_evidence_labels
# (fallback bucket: "Other (Evidence)"). "Link/URL" is listed so it is
# recognized as a known label, but _norm_evidence_labels never emits it.
EVID_BUCKETS = [
    "Link/URL",
    "Statistics",
    "Quotes/Testimony",
    "Chart / price graph / TA diagram",
    "Other (Evidence)",
    "None / assertion only",
]


def _norm_claim_labels(raw: object) -> List[str]:
    """Normalize a raw claim-type cell into a de-duplicated list of claim buckets.

    The cell is split with ``tokenize_multi``. A label that is already a member
    of ``CLAIM_BUCKETS`` is kept as-is; any other label is assigned the bucket
    of the first keyword rule (in the order below) whose keyword occurs in the
    lower-cased label, or ``"Other claim type"`` when none matches. The result
    preserves first-occurrence order.
    """
    # (keywords, bucket) pairs; order matters because the first hit wins.
    keyword_rules = (
        (("verifiable", "factual"), "Verifiable factual statement"),
        (("rumour", "unverified"), "Rumour / unverified report"),
        (("misleading context", "cherry"), "Misleading context / cherry-picking"),
        (("promotional hype", "exaggerated profit"), "Promotional hype / exaggerated profit guarantee"),
        (("emotional appeal", "fear-mongering", "fear mongering"), "Emotional appeal / fear-mongering"),
        (("scarcity", "fomo"), "Scarcity/FOMO tactic"),
        (("statistic",), "Statistics"),
        (("fake content", "fabricated"), "Fake content"),
        (("predict", "forecast"), "Speculative forecast / prediction"),
        (("announcement",), "Announcement"),
        (("opinion", "interpretive", "analysis", "review"), "Opinion / subjective statement"),
        (("none / assertion only", "assertion only"), "None / assertion only"),
    )

    # dict keys keep insertion order, giving an order-preserving de-dup.
    picked = {}
    for token in tokenize_multi(raw):
        name = token.strip()
        if not name:
            continue
        if name in CLAIM_BUCKETS:
            bucket = name
        else:
            lowered = name.lower()
            bucket = next(
                (
                    target
                    for needles, target in keyword_rules
                    if any(needle in lowered for needle in needles)
                ),
                "Other claim type",
            )
        picked.setdefault(bucket, None)
    return list(picked)


def _norm_cta_labels(raw: object) -> List[str]:
    """Normalize a raw call-to-action cell into a de-duplicated list of CTA buckets.

    The cell is split with ``tokenize_multi``. Labels already in
    ``CTA_BUCKETS`` are kept unchanged. Otherwise ``"None"`` / ``"No CTA"`` /
    anything containing "no cta" becomes ``"No CTA"``; failing that, the first
    matching keyword rule (in the order below) decides, with ``"Other CTA"`` as
    the fallback. The result preserves first-occurrence order.
    """
    # (keywords, bucket) pairs checked after the explicit "No CTA" test;
    # order matters because the first hit wins.
    keyword_rules = (
        (("engage", "ask", "anything"), "Engage/Ask questions"),
        (("attend", "event", "livestream", "live stream"), "Attend event / livestream"),
        (("join", "subscribe", "follow", "whitelist"), "Join/Subscribe"),
        (("buy", "invest", "donate", "stake", "swap"), "Buy / invest / donate"),
        (("share", "repost", "like"), "Share / repost / like"),
        (
            ("visit", "read", "watch", "link", "website", "check", "view charts"),
            "Visit external link / watch video",
        ),
    )

    # dict keys keep insertion order, giving an order-preserving de-dup.
    picked = {}
    for token in tokenize_multi(raw):
        name = token.strip()
        if not name:
            continue
        if name in CTA_BUCKETS:
            bucket = name
        else:
            lowered = name.lower()
            if name in {"None", "No CTA"} or "no cta" in lowered:
                bucket = "No CTA"
            else:
                bucket = next(
                    (
                        target
                        for needles, target in keyword_rules
                        if any(needle in lowered for needle in needles)
                    ),
                    "Other CTA",
                )
        picked.setdefault(bucket, None)
    return list(picked)


def _norm_evidence_labels(raw: object) -> List[str]:
    """Normalize a raw evidence cell into a de-duplicated list of evidence buckets.

    The cell is split with ``tokenize_multi``. Link/URL evidence is never
    emitted: the exact ``"Link/URL"`` bucket and any other label containing
    "link" or "url" are skipped. Other labels already in ``EVID_BUCKETS`` are
    kept unchanged; the rest are assigned by the first matching keyword rule,
    falling back to ``"Other (Evidence)"``. The result preserves
    first-occurrence order.
    """
    # (keywords, bucket) pairs; order matters because the first hit wins.
    keyword_rules = (
        (("statistic",), "Statistics"),
        (("quote", "testimony"), "Quotes/Testimony"),
        (("chart", "graph", "diagram"), "Chart / price graph / TA diagram"),
        (("none / assertion only", "assertion only"), "None / assertion only"),
    )

    # dict keys keep insertion order, giving an order-preserving de-dup.
    picked = {}
    for token in tokenize_multi(raw):
        name = token.strip()
        if not name:
            continue

        if name in EVID_BUCKETS:
            if name == "Link/URL":
                continue
            bucket = name
        else:
            lowered = name.lower()
            # "link/url" contains "link", so these two tests cover all link-like labels.
            if "link" in lowered or "url" in lowered:
                continue
            bucket = next(
                (
                    target
                    for needles, target in keyword_rules
                    if any(needle in lowered for needle in needles)
                ),
                "Other (Evidence)",
            )
        picked.setdefault(bucket, None)
    return list(picked)


def build_style_tokens_no_theme(row: pd.Series) -> List[str]:
    """Build the style token list for one row, without any theme token.

    Reads ``claim_types``, ``ctas`` and ``evidence`` from ``row`` (via
    ``row.get``), normalizes each, and returns the tokens in that order as
    ``claim=<bucket>``, ``cta=<bucket>`` and ``evid=<bucket>``.
    """
    claim_tokens = [f"claim={label}" for label in _norm_claim_labels(row.get("claim_types"))]
    cta_tokens = [f"cta={label}" for label in _norm_cta_labels(row.get("ctas"))]
    evid_tokens = [f"evid={label}" for label in _norm_evidence_labels(row.get("evidence"))]
    return claim_tokens + cta_tokens + evid_tokens


In [None]:
def expected_calibration_error(y_true, y_proba, n_bins=10) -> float:
    """Return the expected calibration error (ECE) over equal-width bins.

    Probabilities are grouped into ``n_bins`` equal-width bins on [0, 1]. For
    each non-empty bin the absolute gap between the mean predicted probability
    and the mean of ``y_true`` is weighted by the fraction of samples in the
    bin, and the weighted gaps are summed.

    Args:
        y_true: array-like of binary labels (0/1).
        y_proba: array-like of predicted probabilities, same length as ``y_true``.
        n_bins: number of equal-width bins.

    Returns:
        The ECE as a Python float; ``0.0`` for empty input.
    """
    y_true = np.asarray(y_true)
    y_proba = np.asarray(y_proba)
    bins = np.linspace(0.0, 1.0, n_bins + 1)
    # np.digitize assigns a value equal to the last edge (p == 1.0) to index
    # n_bins, which is past the last bin. Clip so such samples are counted in
    # the final bin instead of being silently dropped; values outside [0, 1]
    # likewise land in the nearest edge bin.
    idx = np.clip(np.digitize(y_proba, bins) - 1, 0, n_bins - 1)
    ece = 0.0
    n = len(y_true)
    for b in range(n_bins):
        mask = idx == b
        if not np.any(mask):
            continue
        p_bin = float(y_proba[mask].mean())
        y_bin = float(y_true[mask].mean())
        weight = float(mask.sum() / n)
        ece += weight * abs(p_bin - y_bin)
    return float(ece)


def sweep_thresholds(y_true, proba, grid=None):
    """Pick the decision threshold that maximizes macro-F1.

    Args:
        y_true: array-like of binary labels.
        proba: array-like of positive-class probabilities.
        grid: optional iterable of candidate thresholds (list, tuple, NumPy
            array, ...). When ``None`` or empty, 0.05, 0.10, ..., 0.95 is used.

    Returns:
        ``{"threshold": float, "macro_f1": float}`` for the best candidate.
        On ties the earliest candidate in ``grid`` is kept.
    """
    # Materialize the grid instead of relying on its truthiness: `grid or ...`
    # raises ValueError for a multi-element NumPy array.
    candidates = [] if grid is None else list(grid)
    if not candidates:
        candidates = [round(t, 2) for t in np.linspace(0.05, 0.95, 19)]

    # Accept plain Python sequences as well as arrays.
    proba = np.asarray(proba)

    best = None
    for t in candidates:
        pred = (proba >= t).astype(int)
        macro_f1 = f1_score(y_true, pred, average="macro")
        if best is None or macro_f1 > best["macro_f1"]:
            best = {"threshold": float(t), "macro_f1": float(macro_f1)}
    return best


def fit_val_tuned_logreg(X_train, y_train, X_val, y_val, X_test, y_test, *, seed=0):
    """Fit a class-balanced L2 logistic regression with a validation-tuned threshold.

    Steps:
      1. Fit on the training split and score the validation split.
      2. Choose the macro-F1-maximizing threshold on the validation scores.
      3. Refit a fresh model with the same settings on train+val (stacked with
         ``np.vstack``) and score the test split with the threshold from step 2.

    Returns:
        dict with ``threshold``, the test metrics ``macro_f1``,
        ``macro_recall``, ``roc_auc``, ``accuracy``, ``brier``, ``ece``, plus
        ``test_proba`` (from the refit model) and ``val_proba`` (from the
        train-only model).
    """
    lr_kwargs = dict(
        penalty="l2",
        C=1.0,
        solver="lbfgs",
        max_iter=1000,
        class_weight="balanced",
        random_state=seed,
    )

    # Step 1-2: the train-only model supplies validation scores for threshold selection.
    tuning_model = LogisticRegression(**lr_kwargs)
    tuning_model.fit(X_train, y_train)
    val_proba = tuning_model.predict_proba(X_val)[:, 1]
    tuned = sweep_thresholds(y_val, val_proba)

    # Step 3: refit on train+val for the test predictions.
    final_model = LogisticRegression(**lr_kwargs)
    final_model.fit(np.vstack([X_train, X_val]), np.concatenate([y_train, y_val]))
    test_proba = final_model.predict_proba(X_test)[:, 1]
    cutoff = tuned["threshold"]
    test_pred = (test_proba >= cutoff).astype(int)

    return {
        "threshold": float(cutoff),
        "macro_f1": float(f1_score(y_test, test_pred, average="macro")),
        "macro_recall": float(recall_score(y_test, test_pred, average="macro")),
        "roc_auc": float(roc_auc_score(y_test, test_proba)),
        "accuracy": float(accuracy_score(y_test, test_pred)),
        "brier": float(brier_score_loss(y_test, test_proba)),
        "ece": float(expected_calibration_error(y_test, test_proba, n_bins=10)),
        "test_proba": test_proba,
        "val_proba": val_proba,
    }


def fit_val_tuned_text(train_df, val_df, test_df, *, seed=0):
    """TF-IDF + logistic-regression text baseline with a validation-tuned threshold.

    The vectorizer (uni+bigrams, at most 20000 features, ``min_df=2``) is
    fitted on the training messages only. A class-balanced ``saga`` logistic
    regression is fitted on train to score validation and choose the
    macro-F1-maximizing threshold, then a fresh model is refitted on train+val
    and evaluated on the test split with that threshold.

    Each frame must provide a ``message`` column and a ``y`` column.

    Returns:
        dict with ``threshold``, the test metrics ``macro_f1``,
        ``macro_recall``, ``roc_auc``, ``accuracy``, ``brier``, ``ece``, plus
        ``test_proba`` (from the refit model) and ``val_proba`` (from the
        train-only model).
    """

    def _messages(frame):
        return frame["message"].astype(str)

    def _targets(frame):
        return frame["y"].astype(int).to_numpy()

    def _new_classifier():
        # saga is used for the sparse, high-dimensional TF-IDF matrix.
        return LogisticRegression(
            penalty="l2",
            C=1.0,
            solver="saga",
            max_iter=2000,
            n_jobs=-1,
            class_weight="balanced",
            random_state=seed,
        )

    vec = TfidfVectorizer(
        ngram_range=(1, 2),
        max_features=20000,
        min_df=2,
        strip_accents="unicode",
    )
    X_train = vec.fit_transform(_messages(train_df))
    X_val = vec.transform(_messages(val_df))
    X_test = vec.transform(_messages(test_df))

    # Train-only model: validation scores drive the threshold choice.
    tuning_model = _new_classifier()
    tuning_model.fit(X_train, _targets(train_df))
    val_proba = tuning_model.predict_proba(X_val)[:, 1]
    tuned = sweep_thresholds(_targets(val_df), val_proba)

    # Refit on train+val; stack sparsely to avoid densifying the matrices.
    from scipy import sparse as _sp

    X_trainval_sp = _sp.vstack([X_train, X_val])
    y_trainval = np.concatenate([_targets(train_df), _targets(val_df)])
    final_model = _new_classifier()
    final_model.fit(X_trainval_sp, y_trainval)

    test_proba = final_model.predict_proba(X_test)[:, 1]
    cutoff = tuned["threshold"]
    test_pred = (test_proba >= cutoff).astype(int)
    y_test = _targets(test_df)

    return {
        "threshold": float(cutoff),
        "macro_f1": float(f1_score(y_test, test_pred, average="macro")),
        "macro_recall": float(recall_score(y_test, test_pred, average="macro")),
        "roc_auc": float(roc_auc_score(y_test, test_proba)),
        "accuracy": float(accuracy_score(y_test, test_pred)),
        "brier": float(brier_score_loss(y_test, test_proba)),
        "ece": float(expected_calibration_error(y_test, test_proba, n_bins=10)),
        "test_proba": test_proba,
        "val_proba": val_proba,
    }


def fit_val_tuned_style_no_theme(train_df, val_df, test_df, *, seed=0):
    """Style-only (no theme) model: multi-hot style tokens + logistic regression.

    Every row is converted to style tokens with ``build_style_tokens_no_theme``.
    The vocabulary is the sorted set of tokens seen in train and validation;
    the three splits are multi-hot encoded against it as float32 and handed to
    ``fit_val_tuned_logreg``.

    Returns:
        The dict from ``fit_val_tuned_logreg`` with an extra ``n_features``
        entry (the vocabulary size).
    """
    train_tokens = train_df.apply(build_style_tokens_no_theme, axis=1).tolist()
    val_tokens = val_df.apply(build_style_tokens_no_theme, axis=1).tolist()
    test_tokens = test_df.apply(build_style_tokens_no_theme, axis=1).tolist()

    # Vocabulary comes from train and validation only (test is excluded).
    seen_tokens = set()
    for token_list in train_tokens:
        seen_tokens.update(token_list)
    for token_list in val_tokens:
        seen_tokens.update(token_list)
    vocab = sorted(seen_tokens)

    mlb = MultiLabelBinarizer(classes=vocab)
    X_train = mlb.fit_transform(train_tokens).astype(np.float32)
    X_val = mlb.transform(val_tokens).astype(np.float32)
    X_test = mlb.transform(test_tokens).astype(np.float32)

    y_train, y_val, y_test = (
        frame["y"].astype(int).to_numpy() for frame in (train_df, val_df, test_df)
    )

    result = fit_val_tuned_logreg(X_train, y_train, X_val, y_val, X_test, y_test, seed=seed)
    result["n_features"] = int(len(vocab))
    return result


In [None]:
# Run leave-one-theme-out

# Metric keys copied from a base-model result dict into its results row.
_REPORTED_METRICS = ["macro_f1", "macro_recall", "roc_auc", "accuracy", "brier", "ece", "threshold"]


def _stack_meta_features(text_proba, style_proba):
    """Return the meta-feature matrix: text proba, style proba, and their product."""
    return np.stack([text_proba, style_proba, text_proba * style_proba], axis=1)


# Only buckets that actually occur in the data, in THEME_BUCKETS order.
observed_themes = set(df["theme_norm"].unique())
themes = [t for t in THEME_BUCKETS if t in observed_themes]

rows = []
for theme in themes:
    held_out = df["theme_norm"] == theme
    test_df = df[held_out].copy()
    trainval_df = df[~held_out].copy()

    # Enforce domain-disjointness: drop every train/val row whose domain also
    # occurs in the held-out theme.
    test_domains = set(test_df["normalized_domain"].astype(str).unique())
    shares_test_domain = trainval_df["normalized_domain"].astype(str).isin(test_domains)
    trainval_df = trainval_df[~shares_test_domain].copy()

    # The held-out theme must contain both classes.
    if test_df["y"].nunique() < 2:
        print({"theme": theme, "skipped": "test_has_single_class", "n_test": int(len(test_df))})
        continue
    # The training pool must contain both classes and at least 1000 rows.
    if trainval_df["y"].nunique() < 2 or len(trainval_df) < 1000:
        print({"theme": theme, "skipped": "train_too_small_or_single_class", "n_trainval": int(len(trainval_df))})
        continue

    train_df, val_df = train_test_split(
        trainval_df,
        test_size=0.125,
        random_state=123,
        stratify=trainval_df["y"],
    )

    # Fields shared by all three rows of this theme.
    test_info = {
        "n_test": int(len(test_df)),
        "pos_rate_test": float(test_df["y"].mean()),
    }

    # Style-only (no Theme)
    style = fit_val_tuned_style_no_theme(train_df, val_df, test_df, seed=0)
    rows.append(
        {
            "theme": theme,
            "model": "style_only_no_theme",
            **test_info,
            **{key: style[key] for key in _REPORTED_METRICS},
            "n_features": int(style.get("n_features", 0)),
        }
    )

    # TF-IDF
    text = fit_val_tuned_text(train_df, val_df, test_df, seed=0)
    rows.append(
        {
            "theme": theme,
            "model": "tfidf",
            **test_info,
            **{key: text[key] for key in _REPORTED_METRICS},
            "n_features": np.nan,
        }
    )

    # Combined stacking: the meta model is fitted on the validation-split
    # probabilities of the two base models and applied to their test probabilities.
    meta_X_val = _stack_meta_features(text["val_proba"], style["val_proba"])
    meta_X_test = _stack_meta_features(text["test_proba"], style["test_proba"])
    y_val = val_df["y"].astype(int).to_numpy()

    meta = LogisticRegression(penalty="l2", C=1.0, solver="lbfgs", max_iter=1000)
    meta.fit(meta_X_val, y_val)
    meta_val_proba = meta.predict_proba(meta_X_val)[:, 1]
    thr = sweep_thresholds(y_val, meta_val_proba)

    test_proba = meta.predict_proba(meta_X_test)[:, 1]
    test_pred = (test_proba >= thr["threshold"]).astype(int)
    y_test = test_df["y"].astype(int).to_numpy()
    rows.append(
        {
            "theme": theme,
            "model": "combined",
            **test_info,
            "macro_f1": float(f1_score(y_test, test_pred, average="macro")),
            "macro_recall": float(recall_score(y_test, test_pred, average="macro")),
            "roc_auc": float(roc_auc_score(y_test, test_proba)),
            "accuracy": float(accuracy_score(y_test, test_pred)),
            "brier": float(brier_score_loss(y_test, test_proba)),
            "ece": float(expected_calibration_error(y_test, test_proba, n_bins=10)),
            "threshold": float(thr["threshold"]),
            "n_features": np.nan,
        }
    )

    # One-line progress summary; rows[-1] is the combined row appended just above.
    print(
        {
            "theme": theme,
            "n_test": test_info["n_test"],
            "style_macro_f1": round(style["macro_f1"], 4),
            "tfidf_macro_f1": round(text["macro_f1"], 4),
            "combined_macro_f1": round(rows[-1]["macro_f1"], 4),
        }
    )

# Persist the full metrics table and show it sorted by theme and model.
results = pd.DataFrame(rows)
results_csv = RESULTS_DIR / "leave_one_theme_out_results.csv"
results.to_csv(results_csv, index=False)
print({"results_csv": str(results_csv)})
display(results.sort_values(["theme", "model"]))


# Plot macro-F1 by held-out theme
# One row per held-out theme (in `themes` order), one macro-F1 column per model.
macro_f1_wide = results.pivot(index="theme", columns="model", values="macro_f1")
plot_df = macro_f1_wide.reindex(themes)

fig, ax = plt.subplots(figsize=(10.5, 3.2), dpi=300)
x = np.arange(len(plot_df.index))
width = 0.25

# (model column, bar color) in left-to-right order within each theme group.
series = [
    ("tfidf", "#666666"),
    ("style_only_no_theme", "#1f77b4"),
    ("combined", "#2ca02c"),
]

# Slots -1 / 0 / +1 place the three bars left of, on, and right of each tick.
for slot, (col, color) in zip((-1, 0, 1), series):
    if col in plot_df.columns:
        heights = plot_df[col].to_numpy(dtype=float)
        ax.bar(x + slot * width, heights, width, label=col, color=color)

ax.set_xticks(x)
ax.set_xticklabels(plot_df.index.tolist(), rotation=20, ha="right")
ax.set_ylabel("Macro-F1")
ax.set_title("Leave-one-theme-out generalization (test = held-out theme; domains removed from train)")
ax.set_ylim(0.0, 1.0)
ax.grid(axis="y", alpha=0.25)
ax.legend(frameon=False, loc="lower left")
fig.tight_layout()

# Save a raster and a vector copy, then release the figure.
fig_png = RESULTS_DIR / "leave_one_theme_out_macro_f1.png"
fig_pdf = RESULTS_DIR / "leave_one_theme_out_macro_f1.pdf"
fig.savefig(fig_png, dpi=300)
fig.savefig(fig_pdf)
plt.close(fig)
print({"macro_f1_plot_png": str(fig_png)})
