In [1]:
import chess.pgn
import chess.engine
import os
import numpy as np
import pandas as pd
import requests
import json
import io
import re
import time
import warnings
from typing import Optional
from chess.engine import Cp, Mate, MateGiven
import xgboost as xgb
from xgboost import XGBRegressor

In [2]:
# Start one UCI engine process for the whole notebook; every engine.analyse(...) call in the
# functions of this file goes through this module-level handle.
# The path is a hard-coded relative path to a Stockfish 7 Windows binary, so it only resolves
# when the notebook runs from a directory that contains the "engines" folder.
# NOTE(review): no engine.quit() is visible in this part of the file — confirm the process is
# shut down somewhere.
engine = chess.engine.SimpleEngine.popen_uci("engines/stockfish-7-win/stockfish-7-win/Windows/stockfish 7 x64 bmi2")
#choose engine

In [4]:
def getRatings(game, color=1):
    """
    Input: game and color point of view (1 = White, anything else = Black)
    Output: tuple (rating, opponent's rating), both converted to int from the
    'WhiteElo' / 'BlackElo' headers
    """
    # Pick the header pair in point-of-view order, then convert both values.
    own_key, opp_key = ('WhiteElo', 'BlackElo') if color == 1 else ('BlackElo', 'WhiteElo')
    return (int(game.headers[own_key]), int(game.headers[opp_key]))

In [5]:
"""
Helper functions to create bit boards. Taken from 
https://towardsdatascience.com/creating-a-chess-algorithm-using-deep-learning-and-monte-carlo-methods-d7dabd275e63
"""

# One-hot encoding of every board symbol over 12 channels:
# channels 0-5 are the lowercase symbols p, n, b, r, q, k and channels 6-11 the uppercase
# P, N, B, R, Q, K. The '.' symbol (empty square) maps to twelve zeros.
chess_dict = {
    symbol: [int(position == channel) for position in range(12)]
    for symbol, channel in zip("pPnNbBrRqQkK", (0, 6, 1, 7, 2, 8, 3, 9, 4, 10, 5, 11))
}
chess_dict['.'] = [0] * 12
def make_matrix(board):
    """
    Input: object exposing .epd() (e.g. a chess.Board)
    Output: list of rows, one per '/'-separated rank of the EPD piece-placement field;
    each row is a list of piece symbols with every digit expanded into that many '.' entries
    """
    # Only the first space-separated EPD field describes the piece placement.
    placement = board.epd().split(" ", 1)[0]
    grid = []
    for rank in placement.split("/"):
        squares = []
        for symbol in rank:
            if symbol.isdigit():
                # A digit is a run of empty squares.
                squares.extend('.' * int(symbol))
            else:
                squares.append(symbol)
        grid.append(squares)
    return grid
def translate(matrix,chess_dict):
    """
    Input: matrix of symbols (list of rows) and a symbol -> encoding mapping
    Output: list of rows where every symbol is replaced by chess_dict[symbol]
    (the mapped objects themselves are stored, not copies)
    """
    return [[chess_dict[term] for term in row] for row in matrix]


In [6]:
def getBitBoard(fen):
    """
    Input: single fen of a position
    Output: flat numpy array holding the one-hot encoding of every square
    (8 x 8 squares x 12 channels = 768 entries for a regular board). Skip the
    flatten step to keep the 8x8x12 shape instead.
    """
    position = chess.Board(fen)
    symbols = make_matrix(position.copy())      # rows of piece symbols / '.'
    one_hot = np.array(translate(symbols, chess_dict))
    return one_hot.flatten()

In [8]:
def getBoards(game, color=1):
    """
    Input: game and color point of view (1 = White, anything else = Black)
    Output: list of numpy arrays of bit boards of all the positions before the point of view moves. At the moment,
    from move 9 to 100, inclusive.
    """
    # Start from the game's own starting position (as getCPLs / getWCLs do) instead of
    # always assuming the standard initial position, so the boards match the analysed positions.
    board = game.board()

    allboards = []
    for move in game.mainline_moves():
        # Encode the position *before* the move is pushed.
        allboards.append(getBitBoard(board.epd()))
        board.push(move)

    # Even plies belong to the side that moves first, odd plies to the other side.
    # NOTE(review): like the other feature functions this assumes White makes the first move.
    first_ply = 0 if color == 1 else 1
    boards = allboards[first_ply::2]

    # Keep the point-of-view player's moves 9..100 (indices 8..99).
    return boards[8:100]
            
        

