https://joshvarty.github.io/AlphaZero/

In [None]:
%run Board.ipynb
%run NNet_architecture.ipynb

In [None]:
import numpy as np
from collections import defaultdict

In [None]:
class MonteCarloTreeSearchNode():
    """Node of a Monte Carlo search tree guided by a policy/value network.

    Each node owns a board position and the statistics gathered by the
    simulations that went through it.

    Collaborators (duck-typed, defined outside this class):
        model: exposes ``predict(batch)`` returning ``(policy, value)``;
            ``policy[0]`` is handed to ``board.get_legal_moves``.
        board: exposes ``board_state`` (reshaped to ``(1, 5, 5, 10)`` before
            prediction), ``move(action)`` returning the next board,
            ``get_legal_moves(policy)`` and ``get_reward_for_player()``,
            which returns ``None`` while the game is still running.

    Attributes:
        prior: probability assigned to the move leading to this node.
        parent: parent node, or ``None`` for the root.
        parent_action: index of the move played from the parent.
        children: mapping ``action index -> child node``.
        value_sum: sum of the values backed up through this node.
    """

    def __init__(self, model, board, prior, parent=None, parent_action=None, name=None):
        self.model = model
        self.board = board
        self.prior = prior
        self.parent = parent
        self.name = name
        self.parent_action = parent_action
        self.children = {}
        self._number_of_visits = 0
        self.value_sum = 0

        # Result counters keyed by outcome (0, 1, -1); nothing in this class
        # updates them, they are kept for callers that may read them.
        self._results = defaultdict(int)
        self._results[0] = 0
        self._results[1] = 0
        self._results[-1] = 0

    def expand(self, possible_policy):
        """Create one child for every action with a non-zero probability.

        Args:
            possible_policy: iterable of probabilities indexed by action; the
                value at index ``i`` becomes the prior of child ``i``.
        """
        for action, proba in enumerate(possible_policy):
            if proba != 0:
                next_board = self.board.move(action)
                # Children share this node's model (not a notebook global) and
                # remember which action produced them.
                self.children[action] = MonteCarloTreeSearchNode(
                    model=self.model,
                    board=next_board,
                    prior=proba,
                    parent=self,
                    parent_action=action,
                )

    def _evaluate_and_expand(self):
        """Query the model for this node's board, expand it, return the value."""
        policy, value = self.model.predict(self.board.board_state.reshape((1, 5, 5, 10)))
        possible_policy = self.board.get_legal_moves(policy[0]).flatten()
        self.expand(possible_policy)
        return value

    def simulate(self, nb_simulation):
        """Run ``nb_simulation`` select / expand / backpropagate iterations.

        Args:
            nb_simulation: number of simulations to run from this node.

        Returns:
            This node, so calls can be chained.
        """
        if not self.children:
            # First call on this node: create its children; the predicted
            # value is not backed up for this initial expansion.
            self._evaluate_and_expand()

        for _ in range(nb_simulation):
            leaf = self.max_UCB()

            value = leaf.board.get_reward_for_player()
            if value is None:
                # The game has not ended at this leaf: use the network value.
                value = leaf._evaluate_and_expand()

            leaf.backpropagate(value)

        return self

    def backpropagate(self, value):
        """Add ``value`` to this node and its ancestors, flipping sign each level.

        Args:
            value: evaluation to record on this node; each ancestor receives
                the negation of what its child received.
        """
        node = self
        while node is not None:
            node._number_of_visits += 1
            node.value_sum += value
            value = -value
            node = node.parent

#######################################################################

    def max_UCB(self):
        """Return the node to expand, the one that maximize UCB.

        Follows ``best_child`` from this node until a node without children
        is reached.
        """
        current_node = self
        while current_node.children:
            current_node = current_node.best_child()

        return current_node

    def best_child(self, c_param=0.2):
        """Return the child that maximizes the UCB score.

        Args:
            c_param: exploration rate; it's supposed to grow slowly with
                search time.

        Returns:
            The child with the highest score; ties go to the first child in
            insertion order.
        """
        # Mean action_value + C_param * Prior * sqrt(parent visit count) / (1 + visit count)
        # The value of the child is from the perspective of the opposing player
        sqrt_parent_visits = np.sqrt(self._number_of_visits)

        def score(child):
            exploration = c_param * child.prior * sqrt_parent_visits / (child._number_of_visits + 1)
            return -child.mean_value() + exploration

        _, selected = max(self.children.items(), key=lambda item: score(item[1]))
        return selected

    def mean_value(self):
        """Return the average backed-up value, or 0 for an unvisited node."""
        if self._number_of_visits == 0:
            return 0
        return self.value_sum / self._number_of_visits

#######################################################################

    def __repr__(self):
        """
        Debugger pretty print node info
        """
        prior = "{0:.2f}".format(self.prior)
        return "{} Prior: {} Count: {} Value: {}".format(self.board.__str__(), prior, self._number_of_visits, self.mean_value())

In [None]:
# Build the starting position. `Board` and `deck` are not defined in this
# section — presumably they come from the `%run Board.ipynb` cell (confirm).
new_board = Board(deck)

In [None]:
# Root of the search tree: it has no parent, and its prior is set to 0.
# `model` is not defined in this section — presumably loaded by the
# `%run NNet_architecture.ipynb` cell (confirm).
root = MonteCarloTreeSearchNode(model, new_board, prior=0)

In [None]:
# Run 800 search simulations from the root; simulate() returns the root itself.
root.simulate(800)

In [None]:
# Notebook display of the root's children (dict: action index -> child node).
root.children

In [None]:
import math
import numpy as np


