<a href="https://colab.research.google.com/github/VasRamesh/chess-cheat-hunter/blob/main/01-data-pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Chess.com API to access data

In [None]:
!pip install chess.com

### Package imports & pprint

In [5]:
# Necessary Imports
from chessdotcom import get_leaderboards, get_country_players, get_player_game_archives
import pprint
import requests
import pandas as pd
import threading
import json
import time
import pickle
import os

# Shared PrettyPrinter instance. NOTE(review): not referenced by any of the
# cells visible here; presumably kept for ad-hoc inspection of API responses.
printer = pprint.PrettyPrinter()

### Create list of players to analyze

In [6]:
def create_player_archive(iso: str)->list:
  """Collect the monthly game-archive URLs of every player listed for a country.

  One worker thread per player is started (see player_game_archives_threading);
  each worker appends that player's archive URLs to a shared list. The function
  waits for all workers before returning, so the returned list is complete.

  Args:
    iso: Country code handed to get_country_players (e.g. "US").

  Returns:
    A flat list with the archive URLs of all players. The order depends on
    which worker finishes first and is therefore not deterministic.
  """
  data = get_country_players(iso=iso, tts=0).json
  player_list = data['players']

  # Get monthly archive per player, one thread each.
  archive = []
  workers = []
  for i, player in enumerate(player_list):
    # tts grows with the player's index (i/10) and is passed straight to the
    # API client -- presumably a per-request delay; confirm in chessdotcom docs.
    worker = threading.Thread(target=player_game_archives_threading, args=(player, i/10, archive))
    worker.start()
    workers.append(worker)
    # Stagger thread start-up so requests are not fired all at once.
    time.sleep(0.23)

  # Without joining, callers would receive (and iterate) a list that is still
  # being filled by background threads.
  for worker in workers:
    worker.join()

  return archive


In [None]:
def player_game_archives_threading(player: str, tts, archive: list):
  """Thread target: fetch one player's archive list and add it to `archive`.

  Args:
    player: Username passed to get_player_game_archives.
    tts: Forwarded unchanged to get_player_game_archives.
    archive: Shared list that is extended in place with the entries found
      under the response's 'archives' key.

  Returns:
    The parsed response payload (ignored when run as a thread target).
  """
  payload = get_player_game_archives(username=player, tts=tts).json
  monthly_urls = payload['archives']
  archive.extend(monthly_urls)
  return payload

### Collecting accuracies from each archive URL's JSON

In [None]:
# H-METHOD: Open URL with requests package
def fetch_json_from_url(url, user_agent=None, timeout=30):
    """GET `url` and return its body parsed as JSON.

    Args:
        url: Address to request.
        user_agent: Optional value for the User-agent header; no header is
            sent when this is falsy.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and a stalled connection
            would block the calling thread forever.

    Returns:
        The decoded JSON payload.

    Raises:
        requests.exceptions.HTTPError: for a 4xx/5xx response.
        requests.exceptions.Timeout: when the server does not answer in time.
    """
    headers = {'User-agent': user_agent} if user_agent else {}
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()  # Surface HTTP errors instead of parsing an error page
    return response.json()


In [None]:
# H-METHOD: Find 'accuracy' keys in game_data dict (specific to chess.com API)
def find_acc(game_data: dict) -> "dict | None":
  """Return player/accuracy details of the first qualifying game in an archive.

  A game qualifies when it carries an 'accuracies' entry and its 'rules' value
  is 'chess' (variants are skipped). Every game is examined until one
  qualifies; previously the search gave up after the first game.

  Args:
    game_data: Parsed archive payload; games are read from its 'games' key.

  Returns:
    A dict with the keys w_username, w_accuracy, w_rating, b_username,
    b_accuracy and b_rating (accuracies rounded to 2 decimals), or None when
    no game qualifies or the payload has no games.
  """
  for game in game_data.get('games') or []:
    if 'accuracies' not in game or game.get('rules') != 'chess':
      continue

    accuracies = game['accuracies']
    white, black = game['white'], game['black']
    return {
        'w_username': white['username'],
        'w_accuracy': round(accuracies['white'], 2),
        'w_rating': white['rating'],
        'b_username': black['username'],
        'b_accuracy': round(accuracies['black'], 2),
        'b_rating': black['rating'],
    }

  # Only reached after every game has been checked.
  return None


