# Convert PGN to extended LAN (xLAN)

### Config conversion

In [None]:
from src.data_preprocessing.pgn_to_xlan import pgn_to_xlan


# Source PGN file and the file the converted games are written to.
pgn_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03.pgn"
lan_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_2000plus.xlanplus"

# Conversion settings; edit these values rather than the call below.
min_number_of_moves_per_game = 0
number_of_games_to_write = -1  # -1 for all games
generate_all_moves = False

pgn_to_lan = pgn_to_xlan(
    pgn_path,
    lan_path,
    min_number_of_moves_per_game=min_number_of_moves_per_game,
    number_of_games_to_write=number_of_games_to_write,
    # Forward the setting defined above so that editing it actually takes effect.
    generate_all_moves=generate_all_moves,
    log=False,
    xLanPlus=True,
    filter_elo=True,
    elo_min=2000,
    elo_max=3500,
)

pgn_to_lan.convert_pgn_parallel()

# Check Common and Duplicate Lines


In [None]:
"""
Use check_duplicates_and_common_lines to check if there are duplicates or common lines in two files.
"""

from src.data_preprocessing.check_duplicates_and_common_lines import (
    check_duplicates_and_common_lines,
)

# The two files that are compared with each other.
training_file = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000.xlanplus"
validation_file = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000_old.xlanplus"

# Of the three delete switches, only the one for duplicates in the first file is on.
check_duplicates_and_common_lines(
    training_file, validation_file,
    delete_common=False,
    delete_duplicates_from_file_1=True,
    delete_duplicates_from_file_2=False,
)

# Tokenize Data


In [None]:
from src.tokenizer.tokenizer import tokenize_file

# Notation of the input file: select "xLANplus", "xLAN", "xLANchk" or "xLANcap".
notation = "xLANplus"

# Input file in the chosen notation and the path for the tokenized output.
xLAN_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000.xlanplus"
tokenized_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000.tok"

tokenize_file(
    notation=notation,
    data_path=xLAN_path,
    out_path=tokenized_path,
    batch_size=20000,
)

# Detokenize Data

In [None]:
from src.tokenizer.detokenizer import detokenize_data

# Notation the token ids below are decoded into.
notation = "xLANplus"  # select "xLANplus", "xLAN", "xLANchk" or "xLANcap"
# Space-separated token ids that are passed to the detokenizer as one string.
tokens = "6 32 34 76 6 37 35 76 6 24 26 76 6 29 28 76 5 55 49 76 5 62 52 76 5 15 25 76 6 45 44 76 4 23 59 76 4 54 45 76 6 40 41 76 1 46 62 76 2 31 24 76 5 22 37 76 3 7 31 76 6 69 68 76 4 59 66 76 6 35 26 81 4 47 26 81 5 52 35 76 4 66 45 81 2 38 45 81 4 26 40 76 5 35 25 81 2 24 25 81 5 37 52 76 5 49 43 76 5 52 42 76 2 25 17 76 2 45 52 76 1 39 55 76 3 54 38 76 4 40 33 76 5 42 36 76 2 17 24 76 5 36 46 76 3 31 23 76 5 46 29 76 6 56 57 76 4 30 37 76 2 24 27 76 4 37 46 76 2 27 9 76 6 13 12 76 6 48 50 76 6 21 19 76 5 43 58 76 6 19 18 76 5 58 52 79 6 61 52 81 2 9 18 81 3 14 22 76 2 18 32 76 5 29 35 76 6 41 42 76 5 35 18 76 4 33 26 76 6 28 27 76 6 50 51 76 6 27 34 81 6 51 44 81 4 46 19 76 6 44 53 79 1 62 54 76 2 32 68 79 1 54 45 76 2 68 52 79 1 45 54 76 4 26 19 81 3 22 19 81 2 52 68 77 1 54 45 76 3 23 29 77 3 38 37 76 2 53 54 78 71 74"

# Print whatever detokenize_data returns for the sequence above.
print(detokenize_data(tokenized_data=tokens, notation=notation))

