In [1]:
# Imports
import os
import math
import random
import time
import torch
import random
import logging
import coloredlogs

import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from tqdm.notebook import tqdm

log = logging.getLogger(__name__)
coloredlogs.install(level='INFO')  # Change this to DEBUG to see more info.

In [2]:
# @title Set random seed

# @markdown Executing `set_seed(seed=seed)` you are setting the seed

# For DL its critical to set the random seed so that students can have a
# baseline to compare their results to expected results.
# Read more here: https://pytorch.org/docs/stable/notes/randomness.html

# Call `set_seed` function in the exercises to ensure reproducibility.
def set_seed(seed=None, seed_torch=True):
  """
  Function that controls randomness. NumPy and random modules must be imported.

  Args:
    seed : Integer
      A non-negative integer that defines the random state. Default is `None`.
    seed_torch : Boolean
      If `True` sets the random seed for pytorch tensors, so pytorch module
      must be imported. Default is `True`.

  Returns:
    Nothing.
  """
  if seed is None:
    seed = np.random.choice(2 ** 32)
  random.seed(seed)
  np.random.seed(seed)
  if seed_torch:
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

  print(f'Random seed {seed} has been set.')


# In case that `DataLoader` is used
def seed_worker(worker_id):
  """
  DataLoader will reseed workers following randomness in
  multi-process data loading algorithm.

  Args:
    worker_id: integer
      ID of subprocess to seed. 0 means that
      the data will be loaded in the main process
      Refer: https://pytorch.org/docs/stable/data.html#data-loading-randomness for more details

  Returns:
    Nothing
  """
  worker_seed = torch.initial_seed() % 2**32
  np.random.seed(worker_seed)
  random.seed(worker_seed)

# @title Set device (GPU or CPU). Execute `set_device()`
# especially if torch modules used.

# Inform the user if the notebook uses GPU or CPU.

def set_device():
  """
  Set the device. CUDA if available, CPU otherwise

  Args:
    None

  Returns:
    Nothing
  """
  device = "cuda" if torch.cuda.is_available() else "cpu"
  if device != "cuda":
    print("WARNING: For this notebook to perform best, "
        "if possible, in the menu under `Runtime` -> "
        "`Change runtime type.`  select `GPU` ")
  else:
    print("GPU is enabled in this notebook.")

  return device

In [3]:
SEED = 2023
set_seed(seed=SEED)
DEVICE = set_device()

Random seed 2023 has been set.


In [4]:
# import sys
# sys.path.append('../nma_rl_games/alpha-zero')

In [5]:
import Arena

from utils import *
from Game import Game
from MCTS import MCTS
from NeuralNet import NeuralNet

# from othello.OthelloPlayers import *
from othello.OthelloLogic import Board
# from othello.OthelloGame import OthelloGame
from othello.pytorch.NNet import NNetWrapper as NNet

In [6]:
args = dotdict({
    'numIters': 1,  # In training, number of iterations = 1000 and num of episodes = 100
    'numEps': 1,  # Number of complete self-play games to simulate during a new iteration.
    'tempThreshold': 15,  # To control exploration and exploitation
    'updateThreshold': 0.6,  # During arena playoff, new neural net will be accepted if threshold or more of games are won.
    'maxlenOfQueue': 200,  # Number of game examples to train the neural networks.
    'numMCTSSims': 15,  # Number of games moves for MCTS to simulate.
    'arenaCompare': 10,  # Number of games to play during arena play to determine if new net will be accepted.
    'cpuct': 1,
    'maxDepth':5,  # Maximum number of rollouts
    'numMCsims': 5,  # Number of monte carlo simulations
    'mc_topk': 3,  # Top k actions for monte carlo rollout

    'checkpoint': './temp/',
    'load_model': False,
    'load_folder_file': ('/dev/models/8x100x50','best.pth.tar'),
    'numItersForTrainExamplesHistory': 20,

    # Define neural network arguments
    'lr': 0.001,  # learning rate
    'dropout': 0.3,
    'epochs': 10,
    'batch_size': 64,
    'device': DEVICE,
    'num_channels': 512,
})

import ws3_helper
ws3_helper.args = args

