# LOADING DATASET

In [26]:
import pathlib
import random
import urllib
import zstandard
import chess
import torch
import numpy as np
from torch import nn, Tensor

In [27]:
def __download(url: str, name: str) -> str:
    """Download ``url`` into the local file ``name`` and return the saved path.

    Args:
        url: Address of the resource to fetch.
        name: Destination filename handed to ``urlretrieve``.

    Returns:
        The local path reported by ``urllib.request.urlretrieve``.
    """
    # A bare ``import urllib`` does not load the ``request`` submodule, so
    # import it explicitly instead of relying on another package having
    # imported it as a side effect.
    import urllib.request

    path, _ = urllib.request.urlretrieve(url, name)
    return path


def __unpack(path: str, name: str):
    """Decompress the Zstandard file at ``path`` into a new file called ``name``.

    Both files are opened in binary mode and closed by the ``with`` statement
    once the stream copy has finished (or failed).
    """
    decompressor = zstandard.ZstdDecompressor()
    with open(pathlib.Path(path), 'rb') as packed, open(name, 'wb') as unpacked:
        decompressor.copy_stream(packed, unpacked)


def __remove(path: str):
    """Delete the file located at ``path``."""
    target = pathlib.Path(path)
    target.unlink()

In [28]:
# Fetch the compressed puzzle CSV; `path` is the local file that was written.
path = __download("https://database.lichess.org/lichess_db_puzzle.csv.zst", "lichess_db_puzzle.csv.zst")

In [29]:
# Decompress the downloaded archive into a plain CSV file.
__unpack(path, "lichess_db_puzzle.csv")

In [30]:
# Delete the compressed archive; only the extracted CSV is read from here on.
__remove("lichess_db_puzzle.csv.zst")

In [31]:
class Puzzle:
    """One puzzle parsed from a comma-separated row of the puzzle CSV.

    Fields are read by position: field 1 is the FEN string, field 2 the
    space-separated moves and field 7 the space-separated tags.
    """

    def __init__(self, row: str):
        """Parse ``row`` and build a ``chess.Board`` from its FEN.

        Raises:
            IndexError: If ``row`` has fewer than eight comma-separated fields.
        """
        fields = row.split(',')
        self.fen = fields[1]
        self.moves = fields[2].split(" ")
        self.tags = fields[7].split(" ")
        self.board = chess.Board(self.fen)

    def __str__(self):
        """Return a compact one-line summary of the FEN, tags and moves."""
        return (
            f"{{fen: {self.fen} ,tags: [{', '.join(self.tags)}],"
            f"moves: [{','.join(self.moves)}]}}"
        )

    def fen_to_tensors_list(self) -> list[torch.Tensor]:
        """Return the 14 board planes built with ``bitboard_to_tensor``.

        Plane order: all white pieces, all black pieces, then a white/black
        pair for each of kings, pawns, queens, knights, bishops and rooks.
        """
        board = self.board
        white = board.occupied_co[chess.WHITE]
        black = board.occupied_co[chess.BLACK]

        planes = [bitboard_to_tensor(white), bitboard_to_tensor(black)]
        piece_masks = (
            board.kings,
            board.pawns,
            board.queens,
            board.knights,
            board.bishops,
            board.rooks,
        )
        for mask in piece_masks:
            planes.append(bitboard_to_tensor(white & mask))
            planes.append(bitboard_to_tensor(black & mask))
        return planes

    def puzzle_to_tensor(self, max_moves: int = 7) -> torch.Tensor:
        """Stack the board planes and the first ``max_moves`` move planes.

        Moves beyond ``max_moves`` are ignored; when the puzzle has fewer
        moves, the remaining planes are all-zero 8x8 tensors.

        Args:
            max_moves: Number of move planes to emit (7 by default).

        Returns:
            A tensor holding the 14 board planes followed by ``max_moves``
            move planes, i.e. ``14 + max_moves`` planes of 8x8.

        Raises:
            ValueError: If ``max_moves`` is negative.
        """
        if max_moves < 0:
            raise ValueError("max_moves must be non-negative")

        planes = list(self.fen_to_tensors_list())
        for move in self.moves[:max_moves]:
            planes.append(move_to_tensor(move))
        # Pad so every puzzle yields the same number of planes.
        missing = max_moves - len(self.moves)
        planes.extend(torch.zeros(8, 8) for _ in range(missing))
        return torch.stack(planes)

