# In [None]:
import os
import csv
import re, time, random
from typing import Optional, Iterable, List
from datetime import datetime

from huggingface_hub import HfApi, model_info
from huggingface_hub.utils import HfHubHTTPError

# Optional on some versions:
try:
    from huggingface_hub import ModelFilter  # type: ignore
except Exception:
    ModelFilter = None  # fallback if not available

# ------------------ Config ------------------
AUTHORS: List[str] = [
    # put users or orgs here (exact owner names on HF)
    "meta-llama",
    "Qwen",
    "microsoft",
    "deepseek-ai",
    "google",
    "mistralai",
    "xai-org",
]
OUT_CSV = "hf_models_by_author.csv"
# Read the access token from the environment (export HF_TOKEN=...) instead of
# embedding a secret in source. An unset or empty variable yields None.
# NOTE(review): a literal token used to be committed on this line; treat it as
# leaked and revoke it on the Hub.
HF_TOKEN: Optional[str] = os.environ.get("HF_TOKEN") or None

# Shared Hub client, built once with HF_TOKEN and later handed to
# list_models_by_owner() by the main loop.
api = HfApi(token=HF_TOKEN)

# ---------------- Utilities -----------------
def iso(dt) -> str:
    """Render a timestamp-like value as text.

    Strings are returned unchanged and ``None`` becomes an empty string.
    Any other value is formatted through its ``isoformat()`` method; if that
    call fails for any reason, ``str(dt)`` is used instead.
    """
    if isinstance(dt, str):
        return dt
    if dt is None:
        return ""
    try:
        rendered = dt.isoformat()
    except Exception:
        # Not a date-like object (or isoformat misbehaved): plain str() fallback.
        rendered = str(dt)
    return rendered

def parse_iso(dt) -> Optional[datetime]:
    """Coerce a value into a ``datetime``, or ``None`` when that is not possible.

    ``datetime`` instances are returned as-is; ``None`` and the empty string
    map to ``None``. Everything else is stringified, every ``"Z"`` is replaced
    by ``"+00:00"``, and the result is handed to ``datetime.fromisoformat``.
    Any failure during that conversion yields ``None``.
    """
    if isinstance(dt, datetime):
        return dt
    if dt is None or dt == "":
        return None
    try:
        normalized = str(dt).replace("Z", "+00:00")
        parsed = datetime.fromisoformat(normalized)
    except Exception:
        parsed = None
    return parsed

def _status_code(exc) -> Optional[int]:
    """Return ``exc.response.status_code`` if it can be read, else ``None``.

    Missing ``response`` or ``status_code`` attributes, as well as any error
    raised while reading them, all result in ``None``.
    """
    try:
        response = getattr(exc, "response", None)
        return getattr(response, "status_code", None)
    except Exception:
        return None

def _retry_after_seconds(exc) -> Optional[float]:
    """Return the wait requested by the response's ``Retry-After`` header.

    The header value may be either a number of seconds or an HTTP-date; both
    forms are converted to "seconds from now".

    Args:
        exc: Exception that may expose ``response.headers``.

    Returns:
        A finite, non-negative number of seconds, or ``None`` when the header
        is missing, empty, or cannot be interpreted. Never raises.
    """
    # Local imports keep the helper self-contained (no new file-level imports).
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    try:
        hdrs = getattr(getattr(exc, "response", None), "headers", {}) or {}
        ra = hdrs.get("Retry-After")
    except Exception:
        return None
    if ra in (None, ""):
        return None

    # Form 1: delay-seconds.
    try:
        seconds = float(ra)
    except (TypeError, ValueError):
        seconds = None
    if seconds is not None:
        # Reject negative, NaN and infinite values: time.sleep() cannot use
        # them, so let the caller fall back to its other strategies.
        return seconds if 0 <= seconds < float("inf") else None

    # Form 2: HTTP-date, converted to a delay relative to the current time.
    try:
        when = parsedate_to_datetime(str(ra).strip())
        if when is None:
            return None
        if when.tzinfo is None:
            # No usable zone information in the header: interpret as UTC.
            when = when.replace(tzinfo=timezone.utc)
        delta = (when - datetime.now(timezone.utc)).total_seconds()
    except (TypeError, ValueError, IndexError, OverflowError):
        return None
    # A date in the past means "retry now", not a negative sleep.
    return max(0.0, delta)

