

### Serialization explanation:

The chess board has **64 squares**, indexed by `python-chess` as:

```
[a1, b1, c1, d1, e1, f1, g1, h1,
 a2, ..., h2,
 ...
 a8, ..., h8]
```

Each square is converted into a **number**:

* Empty → `0`
* White king → `6`
* White rook → `4`
* White rook with castling rights → `7`
* Black king → `14`

---

### Initial position

```
8  . . . . k . . .
7  . . . . . . . .
6  . . . . . . . .
5  . . . . . . . .
4  . . . . . . . .
3  . . . . . . . .
2  . . . . . . . .
1  . . . . K . . R
   a b c d e f g h
```

* White king at `e1` → index `4` → value `6`
* White rook at `h1` → index `7` → value `7` (castling allowed)

---

### After white castles kingside (O-O)

```
8  . . . . k . . .
7  . . . . . . . .
6  . . . . . . . .
5  . . . . . . . .
4  . . . . . . . .
3  . . . . . . . .
2  . . . . . . . .
1  . . . . . R K .
   a b c d e f g h
```

* King moves to `g1` → index `6` → value `6`
* Rook moves to `f1` → index `5` → value `4`
* Castling rights are gone

---

### Final step

Each square’s value is split into **4 binary planes**, plus:

* **1 plane** for whose turn it is

Total:

```
8 × 8 × 5 = 320 values
```



In [None]:
import chess
import chess.pgn
import os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time
import sys
import numpy as np
from multiprocessing import Pool
import time
from tqdm import tqdm
import h5py

In [3]:
board = chess.Board()
print(board)

r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
P P P P P P P P
R N B Q K B N R


In [4]:
pgn = open("./data/KingBase2018-E60-E99.pgn")

In [None]:
#counting number of games in one dataset:
#Simple slow code:

# num_games = 0
# while True:
#     game = chess.pgn.read_game(pgn)
#     if game == None:
#         break
#     else:
#         num_games += 1
#         print(num_games)
# print(num_games)

#fast code:
num_games = 0
with open("./data/KingBase2018-E60-E99.pgn", 'r') as f2:
    for line in f2:
        #print(line)
        if line.startswith("[Event"):
            num_games += 1

with open("./data/KingBase2018-E60-E99.pgn", "r", errors="ignore") as f:
    data = f.read()
    print(f' type of png data: {type(data)}') 
    print(f'size of png data: {sys.getsizeof(data)}')
    

print(f"number of games: {num_games}")

 type of png data: <class 'str'>
size of png data: 87095114
number of games: 237619


In [2]:
class State():
    def __init__(self, board=None):
        if board is None:
            self.board = chess.Board()
        else:
            self.board = board
 
    def edges(self):
        return list(self.board.legal_moves)

    def value(self):
        return 1
    
    def serialize(self):
        assert self.board.is_valid()
        
        bstate = np.zeros(64, np.uint8)
        for i in range(64):
            pp = self.board.piece_at(i)
            if pp is not None:
                bstate[i] = {"P":1, "N":2, "B":3, "R":4, "Q":5, "K":6, \
                             "p":9, "n":10, "b":11, "r":12, "q":13, "k":14 }[pp.symbol()]
        
        if self.board.has_queenside_castling_rights(chess.WHITE):
            assert bstate[0] == 4
            bstate[0] = 7
        if self.board.has_kingside_castling_rights(chess.WHITE):
            assert bstate[7] == 4
            bstate[7] = 7
        if self.board.has_queenside_castling_rights(chess.BLACK):
            assert bstate[56] == 8+4
            bstate[56] = 8+7
        if self.board.has_kingside_castling_rights(chess.BLACK):
            assert bstate[63] == 8+4
            bstate[63] = 7


        if self.board.ep_square is not None:
            assert bstate[self.board.ep_square] == 0
            bstate[self.board.ep_square] = 8
        bstate = bstate.reshape(8,8)
                
    
        #binary state:
        state = np.zeros((8,8,5), np.uint8)
        #state[self.board.ep_square, :, :, 3] = 1  #self.board.ep_square: The potential en
                                                #passant square on the third or sixth rank or None.


        # 0-3 columns to binary
        state[:,:,0] = (bstate>>3)&1
        state[:,:,1] = (bstate>>2)&1
        state[:,:,2] = (bstate>>1)&1
        state[:,:,3] = (bstate>>0)&1

        #4th column is who's turn it is:
        state[:, :, 4] = (self.board.turn * 1.0)  # white turn := True

        # pp = self.board.shredder_fen()
        # print(pp)
        return state

    

    

#0-1 black wins -> -1 
#1-0 white wins -> +1 
#1/2 1/2 draw   ->  0

In [10]:
def get_dataset(num_samples=None):
    X,Y = [], []
    game_num = 0
    for fn in os.listdir("data"):
        pgn = open(os.path.join("data", fn))
        while 1:
            try: 
                game = chess.pgn.read_game(pgn)
            except Exception:
                break        
            
            print("parsing game number %d got %d examples" %(game_num, len(X)) )
            

            value = {"1-0": 1, "0-1": -1, "1/2-1/2": 0}[game.headers['Result']]
            board  = game.board()

            for i, move in enumerate(game.mainline_moves()):
                board.push(move)
                ser =  State(board).serialize()[:, :, 0]
                X.append(ser)
                Y.append(value)
            if num_samples is not None and len(X) > num_samples:
                return X,Y
            game_num += 1

    return X,Y

