In [1]:
# Input: lichess-api-queries/data_from_queries/position_data_white_attacking.txt from 1_query_games.sh
# Output: processed_data/filtered_data_san_fen_cluster.tsv, processed_data/cluster_representatives_without_analysis.tsv

import sys
import pandas
import numpy as np
import matplotlib.pyplot as plt
import chess
import chess.svg
import chess.engine

attacker = "black"

if attacker == "white":
    data_import = pandas.read_csv("../lichess-api-queries/data_from_queries/position_data_white_attacking.txt", sep="\t")
if attacker == "black":
    data_import = pandas.read_csv("../lichess-api-queries/data_from_queries/position_data_black_attacking.txt", sep="\t")

In [2]:
if attacker == "white":
    filtered_data = data_import[(data_import["prob_trimmed"] > 0.1) & (data_import["prob"] > 0.01) & (data_import["white_win_prob"] > 0.55) & (data_import["white_wins"] + data_import["draws"] + data_import["black_wins"] > 500)]
if attacker == "black":
    filtered_data = data_import[(data_import["prob_trimmed"] > 0.1) & (data_import["prob"] > 0.01) & (data_import["black_win_prob"] > 0.55) & (data_import["white_wins"] + data_import["draws"] + data_import["black_wins"] > 500)]

# Use merge sort, which is a stable sort in the python implementation
filtered_data = filtered_data.sort_values("prob", ascending=False, kind="mergesort")
if attacker == "white":
    filtered_data = filtered_data.sort_values("white_win_prob", ascending=False, kind="mergesort")
if attacker == "black":
    filtered_data = filtered_data.sort_values("black_win_prob", ascending=False, kind="mergesort")
filtered_data.head()

Unnamed: 0,Opening,UCI_moves,move_index,prob,prob_trimmed,white_wins,draws,black_wins,white_win_prob,draw_prob,black_win_prob
2194,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3",5,0.119459,0.220965,14886,1963,54243,0.209391,0.027612,0.762997
2197,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1,c3d1,g8f6",8,0.061948,0.114586,3925,502,11667,0.24388,0.031192,0.724929
2195,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1",6,0.119459,0.220965,14376,1916,41571,0.248449,0.033113,0.718438
2196,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1,c3d1",7,0.061948,0.114586,7851,993,21094,0.262242,0.033169,0.704589
2203,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1,c3d1,c7c6",8,0.061948,0.114586,952,135,2475,0.267266,0.0379,0.694834


In [3]:
filtered_data["san"] = ""
filtered_data["fen"] = ""
for i in range(filtered_data.shape[0]):
    board = chess.Board()
    moves = filtered_data.iloc[i, filtered_data.columns.get_loc("UCI_moves")].split(",")
    error_occurred = False
    for move in moves:
        try:
            board.push_uci(move)
        except ValueError:
            error_occurred = True
            print("Caught a ValueError (likely illegal move) for index " + str(i) + ", position " + filtered_data.iloc[i, filtered_data.columns.get_loc("UCI_moves")])
            break
    if not error_occurred:
        filtered_data.iloc[i, filtered_data.columns.get_loc("san")] = chess.Board().variation_san(board.move_stack)
        filtered_data.iloc[i, filtered_data.columns.get_loc("fen")] = board.fen()
        
# Remove rows with an illegal move
filtered_data = filtered_data[filtered_data.san != ""]

filtered_data.head()

Unnamed: 0,Opening,UCI_moves,move_index,prob,prob_trimmed,white_wins,draws,black_wins,white_win_prob,draw_prob,black_win_prob,san,fen
2194,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3",5,0.119459,0.220965,14886,1963,54243,0.209391,0.027612,0.762997,1. e4 d5 2. exd5 Bg4 3. Nc3,rn1qkbnr/ppp1pppp/8/3P4/6b1/2N5/PPPP1PPP/R1BQK...
2197,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1,c3d1,g8f6",8,0.061948,0.114586,3925,502,11667,0.24388,0.031192,0.724929,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1 4. Nxd1 Nf6,rn1qkb1r/ppp1pppp/5n2/3P4/8/8/PPPP1PPP/R1BNKBN...
2195,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1",6,0.119459,0.220965,14376,1916,41571,0.248449,0.033113,0.718438,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1,rn1qkbnr/ppp1pppp/8/3P4/8/2N5/PPPP1PPP/R1BbKBN...
2196,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1,c3d1",7,0.061948,0.114586,7851,993,21094,0.262242,0.033169,0.704589,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1 4. Nxd1,rn1qkbnr/ppp1pppp/8/3P4/8/8/PPPP1PPP/R1BNKBNR ...
2203,B01: Scandinavian Defense,"e2e4,d7d5,e4d5,c8g4,b1c3,g4d1,c3d1,c7c6",8,0.061948,0.114586,952,135,2475,0.267266,0.0379,0.694834,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1 4. Nxd1 c6,rn1qkbnr/pp2pppp/2p5/3P4/8/8/PPPP1PPP/R1BNKBNR...


In [4]:
# Cluster data
filtered_data["cluster"] = ""
for i in range(filtered_data.shape[0]):
    filtered_data.iloc[i, filtered_data.columns.get_loc("cluster")] = "C" + str(i+1)
    
