In [1]:
import re
import chess
import json
import pandas as pd

from tqdm import tqdm
from itertools import chain, groupby

In [2]:
# Starting square of every piece, keyed by a stable piece identifier.
# Identifier = colour ('w'/'b') + piece letter, followed by '-L' / '-R' for the
# two rooks, knights and bishops (a-side / h-side) or '-<file>' for pawns.
# Queens and kings carry no suffix. Values are algebraic square names.
board = {
# White back rank, then white pawns.
'wr-L': 'a1', 'wn-L': 'b1', 'wb-L': 'c1', 'wq':   'd1', 'wk':   'e1', 'wb-R': 'f1', 'wn-R': 'g1', 'wr-R': 'h1',
'wp-a': 'a2', 'wp-b': 'b2', 'wp-c': 'c2', 'wp-d': 'd2', 'wp-e': 'e2', 'wp-f': 'f2', 'wp-g': 'g2', 'wp-h': 'h2',
# Black pawns, then black back rank.
'bp-a': 'a7', 'bp-b': 'b7', 'bp-c': 'c7', 'bp-d': 'd7', 'bp-e': 'e7', 'bp-f': 'f7', 'bp-g': 'g7', 'bp-h': 'h7',
'br-L': 'a8', 'bn-L': 'b8', 'bb-L': 'c8', 'bq':   'd8', 'bk':   'e8', 'bb-R': 'f8', 'bn-R': 'g8', 'br-R': 'h8'
}

In [3]:
def stringify(board):
    """Render a piece -> square mapping as an 8x8 text diagram.

    Rank 8 is printed first and files run a..h from left to right, with
    squares separated by single spaces. Empty squares are '.'; a piece is
    shown as the second character of its identifier, upper-cased when the
    identifier starts with 'w' and kept as written otherwise.
    """
    grid = [['.'] * 8 for _ in range(8)]
    for name, square in board.items():
        rank_idx = int(square[1]) - 1
        file_idx = ord(square[0]) - ord('a')
        symbol = name[1].upper() if name[0] == 'w' else name[1]
        grid[rank_idx][file_idx] = symbol
    # grid[0] is rank 1, so emit the ranks in reverse to put rank 8 on top.
    return '\n'.join(' '.join(rank) for rank in reversed(grid))

In [4]:
def get_piece(board, pos):
    """Return the identifier of the first piece mapped to square `pos`, or None if there is none."""
    return next((name for name, square in board.items() if square == pos), None)

def chunks(lst, n):
    """Lazily yield consecutive slices of `lst` of length `n` (the last one may be shorter)."""
    starts = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in starts)
        
def get_differences(s1, s2):
    """Compare two text board diagrams square by square.

    All whitespace is removed from both inputs; the remaining characters are
    read as 64 square symbols, starting at a8 and running file by file, rank
    by rank, down to h1.

    Returns a pair of parallel lists: the names of the squares whose symbol
    differs, and the matching `[symbol_in_s1, symbol_in_s2]` pairs.
    """
    before = ''.join(s1.split())
    after = ''.join(s2.split())
    files = 'abcdefgh'
    pos, diffs = [], []
    for idx in range(64):
        rank_from_top, file_idx = divmod(idx, 8)
        old_symbol, new_symbol = before[idx], after[idx]
        if old_symbol != new_symbol:
            pos.append(f'{files[file_idx]}{8 - rank_from_top}')
            diffs.append([old_symbol, new_symbol])
    return pos, diffs

