# Advanced Player Analysis

We will conduct a study of six internationally renowned grandmasters, analyzing all their games played over the last six months. The information collected in these notebooks may be useful for any of these players, as it highlights aspects and trends in their play that can help guide game preparation and also reveal personal weaknesses.

### **Part One:** Game Selection and Analysis


<h1> An√°lisis Avanzado de jugadores</h1>
<p> Realizaremos un estudio de seis grandes maestros de reconocido prestigio internacional tomando todas sus partidas durante los seis √∫ltimos meses. La informaci√≥n recopilada en estos cuadernillos puede ser √∫til para cualquiera de estos jugadores, pues mostrar√° aspectos y tendencias en el juego que pueden servir para enfocar la preparaci√≥n de las partidas y tambi√©n para descubrir debilidades propias.</p>

<h3> <strong>Primera parte:</strong> Selecci√≥n y an√°lisis de partidas</h3>


## 0. Carga de librer√≠as necesarias

In [1]:
# =========================
# M√≥dulos est√°ndar de Python
# =========================
import os
import sys
import io
import re
import math
import subprocess
import datetime
import statistics
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Any

# =========================
# Librer√≠as cient√≠ficas y de datos
# =========================
import numpy as np
import pandas as pd
from tqdm import tqdm

# =========================
# Almacenamiento y formatos
# =========================
from pyarrow import fs

# =========================
# Ajedrez y an√°lisis con motor
# =========================
import chess
import chess.pgn
from stockfish import Stockfish

## 1. Selecci√≥n de jugadores y del horizonte temporal

In [2]:
#jugadores_objetivo = {"Carlsen,M"}

jugadores_objetivo = {"Carlsen,M", "Nakamura,Hi", "Lai,Duc Minh", "Sarana,A", "Zhigalko,S","Bortnyk,Olexandr"}

numero_minimo = 1469
numero_maximo = 1625

## 2. Selecci√≥n y guardado de las partidas de los jugadores objetivo

In [None]:
# --- Configuraci√≥n local ---
patron_twic = re.compile(r"twic(\d+)\.pgn", re.IGNORECASE)

# Directorios locales
carpeta_entrada = Path("./PGN")
carpeta_salida = Path("./procesados_1")
carpeta_salida.mkdir(parents=True, exist_ok=True)
archivo_salida = carpeta_salida / "partidas_grupo.pgn"

# Valores por defecto si no existen en el entorno del cuaderno
try:
    numero_minimo
except NameError:
    numero_minimo = float("-inf")
try:
    numero_maximo
except NameError:
    numero_maximo = float("inf")
try:
    jugadores_objetivo
except NameError:
    jugadores_objetivo = set()  # por ejemplo: {"Carlsen, Magnus", "Nakamura, Hikaru"}

# --- Proceso ---
partidas_guardadas = set()
contador = 0

# Listar archivos .pgn locales
archivos_pgn = sorted(carpeta_entrada.glob("*.pgn"))

if not archivos_pgn:
    print(f"No se encontraron .pgn en {carpeta_entrada.resolve()}")

with open(archivo_salida, "w", encoding="utf-8") as salida:
    for ruta in archivos_pgn:
        nombre = ruta.name

        # Filtrado por TWIC n√∫mero si aplica
        match = patron_twic.match(nombre)
        if match:
            numero = int(match.group(1))
            if not (numero_minimo <= numero <= numero_maximo):
                continue

        print(f"Leyendo {nombre} desde {carpeta_entrada}/")

        # Leer el archivo y parsear partidas una a una
        with open(ruta, "rb") as fbin:
            contenido = fbin.read().decode("utf-8", errors="ignore")
        buffer = io.StringIO(contenido)

        while True:
            try:
                partida = chess.pgn.read_game(buffer)
                if partida is None:
                    break
            except Exception as e:
                print(f" Error al leer partida: {e}")
                continue

            # Descartar Chess960
            if partida.headers.get("Variant", "").lower() == "chess960":
                continue

            blanco = partida.headers.get("White", "Unknown")
            negro = partida.headers.get("Black", "Unknown")

            # Mantener solo si est√° alguno de los jugadores objetivo (si el set est√° vac√≠o, no filtra)
            if jugadores_objetivo and (blanco not in jugadores_objetivo and negro not in jugadores_objetivo):
                continue

            # Evitar duplicados por (jugadores ordenados, resultado)
            clave = tuple(sorted([blanco, negro])) + (partida.headers.get("Result", ""),)
            if clave in partidas_guardadas:
                continue

            # Validar jugadas b√°sicas
            try:
                tablero = partida.board()
                for movimiento in partida.mainline_moves():
                    if movimiento not in tablero.legal_moves:
                        raise ValueError(f"Movimiento ilegal: {tablero.san(movimiento)}")
                    tablero.push(movimiento)

                texto_partida = str(partida)
            except Exception as e:
                print(f" Partida inv√°lida {blanco} vs {negro}: {e}")
                continue

            partidas_guardadas.add(clave)
            salida.write(texto_partida + "\n\n")
            contador += 1

