In [None]:
import chess
import chess.pgn
import sqlite3 as sql
import regex as re
from datetime import datetime


## Read PGN games and export the data to SQL

In [None]:
# Source PGN file, read sequentially by the import loop. PGN files are normally
# ASCII or UTF-8 encoded, so the encoding is named explicitly instead of relying
# on the platform default (which is not UTF-8 on every system).
pgn = open('Jan 2016.pgn', encoding='utf-8')

# SQLite database receiving the exported rows, plus the cursor used for the INSERT statements.
conn = sql.connect('lichess game data.db')
cur = conn.cursor()

In [None]:
def create_board():
    '''Builds an empty chess board as a dict keyed by standard square notation.

    Keys are inserted rank by rank ('a1'..'h1', then 'a2'..'h2', ... up to 'h8').
    Every square maps to its own, initially empty, list.'''

    columns = 'abcdefgh'
    rows = '12345678'

    # Rows form the outer loop so that insertion order runs a1, b1, ..., h1, a2, ...
    return {column + row: [] for row in rows for column in columns}

In [None]:
def generate_pieces():
    '''Builds the unique letter+number IDs for all 32 pieces. To be chained with
    populate_board and make_moves_dict.

    Returns a tuple of six lists, in this order: pawns ('p1'..'p16'), rooks ('r1'..'r4'),
    knights ('n1'..'n4'), bishops ('s1'..'s4'), kings ('k1', 'k2') and queens ('q1', 'q2').'''

    def numbered(prefix, how_many):
        # IDs are numbered from 1, e.g. numbered('r', 4) -> ['r1', 'r2', 'r3', 'r4']
        return [prefix + str(index) for index in range(1, how_many + 1)]

    pawns = numbered('p', 16)
    rooks = numbered('r', 4)
    knights = numbered('n', 4)
    bishops = numbered('s', 4)
    kings = numbered('k', 2)
    queens = numbered('q', 2)

    return pawns, rooks, knights, bishops, kings, queens
    

In [None]:
def populate_board(board):
    '''Places the pieces from generate_pieces on their starting squares.

    The board is modified in place and also returned. Squares are addressed by position
    in the board's key order, so the board is expected to be laid out as create_board
    builds it (a1..h1, a2..h2, ..., a8..h8). Each occupied square becomes a one-element
    list holding the piece ID.'''

    pawns, rooks, knights, bishops, kings, queens = generate_pieces()

    squares = list(board)

    # White's back row is listed from the a-file to the h-file.
    white_back_row = [rooks[0], knights[0], bishops[0], queens[0],
                      kings[0], bishops[1], knights[1], rooks[1]]
    # Black's back row is listed from the h-file to the a-file, because black's squares
    # are walked in reverse below; this puts the king on e8 and the queen on d8.
    black_back_row = [rooks[2], knights[2], bishops[2], kings[1],
                      queens[1], bishops[3], knights[3], rooks[3]]

    placements = (
        (squares[8:16], pawns[0:8]),                 # white pawns on rank 2
        (reversed(squares[-16:-8]), pawns[8:]),      # black pawns on rank 7, h-file first
        (squares[:8], white_back_row),               # white pieces on rank 1
        (reversed(squares[-8:]), black_back_row),    # black pieces on rank 8, h-file first
    )

    for target_squares, piece_ids in placements:
        for square, piece_id in zip(target_squares, piece_ids):
            board[square] = [piece_id]

    return board

In [None]:
def make_moves_dict():
    '''Creates the dict that stores the move history of each piece. To be exported to SQL database.

    Every piece ID from generate_pieces maps to an empty list. Keys are inserted as pawns,
    rooks, bishops, knights, kings, queens, followed by a final 'move_count' key holding [0].'''

    pawns, rooks, knights, bishops, kings, queens = generate_pieces()

    moves_by_piece = {}
    # Insertion order matters to code that reads the values positionally.
    for group in (pawns, rooks, bishops, knights, kings, queens):
        for piece in group:
            moves_by_piece[piece] = []

    moves_by_piece['move_count'] = [0]

    return moves_by_piece

