# Elo Implementation Pipeline

In [4]:
import pandas as pd, re, bisect, copy
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple, Optional

# ── GLOBAL CONFIG ───────────────────────────────────────────────────────
DEFAULT_ELO = 100
K_FACTOR    = 4
MARGIN_SCALE = 1.3
DATA_PATH   = Path("emea.csv")         # adjust if needed

# ── LOW-LEVEL HELPERS ───────────────────────────────────────────────────
def expected(a: float, b: float) -> float:
    """Win probability of rating a vs b on 100-pt scale."""
    return 1 / (1 + 10 ** ((b - a) / 40))

def parse_score(s: str) -> Tuple[int, int]:
    """Robustly parse '13-2', '13-2', '13—2' → (13, 2)."""
    nums = re.findall(r"\d+", str(s))
    if len(nums) < 2:
        raise ValueError(f"Bad MapScore: {s}")
    return int(nums[0]), int(nums[1])

# ── ELO HISTORY ENGINE ──────────────────────────────────────────────────
class EloHistory:
    """
    Keeps snapshots of team-map Elo ratings and allows time-travel queries.
    """
    def __init__(self,
                 snapshots: List[Dict[str, Dict[str, float]]],
                 dates: List[pd.Timestamp],
                 default: float = DEFAULT_ELO):
        self._snapshots = snapshots
        self._dates     = dates
        self._default   = default

    # ── PUBLIC API ────────────────────────────────────────────────────
    def rating(self,
               team: str,
               map_name: str,
               date: str | pd.Timestamp) -> float:
        """Elo(team, map) strictly BEFORE `date` (=> state 'as of' that moment)."""
        snap = self._snapshot_before(date)
        return snap.get(team, {}).get(map_name, self._default)

    def ratings(self,
                team: str,
                date: str | pd.Timestamp) -> Dict[str, float]:
        """All map Elos for `team` as of date (missing → DEFAULT_ELO)."""
        snap = self._snapshot_before(date)
        # include maps never seen (optional) – here we just expose existing keys
        return {m: v for m, v in snap.get(team, {}).items()}

    def table(self,
              date: Optional[str | pd.Timestamp] = None) -> pd.DataFrame:
        """
        DataFrame with columns Team, Map, Elo.
        If `date` is None, returns latest snapshot.
        """
        snap = self._snapshot_before(date) if date is not None else self._snapshots[-1]
        rows = [{"Team": t, "Map": m, "Elo": round(e, 2)}
                for t, maps in snap.items() for m, e in maps.items()]
        return pd.DataFrame(rows).sort_values(["Team", "Map"])

    # ── INTERNALS ────────────────────────────────────────────────────
    def _snapshot_before(self, date: str | pd.Timestamp):
        ts = pd.to_datetime(date)
        idx = bisect.bisect_left(self._dates, ts) - 1
        if idx < 0:                       # before first match → all default
            return defaultdict(lambda: defaultdict(lambda: self._default))
        return self._snapshots[idx]

# ── BUILD FUNCTION ──────────────────────────────────────────────────────
def build_elo_history(csv_path: Path = DATA_PATH,
                      default_elo: float = DEFAULT_ELO,
                      k_factor: float = K_FACTOR,
                      margin_scale: float = MARGIN_SCALE) -> EloHistory:
    """Parse CSV, compute Elo match-by-match, return EloHistory instance."""
    df = pd.read_csv(csv_path)

    matches = (
        df.groupby(["MatchID", "Map", "Team"])
          .agg(Winner=("Winner", "max"), MapScore=("MapScore", "first"))
          .reset_index()
    )
    dates = df[["MatchID", "Date"]].drop_duplicates()
    matches = (matches.merge(dates, on="MatchID")
                      .assign(Date=lambda d: pd.to_datetime(d["Date"]))
                      .sort_values("Date"))

    elo        = defaultdict(lambda: defaultdict(lambda: default_elo))
    snapshots  = []
    snap_dates = []

    for (_, mp), g in matches.groupby(["MatchID", "Map"], sort=False):
        if len(g) != 2:                       # skip forfeits
            continue
        a, b = g.iloc[0], g.iloc[1]
        a_team, b_team = a["Team"], b["Team"]
        a_win         = bool(int(a["Winner"]))
        s_a, s_b      = parse_score(a["MapScore"])
        margin        = max(1, abs(s_a - s_b))
        scaling       = 1 + (margin - 1) * margin_scale

        elo_a0, elo_b0 = elo[a_team][mp], elo[b_team][mp]
        delta_a = k_factor * scaling * ((1 if a_win else 0) - expected(elo_a0, elo_b0))
        delta_b = -delta_a
        elo[a_team][mp] += delta_a
        elo[b_team][mp] += delta_b

        # deep-copy current state for snapshot
        snapshots.append({t: dict(maps) for t, maps in elo.items()})
        snap_dates.append(g["Date"].iloc[0])

    return EloHistory(snapshots, snap_dates, default_elo)

