In [1]:
from typing import Literal, Optional
from urllib.parse import parse_qs, urlparse
from huggingface_hub.utils import get_session, hf_raise_for_status
from tqdm import tqdm
from dotenv import load_dotenv
import aiohttp
import asyncio
import pandas as pd
import os

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the local ".env" file, then read the Hugging Face access token from the
# process environment. HF_TOKEN is None when the variable is not defined.
load_dotenv(".env")
HF_TOKEN = os.environ.get("HF_TOKEN")

In [3]:
# Number of models the listing loop aims to collect.
N_TOTAL_MODELS = 20_000
# Page size requested from the listing endpoint on each call.
BATCH_SIZE_LIST_MODELS = 100
# Number of per-model detail requests issued concurrently in one batch.
BATCH_SIZE_FETCH_MODEL_INFO = 50
# Fallback wait, in seconds, after an HTTP 429 that carries no Retry-After header.
TIME_OUT = 60

In [4]:
class ModelIterator:
    """One page of listed models together with the cursor of the following page."""

    def __init__(self, items: list[dict], next_cursor: Optional[str] = None):
        # `next_cursor` stays None when the caller supplies no cursor.
        self.next_cursor = next_cursor
        # Entries of this page, kept exactly as passed in.
        self.items = items

    def __iter__(self):
        """Yield the entries of this page one at a time."""
        for entry in self.items:
            yield entry


def list_models_with_cursor(
    *,
    limit: int = 100,
    direction: Optional[Literal[-1]] = None,
    cursor: Optional[str] = None,
) -> ModelIterator:
    """
    Fetch one page of models from the Hub listing endpoint (cursor pagination).

    Args:
        limit: Value sent as the `limit` query parameter.
        direction: Value sent as the `direction` query parameter (None by default).
        cursor: Value sent as the `cursor` query parameter; pass the `next_cursor`
            of a previous page to continue from it (None by default).

    Returns:
        ModelIterator wrapping the decoded JSON body; its `next_cursor` is parsed
        from the response's "next" link and is None when there is no such link.

    Raises:
        Whatever `hf_raise_for_status` raises for an unsuccessful response,
        including a 429 that persists after the single retry.
    """
    # Kept as a function-local import, hoisted out of the 429 branch.
    import time

    url = "https://huggingface.co/api/models"
    params = {
        "limit": limit,
        "direction": direction,
        "cursor": cursor,
        "full": False,
        "cardData": False,
        "fetch_config": False,
    }

    # Only authenticate when a token is configured; otherwise the header would
    # literally read "Bearer None".
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

    response = get_session().get(url, params=params, headers=headers)

    # On 429 (Too Many Requests), wait once and retry once.
    if response.status_code == 429:
        try:
            retry_after = int(response.headers.get("Retry-After", TIME_OUT))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back to the default wait.
            retry_after = TIME_OUT
        # time.sleep rejects negative values.
        retry_after = max(retry_after, 0)
        print(f"Rate limit exceeded. Waiting for {retry_after} seconds...")
        time.sleep(retry_after)
        response = get_session().get(url, params=params, headers=headers)

    hf_raise_for_status(response)

    next_url = response.links.get("next", {}).get("url")
    next_cursor = None
    if next_url:
        next_cursor = parse_qs(urlparse(next_url).query)["cursor"][0]

    return ModelIterator(response.json(), next_cursor)

In [5]:
async def fetch_model_info(session, model_name: str) -> dict:
    """
    Fetch the info payload of a single model from the Hub API.

    Args:
        session: HTTP client session whose `get(url, headers=...)` is used as an
            async context manager (an `aiohttp.ClientSession` in this notebook).
        model_name: Model identifier appended to the models endpoint URL.

    Returns:
        The decoded JSON body. The HTTP status is not checked beyond 429, so the
        body of an error response is returned as-is.
    """
    url = f"https://huggingface.co/api/models/{model_name}"
    # Only authenticate when a token is configured; otherwise the header would
    # literally read "Bearer None".
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

    async with session.get(url, headers=headers) as response:
        if response.status != 429:
            return await response.json()
        try:
            retry_after = int(response.headers.get("Retry-After", TIME_OUT))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back to the default wait.
            retry_after = TIME_OUT

    # The rate-limited response is released before sleeping, so its connection
    # is not held for the whole wait. Then retry exactly once.
    await asyncio.sleep(max(retry_after, 0))
    async with session.get(url, headers=headers) as retry_response:
        return await retry_response.json()


async def fetch_model_info_batch_async(model_names: list[str]) -> list[dict]:
    """
    Fetch the info payloads of several models concurrently over one shared session.

    Args:
        model_names: Model identifiers to look up.

    Returns:
        The results of `asyncio.gather` over one `fetch_model_info` call per name.
    """
    async with aiohttp.ClientSession() as http:
        pending = (fetch_model_info(http, name) for name in model_names)
        return await asyncio.gather(*pending)