In [9]:
def isInc(game):
    """
    Input: game
    Output: True if the 'TimeControl' header carries a non-zero increment, False if not;
    np.nan when the game has no headers or no 'TimeControl' header
    """
    if not hasattr(game, 'headers'):
        return np.nan

    if 'TimeControl' not in game.headers:
        return np.nan

    tc = game.headers['TimeControl'] #has format 180 or 180+2, for example

    regex = re.compile(r"""(\d+)\+?(\d*)""") #search regex xxx or xxx+xxx
    match = regex.search(tc)

    if match is None:
        return False

    increment = match.group(2)  #second matching group is everything after the +

    # "180+0" spells out an increment of zero seconds, which is not an increment.
    return increment != '' and int(increment) > 0

In [10]:
def getTime(move):
    """
    Input: move of game as a string
    Output: number of seconds remaining on the player's clock after making the move,
    read from the first [%clk h:mm:ss(.fraction)] annotation in the string.
    Returns an int when there is no fractional part, a float otherwise,
    and -1 when no clock annotation is found.
    """
    regex = re.compile(r"""\[%clk\s(\d+):(\d+):(\d+)\.?(\d*)\]""") #searches for [%clk ...]
    match = regex.search(move)

    if match is None:
        return -1 #-1 move time means error

    hours, minutes, seconds, fraction = match.groups()
    whole = int(hours) * 3600 + int(minutes) * 60 + int(seconds)

    if not fraction: #if there is no decimal
        return whole

    # Interpret the digits as a decimal fraction of a second, whatever their count
    # (".9" -> 0.9, ".25" -> 0.25); multiplying by 0.1 is only right for one digit.
    return whole + float("0." + fraction)

In [11]:
def getTimeData(game, color=1):
    """
    Input: game and color point of view (1 = White, anything else = Black)
    Output: tuple (move times, time left): seconds spent on each move of the point of view
    color, and the clock value that player had before playing the move, both restricted to
    moves 9 to 100, inclusive.
    Returns ([], []) when the time control cannot be parsed, a clock annotation is missing,
    the point of view player has not moved, or a retained move time is zero or negative.
    """
    tc_match = re.compile(r"""(\d+)\+?(\d*)""").search(game.headers['TimeControl'])
    if tc_match is None:
        return [], []

    base = int(tc_match.group(1))                     # base time, ie 180 in 180+2
    inc = int(tc_match.group(2)) if tc_match.group(2) != '' else 0

    # clocks[0] / clocks[1] are the starting clocks of White / Black; after that one entry
    # per ply with the mover's clock after the move (getTime yields -1 on a missing clock).
    clocks = [base, base]
    clocks.extend(getTime(str(node)) for node in game.mainline())

    if any(value < 0 for value in clocks) or len(clocks) == 2:
        return [], []

    # A player's clocks sit two entries apart; White's first move is entry 2, Black's entry 3.
    start = 2 if color == 1 else 3
    spent = []
    remaining = []
    for idx in range(start, len(clocks), 2):
        before = clocks[idx - 2]
        spent.append(before - clocks[idx] + inc)      # the increment was added back after the move
        remaining.append(before)

    spent_window = spent[8:100]

    # move times should not be neg or 0
    if any(value <= 0 for value in spent_window):
        return [], []

    return (spent_window, remaining[8:100])
    

In [12]:
def analyzeGame(game, ply=14, pvs=5, color=1):
    """
    Input: game, depth to analyze until, number of top moves to analyze for color to play, and color
    Output: analyzed game

    NOTE: placeholder only. The body consists of this docstring alone, so calling the
    function performs no analysis and returns None.
    """
    
    

In [13]:
def getCPLs(game, ply, color=1):
    """
    Input: game, depth to analyze until, and color point of view (1 = White, anything else = Black)
    Output: list of centipawn losses per move for point of view player, each clamped to the
    range 0..300, for moves 9 to 100 inclusive. Placeholder for the real feature.
    """
    def white_eval(position):
        # Evaluation from White's side in centipawns; mates are mapped onto +/-100000-scale scores.
        info = engine.analyse(position, chess.engine.Limit(depth=ply))
        return info['score'].white().score(mate_score=100000)

    board = game.board()
    evals = [white_eval(board)]                 # starting pos
    for move in game.mainline_moves():
        board.push(move)
        evals.append(white_eval(board))

    # evals[i] is the evaluation after ply i; odd plies are the first mover's moves.
    if color == 1:
        raw_losses = [evals[i - 1] - evals[i] for i in range(1, len(evals), 2)]
    else:
        # From White's point of view a loss for Black shows up as a rising evaluation.
        raw_losses = [evals[i] - evals[i - 1] for i in range(2, len(evals), 2)]

    cpls = [min(max(loss, 0), 300) for loss in raw_losses]
    return cpls[8:100]