print(f"\n{contador} partidas seleccionadas.")
print(f"Archivo guardado en: {archivo_salida.resolve()}")


In [None]:
# =========================
# Configuraci√≥n del usuario
# =========================
try:
    jugadores_objetivo
except NameError:
    jugadores_objetivo = {"Carlsen,M", "Nakamura,Hi", "Lai,Duc Minh", "Sarana,A", "Zhigalko,S","Bortnyk,Olexandr"}

# =========================
# Selecci√≥n de fuentes PGN
# =========================
ruta_procesado = Path("./procesados_1/partidas_grupo.pgn")
if ruta_procesado.exists():
    fuentes = [ruta_procesado]
else:
    carpeta_entrada = Path("./PGN")
    fuentes = sorted(carpeta_entrada.glob("*.pgn"))

    # Si hay rango TWIC definido (numero_minimo / numero_maximo), filtramos por nombre de archivo twicNNNN.pgn
    patron_twic = re.compile(r"twic(\d+)\.pgn", re.IGNORECASE)
    try:
        numero_minimo
    except NameError:
        numero_minimo = float("-inf")
    try:
        numero_maximo
    except NameError:
        numero_maximo = float("inf")

    filt = []
    for p in fuentes:
        m = patron_twic.match(p.name)
        if m:
            n = int(m.group(1))
            if numero_minimo <= n <= numero_maximo:
                filt.append(p)
        else:
            # Si no es TWIC numerado, lo incluimos
            filt.append(p)
    fuentes = filt

if not fuentes:
    raise SystemExit("No se encontraron PGN para procesar. Revisa ./procesados_1/ o ./PGN.")

# =========================
# Contadores
# =========================
stats = {j: {"total": 0, "blancas": 0, "negras": 0} for j in jugadores_objetivo}

# =========================
# Intento con python-chess
# =========================
use_python_chess = True
try:
    import chess.pgn  # type: ignore
except Exception:
    use_python_chess = False

def procesar_con_python_chess(rutas):
    for ruta in rutas:
        contenido = Path(ruta).read_bytes().decode("utf-8", errors="ignore")
        buffer = io.StringIO(contenido)
        while True:
            game = chess.pgn.read_game(buffer)
            if game is None:
                break
            w = game.headers.get("White", "")
            b = game.headers.get("Black", "")
            if w in stats:
                stats[w]["total"] += 1
                stats[w]["blancas"] += 1
            if b in stats:
                stats[b]["total"] += 1
                stats[b]["negras"] += 1

def procesar_con_regex(rutas):
    header_block_re = re.compile(r'(?:\[[^\n]+\]\s*)+', re.MULTILINE)
    tag_re = re.compile(r'^\[(\w+)\s+"(.*?)"\]\s*$', re.MULTILINE)
    for ruta in rutas:
        text = Path(ruta).read_text(encoding="utf-8", errors="ignore")
        for m in header_block_re.finditer(text):
            headers = dict(tag_re.findall(m.group(0)))
            w = headers.get("White", "")
            b = headers.get("Black", "")
            if w in stats:
                stats[w]["total"] += 1
                stats[w]["blancas"] += 1
            if b in stats:
                stats[b]["total"] += 1
                stats[b]["negras"] += 1

if use_python_chess:
    procesar_con_python_chess(fuentes)
else:
    procesar_con_regex(fuentes)

# =========================
# Mostrar resultados (tabla)
# =========================
anchon = max((len(j) for j in jugadores_objetivo), default=6)
encabezado = f"{'Jugador objetivo'.ljust(anchon)}  {'Total':>6}  {'Blancas':>7}  {'Negras':>6}"
print(encabezado)
print("-" * len(encabezado))
for jugador, c in sorted(stats.items(), key=lambda kv: (-kv[1]["total"], kv[0])):
    print(f"{jugador.ljust(anchon)}  {c['total']:>6}  {c['blancas']:>7}  {c['negras']:>6}")


In [None]:
## SIN Partidas duplicadas 
pgn_path = Path("./procesados_1/partidas_grupo.pgn")

hashes = set()
duplicadas = 0
total = 0

with open(pgn_path, "r", encoding="utf-8", errors="ignore") as f:
    while True:
        game = chess.pgn.read_game(f)
        if game is None:
            break
        total += 1
        h = hash(str(game).strip())
        if h in hashes:
            duplicadas += 1
        else:
            hashes.add(h)

print(f"Partidas totales: {total}")
print(f"Duplicadas: {duplicadas}")
print(f"√önicas: {total - duplicadas}")


## 3. Divisi√≥n del archivo de partidas en lotes