def extract_states_transitions(moves):
    """Replay a game move by move and follow every piece by identity.

    Each SAN string in `moves` is pushed on a fresh `chess.Board`. The text
    diagram before and after the move is diffed with `get_differences`, and the
    changed squares are used to update `chessboard`, a piece-identifier ->
    square mapping seeded from the module-level `board`.

    Returns:
        A `(states, transitions)` tuple.
        * `states` is a list of piece -> square dicts: the initial mapping
          followed by one snapshot after every move.
        * `transitions` has one dict per key of `board`, keyed by the 0-based
          move index. The value is `[from_square, to_square]` when the piece
          moved, or `[last_square]` when it was removed from the mapping.
        Pieces created by promotion are tracked in `states` under generated
        names (`w<letter>-<n>` / `b<letter>-<n>`, with a counter shared by both
        colours) but get no entry in `transitions`.

        If the tracked mapping stops matching the engine's diagram, diagnostics
        are printed and the function returns None instead of a tuple.
        NOTE(review): the callers in this notebook index the result with [1],
        so that path would surface as a TypeError at the call site.
    """
    chessboard = board.copy()
    engine = chess.Board()
    
    old_engine = str(engine)
    states = [board.copy()]
    transitions = {k: dict() for k in board.keys()}
    promotions = 0
    for turn, san in enumerate(moves):
        # The string form of the played move is only used below, where its last
        # character is compared with the arriving piece letter (presumably the
        # promotion suffix of a UCI string such as 'e7e8q' -- confirm against
        # the python-chess docs).
        move = str(engine.push_san(san))
        
        new_engine = str(engine)
        positions, diffs = get_differences(old_engine, new_engine)
        # Resolve the occupants of the changed squares *before* the mapping is
        # mutated, so each entry refers to the position prior to this move.
        pieces = [get_piece(chessboard, pos) for pos in positions]
        for i, (prev, new) in enumerate(diffs):
            if new == '.': # the current piece left this place
                # Find a changed square that now shows this piece's symbol:
                # that is where the piece went.
                for j, (p, n) in enumerate(diffs):
                    if prev == n:
                        # Only identifiers present in `board` have a transitions entry.
                        if pieces[i] in board:
                            transitions[pieces[i]][turn] = [chessboard[pieces[i]], positions[j]]
                        chessboard[pieces[i]] = positions[j]
                        break
                # No changed square shows this symbol: the piece is gone from
                # the board (e.g. a promoting pawn or an en-passant victim).
                else:
                    if pieces[i] in board:
                        transitions[pieces[i]][turn] = [chessboard[pieces[i]]]
                    del chessboard[pieces[i]]
            elif prev == '.' or move[-1] == new.lower(): # the current piece arrived here, might be promotion
                # If some changed square previously showed this symbol, the
                # piece simply moved here; the departure branch records it.
                for j, (p, n) in enumerate(diffs):
                    if new == p:
                        break
                # Otherwise the symbol appeared from nowhere: register a new,
                # uniquely named piece on this square.
                else:
                    promotions += 1
                    name = f'w{new.lower()}-{promotions}' if new.isupper() else f'b{new}-{promotions}'
                    # Anything still mapped to this square is removed first.
                    piece = get_piece(chessboard, positions[i])
                    if piece is not None:
                        if piece in board:
                            transitions[piece][turn] = [chessboard[piece]]
                        del chessboard[piece]
                    chessboard[name] = positions[i]
            else: # the current piece was captured
                if pieces[i] in board:
                    transitions[pieces[i]][turn] = [chessboard[pieces[i]]]
                del chessboard[pieces[i]]

        # Sanity check: report any two identifiers mapped to the same square.
        # The `break` only leaves the inner loop, so the scan continues with
        # the next k1.
        for k1, v1 in chessboard.items():
            for k2, v2 in chessboard.items():
                if k1 != k2 and v1 == v2:
                    print("Two pieces on the same position", v1, ":", k1, "and", k2)
                    print(positions, diffs, pieces, sep='\n')
                    print(stringify(chessboard))
                    print('------')
                    print(chessboard)
                    break
            
        # Cross-check the tracked mapping against the engine's own diagram;
        # on mismatch, dump the context and give up on this game (returns None).
        if stringify(chessboard) != str(new_engine):
            print(turn, "Error at", move, san)
            print(moves)
            print(positions, diffs, pieces, sep='\n')
            print("Expected:")
            print(new_engine)
            print('------')
            print("Found:")
            print(stringify(chessboard))
            print('------')
            print(chessboard)
            return
            
        old_engine = new_engine
        states.append(chessboard.copy())
    return states, transitions

