## Import statements

In [None]:
import pandas as pd
import glob
import chess.pgn
import os
import csv
import re
from collections import Counter
from tqdm.notebook import tqdm
from converter.pgn_data import PGNData
import matplotlib.pyplot as plt
from matplotlib import font_manager
from matplotlib.font_manager import FontProperties
from collections import OrderedDict

## Split PGN-Files into elo brackets.

Elo_1: 0000 - 1000 Novices<br>
Elo_2: 1000 - 1200 Class E, category 5<br>
Elo_3: 1200 - 1400 Class D, category 4<br>
Elo_4: 1600 - 1800 Class B, category 2<br>
Elo_5: 1800 - 2000 Class A, category 1<br>
Elo_6: 2000 - 2200 Candidate Master<br>
Elo_7: 2300 - 2400 FIDE Master<br>
Elo_8: 2400 - 2500 International Master<br>
Elo_9: 2500 - 3600 Grandmaster<br>

In [None]:
def generate_games(file_name):
    """
    Lazily iterate over every game stored in a PGN file.

    Args:
        file_name (str): Path of the PGN file to read.

    Yields:
        chess.pgn.Game: The games in file order. Iteration stops as soon as
        ``chess.pgn.read_game`` returns ``None``.
    """
    with open(file_name) as pgn_handle:
        # Priming read, then re-read after every yield so only one game is
        # parsed ahead of the consumer.
        next_game = chess.pgn.read_game(pgn_handle)
        while next_game is not None:
            yield next_game
            next_game = chess.pgn.read_game(pgn_handle)

def create_elo_files(min_elo, max_elo, elo_category,
                     file_path='/Users/ericwan/Desktop/REDI/lichess_db_standard_rated_2023-01.pgn',
                     output_dir='/Users/ericwan/Desktop/REDI/pgn-files',
                     total_games=200000):
    """
    Creates a new PGN file containing chess games within a specified Elo range.

    A game is kept when the rating of at least one of the two players lies in
    ``[min_elo, max_elo]``. Reading stops once ``total_games`` games have been
    written or the source file is exhausted.

    Args:
        min_elo (int): The minimum Elo rating (inclusive).
        max_elo (int): The maximum Elo rating (inclusive).
        elo_category (str): The Elo category used for naming the output file
            (``elo_<elo_category>.pgn``).
        file_path (str): Source PGN file. Defaults to the path that used to be
            hard-coded in this function.
        output_dir (str): Folder that receives the output file. Defaults to
            the folder that used to be hard-coded in this function.
        total_games (int): Maximum number of games to write.

    Returns:
        None
    """
    min_elo = int(min_elo)
    max_elo = int(max_elo)
    output_file_name = f"{output_dir}/elo_{elo_category}.pgn"
    game_counter = 0

    with open(output_file_name, 'w') as output_pgn_file, \
            tqdm(total=total_games, ncols=80, unit='game') as pbar:
        if total_games <= 0:
            return

        for game in generate_games(file_path):
            # Collect the ratings that can be parsed. A missing tag or a
            # non-numeric value (e.g. "?") must not abort the whole export.
            ratings = []
            for tag in ('WhiteElo', 'BlackElo'):
                try:
                    ratings.append(int(game.headers[tag]))
                except (KeyError, ValueError):
                    continue

            if any(min_elo <= rating <= max_elo for rating in ratings):
                output_pgn_file.write(str(game) + '\n\n')
                game_counter += 1
                # tqdm throttles redraws itself, so updating per game is cheap
                # and keeps the bar accurate for any total.
                pbar.update(1)

                if game_counter >= total_games:
                    break

## Headers from pgn2data

