#  LoL K-Means Tier List (Notebook)
This notebook bundles three workflows:
1) K-Means Diagnostics, 2) Single-Patch Tierlist, 3) All-Patches Tierlist.

##  Setup

In [1]:
import sys, subprocess, pkgutil
def _pip(pkg):
    if pkg not in {m.name for m in pkgutil.iter_modules()}:
        subprocess.run([sys.executable, "-m", "pip", "install", "-q", pkg], check=False)
for p in ["pandas","numpy","scikit-learn","matplotlib","python-dotenv"]:
    _pip(p)
print("‚úì Dependencies ready")

‚úì Dependencies ready


##  Environment setup (.env friendly)

In [7]:
# Repo-relative OUT_DIR: .../cs171_league_analysts_in_training/Datasets/riot_out
from pathlib import Path
import os, re

repo = Path.cwd()
while repo.name != "cs171_league_analysts_in_training" and repo.parent != repo:
    repo = repo.parent

DESIRED_OUT = str(repo / "Datasets" / "riot_out")
os.environ["OUT_DIR"] = DESIRED_OUT
Path(DESIRED_OUT).mkdir(parents=True, exist_ok=True)

envp = repo / ".env"
text = envp.read_text(encoding="utf-8") if envp.exists() else ""
if re.search(r"^OUT_DIR=", text, flags=re.M):
    text = re.sub(r"^OUT_DIR=.*$", f"OUT_DIR={DESIRED_OUT}", text, flags=re.M)
else:
    if text and not text.endswith("\n"): text += "\n"
    text += f"OUT_DIR={DESIRED_OUT}\n"
envp.write_text(text, encoding="utf-8")

print("‚úÖ OUT_DIR set to:", os.getenv("OUT_DIR"))
print("üíæ .env updated at:", envp.resolve())


‚úÖ OUT_DIR set to: /Users/brandonlee/Documents/GitHub/cs171_league_analysts_in_training/Datasets/riot_out
üíæ .env updated at: /Users/brandonlee/Documents/GitHub/cs171_league_analysts_in_training/.env


##  Paths (Launcher)

In [None]:
# Paths (Launcher) ‚Äî repo-robust
from pathlib import Path
import os

REPO_NAME = "cs171_league_analysts_in_training"

# Find the repo root by walking up from the current working dir
repo = Path.cwd()
while repo.name != REPO_NAME and repo.parent != repo:
    repo = repo.parent
if repo.name != REPO_NAME:
    # Fallback: assume current dir is the repo root
    print(f"‚ö†Ô∏è Could not find '{REPO_NAME}' above {Path.cwd()}; using CWD as repo root.")
    repo = Path.cwd()

# OUT_DIR inside Datasets/
OUT_DIR = repo / "Datasets" / "riot_out"
OUT_DIR.mkdir(parents=True, exist_ok=True)
os.environ["OUT_DIR"] = str(OUT_DIR)

# Combined CSV inside Datasets/
CSV_COMBINED = repo / "Datasets" / "champion_winrates_all_patches.csv"

print("OUT_DIR     :", OUT_DIR)
print("CSV_COMBINED:", CSV_COMBINED)


OUT_DIR     : /Users/brandonlee/Documents/GitHub/cs171_league_analysts_in_training/Datasets/riot_out
CSV_COMBINED: /Users/brandonlee/Documents/GitHub/cs171_league_analysts_in_training/Datasets/champion_winrates_all_patches.csv


## 1)  K-Means Diagnostics

In [6]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
kmeans_cluster_diagnostics.py
Outputs KMeans "loss" graphs for LoL champion data:
- Elbow curve (Sum of Square Errors) vs Number of clusters
- (Optional) Silhouette score vs K

Usage:
  python3 kmeans_cluster_diagnostics.py \
    --csv "/path/to/champion_winrates_all_patches.csv" \
    --patch 15.20 \
    --k-min 2 --k-max 10 --logit \
    --out-dir "/path/to/riot_out/plots"

  # All patches
  python3 kmeans_cluster_diagnostics.py --csv ... --each --k-min 2 --k-max 10

