In [1]:
"""
UFC model processing pipeline for Kaggle.

This script provides a high‑performance reimplementation of the UFC fight
statistics pipeline tailored for Kaggle.  It reads a Silver++ dataset
from a local CSV, computes a variety of engineered features and
rolling statistics, and writes the results back to a CSV.  The
pipeline avoids expensive pandas ``rolling()`` calls by using NumPy
cumulative sums and vectorised operations throughout.  All rolling
windows exclude the current fight (equivalent to a ``shift(1)`` in
pandas) to prevent data leakage into future predictions.

Input:
    /kaggle/input/ufc-fight-forecast-complete-gold-modeling-dataset/UFC_full_data_silver.csv

Output:
    ufc_model_full_analysis_rounds.csv in the working directory

The high level steps are:

1. Read the Silver++ CSV and sort by event_date.
2. Add base features (fight duration, per‑round durations, winner encoding).
3. Compute strike breakdowns and quality metrics.
4. Convert to long format with two rows per fight (one per fighter).
5. Compute rolling outcomes and rolling averages/ratios using NumPy.
6. Pivot rolling stats back to wide format and merge onto the base data.
7. Add ordinal fight numbers, ages and cumulative fight counts.
8. Compute head‑to‑head differences and interactions between fighters.
9. Coerce boolean columns and sanitise types, then write to CSV.

Note: This version does not interact with BigQuery and can be run
directly on Kaggle.
"""

from __future__ import annotations

import numpy as np
import pandas as pd
import warnings
import time
from typing import Callable, Dict, Tuple

# Silence every RuntimeWarning for the whole process (this is not limited to
# pandas; e.g. NumPy divide/invalid-value warnings are RuntimeWarnings too)
# and switch off pandas' chained-assignment warning.
warnings.filterwarnings("ignore", category=RuntimeWarning)
pd.options.mode.chained_assignment = None

# Look-back sizes, counted in previous fights, used for every rolling
# feature: 3, 4, ..., 15 (the upper bound of ``range`` is exclusive).
ROLLING_WINDOWS: list[int] = list(range(3, 16))


# -----------------------------------------------------------------------------
# Utility functions
# -----------------------------------------------------------------------------
def _infer_per_round_categories(df: pd.DataFrame) -> list[str]:
    """
    Discover which per-round stat categories the frame carries.

    A column contributes a category when its name has the shape
    ``f_<1|2>_r<1-5>_<category>_<succ|att>``.  The distinct categories are
    returned in sorted order; when no column matches, a fixed fallback list
    is returned instead.
    """
    import re

    pattern = re.compile(r"^f_[12]_r[1-5]_([A-Za-z0-9_]+)_(succ|att)$")
    hits = (pattern.match(str(column)) for column in df.columns)
    found = {hit.group(1) for hit in hits if hit}
    if not found:
        return ['sig_strikes', 'total_strikes', 'takedown', 'head', 'body', 'leg']
    return sorted(found)


def time_to_minutes(t: str | None) -> float:
    """Convert a time string "M:S" to minutes, returning zero on failure."""
    try:
        if not t:
            return 0.0
        m, s = t.split(":")
        return (int(m) * 60 + int(s)) / 60.0
    except Exception:
        return 0.0


def time_to_seconds(t: str | None) -> int:
    """Convert a time string "M:S" to seconds, returning zero on failure."""
    try:
        if not t:
            return 0
        m, s = t.split(":")
        return int(m) * 60 + int(s)
    except Exception:
        return 0


def _to_float(s: pd.Series) -> np.ndarray:
    """Convert a Series to a float NumPy array, coercing non‑numeric to NaN."""
    return pd.to_numeric(s, errors='coerce').astype(float).to_numpy()


def _safe_div(num: pd.Series | np.ndarray, den: pd.Series | np.ndarray) -> np.ndarray:
    """
    Elementwise ``num / den`` that never divides by a non-positive number.

    Both operands are first coerced to float arrays via ``_to_float``.  Every
    position whose denominator is zero, negative or NaN is left as NaN in the
    result; all other positions hold the ordinary quotient.
    """
    numerator = _to_float(num)
    denominator = _to_float(den)
    quotient = np.full_like(numerator, np.nan, dtype=float)
    # Only positions with a strictly positive denominator are written.
    np.divide(numerator, denominator, out=quotient, where=denominator > 0)
    return quotient


# -----------------------------------------------------------------------------
# Data retrieval and base feature preparation
# -----------------------------------------------------------------------------
def get_full_fight_data(
    path: str = "/kaggle/input/ufc-fight-forecast-complete-gold-modeling-dataset/UFC_full_data_silver.csv",
) -> pd.DataFrame:
    """
    Read the full fight data from a CSV and order it chronologically.

    Parameters
    ----------
    path:
        Location of the Silver++ CSV.  Defaults to the Kaggle input path, so
        existing zero-argument calls behave as before.

    Returns
    -------
    pd.DataFrame
        The CSV contents sorted by ``event_date`` with a fresh 0..n-1 index.
        The ``event_date`` column itself is returned as read (not converted).
    """
    df = pd.read_csv(path)
    # Sort on parsed timestamps rather than raw strings: a lexicographic sort
    # is only chronological for ISO-formatted dates.  Unparseable dates become
    # NaT and sort last.  A stable sort keeps the file order for fights that
    # share a date, which makes the result deterministic.
    df = df.sort_values(
        'event_date',
        key=lambda col: pd.to_datetime(col, errors='coerce'),
        kind='stable',
    ).reset_index(drop=True)
    return df


