## Training a specialised model (CNN+RNN) for Othello/Reversi

This notebook presents a new approach to predicting the next move in a game of Othello using supervised learning. The datasets come from the [Fédération Française d'Othello](https://www.ffothello.org/informatique/la-base-wthor/). The model takes two inputs: the board state, which is fed to a Convolutional Neural Network (CNN), and the history of the game, which is fed to a Recurrent Neural Network (RNN). The output of the model is the next move to play. We treat the task as a classification problem and use a new type of CNN kernel: a star-shaped kernel.


### Data Handling

In [1]:
import struct   # for reading the .wtb files
import os       # for file, path and directory handling
import pickle   # for saving/loading the data

import cv2      # display

#### Extracting data from the WThor database

Some functions were taken or adapted from the [dnnothello repo](https://github.com/wjaskowski/dnnothello/blob/master/games/othello_data.py).

The header of a .wtb file is 16 bytes long and contains the following fields:
- 1 byte: century of the file's creation
- 1 byte: year of the file's creation
- 1 byte: month of the file's creation
- 1 byte: day of the file's creation
- 4 bytes (int): number of games in the file ($\leq$ 2 147 483 648)
- 2 bytes (short): 0 here (for other file types: number of players or tournaments, or number of empty squares on the board ($\leq$ 65 535))
- 2 bytes (short): year of the games
- 1 byte: size of the board {0: 8x8, 8: 8x8, 10: 10x10}
- 1 byte: type of the games (1 if "solitaire", 0 otherwise); 0 here
- 1 byte: depth of the games
- 1 byte: reserved
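
As a minimal sketch of this layout, the header can be unpacked into a named tuple. The format string is the one used by the reader in this notebook; the field names and the `parse_header` helper are illustrative assumptions, and the sample bytes are synthetic rather than taken from a real file.

```python
import struct
from collections import namedtuple

# Same format string as the notebook's reader: 4 bytes, an int, 2 shorts, 4 bytes.
HEADER_FORMAT = "<BBBBIHHBBBB"

# Hypothetical field names, chosen here for readability only.
WthorHeader = namedtuple(
    "WthorHeader",
    "century year month day n_games n_records games_year "
    "board_size game_type depth reserved",
)

def parse_header(raw: bytes) -> WthorHeader:
    """Unpack the 16-byte header into named fields."""
    return WthorHeader(*struct.unpack(HEADER_FORMAT, raw))

# Synthetic header: 3 games played in 2001 on an 8x8 board.
sample = struct.pack(HEADER_FORMAT, 20, 2, 1, 15, 3, 0, 2001, 8, 0, 0, 0)
header = parse_header(sample)
print(struct.calcsize(HEADER_FORMAT), header.n_games, header.games_year, header.board_size)
```

Named fields make checks such as `header.board_size in (0, 8)` easier to read than positional indexing like `header[7]`.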

The games are stored in the file in the following format:
- 2 bytes (short): label of the tournament
- 2 bytes (short): id number of the black player
- 2 bytes (short): id number of the white player
- 1 byte: true score of the black player
- 1 byte: theoretical score of the black player

The list of moves follows as 60 bytes, one byte per move.
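
The record layout can be sketched as follows. The `parse_record` helper is hypothetical, the sample record is synthetic, and treating a 0 byte as the end of the move list is an assumption that mirrors how the decoding loop in this notebook stops on a 0 move.

```python
import struct
from itertools import takewhile

GAME_INFO_FORMAT = "<HHHBB"       # tournament, black id, white id, true score, theoretical score
MOVES_FORMAT = "<" + "B" * 60     # one byte per move
INFO_SIZE = struct.calcsize(GAME_INFO_FORMAT)
RECORD_SIZE = INFO_SIZE + struct.calcsize(MOVES_FORMAT)

def parse_record(raw: bytes):
    """Split one game record into (info tuple, list of played moves)."""
    info = struct.unpack(GAME_INFO_FORMAT, raw[:INFO_SIZE])
    moves = struct.unpack(MOVES_FORMAT, raw[INFO_SIZE:])
    # Assumption: the first 0 byte marks the end of the game.
    played = list(takewhile(lambda m: m != 0, moves))
    return info, played

# Synthetic record: two moves played, the remaining 58 slots are zero.
raw = struct.pack(GAME_INFO_FORMAT, 1, 10, 20, 34, 34) + bytes([56, 64] + [0] * 58)
info, played = parse_record(raw)
print(RECORD_SIZE, info, played)
```

Each record is therefore 8 + 60 = 68 bytes, so the n-th game of a file starts at byte offset `16 + 68 * n`.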

In [2]:
BOARD_SIZE = 8

HEADER_LENGTH = 16
HEADER_FORMAT = "<BBBBIHHBBBB"  # Byte, Byte, Byte, Byte, Int, Short, Short, Byte, Byte, Byte, (Reserved) Byte

GAME_INFO_LENGTH = 8    
GAME_INFO_FORMAT = "<HHHBB"     # Short, Short, Short, Byte, Byte

MOVES_LENGTH = 60
MOVES_FORMAT = "<" + "B"*MOVES_LENGTH

POSSIBLE_SIZE = [0, 8]

def read_all_wtb_files(directory):
    """Generator to read all .wtb files in a directory."""
    for file_name in os.listdir(directory):
        if file_name.endswith(".wtb"):
            yield from read_wtb(os.path.join(directory, file_name))

def read_wtb(file_path):
    """Generator to read a .wtb file and yield game information and played moves."""
    with open(file_path, 'rb') as f:
        header = struct.unpack(HEADER_FORMAT, f.read(HEADER_LENGTH))
        assert header[7] in POSSIBLE_SIZE   # Check the board size
        
        for _ in range(header[4]):  # Number of games
            game_info = struct.unpack(GAME_INFO_FORMAT, f.read(GAME_INFO_LENGTH))
            played_moves = struct.unpack(MOVES_FORMAT, f.read(MOVES_LENGTH))
            yield game_info[3], played_moves    # Black player true score, moves

In [3]:
reader = read_wtb('../data/raw/WTH_2001.wtb')
print(next(reader))

full_reader = read_all_wtb_files('../data/raw')
print(next(full_reader))

(11, (56, 64, 53, 46, 35, 63, 34, 66, 65, 74, 37, 43, 57, 33, 76, 24, 75, 26, 83, 36, 73, 38, 25, 16, 14, 15, 17, 47, 13, 68, 48, 58, 52, 28, 67, 23, 12, 61, 32, 42, 31, 86, 51, 41, 27, 84, 85, 82, 71, 18, 72, 11, 21, 22, 62, 81, 77, 78, 88, 87))
(34, (56, 64, 33, 36, 46, 34, 43, 67, 66, 65, 53, 63, 74, 84, 75, 57, 35, 24, 47, 38, 76, 52, 58, 37, 42, 62, 83, 82, 73, 85, 86, 87, 48, 68, 25, 14, 13, 31, 61, 51, 15, 26, 77, 23, 41, 88, 21, 72, 16, 32, 12, 22, 78, 71, 81, 11, 17, 27, 28, 18))


In [4]:
from utils import *     # Functions from the default project

In [5]:
def decode_game(moves):
    """Decode the moves of a game from the WThor two-digit encoding (11-88) to the bitboard representation."""
    own, enemy = init()
    node = Node(None, own, enemy, -1, BOARD_SIZE)
    for move in moves:
        if move == 0:
            break
        node.expand() # Generate the possible moves
        x, y = decode_move(move)
        move = set_state(0, x, y, BOARD_SIZE)
        
        if move not in node.moves: # the current player has to pass and the other player plays, or it is the end of the game
            node.invert()
            node.expand()
            if move in node.moves:
                node = node.set_child(move)
            else:
                node = node.set_child(node.moves[0])
        else:
            node = node.set_child(move)
    return node

            
def decode_move(move):
    """Decode a move from the WThor two-digit encoding (11-88) to the zero-based (x, y) representation."""
    return move // 10 - 1, move % 10 - 1
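
A short worked example of this decoding, as a self-contained sketch. `to_index` and `to_algebraic` are hypothetical helpers that are not part of the project; `to_algebraic` assumes the units digit is the column, which is consistent with the first move 56 of the games printed above being the common opening f5.

```python
def decode_move(move: int):
    """Split a two-digit move (11..88) into zero-based (tens, units) coordinates."""
    return move // 10 - 1, move % 10 - 1

def to_index(move: int, board_size: int = 8) -> int:
    """Hypothetical helper: flatten the coordinates into a 0-63 square index."""
    x, y = decode_move(move)
    return x * board_size + y

def to_algebraic(move: int) -> str:
    """Hypothetical helper: assumes units digit = column (a-h), tens digit = row (1-8)."""
    x, y = decode_move(move)
    return "abcdefgh"[y] + str(x + 1)

for move in (11, 56, 88):
    print(move, decode_move(move), to_index(move), to_algebraic(move))
```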

In [6]:
true_score, game_moves = next(full_reader)
print(f"Expected score: {true_score}")
first_game = decode_game(game_moves)
# replay(first_game, BOARD_SIZE)
print(f"Score : {cell_count(first_game.own_pieces), cell_count(first_game.enemy_pieces)}")
while true_score in [cell_count(first_game.own_pieces), cell_count(first_game.enemy_pieces)]:
    true_score, game_moves = next(full_reader)
    first_game = decode_game(game_moves)
print(f"Expected score: {true_score}")
print(f"Score : {cell_count(first_game.own_pieces), cell_count(first_game.enemy_pieces)}")
# replay(first_game, BOARD_SIZE)

Expected score: 52
Score : (12, 52)
Expected score: 64
Score : (0, 63)


The true score stored in the file is the number of black discs plus the number of empty squares: in the second game above, 63 discs plus 1 empty square give 64.
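
The arithmetic can be checked with a small sketch. `black_true_score` is a hypothetical helper that applies the rule exactly as stated here; it has only been checked against the two games printed above, and other scoring conventions (for example, awarding empty squares to the winner) would need a different formula.

```python
def black_true_score(black_discs: int, white_discs: int, board_size: int = 8) -> int:
    """Black's score when the empty squares are added to black's disc count."""
    empties = board_size * board_size - black_discs - white_discs
    return black_discs + empties

print(black_true_score(52, 12))  # full board, no empty square
print(black_true_score(63, 0))   # one empty square left
```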

In [7]:
from tqdm import tqdm

def dump_data(input_dir, output_dir, name):
    """Dump each game from the .wtb files into its own pickle file."""
    os.makedirs(output_dir, exist_ok=True)
    data_reader = read_all_wtb_files(input_dir)
    for i, (score, moves) in tqdm(enumerate(data_reader)):
        game = decode_game(moves)
        move_list = replay(game, BOARD_SIZE, False)
        # Write only the current game: dumping an ever-growing list would
        # duplicate every earlier game in each new file.
        with open(os.path.join(output_dir, f"{name}_{i}.pkl"), 'wb') as f:
            pickle.dump([(score, move_list)], f)

In [10]:
dump_data('../data/raw', '../data/processed/', 'game')

1328it [08:15,  1.36it/s]
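
One option for limiting the number of files, sketched here under assumptions, is to write the games in fixed-size chunks. `dump_in_chunks`, `_write_chunk` and `chunk_size` are hypothetical names that are not part of the project, and the toy data only stands in for the `(score, move_list)` pairs.

```python
import os
import pickle
import tempfile

def _write_chunk(chunk, output_dir, name, index):
    """Pickle one chunk and return the path of the file written."""
    path = os.path.join(output_dir, f"{name}_{index}.pkl")
    with open(path, "wb") as f:
        pickle.dump(chunk, f)
    return path

def dump_in_chunks(games, output_dir, name, chunk_size=1000):
    """Write an iterable of games to pickle files of at most `chunk_size` items."""
    os.makedirs(output_dir, exist_ok=True)
    paths, chunk = [], []
    for game in games:
        chunk.append(game)
        if len(chunk) == chunk_size:
            paths.append(_write_chunk(chunk, output_dir, name, len(paths)))
            chunk = []
    if chunk:  # last, possibly shorter, chunk
        paths.append(_write_chunk(chunk, output_dir, name, len(paths)))
    return paths

# Toy data standing in for (score, move_list) pairs.
toy_games = [(score, [56, 64]) for score in range(5)]
with tempfile.TemporaryDirectory() as tmp:
    paths = dump_in_chunks(toy_games, tmp, "game", chunk_size=2)
    sizes = []
    for path in paths:
        with open(path, "rb") as f:
            sizes.append(len(pickle.load(f)))
print(sizes)
```

Each file then holds a list of games, so a loader that concatenates the unpickled lists works unchanged.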

In [None]:
def load_data(file_path, bound=16):
    """Load games from the pickle files in a directory, stopping once more than `bound` games are loaded."""
    data = []
    for file_name in os.listdir(file_path):
        if len(data) > bound:
            break
        with open(os.path.join(file_path, file_name), 'rb') as f:
            data += pickle.load(f)
    return data

In [None]:
loaded_data = load_data('../data/processed/')
def test_loaded():
    print(len(loaded_data))
    print(len(loaded_data[0]))
    for score, game_nodes in loaded_data:
        print(score)
        for game_node in game_nodes:
            if not game_node.moves:
                game_node.moves = generate_moves(game_node.own_pieces, game_node.enemy_pieces, BOARD_SIZE)[0]
            print(game_node)
            print(game_node.moves)
            print(game_node.value)
            cv2_display(BOARD_SIZE, game_node.own_pieces, game_node.enemy_pieces, game_node.moves, game_node.turn, display_only=True)
            answer = input("Continue ?")
            if answer == 'n':
                return
    cv2.destroyAllWindows()
test_loaded()

1
2


#### Data Preprocessing
Next, we remove duplicates, add symmetries, and view every game from the black player's perspective (if white wins, we invert the board).
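
The symmetry step can be sketched in plain Python. This is an illustration under assumptions, not the notebook's implementation: boards are represented as tuples of row tuples rather than bitboards, and the helper names are hypothetical. A square board has 8 symmetries (4 rotations, each with or without a mirror); when augmenting training pairs, the target move would have to be transformed with the same symmetry as the board.

```python
def rotate90(board):
    """Rotate a square board (tuple of row tuples) by 90 degrees clockwise."""
    return tuple(zip(*board[::-1]))

def mirror(board):
    """Flip the board left to right."""
    return tuple(row[::-1] for row in board)

def symmetries(board):
    """Return the 8 images of the board under rotations and reflections."""
    images = []
    current = tuple(tuple(row) for row in board)
    for _ in range(4):
        images.append(current)
        images.append(mirror(current))
        current = rotate90(current)
    return images

# Toy 8x8 board with a single disc placed off every symmetry axis.
size = 8
board = [[0] * size for _ in range(size)]
board[0][1] = 1
images = symmetries(board)
print(len(images), len(set(images)))
```

Because the images are hashable tuples, collecting them in a `set` also gives a simple way to drop positions that are identical up to symmetry.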

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from typing import Dict, List, Tuple
import logging

# Dataset class for Othello (credits to https://github.com/zatomos for coming up with this absolute masterpiece of a name)
class Othelload(Dataset):
    def __init__(self, file_list: np.ndarray, transform, loggers: Dict[str, logging.Logger], nb_game_by_file=1000, duplicates=True, symmetries=True):
        self.file_list = file_list
        self.nb_games_by_file = nb_game_by_file
        self.transform = transform
        self.duplicates = duplicates
        self.symmetries = symmetries
        self.loggers = loggers
        
    def __len__(self):
        return len(self.file_list) * self.nb_games_by_file
    
    def __getitem__(self, index):
        try:
            file_path = self.file_list[index]
            with open(file_path, 'rb') as f:  # pickle files must be opened in binary mode
                sample = pickle.load(f)
            if not self.duplicates:
                self.loggers["info"].info(f"N° of games before removing duplicates : {len(sample)}")
                sample = self.remove_duplicates(sample)
                self.loggers["info"].info(f"N° of games after removing duplicates : {len(sample)}")
            if self.symmetries:
                sample = self.add_symmetries(sample)
            label = self.generate_labels(sample)
            return sample, label
        except Exception as e:
            self.loggers["error"].error(f"Error while generating pair (sample, label) \n{e}")
            raise e
    
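    def remove_duplicates(self, sample: List[Tuple[int, List[Node]]]):
        """Remove duplicate games from the sample, keeping the first occurrence."""
        # (score, list) tuples are not hashable, so set(sample) would raise a
        # TypeError. Assumption: a game is identified by its score and the
        # sequence of (own_pieces, enemy_pieces) bitboards.
        seen, unique = set(), []
        for score, nodes in sample:
            key = (score, tuple((n.own_pieces, n.enemy_pieces) for n in nodes))
            if key not in seen:
                seen.add(key)
                unique.append((score, nodes))
        return unique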
    
    def add_symmetries(self, sample: List[Tuple[int, List[Node]]]):
        raise NotImplementedError

ModuleNotFoundError: No module named 'torch'