# AoE2 Replay Analyzer — RECSAGE
Métricas clave por jugador: aldeanos creados, tiempo de TC inactivo (idle) y APM.

Funciona en local y en Google Colab. Si usas Colab, ejecuta primero la celda de instalación.


In [None]:
# Automatic install/upgrade of dependencies (runs pip with this kernel's interpreter)
import sys, subprocess, pkgutil
import importlib.util

# Prefer NumPy < 2 to avoid incompatibilities with compiled modules (e.g. bottleneck)
base_packages = [
    "numpy<2",
    "pandas",
    "matplotlib",
    "tqdm",
    "mgz",
]
# Make sure the optional accelerators have compatible binary builds
compat_packages = [
    "bottleneck>=1.3.7",
    "numexpr>=2.8.7",
]
print("Instalando/actualizando:", ", ".join(base_packages + compat_packages))
try:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-U"] + base_packages + compat_packages)
except Exception as e:
    # Best effort: a failed install is reported but does not abort the cell
    print("Aviso: fallo parcial en instalación:", e)

# Show versions
try:
    import numpy, pandas, matplotlib
    try:
        import mgz
        mgz_ver = getattr(mgz, "__version__", "?")
    except Exception:
        mgz_ver = "?"
    print("numpy:", getattr(numpy, "__version__", "?"))
    print("pandas:", getattr(pandas, "__version__", "?"))
    print("matplotlib:", getattr(matplotlib, "__version__", "?"))
    print("mgz:", mgz_ver)
    # Optional accelerators that sometimes break because of binary incompatibilities
    for mod in ("bottleneck", "numexpr"):
        # importlib.util.find_spec replaces pkgutil.find_loader, which is
        # deprecated since Python 3.12 and removed in Python 3.14.
        avail = importlib.util.find_spec(mod) is not None
        print(f"{mod}:", "ok" if avail else "no instalado")
except Exception as e:
    print("Aviso: no se pudieron consultar versiones:", e)

print("Si alguna librería fue instalada/actualizada, reinicia el kernel y ejecuta de nuevo.")


In [None]:
# %% Imports y utilidades
import re
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Optional
import mgz as _mgz

# Librería mgz (pura-Python)
from mgz.model import parse_match  # devuelve objeto Match

WINDOW_SEC = 60  # default APM window width, in seconds
# Case-insensitive pattern for villager unit names (English "villager", Spanish "aldean...")
_VILLAGER_RE = re.compile(r'villager|aldean', re.IGNORECASE)

def load_match(replay_path: str):
    """Open a replay file in binary mode and parse it with ``mgz.model.parse_match``.

    Args:
        replay_path: Path of the replay file to read.

    Returns:
        Whatever ``parse_match`` returns for the opened file.

    Raises:
        Any exception raised by ``open`` or by ``parse_match``. Parse failures
        are re-raised unchanged after troubleshooting hints have been printed.
    """
    with open(replay_path, 'rb') as replay_file:
        try:
            parsed = parse_match(replay_file)
        except Exception:
            # Print context for the user, then let the original error propagate.
            print('Error al parsear el replay con mgz.model.parse_match')
            installed_version = getattr(_mgz, '__version__', 'desconocida')
            print('mgz __version__ =', installed_version)
            print('Sugerencias: actualiza mgz (pip install -U mgz) y/o re-descarga el replay.')
            raise
        return parsed

def is_villager(unit_name: Optional[str]) -> bool:
    """Return True when ``unit_name`` is non-empty and matches ``_VILLAGER_RE``.

    ``None`` and the empty string yield False without consulting the pattern.
    """
    return bool(unit_name) and _VILLAGER_RE.search(unit_name) is not None

