# Clean and Extend `getlivebench_data` Notebook

This notebook fetches and cleans LLM benchmark data for **per-model** and **per-task** analysis.  
It uses the Artificial Analysis API (requires `AA_API_KEY`) and outputs:

- `out/by_models.csv` – consolidated table (one row per model).

### What’s improved
- Adds **speed** & **latency** fields (e.g. `median_output_tokens_per_second`, `median_time_to_first_token_seconds`).
- Adds **context window** (searches `context_window_tokens`, `context_window`, `max_context_tokens`, `max_input_tokens`, in that order).
- **LIVEBENCH**: prefers **fine-grained subtask columns** and **drops aggregate indices** (Math/Coding/Intelligence).
- Cleans up incorrect price columns by parsing values robustly as numbers and normalizing them to **per‑1K tokens** when possible.
- Removes columns with **all missing values**.


## 1) Prereqs & Config

Set `AA_API_KEY` in your environment. You can do this in the notebook session:
```python
import os
os.environ["AA_API_KEY"] = "YOUR_KEY_HERE"
```


In [None]:
# Imports & constants
import os, math, re, json, pathlib, unicodedata
from typing import Any, Dict, List, Tuple

import pandas as pd
import requests

# Root URL of the Artificial Analysis site; the API path below is appended to it.
BASE = "https://artificialanalysis.ai"
# Endpoint queried by fetch_models() for the model list.
API  = f"{BASE}/api/v2/data/llms/models"  # documented free API endpoint

# Directory (relative to the working directory) that receives the CSV output.
OUT_DIR = pathlib.Path("out")

def to_float(x):
    """Best-effort conversion of *x* to a float; return ``None`` when missing.

    Accepts numbers and strings.  Strings are stripped of surrounding
    whitespace, ``$`` signs and thousands separators (``,``) before parsing.

    Returns ``None`` (never raises) when:
      * *x* is ``None``;
      * the cleaned string is empty or spells "none"/"nan" in any letter case;
      * the value cannot be parsed or is too large for a float;
      * the parsed value is NaN, so a float NaN is treated the same way as
        the string "nan" instead of leaking through as ``nan``.
    """
    if x is None:
        return None
    try:
        if isinstance(x, (int, float)):
            value = float(x)
        else:
            # Strip common currency / grouping symbols before parsing.
            s = str(x).strip().replace("$", "").replace(",", "")
            if s.lower() in {"", "none", "nan"}:
                return None
            value = float(s)
    except (TypeError, ValueError, OverflowError):
        # Unparseable text, unsupported type, or an int too large for float.
        return None
    # NaN means "missing" regardless of how it was spelled in the input.
    return None if math.isnan(value) else value

def first_not_none(*vals):
    """Return the first positional argument that is not ``None``.

    Falsy values such as ``0`` or ``""`` count as present.  Returns ``None``
    when called with no arguments or when every argument is ``None``.
    """
    return next((candidate for candidate in vals if candidate is not None), None)

def slugify(text: str) -> str:
    """Turn *text* into a lowercase, dash-separated ASCII slug.

    Steps: decompose accented characters and drop anything non-ASCII, remove
    every character other than letters, digits, ``-``, ``_``, ``.`` and space,
    trim, lowercase, replace spaces with dashes, and collapse runs of dashes.
    Returns ``"model"`` when nothing is left.
    """
    decomposed = unicodedata.normalize('NFKD', text)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')
    allowed_only = re.sub(r'[^a-zA-Z0-9\-_. ]+', '', ascii_only)
    dashed = allowed_only.strip().lower().replace(' ', '-')
    slug = re.sub(r'-{2,}', '-', dashed)
    if not slug:
        return "model"
    return slug

