In [None]:
# ============================================================
# 01_ingest_historical.py (Jupyter-friendly)  ✅ FULL CORRECTED
# - Backfills historical Sleeper drafts/picks and builds:
#     1) ADP time series (month anchored on draft start_time)
#     2) Auction price time series (month anchored on draft start_time)
# - Time axis is the DRAFT start_time (epoch ms), not file pull time
# ============================================================

import os
import time
from typing import Any, Dict, List, Optional, Tuple, Set
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import pandas as pd
import requests
from tqdm import tqdm


# -----------------------------
# CONFIG
# -----------------------------
# Seasons to ingest; list every year that should be backfilled.
SEASONS = [2026]

# Seed accounts as (display name, user id); only the id is used for discovery.
SEED_USERS = [
    ("camsnotsober", "567994319854673920"),
    ("dynastybuck", "332066581859282944"),
    ("curtistodd", "568256222760906752"),
    ("elnostrathomas", "387839476958965760"),
    ("coombesie9", "386648007942254592"),
]

# discovery controls
MAX_EXPANSION_STEPS = 2  # 0 = only seed users, 1 = one hop, etc.
MAX_USERS_PER_STEP = 2500
MAX_LEAGUES_TOTAL = 20000

# request / parallelism controls
MAX_WORKERS = 40               # thread-pool size used by parallel_fetch
CHUNK_SIZE = 400               # URLs submitted per batch
SLEEP_BETWEEN_CHUNKS_SEC = 8   # pause after each batch when more than one batch exists

# filters for ADP/Auction (keep broad in 01; filter later in app)
# Only these dynasty classes are aggregated into the time series.
KEEP_DYNASTY_CLASSES = {"startup", "rookie"}

ROOT_DIR = "sleeper_dynasty_adp"
_RAW_ROOT = os.path.join(ROOT_DIR, "data", "raw")
_SNAPSHOT_ROOT = os.path.join(ROOT_DIR, "data", "snapshots")

# raw API pulls
DIR_RAW_LEAGUES = os.path.join(_RAW_ROOT, "leagues")
DIR_RAW_LEAGUE_USERS = os.path.join(_RAW_ROOT, "league_users")
DIR_RAW_DRAFTS = os.path.join(_RAW_ROOT, "drafts")
DIR_RAW_PICKS = os.path.join(_RAW_ROOT, "picks")

# derived snapshots
DIR_SNAP_ADP_TS = os.path.join(_SNAPSHOT_ROOT, "adp_time_series")
DIR_SNAP_DRAFT_CAT = os.path.join(_SNAPSHOT_ROOT, "draft_catalog")

# auction snapshots (mirrors ADP structure)
DIR_SNAP_AUCTION_TS = os.path.join(_SNAPSHOT_ROOT, "auction_price_series")
DIR_SNAP_AUCTION_CAT = os.path.join(_SNAPSHOT_ROOT, "auction_draft_catalog")

# Create the whole output tree up front so later writes never hit a missing folder.
_OUTPUT_DIRS = (
    DIR_RAW_LEAGUES,
    DIR_RAW_LEAGUE_USERS,
    DIR_RAW_DRAFTS,
    DIR_RAW_PICKS,
    DIR_SNAP_ADP_TS,
    DIR_SNAP_DRAFT_CAT,
    DIR_SNAP_AUCTION_TS,
    DIR_SNAP_AUCTION_CAT,
)
for _output_dir in _OUTPUT_DIRS:
    os.makedirs(_output_dir, exist_ok=True)


# -----------------------------
# HTTP
# -----------------------------
# Root of the Sleeper REST API; every URL helper below is built on it.
BASE = "https://api.sleeper.app/v1"

# One shared session so all requests reuse the same headers.
session = requests.Session()
session.headers["User-Agent"] = "Sleeper-Dynasty-ADP/1.0"


def get_json(url: str, timeout: int = 30, retries: int = 4, backoff: float = 1.8) -> Any:
    """
    GET ``url`` through the shared session and return the decoded JSON body.

    Each failed attempt is followed by an exponentially growing pause
    (``backoff ** attempt`` seconds plus a small constant, capped at 30s).
    HTTP 429 responses are retried like any other failure.

    Args:
        url: Absolute URL to fetch.
        timeout: Per-request timeout in seconds, passed to ``session.get``.
        retries: Maximum number of attempts.
        backoff: Base of the exponential delay between attempts.

    Returns:
        Whatever ``response.json()`` yields for the first successful attempt.

    Raises:
        RuntimeError: when all attempts fail; the message carries the URL and
            the last failure, which is also chained as ``__cause__``.
    """
    last_err: Optional[Exception] = None
    for attempt in range(retries):
        try:
            r = session.get(url, timeout=timeout)
            if r.status_code == 429:
                # Record the throttle as the failure reason; previously an
                # all-429 run ended with the unhelpful "Last error: None".
                last_err = RuntimeError("HTTP 429 Too Many Requests")
                delay = min(30, (backoff ** attempt) + 1)
            else:
                r.raise_for_status()
                return r.json()
        except Exception as e:  # retry boundary: any failure counts as one attempt
            last_err = e
            delay = min(30, (backoff ** attempt) + 0.5)

        # No point waiting after the final attempt; fail immediately instead.
        if attempt < retries - 1:
            time.sleep(delay)

    raise RuntimeError(f"GET failed: {url}\nLast error: {last_err}") from last_err


