### Train

In [1]:
import importlib
import Agent
import ReplayBuffer
import Game

# Re-import the project modules in place so that edits made to them on disk
# are picked up without restarting the kernel. The last reload is the cell's
# final expression, so its module repr is what the cell displays as output.
importlib.reload(Agent)
importlib.reload(Game)
importlib.reload(ReplayBuffer)

<module 'ReplayBuffer' from 'F:\\Code\\Othello_Final\\ReplayBuffer.py'>

In [2]:
from Agent import DDQNAgent, RandomAgent, PositionAgent
from Game import Othello_vec
from ReplayBuffer import ReplayBuffer

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

import logging 
import os, shutil
import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt
from IPython.display import clear_output

def play(agent_black, agent_white, env, epoch=0):
    """Play every game in the vectorised environment ``env`` until all are done.

    Each turn, the agent whose colour is to move (black when
    ``env.get_player()`` is -1, white when it is 1) picks an action for every
    environment that has at least one legal move. Environments with no legal
    move are passed via ``env.skip_round``; if the opponent cannot move either,
    the game is ended early via ``env.set_done``. After ``env.step`` the agent
    that just moved receives the transition through its ``step`` method.

    Args:
        agent_black: Object with ``get_action(state, legal_act)`` and
            ``step(state, action, reward, next_state, done)``; plays player -1.
        agent_white: Same interface as ``agent_black``; plays player 1.
            May be the very same object as ``agent_black`` (self-play).
        env: Vectorised environment providing ``env_nums``, ``reset``,
            ``get_legal_move``, ``get_player``, ``skip_round``, ``set_done``,
            ``step``, ``get_state`` and ``get_done_score``.
        epoch: Unused; kept so existing callers that pass it keep working.

    Returns:
        The value of ``env.get_done_score()`` once every game has finished.
        Callers in this notebook unpack it as ``(done, winner)``.
    """
    state, _ = env.reset()
    done = np.full(env.env_nums, False)

    while not done.all():
        # Boolean mask of legal moves, one row per environment
        # (described by the original author as (env_num, action_size)).
        legal_act = env.get_legal_move()
        player = env.get_player()  # side to move in each environment

        # Split environments by who moves this turn; environments where the
        # side to move has no legal move go into skip_idx.
        has_move = legal_act.any(axis=1)
        black_idx = has_move & (player == -1)
        white_idx = has_move & (player == 1)
        skip_idx = ~has_move

        # Action chosen for each environment this turn.
        action = np.full(env.env_nums, 0, dtype=np.int32)

        if black_idx.any():
            # During evaluation, black is the baseline opponent.
            action_black = agent_black.get_action(state[black_idx], legal_act[black_idx])
            action[black_idx] = action_black
        if white_idx.any():
            action_white = agent_white.get_action(state[white_idx], legal_act[white_idx])
            action[white_idx] = action_white

        # Pass the turn in environments where the side to move is stuck.
        if skip_idx.any():
            env.skip_round(skip_idx)
            player = env.get_player()
            legal_act = env.get_legal_move()

            # Only the environments that were just passed are reconsidered.
            black_idx = legal_act.any(axis=1) & (player == -1) & skip_idx
            white_idx = legal_act.any(axis=1) & (player == 1) & skip_idx
            done_idx = ~legal_act.any(axis=1) & skip_idx

            # Fill in the action for the side that moves after the pass.
            # NOTE(review): `state` was captured before skip_round; if the
            # state encoding depends on the side to move it may need to be
            # refreshed with env.get_state() here - confirm against Game.py.
            if white_idx.any():
                action_white = agent_white.get_action(state[white_idx], legal_act[white_idx])
                action[white_idx] = action_white
            if black_idx.any():
                action_black = agent_black.get_action(state[black_idx], legal_act[black_idx])
                action[black_idx] = action_black
            if done_idx.any():  # neither side can move: end that game early
                env.set_done(done_idx)

        done, next_state, reward, _, _, player = env.step(action)

        # Update memory and agent. Per the original author's note, `player`
        # has already switched to the opponent after env.step, so the side
        # that just moved is the opposite of `player`.
        black_idx = (player == 1)
        white_idx = (player == -1)
        if black_idx.any():
            agent_black.step(state[black_idx], action[black_idx], reward[black_idx], next_state[black_idx], done[black_idx])
        if white_idx.any():
            agent_white.step(state[white_idx], action[white_idx], reward[white_idx], next_state[white_idx], done[white_idx])
        state = env.get_state()

    return env.get_done_score()

# 用random & position agent測試表現
def eval_agent(agent_white,epoch,writer,model_name,max_win_rate):
    """Benchmark ``agent_white`` against the random and positional baselines.

    The baseline plays black and ``agent_white`` plays white. Both win rates
    are printed and written to ``writer``. A "best" checkpoint is saved when
    the win rate against the random baseline beats ``max_win_rate`` and the
    win rate against the positional baseline equals 1; the "latest"
    checkpoint is saved on every call.

    Args:
        agent_white: Agent under evaluation; must provide ``save(path)``.
        epoch: Step index used for the logged scalars (also forwarded to ``play``).
        writer: Object with ``add_scalar(tag, value, step)``.
        model_name: Prefix for the checkpoint file names.
        max_win_rate: Best random-baseline win rate seen so far.

    Returns:
        The updated best random-baseline win rate.
    """
    def white_win_rate(games, opponent):
        # Fraction of finished games whose winner is white (encoded as 1).
        _, outcome = play(opponent, agent_white, games, epoch = epoch)
        return sum(outcome == 1)/len(outcome)

    # Baseline 1: random opponent over 200 parallel games.
    random_win_rate = white_win_rate(Othello_vec(env_nums=200), RandomAgent())
    print(f'Evaluate with [Random] \t win rate:{random_win_rate}')
    writer.add_scalar('Win Rate [random]',random_win_rate,epoch)

    # Baseline 2: positional opponent over 2 parallel games.
    position_win_rate = white_win_rate(Othello_vec(env_nums=2), PositionAgent())
    print(f'Evaluate with [Position] \t win rate:{position_win_rate}')
    writer.add_scalar('Win Rate [position]',position_win_rate,epoch)

    beats_position = position_win_rate == 1
    improves_on_best = random_win_rate > max_win_rate
    if improves_on_best and beats_position:
        agent_white.save(f'{model_name}_best_{random_win_rate}.pth')
        print(f'save best model {model_name} win_rate:{random_win_rate}')
        max_win_rate = random_win_rate

    # The most recent weights are always written, whether or not they are the best.
    agent_white.save(f'{model_name}_latest.pth')
    print('===========================')
    return max_win_rate

