In [11]:
import csv
import datetime
import json
import os
import re
import subprocess
import sys
from pathlib import Path

# ===== Parameters =====
QUERY = "dermatology"                # search keyword; replace with your own
CHUNK = 100                          # results requested per yt-dlp call (author's note: 50-100 is fairly stable)
LOOPS_PER_MODE = 40                  # max rounds per sort mode (larger is slower; author's note: YouTube caps results anyway)
# Output root. Defaults to the original location; set the PATHOLOGY_VIDEO_BASE
# environment variable to run the notebook on another machine without editing it.
BASE = Path(os.environ.get("PATHOLOGY_VIDEO_BASE", "/Users/yhu10/Desktop/VLM/pathology_video")).expanduser()
OUT_DIR = BASE / "playlist"
OUT_DIR.mkdir(parents=True, exist_ok=True)
# ======================

def ensure_yt_dlp():
    """Make sure the ``yt_dlp`` package is importable, installing it on demand.

    Tries to import ``yt_dlp``. If that raises ``ImportError``, runs
    ``pip install --user yt-dlp`` with the current interpreter and imports again.

    Raises:
        subprocess.CalledProcessError: if the pip command exits non-zero.
        ImportError: if the package still cannot be imported after installing.
    """
    import importlib
    import site

    try:
        import yt_dlp  # noqa: F401
        return
    except ImportError:
        # Only a missing package should trigger an install; any other error
        # raised while importing yt_dlp is a real problem and must surface.
        print("[setup] Installing yt-dlp...")

    subprocess.run([sys.executable, "-m", "pip", "install", "--user", "yt-dlp"], check=True)

    # The per-user site directory is only put on sys.path at interpreter start-up
    # if it already exists, so a first-ever --user install may be invisible here.
    user_site = site.getusersitepackages()
    if site.ENABLE_USER_SITE and user_site not in sys.path:
        sys.path.append(user_site)
    # Packages installed while the interpreter is running need the finder caches dropped.
    importlib.invalidate_caches()
    import yt_dlp  # noqa: F401

def run_ytdlp_lines(source: str, extractor_args: str = "youtube:search_client=youtubei"):
    """Run ``yt-dlp -j`` on ``source`` and return the JSON objects it prints.

    The command is invoked without ``--flat-playlist``; every stdout line is
    passed to ``json.loads`` independently, and each line that decodes to a JSON
    object is kept. Nothing is assumed about which line holds the entries.

    Args:
        source: yt-dlp input, e.g. ``"ytsearch100:dermatology"``.
        extractor_args: value forwarded to ``--extractor-args``.

    Returns:
        A list of dicts, one per stdout line that parsed as a JSON object.

    Raises:
        RuntimeError: if yt-dlp exits with a non-zero status. The message
            carries the exit code and the tail of stderr.
    """
    cmd = [sys.executable, "-m", "yt_dlp", "-j", "--extractor-args", extractor_args, source]
    # stderr is captured separately so warnings never mix into the JSON stream;
    # errors="replace" keeps an undecodable byte from aborting the whole call.
    res = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
    )
    if res.returncode != 0:
        # Report only the end of stderr; stdout may hold megabytes of JSON.
        tail = "\n".join((res.stderr or "").strip().splitlines()[-20:])
        message = f"yt-dlp failed (exit code {res.returncode}) for {source!r}"
        if tail:
            message += f":\n{tail}"
        raise RuntimeError(message)

    items = []
    for line in (res.stdout or "").splitlines():
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            # Progress/blank lines are expected and intentionally skipped.
            continue
        # Callers use .get() on every item, so only JSON objects are useful.
        if isinstance(parsed, dict):
            items.append(parsed)
    return items

def collect_ids_titles(items):
    """Reduce yt-dlp JSON records to ``video_id`` / ``title`` / ``url`` rows.

    The identifier is taken from ``"id"``, falling back to ``"url"`` (video and
    playlist-entry records use different fields). When the fallback is a full
    URL, the video id is extracted from it so the generated watch link stays
    valid; if no id can be extracted the URL is kept unchanged as the link.

    Args:
        items: iterable of decoded JSON values; anything that is not a dict, or
            has no usable string identifier, is skipped.

    Returns:
        A list of dicts with keys ``video_id``, ``title`` and ``url``.
    """
    rows = []
    for j in items:
        if not isinstance(j, dict):
            continue
        # Support the differing fields of video vs. playlist-entry records.
        vid = j.get("id") or j.get("url")
        if not isinstance(vid, str):
            continue
        title = j.get("title") or ""
        url = None
        if "://" in vid:
            m = re.search(r"(?:[?&]v=|youtu\.be/|/shorts/|/embed/)([A-Za-z0-9_-]{6,})", vid)
            if m:
                vid = m.group(1)
            else:
                # No recognisable id: keep the original link instead of nesting
                # a URL inside another watch URL.
                url = vid
        if len(vid) < 6:  # skip invalid identifiers
            continue
        rows.append({
            "video_id": vid,
            "title": title,
            "url": url or f"https://www.youtube.com/watch?v={vid}"
        })
    return rows

