In [316]:
#code: utf-8
from google.colab import drive
import sys
import os
import numpy as np
#import pandas as pd
import random
import time
import copy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

#from tqdm import tqdm
from tqdm.notebook import tqdm

#import matplotlib.pyplot as plt
#import matplotlib.patches as pat

In [317]:
# Mount Google Drive; every path configured below lives under this mount point.
drive.mount('/content/gdrive')
ROOT_PATH = '/content/gdrive/My Drive/Colab Notebooks'
CUR_PATH = '/content/gdrive/My Drive/Colab Notebooks/Othello_AI'
# Put both folders on sys.path (at most once each) so modules stored there can be imported.
if ROOT_PATH not in sys.path:
  sys.path.append(ROOT_PATH)
if CUR_PATH not in sys.path:
  sys.path.append(CUR_PATH)

# Checkpoint of the trained policy network (loaded by TrainedModel and MCTS).
SL_MODEL_NAME = "conv4_bn_mini"
SL_No = 40
SL_PARAM_NAME = f"SLpn_{SL_MODEL_NAME}_{SL_No}"
SL_PARAM_PATH = os.path.join(CUR_PATH, "SLpn_params", f"{SL_PARAM_NAME}.pth")

# Checkpoint of the rollout policy network (loaded by MCTS).
RO_MODEL_NAME = "conv1" # alternative: "conv2_bn" (conv1 pairs with No. 0, conv2_bn with No. 15)
RO_No = 0 # use 15 when RO_MODEL_NAME is "conv2_bn"
RO_PARAM_NAME = f"SLpn_{RO_MODEL_NAME}_{RO_No}"
RO_PARAM_PATH = os.path.join(CUR_PATH, "SLpn_params", f"{RO_PARAM_NAME}.pth")

# Checkpoint of the value network (loaded by MCTS).
VALUE_No = 6
VALUE_PARAM_PATH = os.path.join(CUR_PATH, "value_data", f"value_params_{VALUE_No}.pth")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [318]:
import gym
import tools

