In [4]:
# 一键生成音乐 sample（100 首），按人气筛选并限制艺人/风格占比
# 运行位置：code/data_clean/data_clean_music_sample.ipynb
# 读入：../../raw_data/music_dataset.csv
# 导出：../../data_clean/music/music_sample_100.csv

from pathlib import Path
import pandas as pd
import numpy as np

# ---------- Paths ----------
# Relative paths; the header comment places this notebook at code/data_clean/.
INPUT_PATH = Path("../../raw_data/music_dataset.csv")
OUTPUT_DIR = Path("../../data_clean/music")
# Create the export directory up front so the final to_csv cannot fail on a missing folder.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_CSV = OUTPUT_DIR.joinpath("music_sample_100.csv")

# ---------- Tunable parameters ----------
TARGET_N = 100                 # number of tracks wanted in the sample
MIN_POP = 50                   # popularity floor (raise to 60/70 for more familiar tracks)
MIN_DUR = 90 * 1_000           # shortest accepted track: 1.5 minutes, in milliseconds
MAX_DUR = 6 * 60 * 1_000       # longest accepted track: 6 minutes, in milliseconds
EXPLICIT_OK = False            # whether tracks flagged as explicit are allowed
MAX_PER_ARTIST = 2             # quota: at most this many tracks per artist
MAX_PER_GENRE = 10             # quota: at most this many tracks per genre

# ---------- Load ----------
df = pd.read_csv(INPUT_PATH)

# ---------- Basic cleaning ----------
# Coerce the numeric feature columns; values that cannot be parsed become NaN.
num_cols = [
    "popularity","duration_ms","danceability","energy","key","loudness","mode",
    "speechiness","acousticness","instrumentalness","liveness","valence","tempo","time_signature"
]
for c in num_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

# Normalise the key text columns to plain strings, with missing values as "".
# The missing-value mask must be taken BEFORE the string cast: astype(str)
# turns NaN into the literal text "nan", after which fillna("") has nothing
# left to fill and "nan" would be treated as a real artist/genre/id.
for c in ["track_id","track_name","artists","track_genre","album_name","explicit"]:
    if c in df.columns:
        missing = df[c].isna()
        df[c] = df[c].astype(str).mask(missing, "")

# Default popularity to 0 when the column is absent, so later steps that
# sort and filter on it do not raise.
if "popularity" not in df.columns:
    df["popularity"] = 0

# De-duplicate: by track_id when at least one non-empty id exists,
# otherwise fall back to the (artists, track_name) pair.
if "track_id" in df.columns and df["track_id"].ne("").any():
    df = df.drop_duplicates(subset=["track_id"])
else:
    df = df.drop_duplicates(subset=["artists","track_name"])

# ---------- Quality gate ----------
# Start from "keep every row" and AND in one condition per available column.
mask = pd.Series(True, index=df.index)

# Popularity floor and duration window (bounds inclusive).
if "popularity" in df.columns:
    mask = mask & (df["popularity"] >= MIN_POP)
if "duration_ms" in df.columns:
    duration_ok = df["duration_ms"].between(MIN_DUR, MAX_DUR, inclusive="both")
    mask = mask & duration_ok

# Explicit-content filter, applied only when the column exists.
if "explicit" in df.columns and not EXPLICIT_OK:
    # Normalise the flag to a boolean column by matching common truthy spellings.
    explicit_text = df["explicit"].astype(str).str.upper()
    df["explicit_norm"] = explicit_text.isin(["TRUE","1","T","YES"])
    mask = mask & ~df["explicit_norm"]

df_filt = df.loc[mask].copy()

# If the strict filter leaves too few rows, drop the popularity floor:
# keep the duration and explicit conditions and append those rows after
# the strictly filtered ones, removing rows already present.
if len(df_filt) < TARGET_N:
    df_relax = df.copy()
    if "duration_ms" in df_relax.columns:
        relaxed_duration_ok = df_relax["duration_ms"].between(MIN_DUR, MAX_DUR, inclusive="both")
        df_relax = df_relax.loc[relaxed_duration_ok]
    if "explicit_norm" in df_relax.columns and not EXPLICIT_OK:
        df_relax = df_relax.loc[~df_relax["explicit_norm"]]
    if "track_id" in df.columns:
        dedup_key = ["track_id"]
    else:
        dedup_key = ["artists","track_name"]
    df_filt = pd.concat([df_filt, df_relax]).drop_duplicates(subset=dedup_key)