In [14]:
def getWC(eval, color=1):
    """
    Input: engine eval in centipawns (White's point of view) and color point of view
    Output: value for winning chances, a logistic curve of the eval; for any color other
    than 1 the complement (1 - value) is returned
    """
    # Switches numpy floating-point warnings off (process-wide) so that the huge
    # mate scores can overflow / underflow exp() quietly.
    np.seterr(all="ignore")

    white_chances = 1 / (1 + np.exp(-0.008 * eval))
    return white_chances if color == 1 else 1 - white_chances
        

In [15]:
def getWCLs(game, ply, color=1):
    """
    Input: game, depth to analyze to, and color point of view (1 = White, anything else = Black)
    Output: list of winning chance losses per move for point of view player (never negative),
    for moves 9 to 100 inclusive
    """
    def chances(position):
        # Winning chances of the point of view player in the given position.
        info = engine.analyse(position, chess.engine.Limit(depth=ply))
        score = info['score'].white().score(mate_score=100000)
        return getWC(score, color=color)

    board = game.board()
    wcs = [chances(board)]                      # starting position
    for move in game.mainline_moves():
        board.push(move)
        wcs.append(chances(board))

    # wcs[i] is the value after ply i; the first mover's moves are the odd plies.
    # The values are already expressed for the point of view color, so the loss is
    # "before minus after" for either side.
    first = 1 if color == 1 else 2
    wcls = []
    for i in range(first, len(wcs), 2):
        drop = wcs[i - 1] - wcs[i]
        wcls.append(0 if drop < 0 else drop)

    return wcls[8:100]

In [16]:
def getUsername(game, color=1): #used for debugging
    """
    Input: game, color point of view (1 = White, anything else = Black)
    Output: username taken from the matching 'White' / 'Black' header
    """
    side = 'White' if color == 1 else 'Black'
    return game.headers[side]

In [17]:
def getGameID(game):
    """
    Input: game
    Output: game ID (string of digits) taken from a 'Link' header of the form
    https://www.chess.com/game/live/<id>; -1 when the header is missing or has another form
    """
    # A game without a 'Link' header is treated like one with an unrecognised link.
    link = game.headers.get('Link', '')

    regex = re.compile(r"""https:\/\/www\.chess\.com\/game\/live\/(\d+)""") #searches for https://www.chess.com/game/live/...
    match = regex.search(link)

    if match is None:
        return -1 #-1 game ID means error

    return match.group(1)

In [2]:
def getGameData(game, color=1):
    """
    Input: PGN game and color point of view (1 = White, anything else = Black)
    Output: full game analysis as a DataFrame with one row per retained move.
    With a 'TimeControl' header the columns are 'ID', 'username', 'rating', 'opprating',
    'inc', 'movetime', 'timeleft', 'wcl14' followed by the bit board columns; without it
    only 'wcl14' and the bit board columns.
    Returns None for games with a 'SetUp' header or without usable move times.
    """

    if hasattr(game, 'headers') and 'TimeControl' in game.headers:
        if 'SetUp' in game.headers:
            return None

        username = getUsername(game=game, color=color)
        rating, opprating = getRatings(game=game, color=color)
        inc = isInc(game=game)
        movetimes, timeleft = getTimeData(game=game, color=color)

        # Reject the game before paying for the engine analysis below.
        if movetimes == []:
            return None

        wcls14 = getWCLs(game=game, ply=14, color=color)
        boards = getBoards(game=game, color=color)
        gameID = getGameID(game=game)

        n = len(movetimes) #num of moves using for game

        # Per-game values are repeated on every row; zip stops at the shortest list.
        featurelist = list(zip([gameID] * n, [username] * n, [rating] * n, [opprating] * n,
                               [inc] * n, movetimes, timeleft, wcls14))

        gameData = pd.DataFrame(
            featurelist,
            columns=['ID', 'username', 'rating', 'opprating', 'inc', 'movetime', 'timeleft', 'wcl14'],
        )
        gameData = gameData.dropna(axis=1) #remove irrelevant columns
        gameData = pd.concat([gameData, pd.DataFrame(boards)], axis=1)
        #can modify above to pd.concat all dataframes- can also modify indiv functions to return dataframes instead of lists

    else:
        wcls14 = getWCLs(game=game, ply=14, color=color)
        boards = getBoards(game=game, color=color)

        # Name the single column at construction time: ('wcl14') is a plain string, not a
        # tuple, and pandas refuses a bare string as the column index.
        gameData = pd.DataFrame({'wcl14': wcls14})
        gameData = pd.concat([gameData, pd.DataFrame(boards)], axis=1)

    return(gameData)

