In [None]:
import pandas as pd
from stockfish import Stockfish
from tqdm import tqdm # progress bar
import ast # safety

In [None]:
# Parameters that will be set by the user
player = 'wreis79'  # username whose games are kept (matched against the 'player' column)
depth = 15  # value forwarded as `depth` to the Stockfish wrapper
reeval = False # defines whether to run the stockfish evaluation again

In [None]:
tqdm.pandas(desc="Processing...", unit="game") 
# Register tqdm with pandas so the `progress_apply` calls below show a progress bar

In [None]:
# Load the base table of games (path is relative to the notebook's working directory)
df = pd.read_csv('../../dados/base/chess_games_chesscom.csv')

In [None]:
# Turn the stringified lists stored in the CSV back into real Python objects;
# values that are not strings are kept untouched.
# The column is optional (a later cell computes it when it is missing), so it is
# only parsed when it actually exists instead of raising KeyError.
if 'move_evals' in df.columns:
    df['move_evals'] = df['move_evals'].apply(
        lambda x: ast.literal_eval(x) if isinstance(x, str) else x
    )

In [None]:
# Location of the Stockfish executable, relative to the notebook's working directory
stockfish_path = '../stockfish_folder/stockfish/stockfish-windows-x86-64-avx2.exe'

# Engine options handed to the wrapper through `parameters`
params = {
    "Threads": 4,
    "Hash": 2048,
}

# Single engine instance shared by every evaluation in this notebook
stockfish = Stockfish(path=stockfish_path, depth=depth, parameters=params) 

In [None]:
# Keep only the selected player's games played under the 'chess' rules,
# then drop every column that is left completely empty.
is_player_game = df['player'] == player
is_standard_chess = df['rules'] == 'chess'
df = df[is_player_game & is_standard_chess].dropna(axis=1, how='all')

In [None]:
def cut_eco_name(eco_url):
    '''
    Shorten an opening name (or opening URL) to a compact label.

    Only the text after the last '/' is used. Its first two '-'-separated words
    are joined with a space, and anything from the first '.' onwards is removed.

    Parameters:
    eco_url (str): opening name or URL.

    Returns:
    str | None: the shortened label, or None when the value is not a string
    (e.g. a missing value), mirroring how `decode` treats missing input.
    '''
    if not isinstance(eco_url, str):
        return None

    slug = eco_url.split('/')[-1]
    label = ' '.join(slug.split('-')[:2])

    # split('.')[0] returns the whole label when there is no '.', so no check is needed
    return label.split('.')[0]

In [None]:
# Replace every value of 'eco' with its shortened opening label
df['eco'] = df['eco'].apply(cut_eco_name) 

In [None]:
# Compute the player's winrate for every opening label found in 'eco'.
# A game counts as a win when 'player_pieces' equals 'winner'.
# Openings that make up less than 1% of the games (and missing labels) are
# marked 'unknown' instead of getting a ratio.
winrates = {}
min_games = len(df) / 100

for opening in df['eco'].unique():
    if pd.isna(opening):
        winrates[opening] = 'unknown'
        continue

    # Exact comparison: `str.contains` would treat the label as a regular
    # expression and also match any longer label that merely contains it.
    opening_games = df[df['eco'] == opening]
    wins = opening_games[opening_games['player_pieces'] == opening_games['winner']]

    if len(opening_games) < min_games:
        winrates[opening] = 'unknown'
    else:
        winrates[opening] = len(wins) / len(opening_games)

