### Monte Carlo Tree Search Simulation

In [None]:
import os
# number of logical CPUs reported by the OS (None if it cannot be determined);
# displayed as the cell output to see how many worker processes are worth spawning
os.cpu_count()

In [None]:
from multiprocessing import Process, current_process

from board import Board
from player import Player
from utils import *

In [None]:
# signs of the two sides; `me` is presumably the human opponent — TODO confirm
me, agent = 'o', 'x'
# board dimensions
nrow, ncol = 5, 5

# the agent's sign is listed first in `sign_play`
board = Board(nrow=nrow, ncol=ncol, sign_play=[agent, me])
# NOTE(review): `order=float('inf')` is forwarded to Player unchanged; its
# meaning is defined in player.py
player = Player(sign=agent, order=float('inf'))

In [None]:
# a, N, Q = player.choose_action_mcts(board, num_sim=10**2, return_dicts=True)

# NOTE(review): the original loop header was left unfinished, so the number of
# workers is an assumption: one process per logical CPU (at least one).
num_workers = os.cpu_count() or 1

processes = []

for _ in range(num_workers):
    # positional arguments go in `args`, keyword arguments in `kwargs`;
    # `name=value` is not valid syntax inside the `args` tuple
    process = Process(
        target=player.choose_action_mcts,
        args=(board,),
        kwargs={'num_sim': 10**2, 'return_dicts': True},
    )
    processes.append(process)

    # processes are spawned
    process.start()

for process in processes:
    # wait for each process to terminate before continuing with the code
    process.join()

# NOTE: a Process discards the return value of its target, so the action and
# the N / Q dictionaries computed by the workers are not available here.


In [None]:
# NOTE(review): `Q` is only bound by the commented-out direct call to
# player.choose_action_mcts in the previous cell; nothing in the Process-based
# code assigns it, so this raises NameError unless Q was defined elsewhere.
print(len(Q))

In [None]:
# scratch cell: check how f-string interpolation renders three integers
a, b, c = 1, 2, 4

st = f"{a} is {b} + {c}"
print(st)

In [1]:
# from multiprocessing import Process, Manager
import multiprocessing as mp
import itertools as itr
from utils import *
import math
import random
import time
from tqdm import trange

from board import Board

In [2]:
# https://stackoverflow.com/questions/6832554/multiprocessing-how-do-i-share-a-dict-among-multiple-processes

def choose_action_mcts_mp(board, num_sim=10**2, return_dicts=False):
    """
    Choose a move with Monte Carlo Tree Search, running the rollouts in a
    pool of worker processes.

    `num_sim` calls to `mcts_simulation` are distributed over
    `mp.cpu_count()` workers. The visit counts (N), mean values (Q) and
    priors (P) live in manager dictionaries so that all workers update the
    same statistics. Afterwards a move is sampled from
    `board.get_free_positions()` with weights
    `normalize(next_counts, float('inf'))`, where `next_counts` are the visit
    counts of `board.get_next_states(sign='x')`.

    NOTE(review): `mcts_simulation` updates the shared dictionaries with
    separate get-then-set calls, which are not atomic across processes, so
    concurrent workers may overwrite each other's increments. Every
    dictionary access is also a round-trip to the manager process.

    Parameters
    ----------
    board : Board
        Position to choose a move for; it is only read here.
    num_sim : int
        Number of rollouts to run.
    return_dicts : bool
        If True, also return the collected statistics.

    Returns
    -------
    action, or the tuple (action, N, Q) when `return_dicts` is True.
    N and Q are returned as plain `dict` snapshots of the shared dictionaries.
    """
    c = 1                       # exploration/exploitation trade-off

    # the context managers release the manager and the workers even if a
    # rollout raises
    with mp.Manager() as manager:
        shared_N = manager.dict()   # visit count
        shared_Q = manager.dict()   # mean action value
        shared_P = manager.dict()   # prior probability of that action, empty or could load existing dictionary

        with mp.Pool(processes=mp.cpu_count()) as pool:
            pool.starmap(
                mcts_simulation,
                itr.repeat((board, shared_N, shared_Q, shared_P, c), times=num_sim),
            )
            pool.close()
            pool.join()

        # copy the results out: the proxies stop working once the manager
        # has been shut down
        N = dict(shared_N)
        Q = dict(shared_Q)

    # next possible states
    next_states = board.get_next_states(sign='x')
    # get count for each next state
    next_counts = [N.get(state, 0) for state in next_states]
    # randomly select action according to weights in next_counts
    action = random.choices(board.get_free_positions(), weights=normalize(next_counts, float('inf')))[0]

    # the parentheses are required: without them the conditional applies to Q
    # only and a 3-tuple is returned regardless of `return_dicts`
    return (action, N, Q) if return_dicts else action


