# 強化学習を用いた3D Score Fourの攻略

変更するべきパラメータ

- Gym環境の報酬
- Network
- ReplayBufferに学習させる
- など (etc.)

## 各種設定

In [138]:
# Mount Google Drive (logs are saved to Drive)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [139]:
!pip install pytorch-summary

[31mERROR: Could not find a version that satisfies the requirement pytorch-summary (from versions: none)[0m
[31mERROR: No matching distribution found for pytorch-summary[0m


In [140]:
# ライブラリのインポート
import os
import random
import shutil
import time
from datetime import datetime, timedelta, timezone
from stat import SF_IMMUTABLE

import gym
import numpy as np
import pandas as pd
import plotly.express as px
import torch
import torchsummary
from google.colab import output
from gym import error, spaces, utils
from gym.utils import seeding
from torch import nn, optim
from torch.utils.tensorboard import SummaryWriter
from tqdm.notebook import tqdm

%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [141]:
# Current time in Japan Standard Time (UTC+9), formatted as YYYYmmdd-HHMMSS
JST = timezone(timedelta(hours=9), name='JST')
now = datetime.now(tz=JST).strftime('%Y%m%d-%H%M%S')

In [142]:
# Prepare the folder this run's outputs are saved into
project_root = "/content/drive/MyDrive/__MatsuoSeminerResearch/"
save_folder = "{}logs/{}".format(project_root, now)

# Create "/__MatsuoSeminerResearch/logs/<timestamp>" inside the user's Drive
# (no error if the folder already exists)
os.makedirs(save_folder, exist_ok=True)

In [143]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

cuda


In [144]:
# Check which GPU has been allocated to this runtime
!nvidia-smi

Wed Mar 24 17:39:07 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.56       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   66C    P8    33W / 149W |      3MiB / 11441MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [145]:
# Helper that fixes every random seed used in this notebook
def fix_seed(seed):
    """Seed Python, NumPy and PyTorch and switch cuDNN to deterministic mode.

    Args:
        seed (int): seed value shared by all generators; it is also stored
            (as a string) in the ``PYTHONHASHSEED`` environment variable.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)

    # random, NumPy and PyTorch (CPU) generators, in that order
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    # PyTorch generators of every CUDA device
    torch.cuda.manual_seed_all(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False  # processing gets slower


## 環境(Environment)の実装

cubeとboardの違い
- cube.shape = (self.num_grid,self.num_grid,self.num_grid)  
- board.shape = (1,self.num_grid,self.num_grid,self.num_grid)

初期化時の引数

|引数|内容|
|:--------:|:--------:|
|num_grid|一辺の長さ|
|num_win_seq|勝利条件(この数一列に並んだら勝利)|
|win_reward|勝利時の報酬|
|draw_penalty|引き分け時のペナルティ|
|lose_penalty|敗北時のペナルティ(現在の実装では未使用)|
|could_locate_reward|置ける場所を選択した時の報酬|
|couldnt_locate_penalty|置けない場所を選択した時のペナルティ|
|time_penalty|使用していない|
|first_player|先攻|

- 報酬・ペナルティについては全て正の値で設定すること
- ペナルティは-1を掛けた値を足します

#### **ユーティリティクラスの定義**

OpenAI Gym形式でN目並べを実装するに当たって、コードの見やすさの観点からstepやresetといった主要な操作とそれ以外の副次的な操作を分けた。
以下では、主要な操作以外をユーティリティクラスとして実装している。

In [146]:
class UtilClass():
  """Helper operations for the 3D N-in-a-row environment.

  The environment class keeps only `step` / `reset` style operations; stone
  placement, win detection and reward resolution live here.

  Boards are indexed as `board[height][wide][depth]`; a cell holds 0 when it
  is empty and the player number (1 or -1) when it holds a stone.

  Attributes:
    num_grid (int): side length of the cubic board
    num_win_seq (int): number of aligned stones necessary for winning
    win_reward (float): reward given when the game is won
    draw_penalty (float): penalty (positive value) applied when the game is drawn
    lose_penalty (float): stored but currently unused
    could_locate_reward (float): additional reward for placing a stone successfully
    couldnt_locate_penalty (float): penalty (positive value) applied when the chosen pile is full
    time_penalty (float): stored but currently unused
    WIN_A (np.ndarray): `num_win_seq` ones (a winning line of player 1)
    WIN_B (np.ndarray): `num_win_seq` minus-ones (a winning line of player -1)
  """
  def __init__(
      self,
      num_grid,
      num_win_seq,
      win_reward,
      draw_penalty,
      lose_penalty,  # unused
      could_locate_reward,
      couldnt_locate_penalty,
      time_penalty  # unused
      ):
    self.num_grid = num_grid
    self.num_win_seq = num_win_seq
    self.win_reward = win_reward
    self.draw_penalty = draw_penalty
    self.lose_penalty = lose_penalty  # unused
    self.could_locate_reward = could_locate_reward
    self.couldnt_locate_penalty = couldnt_locate_penalty
    self.time_penalty = time_penalty  # unused
    # Reference winning lines, kept as public attributes
    self.WIN_A = np.full(num_win_seq, 1)
    self.WIN_B = np.full(num_win_seq, -1)

  def resolve_placing(self, wide, depth, player_number, board):
    """Drop a stone of `player_number` onto the pile at (`wide`, `depth`).

    The stone lands on the lowest empty height of the pile; `board` is
    modified in place.

    Returns:
      tuple: (reward, board, couldnt_locate). `reward` is
        `could_locate_reward` on success and `-couldnt_locate_penalty` when
        the pile is already full; `couldnt_locate` is True in the latter case.
    """
    for height in range(self.num_grid):
      if board[height][wide][depth] == 0:  # first free cell from the bottom
        board[height][wide][depth] = player_number
        return self.could_locate_reward, board, False

    # Every height from 0 to num_grid-1 is occupied: the pile is full.
    return -self.couldnt_locate_penalty, board, True

  def resolve_winning(self, done, player_number, board):
    """Turn the raw win flag into (done, reward, winner).

    Args:
      done (bool): True when a winning line exists on `board`
      player_number (int): the player who has just moved
      board: current board

    Returns:
      tuple: (done, reward, winner). A win yields `win_reward` and
        `winner == player_number` (only the player who has just moved can have
        completed a line). A board without any empty cell and without a winner
        is a draw: `done` becomes True, the reward is `-draw_penalty` and
        `winner` stays 0. Otherwise the reward is 0 and `winner` is 0.
    """
    if done:
      return done, self.win_reward, player_number

    if not np.any(np.asarray(board) == 0):
      # No empty cell left although nobody has won: draw.
      return True, -self.draw_penalty, 0

    return done, 0, 0

  def is_done(self, cube):
    """Return True when any player has `num_win_seq` stones in a line.

    A window of side `num_win_seq` is slid one cell at a time through the
    board of side `num_grid`; within each window every axis-aligned plane and
    every space diagonal is examined.
    """
    cube = np.asarray(cube)
    n = self.num_win_seq
    num_stride = self.num_grid - n + 1

    for h in range(num_stride):
      for w in range(num_stride):
        for d in range(num_stride):
          window = cube[h:h + n, w:w + n, d:d + n]

          # Planes perpendicular to each of the three axes
          for axis in range(3):
            for plane in np.moveaxis(window, axis, 0):
              if self.is_end_on_2d_plane(plane):
                return True

          # Space diagonals are checked on the window itself (not on a
          # variable leaking out of the plane loop) so none is missed.
          if self.is_diag_on_3d_cube(window):
            return True

    return False

  @staticmethod
  def _is_winning_line(line):
    """Return True when every cell of `line` belongs to the same player."""
    return bool(np.all(line == 1) or np.all(line == -1))

  def is_end_on_2d_plane(self, org_plane: np.ndarray) -> bool:
    """Bingo-style check: does the N x N plane contain a full row, column or diagonal?"""
    assert org_plane.shape == (self.num_win_seq, self.num_win_seq)

    lines = list(org_plane)                      # rows
    lines.extend(org_plane.T)                    # columns
    lines.append(np.diag(org_plane))             # main diagonal
    lines.append(np.diag(np.fliplr(org_plane)))  # anti-diagonal
    return any(self._is_winning_line(line) for line in lines)

  def is_diag_on_3d_cube(self, org_cube):
    """Return True when one of the four space diagonals of the N x N x N cube is full."""
    n = self.num_win_seq
    assert org_cube.shape == (n, n, n)

    fwd = np.arange(n)
    rev = fwd[::-1]
    # A cube has exactly four space diagonals; enumerate them explicitly.
    diagonals = (
        org_cube[fwd, fwd, fwd],
        org_cube[fwd, fwd, rev],
        org_cube[fwd, rev, fwd],
        org_cube[fwd, rev, rev],
    )
    return any(self._is_winning_line(diag) for diag in diagonals)

  def base_change(self, value, base):
    """Convert the non-negative integer `value` to base `base`.

    Note that the result is a string of concatenated decimal digits.

    Raises:
      ValueError: if `value` is negative or `base` is smaller than 2.
    """
    if value < 0 or base < 2:
      raise ValueError("base_change needs value >= 0 and base >= 2")
    # Integer arithmetic only: float division loses precision on large values.
    quotient, remainder = divmod(value, base)
    if quotient:
      return self.base_change(quotient, base) + str(remainder)
    return str(remainder)

  def is_game_end(self, player_number, board):
    """Detect a win or a draw on `board` after `player_number` has moved.

    Returns:
      tuple: (is_end, reward, winner) as produced by `resolve_winning`.
    """
    done = self.is_done(board)
    return self.resolve_winning(done, player_number, board)

#### **N目並べクラス(OpenAI Gym形式)の定義**

N目並べの環境をOpenAI Gym形式で実装。

In [147]:
class AnyNumberInARow3dEnv(gym.Env):
  """Gym environment for "Any Number in a Row" played on a 3D board.

  Five in a Row is a traditional board game with simple rules:
    1. Two players alternately put their stones on an empty intersection.
    2. The first player to form an unbroken chain of five stones
       horizontally, vertically or diagonally wins.
  This environment extends the game in two ways. First, the board gains
  another dimension (2D to 3D). Second, the number of stones required for
  winning becomes a hyperparameter, hence the name "Any Number in a Row".

  Stones obey gravity: an action only selects a pile, and the stone is put on
  the lowest empty cell of that pile.

  Attributes:
    num_grid (int): side length of the cubic board
    action_space (gym.spaces.Discrete): `num_grid * num_grid` selectable piles
    observation_space (gym.spaces.Box): values in [-1, 1] with shape
      (num_grid, num_grid, num_grid)
    player (int): number (1 or -1) of the player who makes the next move
    utils (UtilClass): helper object that placement and game-end checks are delegated to
    board (list): nested list indexed as board[H][W][D]; 0 means empty,
      1 / -1 is a stone of the corresponding player
  """
  def __init__(
      self,
      num_grid=4,
      num_win_seq=4,
      win_reward=10,
      draw_penalty=5,
      lose_penalty=10,
      could_locate_reward=0.1,
      couldnt_locate_penalty=0.1,
      time_penalty=0.1,
      first_player=1
    ):
    super().__init__()

    self.num_grid = num_grid

    # Action space: because of gravity the height cannot be chosen, so an
    # action is one of the N x N piles.
    self.action_space = gym.spaces.Discrete(self.num_grid * self.num_grid)
    # Observation space: every cell holds -1, 0 or 1 (a stone of one player,
    # an empty cell, a stone of the other player), giving an N x N x N box.
    self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(self.num_grid, self.num_grid, self.num_grid))

    # Player who makes the first move
    self.player = first_player

    # Delegate to the utility class (composition instead of inheritance, to
    # avoid depending on it more than necessary).
    self.utils = UtilClass(
        num_grid=num_grid,
        num_win_seq=num_win_seq,
        win_reward=win_reward,
        draw_penalty=draw_penalty,
        lose_penalty=lose_penalty,
        could_locate_reward=could_locate_reward,
        couldnt_locate_penalty=couldnt_locate_penalty,
        time_penalty=time_penalty
        )

    # Initialise the board
    self.reset()

  def reset(self):
    """Reset the board to the initial (empty) state.

    Only the board is reset; `self.player` keeps its current value.

    Returns:
        list: nested `num_grid`^3 list filled with 0
          (0 means empty, 1 or -1 means a stone has been put)
    """
    n = self.num_grid
    self.board = [[[0] * n for _ in range(n)] for _ in range(n)]
    return self.board

  def step(self, action):
    """OpenAI gym style step function.

    Receive the action of the current player and make the transition. The
    turn passes to the other player even when the stone could not be placed.

    Args:
        action (int): selected pile, in the range 0 .. num_grid**2 - 1.
          It is decoded as W = action // num_grid, D = action % num_grid.

    Returns:
        (tuple): tuple containing:
          obs (list): the board after the transition (the same nested list
            object as `self.board`)
          reward (float): game-result reward plus the placement reward / penalty
          done (bool): whether the episode has finished or not
          info (dict): a dictionary containing the following information
            "turn": ID of the player who made this step,
            "winner": winner-player ID (0 if nobody has won),
            "is_couldnt_locate": True when the chosen pile was already full

    Raises:
        ValueError: if `action` is outside 0 .. num_grid**2 - 1.
    """
    n = self.num_grid
    action = int(action)
    if not 0 <= action < n * n:
      raise ValueError(
          "action must be in the range [0, {}), got {}".format(n * n, action))

    # Integer arithmetic instead of a digit string, so boards with
    # num_grid > 10 decode correctly as well.
    W, D = divmod(action, n)

    # Placement dynamics: put the stone and receive the next state together
    # with the adjustment reward and the "could not place" flag.
    fixment_reward, self.board, is_couldnt_locate = self.utils.resolve_placing(
        wide=W,
        depth=D,
        player_number=self.player,
        board=self.board
        )

    # Game-end check on the current board; when the game has ended this also
    # yields the result reward and the winner.
    done, reward, winner = self.utils.is_game_end(
        player_number=self.player,
        board=self.board
        )

    # Who made this step, who won, and whether the chosen pile was full.
    info = {"turn": self.player, "winner": winner, "is_couldnt_locate": is_couldnt_locate}

    # Switch players
    self.player *= -1

    return self.board, reward + fixment_reward, done, info

  def render(self, mode = "print", isClear = False):
    """Show the current board.

    Args:
        mode (str): "print" prints each floor as rows of lists; "plot" draws
          a plotly 3D scatter figure. Any other value shows nothing.
        isClear (bool): clear the Colab cell output before rendering
    """
    if (isClear):
      output.clear()  # clear the cell output

    if (mode == "print"):
      for floor, square in enumerate(self.board):
        print("{}F".format(floor))
        for line in square:
          print(line)

    elif (mode == "plot"):
      data = pd.DataFrame(index=[],columns=["W","D","H","Player"])
      index = 0
      for i in range(self.num_grid):
        for j in range(self.num_grid):
          for k in range(self.num_grid):
            data.loc[index] = ([j, k, i, self.board[i][j][k]])
            index += 1

      range_list=[-0.4,self.num_grid-0.6]
      fig = px.scatter_3d(data,x="W",y="D",z="H",color="Player",
                          range_x=range_list,range_y=range_list,range_z=range_list,
                          color_discrete_map={0:"rgba(0,0,0,0)",1:"red",-1:"blue"},
                          opacity=0.95,width=854,height=480)
      fig.show()

  # Known issue: the empty cells do not become transparent.
  # NOTE(review): the colour map here is the reverse of the one in `render`
  # (-1 is red here, 1 is red there) -- confirm which one is intended.
  def animation(self,obs_history):
    """Draw an animated plotly 3D scatter figure with one frame per board in `obs_history`."""
    data = pd.DataFrame(index=[],columns=["W","D","H","Player","frame"])
    index = 0
    for frame in range(len(obs_history)):
      for i in range(self.num_grid):
        for j in range(self.num_grid):
          for k in range(self.num_grid):
            data.loc[index] = ([j, k, i, obs_history[frame][i][j][k],frame])
            index += 1

    range_list=[-0.4,self.num_grid-0.6]
    fig = px.scatter_3d(data,x="W",y="D",z="H",color="Player",
                        animation_frame="frame",
                        color_discrete_map={0:"rgba(0,0,0,0)",-1:"red",1:"blue"},
                        range_color=[-1,1],
                        range_x=range_list,range_y=range_list,range_z=range_list,
                        opacity=0.95,width=854,height=480)
    fig.show()

#### **Conv3dを使用する場合のラッパークラスを定義**

In [148]:
class Conv3dObsWrapper(gym.ObservationWrapper):
  """Observation wrapper that adds a leading channel axis and returns a float tensor.

  The wrapped board of shape (N, N, N) becomes a tensor of shape (1, N, N, N).
  """
  def __init__(self, env):
    super().__init__(env)
    n = self.num_grid
    # The space has to be assigned; building the Box without storing it left
    # the wrapper advertising the unwrapped (N, N, N) observation space.
    self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(1, n, n, n))

  def observation(self, obs):
    """Wrap `obs` in a one-element list (the channel axis) and convert it to a float tensor."""
    return torch.tensor([obs]).float()

#### **テスト**

環境の実装が、きちんと予期する動作をしているかをテストする。

- パターン1（4＊4＊4の4目並べ）

| step | action | coordinate(W,D,H) | reward | done | player | winner | couldnt_locate | detail |
|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|
| 1 | 0 | (0,0,0) | 10 | False | 1 | 0 | False |  |
| 2 | 0 | (0,0,1) | 10 | False | -1 | 0 | False |  |
| 3 | 1 | (0,1,0) | 10 | False | 1 | 0 | False |  |
| 4 | 1 | (0,1,1) | 10 | False | -1 | 0 | False |  |
| 5 | 2 | (0,2,0) | 10 | False | 1 | 0 | False |  |
| 6 | 2 | (0,2,1) | 10 | False | -1 | 0 | False |  |
| 7 | 4 | (1,0,0) | 10 | False | 1 | 0 | False |  |
| 8 | 4 | (1,0,1) | 10 | False | -1 | 0 | False |  |
| 9 | 0 | (0,0,2) | 10 | False | 1 | 0 | False |  |
| 10 | 0 | (0,0,3) | 10 | False | -1 | 0 | False |  |
| 11 | 1 | (0,1,2) | 10 | False | 1 | 0 | False |  |
| 12 | 1 | (0,1,3) | 10 | False | -1 | 0 | False |  |
| 13 | 2 | (0,2,2) | 10 | False | 1 | 0 | False |  |
| 14 | 2 | (0,2,3) | 10 | False | -1 | 0 | False |  |
| 15 | 0 | - | -10 | False | 1 | 0 | True | (0,0,#)のpileにはこれ以上置けない |
| 16 | 0 | - | -10 | False | -1 | 0 | True | (0,0,#)のpileにはこれ以上置けない |
| 17 | 3 | (0,3,0) | 110 | True | 1 | 1 | False | プレーヤー1の玉が1列揃う。上の実装だと、玉を置いたことによる報酬(10)は受け取ってしまうことに注意。 |

- パターン2（3＊3＊3の2目並べ）

| step | action | coordinate(W,D,H) | reward | done | player | winner | couldnt_locate | detail |
|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|:--------:|
| 1 | 0 | (0,0,0) | 10 | False | 1 | 0 | False |  |
| 2 | 0 | (0,0,1) | 10 | False | -1 | 0 | False |  |
| 3 | 0 | (0,0,2) | 10 | False | 1 | 0 | False |  |
| 4 | 0 | - | -10 | False | -1 | 0 | True | (0,0,#)のpileにはこれ以上置けない |
| 5 | 1 | (0,1,0) | 110 | True | 1 | 1 | False | プレーヤー1の玉が1列揃う。上の実装だと、玉を置いたことによる報酬(10)は受け取ってしまうことに注意。 |

In [149]:
def test_env(env, answer_dict):
  """Replay a scripted game on `env` and compare every transition with the expected values.

  Args:
    env: environment under test; it is reset, its reward settings are
      overwritten with test values and player 1 is made the first player
    answer_dict (dict): the actions to play ("actions") and, per step, the
      expected "rewards", "dones", "players", "winners" and "couldnt_locates"

  Raises:
    Exception: at the first step whose result differs from the expected one
  """
  env.reset()
  # Reward / penalty settings (chosen to be easy to read, for testing only)
  test_settings = {
      "win_reward": 100,
      "draw_penalty": 50,
      "lose_penalty": 100,
      "could_locate_reward": 10,
      "couldnt_locate_penalty": 10,
      "time_penalty": 1,
  }
  for setting_name, setting_value in test_settings.items():
    setattr(env.utils, setting_name, setting_value)
  # The first player is fixed to 1 (to keep the expected tables simple)
  env.player = 1

  for step_idx, action in enumerate(answer_dict["actions"]):
    obs, reward, done, info = env.step(action)
    print(obs, "\n", reward, "\n", done, "\n", info)

    step_matches = (
        answer_dict["rewards"][step_idx] == reward
        and answer_dict["dones"][step_idx] == done
        and answer_dict["players"][step_idx] == info["turn"]
        and answer_dict["winners"][step_idx] == info["winner"]
        and answer_dict["couldnt_locates"][step_idx] == info["is_couldnt_locate"]
    )
    if not step_matches:
      raise Exception("Your environment has failed the test...")

    env.render(mode='plot')

  print("Your environment has passed the test!!!!")

In [150]:
  # Expected results for pattern 1 (4x4x4 board, four in a row)
  pattern1 = dict(
      actions=[0, 0, 1, 1, 2, 2, 4, 4, 0, 0, 1, 1, 2, 2, 0, 0, 3],
      rewards=[10] * 14 + [-10, -10, 110],
      dones=[False] * 16 + [True],
      players=[1, -1] * 8 + [1],
      winners=[0] * 16 + [1],
      couldnt_locates=[False] * 14 + [True, True, False],
      num_grid=4,
      num_win_seq=4,
  )
  # Expected results for pattern 2 (3x3x3 board, two in a row)
  pattern2 = dict(
      actions=[0, 0, 0, 0, 1],
      rewards=[10] * 3 + [-10, 110],
      dones=[False] * 4 + [True],
      players=[1, -1, 1, -1, 1],
      winners=[0] * 4 + [1],
      couldnt_locates=[False] * 3 + [True, False],
      num_grid=3,
      num_win_seq=2,
  )
  
  """
  pattern3 = {
      "actions": [],
      "rewards": [],
      "dones": [],
      "players": [],
      "winners": [],
      "couldnt_locates": [],
      "num_grid": ,
      "num_win_seq": 
      }
  """

'\npattern3 = {\n    "actions": [],\n    "rewards": [],\n    "dones": [],\n    "players": [],\n    "winners": [],\n    "couldnt_locates": [],\n    "num_grid": ,\n    "num_win_seq": \n    }\n'

In [151]:
# Inspect the winning-sequence length configured on the current environment.
# NOTE(review): in this notebook `env` is first assigned by the test loop in the
# following cell, so this cell presumably relied on an `env` left over from an
# earlier run -- confirm before running top to bottom.
env.utils.num_win_seq

4

In [152]:
# Build an environment for each test pattern and replay its scripted game
for pattern in (pattern1, pattern2):
  env = AnyNumberInARow3dEnv(
      num_grid=pattern["num_grid"],
      num_win_seq=pattern["num_win_seq"],
  )
  test_env(env, pattern)

[[[1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 -10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': True}


[[[1, 1, 1, 0], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 -10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': True}


[[[1, 1, 1, 1], [1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [-1, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[1, 1, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], [[-1, -1, -1, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]]] 
 110 
 True 
 {'turn': 1, 'winner': 1, 'is_couldnt_locate': False}


Your environment has passed the test!!!!
[[[1, 0, 0], [0, 0, 0], [0, 0, 0]], [[0, 0, 0], [0, 0, 0], [0, 0, 0]], [[0, 0, 0], [0, 0, 0], [0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 0, 0], [0, 0, 0], [0, 0, 0]], [[-1, 0, 0], [0, 0, 0], [0, 0, 0]], [[0, 0, 0], [0, 0, 0], [0, 0, 0]]] 
 10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 0, 0], [0, 0, 0], [0, 0, 0]], [[-1, 0, 0], [0, 0, 0], [0, 0, 0]], [[1, 0, 0], [0, 0, 0], [0, 0, 0]]] 
 10 
 False 
 {'turn': 1, 'winner': 0, 'is_couldnt_locate': False}


[[[1, 0, 0], [0, 0, 0], [0, 0, 0]], [[-1, 0, 0], [0, 0, 0], [0, 0, 0]], [[1, 0, 0], [0, 0, 0], [0, 0, 0]]] 
 -10 
 False 
 {'turn': -1, 'winner': 0, 'is_couldnt_locate': True}


[[[1, 1, 0], [0, 0, 0], [0, 0, 0]], [[-1, 0, 0], [0, 0, 0], [0, 0, 0]], [[1, 0, 0], [0, 0, 0], [0, 0, 0]]] 
 110 
 True 
 {'turn': 1, 'winner': 1, 'is_couldnt_locate': False}


Your environment has passed the test!!!!


In [42]:
def validate_random(net,experiment_times=20,first_player=1):
  """Play `experiment_times` games against a random opponent and report the result rates.

  The agent's moves come from `net.act(obs, 0.05)`; the opponent's moves are
  drawn with `env.action_space.sample()`. A game counts as a win when
  `info["winner"]` is 1, as a loss when it is -1 and as a draw otherwise.

  NOTE(review): `env`, `device`, `AGENT_TURN` and `ENEMY_TURN` are not defined
  in this function -- presumably module-level globals; confirm they are set
  before calling.

  Args:
    net: network whose `act(obs, epsilon)` method selects the agent's action
      (presumably a CNNQNetwork -- confirm)
    experiment_times (int): number of games to play
    first_player (int): value used for the initial `info["turn"]`

  Returns:
    tuple: (win_rate, draw_rate, lose_rate, couldnt_rate), all in percent.
      `couldnt_rate` is the share of network-selected steps whose pile was full.

  Raises:
    ZeroDivisionError: when `experiment_times` is 0 or the network never
      selected a step (`total_step` stays 0).
  """
  win_num=0
  lose_num=0
  draw_num=0

  # Counters over network-selected steps only (random fallback steps are not counted)
  total_step=0
  total_couldnt_step=0
  
  # NOTE(review): `info` is initialised once, outside the game loop, so from the
  # second game on the first branch taken depends on the last step of the
  # previous game rather than on `first_player` -- confirm this is intended.
  info={"turn": first_player, "winner": 0}
  for i in range(experiment_times):
    done=False
    obs = env.reset()

    while not done:
      # NOTE(review): sum_reward is accumulated below but never returned or read.
      sum_reward = 0
      # Number of successfully placed stones in this round; a round ends after two
      step_done=0
      # Set once the network has chosen a full pile in this round; after that
      # the agent's actions are sampled randomly until the round ends
      isFirstCouldntLocate=False
      while step_done != 2:
        if (info["turn"] == AGENT_TURN):
          if not isFirstCouldntLocate: 
            total_step+=1
            player_action = net.act(obs.float().to(device), 0.05) 
            next_obs, player_reward, done, info = env.step(player_action) # actually act in the environment
            if info["is_couldnt_locate"]==True:
              total_couldnt_step+=1
              isFirstCouldntLocate=True
              pass
            else:
              step_done+=1
              sum_reward += player_reward
          else:
            player_action = env.action_space.sample()
            next_obs, player_reward, done, info = env.step(player_action) # actually act in the environment
            if info["is_couldnt_locate"]==True:
              pass
            else:
              step_done+=1
              sum_reward += 0

          if done :
            break

        elif (info["turn"] == ENEMY_TURN):
          enemy_action = env.action_space.sample() # random
          next_obs, enemy_reward, done, info = env.step(enemy_action) # actually act in the environment
          if info["is_couldnt_locate"]==True:
            pass
          else:
            step_done+=1
          
          if (done):# so that the opponent's couldnt_locate_penalty and could_locate_reward do not enter sum_reward
            sum_reward -= enemy_reward # subtract the reward the opponent obtained by winning       
            break

        obs = next_obs

    # Classify the finished game by the winner reported in the last step's info
    if(info["winner"]==1):
      win_num+=1
    elif(info["winner"]==-1):
      lose_num+=1
    else:
      draw_num+=1

  # Convert the counters to percentages
  win_rate=win_num/(experiment_times)*100
  draw_rate=draw_num/(experiment_times)*100
  lose_rate=lose_num/(experiment_times)*100
  couldnt_rate=total_couldnt_step/(total_step)*100
  return win_rate, draw_rate, lose_rate,couldnt_rate


## エージェント(Agent)の実装

In [None]:
"""
   Prioritized Experience Replayを実現するためのメモリクラス.
"""
class PrioritizedReplayBuffer(object):
    """Fixed-capacity ring buffer of experiences with per-slot sampling priorities.

    An experience is expected to be the 5-tuple
    (obs, action, reward, next_obs, done).
    """

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = []
        self.index = 0  # slot the next experience is written to
        self.priorities = np.zeros(buffer_size, dtype=np.float32)
        self.priorities[0] = 1.0

    def __len__(self):
        return len(self.buffer)

    def push(self, experience):
        """Store `experience`, overwriting the oldest slot once the buffer is full."""
        slot = self.index
        if len(self.buffer) >= self.buffer_size:
            self.buffer[slot] = experience
        else:
            self.buffer.append(experience)

        # New experiences start with the current maximum priority; the value is
        # refined later, after the experience has been sampled.
        self.priorities[slot] = self.priorities.max()
        self.index = (slot + 1) % self.buffer_size

    def sample(self, batch_size, alpha=0.6, beta=0.4):
        """Draw `batch_size` experiences with probability proportional to priority**alpha.

        Returns:
            tuple: (obs, action, reward, next_obs, done, indices, weights).
              The first five entries and `weights` are torch tensors;
              `indices` is the array of sampled buffer positions and `weights`
              holds the importance-sampling corrections normalised by their
              maximum.
        """
        stored = len(self.buffer)

        # Priorities of the occupied slots, turned into sampling probabilities
        scaled = self.priorities[:stored] ** alpha
        prob = scaled / scaled.sum()

        # Positions of the experiences to sample
        indices = np.random.choice(stored, batch_size, p=prob)

        # Importance-sampling weights
        weights = (stored * prob[indices]) ** (-beta)
        weights = weights / np.max(weights)

        # Split the sampled experiences into their five components
        picked = (self.buffer[i] for i in indices)
        obs, action, reward, next_obs, done = zip(*picked)

        batch_obs = torch.stack(obs)
        batch_action = torch.as_tensor(action)
        batch_reward = torch.as_tensor(reward, dtype=torch.float32)
        batch_next_obs = torch.stack(next_obs)
        batch_done = torch.as_tensor(done, dtype=torch.uint8)
        batch_weights = torch.as_tensor(weights, dtype=torch.float32)
        return (batch_obs, batch_action, batch_reward, batch_next_obs,
                batch_done, indices, batch_weights)

    def update_priorities(self, indices, priorities):
        """Overwrite the priorities at `indices`.

        A small constant is added so that no experience ends up with a
        priority so small that it is never selected again.
        """
        self.priorities[indices] = priorities + 1e-4

In [None]:
"""
    Dueling Networkを用いたQ関数を実現するためのニューラルネットワークをクラスとして記述します. 
"""
class CNNQNetwork(nn.Module):
    """Q-function network with a dueling (state value + advantage) head.

    Args:
        state_shape: shape of one observation; the entries at positions 1-3
            give the flattened input size and positions 0-3 are used to probe
            the feature size
        n_action (int): number of discrete actions
    """

    def __init__(self, state_shape, n_action):
        super().__init__()
        self.state_shape = state_shape
        self.n_action = n_action

        # Feature extractor shared by both dueling branches. Despite the
        # attribute name, it currently consists of fully connected layers.
        n_inputs = state_shape[1] * state_shape[2] * state_shape[3]
        self.conv_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(n_inputs, 512),
            nn.LeakyReLU(),
            nn.LayerNorm(512),
            nn.Linear(512, 512),
            nn.LeakyReLU(),
        )

        # Size of the feature vector produced by the shared layers
        cnn_out_size = self.check_cnn_size(state_shape)

        # Separate fully connected branches of the dueling network
        self.fc_state = self._build_head(cnn_out_size, 1)              # state value
        self.fc_advantage = self._build_head(cnn_out_size, n_action)   # advantage

    @staticmethod
    def _build_head(n_in, n_out):
        """Build one dueling branch: n_in -> 512 -> 256 -> 64 -> n_out."""
        return nn.Sequential(
            nn.Linear(n_in, 512),
            nn.LeakyReLU(),
            nn.Linear(512, 256),
            nn.LeakyReLU(),
            nn.Linear(256, 64),
            nn.LeakyReLU(),
            nn.Linear(64, n_out)
        )

    def check_cnn_size(self, shape):
        """Return the number of elements the shared layers output for one observation of `shape`."""
        probe = torch.FloatTensor(1, shape[0], shape[1], shape[2], shape[3])
        out_shape = self.conv_layers(probe).size()
        return np.prod(np.array(out_shape))

    def forward(self, obs):
        """Compute the action values for a batch of observations."""
        feature = self.conv_layers(obs)
        feature = feature.view(feature.size(0), -1)

        state_values = self.fc_state(feature)
        advantage = self.fc_advantage(feature)

        # Action value = state value + advantage; the mean advantage (over the
        # actions) is subtracted for stability.
        mean_advantage = torch.mean(advantage, dim=1, keepdim=True)
        return state_values + advantage - mean_advantage

    def act(self, obs, epsilon):
        """Epsilon-greedy action: random with probability `epsilon`, greedy otherwise."""
        if random.random() < epsilon:
            return random.randrange(self.n_action)
        return self.act_greedy(obs)

    def act_greedy(self, obs):
        """Return the action with the highest predicted value for a single observation."""
        # No gradient tracking is needed when only selecting an action
        with torch.no_grad():
            batch = obs.unsqueeze(0)
            return torch.argmax(self.forward(batch)).item()

In [None]:
def update(batch_size, beta):
    """Run one Double-DQN optimisation step on a prioritised mini-batch.

    Relies on the notebook-level globals ``replay_buffer``, ``net``,
    ``target_net``, ``optimizer``, ``loss_func``, ``gamma`` and ``device``.

    Args:
        batch_size: Number of transitions to sample.
        beta: Value forwarded to ``replay_buffer.sample`` (importance-sampling
            exponent of prioritised experience replay).

    Returns:
        float: The weighted mean loss of this step.
    """
    batch = replay_buffer.sample(batch_size, beta)
    obs, action, reward, next_obs, done, indices, weights = batch
    obs = obs.float().to(device)
    action = action.to(device)
    reward = reward.to(device)
    next_obs = next_obs.float().to(device)
    done = done.to(device)
    weights = weights.to(device)

    # Q(s, a) of the actions that were actually taken, picked out with gather.
    # (Debug aid: print action, net(obs) and the gathered values here when
    # diagnosing shape mismatches.)
    taken_action = action.unsqueeze(1)
    q_values = net(obs).gather(1, taken_action).squeeze(1)

    # The target is treated as a constant, so no gradients are tracked.
    with torch.no_grad():
        # Double DQN step 1: the online network selects the greedy next action.
        greedy_action_next = net(next_obs).argmax(dim=1)
        # Double DQN step 2: the target network supplies that action's value.
        next_action_index = greedy_action_next.unsqueeze(1)
        q_values_next = target_net(next_obs).gather(1, next_action_index).squeeze(1)

    # Bellman target; (1 - done) zeroes the bootstrap term after the game ended.
    not_done = 1 - done
    target_q_values = reward + gamma * q_values_next * not_done

    # Prioritised replay: weight the per-sample loss before averaging.
    optimizer.zero_grad()
    per_sample_loss = loss_func(q_values, target_q_values)
    loss = (weights * per_sample_loss).mean()
    loss.backward()
    optimizer.step()

    # Refresh the priorities of the sampled transitions with their |TD error|.
    td_error = (target_q_values - q_values).abs().detach().cpu().numpy()
    replay_buffer.update_priorities(indices, td_error)

    return loss.item()

## パラメータ

In [None]:
# Gym environment settings. make_game_env() forwards each value to the
# ScoreFour3dEnv keyword argument of the same name; their exact meaning is
# defined by that environment class.

# Board / win-condition sizes
num_grid = 4
num_win_seq = 4

# Game-outcome terms
win_reward = 10
lose_penalty = 10
draw_penalty = 5

# Per-move terms
could_locate_reward = 0.01
couldnt_locate_penalty = 0.2
time_penalty = 0.1

def make_game_env():
    """Create a wrapped Score Four environment with a random first player.

    The first player (-1 or 1) is drawn with ``random.randint`` and printed,
    then a ``ScoreFour3dEnv`` is built from the notebook-level reward and
    board constants and wrapped in ``Conv3dObsWrapper``.

    Returns:
        tuple: ``(first_player, env)``.
    """
    first_player = (-1, 1)[random.randint(0, 1)]
    print("first_player is ",first_player)

    env_kwargs = dict(
        num_grid=num_grid,
        num_win_seq=num_win_seq,
        win_reward=win_reward,
        draw_penalty=draw_penalty,
        lose_penalty=lose_penalty,
        could_locate_reward=could_locate_reward,
        couldnt_locate_penalty=couldnt_locate_penalty,
        time_penalty=time_penalty,
        first_player=first_player,
    )
    wrapped_env = Conv3dObsWrapper(ScoreFour3dEnv(**env_kwargs))
    return first_player, wrapped_env


In [None]:
"""
    ハイパーパラメータ
"""
gamma = 0.99  # discount factor
batch_size = 1
n_episodes = 100000  # number of training episodes
enemy_update_interval = 1000  # episodes between refreshes of the opponent network


# --- Random seed -------------------------------------------------------------
SEED = 7


# --- Replay buffer -----------------------------------------------------------
buffer_size = 200000  # maximum number of transitions kept in the buffer
initial_buffer_size = 100  # minimum number of transitions before learning starts


# --- Networks ----------------------------------------------------------------
# The networks themselves are created later, after the seed has been fixed.
target_update_interval = 200  # episodes between target-network syncs (stabilises training)
net_save_interval = 4000  # episodes between weight checkpoints


# --- Fine-tuning (start from previously saved weights) -----------------------
do_fine_tuning = False
# Dedicated names are used here on purpose: binding the run id to `time` would
# replace the stdlib `time` module imported at the top of the notebook, and
# `episode` is the loop variable of the training cell.
pretrained_run = "20210312-141214"
pretrained_episode = 10000
load_weights_path = os.path.join(
    each_dir,
    "logs",
    pretrained_run,
    "weights",
    "weights_{}episodes.pth".format(pretrained_episode),
)


# --- Loss --------------------------------------------------------------------
# Smooth L1 (Huber) loss, kept per-sample so update() can apply the
# prioritised-replay weights before averaging.
loss_func = nn.SmoothL1Loss(reduction='none')
# The optimizer is created later, after the seed has been fixed.


# --- Prioritised-replay beta schedule (per episode) --------------------------
beta_begin = 0.2
beta_end = 0.95
beta_decay = n_episodes - 2000


def beta_func(episode):
    """Linearly raise beta from beta_begin to beta_end over beta_decay episodes."""
    return min(beta_end, beta_begin + (beta_end - beta_begin) * (episode / beta_decay))


# --- Exploration epsilon schedule (per episode) ------------------------------
epsilon_begin = 1.0
epsilon_end = 0.05
epsilon_decay = n_episodes - 2000


def epsilon_func(episode):
    """Linearly lower epsilon from epsilon_begin to epsilon_end over epsilon_decay episodes."""
    return max(epsilon_end, epsilon_begin - (epsilon_begin - epsilon_end) * (episode / epsilon_decay))

## 学習(DQN)

In [None]:
# Launch TensorBoard inside Colab.

tensorboard_path=os.path.join(each_dir,'logs3') # If TensorBoard does not show up, change logs => logsXX here and move the data into logs by hand afterwards.
%tensorboard --logdir="$tensorboard_path" #--port 6060

In [None]:
# Running the notebook from this cell onward is reproducible; the earlier
# cells do not all have to be re-executed.

# Fix the seeds before anything random is created.
fix_seed(SEED)
first_player, env = make_game_env()
env.seed(SEED)
env.action_space.seed(SEED)
obs = env.reset()

# Replay buffer, networks and optimizer are created here (after seeding) so
# that their initial state is reproducible.
replay_buffer = PrioritizedReplayBuffer(buffer_size)
net = CNNQNetwork(env.observation_space.shape, n_action=env.action_space.n).to(device)
target_net = CNNQNetwork(env.observation_space.shape, n_action=env.action_space.n).to(device)
enemy_net = CNNQNetwork(env.observation_space.shape, n_action=env.action_space.n).to(device)
optimizer = optim.Adam(net.parameters(), lr=1e-4)  # Adam optimizer
torchsummary.summary(net, obs.float().to(device).shape)

# Fine-tuning: start all three networks from the same saved weights.
if do_fine_tuning:
    # Read the checkpoint once, and map it onto the current device so weights
    # saved on a GPU can also be loaded on a CPU-only runtime.
    _pretrained_state = torch.load(load_weights_path, map_location=device)
    net.load_state_dict(_pretrained_state)
    target_net.load_state_dict(_pretrained_state)
    enemy_net.load_state_dict(_pretrained_state)
    print("loaded weights")

# Create the folder that receives the TensorBoard log and the weights.
JST = timezone(timedelta(hours=+9), 'JST')
now = datetime.now(JST)
log_path = os.path.join(tensorboard_path, now.strftime('%Y%m%d-%H%M%S'))
weights_path = os.path.join(log_path, "weights")
os.makedirs(weights_path)

# TensorBoard writer
writer = SummaryWriter(log_dir=log_path)
writer.add_graph(net, obs.float().to(device).unsqueeze(0))

# Keep a copy of the notebook next to the logs for the record.
# The notebook must be stored directly under each_dir as ScoreFour.ipynb.
shutil.copyfile(os.path.join(each_dir, "ScoreFour.ipynb"), os.path.join(log_path, "for_record.ipynb"))
# SF_IMMUTABLE is a flag for os.chflags(), not a permission mode: passed to
# os.chmod() it has no permission bits set. Mark the copy read-only instead.
os.chmod(os.path.join(log_path, "for_record.ipynb"), 0o444)


In [None]:
# Reproducibility guard: this cell must be run together with the previous one
# (`now` is the timestamp taken there).
# total_seconds() is used because timedelta.seconds drops whole days, so a gap
# of e.g. one day and three seconds would otherwise slip through.
if (datetime.now(JST) - now).total_seconds() > 10:
    raise RuntimeError("前のセルと同時に実行しないと再現性が確保できません")


# Turn bookkeeping read by the training loop; env.step() replaces it after
# every move.
info = {"turn": first_player, "winner": 0}

AGENT_TURN = 1
ENEMY_TURN = -1

# Counters accumulated over the whole training run.
total_step = 0
total_reward = 0
enemy_update = 0

win_num = 0
lose_num = 0
draw_num = 0

# Main training loop: play n_episodes games against the opponent, store the
# agent's transitions in the replay buffer and train `net` with update().
for episode in tqdm(range(n_episodes)):
  # NOTE(review): env.reset() only returns obs here; `info` still holds the value
  # from the last env.step() of the previous episode, so the first turn check below
  # reads a stale info["turn"] -- confirm this matches the env's starting player.
  obs = env.reset()
  done = False

  start_step = total_step

  episode_reward=0
  episode_couldnt_locate_num=0

  while not done:
    sum_reward = 0
    step_done=0  # number of moves that were actually placed in this round

    # One round: keep stepping until two moves have been placed
    # (presumably one per side -- the turn order comes from info["turn"]).
    while step_done != 2:
      if (info["turn"] == AGENT_TURN):
        before_action_obs=obs
        player_action = net.act(obs.float().to(device), epsilon_func(episode)) # choose an action epsilon-greedily
        next_obs, player_reward, done, info = env.step(player_action) # actually take the action in the environment
        after_action_obs = next_obs
        total_step += 1  # counts every agent env.step(), including rejected placements

        if info["is_couldnt_locate"]==True:
          episode_reward += player_reward
          total_reward += player_reward
          replay_buffer.push([before_action_obs, player_action, player_reward, after_action_obs, done])# store the rejected placement so the agent learns from it
          episode_couldnt_locate_num += 1
          # update the network
          if len(replay_buffer) > initial_buffer_size:
            loss = update(batch_size, beta_func(episode))
            writer.add_scalar('Loss', loss, total_step)
        else:
          step_done+=1
          sum_reward += player_reward

        if done :
          break

      elif (info["turn"] == ENEMY_TURN):
        # NOTE(review): enemy_update is incremented once every enemy_update_interval
        # episodes (see the end of the episode loop), so with the values from the
        # hyperparameter cell the left-hand side is 100 and this condition looks like
        # it never becomes true during training, i.e. the opponent always plays
        # randomly -- confirm the intended threshold.
        if n_episodes/enemy_update_interval < enemy_update and random.random() < 0.7: # an always-greedy opponent can get stuck on a move it can never place, so keep some randomness
          enemy_action = enemy_net.act_greedy(obs.float().to(device)) # the opponent acts greedily
        else:
          enemy_action = env.action_space.sample() # random move
        next_obs, enemy_reward, done, info = env.step(enemy_action) # actually take the action in the environment
        if info["is_couldnt_locate"]==True:
          pass
        else:
          step_done+=1
        
        if (done):# only the terminal reward is used, so the opponent's couldnt_locate_penalty / could_locate_reward never enter sum_reward
          sum_reward -= enemy_reward # subtract the reward the opponent earned by winning
          break

      obs = next_obs
      
    # store the round's transition in the replay buffer
    # (after_action_obs is the observation right after the agent's own move)
    replay_buffer.push([before_action_obs, player_action, sum_reward, after_action_obs, done])

    episode_reward += sum_reward
    total_reward += sum_reward

    
    # update the network
    if len(replay_buffer) > initial_buffer_size:
      loss = update(batch_size, beta_func(episode))
      writer.add_scalar('Loss', loss, total_step)
    
    if done:
      if (info["winner"] == AGENT_TURN):
        win_num+=1
      elif (info["winner"] == ENEMY_TURN):
        lose_num+=1
      else:
        draw_num+=1
  
  episode_step = total_step-start_step

  # print progress to the console every 500 episodes
  if ((episode+1) % 500 == 0):
    print('Episode: {},  TotalStep: {}, EpisodeStep: {},  EpisodeReward: {}'.format(episode + 1, total_step,episode_step, episode_reward))
  
  # validation every 2000 episodes
  if ((episode+1) % 2000 == 0):
    val_win_rate, val_draw_rate, val_lose_rate,val_couldnt_rate=validate_random(net,experiment_times=100,first_player=first_player)
    writer.add_scalar('Val-Win-Rate',val_win_rate, episode+1) 
    writer.add_scalar('Val-Draw-Rate', val_draw_rate, episode+1) 
    writer.add_scalar('Val-Lose-Rate', val_lose_rate, episode+1) 
    writer.add_scalar('Val-Couldnt-Locate-Rate', val_couldnt_rate, episode+1) 
    print("Win: {}%, Lose: {}%, Draw: {}%, couldnt: {}%".format(val_win_rate, val_lose_rate,val_draw_rate, val_couldnt_rate))

  # record scalars for TensorBoard
  writer.add_scalar('Total-Reward', total_reward, episode+1)
  writer.add_scalar('Episode-Reward', episode_reward, episode+1)
  writer.add_scalar('Episode-Step', episode_step, episode+1)
  writer.add_scalar('Win-Rate', win_num/(episode+1)*100, episode+1) 
  writer.add_scalar('Draw-Rate', draw_num/(episode+1)*100, episode+1) 
  writer.add_scalar('Lose-Rate', lose_num/(episode+1)*100, episode+1) 
  # NOTE(review): divides by episode_step, which is 0 if the agent never stepped in this episode
  writer.add_scalar('Episode-Couldnt-Locate-rate', episode_couldnt_locate_num/episode_step*100, episode+1)
  writer.add_scalar('Epsilon',epsilon_func(episode), episode+1)

  # periodically strengthen the opponent by copying the target network into it
  if (episode + 1) % enemy_update_interval == 0:
      enemy_net.load_state_dict(target_net.state_dict())
      enemy_update += 1

  # periodically sync the target network with the online network
  if (episode + 1) % target_update_interval == 0:
      target_net.load_state_dict(net.state_dict())

  # periodically save the network weights
  if ((episode+1) % net_save_interval == 0):
    torch.save(net.state_dict(), weights_path+"/weights_{}episodes.pth".format(episode+1))

# save the final weights and flush the TensorBoard writer
torch.save(net.state_dict(), weights_path+"/weights_final.pth")
writer.close()

## 学習結果の確認

In [None]:
first_player, env = make_game_env()
AGENT_TURN = 1
ENEMY_TURN = -1

# True:  evaluate a checkpoint saved by an earlier run (selected below).
# False: evaluate the `net` that is already in memory (e.g. the one just trained).
isCheckPreWeights = True
# NOTE(review): `time` rebinds the stdlib module imported at the top of the
# notebook; the name is kept because the result-writing cell below reads it.
time = "20210319-164323"
logs_folder = "logs3"
episode = 72000
episode = "weights_{}episodes.pth".format(episode)
load_weights_path = os.path.join(each_dir, logs_folder, time, "weights", episode)

if isCheckPreWeights:
    # Build the network first and load into it afterwards; loading before the
    # network is (re)created would throw the loaded weights away.
    net = CNNQNetwork(env.observation_space.shape, n_action=env.action_space.n).to(device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
    net.load_state_dict(torch.load(load_weights_path, map_location=device))
    print("loaded weights")

In [None]:
# Evaluate `net` (greedy, epsilon = 0) against a random opponent for
# experiment_times games and collect win/lose/draw counts plus a per-round log.
experiment_times=300

win_num=0
lose_num=0
draw_num=0

log = []  # one list per game; each entry is [sum_reward, info] for one round

info={"turn": first_player, "winner": 0}

total_step=0          # moves chosen by the network
total_couldnt_step=0  # of those, moves the environment rejected

for i in range(experiment_times):
  done=False
  log_child = []
  obs = env.reset()

  # for animation
  episode_cube_history = []
  episode_cube_history.append(np.array(obs.squeeze(0)))

  while not done:
    sum_reward = 0
    step_done=0  # number of moves that were actually placed in this round
    isPreCouldntLocate=False  # True once the network's move was rejected in this round

    while step_done != 2:
      if (info["turn"] == AGENT_TURN):
          if not isPreCouldntLocate: 
            # first try: the network's greedy move (epsilon = 0)
            total_step+=1
            player_action = net.act(obs.float().to(device), 0) 
            next_obs, player_reward, done, info = env.step(player_action) # actually take the action in the environment
            sum_reward += player_reward
            if info["is_couldnt_locate"]==True:
              total_couldnt_step+=1
              isPreCouldntLocate=True
              pass
            else:
              step_done+=1
          else:
            # fallback after a rejected network move: sample random moves until one is placed
            player_action = env.action_space.sample()
            next_obs, player_reward, done, info = env.step(player_action) # actually take the action in the environment
            if info["is_couldnt_locate"]==True:
              pass
            else:
              # NOTE(review): the flag is forced back to True although the random move was
              # placed; as a result the frame is not appended below and the logged info
              # reports a failed placement -- presumably to mark the round as one where
              # the network's own move failed; confirm.
              info["is_couldnt_locate"]=True
              step_done+=1
          if done :
            episode_cube_history.append(np.array(next_obs.squeeze(0))) # for animation
            break

      elif (info["turn"] == ENEMY_TURN):
        enemy_action = env.action_space.sample() # random move
        next_obs, enemy_reward, done, info = env.step(enemy_action) # actually take the action in the environment
        if info["is_couldnt_locate"]==True:
          pass
        else:
          step_done+=1
        
        if (done):# only the terminal reward is used, so the opponent's couldnt_locate_penalty / could_locate_reward never enter sum_reward
          sum_reward -= enemy_reward # subtract the reward the opponent earned by winning
          episode_cube_history.append(np.array(next_obs.squeeze(0))) # for animation
          break

      obs = next_obs
      if info["is_couldnt_locate"]==False:
        episode_cube_history.append(np.array(obs.squeeze(0))) # for animation

    log_child.append([sum_reward,info])  

  # tally the result of this game
  if(info["winner"]==1):
    win_num+=1
  elif(info["winner"]==-1):
    lose_num+=1
  else:
    draw_num+=1

  log.append(log_child)

  # rendering (disabled)
  #env.render(mode="plot",isClear=False)
  #env.animation(episode_cube_history)

# Print the per-round log of every evaluated game.
for i, game_rounds in enumerate(log):
    print()
    print(i)
    print("Reward, is_couldnt_locate, Winner")
    for log_child in game_rounds:
        round_reward = log_child[0]
        round_info = log_child[1]
        print(" {:6.2f}, {}, {}".format(round_reward, round_info["is_couldnt_locate"], round_info["winner"]))


# Summary of the whole evaluation, as percentages.
print()
print("Result ({}times)".format(experiment_times))
print("-------------------")
win_rate = win_num / experiment_times * 100
draw_rate = draw_num / experiment_times * 100
lose_rate = lose_num / experiment_times * 100
couldnt_rate = total_couldnt_step / total_step * 100
print("win_rate:",win_rate)
print("draw_rate:",draw_rate)
print("lose_rate:",lose_rate)
print("couldnt locate rate:",couldnt_rate)
print("sum:",win_rate+draw_rate+lose_rate)

# Write the summary to result.md in the folder of the evaluated run.
# (To write next to the current training log instead, use
#  os.path.join(log_path, "result.md") as the target.)
result_path = os.path.join(each_dir,logs_folder,time,"result.md")
with open(result_path,"w") as f:
    f.write(
        "Result ({}times)\n".format(experiment_times)
        + "-------------------\n"
        + "win_rate: {}\n".format(win_rate)
        + "draw_rate: {}\n".format(draw_rate)
        + "lose_rate: {}\n".format(lose_rate)
        + "sum: {}\n".format(win_rate+draw_rate+lose_rate)
    )

In [None]:

# Keep a copy of the notebook (including the results above) for the record.
# The notebook must be stored directly under each_dir as ScoreFour.ipynb.
shutil.copyfile(os.path.join(each_dir, "ScoreFour.ipynb"), os.path.join(log_path, "for_record_result.ipynb"))
# SF_IMMUTABLE is a flag for os.chflags(), not a permission mode: passed to
# os.chmod() it has no permission bits set. Mark the copy read-only instead.
os.chmod(os.path.join(log_path, "for_record_result.ipynb"), 0o444)


## 研究メモ

### エラー対処
- element 0 of tensors does not require grad and does not have a grad_fn
  - Reset Runtime