def chunked(lst: List[Any], n: int):
    """Yield consecutive slices of ``lst`` holding at most ``n`` items each."""
    offsets = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in offsets)


def parallel_fetch(urls: List[str], desc: str) -> List[Tuple[str, Any, Optional[str]]]:
    """
    Fetch every URL concurrently with ``get_json`` and report per-URL outcomes.

    Returns one ``(url, data, error)`` tuple per input URL, in completion
    order: ``error`` is ``None`` on success, otherwise ``data`` is ``None``
    and ``error`` holds the exception text. ``desc`` labels the progress bar.
    """
    outcomes: List[Tuple[str, Any, Optional[str]]] = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        url_by_future = {pool.submit(get_json, target): target for target in urls}
        progress = tqdm(as_completed(url_by_future), total=len(url_by_future), desc=desc)
        for finished in progress:
            target = url_by_future[finished]
            try:
                payload = finished.result()
            except Exception as exc:
                # A failed URL is reported, not raised, so one bad request
                # cannot abort the rest of the batch.
                outcomes.append((target, None, str(exc)))
            else:
                outcomes.append((target, payload, None))
    return outcomes


# -----------------------------
# URL helpers
# -----------------------------
def url_user_leagues(user_id: str, season: int) -> str:
    """Return the endpoint listing the NFL leagues of ``user_id`` in ``season``."""
    endpoint = f"{BASE}/user/{user_id}/leagues/nfl/{season}"
    return endpoint

def url_league_users(league_id: str) -> str:
    """Return the endpoint listing the users of ``league_id``."""
    endpoint = f"{BASE}/league/{league_id}/users"
    return endpoint

def url_league_drafts(league_id: str) -> str:
    """Return the endpoint listing the drafts of ``league_id``."""
    endpoint = f"{BASE}/league/{league_id}/drafts"
    return endpoint

def url_draft_picks(draft_id: str) -> str:
    """Return the endpoint listing the picks of ``draft_id``."""
    endpoint = f"{BASE}/draft/{draft_id}/picks"
    return endpoint


# -----------------------------
# SAFE TIME CONVERSION (fixes overflow)
# -----------------------------
def safe_ms_to_datetime_utc(ms_series: pd.Series, *, label: str = "start_time") -> pd.Series:
    """
    Turn epoch-millisecond values into UTC timestamps without overflowing.

    Values that are not numeric, or that fall outside 2010-01-01..2036-12-31
    (UTC), come back as NaT. When any numeric value is out of range, a warning
    with the count and up to five examples is printed, prefixed by ``label``.
    """
    ns_per_ms = 1_000_000  # Timestamp.value is in nanoseconds
    earliest_ms = pd.Timestamp("2010-01-01", tz="UTC").value // ns_per_ms
    latest_ms = pd.Timestamp("2036-12-31", tz="UTC").value // ns_per_ms

    numeric = pd.to_numeric(ms_series, errors="coerce")

    # Only real numbers can be "out of range"; NaN is simply missing.
    out_of_range = numeric.notna() & ~numeric.between(earliest_ms, latest_ms)
    bad_count = int(out_of_range.sum())

    if bad_count:
        examples = ms_series[out_of_range].head(5).tolist()
        print(f"[warn] {label}: found {bad_count:,} out-of-range ms values. Examples: {examples}")

    # Blank the offenders before conversion so pandas never sees them.
    cleaned = numeric.mask(out_of_range, np.nan)
    return pd.to_datetime(cleaned, unit="ms", utc=True, errors="coerce")


# -----------------------------
# DISCOVERY (leagues + league_users)
# -----------------------------
def fetch_leagues_for_users(user_ids: List[str], season: int, season_tag: str) -> pd.DataFrame:
    """
    Download the leagues of every user in ``user_ids`` for ``season``.

    Requests go out in batches of ``CHUNK_SIZE``; failed requests are skipped.
    Each league record is tagged with ``_season``. Returns a flattened frame
    with one row per distinct ``league_id``, or an empty frame when nothing
    was retrieved. ``season_tag`` only labels the progress bar.
    """
    urls = [url_user_leagues(uid, season) for uid in user_ids]
    needs_pause = len(urls) > CHUNK_SIZE  # pause only when work spans several batches
    league_records = []

    for i, chunk in enumerate(chunked(urls, CHUNK_SIZE), start=1):
        fetched = parallel_fetch(chunk, desc=f"[{season_tag}] leagues chunk {i} ({len(chunk)})")
        for _url, payload, error in fetched:
            if error or payload is None:
                continue
            for league in payload:
                league["_season"] = season
                league_records.append(league)
        if needs_pause:
            time.sleep(SLEEP_BETWEEN_CHUNKS_SEC)

    if league_records:
        flattened = pd.json_normalize(league_records)
        return flattened.drop_duplicates(subset=["league_id"])
    return pd.DataFrame()