Notes:
- Features: win_rate, pick_rate, ban_rate (percent). Optional --logit transform.
- Always scales features with StandardScaler.
- Weights KMeans by sqrt(games) unless --no-weight is passed.
- Saves: elbow_sse_patch_<patch>.png  (and silhouette_patch_<patch>.png unless --no-sil is set)
"""

import argparse
import os
import re
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt


def logit_percent(p, eps=1e-4):
    p = np.clip(np.asarray(p, dtype=float) / 100.0, eps, 1 - eps)
    return np.log(p / (1 - p))


def canon_patch(p):
    s = str(p).strip()
    m = re.search(r'(\d+)\.(\d+)', s)
    return f"{int(m.group(1))}.{int(m.group(2))}" if m else None


def numeric_patch_key(p):
    return tuple(map(int, p.split(".")))


def prepare_features(df, use_logit):
    feats = df[["win_rate", "pick_rate", "ban_rate"]].to_numpy(dtype=float)
    if use_logit:
        feats = np.column_stack([
            logit_percent(df["win_rate"].values),
            logit_percent(df["pick_rate"].values),
            logit_percent(df["ban_rate"].values),
        ])
    X = StandardScaler().fit_transform(feats)
    return X


def run_for_patch(df, patch, k_min, k_max, use_logit, weight_by_games, out_dir: Path, save_sil=True, random_state=42):
    dpp = df[df["patch"] == patch].copy()
    if dpp.empty:
        print(f"[skip] patch {patch}: no rows after cleaning")
        return None

    # Prepare features
    X = prepare_features(dpp, use_logit=use_logit)
    n = len(dpp)
    if n < 3:
        print(f"[skip] patch {patch}: too few rows ({n})")
        return None

    # sample weights
    sample_weight = None
    if weight_by_games and "games" in dpp.columns:
        sample_weight = np.sqrt(np.clip(pd.to_numeric(dpp["games"], errors="coerce").fillna(1).values, 1, None))

    # Respect k bounds
    k_max_eff = max(k_min, min(k_max, n - 1))

    ks, inertias, sils = [], [], []
    for k in range(k_min, k_max_eff + 1):
        try:
            km = KMeans(n_clusters=k, n_init=20, random_state=random_state)
            km.fit(X, sample_weight=sample_weight)
            ks.append(k)
            inertias.append(km.inertia_)
            if save_sil and k >= 2 and k < n:
                sils.append(silhouette_score(X, km.labels_))
            elif save_sil:
                sils.append(np.nan)
            print(f"[{patch}] K={k:2d}  SSE={km.inertia_:,.1f}" + (f"  sil={sils[-1]: .4f}" if save_sil else ""))
        except Exception as e:
            print(f"[{patch}] K={k}: skipping ({e})")

    out_dir.mkdir(parents=True, exist_ok=True)

    # Elbow plot: match the example labels
    plt.figure(figsize=(10, 4))
    plt.plot(ks, inertias, marker="o")
    plt.xlabel("Number of clusters")
    plt.ylabel("Sum of Square Errors")
    plt.title(f"Elbow Curve ‚Äî Patch {patch}")
    elbow_png = out_dir / f"elbow_sse_patch_{patch}.png"
    plt.savefig(elbow_png, bbox_inches="tight")
    plt.close()
    print(f"[saved] {elbow_png}")

    # Optional silhouette
    if save_sil:
        valid = [(k, s) for k, s in zip(ks, sils) if not np.isnan(s)]
        if valid:
            kv, sv = zip(*valid)
            plt.figure(figsize=(10, 4))
            plt.plot(kv, sv, marker="o")
            plt.xlabel("Number of clusters")
            plt.ylabel("Silhouette Score")
            plt.title(f"Silhouette vs K ‚Äî Patch {patch}")
            sil_png = out_dir / f"silhouette_patch_{patch}.png"
            plt.savefig(sil_png, bbox_inches="tight")
            plt.close()
            print(f"[saved] {sil_png}")

    # CSV summary
    import csv
    diag_csv = out_dir / f"kmeans_diagnostics_patch_{patch}.csv"
    with diag_csv.open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["patch", "K", "SSE", "silhouette"])
        for i, k in enumerate(ks):
            sse = inertias[i]
            sil = sils[i] if save_sil and i < len(sils) else ""
            w.writerow([patch, k, sse, sil])
    print(f"[saved] {diag_csv}")

    return {"patch": patch, "K": ks, "SSE": inertias, "silhouette": sils if save_sil else None}


def main():
    ap = argparse.ArgumentParser(description="KMeans elbow (SSE) and silhouette diagnostics")
    ap.add_argument("--csv", required=True, help="Combined CSV (patch,win_rate,pick_rate,ban_rate,games)")
    ap.add_argument("--patch", help="Patch to analyze (e.g., 15.20). If omitted and --each not set, use latest numerically.")
    ap.add_argument("--each", action="store_true", help="Run for every patch separately")
    ap.add_argument("--k-min", type=int, default=2, help="Minimum K")
    ap.add_argument("--k-max", type=int, default=10, help="Maximum K")
    ap.add_argument("--logit", action="store_true", help="Apply logit transform to WR/PR/BR before scaling")
    ap.add_argument("--no-weight", action="store_true", help="Disable sqrt(games) sample weighting")
    ap.add_argument("--no-sil", action="store_true", help="Do not compute/save silhouette plot")
    ap.add_argument("--out-dir", default=os.path.expanduser("~/riot_out/plots"), help="Output directory")
    args = ap.parse_args()

    out_dir = Path(args.out_dir)

    # Read CSV as string to preserve "15.20"
    df = pd.read_csv(args.csv, dtype={"patch": str})
    print(f"[debug] loaded rows: {len(df)} from {args.csv}")

    # Normalize patch
    df["patch"] = df["patch"].map(canon_patch)

    # Ensure columns exist + numeric
    if "ban_rate" not in df.columns: df["ban_rate"] = 0.0
    if "games" not in df.columns: df["games"] = 1
    for c in ["win_rate", "pick_rate", "ban_rate", "games"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    df = df.dropna(subset=["patch","win_rate","pick_rate","ban_rate","games"]).copy()

    patches = sorted(df["patch"].unique(), key=numeric_patch_key)
    print("[debug] patches after cleaning:", patches)

    if args.each:
        for p in patches:
            sub = out_dir / f"patch_{p}"
            run_for_patch(df, p, args.k_min, args.k_max, args.logit, (not args.no_weight), sub, save_sil=(not args.no_sil))
    else:
        target = canon_patch(args.patch) if args.patch else patches[-1] if patches else None
        if not target:
            raise SystemExit("No patches after cleaning.")
        print("[debug] selecting patch:", target)
        run_for_patch(df, target, args.k_min, args.k_max, args.logit, (not args.no_weight), out_dir, save_sil=(not args.no_sil))


if __name__ == "__main__":
    main()

KeyboardInterrupt: 

In [None]:
# ‚ñ∂Ô∏è Diagnostics Launcher
import sys, runpy, os
CSV=CSV_COMBINED; OUT=OUT_DIR+"/diagnostics"; KMIN, KMAX = 2, 10; LOGIT=True; PATCH=None
argv = ["kmeans_cluster_diagnostics.py","--csv",CSV,"--out-dir",OUT,"--kmin",str(KMIN),"--kmax",str(KMAX)]
if LOGIT: argv.append("--logit")
if PATCH: argv += ["--patch", PATCH]
sys.argv = argv
runpy.run_path("/mnt/data/kmeans_cluster_diagnostics.py", run_name="__main__")

## 2)  Single-Patch K-Means Tierlist

In [None]:
#!/usr/bin/env python3
import argparse, os, subprocess
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import re


TIERS_5 = ["S","A","B","C","D"]
TIERS_6 = ["S","A","B","C","D","E"]
TIERS_7 = ["S","A","B","C","D","E","F"]

def logit_percent(p, eps=1e-4):
    # p is in % (0..100). Convert to logit in R.
    p = np.clip(p / 100.0, eps, 1 - eps)
    return np.log(p / (1 - p))

def choose_tier_labels(k):
    if k == 5: return TIERS_5
    if k == 6: return TIERS_6
    if k == 7: return TIERS_7
    # fallback: T1..Tk (T1 = best)
    return [f"T{i}" for i in range(1, k+1)]

def rank_clusters_by_center(centers_raw_pct, weights=(0.6, 0.3, 0.1)):
    """
    Rank clusters by a composite score computed in **raw % space** (WR, PR, BR).
    centers_raw_pct: array shape (k, 3) for [wr%, pr%, br%]
    """
    w_wr, w_pr, w_br = weights
    scores = w_wr*centers_raw_pct[:,0] + w_pr*centers_raw_pct[:,1] + w_br*centers_raw_pct[:,2]
    order = np.argsort(-scores)  # descending
    rank_of_cluster = np.empty_like(order)
    rank_of_cluster[order] = np.arange(len(order))  # 0 = best
    return rank_of_cluster, scores

def cluster_one_patch(df_patch, k, use_logit=False, weight_by_games=True, random_state=42):
    """
    df_patch columns: championId, championName, win_rate, pick_rate, ban_rate, games
    Returns: per-row labels, tier letters, centers (raw %) and mapping.
    """
    feats = df_patch[["win_rate","pick_rate","ban_rate"]].to_numpy(dtype=float)
    # Keep a copy in % space for center back-transform
    feats_pct = feats.copy()

    # Optional transform then scale
    if use_logit:
        feats = np.column_stack([logit_percent(df_patch["win_rate"].values),
                                 logit_percent(df_patch["pick_rate"].values),
                                 logit_percent(df_patch["ban_rate"].values)])
    scaler = StandardScaler()
    X = scaler.fit_transform(feats)

    # Sample weights (downweight low-sample champs)
    sample_weight = None
    if weight_by_games and "games" in df_patch.columns:
        # sqrt or log1p temper extremes; choose one:
        sample_weight = np.sqrt(np.clip(df_patch["games"].values, 1, None))
        # sample_weight = np.log1p(df_patch["games"].values)

    # KMeans
    km = KMeans(n_clusters=k, n_init=20, random_state=random_state)
    km.fit(X, sample_weight=sample_weight)
    labels = km.labels_

    # Compute cluster centers back in **raw %** units (for ranking)
    centers_in_feat_space = scaler.inverse_transform(km.cluster_centers_)
    if use_logit:
        # inverse-logit to %: sigmoid(x)*100
        sigmoid = lambda z: 1.0/(1.0+np.exp(-z))
        centers_raw_pct = sigmoid(centers_in_feat_space) * 100.0
    else:
        centers_raw_pct = centers_in_feat_space  # already roughly in % units

    # Rank clusters -> tiers
    rank_of_cluster, scores = rank_clusters_by_center(centers_raw_pct)
    tiers = choose_tier_labels(k)
    cluster_to_tier = {c: tiers[rank_of_cluster[c]] for c in range(k)}

    return labels, cluster_to_tier, centers_raw_pct, scores

def run_for_patch(df, patch, k, use_logit, weight_by_games, out_dir):
    dfp = df[df["patch"] == patch].copy()
    if dfp.empty:
        print(f"[skip] patch {patch}: no rows")
        return None, None

    labels, c2t, centers_raw_pct, scores = cluster_one_patch(
        dfp, k=k, use_logit=use_logit, weight_by_games=weight_by_games
    )
    dfp["cluster"] = labels
    dfp["tier"] = dfp["cluster"].map(c2t)

    # Save tier list
    out_dir.mkdir(parents=True, exist_ok=True)
    out_csv = out_dir / f"tierlist_patch_{patch}.csv"
    cols = ["patch","championId","championName","win_rate","pick_rate","ban_rate","games","cluster","tier"]
    dfp[cols].to_csv(out_csv, index=False)
    print(f"[saved] {out_csv}")

    # Save cluster centers
    centers_df = pd.DataFrame(centers_raw_pct, columns=["center_wr_pct","center_pr_pct","center_br_pct"])
    centers_df["cluster"] = np.arange(len(centers_df))
    centers_df["score"] = scores
    centers_df["tier"] = centers_df["cluster"].map(c2t)
    centers_df["patch"] = patch
    centers_csv = out_dir / f"tier_centers_patch_{patch}.csv"
    centers_df[["patch","cluster","tier","center_wr_pct","center_pr_pct","center_br_pct","score"]].to_csv(centers_csv, index=False)
    print(f"[saved] {centers_csv}")

    return dfp, centers_df

def main():
    ap = argparse.ArgumentParser(description="K-means tier list from WR/PR/BR (per patch)")
    ap.add_argument("--csv", required=True, help="Input CSV with columns: patch, championId, championName, win_rate, pick_rate, ban_rate, games")
    ap.add_argument("--k", type=int, default=5, help="Number of tiers/clusters (default 5)")
    ap.add_argument("--patch", default=None, help="Specific patch (e.g., '15.22'). If omitted and --each not set, uses latest.")
    ap.add_argument("--each", action="store_true", help="Cluster each patch separately and save multiple tierlists")
    ap.add_argument("--logit", action="store_true", help="Use logit transform on rates before scaling (often better)")
    ap.add_argument("--no-weight", action="store_true", help="Disable games-based sample weighting")
    ap.add_argument("--out-dir", default=os.path.expanduser("~/riot_out/tierlists"))
    args = ap.parse_args()

    SCRIPT_DIR = Path(__file__).resolve().parent
    def git_root(start: Path) -> Path | None:
        try:
            p = subprocess.check_output(
                ["git", "rev-parse", "--show-toplevel"],
                cwd=start
            ).decode().strip()
            return Path(p)
        except Exception:
            return None
    
    REPO_ROOT = git_root(SCRIPT_DIR)

    out_dir = Path(os.getenv("out_dir", REPO_ROOT / "riot_out"))
    out_dir.mkdir(parents=True, exist_ok=True)
    print(f"[out] saving to: {out_dir}")
    df = pd.read_csv(args.csv, dtype={"patch": str})  # preserve "15.20"

    print(f"[debug] loaded rows: {len(df)} from {args.csv}")

    # 1) normalize patch ‚Üí "major.minor"
    def canon_patch(p):
        s = str(p).strip()
        m = re.search(r'(\d+)\.(\d+)', s)
        return f"{int(m.group(1))}.{int(m.group(2))}" if m else None
    df["patch"] = df["patch"].map(canon_patch)

    # 2) ensure required columns exist
    if "ban_rate" not in df.columns:
        df["ban_rate"] = 0.0
    if "games" not in df.columns:
        df["games"] = 1

    # 3) numeric coercion
    for c in ["win_rate","pick_rate","ban_rate","games"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # 4) drop rows that can't be used
    df = df.dropna(subset=["patch","win_rate","pick_rate","ban_rate","games"]).copy()

    # 5) show which patches remain
    # 3) when picking the latest patch, sort NUMERICALLY
    patches = sorted(df["patch"].unique(), key=lambda p: tuple(map(int, p.split("."))))
    # if --patch not provided, choose the max numerically:
    target_patch = args.patch or patches[-1]
    print("[debug] patches after cleaning:", patches)
    print("[debug] counts by patch:\n", df["patch"].value_counts().sort_index())

    if args.each:
        all_rows, all_centers = [], []
        for p in patches:
            res = run_for_patch(df, p, args.k, args.logit, not args.no_weight, out_dir)
            if res[0] is not None:
                all_rows.append(res[0]); all_centers.append(res[1])
        if all_rows:
            pd.concat(all_rows).to_csv(out_dir / "tierlist_all_patches.csv", index=False)
            pd.concat(all_centers).to_csv(out_dir / "tier_centers_all_patches.csv", index=False)
            print(f"[saved] {out_dir/'tierlist_all_patches.csv'}")
            print(f"[saved] {out_dir/'tier_centers_all_patches.csv'}")
        pass
    else:
        target_patch = canon_patch(args.patch) if args.patch else patches[-1]
        run_for_patch(df, target_patch, args.k, args.logit, not args.no_weight, out_dir)
        print("[debug] selecting patch:", target_patch)
        run_for_patch(df, target_patch, args.k, args.logit, not args.no_weight, out_dir)
        return

if __name__ == "__main__":
    main()

In [None]:
# ‚ñ∂Ô∏è Single-Patch Launcher
import sys, runpy
CSV=CSV_COMBINED; PATCH="15.20"; K=5; LOGIT=True
argv=["kmeans_tierlist.py","--csv",CSV,"--k",str(K),"--out-dir",OUT_DIR]
if PATCH: argv+=["--patch",PATCH]
if LOGIT: argv.append("--logit")
sys.argv=argv
runpy.run_path("/mnt/data/kmeans_tierlist.py", run_name="__main__")

## 3)  All-Patches K-Means Tierlist

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tierlist_all_patches.py
Build KMeans-based tier lists for EVERY patch in a combined CSV.

Inputs (one row per champion per patch):
  patch, championId, championName, games, wins, win_rate, pick_rate, ban_rate

Outputs:
  <OUT_DIR>/patch_<patch>/tierlist_patch_<patch>.csv
  <OUT_DIR>/patch_<patch>/tier_centers_patch_<patch>.csv
  <OUT_DIR>/patch_<patch>/kmeans_validation_<patch>.csv
  <OUT_DIR>/tierlist_all_patches.csv
  <OUT_DIR>/kmeans_validation_all_patches.csv

CLI:
  python3 tierlist_all_patches.py \
    --csv /path/to/champion_winrates_all_patches.csv \
    --k 5 --logit --out-dir /path/to/riot_out
"""