def _parse_rate_limit(headers: dict):
    """Extract the ``r`` and ``t`` parameters from a ``RateLimit`` header.

    The header value is scanned for ``r=<digits>`` and ``t=<digits>``, as in
    ``"api";r=0;t=55``. Parameter names are matched as whole names, so longer
    keys that merely end in ``r`` or ``t`` (e.g. ``limit=100``) are ignored.

    Args:
        headers: Mapping of response headers; ``None`` or empty is accepted.

    Returns:
        A ``(remaining, reset)`` tuple holding the integer values of ``r`` and
        ``t`` respectively, with ``None`` for whichever is absent. Callers in
        this file treat ``reset`` as a number of seconds to wait.
    """
    if not headers:
        return None, None
    # Plain dicts are case-sensitive, so also try the lower-cased header name.
    rl = headers.get("RateLimit") or headers.get("ratelimit") or ""
    rem = reset = None
    # The lookbehind stops "r="/"t=" from matching inside another parameter name.
    m_r = re.search(r"(?<![\w-])r=(\d+)", rl)
    m_t = re.search(r"(?<![\w-])t=(\d+)", rl)
    if m_r:
        rem = int(m_r.group(1))
    if m_t:
        reset = int(m_t.group(1))
    return rem, reset

def _sleep_from_headers(exc, attempt: int, max_backoff: float = 60.0) -> float:
    """Pick how long to sleep before retrying a failed request.

    Sources are consulted in priority order:

    1. the ``Retry-After`` value from ``_retry_after_seconds`` (+ up to 0.5s jitter);
    2. the reset value parsed from the ``RateLimit`` header (+ up to 0.5s jitter);
    3. exponential backoff ``2 ** attempt`` capped at ``max_backoff``
       (+ up to 0.25s jitter).

    Args:
        exc: Exception that may expose ``response.headers``.
        attempt: Retry counter used as the backoff exponent.
        max_backoff: Cap applied to the exponential term only.

    Returns:
        Number of seconds to sleep.
    """
    retry_after = _retry_after_seconds(exc)
    if retry_after is not None:
        return retry_after + random.uniform(0, 0.5)

    response = getattr(exc, "response", None)
    response_headers = getattr(response, "headers", {}) or {}
    reset_in = _parse_rate_limit(response_headers)[1]
    if reset_in is not None:
        return float(reset_in) + random.uniform(0, 0.5)

    # No server hint available: fall back to capped exponential backoff.
    capped = min(max_backoff, (2 ** attempt))
    return capped + random.uniform(0, 0.25)

def safe_model_info(repo_id: str, token: Optional[str], max_retries: int = 6):
    """Call ``model_info`` for ``repo_id``, retrying transient HTTP failures.

    A failure is retried when its status code is 429, 502, 503 or 504 and
    fewer than ``max_retries`` retries have been made so far; the pause between
    tries comes from ``_sleep_from_headers``.

    Args:
        repo_id: Repository id passed to ``model_info``.
        token: Access token forwarded to ``model_info``.
        max_retries: Maximum number of retries after the initial call.

    Returns:
        Whatever ``model_info`` returns.

    Raises:
        HfHubHTTPError: Re-raised unchanged when the status is not retryable
            or the retry budget is exhausted.
    """
    retryable = (429, 502, 503, 504)
    attempt = 0
    while True:
        try:
            result = model_info(repo_id, token=token)
        except HfHubHTTPError as e:
            code = _status_code(e)
            out_of_budget = attempt >= max_retries
            if code not in retryable or out_of_budget:
                raise
            attempt += 1
            wait = _sleep_from_headers(e, attempt)
            print(f"[retry] model_info {repo_id} -> {code}; sleeping {wait:.1f}s (attempt {attempt}/{max_retries})")
            time.sleep(wait)
        else:
            return result