def fetch_users_for_leagues(league_ids: List[str], season_tag: str) -> pd.DataFrame:
    """
    Download the member list of every league in ``league_ids``.

    Requests go out in batches of ``CHUNK_SIZE``; failed requests are skipped.
    Each user record is tagged with ``_league_id``, recovered from the request
    URL. Returns a flattened frame with one row per (league, user) record, or
    an empty frame when nothing was retrieved.
    """
    urls = [url_league_users(lid) for lid in league_ids]
    needs_pause = len(urls) > CHUNK_SIZE  # pause only when work spans several batches
    member_records = []

    for i, chunk in enumerate(chunked(urls, CHUNK_SIZE), start=1):
        fetched = parallel_fetch(chunk, desc=f"[{season_tag}] league users chunk {i} ({len(chunk)})")
        for source_url, payload, error in fetched:
            if error or payload is None:
                continue
            # Results arrive in completion order, so the league is read back from the URL.
            league_id = source_url.split("/league/")[1].split("/users")[0]
            for member in payload:
                member["_league_id"] = league_id
                member_records.append(member)
        if needs_pause:
            time.sleep(SLEEP_BETWEEN_CHUNKS_SEC)

    if member_records:
        return pd.json_normalize(member_records)
    return pd.DataFrame()


def discover_leagues(season: int, seed_users: List[Tuple[str, str]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Discover leagues for ``season`` by expanding hop-by-hop from ``seed_users``.

    Each step fetches the leagues of the current frontier of user ids, keeps
    the leagues not seen before, fetches those leagues' members, and uses the
    previously unseen members as the next frontier. The loop runs at most
    ``MAX_EXPANSION_STEPS + 1`` times and stops early when a step yields no
    leagues, no new leagues, no memberships, or the league cap is reached.

    Args:
        season: Season passed through to the league lookups.
        seed_users: ``(name, user_id)`` pairs; only the id is used.

    Returns:
        ``(leagues, memberships)`` frames concatenated across all steps
        (either may be empty). Both are also written as parquet files under
        ``DIR_RAW_LEAGUES`` and ``DIR_RAW_LEAGUE_USERS``.
    """
    season_tag = str(season)

    frontier_users = [uid for _name, uid in seed_users]
    seen_users: Set[str] = set(frontier_users)
    seen_leagues: Set[str] = set()

    leagues_parts: List[pd.DataFrame] = []
    memberships_parts: List[pd.DataFrame] = []

    for step in range(MAX_EXPANSION_STEPS + 1):
        # NOTE(review): users cut off by this cap were already added to
        # seen_users in the previous step, so they are never queried later.
        frontier_users = frontier_users[:MAX_USERS_PER_STEP]
        print(f"\n=== [{season_tag}] DISCOVERY STEP {step} | users={len(frontier_users)} ===")

        leagues_df = fetch_leagues_for_users(frontier_users, season, season_tag)
        if leagues_df.empty:
            break

        # Compare ids as strings so membership tests against seen_leagues are consistent.
        leagues_df["league_id"] = leagues_df["league_id"].astype(str)
        new_leagues_df = leagues_df[~leagues_df["league_id"].isin(seen_leagues)].copy()
        print(f"[{season_tag}] Leagues fetched={len(leagues_df):,} | new={len(new_leagues_df):,}")

        if new_leagues_df.empty:
            break

        leagues_parts.append(new_leagues_df)
        new_league_ids = new_leagues_df["league_id"].tolist()
        seen_leagues.update(new_league_ids)

        # NOTE(review): the cap is checked after the whole batch was added, so
        # the total can exceed MAX_LEAGUES_TOTAL; members of this last batch
        # are not fetched either. Confirm this is intended.
        if len(seen_leagues) >= MAX_LEAGUES_TOTAL:
            print(f"[{season_tag}] Hit MAX_LEAGUES_TOTAL cap.")
            break

        mem_df = fetch_users_for_leagues(new_league_ids, season_tag)
        if not mem_df.empty:
            memberships_parts.append(mem_df)

        # Stop when this was the final allowed step or there is nobody to expand to.
        if step == MAX_EXPANSION_STEPS or mem_df.empty or "user_id" not in mem_df.columns:
            break

        # Next frontier = members not queried (or queued) before.
        discovered_users = mem_df["user_id"].dropna().astype(str).unique().tolist()
        frontier_users = [u for u in discovered_users if u not in seen_users]
        seen_users.update(frontier_users)
        print(f"[{season_tag}] Next frontier users={len(frontier_users):,} | total users seen={len(seen_users):,}")

    leagues_out = pd.concat(leagues_parts, ignore_index=True) if leagues_parts else pd.DataFrame()
    memberships_out = pd.concat(memberships_parts, ignore_index=True) if memberships_parts else pd.DataFrame()

    # Persist the raw discovery results (written even when the frames are empty).
    leagues_path = os.path.join(DIR_RAW_LEAGUES, f"leagues_{season}.parquet")
    users_path  = os.path.join(DIR_RAW_LEAGUE_USERS, f"league_users_{season}.parquet")
    leagues_out.to_parquet(leagues_path, index=False)
    memberships_out.to_parquet(users_path, index=False)

    print(f"[{season_tag}] leagues={leagues_out.shape} users={memberships_out.shape}")
    return leagues_out, memberships_out


# -----------------------------
# DRAFTS
# -----------------------------
def draft_to_row(d: dict, league_id: str, season: int) -> dict:
    """
    Flatten one draft payload into a single-level row.

    Top-level fields keep their names (``status`` becomes ``draft_status``),
    ``metadata`` fields get an ``md_`` prefix and ``settings`` fields an
    ``st_`` prefix. Missing fields are ``None``; a missing ``draft_id``
    becomes the empty string.
    """
    metadata = d.get("metadata") or {}
    settings = d.get("settings") or {}

    row = {
        "draft_id": str(d.get("draft_id") or ""),
        "league_id": str(league_id),
        "season": int(season),
        "draft_status": d.get("status"),
    }

    # "type" is snake / linear / auction
    for field in ("type", "sport", "season_type", "created", "start_time", "last_picked"):
        row[field] = d.get(field)

    # scoring_type includes the dynasty_* variants
    for field in ("scoring_type", "name", "league_type"):
        row["md_" + field] = metadata.get(field)

    setting_fields = (
        "teams", "rounds", "pick_timer", "reversal_round",
        "slots_qb", "slots_rb", "slots_wr", "slots_te",
        "slots_flex", "slots_super_flex", "slots_def", "slots_k",
    )
    for field in setting_fields:
        row["st_" + field] = settings.get(field)

    return row


def fetch_drafts_for_leagues(league_ids: List[str], season: int) -> pd.DataFrame:
    """
    Download and flatten the drafts of every league in ``league_ids``.

    Requests go out in batches of ``CHUNK_SIZE``; failed requests and drafts
    without an id are skipped, and duplicate ``draft_id`` rows are removed
    within each batch. The result is written to
    ``DIR_RAW_DRAFTS/drafts_<season>.parquet`` and returned (empty frame when
    no drafts were found).
    """
    season_tag = str(season)
    urls = [url_league_drafts(lid) for lid in league_ids]
    needs_pause = len(urls) > CHUNK_SIZE  # pause only when work spans several batches
    batch_frames = []

    for i, chunk in enumerate(chunked(urls, CHUNK_SIZE), start=1):
        batch_rows = []
        fetched = parallel_fetch(chunk, desc=f"[{season_tag}] drafts chunk {i} ({len(chunk)})")
        for source_url, payload, error in fetched:
            if error or payload is None:
                continue
            # Results arrive in completion order, so the league is read back from the URL.
            league_id = source_url.split("/league/")[1].split("/drafts")[0]
            flattened = (draft_to_row(raw, league_id, season) for raw in payload)
            batch_rows.extend(row for row in flattened if row["draft_id"])

        if batch_rows:
            batch_frames.append(pd.DataFrame(batch_rows).drop_duplicates(subset=["draft_id"]))

        if needs_pause:
            time.sleep(SLEEP_BETWEEN_CHUNKS_SEC)

    if batch_frames:
        drafts_df = pd.concat(batch_frames, ignore_index=True)
    else:
        drafts_df = pd.DataFrame()

    drafts_df.to_parquet(os.path.join(DIR_RAW_DRAFTS, f"drafts_{season}.parquet"), index=False)
    print(f"[{season_tag}] drafts={drafts_df.shape}")
    return drafts_df


def build_draft_catalog(drafts_df: pd.DataFrame) -> pd.DataFrame:
    """
    Enrich the raw drafts frame with typed columns, time anchors and flags.

    Adds:
        start_dt / start_date / start_month: derived from ``start_time``
            (epoch ms) via ``safe_ms_to_datetime_utc``.
        is_dynasty: ``md_scoring_type`` contains "dynasty".
        is_superflex: a positive ``st_slots_super_flex`` or a scoring type
            containing "2qb" / "superflex".
        dynasty_class: "non_dynasty", "rookie" (<= 6 rounds),
            "startup" (>= 14 rounds) or "other".

    Args:
        drafts_df: Rows shaped like ``draft_to_row`` output. ``start_time``
            and ``md_scoring_type`` are required; the settings columns are
            optional.

    Returns:
        A new frame; the input is not modified.

    Raises:
        KeyError: if ``start_time`` or ``md_scoring_type`` is missing.
    """
    df = drafts_df.copy()

    # normalize id cols
    for c in ("draft_id", "league_id"):
        if c in df.columns:
            df[c] = df[c].astype(str)

    # numeric
    num_cols = [
        "created", "start_time", "last_picked",
        "st_teams", "st_rounds", "st_pick_timer", "st_reversal_round",
        "st_slots_qb", "st_slots_rb", "st_slots_wr", "st_slots_te",
        "st_slots_flex", "st_slots_super_flex", "st_slots_def", "st_slots_k",
    ]
    for c in num_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # SAFE time anchors
    df["start_dt"] = safe_ms_to_datetime_utc(df["start_time"], label="start_time")
    df["start_date"] = df["start_dt"].dt.date.astype("string")
    df["start_month"] = df["start_dt"].dt.strftime("%Y-%m")

    # flags
    scoring = df["md_scoring_type"].astype(str)
    df["is_dynasty"] = scoring.str.contains("dynasty", case=False, na=False).astype(bool)

    # The old `df.get(col, 0).fillna(0)` crashed when the column was absent,
    # because the int fallback has no .fillna; use an all-False Series instead.
    if "st_slots_super_flex" in df.columns:
        has_sf_slot = df["st_slots_super_flex"].fillna(0) > 0
    else:
        has_sf_slot = pd.Series(False, index=df.index)
    named_superflex = scoring.str.contains("2qb|superflex", case=False, na=False).astype(bool)
    df["is_superflex"] = has_sf_slot | named_superflex

    # dynasty class, vectorized: first matching condition wins. Unlike a
    # row-wise apply this also works on a frame with zero rows.
    if "st_rounds" in df.columns:
        rounds = pd.to_numeric(df["st_rounds"], errors="coerce")
    else:
        rounds = pd.Series(np.nan, index=df.index)
    classes = np.select(
        [
            (~df["is_dynasty"]).to_numpy(dtype=bool),
            (rounds <= 6).to_numpy(dtype=bool),   # NaN compares False
            (rounds >= 14).to_numpy(dtype=bool),
        ],
        ["non_dynasty", "rookie", "startup"],
        default="other",
    )
    df["dynasty_class"] = pd.Series(classes, index=df.index, dtype=object)

    return df


# ✅ NEW: auction-only catalog builder
def build_auction_draft_catalog(draft_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of the catalog rows whose ``type`` is "auction" (any case).

    An empty catalog, or one without a ``type`` column, yields an empty frame.
    """
    usable = (not draft_catalog.empty) and ("type" in draft_catalog.columns)
    if not usable:
        return pd.DataFrame()

    draft_type = draft_catalog["type"].astype(str).str.lower()
    is_auction = draft_type == "auction"
    return draft_catalog.loc[is_auction].copy()


# -----------------------------
# PICKS
# -----------------------------
def pick_to_row(p: dict, draft_id: str) -> dict:
    """
    Flatten one pick payload into a single-level row for ``draft_id``.

    Top-level pick fields keep their names; selected ``metadata`` fields are
    copied under an ``md_`` prefix (``position`` becomes ``md_pos``). Missing
    fields are ``None``.
    """
    metadata = p.get("metadata") or {}

    row = {"draft_id": str(draft_id)}
    for field in ("player_id", "pick_no", "round", "draft_slot", "is_keeper"):
        row[field] = p.get(field)

    # (output column, metadata key); "amount" is the auction price and is
    # simply absent (None) on non-auction picks.
    metadata_fields = (
        ("md_first_name", "first_name"),
        ("md_last_name", "last_name"),
        ("md_team", "team"),
        ("md_pos", "position"),
        ("md_amount", "amount"),
    )
    for column, source_key in metadata_fields:
        row[column] = metadata.get(source_key)

    return row


def fetch_picks_for_completed_drafts(draft_catalog: pd.DataFrame, season: int) -> pd.DataFrame:
    """
    Download the picks of every draft in the catalog whose status is "complete".

    Requests go out in batches of ``CHUNK_SIZE``. Requests that fail or return
    nothing are skipped and counted; the count is printed as a warning so
    missing data is visible. The result is written to
    ``DIR_RAW_PICKS/picks_<season>.parquet`` and returned (empty frame when no
    picks were retrieved).

    Args:
        draft_catalog: Must contain ``draft_id`` and a status column
            (``draft_status``, or ``status`` as a fallback).
        season: Used for the output file name and log prefix.

    Returns:
        One row per pick, shaped like ``pick_to_row`` output.
    """
    season_tag = str(season)

    # robust status column
    status_col = "draft_status" if "draft_status" in draft_catalog.columns else "status"
    is_complete = draft_catalog[status_col].astype(str).str.lower() == "complete"

    # Drop nulls BEFORE the string cast: after astype(str) a missing id is the
    # literal "None"/"nan", which dropna() can no longer remove and which
    # would be requested as /draft/None/picks.
    completed_ids = (
        draft_catalog.loc[is_complete, "draft_id"]
        .dropna().astype(str).unique().tolist()
    )

    urls = [url_draft_picks(did) for did in completed_ids]
    parts = []
    failed = 0

    for i, chunk in enumerate(chunked(urls, CHUNK_SIZE), start=1):
        res = parallel_fetch(chunk, desc=f"[{season_tag}] picks chunk {i} ({len(chunk)})")
        buf = []
        for u, data, err in res:
            if err or data is None:
                failed += 1
                continue
            # Results arrive in completion order, so the draft is read back from the URL.
            draft_id = u.split("/draft/")[1].split("/picks")[0]
            buf.extend(pick_to_row(p, draft_id) for p in data)

        if buf:
            parts.append(pd.DataFrame(buf))

        if len(urls) > CHUNK_SIZE:
            time.sleep(SLEEP_BETWEEN_CHUNKS_SEC)

    if failed:
        print(f"[{season_tag}] [warn] picks: {failed:,} of {len(urls):,} draft requests returned no data")

    picks_df = pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
    out_path = os.path.join(DIR_RAW_PICKS, f"picks_{season}.parquet")
    picks_df.to_parquet(out_path, index=False)
    print(f"[{season_tag}] picks={picks_df.shape}")
    return picks_df


# -----------------------------
# ADP TIME SERIES (month anchored)
# -----------------------------
def compute_adp_time_series(picks_df: pd.DataFrame, draft_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate picks into a monthly ADP table, anchored on the draft's start month.

    Picks are joined to their draft, restricted to drafts with a known
    ``start_month`` and a ``dynasty_class`` in ``KEEP_DYNASTY_CLASSES``, then
    grouped by season, month, player and draft format.

    Args:
        picks_df: Needs ``draft_id``, ``player_id`` and ``pick_no``.
        draft_catalog: Needs ``draft_id`` plus the format columns used for
            grouping (see ``build_draft_catalog``).

    Returns:
        One row per group with ``drafts`` (distinct drafts), ``picks``,
        ``adp`` (mean pick number, 2 decimals), ``min_pick`` and ``max_pick``.
    """
    p = picks_df.copy()

    # Filter missing players BEFORE the string cast. Casting first turns a
    # null into the literal "None", which then passes every notna() check and
    # shows up as a bogus player in the output.
    p = p[p["player_id"].notna()].copy()
    p["draft_id"] = p["draft_id"].astype(str)
    p["player_id"] = p["player_id"].astype(str)
    p["pick_no"] = pd.to_numeric(p["pick_no"], errors="coerce")
    p = p[p["pick_no"].notna()]

    d = draft_catalog.copy()
    d["draft_id"] = d["draft_id"].astype(str)

    keep_cols = [
        "draft_id", "season", "start_dt", "start_month",
        "dynasty_class", "type", "md_scoring_type", "st_teams", "st_rounds", "is_superflex",
        "draft_status"
    ]
    keep_cols = [c for c in keep_cols if c in d.columns]

    # One catalog row per draft: a repeated draft_id would duplicate every
    # pick of that draft in the merge and skew the averages.
    d = d[keep_cols].drop_duplicates(subset=["draft_id"])
    m = p.merge(d, on="draft_id", how="left")

    m = m[m["start_month"].notna()].copy()
    m = m[m["dynasty_class"].isin(list(KEEP_DYNASTY_CLASSES))].copy()

    out = (
        m.groupby(
            ["season", "start_month", "player_id", "dynasty_class", "type", "md_scoring_type", "st_teams", "st_rounds", "is_superflex"],
            dropna=False  # keep groups whose format columns are missing
        )
        .agg(
            drafts=("draft_id", "nunique"),
            picks=("pick_no", "size"),
            adp=("pick_no", "mean"),
            min_pick=("pick_no", "min"),
            max_pick=("pick_no", "max"),
        )
        .reset_index()
    )

    out["adp"] = out["adp"].round(2)
    return out


# ✅ NEW: AUCTION PRICE TIME SERIES (month anchored)
def compute_auction_price_time_series(picks_df: pd.DataFrame, draft_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Monthly auction $ time series anchored on draft start_time (via draft_catalog.start_month)
    Output is per player_id + format + month aggregates, similar to compute_adp_time_series.

    Only picks from drafts of type "auction" with a numeric ``md_amount``, a
    known ``start_month`` and a ``dynasty_class`` in ``KEEP_DYNASTY_CLASSES``
    are aggregated.

    Args:
        picks_df: Needs ``draft_id`` and ``player_id``; ``md_amount`` holds the price.
        draft_catalog: Needs ``draft_id`` plus the format columns used for grouping.

    Returns:
        One row per group with ``drafts``, ``sales``, ``avg_price``,
        ``med_price``, ``min_price`` and ``max_price`` (prices rounded to 2
        decimals). An empty frame when ``picks_df`` has no ``md_amount`` column.
    """
    # Nothing to price without the amount column; bail out before any work.
    if "md_amount" not in picks_df.columns:
        return pd.DataFrame()

    p = picks_df.copy()

    # Filter missing players BEFORE the string cast. Casting first turns a
    # null into the literal "None", which then passes every notna() check and
    # shows up as a bogus player in the output.
    p = p[p["player_id"].notna()].copy()
    p["draft_id"] = p["draft_id"].astype(str)
    p["player_id"] = p["player_id"].astype(str)
    p["amount"] = pd.to_numeric(p["md_amount"], errors="coerce")
    p = p[p["amount"].notna()]

    d = draft_catalog.copy()
    d["draft_id"] = d["draft_id"].astype(str)

    keep_cols = [
        "draft_id", "season", "start_dt", "start_month",
        "dynasty_class", "type", "md_scoring_type", "st_teams", "st_rounds", "is_superflex",
        "draft_status"
    ]
    keep_cols = [c for c in keep_cols if c in d.columns]

    # One catalog row per draft: a repeated draft_id would duplicate every
    # sale of that draft in the merge and skew the averages.
    d = d[keep_cols].drop_duplicates(subset=["draft_id"])
    m = p.merge(d, on="draft_id", how="left")

    # auction only
    if "type" in m.columns:
        m = m[m["type"].astype(str).str.lower() == "auction"].copy()

    # clean
    m = m[m["start_month"].notna()].copy()

    # match ADP TS class restriction
    if "dynasty_class" in m.columns:
        m = m[m["dynasty_class"].isin(list(KEEP_DYNASTY_CLASSES))].copy()

    out = (
        m.groupby(
            ["season", "start_month", "player_id", "dynasty_class", "md_scoring_type", "st_teams", "st_rounds", "is_superflex"],
            dropna=False  # keep groups whose format columns are missing
        )
        .agg(
            drafts=("draft_id", "nunique"),
            sales=("amount", "size"),
            avg_price=("amount", "mean"),
            med_price=("amount", "median"),
            min_price=("amount", "min"),
            max_price=("amount", "max"),
        )
        .reset_index()
    )

    for c in ["avg_price", "med_price", "min_price", "max_price"]:
        out[c] = pd.to_numeric(out[c], errors="coerce").round(2)

    return out


# -----------------------------
# MAIN LOOP (per season)
# -----------------------------
# Per-season results, concatenated into the *_ALL outputs after the loop.
all_adp_parts = []
all_draftcat_parts = []

# ✅ NEW collectors
all_auction_parts = []
all_auctioncat_parts = []

for season in SEASONS:
    season_tag = str(season)

    # 1) Discover leagues/users
    leagues_df, league_users_df = discover_leagues(season, SEED_USERS)
    if leagues_df.empty:
        print(f"[{season_tag}] No leagues discovered. Skipping season.")
        continue

    league_ids = leagues_df["league_id"].astype(str).unique().tolist()

    # 2) Drafts
    drafts_df = fetch_drafts_for_leagues(league_ids, season)
    if drafts_df.empty:
        print(f"[{season_tag}] No drafts found. Skipping season.")
        continue

    # 3) Draft catalog (safe start_time -> start_dt/start_month)
    draft_catalog = build_draft_catalog(drafts_df)

    # Save draft catalog snapshot for the season
    cat_out_dir = os.path.join(DIR_SNAP_DRAFT_CAT, f"season={season}")
    os.makedirs(cat_out_dir, exist_ok=True)
    draft_catalog.to_parquet(os.path.join(cat_out_dir, "draft_catalog.parquet"), index=False)

    # Collect the catalog as soon as it is written. Doing this only after the
    # picks check left seasons without picks out of the combined catalog,
    # although their per-season file (and the auction catalog) existed.
    all_draftcat_parts.append(draft_catalog)

    # ✅ 3b) auction draft catalog snapshot
    auction_catalog = build_auction_draft_catalog(draft_catalog)
    if not auction_catalog.empty:
        auc_cat_out_dir = os.path.join(DIR_SNAP_AUCTION_CAT, f"season={season}")
        os.makedirs(auc_cat_out_dir, exist_ok=True)
        auction_catalog.to_parquet(os.path.join(auc_cat_out_dir, "auction_draft_catalog.parquet"), index=False)
        print(f"[{season_tag}] auction_catalog={auction_catalog.shape}")
        all_auctioncat_parts.append(auction_catalog)

    # 4) Picks (completed drafts only)  (now includes md_amount when present)
    picks_df = fetch_picks_for_completed_drafts(draft_catalog, season)
    if picks_df.empty:
        print(f"[{season_tag}] No picks found. Skipping ADP/Auction TS.")
        continue

    # 5) ADP time series by month
    adp_ts = compute_adp_time_series(picks_df, draft_catalog)

    out_dir = os.path.join(DIR_SNAP_ADP_TS, f"season={season}")
    os.makedirs(out_dir, exist_ok=True)
    adp_ts.to_parquet(os.path.join(out_dir, "adp_time_series.parquet"), index=False)

    print(f"[{season_tag}] adp_ts={adp_ts.shape}")
    print(f"[OK] wrote season={season} ADP -> {out_dir}")

    all_adp_parts.append(adp_ts)

    # ✅ 5b) Auction price time series by month
    auction_ts = compute_auction_price_time_series(picks_df, draft_catalog)
    if not auction_ts.empty:
        out_dir_auc = os.path.join(DIR_SNAP_AUCTION_TS, f"season={season}")
        os.makedirs(out_dir_auc, exist_ok=True)
        auction_ts.to_parquet(os.path.join(out_dir_auc, "auction_price_series.parquet"), index=False)
        print(f"[{season_tag}] auction_ts={auction_ts.shape}")
        print(f"[OK] wrote season={season} AUCTION -> {out_dir_auc}")
        all_auction_parts.append(auction_ts)
    else:
        print(f"[{season_tag}] auction_ts is empty (no completed auction drafts or no md_amount found).")


# -----------------------------
# combined outputs across seasons
# -----------------------------
# Each combined file is written only when at least one season contributed a
# part; the *_ALL parquet sits next to the per-season "season=<year>" folders.

# Combined ADP time series across all processed seasons.
if all_adp_parts:
    adp_all = pd.concat(all_adp_parts, ignore_index=True)
    adp_all_path = os.path.join(DIR_SNAP_ADP_TS, "adp_time_series_ALL.parquet")
    adp_all.to_parquet(adp_all_path, index=False)
    print(f"[OK] wrote combined ADP TS -> {adp_all_path} | shape={adp_all.shape}")

# Combined draft catalog across all processed seasons.
if all_draftcat_parts:
    dc_all = pd.concat(all_draftcat_parts, ignore_index=True)
    dc_all_path = os.path.join(DIR_SNAP_DRAFT_CAT, "draft_catalog_ALL.parquet")
    dc_all.to_parquet(dc_all_path, index=False)
    print(f"[OK] wrote combined draft catalog -> {dc_all_path} | shape={dc_all.shape}")

# ✅ NEW: combined auction outputs
# Combined auction price time series.
if all_auction_parts:
    auc_all = pd.concat(all_auction_parts, ignore_index=True)
    auc_all_path = os.path.join(DIR_SNAP_AUCTION_TS, "auction_price_series_ALL.parquet")
    auc_all.to_parquet(auc_all_path, index=False)
    print(f"[OK] wrote combined auction TS -> {auc_all_path} | shape={auc_all.shape}")

# Combined auction-only draft catalog.
if all_auctioncat_parts:
    ac_all = pd.concat(all_auctioncat_parts, ignore_index=True)
    ac_all_path = os.path.join(DIR_SNAP_AUCTION_CAT, "auction_draft_catalog_ALL.parquet")
    ac_all.to_parquet(ac_all_path, index=False)
    print(f"[OK] wrote combined auction catalog -> {ac_all_path} | shape={ac_all.shape}")



=== [2026] DISCOVERY STEP 0 | users=5 ===


[2026] leagues chunk 1 (5): 100%|████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 18.11it/s]


[2026] Leagues fetched=83 | new=83


[2026] league users chunk 1 (83): 100%|███████████████████████████████████████████████| 83/83 [00:00<00:00, 176.30it/s]


[2026] Next frontier users=720 | total users seen=725

=== [2026] DISCOVERY STEP 1 | users=720 ===


[2026] leagues chunk 1 (400): 100%|█████████████████████████████████████████████████| 400/400 [00:01<00:00, 204.59it/s]
[2026] leagues chunk 2 (320): 100%|█████████████████████████████████████████████████| 320/320 [00:01<00:00, 265.19it/s]


[2026] Leagues fetched=6,077 | new=5,994


[2026] league users chunk 1 (400): 100%|█████████████████████████████████████████████| 400/400 [00:06<00:00, 66.60it/s]
[2026] league users chunk 2 (400): 100%|████████████████████████████████████████████| 400/400 [00:00<00:00, 408.16it/s]
[2026] league users chunk 3 (400): 100%|████████████████████████████████████████████| 400/400 [00:02<00:00, 179.21it/s]