def choose_action_mcts(board, num_sim=10**2, return_dicts=False):
    """
    Choose a move with Monte Carlo Tree Search (single process).

    Runs `num_sim` rollouts with `mcts_simulation`, which fills the visit
    counts N and mean values Q. A move is then sampled from
    `board.get_free_positions()` with weights
    `normalize(next_counts, float('inf'))`, where `next_counts` are the visit
    counts of `board.get_next_states(sign='x')`.

    Parameters
    ----------
    board : Board
        Position to choose a move for; it is only read here.
    num_sim : int
        Number of rollouts to run.
    return_dicts : bool
        If True, also return the collected statistics.

    Returns
    -------
    action, or the tuple (action, N, Q) when `return_dicts` is True.
    """
    N = {}      # visit count
    Q = {}      # mean action value
    P = {}      # prior probability of that action, {} or could load existing dictionary
    c = 1       # exploration/exploitation trade-off

    # perform the simulations; N and Q are updated in place
    for _ in range(num_sim):
        mcts_simulation(board, N, Q, P, c)

    # next possible states
    next_states = board.get_next_states(sign='x')
    # get count for each next state
    next_counts = [N.get(state, 0) for state in next_states]
    # randomly select action according to weights in next_counts
    action = random.choices(board.get_free_positions(), weights=normalize(next_counts, float('inf')))[0]

    # the parentheses are required: without them the conditional applies to Q
    # only and a 3-tuple is returned regardless of `return_dicts`
    return (action, N, Q) if return_dicts else action


def mcts_simulation(board, N, Q, P, c):
    """
    Run one self-play MCTS rollout from `board`, updating N and Q in place.

    Until the board copy is full or a move wins, the rollout:
      1. increments the visit count of the current state,
      2. scores every next state with q + c * p * sqrt(nb) / (1 + na),
      3. samples a move using `normalize(scores, float('inf'))` as weights,
      4. plays it with sign 'x' and increments the new state's visit count,
      5. inverts the board, so the following move is again added as 'x'.
    The outcome (1 for a win by the last mover, 0.5 otherwise) is then backed
    up through the visited states in reverse order, flipping to 1 - reward at
    every step.

    Parameters
    ----------
    board : Board
        Starting position; it is deep-copied and never modified.
    N : dict-like
        state -> visit count, mutated in place.
    Q : dict-like
        state -> mean value (0.5 when unseen), mutated in place.
    P : dict-like
        state -> prior; only read, a uniform prior is used for missing states.
    c : number
        Weight of the exploration term.

    Returns
    -------
    None
    """
    # play on a copy of the board
    board_cpy = copy.deepcopy(board)
    # states reached after each move of this rollout, in play order
    board_states = []

    # assume that the game will be a draw
    reward = 0.5
    while not board_cpy.is_full():
        # the state is fixed until a move is added, so look it up once
        current_state = board_cpy.get_state()

        # update visit count (necessary because of self-play = inverse board)
        N[current_state] = N.get(current_state, 0) + 1

        # evaluate possible actions
        next_states = board_cpy.get_next_states(sign='x')
        # identical for every candidate: read once instead of once per
        # candidate (each read is a proxy round-trip when N is shared)
        nb = N.get(current_state)
        sqrt_nb = math.sqrt(nb)
        ucb_states = []
        for state in next_states:
            q  = Q.get(state, 0.5)
            p  = P.get(state, 1/len(next_states))
            na = N.get(state, 0)
            ucb_states.append(q + c * p * sqrt_nb / (1+na))

        # sample an action, weighted by the normalized UCB values
        # NOTE(review): with order=float('inf') `normalize` presumably puts
        # the weight on the maximum — confirm in utils
        action = random.choices(board_cpy.get_free_positions(), weights=normalize(ucb_states, float('inf')))[0]
        # take action
        board_cpy.add(sign='x', row=action[0], col=action[1])
        new_state = board_cpy.get_state()
        # update visit count
        N[new_state] = N.get(new_state, 0) + 1
        # add board state to list of visited states
        board_states.append(new_state)

        # check if player won
        if board_cpy.is_won():
            reward = 1
            break
        # if nobody won yet, inverse the board
        board_cpy.inverse()

    # backup: walk the visited states from last to first
    for state in reversed(board_states):
        q = Q.get(state, 0.5)
        n = N.get(state)
        # incremental mean formula
        Q[state] = q + (reward - q) / n
        # inverse reward due to self-play
        reward = 1-reward