In [None]:
def simulate_game(moves_sequence):
    '''Replays a game and records where every piece travelled.

    moves_sequence: iterable of moves as supplied by the
        chess.pgn.read_game(pgn).mainline_moves() method; each move must expose uci().

    Returns the moves_by_piece dict (see make_moves_dict) for export to SQL database:
      * a piece still on the board that moved at least once maps to the squares it
        occupied, in order, ending with its final square;
      * a captured piece maps to the squares it occupied, the square it was captured on,
        and finally the ID of the capturing piece;
      * a promotion adds a 'P' + promotion-letter marker (e.g. 'Pq') to the pawn's history;
      * pieces that never moved and were never captured keep an empty list;
      * 'move_count' maps to a one-element list counting every entry of moves_sequence.'''

    # Rook relocation that accompanies each castling move, keyed by
    # (king ID, king destination) -> (rook origin, rook destination).
    castling_rook_moves = {
        ('k1', 'g1'): ('h1', 'f1'),
        ('k1', 'c1'): ('a1', 'd1'),
        ('k2', 'g8'): ('h8', 'f8'),
        ('k2', 'c8'): ('a8', 'd8'),
    }

    # Each occupied square holds [piece ID, previously occupied squares...].
    board = populate_board(create_board())
    moves_by_piece = make_moves_dict()

    for move in moves_sequence:
        moves_by_piece['move_count'][0] += 1

        uci = move.uci()
        start, stop = uci[:2], uci[2:4]
        mover = board[start][0]

        #Capped piece logic
        if board[stop] != []:
            capped_piece = board[stop][0]
            moves_by_piece[capped_piece] = board[stop][1:] + [stop] + [mover]

        #Castling logic. General move tracking should handle the king, this just handles the rooks
        if start in ('e1', 'e8'):
            rook_move = castling_rook_moves.get((mover, stop))
            if rook_move is not None:
                rook_from, rook_to = rook_move
                board[rook_to] = board[rook_from] + [rook_from]
                board[rook_from] = []

        #En passant: a pawn that has not been promoted moves diagonally onto an empty square
        if (mover[0] == 'p'
                and start[0] != stop[0]
                and board[stop] == []
                and not any(entry.startswith('P') for entry in board[start][1:])):

            # The captured pawn stands on the destination file and the capturing pawn's
            # starting rank (rank 5 for white's capture, rank 4 for black's). Deriving the
            # square from the move itself means it can never be unbound or left over from
            # an earlier capture.
            ep_cap = stop[0] + start[1]

            capped_piece = board[ep_cap][0]
            moves_by_piece[capped_piece] = board[ep_cap][1:] + [ep_cap] + [mover]
            board[ep_cap] = []

        #Move tracking logic
        board[stop] = board[start] + [start]
        board[start] = []

        #Dealing with promotions: a fifth UCI character names the promotion piece
        if len(uci) > 4:
            board[stop].append('P' + uci[-1])

    #After game completion, gather move history for non-capped pieces. Pieces that didn't move are ignored.
    for square, contents in board.items():
        if len(contents) > 1:
            moves_by_piece[contents[0]] = contents[1:] + [square]

    return moves_by_piece

In [None]:
BATCH_SIZE = 50000  #Number of games inserted between commits

run = True
batch = 0

while run:
    batch += 1
    print(F"Batch {batch}, Time {datetime.now()}")
    count = 0

    #Generate batch of records for export
    while count < BATCH_SIZE:

        game = chess.pgn.read_game(pgn)

        #read_game returns None once the end of the file is reached, so the game itself
        #must be checked before its headers are touched. The commit after this inner
        #loop then saves the final, partial batch.
        if game is None:
            run = False
            break

        header = game.headers
        entries = []

        #Unique game ID. Traces back to game at https://lichess.org/{game_id}
        entries.append(header.get('Site')[-8:])

        #Retrieve game format, excluding tournament links
        game_type = header.get('Event')
        entries.append(re.match(pattern='^.*(?= https.*)|^.*', string=game_type).group())

        #White & Black player names
        entries.append(header.get('White'))
        entries.append(header.get('Black'))

        #White & Black ELO ratings. header.get does not raise for a missing tag,
        #so no try/except is needed here
        entries.append(header.get('WhiteElo'))
        entries.append(header.get('BlackElo'))

        #Match winner, converted from 1-0/0-1 where 1-0 is a white win.
        #The whole string is compared: a draw is written '1/2-1/2', which also starts
        #with '1' and must not be counted as a white win. Anything that is not a
        #decisive result (draws, unfinished games) is stored as the raw Result value.
        result = header.get('Result')
        if result == '1-0':
            winner = 'white'
        elif result == '0-1':
            winner = 'black'
        else:
            winner = result
        entries.append(winner)

        #Opening sequence
        entries.append(header.get('Opening'))

        #How the game ended
        entries.append(header.get('Termination'))

        #Date of match, expected as YYYY.MM.DD
        entries.append(header.get('UTCDate'))

        #moves_by_piece dict generation
        moves = game.mainline_moves()
        moves_results = simulate_game(moves)

        #Number of moves add to metadata
        entries.append(moves_results['move_count'][0])

        #The first 32 values are the per-piece histories, in the same order as the
        #columns of the moves_by_piece INSERT below; each history is stored '/'-joined
        export = ['/'.join(piece) for piece in list(moves_results.values())[:32]]

        entries = tuple(entries)

        cur.execute('''INSERT INTO games_metadata
                    (game_id, game_type, white_id, black_id, white_elo, black_elo, winner, opening, game_end, match_date, number_of_moves)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', entries)

        cur.execute('''INSERT INTO moves_by_piece
                    (game_id, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16, r1, r2, r3, r4,
                    s1, s2, s3, s4, n1, n2, n3, n4, k1, k2, q1, q2) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,
                    ?,?,?,?,?,?,?,?,?,?,?,?)''', tuple([entries[0]] + export))
        count += 1

    #Persist the batch that was just generated
    conn.commit()

#Release the PGN file handle as well as the database connection
pgn.close()
conn.close()