In [7]:
# @title Helper functions from previous tutorials
from ws3_helper import loadTrainExamples
from ws3_helper import save_model_checkpoint
from ws3_helper import load_model_checkpoint
from ws3_helper import OthelloGame
from ws3_helper import RandomPlayer
from ws3_helper import OthelloNNet
from ws3_helper import ValueNetwork
from ws3_helper import ValueBasedPlayer
from ws3_helper import PolicyNetwork
from ws3_helper import PolicyBasedPlayer
from ws3_helper import MonteCarlo
from ws3_helper import MonteCarloBasedPlayer

## Coding Exercise 1: MCTS planner

In building the MCTS planner, we will focus on the action selection part, particularly the objective function used. MCTS will use a combination of the current action-value function $Q$ and the policy prior as follows:

\begin{equation}
\underset{a}{\operatorname{argmax}} (Q(s_t, a)+u(s_t, a))
\end{equation}

with $u(s_t, a)=c_{puct} \cdot P(s,a) \cdot \frac{\sqrt{\sum_b N(s,b)}}{1+N(s,a)}$. This effectively implements an Upper Confidence bound applied to Trees (UCT). UCT balances exploration and exploitation by taking the values stored from the MCTS into account. The trade-off is parametrized by $c_{puct}$.

**Note**: Polynomial Upper Confidence Trees (PUCT) is the technical term for the alorithm below in which we sequentially run MCTS and store/use information from previous runs to explore and find optimal actions).

<br>

**Exercise**:
* Finish the MCTS planner by using UCT to select actions to build the tree.
* Deploy the MCTS planner to build a tree search for a given board position, producing value estimates and action counts for that position.

### Part 1: Initialization and Attribute Definitions

The `MCTS` class is initialized with three main arguments: `game`, `nnet`, and `args`. These initialize the game environment, the neural network for evaluating game states, and various parameters for running the Monte Carlo Tree Search (MCTS) algorithm. Additionally, several dictionaries (`Qsa`, `Nsa`, `Ns`, `Ps`, `Es`, `Vs`) are defined to store information about the search tree.

```python
class MCTS():
    def __init__(self, game, nnet, args):
        """
        Args:
          game: OthelloGame instance
          nnet: OthelloNet instance
          args: dictionary
            Instantiates number of iterations and episodes, controls temperature threshold, queue length,
            arena, checkpointing, and neural network parameters:
            learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,
            num_channels: 512
        """
        self.game = game
        self.nnet = nnet
        self.args = args
        self.Qsa = {}  # Stores Q values for s,a (as defined in the paper)
        self.Nsa = {}  # Stores #times edge s,a was visited
        self.Ns = {}  # Stores #times board s was visited
        self.Ps = {}  # Stores initial policy (returned by neural net)
        self.Es = {}  # Stores game.getGameEnded ended for board s
        self.Vs = {}  # Stores game.getValidMoves for board s
```

### Part 2: Search Method and Terminal State Check

The `search` function performs one iteration of MCTS. The first step is to check whether the current state `s` is terminal by checking `self.Es`. If it's a terminal state, the outcome of the game is returned, and the value is negated.

```python
    def search(self, canonicalBoard):
        """
        Perform one iteration of MCTS.
        Args:
          canonicalBoard: Canonical Board of size n x n.
        """
        s = self.game.stringRepresentation(canonicalBoard)

        if s not in self.Es:
            self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
        if self.Es[s] != 0:
            # Terminal node
            return -self.Es[s]
```

### Part 3: Leaf Node Detection and Neural Network Prediction

If the state `s` is not a terminal state, the next step is to check whether the state is a leaf node. If it is, the neural network is called to predict the policy (`Ps`) and the value (`v`) of the state. The valid moves are then masked in the policy to eliminate invalid actions.

```python
        if s not in self.Ps:
            # Leaf node
            self.Ps[s], v = self.nnet.predict(canonicalBoard)
            valids = self.game.getValidMoves(canonicalBoard, 1)
            self.Ps[s] = self.Ps[s] * valids  # Masking invalid moves
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s  # Renormalize
            else:
                # If all valid moves were masked make all valid moves equally probable
                # NB! All valid moves may be masked if either your NNet architecture is
                # insufficient or you've get overfitting or something else.
                # If you have got dozens or hundreds of these messages you should
                # pay attention to your NNet and/or training process.
                log = logging.getLogger(__name__)
                log.error("All valid moves were masked, doing a workaround.")
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])

            self.Vs[s] = valids
            self.Ns[s] = 0

            return -v
```