# Remove lines with more than x tokens


In [None]:
def remove_lines_with_too_many_tokens(
    input_file_path, output_file_path, token_limit=510
):
    """Copy a text file, dropping every line with more than ``token_limit`` tokens.

    Tokens are the whitespace-separated fields of a line. Kept lines are written
    unchanged and in their original order. The input is read completely before
    the output is opened, so both paths may point to the same file.

    Args:
        input_file_path: Path of the text file to filter.
        output_file_path: Path the kept lines are written to.
        token_limit: Largest number of tokens a line may have and still be kept.

    Returns:
        The number of lines that were removed.
    """
    # Explicit encoding so the result does not depend on the platform default.
    with open(input_file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    print(f"Number of lines in {input_file_path}: {len(lines)}")

    lines_to_keep = [line for line in lines if len(line.split()) <= token_limit]
    removed_count = len(lines) - len(lines_to_keep)

    print(f"Number of lines in {output_file_path}: {len(lines_to_keep)}")
    with open(output_file_path, "w", encoding="utf-8") as file:
        file.writelines(lines_to_keep)

    return removed_count

In [None]:
# Tokenized input file and the path for the filtered copy.
input_file_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000.tok"
output_file_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000_max_510.tok"

# No token_limit is passed, so the function's default limit applies.
removed_lines = remove_lines_with_too_many_tokens(
    input_file_path=input_file_path,
    output_file_path=output_file_path,
)
print(f"Number of removed lines: {removed_lines}")

# Create files with only one line for each starting sequence

### Break down big Dataset to smaller dataset with more variety

In [None]:
"""
    Remove Duplicate Lines by Start Sequence
    -------------------

    Removes duplicate lines from a file by comparing the first n tokens of each line.
    The first n tokens are called the start sequence.

    Parameters:

    lines: The lines to remove duplicates from.
    start_sequences: A list of integers. Each integer is the length of the start sequence.

    Returns:

    A dictionary with the start sequence length as key and the list of lines as value.

"""


def remove_duplicates_by_start_sequence(lines, start_sequences, debug=True):
    """Keep, per prefix length, only the first line for each distinct start sequence.

    The start sequence of a line is its first ``n`` whitespace-separated tokens.
    A line with fewer than ``n`` tokens uses all of its tokens as start sequence.

    Args:
        lines: The lines to remove duplicates from.
        start_sequences: Integers, each one a start sequence length ``n``.
        debug: When true, print the original line count and the number of
            lines kept for every length.

    Returns:
        A dictionary with the start sequence length as key and the list of kept
        lines (unchanged, in input order) as value.
    """
    kept_by_length = {n: [] for n in start_sequences}
    seen_by_length = {n: set() for n in start_sequences}

    for raw_line in lines:
        fields = raw_line.strip().split()
        for n in start_sequences:
            # Tokens contain no whitespace, so the tuple identifies the prefix.
            prefix = tuple(fields[:n])
            seen = seen_by_length[n]
            if prefix in seen:
                continue
            seen.add(prefix)
            kept_by_length[n].append(raw_line)

    if debug:
        print("Original:", len(lines))
        for n in start_sequences:
            kept_count = len(kept_by_length[n])
            print(f"Stripped duplicates for first {n} tokens:", kept_count)

    return kept_by_length

In [None]:
# Tokenized file whose lines are de-duplicated by start sequence.
file_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000_max_510.tok"
with open(file_path, "r", encoding="utf-8") as file:
    lines = file.readlines()

# Start sequence lengths (in tokens); one output file is written per length.
start_sequences = [13, 16, 20, 24, 28, 32]
results = remove_duplicates_by_start_sequence(lines, start_sequences, debug=True)

for length in results:
    saved_lines = results[length]
    out_path = f"D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03_elo_0_1000_max_510_{length}tokens.tok"
    with open(out_path, "w", encoding="utf-8") as file:
        file.writelines(saved_lines)

# Copy a file into a new file with the BOS token "75 " at the start of every line


In [None]:
# NOTE: both paths are identical, so the input file is overwritten in place.
in_path = "D:/LEON Safe/Datasets/2023_09/standard_rated_350k.xlanplus.tok"
out_path = "D:/LEON Safe/Datasets/2023_09/standard_rated_350k.xlanplus.tok"

with open(in_path, "r", encoding="utf-8") as file:
    lines = file.readlines()

# Every line is prefixed with " 75 " (note the leading space); afterwards each
# occurrence of two consecutive spaces anywhere in the line becomes one space.
saved_lines = [(" 75 " + line).replace("  ", " ") for line in lines]

with open(out_path, "w", encoding="utf-8") as file:
    file.writelines(saved_lines)

# Create Validation JSON Files

## Create JSON file for Hard Position Accuracy 
### Makes all moves for the games in a text file and converts the result to JSON

In [None]:
import json
import src.notation_converter as converter

# Input: one position (move sequence) per line; blank lines and lines starting
# with "#" are skipped. Output: a JSON list with one entry per position.
lan_file = "./data/validation/hard_positions/hard_pos.xlanplus"
json_file = "./data/validation/hard_positions/hard_positions_xlanplus.json"
games = []  # not read anywhere in this cell
formatted_games = []
c = 0  # id of the next JSON entry; only counts lines that are not skipped
xlanplus = True  # not read anywhere in this cell


with open(lan_file, "r") as file:
    lines = file.readlines()

for line in lines:
    if not line.strip() or line.startswith("#"):
        continue

    # Remove only the line terminator. Slicing off the last character would also
    # cut a real character from a final line that has no trailing newline.
    game = line.rstrip("\n")
    formatted_game_start_square = {
        "id": c,
        "board_state": game,
        "legal_positions": converter.legal_moves_from_position(game),
    }
    formatted_games.append(formatted_game_start_square)

    c += 1

with open(json_file, "w") as file:
    file.write(json.dumps(formatted_games, indent=2))

## Create JSON file for Legal Piece Move Accuracy validation
### Checks all positions of a piece and saves them to a JSON file


In [None]:
import json
import src.notation_converter as converter

# Input: one position per line, with the piece to check as the last character of
# the line. Lines starting with "#" set the tag for the entries that follow;
# blank lines are skipped. Every position yields two JSON entries.
lan_file = "./data/validation/board_state/board_state_positions.xlanplus"
json_file = "./data/validation/board_state/board_state_positions_xlanplus.json"
games = []  # not read anywhere in this cell
formatted_games = []
c = 0  # id of the next JSON entry; advances by two per position
current_tag = None


with open(lan_file, "r") as file:
    lines = file.readlines()

for line in lines:
    if not line.strip():
        continue

    if line.startswith("#"):
        # Remove every "#" and the surrounding whitespace; the rest is the tag.
        current_tag = line.replace("#", "").strip()
        continue

    # Remove only the line terminator. Slicing off the last character would cut
    # the piece letter from a final line that has no trailing newline.
    game = line.rstrip("\n")
    # Piece is last char of game
    piece = game[-1]
    board_state = game[:-1]
    legal_start_positions = converter.positions_of_piece_with_legal_moves(
        board_state, piece
    )
    # First entry: the start squares reported for the piece.
    formatted_game_start_square = {
        "id": c,
        "board_state": game,
        "legal_positions": legal_start_positions,
        "piece": piece,
        "tag": current_tag,
    }
    try:
        first_start_position = legal_start_positions[0]
    except IndexError as err:
        # Name the offending input line instead of failing with a bare IndexError.
        raise ValueError(
            f"No start position with a legal move for piece {piece!r} "
            f"in {board_state!r} (tag: {current_tag!r})"
        ) from err
    legal_end_positions = converter.end_positions_of_piece_with_legal_moves(
        board_state, piece, first_start_position
    )
    # Second entry: the end squares reported for the first start square.
    formatted_game_end_square = {
        "id": c + 1,
        "board_state": game + first_start_position,
        "legal_positions": legal_end_positions,
        "piece": piece + first_start_position,
        "tag": current_tag,
    }
    formatted_games.append(formatted_game_start_square)
    formatted_games.append(formatted_game_end_square)

    c += 2

with open(json_file, "w") as file:
    file.write(json.dumps(formatted_games, indent=2))

## Show Board of JSON ID from Hard Position Accuracy and Legal Piece Move Accuracy


### Hard Position Accuracy

In [None]:
from src.validation.validate_position import get_position_by_id
from src.validation.validate_position import show_position_by_id

id_max = 66  # 0 to 66

# NOTE(review): this flag is not read anywhere in this cell; a save_path is
# always passed below.
save_to_file = True

# The loop variable is deliberately not called `id`, so the builtin id() is not
# shadowed for the rest of the session.
for position_id in range(id_max + 1):
    file_name = f"hard_positions_{position_id:02}"
    print(f"id = {position_id}")
    print(
        get_position_by_id(position_id, metric="hard_position", notation="xLANplus")
    )
    show_position_by_id(
        position_id, metric="hard_position", notation="xLANplus", save_path=file_name
    )

### Legal Piece Move Accuracy

In [None]:
from src.validation.validate_position import get_position_by_id
from src.validation.validate_position import show_position_by_id

id_max = 193  # 0 to 193

# The loop variable is deliberately not called `id`, so the builtin id() is not
# shadowed for the rest of the session.
for position_id in range(id_max + 1):
    # NOTE(review): ids are zero-padded to two digits, so names for ids from 100
    # upwards are one character longer than the rest.
    file_name = f"legal_piece_moves_{position_id:02}"
    print(f"id = {position_id}")
    print(get_position_by_id(position_id, metric="board_state", notation="xLANplus"))
    show_position_by_id(
        position_id, metric="board_state", notation="xLANplus", save_path=file_name
    )

# Convert file from xLan to xLan+

In [None]:
from src.notation_converter import xlan_sequence_to_xlanplus


def convert_xlan_to_xlanplus(xlan_file, xlanplus_file):
    """Write a converted copy of ``xlan_file`` to ``xlanplus_file``.

    Blank lines and lines starting with "#" are copied unchanged. Every other
    line is passed to ``xlan_sequence_to_xlanplus`` and the result is written
    followed by a newline.

    Args:
        xlan_file: Path of the file to read.
        xlanplus_file: Path of the file to write.
    """
    # The input is read completely before the output file is opened.
    with open(xlan_file, "r") as source:
        source_lines = source.readlines()

    with open(xlanplus_file, "w") as target:
        for raw in source_lines:
            is_passthrough = raw.strip() == "" or raw.startswith("#")
            if is_passthrough:
                target.write(raw)
            else:
                # NOTE(review): the line is handed over including its line
                # ending; confirm the converter copes with that.
                target.write(xlan_sequence_to_xlanplus(raw) + "\n")

In [None]:
# Source file and the path the converted copy is written to.
xlan_file = "./data/validation/board_state/board_state_positions.lan"
xlanplus_file = "./data/validation/board_state/board_state_positions.xlanplus"

convert_xlan_to_xlanplus(xlan_file=xlan_file, xlanplus_file=xlanplus_file)

In [None]:
import matplotlib.pyplot as plt
import numpy as np


file_path = "D:/LEON Safe/Datasets/2024_03/standard_rated_2024-03.pgn"
output_path = "D:/LEON Safe/Datasets/2024_03/elo_distribution.png"


def _scan_elo_tags(file_path):
    """Collect Elo values and count unparsable Elo lines in one pass over a file.

    A line is considered when it contains "WhiteElo" or "BlackElo"; its value is
    the text between the first pair of double quotes, parsed as an integer.

    Args:
        file_path: Path of the PGN file, read as UTF-8 text.

    Returns:
        A tuple ``(elos, errors)``: the list of parsed integers in file order
        and the number of considered lines whose value could not be parsed.
    """
    elos = []
    errors = 0
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            if "WhiteElo" not in line and "BlackElo" not in line:
                continue
            try:
                elo = int(line.split('"')[1])
            except (IndexError, ValueError):
                # No quoted value, or the quoted value is not an integer.
                errors += 1
                continue
            elos.append(elo)
            # Progress output for very large files.
            if len(elos) % 100000 == 0:
                print(f"Number of elos extracted: {len(elos)}")
    return elos, errors


def extract_elos(file_path):
    """Return the list of Elo values found in the "WhiteElo"/"BlackElo" lines.

    Lines whose value cannot be parsed as an integer are skipped.

    Args:
        file_path: Path of the PGN file, read as UTF-8 text.

    Returns:
        The parsed Elo values as integers, in file order.
    """
    elos, _ = _scan_elo_tags(file_path)
    return elos


# A single pass yields both the ratings and the error count shown on the plot.
elos, errors = _scan_elo_tags(file_path)


if elos:
    # Calculate statistics
    mean_elo = np.mean(elos)
    median_elo = np.median(elos)
    std_elo = np.std(elos)

    # Plotting the histogram
    plt.figure(figsize=(10, 6))
    plt.hist(elos, bins=range(0, 3000, 50), edgecolor="black", alpha=0.7)
    plt.title("Elo Distribution", fontsize=15)
    plt.xlabel("Elo", fontsize=12)
    plt.ylabel("Number of games", fontsize=12)
    plt.grid(axis="y", linestyle="--", alpha=0.7)

    # Adding statistics as text on the plot
    plt.text(
        2000,
        800,
        f"Mean: {mean_elo:.2f}\nMedian: {median_elo:.2f}\nStd Dev: {std_elo:.2f}",
        fontsize=12,
        bbox=dict(facecolor="white", alpha=0.5),
    )

    # Adding error count as text on the plot
    plt.text(
        2000,
        700,
        f"Errors: {errors}",
        fontsize=12,
        bbox=dict(facecolor="white", alpha=0.5),
    )

    # Saving the plot
    plt.savefig(output_path)
    plt.show()
else:
    print("No valid Elo data found in the PGN file.")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Summary statistics that are drawn as labelled vertical lines.
median_elo = np.median(elos)
average_elo = np.mean(elos)

# Plot the Elo distribution
plt.figure(figsize=(10, 6))
plt.hist(elos, bins=30, edgecolor="black", alpha=0.7)

# Unlabelled red reference lines at 1000 and 2000.
for threshold in (1000, 2000):
    plt.axvline(threshold, color="r", linestyle="dashed", linewidth=1)

# Green line for the median, blue line for the mean; both appear in the legend.
for value, colour, caption in (
    (median_elo, "g", f"Median Elo: {median_elo:.2f}"),
    (average_elo, "b", f"Average Elo: {average_elo:.2f}"),
):
    plt.axvline(value, color=colour, linestyle="dashed", linewidth=1, label=caption)

plt.title("Elo Rating Distribution")
plt.xlabel("Elo Rating")
plt.ylabel("Frequency")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Count the Elo ratings in three bands: at most 1000, 1001 to 1999, and 2000 or more.
# Summing the boolean comparison results counts the matching entries.
under_1001 = sum(elo <= 1000 for elo in elos)
between_1001_and_1999 = sum(1001 <= elo <= 1999 for elo in elos)
over_1999 = sum(elo >= 2000 for elo in elos)

# Bare expression so that the notebook displays the three counts.
under_1001, between_1001_and_1999, over_1999

In [None]:
# Count the Elo ratings from 1500 to 1600 (both inclusive).
# NOTE(review): the result is stored under the name between_1001_and_1999, which
# overwrites the 1001-1999 count computed earlier and no longer matches the
# range; consider a dedicated name.
between_1001_and_1999 = sum(1 for elo in elos if 1500 <= elo <= 1600)

In [None]:
# Bare expression so that the notebook displays the current value of the variable.
between_1001_and_1999