In [None]:
# Column names of the move-level CSV, listed in column order.
# Index 0 is 'game_id', index 1 is 'move_no' and index 10 is 'fen'; the
# filtering code in this notebook addresses those columns by position.
# NOTE: later cells rebind the name `headers` to other values.
headers = ['game_id', 'move_no', 'move_no_pair', 'player', 'notation', 'move',
       'from_square', 'to_square', 'piece', 'color', 'fen', 'is_check',
       'is_check_mate', 'is_fifty_moves', 'is_fivefold_repetition',
       'is_game_over', 'is_insufficient_material', 'white_count',
       'black_count', 'white_pawn_count', 'black_pawn_count',
       'white_queen_count', 'black_queen_count', 'white_bishop_count',
       'black_bishop_count', 'white_knight_count', 'black_knight_count',
       'white_rook_count', 'black_rook_count', 'captured_score_for_white',
       'captured_score_for_black', 'fen_row1_white_count',
       'fen_row2_white_count', 'fen_row3_white_count', 'fen_row4_white_count',
       'fen_row5_white_count', 'fen_row6_white_count', 'fen_row7_white_count',
       'fen_row8_white_count', 'fen_row1_white_value', 'fen_row2_white_value',
       'fen_row3_white_value', 'fen_row4_white_value', 'fen_row5_white_value',
       'fen_row6_white_value', 'fen_row7_white_value', 'fen_row8_white_value',
       'fen_row1_black_count', 'fen_row2_black_count', 'fen_row3_black_count',
       'fen_row4_black_count', 'fen_row5_black_count', 'fen_row6_black_count',
       'fen_row7_black_count', 'fen_row8_black_count', 'fen_row1_black_value',
       'fen_row2_black_value', 'fen_row3_black_value', 'fen_row4_black_value',
       'fen_row5_black_value', 'fen_row6_black_value', 'fen_row7_black_value',
       'fen_row8_black_value', 'move_sequence']

## Converts pgn-files to csv-files.

In [None]:
def pgn_to_csv(folder_path):
    """
    Export every PGN file of a folder through ``PGNData(...).export()``.

    Args:
        folder_path (str): Folder that is searched (non-recursively) for
            ``*.pgn`` files.

    Returns:
        None
    """
    pgn_pattern = folder_path + '/*.pgn'

    for pgn_path in glob.glob(pgn_pattern):
        print(f'{pgn_path} has started.')
        PGNData(pgn_path).export()
        print(f'{pgn_path} has ended.')

## Merge elo brackets.

In [None]:
def merge_csv_files(file1, file2, file3, output_file):
    """
    Stack three CSV files on top of each other and save the result.

    Args:
        file1 (str): First CSV file; its rows come first in the output.
        file2 (str): Second CSV file.
        file3 (str): Third CSV file.
        output_file (str): Destination CSV path (written without the index).

    Returns:
        None
    """
    frames = [pd.read_csv(source) for source in (file1, file2, file3)]

    # ignore_index renumbers the rows; the index is not written out anyway.
    pd.concat(frames, ignore_index=True).to_csv(output_file, index=False)

    print(f"CSV files merged successfully. Merged file saved at {output_file}")

## Filter for chess endgame pattern: corner mate.

In [None]:
def determine_corner_mate(fen, color, side):
    """
    Check whether a board matches the corner mate pattern for one attacker.

    The king and knight rows are validated first by ``check_string_ending``.
    Then the remaining rows are scanned for an attacking rook or queen on
    the relevant column. If one is found, the rows between that piece and
    the king's edge of the board must not hold another piece on that column.

    Args:
        fen (list[str]): Board rows, i.e. the FEN board string split on "/".
        color (str): Attacking color, "black" or "white".
        side (str): Board side of the pattern, "left" or "right".

    Returns:
        True if the pattern is present, otherwise None.
    """
    side_dict = {"left": 2, "right": 7}  # column the rook/queen has to stand on
    color_dict = {"white": "RQ", "black": "rq"}  # attacker letters: upper case = white, lower case = black

    start_index, end_index, knight_pos, king_pos = string_splicer(fen, color)
    if not check_string_ending(color, side, knight_pos, king_pos):
        return None

    attackers = color_dict[color]
    column = side_dict[side]

    for offset, row in enumerate(fen[start_index:end_index]):
        if not find_rook_position(row, attackers, column):
            continue

        # Use the real position of the row. Looking it up with fen.index(row)
        # returns the first *equal* row string, which can be an earlier row.
        row_index = start_index + offset
        if color == "black":
            # Rows after the rook/queen row, excluding the last (king) row.
            rows_to_check = fen[row_index + 1:-1]
        else:
            # Rows before the rook/queen row, walking back to row 0.
            # NOTE(review): unlike the black case this includes the king row
            # - confirm the asymmetry is intended.
            rows_to_check = fen[row_index - 1::-1]

        for other_row in rows_to_check:
            if find_blocking_position(other_row, attackers, column):
                return None
        # Only the first rook/queen row found decides the result.
        return True

    return None

