In [9]:
import chess
import openai
import os
from stockfish import Stockfish
import time
from dotenv import load_dotenv

load_dotenv()  # allows access to environment variables in .env file


# Initialize Stockfish and OpenAI
openai.api_key = os.environ.get('OPENAI_API_KEY')
path_to_stockfish = "stockfish"

In [10]:
# Notes
# The "chess" Python library can check for valid moves and checkmate
# The openai library communicates with the Open AI API
# The stockfish library provides a Pythonic interface to the
# Stockfish chesss engine.

# To give GPT the best possible input (with the most training data)
# We will give it games in Algebraic notation.
# This requires jumping through small extra hoops to parse its response.

In [11]:
def save_game(file_name, move_history, game_outcome, current_elo):
    # Create the directory if it doesn't exist
    os.makedirs(os.path.dirname(file_name), exist_ok=True)

    with open(file_name, "w") as file:
        file.write(move_history + "\n")
        file.write("Outcome: " + game_outcome + "\n")
        file.write(f"Stockfish ELO: {current_elo}\n")

In [12]:
def next_white_move(algebraic_notation):
    # Check if the first character is a digit (move number)
    if algebraic_notation[0].isdigit():
        # Split the string into individual parts based on space
        moves = algebraic_notation.split(' ')
        # Separate move numbers from moves and recombine as a new list
        moves_separated = []
        for move in moves:
            if '.' in move:
                move_number, move_san = move.split('.')
                moves_separated.append(move_number + '.')
                if move_san:  # If there's a move after the '.', append it as a separate item
                    moves_separated.append(move_san)
            else:
                moves_separated.append(move)
        # Filter out move numbers and black's moves, keeping only white's moves
        white_moves = [move for i, move in enumerate(moves_separated) if not move.endswith('.') and (i - 1) % 2 == 0]
        # Return the next move by white
        return white_moves[0] if white_moves else None
    else:
        # The case when the notation doesn't start with a move number
        moves = algebraic_notation.split()
        # In this case, white's move is always the first one
        return moves[0] if moves else None

In [13]:
def to_long_algebraic(move_history_san):
    long_form_moves = []
    for i in range(0, len(move_history_san), 2):
        move_number = i // 2 + 1
        white_move = move_history_san[i]
        if i + 1 < len(move_history_san):
            black_move = move_history_san[i + 1]
            long_form_moves.append(f"{move_number}. {white_move} {black_move}")
        else:
            long_form_moves.append(f"{move_number}. {white_move}")
    return " ".join(long_form_moves)