### Part 4: Action Selection Using Upper Confidence Bound

After handling the leaf nodes, the algorithm selects the next action to explore based on the Upper Confidence Bound (UCB). This value balances exploration (selecting actions with fewer visits) and exploitation (selecting actions with higher Q-values). The action with the highest UCB is selected.

```python
        valids = self.Vs[s]
        cur_best = -float("inf")
        best_act = -1

        # Pick the action with the highest upper confidence bound
        for a in range(self.game.getActionSize()):
            if valids[a]:
                if (s, a) in self.Qsa:
                    u = self.Qsa[(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s]) / (
                        1 + self.Nsa[(s, a)]
                    )
                else:
                    u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + 1e-8)

                if u > cur_best:
                    cur_best = u
                    best_act = a

        a = best_act
```

### Part 5: Recursion and Value Propagation

Once the best action is chosen, the algorithm moves to the next state by calling the game environment. The search continues recursively from this next state. Once a value `v` is returned from the deeper searches, the Q-values and visit counts are updated along the path.

```python
        next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
        next_s = self.game.getCanonicalForm(next_s, next_player)

        v = self.search(next_s)

        if (s, a) in self.Qsa:
            self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
            self.Nsa[(s, a)] += 1
        else:
            self.Qsa[(s, a)] = v
            self.Nsa[(s, a)] = 1

        self.Ns[s] += 1
        return -v
```


In [8]:
# to_remove solution
class MCTS:

    def __init__(self, game, nnet, args):
        """
        Args:
          game: OthelloGame instance
            Instance of the OthelloGame class above;
          nnet: OthelloNet instance
            Instance of the OthelloNNet class above;
          args: dictionary
            Instantiates number of iterations and episodes, controls temperature threshold, queue length,
            arena, checkpointing, and neural network parameters:
            learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,
            num_channels: 512
        """
        self.game = game
        self.nnet = nnet
        self.args = args
        self.Qsa = {}  # Stores Q values for s,a (as defined in the paper)
        self.Nsa = {}  # Stores #times edge s,a was visited
        self.Ns = {}  # Stores #times board s was visited
        self.Ps = {}  # Stores initial policy (returned by neural net)
        self.Es = {}  # Stores game.getGameEnded ended for board s
        self.Vs = {}  # Stores game.getValidMoves for board s

    def search(self, canonicalBoard):
        """
        Perform one iteration of MCTS.

        It is recursively called till a leaf node is found. The action chosen at
        each node is one that has the maximum upper confidence bound.
        Once a leaf node is found, the neural network is called to return an
        initial policy P and a value v for the state. This value is propagated
        up the search path. In case the leaf node is a terminal state, the
        outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
        updated.
        NOTE: the return values are the negative of the value of the current
        state. This is done since v is in [-1,1] and if v is the value of a
        state for the current player, then its value is -v for the other player.

        Args:
          canonicalBoard: np.ndarray
            Canonical Board of size n x n [6x6 in this case]

        Returns:
            v: Float
              The negative of the value of the current canonicalBoard
        """
        s = self.game.stringRepresentation(canonicalBoard)

        if s not in self.Es:
            self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
        if self.Es[s] != 0:
            # Terminal node
            return -self.Es[s]

        if s not in self.Ps:
            # Leaf node
            self.Ps[s], v = self.nnet.predict(canonicalBoard)
            valids = self.game.getValidMoves(canonicalBoard, 1)
            self.Ps[s] = self.Ps[s] * valids  # Masking invalid moves
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s  # Renormalize
            else:
                # If all valid moves were masked make all valid moves equally probable
                # NB! All valid moves may be masked if either your NNet architecture is
                # insufficient or you've get overfitting or something else.
                # If you have got dozens or hundreds of these messages you should
                # pay attention to your NNet and/or training process.
                log = logging.getLogger(__name__)
                log.error("All valid moves were masked, doing a workaround.")
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])

            self.Vs[s] = valids
            self.Ns[s] = 0

            return -v

        valids = self.Vs[s]
        cur_best = -float("inf")
        best_act = -1

        # Pick the action with the highest upper confidence bound
        for a in range(self.game.getActionSize()):
            if valids[a]:
                if (s, a) in self.Qsa:
                    u = self.Qsa[(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s]) / (
                        1 + self.Nsa[(s, a)]
                    )
                else:
                    u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + 1e-8)

                if u > cur_best:
                    cur_best = u
                    best_act = a

        a = best_act
        next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
        next_s = self.game.getCanonicalForm(next_s, next_player)

        v = self.search(next_s)

        if (s, a) in self.Qsa:
            self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
            self.Nsa[(s, a)] += 1

        else:
            self.Qsa[(s, a)] = v
            self.Nsa[(s, a)] = 1

        self.Ns[s] += 1
        return -v

    def getNsa(self):
        return self.Nsa