def find_rook_position(row, color, rank):
    """
    Tell whether one of two given piece letters stands on a column of a row.

    Args:
        row (str): One FEN board row (digits are runs of empty squares).
        color (str): Two-character string with the piece letters to look for,
            e.g. "RQ" or "rq".
        rank (int): 1-based column within the row that must hold the piece.

    Returns:
        True if a matching piece is on that column, otherwise None.
    """
    column = 0
    for symbol in row:
        if symbol.isdigit():
            column += int(symbol)  # skip the empty squares
        elif symbol.isalpha():
            column += 1
        else:
            continue
        if (symbol == color[0] or symbol == color[1]) and column == rank:
            return True
    return None


def find_blocking_position(row, color, rank):
    """
    Inspect what stands on one column of a FEN board row.

    Args:
        row (str): One FEN board row (digits are runs of empty squares).
        color (str): Two-character string with the attacker's rook/queen
            letters, e.g. "RQ" or "rq".
        rank (int): 1-based column within the row to inspect.

    Returns:
        True if a piece other than the two letters in ``color`` is on the
        column, False if one of those two letters is there, and None if no
        piece letter starts on that column.
    """
    squares_seen = 0
    for symbol in row:
        if symbol.isalpha():
            if squares_seen + 1 == rank:
                # A piece sits on the inspected column: it blocks unless it
                # is one of the attacker's own rook/queen letters.
                return not (symbol == color[0] or symbol == color[1])
            squares_seen += 1
        elif symbol.isdigit():
            squares_seen += int(symbol)
    return None

def check_string_ending(color, side, knight_pos, king_pos):
    """
    Validate the king row and the two rows next to it for a corner mate.

    Args:
        color (str): Attacking color, "black" or "white".
        side (str): "left" (patterns anchored at the row start) or "right"
            (patterns anchored at the row end).
        knight_pos (list[str]): Two board rows; index 0 is the row next to
            the king row, index 1 the row after that.
        king_pos (str): The board row expected to hold the attacked king in
            its first ("left") or last ("right") character.

    Returns:
        bool: True when one of the three knight layouts matches and the king
        letter is at the expected end of ``king_pos``; False otherwise, also
        for any other ``color``/``side`` combination.
    """
    # Per (color, side):
    #   pattern for knight and corner piece both in knight_pos[0],
    #   (knight pattern for knight_pos[1], corner-piece pattern for knight_pos[0]),
    #   (second knight pattern for knight_pos[1], corner-piece pattern for knight_pos[0]),
    #   letter of the attacked king.
    layouts = {
        ('black', 'right'): (
            r".*n(1[a-zA-Z]|2)[A-P-R-Z]$",
            (r".*n$", r".*[A-P-R-Z]$"),
            (r".*n([a-zA-Z]1|2)$", r".*[A-P-R-Z]$"),
            "K",
        ),
        ('black', 'left'): (
            r"^[A-P-R-Z](1[a-zA-Z]|2)n.*",
            (r"^n.*", r"^[A-P-R-Z].*"),
            (r"^([a-zA-Z]1|2)n.*", r"^[A-P-R-Z].*"),
            "K",
        ),
        ('white', 'right'): (
            r".*N(1[a-zA-Z]|2)[a-p-r-z]$",
            (r".*N$", r".*[a-p-r-z]$"),
            (r".*N([a-zA-Z]1|2)$", r".*[a-p-r-z]$"),
            "k",
        ),
        ('white', 'left'): (
            r"^[a-p-r-z](1[a-zA-Z]|2)N.*",
            (r"^N.*", r"^[a-p-r-z]"),
            (r"^([a-zA-Z]1|2)N.*", r"^[a-p-r-z].*"),
            "k",
        ),
    }

    layout = layouts.get((color, side))
    if layout is None:
        return False

    same_row, (knight_a, corner_a), (knight_b, corner_b), king_letter = layout

    in_same_row = re.match(same_row, knight_pos[0])
    in_next_row = re.match(knight_a, knight_pos[1]) and re.match(corner_a, knight_pos[0])
    in_next_row_shifted = re.match(knight_b, knight_pos[1]) and re.match(corner_b, knight_pos[0])

    if not (in_same_row or in_next_row or in_next_row_shifted):
        return False

    if side == 'right':
        return king_pos.endswith(king_letter)
    return king_pos.startswith(king_letter)


