# Genres in Genres 可视化实验 Notebook

这个 Notebook 用于在 **不依赖 Gradio 前端** 的情况下，直接调用项目中的可视化接口（`src/visualization.py`），方便你：

- 快速切换参数（PCA/t-SNE/UMAP、K、是否画专辑轮廓、Radar 维度等）
- 一次性把所有图表导出到 **同一个 PDF**（多页）

建议从 `genres_in_genres/` 目录启动 `jupyter lab`，然后运行本 Notebook。

## 0. 路径与导入（保持项目框架）

说明：
- 本项目可视化依赖 `src/` 包。
- 语义相关（tag 编码、Radar、簇命名）需要 `pipeline_mulan.MuQMuLanEncoder`（位于上层 `t2m/` 仓库根目录）。
- 如果找不到 `pipeline_mulan.py`，把参数区的 `USE_SEMANTICS` 设为 `False` 后，仍可以生成除 Radar/语义标签外的大多数图（保持 `True` 时第 3 节会直接报错）。

In [None]:
from __future__ import annotations

import os
import sys
from pathlib import Path

# Notebook convention: the kernel's working directory is genres_in_genres/,
# so the current directory doubles as the sub-project root.
HERE = Path.cwd().resolve()
SUBPROJECT_ROOT = HERE
if not (SUBPROJECT_ROOT / "src").exists():
    raise RuntimeError(
        "请从 genres_in_genres/ 目录启动并运行 notebook（确保当前目录包含 src/）。\n"
        f"当前目录：{SUBPROJECT_ROOT}"
    )

# Walk upwards (starting at the sub-project itself) and keep the first directory
# that contains pipeline_mulan.py; None when no ancestor has it.
T2M_ROOT: Path | None = next(
    (
        candidate
        for candidate in (SUBPROJECT_ROOT, *SUBPROJECT_ROOT.parents)
        if (candidate / "pipeline_mulan.py").exists()
    ),
    None,
)

# Only the required roots go onto sys.path. Existing copies are removed first so
# that re-running this cell does not keep growing sys.path with duplicates; the
# resulting precedence (T2M_ROOT ahead of SUBPROJECT_ROOT) is unchanged.
for _root in (SUBPROJECT_ROOT, T2M_ROOT):
    if _root is None:
        continue
    _entry = str(_root)
    while _entry in sys.path:
        sys.path.remove(_entry)
    sys.path.insert(0, _entry)

print("SUBPROJECT_ROOT:", SUBPROJECT_ROOT)
print("T2M_ROOT:", T2M_ROOT)

In [None]:
import datetime

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages

from src.analysis import StyleAnalyzer
from src.core import ArtistCareer
from src.library_manager import LibraryManager
from src.mock_data import MockDataGenerator
from src.semantics import SemanticMapper
from src.visualization import CareerStoryteller, GenreTrajectoryVisualizer

# Figure defaults for this notebook: moderate on-screen resolution and tight
# bounding boxes when saving.
plt.rcParams["figure.dpi"] = 120
plt.rcParams["savefig.bbox"] = "tight"
# Font type 42 makes the PDF/PS backends embed TrueType fonts.
plt.rcParams["pdf.fonttype"] = 42
plt.rcParams["ps.fonttype"] = 42

## 1. 参数区（改这里即可）

推荐流程：
1) 先用 `MODE = "library"` + 某个缓存过的 artist 跑通
2) 再调整 `METHOD` / `K` / `SHOW_ALBUM_CONTOURS` / Radar 的 tags
3) 最后把结果导出到 `exports/*.pdf`（已加入 `.gitignore`）

In [None]:
# Data mode: "library" (read data/cache/*.pkl) or "mock" (generate a simulated career).
MODE = "library"  # "library" | "mock"

# In library mode the artist name comes from data/cache/*.pkl (e.g. "Pink Floyd").
ARTIST_NAME = "Pink Floyd"

# Optional: analyse only some albums (None selects all of them).
SELECTED_ALBUMS: list[str] | None = None

# Dimensionality-reduction method.
METHOD = "pca"  # "pca" | "tsne" | "umap"

# Clustering settings.
AUTO_K = False
K = 3

# Trajectory plot: draw a convex-hull outline per album (can get messy with many albums).
SHOW_ALBUM_CONTOURS = False

# Semantics / radar: set to False if you only want the "geometric" plots
# (no semantic tags).
USE_SEMANTICS = True
MAX_TAGS = 2000

