# Import libraries

Import required libraries and initialize Stockfish engine

TODO: rewrite the engine calls to use `chess.engine` instead, so that lc0 with Maia weights can be used as well

In [None]:
import chess
import chess.pgn
import os
import math
from stockfish import Stockfish

# Shared Stockfish wrapper used by the cells below. The binary path is
# relative, so it is resolved against the current working directory.
stockfish = Stockfish(os.path.join('bin', 'stockfish_14_x64'))

# Load pgns

In [None]:
# Upper bound on the number of games read from the PGN file.
LIMIT = 50

In [None]:
# Read up to LIMIT games from the PGN dump. The `with` block closes the file
# as soon as loading finishes. chess.pgn.read_game() returns None once the
# file is exhausted, so stop there instead of storing None entries that would
# later fail on `game.board()`.
games = []
with open(os.path.join('data', 'lichess_db_standard_rated_2013-01.pgn')) as pgn:
    for _ in range(LIMIT):
        game = chess.pgn.read_game(pgn)
        if game is None:
            break
        games.append(game)
print(len(games))

# Define pipeline functions

* Calculate value of a piece
* Get Stockfish evaluations of tactic move suggestions
* Get top-n Stockfish move suggestions for a given board position
* Evaluate the move suggestions from a tactic against Stockfish suggestions using a DCG-style
  (discounted cumulative gain) score: the rank-discounted sum of evaluation differences


In [None]:
def value(piece):
    """Return the material value of *piece* in pawn units.

    Pawn = 1, knight = 3, bishop = 3, rook = 5, queen = 9. A missing piece
    (any falsy argument) and every other piece type, such as the king,
    are worth 0.
    """
    if not piece:
        return 0
    # Built after the guard so an empty square never touches the table.
    material = {
        chess.PAWN: 1,
        chess.KNIGHT: 3,
        chess.BISHOP: 3,
        chess.ROOK: 5,
        chess.QUEEN: 9,
    }
    return material.get(piece.piece_type, 0)

In [None]:
def get_evals(engine, board, suggestions):
    """Evaluate the position reached by each suggested move.

    Args:
        engine: object providing ``get_fen_position``, ``set_fen_position``
            and ``get_evaluation`` (the notebook passes the Stockfish wrapper).
        board: board on which each move is temporarily played; it is popped
            back after every evaluation.
        suggestions: iterable of moves to try from the current position.

    Returns:
        A list with one ``(type, value)`` tuple per suggestion, taken from
        the dict returned by ``engine.get_evaluation()``.
    """
    # Remember where the engine was so it can be put back afterwards.
    saved_fen = engine.get_fen_position()

    def score_after(candidate):
        # Play the move, let the engine judge the result, then undo it.
        board.push(candidate)
        engine.set_fen_position(board.fen())
        result = engine.get_evaluation()
        pair = (result['type'], result['value'])
        board.pop()
        return pair

    scores = [score_after(candidate) for candidate in suggestions]
    engine.set_fen_position(saved_fen)
    return scores

In [None]:
def get_top_n_moves(engine, n, board):
    """Ask the engine for its *n* best moves in the position on *board*.

    The engine is pointed at ``board.fen()`` for the query and then set back
    to the position it held before the call. The result of
    ``engine.get_top_moves(n)`` is returned unchanged.
    """
    fen_before_query = engine.get_fen_position()
    engine.set_fen_position(board.fen())
    best_moves = engine.get_top_moves(n)
    # Leave the engine exactly as the caller handed it over.
    engine.set_fen_position(fen_before_query)
    return best_moves

In [None]:
def evaluate(evaluated_suggestions, top_moves, mate_score=2000):
    """Score tactic suggestions against the engine's top moves.

    Suggestions and engine moves are paired by rank. For each pair the
    absolute difference of their evaluations is divided by ``log2(1 + rank)``
    (rank starting at 1, as in discounted cumulative gain) and the results
    are summed.

    Args:
        evaluated_suggestions: iterable of ``(move, (type, value))`` pairs,
            where ``type`` is ``'cp'`` or ``'mate'``.
        top_moves: sequence of dicts with ``'Centipawn'`` and ``'Mate'`` keys.
        mate_score: centipawn magnitude that stands in for a forced mate.

    Returns:
        The discounted sum of evaluation differences (0 for empty input).

    Raises:
        ValueError: if a suggestion carries an unknown evaluation type.
    """

    def signed_mate(mate_in):
        # A negative mate distance means the other side mates, so it must
        # score as a loss rather than a win. A distance of 0 carries no
        # sign and keeps the positive score.
        return -mate_score if mate_in < 0 else mate_score

    dcg = 0
    pairs = zip(evaluated_suggestions, top_moves)
    for rank, (evaluated_move, top_move) in enumerate(pairs, start=1):
        kind = evaluated_move[1][0]
        amount = evaluated_move[1][1]
        if kind == 'cp':
            suggestion_score = amount
        elif kind == 'mate':
            suggestion_score = signed_mate(amount)
        else:
            # Fail loudly instead of reusing the previous iteration's score.
            raise ValueError(f'unknown evaluation type: {kind!r}')

        top_mate = top_move['Mate']
        top_score = signed_mate(top_mate) if top_mate else top_move['Centipawn']
        dcg += abs(top_score - suggestion_score) / math.log2(1 + rank)
    return dcg

