In [None]:
import time, math, json, logging, os
from pathlib import Path
from typing import Dict, Any, List, Optional
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

LOG = logging.getLogger("tmdb")
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")

# Load TMDB_API_KEY from dotenv files when python-dotenv is available.
# Precedence: variables already in the process environment, then tmdbKey.env,
# then .env. load_dotenv() does not override variables that are already set, so
# when both files define the key, the first file read (tmdbKey.env) wins.
try:
    from dotenv import load_dotenv
except ImportError:
    LOG.info("python-dotenv not installed; reading TMDB_API_KEY from the process environment only")
else:
    try:
        if os.path.exists("tmdbKey.env"):
            load_dotenv("tmdbKey.env")   # TMDB_API_KEY=...
        load_dotenv()                    # .env (if present)
    except Exception as e:  # best-effort: a broken dotenv file must not stop the notebook
        LOG.warning("failed to load dotenv files: %s", e)

# Strip surrounding whitespace so a padded value is not sent verbatim as the key;
# a missing or blank value becomes None.
TMDB_API_KEY = (os.getenv("TMDB_API_KEY") or "").strip() or None

def _require_tmdb_key():
    """Raise RuntimeError unless the module-level TMDB_API_KEY holds a non-blank value."""
    key = TMDB_API_KEY
    if key and key.strip():
        return
    raise RuntimeError(
        "TMDB_API_KEY가 설정되지 않았습니다. "
        "tmdbKey.env 또는 .env 파일에 TMDB_API_KEY=... 를 넣거나, "
        "환경변수로 설정해 주세요."
    )

# Working directory root and the directory for intermediate TMDB artifacts
# (raw parquet parts and the failed-id list).
BASE = Path(".")
OUT = BASE / "artifacts"
OUT.mkdir(exist_ok=True)

def make_session() -> requests.Session:
    """Build a requests.Session with a pooled, retrying HTTP adapter.

    GET requests are retried on connect/read errors and on HTTP
    429/500/502/503/504 with exponential backoff (backoff_factor=0.8),
    honouring the Retry-After header. When status-based retries are
    exhausted, the last response is returned instead of raising
    (raise_on_status=False), so callers must still inspect the status code.
    """
    retry_policy = Retry(
        total=8,
        connect=5,
        read=5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
        backoff_factor=0.8,
        respect_retry_after_header=True,
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_policy, pool_connections=50, pool_maxsize=50)

    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, adapter)
    session.headers.update({
        "Accept": "application/json",
        "User-Agent": "CBF-enrichment/1.0 (+contact: your_email@example.com)",
    })
    return session

def _redact_secret(text: str, secret: Optional[str]) -> str:
    """Return *text* with every occurrence of *secret* replaced by ``***``.

    requests/urllib3 exception messages can contain the full request URL, and
    here that URL carries the ``api_key`` query parameter, so messages are
    scrubbed before they reach the log.
    """
    if not secret:
        return text
    return text.replace(secret, "***")


def _parse_retry_after(value: Optional[str], default: float = 2.0) -> float:
    """Convert a ``Retry-After`` header value (in seconds) into a sleep duration.

    Missing, non-numeric (e.g. the HTTP-date form) or non-finite values fall
    back to *default*; negative values are clamped to 0 so ``time.sleep``
    never raises.
    """
    if value is None:
        return default
    try:
        seconds = float(value)
    except (TypeError, ValueError):
        return default
    if not math.isfinite(seconds):
        return default
    return max(0.0, seconds)


