In [None]:
!pip -q install aiolimiter

In [None]:
import sys
# NOTE(review): machine-specific absolute path — this cell only works as-is on the
# author's machine. Consider deriving it from an environment variable or the
# notebook's own location.
sys.path.insert(0, "/Users/jisoo/projects/thesis/carte_test")

# Project-local path registry. RAW / PROCESSED presumably expose Path attributes
# (e.g. RAW.LINKS_CSV, RAW.TMDB_RUN_DIR used in the run cell) — verify in config.py.
from config import RAW, PROCESSED

In [4]:
from __future__ import annotations

import os
import json
import asyncio
import sqlite3
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple

import pandas as pd
import aiohttp
from aiolimiter import AsyncLimiter
from tqdm.auto import tqdm


# Base URL of the TMDB REST API v3; fetch_one appends "/movie/{tmdb_id}".
TMDB_BASE_URL = "https://api.themoviedb.org/3"


# =========================
# 시간/토큰/헤더
# =========================
def utc_now_iso() -> str:
    """Return the current time as a timezone-aware UTC ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def get_tmdb_read_token() -> str:
    """Return the TMDB v4 read-access token from the ``TMDB_READ_TOKEN`` env var.

    The token is intentionally never hard-coded: a bearer token committed to a
    notebook or repository is a leaked credential.
    TODO(security): the token that used to be embedded here should be revoked /
    regenerated in the TMDB account settings.

    Returns:
        The token with surrounding whitespace stripped.

    Raises:
        RuntimeError: if ``TMDB_READ_TOKEN`` is unset or blank.
    """
    token = os.getenv("TMDB_READ_TOKEN")
    if not token or not token.strip():
        raise RuntimeError(
            "TMDB_READ_TOKEN 환경변수를 설정해줘.\n"
            "예) export TMDB_READ_TOKEN='...'\n"
            "Colab이면: os.environ['TMDB_READ_TOKEN']='...'"
        )
    return token.strip()


def build_headers() -> Dict[str, str]:
    """Return the HTTP headers for TMDB API calls: bearer auth plus JSON content types."""
    bearer = "Bearer " + get_tmdb_read_token()
    headers: Dict[str, str] = {}
    headers["Authorization"] = bearer
    headers["Content-Type"] = "application/json;charset=utf-8"
    headers["Accept"] = "application/json"
    return headers


def load_tmdb_ids(movielens_links_csv: Path) -> List[int]:
    """Return the sorted, de-duplicated integer ``tmdbId`` values of a MovieLens links.csv.

    Values that are missing or not parseable as numbers are dropped.
    """
    frame = pd.read_csv(movielens_links_csv, encoding="utf-8")
    numeric_ids = pd.to_numeric(frame["tmdbId"], errors="coerce").dropna()
    distinct = numeric_ids.astype(int).unique().tolist()
    return sorted(distinct)


# =========================
# TMDB fetch
# =========================
async def fetch_one(
    session: aiohttp.ClientSession,
    limiter: AsyncLimiter,
    headers: Dict[str, str],
    tmdb_id: int,
    language: str = "en-US",
    max_retries: int = 5,
) -> Dict[str, Any]:
    """Fetch one TMDB movie-details payload (credits included), retrying transient errors.

    Issues ``GET {TMDB_BASE_URL}/movie/{tmdb_id}`` with
    ``language=<language>&append_to_response=credits``; each attempt first
    acquires ``limiter``.

    Retry policy (at most ``max_retries`` attempts):
      - HTTP 429/500/502/503/504: wait ``Retry-After`` seconds when that header
        is a plain integer, otherwise ``1.7 ** attempt`` seconds, then retry.
      - Network errors (``aiohttp.ClientError``) and request timeouts
        (``asyncio.TimeoutError``, raised by the session's ClientTimeout):
        wait ``1.7 ** attempt`` seconds, then retry.
      - Any other status, including 404: fail immediately.
    No wait happens after the final attempt.

    Returns:
        The decoded JSON body of the HTTP 200 response.

    Raises:
        RuntimeError: ``"HTTP 404 NOT_FOUND: ..."`` / ``"HTTP <status>: ..."`` for
            non-retryable statuses; ``"NETWORK_FAIL: ..."`` when the last attempt
            hits a network error or timeout; ``"MAX_RETRY_EXCEEDED: ..."`` when
            the last attempt still got a retryable status (or max_retries < 1).
    """
    url = f"{TMDB_BASE_URL}/movie/{tmdb_id}"
    params = {"language": language, "append_to_response": "credits"}
    retryable_statuses = (429, 500, 502, 503, 504)
    last_problem = "no attempt made"

    for attempt in range(1, max_retries + 1):
        is_last = attempt >= max_retries
        backoff: float = 1.7 ** attempt
        try:
            async with limiter:
                async with session.get(url, headers=headers, params=params) as resp:
                    if resp.status == 200:
                        return await resp.json()

                    # NOT_FOUND: retrying cannot help.
                    if resp.status == 404:
                        text = await resp.text()
                        raise RuntimeError(f"HTTP 404 NOT_FOUND: {text[:200]}")

                    # Any other non-retryable status fails right away.
                    if resp.status not in retryable_statuses:
                        text = await resp.text()
                        raise RuntimeError(f"HTTP {resp.status}: {text[:200]}")

                    # Rate limit / server error: prefer an integer Retry-After.
                    retry_after = resp.headers.get("Retry-After")
                    if retry_after and retry_after.isdigit():
                        backoff = int(retry_after)
                    last_problem = f"HTTP {resp.status}"
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Network errors AND timeouts are retried (a total-timeout expiry is
            # asyncio.TimeoutError, which is not an aiohttp.ClientError).
            if is_last:
                raise RuntimeError(f"NETWORK_FAIL: {repr(e)}") from e
            last_problem = repr(e)

        # Sleep only after the response context has exited (the pooled
        # connection is released while waiting) and never after the last try.
        if not is_last:
            await asyncio.sleep(backoff)

    raise RuntimeError(f"MAX_RETRY_EXCEEDED: {last_problem}")


# =========================
# Run dir / parts / state(SQLite)
# =========================
def ensure_run_dirs(run_dir: Path) -> Dict[str, Path]:
    """Create ``parts/``, ``logs/`` and ``state/`` under ``run_dir`` (Drive or local).

    Returns a mapping with the three directories plus the path of the SQLite
    state file (``state/fetch_state.sqlite3``, not created here).
    """
    subdirs = {name: run_dir / name for name in ("parts", "logs", "state")}
    for folder in subdirs.values():
        folder.mkdir(parents=True, exist_ok=True)

    state_dir = subdirs["state"]
    return {
        "parts_dir": subdirs["parts"],
        "logs_dir": subdirs["logs"],
        "state_dir": state_dir,
        "sqlite_path": state_dir / "fetch_state.sqlite3",
    }


def part_file_paths(parts_dir: Path, logs_dir: Path, part_idx: int) -> Tuple[Path, Path]:
    """Return ``(data_path, log_path)`` for part number ``part_idx``.

    Both file names share the suffix ``.part-<idx zero-padded to 5>.jsonl``.
    """
    suffix = f".part-{part_idx:05d}.jsonl"
    return parts_dir / ("tmdb_movies" + suffix), logs_dir / ("tmdb_fetch_log" + suffix)


def open_sqlite(sqlite_path: Path) -> sqlite3.Connection:
    """Open the fetch-state SQLite database and make sure its schema exists.

    - ``journal_mode=WAL``: stronger durability against conflicts/crashes.
    - ``synchronous=NORMAL``: a balanced setting given (Google Drive) I/O cost.
    Creates table ``fetch_state`` (one row per tmdb_id) and an index on
    ``status`` if missing, commits, and returns the open connection.
    """
    db = sqlite3.connect(str(sqlite_path))
    for pragma in ("PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;"):
        db.execute(pragma)

    db.execute(
        """
        CREATE TABLE IF NOT EXISTS fetch_state (
            tmdb_id        INTEGER PRIMARY KEY,
            status         TEXT NOT NULL,             -- 'OK' or 'FAIL'
            fetched_at_utc TEXT NOT NULL,
            error          TEXT,
            updated_at_utc TEXT NOT NULL
        )
        """
    )
    db.execute("CREATE INDEX IF NOT EXISTS idx_fetch_state_status ON fetch_state(status);")
    db.commit()
    return db


def load_ok_ids(conn: sqlite3.Connection) -> Set[int]:
    """Return the set of tmdb_ids whose recorded status is 'OK'."""
    cursor = conn.execute("SELECT tmdb_id FROM fetch_state WHERE status='OK'")
    return {int(tmdb_id) for (tmdb_id,) in cursor}


def load_fail_ids(conn: sqlite3.Connection) -> Set[int]:
    """Return the set of tmdb_ids whose recorded status is 'FAIL'."""
    cursor = conn.execute("SELECT tmdb_id FROM fetch_state WHERE status='FAIL'")
    return {int(tmdb_id) for (tmdb_id,) in cursor}


def count_ok(conn: sqlite3.Connection) -> int:
    """Return how many fetch_state rows have status 'OK' (0 if no row comes back)."""
    result = conn.execute("SELECT COUNT(*) FROM fetch_state WHERE status='OK'").fetchone()
    if result is None:
        return 0
    return int(result[0])


def upsert_state(
    conn: sqlite3.Connection,
    tmdb_id: int,
    status: str,
    fetched_at_utc: str,
    error: Optional[str],
) -> None:
    """Insert or overwrite the fetch_state row for ``tmdb_id``.

    ``updated_at_utc`` is set to the current UTC time. The change is not
    committed here; the caller batches commits.
    """
    values = (tmdb_id, status, fetched_at_utc, error, utc_now_iso())
    conn.execute(
        """
        INSERT INTO fetch_state (tmdb_id, status, fetched_at_utc, error, updated_at_utc)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(tmdb_id) DO UPDATE SET
            status=excluded.status,
            fetched_at_utc=excluded.fetched_at_utc,
            error=excluded.error,
            updated_at_utc=excluded.updated_at_utc
        """,
        values,
    )


# =========================
# Safe runner
# =========================
async def run_fetch_drive_parts_safe(
    tmdb_ids: List[int],
    run_dir: Path,
    output_jsonl_path: Path,
    output_log_parquet: Path,
    rps: int = 35,
    concurrency: int = 60,
    language: str = "en-US",
    part_size: int = 5000,
    flush_every: int = 300,
    retry_failed: bool = True,
) -> None:
    """Fetch TMDB movie details into resumable part files, then merge and export state.

    Safety design:
    - SQLite ``run_dir/state/fetch_state.sqlite3`` is the single source of truth
      for which ids are done (``OK``) or failed (``FAIL``); no done_ids.txt.
    - ``concurrency`` worker tasks fetch (at most ``rps`` limiter acquisitions
      per second); one writer task is the only code touching part files, log
      files and SQLite, so writes never interleave.
    - Files are flushed BEFORE each SQLite commit, so an id is never committed
      as ``OK`` while its data line still sits only in a Python buffer.
    - Part files are reopened on a fresh line, and the merge dedups by
      ``tmdb_id`` and skips broken lines, so crashes/duplicates are tolerated.

    Args:
        tmdb_ids: Candidate ids; ``OK`` ids are skipped, ``FAIL`` ids are
            re-fetched only when ``retry_failed`` is True.
        run_dir: Working directory holding ``parts/``, ``logs/`` and ``state/``.
        output_jsonl_path: Merged, deduplicated JSONL output (overwritten).
        output_log_parquet: Parquet export of the whole ``fetch_state`` table.
        rps: Requests allowed per second by the AsyncLimiter.
        concurrency: Number of worker tasks and the TCP connection-pool limit.
        language: TMDB ``language`` query parameter.
        part_size: OK data records per part file before rotating.
        flush_every: Results between batched file flush + SQLite commit.
        retry_failed: Whether previously failed ids are fetched again.
    """
    paths = ensure_run_dirs(run_dir)
    parts_dir = paths["parts_dir"]
    logs_dir = paths["logs_dir"]
    sqlite_path = paths["sqlite_path"]

    headers = build_headers()
    limiter = AsyncLimiter(rps, time_period=1)

    # State DB; everything opened from here on is released in `finally`.
    conn = open_sqlite(sqlite_path)
    data_f = None
    log_f = None
    try:
        ok_ids = load_ok_ids(conn)
        fail_ids = load_fail_ids(conn)

        # Todo list: OK ids are always skipped; FAIL ids only when not retrying.
        todo_ids: List[int] = [
            tid
            for tid in tmdb_ids
            if tid not in ok_ids and (retry_failed or tid not in fail_ids)
        ]

        ok_count = count_ok(conn)

        print(
            f"[RESUME] ok={len(ok_ids):,} fail={len(fail_ids):,} "
            f"todo={len(todo_ids):,} (retry_failed={retry_failed})"
        )

        # Current part position, derived from the number of OK rows.
        part_idx = ok_count // part_size
        part_written = ok_count % part_size

        data_path, log_path = part_file_paths(parts_dir, logs_dir, part_idx)

        # Append (resume) mode, starting on a fresh line.
        data_f = _open_append_on_new_line(data_path)
        log_f = _open_append_on_new_line(log_path)

        # Results queue: workers -> single writer.
        result_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()

        async def rotate_part_if_needed() -> None:
            # Close the full part (close() flushes) and continue in the next one.
            nonlocal part_idx, part_written, data_f, log_f, data_path, log_path
            if part_written < part_size:
                return

            data_f.close()
            log_f.close()

            part_idx += 1
            data_path, log_path = part_file_paths(parts_dir, logs_dir, part_idx)
            data_f = _open_append_on_new_line(data_path)
            log_f = _open_append_on_new_line(log_path)
            part_written = 0

        async def worker_loop(worker_idx: int, queue: asyncio.Queue[int], session: aiohttp.ClientSession) -> None:
            # Fetch worker: drains the pre-filled task queue; every outcome
            # (payload, or any exception from fetch_one) becomes one result item.
            while True:
                try:
                    tmdb_id = queue.get_nowait()
                except asyncio.QueueEmpty:
                    break

                fetched_at = utc_now_iso()

                status = "OK"
                err: Optional[str] = None
                payload: Optional[Dict[str, Any]] = None

                try:
                    payload = await fetch_one(
                        session=session,
                        limiter=limiter,
                        headers=headers,
                        tmdb_id=tmdb_id,
                        language=language,
                    )
                except Exception as e:
                    status = "FAIL"
                    err = repr(e)

                await result_queue.put(
                    {
                        "tmdb_id": tmdb_id,
                        "status": status,
                        "error": err,
                        "payload": payload,
                        "fetched_at_utc": fetched_at,
                    }
                )
                queue.task_done()

        async def writer_loop(total: int) -> None:
            """Single consumer of ``result_queue``; returns after ``total`` results.

            Per result: write a log line (always), a data line (OK only, and only
            if the DB does not already hold the id as OK), then upsert state.
            Every ``flush_every`` results, and once at the end, files are flushed
            and the DB is committed.
            """
            nonlocal part_written
            processed = 0
            pending_commit = 0

            def flush_then_commit() -> None:
                # Flush first: once committed as OK an id is never re-fetched,
                # so its data line must already be out of Python's buffer.
                data_f.flush()
                log_f.flush()
                conn.commit()

            with tqdm(total=total, desc="TMDB fetch (safe)") as pbar:
                while processed < total:
                    item = await result_queue.get()

                    tmdb_id = int(item["tmdb_id"])
                    status = str(item["status"])
                    err = item["error"]
                    payload = item["payload"]
                    fetched_at_utc = str(item["fetched_at_utc"])

                    # Switch to a new part file when the current one is full.
                    await rotate_part_if_needed()

                    # 1) Attempt log (always written).
                    log_f.write(
                        json.dumps(
                            {
                                "tmdb_id": tmdb_id,
                                "status": status,
                                "fetched_at_utc": fetched_at_utc,
                                "error": err,
                            },
                            ensure_ascii=False,
                        )
                        + "\n"
                    )

                    # 2) Data line for OK results, skipped if the DB already has
                    #    this id as OK (avoids duplicate data lines).
                    if status == "OK":
                        row = conn.execute(
                            "SELECT status FROM fetch_state WHERE tmdb_id=?",
                            (tmdb_id,),
                        ).fetchone()
                        if not (row and row[0] == "OK"):
                            data_f.write(
                                json.dumps(
                                    {"tmdb_id": tmdb_id, "fetched_at_utc": fetched_at_utc, "data": payload},
                                    ensure_ascii=False,
                                )
                                + "\n"
                            )
                            part_written += 1

                    # 3) State upsert (committed in batches).
                    upsert_state(
                        conn=conn,
                        tmdb_id=tmdb_id,
                        status=status,
                        fetched_at_utc=fetched_at_utc,
                        error=err,
                    )
                    pending_commit += 1

                    # 4) Batched flush + commit.
                    if pending_commit >= flush_every:
                        flush_then_commit()
                        pending_commit = 0

                    processed += 1
                    pbar.update(1)

                    result_queue.task_done()

                # Final flush + commit of the remainder.
                flush_then_commit()

        # Task queue for the workers, filled up front.
        task_queue: asyncio.Queue[int] = asyncio.Queue()
        for tid in todo_ids:
            task_queue.put_nowait(int(tid))

        connector = aiohttp.TCPConnector(limit=concurrency, ttl_dns_cache=300)
        timeout = aiohttp.ClientTimeout(total=60)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            workers = [
                asyncio.create_task(worker_loop(i, task_queue, session))
                for i in range(concurrency)
            ]
            writer = asyncio.create_task(writer_loop(total=len(todo_ids)))
            tasks = [*workers, writer]
            try:
                # The writer returns after consuming exactly len(todo_ids) results.
                # gather() re-raises the first failure, so a crashed writer (disk
                # full, SQLite error, ...) can no longer leave us blocked forever
                # on result_queue.join().
                await asyncio.gather(*tasks)
            finally:
                for task in tasks:
                    task.cancel()
                # Let cancelled tasks unwind before the session closes.
                await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        # Runs on success, errors and notebook interrupts (task cancellation).
        # Uncommitted upserts are deliberately NOT committed here: those ids are
        # re-fetched next run and the merge dedups duplicate part lines.
        try:
            for handle in (data_f, log_f):
                if handle is not None:
                    handle.close()
        finally:
            conn.close()

    print("[OK] parts/logs/state saved at:", run_dir)

    # 1) parts -> single JSONL (dedup by tmdb_id, broken lines skipped).
    unique_count = _merge_part_files(parts_dir, output_jsonl_path)
    print("[OK] merged jsonl:", output_jsonl_path, "unique_tmdb_id=", unique_count)

    # 2) SQLite fetch_state -> parquet summary: export the final per-id OK/FAIL
    #    table instead of loading the large JSONL logs into memory.
    state_rows = _export_state_parquet(sqlite_path, output_log_parquet)
    print("[OK] state parquet:", output_log_parquet, "rows=", state_rows)


def _open_append_on_new_line(path: Path) -> Any:
    """Open ``path`` for UTF-8 append, first terminating a partial last line if any.

    A crash mid-write can leave a file that does not end with a newline;
    appending directly would glue the next valid record onto that fragment and
    the merge would drop both. Writing a newline first isolates the fragment.
    """
    needs_newline = False
    if path.exists() and path.stat().st_size > 0:
        with path.open("rb") as raw:
            raw.seek(-1, os.SEEK_END)
            needs_newline = raw.read(1) != b"\n"
    handle = path.open("a", encoding="utf-8")
    if needs_newline:
        handle.write("\n")
    return handle


def _merge_part_files(parts_dir: Path, output_jsonl_path: Path) -> int:
    """Merge ``tmdb_movies.part-*.jsonl`` into one JSONL file; return the unique id count.

    Part files are read in sorted name order; the first record per integer
    ``tmdb_id`` wins. Blank, undecodable and unparseable lines (e.g. a partial
    line left by a crash) are skipped; errors writing the output propagate.
    """
    output_jsonl_path.parent.mkdir(parents=True, exist_ok=True)

    seen: Set[int] = set()
    part_files = sorted(parts_dir.glob("tmdb_movies.part-*.jsonl"))

    with output_jsonl_path.open("w", encoding="utf-8") as out:
        for part in part_files:
            # errors="replace": a write cut off inside a multi-byte character
            # must not abort the whole merge; that line then fails json.loads.
            with part.open("r", encoding="utf-8", errors="replace") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        obj = json.loads(line)
                    except ValueError:
                        continue
                    tid = obj.get("tmdb_id") if isinstance(obj, dict) else None
                    if not isinstance(tid, int) or tid in seen:
                        continue
                    seen.add(tid)
                    out.write(json.dumps(obj, ensure_ascii=False) + "\n")

    return len(seen)


def _export_state_parquet(sqlite_path: Path, output_log_parquet: Path) -> int:
    """Write the whole ``fetch_state`` table to ``output_log_parquet``; return its row count."""
    conn = sqlite3.connect(str(sqlite_path))
    try:
        df_state = pd.read_sql_query("SELECT * FROM fetch_state", conn)
    finally:
        conn.close()

    output_log_parquet.parent.mkdir(parents=True, exist_ok=True)
    df_state.to_parquet(output_log_parquet, index=False)
    return len(df_state)




In [None]:
# =============================================================================
# Run
# =============================================================================
tmdb_ids = load_tmdb_ids(RAW.LINKS_CSV)

# TMDB API token setup (environment variable or direct input)
# os.environ["TMDB_READ_TOKEN"] = "..."

# Top-level `await` only works inside a Jupyter/IPython cell; in a plain
# Python script wrap this call with asyncio.run(...).
await run_fetch_drive_parts_safe(
    tmdb_ids=tmdb_ids,
    run_dir=RAW.TMDB_RUN_DIR,
    output_jsonl_path=RAW.TMDB_MOVIES_JSONL,
    output_log_parquet=PROCESSED.TMDB_FETCH_STATE_PARQUET,
    rps=35,
    concurrency=60,
    part_size=5000,
    flush_every=300,
    retry_failed=True,
)