def string_splicer(fen, color):
    """
    Extract the slice bounds and key rows used to detect a corner mate.

    Args:
        fen (list[str]): Board rows, i.e. the FEN board string split on "/".
        color (str): Attacking color, "black" or "white".

    Returns:
        tuple: ``(start, end, knight_pos, king_pos)`` where
        ``fen[start:end]`` are the rows scanned for the rook/queen,
        ``knight_pos`` is ``[row next to the king row, row after that]`` and
        ``king_pos`` is the king row (last row for black, first for white).

    Raises:
        ValueError: If ``color`` is neither "black" nor "white". Previously
            this surfaced as an UnboundLocalError at the return statement.
    """
    if color == "black":
        return 0, -2, [fen[-2], fen[-3]], fen[-1]
    if color == "white":
        return 2, None, [fen[1], fen[2]], fen[0]
    raise ValueError(f"color must be 'black' or 'white', got {color!r}")

In [None]:
def write_scenarios(input_file, output_file, game_ids):
    """
    Copy the header and the rows of selected games from one CSV to another.

    Args:
        input_file (str): CSV to read; its first column is the game id.
        output_file (str): CSV to create (overwritten if it exists).
        game_ids: Container of game ids; a row is kept when its first column
            is in this container.

    Returns:
        None
    """
    with open(input_file, 'r') as source, open(output_file, 'w', newline='') as target:
        records = csv.reader(source)
        sink = csv.writer(target)

        sink.writerow(next(records))  # header row is always copied
        sink.writerows(record for record in records if record[0] in game_ids)

# In every moves CSV, column 0 is the game_id and column 10 is the FEN board string.
# A corner mate variation is named after the attacker, so BL means that black
# is attacking white on the left side.
variation1 = set()  # BL
variation2 = set()  # BR
variation3 = set()  # WL
variation4 = set()  # WR

# Maps each (attacking color, side) pair to the set collecting its game ids.
variation_sets = {
    ("black", "left"): variation1,
    ("black", "right"): variation2,
    ("white", "left"): variation3,
    ("white", "right"): variation4,
}

files = sorted(glob.glob('/Users/ericwan/Desktop/REDI/Original_moves/' + '*.csv'))
output_path = '/Users/ericwan/Desktop/REDI/Games_with_corner_mate/'

for file in files:
    file_name = os.path.basename(file)
    file_name_without_extension = os.path.splitext(file_name)[0]

    # The handle is deliberately not called `input`, and the header row is not
    # stored in `headers`: both names would otherwise be rebound at notebook
    # scope (shadowing the builtin and clobbering the pgn2data column list).
    with open(file, 'r') as moves_file:
        reader = csv.reader(moves_file)
        next(reader)  # Skip header row

        for row in reader:
            game_id = row[0]
            fen = row[10].split('/')

            for (color, side), matching_games in variation_sets.items():
                if determine_corner_mate(fen, color, side):
                    matching_games.add(game_id)

    # Write one file per variation with all moves of the matching games, then
    # empty the set so the next input file starts from scratch.
    for suffix, matching_games in (("BL", variation1), ("BR", variation2),
                                   ("WL", variation3), ("WR", variation4)):
        write_scenarios(file, f'{output_path}{file_name_without_extension}_{suffix}.csv', matching_games)
        matching_games.clear()

    print(f'Finished file: {file_name}.')

