## **概要**

AlphaZeroのアルゴリズムは以下のような手順で構成される：

1. 自己対戦による学習: AlphaZeroは、自分自身と対戦しながら学習する。ランダムな手法からスタートし、強化学習アルゴリズムを用いて次第に強くなる。

2. モンテカルロ木探索: AlphaZeroは、モンテカルロ木探索（MCTS）を使用してゲームのツリーを探索する。これにより、最適な手を見つけるために局面を評価し、次の手を決定する。

3. ニューラルネットワーク: AlphaZeroは、ゲームの状態を評価するためのニューラルネットワークを使用する。このニューラルネットワークは、ゲームの局面を入力とし、局面の価値を出力する。価値は、勝率や局面の良さなどを示す。

4. 強化学習: AlphaZeroは、報酬を最大化するようにニューラルネットワークを調整することで学習します。報酬は、ゲームの勝利や敗北などによって与えられます。

## **オセロのルールを反映させたクラス**

In [7]:
# オセロの状態を表すクラス(構造体)
# 8x8の盤面を持ち、各マスには黒か白か空かのいずれかが入る
# 盤面の状態を配列で表現する
class Board():
    # 8x8の配列を初期化する
    def __init__(self):
        self.board = [[0 for i in range(8)] for j in range(8)]
        self.board[3][3] = 1    # 白(◯)
        self.board[4][4] = 1    # 白
        self.board[3][4] = -1    # 黒(×)
        self.board[4][3] = -1    # 黒
        self.player = -1         # 黒が先手
    # 盤面の配列を取得する
    def getBoard(self):
        return self.board
    # 盤面の状態を表示する
    def show(self):
        for row in self.board:
            print("|", end="")
            for cell in row:
                if cell == 0:
                    print(" ", end="")  # end="" で改行しない
                elif cell == 1:
                    print("o", end="")
                elif cell == -1:
                    print("x", end="")
            print("|")                     # 一行表示して改行
        print("player: o") if self.player == 1 else print("player: x")
    # ファイルに盤面の状態を書き込む
    def fshow(self, filename):
        with open(filename, "w") as f:
            for row in self.board:
                f.write("|")
                for cell in row:
                    if cell == 0:
                        f.write(" ")
                    elif cell == 1:
                        f.write("o")
                    elif cell == -1:
                        f.write("x")
                    f.write(str(cell))
                f.write("|\n")
            if self.player == 1:
                f.write("player: o")
            else:
                f.write("player: x")
    # 置く場所が盤面上にあるか判定する
    def outofBoard(self, x, y):
        return x < 0 or x >= 8 or y < 0 or y >= 8
    # (x, y)に石を置くとき、(dx, dy)方向にひっくり返す石があるか判定する
    def canPutDir(self, x, y, dx, dy):
        i = 1
        if self.outofBoard(x + dx, y + dy):
            return False
        else:
            while self.board[y + dy * i][x + dx * i] == -(self.player):    # 相手の石が続く限り
                i += 1
                if self.outofBoard(x + dx * i, y + dy * i):                 # 盤面外に出たら
                    return False
                elif self.board[y + dy * i][x + dx * i] == 0:               # 石がない場所があれば
                    return False
                elif self.board[y + dy * i][x + dx * i] == self.player:     # 自分の石で挟めるなら
                    return True
    # (x, y)に石を置けるか判定する
    def canPut(self, x, y):
        if not self.outofBoard(x,y) and self.board[y][x] != 0:                                           # すでに石が置かれている
            return False
        # 周囲8方向を調べる; 1方向でもひっくり返せるなら置ける
        for dx in range(-1, 2):
            for dy in range(-1, 2):
                if dx == 0 and dy == 0: 
                    continue
                if self.canPutDir(x, y, dx, dy):                             # (dx, dy)方向にひっくり返せる石がある
                    return True
        return False
    # (x, y)に石を置いたときの盤面クラスを返す
    def put(self, x, y):
        newBoard = Board()
        newBoard.board = [row[:] for row in self.board] # 盤面をコピー
        newBoard.player = -self.player   # プレイヤーを交代
        newBoard.board[y][x] = self.player  # 石を置く
        # ひっくり返せる石を全てひっくり返す
        for dx in range(-1, 2):
            for dy in range(-1, 2):
                if dx == 0 and dy == 0:
                    continue
                if self.canPutDir(x, y, dx, dy):
                    i = 1
                    while newBoard.board[y + dy * i][x + dx * i] == -self.player:
                        newBoard.board[y + dy * i][x + dx * i] = self.player    # 石をひっくり返す
                        i += 1
        return newBoard
    # 石を置ける場所があるか判定する
    def canPlay(self):
        for y in range(8):
            for x in range(8):
                if self.canPut(x, y):
                    return True
        return False
    # 石を置ける場所と置いた場合の盤面クラスを辞書型で返す
    def choices(self):          # 動的計画法において、for choice in board.choices(): で呼び出す
        choices = {}
        for y in range(8):
            for x in range(8):
                if self.canPut(x, y):
                    choices[(x, y)] = self.put(x, y)
        return choices
    # 石を置ける場所がなく場合にプレイヤーを交代するか判定し、交代する (交代する場合は新たな盤面を返す)
    def passPlayer(self):   # Board.passPlayer()で呼び出す
        if not self.canPlay():
            # 新たな盤面を作成し、プレイヤーを交代する
            newBoard = Board()
            newBoard.board = [row[:] for row in self.board] # 盤面をコピー
            newBoard.player = -self.player
            return newBoard
    # 勝負がついたか判定する
    def isEnd(self):
        if not self.canPlay() and not self.passPlayer().canPlay():
            print("Game Over")
            self.winner()
            return True
        else:
            return False
    # 石の数を数え、勝敗を判定する
    def count(self):
        black = 0
        white = 0
        for row in self.board:
            for cell in row:
                if cell == 1:
                    white += 1
                elif cell == -1:
                    black += 1
        return black, white
    def winner(self):
        if self.isEnd() == False:
            return
        black = self.board.count(-1)
        white = self.board.count(1)
        if black > white:
            print("Black(×) wins!")
            return 1
        elif black < white:
            print("White(◯) wins!")
            return -1
        else:
            print("Draw")
            return 0

