# Study notes

Alphazero on a game "5 in a Row"

```
 initialize neural network
 for each iteration i do:
    initialize s0
    for each step t until termination step T do:
        while computational resource remains do:
            get_move_probs() [check notes in MCTSplayer]
        end
        play: get_action()
    end
    
    score the game to give a final reward
    
    for each step t in last game do:
        z <-- +,- rT
           store data as (st,pit.zt)
    end
    
    sample data (s,pi,z) uniformly among all time-steps of the last
    iterations of self-play
    
    adjust the neural network (p,v) = f(s)
    minimize: v and z
    maximize: p and pi
end
        
```

In [23]:
from __future__ import print_function
import numpy as np
import copy

# Game: 5 in a row 五子棋

In [None]:
class Board(object):
    """
    board for the game
    """

    def __init__(self, **kwargs):
        self.width = int(kwargs.get('width', 8))
        self.height = int(kwargs.get('height', 8))
        self.states = {} # board states, key:move as location on the board, value:player as pieces type
        self.n_in_row = int(kwargs.get('n_in_row', 5)) # need how many pieces in a row to win
        self.players = [1, 2] # player1 and player2
        
    def init_board(self, start_player=0):
        if self.width < self.n_in_row or self.height < self.n_in_row:
            raise Exception('board width and height can not less than %d' % self.n_in_row)
        self.current_player = self.players[start_player]  # start player        
        self.availables = list(range(self.width * self.height)) # available moves 
        self.states = {} # board states, key:move as location on the board, value:player as pieces type
        self.last_move = -1

    def move_to_location(self, move):
        """       
        3*3 board's moves like:
        6 7 8
        3 4 5
        0 1 2
        and move 5's location is (1,2)
        """
        h = move  // self.width
        w = move  %  self.width
        return [h, w]

    def location_to_move(self, location):
        if(len(location) != 2):
            return -1
        h = location[0]
        w = location[1]
        move = h * self.width + w
        if(move not in range(self.width * self.height)):
            return -1
        return move

    def current_state(self): 
        """return the board state from the perspective of the current player
        shape: 4*width*height"""
        
        square_state = np.zeros((4, self.width, self.height))
        if self.states:
            moves, players = np.array(list(zip(*self.states.items())))
            move_curr = moves[players == self.current_player]
            move_oppo = moves[players != self.current_player]                           
            square_state[0][move_curr // self.width, move_curr % self.height] = 1.0
            square_state[1][move_oppo // self.width, move_oppo % self.height] = 1.0   
            square_state[2][self.last_move //self.width, self.last_move % self.height] = 1.0 # last move indication   
        if len(self.states)%2 == 0:
            square_state[3][:,:] = 1.0
        return square_state[:,::-1,:]

    def do_move(self, move):
        self.states[move] = self.current_player
        self.availables.remove(move)
        self.current_player = self.players[0] if self.current_player == self.players[1] else self.players[1] 
        self.last_move = move

    def has_a_winner(self):
        width = self.width
        height = self.height
        states = self.states
        n = self.n_in_row

        moved = list(set(range(width * height)) - set(self.availables))
        if(len(moved) < self.n_in_row + 2):
            return False, -1

        for m in moved:
            h = m // width
            w = m % width
            player = states[m]

            if (w in range(width - n + 1) and
                len(set(states.get(i, -1) for i in range(m, m + n))) == 1):
                return True, player

            if (h in range(height - n + 1) and
                len(set(states.get(i, -1) for i in range(m, m + n * width, width))) == 1):
                return True, player

            if (w in range(width - n + 1) and h in range(height - n + 1) and
                len(set(states.get(i, -1) for i in range(m, m + n * (width + 1), width + 1))) == 1):
                return True, player

            if (w in range(n - 1, width) and h in range(height - n + 1) and
                len(set(states.get(i, -1) for i in range(m, m + n * (width - 1), width - 1))) == 1):
                return True, player

        return False, -1

    def game_end(self):
        """Check whether the game is ended or not"""
        win, winner = self.has_a_winner()
        if win:
            return True, winner
        elif not len(self.availables):#            
            return True, -1
        return False, -1

    def get_current_player(self):
        return self.current_player

# Game server

In [25]:
class Game(object):
    def __init__(self, board, **kwargs):
        self.board = board
    
    def graphic(self, board, player1, player2):
        """
        Draw the board and show game info
        """
        width = board.width
        height = board.height

        print("Player", player1, "with X".rjust(3))
        print("Player", player2, "with O".rjust(3))
        print()
        for x in range(width):
            print("{0:8}".format(x), end='')
        print('\r\n')
        for i in range(height - 1, -1, -1):
            print("{0:4d}".format(i), end='')
            for j in range(width):
                loc = i * width + j
                p = board.states.get(loc, -1)
                if p == player1:
                    print('X'.center(8), end='')
                elif p == player2:
                    print('O'.center(8), end='')
                else:
                    print('_'.center(8), end='')
            print('\r\n\r\n')
    
    def start_play(self, player1, player2, start_player=0, is_shown=1):
        """
        start a game between two players
        """
        if start_player not in (0,1):
            raise Exception('start_player should be 0 (player1 first) or 1 (player2 first)')
        # Who plays first
        self.board.init_board(start_player)
        # set up two players [1,2]
        p1, p2 = self.board.players
        player1.set_player_ind(p1)
        player2.set_player_ind(p2)
        players = {p1:player1, p2:player2}
        if is_shown:
            self.graphic(self.board, player1.player, player2.player)
        while(1):
            current_player = self.board.get_current_player()
            #players[1] --> player1, 2 --> player2
            player_in_turn = players[current_player]
            move = player_in_turn.get_action(self.board)
            self.board.do_move(move)
            if is_shown:
                self.graphic(self.board, player1.player, player2.player)
            end, winner = self.board.game_end()
            if end:
                if is_shown:
                    if winner != -1:
                        print("Game end. Winner is", players[winner])
                    else:
                        print("Game end. Tie")
                return winner


## Markov Chain Search Tree class

Soft max function: I am confused with np.max() operation here.
$$softmax(x) = \frac{e^{x_i}}{\sum_{j}e^{x_j}}$$

In [10]:
def softmax(x):
    probs = np.exp(x - np.max(x))
    probs /= np.sum(probs)
    return probs

## Define a tree node class

Each node in MCTS tree should contain:

Q(s,a): an action value {from state s to take action a}.

P(s,a): a prior probability to take action a at state s.

N(s,a): number of visits from state s to take action a through MCST process.

u(s,a): vist-count-adjusted score in AlphaGo Zero $$\frac{P(s,a)}{1+N(s,a)}$$
here the author define u(s,a) as:
$$\frac{\alpha P(s,a)\sqrt{(N(s,a)}}{1+N(s,a)}$$

MCTS will select an action which returns the maximum Q(s,a) + u(s,a)
among all actions.


In [1]:
class TreeNode(object):
    """A node in the MCTS tree. Each node keeps track of its own value Q.
    prior probability P, and its visit-count-adjusted prior score u.
    """
    
    def __init__(self, parent, prior_p):
        self._parent = parent
        self._children = {} # a map from action to tree node
        self._n_visits = 0
        self._Q = 0
        self._u = 0
        self._P = prior_p
    
    def expand(self, action_priors):
        """Expand tree by creating new children.
        action_priors -- output from policy function - a list of tuples
        of actions and their prior probability according to the policy
        function.
        policy function f(theta) will be trained by a convolutional
        neural network.
        """
        for action, prob in action_priors:
            if action not in self._children:
                self._children[action] = TreeNode(self, prob)
        
    def select(self, c_puct):
        """Select action among children that gives maximum aciton value,
        Q(s,a) + U(s,a).
        Returns:
        A tuple of (action, next_node)
        """
        return max(self._children.items(), 
                   key=lambda act_node: act_node[1].get_value(c_puct))
    
    def update(self, leaf_value):
        """Update node Q values from leaf evaluation.
        Arguments:
        leaf_value -- the Q value of subtree evaluation from the current
        player's perspective.
        """
        # Count visit
        self._n_visits += 1
        # Update Q, a running average of values for all visits
        self._Q += 1.0*(leaf_value - self._Q) / self._n_visits
        
    def update_recursive(self, leaf_value):
        """Like a call to update(). but applied recursively for all
        ancestors.
        """
        # if it is not root, this node's parent should be updated first
        if self._parent:
            self._parent.update_recursive(-leaf_value)
        self.update(leaf_value)
    
    def get_value(self, c_puct):
        """Calculate and return the Q+U for this node: a combination
        of leaft evaluations, Q and U.
        c_punct -- a number in (0, inf) controlling the relative impact
        of values, Q, and prior probability, P, on this node's score.
        """
        self._u = c_puct * self._P * np.sqrt(self._parent._n_visits)/
        (1 + self._n_visits)
        return self._Q + self._u
    
    def is_leaf(self):
        """Check if it is the leaf node
        """
        return self._children == {}
    
    def is_root(self):
        """Check if it is the parent node
        """
        return self._parent is None
    

SyntaxError: unexpected EOF while parsing (<ipython-input-1-60b96278b930>, line 35)

## Monte Carlo Tree Search

### get_move_probs()
act_probs = $$softmax(\frac{1}{temp}*log(N+1e-10))$$

In [17]:
class MCTS(object):
    """A simple implementation of Monte Carlo Tree Search
    """
    def __init__(self, policy_value_fn, c_puct=5, n_playout=10000):
        """Arguments:
        policy_value_fn -- a function that takes in a board state and outputs
        a list of (action, probability) tuples and also a score in [-1,1] (i.e.
        the expected value of the end game score from the current players's
        perspective) for the current player.
        c_puct -- a number in (0, inf) that controls how quickly exploration
        converges to the maximum-value policy, where a higher value means relying
        on the prior more
        """
        self._root = TreeNode(None, 1.0)
        self._policy = policy_value_fn
        self._c_puct = c_puct
        self._n_playout = n_playout
    
    def _playout(self, state):
        """Run a single playout from the root to the leaf, getting a value at
        the leaf and propagating it back through its parents. State is modified
        in-place, so a copy must be provided.
        Arguments:
        state -- a copy of the state.
        """
        node = self._root
        while(1):
            if node.is_leaf():
                break
            # Greedily select next move.
            # choose Max Q + U action
            action, node = node.select(self._c_puct)
            #do_move in game class
            state.do_move(action)
            
        # Evaluate the leaf using a network which outputs a list of (action, probability)
        # tuples p and also a score v in [-1,1] for the current player.
        action_probs, leaf_value = self._policy(state)
        # Check for end of game
        end, winner = state.game_end()
        if not end:
            # expand tree by creating new children
            node.expand(action_probs)
        else:
            # for end state, return the true leaf_value
            if winner == -1: #tie
                leaf_value = 0.0
            else:
                leaf_value = 1.0 if winner == state.get_current_player() else -1.0
        
        # Update value and visit count of nodes in this traversal
        node.update_recursive(-leaf_value)
    
    def get_move_probs(self, state, temp=1e-3):
        """Run all playouts sequentially and returns the available action and
        their corresponding probabilities
        Arguments:
        state -- the current state, including both game state and the current player
        temp -- temperature parameter in (0,1] that controls the level of exploration
        Returns:
        the available actions and the corresponding probabilities
        """
        for n in range(self._n_playout):
            state_copy = copy.deepcopy(state)
            self._playout(state_copy)
        
        # calc the move probabilities based on the visit counts at the root node
        act_visits = [(act, node._n_visits) for act, node in self._root._children.items()]
        acts, visits = zip(*act_visits)
        act_probs = softmax(1.0/temp * np.log(np.array(visits) + 1e-10))
        
        return acts, act_probs
    
    def update_with_move(self, last_move):
        """Step forward in the tree, keeping everything we already know
        about the subtree. To avoid recursive update to change value of
        ancestors later.这样新的一步就是起点，对着新的起点开始，之前几步的value不
        再去改变。
        """
        if last_move in self._root._children:
            self._root = self._root._children[last_move]
            self._root._parent = None
        else:
            self._root = TreeNode(None, 1.0)
    
    def __str__(self):
        return "MCTS"

SyntaxError: unexpected EOF while parsing (<ipython-input-17-51f747eb3b4e>, line 81)

# MCTS Player

get_move_probs -> _playout --> update_recursive

for state s repeat the following procedure n times:
1. select action and move to this action until it is a leaf
2. input leaf state to network and return action prob + leaf value
3. check if such state is end of game
4. if not: expand the leaf
5. if is: set leaf value
6. update the leaf_value
end

find action probs based on number of visit from state s to children nodes
return acts and probs

### get_action 
input current board state,

if there are sensible moves:
1. get_move_probs
2. choice the action
3. update the action

In [18]:
class MCTSPlayer(object):
    """AI player based on MCTS"""
    def __init__(self, policy_value_function, c_puct=5, n_playout=2000,
                is_selfplay=0):
        self.mcts = MCTS(policy_value_function, c_puct, n_playout)
        self._is_selfplay = is_selfplay
    
    def set_player_ind(self, p):
        self.player = p
    
    def reset_player(self):
        self.mcts.update_with_move(-1)
        
    def get_action(self, board, temp=1e-3, return_prob=0):
        sensible_moves = board.availables
        # the pi vector returned by MCTS as in the alphago zero
        move_probs = np.zeros(board.width*board.height)
        if len(sensible_moves) > 0:
            acts, probs = self.mcts.get_move_probs(board, temp)
            move_probs[list(acts)] = probs
            if self._is_selfplay:
                # add dirichlet noise for exploration
                move = np.random.choice(acts, p=0.75*probs + 
                                        0.25*np.random.dirichlet(0.3*np.ones(len(probs))))
                self.mcts.update_with_move(move) # update the root node and reuse the search tree
            else:
                # with the default temp=1e-3, this is almost equivalent to choosing the move 
                #with the highest prob
                move = np.random.choice(acts, p=probs)
                # reset the root node
                self.mcts.update_with_move(-1)
                
            if return_prob:
                return move, move_probs
            else:
                return move
        else:
            print("Warning: the board is full")
    
    def __str__(self):
        return "MCTS {}".format(self.player)

SyntaxError: unexpected EOF while parsing (<ipython-input-18-d0a6bc56ee6f>, line 8)