In [1]:
# Display the `sc` object (presumably the SparkContext injected by the PySpark kernel — confirm).
sc

In [2]:
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType

In [3]:
import chess
from chess import piece_name, square_name

In [4]:
# Load the games file; the first CSV line supplies the column names.
df = spark.read.option("header", True).csv("chess_dataframe_small.csv")

In [5]:
# Inspect which columns the CSV provided.
df.columns

['Event',
 'Site',
 'Date',
 'Round',
 'White',
 'Black',
 'Result',
 'BlackElo',
 'BlackRatingDiff',
 'BlackTitle',
 'ECO',
 'LichessURL',
 'Opening',
 'Termination',
 'TimeControl',
 'UTCDate',
 'UTCTime',
 'WhiteElo',
 'WhiteRatingDiff',
 'WhiteTitle',
 'Moves']

In [6]:
# The game type is the second space-separated word of the Event column.
df = df.withColumn(
    "game_type",
    F.split(F.col("Event"), " ")[1],
)

In [7]:
# Number of games per game type, most frequent first.
df.groupBy("game_type").count().orderBy(F.desc("count")).show()

+---------+-----+
|game_type|count|
+---------+-----+
|    Blitz|   30|
+---------+-----+



In [8]:
def get_move_squares(board, san_move):
    """Translate a SAN move into the square transitions it causes on ``board``.

    The board itself is not modified; the caller is expected to push the move.

    Parameters
    ----------
    board : chess.Board
        Position in which ``san_move`` is to be played.
    san_move : str
        The move in standard algebraic notation.

    Returns
    -------
    tuple
        ``(transitions, captured_square)`` where ``transitions`` is a list of
        ``(from_square, to_square)`` pairs (two pairs for castling: king and
        rook) and ``captured_square`` is the square of the captured piece, or
        ``None`` if nothing is captured.
    """
    # Convert the parsed move so that, for castling, its to_square can be used
    # below as the rook's starting square.
    move = board._to_chess960(board.parse_san(san_move))

    origin, target = move.from_square, move.to_square
    target_occupied = board.piece_type_at(target) is not None
    white_to_move = board.turn == chess.WHITE

    # En passant: a pawn steps diagonally onto the empty en-passant square;
    # the captured pawn sits one rank behind that square.
    if (
        board.piece_type_at(origin) == chess.PAWN
        and target == board.ep_square
        and abs(target - origin) in [7, 9]
        and not target_occupied
    ):
        victim_square = board.ep_square + (-8 if white_to_move else 8)
        return [(origin, target)], victim_square

    # Castling: report where the king and the rook end up.
    if board.is_castling(move):
        queen_side = chess.square_file(target) < chess.square_file(origin)
        if queen_side:
            king_dest = chess.C1 if white_to_move else chess.C8
            rook_dest = chess.D1 if white_to_move else chess.D8
        else:
            king_dest = chess.G1 if white_to_move else chess.G8
            rook_dest = chess.F1 if white_to_move else chess.F8
        return [(origin, king_dest), (target, rook_dest)], None

    # Ordinary move: a capture happens exactly when the target is occupied.
    captured_square = target if target_occupied else None
    return [(origin, target)], captured_square

In [9]:
def get_square_name(sq):
    """Return the algebraic name of square index ``sq``, or ``None`` if ``sq`` is ``None``.

    The test is an explicit ``is not None`` rather than a truthiness check:
    square index 0 is a valid square (a1) and must not be reported as missing.
    """
    return chess.square_name(sq) if sq is not None else None

def test(moves):
    """Replay ``moves`` from the initial position and print what each one does.

    For every SAN move this prints the move itself, followed by the named
    square transitions and the named capture square computed by
    ``get_move_squares`` before the move is pushed onto the board.
    """
    board = chess.Board()
    for san in moves:
        transitions, captured = get_move_squares(board, san)
        named_transitions = [
            (get_square_name(src), get_square_name(dst))
            for src, dst in transitions
        ]
        print(san)
        print(named_transitions, get_square_name(captured))
        board.push_san(san)
        
