## [『つくりながら学ぶ！深層強化学習-PyTorchによる実践プログラミング-』](https://www.amazon.co.jp/つくりながら学ぶ！深層強化学習-PyTorchによる実践プログラミング-株式会社電通国際情報サービス-小川雄太郎-ebook/dp/B07DZVRXFK)

上の本のサンプルプログラムを実装することで、以下の様々な強化学習の手法を学ぶ。

| 手法                         | 特徴                                                                                          |
| :--------------------------: | :-------------------------------------------------------------------------------------------: |
| Deep Q Network               | Neural Network を用いて Q-table の値を学習する方法の基本                                      |
| Dual Deep Q Network          | 行動価値関数と行動評価関数を分けることで、学習時の不安定さを取り除く                          |
| Dueling Network              | 行動価値関数Qを、状態に依存する部分と行動に依存する部分に分ける。                             |
| Advantage Actor-Critic (A2C) | 2ステップ以上先まで動かして学習させ（不安定さもある）、さらに行動反復と方策反復を取り入れる。 |

### 参考： [書籍「つくりながら学ぶ！深層強化学習」のサポートリポジトリです](https://github.com/YutaroOgawa/Deep-Reinforcement-Learning-Book)

### Deep Q Network

In [53]:
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym
from collections import namedtuple

#=== パラメータの設定 ===
ENV = 'CartPole-v0'  # 使用する課題名
GAMMA = 0.99  # 時間割引率
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODES = 500  # 最大試行回数
Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

In [54]:
# main クラス
cartpole_env = Environment(ENV)
cartpole_env.run()

0 Episode: Finished after 19 steps：10試行の平均step数 = 1.9
1 Episode: Finished after 11 steps：10試行の平均step数 = 3.0
2 Episode: Finished after 15 steps：10試行の平均step数 = 4.5
3 Episode: Finished after 10 steps：10試行の平均step数 = 5.5
4 Episode: Finished after 10 steps：10試行の平均step数 = 6.5
5 Episode: Finished after 9 steps：10試行の平均step数 = 7.4
6 Episode: Finished after 10 steps：10試行の平均step数 = 8.4
7 Episode: Finished after 10 steps：10試行の平均step数 = 9.4
8 Episode: Finished after 10 steps：10試行の平均step数 = 10.4
9 Episode: Finished after 11 steps：10試行の平均step数 = 11.5
10 Episode: Finished after 10 steps：10試行の平均step数 = 10.6
11 Episode: Finished after 9 steps：10試行の平均step数 = 10.4
12 Episode: Finished after 9 steps：10試行の平均step数 = 9.8
13 Episode: Finished after 10 steps：10試行の平均step数 = 9.8
14 Episode: Finished after 11 steps：10試行の平均step数 = 9.9
15 Episode: Finished after 11 steps：10試行の平均step数 = 10.1
16 Episode: Finished after 12 steps：10試行の平均step数 = 10.3
17 Episode: Finished after 19 steps：10試行の平均step数 = 11.2
18 Episode: Fini

145 Episode: Finished after 176 steps：10試行の平均step数 = 182.8
146 Episode: Finished after 200 steps：10試行の平均step数 = 183.3
147 Episode: Finished after 200 steps：10試行の平均step数 = 190.2
148 Episode: Finished after 183 steps：10試行の平均step数 = 188.5
149 Episode: Finished after 200 steps：10試行の平均step数 = 192.5
150 Episode: Finished after 200 steps：10試行の平均step数 = 192.5
151 Episode: Finished after 200 steps：10試行の平均step数 = 192.5
152 Episode: Finished after 200 steps：10試行の平均step数 = 195.9
153 Episode: Finished after 200 steps：10試行の平均step数 = 195.9
154 Episode: Finished after 200 steps：10試行の平均step数 = 195.9
155 Episode: Finished after 200 steps：10試行の平均step数 = 198.3
156 Episode: Finished after 200 steps：10試行の平均step数 = 198.3
157 Episode: Finished after 163 steps：10試行の平均step数 = 194.6
158 Episode: Finished after 200 steps：10試行の平均step数 = 196.3
159 Episode: Finished after 200 steps：10試行の平均step数 = 196.3
160 Episode: Finished after 200 steps：10試行の平均step数 = 196.3
161 Episode: Finished after 200 steps：10試行の平均step数 = 196