def villager_counts(match):
    """Count villager creation actions per player.

    An action is counted when its type name contains ``TRAIN`` or ``CREATE``,
    its payload matches ``_VILLAGER_RE`` (as decided by ``_payload_matches``)
    and it has an associated player.

    Args:
        match: Object exposing ``players`` (each with ``number``) and
            ``actions`` (each with ``type``, ``payload`` and ``player``).

    Returns:
        dict mapping player number to the number of villager creation actions.
        Every player in ``match.players`` is present, with 0 if none were seen.
    """
    # NOTE(review): `_payload_matches` is not defined in the cells visible here;
    # confirm it is defined before this function is called.
    counts = {p.number: 0 for p in match.players}
    for act in match.actions:
        # `or ''` guards against a type whose name attribute is None.
        tname = getattr(getattr(act, 'type', None), 'name', '') or ''
        if 'TRAIN' not in tname and 'CREATE' not in tname:
            continue
        if not _payload_matches(act.payload, _VILLAGER_RE):
            continue
        # Reaching this point means the action is a villager creation event.
        # (These lines used to be indented under the `continue` above, which
        # made them unreachable and left every count at 0.)
        pid = act.player.number if act.player else None
        if pid is None:
            continue
        # .get() tolerates a player number that is missing from match.players.
        counts[pid] = counts.get(pid, 0) + 1
    return counts

def tc_idle_time(match, base_prod_time: float = 25.0, gap_threshold: float = 27.0):
    """Approximate per-player Town Center idle time, in seconds.

    Consecutive villager TRAIN/CREATE actions of the same player are compared.
    When the gap between two of them exceeds ``gap_threshold``,
    ``gap - base_prod_time`` (never less than 0) is added to that player's total.

    Args:
        match: Object exposing ``players`` and ``actions``.
        base_prod_time: Seconds subtracted from each qualifying gap.
        gap_threshold: Minimum gap, in seconds, for a gap to count as idle.

    Returns:
        dict mapping player number to accumulated idle seconds (float).
    """
    idle = dict.fromkeys((p.number for p in match.players), 0.0)
    previous = dict.fromkeys((p.number for p in match.players), None)

    for action in match.actions:
        kind = getattr(getattr(action, 'type', None), 'name', '')
        if kind != 'TRAIN' and kind != 'CREATE':
            continue

        # Resolve the unit name: attribute-style first, then dict-style,
        # finally the payload-level 'unit_name' entry.
        unit = action.payload.get('unit') or {}
        unit_name = getattr(unit, 'name', None)
        if not unit_name:
            unit_name = getattr(unit, 'unit_name', None)
        if not unit_name and isinstance(unit, dict):
            unit_name = unit.get('name')
        if not unit_name:
            unit_name = action.payload.get('unit_name')
        if not is_villager(unit_name):
            continue

        owner = action.player.number if action.player else None
        if owner is None:
            continue

        now = action.timestamp.total_seconds()
        earlier = previous[owner]
        if earlier is not None:
            gap = now - earlier
            if gap > gap_threshold:
                idle[owner] += max(0.0, gap - base_prod_time)
        previous[owner] = now

    return idle

def apm_timeseries(match, window_sec: int = WINDOW_SEC):
    """Build a per-player actions-per-minute time series.

    Actions that have a player are bucketed into consecutive windows of
    ``window_sec`` seconds starting at 0. Each bucket count is scaled to a
    per-minute rate.

    Args:
        match: Object exposing ``actions`` (each with ``timestamp`` and ``player``).
        window_sec: Bucket width in seconds.

    Returns:
        DataFrame indexed by bucket start time (index name ``time_sec``) with
        one column per player number. An empty DataFrame is returned when no
        action has a player.
    """
    samples = []
    for action in match.actions:
        if action.player:
            samples.append((action.timestamp.total_seconds(), action.player.number))
    if not samples:
        return pd.DataFrame()

    frame = pd.DataFrame(samples, columns=['t', 'player'])
    edges = np.arange(0, frame['t'].max() + window_sec, window_sec)

    # One histogram per player, scaled from "actions per window" to "actions per minute".
    rates = {
        pid: np.histogram(frame.loc[frame['player'] == pid, 't'], bins=edges)[0] * 60 / window_sec
        for pid in frame['player'].unique()
    }

    series = pd.DataFrame(rates, index=edges[:-1])
    series.index.name = 'time_sec'
    return series