def search_many(query: str, chunk: int, loops_per_mode: int):
    """Collect de-duplicated search results across sort modes and clients.

    Two sources are queried (``ytsearchdate{chunk}:`` and ``ytsearch{chunk}:``),
    each with two ``search_client`` extractor-arg values, for up to
    ``loops_per_mode`` rounds per combination. Per the original author's note,
    ``ytsearchN:`` does not really paginate, so rounds overlap heavily; rows are
    therefore de-duplicated on ``video_id``.
    NOTE(review): whether yt-dlp honours ``search_client`` cannot be confirmed
    from this file.

    A combination is abandoned after two *consecutive* rounds that add nothing.
    A failed yt-dlp call is logged and counted as such a round, so results
    already collected are not lost.

    Args:
        query: search keywords.
        chunk: number of results requested per yt-dlp call.
        loops_per_mode: maximum rounds per (source, client) combination.

    Returns:
        List of row dicts (``video_id``, ``title``, ``url``) in first-seen order.

    Raises:
        RuntimeError: if nothing was collected and at least one call failed.
    """
    modes = [f"ytsearchdate{chunk}:{query}", f"ytsearch{chunk}:{query}"]
    clients = ["youtube:search_client=youtubei", "youtube:search_client=youtube"]
    seen = set()
    results = []
    last_error = None
    for source in modes:
        for client in clients:
            no_new_rounds = 0
            for _ in range(loops_per_mode):
                try:
                    items = run_ytdlp_lines(source, extractor_args=client)
                except RuntimeError as exc:
                    # Keep what we have; a transient failure is just an empty round.
                    last_error = exc
                    print(f"[search] {source} ({client}) failed: {exc}")
                    rows = []
                else:
                    rows = collect_ids_titles(items)
                added = 0
                for r in rows:
                    if r["video_id"] in seen:
                        continue
                    seen.add(r["video_id"])
                    results.append(r)
                    added += 1
                # Reset on progress so the stop rule really means "consecutive".
                if added:
                    no_new_rounds = 0
                else:
                    no_new_rounds += 1
                # Two empty rounds in a row: move on to the next mode/client.
                if no_new_rounds >= 2:
                    break
    if not results and last_error is not None:
        raise RuntimeError("yt-dlp search produced no results and at least one call failed") from last_error
    return results
# Check that yt-dlp is importable (installing it on demand) when this cell is executed.
ensure_yt_dlp()

In [12]:
# Run the search with the configured parameters and report how many unique rows
# came back. `rows` is kept as a notebook-level variable for the CSV export below.
print(f"[search] query={QUERY!r}, chunk={CHUNK}, loops/mode={LOOPS_PER_MODE}")
rows = search_many(QUERY, CHUNK, LOOPS_PER_MODE)
print(f"[search] collected (dedup): {len(rows)}")

[search] query='dermatology', chunk=100, loops/mode=40
[search] collected (dedup): 323


In [14]:
# Export the collected rows to OUT_DIR as <query>_all_<YYYYMMDD>.csv.
stamp = datetime.datetime.now().strftime("%Y%m%d")
# Collapse runs of non-word characters to "_" and trim the edges. \W keeps
# non-ASCII letters, so different non-English queries get different file names,
# and a query with no word characters at all falls back to "query".
safe_q = re.sub(r"\W+", "_", QUERY.strip()).strip("_") or "query"
out_csv = OUT_DIR / f"{safe_q}_all_{stamp}.csv"
# newline="" is what the csv module expects so it controls line endings itself.
with open(out_csv, "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=["video_id", "title", "url"])
    w.writeheader()
    w.writerows(rows)

print(f"[done] wrote: {out_csv} ({len(rows)} rows)")
out_csv

[done] wrote: /Users/yhu10/Desktop/VLM/pathology_video/playlist/dermatology_all_20251017.csv (323 rows)


PosixPath('/Users/yhu10/Desktop/VLM/pathology_video/playlist/dermatology_all_20251017.csv')