In [5]:
# Load the raw games table from the working directory.
games = pd.read_csv(filepath_or_buffer='games.csv')

## Extracting the "flows"

In [6]:
# One list per original piece; every game appends exactly one entry to each list.
flows = {piece: [] for piece in board}
for _, game in tqdm(games.iterrows(), total=len(games)):
    san_moves = game.moves.split(" ")
    piece_transitions = extract_states_transitions(san_moves)[1]
    # A game's rating is the mean of both players' ratings, truncated to an int.
    mean_rating = int(0.5 * game.white_rating + 0.5 * game.black_rating)
    for piece, history in flows.items():
        history.append({'ELO': mean_rating, 'positions': piece_transitions[piece]})

100%|██████████| 20058/20058 [03:46<00:00, 88.74it/s] 


In [7]:
# Write the per-piece flows to disk as JSON.
with open('data/flows.json', 'w') as flows_file:
    json.dump(obj=flows, fp=flows_file, indent=1)

## Extracting the openings

In [8]:
def remove_variation(x):
    """Return the base opening name, dropping any variation suffix.

    The name is cut at the first ':', '|' or ' #' and trailing whitespace is
    removed, so that e.g. 'Foo: Bar', 'Foo |  Bar' and 'Foo #2' all reduce to
    the same key 'Foo'. A name without any of these separators is returned
    unchanged apart from trailing whitespace.
    """
    # Raw string: '\|' inside a normal string literal is an invalid escape
    # sequence, which Python warns about and intends to reject in the future.
    # Only the text before the first separator is needed, hence maxsplit=1 and
    # no capturing group.
    return re.split(r":|\|| #", x, maxsplit=1)[0].rstrip()

In [9]:
# Derived per-game columns.
games['opening'] = games.opening_name.apply(remove_variation)
# Each mean rating is wrapped in a one-element list (presumably so that the
# later 'sum' aggregation concatenates the ratings instead of adding them).
games['elo'] = (0.5 * games.white_rating + 0.5 * games.black_rating).apply(lambda rating: [int(rating)])
# 0/1 outcome indicators.
games['winner_white'] = (games.winner == 'white').astype('int64')
games['winner_black'] = (games.winner == 'black').astype('int64')
games['draw'] = (games.winner == 'draw').astype('int64')
# Keep only the first `opening_ply` moves of every game.
games['opening_moves'] = games.apply(lambda game: game.moves.split(' ')[:game.opening_ply], axis=1)
# Constant counter column, to be summed per opening.
games['nb_games'] = 1

In [10]:
# One row per base opening name.
openings = games.groupby('opening').agg(dict(
    elo='sum',             # 'sum' over the per-game rating lists
    winner_white='sum',    # number of white wins
    winner_black='sum',    # number of black wins
    draw='sum',            # number of draws
    opening_moves='min',   # minimum of the group's opening move lists
    nb_games='sum',        # number of games in the group
))

In [11]:
# Turn the raw outcome counts into percentages of the opening's games,
# rounded to two decimals. The columns are overwritten in place, so running
# this cell a second time would rescale the already-converted values.
for outcome in ('winner_white', 'winner_black', 'draw'):
    openings[outcome] = openings.apply(
        lambda opening: round(100 * opening[outcome] / opening['nb_games'], 2),
        axis=1,
    )

In [13]:
# Replay each opening's move list. Element [1] of the result (the per-piece
# transitions) is what gets stored under the 'states' column.
openings['states'] = openings.opening_moves.apply(
    lambda san_moves: extract_states_transitions(san_moves)[1]
)
# The raw move lists are not kept in the final table.
openings = openings.drop(columns='opening_moves')

In [14]:
# Export the table as JSON, one object per opening keyed by its name.
openings.to_json(
    path_or_buf='data/openings.json',
    orient='index',
    indent=1,
)