# --------- Author/Owner listing (no text search) ----------
def list_models_by_owner(api: HfApi, owner: str):
    """
    List an owner's models without text search, adapting to the client's API.

    Call signatures are tried in this order:

    1. ``api.list_models(owner=...)``
    2. ``api.list_models(author=...)``
    3. ``api.list_models(filter=ModelFilter(...))`` when ``ModelFilter`` could
       be imported (built with ``author=`` first, then ``owner=``).

    A signature counts as unsupported when the call itself raises
    ``TypeError``. Only that call is guarded: a ``TypeError`` raised later,
    while results are being consumed, propagates to the caller instead of
    silently restarting the listing with a different signature.

    Args:
        api: Hub client exposing ``list_models``.
        owner: User or organisation name to list models for.

    Yields:
        ModelInfo-like summaries (from list_models).

    Raises:
        RuntimeError: If none of the known signatures is accepted.
    """
    common = dict(full=True, sort="last_modified", direction=-1)

    # 1) / 2) Direct keyword filter, under whichever name this version accepts.
    for kwarg in ("owner", "author"):
        try:
            models = api.list_models(**{kwarg: owner}, **common)
        except TypeError:
            continue  # this hub version may not support that keyword
        yield from models
        return

    # 3) ModelFilter object, if this version still ships it.
    if ModelFilter is not None:
        filt = None
        for kwarg in ("author", "owner"):
            try:
                filt = ModelFilter(**{kwarg: owner})
            except TypeError:
                continue
            break
        if filt is not None:
            yield from api.list_models(filter=filt, **common)
            return

    # 4) A text-search fallback (search=f"author:{owner}") is deliberately not
    # used, to respect the "no text search" requirement.
    raise RuntimeError("This hf client version lacks an owner/author filter; please upgrade huggingface_hub.")

# ------------------ Main --------------------
FIELDNAMES = [
    "owner",            # user/org the listing was requested for (AUTHORS entry)
    "repo_id",          # e.g., owner/name
    "model_name",       # repo_id without the "owner/" prefix
    "pipeline_tag",
    "created_at",
    "last_modified",
    "downloads",
    "private",          # if visible in summary
    "sha",              # last commit sha if present
]

# Statuses worth retrying when the listing itself fails (same set that
# safe_model_info retries on).
_RETRYABLE_LIST_STATUS = (429, 502, 503, 504)
# Consecutive listing failures tolerated per owner before the error is raised.
_MAX_LIST_RETRIES = 6


def _first_attr(sources, names, default=None):
    """Return the first truthy attribute among ``names`` found on ``sources``.

    Objects are checked in order, and for each object the names are checked in
    order; ``default`` is returned when nothing truthy is found.
    """
    for obj in sources:
        for name in names:
            value = getattr(obj, name, None)
            if value:
                return value
    return default


written = 0
with open(OUT_CSV, "w", encoding="utf-8", newline="") as f:
    w = csv.DictWriter(f, fieldnames=FIELDNAMES)
    w.writeheader()

    for owner in AUTHORS:
        seen = set()   # repo ids already written; makes a restarted listing idempotent
        attempt = 0    # consecutive listing failures since the last written row
        print(f"== Owner: {owner} ==")
        while True:
            try:
                for summary in list_models_by_owner(api, owner):
                    # Accept both the legacy ("modelId") and current ("id") attribute names.
                    repo_id = _first_attr((summary,), ("modelId", "id"))
                    if not repo_id or repo_id in seen:
                        continue
                    seen.add(repo_id)

                    # enrich with model_info (optional but useful)
                    try:
                        info = safe_model_info(repo_id, token=HF_TOKEN)
                    except HfHubHTTPError:
                        # fallback: write what we have from summary
                        info = summary

                    # parse fields, preferring the detailed info over the summary
                    both = (info, summary)
                    model_name = repo_id.split("/", 1)[1] if "/" in repo_id else repo_id
                    pipeline = _first_attr(both, ("pipeline_tag",), "")
                    created_at = _first_attr(both, ("created_at",))
                    last_modified = _first_attr(both, ("lastModified", "last_modified"))
                    downloads = _first_attr(both, ("downloads",), 0)
                    private = bool(getattr(summary, "private", False))
                    sha = _first_attr((summary, info), ("sha",), "")

                    w.writerow({
                        "owner": owner,
                        "repo_id": repo_id,
                        "model_name": model_name,
                        "pipeline_tag": pipeline,
                        "created_at": iso(parse_iso(created_at)),
                        "last_modified": iso(parse_iso(last_modified)),
                        "downloads": downloads,
                        "private": private,
                        "sha": sha,
                    })
                    written += 1
                    attempt = 0  # progress was made; start the retry budget over
                break  # completed this owner
            except HfHubHTTPError as e:
                code = _status_code(e)
                # Bounded retry with growing backoff; the listing restarts from
                # the top and already-written repos are skipped via `seen`.
                if code in _RETRYABLE_LIST_STATUS and attempt < _MAX_LIST_RETRIES:
                    attempt += 1
                    wait = _sleep_from_headers(e, attempt=attempt)
                    print(f"[retry] list_models(owner={owner}) -> {code}; sleeping {wait:.1f}s")
                    time.sleep(wait)
                    continue
                raise

print(f"Done. Wrote {written} rows to {OUT_CSV}")
