In [1]:
import json
import os
import re
import time

import pandas as pd
import requests

In [8]:
# Read the Lichess personal API token from the environment so the secret is
# never stored in the notebook itself. Set LICHESS_API_TOKEN before starting
# the kernel; when it is unset the token is the empty string.
API_TOKEN = os.environ.get("LICHESS_API_TOKEN", "")

# Shared request headers used by the fetch helpers below. No Authorization
# header is sent when no token is configured, so requests go out anonymously
# instead of carrying an invalid Bearer token.
headers = {'Authorization': f'Bearer {API_TOKEN}'} if API_TOKEN else {}

# Sanity check: query the account endpoint with the configured headers.
# The timeout keeps a stalled connection from hanging the cell indefinitely.
response = requests.get('https://lichess.org/api/account', headers=headers, timeout=10)

# Only report failures; the account payload itself is deliberately not printed.
if response.status_code != 200:
    print(f"Error fetching account data: {response.status_code}")

## Getting team members from a specific team

In [57]:
def get_team_members(team_id, full=False, max_members=None):
    """Fetch the members of a Lichess team from the streaming team-users endpoint.

    The response body is consumed line by line and every non-empty line is
    parsed as one JSON document.

    Args:
        team_id: Team identifier interpolated into the endpoint URL.
        full: Sent to the endpoint as the `full` query parameter
            ("true" / "false").
        max_members: Stop after this many members have been collected.
            None (or 0) means no limit.

    Returns:
        A list with one parsed JSON object per member, or an empty list when
        the endpoint does not answer with HTTP 200.
    """
    url = f'https://lichess.org/api/team/{team_id}/users'
    params = {'full': str(full).lower()}

    # stream=True lets us stop reading early; the `with` block guarantees the
    # connection is released even when we break out before the stream ends.
    with requests.get(url, params=params, stream=True) as response:
        if response.status_code != 200:
            print(f"Error fetching team members: {response.status_code}")
            return []

        members = []
        for line in response.iter_lines(decode_unicode=True):
            if max_members and len(members) >= max_members:
                break  # Reached the caller-supplied limit
            if not line:
                continue  # Skip blank lines; they carry no JSON document
            members.append(json.loads(line))
        return members


## Getting the games of a specific user

In [51]:
def get_user_analyzed_games(username, max_games=10, retries=3, rate_limit_wait=60):
    """Download a user's most recent analysed games as one PGN string.

    The request is attempted up to `retries` times. Timeouts, connection
    errors and HTTP 429 responses are retried after a pause; any other
    non-200 status aborts immediately. No pause is taken after the final
    attempt, because nothing would follow it.

    Args:
        username: Lichess user whose games are requested.
        max_games: Value sent as the `max` query parameter.
        retries: Maximum number of attempts.
        rate_limit_wait: Minimum number of seconds to pause after an HTTP 429.
            Defaults to 60 because the Lichess API documentation asks clients
            to wait a full minute after a 429. Pass 0 to fall back to the
            short exponential schedule (1, 2, 4, ... seconds).

    Returns:
        The response body (PGN text) on success, otherwise an empty string.
    """
    url = f'https://lichess.org/api/games/user/{username}'
    params = {
        'max': max_games,          # Limit the number of games
        'analysed': 'true',        # Only fetch analyzed games
        'evals': 'true',           # Include centipawn evaluations
        'literate': 'true',        # Include annotations like "mistake" and "inaccuracy"
        'sort': 'dateDesc',        # Most recent games first
    }

    for attempt in range(retries):
        try:
            # `headers` is the notebook-level dict holding the Authorization header.
            response = requests.get(url, headers=headers, params=params, timeout=10)
        except requests.exceptions.Timeout:
            problem, delay = "Timeout occurred.", 2 ** attempt
        except requests.exceptions.ConnectionError as e:
            # First retry is immediate, later ones wait 60s, 120s, ...
            problem, delay = f"Connection error for user {username}: {e}.", 60 * attempt
        else:
            if response.status_code == 200:
                return response.text  # PGN response as a plain string
            if response.status_code != 429:
                print(f"Error fetching games for {username}: {response.status_code}")
                return ""
            problem, delay = "Rate limit hit.", max(rate_limit_wait, 2 ** attempt)

        if attempt == retries - 1:
            # Out of attempts: sleeping now would only delay the caller.
            print(f"{problem} Giving up on {username} after {retries} attempts.")
            break

        print(f"{problem} Retrying in {delay} seconds...")
        time.sleep(delay)

    return ""


## Extracting the player error counts

