In [1]:
import pandas as pd
import numpy as np
import chess.pgn
import os
import csv
import re
from time import sleep
from tqdm.notebook import tqdm
from converter.pgn_data import PGNData
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

In [None]:
#Iterates over a pgn_file and writes the matching games to a new file.
#elo_category is a numerical value from 1-11, min_elo and max_elo are used to represent the different skill levels.
#file_name must be replaced with the actual file_path of the pgn-file.

def filter(min_elo, max_elo, elo_category):
    output_pgn_file = open(f"elo_{elo_category}.pgn", 'w')
    pgn_file = open(file_name)
    
    total_games = 200000
    miniters = 1000
    game_counter = 0

    with tqdm(total=total_games) as pbar:  # Initialize the tqdm progress bar
        while game_counter < total_games:
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break

            tags = game.headers
            white_elo = int(tags['WhiteElo'])
            black_elo = int(tags['BlackElo'])

            if min_elo <= white_elo <= max_elo or min_elo <= black_elo <= max_elo:
                output_pgn_file.write(str(game) + '\n' + '\n')
                game_counter += 1
                if game_counter % miniters == 0:
                    pbar.update(miniters)# Updates the progress bar

    pgn_file.close()
    output_pgn_file.close()

In [None]:
#Automated call for filter()

filter(2500, 3600, 1) #Grandmaster
filter(2400, 2500, 2) #International Master
filter(2300, 2400, 3) #FIDE Master
filter(2200, 2300, 4) #FIDE Candidate Master
filter(2000, 2200, 5) #Candidate Master
filter(1800, 2000, 6) #Class A, category 1
filter(1600, 1800, 7) #Class B, category 2
filter(1400, 1600, 8) #Class C, category 3
filter(1200, 1400, 9) #Class D, category 4
filter(1000, 1200, 10)#Class E, category 5
filter(0, 1000, 11)   #Novices

In [None]:
def pgn_to_csv(file_name, iterations):
    for i in range(1, iterations + 1, 1):
        number = str(i)
        file_path = "/Users/ericwan/Jupyter/Chess/pgn_files_small/" + file_name + number + ".pgn"
        moves = PGNData(file_path)
        moves.export()


In [None]:
def filter_csv(file_name, iterations):
    for i in range(1, iterations + 1, 1):
        number = str(i)
        file_path = "/Users/ericwan/Jupyter/Chess/small_csv_files/" + file_name + number + "_moves.csv"
        moves = pd.read_csv(file_path)
        f_moves = moves[['game_id', 'move_no', 'move_no_pair', 'player', 'notation', 'move', 'from_square', 'to_square', 'piece', 'color', 'fen', 'is_check', 'is_check_mate', 'move_sequence']]
        f_moves.to_csv("/Users/ericwan/Jupyter/Chess/filtered_csv_files/filtered_" + file_name + number + "_moves")

In [None]:
def filter_fen_csv(input_file, output_file, character):
    with open(input_file, 'r') as input_csv, open(output_file, 'a', newline='') as output_csv:
        reader = csv.reader(input_csv)
        writer = csv.writer(output_csv)

        #Skip the first line, which are the headers.
        next(reader)
        writer.writerow('')
        for row in reader:
            string = row[7]  # Assuming the string is in the first column of each row
            string = string.lower()
            split = string.split('/')

            if split[0].startswith(character) or split[7].endswith(character):
                writer.writerow(row)

            elif split[7].startswith(character) or split[0].endswith(character):
                writer.writerow(row)
        

In [None]:
def call_fen_filter(input_number, iterations, output_file):
    for i in range(1, iterations + 1, 1):
        file_path = directory + str(input_number) + '.' + str(i) + '_moves.csv'
        filter_fen_csv(file_path, output_file, character)

directory = '/Users/ericwan/Jupyter/Chess/filtered_csv_files/filtered_elo_'
character = 'k'  # Replace with the character you want to check

In [None]:
#Used to get rid of first unnecessary column
directory = '/Users/ericwan/Jupyter/Chess/filtered_csv_files'
csv_files = [file for file in os.listdir(directory) if file.endswith('.csv')]

for file in csv_files:
    file_path = os.path.join(directory, file)
    df = pd.read_csv(file_path)
    df = df[['game_id', 'move_no', 'player', 'notation', 'move', 'piece', 'color', 'fen']]
    df.to_csv(file_path, index=False)

In [None]:
def check_fen_csv(input_file, output_file):
    with open(input_file, 'r') as input_csv, open(output_file, 'w', newline='') as output_csv:
        reader = csv.reader(input_csv)
        writer = csv.writer(output_csv)

        #Skip the first line, which are the headers.
        next(reader)
        writer.writerow('')

        for row in reader:
            string = row[7]
            split_strings = string.split('/')
            color = ["white","black"]
            side= ["left","right"]

            for i in range(0,len(color)):
                for j in range(0,len(side)):
                    validity = determine_corner_mate(split_strings, color[i], side[j])
                    if validity:
                        writer.writerow(row)

