In [None]:
import sys
import os

# Add the parent directory of the 'ciwa' module to the Python path
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [None]:
!pip install cairosvg

In [None]:
from ciwa.utils.session_parser import parse_session_json

In [None]:
# Chess Game Analysis and Visualization

import os
import glob
import json
import pandas as pd
import numpy as np
import chess
import chess.svg
from IPython.display import display, SVG, HTML
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import re
import io
from cairosvg import svg2png
from PIL import Image
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to load all session results
def load_all_sessions(directory='.'):
    all_sessions = []
    for filename in glob.glob(os.path.join(directory, 'Session_*_results.json')):
        with open(filename, 'r') as f:
            session_data = json.load(f)
        all_sessions.append(session_data)
    return all_sessions

# Load all sessions
all_sessions = load_all_sessions()

# Parse all sessions and combine into a single dataframe
all_data = []
for session in all_sessions:
    parsed_data = parse_session_json(session)
    session_uuid = session['session']['uuid']
    session_name = session['session']['name']
    
    # Add session info to submissions
    parsed_data['submissions']['session_uuid'] = session_uuid
    parsed_data['submissions']['session_name'] = session_name
    
    all_data.append(parsed_data['submissions'])

combined_df = pd.concat(all_data, ignore_index=True)

# Extract top-ranked moves
top_moves = combined_df.sort_values('aggregated_result').groupby('session_name').first().reset_index()

# Sort moves by session name (assuming format "Move X")
def extract_move_number(session_name):
    match = re.search(r'\d+', session_name)
    return int(match.group()) if match else np.nan

top_moves['move_number'] = top_moves['session_name'].apply(extract_move_number)
top_moves = top_moves.sort_values('move_number').dropna(subset=['move_number'])

print(top_moves[['session_name', 'content', 'aggregated_result']])

# Function to convert algebraic notation to UCI
def algebraic_to_uci(board, move_str):
    # Remove move number if present
    move_str = re.sub(r'^\d+\.', '', move_str).strip()
    # If the move is just a square (e.g., "e4"), assume it's a pawn move
    if re.match(r'^[a-h][1-8]$', move_str):
        file_index = ord(move_str[0]) - ord('a')
        rank_index = int(move_str[1]) - 1
        to_square = chess.square(file_index, rank_index)
        
        # Find the correct starting square for the pawn
        for rank in range(rank_index - 1, -1, -1):
            from_square = chess.square(file_index, rank)
            if board.piece_at(from_square) == chess.Piece(chess.PAWN, board.turn):
                return chess.Move(from_square, to_square).uci()
        
        logger.warning(f"Could not find a pawn to move to {move_str}")
        return None
    try:
        move = board.parse_san(move_str)
        return move.uci()
    except ValueError:
        logger.warning(f"Could not parse move: {move_str}")
        return None



# Function to safely extract move from content
def extract_move(content):
    logger.info(f"Attempting to extract move from content: {content}")
    if isinstance(content, dict) and 'move' in content:
        return content['move']
    elif isinstance(content, str):
        # Try to parse the string as JSON
        try:
            content_dict = json.loads(content)
            if isinstance(content_dict, dict) and 'move' in content_dict:
                return content_dict['move']
        except json.JSONDecodeError:
            # If it's not JSON, try to extract a move using regex
            # This pattern looks for standard algebraic notation like "1.e4" or "e4"
            move_match = re.search(r'\b(?:\d+\.)?([KQRBNP]?[a-h]?[1-8]?x?[a-h][1-8](?:=[QRBN])?|O-O(?:-O)?|0-0(?:-0)?)\b', content)
            if move_match:
                return move_match.group(0)  # Return the full match, including move number if present
    # If we couldn't extract a move, return None
    logger.warning(f"Could not extract move from content: {content}")
    return None


# Function to convert SVG to PNG
def svg_to_png(svg_string):
    png_bytes = svg2png(bytestring=svg_string)
    return Image.open(io.BytesIO(png_bytes))

# Create chess game animation
def create_chess_animation(moves):
    board = chess.Board()
    fig, ax = plt.subplots(figsize=(8, 8))
    moves_applied = 0
    
    def animate(i):
        nonlocal moves_applied
        ax.clear()
        ax.axis('off')
        
        if i < len(moves) and moves_applied < i + 1:
            move = moves.iloc[i]
            algebraic_move = extract_move(move['content'])
            if algebraic_move:
                uci_move = algebraic_to_uci(board, algebraic_move)
                if uci_move:
                    try:
                        chess_move = chess.Move.from_uci(uci_move)
                        if chess_move in board.legal_moves:
                            board.push(chess_move)
                            logger.info(f"Applied move: {algebraic_move} (UCI: {uci_move})")
                            moves_applied += 1
                        else:
                            logger.warning(f"Illegal move: {uci_move} at move {i+1}")
                    except ValueError:
                        logger.warning(f"Invalid move format: {uci_move} at move {i+1}")
                else:
                    logger.warning(f"Could not convert to UCI: {algebraic_move} at move {i+1}")
            else:
                logger.warning(f"Could not extract move at move {i+1}")
        
        svg = chess.svg.board(board, size=400)
        png_image = svg_to_png(svg)
        ax.imshow(png_image)
        ax.set_title(f"Move {i+1}: {extract_move(moves.iloc[i]['content']) if i < len(moves) else 'End'}")
    
    anim = animation.FuncAnimation(fig, animate, frames=len(moves)+1, interval=1000, repeat=False)
    return anim

# Print out the top_moves DataFrame for inspection
print(top_moves[['session_name', 'content', 'aggregated_result']])

# Create and display the animation
anim = create_chess_animation(top_moves)
HTML(anim.to_jshtml())

# Optional: Save the animation
# anim.save('chess_game.gif', writer='pillow')

In [None]:
all_data

In [None]:
print(top_moves)