In [1]:
#TODO: enumerate every header tag that occurs in the db so the data preprocessing can be improved
import chess.pgn # I would normally do 'from chess import pgn', but the developer examples did it this way.
import pandas as pd

print(3)  # leftover progress marker; kept so the cell's recorded output still matches

# Collect (WhiteElo, BlackElo, Result) for every over-the-board game in the database.
# Source: http://caissabase.co.uk/ download Scid files, export to pgn
gameData = []
with open("./chessSim/data/caissabase.pgn") as pgn:  # `with` guarantees the handle is closed
    while True:
        # read_headers parses only the tag section of the next game (no move parsing)
        headers = chess.pgn.read_headers(pgn)
        if headers is None:
            break  # end of file

        # The model needs both ratings and the result; skip games missing any of them.
        if not ('WhiteElo' in headers and 'BlackElo' in headers and 'Result' in headers):
            continue

        # Skip internet games. `and` short-circuits, so headers['Site'] is only
        # looked up when the tag exists (the previous `&` evaluated both sides).
        if 'Site' in headers and 'INT' in headers['Site']:
            continue

        # one row per game; becomes the pandas DataFrame below
        gameData.append([headers['WhiteElo'], headers['BlackElo'], headers['Result']])

df = pd.DataFrame(gameData, columns=['whiteElo', 'blackElo', 'result'])

## Cleaning Data
# Drop games without a recorded result; .copy() so the column assignments below
# operate on an independent frame rather than a slice of the unfiltered one.
df = df[df.result != '*'].copy()
df.whiteElo = df.whiteElo.astype(int)  # header values are strings
df.blackElo = df.blackElo.astype(int)

## Feature Engineering (very simple!)
df['EloDiff'] = df.whiteElo - df.blackElo
df['EloAvg'] = ((df.whiteElo + df.blackElo) / 2).astype(int)

# write to csv for future use
# df.to_csv('./chess-sim/data/caissabase_df.csv', index = False)
print(min(df.whiteElo))


3


In [1]:
import chess
from enum import Enum
from enum import IntFlag
import numpy as np
import tensorflow as tf

# Number of squares on the board; PieceSquare spaces its per-piece base offsets by this amount.
SQUARE_NB = 64


class PieceSquare(IntFlag):
    """Base offsets used when building a HalfKP feature index.

    Every (side, piece type) pair owns a run of SQUARE_NB consecutive
    indices starting at ``k * SQUARE_NB + 1``.  ``W_*`` members are chosen
    for pieces whose colour equals the point of view, ``B_*`` for the rest
    (see ``from_piece``).
    """
    NONE = 0
    W_PAWN = 1
    B_PAWN = 1 * SQUARE_NB + 1
    W_KNIGHT = 2 * SQUARE_NB + 1
    B_KNIGHT = 3 * SQUARE_NB + 1
    W_BISHOP = 4 * SQUARE_NB + 1
    B_BISHOP = 5 * SQUARE_NB + 1
    W_ROOK = 6 * SQUARE_NB + 1
    B_ROOK = 7 * SQUARE_NB + 1
    W_QUEEN = 8 * SQUARE_NB + 1
    B_QUEEN = 9 * SQUARE_NB + 1
    W_KING = 10 * SQUARE_NB + 1
    END = W_KING  # pieces without kings (pawns included)
    B_KING = 11 * SQUARE_NB + 1
    END2 = 12 * SQUARE_NB + 1

    @staticmethod
    def from_piece(p: chess.Piece, is_white_pov: bool):
        """Return the base offset for piece ``p`` as seen from ``is_white_pov``.

        The ``W_*`` table is used when ``p.color == is_white_pov``, the
        ``B_*`` table otherwise.  Raises KeyError for an unknown piece type.
        """
        if p.color == is_white_pov:
            by_type = {
                chess.PAWN: PieceSquare.W_PAWN,
                chess.KNIGHT: PieceSquare.W_KNIGHT,
                chess.BISHOP: PieceSquare.W_BISHOP,
                chess.ROOK: PieceSquare.W_ROOK,
                chess.QUEEN: PieceSquare.W_QUEEN,
                chess.KING: PieceSquare.W_KING,
            }
        else:
            by_type = {
                chess.PAWN: PieceSquare.B_PAWN,
                chess.KNIGHT: PieceSquare.B_KNIGHT,
                chess.BISHOP: PieceSquare.B_BISHOP,
                chess.ROOK: PieceSquare.B_ROOK,
                chess.QUEEN: PieceSquare.B_QUEEN,
                chess.KING: PieceSquare.B_KING,
            }
        return by_type[p.piece_type]