In [None]:
# Sample game for manual testing: movetext only, with no header tags and no [%clk ...]
# clock comments in the text, parsed into a python-chess game object named `testgame`.
string = '1. e4 g6 2. d3 Bg7 3. a3 f6 4. Nc3 Nh6 5. f3 Nf7 6. Be3 O-O 7. Qd2 d6 8. O-O-O Nc6 9. h4 h6 10. g4 e5 11. g5 fxg5 12. hxg5 h5 13. Be2 Be6 14. f4 exf4 15. Bxf4 Nfe5 16. Bxh5 gxh5 17. Rxh5 Bg4 18. Qh2 Bxh5 19. Bxe5 Qxg5+ 20. Kb1 Bxd1 21. Bxg7 Qxg7 22. Nxd1 Rf1 0-1'
pgn = io.StringIO(string)  # read_game expects a file-like object
testgame = chess.pgn.read_game(pgn)


In [27]:
def getUserData(username, year=2021, month=1):
    """
    Input: chess.com username; optionally the year and month of the archive to read
    (defaults to January 2021)
    Output: Full game analysis of the first game of that month if it is a standard-rules
    blitz game with a PGN; None otherwise
    """
    url = 'https://api.chess.com/pub/player/{}/games/{:04d}/{:02d}'.format(username, int(year), int(month))
    response = requests.get(url) #get stuff on url
    parsed = json.loads(response.text) #make text usable- json format

    # No archive for that month (or an error payload without a 'games' list).
    games = parsed.get('games') if isinstance(parsed, dict) else None
    if not games:
        return None

    first = games[0]

    if first.get('rules') != 'chess':
        return None

    if first.get('time_class') != 'blitz':
        return None

    if 'pgn' not in first:
        return None

    game = chess.pgn.read_game(io.StringIO(first['pgn'])) #convert to usable pgn

    # read_game returns None when the text holds no game.
    if game is None:
        return None

    # Compare case-insensitively: the name passed in may be cased differently from
    # the name written in the PGN header, and a mismatch would silently pick Black.
    if game.headers['White'].lower() == username.lower(): #if the user specified is white, color = 1
        color = 1
    else:
        color = 0

    return getGameData(game=game, color=color)

In [28]:
def getData(usernames):
    """
    Input: list of usernames
    Output: dataframe with all the data (rows of every user's game stacked in input order);
    an empty dataframe when no user yields any data
    """
    frames = []
    for i, name in enumerate(usernames, start=1):
        userData = getUserData(name)
        if userData is not None:  # users without a usable game contribute nothing
            frames.append(userData)
        print(i)  # progress counter, one line per processed user

    if not frames:
        return pd.DataFrame()

    # Concatenate once at the end instead of re-copying the growing frame on every iteration.
    return pd.concat(frames)

In [1]:
def gamePred(game, model, color=1):
    """
    Input: PGN game, prediction model to use, color point of view
    Output: predicted rating, the mean of the model's per-move predictions
    """
    meta_columns = ['ID', 'username', 'rating', 'opprating', 'timeleft', 'inc', 'movetime']

    # reset_index() keeps the old index as an 'index' column, so that column stays among
    # the features handed to the model.
    # NOTE(review): presumably the model was trained with that column — confirm.
    features = getGameData(game=game, color=color).reset_index().drop(meta_columns, axis=1)

    per_move = model.predict(xgb.DMatrix(features))
    return np.mean(per_move)
    

In [29]:
def getTitledNames():
    """
    Output: usernames of all titled players on site, gathered title by title from the
    'players' list of each https://api.chess.com/pub/titled/<title> response
    """
    titlednames = []
    for title in ('GM', 'IM', 'FM', 'CM', 'NM', 'WGM', 'WIM', 'WFM', 'WCM', 'WNM'):
        response = requests.get('https://api.chess.com/pub/titled/' + title)
        payload = json.loads(response.text)
        titlednames = titlednames + payload['players']
    return titlednames

In [30]:
def getClubNames(club):
    """
    Input: club name as shown in url
    Output: usernames of members of club, collected from every entry under every
    top-level key of the members response
    """
    response = requests.get('https://api.chess.com/pub/club/' + club + '/members')
    groups = json.loads(response.text)
    return [entry['username'] for key in groups.keys() for entry in groups[key]]
    