In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import random
import math

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        # print(os.path.join(dirname, filename))
        os.path.join(dirname, filename)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

import time

In [14]:
##############################################
##############################################
#
#        MONTE CARLO TREE SEARCH
#
#############################################
#############################################

globalResult = -1
# This value is changed by isTerminal() function
# If Player 1 wins, globalResult = 1
# If Player 2 wins, globalResult = 0
# If a draw occurs, globalResult = 0.5
# If non terminal, globalResult = -1

# Global variables so they don't need to be passed around
n_cols = 0
n_rows = 0
inarow = 0

class Node :
    """
    Class that captures the information required to store in a node in the Monte Carlo Search Tree
    """
    def __init__ (self, state, player_ID, parent=None, untried_moves=None, move_from_parent=None):
        self.state = state
        self.player_ID = player_ID
        self.parent = parent
        self.children = []
        self.wins = 0
        self.visits = 0
        self.untried_moves = untried_moves
        self.move_from_parent = move_from_parent

        # Determine and store the untried moves
        if untried_moves is None:
            self.untried_moves = getLegalMoves(state)  # Get all possible moves
        else:
            self.untried_moves = untried_moves

def getLegalMoves (state) :
    """
    This function finds all the possible moves that can be made, for a given board configuration

    Args :
        state : The board state

    Returns :
        legal_moves : A list containing the indices of the columns in which a move can be made
    """
    legal_moves = []
    
    for i in range(n_cols) :
        if state[i] == 0 :
            legal_moves.append(i)
            
    return legal_moves

def isTerminal (board) :
    """
    This function checks if a board state is a terminal state ie. win, loss or draw, or not

    Args :
        board : the board state to be checked

    Returns :
        True  : if the state is terminal
        False : if the state is not terminal
    """
    global globalResult
    globalResult = -1 # Set result value to non terminal state value
    
    # ----- CHECK  DRAW ------
    if board.count(1) + board.count(2) == n_cols*n_rows :
        globalResult = 0.5
        return True
    
    # ----- CHECK AND RETURN WINS -----
    # Check in row
    count1 = 0
    count2 = 0
    for row in range(n_rows) :
        for col in range(n_cols) :
            if board[row*n_cols + col] == 1 :
                count1 += 1
                count2 = 0
            elif board[row*n_cols + col] == 2 :
                count2 += 1
                count1 = 0
            else :
                count1 = 0
                count2 = 0

            if count1 == inarow :
                globalResult = 1
                return True
            if count2 == inarow :
                globalResult = 0
                return True
                
        count1 = 0
        count2 = 0

    # Check in column
    for col in range(n_cols) :
        for row in range(n_rows) :
            if board[row*n_cols + col] == 1 :
                count1 += 1
                count2 = 0
            elif board[row*n_cols + col] == 2 :
                count2 += 1
                count1 = 0
            else :
                count1 = 0
                count2 = 0

            if count1 == inarow :
                globalResult = 1
                return True
            if count2 == inarow :
                globalResult = 0
                return True
                
        count1 = 0
        count2 = 0

    # Check in positive diagonal /
    for row in range(n_rows) :
        for col in range(n_cols) :
            # Assume row,col as the starting point and go up and right
            for i in range(inarow) :
                rowc = row-i
                colc = col+i
                if rowc<0 or colc>=n_cols :
                    count1 = 0
                    count2 = 0
                    break
                    
                if board[rowc*n_cols + colc] == 1 :
                    count1 += 1
                    count2 = 0
                if board[rowc*n_cols + colc] == 2 :
                    count2 += 1
                    count1 = 0
                else :
                    count1 = 0
                    count2 = 0
                
                if count1 == inarow :
                    globalResult = 1
                    return True
                if count2 == inarow :
                    globalResult = 0
                    return True

    # Check in negative diagonal \
    for row in range(n_rows) :
        for col in range(n_cols) :
            # Assume row,col as the starting point and go down and right
            for i in range(inarow) :
                rowc = row+i
                colc = col+i
                if rowc>=n_rows or colc>=n_cols :
                    count1 = 0
                    count2 = 0
                    break
                    
                if board[rowc*n_cols + colc] == 1 :
                    count1 += 1
                    count2 = 0
                if board[rowc*n_cols + colc] == 2 :
                    count2 += 1
                    count1 = 0
                else :
                    count1 = 0
                    count2 = 0

                if count1 == inarow :
                    globalResult = 1
                    return True
                if count2 == inarow :
                    globalResult = 0
                    return True

    # Non terminating state
    return False