# Merge if one move sequence is a subset of another, or if positions are identical
for i in range(filtered_data.shape[0]):
    for j in range(i+1,filtered_data.shape[0]):
        moves1 = filtered_data.iloc[i, filtered_data.columns.get_loc("UCI_moves")]
        moves2 = filtered_data.iloc[j, filtered_data.columns.get_loc("UCI_moves")]
        fen1 = filtered_data.iloc[i, filtered_data.columns.get_loc("fen")]
        fen2 = filtered_data.iloc[j, filtered_data.columns.get_loc("fen")]
        # Ignore the halfmove clock / fullmove number in the FEN
        fen1 = " ".join(fen1.split(" ")[0:4])
        fen2 = " ".join(fen2.split(" ")[0:4])
        if (moves1 in moves2) or (moves2 in moves1) or (fen1 == fen2):
            prev_cluster_name = filtered_data.iloc[i, filtered_data.columns.get_loc("cluster")]
            for k in range(j):
                if filtered_data.iloc[k, filtered_data.columns.get_loc("cluster")] == prev_cluster_name:
                    filtered_data.iloc[k, filtered_data.columns.get_loc("cluster")] = filtered_data.iloc[j, filtered_data.columns.get_loc("cluster")]

if attacker == "white":
    # Make any white Bh6 move the same cluster
    for i in range(filtered_data.shape[0]):
        if ". Bh6" in filtered_data.iloc[i, filtered_data.columns.get_loc("san")]:
            filtered_data.iloc[i, filtered_data.columns.get_loc("cluster")] = "C" + str(filtered_data.shape[0] + 1)
    # Make any white Bxd8 move the same cluster
    for i in range(filtered_data.shape[0]):
        if ". Bxd8" in filtered_data.iloc[i, filtered_data.columns.get_loc("san")]:
            prev_cluster_name = filtered_data.iloc[i, filtered_data.columns.get_loc("cluster")]
            for k in range(filtered_data.shape[0]):
                if filtered_data.iloc[k, filtered_data.columns.get_loc("cluster")] == prev_cluster_name:
                    filtered_data.iloc[k, filtered_data.columns.get_loc("cluster")] = "C" + str(filtered_data.shape[0] + 2)
            


In [5]:
# Rename clusters based on first appearance
filtered_data["cluster_idx"] = 0
new_cluster_idx = 1
cluster_map = {}

for i in range(filtered_data.shape[0]):
    old_cluster_val = filtered_data.iloc[i, filtered_data.columns.get_loc("cluster")]
    if not (old_cluster_val in cluster_map):
        cluster_map[old_cluster_val] = new_cluster_idx
        new_cluster_idx = new_cluster_idx + 1
    filtered_data.iloc[i, filtered_data.columns.get_loc("cluster_idx")] = cluster_map[old_cluster_val]
    
filtered_data = filtered_data.drop(columns="cluster")

In [6]:
# Group the clusters together using a stable sort that preserves previous ordering (by white win prob, then prob)
filtered_data = filtered_data.sort_values("cluster_idx", ascending=True, kind="mergesort")

In [7]:
filtered_data[["cluster_idx", "Opening", "prob", "prob_trimmed", "white_win_prob", "san"]].head()

Unnamed: 0,cluster_idx,Opening,prob,prob_trimmed,white_win_prob,san
2194,1,B01: Scandinavian Defense,0.119459,0.220965,0.209391,1. e4 d5 2. exd5 Bg4 3. Nc3
2197,1,B01: Scandinavian Defense,0.061948,0.114586,0.24388,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1 4. Nxd1 Nf6
2195,1,B01: Scandinavian Defense,0.119459,0.220965,0.248449,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1
2196,1,B01: Scandinavian Defense,0.061948,0.114586,0.262242,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1 4. Nxd1
2203,1,B01: Scandinavian Defense,0.061948,0.114586,0.267266,1. e4 d5 2. exd5 Bg4 3. Nc3 Bxd1 4. Nxd1 c6


In [8]:
# Get a dataframe containing the first element of each cluster
cluster_representatives = filtered_data[~filtered_data.duplicated(subset=['cluster_idx'])]

In [9]:
# Get stockfish analysis of the cluster_representative positions
cluster_representatives["stockfish_eval"] = ""
engine = chess.engine.SimpleEngine.popen_uci("/usr/local/Cellar/stockfish/15/bin/stockfish")
for i in range(cluster_representatives.shape[0]):
    print(i)
    fen = cluster_representatives.iloc[i, cluster_representatives.columns.get_loc("fen")]
    board = chess.Board(fen)
    info = engine.analyse(board, chess.engine.Limit(depth=25))
    cluster_representatives.iloc[i, cluster_representatives.columns.get_loc("stockfish_eval")] = str(info["score"].white())

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cluster_representatives["stockfish_eval"] = ""


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29


In [10]:
if attacker == "white":
    filtered_data.to_csv("processed_data/white_filtered_data_san_fen_cluster.tsv", sep ='\t', index = False)
    cluster_representatives.to_csv("processed_data/white_cluster_representatives_without_analysis.tsv", sep ='\t', index = False)
if attacker == "black":
    filtered_data.to_csv("processed_data/black_filtered_data_san_fen_cluster.tsv", sep ='\t', index = False)
    cluster_representatives.to_csv("processed_data/black_cluster_representatives_without_analysis.tsv", sep ='\t', index = False)