In [1]:
import torch
import chess.pgn
import chess.engine

In [2]:
rating_ranges = [
    (800, 1000), (1000, 1200), (1200, 1400), (1400, 1600), (1600, 1800),
    (1800, 2000), (2000, 2200), (2200, 2400), (2400, 2600), (2600, 3000)
]

### Load Games

In [3]:
chess_games = {rating: [] for rating in rating_ranges}
min_elo = 9999
max_elo = 0

for rating_range in rating_ranges:
    lower_bound = rating_range[0]
    upper_bound = rating_range[1]
    
    file = f"outputs/{str(lower_bound)}-{str(upper_bound)}.pgn"


    with open(file) as f:
        while len(chess_games[rating_range]) < 200:
            game = chess.pgn.read_game(f)
            if game is None:
                break
            if any(time_control in game.headers["Event"] for time_control in [
                "Correspondence", "Daily", "Classical, Bullet, UltraBullet"
            ]):
                continue
            if game.headers["WhiteElo"] == "?" or game.headers["BlackElo"] == "?":
                continue
            if (
                not lower_bound <= int(game.headers["WhiteElo"]) <= upper_bound
                or not lower_bound <= int(game.headers["BlackElo"]) <= upper_bound
            ):
                continue
            if not game.mainline_moves():
                continue
            
            chess_games[rating_range].append(game)
            
            min_elo = min(min_elo, int(game.headers["WhiteElo"]), int(game.headers["BlackElo"]))
            max_elo = max(max_elo, int(game.headers["WhiteElo"]), int(game.headers["BlackElo"]))
            
    print(f"{rating_range} games: {len(chess_games[rating_range])} done")
    
print(f"Min elo: {min_elo}, Max elo: {max_elo}")

(800, 1000) games: 200 done
(1000, 1200) games: 200 done
(1200, 1400) games: 200 done
(1400, 1600) games: 200 done
(1600, 1800) games: 200 done
(1800, 2000) games: 200 done
(2000, 2200) games: 200 done
(2200, 2400) games: 200 done
(2400, 2600) games: 200 done
(2600, 3000) games: 200 done
Min elo: 800, Max elo: 2853


### Analyze the games

In [9]:
def fen_to_bitboard(fen_str):
    mapings = {
        "P": 0, "N": 1, "B": 2, "R": 3, "Q": 4, "K": 5,
        "p": 6, "n": 7, "b": 8, "r": 9, "q": 10, "k": 11
    }   
    
    bitboard = torch.zeros(12, 64)
    fen, move, _castle, _en_passant, _halfmove, _fullmove = fen_str.split(" ")
    row, col = 0, 0
    for char in fen:
        if char == "/":
            row += 1
            col = 0
        elif char.isdigit():
            col += int(char)
        else:
            bitboard[mapings[char], row * 8 + col] = 1
            col += 1
    # Flatten the bitboard and add whose move it is
    return torch.cat((torch.tensor([1 if move == "w" else -1]), bitboard.flatten()))


In [6]:
def pad_tensor(tensor, length):
    return torch.cat((tensor, torch.ones(length - len(tensor)) * -mate_score))

def analyze_game(game, engine, mate_score, nof_moves=10):
    board = game.board()
    analysis = []
    
    for i, move in enumerate(game.mainline_moves()):
        # Enfine evaluation before the move
        top_moves = engine.analyse(board, chess.engine.Limit(time=0.1), multipv=nof_moves)
        # A tensor with the score of the top nof_moves moves (normalized to be between -1 and 1)
        top_moves_tensor = torch.Tensor([eval["score"].relative.score(mate_score=mate_score) for eval in top_moves]) 
        # Pad the tensor if there are less than nof_moves legal moves
        top_moves_tensor = pad_tensor(top_moves_tensor, nof_moves) / mate_score
        # Win, draw, loss chance tensor before the move
        before_wdl = top_moves[0]["score"].relative.wdl()
        before_wdl_tensor = torch.Tensor([
            before_wdl.winning_chance(), 
            before_wdl.drawing_chance(), 
            before_wdl.losing_chance()
        ])
        board.push(move)
        
        # Engine evaluation after the move
        after_move = engine.analyse(board, chess.engine.Limit(time=0.1))
        # Now it's the opponent's turn so negate the score
        after_move_tensor = torch.Tensor([after_move["score"].relative.score(mate_score=mate_score) * -1]) / mate_score
        # Reverse the list so that it's from the perspective of the player who just moved
        after_wdl = after_move["score"].relative.wdl()
        after_wdl_tensor = torch.Tensor([
            after_wdl.losing_chance(), 
            after_wdl.drawing_chance(), 
            after_wdl.winning_chance()
        ])

        analysis.append(torch.cat((
            top_moves_tensor, before_wdl_tensor, 
            after_move_tensor, after_wdl_tensor
        )))
        # print(f"before move: {move}, top_moves {top_moves_tensor}, wdl {before_wdl_tensor}")
        # print(f"after:               top_moves {after_move_tensor}, wdl {after_wdl_tensor}")
    
    # Pad the game if it ends on white's turn to batch white's and black's analysis together
    if len(analysis) % 2:
        analysis.append(torch.ones_like(analysis[0]) * (-mate_score))
    
    white_analysis = torch.stack([position for position in analysis[::2]])
    black_analysis = torch.stack([position for position in analysis[1::2]])
    
    white_elo = int(game.headers["WhiteElo"])
    black_elo = int(game.headers["BlackElo"])
    
    return torch.stack((white_analysis, black_analysis)), torch.Tensor([white_elo, black_elo])