### Add-entry helper (replaces fill_pd()) so rows can be added from worker threads

In [None]:
# Serializes row appends: add_entry runs on many threads at once, and the
# "read len(acc_data), then write that row" step below is not atomic.
_ACC_DATA_LOCK = threading.Lock()


def add_entry(url, acc_data:pd.core.frame.DataFrame):
  """Fetch one archive URL and append its accuracy record to `acc_data`.

  Nothing is appended when find_acc reports no qualifying game.

  Args:
    url: Archive URL to download.
    acc_data: DataFrame that receives the new row; modified in place.
  """
  # The network fetch happens outside the lock so threads still overlap on I/O.
  game_data = fetch_json_from_url(url, user_agent="MLChessProject (vasanth765@gmail.com)")
  accuracies = find_acc(game_data)
  if accuracies:
    # Two threads reading the same length would otherwise write the same row
    # index and one record would silently overwrite the other.
    with _ACC_DATA_LOCK:
      acc_data.loc[len(acc_data)] = accuracies

### Define the main function, which uses threads to speed up processing

In [None]:
# MAIN FUNCTION which returns df with all collected and processed data
def main()->pd.core.frame.DataFrame:
  """Build the accuracy DataFrame for all archives of the US player list.

  Every archive URL is handled by its own thread running add_entry. All
  threads are joined before returning, so the returned frame contains every
  row the workers produced.

  Returns:
    DataFrame with the columns w_username, w_accuracy, w_rating, b_username,
    b_accuracy and b_rating.
  """
  # Create dataframe
  acc_data = pd.DataFrame(columns=['w_username',
                                   'w_accuracy',
                                   'w_rating',
                                   'b_username',
                                   'b_accuracy',
                                   'b_rating'])

  archive = create_player_archive(iso='US')

  # Threading
  threadlist = []
  for count, url in enumerate(archive):

    # Progress indicator every 100 URLs.
    if count % 100 == 0: print(f"Count: {count}")

    worker = threading.Thread(target=add_entry, args=(url,acc_data))
    worker.start()
    threadlist.append(worker)
    # Stagger thread start-up so requests are not fired all at once.
    time.sleep(0.35)

  # Wait for the outstanding downloads; returning earlier would hand back a
  # DataFrame that background threads are still writing to.
  for worker in threadlist:
    worker.join()

  return acc_data


### Call main()

In [None]:
# Run the whole collection pipeline. main() sleeps between thread launches
# (time.sleep) and performs one network request per archive URL, so expect
# this cell to take a long time for a large player list.
acc_data = main()

### Save created dataframe with pickle

In [4]:
#from google.colab import drive
#drive.mount('/content/drive')

In [7]:
# Save file to directory via pickle
def save_via_pickle(acc_data: pd.core.frame.DataFrame, path: str = 'acc_data.pk1'):
  """Pickle `acc_data` to `path`, refusing to overwrite an existing file.

  Args:
    acc_data: Object to serialize (the collected DataFrame).
    path: Destination file. Pass e.g. a mounted Google Drive path such as
      '/content/drive/My Drive/Colab Notebooks/Chess Project/acc_data.pkl'
      to store the file elsewhere.
      NOTE(review): the default's 'pk1' extension looks like a typo for
      'pkl'; it is kept unchanged because other code may read this name.
  """
  try:
    # 'x' creates the file exclusively and fails if it already exists, so the
    # existence check and the creation happen in one atomic step.
    with open(path, 'xb') as f:
      pickle.dump(acc_data, f)
  except FileExistsError:
    print(f"File at path: {path} already exists. Please delete this to rewrite.")
  else:
    print("Saved!")

In [None]:
# Persist the collected DataFrame. save_via_pickle only writes when the target
# file does not exist yet; otherwise it prints a notice and leaves it as is.
save_via_pickle(acc_data=acc_data)