In [2]:
import chess
import concurrent
from src.data_loading import save_dataset_to_csv
from src.raw_data_gather import gather
from src.patches import PUZZLE_DATASET_PATCH, PUZZLE_PATCH, STOCKFISH_PATH
from stockfish import Stockfish
from concurrent.futures import ThreadPoolExecutor

Download data

In [3]:
# Hand gather() the Lichess database base URL, the puzzle archive file name and PUZZLE_PATCH.
# NOTE(review): presumably this downloads and unpacks the archive to PUZZLE_PATCH —
# confirm against src.raw_data_gather.gather.
gather("https://database.lichess.org", "lichess_db_puzzle.csv.zst", PUZZLE_PATCH)

Load puzzles

In [4]:
class Puzzle:
    """One puzzle parsed from a single comma-separated row.

    Attributes (all taken from the comma-split row):
        fen:   column 1, kept as-is.
        moves: column 2, split on single spaces.
        tags:  column 7, split on single spaces.
    """

    def __init__(self, row: str):
        """Split ``row`` on commas and keep columns 1, 2 and 7.

        Raises:
            IndexError: if the row has fewer than eight comma-separated columns.
        """
        columns = row.split(',')
        fen, moves, tags = columns[1], columns[2], columns[7]
        self.fen = fen
        self.moves = moves.split(" ")
        self.tags = tags.split(" ")

    def __str__(self):
        """Render the puzzle as ``{fen: ... ,tags: [...],moves: [...]}``."""
        joined_tags = ", ".join(self.tags)
        joined_moves = ",".join(self.moves)
        pieces = ["{fen: ", self.fen, " ,tags: [", joined_tags, "],moves: [", joined_moves, "]}"]
        return "".join(pieces)

In [5]:
def load(k: int) -> [Puzzle]:
    """Read up to ``k`` puzzles from the file at ``PUZZLE_PATCH``.

    The first line of the file is skipped (assumed to be the CSV header —
    confirm against the data file); each following line is parsed into a
    ``Puzzle``.

    Args:
        k: maximum number of puzzle rows to read.

    Returns:
        A list with one ``Puzzle`` per row read, in file order. The list is
        shorter than ``k`` when the file contains fewer data rows.
    """
    result = []
    # The context manager closes the file even if parsing a row raises.
    with open(PUZZLE_PATCH) as f:
        f.readline()  # skip the first line
        for _ in range(k):
            row = f.readline()
            if not row:
                # readline() returns '' at end of file: stop instead of
                # feeding an empty row to Puzzle (which would raise IndexError).
                break
            result.append(Puzzle(row))
    return result

Evaluate positions

In [6]:
def generate_positions_for_puzzle(puzzle: Puzzle) -> [(str, [str])]:
    """Pair the puzzle's FEN with every prefix of its move list.

    Returns:
        ``len(puzzle.moves) + 1`` tuples ``(fen, moves_prefix)``, starting with
        the empty prefix and ending with the full move list.
    """
    positions = []
    for prefix_length in range(len(puzzle.moves) + 1):
        played = puzzle.moves[:prefix_length]
        positions.append((puzzle.fen, played))
    return positions

In [7]:
# Sanity check: expand the first loaded puzzle into its (fen, move-prefix) positions.
generate_positions_for_puzzle(load(1)[0])

[('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24', []),
 ('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24', ['f2g3']),
 ('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24', ['f2g3', 'e6e7']),
 ('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24',
  ['f2g3', 'e6e7', 'b2b1']),
 ('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24',
  ['f2g3', 'e6e7', 'b2b1', 'b3c1']),
 ('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24',
  ['f2g3', 'e6e7', 'b2b1', 'b3c1', 'b1c1']),
 ('r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24',
  ['f2g3', 'e6e7', 'b2b1', 'b3c1', 'b1c1', 'h6c1'])]

In [8]:
def generate_fen_for_position(position: (str, [str])) -> str:
    """Replay a move prefix on a board and report the resulting FEN.

    Args:
        position: ``(start_fen, uci_moves)``; the moves are pushed in order
            onto a ``chess.Board`` created from ``start_fen``.

    Returns:
        The string ``'FINISHED'`` when the board reports the game is over after
        the moves, otherwise the board's FEN.
    """
    current = chess.Board(position[0])
    played = position[1]
    for uci in played:
        current.push_uci(uci)
    return 'FINISHED' if current.is_game_over() else current.fen()

In [9]:
# Sanity check: FEN for the first position (empty move prefix) of the first loaded puzzle.
generate_fen_for_position(generate_positions_for_puzzle(load(1)[0])[0])

'r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24'

In [10]:
def puzzles_to_fens(puzzles: [Puzzle]) -> [str]:
    """Flatten puzzles into the FEN (or 'FINISHED' marker) of each of their positions.

    For every puzzle, in order, each position from
    ``generate_positions_for_puzzle`` is converted with
    ``generate_fen_for_position`` and appended to a single flat list.
    """
    fens = []
    for puzzle in puzzles:
        for position in generate_positions_for_puzzle(puzzle):
            fens.append(generate_fen_for_position(position))
    return fens

In [11]:
# Preview the FENs generated from the first 10 loaded puzzles.
puzzles_to_fens(load(10))

