# DQNで棒倒しを強化学習する

In [1]:
import numpy as np
import matplotlib.pyplot as plt
# %matplotlib inline
import gym
import warnings
warnings.simplefilter('ignore')
from datetime import datetime

In [2]:
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

In [3]:
def display_frames_as_gif(frames):
    plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')
    
    def animate(i):
        patch.set_data(frames[i])
        
    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
    anim.save('movie_cartpole_DQN{}.mp4'.format(datetime.now().strftime('%Y%m%d%H%M%S')))
#     display(display_animation(anim, default_mode='loop'))

In [4]:
from collections import namedtuple

In [5]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [6]:
ENV = 'CartPole-v0' # 課題名
GAMMA = 0.99 # 時間割引率
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 500 # 最大試行回数

In [7]:
class ReplayMemory:
    """
    経験を保存するメモリクラスを定義
    """
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY # メモリ最大の長さ
        self.memory = [] # 経験を保存する
        self.index = 0 # 保存するinndexを示す
        
    def push(self, state, action, state_next, reward):
        """
        transitionをメモリに保存する
        """
        if len(self.memory) < self.capacity:
            # メモリがいっぱいでなければ足す
            self.memory.append(None)

        # 値とフィールド名をペアにして保存する
        self.memory[self.index] = Transition(state, action, state_next, reward)
        # 保存するindexをずらす
        self.index = (self.index + 1) % self.capacity
        
    def sample(self, batch_size):
        """
        バッチサイズ分だけランダムに保存内容を取り出す
        """
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)

In [8]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

In [9]:
BATCH_SIZE = 32
CAPACITY = 10000

In [10]:
class Brain:
    """
    DQNを実行する。
    Q関数をディープラーニングのネットワークをクラスとして定義する
    """
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions # 行動は2つ(右or左)
        self.memory = ReplayMemory(CAPACITY) # 経験を記憶する
        
        # ニューラルネットワークを定義
        # 4→32→32→2
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32, 32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))
        
        print(self.model) # ネットワークの形を出力
        
        # 最適化手法の設定
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
        
    def replay(self):
        """
        Experience Replayでネットワークの結合パラメータを学習
        """
        # メモリサイズの確認
        # メモリサイズがミニバッチより小さければ何もしない
        if len(self.memory) < BATCH_SIZE:
            return
        
        # ミニバッチの作成
        # メモリからミニバッチ分のデータを取得する
        transitions = self.memory.sample(BATCH_SIZE)
        # (state, actionn, state_next, reward) x BATCH_SIZEにする
        batch = Transition(*zip(*transitions))
        # 各変数の要素をミニバッチに対応する形に変形する
        # catは結合
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        
        # 教師信号となるQ(st, at)を求める
        # ネットワークを推論モードに切り替える
        self.model.eval()
        # ネットワークが出力したQ(st, at)を求める
        state_action_values = self.model(state_batch).gather(1, action_batch)
        
        # max{Q(st+1, a)}を求める
        # 終了しておらず、next_stateがあるかをチェックするインデックスマスクを作成
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))
        # 全部0にセットする
        next_state_values = torch.zeros(BATCH_SIZE)
        # 次の状態があるindexの最大Q値を求める
        next_state_values[non_final_mask] = self.model(non_final_next_states).max(1)[0].detach()
        
        # 教師となるQ(st, at)をQ学習の式から求める
        expected_state_action_values = reward_batch + GAMMA * next_state_values
        
        # 結合パラメータの更新
        # ネットワークを訓練モードに切り替える
        self.model.train()
        
        # 損失関数の計算
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
        
        # 結合パラメータを更新
        # 勾配をリセット
        self.optimizer.zero_grad()
        # バックプロパゲーションを計算
        loss.backward()
        # 結合パラメータを更新
        self.optimizer.step()
        
    def decide_action(self, state, episode):
        """
        現在の状態に応じて行動を決定する
        """
        # ε-greedy法で徐々に最適行動のみを採用する
        epsilon = 0.5 * (1 / (episode + 1))
        
        if epsilon <= np.random.uniform(0, 1):
            # ネットワークを推論モードに切り替える
            self.model.eval()
            with torch.no_grad():
                # ネットワークの最大値のindexを取り出す
                action = self.model(state).max(1)[1].view(1, 1)
        else:
            # 0,1の行動をランダムに返す
            action = torch.LongTensor([[random.randrange(self.num_actions)]])
            
        return action