# Filter for game duration, termination status and leave out bot players.

In [None]:
# Check the number of games left after checking for bots and game duration.

def filter_qualifying_game_ids(input1, input2):
    """
    Count the distinct games of a moves CSV that pass all three filters.

    A game passes when, in its row of ``input2``, 'BOT' is not one of the
    values in columns 12-13 (presumably the players' titles), the part of
    column 22 before '+' is at least 180 (presumably the base time in
    seconds) and column 21 (presumably the termination) is not
    'Time forfeit'.

    Args:
        input1 (str): CSV whose first column is the game id.
        input2 (str): CSV with the additional game information, keyed by the
            game id in its first column.

    Returns:
        int: Number of distinct qualifying game ids.

    Raises:
        KeyError: If a game id of ``input1`` is missing from ``input2``.
    """
    with open(input2, 'r') as info_file:
        info_rows = csv.reader(info_file)
        next(info_rows)  # Skip header row
        info_by_game = {info[0]: info for info in info_rows}

    with open(input1, 'r') as moves_file:
        move_rows = csv.reader(moves_file)
        next(move_rows)  # Skip header row
        passing_ids = {
            move[0]
            for move in move_rows
            if 'BOT' not in info_by_game[move[0]][12:14]
            and int(info_by_game[move[0]][22].split('+')[0]) >= 180
            and info_by_game[move[0]][21] != 'Time forfeit'
        }

    return len(passing_ids)


In [None]:
# Check the number of games left after checking for bots, game duration and termination type.

def filter_acceptable_game_ids(input1, input2):
    """
    Collect the game ids of a moves CSV that pass all three filters.

    A game is rejected when, in its row of ``input2``, 'BOT' is one of the
    values in columns 12-13 (presumably the players' titles), the part of
    column 22 before '+' is below 180 (presumably the base time in seconds)
    or column 21 (presumably the termination) equals 'Time forfeit'.

    Args:
        input1 (str): CSV whose first column is the game id.
        input2 (str): CSV with the additional game information, keyed by the
            game id in its first column.

    Returns:
        set: The accepted game ids.

    Raises:
        KeyError: If a game id of ``input1`` is missing from ``input2``.
    """
    game_info = {}
    with open(input2, 'r') as info_file:
        info_reader = csv.reader(info_file)
        next(info_reader)  # Skip header row
        for info in info_reader:
            game_info[info[0]] = info

    accepted = set()
    with open(input1, 'r') as moves_file:
        moves_reader = csv.reader(moves_file)
        next(moves_reader)  # Skip header row
        for move in moves_reader:
            info = game_info[move[0]]
            if 'BOT' in info[12:14]:
                continue
            if int(info[22].split('+')[0]) < 180:
                continue
            if info[21] == 'Time forfeit':
                continue
            accepted.add(move[0])

    return accepted

In [None]:
def write_new_file(input_file, output_file, game_ids):
    """
    Write the header and all rows of the given games to a new CSV file.

    Args:
        input_file (str): CSV to read; its first column is the game id.
        output_file (str): CSV to create (overwritten if it exists).
        game_ids: Container of game ids whose rows are copied.

    Returns:
        None
    """
    with open(input_file, 'r') as source:
        with open(output_file, 'w', newline='') as target:
            source_rows = csv.reader(source)
            target_rows = csv.writer(target)
            target_rows.writerow(next(source_rows))

            for record in source_rows:
                if record[0] not in game_ids:
                    continue
                target_rows.writerow(record)

# Different filters for FEN only, FEN -2, FEN +1.