In [None]:
def dividir_pgn_en_lotes(pgn_path, partidas_por_lote, carpeta_salida="lotes_pgn"):
    """
    Divide un PGN grande en varios archivos m√°s peque√±os con N partidas cada uno.
    - pgn_path: ruta al archivo PGN de entrada
    - partidas_por_lote: n√∫mero de partidas por archivo de salida (entero > 0)
    - carpeta_salida: carpeta donde se guardar√°n los lotes
    Retorna: lista de rutas de los archivos generados (str)
    """
    # Validaciones
    pgn_path = Path(pgn_path)
    if not pgn_path.exists():
        raise FileNotFoundError(f"No existe el archivo: {pgn_path}")
    if not isinstance(partidas_por_lote, int) or partidas_por_lote <= 0:
        raise ValueError("partidas_por_lote debe ser un entero > 0")

    carpeta_salida = Path(carpeta_salida)
    carpeta_salida.mkdir(parents=True, exist_ok=True)
    base_nombre = pgn_path.stem  # sin extensi√≥n

    archivos_creados = []
    contador_partidas = 0
    contador_lote = 0
    archivo_salida = None

    try:
        with pgn_path.open("r", encoding="utf-8", errors="ignore") as archivo_pgn:
            while True:
                partida = chess.pgn.read_game(archivo_pgn)
                if partida is None:
                    break

                # Abrir nuevo lote si toca
                if contador_partidas % partidas_por_lote == 0:
                    if archivo_salida:
                        archivo_salida.close()
                    contador_lote += 1
                    nombre_archivo_lote = f"{base_nombre}_{contador_lote:03}.pgn"
                    ruta_lote = carpeta_salida / nombre_archivo_lote
                    archivo_salida = ruta_lote.open("w", encoding="utf-8")
                    archivos_creados.append(str(ruta_lote))

                # Escribir partida
                archivo_salida.write(str(partida) + "\n\n")
                contador_partidas += 1
    finally:
        if archivo_salida:
            archivo_salida.close()

    print(f"Se dividi√≥ '{pgn_path}' en {contador_lote} archivo(s) en '{carpeta_salida}'.")
    print(f"Partidas totales procesadas: {contador_partidas}")
    return archivos_creados


In [None]:
# Por ejemplo, si tu archivo final est√° en:
ruta = "./procesados_1/partidas_grupo.pgn"

# Crear lotes de 150 partidas cada uno:
lotes = dividir_pgn_en_lotes(ruta, partidas_por_lote=500, carpeta_salida="lotes_pgn")


## 4. An√°lisis de las partidas

In [None]:
# Analizar lotes PGN ‚Üí DataFrame parquet (unificado)
# - PGN-only siempre (sin motor)
# - Opcional: evals/precisi√≥n con Stockfish si se pasa engine_path
# - Columnas "rival" y "elo_rival" SIEMPRE
# - Barra de progreso y sin duplicados (Windows)
#
# Requisitos m√≠nimos:
#   pip install python-chess pandas pyarrow tqdm
#   (opcional motor) pip install stockfish


# tqdm adaptable (terminal / notebook)
try:
    from tqdm.auto import tqdm
except Exception:
    def tqdm(iterable=None, total=None, desc=None, leave=True, **kwargs):
        return iterable if iterable is not None else range(total or 0)

# -------------------------
# Windows: pol√≠tica asyncio
# -------------------------
if sys.platform.startswith("win"):
    try:
        import asyncio
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    except Exception:
        pass

# ----------------------------
# Utilidades comunes
# ----------------------------
CENTRAL = {chess.D4, chess.E4, chess.D5, chess.E5}

def _eco_from_headers(headers: chess.pgn.Headers) -> Optional[str]:
    return headers.get("ECO") or None

def _get_fide_id(headers: chess.pgn.Headers, white: bool) -> Optional[str]:
    return headers.get("WhiteFideId") if white else headers.get("BlackFideId")

def _get_player_name(headers: chess.pgn.Headers, white: bool) -> Optional[str]:
    return headers.get("White") if white else headers.get("Black")

def _get_player_elo(headers: chess.pgn.Headers, white: bool) -> Optional[float]:
    key = "WhiteElo" if white else "BlackElo"
    val = headers.get(key)
    try:
        return float(val) if val and val != "?" else None
    except Exception:
        return None

def _result_for(headers: chess.pgn.Headers, white: bool) -> Optional[float]:
    res = headers.get("Result")
    if res == "1-0":
        return 1.0 if white else 0.0
    if res == "0-1":
        return 0.0 if white else 1.0
    if res in ("1/2-1/2", "1/2-¬Ω", "¬Ω-¬Ω"):
        return 0.5
    return None

def _parse_date(headers: chess.pgn.Headers) -> Optional[pd.Timestamp]:
    raw = headers.get("Date")
    if not raw:
        return None
    parts = raw.split(".")
    if len(parts) != 3:
        return None
    y, m, d = parts
    y = "2000" if y in ("????", "0000") else y
    m = "01" if m in ("??", "00") else m
    d = "01" if d in ("??", "00") else d
    try:
        return pd.to_datetime(f"{y}-{m}-{d}", errors="coerce")
    except Exception:
        return None