def orient(is_white_pov: bool, sq: int):
    """Map square ``sq`` into the given point of view.

    When ``is_white_pov`` is truthy the square is returned unchanged;
    otherwise it is XOR-ed with 63, which rotates the board by 180 degrees.
    """
    # Use chess.A8 as the mask instead of 63 for "flip" rather than "rotate".
    mask = 0 if is_white_pov else 63
    return mask ^ sq


def make_halfkp_index(is_white_pov: bool, king_sq: int, sq: int, p: chess.Piece):
    """Compute the HalfKP feature index for piece ``p`` on square ``sq``.

    The index is the sum of the oriented piece square, the piece's base
    offset from ``PieceSquare.from_piece`` and ``PieceSquare.END * king_sq``.
    ``king_sq`` is used as passed in; callers orient it themselves.
    """
    oriented_sq = orient(is_white_pov, sq)
    piece_base = PieceSquare.from_piece(p, is_white_pov)
    king_base = PieceSquare.END * king_sq
    return oriented_sq + piece_base + king_base

# Returns SparseTensors
def get_halfkp_indeces(board: chess.Board):
    """Build the two HalfKP sparse feature vectors for ``board``.

    Returns a list of two ``tf.sparse.SparseTensor`` objects with dense shape
    ``[41024]``: the first from the point of view of the side to move, the
    second from the opponent's.  Each non-king piece contributes one active
    feature with value 1.0 at the index given by ``make_halfkp_index``.

    A position with no non-king pieces yields two empty sparse tensors.
    """
    # Kings are never features themselves; filter them once for both perspectives.
    pieces = [(sq, p) for sq, p in board.piece_map().items()
              if p.piece_type != chess.KING]

    result = []
    for turn in [board.turn, not board.turn]:
        # The king square is the same for every piece of this perspective.
        king_sq = orient(turn, board.king(turn))
        active = [make_halfkp_index(turn, king_sq, sq, p) for sq, p in pieces]

        # SparseTensor requires indices of shape (N, 1) for a rank-1 tensor.
        # reshape(-1, 1) keeps that rank when N == 0, where a plain empty list
        # would be rank 1 and be rejected.
        indices = np.asarray(active, dtype=np.int64).reshape(-1, 1)
        values = np.ones(len(active), dtype=np.float32)

        # 41024 = 64 king squares * 641 (PieceSquare.END)
        result.append(tf.sparse.reorder(tf.sparse.SparseTensor(
            indices=indices, values=values, dense_shape=[41024])))
    return result

In [49]:
import numpy as np
import chess
import chess.pgn
import random

# NOTE(review): not referenced by any cell visible here — presumably consumed by training code elsewhere; confirm or remove.
GAMES_PER_EPOCH = 1_000

def centipawn_to_wincp(cp_score):
  """Squash a centipawn score into the open interval (-1, 1).

  Computes ``2 / (1 + exp(-0.004 * cp_score)) - 1``: 0 maps to 0.0, large
  positive scores approach 1 and large negative scores approach -1.

  Args:
    cp_score: evaluation in centipawns (int or float).

  Returns:
    A Python float in [-1.0, 1.0].
  """
  # np.math was only an alias of the stdlib math module and is removed in
  # NumPy 2.0, so use math directly.
  import math
  try:
    return 2 / (1 + math.exp(-0.004 * cp_score)) - 1
  except OverflowError:
    # exp() overflows for very negative scores; the limit there is -1.
    return -1.0

