## 6.2 PyTorchでDDQN
オリジナルではクラス化されているが、検証のしやすさや勉強のためにすべて関数のみで書くことにする

In [1]:
# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [2]:
# 動画の描画関数の宣言
# 参考URL http://nbviewer.jupyter.org/github/patrickmineault
# /xcorr-notebooks/blob/master/Render%20OpenAI%20gym%20as%20GIF.ipynb
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display


def display_frames_as_gif(frames):
    """
    Displays a list of frames as a gif, with controls
    """
    plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
               dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                   interval=50)

    anim.save('movie_cartpole_DDQN.mp4')  # 動画のファイル名と保存です
    display(display_animation(anim, default_mode='loop'))


In [3]:
# namedtupleを生成
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))


In [4]:
# 定数の設定
ENV = 'CartPole-v0'  # 使用する課題名
GAMMA = 0.99  # 時間割引率
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODES = 500  # 最大試行回数


In [5]:
env = gym.make(ENV)  # 実行する課題を設定
num_states = env.observation_space.shape[0]  # 課題の状態と行動の数を設定
num_actions = env.action_space.n  # CartPoleの行動（右に左に押す）の2を取得
# 環境内で行動するAgentを生成
# agent = Agent(num_states, num_actions)

  result = entry_point.load(False)


In [6]:
# ディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):

    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_out)

    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output


In [7]:
# ニューラルネットワークを構築
n_in, n_mid, n_out = num_states, 32, num_actions
main_q_network = Net(n_in, n_mid, n_out)  # Netクラスを使用
target_q_network = Net(n_in, n_mid, n_out)  # Netクラスを使用
print(main_q_network)  # ネットワークの形を出力

# 最適化手法の設定
optimizer = optim.Adam(
    main_q_network.parameters(), lr=0.0001)

Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=2, bias=True)
)


In [8]:
def make_minibatch(memory):
    '''2. ミニバッチの作成'''

    # 2.1 メモリからミニバッチ分のデータを取り出す
    transitions = random.sample(memory, BATCH_SIZE)

    # 2.2 各変数をミニバッチに対応する形に変形
    # transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
    # つまり、(state, action, state_next, reward)×BATCH_SIZE
    # これをミニバッチにしたい。つまり
    # (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
    batch = Transition(*zip(*transitions))

    # 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
    # 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
    # それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
    # 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
    # catはConcatenates（結合）のことです。
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                       if s is not None])

    return batch, state_batch, action_batch, reward_batch, non_final_next_states