def fetch_tmdb_movie(session: requests.Session, tmdb_id: int, lang="en-US",
                     max_rate_limit_retries: int = 5) -> Optional[Dict[str, Any]]:
    """Fetch one movie's details, with ``credits`` appended, from the TMDB v3 API.

    Args:
        session: HTTP session used for the request (e.g. from ``make_session()``).
        tmdb_id: TMDB movie id.
        lang: value for TMDB's ``language`` query parameter.
        max_rate_limit_retries: how many extra attempts to make (sleeping for
            ``Retry-After`` in between) while the response is still HTTP 429.
            Once used up, the movie is treated as failed instead of retrying
            forever.

    Returns:
        The decoded JSON body. Returns None on 404, 5xx, persistent 429,
        read timeouts, connection errors, or any other error. Each of these
        except 404 is logged, with the API key redacted.

    Raises:
        RuntimeError: if TMDB_API_KEY is not configured.
    """
    _require_tmdb_key()
    url = f"https://api.themoviedb.org/3/movie/{tmdb_id}"
    params = {
        "api_key": TMDB_API_KEY,
        "append_to_response": "credits",
        "language": lang,
    }
    try:
        r = session.get(url, params=params, timeout=(5, 15))
        # Sessions from make_session() already retry 429 at the adapter level and
        # hand back the last response once those retries are used up; this loop
        # is a bounded fallback that honours Retry-After.
        retries = 0
        while r.status_code == 429:
            if retries >= max_rate_limit_retries:
                LOG.warning("429 persisted for tmdbId=%s after %s extra retries; giving up",
                            tmdb_id, retries)
                return None
            retries += 1
            retry_after = _parse_retry_after(r.headers.get("Retry-After"))
            LOG.warning("429 received. sleeping %ss", retry_after)
            time.sleep(retry_after)
            r = session.get(url, params=params, timeout=(5, 15))
        if r.status_code >= 500:
            LOG.warning("Server %s for %s", r.status_code, tmdb_id)
            return None
        if r.status_code == 404:
            return None
        r.raise_for_status()
        return r.json()
    except requests.exceptions.ReadTimeout:
        LOG.warning("Read timeout for tmdbId=%s", tmdb_id)
    except requests.exceptions.ConnectionError as e:
        # str(e) may embed the request URL (including api_key) -- redact it.
        LOG.warning("Connection error for tmdbId=%s: %s", tmdb_id,
                    _redact_secret(str(e), TMDB_API_KEY))
    except Exception as e:
        # e.g. HTTPError "401 Client Error ... for url: ...?api_key=..." -- redact it.
        LOG.warning("Unexpected error for tmdbId=%s: %s", tmdb_id,
                    _redact_secret(str(e), TMDB_API_KEY))
    return None