# Albums compared in the radar chart (None means the default: first vs. last album).
RADAR_ALBUMS: list[str] | None = None

# Semantic dimensions of the radar chart (4-8 recommended).
RADAR_TAGS = ["Happy", "Sad", "Energetic", "Calm", "Dark", "Bright"]

# Output settings.
OUTPUT_DIR = SUBPROJECT_ROOT / "exports"
# The run name becomes a file name, so besides spaces the path separators are
# replaced as well: an artist such as "AC/DC" would otherwise make OUTPUT_PDF
# point into a sub-directory that does not exist.
RUN_NAME = (
    f"{MODE}_{ARTIST_NAME}_{METHOD}"
    .replace(" ", "_")
    .replace("/", "_")
    .replace("\\", "_")
)
OUTPUT_PDF = OUTPUT_DIR / f"{RUN_NAME}.pdf"

## 2. 载入数据（library 或 mock）

In [None]:
# Data locations, all relative to the sub-project root.
DATA_DIR = SUBPROJECT_ROOT / "data" / "music"
CACHE_DIR = SUBPROJECT_ROOT / "data" / "cache"
METADATA_DIR = SUBPROJECT_ROOT / "data" / "metadata"

library = LibraryManager(str(DATA_DIR), str(CACHE_DIR))

# Build the career either from generated mock data or from the on-disk cache.
if MODE == "mock":
    career = MockDataGenerator.generate_career(
        artist_name=ARTIST_NAME,
        num_albums=5,
        tracks_per_album=10,
        start_year=2010,
    )
elif MODE == "library":
    cached = library.list_cached_artists()
    if ARTIST_NAME not in cached:
        raise ValueError(
            f"找不到缓存艺术家：{ARTIST_NAME}\n"
            f"可选：{cached[:20]}{'...' if len(cached) > 20 else ''}"
        )
    career = library.load_from_cache(ARTIST_NAME)
    if career is None:
        raise RuntimeError(f"无法从 cache 加载：{ARTIST_NAME}")
else:
    raise ValueError(f"未知 MODE: {MODE}")

# Optional album filter: keep only tracks/embeddings whose album was selected.
if SELECTED_ALBUMS:
    filtered_tracks = [t for t in career.tracks if t.album in SELECTED_ALBUMS]
    filtered_embeddings = [e for e in career.embeddings if e.track_ref.album in SELECTED_ALBUMS]
    if not filtered_tracks:
        # Fail here with the available album names instead of continuing with an
        # empty career (e.g. after a typo in SELECTED_ALBUMS).
        available_albums = list(dict.fromkeys(t.album for t in career.tracks))
        raise ValueError(
            f"SELECTED_ALBUMS 未匹配到任何曲目：{SELECTED_ALBUMS}\n"
            f"可选专辑：{available_albums}"
        )
    filtered = ArtistCareer(artist_name=career.artist_name)
    filtered.tracks = filtered_tracks
    filtered.embeddings = filtered_embeddings
    career = filtered

# Unique album names in order of first appearance among the tracks.
albums = list(dict.fromkeys(t.album for t in career.tracks))

print("Tracks:", len(career.tracks), "Embeddings:", len(career.embeddings))
print("Albums:", len(albums))
print("Album list (first 10):", albums[:10])

## 3. 初始化语义映射（可选）

- `USE_SEMANTICS=False`：簇标签仅显示 `C0/C1/...`，Radar 页只输出一张带提示文字的占位图。
- `USE_SEMANTICS=True`：会加载 MuQ‑MuLan 文本编码器，并把 tags 编码后用于：
  - 给簇命名（最近邻 tags）
  - Radar 图（语义维度对比）

In [None]:
# Stays None unless semantics are enabled and the encoder can be loaded.
semantic_mapper: SemanticMapper | None = None

# Semantics need pipeline_mulan.py, which is only importable when T2M_ROOT was found.
if USE_SEMANTICS and T2M_ROOT is None:
    raise RuntimeError(
        "USE_SEMANTICS=True 但未找到 pipeline_mulan.py。\n"
        "请在 t2m 仓库中运行，或把 t2m 根目录加入 PYTHONPATH。"
    )