In [9]:
def get_expected_state_action_values(batch, state_batch, action_batch, reward_batch, non_final_next_states):
    '''3. 教師信号となるQ(s_t, a_t)値を求める'''

    # 3.1 ネットワークを推論モードに切り替える
    main_q_network.eval()
    target_q_network.eval()

    # 3.2 ネットワークが出力したQ(s_t, a_t)を求める
    # model(state_batch)は、右左の両方のQ値を出力しており
    # [torch.FloatTensor of size BATCH_SIZEx2]になっている。
    # ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
    # それに対応するQ値をgatherでひっぱり出す。
    state_action_values = main_q_network(
        state_batch).gather(1, action_batch)

    # 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

    # cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
    non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
                                                batch.next_state)))
    # まずは全部0にしておく
    next_state_values = torch.zeros(BATCH_SIZE)

    a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

    # 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
    # 最後の[1]で行動に対応したindexが返る
    a_m[non_final_mask] = main_q_network(
        non_final_next_states).detach().max(1)[1]

    # 次の状態があるものだけにフィルターし、size 32を32×1へ
    a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

    # 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
    # detach()で取り出す
    # squeeze()でsize[minibatch×1]を[minibatch]に。
    next_state_values[non_final_mask] = target_q_network(
        non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

    # 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
    expected_state_action_values = reward_batch + GAMMA * next_state_values

    return state_action_values, expected_state_action_values

In [10]:
def update_main_q_network(state_action_values, expected_state_action_values):
    '''4. 結合パラメータの更新'''

    # 4.1 ネットワークを訓練モードに切り替える
    main_q_network.train()

    # 4.2 損失関数を計算する（smooth_l1_lossはHuberloss）
    # expected_state_action_valuesは
    # sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
    loss = F.smooth_l1_loss(state_action_values,
                            expected_state_action_values.unsqueeze(1))

    # 4.3 結合パラメータを更新する
    optimizer.zero_grad()  # 勾配をリセット
    loss.backward()  # バックプロパゲーションを計算
    optimizer.step()  # 結合パラメータを更新

In [11]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

def replay(memory):
    '''Experience Replayでネットワークの結合パラメータを学習'''

    # 1. メモリサイズの確認
    if len(memory) < BATCH_SIZE:
        return

    # 2. ミニバッチの作成
    batch, state_batch, action_batch, reward_batch, non_final_next_states = make_minibatch(memory)

    # 3. 教師信号となるQ(s_t, a_t)値を求める
    state_action_values, expected_state_action_values = get_expected_state_action_values(batch, state_batch, action_batch, reward_batch, non_final_next_states)

    # 4. 結合パラメータの更新
    update_main_q_network(state_action_values, expected_state_action_values)

In [12]:
def decide_action(state, episode):
    '''現在の状態に応じて、行動を決定する'''
    # ε-greedy法で徐々に最適行動のみを採用する
    epsilon = 0.5 * (1 / (episode + 1))

    if epsilon <= np.random.uniform(0, 1):
        main_q_network.eval()  # ネットワークを推論モードに切り替える
        with torch.no_grad():
            action = main_q_network(state).max(1)[1].view(1, 1)
        # ネットワークの出力の最大値のindexを取り出します = max(1)[1]
        # .view(1,1)は[torch.LongTensor of size 1]　を size 1x1 に変換します

    else:
        # 0,1の行動をランダムに返す
        action = torch.LongTensor(
            [[random.randrange(num_actions)]])  # 0,1の行動をランダムに返す
        # actionは[torch.LongTensor of size 1x1]の形になります

    return action

In [13]:
def push(memory, index, state, action, state_next, reward):
    '''transition = (state, action, state_next, reward)をメモリに保存する'''

    if len(memory) < CAPACITY:
        memory.append(None)  # メモリが満タンでないときは足す

    # namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
    memory[index] = Transition(state, action, state_next, reward)

    index = (index + 1) % CAPACITY  # 保存するindexを1つずらす
    return memory, index

In [14]:
memory = []
episode = 1
index = 0
'''実行'''
episode_10_list = np.zeros(10)  # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
complete_episodes = 0  # 195step以上連続で立ち続けた試行数
episode_final = False  # 最後の試行フラグ
frames = []  # 最後の試行を動画にするために画像を格納する変数

for episode in range(NUM_EPISODES):  # 試行数分繰り返す
    observation = env.reset()  # 環境の初期化

    state = observation  # 観測をそのまま状態sとして使用
    state = torch.from_numpy(state).type(
        torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
    state = torch.unsqueeze(state, 0)  # size 4をsize 1x4に変換

    for step in range(MAX_STEPS):  # 1エピソードのループ

        # 動画描画をコメントアウトしています
        if episode_final is True:  # 最終試行ではframesに各時刻の画像を追加していく
            frames.append(env.render(mode='rgb_array'))

        action = decide_action(state, episode)  # 行動を求める

        # 行動a_tの実行により、s_{t+1}とdoneフラグを求める
        # actionから.item()を指定して、中身を取り出す
        observation_next, _, done, _ = env.step(
            action.item())  # rewardとinfoは使わないので_にする

        # 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
        if done:  # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
            state_next = None  # 次の状態はないので、Noneを格納

            # 直近10episodeの立てたstep数リストに追加
            episode_10_list = np.hstack(
                (episode_10_list[1:], step + 1))

            if step < 195:
                reward = torch.FloatTensor(
                    [-1.0])  # 途中でこけたら罰則として報酬-1を与える
                complete_episodes = 0  # 連続成功記録をリセット
            else:
                reward = torch.FloatTensor([1.0])  # 立ったまま終了時は報酬1を与える
                complete_episodes = complete_episodes + 1  # 連続記録を更新
        else:
            reward = torch.FloatTensor([0.0])  # 普段は報酬0
            state_next = observation_next  # 観測をそのまま状態とする
            state_next = torch.from_numpy(state_next).type(
                torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
            state_next = torch.unsqueeze(state_next, 0)  # size 4をsize 1x4に変換

        # メモリに経験を追加
        memory, index = push(memory, index, state, action, state_next, reward)

        # Experience ReplayでQ関数を更新する
        replay(memory)

        # 観測の更新
        state = state_next

        # 終了時の処理
        if done:
            print('%d Episode: Finished after %d steps：10試行の平均step数 = %.1lf' % (
                episode, step + 1, episode_10_list.mean()))

            # DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
            if(episode % 2 == 0):
                target_q_network.load_state_dict(main_q_network.state_dict())
            break


    if episode_final is True:
        # 動画描画をコメントアウトしています
        # 動画を保存と描画
        #display_frames_as_gif(frames)
        break

    # 10連続で200step経ち続けたら成功
    if complete_episodes >= 10:
        print('10回連続成功')
        episode_final = True  # 次の試行を描画を行う最終試行とする

0 Episode: Finished after 10 steps：10試行の平均step数 = 1.0
1 Episode: Finished after 10 steps：10試行の平均step数 = 2.0
2 Episode: Finished after 9 steps：10試行の平均step数 = 2.9
3 Episode: Finished after 10 steps：10試行の平均step数 = 3.9
4 Episode: Finished after 9 steps：10試行の平均step数 = 4.8
5 Episode: Finished after 10 steps：10試行の平均step数 = 5.8
6 Episode: Finished after 10 steps：10試行の平均step数 = 6.8
7 Episode: Finished after 8 steps：10試行の平均step数 = 7.6
8 Episode: Finished after 10 steps：10試行の平均step数 = 8.6
9 Episode: Finished after 10 steps：10試行の平均step数 = 9.6
10 Episode: Finished after 10 steps：10試行の平均step数 = 9.6
11 Episode: Finished after 9 steps：10試行の平均step数 = 9.5
12 Episode: Finished after 10 steps：10試行の平均step数 = 9.6
13 Episode: Finished after 10 steps：10試行の平均step数 = 9.6
14 Episode: Finished after 10 steps：10試行の平均step数 = 9.7
15 Episode: Finished after 9 steps：10試行の平均step数 = 9.6
16 Episode: Finished after 9 steps：10試行の平均step数 = 9.5
17 Episode: Finished after 9 steps：10試行の平均step数 = 9.6
18 Episode: Finished after 

In [15]:
env.close()