In [319]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's global generator and PyTorch (the
    default generator and the current CUDA device), and stores the seed in
    the ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: integer seed applied to all generators.
    """
    # Stored for any process started later; the variable is only written here.
    os.environ['PYTHONHASHSEED'] = str(seed)
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed)
    for seed_fn in seeders:
        seed_fn(seed)
    
# Fix all random number generators before any model or game code runs.
SEED = 2021
set_seed(SEED)

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

cpu


In [320]:
# Integer ids of the two colours as they appear in the board arrays of this notebook.
white = 1
black = 2
# Set to True to print per-node statistics and timings during tree search.
node_print = False

#defining policy model

In [321]:
class Conv4_bn_mini(nn.Module):
  """Policy network: four conv -> batch-norm -> ReLU stages and a linear head.

  Expects input of shape (N, 3, 8, 8) (the 3x3 convolutions use padding=1, so
  the 8x8 spatial size is preserved up to the 64*8*8 linear layer) and returns
  (N, 64) raw scores; no softmax is applied here.
  """

  def __init__(self):
    super().__init__()
    self.relu = nn.ReLU()

    # One batch-norm per convolution; feature counts match the conv outputs.
    self.bn1 = nn.BatchNorm2d(num_features=32)
    self.bn2 = nn.BatchNorm2d(num_features=32)
    self.bn3 = nn.BatchNorm2d(num_features=64)
    self.bn4 = nn.BatchNorm2d(num_features=64)

    # Channel progression 3 -> 32 -> 32 -> 64 -> 64.
    self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
    self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
    self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
    self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

    self.flatten = nn.Flatten()

    self.fc1 = nn.Linear(in_features=64 * 8 * 8, out_features=64)

  def forward(self, x):
    """Return the (N, 64) score tensor for a batch of board features."""
    stages = (
        (self.conv1, self.bn1),
        (self.conv2, self.bn2),
        (self.conv3, self.bn3),
        (self.conv4, self.bn4),
    )
    for conv, bn in stages:
      x = self.relu(bn(conv(x)))
    return self.fc1(self.flatten(x))

#making Rollout Policy

conv2_bn: No. 15 is the best model.

In [322]:
class Conv2_bn(nn.Module):
  """Rollout-policy candidate: two conv -> batch-norm -> ReLU stages and two linear layers.

  Expects input of shape (N, 3, 8, 8) and returns (N, 64) raw scores. The two
  linear layers are applied back to back with no activation in between.
  """

  def __init__(self):
    super().__init__()
    self.relu = nn.ReLU()

    self.bn1 = nn.BatchNorm2d(num_features=64)
    self.bn2 = nn.BatchNorm2d(num_features=128)

    # Channel progression 3 -> 64 -> 128; padding=1 keeps the 8x8 spatial size.
    self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
    self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)

    self.flatten = nn.Flatten()

    self.fc1 = nn.Linear(in_features=128 * 8 * 8, out_features=128)
    self.fc2 = nn.Linear(in_features=128, out_features=64)

  def forward(self, x):
    """Return the (N, 64) score tensor for a batch of board features."""
    hidden = self.relu(self.bn1(self.conv1(x)))
    hidden = self.relu(self.bn2(self.conv2(hidden)))
    flat = self.flatten(hidden)
    return self.fc2(self.fc1(flat))

Conv1: conv(3->1) -> batch norm -> ReLU -> flatten -> linear(1*64->64).
Trained for 10 epochs.

In [323]:
class Conv1(nn.Module):
  """Smallest policy network: one conv -> batch-norm -> ReLU stage and a linear head.

  Expects input of shape (N, 3, 8, 8); the single-channel 8x8 feature map is
  flattened to 64 values and mapped to (N, 64) raw scores.
  """

  def __init__(self):
    super().__init__()
    self.relu = nn.ReLU()
    self.bn1 = nn.BatchNorm2d(num_features=1)
    self.conv1 = nn.Conv2d(3, 1, kernel_size=3, padding=1)
    self.flatten = nn.Flatten()
    self.fc1 = nn.Linear(in_features=1 * 64, out_features=64)

  def forward(self, x):
    """Return the (N, 64) score tensor for a batch of board features."""
    feature_map = self.conv1(x)
    activated = self.relu(self.bn1(feature_map))
    return self.fc1(self.flatten(activated))

#making value net

In [324]:
class ValueNet_conv4(nn.Module):
  """Value network: same convolutional trunk as Conv4_bn_mini with a scalar head.

  Expects input of shape (N, 3, 8, 8) and returns a (N, 1) tensor; no output
  activation is applied.
  """

  def __init__(self):
    super().__init__()
    self.relu = nn.ReLU()

    # One batch-norm per convolution; feature counts match the conv outputs.
    self.bn1 = nn.BatchNorm2d(num_features=32)
    self.bn2 = nn.BatchNorm2d(num_features=32)
    self.bn3 = nn.BatchNorm2d(num_features=64)
    self.bn4 = nn.BatchNorm2d(num_features=64)

    # Channel progression 3 -> 32 -> 32 -> 64 -> 64.
    self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
    self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
    self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
    self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

    self.flatten = nn.Flatten()

    self.fc1 = nn.Linear(in_features=64 * 8 * 8, out_features=1)

  def forward(self, x):
    """Return the (N, 1) value estimate for a batch of board features."""
    trunk = (
        (self.conv1, self.bn1),
        (self.conv2, self.bn2),
        (self.conv3, self.bn3),
        (self.conv4, self.bn4),
    )
    for conv, bn in trunk:
      x = self.relu(bn(conv(x)))
    return self.fc1(self.flatten(x))

# Converting board states into model inputs and predictions

In [325]:
def to_feature(state, player):
    """Encode a board as a player-relative 3-plane tensor on ``device``.

    Planes, in order: occupied squares (any value > 0), squares holding the
    opponent's id (``3 - player``), squares holding ``player``'s own id.

    Args:
        state: NumPy board array with 64 cells per board (it is reshaped to 8x8).
        player: id of the side the encoding is relative to (1 or 2).

    Returns:
        Float tensor of shape (-1, 3, 8, 8) moved to the global ``device``.
    """
    board = state.astype('float32')
    occupied = np.where(board > 0, 1, 0)
    opponent = np.where(board == 3 - player, 1, 0)
    own = np.where(board == player, 1, 0)
    planes = np.stack([occupied, opponent, own], axis=0).reshape(-1, 3, 8, 8)
    return torch.tensor(planes).float().to(device)


def model_pred(model, state, player):
    """Run ``model`` on one board and return its softmax output as a NumPy array.

    The softmax is taken over the whole output row (dim=1), i.e. over every
    output of the model and not only the squares that are currently playable.

    Args:
        model: network called on the tensor produced by ``to_feature``.
        state: board array accepted by ``to_feature``.
        player: id of the side to move.

    Returns:
        A CPU NumPy copy of the probabilities; callers index it as ``pred[0][pos]``.
    """
    features = to_feature(state, player)
    with torch.no_grad():
        probs = F.softmax(model(features), dim=1)
    return probs.to('cpu').numpy().copy()


def model_pred_inposts(model, state, player, posts):
    """Return probabilities restricted to (and normalised over) the given positions.

    Unlike ``model_pred``, the softmax is computed only over the model outputs
    at the indices listed in ``posts``.

    Args:
        model: network called on the tensor produced by ``to_feature``.
        state: board array accepted by ``to_feature``.
        player: id of the side to move.
        posts: iterable of output indices to keep.

    Returns:
        1-D CPU NumPy array with one probability per entry of ``posts``, in order.
    """
    with torch.no_grad():
        scores = model(to_feature(state, player))
        kept_scores = [scores[0][pt] for pt in posts]
        probs = F.softmax(torch.tensor(kept_scores), dim=0)
    return probs.to('cpu').numpy().copy()

# Defining the player classes (policy network, random, human)

In [326]:
class TrainedModel(object):
    """Player that chooses moves with the trained policy network.

    Attributes:
        model: ``Conv4_bn_mini`` loaded from ``SL_PARAM_PATH`` and put in eval mode.
        mode: ``"max"`` plays the legal move with the highest probability,
            ``"prob"`` samples a legal move in proportion to its probability;
            any other value behaves like ``"max"``.
        name: display name, ``"SLpn_"`` followed by the mode.
    """

    def __init__(self, mode="max"):
        self.model = Conv4_bn_mini().to(device)
        self.model.load_state_dict(torch.load(SL_PARAM_PATH, map_location=device))
        self.model.eval()
        self.mode = mode
        self.name = "SLpn_"+self.mode


    def get_action(self, state, player, posts, last_action):
        """Return the move to play, or -1 (pass) when ``posts`` is empty.

        Args:
            state: board array passed on to ``model_pred``.
            player: id of the side to move.
            posts: legal move indices for ``player``.
            last_action: previous move in the game; not used by this player.

        Returns:
            A single element of ``posts`` (never a list), or -1.
        """
        if len(posts) == 0:
            # Nothing to choose from, so the network is not evaluated at all.
            return -1

        pred = model_pred(self.model, state, player)
        preds = [pred[0][pt] for pt in posts]

        if self.mode == "prob":
            # random.choices returns a list of k items; unwrap it so this mode
            # returns a single move like every other player does.
            return random.choices(posts, weights=preds, k=1)[0]

        # "max" and any unrecognised mode: greedy choice among the legal moves.
        return posts[preds.index(max(preds))]



class RandomAction(object):
    """Baseline player that picks one of the legal moves uniformly at random."""

    def __init__(self):
        self.name = "Random"


    def get_action(self, state, player, posts, last_action):
        """Return a random element of ``posts``, or -1 (pass) when it is empty.

        ``state``, ``player`` and ``last_action`` are accepted for interface
        compatibility with the other players and are not used.
        """
        return random.choice(posts) if len(posts) != 0 else -1



class Human(object):
    """Player whose moves are typed in at the keyboard."""

    def __init__(self):
        self.name = "Human"


    def get_input(self, posts):
        """Prompt for a move and return it, falling back to a random legal move.

        The typed text is accepted only if it is a decimal number contained in
        ``posts``; anything else is replaced by ``random.choice(posts)``.
        ``posts`` must not be empty.
        """
        typed = input("select a number at position. otherwise selecting action in random.")
        # -1 stands for "not a number".
        chosen = int(typed) if typed.isdecimal() else -1

        if chosen not in posts:
            chosen = random.choice(posts)
            print("selected action in random to wrong input.")
        print(f"your action: {chosen}")
        return chosen


    def get_action(self, state, player, posts, last_action):
        """Return the human's move, or -1 (pass) when there is no legal move.

        ``state``, ``player`` and ``last_action`` are not used.
        """
        if len(posts) > 0:
            return self.get_input(posts)
        print("your turn was skipped.")
        return -1

#making node class

In [327]:
class Node(object):
    """One node of the search tree, reached from its parent by a single action.

    Attributes:
        parent: the node above this one, or None for the root.
        child: dict mapping action -> child Node.
        n_visits: how many evaluations have been backed up through this node.
        Q: running mean of the backed-up values.
        u: exploration bonus, refreshed by the parent's ``select``.
        P: prior probability given at creation.
    """

    def __init__(self, parent=None, prob=0):
        self.parent = parent
        self.child = {}
        self.n_visits = 0
        self.Q = 0
        # Until the first select() refreshes it, the bonus is simply the prior.
        self.u = prob
        self.P = prob


    def is_parent(self):
        """Return True when this node has no parent, i.e. it is the root."""
        return self.parent is None


    def is_leaf(self):
        """Return True when no child has been added yet."""
        return len(self.child) == 0


    def get_value(self):
        """Return the selection score ``Q + u``."""
        return self.Q + self.u


    def select(self, Cp):
        """Refresh every child's bonus and return the best ``(action, child)`` pair.

        The best child is the one with the largest ``get_value()``; the first
        one in dict order wins ties. Prints one line per child when the global
        ``node_print`` flag is set.
        """
        for child in self.child.values():
            child.u = child.U(Cp)
        if node_print:
            for key in self.child:
                print(f"key [{key:2d}] Q+u: {self.child[key].get_value():.4f}, " +
                 f"prob: {self.child[key].P:.4f}, visits: {self.child[key].n_visits}, parent_visits: {self.n_visits}")
        best = max(self.child.items(), key=lambda pair: pair[1].get_value())
        return best


    def U(self, Cp):
        """Return ``Cp * P * sqrt(parent visits) / (1 + own visits)``.

        Requires a parent, so it must not be called on the root.
        """
        numerator = Cp * self.P * np.sqrt(self.parent.n_visits)
        return numerator / (1 + self.n_visits)


    def expand(self, action_prob):
        """Add a child for every ``(action, prob)`` pair not already present."""
        for action, prob in action_prob:
            if action in self.child:
                continue
            self.child[action] = Node(self, prob)


    def update(self, leaf_Q):
        """Count one visit and move ``Q`` towards ``leaf_Q`` (incremental mean)."""
        self.n_visits += 1
        self.Q += (leaf_Q - self.Q) / self.n_visits


    def update_recursive(self, leaf_Q):
        """Apply ``update(leaf_Q)`` to this node and to every ancestor up to the root.

        NOTE(review): the same value is used at every level even though the
        side to move changes between levels; confirm whether the sign should
        alternate on the way up.
        """
        node = self
        while node is not None:
            node.update(leaf_Q)
            node = node.parent

#making MCTS class

In [328]:
class MCTS(object):
    """Monte-Carlo tree search player guided by policy, value and rollout networks.

    A playout walks down the tree with ``Node.select``. A leaf that has already
    been visited ``visit_thr`` times is expanded with the policy network's
    probabilities; any other leaf is evaluated as
    ``(1 - lmd) * value_net + lmd * rollout_score`` and that number is backed
    up through all ancestors. The move finally played is the root child with
    the most visits.
    """


    def __init__(self, lmd=0.5, Cp=5, visit_thr=15, time_limit=10):
        """Load the three networks and create the two scratch environments.

        Args:
            lmd: weight of the rollout score in the leaf evaluation
                (the value network gets ``1 - lmd``).
            Cp: exploration constant handed to ``Node.select``.
            visit_thr: number of visits a leaf needs before it is expanded.
            time_limit: search budget per move, in seconds of wall-clock time.
        """
        self.root = Node(None, 1.0)

        # Policy network (priors), value network (leaf value) and rollout
        # network (fast playouts); all are used for inference only.
        self.policy_model = Conv4_bn_mini().to(device)
        self.policy_model.load_state_dict(torch.load(SL_PARAM_PATH, map_location=device))
        self.policy_model.eval()
        self.value_model = ValueNet_conv4().to(device)
        self.value_model.load_state_dict(torch.load(VALUE_PARAM_PATH, map_location=device))
        self.value_model.eval()
        self.rollout_model = Conv1().to(device) #Conv2_bn().to(device)
        self.rollout_model.load_state_dict(torch.load(RO_PARAM_PATH, map_location=device))
        self.rollout_model.eval()

        # Separate environments so tree descent and rollouts never disturb
        # each other (or the real game, which lives in the caller's env).
        self.root_env = gym.make("othello-v0")
        self.root_env.reset()
        self.rollout_env = gym.make("othello-v0")
        self.rollout_env.reset()

        self.lmd = lmd
        self.Cp = Cp
        self.visit_thr = visit_thr
        self.time_limit = time_limit
        self.name = "MCTS"


    def policy_func(self, state, player, posts):
        """Return ``[(action, prior), ...]`` for the legal moves in ``posts``.

        The priors come from ``model_pred`` (softmax over all outputs), so they
        are not renormalised over the legal moves. With no legal move the
        result is the single pass entry ``[(-1, 1)]``.
        """
        pred = model_pred(self.policy_model, state, player)
        if len(posts) > 0:
            return [(pt, pred[0][pt]) for pt in posts]
        return [(-1, 1)]


    def value_func(self, state, player):
        """Return the value network's scalar output for ``state`` seen by ``player``."""
        with torch.no_grad():
            value = self.value_model(to_feature(state, player))
        return value.item()


    def evaluate_rollout(self, state, me_player):
        """Play the position to the end with the rollout network and score it.

        Moves are sampled in proportion to the rollout network's probabilities
        for the legal squares; a side without legal moves passes (-1).

        Returns:
            1.0 if ``me_player`` wins, 0.0 for a draw, -1.0 otherwise.
        """
        reward = self.rollout_env.board_reset(state, me_player)
        player = me_player
        while not self.rollout_env.done:
            player = self.rollout_env.player
            posts, _, _ = self.rollout_env.next_place
            state = self.rollout_env.render("rgb_array")
            pred = model_pred(self.rollout_model, state, player)
            if len(posts) == 0:
                action = -1
            else:
                preds = [pred[0][pt] for pt in posts]
                # random.choices returns a list; step() is given the move
                # itself, exactly as it is everywhere else in this notebook.
                action = random.choices(posts, weights=preds, k=1)[0]

            _, action, _next_state, reward, _, player = self.rollout_env.step(action)

        # Winner id from the last (player, reward) pair; a zero reward turns it
        # into 0 (draw). Assumes reward is one of -1, 0, 1 -- TODO confirm.
        winner = (player + (int(reward)+1)//2 )%2 + 1
        winner *= int(reward**2)
        if winner == me_player:
            score = 1.0
        elif winner == 0:
            score = 0.0
        else:
            score = -1.0
        return score


    def playout(self, state, player, node):
        """Run one simulation from ``node``, whose position is ``state`` with ``player`` to move.

        NOTE(review): leaf values are computed from the point of view of the
        side to move at the leaf and backed up unchanged; confirm that this
        matches what ``Node.select`` is meant to maximise.
        """
        self.root_env.board_reset(state, player)
        if node.is_leaf():
            if node.n_visits >= self.visit_thr: #expand step
                posts, _, _ = self.root_env.next_place
                action_prob = self.policy_func(state, player, posts)
                node.expand(action_prob)
                # The node now has children, so this call takes the select branch.
                self.playout(state, player, node)

            else: #evaluate step
                value = self.value_func(state, player)
                score = self.evaluate_rollout(state.copy(), player) #win score [-1.0, 0.0, 1.0]
                leaf_Q = (1-self.lmd) * value + self.lmd * score
                node.update_recursive(leaf_Q)

        else: #select step
            if self.root_env.done:
                # Finished game inside the tree: only the value network is used.
                value = self.value_func(state, player)
                node.update_recursive(value)
            else:
                action, node = node.select(self.Cp)
                if node_print:
                    print(f"select action: {action}")
                _, action, next_state, _, _, _ = self.root_env.step(action)
                self.playout(next_state, self.root_env.player, node)


    def select_intime(self, state, player):
        """Search for ``time_limit`` seconds and return the most-visited root action.

        If the budget runs out before the root has been expanded (it needs
        ``visit_thr`` evaluations first), the legal move with the highest
        policy prior is returned instead of failing on an empty child table.
        """
        start = time.time()
        elapsed = 0
        while elapsed < self.time_limit:
            self.playout(state.copy(), player, self.root)
            last_time = elapsed
            elapsed = time.time() - start
            if node_print:
                print(f"elapsed: {elapsed:.2f} playout time: {elapsed - last_time:.2f}")

        if self.root.child:
            action = max(self.root.child.items(), key=lambda act_node: act_node[1].n_visits)[0]
        else:
            self.root_env.board_reset(state, player)
            posts, _, _ = self.root_env.next_place
            action_prob = self.policy_func(state, player, posts)
            action = max(action_prob, key=lambda act_prob: act_prob[1])[0]
        if node_print:
            print(f"detetmined action: {action}")
        return action


    def update_root(self, action):
        """Advance the tree by ``action``, keeping the matching subtree if it exists.

        The chosen child itself becomes the root (no copy), so its children
        keep pointing at the live root and the rest of the old tree can be
        freed. Without a matching child the search restarts from a fresh root.
        """
        if action in self.root.child:
            self.root = self.root.child[action]
            self.root.parent = None
        else:
            self.root = Node(None, 1.0)


    def _is_new_game(self, state, last_action):
        """Return True when this is the player's first move of a game.

        ``last_action == -2`` is the value the game loop passes before any
        move has been made. A board with at most five stones covers the case
        where the opponent moved first (assumes the standard four-stone
        opening -- TODO confirm against the environment).
        """
        if last_action == -2:
            return True
        return np.count_nonzero(state) <= 5


    def get_action(self, state, player, posts, last_action):
        """Return the move to play and advance the tree past it.

        Args:
            state: current board array.
            player: id of the side to move.
            posts: legal move indices for ``player``.
            last_action: the opponent's previous move, -1 for a pass, or -2
                when no move has been played yet.

        Returns:
            The searched move when there is a real choice, the only legal move
            when there is exactly one, or -1 (pass) when there is none.
        """
        if self._is_new_game(state, last_action):
            # Never carry a tree over from the previous game.
            self.root = Node(None, 1.0)
        elif last_action > -2:
            self.update_root(last_action)

        if len(posts) > 1:
            action = self.select_intime(state, player)
        elif len(posts) == 1:
            action = posts[0]
        else:
            action = -1
        self.update_root(action)

        return action

#othello class

In [329]:
class OthelloPlay(object):
    """Runs Othello games between two player objects on an ``othello-v0`` environment.

    A player object must offer a ``name`` attribute and
    ``get_action(state, player, posts, last_action)``.

    NOTE(review): the default arguments are the ``Conv1`` class itself, which
    defines neither of those; the calls in this notebook always pass player
    objects explicitly.
    """

    def __init__(self, white_model=Conv1, black_model=Conv1):
        # Index 0 plays white (id 1), index 1 plays black (id 2).
        self.model = (white_model, black_model)
        self.main_env = gym.make("othello-v0")


    def _model_shift(self):
        """Swap which player object controls which colour."""
        first, second = self.model
        self.model = (second, first)


    def _count_stones(self):
        """Return ``(white_count, black_count)`` for the current board."""
        board = self.main_env.render("rgb_array")
        return tuple(np.sum(np.where(board == colour, 1, 0)) for colour in (white, black))


    def battle_loop(self, visible=True):
        """Play one full game and return ``(winner, stones)``.

        Args:
            visible: when True, render the board every turn and print the result.

        Returns:
            winner: 0 for a draw, otherwise the id of the winning colour.
            stones: ``(white_count, black_count)`` at the end of the game.
        """
        if visible:
            print(f"white: {self.model[0].name} vs. black: {self.model[1].name}")
        env = self.main_env
        env.reset()
        # -2 tells the first player that no move has been made yet.
        action = -2
        while not env.done:
            if visible:
                env.render("human")
            player = env.player
            posts, _, _ = env.next_place
            state = env.render("rgb_array")
            mover = self.model[player-1]
            action = mover.get_action(state, player, posts, action)

            _, action, _next_state, reward, _, player = env.step(action)

        if visible:
            env.render("human")
        # Winner id from the last (player, reward) pair; a zero reward gives 0.
        winner = (player + (int(reward)+1)//2 )%2 + 1
        winner *= int(reward**2)

        stones = self._count_stones()

        if visible:
            if winner == white:
                message = f"white: {self.model[0].name} win! white: {stones[0]} black: {stones[1]}"
            elif winner == black:
                message = f"black: {self.model[1].name} win! white: {stones[0]} black: {stones[1]}"
            else:
                message = f"draw! white: {stones[0]} black: {stones[1]}"
            print(message)

        return winner, stones


    def testplay(self, turn="one"):
        """Play one visible game, or two with colours swapped when ``turn == "both"``."""
        self.battle_loop(visible=True)
        if turn == "both":
            self._model_shift()
            self.battle_loop(visible=True)


    def evalplay(self, n=10):
        """Play ``n`` silent games for each colour assignment and print the tallies.

        For every outcome (draw, white win, black win) the report shows the
        accumulated ``max(stones) / (64 * n)`` followed by the number of games
        with that outcome. The colours are swapped after each batch, so the
        original assignment is restored when the method returns.
        """
        for _round in range(2):
            winner_cnt = [0, 0, 0] #draw, white, black
            winner_stones = [0, 0, 0]
            for _game in range(n):
                winner, stones = self.battle_loop(visible=False)
                winner_cnt[winner] += 1
                winner_stones[winner] += max(stones) / (64 * n)

            print(f"white: {self.model[0].name} black: {self.model[1].name}")
            print(f"win count [{n} battles]")
            print(f"white: {winner_stones[1]}/{winner_cnt[1]} black: {winner_stones[2]}/{winner_cnt[2]} draw: {winner_stones[0]}/{winner_cnt[0]}")
            self._model_shift()

# Instantiating the players

In [330]:
# One instance of each player type; any two of them can be handed to OthelloPlay.
SLpn = TrainedModel() # policy network (conv4_bn_mini) in its default "max" mode
Random = RandomAction() # picks a random legal move
You = Human() # move is read from keyboard input
mcts = MCTS(Cp=5, visit_thr=15, time_limit=10) # tree search with time_limit seconds per move

#Play Othello

In [333]:
# Alternative: play a single visible game instead of the silent evaluation below.
#OthelloPlay(SLpn, mcts).testplay("one")
# Evaluate SLpn (white) against MCTS (black): evalplay(1) plays one game, swaps the colours, and plays one more.
OthelloPlay(SLpn, mcts).evalplay(1)

white: SLpn_max black: MCTS
win count [1 battles]
white: 0/0 black: 0/0 draw: 0.5/1
white: MCTS black: SLpn_max
win count [1 battles]
white: 0/0 black: 0.703125/1 draw: 0/0