In [None]:
def decode(n):
    """
    Decode a TCN-encoded game string into a list of coordinate move strings.

    The string is read two characters at a time: the first character encodes the
    origin square (or a dropped piece), the second the destination square (or a
    promotion).

    Parameters:
    n (str): encoded moves.

    Returns:
    list[str] | None: one string per move, formatted as
        '<from><to>'            e.g. 'e2e4'
        '<from><to><piece>'     for promotions, e.g. 'a7a8q'
        'D<piece><to>'          for drops, e.g. 'Dqe4'
    None is returned when `n` is not a string.
    """
    if not isinstance(n, str):
        return

    T = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!?{~}(^)[_]@#$,./&-*++="
    pieces = "qnrbkp"

    def square_name(index):
        # file letter comes from the first eight characters of the alphabet, rank is 1-based
        return T[index % 8] + str((index // 8) + 1)

    result = []
    for pos in range(0, len(n), 2):
        origin = T.index(n[pos])
        target = T.index(n[pos + 1])

        # Codes above 63 carry a promotion: piece and direction are packed in the code
        promoted_to = None
        if target > 63:
            promoted_to = pieces[(target - 64) // 3]
            target = origin + (-8 if origin < 16 else 8) + ((target - 1) % 3) - 1

        if origin > 75:
            # Origin codes above 75 denote a dropped piece rather than a square
            result.append('D' + pieces[origin - 79] + square_name(target))
            continue

        move_string = square_name(origin) + square_name(target)
        if promoted_to is not None:
            move_string += promoted_to
        result.append(move_string)

    return result

# function to encode into TCN
# def encode(n):

#     if not isinstance(n,str):
#         return

#     T = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!?{~}(^)[_]@#$,./&-*++="
#     if not isinstance(n, list):
#         n = [n]
        
#     result = ""
#     for move in n:
#         if "drop" in move:
#             s = 79 + "qnrbkp".index(move["drop"])
#         else:
#             s = T.index(move["from"][0]) + 8 * (int(move["from"][1]) - 1)
        
#         _ = T.index(move["to"][0]) + 8 * (int(move["to"][1]) - 1)
        
#         if "promotion" in move:
#             promotion_piece_index = "qnrbkp".index(move["promotion"])
#             _ = 3 * promotion_piece_index + 64 + (9 + _ - s if _ < s else _ - s - 7)
        
#         result += T[s] + T[_]
    
#     return result

In [None]:
# Decode each game's 'tcn' string into a list of move strings (None where 'tcn' is not a string)
df['move_list'] = df['tcn'].apply(decode)

In [None]:
# Starting position as square -> piece letter (uppercase = white, lowercase = black)
initial_board = {
    'a8': 'r', 'b8': 'n', 'c8': 'b', 'd8': 'q', 'e8': 'k', 'f8': 'b', 'g8': 'n', 'h8': 'r',
    'a7': 'p', 'b7': 'p', 'c7': 'p', 'd7': 'p', 'e7': 'p', 'f7': 'p', 'g7': 'p', 'h7': 'p',
    'a2': 'P', 'b2': 'P', 'c2': 'P', 'd2': 'P', 'e2': 'P', 'f2': 'P', 'g2': 'P', 'h2': 'P',
    'a1': 'R', 'b1': 'N', 'c1': 'B', 'd1': 'Q', 'e1': 'K', 'f1': 'B', 'g1': 'N', 'h1': 'R'
}

# Point value of each piece letter; kings do not count towards material
piece_values = {'p': 1, 'P': 1, 'n': 3, 'N': 3, 'b': 3, 'B': 3, 'r': 5, 'R': 5, 'q': 9, 'Q': 9, 'k': 0, 'K': 0}

def count_material(move_list):
    '''
    Replay a game on a minimal board and record the material of both sides after every move.

    Moves are not validated. Castling (a king moving two files) also moves the
    rook, and en passant (a pawn moving diagonally onto an empty square) removes
    the captured pawn; both are constant-time checks, so the board stays in sync
    with the real game.

    Parameters:
    move_list (list[str]): moves formatted as '<from><to>[promotion]'.

    Returns:
    list[tuple[int, int]] | None: (white_material, black_material) after each move.
    None when `move_list` is not a list or when the game contains a drop move
    (a move starting with 'D').
    '''
    if not isinstance(move_list, list):
        return

    board = initial_board.copy()
    material_history = []

    for move in move_list:

        # Drop moves are not supported: give up on the whole game
        if move[0] == 'D':
            return

        from_square = move[:2]
        to_square = move[2:4]

        # An unknown origin square leaves the board unchanged
        piece = board.pop(from_square, None)
        if piece is not None:
            file_shift = ord(to_square[0]) - ord(from_square[0]) if to_square else 0

            if piece.lower() == 'p' and file_shift != 0 and to_square not in board:
                # En passant: the captured pawn sits beside the origin square,
                # on the destination file
                victim_square = to_square[0] + from_square[1]
                victim = board.get(victim_square)
                if victim is not None and victim.lower() == 'p' and victim != piece:
                    del board[victim_square]
            elif piece.lower() == 'k' and abs(file_shift) == 2:
                # Castling: bring the rook from the corner to the square the king crossed
                rank = from_square[1]
                rook_from, rook_to = ('h', 'f') if file_shift > 0 else ('a', 'd')
                rook = board.get(rook_from + rank)
                if rook is not None and rook.lower() == 'r':
                    board[rook_to + rank] = board.pop(rook_from + rank)

            if len(move) > 4:
                # Promotion: the fifth character names the new piece, which keeps the mover's colour
                promoted = move[-1]
                piece = promoted.upper() if piece.isupper() else promoted.lower()

            # Placing the piece overwrites (captures) whatever stood on the destination
            board[to_square] = piece

        # Update material counts after the move
        white_material = sum(piece_values[p] for p in board.values() if p.isupper())
        black_material = sum(piece_values[p] for p in board.values() if p.islower())
        material_history.append((white_material, black_material))

    return material_history

In [None]:
# FEN of the standard starting position; every game is replayed from here
starting_fen ='rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1'
def get_game_eval(move_list):
    '''
    Use the shared `stockfish` engine to evaluate the position after every move of a game.

    Parameters:
    move_list (list[str]): moves of one game, in the format accepted by
        `stockfish.make_moves_from_current_position`.

    Returns:
    list | None: one evaluation (as returned by `stockfish.get_evaluation()`) per
    move. None when `move_list` is not a list or when the engine raises an error
    while replaying the game (best effort: a failing game is skipped).
    '''
    if not isinstance(move_list, list):
        return
    move_evals = []

    # `Exception` (not a bare except) so that KeyboardInterrupt still stops a long run
    try:
        stockfish.set_fen_position(starting_fen)
        for move in move_list:
            # Play the move, then evaluate the resulting position
            stockfish.make_moves_from_current_position([move])
            evaluation = stockfish.get_evaluation()
            move_evals.append(evaluation)
        return move_evals
    except Exception:
        return

In [None]:
def extract_from_moves(move_list):
    """
    Function to aggregate the different extraction function loops in a single one, for the sake of efficiency.

    NOTE: not implemented yet. The body only contains the input guard, so the
    function currently returns None for every input.
    """
    if not isinstance(move_list, list):
        return
    

In [None]:
# Per-game history of (white, black) material after every move, computed with a progress bar
df['material_count'] = df['move_list'].progress_apply(count_material)

In [None]:
# Run the engine evaluation only when no evaluations are available yet,
# or when the user explicitly asked for a re-evaluation via `reeval`
if ('move_evals' not in df.columns) or reeval:
    df['move_evals'] = df['move_list'].progress_apply(get_game_eval)


In [None]:
def map_eval(eval):
    """
    Map an engine evaluation to a categorical strength level.

    Mate evaluations map to 4 (positive mate value) or -4 (zero or negative mate
    value). Any other evaluation is bucketed by its value:

        value < -500          -> -3
        -500 <= value < -300  -> -2
        -300 <= value < -50   -> -1
        -50  <= value < 50    ->  0
        50   <= value < 300   ->  1
        300  <= value < 500   ->  2
        value >= 500          ->  3

    Parameters:
    eval (dict): dictionary with the keys 'type' (evaluation type) and 'value'.

    Returns:
    int: level between -4 and 4.
    """
    if eval['type'] == 'mate':
        return 4 if eval['value'] > 0 else -4

    value = eval['value']
    # Exclusive upper bound of each bucket, paired with levels -3 .. 2
    upper_bounds = (-500, -300, -50, 50, 300, 500)
    for level, bound in enumerate(upper_bounds, start=-3):
        if value < bound:
            return level
    return 3

In [None]:
def extrai_eval_fases(row, opening_ply=24, midgame_material=15):
    """
    Add the categorical evaluation of the opening and of the midgame to a game row.

    - 'opening_eval': level of the evaluation after `opening_ply` moves of the
      move list (the last evaluation when the game is shorter).
    - 'midgame_eval': level of the evaluation at the first point where both
      sides have at most `midgame_material` points of material (the last
      evaluation when that never happens).

    Both values are always recomputed, so a value already present in the row is
    never left stale.

    Parameters:
    row (pd.Series): game row with 'move_evals' (list of evaluation dicts) and
        'material_count' (list of (white, black) material pairs).
    opening_ply (int): 1-based position in 'move_evals' used for the opening.
    midgame_material (int): per-side material threshold that marks the midgame point.

    Returns:
    pd.Series: the same row, with the two columns set. The row is returned
    unchanged when either input is not a list or when there are no evaluations.
    """
    evals = row['move_evals']
    material_counts = row['material_count']

    if not isinstance(evals, list) or not isinstance(material_counts, list):
        return row
    if not evals:
        # Nothing was evaluated (e.g. a game without moves): there is no phase to describe
        return row

    last_eval = evals[-1]

    opening = evals[opening_ply - 1] if len(evals) >= opening_ply else last_eval
    row['opening_eval'] = map_eval(opening)

    # zip stops at the shorter list, so unequal lengths cannot index out of range
    midgame = last_eval
    for ply_eval, counts in zip(evals, material_counts):
        if counts[0] <= midgame_material and counts[1] <= midgame_material:
            midgame = ply_eval
            break
    row['midgame_eval'] = map_eval(midgame)

    return row

In [None]:
def evaluate_quality(eval: dict, previous_eval: dict, turn: int) -> str:
    """
    Classify a move by comparing the evaluation before and after it.

    Parameters:
    eval (dict): evaluation after the move, with keys 'type' ('cp' or 'mate') and 'value'.
    previous_eval (dict): evaluation before the move, same format.
    turn (int): side that made the move, 0 for white and 1 for black.

    Returns:
    str: one of 'Brilliant', 'Best', 'Excellent', 'Good', 'Inaccuracy',
    'Mistake', 'Blunder' or 'Missed Mate'.

    The sign handling assumes that positive values favour white.
    NOTE(review): the mate -> cp and mate -> mate branches do not look at which
    side the mate belongs to; confirm that this is intended.
    """
    WHITE = 0
    BLACK = 1

    # From centipawn to centipawn
    if previous_eval['type'] == 'cp' and eval['type'] == 'cp':
        # `decrease` is how much the mover's position got worse (negative = it improved)
        decrease = eval['value'] - previous_eval['value']

        if turn == WHITE:
            decrease = -decrease

        # Single if/elif chain: with a separate `if`, 'Brilliant' was always
        # overwritten by 'Excellent'
        if decrease < 0: # Very rare case where the player makes a move better than what the engine saw
            quality = 'Brilliant'
        elif decrease == 0:
            quality = 'Best'
        elif decrease < 50:
            quality = 'Excellent'
        elif decrease < 100:
            quality = 'Good'
        elif decrease < 200:
            quality = 'Inaccuracy'
        elif decrease < 300:
            quality = 'Mistake'
        else:
            quality = 'Blunder'

    # Mate sequence changed to centipawn, a mate was missed
    elif previous_eval['type'] == 'mate' and eval['type'] == 'cp':
        quality = 'Missed Mate'

    # Centipawn changed to mate
    # Either one player blundered a mate, or stockfish had a high centipawn eval but hadn't seen the mate sequence
    elif previous_eval['type'] == 'cp' and eval['type'] == 'mate':
        if (turn == WHITE and eval['value'] < 0) or (turn == BLACK and eval['value'] > 0):
            quality = 'Blunder' # We assume eval will never go from centipawn to mate in 0
        else:
            quality = 'Best'

    else: # both evals are of type mate
        previous_mate_distance = abs(previous_eval['value'])
        current_mate_distance = abs(eval['value'])

        if current_mate_distance < previous_mate_distance:
            if current_mate_distance == previous_mate_distance - 1:
                quality = 'Best'
            else:
                quality = 'Excellent'
        elif current_mate_distance == previous_mate_distance:
            quality = 'Best'
        else:
            quality = 'Mistake' # Mate distance increased, meaning a suboptimal move was made

    return quality

In [None]:
def extrai_qualidade(row):
    """
    Count, for one game, how many of the player's moves fall in each quality category.

    Each evaluation in 'move_evals' is compared with the one before it (the first
    with a neutral 0 centipawn evaluation) through `evaluate_quality`. Only the
    moves made by the player are scored: moves alternate starting with white, so
    white's moves are the even positions of the list and black's the odd ones.
    The last evaluation of the game is not scored.

    Parameters:
    row (pd.Series): game row with 'move_evals' (list of evaluation dicts) and
        'player_pieces' ('white' or anything else for black).

    Returns:
    pd.Series: the same row with one count column per quality category. The row
    is returned unchanged when 'move_evals' is not a list, so that
    `df.apply(..., axis=1)` always receives a row.
    """
    move_evals = row["move_evals"]
    if not isinstance(move_evals, list):
        return row

    quality_counts = {
        'Brilliant': 0,
        'Best': 0,
        'Excellent': 0,
        'Good': 0,
        'Inaccuracy': 0,
        'Mistake': 0,
        'Blunder': 0,
        'Missed Mate': 0
    }

    # 0 = white, 1 = black, the encoding expected by evaluate_quality
    turn = 0 if row["player_pieces"] == "white" else 1

    previous_eval = {'type': 'cp', 'value': 0}
    for ply, current_eval in enumerate(move_evals[:-1]):
        # Scoring the opponent's moves with the player's colour would count
        # the opponent's good moves as the player's mistakes
        if ply % 2 == turn:
            quality = evaluate_quality(current_eval, previous_eval, turn)
            quality_counts[quality] += 1
        previous_eval = current_eval

    for quality_type, count in quality_counts.items():
        row[quality_type] = count

    return row


In [None]:
# Add the per-game move-quality counts, one row at a time
df = df.apply(extrai_qualidade, axis=1)

In [None]:
# Remember the column layout before the row-wise apply, so that it can be restored
# afterwards; the two phase-evaluation columns are appended unless 'opening_eval'
# is already part of the frame.
original_columns = list(df.columns)
final_columns = original_columns
if 'opening_eval' not in original_columns:
    new_columns = ['opening_eval', 'midgame_eval']
    final_columns = original_columns + new_columns

# Compute the opening / midgame evaluation of every game
df = df.apply(extrai_eval_fases, axis=1)

In [None]:
# Restore the intended column order. `reindex` is used instead of
# `df[final_columns]` so that an evaluation column that no row produced is
# created empty (NaN) rather than raising KeyError.
df = df.reindex(columns=final_columns)

In [None]:
# Add, to every game, the player's winrate with the opening used in that game.
# Games without an opening keep the default 0.0.

# `winrates` mixes floats with the 'unknown' marker, so the column is created
# with object dtype up front: writing a string into a float64 column relies on
# silent upcasting, which pandas has deprecated.
df['winrate_with_opening'] = 0.0
df['winrate_with_opening'] = df['winrate_with_opening'].astype(object)

for op in df['eco'].unique():
    # pd.isna covers both None and NaN openings
    if pd.isna(op):
        continue
    df.loc[df['eco'] == op, 'winrate_with_opening'] = winrates[op]

In [None]:
# Save the enriched games of the selected player; index=False leaves the row index out of the file
df.to_csv(f'../../dados/base/chess_games_{player}.csv',index=False)