def extract_fields(js: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a TMDB movie-details payload into one enrichment row.

    Args:
        js: decoded TMDB ``/movie/{id}`` response, optionally with ``credits``
            appended. Missing or null keys are tolerated.

    Returns:
        Dict with keys tmdbId, director, genres_tmdb, runtime, popularity,
        release_year, title_tmdb, original_language. ``director`` is the first
        crew member whose job is "Director" (co-directors are ignored).
        ``genres_tmdb`` is a "|"-joined list of genre names, or None if there
        are none. ``release_year`` comes from the first four characters of
        release_date when they are digits, else None.
    """
    credits = js.get("credits") or {}
    crew = credits.get("crew") or []
    director = next((c.get("name") for c in crew if c.get("job") == "Director"), None)

    genres = [g["name"] for g in (js.get("genres") or []) if g.get("name")]

    release_date = js.get("release_date") or ""
    # isdecimal (not isdigit) guarantees int() accepts the prefix.
    year = None
    if len(release_date) >= 4 and release_date[:4].isdecimal():
        year = int(release_date[:4])

    return {
        "tmdbId": js.get("id"),
        "director": director,
        "genres_tmdb": "|".join(genres) if genres else None,
        "runtime": js.get("runtime"),
        "popularity": js.get("popularity"),
        "release_year": year,
        "title_tmdb": js.get("title"),
        "original_language": js.get("original_language"),
    }

def enrich_tmdb_for_links(links_csv: Path, batch_size=40, sleep_sec=0.35,
                          out_dir: Optional[Path] = None):
    """Fetch TMDB metadata for every tmdbId in a MovieLens ``links.csv``.

    Results are checkpointed as ``tmdb_raw_part_NNNN.parquet`` every
    *batch_size* ids, plus ``tmdb_raw_part_final.parquet`` for the remainder.
    Ids that could not be fetched are written to ``tmdb_failed_ids.json``.
    The remainder and the failure list are written even if the loop is
    interrupted (e.g. KeyboardInterrupt), so a long run keeps its tail.

    Args:
        links_csv: CSV with a ``tmdbId`` column (MovieLens links.csv: movieId,
            imdbId, tmdbId). Rows without a tmdbId are skipped.
        batch_size: ids per checkpoint part; must be >= 1.
        sleep_sec: pause after each checkpoint to throttle the request rate.
        out_dir: directory for the parts and the failure list; defaults to the
            module-level ``OUT`` as it is at call time.

    Raises:
        ValueError: if batch_size < 1.
        RuntimeError: propagated from fetch_tmdb_movie when TMDB_API_KEY is unset.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    out_dir = OUT if out_dir is None else Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    links = pd.read_csv(links_csv)
    links = links.dropna(subset=["tmdbId"])
    # A tmdbId may appear on several link rows; fetch each one only once
    # (build_tmdb_items_enriched de-duplicates by tmdbId anyway).
    tmdb_ids = links["tmdbId"].astype(int).drop_duplicates().tolist()

    rows: List[Dict[str, Any]] = []
    failed: List[int] = []

    with make_session() as session:  # closes pooled connections when done
        try:
            for i, tmdb_id in enumerate(tmdb_ids, start=1):
                js = fetch_tmdb_movie(session, tmdb_id)
                if js:
                    rows.append(extract_fields(js))
                else:
                    failed.append(tmdb_id)

                # Checkpoint & throttle
                if i % batch_size == 0:
                    if rows:
                        path_part = out_dir / f"tmdb_raw_part_{i // batch_size:04d}.parquet"
                        pd.DataFrame(rows).to_parquet(path_part, index=False)
                        LOG.info("saved batch %s rows=%s -> %s", i // batch_size, len(rows), path_part)
                    rows.clear()
                    if sleep_sec > 0:
                        time.sleep(sleep_sec)
        finally:
            # Persist the unsaved remainder and the failures even on interruption.
            if rows:
                path_part = out_dir / "tmdb_raw_part_final.parquet"
                pd.DataFrame(rows).to_parquet(path_part, index=False)
                LOG.info("saved final part rows=%s -> %s", len(rows), path_part)
            (out_dir / "tmdb_failed_ids.json").write_text(
                json.dumps(failed, ensure_ascii=False, indent=2), encoding="utf-8"
            )
            LOG.info("failed ids saved: %s items", len(failed))

def build_tmdb_items_enriched(links_csv: Path, content_parquet: Path,
                              parts_dir: Optional[Path] = None) -> pd.DataFrame:
    """Merge the raw TMDB parts onto MovieLens links and save the result.

    Steps:
    - Read every ``tmdb_raw_part_*.parquet`` written by ``enrich_tmdb_for_links()``
      and keep the first record per tmdbId.
    - Left-join those records onto the (movieId, tmdbId) pairs from *links_csv*;
      links without TMDB data keep NaN fields.
    - Drop rows with a negative runtime.
    - Write ``tmdb_items_enriched.parquet`` under ``BASE``.

    Args:
        links_csv: MovieLens links.csv with movieId and tmdbId columns.
        content_parquet: currently unused; kept so existing calls keep working.
        parts_dir: where to look for the raw parts; defaults to the
            module-level ``OUT`` as it is at call time.

    Returns:
        The enriched DataFrame that was saved.

    Raises:
        RuntimeError: if no raw parts exist, or if no link matched any TMDB
            record (nothing worth saving).
    """
    parts_dir = OUT if parts_dir is None else Path(parts_dir)
    parts = sorted(parts_dir.glob("tmdb_raw_part_*.parquet"))
    if not parts:
        raise RuntimeError("tmdb_raw_part_*.parquet not found. Run enrich_tmdb_for_links() first.")
    tmdb_raw = pd.concat([pd.read_parquet(p) for p in parts], ignore_index=True).drop_duplicates("tmdbId")

    # movieId comes from links.csv (content_parquet is not read).
    links = pd.read_csv(links_csv)[["movieId", "tmdbId"]].dropna().copy()
    links["tmdbId"] = links["tmdbId"].astype(int)

    df = links.merge(tmdb_raw, on="tmdbId", how="left", indicator=True)
    matched = int((df["_merge"] == "both").sum())
    df = df.drop(columns="_merge")
    LOG.info("links matched to TMDB records: %s / %s", matched, len(df))
    # A left join keeps every link row, so len(df) > 0 cannot reveal an
    # enrichment that produced nothing; check the matched count instead.
    if matched == 0:
        raise RuntimeError("Enrichment matched zero links — aborting save")

    # Light cleanup; any further runtime/popularity/year filtering goes here,
    # always logging how many rows were dropped.
    before = len(df)
    # Drop negative runtimes; unknown (NaN) runtimes are kept.
    df = df[(df["runtime"].isna()) | (df["runtime"] >= 0)]
    LOG.info("dropped by runtime filter: %s", before - len(df))

    out_path = BASE / "tmdb_items_enriched.parquet"
    df.to_parquet(out_path, index=False)
    LOG.info("tmdb_items_enriched saved: %s rows -> %s", len(df), out_path)
    return df

# Run order: fetch the raw TMDB parts first, then merge them into the enriched table.
links_path = Path("links.csv")
enrich_tmdb_for_links(links_path, batch_size=40, sleep_sec=0.35)
build_tmdb_items_enriched(links_path, Path("movielens_content.parquet"))

Unnamed: 0,movieId,tmdbId,director,genres_tmdb,runtime,popularity,release_year,title_tmdb,original_language
0,1,862,John Lasseter,Family|Comedy|Animation|Adventure,81.0,14.2360,1995.0,Toy Story,en
1,2,8844,Joe Johnston,Adventure|Fantasy|Family,104.0,2.1723,1995.0,Jumanji,en
2,3,15602,Howard Deutch,Romance|Comedy,101.0,2.5894,1995.0,Grumpier Old Men,en
3,4,31357,Forest Whitaker,Comedy|Drama|Romance,127.0,2.5470,1995.0,Waiting to Exhale,en
4,5,11862,Charles Shyer,Comedy|Family,106.0,3.5329,1995.0,Father of the Bride Part II,en
...,...,...,...,...,...,...,...,...,...
62311,209157,499546,Rene Eller,Drama,100.0,3.5400,2018.0,We,nl
62312,209159,63407,João Jardim,Documentary,73.0,4.1092,2001.0,Janela da Alma,pt
62313,209163,553036,Gábor Reisz,Comedy|Drama,97.0,0.3582,2018.0,Bad Poems,hu
62314,209169,162892,,,,,,,


In [1]:
# Install this notebook's third-party dependencies (pyarrow is presumably the
# parquet engine behind to_parquet/read_parquet here).
# "!" is an IPython/Jupyter shell escape, not Python syntax.
!pip install requests pandas pyarrow python-dotenv



In [3]:
import pandas as pd
import numpy as np
from pathlib import Path

BASE = Path(".")

# Dataset name -> parquet path to inspect. Other project datasets are kept
# commented out; re-enable entries as needed.
files = dict(
    netflix_ratings=BASE / "netflix_ratings.parquet",
    # movielens_content=BASE / "movielens_content.parquet",
    # movielens_genome_long=BASE / "movielens_genome_long.parquet",
    # tmdb_items_enriched=BASE / "tmdb_items_enriched_update.parquet",
)

def schema_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Describe each column of *df*: position, name, dtype, null counts and one example.

    The example is the column's first non-null value (strings longer than 80
    characters are cut to 77 plus an ellipsis), or NaN when the column has no
    non-null values.

    Columns are addressed by position, so duplicate column names work. Each
    column is scanned for nulls once, without copying its non-null values,
    which matters for very large inputs.
    """
    dtypes, non_nulls, nulls, examples = [], [], [], []
    n_rows = len(df)
    for j in range(df.shape[1]):
        col = df.iloc[:, j]
        present = col.notna().to_numpy(dtype=bool)
        n_present = int(present.sum())
        # argmax over a boolean mask returns the position of the first True.
        example = col.iloc[int(np.argmax(present))] if n_present else np.nan
        if isinstance(example, str) and len(example) > 80:
            example = example[:77] + "…"
        dtypes.append(str(col.dtype))
        non_nulls.append(n_present)
        nulls.append(n_rows - n_present)
        examples.append(example)
    return pd.DataFrame({
        "col_idx": range(len(df.columns)),
        "column": df.columns,
        "dtype": dtypes,
        "non_null": non_nulls,
        "nulls": nulls,
        "example": examples,
    })

# Inspection outputs get their own name on purpose: enrich_tmdb_for_links and
# build_tmdb_items_enriched read the module-level OUT (the artifacts directory)
# at call time, so rebinding OUT here would silently redirect their files.
INSPECT_OUT = BASE / "inspect_out"
INSPECT_OUT.mkdir(exist_ok=True)

for name, path in files.items():
    print(f"\n=== {name} ===")
    df = pd.read_parquet(path)
    print(f"shape: {df.shape}")

    # Save the per-column schema table
    schema_path = INSPECT_OUT / f"{name}_schema.csv"
    schema_summary(df).to_csv(schema_path, index=False)
    print(f"- schema -> {schema_path}")

    # Save a 20-row preview
    preview_path = INSPECT_OUT / f"{name}_preview.csv"
    df.head(20).to_csv(preview_path, index=False)
    print(f"- preview(20 rows) -> {preview_path}")



=== netflix_ratings ===
shape: (100480507, 4)
- schema -> inspect_out/netflix_ratings_schema.csv
- preview(20 rows) -> inspect_out/netflix_ratings_preview.csv
