In [None]:
import asyncio
import json
import os
import random
import sys
import threading

import chess
import chess.engine
import chess.pgn
import nest_asyncio
import numpy as np
import requests
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Dropout, Flatten, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential


In [8]:
# import lichess games

def parse_pgn(pgn_file, max_games):
    """Read up to ``max_games`` games from a PGN file.

    Args:
        pgn_file: Path of the PGN file to read.
        max_games: Upper bound on the number of games to parse.

    Returns:
        A list with the parsed games in file order. It is shorter than
        ``max_games`` when the file runs out of games first.
    """
    games = []
    # Open as UTF-8 explicitly so decoding does not depend on the platform
    # default encoding (e.g. cp1252 on Windows).
    with open(pgn_file, encoding="utf-8") as f:
        for _ in range(max_games):
            game = chess.pgn.read_game(f)
            if game is None:  # no more games in the file
                break
            games.append(game)
    return games

# Load the game sets: up to 1000 games from the 2022-03 file for training and
# up to 200 games from the 2022-04 file for testing.
training_games = parse_pgn("lichess_elite_2022-03.pgn", max_games = 1000)
testing_games = parse_pgn("lichess_elite_2022-04.pgn", max_games = 200)

In [None]:
# makes stockfish not throw errors
# On Windows, switch asyncio to the proactor event loop before the engine is started.
# NOTE(review): presumably needed because the engine runs as an asyncio subprocess — TODO confirm.
if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# converts a board object into an 8x8x12 one-hot tensor
def board_to_tensor(board):
    """Encode the pieces on ``board`` as a float32 array of shape (8, 8, 12).

    Channels 0-5 hold the white pawn, knight, bishop, rook, queen and king;
    channels 6-11 hold the same piece types for black. A cell is 1 where the
    corresponding piece stands and 0 elsewhere.

    Args:
        board: Object whose ``piece_map()`` returns ``{square: piece}``, with
            squares numbered 0-63 and pieces exposing ``piece_type``/``color``.

    Returns:
        The populated ``numpy`` array.
    """
    piece_order = (chess.PAWN, chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN, chess.KING)
    channel_of = {piece_type: channel for channel, piece_type in enumerate(piece_order)}

    planes = np.zeros((8, 8, 12), dtype=np.float32)  # a stack of twelve 8x8 boards
    for square, piece in board.piece_map().items():
        rank, file = divmod(square, 8)
        channel = channel_of[piece.piece_type]
        if piece.color == chess.BLACK:  # black pieces live in channels 6-11
            channel += 6
        # Rows are flipped vertically: squares 0-7 land in the last row (index 7).
        planes[7 - rank, file, channel] = 1
    return planes

def move_to_index(move):
    """Return the class index of ``move``: ``64 * from_square + to_square``.

    Only the origin and destination squares are used, so moves that differ
    solely in another attribute share one index.
    """
    origin, target = move.from_square, move.to_square
    return 64 * origin + target

# fill a list with games parsed from a PGN file
def parse_pgn_draws(pgn_file, num_games):
    """Read up to ``num_games`` games from a PGN file.

    Despite the name, no filtering by result is applied: every game read from
    the file is kept.

    Args:
        pgn_file: Path of the PGN file to read.
        num_games: Upper bound on the number of games to parse.

    Returns:
        A list with the parsed games in file order (possibly shorter than
        ``num_games`` when the file ends first).
    """
    games = []
    # Open as UTF-8 explicitly so decoding does not depend on the platform
    # default encoding (e.g. cp1252 on Windows).
    with open(pgn_file, encoding="utf-8") as f:
        while len(games) < num_games:
            game = chess.pgn.read_game(f)
            if game is None:  # no more games in the file
                break
            games.append(game)
    return games

# get stockfish engine
# NOTE(review): the binary path is relative to the working directory and names a Windows
# executable; the engine stays open until engine.quit() is called at the end of this cell.
engine = chess.engine.SimpleEngine.popen_uci("stockfish/stockfish-windows-x86-64-sse41-popcnt.exe")