def add_winner_encoded(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with an integer ``winner_encoded`` column.

    The value is 1 when ``winner`` equals ``f_1_name``, 0 when it equals
    ``f_2_name``, and -1 for everything else (draws, no contests, missing or
    unmatched winners).  The first fighter takes precedence if both match.
    """
    encoded = df.copy()
    winner = encoded['winner']
    first_won = winner == encoded['f_1_name']
    second_won = winner == encoded['f_2_name']
    # Anything that is not a win for either corner falls through to -1.
    encoded['winner_encoded'] = np.where(first_won, 1, np.where(second_won, 0, -1))
    return encoded


def prepare_base_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Derive base level features from raw fight data.

    * Parse ``event_date`` into pandas Timestamps.
    * Compute ``fight_duration_minutes`` as five minutes for every completed
      round before ``finish_round`` plus the clock time in ``finish_time``.
    * Create per-round duration columns ``r1_duration``..``r5_duration``
      (5.0 for rounds before the finish round, the finish clock time for the
      finish round itself, 0.0 for later rounds).

    A missing or non-numeric ``finish_round`` is treated as round 1 in both
    the total and the per-round columns, so the two stay consistent.  The
    computation is column-wise, which also keeps the result well-formed for
    an empty input frame.

    Returns a new DataFrame; the input is not modified.
    """
    df = df.copy()
    df['event_date'] = pd.to_datetime(df['event_date'])

    # Round in which the fight ended; fractional values truncate like int().
    fin_round = (
        pd.to_numeric(df['finish_round'], errors='coerce')
        .fillna(1)
        .astype(int)
        .to_numpy()
    )
    # Clock time within the finish round; unparseable values count as 0.0.
    fin_minutes = df['finish_time'].map(time_to_minutes).astype(float).to_numpy()

    # Fight duration in minutes
    df['fight_duration_minutes'] = (fin_round - 1) * 5 + fin_minutes

    # Per-round durations, built together and attached with a single concat.
    durations = {
        f'r{r}_duration': np.select(
            [fin_round > r, fin_round == r],
            [5.0, fin_minutes],
            default=0.0,
        )
        for r in range(1, 6)
    }
    return pd.concat([df, pd.DataFrame(durations, index=df.index)], axis=1)


# -----------------------------------------------------------------------------
# Strike breakdown calculations
# -----------------------------------------------------------------------------
def compute_strike_breakdowns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add fight-level and per-round strike accuracy/share columns.

    For both fighter prefixes (``f_1``, ``f_2``) and every category (the six
    target/position defaults plus whatever ``_infer_per_round_categories``
    finds) the five per-round ``_succ``/``_att`` columns are summed into
    ``<prefix>_<cat>_succ_total`` / ``_att_total`` and divided into
    ``<prefix>_<cat>_acc``.  Shares are computed within two groups only:
    head/body/leg and distance/clinch/ground, both for the whole fight
    (``<prefix>_<cat>_share``) and per round (``<prefix>_r<r>_<cat>_acc`` and
    ``<prefix>_r<r>_<cat>_share``).

    Expected per-round columns that are absent from the input are created and
    filled with 0 first.  Returns a new DataFrame; the input is not modified.
    """
    rounds = range(1, 6)
    fighters = ('f_1', 'f_2')
    frame = df.copy()

    # Defaults first, then inferred extras, de-duplicated in first-seen order.
    all_cats = list(dict.fromkeys(
        ['head', 'body', 'leg', 'distance', 'clinch', 'ground']
        + _infer_per_round_categories(frame)
    ))

    # Ensure expected per-round columns exist
    absent = []
    for fighter in fighters:
        for rnd in rounds:
            for cat in all_cats:
                for kind in ('_succ', '_att'):
                    col = f"{fighter}_r{rnd}_{cat}{kind}"
                    if col not in frame.columns:
                        absent.append(col)
    if absent:
        frame[absent] = 0

    derived = {}
    for fighter in fighters:
        # Fight-level totals and accuracy for every category.
        for cat in all_cats:
            landed_cols = [f'{fighter}_r{rnd}_{cat}_succ' for rnd in rounds]
            thrown_cols = [f'{fighter}_r{rnd}_{cat}_att' for rnd in rounds]
            landed = frame[landed_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1)
            thrown = frame[thrown_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1)
            derived[f'{fighter}_{cat}_succ_total'] = landed
            derived[f'{fighter}_{cat}_att_total'] = thrown
            derived[f'{fighter}_{cat}_acc'] = _safe_div(landed, thrown)

        # Shares are only meaningful inside these two groups.
        share_groups = [
            [c for c in ['head', 'body', 'leg'] if c in all_cats],
            [c for c in ['distance', 'clinch', 'ground'] if c in all_cats],
        ]

        # Fight-level share of landed strikes within each group.
        for members in share_groups:
            group_landed = (
                sum(derived[f'{fighter}_{c}_succ_total'] for c in members) if members else 0
            )
            for c in members:
                derived[f'{fighter}_{c}_share'] = _safe_div(
                    derived[f'{fighter}_{c}_succ_total'], group_landed
                )

        # Per-round accuracy and share within each group.
        for rnd in rounds:
            for members in share_groups:
                round_landed = sum(
                    pd.to_numeric(frame[f'{fighter}_r{rnd}_{c}_succ'], errors='coerce').fillna(0)
                    for c in members
                ) if members else 0
                for c in members:
                    landed_col = frame[f'{fighter}_r{rnd}_{c}_succ']
                    derived[f'{fighter}_r{rnd}_{c}_acc'] = _safe_div(
                        landed_col, frame[f'{fighter}_r{rnd}_{c}_att']
                    )
                    derived[f'{fighter}_r{rnd}_{c}_share'] = _safe_div(
                        landed_col, round_landed
                    )

    return pd.concat([frame, pd.DataFrame(derived, index=frame.index)], axis=1)


# -----------------------------------------------------------------------------
# Quality feature calculations
# -----------------------------------------------------------------------------
def compute_quality_features(row: pd.Series, prefix: str) -> dict[str, float]:
    """
    Compute eight heuristic quality metrics for one fighter in one fight.

    The scores (physical strength, punching power, ``dynamika``, speed,
    timing, footwork, chin, cardio) are weighted blends of fight statistics.
    The formulas and weights are unchanged from the original implementation,
    which describes them as mirroring the BigQuery pipeline.

    Parameters
    ----------
    row:
        One fight-level record.  Reads ``fight_duration_minutes``,
        ``num_rounds``, ``finish_round``, ``winner``, ``result``,
        ``<prefix>_name`` and, for both fighters, the takedown, significant
        strike, control time, reversal, knockdown and submission columns.
        Absent or missing numeric values fall back to defaults (0, or 3 for
        ``num_rounds``).
    prefix:
        ``'f_1'`` or ``'f_2'``; any value other than ``'f_1'`` is scored
        against ``f_1`` as the opponent.

    Returns
    -------
    dict[str, float]
        One entry per metric name.

    Notes
    -----
    A missing or non-string ``result`` is treated as an empty string instead
    of raising ``TypeError`` on the ``'KO' in result`` test.  A missing
    ``winner`` never counts as a win, even when the fighter name is missing
    as well.
    """
    opp = 'f_2' if prefix == 'f_1' else 'f_1'

    def _get_val(col: str, default=0):
        # Absent columns, None and NaN/NA all collapse to the default.
        val = row.get(col, default)
        if val is None or pd.isna(val):
            return default
        return val

    # Clamp the duration so the per-minute rates below never divide by zero.
    fight_min = max(_get_val('fight_duration_minutes', 0.0), 1e-9)
    max_min = float(_get_val('num_rounds', 3)) * 5.0

    # --- physical strength: grappling offence/defence, control, reversals ---
    td_succ = _get_val(f'{prefix}_takedown_succ', 0)
    td_att = _get_val(f'{prefix}_takedown_att', 0)
    td_acc = (td_succ / td_att) if td_att > 0 else 0.0
    opp_td_succ = _get_val(f'{opp}_takedown_succ', 0)
    opp_td_att = _get_val(f'{opp}_takedown_att', 0)
    td_def = 1.0 - (opp_td_succ / opp_td_att) if opp_td_att > 0 else 1.0
    ctrl_sec = _get_val(f'{prefix}_ctrl_time_sec', 0)
    # NOTE(review): control seconds per fight *minute*, kept as in the original.
    ctrl_ratio = ctrl_sec / fight_min
    rev_self = _get_val(f'{prefix}_reversals', 0)
    rev_opp = _get_val(f'{opp}_reversals', 0)
    total_rev = rev_self + rev_opp
    rev_share = (rev_self / total_rev) if total_rev > 0 else 0.0
    physical_strength = 0.4 * td_acc + 0.3 * td_def + 0.2 * ctrl_ratio + 0.1 * rev_share

    # --- outcome flags ---
    kd = _get_val(f'{prefix}_knockdowns', 0)
    s_succ = _get_val(f'{prefix}_sig_strikes_succ', 0)
    s_att = _get_val(f'{prefix}_sig_strikes_att', 0)
    fighter_name = row.get(f'{prefix}_name')
    winner = row.get('winner')
    raw_result = row.get('result')
    # NaN is truthy, so ``raw_result or ''`` would let a float through.
    result = raw_result if isinstance(raw_result, str) else ''
    is_win = bool(pd.notna(winner) and winner == fighter_name)
    by_ko = 'KO' in result  # also true for 'TKO'
    ko_win = is_win and by_ko
    lost_by_ko = (not is_win) and by_ko

    # --- punching power: knockdown rate, KO wins, how early the KO came ---
    fin_round = int(_get_val('finish_round', 0))
    num_r = int(_get_val('num_rounds', 3))
    finish_bonus = ((num_r - fin_round + 1) / num_r) if (ko_win and num_r > 0) else 0.0
    power_ratio = (kd / s_succ) if s_succ > 0 else 0.0
    punching_power = 0.5 * power_ratio + 0.3 * (1.0 if ko_win else 0.0) + 0.2 * finish_bonus

    # --- dynamika: unused scheduled time (non-decisions) plus activity rate ---
    remaining = max(max_min - fight_min, 0.0)
    finish_factor = (remaining / max_min) if (result != 'Decision' and max_min > 0) else 0.0
    actions = s_succ + td_succ + _get_val(f'{prefix}_submission_att', 0) + kd
    actions_per_min = actions / fight_min
    kd_per_min = kd / fight_min
    dynamika = 0.5 * finish_factor + 0.3 * actions_per_min + 0.2 * kd_per_min

    # --- speed: output, striking differential and strike defence ---
    opp_succ = _get_val(f'{opp}_sig_strikes_succ', 0)
    opp_att = _get_val(f'{opp}_sig_strikes_att', 0)
    slpm = s_succ / fight_min
    diff_per_min = (s_succ - opp_succ) / fight_min
    str_def = 1.0 - (opp_succ / opp_att) if opp_att > 0 else 1.0
    speed = 0.6 * slpm + 0.2 * diff_per_min + 0.2 * str_def

    # --- timing: accuracy-driven ---
    strike_acc = (s_succ / s_att) if s_att > 0 else 0.0
    kd_eff = (kd / s_succ) if s_succ > 0 else 0.0
    timing = 0.7 * strike_acc + 0.2 * kd_eff + 0.1 * td_acc

    # --- footwork: defence plus low absorbed-strike rate ---
    sapm = opp_succ / fight_min
    footwork = 0.5 * str_def + 0.3 * td_def + 0.2 * (1.0 - (sapm / 10.0))

    # --- chin: strikes absorbed per knockdown suffered; zero after a KO loss ---
    opp_kd = _get_val(f'{opp}_knockdowns', 0)
    chin = 0.0 if lost_by_ko else (opp_succ / (opp_kd + 1))

    # --- cardio: output sustained over the share of scheduled time fought ---
    duration_ratio = (fight_min / max_min) if max_min > 0 else 0.0
    cardio = 0.7 * slpm + 0.3 * duration_ratio

    return {
        'physical_strength': physical_strength,
        'punching_power':    punching_power,
        'dynamika':          dynamika,
        'speed':             speed,
        'timing':            timing,
        'footwork':          footwork,
        'chin':              chin,
        'cardio':            cardio,
    }


# -----------------------------------------------------------------------------
# Long format transformation
# -----------------------------------------------------------------------------
def prepare_long_format(df: pd.DataFrame) -> pd.DataFrame:
    """
    Reshape the fight-level frame into two rows per fight, one per fighter.

    Each fight yields a row from the ``f_1`` point of view and a row from the
    ``f_2`` point of view.  In every row the focal fighter's columns are
    exposed as ``fighter_*`` and the other corner's as ``opp_*``; ``role``
    records which corner the focal fighter occupied.  Besides identifiers and
    outcome flags the row carries fight-level and per-round counts, control
    time in seconds, round durations, the focal fighter's strike
    accuracy/share breakdowns and the eight quality metrics from
    ``compute_quality_features`` (evaluated row by row).

    The result is sorted by ``fighter`` then ``event_date`` with a fresh
    index.
    """
    breakdown_cats = ('head', 'body', 'leg', 'distance', 'clinch', 'ground')
    quality_cols = ('physical_strength', 'punching_power', 'dynamika', 'speed',
                    'timing', 'footwork', 'chin', 'cardio')
    fight_stats = ('sig_strikes_succ', 'sig_strikes_att', 'takedown_succ',
                   'takedown_att', 'submission_att', 'reversals')
    # (side, long-format stat name, wide-format stat name), in output order.
    # Per-round takedowns live in the ``td_1_*`` columns of the wide frame.
    per_round_copies = (
        ('fighter', 'sig_strikes_succ', 'sig_strikes_succ'),
        ('fighter', 'sig_strikes_att',  'sig_strikes_att'),
        ('opp',     'sig_strikes_succ', 'sig_strikes_succ'),
        ('opp',     'sig_strikes_att',  'sig_strikes_att'),
        ('fighter', 'takedown_succ',    'td_1_succ'),
        ('fighter', 'takedown_att',     'td_1_att'),
        ('opp',     'takedown_succ',    'td_1_succ'),
        ('opp',     'takedown_att',     'td_1_att'),
        ('fighter', 'submission_att',   'submission_att'),
        ('opp',     'submission_att',   'submission_att'),
        ('fighter', 'reversals',        'reversals'),
        ('opp',     'reversals',        'reversals'),
    )

    def _side_frame(me: str, other: str) -> pd.DataFrame:
        """Build the long-format rows seen from corner ``me``."""
        wide_prefix = {'fighter': me, 'opp': other}
        won = df['winner'] == df[f'{me}_name']
        lost = df['winner'] == df[f'{other}_name']
        by_strikes = df['result'].str.contains('KO|TKO', na=False)
        by_submission = df['result'] == 'Submission'

        cols = {
            'event_date': df['event_date'],
            'fight_url': df['fight_url'],
            'fighter': df[f'{me}_url'],
            'opponent': df[f'{other}_url'],
            'role': me,
            'is_winner': won.astype(int),
            'is_loss': lost.astype(int),
            'finish_win': np.where(won & by_strikes, 1, 0),
            'submission_win': np.where(won & by_submission, 1, 0),
            'finish_loss': np.where(lost & by_strikes, 1, 0),
            'submission_loss': np.where(lost & by_submission, 1, 0),
        }

        # Fight-level counts for both corners.
        for stat in fight_stats:
            cols[f'fighter_{stat}'] = df[f'{me}_{stat}']
            cols[f'opp_{stat}'] = df[f'{other}_{stat}']
        cols['fighter_ctrl_time_sec'] = df[f'{me}_ctrl_time_sec']
        cols['opp_ctrl_time_sec'] = df[f'{other}_ctrl_time_sec']
        cols['fight_duration_minutes'] = df['fight_duration_minutes']

        # Per-round counts, control time (parsed to seconds) and durations.
        for rnd in range(1, 6):
            for side, long_stat, wide_stat in per_round_copies:
                cols[f'{side}_{long_stat}_r{rnd}'] = df[f'{wide_prefix[side]}_r{rnd}_{wide_stat}']
            for side in ('fighter', 'opp'):
                cols[f'{side}_ctrl_r{rnd}_sec'] = (
                    df[f'{wide_prefix[side]}_r{rnd}_ctrl'].apply(time_to_seconds)
                )
            for side in ('fighter', 'opp'):
                cols[f'{side}_knockdowns_r{rnd}'] = df[f'{wide_prefix[side]}_r{rnd}_knockdowns']
            round_minutes = df[f'r{rnd}_duration']
            cols[f'duration_r{rnd}'] = round_minutes
            cols[f'duration_r{rnd}_sec'] = round_minutes * 60.0

        cols['fighter_knockdowns'] = df[f'{me}_knockdowns']
        cols['opp_knockdowns'] = df[f'{other}_knockdowns']

        # Focal fighter's strike breakdowns, fight-level then per round.
        for cat in breakdown_cats:
            for kind in ('acc', 'share'):
                cols[f'{cat}_{kind}'] = df[f'{me}_{cat}_{kind}']
        for rnd in range(1, 6):
            for cat in breakdown_cats:
                for kind in ('acc', 'share'):
                    cols[f'{cat}_{kind}_r{rnd}'] = df[f'{me}_r{rnd}_{cat}_{kind}']

        quality = df.apply(
            lambda row: compute_quality_features(row, me), axis=1, result_type='expand'
        )
        for name in quality_cols:
            cols[name] = quality[name]
        return pd.DataFrame(cols)

    stacked = pd.concat(
        [_side_frame('f_1', 'f_2'), _side_frame('f_2', 'f_1')],
        ignore_index=True,
    )
    return stacked.sort_values(['fighter', 'event_date']).reset_index(drop=True)


# -----------------------------------------------------------------------------
# Rolling outcomes
# -----------------------------------------------------------------------------
def calculate_outcomes(group: pd.DataFrame) -> pd.DataFrame:
    """
    Add rolling win/loss counts and streaks for one fighter's fights.

    The rows are first ordered by ``event_date``.  For every window size
    ``n`` in ``ROLLING_WINDOWS`` and every row ``i`` the counts cover rows
    ``i-n`` .. ``i-1`` only, so the current fight never contributes.  They
    are obtained as differences of zero-prefixed cumulative sums.  The
    ``streak_<n>`` / ``losing_streak_<n>`` columns hold the run of
    consecutive wins/losses going into the fight, capped at ``n``.

    Returns the ordered rows with the new columns appended.
    """
    ordered = group.sort_values('event_date').reset_index(drop=True)
    total = len(ordered)

    # Output stem -> 0/1 flag column, in output column order.
    flag_cols = {
        'wins': 'is_winner',
        'finish_wins': 'finish_win',
        'sub_wins': 'submission_win',
        'losses': 'is_loss',
        'finish_losses': 'finish_loss',
        'sub_losses': 'submission_loss',
    }
    # prefix[k] is the number of flagged fights among the first k rows.
    prefix_sums = {
        stem: np.concatenate(([0], ordered[col].to_numpy(dtype=int).cumsum()))
        for stem, col in flag_cols.items()
    }

    # Length of the win/loss run ending at each row (inclusive).
    won = ordered['is_winner'].to_numpy(dtype=int)
    lost = ordered['is_loss'].to_numpy(dtype=int)
    win_run = np.zeros(total, dtype=int)
    loss_run = np.zeros(total, dtype=int)
    wins_in_a_row = 0
    losses_in_a_row = 0
    for pos, (won_flag, lost_flag) in enumerate(zip(won, lost)):
        wins_in_a_row = wins_in_a_row + 1 if won_flag == 1 else 0
        losses_in_a_row = losses_in_a_row + 1 if lost_flag == 1 else 0
        win_run[pos] = wins_in_a_row
        loss_run[pos] = losses_in_a_row

    # Shift by one row so each fight only sees the run that preceded it.
    prior_win_run = np.zeros(total, dtype=int)
    prior_loss_run = np.zeros(total, dtype=int)
    prior_win_run[1:] = win_run[:-1]
    prior_loss_run[1:] = loss_run[:-1]

    rows = np.arange(total)
    features = {}
    for n in ROLLING_WINDOWS:
        window_start = np.maximum(0, rows - n)
        for stem, prefix in prefix_sums.items():
            features[f'{stem}_{n}'] = prefix[rows] - prefix[window_start]
        features[f'streak_{n}'] = np.minimum(prior_win_run, n)
        features[f'losing_streak_{n}'] = np.minimum(prior_loss_run, n)

    return pd.concat([ordered, pd.DataFrame(features, index=ordered.index)], axis=1)


# -----------------------------------------------------------------------------
# Rolling metrics (averages and ratios)
# -----------------------------------------------------------------------------
def calculate_group(group: pd.DataFrame) -> pd.DataFrame:
    """
    Add rolling rate and average features for one fighter's fights.

    The rows are first ordered by ``event_date``.  For every window size
    ``n`` in ``ROLLING_WINDOWS`` each feature at row ``i`` is computed from
    rows ``i-n`` .. ``i-1`` only, so the current fight never contributes.

    Two kinds of features are produced, in this order per window:

    * ratio metrics - window sum of a numerator column divided by the window
      sum of a denominator column (NaN when the denominator sum is not
      positive); the "defence" metrics are reported as ``1 - ratio``.
      Missing values count as zero in both sums.
    * mean metrics - window mean of a column over its non-missing values
      (NaN when the window holds none): strike accuracy/share breakdowns and
      the eight quality scores.

    Returns the ordered rows with the new ``<metric>_<n>`` columns appended.
    """
    ordered = group.sort_values('event_date').reset_index(drop=True)
    n_rows = len(ordered)

    def _ratio_block(sfx, minutes_col, ctrl_num, ctrl_den):
        """(name, numerator, denominator, invert) specs for one scope."""
        # NOTE(review): the fight-level ctrl_ratio divides seconds by minutes
        # while the per-round one divides seconds by seconds - confirm intent.
        return [
            (f'slpm{sfx}',       f'fighter_sig_strikes_succ{sfx}', minutes_col, False),
            (f'str_acc{sfx}',    f'fighter_sig_strikes_succ{sfx}', f'fighter_sig_strikes_att{sfx}', False),
            (f'sapm{sfx}',       f'opp_sig_strikes_succ{sfx}',     minutes_col, False),
            (f'str_def{sfx}',    f'opp_sig_strikes_succ{sfx}',     f'opp_sig_strikes_att{sfx}', True),
            (f'td_avg{sfx}',     f'fighter_takedown_succ{sfx}',    minutes_col, False),
            (f'td_acc{sfx}',     f'fighter_takedown_succ{sfx}',    f'fighter_takedown_att{sfx}', False),
            (f'td_def{sfx}',     f'opp_takedown_succ{sfx}',        f'opp_takedown_att{sfx}', True),
            (f'sub_avg{sfx}',    f'fighter_submission_att{sfx}',   minutes_col, False),
            (f'ctrl_ratio{sfx}', ctrl_num,                         ctrl_den, False),
        ]

    # Fight-level ratios first, then the same nine per round.
    ratio_specs = _ratio_block(
        '', 'fight_duration_minutes', 'fighter_ctrl_time_sec', 'fight_duration_minutes'
    )
    for rnd in range(1, 6):
        sfx = f'_r{rnd}'
        ratio_specs += _ratio_block(
            sfx, f'duration{sfx}', f'fighter_ctrl{sfx}_sec', f'duration{sfx}_sec'
        )

    # Columns that are averaged as-is; the feature name equals the column name.
    mean_cols = []
    for cat in ['head', 'body', 'leg', 'distance', 'clinch', 'ground']:
        mean_cols += [f'{cat}_acc', f'{cat}_share']
        for rnd in range(1, 6):
            mean_cols += [f'{cat}_acc_r{rnd}', f'{cat}_share_r{rnd}']
    mean_cols += ['physical_strength', 'punching_power', 'dynamika', 'speed',
                  'timing', 'footwork', 'chin', 'cardio']

    def _numeric(col):
        return pd.to_numeric(ordered[col], errors='coerce').to_numpy(dtype=float)

    def _running_total(values):
        # Zero-prefixed cumulative sum with missing values counted as zero.
        return np.concatenate(([0.0], np.nan_to_num(values, nan=0.0).cumsum()))

    ratio_totals = {}
    for _name, num_col, den_col, _invert in ratio_specs:
        for col in (num_col, den_col):
            if col not in ratio_totals:
                ratio_totals[col] = _running_total(_numeric(col))

    mean_totals = {}
    valid_counts = {}
    for col in mean_cols:
        values = _numeric(col)
        mean_totals[col] = _running_total(values)
        # Zero-prefixed running count of non-missing observations.
        valid_counts[col] = np.concatenate(([0], (~np.isnan(values)).cumsum()))

    rows = np.arange(n_rows)
    features = {}
    for n in ROLLING_WINDOWS:
        window_start = np.maximum(0, rows - n)

        for name, num_col, den_col, invert in ratio_specs:
            num_total = ratio_totals[num_col]
            den_total = ratio_totals[den_col]
            num_sum = num_total[rows] - num_total[window_start]
            den_sum = den_total[rows] - den_total[window_start]
            ratio = np.divide(
                num_sum, den_sum,
                out=np.full(n_rows, np.nan, dtype=float),
                where=den_sum > 0,
            )
            features[f'{name}_{n}'] = 1.0 - ratio if invert else ratio

        for col in mean_cols:
            total = mean_totals[col]
            count = valid_counts[col]
            window_sum = total[rows] - total[window_start]
            window_count = count[rows] - count[window_start]
            features[f'{col}_{n}'] = np.divide(
                window_sum, window_count,
                out=np.full(n_rows, np.nan, dtype=float),
                where=window_count > 0,
            )

    return pd.concat([ordered, pd.DataFrame(features, index=ordered.index)], axis=1)


# -----------------------------------------------------------------------------
# Merging and additional feature engineering
# -----------------------------------------------------------------------------
def merge_rolling_and_outcomes(df: pd.DataFrame, long_df: pd.DataFrame) -> pd.DataFrame:
    """
    Attach the long-format rolling columns to the fight-level frame.

    A long-format column counts as "rolling" when its name ends in ``_<n>``
    for some ``n`` in ``ROLLING_WINDOWS``.  Those columns are pivoted so each
    fight gets one value per role, named ``<column>_<role>``, and left-joined
    onto ``df`` by ``fight_url``.  Missing values in the newly added columns
    are replaced with 0.  If there are no rolling columns, ``df`` itself is
    returned unchanged.
    """
    window_suffixes = tuple(f'_{n}' for n in ROLLING_WINDOWS)
    rolling = [col for col in long_df.columns if col.endswith(window_suffixes)]
    if not rolling:
        return df

    original_cols = set(df.columns)
    wide = long_df.pivot(index='fight_url', columns='role', values=rolling)
    # Flatten the (column, role) pairs produced by the pivot.
    wide.columns = [f'{stat}_{role}' for stat, role in wide.columns]

    result = df.merge(wide, on='fight_url', how='left')
    fresh = [col for col in result.columns if col not in original_cols]
    if fresh:
        result[fresh] = result[fresh].fillna(0)
    return result


def add_fight_ordinal(df: pd.DataFrame, long_df: pd.DataFrame) -> pd.DataFrame:
    """
    Attach each fighter's 1-based fight sequence number to the fight frame.

    The number is the position of the row within its ``fighter`` group in
    ``long_df``, counted in the frame's current row order, starting at 1.
    It is pivoted by ``role`` into ``fight_ordinal_<role>`` columns and
    left-joined onto ``df`` by ``fight_url``.  Neither input is modified.
    """
    position_in_career = long_df.groupby('fighter').cumcount() + 1
    numbered = long_df.assign(fight_ordinal=position_in_career)
    wide = numbered.pivot(index='fight_url', columns='role', values='fight_ordinal')
    wide.columns = [f'fight_ordinal_{role}' for role in wide.columns]
    return df.merge(wide, on='fight_url', how='left')


def add_additional_features(df: pd.DataFrame, long_df: pd.DataFrame) -> pd.DataFrame:
    """
    Add fighter ages and prior-fight counts to the fight-level frame.

    New columns:

    * ``fighter_dob_f_1`` / ``fighter_dob_f_2`` - dates of birth parsed from
      ``f_1_fighter_dob`` / ``f_2_fighter_dob`` (unparseable values -> NaT).
    * ``f_1_age`` / ``f_2_age`` - completed years of age on ``event_date``
      (NaN when the date of birth is missing).
    * ``f_1_fight_number`` / ``f_2_fight_number`` - 0-based position of the
      fight within the fighter's rows of ``long_df`` (in its current row
      order), i.e. the number of earlier rows for that fighter.

    ``event_date`` must already be a datetime column.  Neither input is
    modified.
    """
    def _completed_years(event: pd.Series, dob: pd.Series) -> pd.Series:
        # Calendar-exact age: ``days // 365`` ignores leap days and therefore
        # rolls the age over several days before the actual birthday.
        years = event.dt.year - dob.dt.year
        birthday_pending = (event.dt.month < dob.dt.month) | (
            (event.dt.month == dob.dt.month) & (event.dt.day < dob.dt.day)
        )
        return years - birthday_pending.astype(int)

    out = df.copy()
    out['fighter_dob_f_1'] = pd.to_datetime(out['f_1_fighter_dob'], errors='coerce')
    out['fighter_dob_f_2'] = pd.to_datetime(out['f_2_fighter_dob'], errors='coerce')
    out['f_1_age'] = _completed_years(out['event_date'], out['fighter_dob_f_1'])
    out['f_2_age'] = _completed_years(out['event_date'], out['fighter_dob_f_2'])

    # Number of earlier rows for the same fighter, pivoted to one column per role.
    counted = long_df.assign(fight_count=long_df.groupby('fighter').cumcount())
    wide = counted.pivot(index='fight_url', columns='role', values='fight_count')
    wide.columns = [f'{role}_fight_number' for role in wide.columns]
    return out.merge(wide, on='fight_url', how='left')


def add_diff_and_interaction_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Append head-to-head difference columns (fighter 1 minus fighter 2).

    Always produced: ``diff_age``, ``diff_fight_number`` (from the
    ``fight_ordinal_*`` columns), ``diff_ranking`` and ``diff_odds``; the
    last two are all-NaN for any side whose source column is absent.  For
    every rolling metric and window in ``ROLLING_WINDOWS`` a
    ``diff_<metric>_<n>`` column is added when both the ``_f_1`` and ``_f_2``
    versions exist.  Only differences are computed here.

    Returns a new DataFrame; the input is not modified.
    """
    out = df.copy()
    n_rows = len(out)

    def _as_float(col):
        return out[col].to_numpy(dtype=float)

    def _optional(col):
        # Absent columns contribute NaN, so the difference is NaN as well.
        return _as_float(col) if col in out else np.full(n_rows, np.nan)

    diffs = {
        'diff_age': _as_float('f_1_age') - _as_float('f_2_age'),
        'diff_fight_number': _as_float('fight_ordinal_f_1') - _as_float('fight_ordinal_f_2'),
        'diff_ranking': _optional('f_1_ranking') - _optional('f_2_ranking'),
        'diff_odds': _optional('f_1_odds') - _optional('f_2_odds'),
    }

    # Rate/quality metrics first, then the strike breakdown metrics.
    rolling_names = [
        'slpm', 'str_acc', 'sapm', 'str_def', 'td_avg', 'td_acc', 'td_def', 'sub_avg', 'ctrl_ratio',
        'physical_strength', 'punching_power', 'dynamika', 'speed', 'timing', 'footwork', 'chin', 'cardio',
    ]
    rolling_names += [
        f'{cat}_{kind}{sfx}'
        for cat in ['head', 'body', 'leg', 'distance', 'clinch', 'ground']
        for sfx in ['', '_r1', '_r2', '_r3', '_r4', '_r5']
        for kind in ['acc', 'share']
    ]

    for name in rolling_names:
        for n in ROLLING_WINDOWS:
            first, second = f'{name}_{n}_f_1', f'{name}_{n}_f_2'
            if first in out.columns and second in out.columns:
                diffs[f'diff_{name}_{n}'] = _as_float(first) - _as_float(second)

    return pd.concat([out, pd.DataFrame(diffs, index=out.index)], axis=1)