def plot_apm(ts, match, window_sec: int = WINDOW_SEC):
    """Draw one APM line per player against time in minutes.

    Args:
        ts: DataFrame indexed by seconds with one column per player number.
        match: Object whose ``players`` provide ``number`` and ``name``.
        window_sec: Window width, only used in the plot title.

    Prints a notice and returns without plotting when ``ts`` is empty.
    """
    if ts.empty:
        print('Sin acciones suficientes para APM.')
        return

    def _player_name(number):
        # First player whose number matches; StopIteration if there is none.
        return next(p.name for p in match.players if p.number == number)

    plt.figure(figsize=(10, 6))
    for pid in ts.columns:
        legend_label = _player_name(pid)
        minutes = ts.index / 60
        plt.plot(minutes, ts[pid], label=legend_label)
    plt.xlabel('Tiempo (min)')
    plt.ylabel('APM')
    plt.title(f'APM por jugador — ventana {window_sec}s')
    plt.grid(True)
    plt.legend()
    plt.show()

def plot_apm_bar(ts, match):
    """Draw a bar chart of mean APM per player with standard-deviation error bars.

    Args:
        ts: DataFrame with one column per player number.
        match: Object whose ``players`` provide ``number`` and ``name``.

    Prints a notice and returns without plotting when ``ts`` is empty.
    """
    if ts.empty:
        print('Sin datos para generar barplot de APM.')
        return

    means = ts.mean()
    stds = ts.std()

    # Resolve a display name for each column, in column order.
    names = []
    for pid in means.index:
        names.append(next(p.name for p in match.players if p.number == pid))

    positions = np.arange(len(names))
    plt.figure(figsize=(6, 5))
    plt.bar(positions, means.values, yerr=stds.values, capsize=6)
    plt.xticks(positions, names, rotation=45, ha='right')
    plt.ylabel('APM medio')
    plt.title('APM medio ± desviación estándar')
    plt.tight_layout()
    plt.show()


In [None]:
# Helper data for the "units created over time" series.
# Keys are the unit types accepted by unit_created_timeseries(); each value is a
# case-insensitive pattern covering English and Spanish spellings of that unit.
_UNIT_PATTERNS = {
    'Villager': re.compile(r'villager|aldean', re.IGNORECASE),
    'Archer': re.compile(r'archer|arquero', re.IGNORECASE),
    'Skirmisher': re.compile(r'skirm|guerrillero|hostigador', re.IGNORECASE),
    'Militia': re.compile(r'militia|milicia', re.IGNORECASE),
    'Scout': re.compile(r'scout|explorador', re.IGNORECASE),
}

def _extract_unit_name(unit_obj, payload):
    """Return the first truthy unit name found, or ``payload.get('unit_name')``.

    Lookup order: ``unit_obj.name``, ``unit_obj.unit_name``, ``unit_obj['name']``
    (only when ``unit_obj`` is a dict), then the payload's ``'unit_name'`` entry.
    """
    for attribute in ('name', 'unit_name'):
        candidate = getattr(unit_obj, attribute, None)
        if candidate:
            return candidate
    if isinstance(unit_obj, dict):
        candidate = unit_obj.get('name')
        if candidate:
            return candidate
    return payload.get('unit_name')