NotImplementedError: abstract

In [51]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
    def __init__(self, num_states, num_actions):
        """
        Neural Network の構築
        @param num_state ：状態変数の種類(x,v,θ,ω)
        @param num_action：行動の種類(right, left)
        """
        self.num_actions = num_actions # 行動の種類
        self.memory = ReplayMemory(CAPACITY) # 経験を記憶するメモリオブジェクト

        #=== Neural Network を構築 ===
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32, 32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))
        #=== 最適化手法の設定 ===
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)

    def replay(self):
        """
        Experience Replayでネットワークの結合パラメータを学習する。
        これによって時系列の相関関係を取り除くとともに、
        Neural Network においてバッチ学習が可能になる。
        """
        # メモリサイズがミニバッチより小さい間は何もしない
        if len(self.memory) < BATCH_SIZE:
            return

        # ミニバッチの作成
        transitions = self.memory.sample(BATCH_SIZE)

        #=== 各変数をミニバッチに対応する形に変形 ===
        # transitionsには、1stepごとの (state, action, state_next, reward) が、BATCH_SIZE 分格納されている。
        # つまり、(state, action, state_next, reward) × BATCH_SIZE
        # これをミニバッチの形に変形したい。つまり、
        # (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする。
        batch = Transition(*zip(*transitions))
        # ============================================
        #   【少し難しいので、 "zip", "*" の使い方サンプル 】
        #   lst = [
        #       [ 1, 2, 3],
        #       [ 4, 5, 6],
        #   ]
        #   for cols in zip(*lst):  for row in lst:
        #       print(cols)             print(row)
        #   >>>(1, 4)               >>>[1, 2, 3]
        #      (2, 5)                  [4, 5, 6]
        #      (3, 6)
        # =============================================

        # 型の変形 (Variable) にし、cat (=cncatenates) で shape も修正。
        state_batch  = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None])

        """ ネットワークを推論モードに切り替える """
        # 教師信号となる Q(s_t, a_t)値 を求める。
        self.model.eval() # ネットワークを推論モードに切り替える。

        # ネットワークが出力した Q(s_t, a_t) を求める。
        # なお、self.model(state_batch) は、右左の両方の Q値 を出力しているが、
        # 実行したアクション a_t に対応する next_state しかわからないので、
        # action_batch から実際に行った行動 a_t が右か左かの index を求め、それに対応する Q値 のみを gather で抜き出す。
        state_action_values = self.model(state_batch).gather(1, action_batch)

        # max{Q(s_t+1, a)}値 を求める。ただし、次の状態があるかには注意する！

        # cartpole が done になっておらず、next_state があるかをチェックするインデックスマスクを作成
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
                                                    batch.next_state)))
        # まずは全部 0 にしておく
        next_state_values = torch.zeros(BATCH_SIZE)

        # 次の状態がある index の最大 Q値:max{Q(s_t+1, a)} を求める。
        # 既存モデルで出力を求め、max(1) で列方向の最大値を求める。
        # これは、[値,index] という形をしているので、index=0 にアクセスし、Q値 を取り出す。(detach)
        next_state_values[non_final_mask] = self.model(
            non_final_next_states).max(1)[0].detach()

        # Q(s_t, a_t) 値を、Q学習の式から求める。これが、正解データとなる。
        expected_state_action_values = reward_batch + GAMMA * next_state_values

        """ ネットワークを訓練モードに切り替える """
        self.model.train()

        # 損失関数を計算する(smooth_l1_loss = Huberloss)
        # expected_state_action_valuesは
        # sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ変更。
        loss = F.smooth_l1_loss(state_action_values,
                                expected_state_action_values.unsqueeze(1))

        # 結合パラメータの更新。
        self.optimizer.zero_grad()  # 勾配をリセット
        loss.backward()  # バックプロパゲーションを計算
        self.optimizer.step()  # 結合パラメータを更新

    def decide_action(self, state, episode):
        """
        関数の概要：現在の状態に応じて、行動を決定する。なお、最適解にとどまらずに探索すべきなので、
        　　　　　　ε-greedy法を利用し、徐々に最適行動のみを採用するようにしていく。
        @param  state  ：現在の状況
        @param  episode：試行の回数を記録する。
        @return action ：選んだ行動([torch.LongTensor of size 1x1]の形)
        """
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            """最適化行動"""
            self.model.eval()  # 推論モードに切り替え
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1, 1)
        else:
            """ランダムチョイス"""
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])

        return action