def isLeaf (current_node) :
    """
    This function determines whether a given game state is a leaf node in the MCST.
    There are two conditions in which a node may be a leaf node: terminal game state or non-zero untried moves

    Args :
        game_state : the current board state

    Returns :
        A boolean value which is True if the node is a leaf node, and False otherwise
    """
    return isTerminal(current_node.state) or current_node.untried_moves

def calculateUCB1 (node) :
    exploration_parameter = 1.4
    if node.visits == 0:
        return float('inf')  # Prioritize unvisited nodes
    return node.wins/node.visits + (exploration_parameter*math.sqrt(math.log(node.parent.visits)/node.visits))

def select_node(root_node):
    """
    This function implements the Selection phase of the MCTS.
    It traverses through the MCST using the highest UCB score of the children.

    Args :
        root_node : the game state from which the search begins

    Returns :
        current_node : the first encountered node which is terminal or has untried moves
    """
    current_node = root_node

    # Traverse until a node with untried moves or a terminal state is found
    while not isLeaf(current_node):
        best_child = None
        max_ucb_score = -1  # Initialize with a very low value

        for child in current_node.children:
            ucb_score = calculateUCB1(child)
            if ucb_score > max_ucb_score:
                max_ucb_score = ucb_score
                best_child = child
        
        # Move down to the selected child node
        current_node = best_child
    
    return current_node

def playMove (og_state, col, player_ID) :
    """
    This function plays a move on a board and returns the new board state

    Args :
        state : the board on which the move needs to be made
        col : the column in which the move is to be played
        player_ID : the ID of the player who is making the move

    Returns :
        new_board : the state of the board after the move is played on the previous board
    """
    if og_state[col] != 0 : # column already filled. Error!
        return

    state = og_state.copy()
    for row in range(n_rows) :
        if state[row*n_cols + col] != 0 :
            state[(row-1)*n_cols + col] = player_ID
            return state

    state[(n_rows-1)*n_cols + col] = player_ID
    return state

def expand (parent_node) :
    """
    This function expands a non-terminal leaf node by adding a new node corresponding to an untried move from the input state

    Args :
        node : the non-terminal leaf node from which the expansion must occur

    Returns :
        new_node : the newly added node from which rollout will now occur
    """
    # Try the first untried move and remove it from the list of untried moves
    move = parent_node.untried_moves.pop(0)
    # Play the move to get the new board state
    new_state = playMove(parent_node.state, move, parent_node.player_ID)
    # Create the expanded node
    new_node = Node(new_state, 3-parent_node.player_ID, parent=parent_node, move_from_parent=move)
    # Add this new node to the parent's children
    parent_node.children.append(new_node)

    return new_node

def rollout (node) :
    """
    This function simulates a game randomly from a given state

    Args :
        state : the starting state of the simulation

    Returns :
        the value to be backpropagated up the tree
    """

    current_board = node.state.copy()
    current_player = node.player_ID

    # Loop until the game is over
    while not isTerminal(current_board) :
        # Get all legal moves from the current board state
        legal_moves = getLegalMoves(current_board)

        # Check for a draw if no legal moves are left
        if not legal_moves:
            return 0.5 # Return 0.5 for a draw
        
        # Choose a random legal move
        move = random.choice(legal_moves)
        
        # Play the move and update the board
        current_board = playMove(current_board, move, current_player)

        # Switch to the other player for the next turn
        current_player = 3 - current_player # This works for players 1 and 2

    # Once the loop ends, the game is over. Determine the result set by isTerminal
    result = globalResult

    return result # Trying this. I think this should be correct. 
    # The POV logic should be resolved in backropagate function

    # Return the result from the perspective of the *starting* player of the rollout
    # This requires careful handling of whose turn it was when the win occurred
    if (result == 1 and current_player == 1) or (result == 0 and current_player == 2): # Win from POV of starting player
        return 1
    elif result == 0.5:
        return 0.5 # Draw
    else:
        return 0 # Loss