def unit_created_timeseries(match, unit_type: str, window_sec: int = 60):
    """Count creation actions of one unit type per player and time window.

    Args:
        match: Object exposing ``actions`` (each with ``type``, ``payload``,
            ``player`` and ``timestamp``).
        unit_type: One of the keys of ``_UNIT_PATTERNS``.
        window_sec: Bucket width in seconds.

    Returns:
        DataFrame indexed by bucket start time (index name ``time_sec``) with
        one column of counts per player number. An empty DataFrame is returned
        when no action qualifies.

    Raises:
        ValueError: If ``unit_type`` is not a supported unit.
    """
    if unit_type not in _UNIT_PATTERNS:
        raise ValueError(f'Unidad no soportada: {unit_type}')
    matcher = _UNIT_PATTERNS[unit_type]

    events = []
    for action in match.actions:
        kind = getattr(getattr(action, 'type', None), 'name', '')
        is_creation = 'TRAIN' in kind or 'CREATE' in kind
        # Short-circuit order matters: the payload is only inspected for creation actions.
        if is_creation and _payload_matches(action.payload, matcher) and action.player:
            events.append((action.timestamp.total_seconds(), action.player.number))
    if not events:
        return pd.DataFrame()

    frame = pd.DataFrame(events, columns=['t', 'player'])
    edges = np.arange(0, frame['t'].max() + window_sec, window_sec)
    per_player = {
        pid: np.histogram(frame.loc[frame['player'] == pid, 't'], bins=edges)[0]
        for pid in frame['player'].unique()
    }

    series = pd.DataFrame(per_player, index=edges[:-1])
    series.index.name = 'time_sec'
    return series

def plot_units_created_ts(ts, match, unit_type: str, window_sec: int = 60):
    """Draw one line per player with the units created in each time window.

    Args:
        ts: DataFrame indexed by seconds with one column per player number.
        match: Object whose ``players`` provide ``number`` and ``name``.
        unit_type: Unit label used in the axis label, title and empty notice.
        window_sec: Window width, only used in the plot title.

    Prints a notice and returns without plotting when ``ts`` is empty.
    """
    if ts.empty:
        print(f'Sin acciones suficientes para {unit_type}.')
        return

    def _player_name(number):
        # First player whose number matches; StopIteration if there is none.
        return next(p.name for p in match.players if p.number == number)

    plt.figure(figsize=(10, 6))
    for pid in ts.columns:
        legend_label = _player_name(pid)
        minutes = ts.index/60
        plt.plot(minutes, ts[pid], label=legend_label)
    plt.xlabel('Tiempo (min)')
    plt.ylabel(f'Unidades creadas ({unit_type})')
    plt.title(f'{unit_type} creadas por jugador — ventana {window_sec}s')
    plt.grid(True)
    plt.legend()
    plt.show()


In [None]:
# Quick sanity check: print the signature of is_villager as currently defined in this kernel
import inspect as _inspect
try:
    print('is_villager signature:', _inspect.signature(is_villager))
except Exception as _e:
    # Best effort only: report why the check could not run instead of failing the cell
    print('is_villager check skipped:', _e)


In [None]:
# Replay selector: index into the list of detected replay files. It is used by the
# replay-selection cell when several files are found and the dropdown is unavailable.
# Change it to pick a different file.
CHOICE_IDX = 0


In [None]:
# %% Replay selection (Colab or local)
# NOTE(review): REPLAY_PATH is reset on every run of this cell, so a value assigned
# in an earlier cell is discarded even though the error message below suggests
# assigning it manually — confirm the intended workflow.
REPLAY_PATH = None
try:
    from google.colab import files  # type: ignore
    print('🔄 Sube un archivo .aoe2record…')
    uploaded = files.upload()
    if uploaded:
        # Use the first uploaded file name
        REPLAY_PATH = next(iter(uploaded))
except ImportError:
    # Not running in Colab: fall through to local discovery
    pass
except Exception as _upload_err:
    # Colab is available but the upload failed: report it instead of hiding it,
    # then fall through to local discovery as before
    print(f'Aviso: no se pudo subir el replay en Colab: {_upload_err}')

