In [3]:
# --- 放到一个单元格里（或替换你的脚本）---
import os, argparse
import numpy as np
import matplotlib.pyplot as plt
from gensim.models import KeyedVectors

def pca_reduce(X, n_components=2):
    """Project the rows of ``X`` onto their leading principal axes.

    The columns of ``X`` are mean-centred, a thin SVD of the centred matrix is
    taken, and the centred rows are expressed in the first ``n_components``
    right singular vectors.

    Parameters
    ----------
    X : numpy.ndarray
        2-D array with one sample per row.
    n_components : int
        Number of leading right singular vectors to project onto.

    Returns
    -------
    numpy.ndarray
        The centred data multiplied by the transposed leading axes; one row
        per row of ``X``.
    """
    centered = X - X.mean(axis=0, keepdims=True)
    # Only the right singular vectors (third SVD factor) are needed.
    right_vectors = np.linalg.svd(centered, full_matrices=False)[2]
    principal_axes = right_vectors[:n_components]
    return centered @ principal_axes.T

def try_tsne(X, n_components=2, perplexity=30, random_state=42):
    """Project ``X`` with t-SNE, falling back to PCA when t-SNE cannot run.

    Parameters
    ----------
    X : array-like
        Sample matrix, one sample per row.
    n_components : int
        Target dimensionality.
    perplexity : float
        Requested t-SNE perplexity. It is capped at ``n_samples - 1`` because
        scikit-learn requires the perplexity to be smaller than the number of
        samples; without the cap a small subset would always end up on the
        PCA fallback.
    random_state : int
        Seed forwarded to t-SNE.

    Returns
    -------
    tuple
        ``(Z, used_tsne)``: the projected coordinates and ``True`` when they
        come from t-SNE, ``False`` when the PCA fallback produced them.
    """
    import warnings

    try:
        from sklearn.manifold import TSNE
        # Keep the requested value whenever it is already valid.
        safe_perplexity = min(perplexity, max(len(X) - 1, 1))
        Z = TSNE(n_components=n_components, init="random",
                 perplexity=safe_perplexity, random_state=random_state,
                 learning_rate="auto").fit_transform(X)
        return Z, True
    except Exception as exc:
        # Still best-effort, but report the reason instead of hiding it
        # (missing scikit-learn, unsupported argument, invalid input, ...).
        warnings.warn(
            f"t-SNE failed ({type(exc).__name__}: {exc}); falling back to PCA",
            RuntimeWarning,
            stacklevel=2,
        )
        return pca_reduce(X, n_components=n_components), False

def collect_subset(wv, seed_words, nn_per_seed=15, limit=300):
    """Pick at most ``limit`` distinct words centred on a list of seed words.

    Words are gathered in this order: each seed present in ``wv`` followed by
    its ``nn_per_seed`` nearest neighbours (``wv.most_similar``), and then, if
    the quota is still not filled, words from ``wv.index_to_key`` in order.

    Parameters
    ----------
    wv : object
        Must support ``word in wv``, ``wv.most_similar(word, topn=...)``
        returning ``(word, score)`` pairs, and ``wv.index_to_key``.
    seed_words : iterable of str
        Candidate seeds; those absent from ``wv`` are ignored.
    nn_per_seed : int
        Number of neighbours requested per seed.
    limit : int
        Maximum number of words returned.

    Returns
    -------
    list of str
        Distinct words in selection order; never longer than ``limit`` and
        empty when ``limit <= 0``.
    """
    chosen, seen = [], set()
    if limit <= 0:
        return chosen

    def _take(word):
        """Record ``word`` if it is new; return True once the quota is full."""
        if word not in seen:
            seen.add(word)
            chosen.append(word)
        return len(chosen) >= limit

    for seed in seed_words:
        if seed not in wv:
            continue
        # Check the quota after the seed as well as after each neighbour,
        # so the result can never exceed ``limit``.
        if _take(seed):
            return chosen
        for neighbour, _score in wv.most_similar(seed, topn=nn_per_seed):
            if _take(neighbour):
                return chosen

    # Seeds exhausted: pad with the remaining vocabulary in its stored order.
    for word in wv.index_to_key:
        if _take(word):
            break
    return chosen