In [32]:
def load(k: int) -> list[Puzzle]:
    """Read up to ``k`` puzzles from ``lichess_db_puzzle.csv``.

    The first line of the file is skipped (treated as a header). If the file
    holds fewer than ``k`` further lines, only the available rows are
    returned instead of failing on an empty read.

    Args:
        k: Maximum number of rows to parse.

    Returns:
        The parsed puzzles in file order.
    """
    # ``with`` closes the file even when parsing a row raises.
    with open("lichess_db_puzzle.csv", encoding="utf-8") as f:
        next(f, None)  # skip the first line
        # ``range(k)`` comes first so zip stops before reading a (k+1)-th row.
        return [Puzzle(row) for _, row in zip(range(k), f)]

In [33]:
# Smoke check: display the string form of the first parsed puzzle.
str(load(10)[0])

'{fen: r6k/pp2r2p/4Rp1Q/3p4/8/1N1P2R1/PqP2bPP/7K b - - 0 24 ,tags: [crushing, hangingPiece, long, middlegame],moves: [f2g3,e6e7,b2b1,b3c1,b1c1,h6c1]}'

# FILTER DATASET

In [34]:
# Tags used as classification targets; puzzles are later kept only when they
# carry exactly one of these.
expected_tags = {
    "attraction",
    "clearance",
    "deflection",
    "discoveredAttack",
    "doubleCheck",
    "fork",
    "pin",
    "sacrifice",
    "skewer",
    "xRayAttack",
    "zugzwang",
}

In [35]:
# Sort so every tag keeps the same index in every Python process: iterating a
# set of strings follows hash order, which is randomized per interpreter run,
# so list(expected_tags) would give labels that cannot be reproduced later.
expected_tags_list = sorted(expected_tags)

In [36]:
def not_has_promotion_move(puzzle: Puzzle) -> bool:
    """Return True when every move string of ``puzzle`` has exactly four characters."""
    for move in puzzle.moves:
        if len(move) != 4:
            return False
    return True
def filter_data(data: [Puzzle]) -> [Puzzle]:
    """Keep the puzzles usable as single-label samples.

    A puzzle is kept when exactly one of its tags is in ``expected_tags`` and
    ``not_has_promotion_move`` accepts it. Input order is preserved.
    """
    kept = []
    for puzzle in data:
        matching = expected_tags.intersection(puzzle.tags)
        if len(matching) == 1 and not_has_promotion_move(puzzle):
            kept.append(puzzle)
    return kept
        

In [37]:
# Sanity check: how many of the first 100 rows survive the filter.
len(filter_data(load(100)))

35

# CONVERSION TO TENSOR

In [38]:
def bitboard_to_tensor(bitboard: int) -> torch.Tensor:
    """Spread the bits of ``bitboard`` over an 8x8 integer tensor.

    The value is written as a binary string zero-padded to 64 digits, so the
    most significant of the 64 bits lands at ``[0][0]`` and the least
    significant at ``[7][7]``.
    """
    digits = format(bitboard, "064b")
    cells = [int(digit == '1') for digit in digits]
    return torch.tensor(cells).reshape((8, 8))

In [39]:
# Inspect the board planes produced for the first puzzle in the file.
load(1)[0].fen_to_tensors_list()

