<a href="https://colab.research.google.com/github/Miyamura80/BotsForGames/blob/main/BotsFightBots.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# environment:
!pip3 install torch
!pip install --upgrade open_spiel

Collecting open_spiel
  Downloading open_spiel-1.1.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.4 MB)
[K     |████████████████████████████████| 4.4 MB 7.9 MB/s 
Collecting scipy>=1.5.4
  Downloading scipy-1.7.3-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (38.1 MB)
[K     |████████████████████████████████| 38.1 MB 1.1 MB/s 
Installing collected packages: scipy, open-spiel
  Attempting uninstall: scipy
    Found existing installation: scipy 1.4.1
    Uninstalling scipy-1.4.1:
      Successfully uninstalled scipy-1.4.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
albumentations 0.1.12 requires imgaug<0.2.7,>=0.2.5, but you have imgaug 0.2.9 which is incompatible.[0m
Successfully installed open-spiel-1.1.0 scipy-1.7.3


# State Definition


In [None]:
import numpy as np
import pyspiel
import copy

BOARD_SIZE = 5
game = pyspiel.load_game("hex",{"board_size":BOARD_SIZE})
BLACK, WHITE = 1, -1  # first turn or second turn player

class State:
    '''Board implementation of BOARD_SIZE x BOARD_SIZE Hex Board'''
    X, Y = 'ABCDEFGHI'[0:BOARD_SIZE],  '123456789'[0:BOARD_SIZE]
    C = {0: '_', BLACK: 'O', WHITE: 'X'}

    def __init__(self):
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE)) # (x, y)
        self.color = 1
        self.win_color = 0
        self.record = []
        self.hex_state = game.new_initial_state()

    def __deepcopy__(self):
        newState = State()
        newState.board = copy.deepcopy(self.board)
        newState.win_color = copy.deepcopy(self.win_color)
        newState.record = copy.deepcopy(self.record)
        newState.hex_state = copy.deepcopy(self.hex_state)
        return newState

    def action2str(self, a: int):
        return self.X[a // BOARD_SIZE] + self.Y[a % BOARD_SIZE]

    def str2action(self, s: str):
        return self.X.find(s[0]) * BOARD_SIZE + self.Y.find(s[1])

    def record_string(self):
        return ' '.join([self.action2str(a) for a in self.record])

    def __str__(self):
        final_bd = [" "+" ".join(self.Y)]
        hex_bd = str(self.hex_state).split("\n")
        for i in range(len(hex_bd)):
            final_bd.append(self.X[i]+" "+hex_bd[i])
        return "\n".join(final_bd)

    def play(self, action):
        # state transition function
        # action is position interger (0~8) or string representation of action sequence
        # Handles the case where action is sequence of actions "0 1 2 3 4"
        if isinstance(action, str):
            for astr in action.split():
                self.play(self.str2action(astr))
            return self

        # Single action case
        x, y = action // BOARD_SIZE, action % BOARD_SIZE
        self.board[x, y] = self.color
        self.hex_state.apply_action(action)

        # check whether 3 stones are on the line
        if self.hex_state.is_terminal():
            self.win_color = self.color

        self.color = -self.color
        self.record.append(action)
        return self

    def terminal(self):
        # terminal state check
        return self.hex_state.is_terminal()

    def terminal_reward(self):
        # terminal reward 
        # return self.win_color if self.color == BLACK else -self.win_color
        return self.win_color

    def legal_actions(self):
        # list of legal actions on each state
        return [a for a in range(BOARD_SIZE * BOARD_SIZE) if self.board[a // BOARD_SIZE, a % BOARD_SIZE] == 0]

    def feature(self):
        # input tensor for neural net (state)
        # return np.stack([self.board == self.color, self.board == -self.color]).astype(np.float32)
        observation =  np.array(self.hex_state.observation_tensor(), np.float32)
        return observation.reshape(9,BOARD_SIZE,BOARD_SIZE)[1:BOARD_SIZE+1,:,:]

    def action_feature(self, action):
        # input tensor for neural net (action)
        a = np.zeros((1, BOARD_SIZE, BOARD_SIZE), dtype=np.float32)
        a[0, action // BOARD_SIZE, action % BOARD_SIZE] = 1
        return a

In [33]:
from collections import defaultdict

a = defaultdict(dict)
print(a["he"])
print("he" in a)
a = {i: random.random() for i in range(10)}
print(np.std(list(a.values())))

{}
True
0.3326408235594726


# MCTS Agent

In [78]:
from typing import List
import random
import time
from math import sqrt, log
from collections import defaultdict
import matplotlib.pyplot as plt

random.seed(0)

class MCTSAgent:
    def __init__(self) -> None:
        self.best = []
        # Both of these :: path -> dict[move, x]
        self.moves = defaultdict(lambda: defaultdict(int))
        self.reward = defaultdict(lambda: defaultdict(float))
    
    def ucb_weight_general(self, state, mv, epoch, c=2.0):
        path = state.record_string()
        expected_reward = self.reward[path][mv]/(self.moves[path][mv]+1)
        n_visit = self.moves[path][mv]
        return expected_reward + c * sqrt(log(epoch)/(n_visit+1))

    def think(self, state: State, sim_num: int, temperature:int, show=False) -> None:
        if show:
            print("Bot to play: \n", state, state.color)
            uncertainties = []

        start, prev_time = time.time(), 0        
        if state.terminal():
            return
        
        init_path = state.record_string()
        for epoch in range(1, sim_num):
            freshState = state.__deepcopy__()
            # Display search result on every second
            if show:
                tmp_time = time.time() - start
                if int(tmp_time) > int(prev_time):
                    prev_time = tmp_time
                    pv = self.pv(freshState)
                    ucb_uncertainty = 2.0 * sqrt(log(epoch)/(self.moves[init_path][pv[0]]+1))
                    uncertainties.append(ucb_uncertainty)
                    print(f"Uncertainty: {ucb_uncertainty}")
                    print('%.2f sec. best %s. q = %.4f. n = %d / %d.'
                          % (tmp_time, state.action2str(pv[0]), self.reward[init_path][pv[0]] / (self.moves[init_path][pv[0]]+1), 
                            self.moves[init_path][pv[0]], epoch))
            not_terminated = True
            rewards = []
            while not_terminated:
                # first_move = random.choice(list(self.moves))
                path = freshState.record_string()
                ucb_weights = [self.ucb_weight_general(freshState, k, epoch) for k in freshState.legal_actions()]
                max_ucb_weight = max(ucb_weights)
                move = [k for k in freshState.legal_actions() if self.ucb_weight_general(freshState, k, epoch)==max_ucb_weight][0]
                if move in self.moves[path]:
                  self.moves[path][move] += 1
                else:
                  self.moves[path][move] = 1
                freshState.play(move)
                if path not in self.reward:
                  self.reward[path] = {move: 0}
                rewards.append((self.reward[path], move))  
                not_terminated = not freshState.terminal()
            for (r,m) in rewards:
                r[m] += freshState.terminal_reward()
        if show:
            plt.plot(uncertainties)
            plt.show()

    def pv(self, state: State) -> List[int]:
        path = state.record_string()
        if path in self.reward:
          max_value = max(self.reward[path].values())
          max_moves = [k for k,v in self.reward[path].items() if v==max_value]
          print(f"Max Value: {max_value} Rewards: {self.reward[path]} Moves: {self.moves[path]}")
        else:
          max_moves = state.legal_actions()
          print("ah")
        return [random.choice(max_moves)]


# Test Bot

In [None]:
agent = MCTSAgent()
state = State()
while True:
  
  distb = agent.think(state, 5000, temperature=1, show=False)
  pv_seq = agent.pv(state)
  state.play(pv_seq[0])

  print(state, state.color)
  if state.terminal():
    break

  while True:
    user_input = input("Input move: ")
    if state.str2action(user_input) in state.legal_actions():
      break
  state.play(user_input)
  if state.terminal():
    break
print(state)
print(state.terminal_reward())

Max Value: 284.0 Rewards: defaultdict(<class 'float'>, {0: 62.0, 1: 62.0, 2: 62.0, 3: 120.0, 4: 284.0, 5: 138.0, 6: 138.0, 7: 189.0, 8: 284.0, 9: 220.0, 10: 220.0, 11: 231.0, 12: 284.0, 13: 255.0, 14: 255.0, 15: 284.0, 16: 284.0, 17: 284.0, 18: 215.0, 19: 215.0, 20: 283.0, 21: 83.0, 22: 83.0, 23: 83.0, 24: 83.0}) Moves: defaultdict(<class 'int'>, {0: 86, 1: 86, 2: 86, 3: 140, 4: 284, 5: 158, 6: 158, 7: 203, 8: 284, 9: 230, 10: 230, 11: 239, 12: 284, 13: 259, 14: 259, 15: 284, 16: 284, 17: 284, 18: 225, 19: 225, 20: 283, 21: 107, 22: 107, 23: 107, 24: 107})
 1 2 3 4 5
A . . . . . 
B  . . . . . 
C   . . . . . 
D    x . . . . 
E     . . . . .  -1
Input move: C3
Max Value: 424.0 Rewards: defaultdict(<class 'float'>, {0: 134.0, 1: 134.0, 2: 89.0, 3: 89.0, 4: 303.0, 5: 198.0, 6: 198.0, 7: 198.0, 8: 302.0, 9: 252.0, 10: 136.0, 11: 162.0, 13: 265.0, 14: 265.0, 16: 386.0, 17: 328.0, 18: 315.0, 19: 315.0, 20: 424.0, 21: -5.0, 22: -5.0, 23: -5.0, 24: -5.0}) Moves: defaultdict(<class 'int'>, {0: 1