def gen(pgn_file_path):
  """Yield training samples from the engine-annotated positions of a PGN file.

  Reading starts at a random offset in the file and wraps around to the start
  when the end is reached, so the generator is endless for any file that
  contains at least one game.  For each mainline move that carries an
  evaluation (``node.eval()``), yields::

      ((features_side_to_move, features_opponent), score / 100)

  where the features come from ``get_halfkp_indeces`` on the position after
  the move and ``score`` is the centipawn evaluation from the point of view of
  the side that just moved.  The rest of a game is skipped at the first move
  without an evaluation; positions whose score has no centipawn value
  (``score()`` returns None) are skipped individually.

  Args:
    pgn_file_path: path of the PGN file to read.
  """
  with open(pgn_file_path) as pgn:

    # Start from random position in the file
    pgn.seek(0, 2)
    pgn.seek(random.randint(0, pgn.tell()))
    # Consume the (probably partial) game we landed in so read_game starts on a game boundary.
    chess.pgn.read_headers(pgn)

    just_rewound = False
    while True:
      game = chess.pgn.read_game(pgn)

      if not game:
        if just_rewound:
          # Nothing readable even from offset 0: stop instead of spinning forever.
          return
        pgn.seek(0)
        just_rewound = True
        continue
      just_rewound = False

      # An earlier experiment used the game result ('Result' header) as the
      # target instead of the engine evaluation.
      board = game.board()
      for node in game.mainline():
        board.push(node.move)
        pov_score = node.eval()
        if pov_score is None:
          break
        # After the push board.turn is the side to move next, so
        # `not board.turn` is the side that just played.
        score = pov_score.pov(not board.turn).score()
        print(board, score)
        # Only skip positions without a centipawn value; a score of exactly 0
        # is a legitimate evaluation and must not be dropped.
        if score is None:
          continue
        try:
          X = get_halfkp_indeces(board)
        except Exception:
          # `except Exception` (not bare except) so Ctrl-C / kernel interrupt still works.
          print(f'Would have crashed: {board}')
          continue
        yield (X[0], X[1]), score / 100

# Create the sample generator; gen() is a generator function, so the file is not opened until the first next(posGen).
posGen = gen("./chessSim/data/caissabase.pgn")

In [238]:
import numpy as np
import chess
import chess.pgn
import random

# This cell uses chess.engine, so import it here explicitly instead of relying
# on another cell or module having loaded it.
import chess.engine

# UCI engine used for all position analysis below; closed with engine.quit() after the loop.
engine = chess.engine.SimpleEngine.popen_uci("/usr/local/bin/stockfish")
evalDepth = 16  # search depth passed to chess.engine.Limit for every analysis

evalDict = {}           # cache: FEN string -> engine analysis result for that position
actionEvaluations = []  # one summary dict per analysed move

db = "./chessSim/data/caissabase.pgn"

def validGame(gamePGN):
    """Return 1 if ``gamePGN`` is usable for analysis, else 0.

    A game is usable when its headers contain every key in ``neededKeys``
    and the 'Site' header does not contain 'INT' (internet games are skipped).

    Args:
        gamePGN: a game object exposing a mapping-like ``headers`` attribute.
    """
    neededKeys = ['Site', 'White', 'Black', 'WhiteElo', 'BlackElo', 'Result']

    # Use the argument, not the notebook-global `game`, so the result always
    # refers to the game that was actually passed in.
    headers = gamePGN.headers

    if any(neededKey not in headers for neededKey in neededKeys):
        return 0
    if 'INT' in headers['Site']:
        return 0
    return 1

def _analyseCached(board):
    """Return the multipv=3 engine analysis of ``board``, cached in evalDict by FEN."""
    fen = board.fen()
    if fen not in evalDict:
        evalDict[fen] = engine.analyse(board, chess.engine.Limit(depth=evalDepth), multipv=3)
    return evalDict[fen]


