<a href="https://colab.research.google.com/github/ShkarupyloMaksym/lichess_datamining/blob/main/db_creation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [None]:
!pip install zstandard
!pip install chess

Collecting zstandard
  Downloading zstandard-0.22.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.4/5.4 MB[0m [31m29.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: zstandard
Successfully installed zstandard-0.22.0
Collecting chess
  Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess
Successfully installed chess-1.10.0


In [None]:
import datetime
from tqdm import tqdm
import requests
import zstandard as zstd
import chess.pgn
import os
import random
import time

# Constants

In [None]:
# Lichess monthly dump to process, written as '<year> - <full month name>'
# (the month part is parsed with strptime's %B by month_name_to_number).
# Alternative value that used to be assigned and immediately overwritten: '2013 - July'.
data_name = '2017 - May'


In [None]:
# NOTE(review): no other cell visible in this notebook reads this value; presumably the
# intended number of evaluated games to keep - confirm before relying on it.
size_of_evaled_dataset = 1000

In [None]:
# Number of lines the (currently commented-out) "Create small" cell copies from the full dump.
size_of_cutted_dataset = 1008

# Data

In [None]:
# Bytes handled per iteration by download_file_from_link and decompress_file.
# 1 MiB keeps the Python-level loop and the progress-bar updates cheap for
# multi-gigabyte dumps; with the former 1 KiB value a 25 GB file needed
# roughly 25 million read/write/update round trips.
chunk_size = 1024 * 1024
def month_name_to_number(month_name):
    """Turn a full month name such as ``"May"`` into a zero-padded number (``"05"``).

    The name is parsed with ``strptime``'s ``%B`` directive, so a string it
    does not recognise raises ``ValueError``.
    """
    parsed = datetime.datetime.strptime(month_name, "%B")
    return f"{parsed.month:02d}"


def download_file_from_link(url, save_path):
    """Stream the file at ``url`` into ``save_path`` while showing a byte progress bar.

    Parameters
    ----------
    url : str
        Address of the file to download.
    save_path : str
        Path of the local file to create (overwritten if it already exists).

    Raises
    ------
    requests.HTTPError
        If the server answers with anything other than HTTP 200. Previously such
        a response was ignored silently and the caller went on to decompress a
        missing or stale file.
    requests.RequestException
        For connection problems or when the server stalls past the timeout.
    """
    print('Start download')
    # The context manager releases the connection even when writing fails; the
    # timeout keeps a stalled server from hanging the notebook indefinitely.
    with requests.get(url, stream=True, timeout=60) as response:
        if response.status_code != 200:
            raise requests.HTTPError(
                f'Download of {url} failed with HTTP status {response.status_code}',
                response=response,
            )

        # Without a Content-Length header tqdm gets total=None and only counts bytes.
        total_size_in_bytes = int(response.headers.get('content-length', 0)) or None
        with open(save_path, 'wb') as f:
            with tqdm(total=total_size_in_bytes, unit='B', unit_scale=True) as progress_bar:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
                    progress_bar.update(len(chunk))
    print("File downloaded successfully!")


def decompress_file(name_start, name_end):
    """Decompress the Zstandard file ``name_start`` into ``name_end``.

    The data is streamed in pieces of ``chunk_size`` bytes (module-level
    constant) and a progress bar counts the decompressed bytes written.
    """
    print('Start decompressing')
    decompressor = zstd.ZstdDecompressor()
    with open(name_start, 'rb') as source:
        with open(name_end, 'wb') as target, decompressor.stream_reader(source) as stream:
            with tqdm(unit='B', unit_scale=True, desc="Decompressing") as bar:
                # iter() with a sentinel stops at the first empty read, i.e. end of stream.
                for block in iter(lambda: stream.read(chunk_size), b''):
                    target.write(block)
                    bar.update(len(block))
    print('File decompressed')


def create_link_from_name(data_name):
    """Build the database.lichess.org URL of the monthly dump named by ``data_name``.

    ``data_name`` must look like ``'<year> - <month name>'`` (for example
    ``'2017 - May'``); the month name is converted by ``month_name_to_number``.
    """
    year, month_name = data_name.split(' - ')
    month_number = month_name_to_number(month_name)
    return (
        'https://database.lichess.org/standard/'
        f'lichess_db_standard_rated_{year}-{month_number}.pgn.zst'
    )

def download_and_decompress(link, final_name):
    """Download the archive at ``link`` and decompress it into ``final_name``.

    The compressed archive is saved under the last path segment of ``link``
    and is left in place after decompression.
    """
    archive_name = link.rsplit('/', 1)[-1]
    download_file_from_link(link, archive_name)
    decompress_file(archive_name, final_name)

## Download

In [None]:
# Resolve the dump URL for the configured month, then fetch and unpack it.
url = create_link_from_name(data_name)
# The decompressed file keeps the archive's name without its trailing '.zst'.
final_name = url.rsplit('/', 1)[-1][:-len('.zst')]
download_and_decompress(url, final_name)

Start download


100%|██████████| 3.59G/3.59G [01:53<00:00, 31.5MB/s]


File downloaded successfully!
Start decompressing


Decompressing: 25.0GB [04:28, 92.9MB/s]

File decompressed





## Create a small sample file

In [None]:
# output_file_name = f'first_{size_of_cutted_dataset}_lines_{final_name}'

# with open(final_name, 'r') as input_file, open(output_file_name, 'w') as output_file:

#     for _ in range(size_of_cutted_dataset):
#         line = input_file.readline()
#         if not line:
#             break

#         output_file.write(line)

## Count the number of games in the big PGN file

In [None]:
def count_games_in_pgn_file(input_filename):
    """Count the games in a PGN file, print the total and return it.

    A game is counted for every line that starts with ``[Event ``. The file is
    streamed line by line, so it never has to fit in memory.

    Parameters
    ----------
    input_filename : str
        Path of the UTF-8 encoded PGN file.

    Returns
    -------
    int
        Number of games found (the function used to print the number only,
        which left callers nothing to work with).
    """
    with open(input_filename, 'r', encoding='utf-8') as pgn:
        game_count = sum(1 for line in pgn if line.startswith('[Event '))

    print(f"Total number of games in {input_filename}: {game_count}")
    return game_count


# Count the games of the decompressed dump. The file name is hard-coded; it equals the
# `final_name` the download cell derives when data_name is '2017 - May'.
count_games_in_pgn_file('lichess_db_standard_rated_2017-05.pgn')

Total number of games in lichess_db_standard_rated_2017-05.pgn: 11693919


## Split the big PGN file into n smaller files

In [None]:
def split_pgn_by_event_with_logging(input_filename, output_filename_pattern, num_of_games_per_file):
    """Split a PGN file into numbered parts of ``num_of_games_per_file`` games each.

    A new game starts at every line beginning with ``[Event ``. Lines are copied
    unchanged and in order, so concatenating the parts reproduces the input.
    The first part is always created, even for an empty input file.

    Parameters
    ----------
    input_filename : str
        Path of the UTF-8 encoded PGN file to split.
    output_filename_pattern : str
        ``str.format`` pattern with one ``{}`` placeholder that receives the
        1-based part number, e.g. ``'output_pgn_part_{}.pgn'``.
    num_of_games_per_file : int
        Maximum number of games written to each part; must be at least 1.

    Raises
    ------
    ValueError
        If ``num_of_games_per_file`` is smaller than 1.
    """
    if num_of_games_per_file < 1:
        raise ValueError("num_of_games_per_file must be at least 1")

    game_count = 0
    file_index = 1
    # Games already written to the current part. Starting at 0 (the old code
    # started at 1) is what makes every part hold exactly the requested number
    # of games instead of one fewer.
    current_file_game_count = 0
    output_file = open(output_filename_pattern.format(file_index), 'w', encoding='utf-8')

    try:
        with open(input_filename, 'r', encoding='utf-8') as pgn:
            for line in pgn:
                # Check if the line is the start of a new game
                if line.startswith('[Event '):
                    # Rotate before writing the game that would exceed the limit,
                    # so no empty trailing part is ever created.
                    if current_file_game_count >= num_of_games_per_file:
                        output_file.close()
                        file_index += 1
                        current_file_game_count = 0
                        output_file = open(output_filename_pattern.format(file_index), 'w', encoding='utf-8')

                    game_count += 1
                    current_file_game_count += 1

                    # Log every 100k games processed
                    if game_count % 100000 == 0:
                        print(f"Processed {game_count} games so far...")

                # Write the current line to the current output file
                output_file.write(line)
    finally:
        # Close the part that is open at the end, also when reading or writing failed.
        output_file.close()

    print(f"Total games processed: {game_count}")
    print(f"Total files created: {file_index}")

# Split the decompressed May 2017 dump into parts of one million games.
input_filename = 'lichess_db_standard_rated_2017-05.pgn'
# The {} placeholder receives the 1-based part number.
output_filename_pattern = 'output_pgn_part_{}.pgn'
num_of_games_per_file = 1_000_000

split_pgn_by_event_with_logging(
    input_filename=input_filename,
    output_filename_pattern=output_filename_pattern,
    num_of_games_per_file=num_of_games_per_file,
)

## Create a PGN file with only evaluated games

In [None]:
# Seed the global `random` generator that random_chance_with_seed draws from, so the
# sampling done by filter_games_with_eval repeats when this cell is re-run first.
random.seed(42)

def game_contains_eval(game):
    """Tell whether any main-line move of ``game`` has an ``[%eval`` comment.

    Only the first variation is followed from each node, and the comment of
    the root node itself is not inspected.
    """
    current = game
    while True:
        if current.is_end():
            return False
        current = current.variation(0)
        if "[%eval" in current.comment:
            return True

def random_chance_with_seed(probability):
    """Draw once from the global ``random`` generator and report a hit.

    Returns True when the draw is below ``probability``. The function does not
    seed anything itself; repeatability comes from seeding ``random`` beforehand.
    """
    draw = random.random()
    return draw < probability

def filter_games_with_eval(input_filename, output_filename, num_of_games=None, logging_num=100, probability_of_watch_game=0.08, start_index=1, end_index=None):
    """Append a random sample of the evaluated games of one PGN file to another.

    This cell used to define ``filter_games_with_eval`` twice; the first
    two-argument version was silently replaced by this one as soon as the cell
    ran, so only this definition is kept. Calls with just the two file names
    still work.

    Parameters
    ----------
    input_filename : str
        UTF-8 encoded PGN file to read.
    output_filename : str
        PGN file the selected games are appended to (opened in append mode, so
        games from earlier calls are kept).
    num_of_games : int or None
        Stop once this many games have been written; ``None`` means no limit.
    logging_num : int
        Print a progress line each time this many further games were written.
    probability_of_watch_game : float
        Chance that a game inside the index range is inspected at all, decided
        by ``random_chance_with_seed`` (one draw per game in range).
    start_index : int
        1-based number of the first game to consider; earlier games are skipped.
    end_index : int or None
        1-based number of the last game to consider; a falsy value means
        "until the end of the file".
    """
    num_of_written_games = 0
    total_analyzed_games = 0
    total_games = 0
    now = time.time()

    with open(input_filename, "r", encoding="utf-8") as pgn_in:
        # Opened once instead of once per written game; append mode keeps
        # whatever earlier calls already stored in the file.
        with open(output_filename, "a", encoding="utf-8") as pgn_out:
            while True:
                # Games before start_index are only skipped, so they are not
                # parsed into a move tree first.
                if total_games + 1 < start_index:
                    if not chess.pgn.skip_game(pgn_in):
                        break  # End of file
                    total_games += 1
                    continue

                game = chess.pgn.read_game(pgn_in)
                if game is None:
                    break  # End of file
                total_games += 1

                # Stop processing if end_index is reached
                if end_index and total_games > end_index:
                    break

                if not random_chance_with_seed(probability_of_watch_game):
                    continue
                total_analyzed_games += 1

                if not game_contains_eval(game):
                    continue

                print(game, file=pgn_out, end="\n\n")
                # Flush per game so an interrupted run leaves every finished game on disk.
                pgn_out.flush()
                num_of_written_games += 1

                if num_of_written_games % logging_num == 0:
                    now_new = time.time()
                    taken_time = round(now_new - now)
                    taken_time_min, taken_time_sec = taken_time // 60, taken_time % 60
                    taken_time_sec = str(taken_time_sec).zfill(2)
                    print(f'Has written {num_of_written_games} evaluated games, analized {total_analyzed_games}, total games = {total_games}, taken time = {taken_time_min}:{taken_time_sec}')
                    # The reported time covers the interval since the previous progress line.
                    now = now_new

                if num_of_games is not None and num_of_written_games >= num_of_games:
                    break


In [None]:
# Part of the split dump to filter. Only the part named here is processed by this cell,
# while the CSV export further down reads evaled_output_pgn_part_1 ... _12.
partitional_pgn_file_name = 'output_pgn_part_2.pgn'

evaled_games_filename = 'evaled_' + partitional_pgn_file_name
filter_games_with_eval(
    input_filename=partitional_pgn_file_name,
    output_filename=evaled_games_filename,
)

# Export to CSV

In [None]:
import chess.pgn
import pandas as pd

# Move suffixes for the six standard PGN numeric annotation glyphs (NAGs).
_NAG_SYMBOLS = {1: '!', 2: '?', 3: '!!', 4: '??', 5: '!?', 6: '?!'}


def _nag_to_symbol(nag):
    """Return the move suffix of a NAG code, or the PGN ``$<n>`` form if it has none."""
    return _NAG_SYMBOLS.get(nag, f'${nag}')


def parse_pgn(input_filename, output_filename):
    """Convert every game of a PGN file into one CSV row and return the table.

    The original body called ``nag_to_symbol``, for which no definition is
    visible anywhere in this notebook, so any annotated move raised
    ``NameError``; the private helper above takes its place.
    NOTE(review): if a different mapping was intended, adjust ``_NAG_SYMBOLS``.

    Parameters
    ----------
    input_filename : str
        UTF-8 encoded PGN file to read.
    output_filename : str
        CSV file to write (without the DataFrame index).

    Returns
    -------
    pandas.DataFrame
        One row per game: a 1-based ``GameID``, selected PGN headers (missing
        headers become ``''``), ``TotalMoves`` (number of main-line half-moves)
        and ``Moves`` (space-separated SAN moves; a move that has a comment is
        followed by its annotation symbols and ``{comment}``).
    """
    metadata_list = []

    # The context manager closes the input file, which the old code never did.
    with open(input_filename, encoding='utf-8') as pgn:
        while True:
            game = chess.pgn.read_game(pgn)
            if game is None:
                break

            # Advance one board along the main line instead of rebuilding the
            # position from the root for every move (quadratic in game length).
            board = game.board()
            moves_with_comments = []
            for node in game.mainline():
                move_san = board.san(node.move)
                board.push(node.move)
                comment = node.comment
                if comment:
                    # Sorted because the NAGs of a node are stored as a set.
                    nags_str = ''.join(_nag_to_symbol(nag) for nag in sorted(node.nags))
                    moves_with_comments.append(f"{move_san}{nags_str} {{{comment}}}")
                else:
                    moves_with_comments.append(move_san)

            game_id = len(metadata_list) + 1
            headers_of_game = game.headers
            metadata_list.append({
                'GameID': game_id,
                'Event': headers_of_game.get('Event', ''),
                'Site': headers_of_game.get('Site', ''),
                'Date': headers_of_game.get('UTCDate', ''),
                'Time': headers_of_game.get('UTCTime', ''),
                'White': headers_of_game.get('White', ''),
                'Black': headers_of_game.get('Black', ''),
                'Result': headers_of_game.get('Result', ''),
                'WhiteElo': headers_of_game.get('WhiteElo', ''),
                'BlackElo': headers_of_game.get('BlackElo', ''),
                'WhiteRatingDiff': headers_of_game.get('WhiteRatingDiff', ''),
                'BlackRatingDiff': headers_of_game.get('BlackRatingDiff', ''),
                'ECO': headers_of_game.get('ECO', ''),
                'Opening': headers_of_game.get('Opening', ''),
                'TimeControl': headers_of_game.get('TimeControl', ''),
                'TotalMoves': len(moves_with_comments),
                'Moves': ' '.join(moves_with_comments)
            })

            # Reported after the game is stored, so the number is the count of
            # finished games (it used to be one too high).
            if game_id % 1000 == 0:
                print(f'completed {game_id} in {input_filename}')

    chess_games = pd.DataFrame(metadata_list)
    chess_games.to_csv(output_filename, index=False)
    return chess_games

In [None]:
# 12 = number of database parts; each filtered part becomes its own CSV file.
for i in range(1, 12 + 1):
    chess_games = parse_pgn(f'evaled_output_pgn_part_{i}.pgn', f'games_metadata_{i}.csv')
    print(f'{i} file was completed')



# Use the Lichess API to get player info

In [None]:
def get_profile(username, fields):
    """Fetch selected fields of a Lichess user's public account data.

    Parameters
    ----------
    username : str
        Lichess user name, inserted into ``https://lichess.org/api/user/<name>``.
    fields : iterable of str
        ``/``-separated paths into the returned JSON, e.g. ``"profile/flag"``.

    Returns
    -------
    dict
        Maps every requested field to its value, or to ``None`` when the path
        does not exist. The dict is empty when the request fails or the server
        does not answer with HTTP 200.

    Notes
    -----
    Reads the module-level ``headers`` dict, which therefore has to be defined
    before the first call.
    """
    profile = {}
    url = f'https://lichess.org/api/user/{username}'
    try:
        # The timeout keeps one stalled request from blocking a long run.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as e:
        # Treated like a non-200 answer: report it and leave the profile empty
        # rather than aborting a run over thousands of players.
        print(f"Request for {username} failed: {e}")
        return profile

    if response.status_code != 200:
        print(response.text)
        return profile

    account_info = response.json()
    print(url)
    for field in fields:
        value = account_info
        for key in field.split('/'):
            # Only descend into dicts; a scalar in the middle of the path
            # means the requested field does not exist.
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                value = None
                break
        profile[field] = value
    return profile

In [None]:
def update_profiles(df, fields, name_for_saving):
    """Add profile columns for the White and Black player of every game, in place.

    For each requested field a ``White_<field>`` and a ``Black_<field>`` column
    is created (``/`` in the field name becomes ``_``) and filled from
    ``get_profile``. Every 10 rows the elapsed time since the start is printed
    and the frame is saved to ``name_for_saving`` as a checkpoint.

    A stray ``return None`` inside the checkpoint branch used to end the
    function after the first 10 rows; it is removed so all rows are processed.

    Parameters
    ----------
    df : pandas.DataFrame
        Games table with ``White`` and ``Black`` user-name columns; modified in
        place. The checkpoint test uses ``index % 10``, so an integer index is
        expected.
    fields : iterable of str
        Field paths passed on to ``get_profile``.
    name_for_saving : str
        CSV path for the periodic checkpoints.
    """
    now = time.time()
    columns_names = ['White', 'Black']
    for column in columns_names:
        for field in fields:
            df[f"{column}_{field.replace('/', '_')}"] = None

    # The same player shows up in many games; keep each successfully fetched
    # profile so a user name costs at most one API request.
    fetched_profiles = {}

    for index, row in df.iterrows():
        if index % 10 == 0 and index != 0:
            now_new = time.time()
            taken_time = round(now_new - now)
            taken_time_min, taken_time_sec = taken_time // 60, taken_time % 60
            taken_time_sec = str(taken_time_sec).zfill(2)
            print(f'{index} was completed, taken time = {taken_time_min}:{taken_time_sec}')
            df.to_csv(name_for_saving, index=False)
        for column in columns_names:
            username = row[column]
            profile = fetched_profiles.get(username)
            if profile is None:
                profile = get_profile(username, fields)
                # Empty results (failed requests) are not cached, so the next
                # game of this player triggers a retry.
                if profile:
                    fetched_profiles[username] = profile
            print(profile)
            for field, value in profile.items():
                df.at[index, f"{column}_{field.replace('/', '_')}"] = value

In [None]:
# Personal API token. It is read from the LICHESS_TOKEN environment variable when that is
# set, so the real secret does not have to be typed into (and saved with) the notebook;
# otherwise replace the placeholder below.
token = os.environ.get('LICHESS_TOKEN', 'YOUR_LICHESS_TOKEN')

# Lichess expects bearer authentication ("Authorization: Bearer <token>"); a token that
# already carries the prefix is passed through unchanged.
headers = {
    'Authorization': token if token.startswith('Bearer ') else f'Bearer {token}'
}

# '/'-separated paths into the user JSON returned by the API (see get_profile).
fields = ["profile/flag", "createdAt", "playTime/total", "count/all", "tosViolation", "title"]

# number of the metadata CSV you want to update
i = 9
games_metadata = pd.read_csv(f'games_metadata_{i}.csv')
update_profiles(games_metadata, fields, f'games_metadata_profile_{i}.csv')
# Final save: update_profiles itself only writes checkpoints every 10 rows.
games_metadata.to_csv(f'games_metadata_profile_{i}.csv', index=False)
print(f"{i} file was updated")

# Clear files

In [None]:
import os

def delete_files_with_prefix(directory, prefix):
    """
    Remove every entry of ``directory`` whose name begins with ``prefix``.

    Each removal is reported on stdout; a failed removal is reported as well
    and does not stop the remaining ones.

    :param directory: Path to the directory containing the files.
    :param prefix: The prefix to match for file deletion.
    """
    for entry_name in os.listdir(directory):
        if not entry_name.startswith(prefix):
            continue
        file_path = os.path.join(directory, entry_name)
        try:
            os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")
        else:
            print(f"Deleted: {file_path}")

# Remove the split PGN parts from the Colab working directory.
directory = '/content'
prefix = 'output_pgn_part_'
delete_files_with_prefix(directory=directory, prefix=prefix)

In [None]:
from google.colab import files

# Specify the file path in Colab you want to download
# NOTE(review): the "Clear files" cell above deletes every '/content/output_pgn_part_*' file,
# so this path no longer exists if that cell ran first - run this cell before clearing.
file_path = '/content/output_pgn_part_1.pgn'

# Download the file to your local machine
files.download(file_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>