# In [None]:
import os

import chess
import chess.engine
import chess.pgn
import h5py
import numpy as np

# Build the static move vocabulary used to index one-hot move vectors.
def generate_all_possible_moves():
    """Return the fixed, ordered list of move strings used as the move vocabulary.

    For every (from, to) square pair, in square-index order, the list holds:

    * the plain move ``<from><to>`` when the two squares differ, followed by
    * four promotion variants ``<from><to>{q,r,b,n}`` whenever the origin
      square lies on rank index 1 or 6. These are emitted for every
      destination square, including the origin itself.

    A final ``"None"`` entry stands for "no move". The result has
    4032 + 4096 + 1 = 8129 entries; their order defines the one-hot index
    of each move and must stay stable.
    """
    names = chess.SQUARE_NAMES
    promotion_pieces = ('q', 'r', 'b', 'n')

    vocabulary = []
    for src in range(64):
        src_name = names[src]
        # Origin ranks from which a pawn step can promote (one per colour).
        promotes = chess.square_rank(src) in (1, 6)
        for dst in range(64):
            dst_name = names[dst]
            if src_name != dst_name:
                vocabulary.append(src_name + dst_name)
            if promotes:
                vocabulary.extend(
                    src_name + dst_name + piece for piece in promotion_pieces
                )

    # Placeholder entry for positions without a move.
    vocabulary.append("None")
    return vocabulary

# Module-level move vocabulary, its size, and a move-string -> index lookup.
ALL_MOVES = generate_all_possible_moves()
TOTAL_MOVES = len(ALL_MOVES)
MOVE_INDEX = dict(zip(ALL_MOVES, range(TOTAL_MOVES)))
print(f"Total Moves: {TOTAL_MOVES}")  # Expected: 8129 (4032 plain + 4096 promotion + 1 "None")

def uci_to_one_hot(move, possible_moves):
    """Encode ``move`` as a one-hot vector over ``possible_moves``.

    Args:
        move: Move string to encode (e.g. ``"e2e4"``).
        possible_moves: Sequence of move strings defining the vector layout;
            the position of ``move`` in it is the index set to 1.

    Returns:
        A 1-D ``np.int32`` array of length ``len(possible_moves)`` that is 1
        at the index of ``move`` and 0 elsewhere.

    Raises:
        ValueError: If ``move`` does not occur in ``possible_moves``.

    Side effects:
        Prints the chosen index and move to stdout.
    """
    try:
        # One scan both locates the move and detects absence, instead of a
        # membership test followed by two further index() lookups.
        position = possible_moves.index(move)
    except ValueError:
        raise ValueError(f"Move '{move}' not in possible moves mapping.") from None

    one_hot = np.zeros(len(possible_moves), dtype=np.int32)
    one_hot[position] = 1
    print("Index of:", position, "Move made to 1:", move)
    return one_hot

def board_to_cnn_input(board):
    """Encode a chess position as an 8x8x19 ``float32`` tensor.

    The first two axes are ``square // 8`` and ``square % 8`` (rank and file
    in python-chess square numbering). Plane layout on the last axis:

    * 0-5:   white pawn, knight, bishop, rook, queen, king
    * 6-11:  black pieces in the same order
    * 12:    side to move (all ones when White is to move)
    * 13-16: castling rights: White kingside, White queenside,
             Black kingside, Black queenside (each plane all ones or zeros)
    * 17:    en passant target square (single 1, if any)
    * 18:    halfmove clock divided by 100 (not clamped, so it exceeds 1.0
             if the clock is above 100)

    Args:
        board: The ``chess.Board`` to encode.

    Returns:
        ``np.ndarray`` of shape ``(8, 8, 19)`` and dtype ``float32``.
    """
    planes = np.zeros((8, 8, 19), dtype=np.float32)
    piece_map = board.piece_map()

    # Map piece type to tensor index (black pieces are offset by 6 below).
    piece_to_plane = {
        chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2,
        chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5,
    }

    for square, piece in piece_map.items():
        rank, file = divmod(square, 8)
        idx = piece_to_plane[piece.piece_type] + (6 if piece.color == chess.BLACK else 0)
        planes[rank, file, idx] = 1

    # Side to move (one plane).
    planes[:, :, 12] = int(board.turn == chess.WHITE)

    # Castling rights (four planes).
    planes[:, :, 13] = int(board.has_kingside_castling_rights(chess.WHITE))
    planes[:, :, 14] = int(board.has_queenside_castling_rights(chess.WHITE))
    planes[:, :, 15] = int(board.has_kingside_castling_rights(chess.BLACK))
    planes[:, :, 16] = int(board.has_queenside_castling_rights(chess.BLACK))

    # En passant (one plane). Compare against None explicitly: the square is
    # an integer index, so a truthiness test would treat square 0 as "absent".
    if board.ep_square is not None:
        ep_rank, ep_file = divmod(board.ep_square, 8)
        planes[ep_rank, ep_file, 17] = 1

    # Fifty-move rule counter (one plane), scaled so 100 halfmoves maps to 1.0.
    planes[:, :, 18] = board.halfmove_clock / 100

    return planes
def _normalize_score(score):
    """Map a centipawn score onto [0, 1], clamping to +/-10000 cp; None maps to 0.5."""
    if score is None:
        return 0.5
    clamped = max(-10000, min(10000, score))
    return (clamped + 10000) / 20000.0