# Sample move sequence for exercising test() by hand.
test_moves = [
    "Nf3", "Nf6", "c4", "c6", "g3", "d5", "Bg2", "Bf5",
    "Qb3", "Qb6", "d3", "e6", "Be3", "Qxb3", "axb3",
]
# test(test_moves)

In [10]:
def get_lifetimes(moves_string):
    """Compute how long each starting piece survived in one game.

    Parameters
    ----------
    moves_string : str
        Whitespace-separated movetext. Tokens containing a ``.`` are skipped
        (presumably move-number markers such as ``1.`` — confirm against the
        data); every other token is parsed as a SAN move.

    Returns
    -------
    list of int
        One value per initially occupied square, in ``chess.SQUARES`` order.
        A captured piece gets the full-move number recorded at the most recent
        even-indexed move before its capture; a piece that is never captured
        gets ``board.fullmove_number`` of the final position.
    """
    # split() without an argument collapses runs of whitespace and drops
    # leading/trailing whitespace, so no empty token reaches the SAN parser.
    moves = [token for token in moves_string.split() if "." not in token]

    board = chess.Board(chess.STARTING_FEN)
    init_occupied_squares = [sq for sq in chess.SQUARES if board.piece_at(sq) is not None]

    # Maps a currently occupied square to the starting square of the piece on it.
    init_sq = {sq: sq for sq in init_occupied_squares}

    # Both dicts are keyed by a piece's starting square.
    alive = {sq: 0 for sq in init_occupied_squares}
    lifetime = {sq: 0 for sq in init_occupied_squares}
    for i, san_move in enumerate(moves):
        if i % 2 == 0:
            # Refresh the age of every surviving piece on even-indexed moves.
            for sq in alive:
                alive[sq] = board.fullmove_number

        move_sqs, cap_sq = get_move_squares(board, san_move)

        if cap_sq is not None:
            # The captured piece dies: freeze its lifetime and stop tracking it.
            init_cap_sq = init_sq[cap_sq]
            del init_sq[cap_sq]
            lifetime[init_cap_sq] = alive[init_cap_sq]
            del alive[init_cap_sq]

        # Write all destinations before clearing any origin, so a two-piece
        # move (castling) is applied as a whole.
        for from_sq, to_sq in move_sqs:
            init_sq[to_sq] = init_sq[from_sq]
        for from_sq, _ in move_sqs:
            del init_sq[from_sq]
        board.push_san(san_move)

    # Pieces still alive when the moves run out get the final full-move number.
    for sq in alive:
        lifetime[sq] = board.fullmove_number
    return list(lifetime.values())

In [11]:
# Square indices 0-15 and 48-63.
squares = [*range(0, 16), *range(48, 64)]

In [12]:
# Keep only the rows whose Moves column is not null.
df = df.filter(F.col("Moves").isNotNull())

In [13]:
# Wrap get_lifetimes as a Spark UDF that returns an array of integers.
lifetime_udf = F.udf(f=get_lifetimes, returnType=ArrayType(IntegerType()))

In [14]:
# One row per game holding only the array of per-piece lifetimes.
df_games = df.select(lifetime_udf(F.col("Moves")).alias("lifetimes"))

In [15]:
# Spread the lifetimes array into one column per entry of `squares`, named after the square index.
df_lifetimes = df_games.select(
    *(
        F.col("lifetimes").getItem(position).alias(str(square))
        for position, square in enumerate(squares)
    )
)

In [16]:
%%time

# Average every lifetime column over all games and pull the single result row to the driver.
df_lifetimes_mean = df_lifetimes.agg(
    *[F.avg(name).alias(name) for name in df_lifetimes.columns]
).collect()[0]

CPU times: user 15.3 ms, sys: 0 ns, total: 15.3 ms
Wall time: 3.92 s