if USE_SEMANTICS:
    # Imported lazily so that geometry-only runs need neither torch nor the encoder.
    import torch
    from pipeline_mulan import MuQMuLanEncoder

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print("Semantic device:", device)

    encoder = MuQMuLanEncoder(device=device)
    semantic_mapper = SemanticMapper(
        encoder=encoder,
        metadata_dir=str(METADATA_DIR),
        cache_dir=str(CACHE_DIR),
        device=device,
    )
    semantic_mapper.initialize_tags(max_tags=MAX_TAGS)

# The analyzer receives the mapper as-is; it is None when semantics are disabled.
analyzer = StyleAnalyzer(career, semantic_mapper)

## 4. 聚类 + 生成簇标签（用于图表注释）

In [None]:
def build_cluster_labels(
    analyzer: StyleAnalyzer,
    clusters: dict[int, list],
    top_k: int = 5,
) -> tuple[dict[int, str], dict[int, str]]:
    """Build short and descriptive labels for every cluster.

    Args:
        analyzer: Object exposing ``mapper`` (may be None) and
            ``career.embeddings``; each embedding provides ``vector`` and
            ``track_ref``.
        clusters: Mapping from cluster id to the tracks assigned to it.
        top_k: Number of nearest tags requested from the mapper per cluster.

    Returns:
        ``(plot_labels, rich_labels)``. ``plot_labels`` holds the short form
        ``"C<id>"`` for use on charts. ``rich_labels`` holds
        ``"C<id>: tag (score), ..."`` and falls back to ``"C<id>"`` when there
        is no mapper or no embedding belongs to the cluster.
    """
    short_names: dict[int, str] = {}
    detailed_names: dict[int, str] = {}

    for cluster_id, members in clusters.items():
        fallback = f"C{cluster_id}"
        short_names[cluster_id] = fallback
        # Start from the plain name; it is only replaced once tags are available.
        detailed_names[cluster_id] = fallback

        mapper = analyzer.mapper
        if mapper is None:
            continue

        member_vectors = [
            emb.vector
            for emb in analyzer.career.embeddings
            if emb.track_ref in members
        ]
        if not member_vectors:
            continue

        # Describe the cluster by the tags closest to the mean of its vectors.
        center = np.mean(np.stack(member_vectors), axis=0)
        nearest = mapper.get_nearest_tags(center, k=top_k)
        described = ", ".join(f"{tag} ({score:.2f})" for tag, score in nearest)
        detailed_names[cluster_id] = f"C{cluster_id}: {described}"

    return short_names, detailed_names


# Choose the cluster count: the fixed K from the parameter cell, or a search
# whose upper bound is one tenth of the embedding count clamped to [2, 10].
if not AUTO_K:
    k = int(K)