import argparse, os, re, csv
from pathlib import Path

import numpy as np
import pandas as pd

from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    silhouette_score, davies_bouldin_score, calinski_harabasz_score,
    adjusted_rand_score,
)
from sklearn.linear_model import LinearRegression


# ----------------------------- utils -----------------------------

def canon_patch(s: str) -> str | None:
    """Normalize '15.20' from any '15.20.x' / loose strings."""
    s = str(s).strip()
    m = re.search(r'(\d+)\.(\d+)', s)
    return f"{int(m.group(1))}.{int(m.group(2))}" if m else None

def numeric_patch_key(p: str) -> tuple[int, int]:
    return tuple(map(int, p.split(".")))

def logit_percent(arr, eps=1e-4):
    """Logit transform percent features (stabilize extremes)."""
    p = np.clip(np.asarray(arr, dtype=float) / 100.0, eps, 1 - eps)
    return np.log(p / (1 - p))

def prepare_features(df: pd.DataFrame, use_logit: bool) -> np.ndarray:
    cols = ["win_rate", "pick_rate", "ban_rate"]
    X_raw = (
        np.column_stack([logit_percent(df[c].values) for c in cols])
        if use_logit else
        df[cols].to_numpy(dtype=float)
    )
    return StandardScaler().fit_transform(X_raw)