def backpropagate(node, result):
    """
    Propagates the simulation result back up the tree to the root.

    Args:
        node: The leaf node from which the simulation was started.
        result: The result of the simulation (1 for win, 0 for draw, -1 for loss).
    """
    current_node = node
    
    # Iterate as long as the current node has a parent
    while current_node is not None:
        # Increment the visit count for the current node
        current_node.visits += 1
        
        # Add the result to the win count.
        # This handles wins (1), losses (0), and draws (0.5)
        if (result==1 and current_node.player_ID==2) or (result==0 and current_node.player_ID==1):
            current_node.wins += 1
        else:
            current_node.wins += 0.5
        
        # Move up to the parent node for the next iteration
        current_node = current_node.parent

def MCTS (observation, configuration):
    """
    This function is called by the bot every time it is the bot's turn to play
    It uses Monte Carlo Tree Search to find the best move for the bot to play

    Args:
        observation: It is the dictionary that contains the current game state
            observation['mark'] : The current player's ID
            observation['board']: 1-D list representing the game board
                                  0 : empty slot
                                  1 : Player 1
                                  2 : PLayer 2
        configuration: It is a dictionary containing the rules and dimensions of the game
            configuration['columns'] : The number of columns on the board
            configuration['rows']    : The number of rows on the board
            configuration['inarow']  : The number of pieces in a row needed to win

    Output:
        column_to_play: It is the 0-based index of the column in which the bot wants to play its move
    """
    
    start_time = time.time()
    TIME_LIMIT = 1.95
    
    global n_rows, n_cols, inarow
    n_rows = configuration['rows']
    n_cols = configuration['columns']
    inarow = configuration['inarow']

    board = observation['board']
    player_ID = observation['mark']

    root_node = Node(board, player_ID)
    current_node = root_node

    column_to_play = -1

    while time.time() - start_time < TIME_LIMIT :
        """
        # PHASE 1 : SELECTION
        This phase navigates the tree from the root node (the current game state) down to a leaf node. 
        At each step, the algorithm selects the child node with the highest Upper Confidence Bound 1 (UCB1) score.
        """
        current_node = select_node(root_node)
    
        """
        PHASE 2 : EXPANSION
        Now current_state is at a leaf node.
        If it is not a terminal state (i.e., the game hasn't ended) but has untried moves, the algorithm expands the tree. 
        It creates a new child node for one of the untried moves, and that new node becomes the starting point for the next phase.
        """
        if current_node.untried_moves:
            current_node = expand(current_node)
    
        """
        PHASE 3 : ROLLOUT
        From the newly expanded node, the algorithm performs a random playout or rollout. 
        It plays a full game from this state by making random, legal moves for both players until the game reaches a terminal state (win, loss, or draw).
        """
        result = rollout(current_node)
    
        """
        PHASE 4 : BACKPROPAGATION
        Once the simulation is complete, the result (win, loss, or draw) is propagated back up the tree. 
        The algorithm updates the visits and wins statistics for every node on the path from the newly simulated node all the way back to the root node.
        """
        backpropagate(current_node, result)

    """
    MAKING THE MOVE
    After the MCTS loop has run for the allocated time or number of iterations, the bot needs to make a final decision. 
    The best move is simply the one corresponding to the child node of the root with the highest visit count. 
    A high visit count means the MCTS algorithm spent a lot of time exploring that branch, indicating it's the most promising path.
    """
    max_child_visits = 0
    if root_node.children:
        for child in root_node.children:
            if child.visits > max_child_visits:
                max_child_visits = child.visits
                best_child = child

        if best_child:
            column_to_play = best_child.move_from_parent

    # If no children were created, fall back to a random move
    else:
        print("Randomly selecting child")
        legal_moves = getLegalMoves(board)
        if legal_moves:
            column_to_play = random.choice(legal_moves)

    # print(f"Playing move in {time.time()-start_time} seconds")

    """Thanks, Gemini! for the explanations!!"""
    return column_to_play

In [15]:
def my_agent(observation, configuration):
    # return minimaxAlgorithm(observation, configuration) # Minimax Algorithm with alpha-beta pruning
    return MCTS(observation, configuration) # Monte Carlo Search Tree

In [16]:
from kaggle_environments import make, evaluate

# Create a Connect X environment
env = make("connectx", debug=True)

# Run a single game
env.run([my_agent, "negamax"])

# Render the final state of the board and the game replay
env.render(mode="ipython")