In [None]:
# Writes every row of the input_file whose FEN matches a corner mate variation
# to the output_file and prints the number of distinct games that matched.
def find_first_fen_occurence(input_file, output_file):
    """
    Copy all rows whose board matches any corner mate variation.

    Every matching row is written, not only the first one per game. A row
    that matches several variations is written once.

    Args:
        input_file (str): Moves CSV; column 10 holds the FEN board string.
        output_file (str): CSV to create with the header and matching rows.

    Returns:
        None. The number of distinct ``game_id`` values in the output is
        printed.
    """
    variations = [(color, side)
                  for color in ("black", "white")
                  for side in ("left", "right")]

    with open(input_file, 'r') as moves_file, open(output_file, 'w', newline='') as matches_file:
        reader = csv.reader(moves_file)
        writer = csv.writer(matches_file)
        writer.writerow(next(reader))  # header

        for row in reader:
            board_rows = row[10].split("/")
            # any() stops at the first matching variation, so the row is
            # never emitted more than once.
            if any(determine_corner_mate(board_rows, color, side)
                   for color, side in variations):
                writer.writerow(row)

    output_check = pd.read_csv(output_file)
    output_check = output_check.drop_duplicates(subset='game_id', keep='first')
    print(output_check.shape[0])

In [None]:
def create_fen_files(fen_occurrences, input_file, output_path, move_offset=2):
    """
    Write the moves of each game from a position relative to its FEN match.

    For every row of ``fen_occurrences`` (game id in column 0, move number in
    column 1), all rows of ``input_file`` with the same game id and a move
    number of at least ``move number + move_offset`` are written to
    ``output_path``, in the order of the occurrence rows. Several occurrence
    rows for one game therefore produce repeated output rows.

    Args:
        fen_occurrences (str): CSV listing the matched positions.
        input_file (str): Moves CSV (game id in column 0, move number in
            column 1).
        output_path (str): CSV to create; it starts with the header of
            ``input_file``.
        move_offset (int): Offset added to the matched move number, e.g. -2
            to start two moves before the match. Defaults to 2.

    Returns:
        None. The number of distinct ``game_id`` values in the output is
        printed.
    """
    # Index the moves by game once instead of rescanning the whole input file
    # for every occurrence row.
    rows_by_game = {}
    with open(input_file, 'r') as moves_file:
        moves_reader = csv.reader(moves_file)
        header = next(moves_reader)
        for move_row in moves_reader:
            rows_by_game.setdefault(move_row[0], []).append(move_row)

    with open(fen_occurrences, 'r') as occurrences_file, open(output_path, 'w', newline='') as output_file:
        occurrences_reader = csv.reader(occurrences_file)
        writer = csv.writer(output_file)

        writer.writerow(header)
        next(occurrences_reader)  # Skip header row

        for occurrence in occurrences_reader:
            first_move = int(occurrence[1]) + move_offset
            game_id = occurrence[0]

            for move_row in rows_by_game.get(game_id, ()):
                if first_move <= int(move_row[1]):
                    writer.writerow(move_row)

    game = pd.read_csv(output_path)
    game = game.drop_duplicates(subset='game_id')
    print(game.shape[0])

In [None]:
# Remove duplicate rows from every CSV file of a folder (files are rewritten in place)

folder_path = '/Users/ericwan/Desktop/REDI/4 FEN only/'  # Folder whose CSV files are de-duplicated

# Names (not full paths) of the CSV files in the folder
csv_files = [name for name in os.listdir(folder_path) if name.endswith(".csv")]

for file in csv_files:
    file_path = os.path.join(folder_path, file)
    print(f"Processing file: {file}")

    # Load all rows; the reader keeps the column names after the file is closed
    with open(file_path, "r", newline="") as csvfile:
        reader = csv.DictReader(csvfile)
        rows = list(reader)

    # A row becomes a hashable tuple of (column, value) pairs; the ordered
    # dict keeps the first appearance of each distinct row
    unique_rows = list(OrderedDict.fromkeys(tuple(row.items()) for row in rows))
    num_duplicates = len(rows) - len(unique_rows)

    # Overwrite the file with the header and the distinct rows
    with open(file_path, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=reader.fieldnames)
        writer.writeheader()
        writer.writerows([dict(pairs) for pairs in unique_rows])

    print(f"Duplicate rows removed: {num_duplicates}")

print("Duplicate rows removal completed.")

# Plotting Statistics

In [None]:
# Bar chart: number of rows per CSV file (one file per elo bracket) before FEN-Matching.

