<a href="https://colab.research.google.com/github/Mikaner/reinforcement/blob/main/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get -qq -y install libcusparse8.0 libnvrtc8.0 libnvtoolsext1 > /dev/null
!ln -snf /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so.8.0 /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so
!apt -qq install xvfb freeglut3-dev ffmpeg> /dev/null
!pip -q install gym
!pip -q install JSAnimation
!pip -q install pyglet
!pip -q install pyopengl
!pip -q install pyvirtualdisplay

E: Unable to locate package libcusparse8.0
E: Couldn't find any package by glob 'libcusparse8.0'
E: Couldn't find any package by regex 'libcusparse8.0'
E: Unable to locate package libnvrtc8.0
E: Couldn't find any package by glob 'libnvrtc8.0'
E: Couldn't find any package by regex 'libnvrtc8.0'


  Building wheel for JSAnimation (setup.py) ... [?25l[?25hdone


In [2]:
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1024, 768))
display.start()
import os
os.environ["DISPLAY"] = f":{display.display}"
# https://github.com/ponty/PyVirtualDisplay/issues/54

In [3]:
# 動画の描画関数の宣言
# 参考URL: http://nbviewer.jupyter.org/github/patrickmineault/xcorr-notebooks/blob/master/Render%20OpenAI%20gym%20as%20GIF.ipynb
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
#from IPython.display import display
from IPython.display import HTML

def make_anim(frames):
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), 
               dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])
    
    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                   interval=50)
    return anim

def save_frames_as_gif(frames):
    """
    DISPLAYs a list of frames as a gif, with controls
    """
    
    anim = make_anim(frames)
    anim.save('movie_cartpole_DQN.mp4')
    #display(display_animation(anim, default_mode='loop'))
    return anim.to_jshtml()

In [4]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [5]:
# namedtupleの実装
# namedtupleにて値とフィールド名をペアで格納できる
# 以下は使用例

from collections import namedtuple

Tr = namedtuple('tr', ('name_a', 'value_b'))
Tr_object = Tr('名前Aです', 100)

print(Tr_object)
print(Tr_object.value_b)

tr(name_a='名前Aです', value_b=100)
100


In [6]:
# namedtupleを生成
from collections import namedtuple

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [7]:
ENV = 'CartPole-v0'
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500

In [8]:
# ミニバッチ学習を実現するために
# 経験を保存するメモリクラスを定義します

class ReplayMemory:
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY
        self.memory = []
        self.index = 0

    def push(self, state, action, state_next, reward):
        '''transition = (state, action, state_next, reward)をメモリに保存する'''

        if len(self.memory) < self.capacity:
            self.memory.append(None) # メモリが満タンでないときは足す
            
        # namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
        self.memory[self.index] = Transition(state, action, state_next, reward)

        self.index = (self.index + 1) % self.capacity # 保存するindexを1つずらす(最大の場合は最初に上書き)

    def sample(self, batch_size):
        '''batch_size分だけ、ランダムに保存内容を取り出す'''
        return random.sample(self.memory, batch_size)

    def __len__(self):
        '''関数lenに対して、現在の変数memoryの長さを返す'''
        return len(self.memory)

In [10]:
# エージェントが持つ脳となるクラス。DQNを実行する。
# Q関数をディープラーニングのネットワークをクラスとして定義

import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000  # メモリの最大値(メモリが足りなくなるなんてことがあるのかしら)