def tier_letters(k: int) -> list[str]:
    base = ["S","A","B","C","D","E","F","G","H","I"]
    if k <= len(base): return base[:k]
    return base + [f"T{i}" for i in range(k - len(base))]


# ------------------------- diagnostics --------------------------

def kmeans_diagnostics(X: np.ndarray, labels: np.ndarray, df_with_tier: pd.DataFrame) -> dict:
    """Internal quality + construct validity for one patch."""
    uniq = np.unique(labels)
    if len(uniq) > 1:
        sil = silhouette_score(X, labels)
        db  = davies_bouldin_score(X, labels)
        ch  = calinski_harabasz_score(X, labels)
    else:
        sil = db = ch = np.nan

    # Monotonicity of mean WR across tiers (S > A > B ...). Higher tier listed first.
    g = (df_with_tier.groupby("tier")["win_rate"]
         .mean()
         .sort_values(ascending=False))
    mono = bool(all(g.values[i] >= g.values[i+1] for i in range(len(g)-1))) if len(g) > 1 else True

    # How much WR variance tiers explain (quick R^2)
    D = pd.get_dummies(df_with_tier["tier"], drop_first=True)
    y = pd.to_numeric(df_with_tier["win_rate"], errors="coerce").fillna(0).values
    Xr = D.values if D.shape[1] else np.zeros((len(df_with_tier), 1))
    r2 = LinearRegression().fit(Xr, y).score(Xr, y) if len(df_with_tier) > 1 else np.nan

    return {
        "silhouette": float(sil),
        "davies_bouldin": float(db),
        "calinski_harabasz": float(ch),
        "tier_monotone": mono,
        "tier_wr_r2": float(r2),
    }