# Folder with one CSV file per elo bracket
folder_path = '/Users/ericwan/Desktop/REDI/1 Original_games/'
font_path = font_manager.findfont('Times New Roman')
font_properties = font_manager.FontProperties(fname=font_path, size=12)
label_font = {'fontproperties': font_properties}

# All CSV files of the folder in alphabetical order
csv_files = sorted(glob.glob(os.path.join(folder_path, '*.csv')))

# Bar positions (file names) and bar heights (row count of each file)
filenames = [os.path.basename(csv_file) for csv_file in csv_files]
frequencies = [len(pd.read_csv(csv_file, dtype=str)) for csv_file in csv_files]

# Custom x-axis labels, listed from the highest to the lowest bracket
x_labels = ['+2500', '2400-2500', '2300-2400', '2000-2200', '1800-2000', '1600-1800', '1200-1400', '1000-1200', '0-1000']

plt.bar(filenames, frequencies)
plt.title('Number of games per elo bracket before filtering', fontdict=label_font)
plt.xlabel('Elo bracket', fontdict=label_font)
plt.ylabel('Number of games', fontdict=label_font)

# One y-axis tick every 50,000 games up to the tallest bar
y_tick_positions = range(0, max(frequencies) + 1, 50000)
plt.yticks(y_tick_positions)

# Replace the file names on the x-axis by the bracket labels in ascending order
plt.xticks(range(len(x_labels)), list(reversed(x_labels)), rotation=90)
plt.show()

In [None]:
# Plotting the number of games after merging the elo brackets and after applying the FEN-Matching.
# Each bar is the number of distinct game ids found in one group of four consecutive CSV files.

# Path to the folder containing the CSV files
folder_path = "/Users/ericwan/Desktop/REDI/3 Games_filtered_180seconds"
font_path = font_manager.findfont('Times New Roman')
font_properties = font_manager.FontProperties(fname=font_path, size=12)

# Get a list of all CSV files in the folder
file_list = glob.glob(folder_path + "/*.csv")

# Sort by the integer formed from ALL digits of the full path (folder digits included),
# so the ordering relies on the folder part being the same for every file.
file_list.sort(key=lambda x: int(''.join(filter(str.isdigit, x))))

# Custom labels for the x-axis
x_labels = ['+2300', '1600-2000', '0-1400']

# List to store the count of unique IDs of each file group
counts = []

# Iterate over the file groups (each group has 4 files)
for i in range(0, len(file_list), 4):
    # List to store the unique IDs in each file group
    unique_ids = []

    # Iterate over the files in the current group.
    # NOTE: file_list[j] raises IndexError if the number of files is not a multiple of 4.
    for j in range(i, i + 4):
        try:
            # Read the CSV file
            df = pd.read_csv(file_list[j])

            # Get the unique IDs and append them to the list
            unique_ids.extend(df['game_id'].unique())
        except pd.errors.EmptyDataError:
            # Skip empty CSV files; the group is still counted from its other files
            continue

    # Count the unique IDs in the current file group
    count = len(set(unique_ids))
    counts.append(count)

# Reverse the order of the x_labels list (the counts keep the order of file_list)
# NOTE(review): presumably the sorted files run from the lowest to the highest bracket - confirm.
x_labels = x_labels[::-1]

# Create a bar plot with the reversed x-axis labels
plt.bar(x_labels, counts)
plt.xlabel('Elo bracket', fontdict={'fontproperties': font_properties})
plt.ylabel('Number of games', fontdict={'fontproperties': font_properties})
plt.title('Number of games per elo bracket after filtering', fontdict={'fontproperties': font_properties})

# Add value labels to the bars
for i, v in enumerate(counts):
    plt.text(i, v, str(v), ha='center', va='bottom')

# Show the plot
plt.show()

In [None]:
# Plotting the number of games after merging the elo brackets, applying the FEN-Matching and filtering for side and color.
# Every CSV file contributes one bar; files are grouped into brackets by position (4 files per bracket).

# Path to the folder containing the CSV files
folder_path = '/Users/ericwan/Desktop/REDI/3 Games_filtered_180seconds/'
font_path = font_manager.findfont('Times New Roman')
font_properties = font_manager.FontProperties(fname=font_path, size=12)