# ---------- Sort (by popularity, highest first) ----------
df_filt = df_filt.sort_values(["popularity"], ascending=False)

# ---------- Quota-limited selection ----------
# Walk the tracks from most to least popular and keep a track only while its
# artist and its genre are still under their caps. Blank artist/genre values
# are never capped and never counted.
selected = []        # positional indices (into df_filt) of the kept rows
artist_count = {}    # artist -> number of tracks kept so far
genre_count = {}     # genre  -> number of tracks kept so far

# Only two fields are needed per row, so iterate those columns directly
# instead of materialising a full Series per row with iterrows().
# A missing column is treated as all-blank.
blank_col = pd.Series("", index=df_filt.index)
artist_col = df_filt["artists"] if "artists" in df_filt.columns else blank_col
genre_col = df_filt["track_genre"] if "track_genre" in df_filt.columns else blank_col

for pos, (artist, genre) in enumerate(zip(artist_col, genre_col)):
    artist = str(artist).strip()
    genre = str(genre).strip()

    if artist and artist_count.get(artist, 0) >= MAX_PER_ARTIST:
        continue
    if genre and genre_count.get(genre, 0) >= MAX_PER_GENRE:
        continue

    selected.append(pos)
    if artist:
        artist_count[artist] = artist_count.get(artist, 0) + 1
    if genre:
        genre_count[genre] = genre_count.get(genre, 0) + 1

    if len(selected) >= TARGET_N:
        break

# Positional slicing keeps the original column dtypes and index labels, and
# still yields a frame with all columns when nothing was selected.
sel_df = df_filt.iloc[selected].copy()

# Still short of the target: ignore the quotas and top up with the next
# rows of df_filt (in its current order) that have not been picked yet.
if len(sel_df) < TARGET_N:
    if sel_df.empty:
        # Nothing was picked, so every filtered row is still available.
        # This also avoids indexing key columns on an empty frame that may
        # have no columns at all.
        remaining = df_filt
    elif "track_id" in df_filt.columns:
        remaining = df_filt[~df_filt["track_id"].isin(sel_df["track_id"])]
    else:
        # No id column: anti-join on (artists, track_name) to drop the
        # rows that are already selected.
        remaining = df_filt.merge(
            sel_df[["artists","track_name"]], on=["artists","track_name"], how="left", indicator=True
        )
        remaining = remaining[remaining["_merge"] == "left_only"].drop(columns=["_merge"])
    extra = remaining.head(TARGET_N - len(sel_df))
    sel_df = pd.concat([sel_df, extra], ignore_index=True)

# ---------- Export ----------
# Preferred column order for the exported sample; columns missing from the
# selection are skipped.
preferred_cols = [
    "track_id","artists","album_name","track_name","track_genre","popularity","duration_ms","explicit",
    "danceability","energy","loudness","speechiness","acousticness","instrumentalness","liveness","valence","tempo",
    "key","mode","time_signature"
]
keep_cols = [name for name in preferred_cols if name in sel_df.columns]
out = sel_df.loc[:, keep_cols].reset_index(drop=True)
out.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")

# Summary of what was written: row count plus distinct artists and genres.
artists_covered = out["artists"].nunique() if "artists" in out.columns else "NA"
genres_covered = out["track_genre"].nunique() if "track_genre" in out.columns else "NA"
print(f"[OK] Saved {len(out)} rows -> {OUTPUT_CSV}")
print(f"Artists covered: {artists_covered}")
print(f"Genres covered:  {genres_covered}")


[OK] Saved 100 rows -> ../../data_clean/music/music_sample_100.csv
Artists covered: 86
Genres covered:  28
