# Dueling Q-Network
# DDQNのニューラルネットワーク部分の改良のみで実現できる

# ライブラリ

In [None]:
import random
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym
from collections import namedtuple
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

# 動画に保存する関数

In [None]:
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def save_as_gif(frames):
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)

    anim.save('DDQN-CartPole.mp4')
    display(display_animation(anim, default_mode='loop'))

In [None]:
Transition = namedtuple('Transition', 'state action next_state reward')

# 学習に使う変数を整理
ENV = 'CartPole-v0'
# 報酬割引率
GAMMA = 0.99
# 1試行（1エピソード）の最大ステップ数
MAX_STEP = 200
# 最大試行回数（エピソード数）
NUM_EPISODES = 1000
# バッチサイズ
BATCH_SIZE = 32
# キャパ
CAPACITY = 10000

In [None]:
# ミニバッチ学習のための経験データを保存するクラス
class ReplayMemory:

    def __init__(self, CAPACITY):
        # メモリ容量
        self.capacity = CAPACITY
        # 経験を保存する
        self.memory = []
        # 保存場所を示す変数
        self.index = 0

    # 経験をメモリに保存する
    def push(self, state, action, next_state, reward):

        # メモリの長さが指定したキャパシティ以下の場合にNoneを追加しておく
        if len(self.memory) < self.capacity:
            self.memory.append(None)

        # 経験をメモリに保存
        self.memory[self.index] = Transition(state, action, next_state, reward)

        # インデックスを1ずらす
        self.index = (self.index + 1) % self.capacity

    # 指定したバッチサイズ分，ランダムに経験を取り出す
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    # memoryの長さを返す
    def __len__(self):
        return len(self.memory)

In [None]:
# ニューラルネットをクラス化する
# Dueling Q-Network用に改良
class NN(nn.Module):

    def __init__(self, n_in, n_mid, n_out):
        super(NN, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)

        # もともとQ関数は状態sで決まる部分と行動aで決まる部分に分離できる
        # そこで Dueling Network．DDQNに比べて少ない試行回数で学習できる

        # Dueling Networkでは出力を2種類用意する
        # 行動aで決まるアドバンテージ関数
        self.fc3_adv = nn.Linear(n_mid, n_out)
        # 状態sで決まる関数
        self.fc3_v = nn.Linear(n_mid, 1)


    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))

        # ReLU関数を通さない
        adv = self.fc3_adv(h2)

        # ReLU関数を通さない
        # advとvalを足し合わせて出力としたいので
        # valのサイズを[minibatch * 1]から[minibatch * 2]に変更
        val = self.fc3_v(h2).expand(-1, adv.size(1))

        # val+advからadvの平均値を引く（列方向に平均を出す）
        out = val + adv - adv.mean(1, keepdim=True).expand(-1, adv.size(1))

        return out


In [None]:
# エージェントが行う行動を与えられた状態によって判断する部分（深層強化学習（DQN）を行う部分）
class Brain:

    # num_states：Gym環境の状態数
    # num_actions：Gym環境のアクション数
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions

        # 経験を保存しておくための変数
        self.memory = ReplayMemory(CAPACITY)

        # ニューラルネットワーク
        n_in, n_mid, n_out = num_states, 32, num_actions

        # DDQNはその名のとおり2つのニューラルネットワークを利用する
        self.main_q_network = NN(n_in, n_mid, n_out)
        self.target_q_network = NN(n_in, n_mid, n_out)


        # 以下はクラスにしたので削除
        # self.model = nn.Sequential()
        # self.model.add_module('fc1', nn.Linear(num_states, 32))
        # self.model.add_module('relu1', nn.ReLU())
        # self.model.add_module('fc2', nn.Linear(32, 32))
        # self.model.add_module('relu2', nn.ReLU())
        # self.model.add_module('fc3', nn.Linear(32, num_actions))
        # print(self.model)

        # ネットワークの形を出力
        print(self.main_q_network)

        # 最適化手法
        # self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
        self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)

    # 結合パラメータを学習する部分
    # この関数をいくつかの関数に分割します
    def replay(self):

        # 最初にメモリサイズを確認する
        # 指定したバッチサイズより小さい場合は何もしない
        if len(self.memory) < BATCH_SIZE:
            return

        # ミニバッチ用データを作成（関数化）
        self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_mini_batch()

        # 教師信号となるQ(s_t, a_t)を求める（関数化）
        self.expected_state_action_values = self.get_expected_state_action_values()

        # 結合パラメータを更新する（関数化）
        self.update_main_q_network()


    # DDQN用に一部変更
    # 現在の状態に応じて行動を決定する
    def decide_action(self, state, episode):
        # ε-greedy法で徐々に最適行動を採用するようにする
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            # 推論モードに
            self.main_q_network.eval()
            # ネットワークの出力の最大値のindexを取得
            # view関数で行列サイズを（1 * 1）に調整
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)

        else:
            # 右，左ランダムに行動する
            # actionは[torch.LongTensor of size 1 * 1]
            action = torch.LongTensor([[random.randrange(self.num_actions)]])

        return action

    # ミニバッチ用データ作成関数
    def make_mini_batch(self):
        # ミニバッチ用のデータを取得（ランダム）
        transitions = self.memory.sample(BATCH_SIZE)

        # transitions は (state, action, next_state, reward) * BATCH_SIZE
        # (state * BATCH_SIZE, action * BATCH_SIZE, next_state * BATCH_SIZE, reward * BATCH_SIZE)
        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        # この辺まで

        return batch, state_batch, action_batch, reward_batch, non_final_next_states


    # 教師信号となるQ(s_t, a_t)を求める関数
    def get_expected_state_action_values(self):

        # （手順1）ニューラルネットワークを推論モードに
        self.main_q_network.eval()
        self.target_q_network.eval()

        # （手順2）ネットワークが出力したQ（s_t, a_t）を求める
        # self.model(state_batch)は，2つのQ値を出力する
        # [torch.FloatTensor of size BATCH_SIZE * 2]になってるので
        # 実行したアクション（a_t）に対応するQ値をaction_batchで行った行動a_tのindexを使って取得する
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)


        # （手順3）max{Q(s_t+1, a)}を求める
        # CartPole がdoneになっていない，かつ，next_stateがあるかをチェックするためのインデックスマスクを作成

        # 先週の授業ではByteTensorを使ったのですが，非推奨になっていたのでBoolTensorに変更します
        # non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))
        non_final_mask = torch.BoolTensor(tuple(map(lambda s: s is not None, self.batch.next_state)))

        # いったんすべてを0にしておく
        next_state_values = torch.zeros(BATCH_SIZE)
        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

        # 次の状態での最大Q値の行動a_mをmain_q_networkから求める
        a_m[non_final_mask] = self.main_q_network(self.non_final_next_states).detach().max(1)[1]

        # 次の状態があるものだけにフィルターしてサイズを揃える
        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

        # 次の状態があるindexの行動a_mのQ値をtarget_q_networkから求める
        # squeeze()で size[minibatch * 1] を [minibatch]にする
        next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

        # Q学習の行動価値関数更新式から教師信号Q(S_t, a_t)を求める
        expected_state_action_values = self.reward_batch + GAMMA * next_state_values

        return expected_state_action_values


    # mainネットワークの結合パラメータを更新
    def update_main_q_network(self):
        # モデルを訓練モードに切り替え
        self.main_q_network.train()

        # 二乗誤差の代わりにHuber関数を使う
        loss = F.smooth_l1_loss(self.state_action_values, self.expected_state_action_values.unsqueeze(1))

        # 勾配をリセット
        self.optimizer.zero_grad()
        # 誤差逆伝搬
        loss.backward()
        # ニューラルネットワークの重み更新
        self.optimizer.step()


    # targetネットワークを更新
    def update_target_q_network(self):
        # mainと同じにする
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())