def determine_corner_mate(fen, color, side):
    side_dict={"left" : 2,"right" : 7} #determine important row for this mate
    color_dict={"white" : "RQ", "black" : "rq"} #use upperspace and lowerspace depending on the color
    start_index, end_index, knight_pos, king_pos= string_splicer(fen,color)
    valid_string_end = check_string_ending(color, side, knight_pos, king_pos)
    if valid_string_end:
        for row in fen[start_index:end_index]:
            rook_pos = find_rook_position(row, color_dict[color], side_dict[side])
            if rook_pos:
                if color =="black":
                    for subsequent_string in fen[fen.index(row)+1:-1]:
                        piece_pos = find_blocking_position(subsequent_string, color_dict[color], side_dict[side])
                        if piece_pos:
                            return None
                if color == "white":
                    for subsequent_string in fen[fen.index(row)-1::-1]:
                        piece_pos = find_blocking_position(subsequent_string, color_dict[color], side_dict[side])
                        if piece_pos:
                            return None
                return fen
    else: 
        return None

def find_rook_position(row, color, rank):
    #check if rook/queen is on the relevant row
    position = 0
    for char in row:
        if char.isalpha() or char.isdigit():
            position += int(char) if char.isdigit() else 1
            if (char == color[0] or char == color[1]) and position == rank:
                return True
    return None


def find_blocking_position(row, color, rank):
    #checks if any other pieces are standing on the row, where the rook/queen wants to check mate
    count = 0
    for char in row:
        if char.isalpha() and count+1 == rank:
            if (char == color[0] or char == color[1]):
                return(False)
            else:
                return(True)
        if char.isdigit():
            count += int(char)
        elif char.isalpha():
            count +=1
    return None

def check_string_ending(color, side, knight_pos, king_pos):
    if color == 'black' and side == 'right':
        condition1 = re.match(r".*n(1[a-zA-Z]|2)[A-P-R-Z]$", knight_pos[0])
        condition2 = re.match(r".*n$", knight_pos[1]) and re.match(r".*[A-P-R-Z]$", knight_pos[0])
        condition3 = re.match(r".*n([a-zA-Z]1|2)$", knight_pos[1]) and re.match(r".*[A-P-R-Z]$", knight_pos[0])
        if (condition1 or condition2 or condition3) and king_pos.endswith("K"):
            return True

    elif color == 'black' and side == 'left':
        condition1 = re.match(r"^[A-P-R-Z](1[a-zA-Z]|2)n.*", knight_pos[0])
        condition2 = re.match(r"^n.*", knight_pos[1]) and re.match(r"^[A-P-R-Z].*", knight_pos[0])
        condition3 = re.match(r"^([a-zA-Z]1|2)n.*", knight_pos[1]) and re.match(r"^[A-P-R-Z].*", knight_pos[0])
        if (condition1 or condition2 or condition3) and king_pos.startswith("K"):
            return True

    elif color == 'white' and side == 'right':
        condition1 = re.match(r".*N(1[a-zA-Z]|2)[a-p-r-z]$", knight_pos[0])
        condition2 = re.match(r".*N$", knight_pos[1]) and re.match(r".*[a-p-r-z]$", knight_pos[0])
        condition3 = re.match(r".*N([a-zA-Z]1|2)$", knight_pos[1]) and re.match(r".*[a-p-r-z]$", knight_pos[0])
        if (condition1 or condition2 or condition3) and king_pos.endswith("k"):
            return True

    elif color == 'white' and side == 'left':
        condition1 = re.match(r"^[a-p-r-z](1[a-zA-Z]|2)N.*", knight_pos[0])
        condition2 = re.match(r"^N.*", knight_pos[1]) and re.match(r"^[a-p-r-z]", knight_pos[0])
        condition3 = re.match(r"^([a-zA-Z]1|2)N.*", knight_pos[1]) and re.match(r"^[a-p-r-z].*", knight_pos[0])
        if (condition1 or condition2 or condition3) and king_pos.startswith("k"):
            return True

    return False


def string_splicer(fen, color):
    #Extract the relevant strings to determine "starting" position
        if color == "black":
            start = 0
            end = -2
            king_pos = fen[-1]
            knight_pos = [fen[-2],fen[-3]]
        elif color == "white":
            start = 2
            end = None
            knight_pos = [fen[1],fen[2]]
            king_pos = fen[0]
        return start, end, knight_pos, king_pos

In [None]:
# Read the contents of the CSV file
with open('/Users/ericwan/Jupyter/Chess/csv_fen_1/elo_11.csv', 'r') as file:
    reader = csv.reader(file)
    rows = list(reader)

# Filter out empty rows
non_empty_rows = [row for row in rows if any(field.strip() for field in row)]