### DNN
#### 使用reward:
* stage 1:
    * positional reward
    * end game reward
    * 避免被佔corner
* stage 2: 
    * 避免被dominate

In [None]:
exp_name = 'DNN_best_2nd_stage_survive'

# force=True replaces any handlers left on the root logger by an earlier
# basicConfig call in this kernel; without it this call would be a no-op and
# log records would keep going to the previously configured file.
logging.basicConfig(filename=f'{exp_name}.log', encoding='utf-8', level=logging.INFO, force=True)

# Start from an empty TensorBoard directory for this experiment.
log_dir = f'logs/{exp_name}'
if not os.path.exists(log_dir):
    # makedirs also creates the parent 'logs' directory on a fresh checkout.
    os.makedirs(log_dir)
else:
    for root, _, files in os.walk(log_dir):
        for f in files:
            path = os.path.join(root, f)
            # Test the full path: files inside a .ipynb_checkpoints directory
            # do not carry that string in their own file name.
            if '.ipynb_checkpoints' not in path:
                os.remove(path)

writer = SummaryWriter(log_dir)
writer.add_text("Experiment Info", exp_name)

# Self-play: the same agent object plays both colours.
agent_black = DDQNAgent(eps=0.2, net_type='DNN')
agent_white = agent_black

epochs = 1000
eval_freq = 10
env = Othello_vec(env_nums=50)
max_win_rate = 0

try:
    for epoch in range(epochs):
        # Exploration rate decays linearly from 0.3 towards 0.1 over training.
        eps = 0.1 + (epochs-epoch)*0.2/epochs
        agent_black.set_eps(eps)
        agent_white.set_eps(eps)
        done, winner = play(agent_black, agent_white, env)
        # Named round_ so the builtin round() is not shadowed notebook-wide.
        black, white, round_ = env.get_info()

        # Evaluation
        if epoch % eval_freq == 0:
            print(f'[Training] epoch: {epoch} black: {sum(winner == -1)} white: {sum(winner == 1)}')
            agent_white.set_eps(0)  # exploitation only; eps is reset at the top of the next epoch
            max_win_rate = eval_agent(agent_white, epoch, writer, exp_name, max_win_rate)
            writer.add_scalar('average_reward', agent_white.memory.rewards.numpy().mean(), epoch)
finally:
    # Flush buffered TensorBoard events even if training is interrupted.
    writer.close()
        

### CNN

In [None]:
exp_name = 'CNN_survive_large'

# force=True replaces any handlers left on the root logger by an earlier
# basicConfig call in this kernel; without it this call would be a no-op and
# log records would keep going to the previously configured file.
logging.basicConfig(filename=f'{exp_name}.log', encoding='utf-8', level=logging.INFO, force=True)

# Start from an empty TensorBoard directory for this experiment.
log_dir = f'logs/{exp_name}'
if not os.path.exists(log_dir):
    # makedirs also creates the parent 'logs' directory on a fresh checkout.
    os.makedirs(log_dir)
else:
    for root, _, files in os.walk(log_dir):
        for f in files:
            path = os.path.join(root, f)
            # Test the full path: files inside a .ipynb_checkpoints directory
            # do not carry that string in their own file name.
            if '.ipynb_checkpoints' not in path:
                os.remove(path)

writer = SummaryWriter(log_dir)
writer.add_text("Experiment Info", exp_name)

# Self-play: the same agent object plays both colours.
agent_black = DDQNAgent(eps=0.2, net_type='CNN')
# agent_black.load('./model/CNN_pos+corner_latest-Copy3.pth')
agent_white = agent_black

epochs = 10000
eval_freq = 10
env = Othello_vec(env_nums=50)
max_win_rate = 0

try:
    for epoch in range(epochs):
        # Exploration rate decays linearly from 0.3 towards 0.1 over training.
        eps = 0.1 + (epochs-epoch)*0.2/epochs
        agent_black.set_eps(eps)
        agent_white.set_eps(eps)
        done, winner = play(agent_black, agent_white, env)
        # Named round_ so the builtin round() is not shadowed notebook-wide.
        black, white, round_ = env.get_info()

        # Evaluation
        if epoch % eval_freq == 0:
            print(f'[Training] epoch: {epoch} black: {sum(winner == -1)} white: {sum(winner == 1)}')
            agent_white.set_eps(0)  # exploitation only; eps is reset at the top of the next epoch
            max_win_rate = eval_agent(agent_white, epoch, writer, exp_name, max_win_rate)
            writer.add_scalar('average_reward', agent_white.memory.rewards.numpy().mean(), epoch)
finally:
    # Flush buffered TensorBoard events even if training is interrupted.
    writer.close()
        