X,Y = get_dataset(1000)


parsing game number 0 got 0 examples
parsing game number 1 got 85 examples
parsing game number 2 got 163 examples
parsing game number 3 got 213 examples
parsing game number 4 got 351 examples
parsing game number 5 got 415 examples
parsing game number 6 got 544 examples
parsing game number 7 got 653 examples
parsing game number 8 got 701 examples
parsing game number 9 got 760 examples
parsing game number 10 got 821 examples
parsing game number 11 got 925 examples


## run def get_dataset() using multiprocessor:


In [11]:
def get_dataset_multi_process(path, num_samples=None):
    X, Y = [], []
    game_num = 0
    progress = []
    with open(path) as pgn:
        while True:
            try:
                game = chess.pgn.read_game(pgn)
            except Exception:
                break

            if game is None:
                break

            print("parsing game number %d got %d examples" % (game_num, len(X)))

            value = {"1-0": 1, "0-1": -1, "1/2-1/2": 0}[game.headers['Result']]
            board = game.board()

            for i, move in enumerate(game.mainline_moves()):
                board.push(move)
                ser = State(board).serialize()[:, :, 0]
                X.append(ser)
                Y.append(value)

            if num_samples is not None and len(X) > num_samples:
                return X, Y
            
            progress.append(game_num)
            game_num += 1

    return X, Y, progress


In [12]:
# files = [os.path.join("data", f) for f in os.listdir("data")]
# files = files[:2]   # only first 2 files

# print(len(files))
# # Normal single process run
# start = time.time()
# X, Y = [], []
# for f in files:
#     x, y = get_dataset_multi_process(f)
#     X.extend(x)
#     Y.extend(y)

# end = time.time()
# print("Single-process time:", end - start, "seconds")


In [None]:


files = [os.path.join("data", f) for f in os.listdir("data")]
files = files[:2]   # only first 2 files


# Multiprocessing timing
start = time.time()
processes = min(len(files), os.cpu_count())
with Pool(processes=processes) as pool:
    # results = pool.map(get_dataset_multi_process, files)
    results = list(tqdm(pool.imap(get_dataset_multi_process, files), total=len(files)))


X = np.concatenate([r[0] for r in results])
Y = np.concatenate([r[1] for r in results])
end = time.time()
print("Multiprocessing time:", end - start, "seconds")



  0%|          | 0/2 [00:00<?, ?it/s]

## Stream to disk instead of keeping X/Y in RAM

In [3]:
def process_file_chunks(path, num_samples=None, chunk_size=2000):
    X_chunk, Y_chunk = [], []
    # game_num = 0
    progress = []
    with open(path) as pgn:
        while True:
            game = chess.pgn.read_game(pgn)
            if game is None:
                break

            print("parsing game number %d got %d examples" % (game_num, len(X)))

            value = {"1-0": 1, "0-1": -1, "1/2-1/2": 0}[game.headers['Result']]
            board = game.board()

            for move in game.mainline_moves():
                board.push(move)
                ser = State(board).serialize()[:, :, 0]

                X_chunk.append(ser)
                Y_chunk.append(value)

            # flush chunk
                if len(X_chunk) >= chunk_size:
                    yield np.asarray(X_chunk), np.asarray(Y_chunk)
                    X_chunk.clear()
                    Y_chunk.clear()
            
            progress.append(game_num)
            game_num += 1
     # flush remaining
    if X_chunk:
        yield np.asarray(X_chunk), np.asarray(Y_chunk)

    return X, Y, progress


In [4]:
def worker(path):
    return list(process_file_chunks(path))

In [None]:
if __name__ == "__main__":

    files = [os.path.join("data", f) for f in os.listdir("data")]
    files = files[:2]   # test with 2 files first

    processes = min(len(files), os.cpu_count())

    with Pool(processes=processes) as pool:
        results = pool.imap_unordered(worker, files)

        with h5py.File("dataset.h5", "w") as f:
            X_ds, Y_ds = None, None
            total = 0

            for file_chunks in tqdm(results, total=len(files)):
                for X_chunk, Y_chunk in file_chunks:

                    if X_ds is None:
                        # create datasets
                        X_ds = f.create_dataset(
                            "X",
                            data=X_chunk,
                            maxshape=(None,) + X_chunk.shape[1:],
                            chunks=True
                        )
                        Y_ds = f.create_dataset(
                            "Y",
                            data=Y_chunk,
                            maxshape=(None,),
                            chunks=True
                        )
                        total = len(X_chunk)
                    else:
                        # resize & append
                        X_ds.resize(total + len(X_chunk), axis=0)
                        Y_ds.resize(total + len(Y_chunk), axis=0)

                        X_ds[total:] = X_chunk
                        Y_ds[total:] = Y_chunk
                        total += len(X_chunk)

    print("Finished. Total samples:", total)


  0%|          | 0/2 [00:00<?, ?it/s]