---
# Section 2: Use MCTS to play games

*Time estimate: ~10 mins*


**Goal:** Learn how to use the results of MCTS to play games.

**Exercise:**
* Plug the MCTS planner into an agent.
* Play games against other agents.
* Explore the contributions of prior network, value function, number of simulations/time to play and explore/exploit parameters.

In [9]:
# Load MCTS model from the repository
mcts_model_save_name = 'MCTS.pth.tar'
path = "../nma_rl_games/alpha-zero/pretrained_models/models/"

In [14]:
# to_remove solution
class MonteCarloTreeSearchBasedPlayer():

  def __init__(self, game, nnet, args):
    """
    Args:
      game: OthelloGame instance
        Instance of the OthelloGame class above;
      nnet: OthelloNet instance
        Instance of the OthelloNNet class above;
      args: dictionary
        Instantiates number of iterations and episodes, controls temperature threshold, queue length,
        arena, checkpointing, and neural network parameters:
        learning-rate: 0.001, dropout: 0.3, epochs: 10, batch_size: 64,
        num_channels: 512
    """
    self.game = game
    self.nnet = nnet
    self.args = args
    self.mcts = MCTS(game, nnet, args)

  def play(self, canonicalBoard, temp=1):
    """
    Args:
      canonicalBoard: np.ndarray
        Canonical Board of size n x n [6x6 in this case]
      temp: Integer
        Signifies if game is in terminal state

    Returns:
      List of probabilities for all actions if temp is 0
      Best action based on max probability otherwise
    """
    for i in range(self.args.numMCTSSims):
      self.mcts.search(canonicalBoard)

    s = self.game.stringRepresentation(canonicalBoard)
    self.Nsa = self.mcts.getNsa()
    self.counts = [self.Nsa[(s, a)] if (s, a) in self.Nsa else 0 for a in range(self.game.getActionSize())]

    if temp == 0:
      bestAs = np.array(np.argwhere(self.counts == np.max(self.counts))).flatten()
      bestA = np.random.choice(bestAs)
      probs = [0] * len(self.counts)
      probs[bestA] = 1
      return probs

    self.counts = [x ** (1. / temp) for x in self.counts]
    self.counts_sum = float(sum(self.counts))
    probs = [x / self.counts_sum for x in self.counts]
    return np.argmax(probs)

  def getActionProb(self, canonicalBoard, temp=1):
    """
    Args:
      canonicalBoard: np.ndarray
        Canonical Board of size n x n [6x6 in this case]
      temp: Integer
        Signifies if game is in terminal state

    Returns:
      action_probs: List
        Probability associated with corresponding action
    """
    action_probs = np.zeros((self.game.getActionSize()))
    best_action = self.play(canonicalBoard)
    action_probs[best_action] = 1

    return action_probs


set_seed(seed=SEED)
game = OthelloGame(6)
rp = RandomPlayer(game).play  # All players
num_games = 20  # Games
n1 = NNet(game)  # nnet players
n1.load_checkpoint(folder=path, filename=mcts_model_save_name)
args1 = dotdict({'numMCTSSims': 50, 'cpuct':1.0})

## Uncomment below to check your agent!
print('\n******MCTS player versus random player******')
mcts1 = MonteCarloTreeSearchBasedPlayer(game, n1, args1)
n1p = lambda x: np.argmax(mcts1.getActionProb(x, temp=0))
arena = Arena.Arena(n1p, rp, game, display=OthelloGame.display)
MCTS_result = arena.playGames(num_games, verbose=False)
print(f"\nNumber of games won by player1 = {MCTS_result[0]}, "
      f"number of games won by player2 = {MCTS_result[1]}, out of {num_games} games")