# -----------------------------------------------------------------------------
# Boolean and dtype sanitisation
# -----------------------------------------------------------------------------
def _coerce_boolean_like(df: pd.DataFrame) -> list[str]:
    """
    Convert boolean-looking columns of ``df`` to the nullable ``boolean`` dtype.

    A column is converted when its dtype is plain ``bool``, or when it is an
    object column with at least one non-missing value and every non-missing
    value passes ``isin([True, False])``.  The frame is modified in place.

    Returns the names of the converted columns, in column order.
    """
    converted: list[str] = []
    for name in df.columns:
        column = df[name]
        if not (column.dtype == bool):
            # Only object columns are candidates beyond native bools.
            if not (column.dtype == object):
                continue
            present = column.dropna()
            if not len(present) or not present.isin([True, False]).all():
                continue
        df[name] = column.astype('boolean')
        converted.append(name)
    return converted


def _coerce_booleans_for_csv(df: pd.DataFrame) -> list[str]:
    """
    Convert boolean and boolean‑like columns to native Python bool for CSV output.
    Missing values are filled with False.  Integer 0/1 columns are left unchanged.
    """
    touched: list[str] = []
    for c in df.columns:
        s = df[c]
        try:
            if pd.api.types.is_bool_dtype(s):
                df[c] = s.fillna(False).astype(bool)
                touched.append(c)
                continue
            if s.dtype == object:
                unique_vals = set(s.dropna().unique().tolist())
                if unique_vals.issubset({True, False, 1, 0, '1','0','true','false','True','False','TRUE','FALSE'}):
                    df[c] = (
                        s.map(lambda x: True if x in {True,1,'1','true','True','TRUE'} else False)
                          .fillna(False)
                          .astype(bool)
                    )
                    touched.append(c)
                    continue
        except Exception:
            df[c] = s.fillna(0).map(lambda x: 1 if x in {True,1,'1','true','True','TRUE'} else 0).astype('int8')
            touched.append(c)
    return touched