def _list_pgn_files(carpeta_pgn: str) -> List[str]:
    p = Path(carpeta_pgn)
    if not p.exists():
        return []
    return sorted(str(f.resolve()) for f in p.iterdir() if f.is_file() and f.suffix.lower() == ".pgn")

def _contar_partidas(carpeta_pgn: str) -> int:
    total = 0
    for pgn_path in _list_pgn_files(carpeta_pgn):
        with open(pgn_path, "r", encoding="utf-8", errors="ignore") as f:
            while True:
                g = chess.pgn.read_game(f)
                if g is None:
                    break
                total += 1
    return total

# -------------------------------------------------
# Configuraci√≥n de mapeo de MATE (en centipawns)
# -------------------------------------------------
# M√°ximo absoluto para un mate (15 peones = 1500 cp)
MATE_CP_CAP   = 1500
# Cu√°nto decrece por cada ply de distancia al mate (ajustable)
MATE_CP_STEP  = 100
# Valor m√≠nimo que asignaremos a un mate lejano (8 peones = 800 cp)
MATE_CP_FLOOR = 800

# ----------------------------
# Motor opcional: Stockfish
# ----------------------------
class _SF:
    """Wrapper sencillo sobre 'stockfish' (sin asyncio). Se crea s√≥lo si engine_path != None."""
    def __init__(self, engine_path: str, depth: int = 14, threads: int = 4, hash_mb: int = 256):
        try:
            from stockfish import Stockfish
        except Exception as e:
            raise RuntimeError("Falta la librer√≠a 'stockfish'. Instala con: pip install stockfish") from e
        self.depth = depth
        self.sf = Stockfish(path=engine_path, parameters={"Threads": threads, "Hash": hash_mb})
        try:
            self.sf.set_depth(depth)
        except Exception:
            pass

    def eval_white_pov_cp(self, fen: str) -> Optional[float]:
        """Eval√∫a en centipawns desde el POV de BLANCAS (positivo = blancas mejor)."""
        self.sf.set_fen_position(fen)
        try:
            ev = self.sf.get_evaluation()
        except TypeError:
            ev = self.sf.get_evaluation(depth=self.depth)
        except Exception:
            return None

        tp = ev.get("type")
        val = ev.get("value")  # en 'mate' es distancia a mate con signo

        if val is None:
            return None

        if tp == "mate":
            # m > 0 = mate a favor de BLANCAS; m < 0 = mate a favor de NEGRAS
            m = int(val)
            sign = 1 if m > 0 else (-1 if m < 0 else 1)
            d = abs(m)  # distancia a mate en plies

            # Asignamos un valor en cp acotado por distancia: cerca del mate -> ~MATE_CP_CAP
            cp = MATE_CP_CAP - MATE_CP_STEP * max(d - 1, 0)
            cp = max(MATE_CP_FLOOR, min(MATE_CP_CAP, cp))
            return float(sign * cp)

        # tipo 'cp' normal
        try:
            return float(val)
        except Exception:
            return None

    def best_move_uci(self, board: chess.Board) -> Optional[str]:
        self.sf.set_fen_position(board.fen())
        try:
            bm = self.sf.get_best_move()
        except TypeError:
            try:
                bm = self.sf.get_best_move_time(50)
            except Exception:
                bm = None
        except Exception:
            bm = None
        return bm

def _evals_jugador_from_all(evals_all: List[float], color: str) -> List[float]:
    """
    'evals_all' es la secuencia de evaluaciones tras cada jugada (en peones)
    vista desde el lado que acaba de mover en cada paso (W,B,W,B,...).
    Devuelve las variaciones por jugada del jugador:
      - W: [evals[0], evals[2]-evals[1], evals[4]-evals[3], ...]
      - B: [evals[1]-evals[0], evals[3]-evals[2], ...]
    """
    if not evals_all:
        return []
    out: List[float] = []
    if color == "W":
        out.append(round(evals_all[0], 2))
        for i in range(1, len(evals_all), 2):
            j = i + 1
            if j < len(evals_all):
                out.append(round(evals_all[j] - evals_all[i], 2))
    else:  # B
        for i in range(0, len(evals_all) - 1, 2):
            out.append(round(evals_all[i + 1] - evals_all[i], 2))
    return out

# ----------------------------
# Utilidades para errores y precisi√≥n %
# ----------------------------
def _count_range(x: float, lo: float, hi: float, lo_incl: bool, hi_incl: bool) -> bool:
    """Devuelve True si x est√° en (lo, hi) con inclusiones seg√∫n banderas."""
    if pd.isna(x):
        return False
    left  = (x >= lo) if lo_incl else (x > lo)
    right = (x <= hi) if hi_incl else (x < hi)
    return left and right

def _contar_segmento(vals: List[float], start0: int, end0: Optional[int], pred) -> int:
    """Cuenta en vals[start0:end0] (√≠ndices 0-based, end0 exclusivo; None => hasta el final)."""
    if vals is None:
        return 0
    sl = vals[start0:(end0 if end0 is not None else None)]
    return sum(1 for v in sl if pred(v))