win_rate_player1 = MCTS_result[0]/num_games
print(f"\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%")


Random seed 2023 has been set.

******MCTS player versus random player******


Arena.playGames (1): 100%|██████████| 10/10 [00:16<00:00,  1.69s/it]
Arena.playGames (2): 100%|██████████| 10/10 [00:16<00:00,  1.64s/it]


Number of games won by player1 = 19, number of games won by player2 = 1, out of 20 games

Win rate for player1 over 20 games: 95.0%





In [None]:
# @title Load in trained value and policy networks
model_save_name = 'ValueNetwork.pth.tar'
path = "../nma_rl_games/alpha-zero/pretrained_models/models/"
set_seed(seed=SEED)
game = OthelloGame(6)
vnet = ValueNetwork(game)
vnet.load_checkpoint(folder=path, filename=model_save_name)

model_save_name = 'PolicyNetwork.pth.tar'
path = "../nma_rl_games/alpha-zero/pretrained_models/models/"
set_seed(seed=SEED)
game = OthelloGame(6)
pnet = PolicyNetwork(game)
pnet.load_checkpoint(folder=path, filename=model_save_name)

# Alternative if the downloading of trained model didn't work (will train the model)
if not os.listdir('../nma_rl_games/alpha-zero/pretrained_models/models/'):
  path = "../nma_rl_games/alpha-zero/pretrained_models/data/"
  loaded_games = loadTrainExamples(folder=path, filename='checkpoint_1.pth.tar')

  set_seed(seed=SEED)
  game = OthelloGame(6)
  vnet = ValueNetwork(game)
  vnet.train(loaded_games)

  set_seed(seed=SEED)
  game = OthelloGame(6)
  pnet = PolicyNetwork(game)
  pnet.train(loaded_games)

### MCTS player against Value-based player

In [None]:
print('\n******MCTS player versus value-based player******')
set_seed(seed=SEED)
vp = ValueBasedPlayer(game, vnet).play  # Value-based player
arena = Arena.Arena(n1p, vp, game, display=OthelloGame.display)
MC_result = arena.playGames(num_games, verbose=False)

print(f"\nNumber of games won by player1 = {MC_result[0]}, "
      f"number of games won by player2 = {MC_result[1]}, out of {num_games} games")
win_rate_player1 = MC_result[0]/num_games
print(f"\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%")

### MCTS player against Policy-based player

In [None]:
print('\n******MCTS player versus policy-based player******')
set_seed(seed=SEED)
pp = PolicyBasedPlayer(game, pnet).play  # Policy-based player
arena = Arena.Arena(n1p, pp, game, display=OthelloGame.display)
MC_result = arena.playGames(num_games, verbose=False)

print(f"\nNumber of games won by player1 = {MC_result[0]}, "
      f"number of games won by player2 = {MC_result[1]}, out of {num_games} games")
win_rate_player1 = MC_result[0]/num_games
print(f"\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%")

### MCTS player against Monte-Carlo player

In [None]:
mc_model_save_name = 'MC.pth.tar'
path = "nma_rl_games/alpha-zero/pretrained_models/models/"

n2 = NNet(game)  # nNet players
n2.load_checkpoint(folder=path, filename=mc_model_save_name)
args2 = dotdict({'numMCsims': 10, 'maxRollouts':5, 'maxDepth':5, 'mc_topk': 3})

In [None]:
print('\n******MCTS player versus MC player******')
set_seed(seed=SEED)
mc = MonteCarloBasedPlayer(game, n2, args2)
n2p = lambda x: np.argmax(mc.getActionProb(x))
arena = Arena.Arena(n1p, n2p, game, display=OthelloGame.display)
MC_result = arena.playGames(num_games, verbose=False)

print(f"\nNumber of games won by player1 = {MC_result[0]}, "
      f"number of games won by player2 = {MC_result[1]}, out of {num_games} games")
win_rate_player1 = MC_result[0]/num_games
print(f"\nWin rate for player1 over {num_games} games: {round(win_rate_player1*100, 1)}%")

---
# Summary

In this tutorial, you have learned about players with Monte Carlo Tree Search planner and compared them to random, value-based, policy-based, and Monte-Carlo players.