def plot_2d(words, coords, out_path):
    """Scatter-plot 2-D word coordinates, label a subsample and save the figure.

    Parameters
    ----------
    words : sequence of str
        Labels, aligned with the rows of ``coords``.
    coords : array
        Indexed as ``coords[:, 0]`` / ``coords[:, 1]`` for the x / y values.
    out_path : str or os.PathLike
        Destination image file. Its parent directory is created if needed
        (the directory of ``out_path`` itself, not a fixed ``outputs`` folder).
    """
    fig, ax = plt.subplots(figsize=(12, 9))
    try:
        x, y = coords[:, 0], coords[:, 1]
        ax.scatter(x, y, s=12)
        # Label every ``step``-th word so large vocabularies stay readable.
        step = max(1, len(words) // 200)
        for i in range(0, len(words), step):
            ax.annotate(words[i], (x[i], y[i]), fontsize=8, alpha=0.8)
        ax.set_title("Word Embeddings (2D projection)")
        fig.tight_layout()
        # Only path-like targets have a directory to create.
        if isinstance(out_path, (str, os.PathLike)):
            out_dir = os.path.dirname(out_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
        fig.savefig(out_path, dpi=200)
    finally:
        # Release the figure even when drawing or saving fails.
        plt.close(fig)
    print("✅ Saved 2D →", out_path)

def plot_3d(words, coords, out_path):
    """Scatter-plot 3-D word coordinates, label a subsample and save the figure.

    Parameters
    ----------
    words : sequence of str
        Labels, aligned with the rows of ``coords``.
    coords : array
        Indexed as ``coords[:, 0]``, ``coords[:, 1]``, ``coords[:, 2]``.
    out_path : str or os.PathLike
        Destination image file. Its parent directory is created if needed
        (the directory of ``out_path`` itself, not a fixed ``outputs`` folder).
    """
    from mpl_toolkits.mplot3d import Axes3D  # noqa
    fig = plt.figure(figsize=(12, 9))
    try:
        ax = fig.add_subplot(111, projection='3d')
        xs, ys, zs = coords[:, 0], coords[:, 1], coords[:, 2]
        ax.scatter(xs, ys, zs, s=12)
        # Label every ``step``-th word so large vocabularies stay readable.
        step = max(1, len(words) // 200)
        for j in range(0, len(words), step):
            ax.text(xs[j], ys[j], zs[j], words[j], fontsize=7)
        ax.set_title("Word Embeddings (3D projection)")
        fig.tight_layout()
        # Only path-like targets have a directory to create.
        if isinstance(out_path, (str, os.PathLike)):
            out_dir = os.path.dirname(out_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
        fig.savefig(out_path, dpi=200)
    finally:
        # Release the figure even when drawing or saving fails.
        plt.close(fig)
    print("✅ Saved 3D →", out_path)

def visualize(vec="outputs/w2v_text8_sgns.vec", limit=300, nn_per_seed=15, method="pca"):
    """Load word vectors, project a seed-centred subset to 2-D and 3-D, and plot both.

    Parameters
    ----------
    vec : str
        Path to a text-format word2vec file.
    limit : int
        Size of the word subset, forwarded to ``collect_subset``.
    nn_per_seed : int
        Neighbours gathered per seed word, forwarded to ``collect_subset``.
    method : str
        ``"tsne"`` projects via ``try_tsne``; any other value uses ``pca_reduce``.

    The figures are written to ``outputs/emb_2d.png`` and ``outputs/emb_3d.png``.
    """
    embeddings = KeyedVectors.load_word2vec_format(vec, binary=False)
    print("Loaded:", len(embeddings.key_to_index), "dim:", embeddings.vector_size)
    seed_words = ["king","queen","man","woman","london","paris","france","england",
                  "computer","software","data","science","music","art","city","country","river","mountain"]
    vocab = collect_subset(embeddings, seed_words, nn_per_seed=nn_per_seed, limit=limit)
    matrix = np.stack([embeddings[token] for token in vocab], axis=0)

    wants_tsne = method == "tsne"
    # One row per figure: dimensionality, status texts, plotter, destination.
    jobs = (
        (2, "t-SNE(2D)", "PCA(2D)", "Using PCA(2D)", plot_2d, "outputs/emb_2d.png"),
        (3, "t-SNE(3D)", "PCA(3D)", "Using PCA(3D)", plot_3d, "outputs/emb_3d.png"),
    )
    for dims, tsne_tag, pca_tag, pca_message, plotter, target in jobs:
        if wants_tsne:
            projected, used = try_tsne(matrix, n_components=dims)
            print("Using", tsne_tag if used else pca_tag)
        else:
            projected = pca_reduce(matrix, n_components=dims)
            print(pca_message)
        plotter(vocab, projected, target)

# —— 在Jupyter里直接调用：
# visualize(method="pca")        # 默认PCA
# visualize(method="tsne")       # 若已安装scikit-learn


In [4]:
visualize(method="pca", limit=300)
# or
# visualize(method="tsne", limit=300)


Loaded: 71290 dim: 300
Using PCA(2D)
✅ Saved 2D → outputs/emb_2d.png
Using PCA(3D)
✅ Saved 3D → outputs/emb_3d.png


In [5]:
# ==== Word2Vec 复评可视化（多模型对比，统一投影） ====
import os
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from gensim.models import KeyedVectors

# ---------- 基础降维 ----------
def pca_fit_on_concat(X_list, n_components=2):
    """Fit one set of PCA axes on the row-wise concatenation of several matrices.

    Parameters
    ----------
    X_list : sequence of numpy.ndarray
        Matrices that are stacked vertically before fitting.
    n_components : int
        Number of leading right singular vectors to keep.

    Returns
    -------
    tuple
        ``(mean, components)``: the column mean of the stacked data (kept as a
        row vector) and the first ``n_components`` rows of ``Vt`` from the SVD
        of the centred stack.
    """
    stacked = np.vstack(X_list)
    mean = stacked.mean(axis=0, keepdims=True)
    # Only the right singular vectors (third SVD factor) are needed.
    right_vectors = np.linalg.svd(stacked - mean, full_matrices=False)[2]
    return mean, right_vectors[:n_components]

def pca_transform(X, mean, components):
    """Project ``X`` using a previously fitted mean and set of component rows.

    Subtracts ``mean`` from ``X`` and multiplies the result by the transpose
    of ``components``.
    """
    centered = X - mean
    return centered @ components.T

def pca_reduce(X, n_components=2):
    """Reduce ``X`` to ``n_components`` columns with an SVD-based PCA.

    The column means are removed, the centred matrix is decomposed with a
    thin SVD, and the centred rows are multiplied by the transpose of the
    first ``n_components`` right singular vectors.
    """
    column_means = X.mean(axis=0, keepdims=True)
    deviations = X - column_means
    _, _, axes = np.linalg.svd(deviations, full_matrices=False)
    leading_axes = axes[:n_components]
    return deviations @ leading_axes.T

def try_tsne(X, n_components=2, perplexity=30, random_state=42):
    """Project ``X`` with t-SNE, falling back to PCA when t-SNE cannot run.

    Intended for single-model visualisation only; use PCA when comparing
    several models.

    Parameters
    ----------
    X : array-like
        Sample matrix, one sample per row.
    n_components : int
        Target dimensionality.
    perplexity : float
        Requested t-SNE perplexity. It is capped at ``n_samples - 1`` because
        scikit-learn requires the perplexity to be smaller than the number of
        samples; without the cap a small subset would always end up on the
        PCA fallback.
    random_state : int
        Seed forwarded to t-SNE.

    Returns
    -------
    tuple
        ``(Z, used_tsne)``: the projected coordinates and ``True`` when they
        come from t-SNE, ``False`` when the PCA fallback produced them.
    """
    import warnings

    try:
        from sklearn.manifold import TSNE
        # Keep the requested value whenever it is already valid.
        safe_perplexity = min(perplexity, max(len(X) - 1, 1))
        Z = TSNE(n_components=n_components, init="random",
                 perplexity=safe_perplexity, random_state=random_state,
                 learning_rate="auto").fit_transform(X)
        return Z, True
    except Exception as exc:
        # Still best-effort, but report the reason instead of hiding it
        # (missing scikit-learn, unsupported argument, invalid input, ...).
        warnings.warn(
            f"t-SNE failed ({type(exc).__name__}: {exc}); falling back to PCA",
            RuntimeWarning,
            stacklevel=2,
        )
        return pca_reduce(X, n_components=n_components), False

# ---------- 采样子集（确保各模型可比：使用“公共词集合”） ----------
def collect_subset_common(wv_list, seed_words, nn_per_seed=15, limit=300):
    """Pick at most ``limit`` distinct words that exist in every model.

    The first model steers the selection: each seed present in it is visited,
    followed by that seed's ``nn_per_seed`` nearest neighbours, and finally the
    first model's ``index_to_key`` is walked in order as padding. A candidate
    is kept only when every other model also contains it.

    The membership filter is applied while collecting rather than after the
    subset has been capped, so the result reaches ``limit`` whenever enough
    shared words exist. Because the padding pass walks the first model's whole
    vocabulary, no separate whole-vocabulary fallback is needed.

    Parameters
    ----------
    wv_list : sequence
        Models supporting ``word in wv``; the first one must also provide
        ``most_similar(word, topn=...)`` and ``index_to_key``.
    seed_words : iterable of str
        Candidate seeds; those absent from the first model are ignored.
    nn_per_seed : int
        Number of neighbours requested per seed.
    limit : int
        Maximum number of words returned.

    Returns
    -------
    list of str
        Distinct shared words in selection order; never longer than ``limit``
        and empty when ``limit <= 0``.
    """
    base = wv_list[0]
    others = wv_list[1:]
    chosen, seen = [], set()
    if limit <= 0:
        return chosen

    def _take(word):
        """Keep ``word`` if new and shared; return True once the quota is full."""
        if word not in seen:
            seen.add(word)
            if all(word in wv for wv in others):
                chosen.append(word)
        return len(chosen) >= limit

    for seed in seed_words:
        if seed not in base:
            continue
        if _take(seed):
            return chosen
        for neighbour, _score in base.most_similar(seed, topn=nn_per_seed):
            if _take(neighbour):
                return chosen

    # Not enough topical words: pad from the first model's vocabulary order.
    for word in base.index_to_key:
        if _take(word):
            break
    return chosen

# ---------- 绘图 ----------
def overlay_2d(words, coords_list, labels, out_path):
    """Overlay several 2-D projections of the same words in one scatter plot.

    Parameters
    ----------
    words : sequence of str
        Labels, aligned with the rows of every array in ``coords_list``.
    coords_list : sequence of arrays
        One coordinate array per layer, indexed as ``Z[:, 0]`` / ``Z[:, 1]``.
    labels : sequence of str
        Legend entry for each layer, in the same order as ``coords_list``.
    out_path : str or os.PathLike
        Destination image file. Its parent directory is created if needed
        (the directory of ``out_path`` itself, not a fixed ``outputs`` folder).
    """
    fig, ax = plt.subplots(figsize=(12, 9))
    try:
        for i, Z in enumerate(coords_list):
            ax.scatter(Z[:, 0], Z[:, 1], s=12, label=labels[i], alpha=0.7)
        # Annotate only the leading words to avoid clutter. Positions come from
        # the first layer. The row index is taken from enumerate instead of a
        # ``words.index`` scan per label.
        n_labelled = max(20, len(words) // 20)
        for idx, word in enumerate(words[:n_labelled]):
            anchor = (coords_list[0][idx, 0], coords_list[0][idx, 1])
            ax.annotate(word, anchor, fontsize=7, alpha=0.8)
        ax.legend()
        ax.set_title("Word Embeddings (2D overlay, shared PCA basis)")
        fig.tight_layout()
        # Only path-like targets have a directory to create.
        if isinstance(out_path, (str, os.PathLike)):
            out_dir = os.path.dirname(out_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
        fig.savefig(out_path, dpi=200)
    finally:
        # Release the figure even when drawing or saving fails.
        plt.close(fig)
    print("✅ Saved 2D overlay →", out_path)

def overlay_3d(words, coords_list, labels, out_path):
    """Overlay several 3-D projections of the same words in one scatter plot.

    Parameters
    ----------
    words : sequence of str
        Labels, aligned with the rows of every array in ``coords_list``.
    coords_list : sequence of arrays
        One coordinate array per layer, indexed as ``Z[:, 0]``, ``Z[:, 1]``,
        ``Z[:, 2]``.
    labels : sequence of str
        Legend entry for each layer, in the same order as ``coords_list``.
    out_path : str or os.PathLike
        Destination image file. Its parent directory is created if needed
        (the directory of ``out_path`` itself, not a fixed ``outputs`` folder).
    """
    from mpl_toolkits.mplot3d import Axes3D  # noqa
    fig = plt.figure(figsize=(12, 9))
    try:
        ax = fig.add_subplot(111, projection='3d')
        for i, Z in enumerate(coords_list):
            ax.scatter(Z[:, 0], Z[:, 1], Z[:, 2], s=12, alpha=0.7, label=labels[i])
        # Label every ``step``-th word, positioned with the first layer only.
        step = max(1, len(words) // 100)
        for j in range(0, len(words), step):
            first = coords_list[0]
            ax.text(first[j, 0], first[j, 1], first[j, 2], words[j], fontsize=7)
        ax.legend()
        ax.set_title("Word Embeddings (3D overlay, shared PCA basis)")
        fig.tight_layout()
        # Only path-like targets have a directory to create.
        if isinstance(out_path, (str, os.PathLike)):
            out_dir = os.path.dirname(out_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
        fig.savefig(out_path, dpi=200)
    finally:
        # Release the figure even when drawing or saving fails.
        plt.close(fig)
    print("✅ Saved 3D overlay →", out_path)

# ---------- 单模型旧版接口（保持兼容） ----------
def visualize(vec="outputs/w2v_text8_sgns.vec", limit=300, nn_per_seed=15, method="pca"):
    """Single-model entry point: project one embedding file to 2-D and 3-D and plot it.

    Parameters
    ----------
    vec : str
        Path to a text-format word2vec file.
    limit : int
        Size of the word subset, forwarded to ``collect_subset_common``.
    nn_per_seed : int
        Neighbours gathered per seed word, forwarded to ``collect_subset_common``.
    method : str
        ``"tsne"`` projects via ``try_tsne``; any other value uses ``pca_reduce``.

    The figures are drawn with the overlay plotters as a single layer named
    after the file, and written to ``outputs/emb_2d.png`` and
    ``outputs/emb_3d.png``.
    """
    embeddings = KeyedVectors.load_word2vec_format(vec, binary=False)
    print("Loaded:", len(embeddings.key_to_index), "dim:", embeddings.vector_size)
    seed_words = ["king","queen","man","woman","london","paris","france","england",
                  "computer","software","data","science","music","art","city","country","river","mountain"]
    vocab = collect_subset_common([embeddings], seed_words, nn_per_seed=nn_per_seed, limit=limit)
    matrix = np.stack([embeddings[token] for token in vocab], axis=0)

    if method != "tsne":
        flat = pca_reduce(matrix, n_components=2)
        print("Using PCA(2D)")
        solid = pca_reduce(matrix, n_components=3)
        print("Using PCA(3D)")
    else:
        flat, ok = try_tsne(matrix, n_components=2)
        print("Using", "t-SNE(2D)" if ok else "PCA(2D)")
        solid, ok = try_tsne(matrix, n_components=3)
        print("Using", "t-SNE(3D)" if ok else "PCA(3D)")

    os.makedirs("outputs", exist_ok=True)
    # A single model is drawn through the overlay plotters as one layer.
    layer_name = Path(vec).name
    overlay_2d(vocab, [flat], [layer_name], "outputs/emb_2d.png")
    overlay_3d(vocab, [solid], [layer_name], "outputs/emb_3d.png")

# ---------- 新增：多模型对比（共享投影；推荐 method='pca'） ----------
def visualize_compare(vec_paths, labels=None, limit=300, nn_per_seed=15, method="pca"):
    """Overlay several embedding files in one 2-D and one 3-D plot.

    Parameters
    ----------
    vec_paths : iterable of str
        Paths to text-format word2vec files. Paths that do not exist are
        skipped (and reported); at least two must remain.
    labels : sequence of str, optional
        Legend names. May contain one entry per requested path (entries
        belonging to skipped files are dropped together with the file) or one
        entry per existing path. Defaults to the file stems.
    limit : int
        Size of the shared word subset, forwarded to ``collect_subset_common``.
    nn_per_seed : int
        Neighbours gathered per seed word, forwarded to ``collect_subset_common``.
    method : str
        ``"tsne"`` runs t-SNE per model (coordinates are not comparable across
        models); any other value fits one PCA basis on all models together.

    Raises
    ------
    AssertionError
        If fewer than two paths exist or ``labels`` has an unusable length.
    ValueError
        In PCA mode, if the models do not share one vector size.

    The figures are written to ``outputs/emb_compare_2d.png`` and
    ``outputs/emb_compare_3d.png``.
    """
    requested = list(vec_paths)
    present = [Path(p).exists() for p in requested]
    missing = [p for p, ok in zip(requested, present) if not ok]
    if missing:
        # Report dropped inputs instead of discarding them silently.
        print("Skipping missing files:", missing)
    vec_paths = [p for p, ok in zip(requested, present) if ok]
    assert len(vec_paths) >= 2, "至少传入两个已存在的 .vec 路径用于对比"
    if labels is None:
        labels = [Path(p).stem for p in vec_paths]
    elif len(labels) == len(requested):
        # Filter labels in lockstep with the paths so each name stays
        # attached to its own file.
        labels = [label for label, ok in zip(labels, present) if ok]
    else:
        assert len(labels) == len(vec_paths), "labels 数量需与 vec_paths 一致"

    # Load every model.
    wvs = [KeyedVectors.load_word2vec_format(p, binary=False) for p in vec_paths]
    print("Loaded models:", [f"{Path(p).name}(|V|={len(wv.key_to_index)},D={wv.vector_size})"
                            for p, wv in zip(vec_paths, wvs)])

    # Shared word subset, so every model is plotted on the same words.
    seed_words = ["king","queen","man","woman","london","paris","france","england",
                  "computer","software","data","science","music","art","city","country","river","mountain"]
    words = collect_subset_common(wvs, seed_words, nn_per_seed=nn_per_seed, limit=limit)
    print(f"Using common word subset: {len(words)} items")

    # One matrix per model, rows aligned with ``words``.
    X_list = [np.stack([wv[w] for w in words], axis=0) for wv in wvs]

    if method == "tsne":
        print("⚠️ t-SNE 无法共享坐标轴，不适合严格对比；将分别降维并叠加，仅作参考。")
        Z2_list, Z3_list = [], []
        for X in X_list:
            Z2, _ = try_tsne(X, n_components=2); Z2_list.append(Z2)
            Z3, _ = try_tsne(X, n_components=3); Z3_list.append(Z3)
    else:
        sizes = sorted({X.shape[1] for X in X_list})
        if len(sizes) != 1:
            raise ValueError(
                f"a shared PCA basis needs equal vector sizes, got {sizes}"
            )
        # The 2-D axes are the first two rows of the 3-D axes, so one fit on
        # the concatenated data serves both projections.
        mu, comps3 = pca_fit_on_concat(X_list, n_components=3)
        comps2 = comps3[:2]
        Z2_list = [pca_transform(X, mu, comps2) for X in X_list]
        Z3_list = [pca_transform(X, mu, comps3) for X in X_list]

    # Draw the overlays.
    os.makedirs("outputs", exist_ok=True)
    overlay_2d(words, Z2_list, labels, "outputs/emb_compare_2d.png")
    overlay_3d(words, Z3_list, labels, "outputs/emb_compare_3d.png")

# ===== 调用示例 =====
# 单模型（保持你原来的接口）：
# visualize(method="pca", limit=300)

# 多模型对比（推荐；共享 PCA 主轴）：
# visualize_compare(
#     vec_paths=[
#         "outputs/w2v_text8_sgns.vec",
#         "outputs/w2v_text8_sgns_abtt.vec",
#         # "outputs/w2v_text8_sgns_cn.vec",
#     ],
#     labels=["raw","abtt"],   # 可自定义
#     method="pca",
#     limit=300,
#     nn_per_seed=15,
# )


In [6]:
# Overlay the two .vec files listed below (labelled "raw" and "abtt").
visualize_compare(
    vec_paths=[
        "outputs/w2v_text8_sgns.vec",
        "outputs/w2v_text8_sgns_abtt.vec",
        # "outputs/w2v_text8_sgns_cn.vec",
    ],
    labels=["raw","abtt"],
    method="pca",       # use PCA when comparing models
    limit=300,
    nn_per_seed=15,
)


Loaded models: ['w2v_text8_sgns.vec(|V|=71290,D=300)', 'w2v_text8_sgns_abtt.vec(|V|=71290,D=300)']
Using common word subset: 300 items
✅ Saved 2D overlay → outputs/emb_compare_2d.png
✅ Saved 3D overlay → outputs/emb_compare_3d.png