class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions # CartPoleの行動(右に左に押す)の2を取得

        # 経験を記憶するメモリオブジェクトを生成
        self.memory = ReplayMemory(CAPACITY)
        
        # ニューラルネットワークを構築
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32, 32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))

        print(self.model)

        # 最適化手法の設定
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)

    def replay(self):
        '''Experience Replayでネットワークの結合パラメータを学習'''

        # -------------------------------------------------
        # 1. メモリサイズの確認
        # -------------------------------------------------
        # 1.1 メモリサイズがミニバッチより小さい間は何もしない
        if len(self.memory) < BATCH_SIZE:
            return
        
        # -------------------------------------------------
        # 2. ミニバッチの作成
        # -------------------------------------------------
        # 2.1 メモリからミニバッチ分のデータを取り出す
        transitions = self.memory.sample(BATCH_SIZE)

        # 2.2 各変数をミニバッチに対応する形に変形
        # transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
        # つまり、(state, action, state_next, reward)×BATCH_SIZE
        # これをミニバッチにしたい。つまり
        # (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
        # ちなみにそれぞれは「各要素の数」を表してそうですね。
        batch = Transition(*zip(*transitions))
        # 例えば、
        # step (state, action, state_next, reward)が
        #    1 (    2,      0,          1,      1)
        #    2 (    1,      1,          2,      0)となっている場合
        # batch内は
        # ([2,1], [0,1], [1,2], [1,0])となっている感じかしら。

        # 2.3 各変数の要素をミニバッチに対応する形に変形する。
        # 例えばstateの場合、[torch.FloatTensor of size 1×4]がBATCH_SIZE分並んでいる(カートの位置、速度、棒の角度、角速度)
        # のですが、それを torch.FloatTensor of size BATCH_SIZE×4に変換します。←要は1をBATCH_SIZE数に変更するということですね
        # catはConcatenates(結合)のことです。
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        # 要はstateが
        # ([[20, 3, 1/2pi, 0.5], [22, 2, 3/4pi, 1]])と並んでいるのを
        # ([[20, 22], [3, 2], [1/2pi, 3/4pi], [0.5, 1]])みたいに変えるって事かな

        # -------------------------------------------------
        # 3. 教師信号となるQ(s_t, a_t)値を求める
        # -------------------------------------------------
        # 3.1 ネットワークを推論モードに切り替える
        self.model.eval()

        # 3.2 ネットワークが出力したQ(s_t, a_t)を求める
        # self.model(state_batch)は、右左の両方のQ値を出力しており
        # [torch.FloatTensor of size BATCH_SIZE×2]になっている
        # ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
        # それに対応するQ値をgatherで引っ張り出す。 ← gatherで引っ張り出すって具体的な挙動は何をする関数なのだろうか ← おそらくgatherでQ値を引っ張り出すって意味だと思われる。具体的な挙動は下セルに書いた。
        # ようわからんけど、state_batchの時に出力した値からgather使ってどっちを選んでるかを判断するみたいな感じかしら
        # いや、state_batchのデータで推論して、それのアクション(左右)とaction_batchと対応付けているんじゃないかな
        # まあ簡単に言うと、action_batchで判断したインデックスのQ値を求めてるっぽい

        state_action_values = self.model(state_batch).gather(1, action_batch)

        print(action_batch)
        print(state_action_values)

        # 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意

        # cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
        # ここlambda使ったワンライナーの悪行風味
        non_final_mask = torch.ByteTensor(
            tuple(map(lambda s: s is not None, batch.next_state)))
        # まずは全部0にしておく
        next_state_values = torch.zeros(BATCH_SIZE)

        # 次の状態があるindexの最大Q値を求める
        # 出力にアクセスし、max(1)で列方向の最大値の[値、index]を求めます
        # そしてそのQ値(上で取り出した出力[値、index]の0番目)を出力します。
        # detachでその値を取り出します。
        # えっと？つまり？次が存在するもののマックス値を求めてるってことか？
        # いや、正確に言うと、次が存在する状態のQ値の集合(?)のそれぞれの最大を取得しているみたい。
        next_state_values[non_final_mask] = self.model(non_final_next_states).max(1)[0].detach()
        
        # 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める (要は誤差を求めるわけですわ)
        expected_state_action_values = reward_batch + GAMMA * next_state_values

        # -------------------------------------------------
        # 4. 結合パラメータの更新
        # -------------------------------------------------
        # 4.1 ネットワークを訓練モードに切り替える
        self.model.train()

        # 4.2 損失関数を計算する (smooth_l1_lossはHuberloss) ← Huberlossとは
        # expected_state_action_valusは
        # sizeが[minibatch]になっているので、unsqueezeで[minibatch × 1]へ
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze (1)) # この(1)って引数？ 間のスペースいる？

        # 4.3 結合パラメータを更新する
        self.optimizer.zero_grad() # 勾配をリセット ← なんでリセットするんだっけ。前の更新したデータが残ってるからか。
        loss.backward() # バックプロパゲーションを計算
        self.optimizer.step() # 結合パラメータを更新

    def decide_action(self, state, episode):
        ''' 現在の状況に応じて、行動を決定する '''
        # ε-greedy法で徐々に最適行動のみを採用する
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.model.eval() # ネットワークを推論モードに切り替える
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1, 1)
            # ネットワークの出力の最大値のindexを取り出します = max(1)[1]
            # .view(1, 1)は[torch.LongTensor of size 1] を size 1x1 に変換します
        
        else:
            # 0, 1の行動をランダムに返す
            action = torch.LongTensor([[random.randrange(self.num_actions)]]) # 0, 1の行動をランダムに返す
            # actionは[torch.LongTensor of size 1x1]の形になります

        return action

In [42]:
# torch.gather(input, dim, index)
t = torch.tensor([[0, 1], [2, 3]])
print(torch.gather(t, 1, torch.tensor([[0, 0],[1, 0]])))
# print(t.gather(1, torch.tensor([[0, 0],[1, 0]])))
# dim = 次元 ( 0は行, 1は列 )
# index = その次元のindex。0は0番目
# input = 解析する元のデータ。torch.tensorのインスタンスの場合はインスタンス.gather()でも動く

tensor([[0, 0],
        [3, 2]])