In [6]:
me    = 'o'
agent = 'x'
nrow  = 3
ncol  = 3

board  = Board(nrow=nrow, ncol=ncol, sign_play=[agent,me])
board.set_state('xox-ox--o')
# board.print()

# Compare the multi-process and single-process searches on the same position.
# time.perf_counter() is used rather than time.time(): it is monotonic, so the
# measured duration is not distorted by system clock adjustments.
start = time.perf_counter()
action, N, Q = choose_action_mcts_mp(board, num_sim=10**5, return_dicts=True)
end = time.perf_counter()
print(f"multi-processing took {end-start} seconds")

start = time.perf_counter()
action, N, Q = choose_action_mcts(board, num_sim=10**5, return_dicts=True)
end = time.perf_counter()
print(f"no multi-processing took {end-start} seconds")

  0%|          | 98/100000 [00:00<03:19, 501.14it/s]

multi-processing took 319.64283561706543 seconds


100%|██████████| 100000/100000 [04:31<00:00, 368.05it/s]

no multi-processing took 271.7143943309784 seconds





In [4]:
action

array([2, 1])

In [5]:
N.get('xox-ox--o')

10000

### Once multiprocessing works for MCTS, also apply it to `value_deeper`

In [None]:
def value_deeper(board, agent, order, all_values):
    """
    Recursively evaluate `board` by looking ahead over every free position.

    For each free position, `agent`'s sign is placed on a copy of the board.
    The resulting state is worth 1 if that move wins, 0.5 if it fills the
    board, and otherwise 1 minus the value of the inverted board, computed
    recursively. Each value is stored in `all_values` under the state string
    taken before the inversion.

    Returns the dot product of `normalize(values, order)` with the values of
    all successor states.
    """
    child_values = []
    for position in board.get_free_positions():
        # try the move on a copy so `board` itself stays untouched
        child = copy.deepcopy(board)
        child.add(sign=agent, row=position[0], col=position[1])
        child_state = child.get_state()

        if child.is_won():
            # the move wins the game
            child_value = 1
        elif child.is_full():
            # the move fills the board without a win: draw
            child_value = 0.5
        else:
            # game not finished: evaluate the inverted board and flip the result
            child.inverse()
            child_value = 1 - value_deeper(child, agent, order, all_values)

        all_values[child_state] = child_value
        child_values.append(child_value)

    # weighted sum of all next values
    return np.dot(normalize(child_values, order), child_values)