# Write the filtered rows back to the CSV file
with open('/Users/ericwan/Jupyter/Chess/csv_fen_1/elo_11.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(non_empty_rows)


In [None]:
input_path = '/Users/ericwan/Jupyter/Chess/csv_fen_1/elo_'
output_path = '/Users/ericwan/Desktop/csv_fen_2/'

for i in range (1, 12, 1):
    i_path = input_path + str(i) + '.csv'
    o_path = output_path + 'game_' + str(i) + '.csv'
    check_fen_csv(i_path, o_path)
    print(f'File {i} has finished.')

In [None]:
games = pd.read_csv('/Users/ericwan/Desktop/csv_fen_2/game_id.csv')
f_games = games.drop_duplicates(subset='game_id')
print(f_games.shape[0])

In [None]:
input_file = '/Users/ericwan/Desktop/csv_fen_2/game_id.csv'
output_file = '/Users/ericwan/Desktop/csv_fen_2/games.csv'

with open(input_file, 'r') as file_in, open(output_file, 'w', newline='') as file_out:
    reader = csv.reader(file_in)
    writer = csv.writer(file_out)
    
    for row in reader:
        if any(row):
            writer.writerow(row)


In [None]:
info = pd.read_csv('/Users/ericwan/Desktop/csv_fen_2/games.csv')
f_info = info.drop_duplicates(subset='game_id')
print(f_info.shape[0])

In [None]:
input_path = '/Users/ericwan/Desktop/csv_fen_2/game_'

for i in range (1, 12, 1):
    i_path = input_path + str(i) + '.csv'
    games = pd.read_csv(i_path)
    games = games[['game_id']]
    games = games.drop_duplicates(subset='game_id')
    games.to_csv(i_path)
    print(f'File {i} has finished.')

In [14]:
def extract_game_id(input_file):
    game_ids = set()

    with open(input_file, 'r') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)
        for row in csv_reader:
            for game_id in row:
                game_ids.add(game_id)
    
    return game_ids

In [19]:
folder_path = '/Users/ericwan/Jupyter/Chess/small_csv_files/'
output_path = '/Users/ericwan/Jupyter/Chess/final_csv/elo_'
headers = ['game_id', 'move_no', 'move_no_pair', 'player', 'notation', 'move',
       'from_square', 'to_square', 'piece', 'color', 'fen', 'is_check',
       'is_check_mate', 'is_fifty_moves', 'is_fivefold_repetition',
       'is_game_over', 'is_insufficient_material', 'white_count',
       'black_count', 'white_pawn_count', 'black_pawn_count',
       'white_queen_count', 'black_queen_count', 'white_bishop_count',
       'black_bishop_count', 'white_knight_count', 'black_knight_count',
       'white_rook_count', 'black_rook_count', 'captured_score_for_white',
       'captured_score_for_black', 'fen_row1_white_count',
       'fen_row2_white_count', 'fen_row3_white_count', 'fen_row4_white_count',
       'fen_row5_white_count', 'fen_row6_white_count', 'fen_row7_white_count',
       'fen_row8_white_count', 'fen_row1_white_value', 'fen_row2_white_value',
       'fen_row3_white_value', 'fen_row4_white_value', 'fen_row5_white_value',
       'fen_row6_white_value', 'fen_row7_white_value', 'fen_row8_white_value',
       'fen_row1_black_count', 'fen_row2_black_count', 'fen_row3_black_count',
       'fen_row4_black_count', 'fen_row5_black_count', 'fen_row6_black_count',
       'fen_row7_black_count', 'fen_row8_black_count', 'fen_row1_black_value',
       'fen_row2_black_value', 'fen_row3_black_value', 'fen_row4_black_value',
       'fen_row5_black_value', 'fen_row6_black_value', 'fen_row7_black_value',
       'fen_row8_black_value', 'move_sequence']


for i in range(1, 12, 1):
    game_ids = extract_game_id(f'/Users/ericwan/Jupyter/Chess/csv_fen_2/game_{i}.csv')
    output_file = output_path + str(i) + '.csv'
    with open(output_file, 'w', newline='') as output_csv:
        writer = csv.writer(output_csv)
        writer.writerow(headers)

        for file_name in os.listdir(folder_path):
            if file_name.startswith(str(i)):
                file_path = os.path.join(folder_path, file_name)

                with open(file_path, 'r') as csv_file:
                    reader = csv.reader(csv_file)
                    for row in reader:
                        if row[0] in game_ids:
                            writer.writerow(row)
        print(f'File {i} has finished.')

File 1 has finished.
File 2 has finished.
File 3 has finished.
File 4 has finished.
File 5 has finished.
File 6 has finished.
File 7 has finished.
File 8 has finished.
File 9 has finished.
File 10 has finished.
File 11 has finished.


In [20]:
for i in range(1, 12, 1):
    game = pd.read_csv(f'/Users/ericwan/Jupyter/Chess/final_csv/elo_{i}.csv')
    game = game.drop_duplicates(subset='game_id')
    print(game.shape[0])


158
165
169
165
169
163
154
132
125
128
114