# convert games to training/testing examples, label each board state with best move from stockfish
def convert(games, engine, sample_rate=0.1, time_limit=0.01):
    """Turn games into (board tensor, engine best-move index) examples.

    Each game is replayed move by move. Before every move the current position
    is kept with probability ``sample_rate``; a kept position is analysed by
    ``engine`` and labelled with the first move of the returned principal
    variation (not with the move actually played in the game).

    Args:
        games: Iterable of games exposing ``board()`` and ``mainline_moves()``.
        engine: Engine exposing ``analyse(board, limit)``.
        sample_rate: Probability of keeping a position (default 0.1).
        time_limit: Engine thinking time per position in seconds (default 0.01).

    Returns:
        ``(X, y)`` where ``X`` is a float32 array of shape (n, 8, 8, 12) and
        ``y`` is an int64 array of shape (n,). Both keep these shapes when no
        example was produced.
    """
    X = []  # Board tensors
    y = []  # Best move labels (as indices)

    for game in games:
        board = game.board()  # start at the initial board position
        for move in game.mainline_moves():
            if random.random() < sample_rate and not board.is_game_over():
                try:
                    result = engine.analyse(board, chess.engine.Limit(time=time_limit))
                    pv = result.get('pv')
                    if pv:  # skip positions where the engine reported no line
                        X.append(board_to_tensor(board))
                        y.append(move_to_index(pv[0]))
                except Exception as e:
                    # best effort: a failed analysis drops this example, not the whole run
                    print("Engine analysis failed:", e)
            board.push(move)

    # Fix shape and dtype explicitly so an empty result is still (0, 8, 8, 12).
    X = np.array(X, dtype=np.float32).reshape(-1, 8, 8, 12)
    y = np.array(y, dtype=np.int64)
    return X, y

# Build both datasets, then always shut the engine down: without the finally
# block an exception in convert() would leave the Stockfish process running.
try:
    X_train, y_train = convert(training_games, engine)
    print(f"Training examples: {X_train.shape[0]}")

    X_test, y_test = convert(testing_games, engine)
    print(f"Testing examples: {X_test.shape[0]}")
finally:
    engine.quit()  # free stockfish engine

Training examples: 8143
Testing examples: 1603


In [None]:
#creating model

def create_model():
    """Build and compile the move-prediction CNN.

    The network takes an (8, 8, 12) board tensor and outputs a softmax over
    4096 classes. It is compiled with Adam (learning rate 0.01), sparse
    categorical cross-entropy and an accuracy metric.

    Returns:
        The compiled Keras ``Model``.
    """
    board_input = Input(shape=(8, 8, 12))

    # Two 3x3 convolution + batch-norm stages; only the second is followed by dropout.
    features = board_input
    for with_dropout in (False, True):
        features = Conv2D(64, (3, 3), padding='same', activation='relu')(features)
        features = BatchNormalization()(features)
        if with_dropout:
            features = Dropout(0.2)(features)

    # Dense head: flatten the feature maps and squeeze them through 128 units.
    features = Flatten()(features)
    features = Dense(128, activation='relu')(features)
    features = BatchNormalization()(features)
    features = Dropout(0.2)(features)

    # One output unit per class index.
    move_probs = Dense(4096, activation='softmax')(features)

    model = Model(inputs=board_input, outputs=move_probs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        loss='sparse_categorical_crossentropy',  # integer labels, no one-hot needed
        metrics=['accuracy'],
    )
    return model

# Instantiate the move-prediction network; later cells train, evaluate and query this global.
model = create_model()

In [None]:
# Train the model
# NOTE(review): the test split is passed as validation data here and is evaluated again in
# the next cell, so no separate held-out set is visible in this notebook.
history = model.fit(
    X_train, y_train,
    epochs=20, # converges at around 20 epochs or earlier
    batch_size=32,
    validation_data=(X_test, y_test)
)