def _sanitize_all_dtypes_for_csv(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* whose columns are safe to write with ``to_csv``.

    The input frame is not modified.  Steps, in order:

    1. Replace ``+inf``/``-inf`` in numeric columns with ``NaN``.
    2. Stringify object columns that hold containers (list, dict, set,
       tuple); missing entries in those columns become ``None``.
    3. Coerce boolean and boolean-like columns with
       ``_coerce_booleans_for_csv``.
    4. Cast categorical columns to the pandas ``string`` dtype.

    Containers are stringified before the boolean coercion because that
    coercion collects each object column's values into a set, which requires
    hashable elements; lists and dicts would make it fail.
    """
    out = df.copy()

    num_cols = out.select_dtypes(include=[np.number]).columns
    if len(num_cols):
        out[num_cols] = out[num_cols].replace([np.inf, -np.inf], np.nan)

    container_types = (list, dict, set, tuple)

    def _stringify(value):
        # pd.isna() returns an element-wise array for list input, which has no
        # single truth value, so only scalars are tested for missingness.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return str(value)

    for c in out.columns:
        col = out[c]
        if not (col.dtype == object):
            continue
        if col.dropna().map(lambda v: isinstance(v, container_types)).any():
            out[c] = col.apply(_stringify)

    _coerce_booleans_for_csv(out)

    # Categoricals are converted after the boolean step so that boolean-valued
    # categoricals are still picked up by the coercion above.
    cat_cols = out.select_dtypes(include=['category']).columns
    if len(cat_cols):
        out[cat_cols] = out[cat_cols].astype('string')
    return out


# -----------------------------------------------------------------------------
# Main pipeline
# -----------------------------------------------------------------------------
def process_new_fights() -> None:
    """Execute every pipeline stage in order and save the final table as CSV.

    The result is written to ``ufc_model_full_analysis_rounds.csv`` in the
    current working directory and a one-line summary (row count and elapsed
    seconds) is printed.
    """
    started_at = time.time()

    # Load the data and build the fight-level features.
    fights = compute_strike_breakdowns(
        prepare_base_features(add_winner_encoded(get_full_fight_data()))
    )

    # Per-fighter pass: rolling outcomes first, then the rolling metrics.
    per_fighter = prepare_long_format(fights)
    for per_group_step in (calculate_outcomes, calculate_group):
        per_fighter = per_fighter.groupby('fighter', group_keys=False).apply(per_group_step)

    # Merge the rolling stats back and remember which columns the merge added.
    known_cols = set(fights.columns)
    fights = merge_rolling_and_outcomes(fights, per_fighter)
    merged_cols = [col for col in fights.columns if col not in known_cols]

    fights = add_fight_ordinal(fights, per_fighter)
    fights = add_additional_features(fights, per_fighter)
    fights = add_diff_and_interaction_features(fights)

    # Missing values become zero in the merged columns and in every column
    # whose name ends with a rolling-window suffix.
    if merged_cols:
        fights[merged_cols] = fights[merged_cols].fillna(0)
    window_suffixes = tuple(f'_{n}' for n in ROLLING_WINDOWS)
    windowed_cols = [col for col in fights.columns if col.endswith(window_suffixes)]
    if windowed_cols:
        fights[windowed_cols] = fights[windowed_cols].fillna(0)

    # Normalise booleans in place, then sanitise the remaining dtypes.
    _coerce_boolean_like(fights)
    fights = _sanitize_all_dtypes_for_csv(fights)

    # Write to CSV in the working directory.
    output_path = 'ufc_model_full_analysis_rounds.csv'
    fights.to_csv(output_path, index=False)
    elapsed = time.time() - started_at
    print(f"✅ Written {output_path} — {len(fights)} records | {elapsed:.2f}s")


# Script entry point: run the full pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    process_new_fights()

  long_df = long_df.groupby('fighter', group_keys=False).apply(calculate_outcomes)
  long_df = long_df.groupby('fighter', group_keys=False).apply(calculate_group)


✅ Written ufc_model_full_analysis_rounds.csv — 8217 records | 279.85s