In [6]:
# Pipeline tags grouped by the coarse model category they are reported under.
_PIPELINE_TAGS_BY_CATEGORY = {
    "Multimodal": (
        "audio-text-to-text",
        "image-text-to-text",
        "visual-question-answering",
        "document-question-answering",
        "video-text-to-text",
        "visual-document-retrieval",
        "any-to-any",
    ),
    "Computer Vision": (
        "depth-estimation",
        "image-classification",
        "object-detection",
        "image-segmentation",
        "text-to-image",
        "image-to-text",
        "image-to-image",
        "image-to-video",
        "unconditional-image-generation",
        "video-classification",
        "text-to-video",
        "zero-shot-image-classification",
        "mask-generation",
        "zero-shot-object-detection",
        "text-to-3d",
        "image-to-3d",
        "image-feature-extraction",
        "keypoint-detection",
    ),
    "Natural Language Processing": (
        "text-classification",
        "token-classification",
        "table-question-answering",
        "question-answering",
        "zero-shot-classification",
        "translation",
        "summarization",
        "feature-extraction",
        "text-generation",
        "text2text-generation",
        "fill-mask",
        "sentence-similarity",
    ),
    "Audio": (
        "text-to-speech",
        "text-to-audio",
        "automatic-speech-recognition",
        "audio-to-audio",
        "audio-classification",
        "voice-activity-detection",
    ),
    "Tabular": (
        "tabular-classification",
        "tabular-regression",
        "time-series-forecasting",
    ),
    "Reinforcement Learning": (
        "reinforcement-learning",
        "robotics",
    ),
    "Other": ("graph-machine-learning",),
}

# Flat lookup from pipeline tag to model category, in the same order as the groups.
pipeline_to_model_type = {
    tag: category
    for category, tags in _PIPELINE_TAGS_BY_CATEGORY.items()
    for tag in tags
}


def extract_model_features(model_name: str, model_info: dict) -> dict:
    """
    Flatten one model's information dictionary into a feature row.

    Args:
        model_name (str): Identifier stored unchanged under "model_name".
        model_info (dict): Dictionary containing model information; every key
            read here is optional.

    Returns:
        dict: Row with the keys "model_name", "model_size", "downloads", "likes",
        "task_category", "model_category", "creators" and "publishing_date",
        in that order. Missing values are None, except "downloads" and "likes",
        which default to 0.
    """
    features = {
        "model_name": model_name,
    }

    # Model size: the raw `safetensors["total"]` value, stored without any unit
    # conversion (presumably the total parameter count; confirm against the Hub
    # API docs). A missing, null or non-dict "safetensors" entry yields None
    # instead of raising.
    safetensors = model_info.get("safetensors")
    features["model_size"] = (
        safetensors.get("total") if isinstance(safetensors, dict) else None
    )

    # Number of downloads
    features["downloads"] = model_info.get("downloads", 0)

    # Number of likes
    features["likes"] = model_info.get("likes", 0)

    # Task category (pipeline_tag)
    features["task_category"] = model_info.get("pipeline_tag", None)

    # Model category, looked up from the task category; None for unmapped tags
    features["model_category"] = pipeline_to_model_type.get(
        features["task_category"], None
    )

    # Model creators (author)
    features["creators"] = model_info.get("author", None)

    # Date of publishing (createdAt)
    features["publishing_date"] = model_info.get("createdAt", None)

    return features

In [7]:
# Walk the listing endpoint page by page and collect model ids, stopping after
# the configured number of pages or as soon as no further page is reported.
prev_cursor = None
model_names = []
for page_index in tqdm(range(N_TOTAL_MODELS // BATCH_SIZE_LIST_MODELS)):
    response = list_models_with_cursor(limit=BATCH_SIZE_LIST_MODELS, cursor=prev_cursor)
    model_names.extend(model["modelId"] for model in response.items)
    prev_cursor = response.next_cursor
    if prev_cursor is None:
        # No "next" link: another call with cursor=None would repeat the initial
        # request and append duplicate names.
        break

100%|██████████| 200/200 [01:24<00:00,  2.37it/s]


In [8]:
models_info = []

for i in tqdm(range(N_TOTAL_MODELS // BATCH_SIZE_FETCH_MODEL_INFO)):
    res = await fetch_model_info_batch_async(
        model_names[
            i * BATCH_SIZE_FETCH_MODEL_INFO : (i + 1) * BATCH_SIZE_FETCH_MODEL_INFO
        ]
    )
    models_info.extend(res)

100%|██████████| 400/400 [09:49<00:00,  1.47s/it]


In [20]:
# Build one feature row per (name, payload) pair, then tabulate the rows.
# Pairing stops at the shorter of the two lists.
model_features = list(map(extract_model_features, model_names, models_info))
df = pd.DataFrame(data=model_features)

In [32]:
# Create the output directory first; to_csv does not create missing directories.
os.makedirs("./data", exist_ok=True)
df.to_csv("./data/models_info.csv", index=False)