def ucb_score(parent, child):
    """
    Score the action leading from ``parent`` to ``child``.

    The result is the sum of an exploration term (the child's prior scaled by
    the square root of the parent's visit count and damped by the child's own
    visit count) and a value term.
    """
    exploration = child.prior * math.sqrt(parent.visit_count) / (child.visit_count + 1)

    # The child's value is expressed from the opposing player's perspective,
    # hence the negation; an unvisited child contributes no value term.
    exploitation = -child.value() if child.visit_count > 0 else 0

    return exploitation + exploration


class Node:
    """Search-tree node holding visit statistics and the prior of its incoming move."""

    def __init__(self, prior, to_play):
        self.prior = prior
        self.to_play = to_play
        self.state = None
        self.children = {}
        self.visit_count = 0
        self.value_sum = 0

    def expanded(self):
        """Return True once the node has at least one child."""
        return bool(self.children)

    def value(self):
        """Return the mean of the values backed up here, or 0 if never visited."""
        return self.value_sum / self.visit_count if self.visit_count else 0

    def select_action(self, temperature):
        """
        Pick an action from the children's visit counts.

        A temperature of 0 picks the most visited child, an infinite
        temperature picks uniformly at random, and any other value samples
        proportionally to ``visit_count ** (1 / temperature)``.
        """
        actions = list(self.children)
        visit_counts = np.array([self.children[a].visit_count for a in actions])

        if temperature == 0:
            return actions[np.argmax(visit_counts)]
        if temperature == float("inf"):
            return np.random.choice(actions)

        # See paper appendix Data Generation
        weights = visit_counts ** (1 / temperature)
        probabilities = weights / sum(weights)
        return np.random.choice(actions, p=probabilities)

    def select_child(self):
        """
        Return ``(action, child)`` for the child with the highest UCB score.

        Returns ``(-1, None)`` when no child scores above negative infinity.
        """
        top_score = -np.inf
        chosen = (-1, None)

        for action, child in self.children.items():
            candidate = ucb_score(self, child)
            if not candidate > top_score:
                continue
            top_score = candidate
            chosen = (action, child)

        return chosen

    def expand(self, state, to_play, action_probs):
        """
        Attach ``state`` to the node and create a child for each action whose
        probability is non-zero, storing that probability as the child's prior.
        """
        self.state = state
        self.to_play = to_play
        self.children.update(
            (index, Node(prior=probability, to_play=self.to_play * -1))
            for index, probability in enumerate(action_probs)
            if probability != 0
        )

    def __repr__(self):
        """
        Debugger-friendly summary: state, prior, visit count and mean value.
        """
        return f"{self.state.__str__()} Prior: {self.prior:.2f} Count: {self.visit_count} Value: {self.value()}"


class MCTS:
    """Driver that grows a tree of ``Node`` objects with network-guided search."""

    def run(self, model, state, board, to_play, nb_simu):
        """Run ``nb_simu`` simulations and return the root of the search tree.

        Args:
            model: exposes ``predict(batch)`` returning ``(policy, value)``.
            state: object stored on the root node; its ``board`` attribute is
                used for the root's network evaluation.
            board: board of the root position; moves out of the root are
                applied to it. Boards expose ``board_state``,
                ``move(action=...)``, ``get_legal_moves(policy)`` and
                ``get_reward_for_player()`` (``None`` while the game runs).
            to_play: player to move at the root (children get ``to_play * -1``).
            nb_simu: number of simulations.

        Returns:
            The root ``Node``. If the root has no children after its initial
            expansion, it is returned without running any simulation.
        """
        root = Node(0, to_play)

        # EXPAND root
        policy, _ = model.predict(state.board.board_state.reshape((1, 5, 5, 10)))
        possible_policy = state.board.get_legal_moves(policy[0]).flatten()
        root.expand(state, to_play, possible_policy)

        if not root.expanded():
            # Nothing to select from: a simulation would have no parent/action.
            return root

        for _ in range(nb_simu):
            node = root
            search_path = [node]

            # SELECT
            while node.expanded():
                action, node = node.select_child()
                search_path.append(node)

            parent = search_path[-2]
            # The move must be played on the PARENT's position. Non-root nodes
            # store their board in `state` when expanded; the root stores the
            # caller's `state` object, so its board is the `board` argument.
            parent_board = board if parent is root else parent.state

            # Now we're at a leaf node and we would like to expand
            # Players always play from their own perspective
            next_state = parent_board.move(action=action)

            # The value of the new state from the perspective of the other player
            value = next_state.get_reward_for_player()
            if value is None:
                # If the game has not ended:
                # EXPAND
                policy, value = model.predict(next_state.board_state.reshape((1, 5, 5, 10)))
                possible_policy = next_state.get_legal_moves(policy[0]).flatten()

                node.expand(next_state, parent.to_play * -1, possible_policy)

            self.backpropagate(search_path, value, parent.to_play * -1)

        return root

    def backpropagate(self, search_path, value, to_play):
        """
        At the end of a simulation, we propagate the evaluation all the way up the tree
        to the root.

        Args:
            search_path: nodes visited by the simulation, root first.
            value: evaluation from the point of view of ``to_play``.
            to_play: player the value refers to; nodes whose ``to_play``
                differs receive ``-value``.
        """
        for node in reversed(search_path):
            node.value_sum += value if node.to_play == to_play else -value
            node.visit_count += 1

In [None]:
# Search driver instance; only referenced by the commented-out run() call below.
test = MCTS()

In [None]:
# a = test.run(model, root, root.board, 1, nb_simu=80)
# a.children