def fetch_models() -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Download the model list from the Artificial Analysis API.

    Reads the API key from the ``AA_API_KEY`` environment variable and sends
    it in the ``x-api-key`` header of a GET request to ``API`` (60 s timeout).

    Returns:
        ``(data, meta)`` where ``data`` is the payload's ``"data"`` entry
        (``[]`` if absent) and ``meta`` holds every other top-level field.

    Raises:
        SystemExit: if ``AA_API_KEY`` is unset or empty.
        requests.HTTPError: if the response status indicates an error.
    """
    api_key = os.getenv("AA_API_KEY")
    if not api_key:
        raise SystemExit("Set AA_API_KEY (required by Artificial Analysis free API).")

    response = requests.get(API, headers={"x-api-key": api_key}, timeout=60)
    response.raise_for_status()

    body = response.json()
    models = body.get("data", [])
    # Everything besides the model list is passed back as metadata.
    extras = {field: value for field, value in body.items() if field != "data"}
    return models, extras

# Evaluation keys treated as roll-up ("aggregate index") scores.  They are
# skipped when evaluations are flattened so that only subtask columns remain.
AGGREGATE_KEYS = {
    # LiveBench roll-ups
    "livebench_aggregate",
    "livebench_overall",
    # Composite indices under the short `aa_` prefix
    "aa_coding_index",
    "aa_intelligence_index",
    "aa_math_index",
    # Composite indices under the long `artificial_analysis_` prefix
    "artificial_analysis_coding_index",
    "artificial_analysis_intelligence_index",
    "artificial_analysis_math_index",
}


## 2) Fetch data

In [None]:
# Pull the model list plus the remaining top-level response fields.
raw_models, meta = fetch_models()
# Cell output: number of models and the names of the metadata fields.
len(raw_models), list(meta)


(269, ['status', 'prompt_options'])

## 3) Normalize & Flatten

- Extract core identity & provider fields.
- Normalize **pricing** and compute per‑1K estimates when possible.
- Retain **speed/latency** and **context window**.
- Expand **evaluation** tasks into flat columns (drop aggregates).


In [None]:
def _normalize_per_1k(price):
    """Scale a price whose unit is unknown to a per-1K-token estimate.

    Heuristic: a value >= 0.5 is assumed to be quoted per 1M tokens and is
    divided by 1000; a smaller value is assumed to already be per 1K.  This
    misclassifies per-1M prices below 0.5, so it is only applied to fields
    whose name does not state the unit.  ``None`` is passed through.
    """
    if price is None:
        return None
    return price / 1000.0 if price >= 0.5 else price


def _price_per_1k(pricing, per_1m_key, *ambiguous_keys):
    """Return a per-1K-token price read from the *pricing* dict, or ``None``.

    *per_1m_key* names a field that is explicitly per 1M tokens and is always
    divided by 1000.  Otherwise the first non-null of *ambiguous_keys* is used
    and scaled with the `_normalize_per_1k` heuristic.
    """
    per_1m = to_float(pricing.get(per_1m_key))
    if per_1m is not None:
        return per_1m / 1000.0
    raw = first_not_none(*(pricing.get(key) for key in ambiguous_keys))
    return _normalize_per_1k(to_float(raw))


def _flatten_evaluations(evals):
    """Map each non-aggregate evaluation to a float score (or ``None``).

    A value may be a dict such as ``{"score": 0.55, ...}`` or a bare number.
    Keys listed in ``AGGREGATE_KEYS`` are skipped.  Anything that is not a
    dict yields an empty mapping.
    """
    flat = {}
    if not isinstance(evals, dict):
        return flat
    for task, val in evals.items():
        if task in AGGREGATE_KEYS:
            continue
        score = val.get("score") if isinstance(val, dict) else val
        flat[str(task)] = to_float(score)
    return flat


records = []

for m in raw_models:
    if not isinstance(m, dict):
        # A malformed entry cannot be flattened; skip it rather than abort.
        continue

    mid = m.get("id") or m.get("model_id") or m.get("uid")
    name = m.get("name") or m.get("model_name")
    # NOTE(review): `model_creator` is the field name I recall from the
    # Artificial Analysis API docs -- confirm against a live response.
    prov_obj = m.get("provider") or m.get("organization") or m.get("model_creator")
    prov = prov_obj.get("name") if isinstance(prov_obj, dict) else prov_obj
    family = m.get("family") or m.get("series")

    # Speed & latency (various possible keys)
    tps = first_not_none(
        m.get("median_output_tokens_per_second"),
        m.get("output_tokens_per_second"),
        m.get("median_tokens_per_second"),
        m.get("tokens_per_second"),
    )
    ttf = first_not_none(
        m.get("median_time_to_first_token_seconds"),
        m.get("median_ttfb_seconds"),
        m.get("time_to_first_token_seconds"),
        m.get("ttft_seconds"),
    )
    tta = first_not_none(
        m.get("median_time_to_first_answer_token"),
        m.get("time_to_first_answer_token_seconds"),
        m.get("ttfa_seconds"),
    )

    # Context window
    ctx = first_not_none(
        m.get("context_window_tokens"),
        m.get("context_window"),
        m.get("max_context_tokens"),
        m.get("max_input_tokens"),
    )

    # Pricing, normalized to per-1K tokens.
    # NOTE(review): the `price_1m_*_tokens` names are the per-1M fields I
    # recall from the Artificial Analysis API docs -- confirm against a live
    # response.  The older generic keys are kept as a fallback.
    pricing = m.get("pricing") or m.get("prices") or {}
    if isinstance(pricing, dict):
        price_in_per1k = _price_per_1k(
            pricing, "price_1m_input_tokens", "input", "prompt"
        )
        price_out_per1k = _price_per_1k(
            pricing, "price_1m_output_tokens", "output", "completion"
        )
    else:
        price_in_per1k = price_out_per1k = None

    base = {
        "model_id": mid,
        "model_name": name,
        "provider": prov,
        "family": family,
        "median_output_tokens_per_second": to_float(tps),
        "median_time_to_first_token_seconds": to_float(ttf),
        "median_time_to_first_answer_token": to_float(tta),
        "context_window_tokens": to_float(ctx),
        "price_input_per_1k": price_in_per1k,
        "price_output_per_1k": price_out_per1k,
    }

    # Evaluations -> flatten as columns (aggregates dropped)
    flat = _flatten_evaluations(m.get("evaluations") or m.get("evals") or {})

    records.append({**base, **flat})

df = pd.DataFrame.from_records(records)

# Drop columns that are entirely missing
df = df.dropna(axis=1, how="all")

# Ensure numeric type for all evaluation columns (leave id/name/provider as is)
non_eval_cols = {"model_id", "model_name", "provider", "family"}
for c in df.columns:
    if c not in non_eval_cols:
        df[c] = pd.to_numeric(df[c], errors="coerce")
print(df)


# Sort models by a reasonable default (tokens/sec desc, then context desc).
# Either column may have been removed above for being all-missing, so sort
# only by the ones that survived instead of raising KeyError.
sort_cols = [
    c
    for c in ("median_output_tokens_per_second", "context_window_tokens")
    if c in df.columns
]
if sort_cols:
    df = df.sort_values(by=sort_cols, ascending=False)
df = df.reset_index(drop=True)

df.head(3)


## 4) Save consolidated CSV

In [None]:
# Create the output directory (and any missing parents); no error if it exists.
OUT_DIR.mkdir(parents=True, exist_ok=True)
by_models_path = OUT_DIR / "by_models.csv"
# Write one row per model; the DataFrame index is not written as a column.
df.to_csv(by_models_path, index=False)
# Report the absolute output path and the size of the table that was saved.
print(f"Saved: {by_models_path.resolve()}")
print(f"Columns: {len(df.columns)}  |  Models: {len(df)}")