In [17]:
# Label every tracked square with the colour and type of the piece on it in the
# starting position, together with that piece's mean lifetime.
init_board = chess.Board(chess.STARTING_FEN)

data = [
    [
        chess.COLOR_NAMES[init_board.color_at(square)],
        chess.piece_name(init_board.piece_type_at(square)),
        chess.square_name(square),
        df_lifetimes_mean[str(square)],
    ]
    for square in squares
]

# Longest mean lifetime first.
data.sort(key=lambda row: -row[3])

In [18]:
# Show the [colour, piece, square, mean lifetime] rows, longest-lived first.
data

[['white', 'king', 'e1', 41.233333333333334],
 ['black', 'king', 'e8', 41.233333333333334],
 ['black', 'pawn', 'g7', 39.53333333333333],
 ['white', 'pawn', 'f2', 36.4],
 ['white', 'pawn', 'h2', 36.266666666666666],
 ['black', 'pawn', 'h7', 36.03333333333333],
 ['white', 'pawn', 'g2', 35.2],
 ['black', 'pawn', 'f7', 34.96666666666667],
 ['black', 'pawn', 'a7', 34.63333333333333],
 ['white', 'pawn', 'a2', 33.56666666666667],
 ['black', 'pawn', 'b7', 33.36666666666667],
 ['white', 'pawn', 'b2', 32.36666666666667],
 ['white', 'rook', 'h1', 30.866666666666667],
 ['black', 'rook', 'h8', 30.466666666666665],
 ['black', 'rook', 'a8', 29.533333333333335],
 ['black', 'queen', 'd8', 29.4],
 ['black', 'pawn', 'e7', 29.3],
 ['white', 'queen', 'd1', 28.833333333333332],
 ['white', 'rook', 'a1', 28.6],
 ['white', 'bishop', 'c1', 27.9],
 ['black', 'pawn', 'c7', 27.866666666666667],
 ['black', 'bishop', 'f8', 27.433333333333334],
 ['black', 'knight', 'g8', 25.6],
 ['white', 'pawn', 'e2', 25.0],
 ['blac

In [19]:
import pickle

# Persist the sorted results for later use.
with open('lifetime_expectancy.pkl', 'wb') as pickle_file:
    pickle.dump(data, file=pickle_file)

In [20]:
# Read the results back from disk and print them.
with open('lifetime_expectancy.pkl', 'rb') as pickle_file:
    data = pickle.load(pickle_file)
print(data)

[['white', 'king', 'e1', 41.233333333333334], ['black', 'king', 'e8', 41.233333333333334], ['black', 'pawn', 'g7', 39.53333333333333], ['white', 'pawn', 'f2', 36.4], ['white', 'pawn', 'h2', 36.266666666666666], ['black', 'pawn', 'h7', 36.03333333333333], ['white', 'pawn', 'g2', 35.2], ['black', 'pawn', 'f7', 34.96666666666667], ['black', 'pawn', 'a7', 34.63333333333333], ['white', 'pawn', 'a2', 33.56666666666667], ['black', 'pawn', 'b7', 33.36666666666667], ['white', 'pawn', 'b2', 32.36666666666667], ['white', 'rook', 'h1', 30.866666666666667], ['black', 'rook', 'h8', 30.466666666666665], ['black', 'rook', 'a8', 29.533333333333335], ['black', 'queen', 'd8', 29.4], ['black', 'pawn', 'e7', 29.3], ['white', 'queen', 'd1', 28.833333333333332], ['white', 'rook', 'a1', 28.6], ['white', 'bishop', 'c1', 27.9], ['black', 'pawn', 'c7', 27.866666666666667], ['black', 'bishop', 'f8', 27.433333333333334], ['black', 'knight', 'g8', 25.6], ['white', 'pawn', 'e2', 25.0], ['black', 'bishop', 'c8', 24.4