def _metricas_errores(evals_jugador: List[float]) -> Dict[str, Any]:
    """
    - leves: (-1, -0.5]     (incluye -0.5, excluye -1)
    - errores: (-3, -1]     (incluye -1, excluye -3)
    - graves: (-inf, -3]    (‚â§ -3)
    Fases: 1‚Äì15 (0‚Äì14), 16‚Äì40 (15‚Äì39), 41‚Äìfin (‚â•40). Si no hay 41, final = <NA>.
    """
    is_leve  = lambda v: _count_range(v, -1.0, -0.5, lo_incl=False, hi_incl=True)
    is_error = lambda v: _count_range(v, -3.0, -1.0, lo_incl=False, hi_incl=True)
    is_grave = lambda v: (not pd.isna(v)) and (v <= -3.0)

    # Apertura (0..14)
    leves_ap = _contar_segmento(evals_jugador, 0, 15, is_leve)
    errs_ap  = _contar_segmento(evals_jugador, 0, 15, is_error)
    grav_ap  = _contar_segmento(evals_jugador, 0, 15, is_grave)

    # Mediojuego (15..39)
    leves_mj = _contar_segmento(evals_jugador, 15, 40, is_leve)
    errs_mj  = _contar_segmento(evals_jugador, 15, 40, is_error)
    grav_mj  = _contar_segmento(evals_jugador, 15, 40, is_grave)

    # Final (desde 40)
    if len(evals_jugador) >= 41:  # existe posici√≥n 41 (√≠ndice 40)
        leves_fn = _contar_segmento(evals_jugador, 40, None, is_leve)
        errs_fn  = _contar_segmento(evals_jugador, 40, None, is_error)
        grav_fn  = _contar_segmento(evals_jugador, 40, None, is_grave)
    else:
        leves_fn = pd.NA
        errs_fn  = pd.NA
        grav_fn  = pd.NA

    def _zero_if_na(x):
        return 0 if pd.isna(x) else int(x)

    leves_tot = int(leves_ap + leves_mj + _zero_if_na(leves_fn))
    errs_tot  = int(errs_ap  + errs_mj  + _zero_if_na(errs_fn))
    grav_tot  = int(grav_ap  + grav_mj  + _zero_if_na(grav_fn))

    return {
        "errores_leves_apertura": leves_ap,
        "errores_leves_mediojuego": leves_mj,
        "errores_leves_final": leves_fn,
        "errores_leves": leves_tot,
        "errores_apertura": errs_ap,
        "errores_mediojuego": errs_mj,
        "errores_final": errs_fn,
        "errores": errs_tot,
        "errores_graves_apertura": grav_ap,
        "errores_graves_mediojuego": grav_mj,
        "errores_graves_final": grav_fn,
        "errores_graves": grav_tot,
    }

# ---- Precisi√≥n porcentual (100 * exp(-k * p√©rdida)) con k=0.5 ----
_K_PREC = 0.5

def _to_precision_percent(loss: Optional[float]) -> Optional[float]:
    if loss is None or pd.isna(loss):
        return None
    return 100.0 * math.exp(-_K_PREC * float(loss))

def _precision_media(losses: Optional[List[Optional[float]]]) -> Optional[float]:
    if not losses:
        return None
    vals = [_to_precision_percent(x) for x in losses if x is not None and not pd.isna(x)]
    if not vals:
        return None
    return round(float(np.mean(vals)), 2)

def _precision_media_segmento(
    losses: Optional[List[Optional[float]]],
    start0: int,
    end0_exclusive: Optional[int],
    na_for_empty: bool = False
) -> Any:
    if not losses:
        return (pd.NA if na_for_empty else None)
    seg = losses[start0:(end0_exclusive if end0_exclusive is not None else None)]
    vals = [_to_precision_percent(x) for x in seg if x is not None and not pd.isna(x)]
    if not vals:
        return (pd.NA if na_for_empty else None)
    return round(float(np.mean(vals)), 2)

# ---- NUEVAS utilidades: medias de evals ----
def _mean_list(vals: Optional[List[Optional[float]]]) -> Optional[float]:
    """Media redondeada a 2 decimales. Ignora None/NaN. Lista vac√≠a -> None."""
    if not vals:
        return None
    x = [v for v in vals if v is not None and not pd.isna(v)]
    if not x:
        return None
    return round(float(np.mean(x)), 2)

def _mean_segment(vals: Optional[List[Optional[float]]],
                  start0: int,
                  end0_exclusive: Optional[int],
                  na_for_empty: bool = False) -> Any:
    """Media redondeada de un segmento 0-based (end exclusivo)."""
    if not vals:
        return (pd.NA if na_for_empty else None)
    seg = vals[start0:(end0_exclusive if end0_exclusive is not None else None)]
    x = [v for v in seg if v is not None and not pd.isna(v)]
    if not x:
        return (pd.NA if na_for_empty else None)
    return round(float(np.mean(x)), 2)