Epoch 1/500
[1m255/255[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 12ms/step - accuracy: 0.9269 - loss: 0.2451 - val_accuracy: 0.0780 - val_loss: 12.0971
Epoch 2/500
[1m255/255[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 11ms/step - accuracy: 0.9273 - loss: 0.2563 - val_accuracy: 0.0786 - val_loss: 11.9109
Epoch 3/500
[1m255/255[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 12ms/step - accuracy: 0.9182 - loss: 0.2836 - val_accuracy: 0.0742 - val_loss: 11.3052
Epoch 4/500
[1m255/255[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 12ms/step - accuracy: 0.9205 - loss: 0.2637 - val_accuracy: 0.0792 - val_loss: 12.1831
Epoch 5/500
[1m255/255[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 12ms/step - accuracy: 0.9238 - loss: 0.2624 - val_accuracy: 0.0830 - val_loss: 11.9132
Epoch 6/500
[1m 58/255[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m2s[0m 10ms/step - accuracy: 0.9216 - loss: 0.2654

KeyboardInterrupt: 

In [None]:
# run only test set: evaluate the trained model and print the returned loss and accuracy
loss, accuracy = model.evaluate(X_test, y_test, batch_size=32)
print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")

[1m51/51[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - accuracy: 0.0810 - loss: 11.5365
Test Loss: 11.959287643432617
Test Accuracy: 0.07610730081796646


In [None]:
# functions to connect to lichess

# SECURITY: the API token must never be committed with the notebook. It is read
# from the LICHESS_BOT_TOKEN environment variable instead; any token that was
# previously hard-coded here should be revoked.
TOKEN = os.environ.get("LICHESS_BOT_TOKEN", "")
if not TOKEN:
    print("Warning: LICHESS_BOT_TOKEN is not set; Lichess API requests will be rejected.")
BOT_ID = "BuilderRobot"  # account name the bot plays as; compared case-insensitively
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
BASE_URL = "https://lichess.org"
game_colors = {}  # game id -> "white" / "black", filled in by handle_game


#make educated guess
def make_best_move(game_id, board):
    """Pick a move with ``choose_best_move`` and submit it to Lichess.

    Args:
        game_id: Lichess id of the game being played.
        board: Current position.

    Prints the played move on success, the server's answer when the move is
    rejected, and the error when the request itself fails. Nothing is sent
    when ``choose_best_move`` returns ``None``.
    """
    move = choose_best_move(board)
    if move is None:  #end of game - checkmate or draw
        print("No legal moves available!")
        return

    #connect move to Lichess
    uci_move = move.uci()
    url = f"{BASE_URL}/api/bot/game/{game_id}/move/{uci_move}"
    try:
        # a timeout keeps a stalled connection from blocking the game thread forever
        response = requests.post(url, headers=HEADERS, timeout=10)
    except requests.RequestException as e:
        print(f"Failed to send move {uci_move}: {e}")
        return

    if response.ok:
        print(f"Played move {uci_move} \n") #print chosen move to console
    else:
        print(f"Move {uci_move} was rejected ({response.status_code}): {response.text}")
        
        
#choose a random legal move and play it
def make_random_move(game_id, board):
    """Pick a move with ``choose_random_move`` and submit it to Lichess.

    Args:
        game_id: Lichess id of the game being played.
        board: Current position.

    Prints the played move on success, the server's answer when the move is
    rejected, and the error when the request itself fails. Nothing is sent
    when the position has no legal moves.
    """
    if not list(board.legal_moves):  #end of game - checkmate or draw
        print("No legal moves available!")
        return

    #connect move to Lichess url
    move = choose_random_move(board)
    uci_move = move.uci()
    url = f"{BASE_URL}/api/bot/game/{game_id}/move/{uci_move}"
    try:
        # a timeout keeps a stalled connection from blocking the game thread forever
        response = requests.post(url, headers=HEADERS, timeout=10)
    except requests.RequestException as e:
        print(f"Failed to send move {uci_move}: {e}")
        return

    if response.ok:
        print(f"Played move {uci_move} \n") #print chosen move to console
    else:
        print(f"Move {uci_move} was rejected ({response.status_code}): {response.text}")
        

def accept_challenge(challenge_id):
    """Accept the Lichess challenge ``challenge_id`` and print the outcome.

    A network failure is reported instead of raised, so the event-stream
    thread that calls this keeps running.
    """
    url = f"{BASE_URL}/api/challenge/{challenge_id}/accept"
    try:
        # a timeout keeps a stalled connection from blocking the event stream
        response = requests.post(url, headers=HEADERS, timeout=10)
    except requests.RequestException as e:
        print(f"Failed to accept challenge: {challenge_id} - {e}")
        return
    print("got challenge")

    if response.status_code == 200: #code 200 = good
        print(f"Accepted challenge: {challenge_id}")
    else: #any other status code: report the server's answer
        print(f"Failed to accept challenge: {challenge_id} - {response.text}")

# stream game events for a given game
# uses python-chess to maintain the board state
def handle_game(game_id):
    """Follow one game's event stream and move whenever it is the bot's turn.

    ``gameFull`` records the bot's colour in ``game_colors``. Both ``gameFull``
    and ``gameState`` carry the full move list, from which the board is
    rebuilt (always from the standard starting position). Other event types
    are ignored because they do not change the position. The loop stops once
    the reported status is no longer "created" or "started".

    Args:
        game_id: Lichess id of the game to play.
    """
    url = f"{BASE_URL}/api/bot/game/stream/{game_id}"
    bot_id = BOT_ID.lower()

    # The context manager closes the streaming connection however the loop ends.
    with requests.get(url, headers=HEADERS, stream=True) as response:
        for line in response.iter_lines(decode_unicode=True):
            if not line:  # blank line, nothing to parse
                continue
            try:
                event = json.loads(line)
            except ValueError:
                print("Error parsing line:", line)
                continue

            event_type = event.get("type")
            if event_type == "gameFull":
                # when the game starts, record white or black;
                # the id can be absent, so fall back to "" instead of crashing on None
                white_id = event.get("white", {}).get("id") or ""
                my_color = "white" if white_id.lower() == bot_id else "black"
                game_colors[game_id] = my_color
                print(f"Game {game_id} started. My color: {my_color}")
                state = event.get("state", {})
            elif event_type == "gameState":
                state = event
            else:
                # e.g. chat lines: reacting to them would re-send a move for a stale board
                continue

            # reset board and replay all moves reported so far
            board = chess.Board()
            for move in state.get("moves", "").split():
                board.push_uci(move)

            # stop once the game is finished (mate, resign, draw, abort, ...)
            if state.get("status", "started") not in ("created", "started"):
                break

            # In python-chess, board.turn == True means White's turn.
            my_color = game_colors.get(game_id)
            if my_color and (bool(board.turn) == (my_color == "white")):
                make_best_move(game_id, board)

# listens to the Lichess event stream for incoming challenges
# accepts any challenges
def event_stream():
    """Consume the account-wide Lichess event stream.

    Every ``challenge`` event is accepted via ``accept_challenge``; every
    ``gameStart`` event spawns a daemon thread running ``handle_game`` for
    that game. Lines that cannot be parsed as JSON are reported and skipped.
    """
    url = f"{BASE_URL}/api/stream/event"
    response = requests.get(url, headers=HEADERS, stream=True)

    for raw_line in response.iter_lines(decode_unicode=True):
        if not raw_line:
            continue

        try:
            event = json.loads(raw_line)
        except Exception:
            print("Error parsing event stream line:", raw_line)
            continue

        kind = event.get("type")
        if kind == "challenge":
            challenge_id = event["challenge"]["id"]
            print(f"Received challenge: {challenge_id}")
            accept_challenge(challenge_id)
        elif kind == "gameStart":
            game_id = event["game"]["id"]
            print(f"Game started: {game_id}")
            # Each game gets its own thread so this loop keeps listening.
            game_thread = threading.Thread(target=handle_game, args=(game_id,), daemon=True)
            game_thread.start()


In [None]:
# start the lichess bot and run it in the background
# NOTE(review): handle_game -> make_best_move calls choose_best_move, which is defined in the
# later "simulate games" cell; that cell must have run before the first game starts.
threading.Thread(target=event_stream, daemon=True).start()
print("Lichess bot is running. Waiting for challenges...")

Lichess bot is running. Waiting for challenges...
Received challenge: S6aifkT3
got challenge
Accepted challenge: S6aifkT3
Game started: S6aifkT3
Game S6aifkT3 started. My color: white
Played move c2c3 

Played move g1f3 

Played move h1g1 

Played move d1b3 

Played move b3c4 

Played move c4b4 

Played move b4a5 

Played move c3b4 

Played move g2g3 

Played move g1g2 

Played move d2d4 

Played move h2h3 

Played move e1f1 

Played move f2f3 

Played move a5c7 

Played move c7b8 

Played move b8a8 

Played move a8b7 

Played move b7c8 

Played move c8e6 

Played move e6f5 

Played move f5f6 

Played move f1e1 

Played move e1f1 

Played move e2f3 

Played move f1g1 

Played move g1f2 

Played move f2f1 

Played move f1g2 

Played move b1d2 

Played move d2e4 

Played move g2h1 

Played move h3g4 

Played move g4h5 

Played move e4f6 

Played move h5h6 

Played move a2a3 

Played move h1g2 

Played move g2g3 

Played move g3g2 

Played move h6h7 

Played move g2g3 

Played move g3g2 


In [53]:
#model saves: the two commented-out lines are the file names of earlier saves;
#the current global `model` is written to chess_model3.keras
#model.save("chess_model1.keras")
#model.save("chess_model2.keras")
model.save("chess_model3.keras")

In [None]:
# simulate games
from concurrent.futures import ThreadPoolExecutor

@tf.function
def predict_board(board_tensor):
    """Call the notebook-level ``model`` on ``board_tensor`` and return its output.

    ``choose_best_move`` passes a single position with a leading batch axis
    and reads row 0 of the result.
    NOTE(review): presumably wrapped in ``tf.function`` to speed up repeated
    single-position inference — TODO confirm.
    """
    return model(board_tensor)

def choose_best_move(board):
    """Return the legal move the model scores highest, or ``None`` if there is none.

    The position is encoded with ``board_to_tensor``, scored with
    ``predict_board``, and every legal move is looked up in the score vector
    through ``move_to_index``. Ties keep the first move in legal-move order.

    Args:
        board: Current position, exposing ``legal_moves``.

    Returns:
        One of the legal moves, or ``None`` when the position has no legal
        moves (in which case the model is not evaluated at all).
    """
    legal_moves = list(board.legal_moves)
    if not legal_moves:
        return None  # No legal moves available; skip the forward pass

    # Encode the position and add the batch dimension the model expects.
    board_tensor = np.expand_dims(board_to_tensor(board), axis=0)
    predictions = predict_board(tf.convert_to_tensor(board_tensor))[0].numpy()  # shape: (4096,)

    # max() always yields a legal move, even if the scores are degenerate
    # (e.g. NaN), where a "prob > best" loop would have returned None.
    return max(legal_moves, key=lambda move: predictions[move_to_index(move)])

def choose_random_move(board):
    """Return one of ``board``'s legal moves, picked uniformly with ``random.choice``."""
    return random.choice(list(board.legal_moves))


def simulate_game(white_move_func, black_move_func):
    """Play one game from the starting position and return its result string.

    Args:
        white_move_func: Callable ``board -> move`` used on White's turns.
        black_move_func: Callable ``board -> move`` used on Black's turns.

    Returns:
        ``board.result()`` once ``board.is_game_over()`` is true.
    """
    board = chess.Board()
    while not board.is_game_over():
        pick_move = white_move_func if board.turn == chess.WHITE else black_move_func
        board.push(pick_move(board))
    return board.result()

def run_single_game(game_num):
    """Simulate one game of the model (White) against the random mover (Black).

    ``game_num`` is accepted so the function can be mapped over a range of
    game numbers; it does not influence the game.
    """
    return simulate_game(choose_best_move, choose_random_move)

# Play the games on a pool of 8 worker threads and tally the result strings.
num_parallel_games = 100
results_parallel = dict.fromkeys(("1-0", "0-1", "1/2-1/2"), 0)

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = []
    for game_num in range(1, num_parallel_games + 1):
        futures.append(executor.submit(run_single_game, game_num))
    # Collect in submission order; result() blocks until that game has finished.
    for future in futures:
        results_parallel[future.result()] += 1

print("Final parallel simulation results:")
for label, outcome in (("Educated bot wins", "1-0"), ("Random bot wins", "0-1"), ("Draws", "1/2-1/2")):
    print(f"{label} ({outcome}): {results_parallel[outcome]}")


Final parallel simulation results:
Educated bot wins (1-0): 7
Random bot wins (0-1): 4
Draws (1/2-1/2): 89


In [None]:
#observations 

# show the board (FEN): string representation from top left where lowercase is black, uppercase is white
# R/r = rook, N/n = knight, B/b = bishop, Q/q = queen, K/k = king, P/p = pawn
# board rows are separated by a /
# empty squares are written as digits; a run of consecutive empty squares becomes one larger number
# NOTE(review): `board` is never assigned at notebook scope in the visible cells (it only exists as a
# local inside functions), so these prints raise NameError on a fresh kernel — TODO confirm its origin.
print("FEN:", board.fen())
print(board)

#this model architecture learned to predict 0.5 every time
#suggests model didn't learn complex differences
#losses weighted at 0.5

# takes an input of an 8x8 with 12 channels
# outputs a win probability between 0 and 1
def create_model():
    """Build and compile the first win-probability CNN (Sequential API).

    Two unpadded 3x3 convolutions (32 and 64 filters) feed a 64-unit dense
    layer and a single sigmoid output. Compiled with the default Adam
    optimizer, binary cross-entropy and an accuracy metric.

    Note: this reuses the name ``create_model`` of the move-prediction builder
    defined earlier in the notebook and is itself replaced by a later
    definition in this cell.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = Sequential([
        # An explicit Input layer replaces the `input_shape=` argument on the
        # first Conv2D, which Keras 3 warns about.
        Input(shape=(8, 8, 12)),
        Conv2D(32, (3, 3), activation='relu'),
        Conv2D(64, (3, 3), activation='relu'),
        Flatten(),
        Dense(64, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# NOTE(review): this rebinds the global `model` that predict_board() calls, replacing the
# move-prediction network with the win-probability network defined just above.
model = create_model()

# NOTE(review): `X` and `y` are not assigned at notebook scope in the visible cells —
# TODO confirm which dataset this experiment was trained on.
model.fit(X, y, epochs=5, batch_size=32)

#NEXT MODEL

def create_model2():
    """Build and compile the second win-probability CNN (functional API).

    Two padded 3x3 convolution blocks (32 then 64 filters, each with batch
    norm; dropout 0.2 after the second) feed a 128-unit dense layer with batch
    norm and dropout 0.3, followed by a single sigmoid output. Compiled with
    the default Adam optimizer, binary cross-entropy and an accuracy metric.

    Returns:
        The compiled Keras ``Model``.
    """
    board_input = Input(shape=(8, 8, 12))

    # (filters, dropout rate or None) for each convolution block
    conv_blocks = ((32, None), (64, 0.2))
    features = board_input
    for filters, dropout_rate in conv_blocks:
        features = Conv2D(filters, (3, 3), padding='same', activation='relu')(features)
        features = BatchNormalization()(features)
        if dropout_rate is not None:
            features = Dropout(dropout_rate)(features)

    # Flatten and dense layers
    features = Flatten()(features)
    features = Dense(128, activation='relu')(features)
    features = BatchNormalization()(features)
    features = Dropout(0.3)(features)

    # Output layer: Predict win probability (0 to 1)
    win_probability = Dense(1, activation='sigmoid')(features)

    model = Model(inputs=board_input, outputs=win_probability)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

#model = create_model2()

# ReduceLROnPlateau is never imported in this notebook, so it is referenced through
# the already-imported `tf` namespace. It halves the learning rate after 3 epochs
# without improvement of the training loss.
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', factor=0.5, patience=3, verbose=1)
# NOTE(review): `X` and `y` are not assigned at notebook scope in the visible cells — TODO confirm.
model.fit(X, y, epochs=500, batch_size=32, callbacks=[lr_scheduler])

#went 12 - 80 - 8 
# 67% accuracy
# 500 epochs 
# 20,000 games
# included losses

class CyclicLR(Callback):
    """Keras callback applying a triangular cyclical learning rate per batch.

    The rate climbs linearly from ``base_lr`` to ``max_lr`` over ``step_size``
    batches and falls back over the next ``step_size`` batches, then repeats.

    Args:
        base_lr: Lowest learning rate of the cycle.
        max_lr: Highest learning rate of the cycle.
        step_size: Number of batches in half a cycle.
    """

    def __init__(self, base_lr=0.01, max_lr=0.05, step_size=2000):
        super().__init__()
        self.base_lr, self.max_lr, self.step_size = base_lr, max_lr, step_size
        self.iterations = 0  # batches seen so far; not reset between fit() calls

    def on_train_batch_begin(self, batch, logs=None):
        """Set the optimizer's learning rate for the upcoming batch."""
        # Position inside the current triangle: x is 1 at the ends and 0 at the peak.
        cycle = 1 + self.iterations // (2 * self.step_size)
        x = abs(self.iterations / self.step_size - 2 * cycle + 1)
        new_lr = self.base_lr + (self.max_lr - self.base_lr) * max(0, (1 - x))

        # The learning rate may be a tf.Variable or a plain attribute.
        optimizer = self.model.optimizer
        current_lr = optimizer.learning_rate
        if isinstance(current_lr, tf.Variable):
            tf.keras.backend.set_value(current_lr, new_lr)
        else:
            optimizer.learning_rate = new_lr

        self.iterations += 1

def create_model():
    """Build and compile the third win-probability CNN.

    Same layer stack as ``create_model2`` but compiled with an explicit Adam
    learning rate of 1e-3. This is the last definition named ``create_model``
    in the notebook, so it replaces the earlier ones once this cell has run.

    Returns:
        The compiled Keras ``Model``.
    """
    def conv_block(tensor, filters):
        # padded 3x3 convolution followed by batch normalisation
        tensor = Conv2D(filters, (3, 3), padding='same', activation='relu')(tensor)
        return BatchNormalization()(tensor)

    board_input = Input(shape=(8, 8, 12))

    # Convolutional layers
    hidden = conv_block(board_input, 32)
    hidden = conv_block(hidden, 64)
    hidden = Dropout(0.2)(hidden)

    # Fully connected layers
    hidden = Flatten()(hidden)
    hidden = Dense(128, activation='relu')(hidden)
    hidden = BatchNormalization()(hidden)
    hidden = Dropout(0.3)(hidden)

    # Output layer: predict win probability between 0 and 1
    win_probability = Dense(1, activation='sigmoid')(hidden)

    model = Model(inputs=board_input, outputs=win_probability)
    adam = tf.keras.optimizers.Adam(learning_rate=1e-3)
    model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])
    return model

# went 7 - 86 - 7
# 50% accuracy
# 60 epochs
# 100,000 games
# no losses

# 8 - 89 - 3
# 68% accuracy
# 60 epochs
# 20,000 games
# no losses
# model performs 50% on test set

# The model would converge with a dataset of 20,000 games but failed to find any patterns in the
# 100,000-game dataset: even after 6 hours of training it was still stuck at 50%.

In [52]:
import tensorflow as tf
# Report how many GPU devices TensorFlow can see.
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available: 0