In [52]:
def count_mistakes(moves):
    """Tally annotated errors per colour in a PGN move-text string.

    A move is credited with an error when the comment placed directly after
    it (the first `{...}` following the move token) contains one of the
    words "Inaccuracy", "Mistake" or "Blunder". White moves are recognised
    by an `N.` prefix, black moves by an `N...` prefix.

    Returns:
        A 6-tuple of counts: (white_inaccuracies, white_mistakes,
        white_blunders, black_inaccuracies, black_mistakes, black_blunders).
    """
    # Regex pattern to capture moves annotations
    move_pattern = r'(\d+\.)\s+(\S+)(?:\s+\{[^}]*?(Inaccuracy|Mistake|Blunder)[^}]*?\})?|\d+\.\.\.\s+(\S+)(?:\s+\{[^}]*?(Inaccuracy|Mistake|Blunder)[^}]*?\})?'

    severities = ("Inaccuracy", "Mistake", "Blunder")
    sides = ("white", "black")
    tally = {side: dict.fromkeys(severities, 0) for side in sides}

    # Each findall row is (move number, white move, white label, black move, black label);
    # groups belonging to the alternative that did not match are empty strings.
    for _number, white_san, white_label, black_san, black_label in re.findall(move_pattern, moves):
        for side, san, label in (("white", white_san, white_label),
                                 ("black", black_san, black_label)):
            if san and label in tally[side]:
                tally[side][label] += 1

    return tuple(tally[side][severity] for side in sides for severity in severities)

## Separate all game data into their respective columns

In [23]:
# (output column, PGN tag, replace an empty value with "Unknown"?)
_PGN_TAG_COLUMNS = (
    ('event', 'Event', False),
    ('site', 'Site', False),
    ('date', 'Date', False),
    ('white', 'White', False),
    ('black', 'Black', False),
    ('result', 'Result', False),
    ('white_elo', 'WhiteElo', True),
    ('black_elo', 'BlackElo', True),
    ('time_control', 'TimeControl', False),
    ('eco', 'ECO', True),
    ('termination', 'Termination', True),
)


def _parse_pgn_headers(header_text):
    """Build one game record from a PGN header chunk; absent tags become "Unknown"."""
    record = {}
    for column, tag, blank_is_unknown in _PGN_TAG_COLUMNS:
        found = re.search(r'\[' + re.escape(tag) + r' "(.*?)"\]', header_text)
        if found is None:
            # A missing tag must not abort the whole batch.
            record[column] = "Unknown"
        elif blank_is_unknown:
            record[column] = found.group(1) or "Unknown"
        else:
            record[column] = found.group(1)
    return record


def process_pgn_data_with_mistakes(pgn_data):
    """Split a multi-game PGN string into one record (dict) per game.

    The text is split on blank lines. A chunk containing an `[Event "` tag
    starts a new game and supplies its header columns; the next chunk that
    does not start with `[` is taken as that game's move text, stored under
    'moves' and passed to `count_mistakes` to fill the six error-count columns.

    Header tags that are missing from a game are recorded as "Unknown"
    instead of raising. A game whose headers are never followed by move text
    is still returned, just without the 'moves' and error-count keys.

    Args:
        pgn_data: PGN text holding zero or more games. Empty or None input
            yields an empty list.

    Returns:
        A list of dicts, one per game, in the order they appear in the input.
    """
    if not pgn_data:
        return []

    games = []
    current = None  # Game whose headers were read but whose moves are still pending

    for chunk in pgn_data.strip().split("\n\n"):  # Separate on blank lines
        if re.search(r'\[Event "', chunk):
            if current:
                games.append(current)  # Previous game never received move text
            current = _parse_pgn_headers(chunk)
        elif current:
            moves = chunk.strip()
            if not moves.startswith('['):  # Make sure it is not another header tag
                current['moves'] = moves

                # Count annotated errors in the move text
                (
                    current['white_inaccuracies'],
                    current['white_mistakes'],
                    current['white_blunders'],
                    current['black_inaccuracies'],
                    current['black_mistakes'],
                    current['black_blunders']
                ) = count_mistakes(moves)

                games.append(current)
                current = None

    if current:
        games.append(current)

    return games


## Saving

In [24]:
def save_to_csv(games, filename='lichess_user_games.csv'):
    """Write the game records to `filename` as CSV (no index column) and print the row count."""
    pd.DataFrame(games).to_csv(filename, index=False)
    print(f"Saved {len(games)} games to {filename}")

# Master

Output has been cleared for obvious reasons

In [None]:
# Teams whose members' games are collected. Replace with your list of team IDs.
team_ids = ['lichess-swiss' ,'coders', 'bengal-tiger', 'im-eric-rosen-fan-club', 'zhigalko_sergei-fan-club', 'arab-world-team']

# One dict per parsed game, accumulated across every team and member.
all_games = []

for team_id in team_ids:
    print(f"Fetching members for team: {team_id}")
    # Cap each team at 500 members.
    members = get_team_members(team_id, max_members = 500)

    if not members:
        print(f"No members found or error retrieving members for team: {team_id}")
    else:
        for member in members:
            username = member['id']  # 'id' = username
            print(f"Fetching games for user: {username} from team: {team_id}")
            # Limit to 10 games per user
            pgn_data = get_user_analyzed_games(username, max_games=10)

            if not pgn_data:
                continue  # Nothing was returned for this user

            games = process_pgn_data_with_mistakes(pgn_data)
            all_games.extend(games)

    print(f"Finished team: {team_id}. Total games collected: {len(all_games)} rows")

# Persist everything in one CSV once all teams are done.
save_to_csv(all_games, 'games1.csv')