if REPLAY_PATH is None:
    # Detect local replays; pick by dropdown or by index when there are several
    candidates = []
    from pathlib import Path as _Path
    for base in [_Path('.'), _Path('AOE2_STATPARSER'), _Path('..')]:
        candidates += sorted(base.glob('*.aoe2record'))
    if candidates:
        if len(candidates) == 1:
            REPLAY_PATH = str(candidates[0])
            print(f'Usando replay: {REPLAY_PATH}')
        else:
            # Try ipywidgets for an interactive selector
            try:
                import ipywidgets as widgets
                from IPython.display import display
                options = [str(p) for p in candidates]
                dropdown = widgets.Dropdown(options=options, description='Replay:')
                display(dropdown)
                # Uses the dropdown's current value; after changing the selection, re-run this cell
                REPLAY_PATH = dropdown.value
                print(f'Usando replay (dropdown): {REPLAY_PATH}')
            except Exception:
                # Fall back to index-based selection when ipywidgets is unavailable
                print('Replays detectados:')
                for i,p in enumerate(candidates):
                    print(f'  [{i}] {p}')
                try:
                    CHOICE_IDX
                except NameError:
                    CHOICE_IDX = 0  # change this index to pick another file
                if not (0 <= CHOICE_IDX < len(candidates)):
                    raise ValueError(f'CHOICE_IDX fuera de rango (0..{len(candidates)-1})')
                REPLAY_PATH = str(candidates[CHOICE_IDX])
                print(f'Usando replay (índice): {REPLAY_PATH}')
    else:
        raise RuntimeError('No se ha seleccionado replay. Sube un .aoe2record o asigna REPLAY_PATH manualmente.')

match = load_match(REPLAY_PATH)
print(f'Mapa: {match.map.name} — Duración: {match.duration.total_seconds()/60:.1f} min')


In [None]:
# Units created: interactive selector
try:
    import ipywidgets as widgets
    from IPython.display import display
    global UNITS_WIDGET_STATE
    try:
        # Detach the observers registered by a previous run of this cell
        st = UNITS_WIDGET_STATE
        st['unit_dropdown'].unobserve(st['handler_unit'], names='value')
        st['window_dropdown'].unobserve(st['handler_window'], names='value')
    except Exception:
        st = {}
    unit_dropdown = widgets.Dropdown(options=['Villager','Archer','Skirmisher','Militia','Scout'], value='Villager', description='Unidad:')
    window_dropdown = widgets.Dropdown(options=[15,30,45,60,90,120], value=60, description='Ventana (s):')
    out = widgets.Output()
    # The widgets are bound as keyword-only defaults so this handler keeps using
    # THIS cell's widgets. Without that, `out` and `window_dropdown` are looked up
    # as globals at call time, and another cell that rebinds those names (the APM
    # cell does) silently redirects this handler to its own widgets.
    def handler(change=None, *, _out=out, _unit_dd=unit_dropdown, _window_dd=window_dropdown):
        with _out:
            _out.clear_output(wait=True)
            unit = _unit_dd.value
            w = int(_window_dd.value)
            ts = unit_created_timeseries(match, unit_type=unit, window_sec=w)
            plot_units_created_ts(ts, match, unit_type=unit, window_sec=w)
    unit_dropdown.observe(handler, names='value')
    window_dropdown.observe(handler, names='value')
    display(widgets.HBox([unit_dropdown, window_dropdown]))
    display(out)
    # Render once with the initial selection
    handler(None)
    UNITS_WIDGET_STATE = {
        'unit_dropdown': unit_dropdown,
        'window_dropdown': window_dropdown,
        'handler_unit': handler,
        'handler_window': handler,
    }
except Exception as _e:
    print('Widgets no disponibles; llama unit_created_timeseries() y plot_units_created_ts() manualmente.')
    # Surface the actual cause so a non-widget failure is not mistaken for a missing dependency
    print(f'  (motivo: {type(_e).__name__}: {_e})')


In [None]:
# Diagnostics: summary of action types
from collections import Counter
# Tally actions by the name of their type ('' when an action has no type or no name)
cnt = Counter(getattr(getattr(a, 'type', None), 'name', '') for a in match.actions)
print('Tipos de acción y conteo:')
for k,v in cnt.most_common():
    print(f'  {k}: {v}')