# -----------------------------------------------
# N√∫cleo unificado
# -----------------------------------------------
def analizar_pgn_unificado(
    carpeta_pgn: str = "./lotes_pgn",
    carpeta_salida: str = "./analizadas_grupo",
    archivo_salida: str = "analisis_global.parquet",
    *,
    engine_path: Optional[str] = None,   # si se pasa, se calculan evals/precisi√≥n
    depth: int = 14,
    threads: int = 4,
    hash_mb: int = 256,
    calcular_precision: bool = True,
    mostrar_progreso: bool = True,
) -> pd.DataFrame:
    Path(carpeta_salida).mkdir(parents=True, exist_ok=True)

    sf: Optional[_SF] = None
    if engine_path:
        sf = _SF(engine_path=engine_path, depth=depth, threads=threads, hash_mb=hash_mb)

    rows: List[Dict[str, Any]] = []
    total_partidas = _contar_partidas(carpeta_pgn) if mostrar_progreso else None
    pbar_partidas = tqdm(total=total_partidas, desc="Analizando partidas", leave=True) if mostrar_progreso else None

    for pgn_path in _list_pgn_files(carpeta_pgn):
        with open(pgn_path, "r", encoding="utf-8", errors="ignore") as f:
            while True:
                game = chess.pgn.read_game(f)
                if game is None:
                    break

                headers = game.headers
                eco_code = _eco_from_headers(headers)
                pgn_text = str(game)

                white_name = _get_player_name(headers, True)
                black_name = _get_player_name(headers, False)
                white_elo  = _get_player_elo(headers, True)
                black_elo  = _get_player_elo(headers, False)

                # Base por color (con elo_rival SIEMPRE)
                base = {
                    True: {  # Blancas
                        "jugador": white_name, "rival": black_name, "color": "W",
                        "fide_id": _get_fide_id(headers, True),
                        "elo": white_elo, "elo_rival": black_elo,
                        "resultados": _result_for(headers, True),
                        "evento": headers.get("Event"), "lugar": headers.get("Site"),
                        "fechas": _parse_date(headers), "cod_eco": eco_code,
                        "movimientos_total": 0, "mov_peones": 0,
                        "mov_centrales": 0, "intercambio_piezas": 0, "enroque": 0,
                    },
                    False: {  # Negras
                        "jugador": black_name, "rival": white_name, "color": "B",
                        "fide_id": _get_fide_id(headers, False),
                        "elo": black_elo, "elo_rival": white_elo,
                        "resultados": _result_for(headers, False),
                        "evento": headers.get("Event"), "lugar": headers.get("Site"),
                        "fechas": _parse_date(headers), "cod_eco": eco_code,
                        "movimientos_total": 0, "mov_peones": 0,
                        "mov_centrales": 0, "intercambio_piezas": 0, "enroque": 0,
                    },
                }

                # SAN principal
                san_moves: List[str] = []
                node = game
                while node.variations:
                    mv = node.variation(0).move
                    san_moves.append(node.board().san(mv))
                    node = node.variation(0)

                # M√©tricas por jugadas (sin motor) + opcional motor
                board = game.board()
                evals_all_W: List[float] = []
                evals_all_B: List[float] = []
                precision_W: List[Optional[float]] = []
                precision_B: List[Optional[float]] = []

                mainline_moves = list(game.mainline_moves())
                pbar_movs = tqdm(total=len(mainline_moves),
                                 desc=f"{white_name} vs {black_name}",
                                 leave=False) if (mostrar_progreso and mainline_moves) else None

                for move in mainline_moves:
                    mover_es_blancas = board.turn
                    fullmove = board.fullmove_number

                    s = base[mover_es_blancas]
                    s["movimientos_total"] += 1

                    piece_from = board.piece_at(move.from_square)
                    if piece_from and piece_from.piece_type == chess.PAWN:
                        s["mov_peones"] += 1
                    if move.to_square in CENTRAL:
                        s["mov_centrales"] += 1
                    if board.is_capture(move):
                        s["intercambio_piezas"] += 1
                    if board.is_castling(move) and s["enroque"] == 0:
                        s["enroque"] = fullmove

                    # Mejor jugada antes del push (para precisi√≥n)
                    best_val_for_player_peones = None
                    if sf is not None and calcular_precision:
                        best_uci = sf.best_move_uci(board)
                        if best_uci:
                            try:
                                best_board = board.copy(stack=False)
                                best_board.push(chess.Move.from_uci(best_uci))
                                eW_best_cp = sf.eval_white_pov_cp(best_board.fen())
                                if eW_best_cp is not None:
                                    if mover_es_blancas:
                                        best_val_for_player_peones = round(eW_best_cp / 100.0, 2)
                                    else:
                                        best_val_for_player_peones = round((-eW_best_cp) / 100.0, 2)
                            except Exception:
                                best_val_for_player_peones = None

                    # Jugada real
                    board.push(move)

                    # Evaluaciones tras la jugada
                    if sf is not None:
                        eW_cp = sf.eval_white_pov_cp(board.fen())
                        if eW_cp is not None:
                            # Convertir a peones y recortar a ¬±15 para evitar outliers residuales
                            eW_p = round(np.clip(eW_cp / 100.0, -15.0, 15.0), 2)
                            eB_p = round(-eW_p, 2)
                            evals_all_W.append(eW_p)
                            evals_all_B.append(eB_p)

                            if calcular_precision and best_val_for_player_peones is not None:
                                actual_val = eW_p if mover_es_blancas else eB_p
                                loss = max(0.0, round(best_val_for_player_peones - actual_val, 3))
                                if mover_es_blancas:
                                    precision_W.append(loss)
                                else:
                                    precision_B.append(loss)
                            elif calcular_precision:
                                if mover_es_blancas:
                                    precision_W.append(None)
                                else:
                                    precision_B.append(None)

                    if pbar_movs is not None:
                        pbar_movs.update(1)

                if pbar_movs is not None:
                    pbar_movs.close()

                # Filas por color
                for color_bool in (True, False):
                    row = {
                        "jugador": base[color_bool]["jugador"],
                        "rival": base[color_bool]["rival"],
                        "color": base[color_bool]["color"],
                        "fide_id": base[color_bool]["fide_id"],
                        "elo": base[color_bool]["elo"],
                        "elo_rival": base[color_bool]["elo_rival"],
                        "evento": base[color_bool]["evento"],
                        "lugar": base[color_bool]["lugar"],
                        "fechas": base[color_bool]["fechas"],
                        "cod_eco": base[color_bool]["cod_eco"],
                        "resultados": base[color_bool]["resultados"],
                        "movimientos_total": base[color_bool]["movimientos_total"],
                        "mov_peones": base[color_bool]["mov_peones"],
                        "mov_centrales": base[color_bool]["mov_centrales"],
                        "intercambio_piezas": base[color_bool]["intercambio_piezas"],
                        "enroque": base[color_bool]["enroque"],
                        "pgn": pgn_text,
                        "san": san_moves,
                    }

                    if sf is not None:
                        if color_bool:  # W
                            evals_all = evals_all_W
                            precision_losses = precision_W if calcular_precision else None
                        else:         # B
                            evals_all = evals_all_B
                            precision_losses = precision_B if calcular_precision else None

                        row["evals_all"] = evals_all
                        row["evals_jugador"] = _evals_jugador_from_all(evals_all, row["color"])
                        row["precision_jugador"] = precision_losses

                        # M√©tricas de errores
                        met = _metricas_errores(row["evals_jugador"])
                        row.update(met)

                        # ---- Precisi√≥n porcentual ----
                        row["precision"] = _precision_media(precision_losses)
                        row["precision_apertura"]    = _precision_media_segmento(precision_losses, 0, 15, na_for_empty=False)
                        row["precision_mediojuego"]  = _precision_media_segmento(precision_losses, 15, 40, na_for_empty=False)
                        row["precision_final"]       = _precision_media_segmento(precision_losses, 40, None, na_for_empty=True)

                        # ---- Medias de evaluaci√≥n ----
                        # eval          = media de TODOS los datos de evals_all (en peones, ya recortados ¬±20)
                        # eval_apertura = media de √≠ndices 0..29  (plies)
                        # eval_medio    = media de 30..79
                        # eval_final    = media desde 80+
                        row["eval"]            = _mean_list(evals_all)
                        row["eval_apertura"]   = _mean_segment(evals_all, 0, 30, na_for_empty=False)
                        row["eval_mediojuego"] = _mean_segment(evals_all, 30, 80, na_for_empty=False)
                        row["eval_final"]      = _mean_segment(evals_all, 80, None, na_for_empty=True)
                    else:
                        # Si no hay motor, a√±adimos columnas con None / <NA> coherentes
                        row["precision"] = None
                        row["precision_apertura"] = None
                        row["precision_mediojuego"] = None
                        row["precision_final"] = pd.NA

                        row["eval"] = None
                        row["eval_apertura"] = None
                        row["eval_mediojuego"] = None
                        row["eval_final"] = pd.NA

                    rows.append(row)

                if pbar_partidas is not None:
                    pbar_partidas.update(1)

    if pbar_partidas is not None:
        pbar_partidas.close()

    df = pd.DataFrame(rows)

    # Orden de columnas
    base_cols = [
        "jugador","rival","color","fide_id","elo","elo_rival",
        "evento","lugar","fechas","cod_eco","resultados",
        "movimientos_total","mov_peones","mov_centrales","intercambio_piezas","enroque",
        "pgn","san",
    ]
    motor_cols = [c for c in ["evals_all","evals_jugador","precision_jugador"] if c in df.columns]
    errores_cols = [
        "errores_leves_apertura","errores_leves_mediojuego","errores_leves_final","errores_leves",
        "errores_apertura","errores_mediojuego","errores_final","errores",
        "errores_graves_apertura","errores_graves_mediojuego","errores_graves_final","errores_graves",
    ]
    errores_cols = [c for c in errores_cols if c in df.columns]

    # Precisi√≥n porcentual
    prec_cols = ["precision", "precision_apertura", "precision_mediojuego", "precision_final"]

    # NUEVAS columnas de medias de evaluaci√≥n
    eval_cols = ["eval", "eval_apertura", "eval_mediojuego", "eval_final"]

    ordered_cols = base_cols + motor_cols + errores_cols + prec_cols + eval_cols
    df = df.reindex(columns=[c for c in ordered_cols if c in df.columns])

    # Tipado Int64 (enteros anulables) para errores_* que pueden llevar <NA>
    for c in errores_cols:
        try:
            df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int64")
        except Exception:
            pass

    # Desduplicado por partida y color
    df = df.drop_duplicates(subset=["pgn", "color"], keep="first").reset_index(drop=True)

    # Guardado
    out_path = Path(carpeta_salida) / archivo_salida
    df.to_parquet(out_path, engine="pyarrow", index=False)
    print(f"Guardado parquet en: {out_path.resolve()}")

    return df