def stability_ari(X: np.ndarray, k: int, seeds=(1,11,21,31,41), sample_weight=None) -> float:
    """Mean Adjusted Rand Index across multiple random seeds."""
    label_sets = []
    for s in seeds:
        km = KMeans(n_clusters=k, n_init=20, random_state=s)
        km.fit(X, sample_weight=sample_weight)
        label_sets.append(km.labels_)
    aris = []
    for i in range(len(label_sets)):
        for j in range(i+1, len(label_sets)):
            aris.append(adjusted_rand_score(label_sets[i], label_sets[j]))
    return float(np.mean(aris)) if aris else np.nan


# --------------------------- core -------------------------------

def run_for_patch(df_patch: pd.DataFrame, patch: str, k: int, use_logit: bool,
                  weight_by_games: bool, out_dir: Path, random_state=42):
    """Cluster one patch, save CSVs, return (rows, centers, diag)."""
    if len(df_patch) < k:
        print(f"[skip] patch {patch}: rows={len(df_patch)} < k={k}")
        return None, None, None

    # Prepare features
    X = prepare_features(df_patch, use_logit=use_logit)

    # Sample weighting by sqrt(games)
    sample_weight = None
    if weight_by_games and "games" in df_patch.columns:
        sample_weight = np.sqrt(
            np.clip(pd.to_numeric(df_patch["games"], errors="coerce").fillna(1).values, 1, None)
        )

    # Fit KMeans
    km = KMeans(n_clusters=k, n_init=20, random_state=random_state)
    km.fit(X, sample_weight=sample_weight)
    labels = km.labels_

    # Compute cluster centers in ORIGINAL % space (weighted by games if enabled)
    centers = []
    for c in range(k):
        members = (labels == c)
        n = int(members.sum())
        if n == 0:
            wmr = wpr = wbr = 0.0
        else:
            if weight_by_games and "games" in df_patch.columns:
                ws = df_patch.loc[members, "games"].clip(lower=1).astype(float).values
                wmr = float(np.average(df_patch.loc[members, "win_rate"].values, weights=ws))
                wpr = float(np.average(df_patch.loc[members, "pick_rate"].values, weights=ws))
                wbr = float(np.average(df_patch.loc[members, "ban_rate"].values,  weights=ws))
            else:
                wmr = float(df_patch.loc[members, "win_rate"].mean())
                wpr = float(df_patch.loc[members, "pick_rate"].mean())
                wbr = float(df_patch.loc[members, "ban_rate"].mean())
        centers.append({
            "cluster": c, "n": n,
            "mean_win_rate": wmr, "mean_pick_rate": wpr, "mean_ban_rate": wbr
        })

    # Rank clusters by mean win_rate (then pick_rate) ‚Üí assign S/A/B...
    centers_sorted = sorted(centers, key=lambda d: (-d["mean_win_rate"], -d["mean_pick_rate"]))
    letters = tier_letters(k)
    cluster_to_tier = {cinfo["cluster"]: (letters[i] if i < len(letters) else f"T{i}")
                       for i, cinfo in enumerate(centers_sorted)}

    # Per-row output
    rows_out = []
    dfp = df_patch.reset_index(drop=True)
    for i, row in dfp.iterrows():
        c = int(labels[i])
        rows_out.append({
            "patch": patch,
            "championId": int(row["championId"]) if str(row["championId"]).isdigit() else row["championId"],
            "championName": row.get("championName", ""),
            "games": int(pd.to_numeric(row.get("games", 0), errors="coerce")) if pd.notna(row.get("games", None)) else 0,
            "wins": int(pd.to_numeric(row.get("wins", 0), errors="coerce")) if pd.notna(row.get("wins", None)) else 0,
            "win_rate": float(pd.to_numeric(row.get("win_rate", 0.0), errors="coerce")),
            "pick_rate": float(pd.to_numeric(row.get("pick_rate", 0.0), errors="coerce")),
            "ban_rate": float(pd.to_numeric(row.get("ban_rate", 0.0), errors="coerce")),
            "cluster": c,
            "tier": cluster_to_tier.get(c, "U"),
        })

    # Diagnostics
    df_out = pd.DataFrame(rows_out)
    diag = kmeans_diagnostics(X, labels, df_out)
    diag["patch"] = patch
    diag["k"] = k
    diag["n_rows"] = int(len(df_out))
    diag["stability_ari"] = stability_ari(X, k, seeds=(1,11,21,31,41), sample_weight=sample_weight)

    # Save per-patch outputs
    out_dir.mkdir(parents=True, exist_ok=True)

    per_patch = out_dir / f"tierlist_patch_{patch}.csv"
    with per_patch.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=[
            "patch","championId","championName","games","wins",
            "win_rate","pick_rate","ban_rate","cluster","tier"
        ])
        w.writeheader()
        w.writerows(rows_out)

    centers_csv = out_dir / f"tier_centers_patch_{patch}.csv"
    with centers_csv.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["cluster","n","mean_win_rate","mean_pick_rate","mean_ban_rate"])
        w.writeheader()
        w.writerows(centers_sorted)

    diag_csv = out_dir / f"kmeans_validation_{patch}.csv"
    pd.DataFrame([diag]).to_csv(diag_csv, index=False)

    print(f"[saved] {per_patch}")
    print(f"[saved] {centers_csv}")
    print(f"[saved] {diag_csv}")

    return rows_out, centers_sorted, diag