# For every valid game, analyse each of the first 120 plies before and after
# the played move and append one summary dict per move to actionEvaluations.
try:
    # Separate name for the handle so `db` keeps holding the path.
    with open(db) as pgnFile:
        while True:
            currPos = pgnFile.tell()
            print(currPos)  # progress: offset of the game about to be read
            game = chess.pgn.read_game(pgnFile)

            if not game:
                break  # end of file

            if not validGame(game):
                continue

            board = game.board()
            ply = 0

            white = game.headers['White']
            black = game.headers['Black']
            whiteElo = game.headers['WhiteElo']
            blackElo = game.headers['BlackElo']

            for node in game.mainline():

                ply += 1
                if ply > 120:  # Only analyze first 60 moves of games
                    break

                # Take the mover from the board rather than ply parity so games
                # that start from a position with Black to move are attributed correctly.
                if board.turn == chess.WHITE:
                    player = white
                    playerElo = whiteElo
                else:
                    player = black
                    playerElo = blackElo

                startFEN = board.fen()

                # multipv=3 needs at least three legal moves to return three lines.
                analysable = len(list(board.legal_moves)) > 2
                if analysable:
                    evaluation = _analyseCached(board)

                # Always play the move, even when the position is not analysed;
                # otherwise the board falls out of sync with the game and every
                # later position of this game is wrong.
                board.push(node.move)

                if not analysable or len(list(board.legal_moves)) <= 2:
                    continue

                # Expected score (0..1) for the side to move, for the top three engine lines.
                shortEval = [
                    (line['pv'][0].uci(), line['score'].relative.wdl().expectation())
                    for line in evaluation[:3]
                ]
                move0, currEval = shortEval[0]

                # The new position is scored for the opponent, so flip it back to the mover's view.
                evaluationNew = _analyseCached(board)
                newEval = round(1 - evaluationNew[0]['score'].relative.wdl().expectation(), 4)
                expectationLoss = round(newEval - currEval, 4)

                bestMove = 1 * (node.move.uci() == move0)  # 1 if the engine's first choice was played

                moveSummary = {
                    'player': player
                    ,'elo': playerElo
                    ,'fen': startFEN
                    ,'ply': ply
                    ,'eval': shortEval
                    ,'played': node.move.uci()
                    ,'expectationLoss': expectationLoss
                    ,'newEval': newEval
                    ,'bestMove': bestMove
                }

                actionEvaluations.append(moveSummary)
                print('move done!')
finally:
    # Shut the engine process down even if the loop is interrupted or raises.
    engine.quit()


0
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
380
664
948
1380
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
1852
2826
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
move done!
mov

In [237]:
# Show the record count and a short preview only: the full list holds one dict
# per analysed move and floods the cell output.
print(len(actionEvaluations), 'records')
print(actionEvaluations[:5])