# ── EXAMPLE USAGE ───────────────────────────────────────────────────────
if __name__ == "__main__":
    elo_hist = build_elo_history()

    # Elo of FNATIC on Ascent before Feb-15-2025
    print("FNATIC Ascent elo on 2025-02-15:",
          elo_hist.rating("FNATIC", "Ascent", "2025-02-15"))

    # All map Elos for Team Liquid as of same date
    print("\nTEAM LIQUID map ratings (as of 2025-02-15):")
    print(elo_hist.ratings("TEAM LIQUID", "2025-02-15"))

    # Full table (latest snapshot)
    print("\nFinal Elo table:")
    print(elo_hist.table().to_string(index=False))


FNATIC Ascent elo on 2025-02-15: 100

TEAM LIQUID map ratings (as of 2025-02-15):
{}

Final Elo table:
Team      Map    Elo
 APK   Ascent  84.24
 APK    Haven  95.40
 APK    Lotus  68.41
 APK    Split  85.97
 BBL   Ascent 104.60
 BBL Fracture  92.59
 BBL    Haven 133.02
 BBL   Icebox  95.15
 BBL    Lotus 111.19
 BBL    Pearl 110.63
 FNC   Ascent  92.80
 FNC Fracture 104.60
 FNC    Haven  96.04
 FNC    Lotus 135.50
 FNC    Split 121.57
 FUT   Ascent  95.40
 FUT Fracture 104.60
 FUT   Icebox  87.86
 FUT    Lotus  99.20
 FUT    Pearl 118.76
 FUT    Split  89.11
  GX   Ascent 112.38
  GX Fracture 108.51
  GX    Haven  74.60
  GX    Lotus  93.95
  GX    Pearl  85.00
  GX    Split  97.40
  KC    Haven 105.21
  KC   Icebox 130.53
  KC    Lotus  85.61
  KC    Pearl 131.12
  KC    Split  95.03
  M8 Fracture  95.40
  M8    Haven  86.18
  M8   Icebox  78.89
  M8    Lotus  95.40
  M8    Pearl  68.31
  M8    Split  88.17
MKOI   Icebox 110.97
MKOI    Lotus  70.59
MKOI    Pearl  71.17
MKOI    Split  

In [5]:
import pandas as pd
from functools import lru_cache

# ── (1) BUILD ELO HISTORY  ──────────────────────────────────────────────
elo_hist = build_elo_history()        # ← uses the function in the previous block

# We also need the matches DF (with Date) for “maps played” look-ups
df_raw   = pd.read_csv(DATA_PATH)
matches_df = (
    df_raw.groupby(["MatchID", "Map", "Team"])
          .agg(Date=("Date", "first"))        # Date string → first instance
          .reset_index()
          .assign(Date=lambda d: pd.to_datetime(d["Date"]))
)

ALL_MAPS = sorted(matches_df["Map"].unique())   # every map in the data

# ── (2) HELPER CACHES  ──────────────────────────────────────────────────
@lru_cache(maxsize=None)
def maps_played_by(team: str, up_to: pd.Timestamp):
    """Set of maps this team had played before `up_to`."""
    played = matches_df[(matches_df["Team"] == team)
                        & (matches_df["Date"] < up_to)]["Map"].unique()
    return set(played)

def avg_played_elo(team: str, up_to: pd.Timestamp):
    """Average Elo on maps the team *has* played."""
    played = maps_played_by(team, up_to)
    if not played:
        return DEFAULT_ELO
    return sum(elo_hist.rating(team, m, up_to) for m in played) / len(played)

def team_class(team: str, opponent: str, up_to: pd.Timestamp):
    """'good' if its avg-played Elo ≥ opponent’s, else 'bad'."""
    return ("good" if avg_played_elo(team, up_to) >=
                      avg_played_elo(opponent, up_to) else "bad")

