In [14]:
import numpy as np
import math
from simplified_connectx import *
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
temp = SimplifiedConnectX()

In [11]:
def monte_carlo_tree_search(root):
     
    while resources_left(time, computational power):
        leaf = traverse(root)
        simulation_result = rollout(leaf)
        backpropagate(leaf, simulation_result)
         
    return best_child(root)
 
    # function for node traversal
    def traverse(node):
        while fully_expanded(node):
            node = best_uct(node)

        # in case no children are present / node is terminal
        return pick_unvisited(node.children) or node

    # function for the result of the simulation
    def rollout(node):
        while non_terminal(node):
            node = rollout_policy(node)
        return result(node)

    # function for randomly selecting a child node
    def rollout_policy(node):
        return pick_random(node.children)

    # function for backpropagation
    def backpropagate(node, result):
        if is_root(node) return
        node.stats = update_stats(node, result)
        backpropagate(node.parent)

    # function for selecting the best child
    # node with highest number of visits
    def best_child(node):
        pick child with highest number of visits

SyntaxError: invalid syntax (481565713.py, line 3)

In [4]:
# Chat GPT code
import random

class MCTS:
    def __init__(self, game, iterations):
        self.game = game
        self.iterations = iterations

    def search(self, state):
        root = Node(state)

        for i in range(self.iterations):
            node = root
            # Select a leaf node to expand
            while not node.is_leaf():
                node = self.select(node)

            # Expand the leaf node and simulate a game from it
            if not node.is_terminal():
                node = self.expand(node)
                reward = self.simulate(node.state)
            else:
                reward = node.state.utility()

            # Backpropagate the reward up the tree
            while node is not None:
                node.update(reward)
                node = node.parent

        # Return the child with the highest number of visits
        return self.select_best_child(root)

    def select(self, node):
        # Select the child with the highest UCB1 score
        return max(node.children, key=lambda c: c.ucb1())

    def expand(self, node):
        # Add a random child to the node
        child_state = random.choice(node.state.children())
        child = Node(child_state, node)
        node.add_child(child)
        return child

    def simulate(self, state):
        # Play a random game from the current state and return the reward
        while not state.is_terminal():
            state = random.choice(state.children())
        return state.utility()

    def select_best_child(self, node):
        # Return the child with the highest number of visits
        return max(node.children, key=lambda c: c.visits)

class Node:
    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent
        self.children = []
        self.visits = 0
        self.total_reward = 0

    def is_leaf(self):
        return len(self.children) == 0

    def is_terminal(self):
        return self.state.is_terminal()

    def add_child(self, child):
        self.children.append(child)

    def update(self, reward):
        self.visits += 1
        self.total_reward += reward

    def ucb1(self):
        # Calculate the UCB1 score for this node
        if self.visits == 0:
            return float('inf')

        exploration_term = math.sqrt(2 * math.log(self.parent.visits) / self.visits)
        return self.total_reward / self.visits + exploration_term

##### In MCTS we store 1 tree