else:
    n_tracks = len(analyzer.career.embeddings)
    max_k = max(2, n_tracks // 10)
    if max_k > 10:
        max_k = 10
    k = analyzer.find_optimal_k(k_range=(2, max_k))

# Cluster the songs, then derive the short (plot) and descriptive (rich) labels.
clusters = analyzer.cluster_songs(n_clusters=k)
plot_labels, rich_labels = build_cluster_labels(analyzer, clusters, top_k=5)

print("K =", k)
for cid in sorted(rich_labels):
    print(rich_labels[cid])

## 5. 生成所有图表（调用全部可视化接口）

In [None]:
# (name, figure) pairs in the order they will appear in the exported PDF.
figs: list[tuple[str, plt.Figure]] = []

# 1) Trajectory
fig_traj = GenreTrajectoryVisualizer.plot_2d_trajectory(
    analyzer,
    method=METHOD,
    clusters=clusters,
    cluster_labels=plot_labels,
    show_album_contours=SHOW_ALBUM_CONTOURS,
)
figs.append(("trajectory", fig_traj))

# 2) Streamgraph
fig_stream = CareerStoryteller.plot_streamgraph(
    analyzer,
    clusters,
    cluster_labels=plot_labels,
)
figs.append(("streamgraph", fig_stream))

# 3) Cluster composition
fig_comp = CareerStoryteller.plot_cluster_composition(
    analyzer,
    clusters,
    cluster_labels=plot_labels,
)
figs.append(("cluster_composition", fig_comp))

# 4) Consistency
fig_cons = CareerStoryteller.plot_consistency(analyzer)
figs.append(("consistency", fig_cons))

# 5) Radar (needs the semantic mapper)
# An explicit RADAR_ALBUMS wins; otherwise compare the first and last album,
# or use the album list as-is when it has fewer than two entries.
if RADAR_ALBUMS is not None:
    radar_albums = RADAR_ALBUMS
elif len(albums) >= 2:
    radar_albums = [albums[0], albums[-1]]
else:
    radar_albums = albums

if analyzer.mapper is not None:
    fig_radar = CareerStoryteller.plot_radar(analyzer, radar_albums, tags=RADAR_TAGS)
else:
    # No mapper: add a placeholder page carrying only a hint text.
    fig_radar = plt.figure(figsize=(6, 4))
    plt.text(0.5, 0.5, "Radar 需要 USE_SEMANTICS=True", ha="center", va="center")
    plt.axis("off")

figs.append(("radar", fig_radar))

# Cell output: number of figures and their names.
len(figs), [name for name, _ in figs]

## 6. 导出到一个 PDF（多页）

In [None]:
# Make sure the export directory exists before PdfPages opens the file.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

with PdfPages(OUTPUT_PDF) as pdf:
    # Cover page: the parameters and cluster labels of this run, for reproducibility.
    cover = plt.figure(figsize=(11.69, 8.27))  # A4 landscape
    cover.suptitle("Genres in Genres — Export", fontsize=16)

    timestamp = datetime.datetime.now().isoformat(timespec="seconds")
    counts = (
        f"tracks: {len(career.tracks)} | embeddings: {len(career.embeddings)} "
        f"| albums: {len(albums)}"
    )
    txt = [
        f"time: {timestamp}",
        f"mode: {MODE}",
        f"artist: {career.artist_name}",
        counts,
        f"method: {METHOD}",
        f"auto_k: {AUTO_K} | K: {k}",
        f"show_album_contours: {SHOW_ALBUM_CONTOURS}",
        f"use_semantics: {USE_SEMANTICS}",
        f"radar_albums: {radar_albums}",
        f"radar_tags: {RADAR_TAGS}",
        "",
        "cluster labels:",
    ]
    # One bullet per cluster, ordered by cluster id.
    for cid in sorted(rich_labels):
        txt.append(f"- {rich_labels[cid]}")

    cover_text = "\n".join(txt)
    cover.text(0.02, 0.95, cover_text, va="top", family="monospace", fontsize=10)
    cover.tight_layout()
    pdf.savefig(cover)
    plt.close(cover)

    # Chart pages: title each figure, write it as its own page, then close it.
    for name, fig in figs:
        page_title = f"{career.artist_name} — {name}"
        fig.suptitle(page_title, fontsize=12)
        pdf.savefig(fig)
        plt.close(fig)

print("Saved:", OUTPUT_PDF)

## 7.（可选）批量实验：多组参数一次性导出

如果你想比较不同方法/不同 K，可以取消下面 `EXPERIMENTS` 示例代码的注释，然后把每组参数的轨迹图一次性导出到同一个 PDF。

In [None]:
# EXAMPLE (enable as needed)
# Uncomment the lines below to export one trajectory figure per experiment into
# a single multi-page PDF. Only the trajectory plot is produced here.
# NOTE: once enabled, the loop rebinds the notebook-level names `k`, `clusters`
# and `plot_labels` to the values of the last experiment.
# EXPERIMENTS = [
#     {"method": "pca", "k": 3, "show_album_contours": False},
#     {"method": "tsne", "k": 3, "show_album_contours": False},
#     {"method": "umap", "k": 4, "show_album_contours": True},
# ]
#
# batch_pdf = OUTPUT_DIR / f"batch_{career.artist_name.replace(' ', '_')}.pdf"
# with PdfPages(batch_pdf) as pdf:
#     for exp in EXPERIMENTS:
#         method = exp["method"]
#         k = exp["k"]
#         show_album_contours = exp["show_album_contours"]
#
#         clusters = analyzer.cluster_songs(n_clusters=k)
#         plot_labels, _ = build_cluster_labels(analyzer, clusters, top_k=5)
#
#         f1 = GenreTrajectoryVisualizer.plot_2d_trajectory(
#             analyzer,
#             method=method,
#             clusters=clusters,
#             cluster_labels=plot_labels,
#             show_album_contours=show_album_contours,
#         )
#         f1.suptitle(f"Trajectory — {method} | K={k} | contours={show_album_contours}")
#         pdf.savefig(f1)
#         plt.close(f1)
#
# print("Saved:", batch_pdf)
# Placeholder statement so the cell still contains code while the example is commented out.
pass