def analyze_games(rating_range, games, n=None):
    analysis = []
    elo = []
    
    engine = chess.engine.SimpleEngine.popen_uci("/usr/bin/stockfish")
    mate_score = 1_000

    for i, game in enumerate(games[:n]):
        analysis_tensor, elo_tensor = analyze_game(game, engine, mate_score)
        analysis.append(analysis_tensor)
        elo.append(elo_tensor)
        
        if i % 20 == -1:
            print(f"{rating_range} game {i + 1} done")
            
    engine.close()
    print(f"{rating_range} done")
    
    return analysis, elo

In [32]:
def convert_position(game):
    board = game.board()
    positions = []
    
    for move in game.mainline_moves():
        board_position_tensor = fen_to_bitboard(board.fen())
        positions.append(board_position_tensor)
        board.push(move)
        
    # Pad the game if it ends on white's turn to batch white's and black's analysis together later
    if len(positions) % 2:
        positions.append(torch.zeros_like(positions[0]))
    
    white_positions = torch.stack([position for position in positions[::2]])
    black_positions = torch.stack([position for position in positions[1::2]])
    
    white_elo = int(game.headers["WhiteElo"])
    black_elo = int(game.headers["BlackElo"])
    
    return torch.stack((white_positions, black_positions)), torch.Tensor([white_elo, black_elo])
    

def convert_positions_to_tensors(dataset):
    positions = []
    elo = []
    
    for rating_range, games in dataset.items():
        for game in games:
            position_tensor, elo_tensor = convert_position(game)
            positions.append(position_tensor)
            elo.append(elo_tensor)
            
        print(f"{rating_range} done")
        
    return positions, elo

In [35]:
positions, elo = convert_positions_to_tensors(chess_games)

(800, 1000) done
(1000, 1200) done
(1200, 1400) done
(1400, 1600) done
(1600, 1800) done
(1800, 2000) done
(2000, 2200) done
(2200, 2400) done
(2400, 2600) done
(2600, 3000) done


In [36]:
torch.save((positions, elo), "all_board_positions.pt")

In [7]:
import multiprocessing as mp

# pool = mp.Pool(processes=mp.cpu_count())
pool = mp.Pool(processes=10)

n = None
args = [(label, games, n) for label, games in chess_games.items()]

output = pool.starmap(analyze_games, args)

pool.close()
pool.join()


NameError: name 'mate_score' is not defined

In [151]:
analysis = [game for games, _ in output for game in games]
elo = [elo for _, elos in output for elo in elos]

20 20
torch.Size([2, 59, 17]) torch.Size([2])


2


In [None]:
torch.save((analysis, elo), "all_analysis.pt")

In [96]:
game = chess.pgn.read_game(open("../my_game1.pgn"))
a = analyze_game(game)

before move: d2d4, top_moves tensor([ 0.0039,  0.0037,  0.0028,  0.0023,  0.0022,  0.0017,  0.0016,  0.0009,
         0.0002, -0.0008]), wdl tensor([0.0410, 0.9580, 0.0010])
after:               top_moves tensor([0.0029]), wdl tensor([0.0240, 0.9750, 0.0010])
before move: f7f5, top_moves tensor([-0.0035, -0.0036, -0.0044, -0.0054, -0.0054, -0.0044, -0.0054, -0.0059,
        -0.0062, -0.0065]), wdl tensor([0.0010, 0.9660, 0.0330])
after:               top_moves tensor([-0.0065]), wdl tensor([0.0000, 0.8520, 0.1480])
before move: c2c4, top_moves tensor([0.0074, 0.0059, 0.0053, 0.0051, 0.0051, 0.0048, 0.0044, 0.0037, 0.0032,
        0.0030]), wdl tensor([0.2200, 0.7800, 0.0000])
after:               top_moves tensor([0.0055]), wdl tensor([0.0920, 0.9080, 0.0000])
before move: g8f6, top_moves tensor([-0.0054, -0.0055, -0.0067, -0.0075, -0.0077, -0.0078, -0.0082, -0.0084,
        -0.0084, -0.0093]), wdl tensor([0.0000, 0.9120, 0.0880])
after:               top_moves tensor([-0.0061]), wdl t