In [11]:
class Agent:
    """
    エージェントクラス
    """
    def __init__(self, num_states, num_actions):
        """
        課題の状態と行動の数を設定する
        """
        # エージェントが行動を決定するための頭脳を生成
        self.brain = Brain(num_states, num_actions)
        
    def update_q_function(self):
        """
        Q関数の更新
        """
        self.brain.replay()
        
    def get_action(self, state, episode):
        """
        行動の決定
        """
        action = self.brain.decide_action(state, episode)
        return action
    
    def memorize(self, state, action, state_next, reward):
        """
        memoryオブジェクトにstatte,action,state_next,rewardを保存する
        """
        self.brain.memory.push(state, action, state_next, reward)

In [12]:
class Environment:
    """
    環境クラス
    """
    def __init__(self):
        # 実行する課題を設定
        self.env = gym.make(ENV)
        # 課題の状態数(4)を取得
        num_states = self.env.observation_space.shape[0]
        # 行動数(2)を取得
        num_actions = self.env.action_space.n
        # 環境内で行動するAgentを生成
        self.agent = Agent(num_states, num_actions)
        
    def run(self):
        """
        実行
        """
        # 10試行分の立ち続けたstep数を格納する
        episode_10_list = np.zeros(10)
        # 195step以上立ち続けた試行数
        complete_episodes = 0
        # 最後の試行フラグ
        episode_final = False
        # 画像を格納する
        frames = []
        
        # 最大試行数分繰り返す
        for episode in range(NUM_EPISODES):
            # 環境の初期化
            observation = self.env.reset()
            
            # 観測を状態sとして使用
            state = observation
            state = torch.from_numpy(state).type(torch.FloatTensor)
            state = torch.unsqueeze(state, 0)
            
            # 1エピソードのループ
            for step in range(MAX_STEPS):
                # 最終試行では画像を保存する
                if episode_final:
                    frames.append(self.env.render(mode='rgb_array'))
                # 行動を求める
                action = self.agent.get_action(state, episode)
                
                # atの実行によりst+1とdoneフラグを求める
                observation_next, _, done, _ = self.env.step(action.item())
                
                # 報酬を与える。episodeの終了評価とstate_nextの設定
                # ステップ数が200を超える、または一定角度以上傾くとdone=Falseとなる
                if done:
                    state_next = None
                    
                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))
                    
                    # 失敗ならば報酬-1を与える
                    if step < 195:
                        reward = torch.FloatTensor([-1.0])
                        complete_episodes = 0
                    else: # 成功ならば報酬1を与える
                        reward = torch.FloatTensor([1.0])
                        complete_episodes = complete_episodes + 1
                else:
                    # 普段は報酬0
                    reward = torch.FloatTensor([0.0])
                    # 観測を状態とする
                    state_next = observation_next
                    state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
                    state_next = torch.unsqueeze(state_next, 0)
                    
                # メモリに経験を追加
                self.agent.memorize(state, action, state_next, reward)
                # Q関数の更新
                self.agent.update_q_function()
                # 観測の更新
                state = state_next

                # 終了時の処理
                if done:
                    print('{} Episode: Finished after {} steps: 10試行の平均step数: {:.1f}'.format(episode, step + 1, episode_10_list.mean()))
                    break
                    
            if episode_final is True:
                display_frames_as_gif(frames)
                break

            if complete_episodes >= 10:
                print('10回連続成功')
                episode_final = True

In [None]:
for i in range(20):
    cartpole_env = Environment()
    cartpole_env.run()

Sequential(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (relu1): ReLU()
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (relu2): ReLU()
  (fc3): Linear(in_features=32, out_features=2, bias=True)
)
0 Episode: Finished after 16 steps: 10試行の平均step数: 1.6
1 Episode: Finished after 9 steps: 10試行の平均step数: 2.5
2 Episode: Finished after 8 steps: 10試行の平均step数: 3.3
3 Episode: Finished after 10 steps: 10試行の平均step数: 4.3
4 Episode: Finished after 10 steps: 10試行の平均step数: 5.3
5 Episode: Finished after 11 steps: 10試行の平均step数: 6.4
6 Episode: Finished after 8 steps: 10試行の平均step数: 7.2
7 Episode: Finished after 10 steps: 10試行の平均step数: 8.2
8 Episode: Finished after 9 steps: 10試行の平均step数: 9.1
9 Episode: Finished after 12 steps: 10試行の平均step数: 10.3
10 Episode: Finished after 10 steps: 10試行の平均step数: 9.7
11 Episode: Finished after 9 steps: 10試行の平均step数: 9.7
12 Episode: Finished after 9 steps: 10試行の平均step数: 9.8
13 Episode: Finished after 11 steps: 10試行の平均step数: 9.9
14 Episode: Finis