## 5. Llamada a la funci√≥n de an√°lisis y guardado

In [None]:
import os
import shutil
import tempfile
from pathlib import Path
import pandas as pd  # para validar el parquet leyendo el esquema

# === Directorios locales ===
carpeta_lotes_local = Path("./lotes_pgn")             # aqu√≠ est√°n los .pgn
carpeta_analizados_local = Path("./analizadas_grupo") # aqu√≠ dejaremos los .parquet resultantes
carpeta_analizados_local.mkdir(parents=True, exist_ok=True)

# Motor (ajusta la ruta a tu ejecutable)
engine_path = str(Path("./motor_ejecutable/stockfish.exe").resolve())

# Comprobar que existen lotes
archivos_lote = sorted(carpeta_lotes_local.glob("*.pgn"))
if not archivos_lote:
    raise SystemExit(f"No hay lotes .pgn en {carpeta_lotes_local.resolve()}")

def parquet_valido(ruta_parquet: Path) -> bool:
    """Devuelve True si el parquet existe y se puede leer (al menos el esquema)."""
    if not ruta_parquet.exists() or ruta_parquet.stat().st_size == 0:
        return False
    try:
        _ = pd.read_parquet(ruta_parquet, engine="pyarrow")
        return True
    except Exception:
        return False

total = len(archivos_lote)
print(f"üîç Archivos a procesar: {total}")