def generate_cnn_input_from_pgn(pgn_file_path, engine_path, output_file, save_every=2):
    """Build a CNN training set from the games in a PGN file.

    Every position reached after a mainline move is analysed with the UCI
    engine at depth 20. For each such position the function records:

    * the board tensor from ``board_to_cnn_input``,
    * the engine score from White's point of view, normalised to [0, 1]
      (scores beyond +/-10000 centipawns, including mates, are clamped),
    * the engine's best move as a one-hot vector over ``ALL_MOVES``.

    Positions are skipped when the engine call fails, when the engine returns
    no move, or when the returned move is not legal in the position.
    Collected samples are flushed to ``output_file`` via ``save_data`` every
    ``save_every`` games, and once more at the end for any remainder.

    Args:
        pgn_file_path: Path of the PGN file to read.
        engine_path: Path of the UCI engine executable.
        output_file: Path of the HDF5 file that ``save_data`` appends to.
        save_every: Number of games between flushes to disk (must be >= 1).

    Raises:
        ValueError: If ``save_every`` is less than 1.
    """
    if save_every < 1:
        raise ValueError("save_every must be at least 1")

    limit = chess.engine.Limit(depth=20)
    inputs, evals, moves = [], [], []
    game_count = 0

    # Both resources are context-managed so the engine process is shut down
    # and the file closed even if parsing or saving raises.
    with open(pgn_file_path, "r", encoding="utf-8") as pgn_file, \
            chess.engine.SimpleEngine.popen_uci(engine_path) as engine:
        while True:
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break  # End of file

            game_count += 1
            board = game.board()

            for move in game.mainline_moves():
                board.push(move)

                # Best-effort: a failing engine call drops this position only.
                try:
                    evaluation = engine.analyse(board, limit)
                    score = evaluation["score"].white().score(mate_score=100000)
                    best_move = engine.play(board, limit).move
                except Exception as e:
                    print(f"Error during analysis: {e}")
                    continue

                # Skip positions with no valid best move (e.g. game over).
                if best_move is None:
                    print("No best move for current position, skipping...")
                    continue
                best_move_uci = best_move.uci()

                # Legal moves of the current position, for a sanity check.
                possible_moves = [m.uci() for m in board.generate_legal_moves()]
                if best_move_uci not in possible_moves:
                    print(f"Warning: Move '{best_move_uci}' is not valid for the current position.")
                    print(f"Possible moves: {possible_moves}")
                    continue  # Skip this position

                # Convert position and move to CNN-compatible format.
                inputs.append(board_to_cnn_input(board))
                evals.append(_normalize_score(score))
                moves.append(uci_to_one_hot(best_move_uci, ALL_MOVES))

            # Periodic flush; skipped while nothing has been collected so
            # save_data is never handed an empty batch.
            if game_count % save_every == 0 and inputs:
                save_data(inputs, evals, moves, output_file)
                inputs, evals, moves = [], [], []  # Reset lists

        # Save any remaining data.
        if inputs:
            save_data(inputs, evals, moves, output_file)

    print(f"All data saved to {output_file}")

def save_data(inputs, evals, moves, output_file):
    """Append a batch of samples to the HDF5 dataset file, creating it if needed.

    The file holds three datasets that grow along their first axis:
    ``"inputs"``, ``"evals"`` and ``"moves"``. On the first call they are
    created from the batch (resizable along axis 0); later calls extend them
    and write the batch at the end.

    Args:
        inputs: Sequence of board tensors, one per sample.
        evals: Sequence of scalar evaluations, one per sample.
        moves: Sequence of one-hot move vectors, one per sample.
        output_file: Path of the HDF5 file to create or append to.

    Raises:
        ValueError: If the three sequences differ in length.

    An empty batch is a no-op: nothing is written and no file is created.
    """
    batch = len(inputs)
    if not (batch == len(evals) == len(moves)):
        raise ValueError(
            f"Mismatched batch sizes: inputs={len(inputs)}, "
            f"evals={len(evals)}, moves={len(moves)}"
        )
    if batch == 0:
        # An empty array has the wrong rank for the datasets; nothing to do.
        return

    arrays = {
        "inputs": np.array(inputs),
        "evals": np.array(evals),
        "moves": np.array(moves),
    }

    # Mode "a" opens an existing file read/write and creates it otherwise,
    # so no separate existence check is required.
    with h5py.File(output_file, "a") as hf:
        if "inputs" not in hf:
            # First batch: create resizable datasets already holding the data.
            for name, data in arrays.items():
                hf.create_dataset(
                    name,
                    data=data,
                    maxshape=(None,) + data.shape[1:],
                    chunks=True,
                )
        else:
            # Later batches: grow along axis 0 and fill the new tail.
            current_size = hf["inputs"].shape[0]
            new_size = current_size + batch
            for name, data in arrays.items():
                hf[name].resize(new_size, axis=0)
                hf[name][current_size:new_size] = data

    print(f"Data appended to {output_file}")
# Example usage: adjust the three paths below, then run the pipeline.
pgn_file_path = "/content/lichess_db_standard_rated_2013-01.pgn"  # PGN games to read
engine_path = "/usr/games/stockfish"  # UCI engine executable (Stockfish here)
output_file = "Chess_data1.h5"  # HDF5 file that receives the dataset

generate_cnn_input_from_pgn(
    pgn_file_path=pgn_file_path,
    engine_path=engine_path,
    output_file=output_file,
)