[tensor([[0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [1, 0, 0, 1, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 1, 0, 0, 1, 0, 1, 0],
         [1, 1, 0, 0, 0, 1, 0, 1],
         [1, 0, 0, 0, 0, 0, 0, 0]]),
 tensor([[1, 0, 0, 0, 0, 0, 0, 1],
         [1, 0, 0, 1, 0, 0, 1, 1],
         [0, 0, 1, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 1, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 1, 0, 0, 0, 1, 0],
         [0, 0, 0, 0, 0, 0, 0, 0]]),
 tensor([[0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [1, 0, 0, 0, 0, 0, 0, 0]]),
 tensor([[1, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0

In [40]:
def move_to_tensor(move: str) -> torch.Tensor:
    """Mark the two squares named by ``move`` on an 8x8 float tensor.

    Characters 0-1 and 2-3 of ``move`` are each read as a file letter plus a
    rank digit. The column is ``7 - (letter - 'a')`` and the row is
    ``8 - rank``, so ``'a1'`` maps to ``[7][7]`` and ``'h8'`` to ``[0][0]``.
    Any characters after the fourth are ignored.
    """
    squares = ((move[0], move[1]), (move[2], move[3]))
    board = torch.zeros(8, 8)
    for file_char, rank_char in squares:
        col = 7 - ord(file_char) + ord('a')
        row = 8 - int(rank_char)
        board[row, col] = 1
    return board

In [41]:
# Example: the two squares of the move b2b4 are set to 1.
print(move_to_tensor('b2b4'))

tensor([[0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.]])


In [42]:
# Inspect the full stacked tensor (board planes + move planes) of the first puzzle.
load(1)[0].puzzle_to_tensor()

tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [1., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 1., 0.,  ..., 0., 1., 0.],
         [1., 1., 0.,  ..., 1., 0., 1.],
         [1., 0., 0.,  ..., 0., 0., 0.]],

        [[1., 0., 0.,  ..., 0., 0., 1.],
         [1., 0., 0.,  ..., 0., 1., 1.],
         [0., 0., 1.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 1.,  ..., 0., 1., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [1., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0., 

# CONVERT AND BATCH DATASET

In [43]:
def puzzle_to_tag_index(puzzle: Puzzle) -> int:
    """Return the position in ``expected_tags_list`` of the puzzle's single expected tag.

    Raises:
        ValueError: If the puzzle's tags contain none or more than one of
            ``expected_tags`` (the one-element unpacking fails).
    """
    (only_tag,) = expected_tags.intersection(puzzle.tags)
    return expected_tags_list.index(only_tag)

In [44]:
# Example: label index of the first puzzle that passes the filter.
puzzle_to_tag_index(filter_data(load(100))[0])

5

In [45]:
def convert_dataset(puzzles: [Puzzle]) -> list[tuple[torch.Tensor, int]]:
    """Turn each puzzle into a ``(stacked tensor, tag index)`` pair, keeping input order."""
    samples = []
    for puzzle in puzzles:
        features = puzzle.puzzle_to_tensor()
        label = puzzle_to_tag_index(puzzle)
        samples.append((features, label))
    return samples

In [46]:
# Both hold-out splits have the same size: 500 * 64 = 32000 samples.
TEST_SIZE = VALIDATION_SIZE = 500 * 64

In [47]:
# Parse up to 3,000,000 rows, keep the filtered puzzles and convert each one
# into a (tensor, tag index) pair.
dataset = convert_dataset(filter_data(load(3000000)))
# NOTE(review): the shuffle is unseeded, so the split made from this list
# differs on every run — confirm that is intended.
random.shuffle(dataset) 
# Number of samples available before splitting.
len(dataset) 

886146

In [48]:
# Carve the hold-out splits off the front of the shuffled list; everything
# left over is training data. `dataset` is rebound to the part after the test
# split, so running this cell twice would shrink it again.
test_dataset = dataset[:TEST_SIZE]
dataset = dataset[TEST_SIZE:]
validation_dataset = dataset[:VALIDATION_SIZE]
train_dataset = dataset[VALIDATION_SIZE:]

print("Train: ", len(train_dataset))
print("Test: ", len(test_dataset))
print("Validation: ", len(validation_dataset))

Train:  822146
Test:  32000
Validation:  32000


In [49]:
# Persist the three splits; each one is a Python list of (tensor, int) tuples.
# NOTE(review): the training file is large (see size remark below); stacking the
# samples into a single tensor before saving would presumably shrink it — verify.
torch.save(train_dataset, "dataset_train.save") # 5GB
torch.save(test_dataset, "dataset_test.save")
torch.save(validation_dataset, "dataset_valid.save")

In [50]:
# Shape of one sample tensor: 14 board planes + 7 move planes, each 8x8.
train_dataset[0][0].shape

torch.Size([21, 8, 8])