In [None]:
import pandas as pd
import chess
import io
from IPython.display import SVG
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dataset_pgn = "../dataset/Lichess_2013_2014_FEN.csv"
dataset_complete = "../dataset/Lichess_2013_2014_Complete.csv"


In [None]:

chunk_size = 100_000
num_chunks = 1
resultados = []

for i, chunk in enumerate(pd.read_csv(dataset_pgn, chunksize=chunk_size)):
    if i >= num_chunks:
        break
    resultados.append(chunk)

df = pd.concat(resultados, ignore_index=True)
print(df.shape)


(100000, 2)


In [6]:
data = df.iloc[:100, [0]]

In [9]:
import chess
import chess.pgn
import pandas as pd

# ---------- Métricas de estructura de peones ----------

def pawn_metrics_for_color(board, color):
    """
    Calcula métricas de peones para un color en una posición dada.
    color: chess.WHITE o chess.BLACK
    """
    pawns = board.pieces(chess.PAWN, color)
    opp_pawns = board.pieces(chess.PAWN, not color)

    files = [chess.square_file(sq) for sq in pawns]
    unique_files = sorted(set(files))

    # 1) Número de peones
    num_pawns = len(pawns)

    # 2) Islas de peones (archivos contiguos con peones)
    pawn_islands = 0
    if unique_files:
        pawn_islands = 1
        prev_file = unique_files[0]
        for f in unique_files[1:]:
            if f > prev_file + 1:
                pawn_islands += 1
            prev_file = f

    # 3) Peones doblados
    from collections import Counter
    file_counts = Counter(files)
    doubled_files = sum(1 for c in file_counts.values() if c > 1)
    doubled_pawns = sum(c - 1 for c in file_counts.values() if c > 1)

    # 4) Peones aislados
    isolated_pawns = 0
    for sq in pawns:
        f = chess.square_file(sq)
        has_left = any(chess.square_file(p) == f - 1 for p in pawns) if f > 0 else False
        has_right = any(chess.square_file(p) == f + 1 for p in pawns) if f < 7 else False
        if not has_left and not has_right:
            isolated_pawns += 1

    # 5) Peones pasados
    passed_pawns = 0
    for sq in pawns:
        f = chess.square_file(sq)
        r = chess.square_rank(sq)

        # Archivos a revisar (mismo y adyacentes)
        candidate_files = [f]
        if f > 0:
            candidate_files.append(f - 1)
        if f < 7:
            candidate_files.append(f + 1)

        if color == chess.WHITE:
            # Peones negros delante (rango mayor)
            has_blocking_enemy = False
            for opp_sq in opp_pawns:
                of = chess.square_file(opp_sq)
                orank = chess.square_rank(opp_sq)
                if of in candidate_files and orank > r:
                    has_blocking_enemy = True
                    break
            if not has_blocking_enemy:
                passed_pawns += 1
        else:
            # Peones blancos delante (rango menor)
            has_blocking_enemy = False
            for opp_sq in opp_pawns:
                of = chess.square_file(opp_sq)
                orank = chess.square_rank(opp_sq)
                if of in candidate_files and orank < r:
                    has_blocking_enemy = True
                    break
            if not has_blocking_enemy:
                passed_pawns += 1

    # 6) Peones avanzados
    advanced_pawns = 0
    for sq in pawns:
        r = chess.square_rank(sq)
        if color == chess.WHITE:
            if r >= 4:  # 4 = 5ª fila (0-indexed: 0→1ª, 4→5ª)
                advanced_pawns += 1
        else:
            if r <= 3:  # 3 = 4ª fila para negras
                advanced_pawns += 1

    return {
        "num_pawns": num_pawns,
        "pawn_islands": pawn_islands,
        "doubled_files": doubled_files,
        "doubled_pawns": doubled_pawns,
        "isolated_pawns": isolated_pawns,
        "passed_pawns": passed_pawns,
        "advanced_pawns": advanced_pawns,
    }


def pawn_metrics_position(board):
    """
    Métricas combinadas (blancas y negras) para una posición.
    """
    white = pawn_metrics_for_color(board, chess.WHITE)
    black = pawn_metrics_for_color(board, chess.BLACK)

    metrics = {}
    for k, v in white.items():
        metrics[f"white_{k}"] = v
    for k, v in black.items():
        metrics[f"black_{k}"] = v
    return metrics


# ---------- Procesar archivo PGN completo ----------

def analyze_pgn_pawn_structures(pgn_path, max_games=100):
    """
    Lee hasta max_games partidas de un archivo PGN
    y calcula métricas de peones en la posición final de cada partida.
    """
    rows = []
    with open(pgn_path, encoding="utf-8") as f:
        game_index = 0
        while game_index < max_games:
            game = chess.pgn.read_game(f)
            if game is None:
                break  # fin de archivo

            game_index += 1
            board = game.board()

            # Recorremos toda la partida hasta la última posición
            for move in game.mainline_moves():
                board.push(move)

            # Puedes cambiar esto si quieres medir en otro momento (ej. después de 20 jugadas)

            metrics = pawn_metrics_position(board)

            # Opcional: extraer metadata de la partida (resultado, duración aproximada, etc.)
            result = game.headers.get("Result", "?")
            white_elo = game.headers.get("WhiteElo", None)
            black_elo = game.headers.get("BlackElo", None)
            eco = game.headers.get("ECO", None)

            row = {
                "game_id": game_index,
                "result": result,
                "white_elo": white_elo,
                "black_elo": black_elo,
                "eco": eco,
            }
            row.update(metrics)
            rows.append(row)

    df = pd.DataFrame(rows)
    return df


# ---------- Ejemplo de uso ----------

if __name__ == "__main__":
    df_metrics = analyze_pgn_pawn_structures(data, max_games=100)
    print(df_metrics.head())

    # Guardar a CSV para análisis posterior
    df_metrics.to_csv("pawn_structures_metrics.csv", index=False)


TypeError: unhashable type: 'DataFrame'