[{'player': '?', 'elo': '2666', 'fen': 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1', 'ply': 1, 'eval': [('d2d4', 0.536), ('e2e4', 0.535), ('c2c4', 0.531)], 'played': 'e2e4', 'expectationLoss': -0.0125, 'newEval': 0.5235, 'bestMove': 0}, {'player': '?', 'elo': '2804', 'fen': 'rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq - 0 1', 'ply': 2, 'eval': [('e7e5', 0.4765), ('c7c5', 0.476), ('e7e6', 0.469)], 'played': 'c7c5', 'expectationLoss': -0.0105, 'newEval': 0.466, 'bestMove': 0}, {'player': '?', 'elo': '2666', 'fen': 'rnbqkbnr/pp1ppppp/8/2p5/4P3/8/PPPP1PPP/RNBQKBNR w KQkq - 0 2', 'ply': 3, 'eval': [('g1f3', 0.534), ('c2c3', 0.5275), ('b1c3', 0.5215)], 'played': 'g1f3', 'expectationLoss': -0.0145, 'newEval': 0.5195, 'bestMove': 1}, {'player': '?', 'elo': '2804', 'fen': 'rnbqkbnr/pp1ppppp/8/2p5/4P3/5N2/PPPP1PPP/RNBQKB1R b KQkq - 1 2', 'ply': 4, 'eval': [('b8c6', 0.4805), ('e7e6', 0.469), ('d7d6', 0.4375)], 'played': 'e7e6', 'expectationLoss': -0.03, 'newEval': 0.4505, '

In [89]:
import chess.engine
import io

# Scratch cell: sanity-check the engine API on the start position and on a mate-in-one position.
engine = chess.engine.SimpleEngine.popen_uci("/usr/local/bin/stockfish")
try:
    board = chess.Board()
    info = engine.analyse(board, chess.engine.Limit(time=0.1, depth = 5))
    print("Score:", info["score"], info)
    # Recorded run printed: Score: PovScore(Cp(+49), WHITE)

    board = chess.Board("8/8/8/8/6QP/7K/8/6qk b - - 0 1")
    print(board.result())  # '*' while the game is still in progress
    # With multipv the result is a list with one info dict per line.
    info = engine.analyse(board, chess.engine.Limit(depth=10), multipv=3)
    print("Score:", info[0])
    print(board)
    # Recorded run: 'score': PovScore(Mate(+1), BLACK), 'pv': [g1h2]
finally:
    # Always terminate the engine process, even if an analyse call raises.
    engine.quit()

board.fen()

Score: PovScore(Cp(+49), WHITE) {'string': 'NNUE evaluation using nn-6877cd24400e.nnue enabled', 'depth': 5, 'seldepth': 6, 'multipv': 1, 'score': PovScore(Cp(+49), WHITE), 'nodes': 1449, 'nps': 362250, 'tbhits': 0, 'time': 0.004, 'pv': [Move.from_uci('e2e4'), Move.from_uci('d7d5'), Move.from_uci('e4d5'), Move.from_uci('d8d5'), Move.from_uci('d2d4')]}
*
Score: {'string': 'NNUE evaluation using nn-6877cd24400e.nnue enabled', 'depth': 10, 'seldepth': 2, 'multipv': 1, 'score': PovScore(Mate(+1), BLACK), 'nodes': 15412, 'nps': 770600, 'tbhits': 0, 'time': 0.02, 'pv': [Move.from_uci('g1h2')]}
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . Q P
. . . . . . . K
. . . . . . . .
. . . . . . q k


'8/8/8/8/6QP/7K/8/6qk b - - 0 1'

In [218]:
# Scratch check: rounding to 4 decimal places keeps 0.0001 (the precision used for newEval / expectationLoss).
round(0.0001, 4)

0.0001

In [45]:
# Scratch cell: jump to a random game in the database and walk its mainline.
# Relies on `random` and `chess.pgn` having been imported by an earlier cell.
# NOTE(review): the saved output below (AttributeError: 'Move' object has no
# attribute 'eval') does not obviously match this code — presumably it comes
# from an earlier revision or library version; re-run to confirm.
with open('./chessSim/data/caissabase.pgn') as pgn:

    # Start from random position in the file
    pgn.seek(0, 2)  # move to end of file so tell() gives the file size
    pgn.seek(random.randint(0, pgn.tell()))
    # skip ahead past the headers of whatever game the random offset landed in
    chess.pgn.read_headers(pgn)
    game = chess.pgn.read_game(pgn)
    board = game.board()  # not used further in this cell
    for node in game.mainline():
        print(node)
        # print(node.move)
        node.eval()  # result is discarded; only checks that the call works


e2e4


AttributeError: 'Move' object has no attribute 'eval'

In [19]:
# Pull one sample from the generator created from gen(); the recorded run was interrupted (KeyboardInterrupt).
next(posGen)

[Event "Zuglo Hotel Open"]
[Site "Zuglo Hotel op"]
[Date "1999.??.??"]
[Round "4"]
[White "Gladyszev, Oleg"]
[Black "Juracsik, Jozsef"]
[Result "1/2-1/2"]
[BlackElo "2210"]
[ECO "E98a"]
[EventDate "1999.??.??"]
[PlyCount "54"]
[Source "ChessliB"]
[SourceDate "2003.01.21"]
[WhiteElo "2388"]

1. d4 Nf6 2. c4 g6 3. Nc3 Bg7 4. e4 d6 5. Be2 O-O 6. Nf3 e5 7. O-O Nc6 8. d5 Ne7 9. Ne1 a5 10. a3 a4 11. Nd3 b6 12. f4 exf4 13. Bxf4 Nd7 14. Nxa4 f5 15. Nc3 Bxc3 16. bxc3 fxe4 17. Nb4 Nc5 18. g4 Rxf4 19. Rxf4 g5 20. Rf1 Bd7 21. Ra2 Ng6 22. Qd4 Nf4 23. Bd1 Qe7 24. Nc2 Ncd3 25. Ne3 Nc1 26. Rb2 Ncd3 27. Ra2 Nc1 1/2-1/2
1. d4 Nf6 2. c4 g6 3. Nc3 Bg7 4. e4 d6 5. Be2 O-O 6. Nf3 e5 7. O-O Nc6 8. d5 Ne7 9. Ne1 a5 10. a3 a4 11. Nd3 b6 12. f4 exf4 13. Bxf4 Nd7 14. Nxa4 f5 15. Nc3 Bxc3 16. bxc3 fxe4 17. Nb4 Nc5 18. g4 Rxf4 19. Rxf4 g5 20. Rf1 Bd7 21. Ra2 Ng6 22. Qd4 Nf4 23. Bd1 Qe7 24. Nc2 Ncd3 25. Ne3 Nc1 26. Rb2 Ncd3 27. Ra2 Nc1 this is the node
[Event "It A"]
[Site "Berlin-Tegel GER"]
[Date "1999.??.??"]
[R

KeyboardInterrupt: 

KeyboardInterrupt: 