In [None]:
# エージェントクラス
class Agent:
    def __init__(self, num_states, num_actions):
        # Brainクラスをインスタンス化
        self.brain = Brain(num_states, num_actions)

    # Q関数の更新
    def update_q_function(self):
        self.brain.replay()

    # targetネットワークを更新
    def update_target_q_function(self):
        self.brain.update_target_q_network()

    # アクションを決定する
    def get_action(self, state, episode):
        action = self.brain.decide_action(state, episode)
        return action

    # 状態を保存
    def memorize(self, state, action, next_state, reward):
        self.brain.memory.push(state, action, next_state, reward)

In [None]:
# CartPoleを実行する環境クラス
class Environment:

    def __init__(self):
        self.env = gym.make(ENV)
        num_states = self.env.observation_space.shape[0]
        num_actions = self.env.action_space.n
        self.agent = Agent(num_states, num_actions)

    def run(self):

        episode_10_list = np.zeros(10) # 10試行分の立ち続けた平均ステップ数の出力に使う
        complete_episodes = 0  # 195step以上連続で立ち続けた試行数
        is_episode_final = False  # 最終試行フラグ
        frames = []  # 動画用に画像を格納する変数

        # 全エピソードループ
        for episode in range(NUM_EPISODES):
            # エピソード毎に環境を初期化
            observation = self.env.reset()

            state = observation

            # numpyからpytorchのテンソルに変換
            state = torch.from_numpy(state).type(torch.FloatTensor)

            # size を 1*4 に変換
            state = torch.unsqueeze(state, 0)

            for step in range(MAX_STEP):

                # 最終試行はframesに画像を追加しておく
                if is_episode_final:
                    frames.append(self.env.render(mode='rgb_array'))

                # 最初の行動を決める
                action = self.agent.get_action(state, episode)

                # 最初の行動から次の状態を求める
                observation_next, _, done, _ = self.env.step(action.item())

                # 報酬を与える
                if done:
                    # 次の状態はないのでNoneを代入
                    state_next = None

                    # 直前10エピソードで立てた平均ステップ数を格納
                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))

                    if step < 195:
                        # こけたら報酬-1
                        reward = torch.FloatTensor([-1.0])
                        # 連続成功回数を0にリセット
                        complete_episodes = 0
                    else:
                        # 立ったまま終了した場合は報酬1
                        reward = torch.FloatTensor([1.0])
                        # 連続成功回数をインクリメント
                        complete_episodes += 1
                else:
                    # 途中の報酬は0
                    reward = torch.FloatTensor([0.0])
                    state_next = observation_next
                    state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
                    state_next = torch.unsqueeze(state_next, 0)

                # メモリに経験を追加
                self.agent.memorize(state, action, state_next, reward)

                # Q関数をニューラルネットで更新
                self.agent.update_q_function()

                # 状態を次の状態に更新
                state = state_next

                # エピソード終了時
                if done:
                    print('{0}エピソード: {1}ステップで終了'.format(episode, step + 1))

                    # DDQNの処理を追加
                    # 2エピソード毎にtargetネットワークを更新する
                    if episode % 2 == 0:
                        self.agent.update_target_q_function()

                    break

            # 最終エピソードの場合は動画を保存
            if is_episode_final:
                save_as_gif(frames)
                break

            # 10回連続で成功したら、次のエピソードで終わりにする
            if complete_episodes >= 10:
                print('10回連続成功')
                is_episode_final = True


In [None]:
cartpole_env = Environment()
cartpole_env.run()