<a href="https://colab.research.google.com/github/NeuromatchAcademy/course-content-dl/blob/w3d3_videos/tutorials/W3D3_ReinforcementLearningForGames/W3D3_Tutorial1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tutorial 1: Learn to play games with RL
**Week 3, Day 3: Reinforcement Learning for Games**

**By Neuromatch Academy**

__Content creators:__ Tim Lillicrap, Blake Richards

__Content reviewers:__ Arush Tagade, Lily Cheng, Melvin Selim Atay

__Content editors:__ Melvin Selim Atay, Spiros Chavlis

__Production editors:__ Namrata Bafna, Spiros Chavlis

---
# Tutorial Objectives

In this tutorial, you will learn how to implement a game loop and improve on the performance of a random player.

The specific objectives for this tutorial:
*   Understand the format of two-player games
*   Learn about value networks and policy networks
*   Learn about Monte Carlo Tree Search (MCTS) and compare its performance to policy-based and value-based players



In [1]:

#@markdown Tutorial slides
# you should link the slides for all tutorial videos here (we will store pdfs on osf)

from IPython.display import HTML
HTML('<iframe src="https://mfr.ca-1.osf.io/render?url=https://osf.io/3zn9w/?direct%26mode=render%26action=download%26mode=render" frameborder="0" width="960" height="569" allowfullscreen="true" mozallowfullscreen="true" webkitallowfullscreen="true"></iframe>')

In [2]:
#@title Video 0: Introduction
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"v4wafEsgopE", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


---
# Setup

In this section, we have:


1.  **Import cell**: imports all libraries you use in the tutorial
2.  **Hidden Figure settings cell**: sets up the plotting style (copy exactly)
3. **Hidden Plotting functions cell**: contains all functions used to create plots throughout the tutorial (so students don't waste time looking at boilerplate matplotlib but can here if they wish to). Please use only matplotlib for plotting for consistency.
4. **Hidden Helper functions cell**: This should contain functions that students have previously used or that are very simple. Any helper functions that are being used for the first time and are important should be placed directly above the relevant text or exercise (see Section 1.1 for an example)


In [3]:
#@title Clone a repo from github
#@markdown Run this cell!
!git clone https://github.com/raymondchua/nma_rl_games.git

import sys
sys.path.append('/content/nma_rl_games/alpha-zero')

Cloning into 'nma_rl_games'...
remote: Enumerating objects: 158, done.
remote: Counting objects: 100% (27/27), done.
remote: Compressing objects: 100% (16/16), done.
remote: Total 158 (delta 10), reused 27 (delta 10), pack-reused 131
Receiving objects: 100% (158/158), 416.03 MiB | 32.92 MiB/s, done.
Resolving deltas: 100% (52/52), done.
Checking out files: 100% (45/45), done.


In [4]:
# Install modules
!pip install tqdm --quiet
!pip install coloredlogs --quiet

# Imports

import os
import math
import time
import torch
import Arena
import random
import logging
import argparse
import coloredlogs

import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from utils import *
from Game import Game
from MCTS import MCTS

from othello.OthelloPlayers import *
from othello.OthelloLogic import Board
from othello.OthelloGame import OthelloGame
from othello.pytorch.NNet import NNetWrapper as NNet

from torchvision import datasets, transforms
from torch.autograd import Variable
from pickle import Pickler, Unpickler
from NeuralNet import NeuralNet

from tqdm import tqdm
from collections import deque
from random import shuffle

log = logging.getLogger(__name__)

coloredlogs.install(level='INFO')  # Change this to DEBUG to see more info.

In [5]:
#@title Set random seed. 

#@markdown Executing `set_seed(seed=seed)` you are setting the seed

# For DL it's critical to set the random seed so that students can have a
# baseline to compare their results to expected results.
# Read more here: https://pytorch.org/docs/stable/notes/randomness.html

# Call `set_seed` function in the exercises to ensure reproducibility.
import random
import torch

def set_seed(seed=None, seed_torch=True):
  if seed is None:
    seed = np.random.choice(2 ** 32)
  random.seed(seed)
  np.random.seed(seed)
  if seed_torch:
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

  print(f'Random seed {seed} has been set.')


# In case that `DataLoader` is used
def seed_worker(worker_id):
  worker_seed = torch.initial_seed() % 2**32
  np.random.seed(worker_seed)
  random.seed(worker_seed)

In [6]:
#@title Set device (GPU or CPU). Execute `set_device()`
# especially if torch modules used.

# inform the user if the notebook uses GPU or CPU.

def set_device():
  device = "cuda" if torch.cuda.is_available() else "cpu"
  if device != "cuda":
    print("WARNING: For this notebook to perform best, "
        "if possible, in the menu under `Runtime` -> "
        "`Change runtime type.`  select `GPU` ")
  else:
    print("GPU is enabled in this notebook.")

  return device

In [7]:
SEED = 2021
set_seed(seed=SEED)
DEVICE = set_device()

Random seed 2021 has been set.
GPU is enabled in this notebook.


In [8]:
args = dotdict({
    'numIters': 1,            # in training setting this was 1000 and num of episodes=100
    'numEps': 1,              # Number of complete self-play games to simulate during a new iteration.
    'tempThreshold': 15,      # To control exploration and exploitation
    'updateThreshold': 0.6,   # During arena playoff, new neural net will be accepted if threshold or more of games are won.
    'maxlenOfQueue': 200,     # Number of game examples to train the neural networks.
    'numMCTSSims': 15,        # Number of game moves for MCTS to simulate.
    'arenaCompare': 10,       # Number of games to play during arena play to determine if new net will be accepted.
    'cpuct': 1,
    'maxDepth':5,             # Maximum number of rollouts
    'numMCsims': 5,           # Number of monte carlo simulations
    'mc_topk': 3,             # top k actions for monte carlo rollout

    'checkpoint': './temp/',
    'load_model': False,
    'load_folder_file': ('/dev/models/8x100x50','best.pth.tar'),
    'numItersForTrainExamplesHistory': 20,

    # define neural network arguments
    'lr': 0.001,               # lr: learning rate
    'dropout': 0.3,
    'epochs': 10,
    'batch_size': 64,
    'cuda': torch.cuda.is_available(),
    'num_channels': 512,
})

---
# Section 1: Create a game/agent loop for RL

In [9]:
#@title Video 1: A game loop for RL
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"s4BK_yrknf4", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)



***Goal***: Learn how to set up a game environment with multiple players for reinforcement learning experiments.

***Exercise***: 


*   Build an agent that plays random moves
*   Connect the agent to the Othello game
*   Generate games including wins and losses
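
The steps above can be sketched with a deliberately tiny stand-in game. The `NimGame` class, `random_player`, and `play_game` below are hypothetical names introduced only for illustration; they are not part of the `nma_rl_games` repository, and the tutorial itself uses Othello with `Arena`. The sketch only borrows the convention of labelling the two players `+1` and `-1`.

```python
import random


class NimGame:
  """Hypothetical toy game: players alternately take 1-3 stones; taking the last stone wins."""

  def __init__(self, stones=10):
    self.stones = stones

  def get_init_state(self):
    return self.stones

  def get_valid_moves(self, state):
    return [m for m in (1, 2, 3) if m <= state]

  def get_next_state(self, state, move):
    return state - move

  def is_over(self, state):
    return state == 0


def random_player(game, state, rng):
  """Pick uniformly among the valid moves."""
  return rng.choice(game.get_valid_moves(state))


def play_game(game, players, rng):
  """Alternate between players +1 and -1 until the game ends; return the winner."""
  state = game.get_init_state()
  current = 1
  while True:
    move = players[current](game, state, rng)
    state = game.get_next_state(state, move)
    if game.is_over(state):
      return current  # the player who took the last stone wins
    current = -current


rng = random.Random(2021)
players = {1: random_player, -1: random_player}
winners = [play_game(NimGame(), players, rng) for _ in range(20)]
wins_p1 = winners.count(1)
wins_p2 = winners.count(-1)
print(f"player 1 wins: {wins_p1}, player 2 wins: {wins_p2}")
```

The loop has the same three ingredients as the Othello setup: a game object that knows the rules, player functions that map a state to a move, and a driver that alternates turns and records who won.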

In [10]:
class OthelloGame(Game):
  square_content = {
      -1: "X",
      +0: "-",
      +1: "O"
      }

  @staticmethod
  def getSquarePiece(piece):
    return OthelloGame.square_content[piece]

  def __init__(self, n):
    self.n = n

  def getInitBoard(self):
    # return initial board (numpy board)
    b = Board(self.n)
    return np.array(b.pieces)

  def getBoardSize(self):
    # (a,b) tuple
    return (self.n, self.n)

  def getActionSize(self):
    # return number of actions
    return self.n*self.n + 1

  def getNextState(self, board, player, action):
    # if player takes action on board, return next (board,player)
    # action must be a valid move
    if action == self.n*self.n:
      return (board, -player)
    b = Board(self.n)
    b.pieces = np.copy(board)
    move = (int(action/self.n), action%self.n)
    b.execute_move(move, player)
    return (b.pieces, -player)

  def getValidMoves(self, board, player):
    # return a fixed size binary vector
    valids = [0]*self.getActionSize()
    b = Board(self.n)
    b.pieces = np.copy(board)
    legalMoves =  b.get_legal_moves(player)
    if len(legalMoves)==0:
      valids[-1]=1
      return np.array(valids)
    for x, y in legalMoves:
      valids[self.n*x+y]=1
    return np.array(valids)

  def getGameEnded(self, board, player):
    # return 0 if not ended, 1 if player 1 won, -1 if player 1 lost
    # player = 1
    b = Board(self.n)
    b.pieces = np.copy(board)
    if b.has_legal_moves(player):
      return 0
    if b.has_legal_moves(-player):
      return 0
    if b.countDiff(player) > 0:
      return 1
    return -1

  def getCanonicalForm(self, board, player):
    # return state if player==1, else return -state if player==-1
    return player*board

  def getSymmetries(self, board, pi):
    # mirror, rotational
    assert(len(pi) == self.n**2+1)  # 1 for pass
    pi_board = np.reshape(pi[:-1], (self.n, self.n))
    l = []

    for i in range(1, 5):
      for j in [True, False]:
        newB = np.rot90(board, i)
        newPi = np.rot90(pi_board, i)
        if j:
          newB = np.fliplr(newB)
          newPi = np.fliplr(newPi)
        l += [(newB, list(newPi.ravel()) + [pi[-1]])]
    return l

  def stringRepresentation(self, board):
    return board.tobytes()  # ndarray.tostring() is deprecated in newer NumPy releases

  def stringRepresentationReadable(self, board):
    board_s = "".join(self.square_content[square] for row in board for square in row)
    return board_s

  def getScore(self, board, player):
    b = Board(self.n)
    b.pieces = np.copy(board)
    return b.countDiff(player)

  @staticmethod
  def display(board):
    n = board.shape[0]
    print("   ", end="")
    for y in range(n):
      print(y, end=" ")
    print("")
    print("-----------------------")
    for y in range(n):
      print(y, "|", end="")    # print the row #
      for x in range(n):
        piece = board[y][x]    # get the piece to print
        print(OthelloGame.square_content[piece], end=" ")
      print("|")
    print("-----------------------")

## Section 1.1: Create a random player

In [11]:
class RandomPlayer():
  def __init__(self, game):
    self.game = game

  def play(self, board):

    valids = self.game.getValidMoves(board, 1)               # STUDENTS
    prob = valids/valids.sum()                               # STUDENTS
    a = np.random.choice(self.game.getActionSize(), p=prob)  # STUDENTS

    return a

## Section 1.2. Initiate the game board


In [12]:
# Display the board
game = OthelloGame(6)
board = game.getInitBoard()
game.display(board)

   0 1 2 3 4 5 
-----------------------
0 |- - - - - - |
1 |- - - - - - |
2 |- - X O - - |
3 |- - O X - - |
4 |- - - - - - |
5 |- - - - - - |
-----------------------


In [13]:
# observe the game board size
print('Board size = {}' .format(game.getBoardSize()))

# observe the action size
print('Action size = {}'.format(game.getActionSize()))

Board size = (6, 6)
Action size = 37
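
The action size is 37 rather than 36 because `getActionSize` returns `n*n + 1`: one action per square plus a final "pass" action. A minimal sketch of the index mapping used in `getNextState` and `getValidMoves` follows; the helper names `action_to_move` and `move_to_action` are hypothetical and do not exist in the repository.

```python
def action_to_move(action, n):
  """Return (row, col) for a board action, or None for the pass action."""
  if action == n * n:
    return None  # the extra, last action means "pass"
  return divmod(action, n)


def move_to_action(move, n):
  """Inverse mapping: (row, col) -> flat action index; None -> pass."""
  if move is None:
    return n * n
  row, col = move
  return n * row + col


n = 6
print(action_to_move(14, n))      # (2, 2)
print(action_to_move(36, n))      # None, i.e. pass
print(move_to_action((3, 4), n))  # 22
```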


## Section 1.3. Create two random agents to play against each other

In [14]:
# define the random player
player1 = RandomPlayer(game).play  # player 1 is a random player
player2 = RandomPlayer(game).play  # player 2 is a random player

# define number of games
num_games = 20

# start the competition
arena = Arena.Arena(player1, player2 , game, display=None)  # to see the steps of the competition set "display=OthelloGame.display"

result = arena.playGames(num_games, verbose=False)  # return  ( number of games won by player1, num of games won by player2, num of games won by nobody)


Arena.playGames (1): 100%|██████████| 10/10 [00:00<00:00, 26.06it/s]
Arena.playGames (2): 100%|██████████| 10/10 [00:00<00:00, 23.91it/s]


In [15]:
print("\nNumber of games won by player1 = {},\nNumber of games won by player2 = {},\nNumber of games won by nobody = {} out of {} games" .format(result[0], result[1], result[2], num_games))


Number of games won by player1 = 11,
Number of games won by player2 = 9,
Number of games won by nobody = 0 out of 20 games


## Section 1.4. Compute win rate for the random player (player 1)

In [16]:
win_rate_player1 = result[0]/num_games
print('\n Win rate for player 1 over {} games: {}%'.format(num_games, win_rate_player1*100))


 Win rate for player 1 over 20 games: 55.00000000000001%
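
With only 20 games, a win rate of 55% is a noisy estimate. As a rough sketch (not part of the original tutorial), a normal-approximation interval can indicate how much the estimate might vary. The function name `win_rate_interval` is hypothetical, and the normal approximation is itself an assumption that is crude for so few games.

```python
import math


def win_rate_interval(wins, games, z=1.96):
  """Normal-approximation (Wald) interval for a win rate; z=1.96 gives roughly 95% coverage."""
  p = wins / games
  se = math.sqrt(p * (1 - p) / games)  # standard error of a binomial proportion
  return p, max(0.0, p - z * se), min(1.0, p + z * se)


p, low, high = win_rate_interval(wins=11, games=20)
print(f"win rate = {p:.2f}, approx. 95% interval = [{low:.2f}, {high:.2f}]")
```

For 11 wins out of 20 the interval spans roughly 0.33 to 0.77, so this result is compatible with two equally strong random players. Increasing `num_games` narrows the interval.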


---
# Section 2: Train a value function from expert game data
**Goal:** Learn how to train a value function from a dataset of games played by an expert.


**Exercise:** 

* Load a dataset of expert-generated games.
* Train a network to minimize MSE for win/loss predictions given board states sampled throughout the game. This will be done on a very small number of games. We will provide a network trained on a larger dataset.
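
For reference, the mean squared error over a batch of $N$ sampled board states can be written as below. The symbols are notation introduced here for illustration, not taken from the tutorial: $s_i$ is a board state, $z_i \in \{-1, +1\}$ is the recorded win/loss outcome, and $v_\theta(s_i)$ is the network's value prediction.

$$
\mathcal{L}_v(\theta) = \frac{1}{N} \sum_{i=1}^{N} \left( z_i - v_\theta(s_i) \right)^2
$$

Because the outcomes lie in $\{-1, +1\}$, a network that always predicts $0$ has a loss of $1$, which gives a simple baseline for judging the training loss.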



In [17]:
#@title Video 2: Train a value function
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"RVo6rVP9iC0", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)



## Section 2.1. Load expert data

In [18]:
def loadTrainExamples(folder, filename):
  trainExamplesHistory = []
  modelFile = os.path.join(folder, filename)
  examplesFile = modelFile + ".examples"
  if not os.path.isfile(examplesFile):
    print(f'File "{examplesFile}" with trainExamples not found!')
    r = input("Continue? [y|n]")
    if r != "y":
      sys.exit()
  else:
    print("File with train examples found. Loading it...")
    with open(examplesFile, "rb") as f:
      trainExamplesHistory = Unpickler(f).load()
    print('Loading done!')
  # return the loaded examples, or an empty list if the file was missing
  return trainExamplesHistory

In [19]:
path = "/content/nma_rl_games/alpha-zero/pretrained_models/data/"
loaded_games = loadTrainExamples(folder=path, filename='checkpoint_1.pth.tar')

File with train examples found. Loading it...
Loading done!


## Section 2.2. Define the Neural Network Architecture for Othello


### Coding Exercise 2.2: Implement the NN `OthelloNNet` for Othello

In [20]:
class OthelloNNet(nn.Module):
  def __init__(self, game, args):
    # game params
    self.board_x, self.board_y = game.getBoardSize()
    self.action_size = game.getActionSize()
    self.args = args

    super(OthelloNNet, self).__init__()
    self.conv1 = nn.Conv2d(1, args.num_channels, 3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1,
                           padding=1)
    self.conv3 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1)
    self.conv4 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1)

    self.bn1 = nn.BatchNorm2d(args.num_channels)
    self.bn2 = nn.BatchNorm2d(args.num_channels)
    self.bn3 = nn.BatchNorm2d(args.num_channels)
    self.bn4 = nn.BatchNorm2d(args.num_channels)

    self.fc1 = nn.Linear(args.num_channels * (self.board_x - 4) * (self.board_y - 4), 1024)
    self.fc_bn1 = nn.BatchNorm1d(1024)

    self.fc2 = nn.Linear(1024, 512)
    self.fc_bn2 = nn.BatchNorm1d(512)

    self.fc3 = nn.Linear(512, self.action_size)

    self.fc4 = nn.Linear(512, 1)

  def forward(self, s):
    # s: batch_size x board_x x board_y
    s = s.view(-1, 1, self.board_x, self.board_y)                # batch_size x 1 x board_x x board_y
    s = F.relu(self.bn1(self.conv1(s)))                          # batch_size x num_channels x board_x x board_y
    s = F.relu(self.bn2(self.conv2(s)))                          # batch_size x num_channels x board_x x board_y
    s = F.relu(self.bn3(self.conv3(s)))                          # batch_size x num_channels x (board_x-2) x (board_y-2)
    s = F.relu(self.bn4(self.conv4(s)))                          # batch_size x num_channels x (board_x-4) x (board_y-4)
    s = s.view(-1, self.args.num_channels * (self.board_x - 4) * (self.board_y - 4))

    s = F.dropout(F.relu(self.fc_bn1(self.fc1(s))), p=self.args.dropout, training=self.training)  # batch_size x 1024
    s = F.dropout(F.relu(self.fc_bn2(self.fc2(s))), p=self.args.dropout, training=self.training)  # batch_size x 512

    pi = self.fc3(s)  # batch_size x action_size
    v = self.fc4(s)   # batch_size x 1
    #################################################
    ## TODO for students: compute the policy output from `pi` and the value output from `v` ##
    # Fill out function and remove
    raise NotImplementedError("Calculate the probability distribution and the value")
    #################################################
    # return a probability distribution over actions at the current state and the value of the current state.
    return ..., ...

In [21]:
#to_remove solution
class OthelloNNet(nn.Module):
  def __init__(self, game, args):
    # game params
    self.board_x, self.board_y = game.getBoardSize()
    self.action_size = game.getActionSize()
    self.args = args

    super(OthelloNNet, self).__init__()
    self.conv1 = nn.Conv2d(1, args.num_channels, 3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1,
                           padding=1)
    self.conv3 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1)
    self.conv4 = nn.Conv2d(args.num_channels, args.num_channels, 3, stride=1)

    self.bn1 = nn.BatchNorm2d(args.num_channels)
    self.bn2 = nn.BatchNorm2d(args.num_channels)
    self.bn3 = nn.BatchNorm2d(args.num_channels)
    self.bn4 = nn.BatchNorm2d(args.num_channels)

    self.fc1 = nn.Linear(args.num_channels * (self.board_x - 4) * (self.board_y - 4), 1024)
    self.fc_bn1 = nn.BatchNorm1d(1024)

    self.fc2 = nn.Linear(1024, 512)
    self.fc_bn2 = nn.BatchNorm1d(512)

    self.fc3 = nn.Linear(512, self.action_size)

    self.fc4 = nn.Linear(512, 1)

  def forward(self, s):
    # s: batch_size x board_x x board_y
    s = s.view(-1, 1, self.board_x, self.board_y)                # batch_size x 1 x board_x x board_y
    s = F.relu(self.bn1(self.conv1(s)))                          # batch_size x num_channels x board_x x board_y
    s = F.relu(self.bn2(self.conv2(s)))                          # batch_size x num_channels x board_x x board_y
    s = F.relu(self.bn3(self.conv3(s)))                          # batch_size x num_channels x (board_x-2) x (board_y-2)
    s = F.relu(self.bn4(self.conv4(s)))                          # batch_size x num_channels x (board_x-4) x (board_y-4)
    s = s.view(-1, self.args.num_channels * (self.board_x - 4) * (self.board_y - 4))

    s = F.dropout(F.relu(self.fc_bn1(self.fc1(s))), p=self.args.dropout, training=self.training)  # batch_size x 1024
    s = F.dropout(F.relu(self.fc_bn2(self.fc2(s))), p=self.args.dropout, training=self.training)  # batch_size x 512

    pi = self.fc3(s)  # batch_size x action_size
    v = self.fc4(s)   # batch_size x 1

    # return the log-probabilities over actions at the current state and the value of the current state.
    return F.log_softmax(pi, dim=1), torch.tanh(v)
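
The shape comments in `forward` explain why `fc1` expects `num_channels * (board_x - 4) * (board_y - 4)` inputs. A small sketch of that arithmetic, using the standard convolution output-size formula, is shown below. The helper `conv2d_out` is a hypothetical name introduced here, and the values 6 and 512 are the board size and `args.num_channels` used in this tutorial.

```python
def conv2d_out(size, kernel=3, stride=1, padding=0):
  """Output length along one spatial dimension of a 2D convolution (dilation 1)."""
  return (size + 2 * padding - kernel) // stride + 1


board = 6            # 6x6 Othello board, as used in this tutorial
num_channels = 512   # value of args.num_channels

size = board
for padding in (1, 1, 0, 0):   # conv1 and conv2 pad by 1; conv3 and conv4 do not
  size = conv2d_out(size, padding=padding)
  print(size)                  # 6, 6, 4, 2

fc1_inputs = num_channels * size * size
print(fc1_inputs)              # 512 * 2 * 2 = 2048
```

Each unpadded 3x3 convolution shrinks the board by 2 in each dimension, so the two unpadded layers account for the `- 4` in the `fc1` definition.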

## Section 2.3. Define the Value network
During training, the ground truth is loaded from the **MCTS simulations** stored in 'checkpoint_x.pth.tar.examples'.

### Coding Exercise 2.3: Implement the `ValueNetwork`

In [22]:
class ValueNetwork(NeuralNet):
  def __init__(self, game):
    self.nnet = OthelloNNet(game, args)
    self.board_x, self.board_y = game.getBoardSize()
    self.action_size = game.getActionSize()

    if args.cuda:
      self.nnet.cuda()

  def train(self, games):
    """
    games: list of example lists; each example is of the form (board, pi, v)
    """
    optimizer = optim.Adam(self.nnet.parameters())
    for examples in games:
      for epoch in range(args.epochs):
        print('EPOCH ::: ' + str(epoch + 1))
        self.nnet.train()
        v_losses = []   # to store the losses per epoch
        batch_count = int(len(examples) / args.batch_size)  # number of batches per epoch
        t = tqdm(range(batch_count), desc='Training Value Network')
        for _ in t:
          sample_ids = np.random.randint(len(examples), size=args.batch_size)  # sample a random batch (with replacement) from the loaded examples
          boards, pis, vs = list(zip(*[examples[i] for i in sample_ids]))  # ground truth from the MCTS simulations; boards, pis and vs each have length batch_size
          boards = torch.FloatTensor(np.array(boards).astype(np.float64))
          target_vs = torch.FloatTensor(np.array(vs).astype(np.float64))

          # predict
          if args.cuda: # to run on GPU if available
            boards, target_vs = boards.contiguous().cuda(), target_vs.contiguous().cuda()

          #################################################
          ## TODO for students: run the network on `boards` and compute the value loss ##
          # Fill out function and remove
          raise NotImplementedError("Compute the output")
          #################################################
          # compute output
          _, out_v = ...
          l_v = ...
          total_loss = l_v

          # record loss
          v_losses.append(l_v.item())
          t.set_postfix(Loss_v=l_v.item())

          # compute gradient and do SGD step
          optimizer.zero_grad()
          total_loss.backward()
          optimizer.step()

  def predict(self, board):
    """
    board: np array with board
    """
    # timing
    start = time.time()

    # preparing input
    board = torch.FloatTensor(board.astype(np.float64))
    if args.cuda:
      board = board.contiguous().cuda()
    board = board.view(1, self.board_x, self.board_y)
    self.nnet.eval()
    with torch.no_grad():
        _, v = self.nnet(board)
    return v.data.cpu().numpy()[0]

  def loss_v(self, targets, outputs):
    #################################################
    ## TODO for students: implement the mean squared error between targets and outputs ##
    # Fill out function and remove
    raise NotImplementedError("Calculate the loss")
    #################################################
    # Mean squared error (MSE)
    return ...

  def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    filepath = os.path.join(folder, filename)
    if not os.path.exists(folder):
      print("Checkpoint Directory does not exist! Making directory {}".format(folder))
      os.mkdir(folder)
    else:
      print("Checkpoint Directory exists! ")
    torch.save({'state_dict': self.nnet.state_dict(),}, filepath)
    print("Model saved! ")

  def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    # https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
    filepath = os.path.join(folder, filename)
    if not os.path.exists(filepath):
      raise FileNotFoundError("No model in path {}".format(filepath))
    map_location = None if args.cuda else 'cpu'
    checkpoint = torch.load(filepath, map_location=map_location)
    self.nnet.load_state_dict(checkpoint['state_dict'])

In [23]:
#to_remove solution
class ValueNetwork(NeuralNet):
  def __init__(self, game):
    self.nnet = OthelloNNet(game, args)
    self.board_x, self.board_y = game.getBoardSize()
    self.action_size = game.getActionSize()

    if args.cuda:
      self.nnet.cuda()

  def train(self, games):
    """
    games: list of example lists; each example is of the form (board, pi, v)
    """
    optimizer = optim.Adam(self.nnet.parameters())
    for examples in games:
      for epoch in range(args.epochs):
        print('EPOCH ::: ' + str(epoch + 1))
        self.nnet.train()
        v_losses = []   # to store the losses per epoch
        batch_count = int(len(examples) / args.batch_size)  # number of batches per epoch
        t = tqdm(range(batch_count), desc='Training Value Network')
        for _ in t:
          sample_ids = np.random.randint(len(examples), size=args.batch_size)  # sample a random batch (with replacement) from the loaded examples
          boards, pis, vs = list(zip(*[examples[i] for i in sample_ids]))  # ground truth from the MCTS simulations; boards, pis and vs each have length batch_size
          boards = torch.FloatTensor(np.array(boards).astype(np.float64))
          target_vs = torch.FloatTensor(np.array(vs).astype(np.float64))

          # predict
          if args.cuda: # to run on GPU if available
            boards, target_vs = boards.contiguous().cuda(), target_vs.contiguous().cuda()

          # compute output
          _, out_v = self.nnet(boards)
          l_v = self.loss_v(target_vs, out_v)
          total_loss = l_v

          # record loss
          v_losses.append(l_v.item())
          t.set_postfix(Loss_v=l_v.item())

          # compute gradient and do SGD step
          optimizer.zero_grad()
          total_loss.backward()
          optimizer.step()

  def predict(self, board):
    """
    board: np array with board
    """
    # timing
    start = time.time()

    # preparing input
    board = torch.FloatTensor(board.astype(np.float64))
    if args.cuda:
      board = board.contiguous().cuda()
    board = board.view(1, self.board_x, self.board_y)
    self.nnet.eval()
    with torch.no_grad():
        _, v = self.nnet(board)
    return v.data.cpu().numpy()[0]

  def loss_v(self, targets, outputs):
    # Mean squared error (MSE)
    return torch.sum((targets - outputs.view(-1)) ** 2) / targets.size()[0]

  def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    filepath = os.path.join(folder, filename)
    if not os.path.exists(folder):
      print("Checkpoint Directory does not exist! Making directory {}".format(folder))
      os.mkdir(folder)
    else:
      print("Checkpoint Directory exists! ")
    torch.save({'state_dict': self.nnet.state_dict(),}, filepath)
    print("Model saved! ")

  def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    # https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
    filepath = os.path.join(folder, filename)
    if not os.path.exists(filepath):
      raise FileNotFoundError("No model in path {}".format(filepath))
    map_location = None if args.cuda else 'cpu'
    checkpoint = torch.load(filepath, map_location=map_location)
    self.nnet.load_state_dict(checkpoint['state_dict'])

## Section 2.4. Train the value network and observe the MSE loss progress

In [24]:
game = OthelloGame(6)
vnet = ValueNetwork(game)
vnet.train(loaded_games)

EPOCH ::: 1
Training Value Network: 100%|██████████| 401/401 [00:08<00:00, 44.74it/s, Loss_v=0.787]
EPOCH ::: 2
Training Value Network: 100%|██████████| 401/401 [00:08<00:00, 46.70it/s, Loss_v=0.721]
EPOCH ::: 3
Training Value Network: 100%|██████████| 401/401 [00:08<00:00, 45.77it/s, Loss_v=0.966]
EPOCH ::: 4
Training Value Network: 100%|██████████| 401/401 [00:08<00:00, 44.74it/s, Loss_v=0.544]
EPOCH ::: 5
Training Value Network: 100%|██████████| 401/401 [00:09<00:00, 43.85it/s, Loss_v=0.734]
EPOCH ::: 6
Training Value Network: 100%|██████████| 401/401 [00:09<00:00, 43.58it/s, Loss_v=0.546]
EPOCH ::: 7
Training Value Network: 100%|██████████| 401/401 [00:09<00:00, 43.77it/s, Loss_v=0.455]
EPOCH ::: 8
Training Value Network: 100%|██████████| 401/401 [00:09<00:00, 44.42it/s, Loss_v=0.347]
EPOCH ::: 9
Training Value Network: 100%|██████████| 401/401 [00:08<00:00, 44.82it/s, Loss_v=0.434]
EPOCH ::: 10
Training Value Network: 100%|██████████| 401/401 [00:08<00:00, 45.03it/s, Loss_v=0.327]

EPOCH ::: 1
Training Value Network: 100%|██████████| 407/407 [00:08<00:00, 45.28it/s, Loss_v=0.823]
EPOCH ::: 2
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 45.19it/s, Loss_v=0.636]
EPOCH ::: 3
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 45.20it/s, Loss_v=0.651]
EPOCH ::: 4
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.93it/s, Loss_v=0.592]
EPOCH ::: 5
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.77it/s, Loss_v=0.421]
EPOCH ::: 6
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.87it/s, Loss_v=0.444]
EPOCH ::: 7
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.90it/s, Loss_v=0.382]
EPOCH ::: 8
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.65it/s, Loss_v=0.232]
EPOCH ::: 9
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.58it/s, Loss_v=0.305]
EPOCH ::: 10
Training Value Network: 100%|██████████| 407/407 [00:09<00:00, 44.77it/s, Loss_v=0.291]


---
# Section 3: Use a trained value network to play games
**Goal**: Learn how to use a value function in order to make a player that works better than a random player.

**Exercise:**
* Sample random valid moves and use the value function to rank them.
* Choose the best move as the action and play it.
* Show that doing so beats the random player.
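
The steps above can be sketched on a toy problem. This is a minimal illustration, not the tutorial's solution: `toy_value` is a hypothetical stand-in for the trained value network, states are plain integers, and `next_state` is a made-up transition. The snippet samples up to `max_candidates` valid moves, scores the states they lead to, and returns the move with the best score.

```python
import random


def toy_value(state):
  """Hypothetical stand-in for a value network: higher is better for the mover."""
  return -abs(state - 10)


def next_state(state, action):
  """Hypothetical transition: the action is simply added to the state."""
  return state + action


def pick_move(state, valid_actions, value_fn, max_candidates=3, rng=random):
  """Sample up to max_candidates valid moves and return the best-valued one."""
  actions = list(valid_actions)
  rng.shuffle(actions)
  candidates = actions[:max_candidates]
  # score each candidate by the value of the state it leads to
  scored = [(value_fn(next_state(state, a)), a) for a in candidates]
  # max over (value, action) tuples picks the highest value
  return max(scored)[1]


best = pick_move(state=4, valid_actions=[1, 2, 3, 6], value_fn=toy_value,
                 max_candidates=4)
print(best)
```

In a two-player game the sign convention matters: if the value is computed from the opponent's point of view, it has to be negated before ranking, which is what the comment in the exercise below hints at.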


In [25]:
#@title Video 3: Play games using a value function
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"HreQzd7iusI", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 3.1: Value-based player

In [26]:
# you might need to change the checkpoint address based on the file name that's in your colab
model_save_name = 'ValueNetwork.pth.tar'
path = F"/content/nma_rl_games/alpha-zero/pretrained_models/models/"
game = OthelloGame(6)
vnet = ValueNetwork(game)
vnet.load_checkpoint(folder=path, filename=model_save_name)

In [27]:
class ValueBasedPlayer():
  def __init__(self, game, vnet):
    self.game = game
    self.vnet = vnet

  def play(self, board):
    valids = self.game.getValidMoves(board, 1)
    candidates = []
    max_num_actions = 3
    va = np.where(valids)[0]
    va_list = va.tolist()
    shuffle(va_list)
    #################################################
    ## TODO for students: get the next board, predict its value, and store (value, action) ##
    # Fill out function and remove
    raise NotImplementedError("Implement the value-based player")
    #################################################
    for a in va_list:
      # return next board state using getNextState() function
      nextBoard, _ = ...
      # predict the value of next state using value network
      value = ...
      # add the value and the action as a tuple to the candidate lists, note that you might need to change the sign of the value based on the player
      candidates += ...

      if len(candidates) == max_num_actions:
        break

    candidates.sort()

    return candidates[0][1]


# playing games between a value-based player and a random player
num_games = 20
player1 = ValueBasedPlayer(game, vnet).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
## Uncomment the code below to check your code!
# result = arena.playGames(num_games, verbose=False)
# print(result)

In [28]:
#to_remove solution
class ValueBasedPlayer():
  def __init__(self, game, vnet):
    self.game = game
    self.vnet = vnet

  def play(self, board):
    valids = self.game.getValidMoves(board, 1)
    candidates = []
    max_num_actions = 3
    va = np.where(valids)[0]
    va_list = va.tolist()
    shuffle(va_list)
    for a in va_list:
      # return next board state using getNextState() function
      nextBoard, _ = self.game.getNextState(board, 1, a)
      # predict the value of next state using value network
      value = self.vnet.predict(nextBoard)
      # add the value and the action as a tuple to the candidate lists, note that you might need to change the sign of the value based on the player
      candidates += [(-value, a)]

      if len(candidates) == max_num_actions:
        break

    candidates.sort()

    return candidates[0][1]


# playing games between a value-based player and a random player
num_games = 20
player1 = ValueBasedPlayer(game, vnet).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
## Uncomment the code below to check your code!
result = arena.playGames(num_games, verbose=False)
print(result)

Arena.playGames (1): 100%|██████████| 10/10 [00:01<00:00,  7.88it/s]
Arena.playGames (2): 100%|██████████| 10/10 [00:01<00:00,  7.95it/s]

(12, 8, 0)





```
Arena.playGames (1): 100%|██████████| 10/10 [00:01<00:00,  9.30it/s]
Arena.playGames (2): 100%|██████████| 10/10 [00:00<00:00, 10.60it/s]
(13, 7, 0)
```

**Result of pitting a value-based player against a random player**

In [29]:
print("\nNumber of games won by player1 = {}, \nNumber of games won by player2 = {}, \nNumber of games won by nobody = {} out of {} games" .format(result[0], result[1], result[2], num_games))

win_rate_player1 = result[0]/num_games # result[0] is the number of times that player 1 wins
print('\nWin rate for player 1 over {} games: {}%'.format(num_games, win_rate_player1*100))


Number of games won by player1 = 12, 
Number of games won by player2 = 8, 
Number of games won by nobody = 0 out of 20 games

Win rate for player 1 over 20 games: 60.0%


---
# Section 4: Train a policy network from expert game data
**Goal**: Learn how to train a policy network via supervised learning / behavioural cloning.

**Exercise**:
* Train a network to predict the next move in an expert dataset by maximizing the log likelihood of the next action.
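
Maximizing the log likelihood of the expert's move is the same as minimizing the negative log probability the network assigns to it, averaged over a batch. The following is a minimal sketch in plain Python, assuming the network outputs log-probabilities (the `predict` method below exponentiates its output, which suggests this) and that the targets are move distributions such as one-hot vectors. The name `nll_loss` and the toy numbers are illustrative and not part of the tutorial code.

```python
import math


def nll_loss(target_pis, log_probs):
  """Average over the batch of -sum_a target(a) * log p(a)."""
  total = 0.0
  for target, logp in zip(target_pis, log_probs):
    total += -sum(t * lp for t, lp in zip(target, logp))
  return total / len(target_pis)


# two toy positions with three possible moves each
log_probs = [
    [math.log(0.7), math.log(0.2), math.log(0.1)],
    [math.log(0.1), math.log(0.1), math.log(0.8)],
]
targets = [
    [1.0, 0.0, 0.0],  # expert played move 0
    [0.0, 0.0, 1.0],  # expert played move 2
]

loss = nll_loss(targets, log_probs)
print(round(loss, 4))
```

The loss shrinks toward zero as the network puts more probability on the expert's moves, and the leading minus sign is what turns likelihood maximization into a quantity that gradient descent can minimize.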

In [30]:
#@title Video 4: Train a policy network
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"DVSJE2d9tNI", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 4.1: Implement `PolicyNetwork`

In [31]:
class PolicyNetwork(NeuralNet):
  def __init__(self, game):
    self.nnet = OthelloNNet(game, args)
    self.board_x, self.board_y = game.getBoardSize()
    self.action_size = game.getActionSize()

    if args.cuda:
      self.nnet.cuda()

  def train(self, games):
    """
    games: list of datasets; each dataset is a list of examples of the form (board, pi, v)
    """
    optimizer = optim.Adam(self.nnet.parameters())

    for examples in games:
      for epoch in range(args.epochs):
        print('EPOCH ::: ' + str(epoch + 1))
        self.nnet.train()
        pi_losses = []

        batch_count = int(len(examples) / args.batch_size)

        t = tqdm(range(batch_count), desc='Training Policy Network')
        for _ in t:
          sample_ids = np.random.randint(len(examples), size=args.batch_size)
          boards, pis, _ = list(zip(*[examples[i] for i in sample_ids]))
          boards = torch.FloatTensor(np.array(boards).astype(np.float64))
          target_pis = torch.FloatTensor(np.array(pis))

          # predict
          if args.cuda:
            boards, target_pis = boards.contiguous().cuda(), target_pis.contiguous().cuda()

          #################################################
          ## TODO for students: run the network on the boards and compute the policy loss ##
          # Fill out function and remove
          raise NotImplementedError("Compute the output")
          #################################################
          # compute output
          out_pi, _ = ...
          l_pi = ...
          total_loss = l_pi

          # record loss
          pi_losses.append(l_pi.item())
          t.set_postfix(Loss_pi=l_pi.item())

          # compute gradient and do SGD step
          optimizer.zero_grad()
          l_pi.backward()
          optimizer.step()

  def predict(self, board):
    """
    board: np array with board
    """
    # timing
    start = time.time()

    # preparing input
    board = torch.FloatTensor(board.astype(np.float64))
    if args.cuda: board = board.contiguous().cuda()
    board = board.view(1, self.board_x, self.board_y)
    self.nnet.eval()
    with torch.no_grad():
      pi,_ = self.nnet(board)
    return torch.exp(pi).data.cpu().numpy()[0]

  def loss_pi(self, targets, outputs):
    #################################################
    ## TODO for students: return the negative log-likelihood of the targets ##
    # Fill out function and remove
    raise NotImplementedError("Compute the loss")
    #################################################
    # loss function. Be careful with the sign!
    return ...

  def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    filepath = os.path.join(folder, filename)
    if not os.path.exists(folder):
      print("Checkpoint Directory does not exist! Making directory {}".format(folder))
      os.mkdir(folder)
    else:
      print("Checkpoint Directory exists! ")
    torch.save({'state_dict': self.nnet.state_dict(),}, filepath)
    print("Model saved! ")

  def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    # https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
    filepath = os.path.join(folder, filename)
    if not os.path.exists(filepath):
      raise FileNotFoundError("No model in path {}".format(filepath))
    map_location = None if args.cuda else 'cpu'
    checkpoint = torch.load(filepath, map_location=map_location)
    self.nnet.load_state_dict(checkpoint['state_dict'])


# we use the same actor-critic network to output a policy
# game = OthelloGame(6)
# pnet = PolicyNetwork(game)
# pnet.train(loaded_games)

In [32]:
#to_remove solution
class PolicyNetwork(NeuralNet):
  def __init__(self, game):
    self.nnet = OthelloNNet(game, args)
    self.board_x, self.board_y = game.getBoardSize()
    self.action_size = game.getActionSize()

    if args.cuda:
      self.nnet.cuda()

  def train(self, games):
    """
    games: list of datasets; each dataset is a list of examples of the form (board, pi, v)
    """
    optimizer = optim.Adam(self.nnet.parameters())

    for examples in games:
      for epoch in range(args.epochs):
        print('EPOCH ::: ' + str(epoch + 1))
        self.nnet.train()
        pi_losses = []

        batch_count = int(len(examples) / args.batch_size)

        t = tqdm(range(batch_count), desc='Training Policy Network')
        for _ in t:
          sample_ids = np.random.randint(len(examples), size=args.batch_size)
          boards, pis, _ = list(zip(*[examples[i] for i in sample_ids]))
          boards = torch.FloatTensor(np.array(boards).astype(np.float64))
          target_pis = torch.FloatTensor(np.array(pis))

          # predict
          if args.cuda:
            boards, target_pis = boards.contiguous().cuda(), target_pis.contiguous().cuda()

          # compute output
          out_pi, _ = self.nnet(boards)
          l_pi = self.loss_pi(target_pis, out_pi)
          total_loss = l_pi

          # record loss
          pi_losses.append(l_pi.item())
          t.set_postfix(Loss_pi=l_pi.item())

          # compute gradient and do SGD step
          optimizer.zero_grad()
          l_pi.backward()
          optimizer.step()

  def predict(self, board):
    """
    board: np array with board
    """
    # timing
    start = time.time()

    # preparing input
    board = torch.FloatTensor(board.astype(np.float64))
    if args.cuda: board = board.contiguous().cuda()
    board = board.view(1, self.board_x, self.board_y)
    self.nnet.eval()
    with torch.no_grad():
      pi,_ = self.nnet(board)
    return torch.exp(pi).data.cpu().numpy()[0]

  def loss_pi(self, targets, outputs):
    # loss function. Be careful with the sign!
    return -torch.sum(targets * outputs) / targets.size()[0]

  def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    filepath = os.path.join(folder, filename)
    if not os.path.exists(folder):
      print("Checkpoint Directory does not exist! Making directory {}".format(folder))
      os.mkdir(folder)
    else:
      print("Checkpoint Directory exists! ")
    torch.save({'state_dict': self.nnet.state_dict(),}, filepath)
    print("Model saved! ")

  def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
    # https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
    filepath = os.path.join(folder, filename)
    if not os.path.exists(filepath):
      raise FileNotFoundError("No model in path {}".format(filepath))
    map_location = None if args.cuda else 'cpu'
    checkpoint = torch.load(filepath, map_location=map_location)
    self.nnet.load_state_dict(checkpoint['state_dict'])


# we use the same actor-critic network to output a policy
game = OthelloGame(6)
pnet = PolicyNetwork(game)
pnet.train(loaded_games)

Training Policy Network:   1%|▏         | 6/401 [00:00<00:07, 51.01it/s, Loss_pi=3.37]

EPOCH ::: 1


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.70it/s, Loss_pi=2.14]
Training Policy Network:   1%|▏         | 6/401 [00:00<00:08, 49.21it/s, Loss_pi=2.41]

EPOCH ::: 2


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.80it/s, Loss_pi=1.8]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 47.21it/s, Loss_pi=2.11]

EPOCH ::: 3


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.79it/s, Loss_pi=1.97]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 47.52it/s, Loss_pi=1.95]

EPOCH ::: 4


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.88it/s, Loss_pi=1.76]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 46.40it/s, Loss_pi=1.7] 

EPOCH ::: 5


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.79it/s, Loss_pi=1.85]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 48.73it/s, Loss_pi=1.6] 

EPOCH ::: 6


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.90it/s, Loss_pi=1.7]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 46.58it/s, Loss_pi=1.64]

EPOCH ::: 7


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.94it/s, Loss_pi=1.62]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 47.55it/s, Loss_pi=1.44]

EPOCH ::: 8


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.88it/s, Loss_pi=1.41]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 47.07it/s, Loss_pi=1.44]

EPOCH ::: 9


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.98it/s, Loss_pi=1.38]
Training Policy Network:   1%|          | 5/401 [00:00<00:08, 48.55it/s, Loss_pi=1.21]

EPOCH ::: 10


Training Policy Network: 100%|██████████| 401/401 [00:08<00:00, 44.90it/s, Loss_pi=1.16]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 46.78it/s, Loss_pi=1.75]

EPOCH ::: 1


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.84it/s, Loss_pi=1.56]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 46.00it/s, Loss_pi=1.56]

EPOCH ::: 2


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.90it/s, Loss_pi=1.58]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 47.82it/s, Loss_pi=1.18]

EPOCH ::: 3


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.76it/s, Loss_pi=1.33]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 45.55it/s, Loss_pi=1.28]

EPOCH ::: 4


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 45.10it/s, Loss_pi=1]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 48.55it/s, Loss_pi=0.98]

EPOCH ::: 5


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.98it/s, Loss_pi=1.11]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 46.74it/s, Loss_pi=1.09]

EPOCH ::: 6


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.92it/s, Loss_pi=1.19]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 46.27it/s, Loss_pi=1.02] 

EPOCH ::: 7


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.83it/s, Loss_pi=1.03]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 46.48it/s, Loss_pi=1.07] 

EPOCH ::: 8


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 44.91it/s, Loss_pi=0.69]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 47.21it/s, Loss_pi=0.907]

EPOCH ::: 9


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 45.07it/s, Loss_pi=0.89]
Training Policy Network:   1%|          | 5/407 [00:00<00:08, 47.48it/s, Loss_pi=0.745]

EPOCH ::: 10


Training Policy Network: 100%|██████████| 407/407 [00:09<00:00, 45.00it/s, Loss_pi=0.755]


---
# Section 5: Use a trained policy network to play games
**Goal**: Learn how to use a policy network to play games.

**Exercise:** 
* Use the policy network to give probabilities for the next move.
* Build a player that takes the move to which the network assigns the maximum probability.
* Compare this to another player that samples moves according to the probability distribution output by the network.
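
The two ways of picking a move can be sketched in plain Python. This is an illustration under assumptions rather than the exercise solution: the probabilities and the validity mask are made-up toy values, and `mask_and_normalize` and `choose` are hypothetical helper names.

```python
import random


def mask_and_normalize(probs, valids):
  """Zero out invalid moves and renormalize; fall back to uniform over valid moves."""
  masked = [p * v for p, v in zip(probs, valids)]
  total = sum(masked)
  if total > 0:
    return [m / total for m in masked]
  n_valid = sum(valids)
  return [v / n_valid for v in valids]


def choose(probs, greedy=True, rng=random):
  """Greedy: index of the largest probability. Otherwise: sample from probs."""
  if greedy:
    return max(range(len(probs)), key=lambda i: probs[i])
  return rng.choices(range(len(probs)), weights=probs, k=1)[0]


probs = [0.5, 0.3, 0.2]   # toy network output over three moves
valids = [0, 1, 1]        # move 0 is illegal in this position
vap = mask_and_normalize(probs, valids)
print(vap)
print(choose(vap, greedy=True))
print(choose(vap, greedy=False))
```

The greedy player is deterministic for a given position, while the sampling player will sometimes play lower-probability moves, which makes it more varied but also more error-prone.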


In [33]:
#@title Video 5: Play games using a policy network
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"hhhBmSXIZGY", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 5.1: Implement the `PolicyBasedPlayer`

In [34]:
# you might need to change the checkpoint address based on the file name that's in your colab

model_save_name = 'PolicyNetwork.pth.tar'
path = F"/content/nma_rl_games/alpha-zero/pretrained_models/models/"
game = OthelloGame(6)
pnet = PolicyNetwork(game)
pnet.load_checkpoint(folder=path, filename=model_save_name)

In [35]:
class PolicyBasedPlayer():
  def __init__(self, game, pnet, greedy=True):
    self.game = game
    self.pnet = pnet
    self.greedy = greedy

  def play(self, board):
    valids = self.game.getValidMoves(board, 1)
    #################################################
    ## TODO for students: predict action probabilities and mask out invalid moves ##
    # Fill out function and remove
    raise NotImplementedError("Define the play")
    #################################################
    action_probs = ...
    vap = ...  # masking invalid moves
    sum_vap = ...

    if sum_vap > 0:
      vap /= sum_vap  # renormalize
    else:
      # if all valid moves were masked we make all valid moves equally probable
      print("All valid moves were masked, doing a workaround.")
      vap = vap + valids
      vap /= np.sum(vap)

    if self.greedy:
      # greedy policy player
      a = np.where(vap == np.max(vap))[0][0]
    else:
      # sample-based policy player
      a = np.random.choice(self.game.getActionSize(), p=vap)

    return a


# playing games
num_games = 20
player1 = PolicyBasedPlayer(game, pnet, greedy=True).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
## Uncomment below to test!
# result = arena.playGames(num_games, verbose=False)
# print(result)

In [36]:
#to_remove solution
class PolicyBasedPlayer():
  def __init__(self, game, pnet, greedy=True):
    self.game = game
    self.pnet = pnet
    self.greedy = greedy

  def play(self, board):
    valids = self.game.getValidMoves(board, 1)
    action_probs = self.pnet.predict(board)
    vap = action_probs*valids  # masking invalid moves
    sum_vap = np.sum(vap)

    if sum_vap > 0:
      vap /= sum_vap  # renormalize
    else:
      # if all valid moves were masked we make all valid moves equally probable
      print("All valid moves were masked, doing a workaround.")
      vap = vap + valids
      vap /= np.sum(vap)

    if self.greedy:
      # greedy policy player
      a = np.where(vap == np.max(vap))[0][0]
    else:
      # sample-based policy player
      a = np.random.choice(self.game.getActionSize(), p=vap)

    return a


# playing games
num_games = 20
player1 = PolicyBasedPlayer(game, pnet, greedy=True).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
## Uncomment below to test!
result = arena.playGames(num_games, verbose=False)
print(result)

Arena.playGames (1): 100%|██████████| 10/10 [00:00<00:00, 15.09it/s]
Arena.playGames (2): 100%|██████████| 10/10 [00:00<00:00, 15.05it/s]

(19, 1, 0)





In [37]:
win_rate_player1 = result[0] / num_games
print('\n Win rate for player 1 over {} games: {}%'.format(num_games, win_rate_player1*100))


 Win rate for player 1 over 20 games: 95.0%


## Section 5.1. Comparing a player that samples from the action probabilities with a greedy player that takes the maximum-probability move
In the runs shown in this notebook, the greedy policy player performs better against the random player than the sample-based policy player.
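
With only 20 games per match-up, win rates are noisy, so it is worth comparing the gap between two players against the sampling error. The sketch below uses the normal approximation to the binomial standard error, which is a choice made here for illustration and is crude when the win rate is close to 0 or 1. The win counts are the ones printed in this notebook run (19 of 20 for the greedy player, 11 of 20 for the sampling player), and `win_rate_with_se` is a hypothetical helper name.

```python
import math


def win_rate_with_se(wins, num_games):
  """Return the win rate and its approximate binomial standard error."""
  p = wins / num_games
  se = math.sqrt(p * (1 - p) / num_games)
  return p, se


for name, wins in [("greedy", 19), ("sampling", 11)]:
  p, se = win_rate_with_se(wins, 20)
  print(f"{name}: win rate {p:.2f} +/- {se:.2f}")
```

Playing more games per match-up shrinks the standard error roughly with the square root of the number of games.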

In [38]:
num_games = 20
game = OthelloGame(6)
player1 = PolicyBasedPlayer(game, pnet, greedy=False).play
player2 = RandomPlayer(game).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
result = arena.playGames(num_games, verbose=False)
print(result)

Arena.playGames (1): 100%|██████████| 10/10 [00:00<00:00, 14.79it/s]
Arena.playGames (2): 100%|██████████| 10/10 [00:00<00:00, 14.45it/s]

(11, 9, 0)





In [39]:
win_rate_player1 = result[0]/num_games
print('\n Win rate for player 1 over {} games: {}%'.format(num_games, win_rate_player1*100))


 Win rate for player 1 over 20 games: 55.00000000000001%


## Section 5.2. Compare the greedy policy-based player with the value-based player

In [40]:
num_games = 20
game = OthelloGame(6)
player1 = PolicyBasedPlayer(game, pnet).play
player2 = ValueBasedPlayer(game, vnet).play
arena = Arena.Arena(player1, player2, game, display=OthelloGame.display)
result = arena.playGames(num_games, verbose=False)
print(result)

Arena.playGames (1): 100%|██████████| 10/10 [00:01<00:00,  6.50it/s]
Arena.playGames (2): 100%|██████████| 10/10 [00:01<00:00,  6.46it/s]

(13, 7, 0)





In [41]:
win_rate_player1 = result[0]/num_games
print('\n Win rate for player 1 over {} games: {}%'.format(num_games, win_rate_player1*100))


 Win rate for player 1 over 20 games: 65.0%


---
# Section 6: Plan using Monte Carlo rollouts


**Goal**: 
Learn the core idea behind using simulated rollouts to look ahead and estimate the value of actions.


**Exercise**: 
* Build a loop to run Monte Carlo simulations using the policy network.
* Use this to obtain better estimates of the value of moves.
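
The idea of a rollout loop can be illustrated on a much smaller game than Othello. The sketch below makes several assumptions: the game is a toy take-away game (players alternately remove 1 or 2 stones, and whoever takes the last stone wins), moves are chosen uniformly at random instead of by a policy network, and `rollout` and `mc_value` are hypothetical names. Each rollout plays to the end and reports the result from the point of view of the player to move at the start, and averaging many rollouts gives a value estimate for the position.

```python
import random


def rollout(stones, rng):
  """Play one random game; return +1 if the player to move at the start wins, else -1."""
  player = 1  # +1 is the player to move at the start, -1 the opponent
  while True:
    take = rng.choice([1, 2]) if stones >= 2 else 1
    stones -= take
    if stones == 0:
      return player  # the player who took the last stone wins
    player = -player


def mc_value(stones, num_rollouts, seed=0):
  """Average the outcome of num_rollouts random games from this position."""
  rng = random.Random(seed)
  return sum(rollout(stones, rng) for _ in range(num_rollouts)) / num_rollouts


print(mc_value(stones=1, num_rollouts=100))
print(mc_value(stones=5, num_rollouts=1000))
```

In the exercise below, the random move choice is replaced by sampling from the policy network's masked probabilities, and the rollout is cut off after `maxDepth` steps rather than always being played to the end.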



In [42]:
#@title Video 6: Play using Monte-Carlo rollouts
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"EpoIjzytpxQ", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 6.1: `MonteCarlo`

In [43]:
class MonteCarlo():
  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args

    self.Ps = {}  # stores initial policy (returned by neural net)
    self.Es = {}  # stores game.getGameEnded ended for board s

  # call this rollout
  def simulate(self, canonicalBoard):
    """
    This function performs one Monte Carlo rollout
    """

    s = self.game.stringRepresentation(canonicalBoard)
    init_start_state = s
    temp_v = 0
    isfirstAction = None

    #################################################
    ## TODO for students: complete the rollout loop below ##
    # Fill out function and remove
    raise NotImplementedError("Build the loop")
    #################################################
    for i in range(self.args.maxDepth): # maxDepth

      if s not in self.Es:
        self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
      if self.Es[s] != 0:
        # terminal state
        temp_v= -self.Es[s]
        break

      self.Ps[s], v = self.nnet.predict(canonicalBoard)
      valids = self.game.getValidMoves(canonicalBoard, 1)
      self.Ps[s] = self.Ps[s] * valids  # masking invalid moves
      sum_Ps_s = np.sum(self.Ps[s])

      if sum_Ps_s > 0:
        self.Ps[s] /= sum_Ps_s  # renormalize
      else:
        # if all valid moves were masked make all valid moves equally probable
        # NB! All valid moves may be masked if your NNet architecture is insufficient, if the network has overfit, or for some other reason.
        # If you get dozens or hundreds of these messages, you should pay attention to your NNet and/or training process.
        log.error("All valid moves were masked, doing a workaround.")
        self.Ps[s] = self.Ps[s] + valids
        self.Ps[s] /= np.sum(self.Ps[s])

      #################################################
      ## TODO for students: sample an action from the policy and compute the next state ##
      # Fill out function and remove
      raise NotImplementedError("Take the action, find the next state")
      #################################################
      # Take a random action
      a = ...
      # Find the next state and the next player
      next_s, next_player = self.game.getNextState(..., ..., ...)
      next_s = self.game.getCanonicalForm(..., ...)
      # advance the rollout so the next iteration evaluates the new board
      canonicalBoard = next_s

      s = self.game.stringRepresentation(next_s)
      temp_v = v

    return temp_v

In [44]:
#to_remove solution
class MonteCarlo():
  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args

    self.Ps = {}  # stores initial policy (returned by neural net)
    self.Es = {}  # stores game.getGameEnded ended for board s

  # call this rollout
  def simulate(self, canonicalBoard):
    """
    This function performs one Monte Carlo rollout
    """

    s = self.game.stringRepresentation(canonicalBoard)
    init_start_state = s
    temp_v = 0
    isfirstAction = None

    for i in range(self.args.maxDepth): # maxDepth

      if s not in self.Es:
        self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
      if self.Es[s] != 0:
        # terminal state
        temp_v= -self.Es[s]
        break

      self.Ps[s], v = self.nnet.predict(canonicalBoard)
      valids = self.game.getValidMoves(canonicalBoard, 1)
      self.Ps[s] = self.Ps[s] * valids  # masking invalid moves
      sum_Ps_s = np.sum(self.Ps[s])

      if sum_Ps_s > 0:
        self.Ps[s] /= sum_Ps_s  # renormalize
      else:
        # if all valid moves were masked make all valid moves equally probable
        # NB! All valid moves may be masked if your NNet architecture is insufficient, if the network has overfit, or for some other reason.
        # If you get dozens or hundreds of these messages, you should pay attention to your NNet and/or training process.
        log.error("All valid moves were masked, doing a workaround.")
        self.Ps[s] = self.Ps[s] + valids
        self.Ps[s] /= np.sum(self.Ps[s])

      # Take a random action
      a = np.random.choice(self.game.getActionSize(), p=self.Ps[s])
      # Find the next state and the next player
      next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
      next_s = self.game.getCanonicalForm(next_s, next_player)
      # advance the rollout so the next iteration evaluates the new board
      canonicalBoard = next_s

      s = self.game.stringRepresentation(next_s)
      temp_v = v

    return temp_v

---
# Section 7: Use Monte Carlo simulations to play games

**Goal:** 
Learn how to use simple Monte Carlo planning to play games.


In [45]:
#@title Video 7: Play with planning
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"-KV8DvNjn5Q", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 7.1: Monte-Carlo simulations

* Incorporate Monte Carlo simulations into an agent.
* Run the resulting player versus the random, value-based, and policy-based players.
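
Turning rollouts into a player amounts to trying each candidate move, running several rollouts from the position it leads to, and picking the move with the best average result. The sketch below shows this on a toy take-away game (players alternately remove 1 or 2 stones, and whoever takes the last stone wins), with uniformly random rollouts standing in for the policy network. The game, `rollout`, and `mc_player` are assumptions made for illustration and are not the tutorial's `MonteCarloBasedPlayer`.

```python
import random


def rollout(stones, rng):
  """Random playout; +1 if the player to move at the start of the playout wins, else -1."""
  if stones == 0:
    return -1  # no stones left: the previous player took the last one and won
  player = 1
  while True:
    take = rng.choice([1, 2]) if stones >= 2 else 1
    stones -= take
    if stones == 0:
      return player
    player = -player


def mc_player(stones, num_rollouts=200, seed=0):
  """Pick the move whose successor position has the best average rollout value."""
  rng = random.Random(seed)
  scored = []
  for action in [a for a in (1, 2) if a <= stones]:
    remaining = stones - action
    # rollouts are scored for the opponent, who moves next, so negate them
    values = [-rollout(remaining, rng) for _ in range(num_rollouts)]
    scored.append((sum(values) / num_rollouts, action))
  return max(scored)[1]


print(mc_player(stones=2))
print(mc_player(stones=4))
```

Note the sign flip: a rollout started after our move is evaluated from the opponent's point of view, so its result is negated before averaging. The exercise below restricts the candidates to the top `K` moves under the policy network instead of trying every legal move.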

In [46]:
class MonteCarloBasedPlayer():
  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args
    #################################################
    ## TODO for students: create the MonteCarlo object used for rollouts ##
    # Fill out function and remove
    raise NotImplementedError("Use Monte Carlo!")
    #################################################
    self.mc = ...
    self.K = self.args.mc_topk

  def play(self, canonicalBoard):
    self.qsa = []
    s = self.game.stringRepresentation(canonicalBoard)
    Ps, v = self.nnet.predict(canonicalBoard)
    valids = self.game.getValidMoves(canonicalBoard, 1)
    Ps = Ps * valids  # masking invalid moves
    sum_Ps_s = np.sum(Ps)

    if sum_Ps_s > 0:
      Ps /= sum_Ps_s  # renormalize
    else:
      # If all valid moves were masked, make all valid moves equally probable.
      # NB! All valid moves may be masked if your NNet architecture is insufficient, if the network has overfit, or for some other reason.
      # If you see dozens or hundreds of these messages, check your NNet and/or training process.
      log = logging.getLogger(__name__)
      log.error("All valid moves were masked, doing a workaround.")
      Ps = Ps + valids
      Ps /= np.sum(Ps)

    num_valid_actions = np.shape(np.nonzero(Ps))[1]

    if num_valid_actions < self.K:
      top_k_actions = np.argpartition(Ps,-num_valid_actions)[-num_valid_actions:]
    else:
      top_k_actions = np.argpartition(Ps,-self.K)[-self.K:]  # to get actions that belongs to top k prob
    #################################################
    ## TODO for students: loop over the top-k actions and compute the canonical next state ##
    # Fill out function and remove
    raise NotImplementedError("Loop for the top actions")
    #################################################
    for action in ...:
      next_s, next_player = self.game.getNextState(..., ..., ...)
      next_s = self.game.getCanonicalForm(..., ...)

      values = []

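      # do some rollouts
      # NOTE: each rollout starts from canonicalBoard, not next_s, so every
      # candidate action is evaluated from the same starting position
      for rollout in range(self.args.numMCsims):
        value = self.mc.simulate(canonicalBoard)
        values.append(value)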

      # average out values
      avg_value = np.mean(values)
      self.qsa.append((avg_value, action))

    self.qsa.sort(key=lambda a: a[0])
    self.qsa.reverse()
    best_action = self.qsa[0][1]
    return best_action

  def getActionProb(self, canonicalBoard, temp=1):
    if self.game.getGameEnded(canonicalBoard, 1) != 0:
      return np.zeros((self.game.getActionSize()))

    else:
      action_probs = np.zeros((self.game.getActionSize()))
      best_action = self.play(canonicalBoard)
      action_probs[best_action] = 1

    return action_probs


game = OthelloGame(6)
rp = RandomPlayer(game).play  # random player (baseline opponent)
num_games = 20  # Feel free to change this number

n1 = NNet(game)  # nNet players
args1 = dotdict({'numMCsims': 10, 'maxRollouts':5, 'maxDepth':5, 'mc_topk': 3})

## Uncomment below to check Monte Carlo agent!
# mc1 = MonteCarloBasedPlayer(game, n1, args1)
# n1p = lambda x: np.argmax(mc1.getActionProb(x))
# arena = Arena.Arena(n1p, rp, game, display=OthelloGame.display)
# MC_result = arena.playGames(num_games, verbose=False)
# print("\n Number of games won by player1 = {}, num of games won by player2 = {}, num of games won by nobody = {} out of {} games" .format(MC_result[0], MC_result[1], MC_result[2], num_games))

In [47]:
#to_remove solution
class MonteCarloBasedPlayer():
  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args
    self.mc = MonteCarlo(game, nnet, args)
    self.K = self.args.mc_topk

  def play(self, canonicalBoard):
    self.qsa = []
    s = self.game.stringRepresentation(canonicalBoard)
    Ps, v = self.nnet.predict(canonicalBoard)
    valids = self.game.getValidMoves(canonicalBoard, 1)
    Ps = Ps * valids  # masking invalid moves
    sum_Ps_s = np.sum(Ps)

    if sum_Ps_s > 0:
      Ps /= sum_Ps_s  # renormalize
    else:
      # If all valid moves were masked, make all valid moves equally probable.
      # NB! All valid moves may be masked if your NNet architecture is insufficient, if the network has overfit, or for some other reason.
      # If you see dozens or hundreds of these messages, check your NNet and/or training process.
      log = logging.getLogger(__name__)
      log.error("All valid moves were masked, doing a workaround.")
      Ps = Ps + valids
      Ps /= np.sum(Ps)

    num_valid_actions = np.shape(np.nonzero(Ps))[1]

    if num_valid_actions < self.K:
      top_k_actions = np.argpartition(Ps,-num_valid_actions)[-num_valid_actions:]
    else:
      top_k_actions = np.argpartition(Ps,-self.K)[-self.K:]  # to get actions that belongs to top k prob

    for action in top_k_actions:
      next_s, next_player = self.game.getNextState(canonicalBoard, 1, action)
      next_s = self.game.getCanonicalForm(next_s, next_player)

      values = []

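      # do some rollouts
      # NOTE: each rollout starts from canonicalBoard, not next_s, so every
      # candidate action is evaluated from the same starting position
      for rollout in range(self.args.numMCsims):
        value = self.mc.simulate(canonicalBoard)
        values.append(value)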

      # average out values
      avg_value = np.mean(values)
      self.qsa.append((avg_value, action))

    self.qsa.sort(key=lambda a: a[0])
    self.qsa.reverse()
    best_action = self.qsa[0][1]
    return best_action

  def getActionProb(self, canonicalBoard, temp=1):
    if self.game.getGameEnded(canonicalBoard, 1) != 0:
      return np.zeros((self.game.getActionSize()))

    else:
      action_probs = np.zeros((self.game.getActionSize()))
      best_action = self.play(canonicalBoard)
      action_probs[best_action] = 1

    return action_probs


game = OthelloGame(6)
rp = RandomPlayer(game).play  # random player (baseline opponent)
num_games = 20  # Feel free to change this number

n1 = NNet(game)  # nNet players
args1 = dotdict({'numMCsims': 10, 'maxRollouts':5, 'maxDepth':5, 'mc_topk': 3})

## Uncomment below to check Monte Carlo agent!
mc1 = MonteCarloBasedPlayer(game, n1, args1)
n1p = lambda x: np.argmax(mc1.getActionProb(x))
arena = Arena.Arena(n1p, rp, game, display=OthelloGame.display)
MC_result = arena.playGames(num_games, verbose=False)
print("\n Number of games won by player1 = {}, num of games won by player2 = {}, num of games won by nobody = {} out of {} games" .format(MC_result[0], MC_result[1], MC_result[2], num_games))

Arena.playGames (1): 100%|██████████| 10/10 [00:56<00:00,  5.63s/it]
Arena.playGames (2): 100%|██████████| 10/10 [00:54<00:00,  5.43s/it]


 Number of games won by player1 = 13, num of games won by player2 = 7, num of games won by nobody = 0 out of 20 games





---
# Section 8: Plan using Monte Carlo Tree Search

**Goal:** 
Teach students to understand the core ideas behind Monte Carlo Tree Search.


In [48]:
#@title Video 8: Plan with MCTS
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"tKBcMtoEzQA", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 8.1: MCTS planner

* Plug together pre-built Selection, Expansion & Backpropagation code to complete an MCTS planner.
* Deploy the MCTS planner to understand an interesting position, producing value estimates and action counts.

In [49]:
class MCTS():
  """
  This class handles the MCTS tree.
  """

  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args
    self.Qsa = {}    # stores Q values for s,a (as defined in the paper)
    self.Nsa = {}    # stores #times edge s,a was visited
    self.Ns = {}     # stores #times board s was visited
    self.Ps = {}     # stores initial policy (returned by neural net)
    self.Es = {}     # stores game.getGameEnded result for board s
    self.Vs = {}     # stores game.getValidMoves for board s

  def search(self, canonicalBoard):
    """
    This function performs one iteration of MCTS. It is recursively called
    till a leaf node is found. The action chosen at each node is one that
    has the maximum upper confidence bound as in the paper.

    Once a leaf node is found, the neural network is called to return an
    initial policy P and a value v for the state. This value is propagated
    up the search path. In case the leaf node is a terminal state, the
    outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
    updated.

    NOTE: the return values are the negative of the value of the current
    state. This is done since v is in [-1,1] and if v is the value of a
    state for the current player, then its value is -v for the other player.

    Returns:
        v: the negative of the value of the current canonicalBoard
    """
    s = self.game.stringRepresentation(canonicalBoard)

    if s not in self.Es:
      self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
    if self.Es[s] != 0:
      # terminal node
      return -self.Es[s]

    if s not in self.Ps:
      # leaf node
      self.Ps[s], v = self.nnet.predict(canonicalBoard)
      valids = self.game.getValidMoves(canonicalBoard, 1)
      self.Ps[s] = self.Ps[s] * valids  # masking invalid moves
      sum_Ps_s = np.sum(self.Ps[s])
      if sum_Ps_s > 0:
        self.Ps[s] /= sum_Ps_s  # renormalize
      else:
        # If all valid moves were masked, make all valid moves equally probable.
        # NB! All valid moves may be masked if your NNet architecture is insufficient, if the network has overfit, or for some other reason.
        # If you see dozens or hundreds of these messages, check your NNet and/or training process.
        log = logging.getLogger(__name__)
        log.error("All valid moves were masked, doing a workaround.")
        self.Ps[s] = self.Ps[s] + valids
        self.Ps[s] /= np.sum(self.Ps[s])

      self.Vs[s] = valids
      self.Ns[s] = 0

      return -v

    valids = self.Vs[s]
    cur_best = -float('inf')
    best_act = -1

    #################################################
    ## TODO for students: compute the upper confidence bound u for each valid action ##
    # Fill out function and remove
    raise NotImplementedError("Complete the for loop")
    #################################################
    # pick the action with the highest upper confidence bound
    for a in range(self.game.getActionSize()):
      if valids[a]:
        if (s, a) in self.Qsa:
          u = ... + ... * ... * math.sqrt(...) / (1 + ...)
        else:
          u = ... * ... * math.sqrt(... + 1e-8)

        if u > cur_best:
          cur_best = u
          best_act = a

    a = best_act
    next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
    next_s = self.game.getCanonicalForm(next_s, next_player)

    v = self.search(next_s)

    if (s, a) in self.Qsa:
      self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
      self.Nsa[(s, a)] += 1

    else:
      self.Qsa[(s, a)] = v
      self.Nsa[(s, a)] = 1

    self.Ns[s] += 1
    return -v

  def getNsa(self):
    return self.Nsa

In [50]:
#to_remove solution
class MCTS():
  """
  This class handles the MCTS tree.
  """

  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args
    self.Qsa = {}    # stores Q values for s,a (as defined in the paper)
    self.Nsa = {}    # stores #times edge s,a was visited
    self.Ns = {}     # stores #times board s was visited
    self.Ps = {}     # stores initial policy (returned by neural net)
    self.Es = {}     # stores game.getGameEnded result for board s
    self.Vs = {}     # stores game.getValidMoves for board s

  def search(self, canonicalBoard):
    """
    This function performs one iteration of MCTS. It is recursively called
    till a leaf node is found. The action chosen at each node is one that
    has the maximum upper confidence bound as in the paper.

    Once a leaf node is found, the neural network is called to return an
    initial policy P and a value v for the state. This value is propagated
    up the search path. In case the leaf node is a terminal state, the
    outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
    updated.

    NOTE: the return values are the negative of the value of the current
    state. This is done since v is in [-1,1] and if v is the value of a
    state for the current player, then its value is -v for the other player.

    Returns:
        v: the negative of the value of the current canonicalBoard
    """
    s = self.game.stringRepresentation(canonicalBoard)

    if s not in self.Es:
      self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
    if self.Es[s] != 0:
      # terminal node
      return -self.Es[s]

    if s not in self.Ps:
      # leaf node
      self.Ps[s], v = self.nnet.predict(canonicalBoard)
      valids = self.game.getValidMoves(canonicalBoard, 1)
      self.Ps[s] = self.Ps[s] * valids  # masking invalid moves
      sum_Ps_s = np.sum(self.Ps[s])
      if sum_Ps_s > 0:
        self.Ps[s] /= sum_Ps_s  # renormalize
      else:
        # If all valid moves were masked, make all valid moves equally probable.
        # NB! All valid moves may be masked if your NNet architecture is insufficient, if the network has overfit, or for some other reason.
        # If you see dozens or hundreds of these messages, check your NNet and/or training process.
        log = logging.getLogger(__name__)
        log.error("All valid moves were masked, doing a workaround.")
        self.Ps[s] = self.Ps[s] + valids
        self.Ps[s] /= np.sum(self.Ps[s])

      self.Vs[s] = valids
      self.Ns[s] = 0

      return -v

    valids = self.Vs[s]
    cur_best = -float('inf')
    best_act = -1

    # pick the action with the highest upper confidence bound
    for a in range(self.game.getActionSize()):
      if valids[a]:
        if (s, a) in self.Qsa:
          u = self.Qsa[(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s]) / (1 + self.Nsa[(s, a)])
        else:
          u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + 1e-8)

        if u > cur_best:
          cur_best = u
          best_act = a

    a = best_act
    next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
    next_s = self.game.getCanonicalForm(next_s, next_player)

    v = self.search(next_s)

    if (s, a) in self.Qsa:
      self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
      self.Nsa[(s, a)] += 1

    else:
      self.Qsa[(s, a)] = v
      self.Nsa[(s, a)] = 1

    self.Ns[s] += 1
    return -v

  def getNsa(self):
    return self.Nsa
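
The selection and backup steps in `search` can be isolated into two small functions. The sketch below is standalone: it mirrors the score used for already-visited edges, `Qsa + cpuct * Ps * sqrt(Ns) / (1 + Nsa)`, and the running-mean update of `Qsa`. The function names `puct_score` and `update_q` and the numbers are hypothetical and chosen only for illustration.

```python
import math

def puct_score(q, prior, parent_visits, edge_visits, cpuct=1.0):
  """Upper confidence score for one edge (s, a): value plus exploration bonus."""
  return q + cpuct * prior * math.sqrt(parent_visits) / (1 + edge_visits)

def update_q(q, edge_visits, v):
  """Fold a new backed-up value v into the running mean of an edge."""
  return (edge_visits * q + v) / (edge_visits + 1), edge_visits + 1

# Two edges with equal priors: one visited 9 times, the other only once.
well_explored = puct_score(q=0.2, prior=0.5, parent_visits=10, edge_visits=9)
rarely_tried = puct_score(q=0.0, prior=0.5, parent_visits=10, edge_visits=1)
print(well_explored, rarely_tried)

# Backing up a value of -0.5 through an edge that held Q = 0.5 after one visit.
q, n = update_q(q=0.5, edge_visits=1, v=-0.5)
print(q, n)
```

Although the well-explored edge has the higher value estimate, the rarely tried edge receives the larger score because its exploration bonus has shrunk less. This is how the search balances exploiting good moves against exploring uncertain ones, with `cpuct` setting the weight of the bonus.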

---
# Section 9: Use MCTS to play games 

**Goal:** 
Teach the students how to use the results of an MCTS to play games.



In [51]:
#@title Video 9: Play with MCTS
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"ejG3kN_leRk", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


## Coding Exercise 9.1: Agent that uses an MCTS planner

* Plug the MCTS planner into an agent.
* Play games against other agents.
* Explore the contributions of prior network, value function, number of simulations / time to play, and explore/exploit parameters.
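
After the simulations finish, the agent has to turn MCTS visit counts into a move choice, and the `temp` (temperature) parameter controls how greedy that choice is. The following is a minimal standalone sketch of that conversion in plain Python. `counts_to_probs` and the example counts are hypothetical and not part of the tutorial code.

```python
import random

def counts_to_probs(counts, temp=1.0):
  """Turn visit counts into a move distribution.

  temp == 0 puts all mass on one most-visited action (ties broken at random);
  temp > 0 uses counts ** (1 / temp), normalised to sum to 1.
  """
  if temp == 0:
    best = max(counts)
    best_actions = [a for a, c in enumerate(counts) if c == best]
    chosen = random.choice(best_actions)
    return [1.0 if a == chosen else 0.0 for a in range(len(counts))]
  scaled = [c ** (1.0 / temp) for c in counts]
  total = float(sum(scaled))
  return [x / total for x in scaled]

counts = [1, 3, 6, 0]  # hypothetical visit counts for four actions
probs_t1 = counts_to_probs(counts, temp=1.0)
probs_sharp = counts_to_probs(counts, temp=0.5)
probs_greedy = counts_to_probs(counts, temp=0)
print(probs_t1)
print(probs_sharp)
print(probs_greedy)
```

Lower temperatures concentrate probability on the most-visited action, and `temp=0` is the fully greedy limit. Because the player in this exercise always returns the most-visited action, the temperature only matters if you sample from the distribution instead of taking its argmax.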

In [52]:
class MonteCarloTreeSearchBasedPlayer():
  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args
    #################################################
    ## TODO for students: create the MCTS planner from game, nnet and args ##
    # Fill out function and remove
    raise NotImplementedError("Plug the planner")
    #################################################
    self.mcts = ...

  def play(self, canonicalBoard, temp=1):
    for i in range(self.args.numMCTSSims):
      self.mcts.search(canonicalBoard)

    s = self.game.stringRepresentation(canonicalBoard)
    self.Nsa = self.mcts.getNsa()
    self.counts = [self.Nsa[(s, a)] if (s, a) in self.Nsa else 0 for a in range(self.game.getActionSize())]

    if temp == 0:
      # greedy: choose uniformly at random among the most-visited actions
      counts = np.array(self.counts)
      bestAs = np.argwhere(counts == np.max(counts)).flatten()
      return np.random.choice(bestAs)

    # temp > 0: sharpen or flatten the visit counts, then take the most likely action
    self.counts = [x ** (1. / temp) for x in self.counts]
    self.counts_sum = float(sum(self.counts))
    probs = [x / self.counts_sum for x in self.counts]
    return np.argmax(probs)

  def getActionProb(self, canonicalBoard, temp=1):
    # one-hot distribution over actions; play() returns an action index
    action_probs = np.zeros((self.game.getActionSize()))
    best_action = self.play(canonicalBoard, temp)
    action_probs[best_action] = 1

    return action_probs


game = OthelloGame(6)
rp = RandomPlayer(game).play  # random player (baseline opponent)
num_games = 20  # games
n1 = NNet(game)  # nnet players
args1 = dotdict({'numMCTSSims': 50, 'cpuct':1.0})
## Uncomment below to check your agent!
# mcts1 = MonteCarloTreeSearchBasedPlayer(game, n1, args1)
# n1p = lambda x: np.argmax(mcts1.getActionProb(x, temp=0))
# arena = Arena.Arena(n1p, rp, game, display=OthelloGame.display)
# MCTS_result = arena.playGames(num_games, verbose=False)
# print("\n Number of games won by player1 = {}, num of games won by player2 = {}, num of games won by nobody = {} out of {} games" .format(MCTS_result[0], MCTS_result[1], MCTS_result[2], num_games))

In [53]:
#to_remove solution
class MonteCarloTreeSearchBasedPlayer():
  def __init__(self, game, nnet, args):
    self.game = game
    self.nnet = nnet
    self.args = args
    self.mcts = MCTS(game, nnet, args)

  def play(self, canonicalBoard, temp=1):
    for i in range(self.args.numMCTSSims):
      self.mcts.search(canonicalBoard)

    s = self.game.stringRepresentation(canonicalBoard)
    self.Nsa = self.mcts.getNsa()
    self.counts = [self.Nsa[(s, a)] if (s, a) in self.Nsa else 0 for a in range(self.game.getActionSize())]

    if temp == 0:
      # greedy: choose uniformly at random among the most-visited actions
      counts = np.array(self.counts)
      bestAs = np.argwhere(counts == np.max(counts)).flatten()
      return np.random.choice(bestAs)

    # temp > 0: sharpen or flatten the visit counts, then take the most likely action
    self.counts = [x ** (1. / temp) for x in self.counts]
    self.counts_sum = float(sum(self.counts))
    probs = [x / self.counts_sum for x in self.counts]
    return np.argmax(probs)

  def getActionProb(self, canonicalBoard, temp=1):
    # one-hot distribution over actions; play() returns an action index
    action_probs = np.zeros((self.game.getActionSize()))
    best_action = self.play(canonicalBoard, temp)
    action_probs[best_action] = 1

    return action_probs


game = OthelloGame(6)
rp = RandomPlayer(game).play  # random player (baseline opponent)
num_games = 20  # games
n1 = NNet(game)  # nnet players
args1 = dotdict({'numMCTSSims': 50, 'cpuct':1.0})
## Uncomment below to check your agent!
mcts1 = MonteCarloTreeSearchBasedPlayer(game, n1, args1)
n1p = lambda x: np.argmax(mcts1.getActionProb(x, temp=0))
arena = Arena.Arena(n1p, rp, game, display=OthelloGame.display)
MCTS_result = arena.playGames(num_games, verbose=False)
print("\n Number of games won by player1 = {}, num of games won by player2 = {}, num of games won by nobody = {} out of {} games" .format(MCTS_result[0], MCTS_result[1], MCTS_result[2], num_games))

Arena.playGames (1): 100%|██████████| 10/10 [00:18<00:00,  1.86s/it]
Arena.playGames (2): 100%|██████████| 10/10 [00:19<00:00,  1.90s/it]


 Number of games won by player1 = 8, num of games won by player2 = 12, num of games won by nobody = 0 out of 20 games





---
# Section 10: Ethical aspects

In [54]:
#@title Video 10: Unstoppable opponents
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"4LKZwDP_Qac", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)


---
# Summary

In [55]:
#@title Video 11: Outro
# Insert the ID of the corresponding youtube video
from ipywidgets import widgets

out2 = widgets.Output()
with out2:
  from IPython.display import IFrame
  class BiliVideo(IFrame):
    def __init__(self, id, page=1, width=400, height=300, **kwargs):
      self.id=id
      src = "https://player.bilibili.com/player.html?bvid={0}&page={1}".format(id, page)
      super(BiliVideo, self).__init__(src, width, height, **kwargs)

  video = BiliVideo(id=f"", width=854, height=480, fs=1)
  print("Video available at https://www.bilibili.com/video/{0}".format(video.id))
  display(video)

out1 = widgets.Output()
with out1:
  from IPython.display import YouTubeVideo
  video = YouTubeVideo(id=f"8JcHw-2cwtM", width=854, height=480, fs=1, rel=0)
  print("Video available at https://youtube.com/watch?v=" + video.id)
  display(video)

out = widgets.Tab([out1, out2])
out.set_title(0, 'Youtube')
out.set_title(1, 'Bilibili')

display(out)