# ── (3) MAP-SELECTION SIMULATION  ───────────────────────────────────────
def simulate_map_selection(teamA: str,
                           teamB: str,
                           as_of_date: str | pd.Timestamp,
                           elo: EloHistory = elo_hist,
                           matches=matches_df) -> dict:
    """
    Returns dict with distinct BanA, BanB, PickA, PickB following:
      • Bad teams perma-ban unseen maps first, else their lowest-Elo map
      • Good teams ban their worst (lowest own-Elo) map
      • Picks favour own high-Elo + comfort bonus if opponent has played map
    """
    as_of_date = pd.to_datetime(as_of_date)

    # Pre-compute convenience sets
    played_A = maps_played_by(teamA, as_of_date)
    played_B = maps_played_by(teamB, as_of_date)

    class_A  = team_class(teamA, teamB, as_of_date)
    class_B  = team_class(teamB, teamA, as_of_date)

    def own(t, m): return elo.rating(t, m, as_of_date)

    # ---------- BAN PHASE ----------
    pool = ALL_MAPS.copy()

    def choose_ban(team, cls, played_set):
        if cls == "bad":
            unseen = [m for m in pool if m not in played_set]
            if unseen:
                return unseen[0]       # any unseen map suffices
        # otherwise ban lowest own-Elo map
        return min(pool, key=lambda m: own(team, m))

    ban_A = choose_ban(teamA, class_A, played_A); pool.remove(ban_A)
    ban_B = choose_ban(teamB, class_B, played_B); pool.remove(ban_B)

    # ---------- PICK PHASE ----------
    def comfort_bonus(team, m):
        # +10 if opponent has played map, −10 otherwise
        opponent_played = (m in played_B) if team == teamA else (m in played_A)
        return 10 if opponent_played else -10

    def desirability(team, m):
        # High value → more desirable for `team`
        return own(team, m) + comfort_bonus(team, m)

    pick_A = max(pool, key=lambda m: desirability(teamA, m)); pool.remove(pick_A)
    pick_B = max(pool, key=lambda m: desirability(teamB, m)); pool.remove(pick_B)

    return {
        "Date": as_of_date.date(),
        "TeamA": teamA, "TeamB": teamB,
        "ClassA": class_A, "ClassB": class_B,
        "BanA": ban_A, "BanB": ban_B,
        "PickA": pick_A, "PickB": pick_B
    }

# ── (4) EXAMPLE CALL  ───────────────────────────────────────────────────
result = simulate_map_selection("FNATIC", "TEAM LIQUID", "2025-05-15")
print(result)


{'Date': datetime.date(2025, 5, 15), 'TeamA': 'FNATIC', 'TeamB': 'TEAM LIQUID', 'ClassA': 'good', 'ClassB': 'good', 'BanA': 'Ascent', 'BanB': 'Fracture', 'PickA': 'Haven', 'PickB': 'Icebox'}


In [7]:
import numpy as np
from math import log10
from functools import lru_cache

# ── MAP-LENGTH PREDICTOR ─────────────────────────────────────────────────
def predict_map_length(elo_a: float,
                       elo_b: float,
                       sims: int = 5_000,
                       c: float = 40.0,
                       return_median: bool = False) -> float:
    """
    Monte-Carlo estimate of map length (number of rounds) given two map-Elos.

    Parameters
    ----------
    elo_a, elo_b : float
        Elo ratings of Team A and Team B on THIS map.
    sims : int
        Number of Monte-Carlo runs (5 000 is accurate to ±0.1 rounds).
    c : float
        Logistic scaling constant (40 keeps consistency with 100-point Elo).
    return_median : bool
        If True, return the median rounds instead of the mean.

    Returns
    -------
    float : expected (or median) number of rounds.
    """
    p_a = 1.0 / (1.0 + 10.0 ** (-(elo_a - elo_b) / c))  # Team A round win-prob
    rng  = np.random.default_rng()
    lengths = np.empty(sims, dtype=np.int16)

    for i in range(sims):
        a, b = 0, 0
        while True:
            # stop if win-condition met
            if (a >= 13 or b >= 13) and abs(a - b) >= 2:
                lengths[i] = a + b
                break
            # play one round
            if rng.random() < p_a:
                a += 1
            else:
                b += 1

    return float(np.median(lengths) if return_median else lengths.mean())


# ── EXAMPLE HOOK-UP WITH EloHistory ─────────────────────────────────────
# Assume you already created `elo_hist = build_elo_history()`

def predict_map_length_by_date(teamA: str,
                               teamB: str,
                               map_name: str,
                               date: str,
                               sims: int = 5_000) -> float:
    """
    Convenience wrapper: pull the two teams' map-Elos as of `date`
    and feed into the predictor.
    """
    elo_a = elo_hist.rating(teamA, map_name, date)
    elo_b = elo_hist.rating(teamB, map_name, date)
    return predict_map_length(elo_a, elo_b, sims=sims)


# ── QUICK TEST ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    team1, team2, map_name, date = "FNATIC", "TEAM LIQUID", "Icebox", "2025-05-15"
    est_rounds = predict_map_length_by_date(team1, team2, map_name, date, sims=8000)
    print(f"Expected rounds on {map_name} ({team1} vs {team2} as of {date}): "
          f"{est_rounds:.2f}")


Expected rounds on Icebox (FNATIC vs TEAM LIQUID as of 2025-05-15): 22.47