['r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24',
 'r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2b1/PqP3PP/7K w - - 0 25',
 'r6k/pp2R2p/5p1Q/3p4/8/1N1P2b1/PqP3PP/7K b - - 0 25',
 'r6k/pp2R2p/5p1Q/3p4/8/1N1P2b1/P1P3PP/1q5K w - - 1 26',
 'r6k/pp2R2p/5p1Q/3p4/8/3P2b1/P1P3PP/1qN4K b - - 2 26',
 'r6k/pp2R2p/5p1Q/3p4/8/3P2b1/P1P3PP/2q4K w - - 0 27',
 'r6k/pp2R2p/5p2/3p4/8/3P2b1/P1P3PP/2Q4K b - - 0 27',
 '5rk1/1p3ppp/pq3b2/8/8/1P1Q1N2/P4PPP/3R2K1 w - - 2 27',
 '5rk1/1p3ppp/pq1Q1b2/8/8/1P3N2/P4PPP/3R2K1 b - - 3 27',
 '3r2k1/1p3ppp/pq1Q1b2/8/8/1P3N2/P4PPP/3R2K1 w - - 4 28',
 '3Q2k1/1p3ppp/pq3b2/8/8/1P3N2/P4PPP/3R2K1 b - - 0 28',
 '3b2k1/1p3ppp/pq6/8/8/1P3N2/P4PPP/3R2K1 w - - 0 29',
 '8/4R3/1p2P3/p4r2/P6p/1P3Pk1/4K3/8 w - - 1 64',
 '8/5R2/1p2P3/p4r2/P6p/1P3Pk1/4K3/8 b - - 2 64',
 '8/5R2/1p2P3/p3r3/P6p/1P3Pk1/4K3/8 w - - 3 65',
 '8/5R2/1p2P3/p3r3/P6p/1P3Pk1/8/5K2 b - - 4 65',
 '8/5R2/1p2r3/p7/P6p/1P3Pk1/8/5K2 w - - 0 66',
 'r2qr1k1/b1p2ppp/pp4n1/P1P1p3/4P1n1/B2P2Pb/3NBP1P/RN1QR1K1 b - - 1 16',
 'r2qr1k1/b1

In [12]:
def filter_finished_fens(fens: [str]) -> [str]:
    """Return the entries of ``fens`` that are not the 'FINISHED' marker, order preserved."""
    kept = []
    for fen in fens:
        if fen == 'FINISHED':
            continue
        kept.append(fen)
    return kept

In [13]:
def evaluate_fen(fen: str, stockfish_path: str) -> dict:
    """Evaluate one position with a freshly created Stockfish wrapper.

    Args:
        fen: position passed to ``set_fen_position``.
        stockfish_path: argument passed to the ``Stockfish`` constructor.

    Returns:
        Whatever ``get_evaluation()`` returns for that position.
    """
    engine = Stockfish(stockfish_path)
    engine.set_fen_position(fen)
    evaluation = engine.get_evaluation()
    return evaluation

In [14]:
def evaluate_fens(fens: [str], stockfish_path: str, max_workers: int = 10) -> [(str, dict)]:
    """Evaluate many FENs concurrently with ``evaluate_fen``.

    Args:
        fens: positions to evaluate.
        stockfish_path: forwarded unchanged to every ``evaluate_fen`` call.
        max_workers: size of the thread pool (defaults to the previously
            hard-coded value of 10).

    Returns:
        A list of ``(fen, evaluation)`` tuples in the same order as ``fens``.

    Raises:
        Any exception raised by ``evaluate_fen`` is re-raised when its result
        is collected.
    """
    # Use the directly imported ThreadPoolExecutor: a bare `import concurrent`
    # does not load the `concurrent.futures` submodule on its own.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [(fen, executor.submit(evaluate_fen, fen, stockfish_path)) for fen in fens]
    # Leaving the `with` block waits for all submitted tasks, so every
    # result() below returns (or raises) immediately.
    return [(fen, future.result()) for fen, future in futures]

Generate dataset with evaluated FENs

In [15]:
def generate_dataset(size, stockfish_path) -> [(str, float)]:
    """Build ``(fen, value)`` pairs from the first ``size`` puzzles.

    Pipeline: ``load`` -> ``puzzles_to_fens`` -> ``filter_finished_fens`` ->
    ``evaluate_fens``. Only evaluations whose ``"type"`` is ``"cp"`` are kept;
    their ``"value"`` becomes the second tuple element.
    """
    puzzles = load(size)
    playable_fens = filter_finished_fens(puzzles_to_fens(puzzles))
    dataset = []
    for fen, evaluation in evaluate_fens(playable_fens, stockfish_path):
        if evaluation["type"] == "cp":
            dataset.append((fen, evaluation["value"]))
    return dataset

In [17]:
# Number of puzzles to read; each puzzle contributes several positions to the dataset.
SIZE = 100_000
dataset = generate_dataset(size=SIZE, stockfish_path=STOCKFISH_PATH)
print("Dataset size: ", len(dataset))

Dataset size:  468403


In [18]:
# Pass the (fen, value) pairs and PUZZLE_DATASET_PATCH to save_dataset_to_csv.
# NOTE(review): presumably this writes the dataset as CSV to that path — confirm in src.data_loading.
save_dataset_to_csv(dataset, PUZZLE_DATASET_PATCH)