# Define tactic heuristics

* Fork
    * Pattern: a non-king piece can move to a square from which it attacks more than one piece
    worth more than a pawn
    * Suggestion: suggest the moves where the condition holds

In [None]:
def fork(board, max_suggestions=3):
    """Find moves that create a fork in the current position.

    A move qualifies when the moved (non-king) piece, on its destination
    square, attacks more than one enemy piece worth more than a pawn
    according to ``value()``. Kings are worth 0 there, so a check does not
    count as an attacked target.

    Args:
        board: position to inspect; it is restored before returning.
        max_suggestions: maximum number of forking moves to return.

    Returns:
        ``(match, suggestions)``: ``match`` is True when at least one forking
        move exists, and ``suggestions`` holds the first ``max_suggestions``
        of them in legal-move generation order.
    """
    mover = board.turn
    suggestions = []
    # Snapshot the moves so pushing and popping cannot disturb the generator.
    for move in list(board.legal_moves):
        if board.piece_type_at(move.from_square) == chess.KING:
            continue
        board.push(move)
        targets = 0
        # Attack sets do not depend on the side to move, so no null move is
        # needed to look at what the moved piece hits.
        for square in board.attacks(move.to_square):
            target = board.piece_at(square)
            # board.attacks() also covers squares holding the mover's own
            # pieces; those are defended, not forked.
            if target is None or target.color == mover:
                continue
            if value(target) > 1:
                targets += 1
        board.pop()
        if targets > 1:
            suggestions.append(move)
    return bool(suggestions), suggestions[:max_suggestions]

# Calculate metrics for all games

In [None]:
# Run the fork heuristic on every position of every loaded game.
# `total` counts inspected positions, so coverage is the fraction of
# positions in which the heuristic fired (a value between 0 and 1).
total = 0
matches = 0
total_dcg = 0
for game in games:
    if game is None:
        # chess.pgn.read_game() returns None once the PGN file is exhausted.
        continue
    board = game.board()
    for move in game.mainline_moves():
        board.push(move)
        total += 1
        match, suggestions = fork(board)
        if not match:
            continue
        matches += 1
        evals = get_evals(stockfish, board, suggestions)
        # Lazy pairing; evaluate() consumes it exactly once.
        evaluated_suggestions = zip(suggestions, evals)
        top_n_moves = get_top_n_moves(stockfish, len(suggestions), board)
        total_dcg += evaluate(evaluated_suggestions, top_n_moves)

# Avoid ZeroDivisionError when no games or positions were loaded.
coverage = matches / total if total else 0.0
print(f'Coverage = {coverage}')
print(f'Total DCG = {total_dcg}')

# Individual Test Position

A single constructed position for testing the fork heuristic

In [None]:
# Hand-built position (white to move) used to try out the fork heuristic.
board = chess.Board('r1bqkb1r/pppp1ppp/2n5/4p3/4P1n1/2NP1N2/PPP2PPP/R1BQKB1R w KQkq - 1 5')
# Bare expression: presumably shown as the cell output when run in a notebook.
board

In [None]:
# Evaluate the fork heuristic on the single test position. Everything that
# needs suggestions lives inside the `if`: without a match there is nothing
# to score, `evals` would be undefined (or stale from an earlier cell), and
# the engine would be asked for zero top moves.
match, suggestions = fork(board)
if match:
    top_moves = get_top_n_moves(stockfish, len(suggestions), board)
    evals = get_evals(stockfish, board, suggestions)
    print(board.fen(), match, list(zip(suggestions, evals)), top_moves)
    print(evaluate(zip(suggestions, evals), top_moves))
else:
    print(board.fen(), match)