In [44]:
class my_mcts():
    '''
    Implementation of MCTS algorithm based on the lecture 9
    from David Silver's course on youtube.
    
    Before leaf node is found, we run UCB, and after that we run
    the current policy.
    
    Leaf node is defined as a node which has not been seen by the UCB algorithm,
    Each time a node is found which is not seen, it gets added into the UCB
    tree (which is S_counts).
    '''
    def __init__(self,cur_state,nnet,args):
        '''
        state - It is the root state (in this case the game (which is a wrapper over
        the simplifiedconnectx gym environment))
        cur_policy - This is the policy which is run when the leaf node is found
        args - this has arguments like exploration rate, num_mcts_sims
        
        '''
        self.args = args
        self.game = cur_state
        #This is the dictionary which keeps track of the score for all states and actions
        self.SA_score = {}
        #This stores the state action pair counts
        self.SA_counts = {}
        # This is the dictionary which counts the number of times a state is visited
        self.S_counts = {}
        # This stores the total number of times a simulation has been run
        self.total_counts = 0
        # This is the exploration constant, which governs how much we should explore
        # Higher c means higher exploration
        self.c = args["c"]
        # This could be a NN or any policy. The input would be the board
        # The output should be a probability distribution of moves
        self.nnet = nnet
        
    def getActionProb(self,game,opt_player):
        '''
        This function runs all the simulations of the MCTS starting
        from the root node and then returns the final prob distribution
        
        arguments - 
        game: this is the current state of the board
        opt_player: this is the player to optimize for
        '''
        for i in range(self.args["num_mcts_sims"]):
            print("inside mcts, iteration number",i)
            self.search(game.create_copy(),opt_player)
        
        s = game.stringRepresentation()
        # Lets get the scores of children of the current state s 
        prob = [self.SA_score[(s,a)] if (s,a) in self.SA_score else 0\
                for a in range(game.getActionSize())]
        # Now we need to normalize these scores
        total = sum(prob)
        
        if total == 0:
            # should never reach here
            print("There divide by zero in getActionProb")
        # Normalizing the probabilities
        prob = [x/total for x in prob]
        
        # The value of being the state s is 
        # following the policy after all searches, what is the value
        # Lets make this optimistic, which means that the current state is 
        # as good as the next best state and so
        index_max = max(range(len(prob)), key=prob.__getitem__)
        v = self.SA_score[(s,index_max)] / self.SA_counts[(s,index_max)]
        
        print("The probabilities for state S are :", prob)
        print("The value of being in state S: ", v)
        
        return prob,v
        
    def search(self,game,opt_player):
        '''
        This actually runs 1 episode from the total number of simulations
        
        If we are not at the leaf node, we should use UCB to pick which node to expand,
        If we are at the leaf node, we should use the current policy to expand the tree
        
        '''
        s = game.stringRepresentation()
        
        
        #check if the game has ended
        game_result = game.getGameEnded(opt_player)
        if game_result != 0:
            # The game has ended
            return game_result
        #check for leaf node
        if s not in self.S_counts: # If this is true then this is a leaf node
            # Now we will play a full game from here, using the NN and record the result
            canonicalBoard = game.getCanonicalForm()
            q_score,v_score = self.nnet.predict(canonicalBoard)
            valids = game.getValidMoves()
            q_score = q_score * valids  # masking invalid moves
            sum_Ps_s = np.sum(q_score)
            if sum_Ps_s > 0:
                # q_score represents the prob distribution of our NN
                q_score /= sum_Ps_s  # renormalize
            else:
                # if all valid moves were masked make all valid moves equally probable

                # NB! All valid moves may be masked if either your NNet architecture is insufficient or you've get overfitting or something else.
                # If you have got dozens or hundreds of these messages you should pay attention to your NNet and/or training process.   
                log.error("All valid moves were masked, doing a workaround.")
                q_score = q_score + valids
                q_score /= np.sum(q_score)
            
            # Lets choose a move from the prob distribution of this NN
            action = np.random.choice(game.getActionSize(),p = q_score)
            
            nest_s, reward, done, info = game.getNextState(action)
            result = self.search(game,opt_player)
            
            if result != 0:
                self.SA_score[(s, action)] = result
                self.SA_counts[(s, action)] = 1
                self.S_counts[s] = 1
                return result
            
        else: # not a leaf node
            # We have to apply UCB as this has been visited before
            best_action = None
            best_action_score = -float("inf")
            valids = game.getValidMoves()
            for action,move in enumerate(valids):
                # action is valid
                if move != 0:
                    #check if we have visited this action
                    if (s,action) in self.SA_score:
                        UCB_score = self.SA_score[(s,action)] + self.c*(math.sqrt(self.total_counts))/(self.SA_counts[(s,action)])
                    else:
                        # This is max because self.SA_counts[(s,action)] = 0 which is the denominator
                        UCB_score = float("inf")
                    if UCB_score > best_action_score:
                        best_action_score = UCB_score
                        best_action = action
            
            # Now that we have the best action, lets move it and continue
            nest_s, reward, done, info = game.getNextState(best_action)
            result = self.search(game,opt_player)
            self.S_counts[s] += 1
            # updating the score for the current state
            if (s, best_action) in self.SA_score:
                self.SA_score[(s, best_action)] = (self.SA_counts[(s, best_action)] * self.SA_score[(s, best_action)] + result) / (self.SA_counts[(s, best_action)] + 1)
                self.SA_counts[(s, best_action)] += 1
            else:
                self.SA_score[(s, best_action)] = result
                self.SA_counts[(s, best_action)] = 1
            #passing the result to the parent
            return result


##### MCTS Variant, instead of using UCB_score to be infinite and forcing the 
##### Algorithm to explore the whole space, we can "guide" it with NN predictions
##### example p*(math.sqrt(self.total_counts))/(self.SA_counts[(s,action)])
##### Where p is the probability of picking that action according to the NN.

In [51]:
import logging

import coloredlogs

from Coach import Coach
from Game import Game
from NeuralNet import NeuralNet as nn
from utils import *

In [52]:
# arguments
args = {
    "load_model": False, # Whether to use a pretrained network 
    "load_folder_file":None, # 
    "checkpoint":"./temp/", # stores checkpoints here
    "num_mcts_sims":20,
    "c":0.1,
    "num_iters":50, # number of times the NN is updated
    "maxlenOfQueue": 200000,
    "numItersForTrainExamplesHistory":20,
    "numEps":20
}


In [53]:
# Main cell to run the algorithm
g = Game()
nnet = nn(g)
c = Coach(g, nnet, args)
c.learn()

Self Play:   0%|                                                                                                                                                                                                                                                  | 0/20 [00:00<?, ?it/s]

Currently on move  1
inside mcts, iteration number 0
inside mcts, iteration number 1
inside mcts, iteration number 2
inside mcts, iteration number 3
inside mcts, iteration number 4
inside mcts, iteration number 5
inside mcts, iteration number 6
inside mcts, iteration number 7
inside mcts, iteration number 8
inside mcts, iteration number 9
inside mcts, iteration number 10
inside mcts, iteration number 11
inside mcts, iteration number 12
inside mcts, iteration number 13
inside mcts, iteration number 14
inside mcts, iteration number 15
inside mcts, iteration number 16
inside mcts, iteration number 17
inside mcts, iteration number 18
inside mcts, iteration number 19


Self Play:   0%|                                                                                                                                                                                                                                                  | 0/20 [00:07<?, ?it/s]

The probabilities for state S are : [0.21666666666666667, 0.21666666666666667, 0.21666666666666667, -0.0, -0.08333333333333334, 0.21666666666666667, 0.21666666666666667]
The value of being in state S:  -1.0





ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part.