# ---------------------------- main ------------------------------

def main():
    ap = argparse.ArgumentParser(description="KMeans tierlist for every patch in the combined CSV")
    ap.add_argument("--csv", required=True, help="Combined CSV path (from scraper)")
    ap.add_argument("--k", type=int, default=5, help="Clusters per patch (default 5)")
    ap.add_argument("--logit", action="store_true", help="Apply logit transform to WR/PR/BR before scaling")
    ap.add_argument("--no-weight", action="store_true", help="Disable sqrt(games) sample weighting")
    ap.add_argument("--out-dir", default=os.path.expanduser("~/riot_out"), help="Output directory")
    args = ap.parse_args()

    out_dir = Path(args.out_dir)

    # Load & clean data
    df = pd.read_csv(args.csv, dtype={"patch": str})
    print(f"[debug] loaded rows: {len(df)} from {args.csv}")
    df["patch"] = df["patch"].map(canon_patch)

    if "ban_rate" not in df.columns: df["ban_rate"] = 0.0
    if "games" not in df.columns: df["games"] = 1

    for c in ["win_rate","pick_rate","ban_rate","games","wins","championId"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    df = df.dropna(subset=["patch","win_rate","pick_rate","ban_rate"]).copy()
    patches = sorted(df["patch"].unique(), key=numeric_patch_key)
    print("[debug] patches after cleaning:", patches)

    all_rows, all_diags = [], []

    for p in patches:
        dfp = df[df["patch"] == p].copy()
        sub_out = out_dir / f"patch_{p}"
        rows, _, diag = run_for_patch(
            df_patch = dfp,
            patch = p,
            k = args.k,
            use_logit = args.logit,
            weight_by_games = (not args.no_weight),
            out_dir = sub_out
        )
        if rows:
            all_rows.extend(rows)
        if diag:
            all_diags.append(diag)

    # Save combined outputs
    if all_rows:
        combined_csv = out_dir / "tierlist_all_patches.csv"
        with combined_csv.open("w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=[
                "patch","championId","championName","games","wins",
                "win_rate","pick_rate","ban_rate","cluster","tier"
            ])
            w.writeheader()
            w.writerows(all_rows)
        print(f"[saved] {combined_csv}")

    if all_diags:
        diag_out = out_dir / "kmeans_validation_all_patches.csv"
        pd.DataFrame(all_diags).to_csv(diag_out, index=False)
        print(f"[saved] {diag_out}")

    if not all_rows:
        print("[warn] no tierlists produced ‚Äî check K and data size.")


if __name__ == "__main__":
    main()

In [None]:
# ‚ñ∂Ô∏è All-Patches Launcher
import sys, runpy
CSV=CSV_COMBINED; K=5; LOGIT=True
argv=["tierlist_all_patches.py","--csv",CSV,"--k",str(K),"--out-dir",OUT_DIR]
if LOGIT: argv.append("--logit")
sys.argv=argv
runpy.run_path("/mnt/data/tierlist_all_patches.py", run_name="__main__")