# Testbench Experiments
### Testbench
- Given some pipeline, and amount of data to train on?
- Fit pipeline
- Then read some 'test' games from pgn and predict elos using pipeline
- Then calculate the r2 of the predictions
- Output a 'report' file containing relevant info: description of the pipeline (named_steps), number of games used to train on, number of (unseen) games tested on, r2 score.

### Pipeline
- The models must be part of a pipeline, containing a series of transforms to extract features / scale data followed by an estimator (i.e. regression model)
- The input to the pipelines here will be a list of chess.Games X and a corresponding list of Elo ratings y

In [9]:
import chess
import chess.pgn
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
import numpy as np
import time
import json
from sklearn.neighbors import KNeighborsRegressor

In [10]:

#TRAIN_FILE = '../data/train_50k.pgn'
TRAIN_FILE = '../data/std_train_big.clean.pgn'
#TEST_FILE = '../data/test_10k.pgn'
TEST_FILE = '../data/std_test_small.clean.pgn'

def test(pipe, train_count, test_count, filename, description=None):
    train_pgn = open(TRAIN_FILE)
    X_train = []
    y_train = []
    for i in range(train_count):
        game = chess.pgn.read_game(train_pgn)
        X_train.append(game)
        y_train.append([int(game.headers['WhiteElo']),int(game.headers['BlackElo'])])

    fit_start = time.time()
    pipe.fit(X_train, y_train)
    fit_end = time.time()
    fit_time = (fit_end - fit_start)
    
    test_pgn = open(TEST_FILE)
    X_test = []
    y_test = []
    for i in range(test_count):
        game = chess.pgn.read_game(train_pgn)
        X_test.append(game)
        y_test.append([int(game.headers['WhiteElo']),int(game.headers['BlackElo'])])
    
    pred_start = time.time()
    y_pred = pipe.predict(X_test)
    pred_end = time.time()
    pred_time = (pred_end - pred_start)

    R2 = r2_score(y_test, y_pred)
    MSE = mean_squared_error(y_test, y_pred)
    
    results = {
        'Description': description,
        'Pipeline': str(pipe.named_steps),
        '# Games for training': train_count,
        '# Games for testing': test_count,
        'Fit time': fit_time,
        'Predict time': pred_time,
        'R2 score': R2,
        'MSE': MSE
    }
    print(results)
    with open(f'../reports/{filename}.json', 'w') as file:
        json.dump(results, file)

### Example
- Making & testing a pipeline with game2vec for feature extraction and knn regressor as estimator

In [12]:
# From chess_utils.py

def board_to_vec(board):
    '''
        Given a chess.Board return a vector of length 64
        representing the piece / lack of piece at a given square.
    '''
    vec = np.zeros((64), dtype=int)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece is not None:
            if piece.color == chess.WHITE:
                vec[square] = piece.piece_type
            else:
                vec[square] = -1 * piece.piece_type
    return vec

def game_to_vec(game, moves_limit):
    '''
    Given a chess.Game, return a concatenation of board states
    represented as vectors, as generated by board_to_vec()
    '''
    board = game.board()
    game_as_vec = np.zeros((64 * moves_limit))
    i = 0
    for move in game.mainline_moves():
        if i >= moves_limit:
            break
        board.push(move)
        game_as_vec[(64*i):(64*(i+1))] = board_to_vec(board)
        i += 1
    return game_as_vec

# From knn_model.py
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.neighbors import KNeighborsRegressor

def games_to_opening_vecs(games):
    return np.array(list(map(lambda game: game_to_vec(game, 35), games)))

knn_model = KNeighborsRegressor(n_neighbors = 12, weights = 'uniform', metric='hamming')
knn_pipe = Pipeline([
    ('Game to vec', FunctionTransformer(games_to_opening_vecs)),
    ('kNN', knn_model)
     ])

#testbench(knn_pipe, 1000, 100, 'knn_report', 'KNN model with hamming window metric gets good performance with m=35, k=12')

In [24]:
from sklearn.preprocessing import OneHotEncoder
import pickle

GAMES_LIMIT = 10000

def game_to_movetext(game, move_limit=-1):
    '''
    Returns a list of the moves of chess.Game 'game' as strings in
    Standard Algebraic Notation (https://en.wikipedia.org/wiki/Algebraic_notation_(chess))
    '''
    game_string = str(game.mainline())
    move_strings = game_string.split('. ')[1:move_limit]
    move_strings = list(map(lambda s: s.rsplit(' ', 1)[0], move_strings))
    flattened_move_strings = [move for sublist in move_strings for move in sublist]
    return flattened_move_strings

def fit_onehot_encoder():
    '''
    Fits an sklearn OneHotEncoder to moves from GAMES_LIMIT
    number of games. Saves encoder in file 'encoder'
    '''
    all_movetext = []
    pgn = open(TRAIN_FILE)
    for i in range(GAMES_LIMIT):
        game = chess.pgn.read_game(pgn)
        movetext = game_to_movetext(game)
        all_movetext = all_movetext + movetext
    all_movetext = np.array(all_movetext).reshape(-1, 1)
    encoder = OneHotEncoder(handle_unknown='ignore').fit(all_movetext)
    with open('encoder', 'wb') as f:
        pickle.dump(encoder, f)

fit_onehot_encoder()

In [18]:
from sklearn.decomposition import TruncatedSVD
from sklearn.decomposition import PCA

MOVES_LIMIT = 20

def games_to_text(games):
    '''
    Converts list of chess.Game to one-hot encodings of PGN movetext
    '''
    encoded_games = []
    f = open('old_encoder', 'rb')
    encoder = pickle.load(f)
    for game in games:
        movetext = game_to_movetext(game, MOVES_LIMIT)
        movetext = np.array(movetext).reshape(-1, 1)
        if not len(movetext):
            continue
        # encode game
        encoding = encoder.transform(movetext).toarray()
        encoding = np.rot90(encoding, axes=(0, 1))
        # compress encoding
        pca = PCA(n_components=MOVES_LIMIT)
        compressed_encoding = pca.fit_transform(encoding)
        compressed_encoding = np.rot90(compressed_encoding, axes=(1, 0))
        encoded_games.append(compressed_encoding.flatten())
    return encoded_games

knn_model = KNeighborsRegressor(n_neighbors = 12, weights = 'uniform', metric='hamming')
text_knn_pipe = Pipeline([
    ('Game to text', FunctionTransformer(games_to_text)),
    ('kNN', knn_model)
     ])

test(text_knn_pipe, 10000, 1000, 'text_knn_report')

{'Description': None, 'Pipeline': "{'Game to text': FunctionTransformer(accept_sparse=False, check_inverse=True,\n                    func=<function games_to_text at 0x1a18487f80>,\n                    inv_kw_args=None, inverse_func=None, kw_args=None,\n                    validate=False), 'kNN': KNeighborsRegressor(algorithm='auto', leaf_size=30, metric='hamming',\n                    metric_params=None, n_jobs=None, n_neighbors=12, p=2,\n                    weights='uniform')}", '# Games for training': 1000, '# Games for testing': 100, 'Fit time': 10.116522789001465, 'Predict time': 1.3074209690093994, 'R2 score': -0.11890096504165826, 'MSE': 82864.07444444444}