### **実際にプレイしてみる**

In [5]:
from IPython.display import clear_output
import time

# ゲームをプレイする
board = Board()
while not board.isEnd():
    time.sleep(1.0)                                     # 0.5秒待つ
    clear_output()                                      # 画面をクリア
    board.show()                                        # 盤面を表示
    if board.canPlay():
        print("Put coordinate x, y: ", end="")
        x, y = map(int, input().split())                # 入力を受け取る
        print("(", x, ", ", y,")")                      # 入力を表示
        if board.canPut(x, y):
            board = board.put(x, y)
        else:
            print("Can't put")
    else:
        board = board.passPlayer()                      # 石を置けない場合はプレイヤーを交代

|        |
|        |
|        |
|   ox   |
|   xo   |
|        |
|        |
|        |
player: x
Put coordinate x, y: 

ValueError: not enough values to unpack (expected 2, got 0)

## **探索アルゴリズム**

**Monte-Carlo-Tree-Search(MCTS)**による学習アルゴリズムを実装する。

### **MCTSのアルゴリズム**

参考文献：[Monte Carlo Tree Search - Wikipedia](https://en.wikipedia.org/wiki/Monte_Carlo_tree_search)

モンテカルロ木探索は4つのステップからなる。

Nodeは、ゲームの状態を表し、プレイアウトの回数と勝利の回数を保持している。

1. 選択：Root Node から始めて、Leaf Node にたどり着くまで、子ノードを選択し続ける。Root Node が現在のゲームの状態で、Leaf Node はシミュレーションが行われていないノード。より有望な方向に木が展開していくように、子ノードの選択を片寄らせる方法を採用するため、以下の Alpha Zero 指標を用いる。
2. 展開：Leaf Node が勝負を決するノードでない限り、Leaf Node から有効手の子ノードの中から Child Node を1つ選ぶ。
3. シミュレーション：Child Node からプレイアウトを行う。Alpha Zero 指標に基づいてプレイアウトを実行する。
4. バックプロパゲーション：Child Node から Root Node へのパスに沿って、プレイアウトの結果を伝搬する。

### **Alpha Zero 指標**

Alpha Zero 指標は、探索と活用のバランスを取る指標である。

$ Q(s, a) + U(s, a) $

が最大となる行動を選択する。ここで、$Q(s, a)$ は状態 $s$ で行動 $a$ を取った時の行動価値関数 $(=V(s'))$、$U(s, a)$ は探索項である。

$ U(s, a) = c(s) P(s, a) \frac{\sqrt{\sum_{b} N(s, b)}}{1 + N(s, a)} $

ここで、$c(s)$ は下記の式で定まる数で、$P(s, a)$ は方策ネットワークによる行動 $a$ の選択確率、$N(s, a)$ は状態 $s$ で行動 $a$ を選択した回数、$\sum_{b} N(s, b)$ は状態 $s$ に到達した回数である。

$ c(s) = c_{init} + \log{\frac{1 + N(s) + c_{base}}{c_{base}}} $

ここで、$c_{base}$ はハイパーパラメータであり、$N(s)$ は状態 $s$ に到達した回数である。

### **Q関数の更新**

チェスは、遷移確率が one-hot つまり $P(s'|s,a)=1$ であるため、状態価値関数 $V(s)$ と行動価値関数 $Q(s, a)$ は同じものとして扱うことができる。したがって、
盤面の状態を64次元のベクトル(とる値は ${-1,0,1}$ )で表現し、それに対して有利不利を $[-1,1]$ の範囲で出力する状態行動価値関数 $Q(s,a)=V(s')$ を学習する。

In [2]:
# Monte-Carlo Tree のノードを表すクラス
class MCTSNode:
    def __init__(self, board):
        self.board = board          # Boardクラス
        self.parent = None          # 親ノード
        self.children = []          # 子ノード
        self.visits = 0             # ノードを訪れた回数
        self.score = 0              # ノードの評価値
    # ノードの展開(子ノードを追加)
    def expand(self):
        if self.board.isEnd():          # ゲーム終了時は展開しない
            return
        elif len(self.children) > 0:    # 既に展開済みの場合は展開しない
            return
        else:
            choices = self.board.choices()              # 石を置ける場所と置いた場合の盤面クラスを取得(Boardクラスのchoicesメソッド)
            for choice, board in choices.items():       # 石を置ける場所全てについて、子ノードに追加
                node = MCTSNode(board)
                node.parent = self
                self.children.append(node)
    # 子ノードの選択確率を計算し、石を置いた場所にその確率を当てはめた二次元配列を返す
    def probabilities(self):
        probs = [[0 for i in range(8)] for j in range(8)]
        for child in self.children:
            x, y = child.board.choices()    # 石を置いた場所を取得
            probs[y][x] = child.visits/self.visits    # 確率を計算
        # 確率の合計がほぼ1であることを確認し、あまりにも差がある場合には、エラーを出力
        if abs(sum([sum(prob) for prob in probs]) - 1) > 0.01:
            print("Error: sum(prob) != 1")
        return probs

In [3]:
import numpy as np
# mctsNodeで、choiceを選択することの評価値(行動価値関数と探索項の和)を返す
def alpha_zero_score(mctsNode, key, mcTree, Policy):
    # ハイパーパラメータ
    c_base = 1000
    c_init = 1.25
    # 行動価値関数
    value = 0                           # 0を行動価値関数Q(mctsNode.board, choice)に置き換える
    # ノードの探索項を計算
    N = mctsNode.visits                                 # mctsNodeを訪れた回数
    childNode = mcTree.move_cop(key)                    # keyの手を打ったときのノードを取得
    Nc = childNode.visits                               # mctsNodeからchoiceを選択した回数
    C = c_init + np.log((N + c_base + 1) / c_base)      # ノードの訪問回数で決まる係数
    exploration = C * np.sqrt(N) / (1 + Nc) * Policy(mctsNode.board, key)          # 探索項
    # ノードの評価値と探索項の和を返す
    return value + exploration

In [6]:
import math

# Monte-Carlo Tree のクラス
class MCTree:
    def __init__(self, board):
        self.root = MCTSNode(board)                 # ルートノードを作成
        self.current = self.root                    # 現在のノード
    # ノードの選択
    def select(self, Policy):
        node = self.root
        choices = self.current.board.choices()              # 石を置ける場所と置いた場合の盤面を取得
        # alpha_zero_score()を用いて、評価値が最大の子ノードを選択
        max_score = -math.inf
        for choice, board in choices.items():
            for child in node.children:
                if child.board == board:
                    score = alpha_zero_score(node, choice, self, Policy)
                    if score > max_score:
                        max_score = score
                        node = child
        return node
    # ノードの移動(クラスをコピーせず、現在のノードを変更する)
    def move_cur(self, key):                        # key: 石を置く場所を指定
        # 現在のノードに対して、keyに石を置いた場合の盤面を辞書のキーから検索
        choices = self.current.board.choices()              # 石を置ける場所と置いた場合の盤面を取得
        for choice, board in choices.items():
            if choice == key:                               # keyに石を置いた場合の盤面クラスを子ノードから見つけたら
                for child in self.current.children:
                    if child.board == board:                # 盤面クラスの一致を確認
                        self.current = child
                        self.path.append(child)
                        return
    # ノードの移動(クラスをコピーして、現在のノードを変更する)
    def move_cop(self, key):                       # key: 石を置く場所を指定
        # 現在のノードに対して、keyに石を置いた場合の盤面を辞書のキーから検索
        choices = self.current.board.choices()              # 石を置ける場所と置いた場合の盤面を取得
        for choice, board in choices.items():
            if choice == key:                               # keyに石を置いた場合の盤面クラスを子ノードから見つけたら
                for child in self.current.children:
                    if child.board == board:                # 盤面クラスの一致を確認
                        self.current = child
                        self.path.append(child)
                        return child

## **自己対戦クラスの実装**

プレイヤークラスのメソッドは以下の通りです。

#### **MC Tree の初期化**

#### **経験の獲得**

今回は、行動毎に報酬を得るわけではなく、１ゲームをプレーした後に報酬を得ることに注意が必要である。さらには、バッチサイズ分ゲームをプレーして、それに対する

#### **MC Tree の更新**

実際のプレイヤーの行動とそれに対する報酬(勝利:1, 敗北:-1, 引き分け:0)を受け取り、MC Tree に追加します。

#### **状態行動価値関数の初期化**

状態行動価値関数 $Q_{\theta}(s_t, a_t)$ について、初期化する。

#### **方策の更新**

基本的には、マルチステップ方策評価・マルチステップ方策更新を行う。その際には、Greedy Policy とするか、$\epsilon$-Greedy Policy 、あるいは Softmax Policy とするかを指定する。

#### **行動の選択**
自分のターンが来たら、方策に従って行動を選択する。決定論的な方策を取るか、確率的な方策を取るかは、方策の種類次第になる。

In [8]:
class Player:
    def __init__(self, Policy):
        self.Policy = Policy            # 方策NN            
        self.path = [[]]                # プレイしたノード(MCTSNodeクラス)の履歴
        self.nodes = []                 # ミニバッチ全体で一度でもプレイしたノード(MCTSNodeクラス)のリスト
        self.cursor = 0                 # ミニバッチにおける現在のインデックス
        self.reward = [0,0]             # [黒の報酬、白の報酬](勝利: 1, 引き分け: 0.5, 敗北: 0)
    # 1手プレイする
    def oneplay(self, mctree, filename):
        # モンテカルロ木探索
        if len(mctree.current.children)==0:             # 子ノードがない場合は展開
            mctree.current.expand()
        mctree.current = mctree.select(self.Policy)     # ノードの選択と現ノードの変更
        self.path[self.cursor].append(mctree.current)                # 選択したノードを記録
        # self.nodesに進んだ先のノードがなければ追加
        if mctree.current not in self.nodes:
            self.nodes.append(mctree.current)
        # 盤面をファイルに書き込む
        mctree.current.board.fshow(filename)
    # 1ゲーム自己対戦する
    def play1game(self, mctree):
        board = Board()
        while not board.isEnd():
            self.oneplay(mctree)
        if board.winner() == 1:
            self.reward[0] += 1
        elif board.winner() == -1:
            self.reward[1] += 1
        else:
            self.reward[0] += 0.5
            self.reward[1] += 0.5
    # プレイしたノードの訪問回数と評価値を更新する
    def backup(self):
        for node in self.path[self.cursor]:                  # プレイしたノード全てについて
            node.visits += 1
            if node.board.player == 1:          # そのノードでプレイしたのが黒の場合
                node.score += self.reward[0]
            else:                               # そのノードでプレイしたのが白の場合
                node.score += self.reward[1]
    # モンテカルロ・シミュレーションのサンプルサイズ分、1ゲーム自己対戦・backupする
    def play1batch(self, mctree, sample_size):
        for i in range(sample_size):
            self.play1game(mctree)
            self.backup()
            if i != sample_size - 1:
                self.cursor += 1
                self.path.append([])
    # self.nodesを、visit回数の降順にソートする
    def sort_nodes(self):
        self.nodes.sort(key=lambda x: x.visits, reverse=True)

## **方策ネットワーククラスの実装**

In [16]:
# pytorchのライブラリをインポート
import torch.nn as nn
import torchvision
import torch.optim as optim

trans = torchvision.transforms.ToTensor()
criterion = nn.CrossEntropyLoss()

class p_net(nn.Module):
    def __init__(self):
        super().__init__()  # 親クラスのnn.Moduleを呼び出し
        self.features = nn.Sequential(
            nn.Conv2d(1, 4, kernel_size=5, padding=2, padding_mode='replicate'),    # out_channelsは欲しい特徴マップの数
            nn.ReLU(),
            nn.Conv2d(4, 16, kernel_size=5, padding=2, padding_mode='replicate'),
            nn.ReLU(),
            nn.Conv2d(16, 16, kernel_size=3, padding=1, padding_mode='replicate'),
            nn.ReLU(),
            nn.Conv2d(16, 4, kernel_size=3, padding=1, padding_mode='replicate'),
            nn.ReLU(),
            nn.Conv2d(4, 1, kernel_size=1),
            nn.Softmax(dim=1)               # 確率分布に変換
        )
        self.classifier = nn.Sequential(
            nn.Linear(64, 64),              # 全結合層
            nn.ReLU(),
            nn.Linear(64, 64),
        )
    # データの準備
    def prepare(self, nodes):
        # ミニバッチのデータを作成
        x = []  # 盤面の状態(入力)
        y = []  # 方策(出力)
        for node in nodes:
            x_item = node.board.getBoard()
            # 盤面の状態をndarray型に変換し、全ての要素を+1して、float型に変換
            x_item = np.array(x_item)           # ndarray型に変換
            if node.board.player == -1:
                x_item = x_item * -1
            x_item = x_item + 1
            x_item.tolist()                     # リストに変換
            x.append(x_item)
            # 教師信号は、mcTreeにおいて、選択されたノードに対する、子ノードの選択確率(ここでは、石を置いた場所に確率を割り当てる)
            y.append(node.probabilities())
        x = np.array(x)
        # 方策をndarray型に変換
        y = np.array(y)
        # ミニバッチのデータをpytorchのテンソルに変換
        input = trans(x)
        target = trans(y)
        # モデル、オプティマイザ、エポック数、バッチサイズを設定
        optimizer = optim.Adam(self.parameters(), lr = 0.001, weight_decay = 5e-4)
        num_epochs = 10
        return input, target, optimizer, num_epochs
    # 盤面の状態を入力として、方策を返す
    def forward(self, input):
        hidden = self.features(input)
        hidden = hidden.view(hidden.size(0),-1)  # x.size(0)は例えばnum_batches
        output = self.classifier(hidden)
        return output
    # 実際の石の置き場所と方策との交差エントロピー誤差を損失関数とする
    def backprop(self, output, optimizer, criterion, target):
        loss = criterion(output,target)
        loss.backward()
        optimizer.step()
        # return loss(output, target)
    

### **学習の一例**

In [None]:
model = p_net()             # 方策NNを作成
batch_size = 64             # バッチサイズ
sample_size = 2000          # サンプルサイズ(モンテカルロ・シミュレーションで確率分布を求めるためのサンプル数)  *スケジューリングで変更したい
board = Board()             # 盤面を初期化
mctree = MCTree(board)      # モンテカルロ探索木を初期化
# 自己対戦プレイヤーを作成
player = Player(model)      
player.play1batch(mctree, sample_size)  # サンプルサイズ分、自己対戦を行い、学習する
player.sort_nodes()         # プレイしたノードを訪問回数の降順にソート
# player.nodesをミニバッチに分割
cursor = len(player.nodes)
node_groups = []
while cursor > batch_size:
    cursor -= batch_size
    nodes = player.nodes[cursor:cursor+batch_size]
    node_groups.append(nodes)
for nodes in node_groups:
    input, target, optimizer, num_epochs = model.prepare(nodes)
    output = model.forward(input)
    model.backprop(output, optimizer, criterion, target)
    

# input, target をミニバッチに分割

## **価値ネットワーククラスの実装**