# Show up to five sample TRAIN/CREATE actions
examples = [a for a in match.actions if getattr(getattr(a, 'type', None), 'name', '') in ('TRAIN','CREATE')][:5]
for i,a in enumerate(examples):
    unit_obj = a.payload.get('unit') or {}
    # Resolve the unit name: attribute-style, then dict-style, then the payload's 'unit_name'
    name = (getattr(unit_obj, 'name', None) or getattr(unit_obj, 'unit_name', None) or (unit_obj.get('name') if isinstance(unit_obj, dict) else None) or a.payload.get('unit_name'))
    print(f'Ejemplo {i}:', getattr(getattr(a, 'type', None), 'name', ''), '|', 'player', getattr(getattr(a, 'player', None), 'number', None), '|', 'name', name)


In [None]:
# Interactive parameters (APM)
try:
    import ipywidgets as widgets
    from IPython.display import display
    global APM_WIDGET_STATE
    try:
        st = APM_WIDGET_STATE
        # Detach the observer registered by a previous run of this cell
        st['window_dropdown'].unobserve(st['handler'], names='value')
    except Exception:
        st = {}
    out = widgets.Output()
    window_dropdown = widgets.Dropdown(options=[15,30,45,60,90,120], value=60, description='Ventana (s):')
    # The widgets are bound as keyword-only defaults so this handler keeps using
    # THIS cell's widgets. Without that, `out` and `window_dropdown` are looked up
    # as globals at call time, and another cell that rebinds those names (the units
    # cell does) silently redirects this handler to its own widgets.
    def handler(change=None, *, _out=out, _window_dd=window_dropdown):
        with _out:
            _out.clear_output(wait=True)
            w = int(_window_dd.value)
            ts = apm_timeseries(match, window_sec=w)
            plot_apm(ts, match, window_sec=w)
            plot_apm_bar(ts, match)
    window_dropdown.observe(handler, names='value')
    display(widgets.HBox([window_dropdown]))
    display(out)
    # Render once with the initial selection
    handler(None)
    APM_WIDGET_STATE = {'window_dropdown': window_dropdown, 'handler': handler}
except Exception as _e:
    print('Widgets no disponibles; calcula manualmente:')
    print('ts = apm_timeseries(match, window_sec=60)')
    print('plot_apm(ts, match, window_sec=60); plot_apm_bar(ts, match)')
    # Surface the actual cause so a non-widget failure is not mistaken for a missing dependency
    print(f'  (motivo: {type(_e).__name__}: {_e})')


In [None]:
# %% Metric computation and per-player summary
villagers = villager_counts(match)
idles = tc_idle_time(match)
aps = apm_timeseries(match, window_sec=WINDOW_SEC)

# Match length in seconds; loop-invariant, used to express idle time as a percentage
dur_s = match.duration.total_seconds()

# Declared up front so the summary frame has its columns even when there are no
# players (otherwise set_index('player') raises KeyError on an empty frame)
_summary_columns = [
    'player',
    'civ',
    'villagers_trained',
    'tc_idle_s',
    'tc_idle_%',
    'apm_mean',
    'apm_peak',
]

rows = []
for p in match.players:
    pid = p.number
    name = p.name
    civ = getattr(p, 'civilization', None)
    # A player may have no APM column when none of the recorded actions are theirs
    has_apm = (not aps.empty) and pid in aps
    apm_mean = float(aps[pid].mean()) if has_apm else np.nan
    apm_peak = float(aps[pid].max()) if has_apm else np.nan
    idle_s = float(idles.get(pid, 0.0))
    idle_pct = 100.0 * idle_s / dur_s if dur_s > 0 else np.nan
    rows.append({
        'player': name,
        'civ': civ,
        'villagers_trained': int(villagers.get(pid, 0)),
        'tc_idle_s': round(idle_s, 1),
        'tc_idle_%': round(idle_pct, 1),
        # round(nan, 1) is nan, so missing APM values pass through unchanged
        'apm_mean': round(apm_mean, 1),
        'apm_peak': round(apm_peak, 1),
    })

summary = pd.DataFrame(rows, columns=_summary_columns).set_index('player')
summary


In [None]:
# %% APM visualizations
# Plots the `aps` time series: one line chart and one mean ± std bar chart
plot_apm(aps, match)
plot_apm_bar(aps, match)