In [None]:
class Environment:

    def __init__(self, ENV):
        self.env = gym.make(ENV)  # 実行する課題を設定
        num_states = self.env.observation_space.shape[0]  # 課題の状態数を取得
        num_actions = self.env.action_space.n             # 課題の行動数を取得
        self.agent = Agent(num_states, num_actions) # 環境内で行動するAgentを生成

    def run(self):
        '''実行'''
        episode_10_list = np.zeros(10)  # 10 試行分の立ち続けた step 数を格納し、平均を出力する(学習具合を測る指標として)
        complete_episodes = 0  # 195 step 以上連続で立ち続けた試行数
        episode_final = False  # 最後の試行フラグ
        frames = [] # 最後の試行を動画にするために画像を格納する変数

        for episode in range(NUM_EPISODES):
            observation = self.env.reset() # 環境の初期化

            state = observation  # 観測をそのまま状態 s として使用
            state = torch.from_numpy(state).type(torch.FloatTensor)  # PyTorchのテンソルに変換
            state = torch.unsqueeze(state, 0)  # size 4 を size 1x4 に変換

            for step in range(MAX_STEPS):
                if episode_final is True:  # 最終試行ではframesに各時刻の画像を追加していく → gif
                    frames.append(self.env.render(mode='rgb_array'))

                action = self.agent.get_action(state, episode) # 行動を求める

                # 行動 a_t の実行により、s_{t+1} と done フラグ (終了したか) を求める。
                observation_next, _, done, _ = self.env.step(action.item())

                # 次の状態から、報酬や episode の終了評価、next_state の設定を行う。
                if done:
                    next_state = None  # 次の状態はないので、Noneを格納
                    # 直近 10 episode の step 数リストに追加
                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))

                    ''' 「ステップ数が 200 経過」 or 「一定角度以上」 → 「done=True」なので、どちらなのかの判断を行う。'''
                    if step < 195:
                        reward = torch.FloatTensor([-1.0])  # 途中でこけてた場合は、罰則として報酬 -1 を与える。
                        complete_episodes = 0  # 連続成功記録をリセット
                    else:
                        reward = torch.FloatTensor([1.0])  # 立ったまま終了時は報酬 1 を与える
                        complete_episodes = complete_episodes + 1  # 連続記録を更新
                else:
                    reward = torch.FloatTensor([0.0])  # 普段は報酬0
                    next_state = observation_next  # 観測をそのまま状態とする
                    next_state = torch.from_numpy(next_state).type(torch.FloatTensor)  # PyTorchのテンソルに変換
                    next_state = torch.unsqueeze(next_state, 0)  # size 4 を size 1x4 に変換

                # メモリに経験を追加
                self.agent.memorize(state, action, next_state, reward)

                # Experience Replayで Q関数 を更新
                self.agent.update_q_function()

                # 観測の更新
                state = next_state

                # 終了時の処理
                if done:
                    print('%d Episode: Finished after %d steps：10試行の平均step数 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    break

            if episode_final is True:
                # 動画を保存と描画
                display_frames_as_gif(frames)
                break

            # 10連続で200step経ち続けたら成功
            if complete_episodes >= 10:
                print('10回連続成功')
                episode_final = True  # 次の試行を描画を行う最終試行とする