# Get a list of all CSV files in the folder
file_list = glob.glob(folder_path + "/*.csv")

# Sort by the first integer found in the path, then alphabetically by the path
file_list.sort(key=lambda x: (int(re.findall(r'\d+', x)[0]), x))

# Custom overarching labels for the x-axis
overarching_labels = ['+0-1400', '1600-2000', '+2300']

# Custom sub-labels for each file within the bracket
sub_labels = ['Black Left', 'Black Right', 'White Left', 'White Right']

# List to store the count of unique IDs for each file
counts = []

# Iterate over each file
for file_path in file_list:
    try:
        # Read the CSV file
        df = pd.read_csv(file_path)

        # Get the unique IDs count
        count = len(df['game_id'].unique())
    except pd.errors.EmptyDataError:
        # An empty CSV file still owns a bar: record 0 so that the positional
        # grouping counts[i::4] below stays aligned with the files.
        count = 0

    # Append the count to the list
    counts.append(count)

# Custom colors for the bars
bar_colors = ['blue', 'orange', 'green', 'red']

# Create a bar plot with individual file counts
bar_width = 0.2
x = range(3)

fig, ax = plt.subplots()
for i in range(4):
    # counts[i::4] picks the i-th variation of every bracket
    ax.bar([val + (i * bar_width) - 0.3 for val in x], counts[i::4], width=bar_width, label=sub_labels[i],  color=bar_colors[i])
    # Add values for each sub-bar
    for j, count in enumerate(counts[i::4]):
        ax.text(x[j] + (i * bar_width) - 0.3, count, str(count), ha='center', va='bottom')

# Set the x-axis tick positions (centre of each group of four bars) and labels
ax.set_xticks([val - 0.3 + (1.5 * bar_width) for val in x])
ax.set_xticklabels(overarching_labels)
ax.set_xlabel('Elo Brackets', fontdict={'fontproperties': font_properties})
ax.set_ylabel('Number of Games', fontdict={'fontproperties': font_properties})
ax.set_title('Number of games per elo bracket per variation', fontdict={'fontproperties': font_properties})

# Move the legend to the side
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))

# Adjust the plot layout to accommodate the legend
plt.tight_layout(rect=[0, 0, 0.85, 1])

# Show the plot
plt.show()

In [None]:
# Render a table with the number of variants over all games for each elo bracket and variation.

# One row per bracket/variation: [label, "variants / games"]
data = [
    ["1 BL", "11 / 11"],
    ["2 BL", "12 / 13"],
    ["3 BL", "10 / 10"],
    ["1 BR", "90 / 101"],
    ["2 BR", "86 / 90"],
    ["3 BR", "44 / 51"],
    ["1 WL", "11 / 13"],
    ["2 WL", "12 / 15"],
    ["3 WL", "7 / 9"],
    ["1 WR", "73 / 81"],
    ["2 WR", "91 / 103"],
    ["3 WR", "65 / 78"]
]

headers = ["Elo", "Variants / Games"]

fig, ax = plt.subplots(figsize=(6, 6))
ax.axis('off')  # Only the table is shown, no axes

table = ax.table(cellText=data, colLabels=headers, cellLoc='center', loc='center', colWidths=[0.2, 0.3])

font = FontProperties(family='Times New Roman', size=12)
table.auto_set_font_size(False)

# Apply the font to every cell of the table
for cell in table.get_celld().values():
    cell.get_text().set_fontproperties(font)

table.scale(1.2, 1.2)

# Row 0 holds the column labels: make them bold
for column in range(len(headers)):
    table[0, column].get_text().set_weight('bold')

plt.show()

# Other useful functions

In [None]:
# Print the number of distinct games of every CSV file in a folder.

files = sorted(glob.glob('/Users/ericwan/Desktop/REDI/4 FEN only/*.csv'))
for file in files:
    # A game spans several rows; keep one row per game_id before counting
    game = pd.read_csv(file).drop_duplicates(subset='game_id')
    game_total = game.shape[0]
    print(f'File {file} has {game_total} games.')