In [14]:
def play_chess(ELO):

    # Initialize Stockfish with the given ELO
    stockfish = Stockfish(path=path_to_stockfish)
    stockfish.update_engine_parameters({"UCI_Elo": ELO})

    # Initialize the chess board
    board = chess.Board()

    # Keep track of the move history
    move_history_uci = []  # List of moves using Universal Chess Interface Notation (["e2e4",...])
    move_history_san = []  # List of moves using Standard Algebraic Notation (["e4,...])
    move_history_san_str = ""  # String with moves in Standard Algebraic Notation ("1. e4...")

    # Keep track of the invalid moves by GPT
    invalid_moves = []

    response = openai.ChatCompletion.create(
        # model="gpt-3.5-turbo",  # replace with actual GPT-4 model name when available
        model="gpt-4",  # replace with actual GPT-4 model name when available
        messages=[
            {"role": "system", "content": "You are an expert chess player."},
            {"role": "user", "content":
                """Our chess game just started, and you are playing with the white pieces.

                What would you like to make your first move? Please reply with algebraic notation only.

                Here are examples of responses you could provide:

                1. e4
                1. d4

                Your response:"""}],
        max_tokens=20
    )

    # Get the move from the response
    gpt_first_move = response['choices'][0]['message']['content'].strip()

    # The reply includes the full move history, so we need to get the last element
    # Here, there is only one move.
    first_move_san = next_white_move(gpt_first_move)

    # Try to make the move
    try:
        parsed_move = board.parse_san(first_move_san)
        board.push(parsed_move)
        uci_move = parsed_move.uci()

        # save our versions of move history for Stockfish and the GPT
        move_history_uci.append(uci_move)
        move_history_san.append(first_move_san)
        move_history_san_str = gpt_first_move

        stockfish.set_position(move_history_uci)  # Update the Stockfish board

    except Exception:
        print(f"GPT suggested an invalid first move: {gpt_first_move}.")

    # While the game isn't over
    while not board.is_checkmate() and not board.is_stalemate():
        if not board.turn:  # True if it's black's turn
            # Get Stockfish's move
            result = stockfish.get_best_move()  # Stockfish replies using UCI, ("c7c5")

            # Convert to alternative notations
            move = chess.Move.from_uci(result)  # Move.from_uci('c7c5')
            move_san = board.san(move)  # standard algebraic notation: ("c5")

            # Try to make the move
            try:
                board.push(move)  # Update the chess board
                move_history_uci.append(result)
                move_history_san.append(move_san)
                move_history_san_str = to_long_algebraic(move_history_san)
            except Exception:
                print("Stockfish suggested an invalid move.")
                break

            # Print the game state
            print(move_history_san_str)

        if not board.is_checkmate() and not board.is_stalemate():
            # Calculate the delay based on your rate limit
            standard_delay = 8

            # Sleep for the delay
            time.sleep(standard_delay)    # currently 3 seconds

            # Initial delay in seconds before a retry attempt.
            initial_retry_delay = 8
            max_retry_delay = 60  # You can set a maximum delay limit

            current_retry_delay = initial_retry_delay

            # Ask GPT-4 for the next move
            while True:
                try:
                    # Ask GPT for the next move
                    game_state = move_history_san_str

                    # Check if there were any invalid moves
                    invalid_moves_string = ""
                    if invalid_moves:
                        invalid_moves_string = "\nPlease avoid these invalid moves: " + ', '.join(invalid_moves)

                    response = openai.ChatCompletion.create(
                        model="gpt-4",  # replace with actual GPT-4 model name when available
                        messages=[
                            {"role": "system", "content": "You are an expert chess player."},
                            {"role": "user", "content":
                                """You're playing with the white pieces, and it's your turn.

                                Please use Standard Algebraic Notation to reply. Use Standard Algebraic Notation "+" and "#" for check and checkmate.

                                Our game is currently:""" + game_state + invalid_moves_string + """

                                Your next move:"""}
                        ],
                        max_tokens=400
                    )

                    # Get the move from the response
                    gpt_move = response['choices'][0]['message']['content'].strip()
                    gpt_move_san = next_white_move(gpt_move)  # retrieve just the next white move

                    parsed_move = board.parse_san(gpt_move_san)
                    board.push(parsed_move)
                    uci_move = parsed_move.uci()

                    # save our versions of move history for Stockfish, chess, and GPT
                    move_history_uci.append(uci_move)
                    move_history_san.append(gpt_move_san)
                    move_history_san_str = to_long_algebraic(move_history_san)

                    # Tell stockfish about the latest move
                    stockfish.set_position(move_history_uci)  # Update the Stockfish board

                    break  # If parsing and applying the move was successful, break the while loop

                except ValueError as ve:
                    print("Error parsing the AI's move. Asking AI for a new move...")
                    print(f"GPT may have suggested an invalid move: {gpt_move_san}")
                    invalid_moves.append(gpt_move_san)

                except openai.error.APIError as e:
                    if e.http_status == 502:  # Bad Gateway
                        print(f"Encountered a temporary error. Waiting {current_retry_delay} seconds before retrying...")
                        time.sleep(current_retry_delay)

                        # Increase the delay for each subsequent retry attempt
                        current_retry_delay *= 2

                        # Do not exceed the maximum delay
                        if current_retry_delay > max_retry_delay:
                            current_retry_delay = max_retry_delay
                    else:
                        # If it's a different type of ApiError, re-raise it
                        raise

                except openai.error.RateLimitError as rle:  # Assuming the RateLimitError is in openai.error module
                    print(f"Encountered rate limit error. Waiting {current_retry_delay} seconds before retrying...")
                    time.sleep(current_retry_delay)

                    # Increase the delay for each subsequent retry attempt
                    current_retry_delay *= 2

                    # Do not exceed the maximum delay
                    if current_retry_delay > max_retry_delay:
                        current_retry_delay = max_retry_delay

    # Determine the winner
    if board.is_checkmate():
        if board.turn:
            game_outcome = "Black (Stockfish) wins!"
        else:
            game_outcome = "White (GPT) wins!"
    elif board.is_stalemate() or board.is_insufficient_material():
        game_outcome = "The game is a draw."

    # Save the game and outcome to a text file
    file_name = os.path.join("saved_chess_games", f"game_{time.strftime('%Y%m%d_%H%M%S')}.txt")
    save_game(file_name, move_history_san_str, game_outcome, ELO)
    return game_outcome

In [15]:
def find_gpt_elo():

    min_elo = 1000
    max_elo = 2200
    elo_tolerance = 20  # The ELO difference we'll accept as "close enough"

    while max_elo - min_elo > elo_tolerance:
        current_elo = (min_elo + max_elo) // 2
        result = play_chess(current_elo)
        print(f"Result against Stockfish with ELO {current_elo}: {result}")

        if result == "White (GPT) wins!":
            min_elo = current_elo
        else:
            max_elo = current_elo

    print(f"GPT's approximate ELO is {current_elo}")

In [None]:
find_gpt_elo()

1. e4 c6
1. e4 c6 2. d4 Nf6
1. e4 c6 2. d4 Nf6 3. Nc3 g6