procesados = 0
saltados = 0
errores = 0

for idx, ruta_local in enumerate(archivos_lote, start=1):
    nombre_archivo = ruta_local.name
    stem = ruta_local.stem  # p.ej. "partidas_grupo_004"
    # ‚úÖ Nombra el parquet por el archivo origen (evita ambig√ºedades al reanudar)
    archivo_salida = f"analisis_{stem}.parquet"
    destino = carpeta_analizados_local / archivo_salida

    # Si ya existe y es v√°lido, saltamos
    if parquet_valido(destino):
        print(f"‚è© {nombre_archivo} ya analizado (parquet: {destino.name}), se omite ({idx}/{total})")
        saltados += 1
        continue
    elif destino.exists():
        print(f"‚ö† Parquet existente pero no v√°lido: {destino.name}. Se volver√° a generar.")

    print(f"‚öôÔ∏è  Analizando {nombre_archivo}  ({idx}/{total}) ...")

    # --- Carpeta temporal por archivo ---
    with tempfile.TemporaryDirectory(prefix="pgn_tmp_") as tmpdir:
        tmpdir_path = Path(tmpdir)
        ruta_tmp = tmpdir_path / nombre_archivo
        shutil.copy2(ruta_local, ruta_tmp)

        try:
            # Llamada principal
            _ = analizar_pgn_unificado(
                carpeta_pgn=str(tmpdir_path),
                carpeta_salida=str(carpeta_analizados_local),
                archivo_salida=archivo_salida,
                engine_path=engine_path,     # quita este arg si no quieres motor
                depth=12, threads=6, hash_mb=512,
                calcular_precision=True,
                mostrar_progreso=True,
            )

            # Validar y borrar fuente si todo OK
            if parquet_valido(destino):
                print(f"‚úî Guardado correctamente: {destino.name}")
                try:
                    ruta_local.unlink()  # ‚úÖ borrar PGN origen al finalizar OK
                    print(f"üóë Borrado {ruta_local.name} tras analizar.\n")
                except Exception as e_del:
                    print(f"‚ö† No se pudo borrar {ruta_local.name}: {e_del}\n")
                procesados += 1
            else:
                print(f"‚ùå El parquet {destino.name} no es v√°lido tras el an√°lisis. No se borra el PGN.\n")
                errores += 1

        except Exception as e:
            print(f"‚ùå Error al analizar {nombre_archivo}: {e}\n")
            errores += 1

print(f"\nResumen: ‚úÖ procesados={procesados}  ‚è≠ saltados={saltados}  ‚ùå errores={errores}")
